ファームウェアエンジニアの中林 (id:tomo-wait-for-it-yuki) です。あけましておめでとうございます。 今年の目標はNature Remoのファームウェア開発にRustを導入すること、です。そこで引くに引けない状況を作り出すためにRustに関するブログエントリを書き、既成事実を積み上げていきます。
ぶらり組込みRustライブラリ探索の旅、と題して組込みRustで使えるライブラリをゆるく紹介していくシリーズをやりたいと思います。第一弾はBBQueueです。
BBQueue
本エントリ内で紹介する使い方や内部実装は、v0.5.1をもとにしています。
https://docs.rs/bbqueue/0.5.1/bbqueue/
特徴
BBQueueはno_std
で使えるだけでなく、スレッドセーフで、排他制御なしに使える Single Producer / Single Consumer なリングバッファです。BBQueueを使うと動的なメモリ確保なしに、スレッド間で可変長フレームを送受信できます。さらに、フレームは常に連続したメモリブロックに配置されます。「動的なメモリ確保なしに可変長フレームを送受信できる」というところがポイントで、もし固定長 (特定の型) のデータに対する Single Producer / Single Consumer な Queue で良ければ、heapless crate の spsc を使うことができます。
スレッド間で可変長フレームを送受信する一般的な方法としては、Vec (C では malloc) を使う方法があります。しかし、組込みシステムでは、実行時のメモリ使用量を測定可能にしたい、という要求がしばしばあります。BBQueueを使うと固定長のバッファを静的 (もしくはスタック) に確保して、うまく使い回すことができます。
ドキュメントではBBQueueは組込みシステムのDMAで使うことを第一に考えたリングバッファという説明がなされています。通常のリングバッファではheadとtailがラップアラウンドするとメモリブロックが非連続になりますが、BBQueueでは常に連続したメモリブロックを使うため、DMAから直接データを書き込むことができます。その仕組みはバッファの末尾までで十分なメモリブロックを確保出来ない場合に、バッファの先頭からメモリブロックを確保する、というシンプルなものです。
プロセッサのアトミック命令を使っているので一部のターゲットではコンパイルエラーになります。
A
拡張のないRISC-Vなど。ただし、特別にCortex-M0つまりthumbv6
をサポートしており、同様の対応を行えばアトミック命令を実装していないターゲットでもコンパイルできそうです。
もう少し詳しいことは、Ferrous Systemsのブログポストでわかりやすい図付きで説明されています。
使い方
BBQueueには2つの使い方があります。
- データをQueueに read / write する
- フレーム化したデータをQueueに read / write する
1 の場合、Queueへの読み書きは次のようになり、複数回に分けて書き込んだ場合、読み出し時にその境界はわかりません。
- Queueに5バイト書き込む
- Queueに3バイト書き込む
- Queueを読み出すと8バイトのデータが読み出せる
2 の場合は、フレームごとに読み書きが発生します。
- Queueに5バイトのフレームを書き込む
- Queueに3バイトのフレームを書き込む
- Queueから5バイトのフレームを読み出す
- Queueから3バイトのフレームを読み出す
このことを念頭において、bbqueueを使うコードを書いてみます。
Cargo.tomlには、bbqueue
への依存を追加するだけです。後々のテストで使うので、rand
crate も追加しています。
Cargo.toml
[dependencies] bbqueue = "0.5" # 後々のテストで使う rand = "0.6"
1回書いて、1回読む
書き込み / 読み出しは次の手順で行います。
grant
を得る- 書き込む / 読み出す
- commit / release で書き込みの確定 / 読み出しデータの解放をする
コードにすると次の通りです。
use bbqueue::{BBBuffer, Error}; #[test] fn one_byte_read_write() -> Result<(), Error> { let queue: BBBuffer<1024> = BBBuffer::new(); // ★1 // p: Producer, c: Consumer let (mut p, mut c) = queue.try_split()?; // ★2 // Producer let mut wg = p.grant_exact(1)?; // ★3 assert_eq!(wg.len(), 1); wg[0] = 123; // ★4 wg.commit(1); // ★5 // Consumer let rg = c.read()?; // ★6 assert_eq!(rg.len(), 1); assert_eq!(rg[0], 123); // ★7 rg.release(1); // ★8 Ok(()) }
★1: BBBuffer::new()
で固定長のバッファを内部に持つインスタンスを生成します。BBBuffer::new()
はconst fn
になっており、static
スコープのインスタンスを生成する場合にも使えます。
★2: try_split()
で Producer / Consumer に分割します。すでに分割済みの場合はエラーになります。
Producer 側
★3: grant_exact(1)
でQueueに1バイト書くgrant
を取得します
★4: Queueにデータ 123
を書き込みます
★5: Queueへのデータ書き込みを確定します
Consumer 側
★6: Queueからデータを読み出すgrant
を取得します
★7: 読み出したデータは1バイトで、123
です
★8: 読み出したデータを解放します
grant
をcommit / releaseしないと新しいgrantは得られません。
#[test] fn not_granted() -> Result<(), Error> { let queue: BBBuffer<1024> = BBBuffer::new(); let (mut p, mut c) = queue.try_split()?; let mut wg = p.grant_exact(1)?; assert_eq!(wg.len(), 1); wg[0] = 123; // write grant not released yet let another_wg = p.grant_exact(1); assert_eq!(another_wg, Err(Error::GrantInProgress)); // release write grant wg.commit(1); let rg = c.read()?; assert_eq!(rg[0], 123); // read grant not released yet let another_rg = c.read(); assert_eq!(another_rg, Err(Error::GrantInProgress)); Ok(()) }
複数バイト読み書きする場合は、次のような感じです。
let mut wg = p.grant_exact(3)?; assert_eq!(wg.len(), 3); wg[0..].copy_from_slice(&[1, 2, 3]); wg.commit(3); let rg = c.read()?; assert_eq!(&*rg, &[1, 2, 3]); rg.release(3);
複数回書いて、1回で読む
try_split()
で分割した Producer / Consumer では、複数回にわけて書き込みをしても、読み込むときは一括で読み出せます。データを分解して処理したい場合は、データをパースして使った分だけrelease()
します。
#[test] fn write_twice_read_once() -> Result<(), Error> { let queue: BBBuffer<1024> = BBBuffer::new(); let (mut p, mut c) = queue.try_split()?; let mut wg = p.grant_exact(1)?; wg[0] = 1; wg.commit(1); let mut wg = p.grant_exact(1)?; wg[0] = 2; wg.commit(1); let rg = c.read()?; assert_eq!(&*rg, &[1, 2]); rg.release(2); Ok(()) }
commit / release を Drop 時に自動的にやる
自分でお試しコードを書いていて何度かcommit / release を忘れることがありました。これを防ぐために、commit / release を Drop 時に自動的に行うメソッドが用意されています。書き込みはto_commit()
/ 読み出しはto_release()
を使います。to_release()
を使う場合、読み出しのgrantはmutableにします。
{ // スコープを抜けると自動的にcommitされる let mut wg = p.grant_exact(1)?; wg.to_commit(1); wg[0] = 1; } { let mut wg = p.grant_exact(1)?; wg.to_commit(1); wg[0] = 2; } { // grantをmutableに let mut rg = c.read()?; rg.to_release(2); assert_eq!(&*rg, &[1, 2]); }
連続したメモリブロックがない
BBQueueの特徴として、連続したメモリブロックにデータを配置する、という点を挙げました。その動作を試すコードを書いてみましょう。
#[test] fn continuous_memory_block_not_available() -> Result<(), Error> { // 8バイトのバッファを確保 let queue: BBBuffer<8> = BBBuffer::new(); let (mut p, mut c) = queue.try_split()?; // 先頭の3バイト書き込み // read write // ┌─────┬───────────┐ // │ 3 │ │ // └─────┴───────────┘ let mut wg = p.grant_exact(3)?; wg[0..].copy_from_slice(&[1; 3]); wg.commit(3); // 先頭の3バイト読み出し // read // write // ┌─────┬───────────┐ // │ │ │ // └─────┴───────────┘ let rg = c.read()?; rg.release(3); // 次の4バイト書き込み // read write // ┌─────┬────────┬──┐ // │ │ 4 │ │ // └─────┴────────┴──┘ let mut wg = p.grant_exact(4)?; wg[0..].copy_from_slice(&[1; 4]); wg.commit(4); // 連続した4バイトを確保できないのでgrant取得に失敗する let wg = p.grant_exact(4); assert_eq!(wg, Err(Error::InsufficientSize));
ここまでの時点で、8バイトのバッファのうち4バイトを使用しています。残りはバッファの先頭の3バイトとバッファ末尾の1バイトとの合計4バイトです。この状態で4バイトの書き込みgrantを得ようとすると連続した4バイトのメモリブロックを取得できないため、上記コードの通り、Error::InsufficientSize
が返ってきます。
ここで問題です。4バイトの書き込みgrantを得るためには、Queueからデータを読み出して解放する必要があります。さて何バイトのデータを解放すればよいでしょうか?
let rg = c.read()?; rg.release(/* 何バイト? */);
正解は、2バイトです。1バイトと思ったあなた!惜しい!私と同じトラップにかかりましたね!
1バイト解放すれば、バッファの先頭から4バイトのデータが未使用になるので、4バイトの書き込みgrantが得られそうですが、なぜでしょうか?真実は実装を読むと判明します。grant_exact()
の実装を見てみましょう。書き込みgrantを得ると、readとwriteのラップアラウンドが発生する時だけ特殊な条件が発生します。
bbqueue-0.5.1/src/bbbuffer.rs
pub fn grant_exact(&mut self, sz: usize) -> Result<GrantW<'a, N>> { // 中略 let already_inverted = write < read; let start = if already_inverted { if (write + sz) < read { // Inverted, room is still available write } else { // Inverted, no room is available inner.write_in_progress.store(false, Release); return Err(Error::InsufficientSize); } } else { if write + sz <= max { // Non inverted condition write } else { // Not inverted, but need to go inverted // ★★★ これ! ★★★ // NOTE: We check sz < read, NOT <=, because // write must never == read in an inverted condition, since // we will then not be able to tell if we are inverted or not if sz < read { // Invertible situation 0 } else { // Not invertible, no space inner.write_in_progress.store(false, Release); return Err(Error::InsufficientSize); } } }; // 以下略
どういうことかと言うと、ラップアラウンドが発生するときにぴったりバッファの容量を使い切ってしまうと、read / write が指す位置が同じになってしまい、バッファが全て空いている状態なのか、バッファが全て使われている状態なのか判断することができなくなります。そのため、書き込みgrantを得ることで、ラップアラウンドが発生してしまう場合に限って、+1バイト空きが必要となります。
ということで、先程のコードの続きは、次のように書くと、4バイトの書き込みgrantを得られます。
// 2バイト読み出す // read write // ┌──────────┬───┬──┐ // │ │ 2 │ │ // └──────────┴───┴──┘ let rg = c.read()?; rg.release(2); // バッファの先頭から連続した4バイトを確保できる // write read // ┌──────┬───┬───┬──┐ // │ 4 │ │ 2 │ │ // └──────┴───┴───┴──┘ let mut wg = p.grant_exact(4)?; wg[0..].copy_from_slice(&[1; 4]); wg.commit(4); Ok(()) }
フレームごとに読み書きする
可変長のフレームを読み書きします。複数のフレームを書いた場合、読み出し側はフレーム単位でgrantを得ます。
#[test] fn framed_read_write() -> Result<(), Error> { let bb: BBBuffer<1024> = BBBuffer::new(); // p: FrameProducer, c: FrameConsumer let (mut p, mut c) = bb.try_split_framed()?; // ★1 // 10バイトのフレームを書き込む let mut wg = p.grant(10)?; assert_eq!(wg.len(), 10); wg[0..].copy_from_slice(&[1; 10]); wg.commit(10); // 20バイトのフレームを書き込む let mut wg = p.grant(20)?; assert_eq!(wg.len(), 20); wg[0..].copy_from_slice(&[2; 20]); wg.commit(20); // 1回目に書き込んだ10バイトのフレームを読み込む let frame = c.read().unwrap(); assert_eq!(&*frame, &[1; 10]); // ★2 frame.release(); // 2回目に書き込んだ10バイトのフレームを読み込む let frame = c.read().unwrap(); assert_eq!(&*frame, &[2; 20]); frame.release(); Ok(()) }
★1: try_split_framed()
でFrameProducer / FrameConsumerに分割するとフレームごとにデータを読み書きします
★2: Queue内には30バイト分のデータがありますが、1フレーム分だけ読み出せています
注意事項として、フレームで読み書きする場合、バッファ格納時にフレームヘッダが追加されます。bbqueueの利用者はフレームヘッダの構成を意識せずに使うことができますが、bbqueueのsrc/vusize.rs
を見ると現時点での実装がわかります。フレームの長さが127バイトまでなら1バイトのヘッダが、16,383バイトまでなら2バイトのヘッダが…、という具合です。
//! | Prefix | Precision | Total Bytes | //! |------------|-----------|-------------| //! | `xxxxxxx1` | 7 bits | 1 byte | //! | `xxxxxx10` | 14 bits | 2 bytes | //! | `xxxxx100` | 21 bits | 3 bytes | //! | `xxxx1000` | 28 bits | 4 bytes | //! | `xxx10000` | 35 bits | 5 bytes | //! | `xx100000` | 42 bits | 6 bytes | //! | `x1000000` | 49 bits | 7 bytes | //! | `10000000` | 56 bits | 8 bytes | //! | `00000000` | 64 bits | 9 bytes |
commit / release を Drop 時に自動的にやる
FrameConsumerから得たgrant
にはto_commit()
/ auto_release()
メソッドが実装されており、このメソッドを呼び出しておくと、Dropのタイミングで自動的にgrantをリリースしてくれます。ライブラリ側でフレームサイズがわかっているため、commit / releaseするサイズを指定する必要はありません。
#[test] fn framed_auto_commit_release() -> Result<(), Error> { let bb: BBBuffer<1024> = BBBuffer::new(); let (mut p, mut c) = bb.try_split_framed()?; { let mut wg = p.grant(10)?; wg.to_commit(10); wg[0..].copy_from_slice(&[1; 10]); } { let mut wg = p.grant(20)?; wg.to_commit(20); wg[0..].copy_from_slice(&[2; 20]); } { let mut frame = c.read().unwrap(); frame.auto_release(true); assert_eq!(&*frame, &[1; 10]); } { let mut frame = c.read().unwrap(); frame.auto_release(true); assert_eq!(&*frame, &[2; 20]); } Ok(()) }
マルチスレッドで読み書きする
最後にもう少し実践的な例として、マルチスレッドでフレームを読み書きするコードを書いてみます。テストデータはランダムなバイト列を用意して、ランダムな長さのフレームに分割しています。そのテストデータを、送信スレッドから書き込み、受信スレッドで読み出します。受信スレッドでは期待通りのデータが読み出せているかどうかテストしています。
#[test] fn multi_thread() -> Result<(), Error> { use rand::prelude::*; use std::thread::spawn; // テストデータを作成 const DATA_SIZE: usize = 1_000_000; let mut data = Vec::with_capacity(DATA_SIZE); (0..DATA_SIZE).for_each(|_| data.push(rand::random::<u8>())); let mut trng = thread_rng(); let mut chunks_tx = vec![]; while !data.is_empty() { let chunk_sz = trng.gen_range(1, (1024 - 1) / 2); if chunk_sz > data.len() { continue; } chunks_tx.push(data.split_off(data.len() - chunk_sz)); } let chunks_rx = chunks_tx.clone(); // FrameProducer / FrameConsumer を作成 static BB: BBBuffer<1024> = BBBuffer::new(); let (mut tx, mut rx) = BB.try_split_framed().unwrap(); // 送信スレッドから chunk を 1 frame ごとに queue に書き込む let tx_thr = spawn(move || { for chunk in chunks_tx.iter() { loop { if let Ok(mut wg) = tx.grant(chunk.len()) { wg.copy_from_slice(chunk); wg.commit(chunk.len()); break; } } } }); // 受信スレッドで chunk を 1 frame ごとに読み込み、そのデータ内容をテスト let rx_thr = spawn(move || { for chunk in chunks_rx.iter() { loop { if let Some(frame) = rx.read() { assert_eq!(&*frame, chunk); frame.release(); break; } } } }); tx_thr.join().unwrap(); rx_thr.join().unwrap(); Ok(()) }
おわりに
ぶらり組込みRustライブラリ探索の旅、第一弾はSingle Producer / Single Consumer なリングバッファBBQueueを紹介しました。第二弾があるかどうかはわかりませんが、ご期待ください。
NatureではIoTと電気を組み合わせた新しい電力サービスを作りたい思っており、エンジニアを積極採用中です。
カジュアル面談も実施していますので、興味がある方はお気軽にご応募ください!