Kamaitachi入門(その3)

ちょっと間が空いてしまいましたが、Kamaitachiのコードリーディング第3弾です。

今回はユーザセッションを管理するKamaitachi::Sessionを見て行きます。

ですがその前に、Kamaitachiについて改めておさらいしますよ。

Kamaitachiとは、typesterさんとid:hide-Kさんが鋭意開発中のperlによるRTMPサーバの実装( =~ Flash Media Server)です。Danga::SocketやMooseといったperl界隈では有名な先進的モジュールを駆使したオープンソースプロダクトです。また話題のgithubで開発が公開されているあたりもニクいですね。

そんなオシャレなKamaitachiに触れてみようと、old typeなperlプログラマであるダウンロードたけし(寅年)が、突然入門日記を書き始めてしまったが、2回書いたところで放置じゃん、というそんな気まずい空気まっただ中なところであります。

ちなみに第1弾ではKamaitachiに同梱されているexampleを動かしてみて基本的な「使い方」を探ってみました。
第2弾ではサーバモジュールの入り口であるKamaitachi.pmを見て行きました。Danga::Socketで全体のイベントループが構成されているということがわかりました。

というわけで、前置きが長くなりましたが、今回はKamaitachi::Sessionを見ていこうと思います。

Kamaitachi::Sessionは各クライアント毎のセッションを管理するモジュールであり、Kamaitachiの中でも中枢的役割を担っているモジュールです(たぶん)。

それではコードを1つずつ見て行きましょう、と言いたいところなのですが、全部を掲載していくと大変なので、まず先におおまかな構成を洗い出してみます。

Kamaitachi::Sessionは

  • クライアントの接続ごとにセッションオブジェクトが生成される
  • まずはクライアントからの接続を確立させる(HandShake)
  • 接続が確立したら、以降はパケットタイプによってサーバ側の各種functionをdispatchする
  • dispatch後の結果データの送信やそもそもの受信データのハンドリングはKamaitachi::IOStreamに任せている

といった特徴があるようです。

それではそれらの特徴について見て行きましょう。

特徴1

クライアントの接続ごとにオブジェクトが生成される、という件については前回にみていったKamaitahic.pmのBUILD()を見るとわかります。

Danga::Socket->AddOtherFdsの中でクライアントのソケット接続毎にIDを付与しKamaitachi::Sessionをnewしています。

sub BUILD {
    my $self = shift;

 (中略)

    Danga::Socket->AddOtherFds(
        fileno($ssock) => sub {
            my $csock = $ssock->accept or return;

     (中略)

            my $session = Kamaitachi::Session->new(
                id      => fileno($csock),
                context => $self,
            );
            $self->sessions->[ $session->id ] = $session;

     (中略)

        }
    );
}

Kamaitachi本体のオブジェクトからは$self->sessions->[$session->id] のようにして指定されています。
ちなみに$session->idはクライアントソケットのファイルディスクリプタとなっているようです。

また生成された$sessionはDanga::Socketがon_read_readyとなった段階で以下のようにして呼び出されています。

  my $bref = $socket->read($self->buffer_size):
  (中略)
  $session->io->push($$bref);
  $session->handler->( $session );

「ソケットから読みとったデータをセッションが保持しているIO領域にプシュし、それをハンドラーで処理する」といった解釈でよさそうです。

しかし$session->handler->($session)というのはやや不思議なメソッドの呼び出し方ですね。
「$session->hander」って固定で書いてあるけど、パケットが届くたびに毎回同じハンドラーが処理するんかい?

これについてSession.pmの中を掘り下げてみると、どうやらhandlerというのは処理段階に応じて変化するコードリファレンスのようです。

「処理段階に応じて変化する」と書きましたが、初期(というかデフォルト)ではsub { &handle_packet_connect } となっています。
つまり「まずは接続確立( HandShake)に関するハンドラがキックされる」と解釈できます。

特徴2

接続確立(HandShake)ですが、こんなことをしています。

sub handle_packet_connect {
    my ($self) = @_;

    my $io = $self->io;
    my $bref;

    $io->read(1) or return $io->reset;

 # IOから1536bytes読み出し$client_handshake_packetとする
    $bref = $io->read(0x600) or return $io->reset;
    my $client_handshake_packet = $$bref;

    # IOのバッファをつめる処理
    $io->spin;

    # 0x03と自分自身のhandshake_packetと上記の$client_handshake_packetをつなげてIOに書き込む
    $io->write(
        pack('C', 0x03) . $self->handshake_packet . $client_handshake_packet
    );

 # 次以降の処理ハンドラをhandle_packet_handshakeに設定しておく
    $self->handler( \&handle_packet_handshake );
}

コメントを適当に書いてみましたが、簡単にいうと「0x03というシングルパケット(End of Textを表す制御文字)」と「クライアントから受け取った1536bytes(0x600)のハンドシェイクパケット」と「自分自身のハンドシェイクパケット」とを「連結して返す」ということをしているようです。

ちなみに「自分自身のハンドシェイクパケット」は以下のようなものでした。

has handshake_packet => (
    is      => 'rw',
    isa     => 'Str',
    default => sub {
        my $packet = q[];
        $packet .= pack('C', int rand 0xff) for 1 .. 0x600;
        substr $packet, 4, 4, pack('L', 0);
        $packet;
    },
);

「1536bytes分のランダムなCharバイナリなパケットをつくり、その中の5バイト目から8バイト目までの32bit分を0でリプレイスさせたもの」ということみたいですね。

うーん、ランダムなバイト列というだけなら理解できるけど、32bit分を0で置き換えているのは何のためなんだろう??よくわかりません。教えてtypester!!

まぁなにはともあれ「自分自身のハンドシェイクパケットはランダムっぽい」ということはわかりました。とりあえず進みます。

先ほど「次以降のハンドラをhandle_packet_handshakeに設定しておく」と書いたところですが、そこも見てみましょう。

sub handle_packet_handshake {
    my ($self) = @_;
    my $io = $self->io;

    my $bref = $io->read(0x600) or return $io->reset;

    $io->spin;

    my $packet = $$bref;

    if ($packet eq $self->handshake_packet) {
        $self->logger->debug(sprintf('handshake successful with client: %d', $self->id));
        $self->handler( \&handle_packet );
        $self->handler->($self);
    }
    else {
        $self->logger->debug(sprintf('handshake failed with client: %d', $self->id));
#        $socket->close;
        $self->handler( \&handle_packet ); # TODO: correct handshake impl!
        $self->handler->($self);
    }
}

ふむ。どうやらクライアントからの更なる応答をチェックしているようですね。
「新たに受け取った1536bytes分のパケットが先ほど作った自分自身のハンドシェイクパケットと同一だったらOK!」ということのようです。

が、いまのところまだエラー時の振る舞いを実装していないみたいです。ここら辺は「まだ鋭意開発中だよ」といった臨場感が伝わってきますね。頑張れtypester!!


ちなみに今までのHandShakeについては以下に簡単な説明がありました。

http://osflash.org/documentation/rtmp#handshake

先にここみてからコード読めば良かった。。。理解するのにすごく苦労した。。。

というか今回のエントリは思いっきりRTMPの領域に入ってきてたんですね!ってことに今頃になって気がつきました。


さてさて頑張って続きを見ていくぞ・・・と思ったんですが、だいぶ長くなってきて、書くのが疲れてきたので、今回のエントリはここまでにしておきます。

次回も(あれば)Kamaitachi::Sessionの続きです。