2021/6/6 aws-sdk-rails AWS SQS Active Job
onk.icon
SQS を利用する ActiveJob adapter には shoryuken があるが、何か違いがあるのかとかを見ていく。 README
enqueue
:amazon_sqs or :amazon
ActionMailer でも :amazon が使われているため alias が貼ってある
:amazon_sqs でヨサソウ
:amazon_sqs_async
enqueue 時に async になる。 Concurrent::Promise を利用
FIFO queue は async にできない
enqueue 時に async にしたいの、どういう状況だろう?
SQS あんまり使ってないから重さの感覚が無い
shoryuken よりパフォーマンスが良いと主張している
hash 作って SQS Client に send_message するだけなのに差が出るの……?
SQS の設定は config/aws_sqs_active_job/#{::Rails.env}.yml に書く
runner
2種類用意されている
polling
lambda
polling
Aws::Rails::SqsActiveJob::Poller.new.run しているだけ
initialize 時に set_environment して
run の頭で boot_rails する
boot_rails は config/environment.rb を読み込む処理
onk.icon 多分これ用意すれば sinatra でも使えると思うんだよねー
@executor を用意して、INT/TERM が送られるまで poll する
poll は Aws::SQS::QueuePoller を呼んでいるだけ
onk.icon while loop とか無かったので、Aws::SQS::QueuePoller の中でやってくれているのかな
message が取れたら executor に渡す
code:rb
@executor.execute(Aws::SQS::Message.new(
queue_url: queue_url,
receipt_handle: msg.receipt_handle,
data: msg,
client: client
))
initialize 時に Concurrent::ThreadPoolExecutor を作っておく
execute では ThreadPoolExecutor に対して post(message) する
短ッ!
code:ruby
ActiveJob::Base.execute @job_data
これだけなんだ。ActiveJob の本領発揮って感じ
lambda
以下の 5 つの手順を実行すれば良いトノコト
2. Push your image to ecr
3. Create a lambda function from your image (see the lambda docs for details).
4. Add an SQS Trigger for the queue(s) you want to process jobs from.
5. Set the ENTRYPOINT to /usr/local/bundle/bin/aws_lambda_ric and the CMD to config/environment.Aws::Rails::SqsActiveJob.lambda_job_handler - this will load Rails and then use the lambda handler provided by aws-sdk-rails. You can do this either as function config or in your Dockerfile.
この場合は ENTRYPOINT ["/lambda-entrypoint.sh"] になっている
CMD を config/environment.Aws::Rails::SqsActiveJob.lambda_job_handler にする
onk.icon えっ! こんな指定アリなの!!
config/environment.rb を読み込んで
Aws::Rails::SqsActiveJob.lambda_job_handler メソッドを起動する
以下の形しか知らなかったので、別モジュールのメソッドを呼べると思ってなかった
code:lambda.rb
def handler
...
code:Dockerfile
言われてみたら source.LambdaFunctions::Handler.process も指定したことあったけど、これも source.rb の中で定義していたので別ファイルのイメージが無かったんだなぁ
lambda_job_handler を見ていく
event で渡ってきたデータを Aws::SQS::Message に変換して
job を起動する
shoryuken との差
たぶん時間切れになるが見るだけ見ていく
enqueue はほとんど差がなさそう
bin/shoryuken は
Shoryuken.polling_strategy をココで指定できそう
shoryuken 思ってた以上に重厚で少し心が折れてきた
肝は
code:ruby
Shoryuken::Manager.new(
Shoryuken::Fetcher.new(group),
Shoryuken.polling_strategy(group).new(options:queues, Shoryuken.delay(group)), executor
)
Manager に渡すのは、fetcher, polling_strategy, concurrency, executor
fetcher
client.receive_message
Aws::SQS::QueuePoller も同じ
polling_strategy
デフォルトだと WeightedRoundRobin
Manager は start されると dispatch_loop を回す
code:ruby
@executor.post { dispatch }
dispatch は
@fetcher.fetch して
@polling_strategy.messages_found して
assign する
assign
code:ruby
Concurrent::Promise
.execute(executor: @executor) { Processor.process(queue_name, sqs_msg) }
.then { processor_done(queue_name) }
.rescue { processor_done(queue_name) }
Processor.process
code:ruby
worker.class.server_middleware.invoke(worker, queue, sqs_msg, body) do
worker.perform(sqs_msg, body)
end
middleware の仕組みあるんだー
worker 実行側は特に ActiveJob の仕組みには乗っかっていない
まとめ
ActiveJob の queue_adapter = :amazon_sqs すると SQS に enqueue できる
SQS Client に send_message しているだけで素朴
polling runner は
polloer を起動したら Aws::SQS::QueuePoller が無限ループしてくれて
Message を受け取ると、executor の Thread が動く
Thread では ActiveJob::Base.execute するだけ
lambda runner は
code:Dockerfile
した Container Image を SQS Trigger で動かすだけで、いいかんじに動く
config/environment を読み込ませるって発想が無かったので感動した
読みやすかったので aws-sdk-rails を使っていようかなぁ
queue ごとにポーリング間隔を変えるとか group とかを使いたくなったら shoryuken を使うことになりそう