C++でスレッドセーフなリングバッファを実装してみた
18日目担当のつかさです。
最近 C++ でのマルチスレッドプログラミングの勉強としてリングバッファを実装したので、せっかくなら後で見直せるようにまとめておこう!ということでその記事を書こうと思います。
リングバッファとは
まずリングバッファは何かというと、その名の通り下図のような「リング状のバッファ」のことをいいます。
https://gyazo.com/366a13a72afb4c6ba93d0e72f74520f0
要は輪っかになっているキュー(配列)ですね。ただ実際のキューは直線状なので、
のようにインデックスをバッファサイズで割って正規化する操作をします。こうすることで末端の次のインデックスが先頭にくるようにできます。
また、バッファにデータを出し入れする際は FIFO (First In First Out) という方式を取ります。これは出入りにおいて順序を保証する方式で、データを読み出すときは先頭から取り出し、書き込むときは末尾に挿入します。以降、この読み出し位置(データの先頭)をHead、書き込み位置(空き領域の先頭)をTailと呼びます。
このHeadとTailさえわかっていると、現在のバッファの状態を知ることができます。バッファサイズ$ N=8の場合を例に見てみましょう。下図の一番左が空でも満杯でもない通常状態、真ん中が空状態、右が満杯状態のバッファを表しています。Head = Tailであれば空、Head = (Tail + 1) % Nであれば満杯となり、データの長さも(Head - Tail + N) % Nで計算することができます。ただよく一番右の図を見ると一つ空きがあります。これはすべてデータを入れてしまうとHead = Tailとなってしまい空と満杯の区別がつかなくなるため、N - 1個までしかデータを入れられないように制限をしています。
https://gyazo.com/f42e365cc480030bab7f71451dfd0094
スレッドセーフ
タイトルにもあるように、今回はマルチスレッドでアクセスしても正常に動くスレッドセーフなリングバッファの作成を目指します。スレッドセーフの意味は次の通りです。
スレッドセーフ(Thread-safe)は、マルチスレッドプログラミングにおける概念である。あるコードがスレッドセーフであるという場合、そのコードを複数のスレッドが同時並行的に実行しても問題が発生しないことを意味する。特に、ある共有データへの複数のスレッドによるアクセスがあるとき、一度に1つのスレッドのみがその共有データにアクセスするようにして安全性を確保しなければならない。
引用: フリー百科事典『ウィキペディア(Wikipedia)』スレッドセーフ マルチスレッドの場合、書き込みを行うスレッドと読み出しを行うスレッドの2種類のスレッドがそれぞれ複数存在するような状況です。一般的に書き込みを行うスレッドを Producer (生産者)、読み出しを行うスレッドを Consumer (消費者)と呼び、このような問題を 生産者/消費者問題といいます。 スレッドセーフでないリングバッファに複数の Producer と Consumer が同時アクセスしたときに起きる問題のことをデータ競合といいます。
データ競合
データ競合とは、複数スレッド間で共有する変数(今回であればリングバッファクラス)に対して、同時に、読み/書きアクセスが行われる事象を指す。データ競合が生じると、そのマルチスレッド・プログラムの実行結果は、全く予想がつかない ものとなる。例えばC++では未定義動作を引き起こす。
つまり、リングバッファに複数のスレッドから同時アクセスしてしまうとデータ競合が起こり、入れたはずのデータが入っていなかったり想定していたデータが取り出せないなどの不具合が起きてしまいます。そこで、あるスレッドがリングバッファを操作している間は他のスレッドが割り込めないように制御する必要があります。このような処理のことを排他制御といいます。ただ、排他制御がうまくできていないとデッドロックという新たな問題が生じてしまいます。
デッドロック
2つ以上のスレッドあるいはプロセスなどの処理単位が互いの処理終了を待ち、結果としてどの処理も先に進めなくなってしまうことを言う。
引用: フリー百科事典『ウィキペディア(Wikipedia)』デッドロック 例えば、バッファが満杯になって書き込みを行うスレッドが待機している間に、読み出しを行うスレッドがすべてのデータを取り出してしまい、さらに取り出すことができなくなって両方のスレッドが待機状態になるといったイメージです。
リングバッファクラスの作成
シングルスレッド版
リングバッファの説明は疲れてきたので、いよいよ実装のお話に移ります。とはいってもいきなりマルチスレッドでも動くように書くのは簡単ではないので、ひとまずシングルスレッドであればデータの読み書きができるリングバッファクラスを作成しました。データの型は、数値でも文字列でも自作クラスでもいいようにクラステンプレートで作成しています。
code: ringbuffer_single.h
namespace ringbuffer
{
template <typename T>
class RingBuffer
{
private:
T *buffer;
int buffer_size, head, tail, data_length;
int GetDataLength(); //データサイズを計算する関数
public:
RingBuffer(unsigned int size); //コンストラクタ(バッファの初期化)
~RingBuffer(); //デストラクタ
void Push(const T *push_data, const int push_size); //書き込み関数
std::shared_ptr<T> Pop(const int pop_size); //読み出し関数
};
template <typename T>
RingBuffer<T>::RingBuffer(unsigned int size) //コンストラクタ(バッファの初期化)
{
head = tail = data_length = 0;
buffer_size = size;
}
template <typename T>
RingBuffer<T>::~RingBuffer() //デストラクタ
{
delete buffer;
}
template <typename T>
int RingBuffer<T>::GetDataLength() //データサイズを計算する関数
{
if (tail >= head)
return tail - head;
else
return tail - head + buffer_size;
}
template <typename T>
void RingBuffer<T>::Push(const T *push_data, const int push_size) //書き込み関数
{
if (buffer_size - data_length > push_size) //書き込むデータサイズが空き容量より小さければ書き込む
{
for (int i = 0; i < push_size; i++)
{
*(buffer + ((tail + i) % buffer_size)) = *(push_data++); //データの書き込み
tail = (tail + 1) % buffer_size; //インデックスをずらす
}
data_length = GetDataLength(); //データサイズ更新
}
}
template <typename T>
std::shared_ptr<T> RingBuffer<T>::Pop(const int pop_size) //読み出し関数
{
std::shared_ptr<T> pop_data(new Tpop_size, std::default_delete<T[]>()); //読み出したデータを格納する配列 if (data_length >= pop_size) //読み出すデータサイズがバッファ内のデータサイズ以下なら読み出す
{
for (int i = 0; i < push_size; i++)
{
pop_data.get()i = bufferhead; //データの読み出し head = (head + i) % buffer_size; //インデックスをずらす
}
data_length = GetDataLength(); //データサイズ更新
}
return pop_data;
}
下の方に定義されている Push 関数( void RingBuffer<T>::Push(const T* push_data, const int push_size))がデータpush_dataをpush_size分バッファに書き込む関数、Pop 関数(std::shared_ptr<T> RingBuffer<T>::Pop(const int pop_size))がpop_size分データを読み出す関数になります。ここでは1つのスレッドが操作するだけなので、バッファが Push (Pop) 可能な状態ならならデータサイズ分 Push (Pop) して、できないなら何もしないという非常にシンプルな実装になっています。
ちなみに Pop 関数の返り値の型になっている std::shared_ptr はスマートポインタというポインタの一種で、読み込んだデータを格納する pop_data を関数内で new してメモリ確保していますが、返り値として返却後に参照されなくなったら自動的にメモリを解放してくれる特別なポインタです。 このクラスをマルチスレッドでアクセスすると当然データ長の計算が途中でおかしくなったりしてデータ競合を起こしてしまうので、Push 関数と Pop 関数内で排他制御を行う必要があります。そのようなリングバッファクラスの仕様決めから実装までを次に説明します。
スレッドセーフなリングバッファクラスの作成
仕様決め
さて、いよいよ本題の「スレッドセーフなリングバッファ」の実装に入ります。Producer スレッドも Consumer スレッドは複数存在し、各スレッドは一度に複数個のデータを書き込み、読み出しできるとします。イメージ図としてはこんな感じです。
https://gyazo.com/2d60e122f4d83aa5a5d24584cdcab1fb
ただスレッドセーフと一言で言っても実装方法はさまざまなので、もう少し明確に仕様を定義しておきます。
1. ある Producer スレッドが書き込みを開始したら、終了するまで他の Producer スレッドは割り込めない(Consumer 同士も同様に割り込み禁止)
2. Push 時にバッファが満杯になったら、次に発生する Consumer スレッドが終了するか空になるまで待機する (バッファ内のデータを上書いたり、待機中のデータを破棄したりしない)
3. 2. と同様にPop 時にバッファが空になったら、次に発生する Producer スレッドが終了するかバッファが満杯になるまで待機する
4. 書き込みと読み出しは同時に行わない
5. 待機しているスレッドの順番は保証しない
ミューテックスの使い方としては、あるミューテックス変数 mtx がスレッド間で共有されているときstd::lock_guard<std::mutex> lock(mtx);のように書くと、そのスコープ内を実行中は他のスレッドに邪魔されないように排他制御することができます。また、他のスレッドが同じミューテックスのロック中にロックしようとしても、実行中のスレッドが終了するまで待機することになります。
条件変数は、ミューテックスとミューテックスで保護されるステート(今回でいうバッファが満杯か空かそうでないかといった状態)の組で利用され、"ステートの更新を他スレッドに通知"/"ステートが指定条件を満たすまで待機する"といった処理を行うためのオブジェクトです。したがって条件変数を使えば、ロック中でもバッファが満杯や空になったら待機したり、スレッドが終了したことを通知することができるようになります。
4. 5. は簡単のために今回は制限事項として設けました。4. についてはatomic変数を利用するなどすればで書き込みと読み出しを同時に行えると思いますし、5. についてもミューテックスのスケジューリングを定義し直したクラスを自前で作成すれば順序保証できるはずなので、暇なときに取り組んでみようと思います。 設計
一例として、1. Producer が書き込み中に新たに Producer が発生したときの挙動と 2. Producer が書き込み中にバッファが満杯になって待機してから再開するまでのシーケンス図を示しています。
まず、1. のシーケンス図ですが、バッファには十分に空き容量があって、なおかつこの間に Consumer スレッドが発生していない前提とします。一番右のオブジェクトmtx_pushは二つの Producer スレッド (push) から共有されていて、最初の Producer スレッドがミューテックスのロックを解除するまで、後続の Producer スレッドは待機していることがわかります。
https://gyazo.com/246c9018207753c9e05e66d449753808
つづいて 2. のシーケンス図です。少し複雑ですが、cv_emptyとcv_fullという空状態と満杯状態のステートに対応する条件変数が登場しており、バッファが満杯になったらcv_fullが待機関数を呼び出したり、Consumer スレッドの終了時にnotify_all()で待機中の Producer スレッドに満杯ではないという通知を送っている様子を表しています。
https://gyazo.com/42baf530479a4ef336425fb5047d5881
実装
最後に設計通りに実装した最終コードを載せて終わりにします。実際にデッドロックした失敗例などをのせるとおもしろいと思ったのですが、余裕がなかったので書けませんでした。。。また、mutexやcondition_variableの詳しい使い方についてはググっていただいた方がわかりやすいと思うので、ここでは詳しく述べません。コード自体はコメントアウトを追ってもらえれば読めるのではないかと思います。
あと実装してみて学んだ点を以下に挙げておきます。
1つの条件変数オブジェクトに対して、複数個のミューテックスを関連付ける事は出来ない。
通知関数は極力 notify_all を使う (notify_one はライブロックという互いに処理が進まない状態に陥る可能性がある)。
待機関数 wait()は第2引数でステート変数を参照したラムダ式を書くことで、 Spurious Wakeup という勝手に待機状態を終了する現象を防ぐことができる。
最終コード
code: ringbuffer.h
namespace ringbuffer
{
template <typename T>
class RingBuffer
{
private:
T *buffer;
int buffer_size;
int data_length, head, tail;
std::mutex mtx_push, mtx_pop, mtx;
std::condition_variable cv_full, cv_empty;
int GetDataLength();
public:
explicit RingBuffer(unsigned int size);
~RingBuffer();
void Push(const T *push_data, const int push_size);
std::shared_ptr<T> Pop(const int pop_size);
};
template <typename T>
RingBuffer<T>::RingBuffer(unsigned int size)
{
head = tail = data_length = 0;
buffer_size = size;
}
template <typename T>
RingBuffer<T>::~RingBuffer()
{
delete[] buffer;
}
template <typename T>
int RingBuffer<T>::GetDataLength()
{
if (tail >= head)
return tail - head;
else
return tail - head + buffer_size;
}
template <typename T>
void RingBuffer<T>::Push(const T *push_data, const int push_size)
{
std::unique_lock<std::mutex> lk_push(mtx_push); //まず Producer の共通ミューテックス mtx_push をロック
std::unique_lock<std::mutex> lk_full(mtx); ////つづいて同時に読み出しされないように mtx をロック
int tmp_size = buffer_size - data_length - 1; //空き容量の計算
for (int i = 0; i < push_size; i++) //push_size 分 push 開始
{
if (tmp_size-- == 0) //満杯かどうかの判定
{
data_length = GetDataLength(); //データ長の更新
cv_empty.notify_all(); //満杯である(空ではない)ことを待機中の Consumer スレッドに通知
cv_full.wait(lk_full, & { return 1 < (buffer_size - data_length); }); //空きができるまで待機 tmp_size = buffer_size - data_length - 2;//空き容量の計算(直後に1つ書き込むので先に -1 している点に注意)
}
buffertail = *(push_data++); tail = (tail + 1) % buffer_size;
}
data_length = GetDataLength();
cv_empty.notify_all(); //書き込み終了を待機中の Consumer スレッドに通知
}
template <typename T>
std::shared_ptr<T> RingBuffer<T>::Pop(const int pop_size)
{
std::shared_ptr<T> pop_data(new Tpop_size, std::default_delete<T[]>()); std::unique_lock<std::mutex> lk_pop(mtx_pop); //まず Consumer の共通ミューテックス mtx_pop をロック
std::unique_lock<std::mutex> lk_empty(mtx); //つづいて同時に書き込みされないように mtx をロック
int tmp_size = data_length; //現在のデータ長をコピー
for (int i = 0; i < pop_size; i++) //pop_size 分 pop 開始
{
if (tmp_size-- == 0) //空かどうかの判定
{
data_length = GetDataLength(); //データ長の更新
cv_full.notify_all(); //空である(満杯ではない)ことを待機中の Consumer スレッドに通知
cv_empty.wait(lk_empty, & { return data_length > 0; }); //データが追加されるまで待機 tmp_size = data_length - 1; //データ長をコピー(直後に1つ読み出すので先に -1 している点に注意)
}
pop_data.get()i = bufferhead; head = (head + 1) % buffer_size;
}
data_length = GetDataLength();
cv_full.notify_all(); //読み出し終了を Producer スレッドに通知
return pop_data;
}
} // namespace ringbuffer
参考文献