fs2.Pull#compile 勉強会 その5
今回やりたいこと
fs2.Stream の API をざっと学んで Stream への簡易的な直感を生やす
お手元に scala-cli をご用意ください
今回紹介するコードスニペットは、以下のようなコマンドで scala-cli を立ち上げることで、REPL にそのままコピー&ペーストして動作を確認できます。
code:bash
scala-cli repl --dep co.fs2::fs2-core:3.11.0
以下の import を貼り付けておけば、追加の import は不要 (なはず) です。
code:scala
import cats.effect.*
import fs2.*
import cats.effect.unsafe.implicits.global
fs2.Pull / fs2.Stream に持っておくべきメンタルモデル
まず、 fs2.Stream[F[_], O] (直観的に言えば、F は Stream を「流す」ために起こさなければならない作用の型 (IO とか StateT[IO, S, _] とか)、O は Stream の中を「流れてくる」要素の型) は、次のように 定義 されています: code:Scala
final class Stream[+F_, +O] privatefs2 (privatefs2 val underlying: PullF, O, Unit) { ... } なので、Pull[F, O, R] が何なのかが分かっていれば、fs2.Stream[F, O] が何なのかも分かるはずです。
fs2.Pull#compile 勉強会 その1 で軽く触れた気がしますが、Pull[F, O, R] は、IO がそうであったように、「(Pull という名前の) とあるプログラミング言語」で書かれたプログラムの AST (というか、「プログラムそのもの」) になるように GADT として定義されています (どういった構文要素があるのか、というのは詳しくは次回に見ます。sealed trait の定義はここ) にあります。IO に比べるとだいぶ複雑な継承関係をしています)。 直感的な話:
Pull[F, O, R] というのは、F の文脈で Chunk[O] を polling して、最終的に R をひとつ吐いて終了するようなプログラム
Chunk[O] が「横からチョロチョロ流れ出てきた後に R で終わる」みたいな感じ https://scrapbox.io/files/6748585b73e082e7392145c5.png
「F の作用」無しに Chunk[O] を吐き出すような構文要素: Output code:Scala
「F で書かれたプログラムを走らせて Pull の中間結果とする」という構文要素: Eval code:Scala
private final case class Eval[+F_, R](value: FR) extends AlgEffectF, R モナディックな合成をしている中間ノード (の族): Bind https://scrapbox.io/files/6748596142422a3a733c341e.png
code:Scala
private abstract class Bind[+F_, +O, X, +R](val step: PullF, O, X) }
Pull[F, O, R] の R は、Chunk[O] の「流れ」が欲しい人からすれば、ただの内部状態になっている
上で描いた Bind を見てもらえばわかるが、A の値というのは、結局次に何をすればよいかを決めるためにしか使われておらず、後続のプログラムがどう Chunk[O] を吐き出してくるのか、というのは A => Pull[F, O, B] の結果値であるプログラムが全部決めることであるから、Chunk[O] が欲しい人はA や B には興味がない
なので、 Stream[F, O] = Pull[F, O, Unit] なわけです
windymelt.icon Akka Streamにも似たような構造があるンゴねぇ(ここで言うところのRをMaterialized Valueとか読んでた気がする)
じゃあ Stream とは Pull だ!で終わりなのかというと、あんまりそうでもない
Stream を取り扱っているときというのは、 Stream のことを「polling するプログラム」だとは思わない。
Stream[F, O] というのは、あくまで O の「流れ」そのものを表しているといったメンタルモデルで扱う場合がほとんど
これは、ちょうど、「IO[A] の式を書いている時に、『あぁ、今僕は A の値で終わるプログラムを書くプログラムを書いているんだなぁ』等とは考えず、『この処理が終わったらこういう結果が得られるはずだから、次にこれをやって、最終結果をこれにするよなぁ』ぐらいで考える、もっと言えば、「A を生み出す作用そのもの」を取り扱っているようなメンタルモデルでプログラミングすることが多い」のとパラレルに考えられる
windymelt.icon われわれも「あ〜レジスタがぴょんぴょんするな〜」「vtableが動いているな〜」と思いながらプログラミングしてないですね
蓋を開けないといけないときもあるが、ふだんは蓋を開けない
サンプルプログラムを書く前に…
s: Stream[IO, A] が与えられたとき、s.compile.toVector.unsafeRunSync() という呪文があります
これを分解すると…
ioa: IO[A] が与えられたとき、ioa.unsafeRunSync(): A という呪文があって、これは ioa を「実際に実行する」
code:Scala
scala> IO { println("ABC"); 123 } .unsafeRunSync()
ABC
val res3: Int = 123
s: Stream[IO, A] が与えられたとき、s.compile.toVector: IO[Vector[A]] があって、これは s を「polling することで一番最後まで読みつくす (結果を Vector に集める)」というプログラムに変換する
code:Scala
scala> (Stream(1, 2, 3) : StreamIO, Int).compile.toVector val res5: cats.effect.IO[VectorInt] = IO(...) scala> (Stream(1, 2, 3) : StreamIO, Int).compile.toVector.unsafeRunSync() val res6: VectorInt = Vector(1, 2, 3) Rust でいう Iter の .collect::<Vec<_>>()
こちらは副作用をともないながらVecに集めるが・・・
fs2の場合は副作用をともないながらVectorに集めるようなプログラムを生成する
「作用が起こらないことが分かっている」Stream については、 .compile.toVector で F[A] ではなく A がそのまま出てくる
内部的には [x] =>> fs2.Pure[x] という高階型が使われている
サンプルプログラム
一番簡単な Stream: .empty
code:Scala
scala> Stream.empty.compile.toVector
いくつかの要素をそのまま (副作用無しに) 流してくる Stream (.apply)
code:Scala
scala> Stream.apply(1, 2, 3).compile.toVector
val res1: VectorInt = Vector(1, 2, 3) flatMap
List の flatMap を思い出してほしい
code:Scala
scala> List("A", "B", "C", "D").flatMap(s => List(s + "1", s + "2"))
val res16: ListString = List(A1, A2, B1, B2, C1, C2, D1, D2) windymelt.icon 雑談: 自分はよくList("p", "n", "m")からpanamaを生成しがち
code:scala
scala> List("p", "n", "m", "b", "n", "n").flatMap(x => List(x, "a"))
val res9: ListString = List(p, a, n, a, m, a, b, a, n, a, n, a) これは .map(...).flatten と同じである
code:Scala
scala> List("A", "B", "C", "D").map(s => List(s + "1", s + "2"))
val res18: List[ListString] = List(List(A1, A2), List(B1, B2), List(C1, C2), List(D1, D2)) scala> List("A", "B", "C", "D").map(s => List(s + "1", s + "2")).flatten
val res17: ListString = List(A1, A2, B1, B2, C1, C2, D1, D2) Stream#flatMap は、 Stream[F, O] を「O の流れ」だと考えたときに、List#flatMap のような挙動をする
code:Scala
scala> Stream("A", "B", "C", "D").flatMap(s => Stream(s + "1", s + "2")).compile.toVector
val res2: VectorString = Vector(A1, A2, B1, B2, C1, C2, D1, D2) Pull の観点から言えば、これはつまり、こういう Pull を作っていることになる
https://scrapbox.io/files/67486470c539cacda73444bb.png
windymelt.icon Streamはそもそも遅延評価だと考えるとなるほど〜という気持ちになる
「流れ」を連結する (++ / append)
code:Scala
scala> (Stream("A", "B", "C", "D") ++ Stream("E", "F")).compile.toVector
val res3: VectorString = Vector(A, B, C, D, E, F) IO を「流れ」の中に埋める (.exec)
code:Scala
scala> Stream.exec(IO { println("side-effect!") }).compile.toVector.unsafeRunSync()
side-effect!
「いくつかの要素をそのまま (副作用無しに) 流してくる Stream」の間に挟むことも当然できる
code:Scala
scala> (Stream(1, 2, 3) ++
Stream.exec(IO { println("side-effect!") }) ++
Stream(4, 5, 6)).compile.toVector.unsafeRunSync()
side-effect!
val res12: VectorInt = Vector(1, 2, 3, 4, 5, 6) Chunk を「傍受」(intercept)する (.mapChunks)
code:Scala
scala> (Stream(1, 2, 3) ++
Stream.exec(IO { println("side-effect!") }) ++
Stream(4, 5, 6))
.mapChunks(c => Chunk(c))
.compile.toVector.unsafeRunSync()
side-effect!
val res13: Vector[fs2.ChunkInt] = Vector(Chunk(1, 2, 3), Chunk(4, 5, 6)) .mapChunks(c => Chunk(c)) は、Stream に流れてきた chunk c を、それを含むような singleton chunk として Stream に流しなおすので、 .compile.toVector では普通見えない Chunk 構造が見える…
Kory.icon: empty chunk が本当に流れていないか (.exec が Chunk を流していないか) は最適化されて見えなくなってるだけかも
無限に繰り返すストリーム (.repeat)
code:Scala
scala> Stream(1, 2, 3).repeat.compile.toVector
(これは、無限に 1, 2, 3, 1, 2, 3, ... と続く Stream を読み切ろうとするので、有限時間では帰ってこない)
先頭 n 要素を取る (.take(n)) と組み合わせると、n 個の要素が生まれるまでの繰り返しができる
code:Scala
scala> Stream(1, 2, 3).repeat.take(10).compile.toVector
val res0: VectorInt = Vector(1, 2, 3, 1, 2, 3, 1, 2, 3, 1) code:Scala
scala> (Stream.exec(IO.println("A")) ++ Stream(1, 2, 3) ++ Stream.exec(IO.println("B"))).repeat.take(2).compile.toVector.unsafeRunSync()
A
val res2: VectorInt = Vector(1, 2) 副作用が埋め込まれた stream を .repeat すると、.foreverM みたいな挙動になる
code:Scala
scala> Stream.exec(IO.println("y")).repeat.compile.toVector.unsafeRunSync()
y
y
y
...
リソースを確保して、その確保された値を Stream に流して、その後すぐ開放する (.bracket)
code:Scala
def bracket[F_, R](acquire: FR)(release: R => FUnit): StreamF, R 型シグネチャからある程度読み取れるが、これは、 acquire によって R の値 (「リソース」、もしくは「リソースハンドラ」に見立てられる) をひとつ「確保」して、それを Stream 内の要素として「流し」て、その後すぐに release によって R 値を「開放」する (「ハンドラを close する」) ような Stream を作る。
code:Scala
scala> (Stream.bracket(IO.pure(42))(num => IO.println(s"releasing ${num}")) ++
| Stream.exec(IO.println("next effect"))).compile.toVector.unsafeRunSync()
releasing 42
next effect
val res1: VectorInt = Vector(42) これだけだと、singleton stream ができるだけで役に立たないように思えるが、 flatMap と組み合わせて、.flatMap に渡す関数の中で R 値を利用する Stream を作るようなことをすれば、確保→利用→解放の流れを保証したリソース利用というのを表現できる。(↓に書いた例は toy example で、Int を「確保している」のであんまりリソース感がないが、実際には FileInputStream とか、ネットワークへのコネクションとかを acquire / release したりできる)
code:Scala
scala> (
| Stream.bracket(IO.pure(42))(num => IO.println(s"releasing ${num}")).flatMap(n =>
| Stream(n) ++ Stream.exec(IO.println(s"emitted $n"))
| ) ++ Stream.exec(IO.println("next effect"))
| ).compile.toVector.unsafeRunSync()
emitted 42
releasing 42
next effect
val res3: VectorInt = Vector(42) bracketWeak はこれに似た関数で、 release を「Stream 全体が読み切られるまで」遅延するような Stream を作る (遅延タイミングは ++ するごとに後ろに伸びていく)
code:Scala
scala> (Stream.bracketWeak(IO.pure(42))(num => IO.println(s"releasing ${num}")) ++
| Stream.exec(IO.println("next effect"))).compile.toVector.unsafeRunSync()
next effect
releasing 42
val res0: VectorInt = Vector(42) .scope を呼んでおくと、「これ以上先に (直近に開かれたスコープ内で確保されたリソースの) 解放処理を遅延しない」というマークを付けることができる
code:Scala
scala> (Stream.bracketWeak(IO.pure(42))(num => IO.println(s"releasing ${num}")).scope ++
| Stream.exec(IO.println("next effect"))).compile.toVector.unsafeRunSync()
releasing 42
next effect
val res2: VectorInt = Vector(42) windymelt.icon 実用上これ(bracketWeak)どんなのに使うんでしょうね
単純かつ実用的な例を出すのは結構難しいんですが、例えば
「処理すべきデータが複数流れてくるので、それらを適切に複数のファイルに分散しながら書き込んでいきたい」
「書き込み先のファイルへのハンドルは、毎回閉じたり開いたりしたくないが、ストリームが終わってしまうときには全部閉じなければならない」
みたいな微妙な要求があるときに便利(というか、使わざるを得ない)です
直感的には、「局所的にはわざと leak させたいのだが、Stream 全域からは leak すべきでない」ようなリソースを扱う必要があるときに使う(使わざるを得ない)