pybottersのdiscordで話題に挙がったasycio.to_thread()について

全文無料で読めます。
ためになったなーと感じたら、コーヒー一杯分投げ銭いただけると幸いです。

まずasyncioについてざっくりと説明します。単一スレッドにて処理を行いますので、cpuバウンドとioバウンドが混在するとき、GILによってioバウンドはブロッキングされてしまいます。これの何が問題になるかというと、例えばwebsocketに接続してtickerから秒足を作成しながらテクニカル指標に基づき発注を行いたいというときに、指標の計算はcpuバウンドですので計算が終了するまでその他の動作がブロッキングされてしまうということです。

では、どうすればよいのか...?について本記事で触れていきます。

以前pybottersのdiscord内でまちゅけんさんがこのように仰っておりましたので、asyncio.to_thread()の動作を確認してみました。
※to_threadはpython3.9以降で使用できます。3.8以前の方はloop.run_in_executor()を使用することになります。尚、loop.run_in_executor()の使用例については最後の方に参考記事を貼っておりますので、ご確認ください。

下記にコード例を載せておきます。例によってインデントずれが生じているでしょうから、このままコピペしてもうまく動作しないであろう点にご注意ください。

①別スレッドでcpuバウンドの処理を行った場合

def cpu_bound():
    print(f"start cpu_bound at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    for i in range(3):
        my_list = []
        for i in range(10000000):
            my_list.append(i)
    print(f"cpu_bound complete at {time.strftime('%X')}")
    # print(my_list)

async def io_bound():
    for i in range(3):
        print("io_bound start")
        await asyncio.sleep(1.5)
        print("io_bound finish")

async def main():
    print(f"started main at {time.strftime('%X')}")
    await asyncio.gather(
        asyncio.to_thread(cpu_bound),
        io_bound())
    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())

>>started main at 08:05:41
>>start cpu_bound at 08:05:42
>>io_bound start
>>io_bound finish
>>io_bound start
>>cpu_bound complete at 08:05:44
>>io_bound finish
>>io_bound start
>>io_bound finish
>>finished main at 08:05:46

結果を見てみますと、io処理はブロッキングされずに並列で動いていることがわかります。では、asyncio.to_thread()を使わない場合を確認してみましょう。

②単一スレッドでcpuバウンドの処理を行った場合

async def cpu_bound():
    print(f"start cpu_bound at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    for i in range(3):
        my_list = []
        for i in range(10000000):
            my_list.append(i)
    print(f"cpu_bound complete at {time.strftime('%X')}")
    # print(my_list)

async def test():
    for i in range(3):
        print("test start")
        await asyncio.sleep(1.5)
        print("test finish")

async def main():
    print(f"started main at {time.strftime('%X')}")
    await asyncio.gather(
        cpu_bound(),
        test())
    print(f"finished main at {time.strftime('%X')}")

asyncio.run(main())

>>started main at 08:29:15
>>start cpu_bound at 08:29:15
>>cpu_bound complete at 08:29:17
>>test start
>>test finish
>>test start
>>test finish
>>test start
>>test finish
>>finished main at 08:29:22

以上より、asyncio.to_thread()を使用することで、ioバウンド処理を邪魔せずにcpuバウンド処理を実行できることがわかりました。

ただし、以下のような記事も見かけましたので、スレッドを分けたとしても必ずしもうまくいくとは限らない(?)ようです。
ご自身の状況に合わせてご対応ください。

今回は以上になります。
最後までご覧いただきありがとうございました。

ここから先は

0字

¥ 100

この記事が気に入ったらサポートをしてみませんか?