OpenTelemetry Operator OpAMPBridge編
こんにちは、@sugar235711です。
この記事は「ひとりで気になるOSSのソースコード全部読んで何かする Advent Calendar 2025」25日目の記事です。
前回の記事: OpenTelemetry Operator Instrumentation編
OpAMPとは?
大規模なKubernetesクラスタでは、数十〜数百のOpenTelemetry Collectorが稼働することがあります。これらを手動で管理するのは困難です:
設定変更: 各Collectorの設定を個別に更新する必要がある
バージョン管理: どのCollectorがどのバージョンで動いているか把握しづらい
ヘルス監視: 各Collectorの状態を一箇所で確認できない
トラブルシューティング: 問題発生時に個別にログを確認する必要がある
OpAMP (Open Agent Management Protocol) は、OpenTelemetryコミュニティが策定したエージェント管理のための標準プロトコルです。
code:_
┌─────────────────────────────────────────────────────────────────┐
│ OpAMP Server │
│ (管理プラットフォーム) │
│ │
│ • すべてのエージェントの状態を一覧表示 │
│ • 設定を一括配布 │
│ • ヘルスチェック │
│ • バージョン管理 │
└─────────────────────────────────────────────────────────────────┘
▲
│ OpAMP Protocol (双方向通信)
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Agent 1 │ │ Agent 2 │ │ Agent N │
│ (Collector) │ │ (Collector) │ │ (Collector) │
└─────────────┘ └─────────────┘ └─────────────┘
アーキテクチャ
code:_
┌─────────────────────────────────────────────────────────────────────────────┐
│ OpAMP Server (外部) │
│ (Collectorの集中管理プラットフォーム) │
└───────────────────────────────────┬──────────────────────────────────────────┘
│ OpAMP Protocol (WebSocket/HTTP)
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Kubernetes Cluster │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ OpAMPBridge Pod │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Agent │ │ │
│ │ │ • OpAMPサーバーとの双方向通信 │ │ │
│ │ │ • リモート設定の受信・適用 │ │ │
│ │ │ • ヘルス/設定情報の報告 │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Proxy Server │ │ │
│ │ │ • Collector Podからの接続を受け付け │ │ │
│ │ │ • 各Collectorのヘルス/設定を集約 │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ │ ┌──────────────────────────────────────────────────────────────┐ │ │
│ │ │ Operator Client │ │ │
│ │ │ • K8s APIでCollector CRDをCRUD │ │ │
│ │ │ • コンポーネント/ラベル検証 │ │ │
│ │ └──────────────────────────────────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ OpenTelemetryCollector CRD Instances │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ reporting: true │ │ managed: true │ │ managed: bridge │ │ │
│ │ │ (読み取り専用) │ │ (CRUD可能) │ │ (特定Bridge用) │ │ │
│ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
OpAMPサーバーとの通信(Agent)
OpAMPサーバーとWebSocket/HTTPで双方向通信を行います。
table:_
方向 内容
サーバー → Bridge リモート設定、接続設定更新、再起動コマンド
Bridge → サーバー 現在設定、ヘルス情報、エージェント識別情報、設定適用ステータス
code:go
// cmd/operator-opamp-bridge/internal/agent/agent.go
func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {
// リモート設定の適用
if agent.remoteConfigEnabled && msg.RemoteConfig != nil {
status, err := agent.applyRemoteConfig(msg.RemoteConfig)
agent.opampClient.SetRemoteConfigStatus(status)
agent.opampClient.UpdateEffectiveConfig(ctx)
}
}
Kubernetes APIとの通信(Operator Client)
K8s API Serverと通信してOpenTelemetryCollector CRDを操作します。
code:go
// cmd/operator-opamp-bridge/internal/operator/client.go
func (c Client) Apply(name, namespace string, configmap *protobufs.AgentConfigFile) error {
// 1. YAMLをパース
var collector v1beta1.OpenTelemetryCollector
yaml.Unmarshal(configmap.Body, &collector)
// 2. コンポーネント検証(componentsAllowed)
err = c.validateComponents(&collector.Spec.Config)
// 3. ラベル検証(opamp-managed: true?)
err = c.validateLabels(instance)
// 4. 作成または更新
if instance == nil {
return c.create(ctx, name, namespace, updatedCollector)
}
return c.update(ctx, instance, updatedCollector)
}
Collector Podとの通信(Proxy Server)
Collector Pod内のOpAMP Extensionからの接続を受け付け、情報を集約します。
code:go
// cmd/operator-opamp-bridge/internal/proxy/server.go
func (s *OpAMPProxy) onMessage(ctx context.Context, conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
instanceId, _ := getInstanceId(msg)
// 新しいCollectorの接続を記録
if _, ok := s.agentsByIdinstanceId; !ok {
s.agentsByIdinstanceId = NewAgent(s.logger, instanceId, conn)
}
// ステータス更新
s.agentsByIdinstanceId.UpdateStatus(msg, response)
// ホスト名(Pod名)→ インスタンスIDマッピング
if hostName := s.agentsByIdinstanceId.GetHostname(); len(hostName) > 0 {
s.agentsByHostNamehostName = instanceId
}
return response
}
CRD仕様
code:go
// apis/v1alpha1/opampbridge_types.go
type OpAMPBridgeSpec struct {
// === 必須フィールド ===
Endpoint string json:"endpoint" // OpAMPサーバーURL
Capabilities mapOpAMPBridgeCapabilitybool json:"capabilities" // サポート機能
// === オプションフィールド ===
Headers mapstringstring json:"headers,omitempty" // 認証ヘッダー
ComponentsAllowed mapstring[]string json:"componentsAllowed,omitempty" // 許可コンポーネント
// === Pod設定 ===
Replicas *int32 json:"replicas,omitempty" // Max: 1
Image string json:"image,omitempty"
// ... その他Pod設定フィールド
}
Capabilities一覧
code:go
// apis/v1alpha1/opampbridge_capabilities.go
const (
OpAMPBridgeCapabilityReportsStatus = "ReportsStatus" // 必須(自動有効化)
OpAMPBridgeCapabilityAcceptsRemoteConfig = "AcceptsRemoteConfig" // リモート設定受信
OpAMPBridgeCapabilityReportsEffectiveConfig = "ReportsEffectiveConfig" // 設定報告
OpAMPBridgeCapabilityReportsHealth = "ReportsHealth" // ヘルス報告
OpAMPBridgeCapabilityReportsRemoteConfig = "ReportsRemoteConfig" // 設定状態報告
OpAMPBridgeCapabilityAcceptsRestartCommand = "AcceptsRestartCommand" // 再起動受信
// ...
)
table:_
Capability 方向 説明
ReportsStatus Bridge→Server 必須。Bridgeの状態をサーバーに報告
AcceptsRemoteConfig Server→Bridge サーバーから送られた設定をK8sに適用
ReportsEffectiveConfig Bridge→Server 各Collectorの現在設定をサーバーに報告
ReportsHealth Bridge→Server 各Collectorのヘルス状態をサーバーに報告
Webhook実装
code:go
// apis/v1alpha1/opampbridge_webhook.go
func (o *OpAMPBridgeWebhook) defaulter(r *OpAMPBridge) error {
// UpgradeStrategy のデフォルト
if len(r.Spec.UpgradeStrategy) == 0 {
r.Spec.UpgradeStrategy = UpgradeStrategyAutomatic
}
// Replicas のデフォルト (1)
one := int32(1)
if r.Spec.Replicas == nil {
r.Spec.Replicas = &one
}
// ReportsStatus は必須で有効化
if r.Spec.Capabilities == nil {
r.Spec.Capabilities = make(mapOpAMPBridgeCapabilitybool)
}
r.Spec.CapabilitiesOpAMPBridgeCapabilityReportsStatus = true
return nil
}
バリデーション
code:go
func (o *OpAMPBridgeWebhook) validate(r *OpAMPBridge) (admission.Warnings, error) {
// 1. Endpoint必須チェック
if len(strings.TrimSpace(r.Spec.Endpoint)) == 0 {
return nil, fmt.Errorf("the OpAMP server endpoint is not specified")
}
// 2. Capabilities必須チェック
if len(r.Spec.Capabilities) == 0 {
return nil, fmt.Errorf("capabilities not specified")
}
// 3. Replicas上限チェック (Max: 1)
if r.Spec.Replicas != nil && *r.Spec.Replicas > 1 {
return nil, fmt.Errorf("replica count must not be greater than 1")
}
return nil, nil
}
Reconcileループ
code:go
// internal/controllers/opampbridge_controller.go
func (r *OpAMPBridgeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// 1. CRの取得
var instance v1alpha1.OpAMPBridge
if err := r.Client.Get(ctx, req.NamespacedName, &instance); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// 2. 削除中ならスキップ
if instance.GetDeletionTimestamp() != nil {
return ctrl.Result{}, nil
}
// 3. マニフェスト生成
params := r.getParams(instance)
desiredObjects, err := BuildOpAMPBridge(params)
// 4. リコンサイル実行(作成/更新/削除)
err = reconcileDesiredObjects(ctx, r.Client, log, &instance, params.Scheme, desiredObjects, nil)
// 5. ステータス更新
return opampbridgeStatus.HandleReconcileStatus(ctx, log, params, err)
}
Watch対象リソース
code:go
func (r *OpAMPBridgeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.OpAMPBridge{}). // 主リソース
Owns(&corev1.ConfigMap{}). // ConfigMap
Owns(&corev1.ServiceAccount{}). // ServiceAccount
Owns(&corev1.Service{}). // Service
Owns(&appsv1.Deployment{}). // Deployment
Complete(r)
}
マニフェスト生成
Build関数
code:go
// internal/manifests/opampbridge/opampbridge.go
func Build(params manifests.Params) ([]client.Object, error) {
var resourceManifests []client.Object
resourceFactories := []manifests.K8sManifestFactorymanifests.Params{
manifests.FactoryWithoutError(Deployment), // Deployment
manifests.Factory(ConfigMap), // ConfigMap
manifests.FactoryWithoutError(ServiceAccount), // ServiceAccount
manifests.FactoryWithoutError(Service), // Service
}
for _, factory := range resourceFactories {
res, err := factory(params)
if err != nil {
return nil, err
} else if manifests.ObjectIsNotNil(res) {
resourceManifests = append(resourceManifests, res)
}
}
return resourceManifests, nil
}
生成されるリソース
code:_
OpAMPBridge CR
│
└─── OpenTelemetry Operator Controller
│
├── Deployment
│ └── Pod (Replicas: 1)
│ └── Container: operator-opamp-bridge
│ ├── /conf/remoteconfiguration.yaml
│ └── Env: OTELCOL_NAMESPACE
│
├── ConfigMap
│ └── remoteconfiguration.yaml
│ ├── endpoint
│ ├── capabilities
│ └── componentsAllowed
│
├── ServiceAccount
│
└── Service (portsが指定されている場合)
ConfigMap生成
code:go
// internal/manifests/opampbridge/configmap.go
func ConfigMap(params manifests.Params) (*corev1.ConfigMap, error) {
config := make(mapinterface{}interface{})
config"endpoint" = params.OpAMPBridge.Spec.Endpoint
config"headers" = params.OpAMPBridge.Spec.Headers
config"capabilities" = params.OpAMPBridge.Spec.Capabilities
config"componentsAllowed" = params.OpAMPBridge.Spec.ComponentsAllowed
configYAML, _ := yaml.Marshal(config)
return &corev1.ConfigMap{
Data: mapstringstring{
"remoteconfiguration.yaml": string(configYAML),
},
}, nil
}
Container生成
code:go
// internal/manifests/opampbridge/container.go
func Container(cfg config.Config, logger logr.Logger, opampBridge v1alpha1.OpAMPBridge) corev1.Container {
// イメージ選択
image := opampBridge.Spec.Image
if len(image) == 0 {
image = cfg.OperatorOpAMPBridgeImage // デフォルトイメージ
}
// ConfigMap Volumeマウント
volumeMounts := []corev1.VolumeMount{{
Name: naming.OpAMPBridgeConfigMapVolume(),
MountPath: "/conf",
}}
// 環境変数(Downward API)
envVars := append(opampBridge.Spec.Env, corev1.EnvVar{
Name: "OTELCOL_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"},
},
})
return corev1.Container{
Name: naming.OpAMPBridgeContainer(),
Image: image,
Env: envVars,
VolumeMounts: volumeMounts,
Resources: opampBridge.Spec.Resources,
SecurityContext: opampBridge.Spec.SecurityContext,
}
}
OpAMPBridge Pod内部実装
エントリポイント
code:go
// cmd/operator-opamp-bridge/main.go
func main() {
cfg, _ := config.Load(l, os.Args)
// 1. Kubernetes Client 作成
kubeClient, _ := cfg.GetKubernetesClient()
operatorClient := operator.NewClient(cfg.Name, l, kubeClient, cfg.GetComponentsAllowed())
// 2. OpAMP Client 作成(Server接続用)
opampClient := cfg.CreateClient()
// 3. Proxy Server 作成(Collector接続用)
opampProxy := proxy.NewOpAMPProxy(l, cfg.ListenAddr)
// 4. Agent 作成(中核ロジック)
opampAgent := agent.NewAgent(l, operatorClient, cfg, opampClient, opampProxy)
// 5. 起動
opampAgent.Start()
opampProxy.Start()
// 6. シャットダウン待ち
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
opampAgent.Shutdown()
opampProxy.Stop(context.Background())
}
リモート設定適用フロー
code:go
// cmd/operator-opamp-bridge/internal/agent/agent.go
func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {
var multiErr error
// 1. 受信した設定を適用
for key, file := range config.Config.GetConfigMap() {
// key = "namespace/name"
colKey, _ := kubeResourceFromKey(key)
// K8s APIでCollector CRDを作成/更新
err = agent.applier.Apply(colKey.name, colKey.namespace, file)
agent.appliedKeyscolKey = true
}
// 2. 削除されたリソースを検出・削除
for collectorKey := range agent.appliedKeys {
if _, ok := config.Config.GetConfigMap()collectorKey.String(); !ok {
agent.applier.Delete(collectorKey.name, collectorKey.namespace)
}
}
// 3. ステータス返却
return &protobufs.RemoteConfigStatus{
LastRemoteConfigHash: config.GetConfigHash(),
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
}, nil
}
セキュリティモデル
ラベルによるアクセス制御
OpAMPBridgeはすべてのCollectorを無条件に変更できません。ラベルによる明示的な許可が必要です。
table:_
ラベル 値 動作
opentelemetry.io/opamp-reporting "true" 報告のみ(変更不可)
opentelemetry.io/opamp-managed "true" 報告 + 管理(CRUD可能)
opentelemetry.io/opamp-managed "<bridge-name>" 特定Bridgeのみ管理
code:go
// cmd/operator-opamp-bridge/internal/operator/client.go
func (c Client) validateLabels(collector *v1beta1.OpenTelemetryCollector) error {
resourceLabels := collector.GetLabels()
// 報告専用(reporting)ラベルがあれば変更不可
if labelSetContainsLabel(resourceLabels, ReportingLabelKey, "true") {
return errors.NewBadRequest("cannot modify a collector with opamp-reporting: true")
}
// 管理(managed)ラベルがなければ変更不可
if !labelSetContainsLabel(resourceLabels, ManagedLabelKey, "true") &&
!labelSetContainsLabel(resourceLabels, ManagedLabelKey, c.name) {
return errors.NewBadRequest("cannot modify a collector without opamp-managed label")
}
return nil
}
コンポーネント制限
componentsAllowedで使用可能なコンポーネントを制限できます:
code:yaml
spec:
componentsAllowed:
receivers:
- otlp
- prometheus
processors:
- batch
exporters:
- otlphttp
許可されていないコンポーネントを含む設定は拒否されます。
OpAMPBridge CR
code:yaml
apiVersion: opentelemetry.io/v1alpha1
kind: OpAMPBridge
metadata:
name: opamp-bridge
namespace: observability
spec:
# OpAMPサーバー接続設定
endpoint: ws://opamp-server.example.com:4320
headers:
Authorization: "Bearer ${OPAMP_AUTH_TOKEN}"
# 有効化するCapabilities
capabilities:
AcceptsRemoteConfig: true
ReportsEffectiveConfig: true
ReportsHealth: true
ReportsRemoteConfig: true
# 許可するコンポーネント
componentsAllowed:
receivers:
- otlp
- prometheus
processors:
- memory_limiter
- batch
exporters:
- otlphttp
# リソース設定
resources:
requests:
cpu: 100m
memory: 128Mi
管理対象Collector
code:yaml
apiVersion: opentelemetry.io/v1beta1
kind: OpenTelemetryCollector
metadata:
name: managed-collector
namespace: prod
labels:
opentelemetry.io/opamp-managed: "true" # このラベルが必要
spec:
mode: deployment
config:
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
batch: {}
exporters:
otlphttp:
endpoint: http://backend:4318
service:
pipelines:
traces:
receivers: otlp
processors: batch
exporters: otlphttp
必要なRBAC
code:yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: opamp-bridge-role
rules:
- apiGroups: "opentelemetry.io"
resources: "opentelemetrycollectors"
verbs: "get", "list", "watch", "create", "update", "patch", "delete"
- apiGroups: ""
resources: "pods"
verbs: "get", "list", "watch"
制限事項
table:_
項目 制限
Replicas 最大 1(単一インスタンスのみ)
プロトコル WebSocket (ws://, wss://) または HTTP
管理対象 opamp-managed ラベル付きCollectorのみ変更可能
コンポーネント componentsAllowed で許可されたもののみ使用可能
まとめ
OpAMPBridgeを見ました。