見出し画像

Rustのスレッド間で変数を共有する (連載17)

前回、Windows PCとRaspberry Pi 4をネットワークで接続し、TCP/IP通信でRaspberry Pi 4が取得した加速度センサADXL345の値をPCに渡すコードが書けました。

今回は、
その1:加速度値取得をスレッド化
その2:加速度値取得スレッドとTCP/IPサーバスレッドで使用できる共有変数設置
その3:それぞれのスレッドから、排他制御により共有変数使用
を行います。

前回、最新の100個をリングバッファにためて、、、と書きましたが、変更します。
バッファを一つ、どんっとアサインし、リングバッファの形で書き込みはするのですが、PCに送信する際にはバッファ先頭から最新値までを送ります。

時間があれば残りのデータについても送る検討を行いたいとは思いますが、今回はそこが目的ではないので、バッファ制御については簡単にするという事でご容赦ください。

1.加速度値取得をスレッド化

前回参考にさせて頂いたTCP/IPサーバサンプルコードは、実はスレッドを使っています。

let send_job_ingress = send_job_ingress.clone();
std::thread::spawn(move || worker_loop(Worker(i), send_job_ingress));

thread::spawnで、スレッド起動するようです。公式サイトを見てみます。

spawn(move|| スレッドとしてやりたいことをここに書く )
で大丈夫そうです。

moveクロージャーをここに書く理由は、値を使用したいため使用権を渡しますよ、と理解しました。

という事で、加速度値を取得するコードをまるっとここに書いてみます。

    let databuf = [0u8; 4096];
    let num = 0;

     std::thread::spawn(move || {
        loop {
            let axis_array: [f32; 3] =  spi_adxl345::current_acceleration_array();

            let retbytes_x: [u8; 4] = axis_array[0].to_le_bytes();
            let retbytes_y: [u8; 4] = axis_array[1].to_le_bytes();
            let retbytes_z: [u8; 4] = axis_array[2].to_le_bytes();

            if num>(4095-12){ 
                num = 0;
            }
            for n in 0..4 {
                databuf[num + n] = retbytes_x[n];
                databuf[num + n + 4] = retbytes_y[n];
                databuf[num + n + 8] = retbytes_z[n];
            }
            num+=12; 

            println!("x_axis:{} y_axis:{} z_axis:{} num={}", axis_array[0], axis_array[1], axis_array[2], num);

            std::thread::sleep(Duration::from_millis(10));
        }});

(いろいろ間違ってます。有識者の方、言いたいことが沢山あると思います。)
databufに測定した加速度値を入れていきます。
numは、次の測定値が入る場所を示します。
加速度センサの値を読んだ後は10ms秒sleepし、再度加速度センサの値を読む、を繰り返します。

※まだビルドは通しません。

2.共有変数を付ける

ここで、以下2つの変数は、このスレッドだけでなく、TCP/IP送信を行うスレッドでも使います。

    let databuf = [0u8; 4096];
    let num = 0;

排他制御付きで、共有変数にするためにはどうすればよいのでしょう?

答えはこのURLにあります。

let data_mutex = Arc::new(Mutex::new(vec![1, 2, 3, 4]));
let res_mutex = Arc::new(Mutex::new(0));

これ、この形です。
このサイト内のコメントを抜粋します。
// Here we're using an Arc to share memory among threads, and the data inside
// the Arc is protected with a mutex.

Arcを付けると共有メモリにすることができて、その変数はMutexでプロテクトできる、と書かれています。

じゃ、そうしてみましょう!

    let databuf = Arc::new(Mutex::new([0u8; 4096]));
    let num = Arc::new(Mutex::new(0));

次は排他制御のためのコードを書かないといけないはずです。

排他制御:
・スレッドに持っていくため、cloneする
・スレッド内で使うとき、lock する
・lock()後にunwrap()する。
という操作が必要であるようです。

そして、

unlockされるタイミング:
Mutex::lockが返したMutexGuardがスコープを抜けて破棄されるとunlockされる。
早速使ってみましょう!

databufは、スレッドに持っていく前にcloneしてdatabuf_inにしました。
スレッド内でlockしdatabuf_in_1という名前にしました。

numについては、共有変数の定義場所ではposという名前にし、スレッドに持っていく前にcloneしてpos_inにしました。
スレッド内でlockしnumという名前にしました。

    let databuf = Arc::new(Mutex::new([0u8; 4096]));
    let pos = Arc::new(Mutex::new(0));

    let databuf_in = Arc::clone(&databuf);
    let pos_in = Arc::clone(&pos);
     std::thread::spawn(move || {
        loop {
            let axis_array: [f32; 3] =  spi_adxl345::current_acceleration_array();

            let retbytes_x: [u8; 4] = axis_array[0].to_le_bytes();
            let retbytes_y: [u8; 4] = axis_array[1].to_le_bytes();
            let retbytes_z: [u8; 4] = axis_array[2].to_le_bytes();

            let mut num = pos_in.lock().unwrap();
            let mut databuf_in_1 = databuf_in.lock().unwrap();

            if num>(4095-12){   
                num = 0;
            }
            for n in 0..4 {
                databuf_in_1[num + n] = retbytes_x[n];
                databuf_in_1[num + n + 4] = retbytes_y[n];
                databuf_in_1[num + n + 8] = retbytes_z[n];
            }
            num+=12; 

            println!("x_axis:{} y_axis:{} z_axis:{} num={}", axis_array[0], axis_array[1], axis_array[2], *num);

            std::thread::sleep(Duration::from_millis(10));
        }});

良さそうと思ったのですが、ビルドもしていないのに、もう怒られました。

何やら難しそうな。。。

ちょっとビルドしてみましょう。何か教えてくれるかも!

教えてくれました!

あれ、アスタリスクって何だっけ。何かあったような気がする。。。

過去記事を見てみました。

https://note.com/yn_2022/n/nf64d5d2b3547

「” * ”によって()の中身の生ポインタを一旦参照解除し参照先の場所にアクセスするようにします。」

参照解除!
だだし、ここでは生ポインタではなく、Mutex::lockが返したMutexGuardを参照解除しています。

            if *num>(4095-12){
                *num = 0;
            }
            for n in 0..4 {
                databuf_in_1[*num + n] = retbytes_x[n];
                databuf_in_1[*num + n + 4] = retbytes_y[n];
                databuf_in_1[*num + n + 8] = retbytes_z[n];
            }
            *num+=12;

じゃ、ビルドしてみます。
通りました!

3.加速度値をネットワーク経由で返すようにする

ではその2つの共有変数、TCP/IP送信部で使えるようにします。

3.1 TCP/IP送信本体関数で使用するための準備

まず、TCP/IP送信本体関数にもっていかなければいけません。

    for i in 0..NUM_WORKERS {
        let send_job_ingress = send_job_ingress.clone();
        let databuf_out = Arc::clone(&databuf);
        let pos_out = Arc::clone(&pos);
        std::thread::spawn(move || worker_loop(Worker(i), send_job_ingress, databuf_out, pos_out));
    }

先程と同様、

databufは、スレッドに持っていく前にcloneしてdatabuf_outにしました。
posはスレッドに持っていく前にcloneしてpos_outにしました。

スレッド本体の関数は以下になりました。

fn worker_loop(worker: Worker, send_job_ingress: mpsc::SyncSender, databuf_out: Arc<Mutex<[u8; 4096]>>, pos_out: Arc<Mutex>) {
    let mut buf = [0u8; 4096]; 

    loop {
        // Get a new job by sending a FD sender to the acceptor
        let (send_client_fd, recv_client_fd) = mpsc::sync_channel(1);
        let _ = send_job_ingress.send(JobIngress { send_client_fd });
        let Ok(client_fd) = recv_client_fd.recv() else {
            // `Err(RecvError)` indicates that `send_client_fd` has been dropped.
            //
            // This implies the `JobIngress` we sent was discarded without being consumed,
            // meaning the acceptor exited for some reason. 
            return;
        };

        // Serve the client
        let client_fd_raw = client_fd.as_raw_fd(); // for logging
        log::info!("{worker:?}: Serving client FD {client_fd_raw}");
        if let Err(e) = serve_client(&client_fd, &mut buf, &databuf_out, &pos_out) {
            log::info!("{worker:?}: Finished serving client FD {client_fd_raw} with error: {e:?}");
        } else {
            log::info!("{worker:?}: Finished serving client FD {client_fd_raw} successfully");
        }
    }
}

先程と同様、

databufは、スレッドに持っていく前にcloneしてdatabuf_outにしました。
posはスレッドに持っていく前にcloneしてpos_outにしました。

スレッド本体の関数は以下になりました。

fn worker_loop(worker: Worker, send_job_ingress: mpsc::SyncSender, databuf_out: Arc<Mutex<[u8; 4096]>>, pos_out: Arc<Mutex>) {
    let mut buf = [0u8; 4096]; 

    loop {
        // Get a new job by sending a FD sender to the acceptor
        let (send_client_fd, recv_client_fd) = mpsc::sync_channel(1);
        let _ = send_job_ingress.send(JobIngress { send_client_fd });
        let Ok(client_fd) = recv_client_fd.recv() else {
            // `Err(RecvError)` indicates that `send_client_fd` has been dropped.
            //
            // This implies the `JobIngress` we sent was discarded without being consumed,
            // meaning the acceptor exited for some reason. 
            return;
        };

        // Serve the client
        let client_fd_raw = client_fd.as_raw_fd(); // for logging
        log::info!("{worker:?}: Serving client FD {client_fd_raw}");
        if let Err(e) = serve_client(&client_fd, &mut buf, &databuf_out, &pos_out) {
            log::info!("{worker:?}: Finished serving client FD {client_fd_raw} with error: {e:?}");
        } else {
            log::info!("{worker:?}: Finished serving client FD {client_fd_raw} successfully");
        }
    }
}

本体はもう一つ下ですね。
本体はこちらです。

fn serve_client(client_fd: &TcpStream, buffer: &mut [u8], databuf_out: &Arc<Mutex<[u8; 4096]>>, pos_out: &Arc<Mutex>) -> io::Result<()> {
    client_fd.set_write_timeout(Some(Duration::from_secs(30)))?;
    client_fd.set_read_timeout(Some(Duration::from_secs(30)))?; 

    loop {
        // Read data from the socket
        let num_read_bytes = (&*client_fd).read(buffer)?;
        if num_read_bytes == 0 {
            client_fd.shutdown(Shutdown::Both)?;
            break; 
        }

        // Write back the data to the socket
        let mut num = pos_out.lock().unwrap();
        let databuf_out_1 = databuf_out.lock().unwrap();

        match (&*client_fd).write_all(&buffer[..num_read_bytes]) {
            Err(e) if e.kind() == io::ErrorKind::WriteZero => {
                break;
            }
            result => result?,
        };
    }

    Ok(())
}

定義した共有変数を、TCP/IP送信関数内で使用する準備が整いました。

3.2 TCP/IP送信データをバッファから取得する

ではいよいよdatabuf_out_1の中のデータを、インデックス0から(num-1)までTCP/IP送信するコードを書きます。

まずはlock()して、アクセス権を取ります。

let databuf_out_1 = databuf_out.lock().unwrap();

そして、バッファにアクセスします。

match (&*client_fd).write_all(databuf_out_1[..*num]) { 

このままだと、以下のように指摘されます。

また安直に、ビルドに頼ってみます。

ビルドすると、以下のようにアドバイス頂きました。

あ、そうか。参照するには&がいるんだっけ。
過去記事によると、「&をつけ、借用であることを明示的に伝えます。」と書いています。

match (&*client_fd).write_all(&databuf_out_1[..*num]) {

3.3 バッファ内の最新データ格納位置を知る

では、numの値を取得し、バッファ内でTCP/IP送信する領域を決定します。

まずはlock()して、アクセス権を取ります。

let mut num = pos_out.lock().unwrap();

databuf_out_1バッファ内インデックスがnumになる手前まで送信します。(この行は先ほど既に書きましが)

さらに、送信し終わった後にnumをゼロクリアします。

        match (&*client_fd).write_all(&databuf_out_1[..*num]) {
            Err(e) if e.kind() == io::ErrorKind::WriteZero => {
                break;
            }
            result => result?,
        };
        println!("num={}", *num);
        *num = 0;

できました!
ビルドも通りました。

4.PC側アプリの変更

PC側のC#アプリを以下のように変更します。

仕様:
・getボタンでデータ取得(これは同じ)
・取得したデータの最後の加速度値をダイアログに表示
・取得したデータ数も表示
・取得したデータはすべてファイルにセーブする。
・格納するファイル名は、年月日_時分秒.csv。

5.実行!

実行できました!

なんと、また、SOLID側デバッグ不要でした!
ビルドが通ったら動きました。

・動作ログ出力

Worker(0)と出ているところで、TCP/IP受信しました。
そこからnum=3324と出ているところまでの間で、たまった加速度センサの値をPCにTCP/IP送信しています。

送信データは3324バイト。X/Y/Z軸のデータ一式が12バイトなので、
3324/12=277セットのデータが送られています。

・PC側アプリ表示

277セット分のデータが送られていることがわかります。
最終加速度値は、表示している通りです。上記ターミナルの値と同様です。

・ファイルセーブ内容
セーブしたファイルの先頭です。

最後はこうなっています。

CSVファイルにし、Excelでグラフ表示してみました。

Raspberry Pi 4を入れた容器を指ピンしてみました。こんな感じで取れました。

最後に、デバッグ用に加速度センサの値を読んだ直後にその値を表示している部分を削除(コメントアウト)しました。
これがあるとUART出力のために時間を要するため、です。

あまり変わらないですね。

※加速度センサを読んだ後にsleepする時間を1msにしてみましたが、正しい値がセンサから読めないようだったので、深追いせず断念しました。

いろいろありましたが、ここまで動かすことができました!
参照の&と*が、自発的に使えるほど理解はしていないですが、今回の内容についてはエラーメッセージに助けられて書くことができました。

次回は、有識者チェックをして頂いて、その結果を書きたいと思います。

この記事が気に入ったらサポートをしてみませんか?