RustのHyperでBodyの終わりを検出して好きな処理をする
追記
futuresのAPIが更新されてあたらしいfuturesだと以下のコードがコンパイルできない。
上記のページはHyperのBodyに限らず、より一般的にStreamに対しての終わりを検知できる実装を載せていて上記が使える。 hr.icon
コード
以下のFinishDetectableBodyのストリームがあると、Body終了時を検出できる。使用例はこのコードの下。
code:rs
struct FinishDetectableBody {
body: Body,
finish_notifier: Option<oneshot::Sender<()>>,
}
impl futures::stream::Stream for FinishDetectableBody {
type Item = Chunk;
type Error = hyper::Error;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
match self.body.poll() {
// If body is finished
Ok(Async::Ready(None)) => {
// Notify finish
if let Some(notifier) = self.finish_notifier.take() {
notifier.send(()).unwrap();
}
Ok(Async::Ready(None))
},
r@ _ => r
}
}
}
impl FinishDetectableBody {
fn new(body: Body, finish_notifier: oneshot::Sender<()>) -> FinishDetectableBody {
FinishDetectableBody {
body,
finish_notifier: Some(finish_notifier)
}
}
}
そして、Ok(Async::Ready(None))のとき、つまりNoneのときストリームの終了に相当する
そのときに、notifier.send(()).unwrap();でSenderに()を送っている。対応するReceiverを使えば終了が検知できる。
下記の使用例のようにReceiver#then()を使えば終了後の処理が書きやすい
Option<oneshot::Sender<()>>になっているのは、fn poll(&mut self)のselfが&mutだから
.send(self, t: T)でselfを要求するので、それを満たすために、Option#take()している。
FinishDetectableBody::new()で常にSome()でラップして作られるので、絶対if let Some(...は一度通過できる
Ok(Async::Ready(None))は終了時に一回だけ通過するはずなので、もう一度呼ばれることもないし、呼ばれても.sendされる恐れもない
futures::stream::Streamを実装する理由は、Body::wrap_stream(FinishDetectableBody::new(...)がBody型になるから
FinishDetectableBodyの使用例
code:rs
use futures::sync::oneshot;
// (my_body: Bodyをどこかで定義されている状態)
// 終了検出用と終了待ち用のSenderとReceiverを作る
let (finish_notifier, finish_waiter) = oneshot::channel::<()>();
// 終了検知ができるようになったmy_body
let my_body2 = Body::wrap_stream(FinishDetectableBody::new(
my_body, // my_bodyの終了を検知したい
finish_notifier
));
// ... なんでも良い処理 ... //
// my_bodyの終了を待つ
hyper::rt::spawn(finish_waiter.then(move |_| {
// ... ここでmy_bodyが終わったときの処理をする ...
Ok(())
}));
経緯
実際のPiping Server (Hyper) のv0.2.2: