見出し画像

Rustのスレッド間で変数を共有する ー 有識者レビュー (連載18)

前回書いたコードについて、レビューをして頂きました。
そのフィードバック内容について記載します。

また、フィードバッグ内容に基づいて、一つ実験をしてみましたので、そちらについても記載します。

1.スレッドに渡すデータをまとめる

Mutexとして定義し、各スレッドに渡すデータは以下のように二つあります。

    let databuf = Arc::new(Mutex::new([0u8; 4096]));
    let pos = Arc::new(Mutex::new(0));

 ここは、一つにまとめるべきです。

個別に渡すと、渡す経路で毎回 databuf と pos という名前を言及することになる。渡すデータの組み合わせを後から変えようとすると、多くのコード箇所の変更が必要。

有識者の方のフィードバック

構造体にまとめましょう。

    struct AccelSampleBuffer {
        bytes: [u8; 4096],
        num_bytes: usize,
    }
    let state = Arc::new(Mutex::new(AccelSampleBuffer {bytes:[0u8; 4096], num_bytes:0}));

state構造体をMutex化しました。

次に、以下のようにクローンを作り、各スレッドに渡して処理をします。

・加速度センサ値を配列にためていくスレッド用

let state_in = Arc::clone(&state);

・TCP/IP送信スレッド用

let state_in = Arc::clone(&state);

これらに伴う変更箇所を細々と修正し、ビルドが通り、無事動作しました。

本件は別にRustに限った話ではなく、プログラムを作成する上での一般的な注意事項ですね。

2.lock()後にunwrap()することが多い理由

以下に記述があります。
Mutex in std::sync - Rust (rust-lang.org)

有識者の方に意訳して頂きました。

あるスレッドがMutexを持ったままパニックを起こした場合、唐突にロックが解除されるため、含まれるデータの一貫性が失われてしまうおそれがある。すなわちパニックによってデータが「汚染」されてしまう。汚染されたデータに他のスレッドがアクセスするのを防ぐために、こうして解放されたMutexは "poisoned" 状態になり、デフォルトではアクセスが禁止される。

lockやtry_lockメゾッドのResult型の戻り値で、そうなっているかを判別できる。

通常のMutexの使い方では、戻り値を単純にunwrap()すればよい。こうすることでパニックを自分のスレッドに伝搬し、汚染されたデータに触れるのを避けられる。

しかし、汚染されたMutexはもうアクセスできなくなる、という訳ではない。

汚染されている場合の戻り値はPoisonError型だが、この型のinto_innerメソッドを呼び出すことにより、ロックが成功したときに返されるはずのガード値を取得し、データにアクセスできる。

有識者の方のフィードバック

要は、

ロックされたまま如何ともしがたい状態になったMutexがあっても、ロック解除待ちで永遠に他スレッドが待たされずに済む方法。

さらに、Mutexを持ったままパニックを起こした場合にそのMutexを自動的に開放する、というアプローチも考えられるが、その場合にも「それは汚染している」というマークを付けることで警鐘を鳴らす。

という事ですかね。

Rustでは、Mutexもより安全に使えるよう、考えられているという事ですね。

3.別の共有方法を検討する

別の共有方法があると教えて頂きました。

Arc<Mutex<_>> は最も汎用的な方法ですが、ヒープ割当て・参照カウント操作・ミューテックス操作が伴う。アプリケーション固有の要件に基づいて、トレードオフに注意しながら、より軽量なアプローチを取ることも考えられる。

有識者の方のフィードバック

全部試しているととても時間がかかりそうなので、今回はご紹介のみでご容赦ください。

3.1 グローバル変数 (static) に Mutex<_> を置く

Mutex<>は、それ自体を各スレッドに参照してもらうため、クローンが不要です。
このため、実行速度も速く使用メモリも少なくて済みます。

今回のプログラムに適用する場合、

static STATE: Mutex<AccelMeasureState> = Mutex::new(AccelMeasureState { ... });

のように定義して使うとよい、と助言頂きました。

3.2 アトミック変数 (std::sync::atomic::Atomic*) に状態を格納する

以下のように、構造体の各要素をアトミック化する方法です。

struct AccelSampleBuffer {
 samples: [[AtomicU32; 3]; 4096],
 num_samples: AtomicUsize,
}

let state = Arc::new(AccelSampleBuffer { ... });
let i = 0;
state.samples[i][0].store(axis_array[0].to_bits(), Ordering::Relaxed);
let x_axis = f32::from_bits(state.samples[i][0].load(Ordering::Relaxed));

3.3 チャンネルを使う

チャンネルを使ってバッファ制御をする手もあります。

1.SenderとRecieverを備えたMPSC(multiple producer, single consumer)チャネル

std::sync::mpsc

2.SPSC (single producer, single consumer)チャネルであるリングバッファを使う

rtrb

今回のサンプルの場合、最初からリングバッファを使うのが一番正解だったような。。。

4.【実験】アサインしたバッファをオーバーフローした場合の動作

今回、加速度センサからの値を保持しておくバッファを4096バイト分アサインさいています。
これがオーバーフローした場合、どう動くのか見てみました。

Rustはメモリ安全でに設計されていますが、アーキテクチャまで保証しているわけではありません。それはプログラマが考える事です。

そのプログラマが失敗した場合、どのように動くのか?

4.1 オーバーフローさせてみる

加速度センサからの値をバッファに格納しているコードは以下です。

let mut state_in_1 = state_in.lock().unwrap();

if state_in_1.num_bytes>(4095-12){
    state_in_1.num_bytes = 0;
}

let count = state_in_1.num_bytes;
for n in 0..4 {
    state_in_1.bytes[count + n] = retbytes_x[n];
    state_in_1.bytes[count + n + 4] = retbytes_y[n];
    state_in_1.bytes[count + n + 8] = retbytes_z[n];
    }
state_in_1.num_bytes+=12;

この中の以下の部分。

if state_in_1.num_bytes>(4095-12){
       state_in_1.num_bytes = 0;
}

この部分が、バッファオーバーフローを防いでいる部分です。
これを以下のように変更し、わざとオーバーフローさせてみましょう。

if state_in_1.num_bytes>(4095){
       state_in_1.num_bytes = 0;
}

実行してみました。


超えたからパニックになったよ、と表示されました。
CPUの例外に飛んで暴走したりはしませんでした。

ちなみに、SOLID-IDEの操作で、パニック時にブレークさせることが可能です。
シンボル名 rust_panic を指定してブレークポイントを設定すればOKです。

すなわち、配列のバッファオーバーランなどの実行時エラーについて、デバッグ時には rust_panic()にブレークポイント設定しておけば、それが発生したときに捕まえることができる、という事ですね。

やらかした!をデバッグするのが容易になりそうです。

さらに、ソースコード上で std::panic::catch_unwind 関数を使って、パニックのキャッチコードを書くことができます。ですのでパニックを捕捉し復帰を試みる等が可能です。

加えて、パニックを復帰させるより、システムをリセットしてしまいたい場合もあります。
この場合、パニックの捕捉を無効化し、常にアボートするようにするコンパイルオプションもあります。

4.2 他スレッドからアクセスしてみる

このバッファはArc<Mutex<_>>型で各スレッドに渡されています。
せっかくなので、この状態のままTCP/IP送信スレッドからアクセスする実験をしてみましょう。

PCアプリからTCP/IP送信してみます。

unwrap()でPoisonErrorが帰ってきました!
そして特にデッドロックはしていない様子です。

もう一度PCアプリからTCP/IP送信してみます。

同じエラーがもう一つ増えました。という事はデッドロックはしていないという事です。


今回はここまで。

今回まで作成したプログラムをzip化しました。
以下からダウンロードできます。

spi_adxl345クレート

SOLID-IDE Rustアプリケーション

PC側C#プログラム(Visual Studio 2019系)
(bin\Releaseフォルダ内にexecutableファイルがあります)

Raspberry pi4があれば試してみることが可能です。
何かのお役に立てば幸いです。

#PC側プログラムは何のエラーチェックもしていないので、データが返信されてこない場合等例外が発生します。

次回は、SPI制御に戻り、割り込み処理の実装について考えていきたいと思います。

この記事が気に入ったらサポートをしてみませんか?