EventEmitter
1回の要求に対して複数回の結果が発生する非同期処理はどう実装するか?
Webサーバが接続を受け付ける処理
1回起動して複数回のHTTPリクエストを受け付ける サーバの起動完了やエラーなどのタイミングでも処理を実行したい
Node.jsの標準モジュール node:http は Observer パターン Subject (監視対象) に Observer (監視役、リスナ) を登録し、Subjectはイベントが発生したらObserverに通知する
1つのSubjectが複数のObserverを持てる
EventEmitterはSubjectとして機能
code:hsrv.mjs
import http from "node:http";
// サーバを作成。EventEmitterのインスタンスでもある
const server = http.createServer();
// requestイベント (リクエストが来たとき) のリスナを登録
server.on("request", (req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });
res.write("Hello, world!");
res.end();
});
// listeningイベント (リクエストの受付開始) のリスナを登録
server.on("listening", () => {
console.log("listening");
});
// errorイベントのリスナを登録
server.on("error", (err) => {
console.error("error");
});
// closeイベント (リクエストの受付終了) のリスナを登録
server.on("close", () => {
console.log("close");
});
// サーバを起動、ポート8000でlisten
server.listen(8000);
EventEmitter インスタンスの on メソッドにイベント名とコールバックを渡す
process.on を使ってプロセス全体のエラーハンドリングができる
process も EventEmitter インスタンス
code:uncaught.mjs
process.on("uncaughtException", (err) => {
console.error("uncaughtException!");
console.error(err.message);
});
process.on("unhandledRejection", (err) => {
console.error("unhandledRejection!");
console.error(err.message);
});
Promise.reject(
new Error(
"This error will be caught by 'unhandledRejection' event of the process!"
)
);
throw new Error(
"This error will be caught by 'uncaughtException' event of the process!"
);
eventEmitter.on(event, listener)
イベント event が発生するたびに listener を実行
eventEmitter.once(event, listener)
event が発生したら listener を1回だけ実行
listener を実行したらリスナが自動的に削除される
eventEmitter.off(event, listener)
イベントリスナを削除
eventEmitter.emit(event, ...args)
event を発行
EventEmitterインスタンスを作る処理の中で同期的にイベントを発行するな
code:js
import { EventEmitter } from "node:events";
async function emitEvents(event) {
// 非同期関数であってもawaitしない限りイベントを同期的に発行します!!
event.emit("start");
Promise.resolve().then(() => event.emit("foo"));
event.emit("end");
}
function createEvent() {
const event = new EventEmitter();
emitEvents(event); // async関数が同期的に実行!!
return event;
}
createEvent() // ここの時点でstart/endが同期的に発行されてしまう
.on("start", () => console.log(start)) // ここでハンドルが生えてももう遅い
.on("end", () => console.log(end))
.on("foo", () => console.log("foo")); // こいつは大丈夫
function createEventSanely() {
const event = new EventEmitter();
Promise.resolve().then(() => emitEvents(event)); // async関数を非同期的に実行!!
return event;
}
createEventSanely() // ここの時点でstart/endはまだ発行されない
.on("start", () => console.log(start)) // ここでハンドルが生えるのが間に合う
.on("end", () => console.log(end))
.on("foo", () => console.log("foo"));
// この後非同期処理が走る
イベントハンドラの消し忘れに注意しよう!
code:js
import { EventEmitter } from "events";
const handler = () => console.log("noop!");
const e = new EventEmitter();
{
const handler2 = () => console.log(noop2!);
e.on("e", handler2);
e.on("e", handler2);
e.on("e", handler2);
e.on("e", handler2);
e.on("e", handler2);
} // handler2はここでスコープを抜けるけど関数そのものはeがつかんだまま
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
e.on("e", handler);
// (node:5325) MaxListenersExceededWarning: Possible EventEmitter memory leak detected.
// 11 e listeners added to EventEmitter. Use emitter.setMaxListeners() to increase limit e.emit("e");
"error" イベントはNode.jsが特別扱いしている
"error" のハンドラがない状態でイベントがemitされるとエラーがthrowされる
code:js
import events from "node:events";
try {
new events.EventEmitter().emit("error", new Error(Errorインスタンス));
} catch (error) {
console.error(catch: ${error});
}
try {
new events.EventEmitter()
.on("error", (error) => console.error(errorイベントハンドラ: ${error}))
.emit("error", new Error(Errorインスタンス));
} catch (error) {
console.error(catch: ${error});
}
EventEmitterを継承したクラス
code:js
import { EventEmitter } from "node:events";
import { setTimeout } from "node:timers/promises";
class MyEE extends EventEmitter {
constructor(until) {
super();
this.until = until;
}
async start() {
this.emit("start");
for (let count = 0; count < this.until; count++) {
const delay = 200 * Math.random();
await setTimeout(delay);
const num = Math.round(delay / 10);
switch (num % 2) {
case 0: {
this.emit("even", num);
}
case 1: {
this.emit("odd", num);
}
}
}
this.emit("end");
}
}
new MyEE(10) // ここの時点でstart/endが同期的に発行されてしまう
.on("start", () => console.log(start)) // ここでハンドルが生えてももう遅い
.on("end", () => console.log(end))
.on("even", (value) => console.log(even: ${value}))
.on("odd", (value) => console.log(odd: ${value}))
.start();
主要なイベントのハンドラをon以外から渡すパターン
code:js
// 普通にon使うパターン
import http from "node:http";
const server = http.createServer();
server.on("request", (req, res) => {
console.log(request from: ${req.headers.host} ${req.headers["user-agent"]});
res.write(on("request")!\n);
res.end();
});
const PORT = 8000;
server.on("listening", () => {
console.log(started listening at port: ${PORT});
});
server.listen(PORT);
code:js
// インスタンス生成やlistenのタイミングでハンドラを登録する
import http from "node:http";
const server = http.createServer((req, res) => {
console.log(request from: ${req.headers.host} ${req.headers["user-agent"]});
res.write(on("request")!\n);
res.end();
});
const PORT = 8000;
server.listen(PORT, () => {
console.log(started listening at port: ${PORT});
EventEmitterからPromiseやAsyncIteratorを生成する
code:js
import events from "node:events";
import http from "node:http";
const server = http.createServer();
const reqAIter = events.on(server, "request");
const listenPromise = events.once(server, "listening");
console.log(server.listenerCount("request"));
(async () => {
for await (const req, res of reqAIter) { console.log(
request from: ${req.headers.host} ${req.headers["user-agent"]}
);
res.write(events.on(server, "request")!\n);
res.end();
}
})();
const PORT = 8000;
listenPromise.then(() => {
console.log(started listening at port: ${PORT});
});
server.listen(PORT);