Pen
https://pen-lang.org/introduction/building-the-first-program/
pen build fooしてみた
./.pen/default/
.turtle/
https://github.com/raviqqe/turtle-build
archives/
.aだけがある
objects/
.bcとか.oがある
packages/
pen_ffi/
pen_os/
pen_prelude/
git cloneしてきたもの?
scripts/
*.ninjaがある(生成された?)
CPS変換まわりのコードを読む
https://github.com/pen-lang/pen/blob/8ab68891b348902959612b5ab6a856f8ad5261b7/packages/os/ffi/application/src/main.rs#L17
osパッケージ(非同期版)とos-syncパッケージ(同期版)があるとのこと
非同期版ではtokio-mainから_pen_mainをよぶ
そのあと50ms待っている…
bindgen! https://github.com/pen-lang/pen/blob/8ab68891b348902959612b5ab6a856f8ad5261b7/lib/ffi-macro/src/bindgen.rs
Rust関数をPen側にエクスポートする場合、stackとcontinueを受け取るようにラップされる
AsyncStack https://github.com/pen-lang/pen/blob/8ab68891b348902959612b5ab6a856f8ad5261b7/lib/ffi/src/cps/async_stack.rs
中身のStackはこれ https://github.com/pen-lang/pen/blob/main/lib/ffi/src/cps/stack.rs#L79
push時に自動でreallocする
任意の値をpushできるっぽい
ただpop時にも正しい型を指定しないと壊れる…という感じ?
というか、popで何を取れるか呼び出し側がわかってる前提ということか。
現状のRustがTCOに対応していないためトランポリンが必要とのこと
というのを踏まえて…
まずpollをよぶ
Poll::Readyだった(値がとれた)ならstack.trampolineで次に進む
Poll::Pendingだったら、値がとれるまで待つ必要があるので、stack.suspendを呼ぶ
スタックにfuture, resume, continuationの順で積む
stack.resumeがresumeとcontinuationを取り出す
futureはstack.restoreが取り出す
stack.suspendはスタックにデータをしまうだけなので、永久に復活しないそうに見えるが…?
_pen_yield https://github.com/pen-lang/pen/blob/27b15d64fc5d6b240359b4092bb8ad96fa125114/packages/os/ffi/application/src/concurrency.rs#L12
import!というマクロがある
例:import!(pen_ffi_any_is_boolean, fn(any: BoxAny) -> Boolean);
RustからPenの関数を呼ぶためのものっぽい
pen_ffi_any_is_booleanという名前、fn(any: BoxAny) -> Booleanという型でアクセスできるようになる
fnだけでなくasync fnもいける
その場合、import!はcall_function!(...).awaitに展開される
特に、
code:packages/os/ffi/application/src/main.rs
ffi::import!(_pen_main, async fn() -> ffi::None);
#tokio::main
async fn main() {
_pen_main().await;
...
なのでcall_function!(_pen_main).awaitがtokioスケジューラに渡されるということかな?
call_function!マクロ
おおまかにcore::future::poll_fn(|context| ...).awaitを行う
core::future::poll_fn
まずinitializeで指定された関数を呼ぶ
一度もasyncな機能が使われなければ、stack.resolved_value()で結果の値が取れるのでそれを返して終わり
一方途中でsuspendされた場合は、変数trampolineにメモを残してPoll::Pendingを返す
ここからはtokioチュートリアル見ての想像だけど、折を見てwakerが呼ばれる?tokioスケジューラはそれをたどってこのFutureを再度pollする?
trampolineに残されたメモとは何か
bindgen!マクロにその答えがある
Pen自体にsuspendする機能はないので、suspendが起こるのはいつもRust側
bindgen!は、Rust関数をPen側に提供するためのマクロ
対象がasync fnの場合、まずfuture.as_mut().pollを呼ぶ
Pendingが返った場合、resume関数をstackに積む(これがメモの正体)
Futureが再度pollされた際、resume関数が呼ばれる
やることは最初のpollと同じ(値が取れたら再開、Pendingならもう一度待つ)
どのようにしてループ構造が実現されているのか?
ループになるには、tokioランタイム(スケジューラ)にPoll::Pendingを返すことが必要
call_function!では、stackにpollを行う関数を積むことでそれを実現している
spawn
packages/os/ffi/application/src/concurrency.rs に async fn _pen_spawn(closure: ffi::Closure) -> ffi::Closure がある
tokio::spawnにffi::future::from_closure::<_, ffi::Any>(closure)を渡す
lib/ffi/src/future/from_closure.rs に pub async fn from_closure<T, V>(closure: Closure<T>) -> V がある
これでFuture<ffi::Any>ができる
#Milika で同じようなことをやろうとしたら、poll_fnをまたいでFutureを保持できないよというエラーになった
poll_fn内でSendでないデータを触るとtokio::spawnに渡せなくなるっぽい?
Penはどうしているのか
AsyncStack内にF: Future + Unpinを積んでいることはわかった
これはstack.restore()で取り出される
それを呼ぶのは上記bindgenのresume関数
resume関数はstack.suspend()の第一引数(step)として渡され、stackに保存され、stack.resume()で返却される
stack.resumeの呼び出し箇所は、lib/ffi/src/future/from_closure.rsにあった
てか出てたエラーは dyn Future<Output = u64> cannot be sent between threads safelyだった。
あれ、bindgen!で生成した関数ってvoidなのか。
MilikaはいまFutureを返すようにしているのでそこが違うな(←意図的ではなく、そうなってるもんだと思ってた)
あー、PinとSendは排他だよな。そりゃそうか。
Pinてなんでつけたんだっけ
.pollするのに必要だからか。 https://doc.rust-lang.org/std/future/trait.Future.html
とりあえずPinは受け取り側でやるようにした。
あとはdyn Future<Output = u64> + Unpin cannot be sent between threads safelyか。
やはりFutureはenvの中にしまう必要がありそう。
#Pen の場合は、cps::Stackがunsafeを使って任意の値をしまい込めるようになっている
unsafe使ってenvにFutureを入れてみたが、Pendingしたあと再開されない
https://tokio.rs/tokio/topics/tracing-next-steps これ入れてみたらwaker is lostと表示された
waker is lostとは?
どうやってwaker lostを検知している?
保守的GCみたいなことをするのかと思ったけど、Rustだし、Dropで検知してるのかも
https://github.com/tokio-rs/console/blob/4543901094eec18856f49d8dea96993101a23e8c/tokio-console/src/state/tasks.rs#L443
clone数とdrop数の差だった。
tokioのどこかにログを入れたいなあ
tokio/src/time/sleep.rsをみると、Contextはstd::task::Contextを使ってるみたい
runtime/runtime.rsにblock_on<F: Future>(&self, future: F) -> F::Outputがある
runtime/scheduler/current_thread/mod.rsにblock_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Outputがある
runtime/context/runtime.rs enter_runtime<F, R>(handle: &scheduler::Handle, allow_block_in_place: bool, f: F) -> R where F: FnOnce(&mut BlockingRegionGuard) -> R
blockingの型はBlockingRegionGuard
runtime/context/blocking.rs block_on<F>(&mut self, f: F) -> Result<F::Output, AccessError> where F: std::future::Future
runtime/park.rs CachedParkThread block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError>
お、無限ループっぽいものがある
ここでself.waker()でWakerを作っている?
普通にstd::task::Wakerを使っていることがわかった。
std::task::RawWaker::newに*const ()を渡すことでカスタムのデータを入れている。
tokio-consoleはどうやってwakerのdropを監視している?
tokio-consoleは受信側なのでソースを見てもあんまり意味ない?
違うか、アプリにconsole_subscriber::init();を入れさせるってことはそこに仕掛けがあるのか。
record_wake_opはいつ呼ばれる?
meta.targetが"runtime::waker"か"tokio::task::waker"のもの
後者はtokio/src/runtime/task/waker.rsで発見した。
std::task::RawWakerVTableにdropをフックする機能があるのかあ。
drop_wakerにprintlnを足してみると、確かに呼ばれている。
前者は?
githubで検索しても出てこないなあ
env.push_rust_frame(Pin::into_inner(pinned)); ←ここでdropされていることがわかった。
Box::pinはPin<Box<T>>を返す。Pin::newはPin<T>を返す
Pin::newにしてみても一緒だった。
Pinは、futureを.pollするために必要
あーわかった、push_rust_frameに渡した引数がdropされてるんだ。std::mem::forgetを入れたら再開される(そしてSEGVする)ようになった
Pendingから復帰したあとSEGVする
dbgを入れたらSEGVしなくなる…
mem::forgetじゃないような気がしてきた
意味的には、moveを表現したい
あそうか、unsafe impl Sendという手があった
#他作言語