ぶらり組込みRustライブラリ探索の旅 BBQueue編 -スレッドセーフなSingle Producer Single Consumer Queue-

ファームウェアエンジニアの中林 (id:tomo-wait-for-it-yuki) です。あけましておめでとうございます。 今年の目標はNature Remoのファームウェア開発にRustを導入すること、です。そこで引くに引けない状況を作り出すためにRustに関するブログエントリを書き、既成事実を積み上げていきます。

ぶらり組込みRustライブラリ探索の旅、と題して組込みRustで使えるライブラリをゆるく紹介していくシリーズをやりたいと思います。第一弾はBBQueueです。

BBQueue

本エントリ内で紹介する使い方や内部実装は、v0.5.1をもとにしています。

github.com

https://docs.rs/bbqueue/0.5.1/bbqueue/

特徴

BBQueueはno_stdで使えるだけでなく、スレッドセーフで、排他制御なしに使える Single Producer / Single Consumer なリングバッファです。BBQueueを使うと動的なメモリ確保なしに、スレッド間で可変長フレームを送受信できます。さらに、フレームは常に連続したメモリブロックに配置されます。「動的なメモリ確保なしに可変長フレームを送受信できる」というところがポイントで、もし固定長 (特定の型) のデータに対する Single Producer / Single Consumer な Queue で良ければ、heapless crate の spsc を使うことができます。

スレッド間で可変長フレームを送受信する一般的な方法としては、Vec (C では malloc) を使う方法があります。しかし、組込みシステムでは、実行時のメモリ使用量を測定可能にしたい、という要求がしばしばあります。BBQueueを使うと固定長のバッファを静的 (もしくはスタック) に確保して、うまく使い回すことができます。

ドキュメントではBBQueueは組込みシステムのDMAで使うことを第一に考えたリングバッファという説明がなされています。通常のリングバッファではheadとtailがラップアラウンドするとメモリブロックが非連続になりますが、BBQueueでは常に連続したメモリブロックを使うため、DMAから直接データを書き込むことができます。その仕組みはバッファの末尾までで十分なメモリブロックを確保出来ない場合に、バッファの先頭からメモリブロックを確保する、というシンプルなものです。

プロセッサのアトミック命令を使っているので一部のターゲットではコンパイルエラーになります。A拡張のないRISC-Vなど。ただし、特別にCortex-M0つまりthumbv6をサポートしており、同様の対応を行えばアトミック命令を実装していないターゲットでもコンパイルできそうです。

もう少し詳しいことは、Ferrous Systemsのブログポストでわかりやすい図付きで説明されています。

ferrous-systems.com

使い方

BBQueueには2つの使い方があります。

  1. データをQueueに read / write する
  2. フレーム化したデータをQueueに read / write する

1 の場合、Queueへの読み書きは次のようになり、複数回に分けて書き込んだ場合、読み出し時にその境界はわかりません。

  • Queueに5バイト書き込む
  • Queueに3バイト書き込む
  • Queueを読み出すと8バイトのデータが読み出せる

2 の場合は、フレームごとに読み書きが発生します。

  • Queueに5バイトのフレームを書き込む
  • Queueに3バイトのフレームを書き込む
  • Queueから5バイトのフレームを読み出す
  • Queueから3バイトのフレームを読み出す

このことを念頭において、bbqueueを使うコードを書いてみます。

Cargo.tomlには、bbqueueへの依存を追加するだけです。後々のテストで使うので、rand crate も追加しています。

Cargo.toml
[dependencies]
bbqueue = "0.5"
# 後々のテストで使う
rand = "0.6"

1回書いて、1回読む

書き込み / 読み出しは次の手順で行います。

  1. grantを得る
  2. 書き込む / 読み出す
  3. commit / release で書き込みの確定 / 読み出しデータの解放をする

コードにすると次の通りです。

    use bbqueue::{BBBuffer, Error};

    #[test]
    fn one_byte_read_write() -> Result<(), Error> {
        let queue: BBBuffer<1024> = BBBuffer::new(); // ★1
        // p: Producer, c: Consumer
        let (mut p, mut c) = queue.try_split()?; // ★2

        // Producer
        let mut wg = p.grant_exact(1)?; // ★3
        assert_eq!(wg.len(), 1);
        wg[0] = 123; // ★4
        wg.commit(1); // ★5

        // Consumer
        let rg = c.read()?; // ★6
        assert_eq!(rg.len(), 1);
        assert_eq!(rg[0], 123); // ★7
        rg.release(1); // ★8

        Ok(())
    }

★1: BBBuffer::new()で固定長のバッファを内部に持つインスタンスを生成します。BBBuffer::new()const fnになっており、staticスコープのインスタンスを生成する場合にも使えます。
★2: try_split()で Producer / Consumer に分割します。すでに分割済みの場合はエラーになります。

Producer 側
★3: grant_exact(1) でQueueに1バイト書くgrantを取得します
★4: Queueにデータ 123 を書き込みます
★5: Queueへのデータ書き込みを確定します

Consumer 側
★6: Queueからデータを読み出すgrantを取得します
★7: 読み出したデータは1バイトで、123 です
★8: 読み出したデータを解放します

grantをcommit / releaseしないと新しいgrantは得られません。

    #[test]
    fn not_granted() -> Result<(), Error> {
        let queue: BBBuffer<1024> = BBBuffer::new();
        let (mut p, mut c) = queue.try_split()?;

        let mut wg = p.grant_exact(1)?;
        assert_eq!(wg.len(), 1);
        wg[0] = 123;

        // write grant not released yet
        let another_wg = p.grant_exact(1);
        assert_eq!(another_wg, Err(Error::GrantInProgress));

        // release write grant
        wg.commit(1);

        let rg = c.read()?;
        assert_eq!(rg[0], 123);

        // read grant not released yet
        let another_rg = c.read();
        assert_eq!(another_rg, Err(Error::GrantInProgress));

        Ok(())
    }

複数バイト読み書きする場合は、次のような感じです。

        let mut wg = p.grant_exact(3)?;
        assert_eq!(wg.len(), 3);
        wg[0..].copy_from_slice(&[1, 2, 3]);
        wg.commit(3);

        let rg = c.read()?;
        assert_eq!(&*rg, &[1, 2, 3]);
        rg.release(3);

複数回書いて、1回で読む

try_split()で分割した Producer / Consumer では、複数回にわけて書き込みをしても、読み込むときは一括で読み出せます。データを分解して処理したい場合は、データをパースして使った分だけrelease()します。

    #[test]
    fn write_twice_read_once() -> Result<(), Error> {
        let queue: BBBuffer<1024> = BBBuffer::new();
        let (mut p, mut c) = queue.try_split()?;

        let mut wg = p.grant_exact(1)?;
        wg[0] = 1;
        wg.commit(1);

        let mut wg = p.grant_exact(1)?;
        wg[0] = 2;
        wg.commit(1);

        let rg = c.read()?;
        assert_eq!(&*rg, &[1, 2]);
        rg.release(2);

        Ok(())
    }

commit / release を Drop 時に自動的にやる

自分でお試しコードを書いていて何度かcommit / release を忘れることがありました。これを防ぐために、commit / release を Drop 時に自動的に行うメソッドが用意されています。書き込みはto_commit() / 読み出しはto_release()を使います。to_release()を使う場合、読み出しのgrantはmutableにします。

        {
            // スコープを抜けると自動的にcommitされる
            let mut wg = p.grant_exact(1)?;
            wg.to_commit(1);
            wg[0] = 1;
        }

        {
            let mut wg = p.grant_exact(1)?;
            wg.to_commit(1);
            wg[0] = 2;
        }

        {
            // grantをmutableに
            let mut rg = c.read()?;
            rg.to_release(2);
            assert_eq!(&*rg, &[1, 2]);
        }

連続したメモリブロックがない

BBQueueの特徴として、連続したメモリブロックにデータを配置する、という点を挙げました。その動作を試すコードを書いてみましょう。

    #[test]
    fn continuous_memory_block_not_available() -> Result<(), Error> {
        // 8バイトのバッファを確保
        let queue: BBBuffer<8> = BBBuffer::new();
        let (mut p, mut c) = queue.try_split()?;

        // 先頭の3バイト書き込み
        // read   write
        // ┌─────┬───────────┐
        // │  3  │           │
        // └─────┴───────────┘
        let mut wg = p.grant_exact(3)?;
        wg[0..].copy_from_slice(&[1; 3]);
        wg.commit(3);
        // 先頭の3バイト読み出し
        //       read
        //       write
        // ┌─────┬───────────┐
        // │     │           │
        // └─────┴───────────┘
        let rg = c.read()?;
        rg.release(3);

        // 次の4バイト書き込み
        //       read      write
        // ┌─────┬────────┬──┐
        // │     │   4    │  │
        // └─────┴────────┴──┘
        let mut wg = p.grant_exact(4)?;
        wg[0..].copy_from_slice(&[1; 4]);
        wg.commit(4);

        // 連続した4バイトを確保できないのでgrant取得に失敗する
        let wg = p.grant_exact(4);
        assert_eq!(wg, Err(Error::InsufficientSize));

ここまでの時点で、8バイトのバッファのうち4バイトを使用しています。残りはバッファの先頭の3バイトとバッファ末尾の1バイトとの合計4バイトです。この状態で4バイトの書き込みgrantを得ようとすると連続した4バイトのメモリブロックを取得できないため、上記コードの通り、Error::InsufficientSize が返ってきます。

ここで問題です。4バイトの書き込みgrantを得るためには、Queueからデータを読み出して解放する必要があります。さて何バイトのデータを解放すればよいでしょうか?

        let rg = c.read()?;
        rg.release(/* 何バイト? */);

正解は、2バイトです。1バイトと思ったあなた!惜しい!私と同じトラップにかかりましたね!

1バイト解放すれば、バッファの先頭から4バイトのデータが未使用になるので、4バイトの書き込みgrantが得られそうですが、なぜでしょうか?真実は実装を読むと判明します。grant_exact()の実装を見てみましょう。書き込みgrantを得ると、readとwriteのラップアラウンドが発生する時だけ特殊な条件が発生します。

bbqueue-0.5.1/src/bbbuffer.rs
    pub fn grant_exact(&mut self, sz: usize) -> Result<GrantW<'a, N>> {
         // 中略
        let already_inverted = write < read;

        let start = if already_inverted {
            if (write + sz) < read {
                // Inverted, room is still available
                write
            } else {
                // Inverted, no room is available
                inner.write_in_progress.store(false, Release);
                return Err(Error::InsufficientSize);
            }
        } else {
            if write + sz <= max {
                // Non inverted condition
                write
            } else {
                // Not inverted, but need to go inverted

                // ★★★ これ! ★★★
                // NOTE: We check sz < read, NOT <=, because
                // write must never == read in an inverted condition, since
                // we will then not be able to tell if we are inverted or not
                if sz < read {
                    // Invertible situation
                    0
                } else {
                    // Not invertible, no space
                    inner.write_in_progress.store(false, Release);
                    return Err(Error::InsufficientSize);
                }
            }
        };
        // 以下略

どういうことかと言うと、ラップアラウンドが発生するときにぴったりバッファの容量を使い切ってしまうと、read / write が指す位置が同じになってしまい、バッファが全て空いている状態なのか、バッファが全て使われている状態なのか判断することができなくなります。そのため、書き込みgrantを得ることで、ラップアラウンドが発生してしまう場合に限って、+1バイト空きが必要となります。

ということで、先程のコードの続きは、次のように書くと、4バイトの書き込みgrantを得られます。

        // 2バイト読み出す
        //           read write
        // ┌──────────┬───┬──┐
        // │          │ 2 │  │
        // └──────────┴───┴──┘
        let rg = c.read()?;
        rg.release(2);

        // バッファの先頭から連続した4バイトを確保できる
        //       write read
        // ┌──────┬───┬───┬──┐
        // │  4   │   │ 2 │  │
        // └──────┴───┴───┴──┘
        let mut wg = p.grant_exact(4)?;
        wg[0..].copy_from_slice(&[1; 4]);
        wg.commit(4);

        Ok(())
    }

フレームごとに読み書きする

可変長のフレームを読み書きします。複数のフレームを書いた場合、読み出し側はフレーム単位でgrantを得ます。

    #[test]
    fn framed_read_write() -> Result<(), Error> {
        let bb: BBBuffer<1024> = BBBuffer::new();
        // p: FrameProducer, c: FrameConsumer
        let (mut p, mut c) = bb.try_split_framed()?; // ★1

        // 10バイトのフレームを書き込む
        let mut wg = p.grant(10)?;
        assert_eq!(wg.len(), 10);
        wg[0..].copy_from_slice(&[1; 10]);
        wg.commit(10);

        // 20バイトのフレームを書き込む
        let mut wg = p.grant(20)?;
        assert_eq!(wg.len(), 20);
        wg[0..].copy_from_slice(&[2; 20]);
        wg.commit(20);

        // 1回目に書き込んだ10バイトのフレームを読み込む
        let frame = c.read().unwrap();
        assert_eq!(&*frame, &[1; 10]);  // ★2
        frame.release();

        // 2回目に書き込んだ10バイトのフレームを読み込む
        let frame = c.read().unwrap();
        assert_eq!(&*frame, &[2; 20]);
        frame.release();

        Ok(())
    }

★1: try_split_framed()でFrameProducer / FrameConsumerに分割するとフレームごとにデータを読み書きします
★2: Queue内には30バイト分のデータがありますが、1フレーム分だけ読み出せています

注意事項として、フレームで読み書きする場合、バッファ格納時にフレームヘッダが追加されます。bbqueueの利用者はフレームヘッダの構成を意識せずに使うことができますが、bbqueueのsrc/vusize.rsを見ると現時点での実装がわかります。フレームの長さが127バイトまでなら1バイトのヘッダが、16,383バイトまでなら2バイトのヘッダが…、という具合です。

//! | Prefix     | Precision | Total Bytes |
//! |------------|-----------|-------------|
//! | `xxxxxxx1` | 7 bits    | 1 byte      |
//! | `xxxxxx10` | 14 bits   | 2 bytes     |
//! | `xxxxx100` | 21 bits   | 3 bytes     |
//! | `xxxx1000` | 28 bits   | 4 bytes     |
//! | `xxx10000` | 35 bits   | 5 bytes     |
//! | `xx100000` | 42 bits   | 6 bytes     |
//! | `x1000000` | 49 bits   | 7 bytes     |
//! | `10000000` | 56 bits   | 8 bytes     |
//! | `00000000` | 64 bits   | 9 bytes     |

commit / release を Drop 時に自動的にやる

FrameConsumerから得たgrantにはto_commit() / auto_release()メソッドが実装されており、このメソッドを呼び出しておくと、Dropのタイミングで自動的にgrantをリリースしてくれます。ライブラリ側でフレームサイズがわかっているため、commit / releaseするサイズを指定する必要はありません。

    #[test]
    fn framed_auto_commit_release() -> Result<(), Error> {
        let bb: BBBuffer<1024> = BBBuffer::new();
        let (mut p, mut c) = bb.try_split_framed()?;

        {
            let mut wg = p.grant(10)?;
            wg.to_commit(10);
            wg[0..].copy_from_slice(&[1; 10]);
        }

        {
            let mut wg = p.grant(20)?;
            wg.to_commit(20);
            wg[0..].copy_from_slice(&[2; 20]);
        }

        {
            let mut frame = c.read().unwrap();
            frame.auto_release(true);
            assert_eq!(&*frame, &[1; 10]);
        }

        {
            let mut frame = c.read().unwrap();
            frame.auto_release(true);
            assert_eq!(&*frame, &[2; 20]);
        }

        Ok(())
    }

マルチスレッドで読み書きする

最後にもう少し実践的な例として、マルチスレッドでフレームを読み書きするコードを書いてみます。テストデータはランダムなバイト列を用意して、ランダムな長さのフレームに分割しています。そのテストデータを、送信スレッドから書き込み、受信スレッドで読み出します。受信スレッドでは期待通りのデータが読み出せているかどうかテストしています。

    #[test]
    fn multi_thread() -> Result<(), Error> {
        use rand::prelude::*;
        use std::thread::spawn;

        // テストデータを作成
        const DATA_SIZE: usize = 1_000_000;
        let mut data = Vec::with_capacity(DATA_SIZE);
        (0..DATA_SIZE).for_each(|_| data.push(rand::random::<u8>()));

        let mut trng = thread_rng();
        let mut chunks_tx = vec![];
        while !data.is_empty() {
            let chunk_sz = trng.gen_range(1, (1024 - 1) / 2);
            if chunk_sz > data.len() {
                continue;
            }
            chunks_tx.push(data.split_off(data.len() - chunk_sz));
        }
        let chunks_rx = chunks_tx.clone();    

        // FrameProducer / FrameConsumer を作成
        static BB: BBBuffer<1024> = BBBuffer::new();
        let (mut tx, mut rx) = BB.try_split_framed().unwrap();

        // 送信スレッドから chunk を 1 frame ごとに queue に書き込む
        let tx_thr = spawn(move || {
            for chunk in chunks_tx.iter() {
                loop {
                    if let Ok(mut wg) = tx.grant(chunk.len()) {
                        wg.copy_from_slice(chunk);
                        wg.commit(chunk.len());
                        break;
                    }
                }
            }
        });

        // 受信スレッドで chunk を 1 frame ごとに読み込み、そのデータ内容をテスト
        let rx_thr = spawn(move || {
            for chunk in chunks_rx.iter() {
                loop {
                    if let Some(frame) = rx.read() {
                        assert_eq!(&*frame, chunk);
                        frame.release();
                        break;
                    }
                }
            }
        });

        tx_thr.join().unwrap();
        rx_thr.join().unwrap();

        Ok(())
    }

おわりに

ぶらり組込みRustライブラリ探索の旅、第一弾はSingle Producer / Single Consumer なリングバッファBBQueueを紹介しました。第二弾があるかどうかはわかりませんが、ご期待ください。


NatureではIoTと電気を組み合わせた新しい電力サービスを作りたい思っており、エンジニアを積極採用中です。

herp.careers

カジュアル面談も実施していますので、興味がある方はお気軽にご応募ください!

herp.careers