![見出し画像](https://assets.st-note.com/production/uploads/images/115044794/rectangle_large_type_2_ffb171f05989caa430286364ab5d1eb8.png?width=800)
Pyhonで非同期処理を考える。
まずは"asyncio"を使った処理を考えていきます。Colabを使って実行していこうと思って少しコードを試したときに、エラーが出た時の対処法です。
を参考に
import nest_asyncio
nest_asyncio.apply()
を付け加えればうまくエラーなしで実行されるようになります。
参考
juputerで動かすときの注意点:Colabでも上記の対応以外にこの方法が使えます
「RuntimeError: asyncio.run() cannot be called from a running event loop」
なぜこんなエラーが発生するか?といえば、なんとjuputerそのものがイベントループで動いているらしい。
最初の方に述べたが、「イベントループの中でイベントループを発生させることはできない」からエラーになるのである。
解決策はいくつかあるが、簡単なのは以下のようにasyncio.run()の代わりにawait関数として実行させてやればいい
"asyncio"の使い所
・何が便利?
非同期(処理をしている間、同期して完了を待つのでなく、次の処理を実行するやり方)を実現します。
非同期により、全体の 処理速度を爆上げ できる場合がよくあります。
例えば、外部サービスリクエストや、ファイル・DBへの読み書きなど、I/O関連は時間がかかる割にCPUは空いてたりするので、そこが有効に活用されるようになるわけです。
多くのシステムはI/Oが大量にあったりするので圧倒的です。
・それってスレッドでもできるのでは?
もちろん可能ですが、コンセプトが異なり、スレッドに比べて 圧倒的にお手軽 です。
スレッドと何が違うのかというと、主な違いは、スレッドは複数の処理が同時に走るということです。処理の裏で別の処理が同時に走るため、値の読み/書きでは必ずロックやスレッドセーフを考慮しなければなりません。
一方で asyncio は処理の裏で別の処理が走ることはあり得ません。(I/Oが走ってることはあり得ますけど。)
すべての処理は直列に走ります。走ってる処理が await になると、待ち行列に並んでいる次の処理が走り始めます。
これが最高に良いのです。ロックもスレッドセーフも考慮不要なのですから。
なぜなら、裏で別の処理が走ることはないわけですから、1つの処理を実行している間は(await するまでは)必ずその処理だけが全てを占有できるわけです。占有できるのでロックやスレッドセーフの考慮は不要となり、ロック/スレッドセーフの高コストな処理がない点も高速化に寄与します。それ以上にコードがシンプルになるのでメンテナンスが楽になります。
では逆に、スレッドが有利になる場合はなんでしょうか。複数のCPUがあり、それを各スレッドに割り当てた時ですかね。(プロセスと呼ぶべきか。)
ただ、ロックやスレッドセーフはそれなりにコストもかかるので、本当にCPUを複数使うことで効果が出るかは要件次第です。
デバッグもとても大変なものになりますので、まずは asyncio から検討するのが良いかと思います。
その他、上記サイトでは具体的な使い方として以下紹介されています。
まず基本的なところですが
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
を実行すると'Hello ...'が出てきて1秒後に'... World!'が出てきます。
await asyncio.sleep(1)
で1秒間待ってから次が実行されます。
他の機能として、sleep、gather、create_task / Task、Task.result、Task.done、TaskGroup、Lock、Semaphore、Event、Queue、PriorityQueue
などサンプルコードをあげて紹介されています。
次にこちらのサイトのコードを使わせていただきColabで実行してみてasyncioの効果を測定します。
まず"requests"を利用してやってみます。
import requests
import time
import pandas as pd
result = []
def get_poke(id):
r = requests.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
result.append([r.json()['id'], r.json()['name']])
def main():
start = time.time()
for poke_id in range(1,152):
get_poke(poke_id)
end = time.time()
print(f"processing time:{end-start}") #処理時間を測定する
main()
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))
asyncioの効果を測定
実行すると
processing time:25.087804317474365
id name
0 1 bulbasaur
1 2 ivysaur
2 3 venusaur
3 4 charmander
4 5 charmeleon
.. ... ...
146 147 dratini
147 148 dragonair
148 149 dragonite
149 150 mewtwo
150 151 mew
[151 rows x 2 columns]
processing time:25.087804317474365
25秒かかっていることがわかります。
次にasyncio:並行処理でやってみます。
これを実行する前にasyncio版のrequestsである"httpx"を
!pip install httpx
とインストールして以下実行します。
import httpx #asyncio版のrequests
import time
import asyncio
import pandas as pd
result = []
async def get_poke(id):
async with httpx.AsyncClient() as client:
r = await client.get(f'https://pokeapi.co/api/v2/pokemon/{id}')
result.append([r.json()['id'], r.json()['name']])
#コルーチン
async def main():
start = time.time()
tasks = []
#タスクを設定する(151匹分)
for poke_id in range(1,152):
tasks.append(get_poke(poke_id))
#タスク実行
await asyncio.gather(*tasks)
end = time.time()
print(f"processing time:{end-start}") #処理時間を測定する
asyncio.run(main()) #イベントループ作成
print(pd.DataFrame(result,columns=['id','name']).sort_values('id').reset_index(drop=True))
を実行すると
processing time:17.935431003570557
id name
0 1 bulbasaur
1 2 ivysaur
2 3 venusaur
3 4 charmander
4 5 charmeleon
.. ... ...
146 147 dratini
147 148 dragonair
148 149 dragonite
149 150 mewtwo
150 151 mew
[151 rows x 2 columns]
processing time:17.935431003570557
17秒となりました。参考サイトでは3秒となっていましたがColabの環境では少し時間がかかるようですが、asyncioを使うことで短縮できました。
この記事が気に入ったらサポートをしてみませんか?