POEでログをtailするComet

今更ながら、perl(POE)でCometっぽいものを書いてみようと思い挑戦してみました。

「Cometと言えばチャット」というくらい、サンプルや例題のほとんどがチャット関連のネタばかりなので、あえてそういう類いのものではなく、もっと地味なネタでせめてみようと思います。

ズバリ、ログ監視です。

アクセスログをちょっと確認するだけなのに、いちいちコンソールから入ってtail -fするのが面倒」というズボラなひとに向けです。

まず要点を整理しておきますと、

  1. ブラウザからログ監視をする
  2. ログ監視のON/OFFはブラウザから操作可能
  3. Cometの方式はLong-Pollモデルを採用
  4. サーバ側の実装は POE::Component::Server::HTTP と Wheel::FollowTail がメイン
  5. クライアント側はjavascript(Mini Ajax)を使って実装

です。

色々なblogを参考にさせてもらいましたが、ベースはid:dayflowerdaily dayflower - PoCo::HTTP で Comet チャットサーバを作るを参考にさせてもらいました。あとid:kdaibacometネタの記事もかなり参考にさせてもらってます。(お世話になっております)

細かい解説は後に回して、まずはソースをご覧ください。

use POE qw(Component::Server::HTTP Wheel::FollowTail);
use HTTP::Status;
use Scalar::Util qw(refaddr);
use strict;
use warnings;
use Data::Dumper;

my $access_log = '/home/httpd/vhost/default/logs/access_log';
my @logs;
my $content;
my %receiver;

my $server = POE::Component::Server::HTTP->new(
    Port           => 32080,
    ContentHandler => {
        '/'           => \&print_page,
        '/long_poll'  => \&long_poll,
        '/tail_start' => \&tail_start,
        '/tail_stop'  => \&tail_stop,
    },
    PostHandler  => { '/long_poll' => [ \&cleanup ] },
    ErrorHandler => { '/long_poll' => \&cleanup },
);

my $session = POE::Session->create(
    inline_states => {
        _start => sub {
            $_[HEAP]->{next_alarm_time} = int( time() ) + 2;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },
        tick => sub {
            broadcast();
            $_[HEAP]->{next_alarm_time} += 2;
            $_[KERNEL]->alarm( tick => $_[HEAP]->{next_alarm_time} );
        },
        tail_start => sub {
            print "START\n";
            $_[HEAP]->{wheel} = POE::Wheel::FollowTail->new(
                Filename   => $access_log,
                InputEvent => 'got_line',
                SeekBack   => '1024',
            );
        },
        tail_stop => sub {
            print "STOP\n";
            $_[HEAP]->{wheel} = undef;
        },
        got_line => sub {
            push( @logs, $_[ARG0] . "\n" );
        },
    },
);

POE::Kernel->run();

exit;

sub print_page {
    my ( $req, $res ) = @_;

    unless ($content) {
        while (<DATA>) {
            $content .= $_;
        }
    }
    $res->headers->header( Connection => 'close' );
    $res->code(RC_OK);
    $res->headers->header( Pragma => 'no-cache' );
    $res->content_type('text/html; charset=UTF-8');
    $res->content($content);

    return RC_OK;

}

sub long_poll {
    my ( $req, $res ) = @_;

    $receiver{ refaddr $res } = $res;
    $res->headers->header( Connection => 'close' );
    print "poll\n";
    return RC_WAIT;
}

sub tail_start {
    my ( $req, $res ) = @_;
    $poe_kernel->post( $session, "tail_start" );
}

sub tail_stop {
    my ( $req, $res ) = @_;
    $poe_kernel->post( $session, "tail_stop" );

}

sub broadcast {
    my @wlogs = @logs;
    @logs = ();
    for my $res ( values %receiver ) {
        $res->code(RC_OK);
        $res->content_type('text/plain');
        $res->headers->header( CacheControl => 'no-cache' );
        $res->headers->header( Expires      => '-1' );
        my $msg;
        for (@wlogs) {
            $msg .= $_;
        }
        $msg ||= " ";
        $res->content($msg);
        $res->continue();
    }
}

sub cleanup {
    my ( $req, $res ) = @_;
    delete $receiver{ refaddr $res};
}

__DATA__
<html>
<head>
<script type="text/javascript">
<\!--

// MiniAjax
function $(e){if(typeof e=='string')e=document.getElementById(e);return e};
function collect(a,f){var n=[];for(var i=0;i<a.length;i++){var v=f(a[i]);if(v!=null)n.push(v)}return n};
ajax={};
ajax.x=function(){try{return new ActiveXObject('Msxml2.XMLHTTP')}catch(e){try{return new ActiveXObject('Microsoft.XMLHTTP')}catch(e){return new XMLHttpRequest()}}};
ajax.serialize=function(f){var g=function(n){return f.getElementsByTagName(n)};var nv=function(e){if(e.name)return encodeURIComponent(e.name)+'='+encodeURIComponent(e.value);else return ''};var i=collect(g('input'),function(i){if((i.type!='radio'&&i.type!='checkbox')||i.checked)return nv(i)});var s=collect(g('select'),nv);var t=collect(g('textarea'),nv);return i.concat(s).concat(t).join('&');};
ajax.send=function(u,f,m,a){var x=ajax.x();x.open(m,u,true);x.onreadystatechange=function(){if(x.readyState==4)f(x.responseText)};if(m=='POST')x.setRequestHeader('Content-type','application/x-www-form-urlencoded');x.send(a)};
ajax.get=function(url,func){ajax.send(url,func,'GET')};
ajax.gets=function(url){var x=ajax.x();x.open('GET',url,false);x.send(null);return x.responseText};
ajax.post=function(url,func,args){ajax.send(url,func,'POST',args)};
ajax.update=function(url,elm){var e=$(elm);var f=function(r){e.innerHTML=r};ajax.get(url,f)};
ajax.submit=function(url,elm,frm){var e=$(elm);var f=function(r){e.innerHTML=r};ajax.post(url,f,ajax.serialize(frm))};
//

function long_poll(){
	ajax.get('/long_poll',arrive_msg);
}

function arrive_msg(text){
	if (text == '')
		return;
	$('msg').value = $('msg').value + text;
	$('msg').scrollTop = $('msg').scrollHeight;
	long_poll();

}

function tail_start(){
	ajax.post('/tail_start');
}

function tail_stop(){
	ajax.post('/tail_stop');
}

function screen_clear(){
	$('msg').value = "";
}

//-->
</script>
</head>
<body onload="long_poll()">
<textarea id="msg" cols="150" rows="30"></textarea>
<input type="button" value="start" name="start" onclick="tail_start()">
<input type="button" value="stop" name="stop" onclick="tail_stop()">
<input type="button" value="clear" name="clear" onclick="screen_clear()">
</body>
</html>

解説

ソースコードの__DATA__セクションより下はブラウザからアクセスした際の画面表示用HTMLです。別ファイルにした方が管理上はすっきりするのですが、面倒だったので、ここでは手を抜いてソースと一緒にしています。

コードの中身ですが、最初の方でPoco::Server::HTTPとアプリケーションのメインセッションをcreateしています。一般的なPOEの書き方そのものなので詳しい解説は割愛、と言いたいところですが、ちょっと変なことをしているので説明しておきます。ログをtailしてくれるPOE::Wheel::FollowTailですが、これはinline_statesのtail_startというメソッド内で作ってます。こいつが呼び出されるタイミングはブラウザの「start」ボタンを押したときです。また「stop」ボタンを押すとWheelが止まります。というかwheelオブジェクトごとundefするという原始人的な方法をとっていますが。。。(誰かもっとお行儀のいい方法教えてください)

POE::Kernel->run()した後はサブルーチンがいくつかあるだけですが、一応解説しておきます。

■print_page()
ブラウザからアクセスした際の画面を返すサブルーチン。tailの状況を表示するためのtextareaと操作用のボタンがあるだけのものです。
■long_poll()
 javascriptから呼び出されるメソッド。この部分がCometの要です。return RC_WAIT;とすることで掴みっぱなしにします。
■tail_start() / tail_stop()
 ブラウザからの操作(ボタンクリック)で呼び出せるメソッド。POE::Wheel::FollowTailをcreateしたりundefしたりします。
■broadcast()
 掴みっぱなしにしておいたブラウザ達に一気にtailの中身を送りつけます。冒頭で「my @wlogs = @logs;@logs = ();」とベタなことをしてますが、これはPOE::Wheel::FollowTailが溜め込んでおいたtailログデータを作業用にコピーしてその時点でもとの配列を空にしています。なおわざわざ「$msg ||= " ";」としているのは何かしら値がないと$res->contentがうまく動作しなかったからです。
■cleanup()
 アトカタヅケ



perl/POEの部分はこんなところです。

次にjavascript部分ですが、こちらはほとんどヒネリがありません。というか、javascriptはあまり真面目に勉強してないのでよくわかってません。MiniAjaxなるものをもってきて、それで簡単にAjax環境を作っておいて、あとは /long_poll というパスに対して ajax.get して、それらの戻り値を整形して表示するのと、画面上のボタンに対応した関数をおいているだけです。
1点だけ工夫したのはtailの挙動としてスクロールバーがいつも画面最後尾にあるようにすることです。「$('msg').scrollTop = $('msg').scrollHeight;」の部分です。工夫ってほどのモンじゃないですね。。ちなみに動きはFireFoxでしか試してません。


このソースを適当なサーバに上げてブラウザからアクセスすると、サーバのアクセスログが流れてくるはずです。でstopとかstartのボタンを押すとログの流れが止まったり再開したりするはずです。

ちなみに複数のブラウザで同時にアクセスして操作してみると、あるブラウザでの操作が他のブラウザでも再現されます。
(要するにどっかでstopすると他のブラウザでもstopします)


今回のスクリプトでやれたことは

  1. サーバ側のタスク(ログtail処理)とCometのLong-Poll部分はロジック的には分離
  2. ユーザからサーバ側タスクに対して操作が可能
  3. 複数のユーザで状態を共有

ということになります。

ただしこのままでは本質的にはあまりCometのメリットが出ていないと思います。
tailが動いていてもいなくても、3秒おきにtailの結果データをのせてbroadcastし続けているので、結局のところベタなポーリングモデルと一緒。どうせならブラウザから「stop」ボタンを押された際にポーリング自体も止めるようにしないと効率がわるいな。


今後もうちょっと違うサンプルケースを考えてみながらCometの習熟につとめてみようと思う今日この頃です。