OpenTelemetry Colloctor Exporter編
Exporter実装のコードを読みながら、API設計と内部アーキテクチャの勘所をまとめます。
対象は exporter パッケージ配下と、代表的な実装としてotlpexporterです。
OpenTelemetry Collector における Exporter は、Pipeline のデータを外部の Observability Backend へ送信する最終レイヤです。
exporterhelper: Sender チェーン・アーキテクチャ
exporterhelper は Exporter 実装者向けのヘルパーです。NewTraces/Logs/Metrics に「実処理関数」を渡すだけで、以下の機能が自動で付与されます:
Start/Shutdown のライフサイクル管理
Timeout Sender - 各リクエストにタイムアウト設定
Retry Sender - Exponential Backoff によるリトライ
ObsReport Sender - トレース Span とメトリクス計測
Queue Sender - バッファリングとバッチ処理
これらは Sender チェーン として連結されます。利用自体は実装者で決めれますが今回は全部使う前提で実装を見ます。
code:md
↓
QueueSender (バッファリング・バッチ処理)
↓
ObsReportSender (Span作成・メトリクス計測)
↓
RetrySender (Exponential Backoff)
↓
TimeoutSender (タイムアウト制御)
↓
基底となるinterface
code:go
type SenderT any interface { component.Component // Start/Shutdown
Send(context.Context, T) error
}
type SendFuncT any func(ctx context.Context, data T) error QueueSender
非同期バッファリングとバッチ処理を提供する Sender です。
メモリおよび、永続化のどちらかを選んでキューイングができるようになっています。
code:go
// 1. Base Queue(メモリ or 永続化)
q := newBaseQueue(set)
// 2. AsyncQueue(Consumer goroutine を起動)
// 3. ObsQueue(メトリクス/トレース)
oq, _ := newObsQueue(set, newAsyncQueue(q, set.NumConsumers, next, set.ReferenceCounter))
return oq, nil
}
if set.StorageID == nil {
return newMemoryQueueT(set) // インメモリ }
return newPersistentQueueT(set) // 永続化(ファイルストレージ) }
code:go
obsQueue (メトリクス/トレース)
│
▼
asyncQueue (Consumer goroutine 管理)
│
▼
memoryQueue / persistentQueue (実際のデータ保持)
PersistentQueueに関してはfilestorage等のStorage Extensionを使うことでCollectorのクラッシュ等に備えたデータの退避ができます。
code:go
// ┌───────file extension-backed queue───────┐
// │ │
// │ ┌───┐ ┌───┐ ┌───┐ ┌───┐ ┌───┐ │
// │ n+1 │ n │ ... │ 4 │ │ 3 │ │ 2 │ │ 1 │ │
// │ └───┘ └───┘ └─x─┘ └─|─┘ └─x─┘ │
// │ x | x │
// └───────────────────────x─────|─────x─────┘
// ▲ ▲ x | x
// │ │ x | xxxx deleted
// │ │ x |
// write read x └── currently dispatched item
// index index x
// xxxx deleted
KubeConでもログ等は外だししておいたほうが良い話されてましたね。
ObsReportSender
Spanの作成・記録、メトリクス の記録(送信成功/失敗件数)を自動的に行うためのもの
code:go
switch signal {
case pipeline.SignalTraces:
or.itemsSentInst = telemetryBuilder.ExporterSentSpans
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans
case pipeline.SignalMetrics:
or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints
case pipeline.SignalLogs:
or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords
or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords
}
RetrySender
Exponential Backoffでエクスポート失敗時のリトライを行えるようになっています。
code:yaml
retry_on_failure:
enabled: true
initial_interval: 5s
randomization_factor: 0.5
multiplier: 1.5
max_interval: 30s
max_elapsed_time: 5m
内部的に↓利用しています。
TimeoutSender
各送信試行に対してタイムアウトを設定します。
code:yaml
exporters:
otlp:
endpoint: backend:4317
timeout: 10s # デフォルト 5s を 10s に変更
code:go
func (ts *TimeoutConfig) Validate() error {
// Negative timeouts are not acceptable, since all sends will fail.
if ts.Timeout < 0 {
return errors.New("'timeout' must be non-negative")
}
return nil
}
あとは任意のpusherが実際のバックエンドに対してリクエストを送ってくれます。
code:go
// 1. 初期化時: pusher を Sender Chain の末端として登録
be.firstSender = sender.NewSender(pusher) // pusher を保持
// 2. データ送信時: Sender Chain を通って最後に pusher が呼ばれる
func (es *senderT) Send(ctx context.Context, req T) error { return es.consFunc(ctx, req) // consFunc = pusher
}
// 3. pusher 内で traceExporter を使って送信
func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
req := ptraceotlp.NewExportRequestFromTraces(td)
resp, respErr := e.traceExporter.Export(ctx, req, e.callOptions...)
// ...
}
まとめ
Exporterの実装を見ました