concurrent-rubyを使って並行処理する
concurrent-rubyの並行処理は全てthread safe
Railsで標準で入ってるから無駄な外部依存にはならない
昔はparallelをよく使ってたけどこっちで十分そう(マルチコアで並列処理したいならparallelで良いかも)
code:rb
class ConcurrentProcessor
class ProcessError < StandardError
attr_reader :key, :original_error
def initialize(key, error)
@key = key
@original_error = error
end
end
def initialize(pool_size: 5, timeout: 30)
@processes = {}
@pool_size = pool_size
@timeout = timeout
# 同時実行させるスレッド数を可変できるようにする
@executor = Concurrent::FixedThreadPool.new(pool_size)
end
def register(key, &block)
self
end
def execute
futures = schedule_processes
results = futures.to_h(&:value!)
handle_errors(results)
results
ensure
@executor.shutdown
end
private
def schedule_processes
@processes.map do |key, proc|
Concurrent::Future.execute(executor: @executor) do
process_job(key, proc)
end
end
end
def process_job(key, job)
result = Timeout.timeout(@timeout) { job.call }
rescue StandardError => e
end
def handle_errors(results)
errors = results.select { |_, v| v.is_a?(ProcessError) }
return if errors.empty?
error_messages = errors.map { |key, error|
}.join(', ')
end
end
参考
ruby-concurrency/concurrent-ruby: Modern concurrency tools including agents, futures, promises, thread pools, supervisors, and more. Inspired by Erlang, Clojure, Scala, Go, Java, JavaScript, and classic concurrency patterns.