Confluent Cloudでkafka-streamsのapplicationをリセットする
kafka-streamsのアプリケーションを運用していて予期せぬメッセージを処理してハンドルされない例外が出た場合、アプリケーションが停止する。
現代ではこのアプリケーションがコンテナに載っていたりして自動的に再起動されたりする
が、例外の原因となるメッセージは処理されていないので同じメッセージを処理しようとし、またハンドルされない例外が出てアプリケーションが停止、というループになる。
ミッションクリティカルなアプリケーションでこのような状況に陥った場合、問題のあるメッセージだけ飛ばしてとりあえず処理を再開したい、というニーズがある。
そのニーズに答えるツールがkafkaの開発元のconfluentより提供されている
色々と注意点があったり、confluentのブローカーに対して実行する際の設定が自明ではないのでこの記事ではその辺りを社内のメモから書き移す
流れ
3. リセットしたいアプリケーションを全て停止する
5. リセットツールを実行する。以下は実行例
code:bash
bin/confluent/bin/kafka-streams-application-reset \
--bootstrap-servers <broker>.us-west-2.aws.confluent.cloud:9092 \
--application-id <application-id>
--input-topics topic-one,topic-two --config-file config --to-latest --dry-run
6. アプリケーションを再度起動する
パラメーターの解説
--bootstrap-servers
特に説明の必要は無い気がする
--application-id
全く存在しない値を指定しても正常に終了してしまう (実際には何も起らない)ので注意が必要。
--input-topics
複雑なアプリケーションの場合複数存在し得る。カンマ区切りで指定する。
--config-file
confluentのブローカーとしゃべるには認証が必要で、ツールにその情報を渡す必要がある。
こんな感じに書くと繋がる
code:text
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
--to-latest
inputトピックのオフセットがlatestになる。つまり最後のメッセージまで処理を飛ばす。
他にも特定のオフセットを指定できるオプションもあり、実際にはメッセージはなるべく飛ばしたく無いと思われるが、ピンポイントで指定する必要があり難かしそう。
--dry-run
大事