OpenTelemetry Colloctor Processor編
Processor実装のコードを読みながら、API設計と内部アーキテクチャの勘所をまとめます。
対象は processor パッケージ配下と、代表的な実装として batchprocessor と memorylimiterprocessor です。今回はmemorylimiterprocessorを見ます。
OpenTelemetry Collector における Processor は、Receiver と Exporter の間でテレメトリを変換・制御するレイヤです。
このリポジトリでは以下の Signal を扱います。
traces: processor.Traces
metrics: processor.Metrics
logs: processor.Logs
profiles: xprocessor.Profiles (実験的シグナル)
中核は Factory + helper で、各 Processor は「設定値の検証/初期化」→「Consume 時の処理」→「次の consumer へ委譲」という形を共通化しています。
processor は各 Signal 用のインターフェースと、Factory の共通 API を提供します。
processor.Traces/Metrics/Logs は component.Component と consumer.* の合成
processor.Settings に ID/Telemetry/BuildInfo を集約
processor.Factory が Create* で統一
code:go
// CreateTraces は対応していなければ ErrSignalNotSupported
func (f *factory) CreateTraces(...) (Traces, error) {
if f.createTracesFunc == nil {
return nil, pipeline.ErrSignalNotSupported
}
if set.ID.Type() != f.Type() {
return nil, internal.ErrIDMismatch(set.ID, f.Type())
}
return f.createTracesFunc(...)
}
実装者向けに最重要なのが processorhelper です。
NewTraces/Logs/Metrics に「実処理関数」を渡すだけで、以下が自動で付与されます。
Start/Shutdown のライフサイクル管理
consumer.New* による Wrapper
トレース Span にイベントを追加
処理時間/入出力件数のメトリクス計測
code:go
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
startTime := time.Now()
pointsIn := md.DataPointCount()
md, errFunc := metricsFunc(ctx, md)
obs.recordInternalDuration(ctx, startTime)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
obs.recordInOut(ctx, pointsIn, 0)
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
return errFunc
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, pointsIn, pointsOut)
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
Batch Processor
singleShardとmultiShardがあります。
metadata_keys なし: singleShardBatcher でロック/Map不要
metadata_keys あり: multiShardBatcher が metadata 組み合わせごとに shard を作成
metadata の key を lower-case + sort
attribute.Set をキーに sync.Map で shard 管理
metadata_cardinality_limit を超えると errTooManyBatchers
code:go
aset := attribute.NewSet(attrs...)
if b, ok := mb.batchers.Load(aset); !ok {
if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
return errTooManyBatchers
}
b, loaded := mb.batchers.LoadOrStore(aset, mb.processor.newShard(md))
if !loaded { b.(*shardT).start(); mb.size++ } }
shard毎にgoroutineが起動します。
code:go
func (b *shardT) start() { b.processor.goroutines.Add(1)
go b.startLoop()
}
これを操作するためにmetadata_cardinality_limitをユーザー側で設定できるシャード数の合計として定義されています。
デフォルトは1000の様です。これで十分な気はしますが、テナント数が増えると足りなくなるのかもしれません。
The maximum number of distinct combinations is limited to the configured metadata_cardinality_limit, which defaults to 1000 to limit memory impact.
まとめ
明日はexporterへ