並行バッチ処理パッケージの話
PyPIへ公開した経緯
複数の処理を関連づけて一括で実行することを「バッチ処理」と呼びますが、これをpythonで気軽に並行処理で実行させるためのsimplejobというパッケージをPyPIへ公開しました。
元々は数年前にPythonの勉強がてらに作ったCommandLineExecutorというソース・コードだったのですが、pipでインストールできるパッケージを作成するという個人的な勉強のためにリファクタリングしました。
大規模開発だと数時間かかるような夜間バッチなどを回すことになるので、JP1やSystemWalker、Airflowなどのジョブ管理ツールを導入し、ジョブの実行状況をリアルタイム監視しながら運用することになりますが、短時間の随時バッチ処理を扱うためだけにツールを導入するのは大げさすぎます。
Pythonには豊富なパッケージがあるので、バッチ処理を記述するパッケージくらいあるだろうと思い検索してみたのですが、workflowやprefect、Luigiなどメソッド・ベースで定義するものが多い印象を受けました。
複数の実行ファイルを組み合わせるパッケージはpycommandoしか見つけられませんでした。pycommandoはシリアル実行にしか対応していないので、ノード・グラフにしたがって実行ファイルを並行処理できるsimplejobには需要があるかもしれないと思い、PyPIへ公開することにしたのです。
simplejobにできること
simplejobにできることは多くありません。その名の通りシンプルなバッチ処理のみを扱うことができます。たとえば、次のようなノード・グラフにしたがってジョブを実行させることができます。
ノードグラフの定義
ジョブの情報はノード・グラフで定義します。リスト型のデータを使って個々のジョブの情報を記載し、待ち受けるジョブがあればwaitsへ記述します。
上記のノード・グラフであれば、次のように定義します。各ジョブにIDを振り、waitsで待ち合わせるジョブIDを指定します。いきなりジョブを開始させたい場合はwaitsを指定しません。
jobContexts = [
{ "id": "hoge", "commandLine": r"C:\temp\bin\hoge.exe" },
{ "id": "piyo", "commandLine": r"D:\work\piyo.bat --output piyo_result.txt", "waits": [ "hoge" ] },
{ "id": "fuga", "commandLine": r"py C:\temp\python\fuga.py", "waits": [ "hoge" ] },
{ "id": "moga", "commandLine": r"C:\temp\bin\moga.exe", "waits": [ "piyo", "fuga" ] },
{ "id": "foo", "commandLine": r"D:\worok\foo.exe" },
{ "id": "bar", "commandLine": r"C:\temp\bin\foo.exe", "waits": [ "moga", "foo" ] },
]
これをSimpleJobManager.entry()に食わせると、各ジョブがスレッド・オブジェクトとして生成されます。
jobManager = SimpleJobManager()
jobManager.entry(jobContexts)
スレッドは、待ち受けるジョブを持たないか、あるいは待ち受けているすべてのジョブが完了した状態になったものから実行されていきます。ジョブのキックと監視はループ処理を組んで行います。
ジョブの監視
ジョブの監視はループ処理で行います。すべてのジョブが完了するか、途中のジョブでエラーが発生するまでループさせます。
ジョブの監視ループは次のように書くことができます。
while True:
jobManager.runAllReadyJobs()
if jobManager.errorOccurred():
print("error occurred")
jobManager.join()
break
if jobManager.completed():
break
print(jobManager.getRunningStatus())
time.sleep(1)
ジョブを監視するためのループ内では、実行な状態にあるジョブをスタートさせるためにSimpleJobManager.runAllReadyJobs()を呼び続ける必要があります。SimpleJobManager.runAllReadyJobs()は実行可能な状態になったすべてのジョブをスタートさせます。
途中でエラーが発生したかどうかを確認するためのメソッドはSimpleJobManager.errorOccurred()です。SimpleJobManager.errorOccurred()は途中でエラーが起きた場合にTrueを返します。
途中でエラーが起きた場合は、他のジョブがまだ実行中である可能性があるため、SimpleJobManager.join()を呼び出し、スレッドの実行が完了するまでを待つ必要があります。
すべてのジョブが完了したかどうかを確認するためのメソッドはSimpleJobManager.completed()です。SimpleJobManager.completed()はすべてのジョブが完了した場合にTrueを返します。
SimpleJobManager.getRunningStatus()を呼び出すと、呼び出し時点のジョブ・ステータスを取得できます。たとえば、2つのジョブが実行中であれば{ "Running": 2 }という値が返ってきます。バッチの進捗状況などを表示したい場合に便利です。
ジョブ・ステータスをリアル・タイムで知る必要がないのであれば、SimpleJobManager.run()を呼び出すといいでしょう。監視ループをラッピングしているため、1行で記述できます。SimpleJobManager.run()は途中のジョブでエラーが発生した場合に例外CalledJobErrorを投げて知らせてくれます。
SimpleJobManager.run()を利用したバッチ処理は、次のように書くことができます。
jobContexts = [
{ "id": "hoge", "commandLine": r"timeout /t 1 /nobreak" },
{ "id": "piyo", "commandLine": r"timeout /t 3 /nobreak", "waits": [ "hoge" ] },
{ "id": "fuga", "commandLine": r"timeout /t 1 /nobreak", "waits": [ "hoge" ] },
{ "id": "moga", "commandLine": r"timeout /t 2 /nobreak", "waits": [ "piyo", "fuga" ] },
]
jobManager = SimpleJobManager()
jobManager.entry(jobContexts)
jobManager.run()
エラー処理
エラー処理は、すべてのスレッドが完了した後に行います。SimpleJobManager.errorOccurred()は呼び出し時点でのエラー検知を行うだけなので、たとえば、最後の処理のみがエラーとなった場合、SimpleJobManager.errorOccurred()を呼び出す前に監視ループを抜けてしまうことも考えられます。
したがって、実行中のスレッドがすべて完了した時点で改めてエラー・チェックをしなければなりません。すべてのジョブの完了を待ち合わせるには、すでに述べた通り、SimpleJobManager.join()を呼び出します。
注意が必要なのは、エラーが発生したジョブのrunningStatusも"Completed"になっている点です。ジョブ自体は完了しているため、ステータスは"Completed"となります。具体的なエラー判定はexitCodeの値を見て行わなければなりません。
SimpleJobManager.run()を呼び出した場合は先ほど記述したように途中のジョブでエラーが発生した場合に例外CalledJobErrorを投げますので、例外をキャッチしてエラー処理をさせます。
リトライの設定
各ジョブにはタイム・アウト時間を設定することができます。タイム・アウトした場合、処理をリトライさせることが可能です。
処理をリトライさせるためのパラメータは以下の通りです。
タイム・アウトする時間(秒)
リトライ回数(n > 0)
ディレイ(秒)
バック・オフ(遅延係数)
タイム・アウトしたジョブが検出された場合は、ステータスが"RetryOut"となります。
リラン
ジョブをリランさせることもできます。エラーやタイム・アウトになった原因を取り除いた後、SimpleJobManager.rerun()を呼び出すと、エラー、あるいはタイム・アウトになったスレッド・オブジェクトが再作成されるので、再びジョブを実行させられます。
jobManager.rerun()
短いバッチ処理であればエラーが解消するのを待ち受けるよりも、最初から実行しなおしたほうが手っ取り早いかもしれません。
実行結果の取得
ジョブの実行結果を取得するにはSimpleJobManager.report()を呼び出します。
たとえば、次のような結果が返されます。
{
"results": [
{
"hoge": {
"runnigStatus": "Completed",
"retried": null,
"exitCode": 0,
"startDateTime": "2023/05/18 21:45:24.243201",
"finishDateTime": "2023/05/18 21:45:25.189776",
"elapsedTime": "00:00:00.946575"
}
},
{
"piyo": {
"runnigStatus": "Completed",
"retried": null,
"exitCode": 0,
"startDateTime": "2023/05/18 21:45:25.253739",
"finishDateTime": "2023/05/18 21:45:28.116363",
"elapsedTime": "00:00:02.862623"
}
},
{
"fuga": {
"runnigStatus": "Completed",
"retried": null,
"exitCode": 1,
"startDateTime": "2023/05/18 21:45:25.253739",
"finishDateTime": "2023/05/18 21:45:25.269691",
"elapsedTime": "00:00:00.15952"
}
},
{
"moga": {
"runnigStatus": "Ready",
"retried": null,
"exitCode": null,
"startDateTime": null,
"finishDateTime": null,
"elapsedTime": null
}
}
]
}
ログ出力
各ジョブの出力をログ・ファイルへ書き出すことができます。SimpleJobManagerのコンストラクタに出力先ディレクトリを指定すると、ジョブに付与したIDに拡張子を付与したファイルがジョブ実行後に生成されるようになります。
標準エラーは標準出力へリダイレクトしているため、ログ・ファイルには両方のストリームの情報が含まれます。もし、ストリームを細かくハンドリングしたいのであれば、各ジョブの中で必要な処理を記述し、SimpleJobManagerではログを出力しないようにします。
SimpleJobManagerのログ出力機能は、各処理を作りっぱなしの状態(標準出力と標準エラーを垂れ流した状態)でも利用できるようにするために実装された機能です。簡易的な機能ですが大半のケースでは十分な効果を得ることができるでしょう。
JSONファイルの読み込み
ジョブ・コンテキストをJSONファイルに定義することができます。
{
"jobContexts": [
{ "id": "hoge", "commandLine": "timeout /t 1 /nobreak" },
{ "id": "piyo", "commandLine": "timeout /t 3 /nobreak" }
]
}
jobContextsキーにコマンド・ラインの情報を羅列し、SimpleJobMangaer.entryFromJson()で読み込みます。
セマフォ
リソース管理をするためにセマフォを利用することができます。たとえば、同じファイルへの更新を行う処理が存在する場合、セマフォでロックを掛けておけば、同時更新してファイルが壊れるような状況を避けられます。
semaphore = threading.Semaphore(1)
jobContexts = [
{ "id": "hoge", "commandLine": "hoge.bat", "semaphore": semaphore" },
{ "id": "piyo", "commandLine": "piyo.bat", "waits": [ "hoge" ] },
{ "id": "fuga", "commandLine": "fuaga.bat", "waits": [ "piyo" ], "semaphore": semaphore },
]
上記のジョブ定義ではジョブpiyoはジョブhogeの完了を待っていますが、ジョブfugaはジョブhogeが利用しているセマフォが空くまで待ちます。
セマフォもwatisによるジョブの待ち合わせも、他のジョブの完了を待ちますが、セマフォによる待ち合わせは処理に順序性を持たせる必要がないという点が異なります。
次の図ではセマフォを使ってmoga.exeとbar.batのジョブを待ち合わせています。
セマフォは空いているものを最初に取った人が使うことができる仕組みなので、たとえば、同じファイルを更新する処理の待ち合わせに利用することができます。ただ、順不同で更新されるので同じファイルへの更新をセマフォで管理したい場合、順不同のアクセスをして問題がないかを見極める必要があります。
moga.exeとbar.batのジョブをwaitsで関連づけてシリアライズするのも悪くはないのですが、セマフォを利用しておけば先に終わった処理がその時点で処理を開始できるので、全体的にはシリアライズするよりも処理時間の短縮が見込めます。
なお、JSONでジョブ・コンテキストを指定する場合、セマフォを利用することはできません。
並行処理の注意点
ジョブ並行処理をさせると実行時間を短縮することに期待できます。一方で、並行処理が複雑になるほどノード・グラフの管理コストが跳ね上がります。処理を1つ追加するだけでも難儀する場合があります。
リソース管理が複雑になる場合もあります。処理を同時に実行するということはリソースの奪い合いになるということです。一番わかりやすいのがメモリでしょうか。シリアル実行させたいた場合はギリギリまでメモリを使う処理を実行させることに問題にはならなかったとしても、並列処理を行った事でメモリが不足し、思ったように処理が実行できなくなります。
リソース管理をサポートするための機能として、セマフォを利用できるようになっています。たとえばメモリを多く使う処理をセマフォで管理し、待ち合わせることができます。ただし、実行時間が長くなるジョブをセマフォでロックすると、処理が完了してセマフォを空けるまで他のジョブを待たせることになります。
ジョブ1つの実行時間が長くなっている処理は分割するか、ジョブ全体の見直しをしましょう。一般的なバッチ設計では1つのジョブ内であれもこれもと処理をさせることがないのは、ジョブの組み替えコストを下げるためでもあります。
この記事が気に入ったらサポートをしてみませんか?