@opentelemetry/exporter-*編
OpenTelemetry Collectorに送信するためのエクスポーターです。基本的には@opentelemetry/otlp-exporter-baseをベースにそれぞれのexporterが構成されています。
主な特徴:
OTLPプロトコルを使用した通信
TLS/mTLSのサポート
カスタムメタデータの付与
gzip圧縮のサポート
エクスポートフロー
code:ts
export(internalRepresentation: Internal, resultCallback: (result: ExportResult) => void): void {
// 1. 同時実行制限チェック
if (this._promiseQueue.hasReachedLimit()) {
resultCallback({ code: ExportResultCode.FAILED, error: new Error('Concurrent export limit reached') });
return;
}
// 2. シリアライズ
const serializedRequest = this._serializer.serializeRequest(internalRepresentation);
// 3. トランスポートで送信
this._promiseQueue.pushPromise(
this._transport.send(serializedRequest, this._timeout).then(response => {
// レスポンス処理
if (response.status === 'success') {
// Partial Success対応
this._responseHandler.handleResponse(this._serializer.deserializeResponse(response.data));
resultCallback({ code: ExportResultCode.SUCCESS });
} else if (response.status === 'retryable') {
// リトライ対象(上位層でハンドリング)
} else {
resultCallback({ code: ExportResultCode.FAILED, error: response.error });
}
})
);
}
1. 同時実行制限チェック
BoundedQueueExportPromiseHandlerというhandlerないでPromiseの同時実行う数を制限しています。
code:ts
public hasReachedLimit(): boolean {
return this._sendingPromises.length >= this._concurrencyLimit;
}
デフォルトは30件だそうです、まあローカルコレクターなのでマイクロタスクが詰まる...みたいなことはないのかな...
code:ts
export function getSharedConfigurationDefaults(): OtlpSharedConfiguration {
return {
timeoutMillis: 10000,
concurrencyLimit: 30,
compression: 'none',
};
}
2. シリアライズ
シリアライズに関してはProtobufとJSON シリアライザがそれぞれ@opentelemetry/otlp-transformerに定義されています。
この中でSDKがOTLP形式にSpanを変換しています。
code:ts
export function sdkSpanToOtlpSpan(span: ReadableSpan, encoder: Encoder): ISpan {
const ctx = span.spanContext();
const status = span.status;
const parentSpanId = span.parentSpanContext?.spanId
? encoder.encodeSpanContext(span.parentSpanContext?.spanId)
: undefined;
return {
traceId: encoder.encodeSpanContext(ctx.traceId),
spanId: encoder.encodeSpanContext(ctx.spanId),
parentSpanId: parentSpanId,
traceState: ctx.traceState?.serialize(),
name: span.name,
// Span kind is offset by 1 because the API does not define a value for unset
kind: span.kind == null ? 0 : span.kind + 1,
startTimeUnixNano: encoder.encodeHrTime(span.startTime),
endTimeUnixNano: encoder.encodeHrTime(span.endTime),
attributes: toAttributes(span.attributes),
droppedAttributesCount: span.droppedAttributesCount,
events: span.events.map(event => toOtlpSpanEvent(event, encoder)),
droppedEventsCount: span.droppedEventsCount,
status: {
// API and proto enums share the same values
code: status.code as unknown as EStatusCode,
message: status.message,
},
links: span.links.map(link => toOtlpLink(link, encoder)),
droppedLinksCount: span.droppedLinksCount,
flags: buildSpanFlagsFrom(ctx.traceFlags, span.parentSpanContext?.isRemote),
};
}
これをjsonはJSON.stringifyしてencode, protobufはバイナリ化しています。
3.トランスポートで送信
ブラウザ、Node.jsどちらの環境でも扱えるようになっています。
code:ts
const abortController = new AbortController();
const timeout = setTimeout(() => abortController.abort(), timeoutMillis);
try {
const isBrowserEnvironment = !!globalThis.location;
const url = new URL(this._parameters.url);
const response = await fetch(url.href, {
method: 'POST',
headers: await this._parameters.headers(),
body: data,
signal: abortController.signal,
keepalive: isBrowserEnvironment, // ブラウザでページ遷移時もリクエスト継続
mode: isBrowserEnvironment
? globalThis.location?.origin === url.origin
? 'same-origin'
: 'cors'
: 'no-cors',
});
gzip圧縮する場合はzlibが使われています。
ちなみに転送量削減のためgzip圧縮されるのがデフォルトです。
code:ts
export function compressAndSend(
req: http.ClientRequest,
compression: 'gzip' | 'none',
data: Uint8Array,
onError: (error: Error) => void
) {
let dataStream = readableFromUint8Array(data);
if (compression === 'gzip') {
req.setHeader('Content-Encoding', 'gzip');
dataStream = dataStream
.on('error', onError)
.pipe(zlib.createGzip()) // Node.js zlibでストリーム圧縮
.on('error', onError);
}
dataStream.pipe(req).on('error', onError);
}
Partial SuccessはOTLPで定義された部分的成功の概念です。エクスポートリクエストの一部のみが処理され、残りが拒否された場合に返されます。
code:ts
// otlp-export-delegate.ts
if (response.status === 'success') {
// Partial Successをハンドリング
this._responseHandler.handleResponse(
this._serializer.deserializeResponse(response.data)
);
// 結果はSUCCESS
resultCallback({ code: ExportResultCode.SUCCESS });
}
まとめ
opentelemetry/exporterのベース実装を見ていきました。