POE::Component::RemoteTail 書いた

ここのところ色々と忙しくてブログ更新できなかったんですが、久しぶりに時間がとれたので、ここ最近でCPANにうpしたモジュールをいくつか紹介しようと思います。

さてさて、みなさん、サーバのアクセスログなどをリモートの環境からtailしたい。しかも同時に何台もtailしたい。そんなとき、どうしますか?

「そんなの ssh -A SERVER "tail -f ACCESS_LOG" でいいんじゃね?」

そうですね。それで十分なんですが、このtail -f で流れてくる情報を「ノンブロッキングなイベントループ」の中に取り込むようなプログラミングをしようとすると、実はちょっと工夫が必要になります。

例えば tail -f xxxx > outfile みたいにファイルシステムに書き出しつつ、別のプロセスでそのファイルを非同期に取り込んでイベントループとしてハンドリングする、、とか。

それはそれで、なんだかイケテナイ(よね?)

そこで、それをスマートにPOEで実装するためのPOE::Component::RemoteTailなるものを書きました。

POE::Component::RemoteTail - tail to remote server's access_log on ssh connection.
http://search.cpan.org/~miki/POE-Component-RemoteTail/


基本的な使い方はこんな感じ

use POE::Component::RemoteTail;
  
my ( $host, $path, $user, $password ) = @target_host_info;
my $alias = 'Remote_Tail';
  
# spawn component
my $tailer = POE::Component::RemoteTail->spawn( alias => $alias );
  
# create job
my $job = $tailer->job(
    host          => $host,
    path          => $path,
    user          => $user,
);
  
# prepare the postback subroutine at main POE session
POE::Session->create(
    inline_states => {
         _start => sub {
            my ( $kernel, $session ) = @_[ KERNEL, SESSION ];
            # create postback
            my $postback = $session->postback("MyPostback");
            # post to execute
            $kernel->post( $alias,
                "start_tail" => { job => $job, postback => $postback } );
        },
  
        # return to here
        MyPostback => sub {
            my ( $kernel, $session, $data ) = @_[ KERNEL, SESSION, ARG1 ];
            my $log  = $data->[0];
           my $host = $data->[1];
            ... do something ...;
        },
    },
);
  
POE::Kernel->run();

tailしたいサーバの情報を$jobに格納してそれを引数にしてstart_tailというイベントを呼び出して使います。

また任意のpostbackもセットできるので、返ってきたデータ(tailしたログデータ)をPOEループ環境の中で好きなように料理できます。

利用シーン

で、こいつの実際の利用イメージですが、実は簡易なQueueサーバみたいなものを想定しています。

ちょっと前のブログに書きましたが、例えば商用環境と開発環境があったとして、
現在開発中のサーバに対して、商用環境に流れてきているトラヒックを転送してみる、というような用途に使えます。
(というかそのために作りました。)

たとえば以下のようなQueueサーバをどこかのマシンで起動しておくとします。

use strict;
use warnings;
use POE qw( Component::RemoteTail Component::IKC::Server );

$| = 1;
my @queue;
my $queue_limit = 100000;

# IKC Server
POE::Component::IKC::Server->spawn(
    port => 31338,
    name => 'LogQueueServer',
);

# RemoteTail
my $tailer = POE::Component::RemoteTail->spawn;

my $job1 = $tailer->job(
    # 実際の商用環境のサーバ名とパス
    host => 'webserver_01',
    path => '/home/httpd/vhost/web/logs/access_log',
    user => 'hoge',
);

# Main Session
POE::Session->create(
    inline_states => {
        _start => sub {
            my ( $kernel, $session ) = @_[ KERNEL, SESSION ];
            # IKC Server
            $kernel->alias_set("server");
            $kernel->call( IKC => publish => "server", ["dequeue"] );             
	    # RemoteTail
            my $postback = $session->postback("mypostback");
            $kernel->post( $tailer->session_id(), "start_tail",
                { job => $job1, postback => $postback } );
        },
        dequeue => sub {
            my ($kernel, $request) = @_[KERNEL, ARG0];
            my ($num, $rsvp) = @$request;
            $num ||=10;
            my @log;
            print "\t$#queue\t";
            for(1..$num){
                push(@log, shift @queue);
            }
            print "$#queue\n";
            $kernel->call(IKC => post => $rsvp, \@log);
        },
        mypostback => sub {
            my ( $kernel, $data ) = @_[ KERNEL, ARG1 ];
            my $host = $data->[1];
            my $log  = $data->[0];
            if(@queue > $queue_limit){
                print "it reached queue limit $queue_limit\n";
            }
            else{
                push(@queue, $log);
                print "$#queue\n";
            }
        },
        stop_job => sub {
            my ( $kernel, $job ) = @_[ KERNEL, ARG0 ];
            $kernel->post( $tailer->session_id(), "stop_tail" => $job );
        },
    }
);

# run
POE::Kernel->run();

POE::Component::RemoteTailと同時にPOE::Component::IKC::Serverも使っていますが、
これは別のワーカープロセスに取得したデータをdequeueさせるためのインタフェースを設けるためです。

で、どこか適当なマシンで以下のようなworkerプロセスを起動してやります。

use strict;
use POE qw(Component::IKC::ClientLite Component::Client::HTTP);
use HTTP::Request::FromLog;
use URI;

$| = 1;

# 開発環境サーバ
my $target_host = "develop_server:8080";

# クローラとキューの制限に関する設定
my $dequeue_num        = 20;    # 1回にデキューする件数
my $minimum_queue_num  = 5;     # この数を下回ったらデキューする
my $max_active_crawler = 10;    # クローラ上限数
my $max_yield_num      = 10;    # 一回に突っ込める上限数

my @local_queue;
my $now_crawling;

my $remote = create_ikc_client(
    port    => 31338,
    name    => "Client$$",
    timeout => 10,
);
die $POE::Component::IKC::ClientLite::error unless $remote;

my $log2hr = HTTP::Request::FromLog->new( host => $target_host );

POE::Component::Client::HTTP->spawn(
    Alias   => 'ua',
    Timeout => 10,
);

POE::Session->create(
    inline_states => {
        _start => sub {
            my ( $kernel, $session ) = @_[ KERNEL, SESSION ];
            $kernel->yield("try_dequeue");
        },
        try_dequeue => sub {
            my ( $kernel, $session ) = @_[ KERNEL, SESSION ];

            ## $minimum_queue_num件以下になっていたらデキュー
            if ( @local_queue < $minimum_queue_num ) {
                my $ret =
                  $remote->post_respond( "server/dequeue", $dequeue_num );
                for (@$ret) {
                    push( @local_queue, $_ );
                }
            }

            ## 現在仕事中のクローラが$max_active_crawlerを超えないように制御
            if ( $now_crawling < $max_active_crawler ) {
                my $n = 0;
                for (@local_queue) {
                    $kernel->yield("do_crawl");

                    # 一回にyieldする件数を制限
                    $n++;
                    last if $n == $max_yield_num;
                }
            }
            $kernel->delay_add( "try_dequeue", 1 );
        },
        do_crawl => sub {
            my ( $kernel, $session ) = @_[ KERNEL, SESSION ];
            my $log = shift @local_queue;
            if ($log) {
                $now_crawling++;
                my $http_request = $log2hr->convert($log);
                $kernel->post( "ua", "request", "response", $http_request );
            }
        },
        response => sub {
            my $req = $_[ARG0]->[0];
            my $res = $_[ARG1]->[0];

            print "=" x 80, "\n";
            print $req->as_string;
            print "-" x 80, "\n";
            print $res->as_string;
            print "-" x 80, "\n";
            
	    $now_crawling--;
        },

    }
);

POE::Kernel->run();

なんかガチャガチャしていますが、まずはPOE::Component::IKC::ClientLiteを使ってQueueサーバと通信します。
次にQueueサーバから取得したアクセスログ文字列をHTTP::Request::FromLogを利用してHTTP::Requestオブジェクトに変換します。
それをPOE::Component::Client::HTTPを使ってジャンジャン開発環境に投げる、といったタスクを行っています。

ガチャガチャして見える理由はPOE::Component::Client::HTTPをある程度制限しながら走らせているからです。
こうしないとPOEのイベントキューがすぐに詰まってしまって、というかタイムアウトになってしまうからです。
(どうでもいいことですが、その場合にPoCo::Client::HTTPが 500 server error を出力するんですが、それってウソじゃん・・)

まとめ

はい、まとめます。

これを使うと開発中のシステムに対して「実際のトラヒックを実際のスケールで」浴びせてみることができるので、リリースする前にばっちり検証がとれるようになります。

なので自信をもってリリースできるようになります。リリース行為が怖くなくなります。
つまり「自信がつく → モテる」という構図が完成します。

開発者が想定していないような不可思議なリクエストが商用環境には日々たくさん来ているので、
リリース前にそれらのトラヒックで「鍛えておく」と、いろいろな意味でクオリティが上がってくるかもね。

というわけで、よければ使ってみてください。


あ、そうそう、sshの認証なんですがパスワード認証は使えません。
DSAなりRSAなり鍵を使って ssh -A で通信できる環境で使ってください。