selectorsでsocket通信をイベント駆動に(Python)

 socket通信を実装する時にselectorという仕組みを使うと少し実装が楽になります。selectorssはsocketやfileなどのIO系のストリームの処理を分配してくれるご機嫌な仕組みです。selectorsにサーバーsocketを登録すると、受信が起こった際に指定のコールバック関数を呼んでくれます。

 実際にコードを見ながら動きを検討していきましょう。

selectorsを使ったサンプルサーバー

 まずはクライアントから送られてくる文字列をサーバー側のコンソールに表示するシンプルなサーバーをselectorsを使って作ってみます:

import selectors
import socket

def accept( sv ):
    client, addr = sv.accept()  # ブロッキングしていないので即受信
    data = client.recv( 2048 )
    client.close()

    print( "accepted", addr[0], ":", data.decode("utf-8") )

sv = socket.socket()
sv.bind( ( "192.168.11.16", 15769 ) )
sv.listen( 100 )
sv.setblocking( False )   # ブロッキングしない!

# selectorsにサーバーsocketとコールバックを登録
sel = selectors.DefaultSelector()
sel.register( sv, selectors.EVENT_READ, accept )

# selectorsの監視開始
while True:
    events = sel.select()   # イベント発生待ち
    for key, mask in events:
        key.data( key.fileobj )

 サーバー用のsocketオブジェクトはいつも通りにlistenまで済ませます。普段と違うのはsetblocking関数にFalseを渡してsocketオブジェクトをノンブロッキングにする点です。この意味は後で分かります。

 次にselectorsモジュールが持つDefaultSelector関数を呼び出してselectorsオブジェクトを一つ作ります。特にこだわりが無ければDefaultSelectorで十分です。続いてselector.register関数にサーバー用のsocketオブジェクト等を登録します:

sel.register( sv, selectors.EVENT_READ, accept )

 第1引数に管理してもらいたいsocketオブジェクト(sv)を渡します。第2引数は起こすイベントの種類(イベントマスク)を指定します。サーバーsocketの場合受信したデータを読むのでselectors.EVENT_READを指定します。書き込みするファイルなどはselectors.EVENT_WRITEを指定する事もあります。読み書きする場合はORで指定する事も出来ます。第3引数には受信があった時に呼んでもらいたいコールバック関数を渡します。これは必須です。

 管理してもらいたいsocketオブジェクトを登録したら、selectors.select()関数を呼び出します。するとここで登録したsocketオブジェクトがlistenしているポートに受信情報が入るまでブロッキングで待機してくれます。

 対応するポートに受信情報が飛び込んで来たらイベントが発火し、select関数を抜けます。戻り値のeventsには対応するsocketオブジェクト等の情報がごそっと返ります:

    events = sel.select()   # イベント発生待ち
    for key, mask in events:
        key.data( key.fileobj )

 一つのイベントは(key, eventmask)のタプルになっているのですが、イベントは複数同時に発火する場合もあるため、events自体はタプルのリストになっています。それをfor文で回して一つずつ処理します。key.dataにはselectors.register関数の第3引数に与えたコールバック関数(accept関数)があります。そしてkey.fileobjには第1引数に与えたsocketオブジェクトが格納されています。maskには今回の場合だとselectors.EVNET_READの値(1)が格納されていますが、特に使わないのでコールバックには渡していません。

 上のkey.dataはaccept関数なので、以後はaccept関数での処理になります:

def accept( sv ):
    client, addr = sv.accept()  # ブロッキングしていないので即受信
    data = client.recv( 2048 )
    client.close()

    print( "accepted", addr[0], ":", data.decode("utf-8") )

初っ端でsv.accept()としています。受信データの有無を確認しています。ここ、ノンブロッキングです。これがポイントで、selectors.select()でsocketのイベントが発火したというのは、受信データがある事の保障になっているんですね。なのでノンブロッキングに設定したわけです。かくしてclientとaddrに有効な値が返えり、後は受信データを受け取って処理する流れに進めます。

selectorsを使う理由はイベント駆動のスッキリさ

 上のselectorsを使ったsocketサーバは一見すると余計なプロセスが一つ入っているように見えます。selectorsを使わないsocketサーバは、プロセスだけ羅列するなら、

sv.listen( 100 )             # 受信開始
client, addr = sv.accept()   # 受信待機
data = client.recv( 2048 )   # 受信データコピー

というプロセスだけなのですが、selectorsが入ると、

sv.listne( 100 )
key, mask = sel.select()   # selectorsの待機
accept( sv ):
   client, addr = sv.accept() # 受信待機(ノンブロッキング)
   data = client.recv( 2048 ) # 受信データコピー

こんな感じでselectorsによる待ち受けが挟まっています。なんでわざわざこんな事をするのか?これは上のsocketのみの方が「逐次処理」なのに対し、下のselectorsを使った方がコードの流れを「イベント駆動」に落とし込めるからです。

 例えば3つのポートを同時に監視して、各ポートそれぞれの情報を別処理したい場合を考えます。逐次処理側でこれをやろうと思うと中々大変ですaccept関数をブロッキングすると3つ同時に監視は出来ません。スレッドに分けるなど対処が必要になります。ノンブロッキングにした場合も下手に書くと、

while True:
   for i in range( serverNum ):
      try:
         client, addr = sv[ i ].accept()   # ノンブロッキング
      except:
         continue
      callbacks[ i ]( client, addr )

こういう感じに大ループの中で各サーバーsocketのaccept()を確認して、受信があればそれに対応した処理関数を配列で呼んで…、みたいなコードになりかねません。しかも上の大ループはビジーループなので無駄処理の負荷ばかり高くなります。

 これをselectorsで書くと、

sel.register( sv1, selectors.EVENT_READ, callback1 )
sel.register( sv2, selectors.EVENT_READ, callback2 )
sel.register( sv3, selectors.EVENT_READ, callback3 )

# selectorsの監視開始
while True:
    events = sel.select()   # イベント発生待ち
    for key, mask in events:
        key.data( key.fileobj )

このようにサーバーソケットをselectorsに複数登録して、後は受信について監視を代行してもらうだけなんです。しかもselectorsのイベント発生待ちはビジーループでは無い為CPU負荷も非常に低いです。これによりsocketを使う方はイベントハンドラとなるコールバック関数の中の実装に集中できます。また、後々でさらに別のポート監視を追加する事になったとしても1行加えて終わりです。非常にスッキリしています。

 という事で、selectorsを使う事はコードの流れをイベント駆動にする利点があり、使わない手はありません。

終わりに

 今回はselectorsとsocketを組み合わせてイベント駆動なサーバーを作るサンプルを見てみました。selectorsが発火したイベントに対応した関数をコールしてくれる事により、コードの流れがイベント駆動になり、プログラマはハンドラとなる関数の中を書く事に集中できるようになります。selectors.registerはもちろんどこででも登録できるため、新しいイベントを追加するのも簡単です。例えばサーバーで受信した後にクライアントに返答するsocketもイベントとして登録すれば、コールバックするサーバーもスッキリ書けます。

 selector.select()自体はブロッキング関数です(タイムアウトを指定できます)。なのでここはサブスレッド内で回すのが常套手段かなと思います。そうするとメインの裏で勝手にイベントが発火して対応するハンドラ関数が呼ばれ処理されていくリモートアプリケーションになっていきます。ちょっとワクワクしますよね。

ではまた(^-^)/

この記事が気に入ったらサポートをしてみませんか?