並行性データ構造の適切なテスト
1、2、3、2
- Rustライブラリのloomを使って、ロックフリーなデータ構造を徹底的にテストできる
- シンプルな並行カウンターのサンプルコードを掲載
- コードのバグは、インクリメント演算が原子的ではないことにある
use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
#[derive(Default)]
pub struct Counter {
value: AtomicU32,
}
impl Counter {
pub fn increment(&self) {
let value = self.value.load(SeqCst);
self.value.store(value + 1, SeqCst);
}
pub fn get(&self) -> u32 {
self.value.load(SeqCst)
}
}
シンプルなテスト
- 複数スレッドから同じカウンターを繰り返しインクリメントし、結果を確認するテスト
- テストは期待どおり失敗するが、タイミング依存のため再現が難しい
#[test]
fn threaded_test() {
let counter = Counter::default();
let thread_count = 100;
let increment_count = 100;
std::thread::scope(|scope| {
for _ in 0..thread_count {
scope.spawn(|| {
for _ in 0..increment_count {
counter.increment()
}
});
}
});
assert_eq!(counter.get(), thread_count * increment_count);
}
プロパティベーステスト(PBT)
- 状態機械のテストに適したプロパティベーステストの適用を試みる
- スレッドを手動で段階的に実行できれば、別スレッドのload&storeの間に割り込ませやすい
#[test]
fn state_machine_test() {
arbtest::arbtest(|rng| {
let mut state: i32 = 0;
let step_count: usize = rng.int_in_range(0..=100)?;
for _ in 0..step_count {
match *rng.choose(&["inc", "dec"])? {
"inc" => state += 1,
"dec" => state -= 1,
_ => unreachable!(),
}
}
Ok(())
});
}
シンプルな計測
- スレッドが原子的操作の間で「一時停止」できるようにする方法
pub fn increment(&self) {
pause();
let value = self.value.load(SeqCst);
pause();
self.value.store(value + 1, SeqCst);
pause();
}
fn pause() {
// ¯\_(ツ)_/¯
}
管理スレッドAPI
- API設計のルールの1つは、単一ユーザー向けから始めてAPIの感触を理解してから実装を進めること
- 管理スレッドを使ってプロパティベーステストを作成
let counter = Counter::default();
let t1 = managed_thread::spawn(&counter);
let t2 = managed_thread::spawn(&counter);
while !rng.is_empty() {
let coin_flip: bool = rng.arbitrary()?;
if t1.is_paused() {
if coin_flip {
t1.unpause();
}
} else if t2.is_paused() {
if coin_flip {
t2.unpause();
}
}
}
管理スレッドの実装
- 制御スレッドと管理スレッドの間で通信が必要
- 状態を保護するミューテックスと条件変数を使って実装
struct SharedContext {
state: Mutex<State>,
cv: Condvar,
}
#[derive(PartialEq, Eq, Default)]
enum State {
#[default]
Running,
Paused,
}
impl SharedContext {
fn pause(&self) {
let mut guard = self.state.lock().unwrap();
assert_eq!(*guard, State::Running);
*guard = State::Paused;
self.cv.notify_all();
guard = self.cv.wait_while(guard, |state| *state == State::Paused).unwrap();
assert_eq!(*guard, State::Running);
}
}
全コードの統合
#[test]
fn test_counter() {
arbtest::arbtest(|rng| {
eprintln!("begin trace");
let counter = Counter::default();
let mut counter_model: u32 = 0;
std::thread::scope(|scope| {
let t1 = managed_thread::spawn(scope, &counter);
let t2 = managed_thread::spawn(scope, &counter);
let mut threads = [t1, t2];
while !rng.is_empty() {
for (tid, t) in threads.iter_mut().enumerate() {
if rng.arbitrary()? {
if t.is_paused() {
eprintln!("{tid}: unpause");
t.unpause()
} else {
eprintln!("{tid}: increment");
t.submit(|c| c.increment());
counter_model += 1;
}
}
}
}
for t in threads {
t.join();
}
assert_eq!(counter_model, counter.get());
Ok(())
})
});
}
GN⁺の要約
- この記事は並行データ構造をテストする方法を説明している
- Rustのloomライブラリを使って、原子的でない操作をテストする方法を探っている
- 管理スレッドを使い、並行性の問題を再現可能かつデバッグしやすい形でテストしている
- 並行プログラミングに関心のある開発者にとって有用な内容
- 類似機能を持つプロジェクトとしてJavaのJCStressがある
1件のコメント
Hacker Newsのコメント
RustでTemperというライブラリを開発しており、Rustのメモリモデルの複雑な部分を扱うために多くの努力が必要
Rustで共有メモリのアトミックスナップショットを実装しており、自動テストを非常に重要視している
このアプローチの欠点は、テストコードに合わせてコード自体を修正しなければならない点
JetBrainsのLincheckはKotlin/Javaの世界では優れたライブラリ
C++向けに「Loom」のようなライブラリがあるのか気になる
このアプローチはソフトな進行保証に限界があるかもしれない
cmpxchgループでは、実際のハードウェアとスケジューラ上では中断される可能性は非常に低い実務的な知識が必要で、実際のスレッドを生成しなければならない
ptraceを使ってスレッドをシングルステップ実行し、命令レベルで別のインターリービングを作れる
Loomを使うには条件付きコンパイルを使う必要があり、これはやや侵襲的
Pythonで同じことを行う方法を知りたい