2024/12/20 セマフォ
https://scrapbox.io/files/676558199bbe31151e434111.webp
セマフォって知ってますか? わたしは最近知りました GPT-4.icon
セマフォ(Semaphore)とは、複数のプロセスやスレッドがリソースを安全に共有するための同期の仕組みです。並行処理や並列処理で、リソースへのアクセスを制限したり、順序を制御したりするために使われます。
簡単に言うと、「共有リソースの利用を管理するカウンター」 だと思ってください。
Goで並行実行で処理を進めなくてはいけない場面がありました 「並行実行で処理を進めなくてはいけない」は具体的なHowになってしまいますが、問題はこうです:
Goで実行されるLambda関数があり、タイムアウトは30sに設定している その関数は、ある外部APIへリクエストし結果を加工して RDS にレコードをinsertする
外部APIへのリクエストはパラメータにページングを指定して取得するため、当初想定していたよりも外部APIへのリクエスト数が増えて、Lambdaはタイムアウトしてしまう、といった課題が前提にあります
外部 API レスポンスのページング仕様において、初回のリクエストのレスポンスには nextPageLink といった次ページのパラメータURLと、lastPageLink という最後のページのフィールドが与えられ、既存の実装ではリクエストしレスポンスを受けたあとに nextPageLink をリクエストするという逐次実行の実装でした
安直にタイムアウト延ばせば?といったHowも考えられます
しかし今回起きている問題は外部APIへ問い合わせる量が時間経過や利用傾向で増えているということ
ということは、タイムアウトを延長するHowを選択すると、今後もタイムアウトのチューニングが続くことになります
また、担当する別の古いWebアプリケーションでは、手入れするにも手数がかかるという判断で、ALBの idle_timeout を延長したり、HTTPクライアントライブラリの timeout 設定を延長したり、と忸怩たる思いで逃げのHowを取ることも多かったため、そのようなことはしたくないという気持ちもありますね 自分がコントロール可能なことはコントロールしておきたいです
GoでWebサーバを標準ライブラリで実装していくと自然とgoroutineで処理することになります
Serve accepts incoming HTTP connections on the listener l, creating a new service goroutine for each. The service goroutines read requests and then call handler to reply to them.
しかし自分でgoroutineを使った実装をすることはここまでありませんでした
前述のとおり nextPageLink を使った逐次リクエストでは goroutine で実行できないため、lastPageLink から最後のページを特定し、for ループで各ページ分のリクエストを goroutine で並行実行するプランです
とはいえ、使ってみるとそこまで難しくなく、各goroutineが共有するリソースに気を使えばそこまで難しくない
code:goroutine.go
var mu sync.Mutex
// リクエストが一度でも失敗したら実行を停止させたいので errgroup を使いました
var eg errgroup.Group
// 初回リクエストで lastPage がわかるのですでに初回リクエストは済んでおり2ページ目からリクエストする想定です
for page := 2; page <= lastPage; page++ {
eg.Go(func() error {
res, err := getResponse(ctx, page)
if err != nil {
return err
}
mu.Lock()
result := append(result, res.Data...)
mu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return result, nil
デプロイして課題は解決しよかったね、ではあるのですが、lambdaにおいて同時実行数の制限についても知っておくべきです
特に実行時にFDを使い果たす、メモリを使い果たす、などが考えられます
see:
と考えると同時実行数を制限する必要がありますね、そうです、セマフォです
典型的には以下のような実装が望ましいでしょう
code:semaphore.go
const maxConcurrent = 5 // 同時に実行する最大リクエスト数
semaphore := make(chan struct{}, maxConcurrent)
for page := 2; page <= lastPage; page++ {
eg.Go(func() error {
semaphore <- struct{}{} // セマフォを取得
defer func() { <-semaphore }() // セマフォを解放
res, err := getResponse(ctx, page)
if err != nil {
return err
}
// 以下同
})
}
ここでようやくセマフォが出てきました。同時実行数の制限です
人間も機械も制約があればこそ、スループットを高めることができます 今日は以上です