POE::Component::RemoteTail 書いた
ここのところ色々と忙しくてブログ更新できなかったんですが、久しぶりに時間がとれたので、ここ最近でCPANにうpしたモジュールをいくつか紹介しようと思います。
さてさて、みなさん、サーバのアクセスログなどをリモートの環境からtailしたい。しかも同時に何台もtailしたい。そんなとき、どうしますか?
「そんなの ssh -A SERVER "tail -f ACCESS_LOG" でいいんじゃね?」
そうですね。それで十分なんですが、このtail -f で流れてくる情報を「ノンブロッキングなイベントループ」の中に取り込むようなプログラミングをしようとすると、実はちょっと工夫が必要になります。
例えば tail -f xxxx > outfile みたいにファイルシステムに書き出しつつ、別のプロセスでそのファイルを非同期に取り込んでイベントループとしてハンドリングする、、とか。
それはそれで、なんだかイケテナイ(よね?)
そこで、それをスマートにPOEで実装するためのPOE::Component::RemoteTailなるものを書きました。
POE::Component::RemoteTail - tail to remote server's access_log on ssh connection.
http://search.cpan.org/~miki/POE-Component-RemoteTail/
基本的な使い方はこんな感じ
use POE::Component::RemoteTail; my ( $host, $path, $user, $password ) = @target_host_info; my $alias = 'Remote_Tail'; # spawn component my $tailer = POE::Component::RemoteTail->spawn( alias => $alias ); # create job my $job = $tailer->job( host => $host, path => $path, user => $user, ); # prepare the postback subroutine at main POE session POE::Session->create( inline_states => { _start => sub { my ( $kernel, $session ) = @_[ KERNEL, SESSION ]; # create postback my $postback = $session->postback("MyPostback"); # post to execute $kernel->post( $alias, "start_tail" => { job => $job, postback => $postback } ); }, # return to here MyPostback => sub { my ( $kernel, $session, $data ) = @_[ KERNEL, SESSION, ARG1 ]; my $log = $data->[0]; my $host = $data->[1]; ... do something ...; }, }, ); POE::Kernel->run();
tailしたいサーバの情報を$jobに格納してそれを引数にしてstart_tailというイベントを呼び出して使います。
また任意のpostbackもセットできるので、返ってきたデータ(tailしたログデータ)をPOEループ環境の中で好きなように料理できます。
利用シーン
で、こいつの実際の利用イメージですが、実は簡易なQueueサーバみたいなものを想定しています。
ちょっと前のブログに書きましたが、例えば商用環境と開発環境があったとして、
現在開発中のサーバに対して、商用環境に流れてきているトラヒックを転送してみる、というような用途に使えます。
(というかそのために作りました。)
たとえば以下のようなQueueサーバをどこかのマシンで起動しておくとします。
use strict; use warnings; use POE qw( Component::RemoteTail Component::IKC::Server ); $| = 1; my @queue; my $queue_limit = 100000; # IKC Server POE::Component::IKC::Server->spawn( port => 31338, name => 'LogQueueServer', ); # RemoteTail my $tailer = POE::Component::RemoteTail->spawn; my $job1 = $tailer->job( # 実際の商用環境のサーバ名とパス host => 'webserver_01', path => '/home/httpd/vhost/web/logs/access_log', user => 'hoge', ); # Main Session POE::Session->create( inline_states => { _start => sub { my ( $kernel, $session ) = @_[ KERNEL, SESSION ]; # IKC Server $kernel->alias_set("server"); $kernel->call( IKC => publish => "server", ["dequeue"] ); # RemoteTail my $postback = $session->postback("mypostback"); $kernel->post( $tailer->session_id(), "start_tail", { job => $job1, postback => $postback } ); }, dequeue => sub { my ($kernel, $request) = @_[KERNEL, ARG0]; my ($num, $rsvp) = @$request; $num ||=10; my @log; print "\t$#queue\t"; for(1..$num){ push(@log, shift @queue); } print "$#queue\n"; $kernel->call(IKC => post => $rsvp, \@log); }, mypostback => sub { my ( $kernel, $data ) = @_[ KERNEL, ARG1 ]; my $host = $data->[1]; my $log = $data->[0]; if(@queue > $queue_limit){ print "it reached queue limit $queue_limit\n"; } else{ push(@queue, $log); print "$#queue\n"; } }, stop_job => sub { my ( $kernel, $job ) = @_[ KERNEL, ARG0 ]; $kernel->post( $tailer->session_id(), "stop_tail" => $job ); }, } ); # run POE::Kernel->run();
POE::Component::RemoteTailと同時にPOE::Component::IKC::Serverも使っていますが、
これは別のワーカープロセスに取得したデータをdequeueさせるためのインタフェースを設けるためです。
で、どこか適当なマシンで以下のようなworkerプロセスを起動してやります。
use strict; use POE qw(Component::IKC::ClientLite Component::Client::HTTP); use HTTP::Request::FromLog; use URI; $| = 1; # 開発環境サーバ my $target_host = "develop_server:8080"; # クローラとキューの制限に関する設定 my $dequeue_num = 20; # 1回にデキューする件数 my $minimum_queue_num = 5; # この数を下回ったらデキューする my $max_active_crawler = 10; # クローラ上限数 my $max_yield_num = 10; # 一回に突っ込める上限数 my @local_queue; my $now_crawling; my $remote = create_ikc_client( port => 31338, name => "Client$$", timeout => 10, ); die $POE::Component::IKC::ClientLite::error unless $remote; my $log2hr = HTTP::Request::FromLog->new( host => $target_host ); POE::Component::Client::HTTP->spawn( Alias => 'ua', Timeout => 10, ); POE::Session->create( inline_states => { _start => sub { my ( $kernel, $session ) = @_[ KERNEL, SESSION ]; $kernel->yield("try_dequeue"); }, try_dequeue => sub { my ( $kernel, $session ) = @_[ KERNEL, SESSION ]; ## $minimum_queue_num件以下になっていたらデキュー if ( @local_queue < $minimum_queue_num ) { my $ret = $remote->post_respond( "server/dequeue", $dequeue_num ); for (@$ret) { push( @local_queue, $_ ); } } ## 現在仕事中のクローラが$max_active_crawlerを超えないように制御 if ( $now_crawling < $max_active_crawler ) { my $n = 0; for (@local_queue) { $kernel->yield("do_crawl"); # 一回にyieldする件数を制限 $n++; last if $n == $max_yield_num; } } $kernel->delay_add( "try_dequeue", 1 ); }, do_crawl => sub { my ( $kernel, $session ) = @_[ KERNEL, SESSION ]; my $log = shift @local_queue; if ($log) { $now_crawling++; my $http_request = $log2hr->convert($log); $kernel->post( "ua", "request", "response", $http_request ); } }, response => sub { my $req = $_[ARG0]->[0]; my $res = $_[ARG1]->[0]; print "=" x 80, "\n"; print $req->as_string; print "-" x 80, "\n"; print $res->as_string; print "-" x 80, "\n"; $now_crawling--; }, } ); POE::Kernel->run();
なんかガチャガチャしていますが、まずはPOE::Component::IKC::ClientLiteを使ってQueueサーバと通信します。
次にQueueサーバから取得したアクセスログ文字列をHTTP::Request::FromLogを利用してHTTP::Requestオブジェクトに変換します。
それをPOE::Component::Client::HTTPを使ってジャンジャン開発環境に投げる、といったタスクを行っています。
ガチャガチャして見える理由はPOE::Component::Client::HTTPをある程度制限しながら走らせているからです。
こうしないとPOEのイベントキューがすぐに詰まってしまって、というかタイムアウトになってしまうからです。
(どうでもいいことですが、その場合にPoCo::Client::HTTPが 500 server error を出力するんですが、それってウソじゃん・・)
まとめ
はい、まとめます。
これを使うと開発中のシステムに対して「実際のトラヒックを実際のスケールで」浴びせてみることができるので、リリースする前にばっちり検証がとれるようになります。
なので自信をもってリリースできるようになります。リリース行為が怖くなくなります。
つまり「自信がつく → モテる」という構図が完成します。
開発者が想定していないような不可思議なリクエストが商用環境には日々たくさん来ているので、
リリース前にそれらのトラヒックで「鍛えておく」と、いろいろな意味でクオリティが上がってくるかもね。
というわけで、よければ使ってみてください。
あ、そうそう、sshの認証なんですがパスワード認証は使えません。
DSAなりRSAなり鍵を使って ssh -A で通信できる環境で使ってください。