Kamaitachi入門(その4)

Kamaitachi コードリーティングの4回目です。

前回はKamaitachi::Sessionで接続確立の処理(HandShake)をすることろまで読みすすめました。今回はKamaitachi::Sessionの続きです。

HandShakeが成功するとSessionのhandlerに&handle_packetという関数が登録されます。
これにより次回以降のパケット受信時にはこの関数が呼び出されることになりました。

さて、それではhandle_packetでは何をするのでしょうか。

sub handle_packet {
    my ($self) = @_;

    while (my $packet = $self->io->get_packet) {
        next if $packet->type == 0x14 and $packet->size > bytes::length($packet->data);

        my $name = $self->packet_names->[ $packet->type ] || 'unknown';

        if ($name eq 'packet_invoke') {
            $self->packet_invoke($packet);
        }
        else {
            $self->dispatch( "on_$name", $packet );
        }
    }
}

まずはioからパケットを取得しています。そしてループの先頭で「無視する条件」を定義しています。
「パケットタイプが0x14(Invoke)だけどパケットサイズがパケットデータより大きい場合はnext」ということでしょうか。うーん、いまいちわかりません。

ちなみにパケットタイプ0x14について調べてみたところ、http://osflash.org/documentation/rtmp#rtmp_datatypes に「like remoting call, used for stream actions too. 」と書いてありました。要するにFlash RemotingのようなRPC呼び出しのパケットだと思われます。

とりあえず進みます。

ループの中のおいて、もしもパケットタイプがInvokeであればpacket_invokeというメソッドに引き継がれます。それ以外のパケットであればdispatchメソッドに引き継がれます。このとき引数にて'on_'を頭に付けた文字列が指定されています。


それでは引き継ぎ先であるpacket_invokeとdispatchを見てみましょう。

まずは先にdispatchを見てみます。

sub dispatch {
    my ($self, $name, @args) = @_;
    my $service = $self->service or return;

    if ($service->can($name)) {
        return $service->$name( $self, @args );
    }
    return;
}

$serviceというのはKamaitachiサーバを起動する時にregiseter_servicesで登録されたサービスモジュールのことです。

該当するモジュールが渡された$nameというメソッドを持っていればそれを実行してreturnしています。ここは単純な処理になっていますね。

次にpacket_invokeです。

sub packet_invoke {
    my ($self, $packet) = @_;

    my $func_packet = $packet->function or return;

    $self->logger->debug(sprintf('[invoke] -> %s', $func_packet->method));

    if ($func_packet->method eq 'connect') {
        my $connect_info = $func_packet->args->[0];
        for my $service ( @{$self->context->services} ) {
            if ($connect_info->{app} =~ $service->[0]) {
                $self->service( $service->[1] );
                $self->dispatch( on_connect => $packet );
                last;
            }
        }

        unless ($self->service) {
            my $res = $func_packet->response(undef, {
                level       => 'error',
                code        => 'NetConnection.Connect.InvalidApp',
                description => '-',
            });
            $self->io->write( $res );
            return;
        }
    }

    my $res = $self->dispatch('on_invoke_' . $func_packet->method, $func_packet );
    if (defined $res and (ref($res) || '') =~ /^Kamaitachi::Packet/) {
        $self->io->write( $res );
    }
}

ここはremotingなどのRPC呼び出しの際に叩かれるメソッドです。
なのでまず$packet->functionとすることでRTMPのファンクションパケット(要するにAMFオブジェクト?)を取り出します。

そして「connect」という関数呼び出しである場合のみ、当該サービスモジュールのon_connectをdispatchします。

$connect_info->{app} =~ $service->[0] の部分など、少し直感的にはわかりづらい実装になっていますが、まぁ要するに「当該サービスモジュールを探してそいつにdispatchするよ」ということのようです。

さらにconnectが呼び出される場合においては $self->dispatch('on_invoke_' . $func_packet->method, $func_packet )という行もあるので、ここでもさらにon_invoke_connectをdispatchすることになります。

このconnectというRPC呼び出しですが、actionscript側でサーバに接続するためにまずNetConnectionオブジェクト(nc = new NetConnection )を作ってからnc.connect("rtmp:/xxxx")としますが、おそらくその部分のことです。

なのでconnectについてはKamaitachi側ではon_connectとon_invoke_connectの2つをdispatchしていることになりますね。なんで2重に待ち構えるような構成にしてるんだろ?なにか意味があるような気がしますが、よくわかりません。

なお、connect以外の関数呼び出しであれば$self->dispatch('on_invoke_' . $func_packet->method, $func_packet )だけが呼ばれます。

最後に有効な応答パケットがあれば$self->io->write($res)しています。

まとめ

これでKamaitachi::Sessionの主立った処理はリーディングできました。

Kamaitachi::Sessionを読んでみての感想ですが、Sessionの役割の中にPacketとかServiceとかが織り交ぜられているので、流れを正しく理解するためにはこれらの相互関係をしっかり解ってないと混乱するな、と思いました。

というわけで、次はKamaitachi::Packetあたりを読んでみたいと思います。