RustでStreamの終わりを検知できるStreamを作る
やりたいこと
最終的な使い方例
code:rs
// (Stream, oneshot::Receiver<()>)を返す
let (my_stream2, finish_receiver) = finish_detectable_stream(
my_stream // なんらかのStream
);
上記の型はそれぞれ、
finish_receiverはoneshot::Receiver<()>
終了を検知して任意の処理をしたいときは、
finish_receiver.await.unwrap()などとして待たせて、そのあとに好きな処理を書いたりできる。
つまりoneshot::Receiver<()>もFutureの一種なので.awaitが使える。
finish_detectable_stream()の実装
以下が実装。
code:rs
use futures::channel::oneshot;
use futures::task::{Context, Poll};
use std::pin::Pin;
pub struct FinishDetectableStream<S> {
stream_pin: Pin<Box<S>>,
finish_notifier: Option<oneshot::Sender<()>>,
}
impl<S: futures::stream::Stream> futures::stream::Stream for FinishDetectableStream<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.stream_pin.as_mut().poll_next(cx) {
// If body is finished
Poll::Ready(None) => {
// Notify finish
if let Some(notifier) = self.finish_notifier.take() {
notifier.send(()).unwrap();
}
Poll::Ready(None)
}
poll => poll
}
}
}
pub fn finish_detectable_stream<S>(stream: S) -> (FinishDetectableStream<S>, oneshot::Receiver<()>) {
let (finish_notifier, finish_waiter) = oneshot::channel::<()>();
(
FinishDetectableStream {
stream_pin: Box::pin(stream),
finish_notifier: Some(finish_notifier),
},
finish_waiter
)
}