RustのRayonで並列で処理した結果をシーケンシャルに処理したい
やりたいことがちゃんとPlayground上で実現できた。
やりたいこと
ひとことでいえば、rayon::iter::ParallelIteratorをstd::iter::Iteratorに変換したい。
Rayonで並列処理したものシーケンシャルに値を取り出したい。
具体例は以下。簡単のため具体例は10~19の数値を2倍する並列処理プログラム。
code:rs
use rayon::prelude::*;
fn main() {
let par_iter = (10..20).collect::<Vec<i32>>().into_par_iter().map(|x| x * 2);
par_iter.for_each(|x| {
println!("item: {}", x);
});
}
以下の出力のようにfor_eachに順番の保証はない(並列処理してくれているので当たり前の動作)。
例えば32のほうが30より前にある。
code:出力
item: 20
item: 22
item: 24
item: 26
item: 28
item: 34
item: 36
item: 38
item: 32
item: 30
実現したいことは、
.map()は並列で処理されてほしい
mapされた値は元の順番通りに出力したい
つまり、以下のような出力にするのがやりたいこと。20~38は順に並んでいる。
code:期待する出力
item: 20
item: 22
item: 24
item: 26
item: 28
item: 30
item: 32
item: 34
item: 36
item: 38
実現方法
最終的に、以下のように並列処理用のIterator(IndexedParallelIterator)をstd::iterのIteratorに変換できるようになった。
つまり、into_seq_iter(par_iter)とすることでforで使えるstdのIteratorになる。
into_seq_iter()を使えばいいだけなので、ユーザーは楽ちん。
code:rs
// (コードの一部)
fn main() {
let par_iter = (10..20).collect::<Vec<i32>>().into_par_iter().map(|x| x * 2);
for x in into_seq_iter(par_iter) {
println!("item: {}", x);
}
}
Playground
追記:更新版
変更履歴も含め、変化を管理したくなりgitで管理することになった。
下で触れるが、Arc<Mutex<>>があったが、それもなくしたりとかの更新をした。他にも今後パフォーマンスとか使いやすさの観点で変更を加えるかもしれないためgit管理。
全コード
基本的な方針は
IndexedParallelIteratorのenumerateでインデックスを付与
for_each()でインデックスと要素の値をBinaryHeapにpush()
BinaryHeapから現在のインデックスを満たすならSyncSenderでsend()
例えば、現在のインデックスが4なのに、6とか7とかしか処理が終わってないならsend()はしない
この状態で4と5が来たら、溜まっていた6と7もループ’内でsend()される
Receiverのiter()を使ってIteratorを実装する
code:rs
use rayon::prelude::*;
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
fn main() {
let par_iter = (10..20).collect::<Vec<i32>>().into_par_iter().map(|x| x * 2);
for x in into_seq_iter(par_iter) {
println!("item: {}", x);
}
}
struct ReverseTuple< T>(usize, T);
impl< T> PartialEq for ReverseTuple< T> {
fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) }
}
impl< T> Eq for ReverseTuple< T> {}
impl< T> PartialOrd for ReverseTuple< T> {
fn partial_cmp(&self, o: &Self) -> Option<Ordering> { o.0.partial_cmp(&self.0) }
}
impl< T> Ord for ReverseTuple< T> {
fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) }
}
struct IntoSeqIter<I: Sync + Send> {
iter: mpsc::IntoIter<I>
}
impl <I: Sync + Send> Iterator for IntoSeqIter<I> {
type Item = I;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}
fn into_seq_iter<I: Sync + Send + 'static, P: rayon::iter::IndexedParallelIterator<Item=I> + Sync + 'static>(par_iter: P) -> IntoSeqIter<I> {
// TODO: 1 is OK?
let (sender, receiver) = mpsc::sync_channel(1);
let heap = Arc::new(Mutex::new(BinaryHeap::new()));
let idx = Arc::new(Mutex::new(0));
let heap_clone = Arc::clone(&heap);
let index_clone = Arc::clone(&idx);
rayon::spawn( move || {
par_iter.enumerate().for_each(|(i, x)| {
let mut heap_guard = heap_clone.lock().unwrap();
let mut idx_guard = index_clone.lock().unwrap();
heap_guard.push(ReverseTuple(i, x));
loop {
if let Some(i) = heap_guard.peek().map(|r| r.0) {
if i == *idx_guard {
let x = heap_guard.pop().unwrap().1;
sender.send(x).unwrap();
*idx_guard += 1;
} else {
break
}
} else {
break;
}
}
});
});
IntoSeqIter {
iter: receiver.into_iter()
}
}
'staticはrayon::spawn()が要求しているのでつけている
ReverseTupleというのは、インデックスと要素の組で、インデックスが若いほどBinaryHeapで優先順位が高くなるようになっている。
Mutexとかロックされて処理速度とか遅いような気がする。上記の参考にしたリンクだとArcとかMutexとか使ってないから、うまく、やればできるかもしれない。
一応、うまく行ったが、これからバグがないか色々試していきたい。
Rayon 1.1.0を使っている。(ローカルで実験して、うまく行ったものを少し整頓してPlaygroundに上げた感じ)