Kamaitachi入門

Kamaitachiとは、typesterさんが鋭意開発中のperlによるRTMPサーバの実装( =~ Flash Media Server)です。

RTMPサーバのオープンソース実装としてはRed5やWowza、RubyIzumiなどが知られていますが、「perl使いだったらKamaitachiだよ!」ということで、年の瀬の忙しい中、家族の冷たい視線に刺されつつKamaitachiに入門してみようと思います。

Kamaitachi とは

まずは周辺情報の整理から。

Kamaitachiの紹介サイト
 http://bm11.kayac.com/project/kamaitachi/

作っている人
 id:typester
 id:hide-K

 typesterさんのブログ http://unknownplace.org/memo/search?q=kamaitachi

ソース
 http://github.com/typester/kamaitachi
 http://www.cpan.org..

IRC
 kamaitachi@chat.freenode.net

shibuya.pmでのプレゼン
 http://svn.coderepos.org/share/docs/typester/shibuyapm-10/start.html

サンプルコードを追っかけてみる

さて、さっそくコードを追っかけてみようと思います。

まずはKamaitachi本体ではなくサンプルコードのソースから見て行きます。githubもしくはCPANからソースを持ってきます。

http://github.com/typester/kamaitachi/tree/master


持ってきたコードのなかにexampleというディレクトリがあるので、その中を見て行きます。

サーバ本体としてserver.plというスクリプトがありました。

#!/usr/bin/env perl
 
use strict;
use warnings;
 
use FindBin::libs;
use lib "$FindBin::Bin/lib";
 
use Kamaitachi;
 
my $kamaitachi = Kamaitachi->new;
 
# setup record output directory
use Service::LiveStreamingRecorder;
 
use Path::Class qw/dir/;
my $dir = dir($FindBin::Bin, 'streams');
$dir->mkpath unless -d $dir;
 
$kamaitachi->register_services(
    'rpc/echo' => 'Service::Echo',
    'rpc/chat' => 'Service::Chat',
    'stream/live' => 'Service::LiveStreaming',
    'stream/rec' => Service::LiveStreamingRecorder->new( record_output_dir => $dir ),
);
 
$kamaitachi->run;

いろいろな処理をしているように見えますが、要するに

my $kamaitachi = Kamaitachi->new;

$kamaitachi->register_services(
	いろいろなサービスを登録
);

$kamaitachi->run;

というのが主な流れですね。
レジストするサービスは別のモジュールとして自由に書けばよいようです。

では実際のサービスモジュールはどうなっているのか?

今回は一番手頃なところで、Service::Chatを見て行くことにします。

Service::Chatはこんなコードでした。

package Service::Chat;
use Moose;
 
extends 'Kamaitachi::Service';
 
with qw/Kamaitachi::Service::AutoConnect
        Kamaitachi::Service::Broadcaster
       /;
 
sub on_invoke_send {
    my ($self, $session, $req) = @_;
 
    my $msg = $req->args->[1];
    my $res = $self->broadcast_notify_packet( onMessage => $msg );
 
    $self->broadcast( $session => $res );
 
    return $req->response; # return null response
}
 
__PACKAGE__->meta->make_immutable;

お、Moose使ってますね。むむむ、むうすかぁ。。個人的に「なんとなく敬遠」モードだったんですが、ここで会ったが100年目、この機会にちょっとかじってみるかぁ。

さてさて、ソースコードの方にもどりますが、Kamaitachi::Serviceを継承しつつ、そのほかにもAutoConnectとBroadcasterなるものをインプリメントしてますね。ここらへんは後からKamaitachi本体のコードを追っかけてみます。

このクラス(Service::Chat)としてやっていることは on_invoke_send なる関数を書いてるだけですね。えらくシンプルです。

ではこれはどこからどのように呼ばれるのかな?と思って調べてみたところ、

  • lib/Kamaitachi/Session.pmの中に'on_invoke_'とfunc_nameを連結する記述があった
  • example/client/chat/chat.asの中のfunction send()のなかでnc.call() している部分の第1引数が'send' だった

という2点から、どうやら「actionscript中のnc.callの第1引数で指定した文字列と'on_invoke’を連結させた文字列」でサーバ側に関数を用意しておくとRPC的にそこが叩かれる」という仕組みであると推測されます。

クライアント側のasスクリプトはこんな感じでした。as3じゃなくてas2なんですね。

import flash.net.*;
import flash.events.*;

private var nc:NetConnection;

private function init():void {
    nc = new NetConnection();
    nc.addEventListener(NetStatusEvent.NET_STATUS, status_handler);
    nc.objectEncoding = ObjectEncoding.AMF0;
    nc.client = this;
    nc.connect("rtmp:/rpc/chat");
}

private function status_handler(event:NetStatusEvent):void {
    switch (event.info.code) {
    case "NetConnection.Connect.Success":
        setStatus("Connected.");
        break;
    default:
        setStatus(event.info.code);
    }
}

private function setStatus(text:String):void {
    status.text = text;
}

private function send():void {
    if (!input.text) return;
    if (input.length >= 200) return; // ignore long input ;)

    nc.call("send", new Responder(function ():void {}), input.text);
    onMessage(input.text);

    input.text = "";
}

public function onMessage(message:String):void {
    result.text = message + "\n" + result.text;
}

init()の中でNetConnectionオブジェクトを作りnc.connect("rtmp:/rpc/chat")とすることでKamaitachiサーバと接続しています。あと特筆すべきこととしてnc.objectEncoding = ObjectEncoding.AMF0としている箇所がありますが、KamaitachiではAMF3はまだ対応していないのでここではAMF0としているようです。(正確にはKamaitachiがというよりData::AMFがAMF3未対応だから、と言う方が正しいか?)


さて次にサーバからのブロードキャスト部分を見て行きます。

Kamaitachi本体側のlib/Kamaitachi/Service/Broadcaster.pmの中を覗いてみました。

一部抜粋

sub broadcast_notify_packet {
    my ($self, $method, $data) = @_;

    my $notify_packet = Kamaitachi::Packet::Function->new(
        number => 3,
        type   => 0x14,
        id     => undef,
        method => $method,
        args   => [undef, $data],
    );
}

一部抜粋

sub broadcast {
    my ($self, $session, $packet) = @_;

    for my $child_session (@{ $self->child }) {
        next unless defined $child_session;
        next if $session->id eq $child_session->id;
        $child_session->io->write($packet);
    }
}

というわけで、Broadcaster.pmでは

  • broadcast_notify_packetでパケットオブジェクトをつくり
  • broadcastで接続されているすべてのセッションのIOStreamに対してそのパケットを送り込む

という手順を踏んでブロードキャストを実現しているようです。

またパケットを作る時にmethodを指定していますが、これはアクションスクリプト側の関数名を指定するようです。(このチャットアプリの例だとonMessageという関数名を指定してパケットをつくっています)

まとめ

さて、これで一応クライアントとサーバ間の流れがつながったことになります。
もう一度整理してみます。

  • サーバ生成時に任意のサービスモジュールを登録する
  • クライアント(actionscript)からはNetConnection.callでサーバを呼び出す
  • 呼び出されるサーバ側ではon_invoke_xxxという名前で関数を用意しておく
  • サーバからクライアントに何か送る際は(今回だと「ブロードキャスト」)応答パケット生成時にactionscript側のコールバック関数を指定する

ということがわかりました。


次回以降、Kamaitachi本体のコードに突入していきます!