Ractorで並列処理を試みる
はじめまして、グロービスのサーバーサイドエンジニアをしている大澤(@qwyngg)と申します。
Ruby3、そしてRactorがでましたね!
さっそくrbenv global 3.0.0して試していこうと思います。
Ractorとは?
並行性のあるコードを書くための新機能です。RubyKaigiでGuildという名前を聞いたことがある方もいらっしゃると思いますが、そのGuildが名称を変えてリリースされました。
名前の通りActorモデルを参考にした機能だそうです。
Ractor.newにブロックを渡すことで並列処理を実行させることができます。
Ractor.new {
5.times do
puts :hoge
end
}
Ractor.new {
5.times do
puts :fuga
end
}
puts :sleep
sleep(5) # Ractorの処理を待ちたいので適当にsleep
このコードは以下のような出力をします
sleep
hogefuga
fuga
hoge
fuga
hoge
fuga
hoge
fuga
hoge
並列処理が行われていることがわかりますね。
注意点としてはRactor.newに渡すブロックはProc#isolateによって外部スコープから分離されることです。よって以下のようなコードはエラーとなります。
x = 1
Ractor.new do
puts x + 1
end
#=>
<internal:ractor>:267:in `new': can not isolate a Proc because it accesses outer variables (x). (ArgumentError)
from sample.rb:2:in `<main>'
Ractorでは別のRactorにオブジェクトを送信することができます。その際の同期処理もRactorが行ってくれます。
このオブジェクトの送信の方式は以下の2種類です。
1. push型: r.send(obj) -> Ractor.receive
r = Ractor.new do
x = Ractor.receive
x + 1
end
# Ractor.newに渡された引数はメッセージとして送信されブロックパラメータに渡される
Ractor.new(r) do |r|
r.send(100)
end
puts r.take #takeによって値が返却されるまで待つ
#=> 101
2. pull型: Ractor.yield(obj) -> r.take
r1 = Ractor.new do
Ractor.yield(:hoge)
end
r2 = Ractor.new(r1) do |r1|
r1.take
end
puts r2.take
#=> hoge
実際にこれらの機能を試してみましょう。
CSPのバッファとして使う
Actorモデルに対してよくCSPが持ち出されますね。
最初に記載した笹田さんがRactorのインターフェースについて書いている記事にある文章を引用します。
いくつかのパターンでは、CSP のほうが書きやすい、というのがわかっていたのですが、Ractor 自体をチャンネルのように使えば、性能を気にしなければ、実は CSP とほぼ同じようなことができることがわかったので、とりあえず Actor model 風のインターフェースをベースにしました。性能はあとでなんとかしよう、と思っています。Actor っぽい push 型のコミュニケーション手段と、Actor っぽくない pull 型のコミュニケーション手段が混ざっているのは、この辺を作りやすくするためです。
ということでCSPにおけるバッファとしても使えそうです。
チャンネル(バッファ)を使ってメッセージをやり取りしてみます。公式のドキュメントでもpipeとして紹介されているやり方です
pipe = Ractor.new do
loop do
Ractor.yield Ractor.receive
end
end
Ractor.new(pipe) do |pipe|
pipe << 'hello!'
end
Ractor.new(pipe) do |pipe|
pipe << 'world!'
end
2.times do
puts pipe.take
end
# world!
# hello!
hello!とworld!の出力の順番は実行時のスケジューリングによって変わります
Ractorがpush型のsend/receiveとpull型のyield/takeどちらも持っているのでこういう書き方ができますね。
フィボナッチサーバー
フィボナッチ数を計算するプロセスとそれを管理するスケジューラーを使って並列処理時の効果を確かめたいと思います。
Elixirが好きな方はお分かりだと思いますが、笹田さんが翻訳に貢献してくださっている『プログラミングElixir』にて扱われていた方法です。
整数の配列を渡された時にそれぞれのフィボナッチ数を並列に計算するような処理を書いていきます。
[37, 37, 37, 37, 37, 37]
=>
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]
以下に目指すべき流れの図を示します。
スケジューラーはフィボナッチサーバープロセスを望む並列数の数だけ生成します。
フィボナッチサーバーはフィボナッチ数を実際に計算するプロセスです。自らの計算の準備ができれば:ready、計算が終われば:answerを送信します。スケジューラーは計算結果を記録し必要がなくなればフィボナッチサーバーに:shutdownを送信します。
まずはフィボナッチサーバーの実装です。
def fib(n)
if n.zero?
0
elsif n < 2
1
else
fib(n - 2) + fib(n - 1)
end
end
fib_server = lambda do
Ractor.new do
loop do
Ractor.yield([:ready])
case Ractor.receive
in :shutdown
break
in n
Ractor.yield([:answer, n, fib(n)])
end
end
end
end
Procインスタンスのfib_serverがフィボナッチサーバーとなります。Ractor.receiveによってオブジェクトを受け取るまではブロッキングされることになります。その後Ractor.yieldを使って計算結果を送信しています。
module Scheduler
module_function
def run(num_processes, server, queue)
result = []
processes = num_processes.times.map do
server.call
end
loop do
case Ractor.select(*processes)
in process, [:ready]
if queue.empty?
process << :shutdown
if processes.size > 1
processes.delete(process)
else
break
end
else
queue in [head, *tail]
process << head
queue = tail
end
in _process, [:answer, n, ans]
result.push({ n => ans })
end
end
result
end
end
Scheduler.run(num_processes, server, queue)はserverオブジェクトをnum_processes分起動します。このスケジューラープロセスは生成したプロセス達からメッセージを受け取ります。他のプロセスから:readyを受け取った場合にはそのプロセスに対してqueueの最初の要素を送信して計算させます。この時、queueが空だった場合は送信してきたプロセスに対して:shutdownを送信します。:answerを受け取った場合は回答を記録します。
1~10まで並行処理する数を変えてfib(37)を6回計算させます。メソッドfib(n)がナイーブな実装なので単体だと結構時間がかかりそうな処理です。
samples = [37, 37, 37, 37, 37, 37]
label = "number of processes"
Benchmark.benchmark(label + Benchmark::CAPTION) do |bench|
(1..10).each do |num_processes|
bench.report(" " * label.size + "#{num_processes}") { Scheduler.run(num_processes, fib_server, samples) }
end
end
このような出力になります。
number of processes user system total real
1 16.742956 0.000000 16.742956 ( 16.743398)
2 17.034824 0.000000 17.034824 ( 8.520948)
3 17.836377 0.000000 17.836377 ( 6.043382)
4 18.765851 0.010000 18.775851 ( 5.792512)
5 20.254153 0.000000 20.254153 ( 6.113864)
6 21.012237 0.000000 21.012237 ( 3.930977)
7 21.365843 0.019993 21.385836 ( 3.808822)
8 20.250363 0.000000 20.250363 ( 3.524195)
9 20.078534 0.000000 20.078534 ( 3.646173)
10 20.645769 0.039976 20.685745 ( 3.763823)
実行時間に大幅な減少が見られますね!
今回、筆者の8コアのシステムではプロセス数8まで実行時間が減少しました。ちゃんと8並列で処理されていることがわかります。
実際の結果の確認もしておきましょう。
puts "result when 1 process"
pp Scheduler.run(1, fib_server, samples)
puts "result when 10 process"
pp Scheduler.run(10, fib_server, samples)
#=>
result when 1 process
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]
result when 10 process
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]
問題ないですね。
まとめ
実際にRactorを使って並行性のあるコードを書いてみました。メッセージパッシングがシンプルなコードで書けて楽しいです。
今までのRubyになかった概念での記法ですし、これから変更されていくことも多いと思います。どんどん進化するRubyが楽しみですね!
この記事が気に入ったらサポートをしてみませんか?