見出し画像

Ractorで並列処理を試みる

はじめまして、グロービスのサーバーサイドエンジニアをしている大澤(@qwyngg)と申します。

Ruby3、そしてRactorがでましたね!

さっそくrbenv global 3.0.0して試していこうと思います。

Ractorとは?

並行性のあるコードを書くための新機能です。RubyKaigiでGuildという名前を聞いたことがある方もいらっしゃると思いますが、そのGuildが名称を変えてリリースされました。

名前の通りActorモデルを参考にした機能だそうです。

Ractor.newにブロックを渡すことで並列処理を実行させることができます。

Ractor.new {
 5.times do
   puts :hoge
 end
}

Ractor.new {
 5.times do
   puts :fuga
 end
}

puts :sleep
sleep(5) # Ractorの処理を待ちたいので適当にsleep

このコードは以下のような出力をします

sleep
hogefuga

fuga
hoge
fuga
hoge
fuga
hoge
fuga
hoge

並列処理が行われていることがわかりますね。

注意点としてはRactor.newに渡すブロックはProc#isolateによって外部スコープから分離されることです。よって以下のようなコードはエラーとなります。

x = 1
Ractor.new do
 puts x + 1
end
#=>
<internal:ractor>:267:in `new': can not isolate a Proc because it accesses outer variables (x). (ArgumentError)
       from sample.rb:2:in `<main>'

Ractorでは別のRactorにオブジェクトを送信することができます。その際の同期処理もRactorが行ってくれます。

このオブジェクトの送信の方式は以下の2種類です。

1. push型: r.send(obj) -> Ractor.receive

r = Ractor.new do
 x = Ractor.receive
 x + 1
end

# Ractor.newに渡された引数はメッセージとして送信されブロックパラメータに渡される
Ractor.new(r) do |r| 
 r.send(100)
end

puts r.take #takeによって値が返却されるまで待つ 

#=> 101

2. pull型: Ractor.yield(obj) -> r.take

r1 = Ractor.new do
 Ractor.yield(:hoge)
end

r2 = Ractor.new(r1) do |r1|
 r1.take
end

puts r2.take

#=> hoge

実際にこれらの機能を試してみましょう。


CSPのバッファとして使う

Actorモデルに対してよくCSPが持ち出されますね。

最初に記載した笹田さんがRactorのインターフェースについて書いている記事にある文章を引用します。

いくつかのパターンでは、CSP のほうが書きやすい、というのがわかっていたのですが、Ractor 自体をチャンネルのように使えば、性能を気にしなければ、実は CSP とほぼ同じようなことができることがわかったので、とりあえず Actor model 風のインターフェースをベースにしました。性能はあとでなんとかしよう、と思っています。Actor っぽい push 型のコミュニケーション手段と、Actor っぽくない pull 型のコミュニケーション手段が混ざっているのは、この辺を作りやすくするためです。

ということでCSPにおけるバッファとしても使えそうです。
チャンネル(バッファ)を使ってメッセージをやり取りしてみます。公式のドキュメントでもpipeとして紹介されているやり方です

pipe = Ractor.new do
 loop do
   Ractor.yield Ractor.receive
 end
end

Ractor.new(pipe) do |pipe|
 pipe << 'hello!'
end

Ractor.new(pipe) do |pipe|
 pipe << 'world!'
end

2.times do
 puts pipe.take
end

# world!
# hello!


hello!とworld!の出力の順番は実行時のスケジューリングによって変わります

Ractorがpush型のsend/receiveとpull型のyield/takeどちらも持っているのでこういう書き方ができますね。

フィボナッチサーバー

フィボナッチ数を計算するプロセスとそれを管理するスケジューラーを使って並列処理時の効果を確かめたいと思います。
Elixirが好きな方はお分かりだと思いますが、笹田さんが翻訳に貢献してくださっている『プログラミングElixir』にて扱われていた方法です。
整数の配列を渡された時にそれぞれのフィボナッチ数を並列に計算するような処理を書いていきます。

[37, 37, 37, 37, 37, 37] 
=> 
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]

以下に目指すべき流れの図を示します。

画像1

スケジューラーはフィボナッチサーバープロセスを望む並列数の数だけ生成します。

フィボナッチサーバーはフィボナッチ数を実際に計算するプロセスです。自らの計算の準備ができれば:ready、計算が終われば:answerを送信します。スケジューラーは計算結果を記録し必要がなくなればフィボナッチサーバーに:shutdownを送信します。

まずはフィボナッチサーバーの実装です。

def fib(n)
 if n.zero?
   0
 elsif n < 2
   1
 else
   fib(n - 2) + fib(n - 1)
 end
end

fib_server = lambda do
 Ractor.new do
   loop do
     Ractor.yield([:ready])
     case Ractor.receive
     in :shutdown
       break
     in n
       Ractor.yield([:answer, n, fib(n)])
     end
   end
 end
end

Procインスタンスのfib_serverがフィボナッチサーバーとなります。Ractor.receiveによってオブジェクトを受け取るまではブロッキングされることになります。その後Ractor.yieldを使って計算結果を送信しています。

module Scheduler 
 module_function
 
 def run(num_processes, server, queue)
   result = []
   processes = num_processes.times.map do
     server.call
   end
   
   loop do
     case Ractor.select(*processes)
     in process, [:ready]
       if queue.empty?
         process << :shutdown
         if processes.size > 1
           processes.delete(process)
         else
           break
         end
       else
         queue in [head, *tail]
         process << head
         queue = tail
       end
     in _process, [:answer, n, ans]
       result.push({ n => ans })
     end
   end
   
   result
 end
end

Scheduler.run(num_processes, server, queue)はserverオブジェクトをnum_processes分起動します。このスケジューラープロセスは生成したプロセス達からメッセージを受け取ります。他のプロセスから:readyを受け取った場合にはそのプロセスに対してqueueの最初の要素を送信して計算させます。この時、queueが空だった場合は送信してきたプロセスに対して:shutdownを送信します。:answerを受け取った場合は回答を記録します。

1~10まで並行処理する数を変えてfib(37)を6回計算させます。メソッドfib(n)がナイーブな実装なので単体だと結構時間がかかりそうな処理です。

samples = [37, 37, 37, 37, 37, 37]
label = "number of processes"

Benchmark.benchmark(label + Benchmark::CAPTION) do |bench|
 (1..10).each do |num_processes|
   bench.report(" " * label.size + "#{num_processes}") { Scheduler.run(num_processes, fib_server, samples) }
 end
end

このような出力になります。

 number of processes      user     system      total        real
                  1 16.742956   0.000000  16.742956 ( 16.743398)
                  2 17.034824   0.000000  17.034824 (  8.520948)
                  3 17.836377   0.000000  17.836377 (  6.043382)
                  4 18.765851   0.010000  18.775851 (  5.792512)
                  5 20.254153   0.000000  20.254153 (  6.113864)
                  6 21.012237   0.000000  21.012237 (  3.930977)
                  7 21.365843   0.019993  21.385836 (  3.808822)
                  8 20.250363   0.000000  20.250363 (  3.524195)
                  9 20.078534   0.000000  20.078534 (  3.646173)
                  10 20.645769   0.039976  20.685745 (  3.763823)

実行時間に大幅な減少が見られますね!  
今回、筆者の8コアのシステムではプロセス数8まで実行時間が減少しました。ちゃんと8並列で処理されていることがわかります。

実際の結果の確認もしておきましょう。

puts "result when 1 process"
pp Scheduler.run(1, fib_server, samples)
puts "result when 10 process"
pp Scheduler.run(10, fib_server, samples)

#=>
result when 1 process
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]
result when 10 process
[{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817},
{37=>24157817}]

問題ないですね。

まとめ

実際にRactorを使って並行性のあるコードを書いてみました。メッセージパッシングがシンプルなコードで書けて楽しいです。
今までのRubyになかった概念での記法ですし、これから変更されていくことも多いと思います。どんどん進化するRubyが楽しみですね!


この記事が気に入ったらサポートをしてみませんか?