kafka-streamsのstateListenerにあれもこれもやらせたい
前提
ステートフルなリソースを抱えているkafka-streamsのアプリケーションにおいて、kafka-streamsの状態に呼応した処理をしたいという需要がある
streamsが起動していない間はコネクションを受け付けないようにしたい、等
新しい状態と古い状態を引数にしたonChangeメソッドを実装したリスナー登録すると、kafka-streamsの状態が遷移するタイミングで呼び出してくれる
課題
最大で一つしかリスナーを登録できない
リスナーAが登録されている状態でリスナーBを呼び出すとリスナーAを登録していなかったことになる
巨大なonChangeは書きたくない
そもそもonChangeで触りたいものAとBの初期化が完全に別の所で行なわれるので厳しい
解決
onChangeで実行したい処理達を関数のベクター(in atom)で保持
(swap! state-listeners conj on-change-fn)みたいなノリで処理を外から追加できる
状態遷移以外にもコールバック関数が登録されたタイミングで呼ばれて欲しいニーズがある
e.g. kafka-streamsがRUNNINGの時だけコネクションを貼る、みたいなコールバックをしたいとして、ストリームが完全に起動してしまった後のタイミングでこのコールバックを追加しても自然な状態遷移は発生しないためコネクションが貼れない
のでatomの変更を監視して新しく追加されたコールバックを即呼ぶようにすることができる
stateListerはonChangeでベクターに入っている関数をイテレートして呼び出すようにする
code:clojure
(def streams (KafkaStreams. ...))
(def state-listeners (atom ...)) (when state-listeners
(add-watch state-listeners
::invoke-for-new-listeners
(fn _ _ n
(let [new-listener (last n)
state (.state streams)]
(new-listener state
state)))))
(.setStateListener
streams
(reify KafkaStreams$StateListener
(onChange new old
(listener new old)))))
(swap! state-listeners conj (fn new old (prn "state changed from " new "to" old))) (swap! state-listeners conj (fn new old (when (= new KafkaStreams$State/RUNNING) (start-server...)))) Clojure
Clojure以外の言語は追っていないので比較はしていないが、Clojureだとただの関数のオブジェクトをベクターに追加するだけなのでシンプル
atomのadd-watchで変更を監視する処理が書けるの地味に嬉しい
reifyで軽量なインターフェースの実装を提供できるのでJavaとの相互運用バッチリ