Effectを用いたProducer-Consumerパターンの例
Webhook の配送処理システムを簡易的に再現したサンプルを書いている
Webhook (外部URL) にイベントを送る
送信が失敗することもある
なので、リトライする
処理は複数の worker で同時に行う
全体のイメージ図
code:_
┌────────────┐
│
▼
retry失敗時
│
▼
deadQ ──▶ deadLetterMonitor
code:index.ts
import {
Effect,
Queue,
Fiber,
Schedule,
Duration,
Console,
Random,
Chunk,
pipe,
Array,
} from 'effect';
メイン処理
code:index.ts
const program = Effect.gen(function* () {
// 通常の配送タスクが流れるメインキュー
const mainQ = yield* Queue.bounded<WebhookTask>(64); // bounded: back-pressure を効かせる
// 処理失敗したものを保管するデッドレターキュー
const deadQ = yield* Queue.unbounded<WebhookTask>();
code:index.ts
// 並列で4つのワーカーを起動
const workers = yield* Effect.all(
1, 2, 3, 4.map(i => Effect.fork(worker(i, mainQ, deadQ))), );
code:index.ts
// dead-letter監視 (何度リトライしても失敗するものを保管して、後で手動で処理する)
const dlFiber = yield* Effect.fork(deadLetterMonitor(deadQ));
実運用なら、通知やDB記録に回す
code:index.ts
// 100件のタスクを投入
yield* producer(mainQ, 100);
Webhook配送タスクを100件作って mainQ に入れる
そのため、ワーカーの処理速度以上には積まれない
code:index.ts
// しばらく処理を待つ(デモ用)
yield* Effect.sleep(Duration.seconds(5));
// ここではデモとしてシャットダウン(実際は「キューが空 && 実行中なし」を待つのが定石)
yield* Console.log('🛑 shutting down queues...');
yield* Queue.shutdown(mainQ);
yield* Queue.shutdown(deadQ);
それにより 無限ループが止まる
code:index.ts
// Fiber終了待ち(shutdownにより take/offer 待機は中断される)
yield* Effect.all(workers.map(Fiber.interrupt));
yield* Fiber.interrupt(dlFiber);
yield* Console.log('👋 done');
});
code:index.ts
// 実行
Effect.runPromise(program).catch(err => {
console.error(err);
process.exit(1);
});
code:index.ts
// == ドメイン定義
type WebhookTask = {
id: number;
url: string;
payload: Record<string, unknown>;
};
デモとして3割失敗するようにする
code:index.ts
// 疑似配送: 成功/失敗をランダムにする
function deliver(task: WebhookTask) {
return Effect.gen(function* () {
// 30% くらいは失敗させる
const n = yield* Random.nextIntBetween(1, 10);
if (n <= 3) {
yield* Console.warn(❌ id=${task.id}: delivery failed);
return yield* Effect.fail(new Error('network error'));
}
yield* Console.log(✅ id=${task.id}: delivered);
return task.id;
});
}
code:index.ts
// リトライ方針: 指数バックオフ + 最大3回(合計4トライ)
const retryPolicy = pipe(
Schedule.exponential(Duration.millis(200)),
Schedule.compose(Schedule.recurs(3)),
);
worker
inQから取り出して処理する、という無限ループ
3回リトライしても失敗するものは、deadLetterQに入れる
これは、DLQというもので、何回やっても失敗するやつを隔離しておいて、後で手動で作業するなどする code:index.ts
// == ワーカーロジック
// タスクを取り出して配送。失敗をリトライし、全滅したらデッドレターへ。
function worker(
id: number,
inQ: Queue.Dequeue<WebhookTask>, // 取り出しのみ (タスクを取得する)
deadLetterQ: Queue.Enqueue<WebhookTask>, // 追加のみ (配送失敗したものを保管する)
) {
return Effect.gen(function* () {
yield* Console.log(👷 worker-${id} started);
yield* Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(inQ); // 空ならサスペンド
yield* deliver(task)
.pipe(Effect.retry({ schedule: retryPolicy }))
.pipe(
Effect.catchAll(() =>
Effect.gen(function* () {
yield* Console.error(💀 id=${task.id}: send to dead-letter);
yield* Queue.offer(deadLetterQ, task); // 最終的に処理不能のため dead-letter に積む
}),
),
);
}),
);
});
}
code:index.ts
// == プロデューサ: まとめてタスク投入
function producer(outQ: Queue.Enqueue<WebhookTask>, count: number) {
return Effect.gen(function* () {
yield* Console.log(🚚 producer generating ${count} tasks);
const tasks = Array.makeBy(count, i => ({
id: i + 1,
payload: { event: 'user.created', userId: i + 1 },
}));
yield* Queue.offerAll(outQ, tasks);
yield* Console.log(🚚 producer done);
});
}
code:index.ts
// == 監視: Dead Letterをバルクで抜いて可視化
function deadLetterMonitor(dlQ: Queue.Dequeue<WebhookTask>) {
return Effect.gen(function* () {
yield* Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep(Duration.seconds(1));
const chunk = yield* Queue.takeUpTo(dlQ, 100);
if (Chunk.size(chunk) > 0) {
const ids = Chunk.toReadonlyArray(chunk).map(t => t.id);
yield* Console.warn(📬 dead-letter received: ids=${ids.join(', ')});
}
}),
);
});
}