簡略化してEffectのScheduleの内部設計を理解する
簡略化してSchedule (effect)の内部設計を理解する
実装として良いポイントmrsekut.icon
Strategyパターンで実装されている
そのため、下記のような密結合した実装にはなっていない
code:typescript
function retry(fn, schedule) {
switch (schedule.type) {
case "exponential":
// exponential固有のロジック
break
case "fixed":
// fixed固有のロジック
break
case "fibonacci":
// fibonacci固有のロジック
break
// 新しいScheduleを追加するたびにここを変更...
}
}
Schedule自体がcomposableになっている
mapやintersectionが用意されており、合成してScheduleを構築できる
Claude Code.iconに簡略化版を書いてもらったので、本実装も参照しつつ理解する
Scheduleというinterfaceが用意されており、これで状態遷移を表現できる ref
code:sample.ts
/** スケジュールの決定: 続行するか終了するか */
type Decision = { tag: "Continue"; delayMs: number } | { tag: "Done" }
/** スケジュール: 状態遷移関数として抽象化 */
interface Schedule<State, Output> {
/** 初期状態 */
initial: State
/** 状態遷移関数: 現在時刻と状態から、新状態・出力・決定を返す */
step(now: number, state: State): newState: State, output: Output, decision: Decision
}
実行側は
1. stepを呼ぶ
2. 返ってきたDecisionがContinueかDoneかだけを見る
3. Continueなら指定された時間待って再実行
この最小のinterface、マジでかしこですねmrsekut.icon
基本的なScheule関数は、イメージ、こんな感じで実装される
だいたい、packages/effect/src/internal/schedule.tsにある
Scheduleを実装している
code:sample.ts
/** 無限に続くスケジュール(0, 1, 2, 3, ...とカウント) */
function forever(): Schedule<number, number> {
return {
initial: 0,
step(now, state) {
return state + 1, state, { tag: "Continue", delayMs: 0 }
}
}
}
/** N回だけ続くスケジュール */
function recurs(n: number): Schedule<number, number> {
return {
initial: 0,
step(now, state) {
if (state >= n) {
return state, state, { tag: "Done" }
}
return state + 1, state, { tag: "Continue", delayMs: 0 }
}
}
}
/** 固定間隔のスケジュール */
function fixed(delayMs: number): Schedule<number, number> {
return {
initial: 0,
step(now, state) {
return state + 1, state, { tag: "Continue", delayMs }
}
}
}
実際はこんな感じだったりしておしゃれ
code:ts
const forever: Schedule.Schedule<number> = unfold(0, (n) => n + 1)
ScheduleDriver
https://github.com/Effect-TS/effect/blob/0d1a44fa142c0da25fe36a1ac35675f666944803/packages/effect/src/internal/schedule.ts#L131-L202
Scheduleのみに依存し、良い感じに状態遷移を実行する
code:sample.ts
/** Scheduleを駆動するドライバー */
class ScheduleDriver<S, O> {
private state: S
constructor(private schedule: Schedule<S, O>) {
this.state = schedule.initial
}
/** 次のステップを実行。Continueなら出力を返し、Doneならnullを返す */
next(): { output: O; delayMs: number } | null {
const now = Date.now()
const newState, output, decision = this.schedule.step(now, this.state)
this.state = newState
if (decision.tag === "Done") {
return null
}
return { output, delayMs: decision.delayMs }
}
}
retry関数
ScheduleとScheduleDriverのみに依存してリトライ処理をする
code:sample.ts
async function retry<T, S, O>(fn: () => Promise<T>, schedule: Schedule<S, O>): Promise<T> {
const driver = new ScheduleDriver(schedule)
while (true) {
try {
return await fn()
} catch (error) {
const next = driver.next()
if (next === null) {
// スケジュール終了、最後のエラーを投げる
throw error
}
console.log(Retry in ${next.delayMs}ms (attempt output: ${next.output}))
if (next.delayMs > 0) {
await sleep(next.delayMs)
}
}
}
}
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
メイン処理
code:sample.ts
async function main() {
let attempt = 0
// 失敗する関数(3回目で成功)
const unstableOperation = async () => {
attempt++
console.log(Attempt ${attempt})
if (attempt < 3) {
throw new Error("Failed")
}
return "Success!"
}
// 指数バックオフでリトライ(最大5回)
// retry関数は exponentialWithLimit の内部実装を知らない
const result = await retry(unstableOperation, exponentialWithLimit(100, 5))
console.log(result)
}
main()
Scheduleはcomposableである
mapなどの合成用の関数が用意されている
code:sample.ts
/** Scheduleの出力を変換 */
function map<S, A, B>(schedule: Schedule<S, A>, f: (a: A) => B): Schedule<S, B> {
return {
initial: schedule.initial,
step(now, state) {
const newState, output, decision = schedule.step(now, state)
return newState, f(output), decision
}
}
}
/** Scheduleに遅延を追加(出力値を遅延として使用) */
function delayed<S>(schedule: Schedule<S, number>): Schedule<S, number> {
return {
initial: schedule.initial,
step(now, state) {
const newState, output, decision = schedule.step(now, state)
if (decision.tag === "Done") {
return newState, output, decision
}
// 出力値をdelayMsとして使用
return newState, output, { tag: "Continue", delayMs: output }
}
}
}
/** 2つのScheduleを組み合わせる(両方Continueなら続行) */
function intersect<S1, S2, A, B>(
s1: Schedule<S1, A>,
s2: Schedule<S2, B>
): Schedule<S1, S2, A, B> {
return {
initial: s1.initial, s2.initial,
step(now, state1, state2) {
const newState1, out1, dec1 = s1.step(now, state1)
const newState2, out2, dec2 = s2.step(now, state2)
if (dec1.tag === "Done" || dec2.tag === "Done") {
return [newState1, newState2, out1, out2, { tag: "Done" }]
}
// 両方Continueなら、より長い方の遅延を採用
const delayMs = Math.max(dec1.delayMs, dec2.delayMs)
return [newState1, newState2, out1, out2, { tag: "Continue", delayMs }]
}
}
}
mapなどを使って合成して新たなScheduleを作れる
code:sample.ts
/** 指数バックオフ: base * factor^i */
function exponential(baseMs: number, factor: number = 2): Schedule<number, number> {
// forever() の出力を exponential な遅延に変換
return delayed(map(forever(), (i) => baseMs * Math.pow(factor, i)))
}
/** 指数バックオフ + 最大N回 */
function exponentialWithLimit(
baseMs: number,
maxRetries: number,
factor: number = 2
): Schedule<number, number, number, number> {
return intersect(exponential(baseMs, factor), recurs(maxRetries))
}
/** フィボナッチ遅延 */
function fibonacci(baseMs: number): Schedule<number, number, number> {
return {
initial: 0, 1,
step(now, a, b) {
const delay = a * baseMs
return [b, a + b, delay, { tag: "Continue", delayMs: delay }]
}
}
}
実行結果の例
code:_
Attempt 1
Retry in 100ms (attempt output: 0,0)
Attempt 2
Retry in 200ms (attempt output: 1,1)
Attempt 3
Success!