Kamaitachi入門(その2)

前回に引き続きperlRTMP実装であるKamaitachiのコードリーディングです。

今回はKamaitachi本体のソースコードを見ていきます。

lib以下のモジュールファイルの構成

[miki@vmware kamaitachi]$ tree lib
lib
|-- Kamaitachi
|   |-- IOStream.pm
|   |-- Packet
|   |   `-- Function.pm
|   |-- Packet.pm
|   |-- Service
|   |   |-- AMFHandler.pm
|   |   |-- AutoConnect.pm
|   |   |-- AutoConnectACL.pm
|   |   |-- Broadcaster.pm
|   |   |-- ChildHandler.pm
|   |   |-- ConnectionHandler.pm
|   |   |-- Core.pm
|   |   |-- NetStreamHandler.pm
|   |   |-- Recorder.pm
|   |   |-- StreamAudienceCounter.pm
|   |   `-- Streaming.pm
|   |-- Service.pm
|   |-- Session.pm
|   `-- Socket.pm
`-- Kamaitachi.pm

3 directories, 18 files


それでは1つずつ見ていきましょう。

Kamaitachi.pm

さて、まずはKamaitachi.pmです。
POD部分などは割愛してコードの部分のみ以下に表示しています。

package Kamaitachi;
use 5.008001;
use Moose;

our $VERSION = '0.03';

use IO::Handle;
use IO::Socket::INET;
use Socket qw/IPPROTO_TCP TCP_NODELAY SOCK_STREAM/;
use Danga::Socket;
use Danga::Socket::Callback;
use Data::AMF;
use Text::Glob qw/glob_to_regex/;

use Kamaitachi::Socket;
use Kamaitachi::Session;

with 'MooseX::LogDispatch';

has port => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 1935 },
);

has sessions => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

has parser => (
    is      => 'rw',
    isa     => 'Object',
    lazy    => 1,
    default => sub {
        Data::AMF->new( version => 0 ),
    },
);

has buffer_size => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 8192 },
);

has services => (
    is      => 'rw',
    isa     => 'ArrayRef',
    default => sub { [] },
);

no Moose;


sub BUILD {
    my $self = shift;

    my $ssock = IO::Socket::INET->new(
        LocalPort => $self->port,
        Type      => SOCK_STREAM,
        Blocking  => 0,
        ReuseAddr => 1,
        Listen    => 10,
    ) or die $!;
    IO::Handle::blocking($ssock, 0);

    Danga::Socket->AddOtherFds(
        fileno($ssock) => sub {
            my $csock = $ssock->accept or return;

            $self->logger->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));

            IO::Handle::blocking($csock, 0);
            setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack('l', 1)) or die;

            my $session = Kamaitachi::Session->new(
                id      => fileno($csock),
                context => $self,
            );
            $self->sessions->[ $session->id ] = $session;

            Kamaitachi::Socket->new(
                handle        => $csock,
                context       => $self,
                session       => $session,
                on_read_ready => sub {
                    my $socket = shift;

                    my $bref = $socket->read( $self->buffer_size );
                    unless (defined $bref) {
                        $socket->close;
                        return;
                    }

                    $session->io->push($$bref);
                    $session->handler->( $session );
                },
            );
        }
    );
}


sub register_services {
    my ($self, @args) = @_;

    local $Text::Glob::strict_wildcard_slash = 0;

    while (@args) {
        my $key   = shift @args;
        my $class = shift @args;

        unless (ref($class)) {
            eval qq{ use $class };
            die $@ if $@;

            $class = $class->new;
        }

        push @{ $self->services }, [ glob_to_regex($key), $class ];
    }
}

sub run {
    my $self = shift;

    Danga::Socket->AddTimer(
        0,
        sub {
            my $poll
                = $Danga::Socket::HaveKQueue ? 'kqueue'
                : $Danga::Socket::HaveEpoll  ? 'epoll'
                :                              'poll';
            $self->logger->debug(
                "started kamaitachi port $self->{port} with $poll"
            );
        }
    );

    Danga::Socket->EventLoop;
}


__PACKAGE__->meta->make_immutable;

前回も触れましたが、KamaitachiはMooseを採用しています。なので前半部分はほとんどプロパティの定義ですね。ここらへんはとりあえずスルーします。

大事なのはそれ以外の3つのメソッド(BUILD, register_services, run )になります。
以下、各々のメソッドを見ていきます。

BUILD()

BUILDというのはMooseで記述するときのコンストラクタ的なもの、だそうです。
まぁあまり深いことは考えず、とりあえずnewのかわりだな、と理解しておきます。

このコンストラクタ内ではDanga::Socketによるソケットの準備を行っています。
「Danga::Socketってなあに?」という方はid:tokuhiromの記事などが参考になります。

Danga::Socket 徹底解説

またid:restrratのDanga::Socket::CallBackというモジュールも理解しておく必要があるでしょう。

Danga::Socket::Callback がクールな件について

Danga::Socketを一言で説明すると「IO多重化をepollやkqueueなどで効率的に管理するための便利なモジュール」と言えばよいかな?

個人的にはPOE大好き人間なのでDanga::Socketは使ったことなかったんですが、慣れてくるとこっちもシンプルでいいかも、と思います。

さて、Danga::Socketを理解したうえでBUILDのついてもう少しみてみましょう。
想定も含めて適当なコメントをつけてみました。

sub BUILD {
    my $self = shift;

  # サーバソケットをつくる
    my $ssock = IO::Socket::INET->new(
        LocalPort => $self->port,
        Type      => SOCK_STREAM,
        Blocking  => 0,
        ReuseAddr => 1,
        Listen    => 10,
    ) or die $!;
    IO::Handle::blocking($ssock, 0);

    # Danga::Socketのオーソドックスな使いかた(?)
    Danga::Socket->AddOtherFds(
        # さっき作ったsocketのファイルディスクリプタを監視対象に登録
        # サーバソケットがたたかれたときに以下の無名関数がキックされるらしい
        fileno($ssock) => sub {

            # acceptすることでクライアントソケットを取得
            my $csock = $ssock->accept or return;

            $self->logger->debug(sprintf("Listen child making a Client for %d.", fileno($csock)));

            IO::Handle::blocking($csock, 0);
            setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack('l', 1)) or die;

            # 独自のセッションオブジェクトを生成
            my $session = Kamaitachi::Session->new(
                id      => fileno($csock),
                context => $self,
            );
            $self->sessions->[ $session->id ] = $session;

            # クライアントの相手をするオブジェクトを生成
            # 実際にはDanga::Socket::CallBackを継承したもの
            Kamaitachi::Socket->new(
                handle        => $csock,
                context       => $self,
                session       => $session,
        # クライアントソケットが読み込み可能になったら以下の無名関数が呼ばれる
                on_read_ready => sub {
                    my $socket = shift;
                    # データ読み込み
                    my $bref = $socket->read( $self->buffer_size );
                    unless (defined $bref) {
                        $socket->close;
                        return;
                    }
                    # kamaitachiの独自セッションオブジェクトにデータを渡していろいろ処理
                    $session->io->push($$bref);
                    $session->handler->( $session );
                },
            );
        }
    );
}

Danga::Socketの使い方はなんとなく理解できましたが、ここで新たな疑問が出てきました。

  • Kamaitachi::Socketってなんだ?
  • Kamaitachi::Sessionってなんだ?

という2点です。


Kamaitachi::Socketはソースをみたらすぐに理解できました。
Kamaitachi独自の拡張を施したDanga::Socket::Callbackでした。
Danga::Socket::Callbackが何であるか理解できればOKですね。だからこれはこれ以上深追いしません。


さて難しそうなのはもうひとつのKamaitachi::Sessionです。

どうやらPacketとIOStreamを駆使してクライアントとのセッションを司るもののようです。

Kamaitachiサーバの基本的な仕組みを理解するためにはSessionとPacketとIOStreamあたりの相互関係を正しく理解する必要がありそうです。ここら辺はまた後日きっちり整理して理解してみます。

とりえあず今はKamaitachi.pmの続きを読み込んで見ます。
BUILDの処理がつかめたので、あとはregister_servicesとrunですね。

register_services()

sub register_services {
    my ($self, @args) = @_;

    local $Text::Glob::strict_wildcard_slash = 0;

    while (@args) {
        my $key   = shift @args;
        my $class = shift @args;

        unless (ref($class)) {
            eval qq{ use $class };
            die $@ if $@;

            $class = $class->new;
        }

        push @{ $self->services }, [ glob_to_regex($key), $class ];
    }
}

サービスクラスを動的にuseして$self->servicesに登録していく処理です。
そういった意味ではやりたい処理はUNIVERSAL::requireとかと同じようなことでしょうか。

URLパスの正規表現的な指定にText::Globとか使っているあたり、関心してしまいました。
これってPlaggerとかで使われてるの?

run()

sub run {
    my $self = shift;

    Danga::Socket->AddTimer(
        0,
        sub {
            my $poll
                = $Danga::Socket::HaveKQueue ? 'kqueue'
                : $Danga::Socket::HaveEpoll  ? 'epoll'
                :                              'poll';
            $self->logger->debug(
                "started kamaitachi port $self->{port} with $poll"
            );
        }
    );

    Danga::Socket->EventLoop;
}

これはロギングのために書いてある処理を無視すれば、単にEventLoopを開始している部分だということがわかりますね。



さて、荒削りではありますが、これでKamaitachi.pmの大まかな流れはつかめました。
このモジュールではDanga::Socketの基本的な使いかたを理解しておく必要がありました。

MooseといいDanga::Socketといい「なんとなくナニモノかは知っているけど実際に触ったことないよ」という人には学習材料としてもKamaitachiはオススメかもしれません。


次回はKamaitachi::Sessionあたりを攻めてみたいと思います。