Haskell による並列・並行プログラミング #11 9章 キャンセルとタイムアウト
担当: maton
プレゼンテーションモード
章立て
9.1 非同期例外
9.2 非同期例外のマスク
9.3 bracket
9.4 チャネルに対する非同期例外の安全性
9.5 タイムアウト
9.6 非同期例外の捕捉
9.7 maskとforkIO
9.8 非同期例外に関して
キャンセルの必要性
ネットワーク通信中のユーザーからの割り込み要求
サーバーの、クライアントからの要求待ちのタイムアウト
ユーザーインターフェースを介した重い処理の割り込み
などなど
キャンセルの実現方法
1. キャンセルをされうるスレッドがポーリングする
ポーリング: 定期的なキャンセル条件の確認
スレッドによるポーリングの実施はプログラマの責任
忘却の恐れがある
2. 当該スレッドを直接キャンセルする
データ不整合を防ぐため、クリティカルセクションの保護が必要
キャンセルの設計
Haskellでは 2. をデフォルトとする
殆どのコードが純粋関数
ポーリングはできない(それは副作用を持つ関数になる)
クリティカルセクションを持たず、キャンセル可能
以下、IOモナド中でのキャンセルの実現方法について見ていく。
9.1 非同期例外
同期例外:発生タイミングが決まっている(想定内の)例外
例: openFile: does not exist (No such file or directory)
throw, throwIO などから投げられる(プログラマが例外を投げるコードを書く)
非同期例外: 発生タイミングが不明な例外
例: ユーザーからの割り込み(SIGINTシグナルなど)
プログラム中で非同期例外を起こす方法として、 throwTo が提供されている
throwTo
code:haskell
throwTo :: Exception e
=> ThreadId -- 例外を投げる対象
-> e -- 投げたい例外
-> IO ()
ThreadId を指定して例外を投げる
ThreadId は forkIO :: IO () -> IO ThreadId を呼び出すことで得られる
非同期例外を投げてみよう
(1/3)
例題: (8.2 節の) Async の API を拡張
(geturlscancel を表示)
ユーザーが「q」キーを入力して任意の時点でダウンロードを停止できるようにする
非同期例外を投げてみよう
(2/3)
キャンセル操作の実装
cancel :: Async a -> IO ()
実装には ThreadId が必要
Async のデータ型 を拡張して、ThreadId を持つようにする
throwTo で Control.Exception.ThreadKilled を投げてあげる
waitCatch :: Async a -> IO (Either SomeException a) は Left ThreadKilled を返すようになる。
async :: IO a -> IO (Async a) の拡張
forkIO が返す ThreadId を Async 構成子に積む
非同期例外を投げてみよう
(3/3)
main の拡張
<1> ダウンロード処理を async
<2> 標準有力からの入力待ち受けを行うスレッドを生成
stdin の行バッファリングはオフにしておく
じゃないとnewlineを送出するまで入力がバッファされる
qキーが入力されたら mapM_ cancel as で全スレッドに非同期例外を送出
<3> すべてのスレッドを待ち合わせ(完了するかキャンセルされる)
<4> ダウンロード処理の結果を表示
動かしてみよう
9.2 非同期例外のマスク
非同期例外の危険性
共有されている状態を更新する最中に例外が起こったとき、データ不整合が発生する
クリティカルセクションの間は非同期例外の伝達を一旦止めて、あとで再開するという方法が考えられる
本当に大丈夫?
例題: problem関数
(書籍のみ)
code:haskell
problem :: MVar a -> (a -> IO a) -> IO ()
problem m f = do
a <- takeMVar m --── <1>
r <- f a catch \e -> do --┐
putMVar m a --┼─ <2>
throw e --┘
putMVar m r --── <3>
takeMVar した値に関数を適用して、例外を適切にハンドリングするコンビネータを考えてみる
処理中に発生した例外が catch されたら、関数適用前の値を putMVar してMVarを開放する
例外が<1>と<2>の間や、<2>と<3>の間で起こると、MVarは空のままになってしまう。
mask コンビネータ
mask :: ((IO a -> IO a) -> IO b) -> IO b
mask に関数を渡すと、その関数の実行が終了するまで非同期例外の伝達が遅延される
先ほどの例に mask を適用する
code:haskell
problem :: MVar a -> (a -> IO a) -> IO ()
problem m f = mask $ \restore -> do
a <- takeMVar m --── <1>
r <- restore (f a) catch \e -> do --┐
putMVar m a --┼─ <2>
throw e --┘
putMVar m r --── <3>
mask :: ((IO a -> IO a) -> IO b) -> IO b
restore :: IO a -> IO a
restore の後ろの do 全体は IO b すなわち IO ()
下:元のコード(比較用)
code:haskell
problem :: MVar a -> (a -> IO a) -> IO ()
problem m f = do
a <- takeMVar m --── <1>
r <- f a catch \e -> do --┐
putMVar m a --┼─ <2>
throw e --┘
putMVar m r --── <3>
先ほどの例に mask を適用する
https://gyazo.com/6e08c6dd8d16854f6698da647b993ca7
restore を適用した関数以外はマスクされ、 (f a) を除いた部分では非同期例外が発生しない
新たな問題
https://gyazo.com/c7aad9a263f9c42d6ed115057c125943
takeMVar が長期に渡ってブロックされている場合、takeMVar はマスク内にあるのでスレッドが無反応になる
それでは困るので、takeMVar を含むいくつかの操作は割り込み可能になっている
割り込み可能関数は、マスク内にあったとしても非同期例外を受け取れる
無限にブロックされる可能性のある操作はすべて割り込み可能操作に指定されている
別の問題
https://gyazo.com/19baf215e9f4713d6542f82cb2618139
なお、putMVar は無限にブロックされる可能性があるが、割り込み可能関数ではない
割り込み可能にしてしまうと、例外が<1>と<2>の間や、<2>と<3>の間で起こりうることになり、安全ではなくなる
無限にブロックされる可能性のほうは大丈夫なの?
(MVar操作の書き方を守れば)大丈夫です
/emoji/warning.icon どの操作も一貫して takeMVar した後に putMVar すること!
守らなければデッドロックの危険性がある
高レベルのコンビネータ
modifyMVar_ :: MVar a -> (a -> IO a) -> IO ()
先ほどの problem 関数
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b
MVar への値とは別に返り値が欲しい場合
使用例
CAS (compare and swap) 操作
code:haskell
casMVar :: Eq a => MVar a -> a -> a -> IO Bool
casMVar m old new =
modifyMVar m $ \cur ->
if cur == old
then return (new, True)
else return (cur, False)
現在の MVar の中身が old と等しければ new に置き換えて True を返す
そうでなければ変更せず False を返す
使用例2
2つのMVarに対する変更
code:haskell
modifyTwo :: MVar a -> MVar b -> (a -> b -> IO (a, b)) -> IO ()
modifyTwo ma mb f =
modifyMVar_ mb $ \b ->
modifyMVar ma $ \a -> f a b
内側の modifyMVar で例外が発生したら、外側の modifyMVar_ がMVarの内容を復元する
/emoji/warning.icon 2つ以上のMVarから値を取るときはいつも同じ順番でとること
デッドロックのリスクがある(10章で議論)
質問: modifyMVar の引数の順番を逆にすると動かなくなる?
要検証
その他のトピック
割り込み可能関数をどうしても割り込み不可にして使いたい場合
uninterruptibleMask :: ((IO a -> IO a) -> IO b) -> IO b
/emoji/warning.icon うっかり無限ブロックがおきるとプログラムが無反応になるので注意
デバッグでは現在のスレッドのマスク状態をがわかると便利
getMaskingState :: IO MaskingState
data MaskingState = Unmasked | MaskedInterruptible | MaskedUninterruptible
9.3 bracket
code:haskell
bracket
:: IO a -- リソース獲得
-> (a -> IO b) -- リソース開放
-> (a -> IO c) -- リソースに対する計算
-> IO c -- 計算結果
bracket before after thing =
mask $ \restore -> do
a <- before
r <- restore (thing a) onException after a
_ <- after a
return r
実は、mask を使って非同期例外を安全にハンドリングするようできている
before がブロックされている間に例外が発生しても問題ない
ただし、before で実行するブロックされる操作は1つだけにすること
2つ以上のブロックされる操作を使うなら、 bracket をネストして使用すること
after 内でブロックされる操作を使うなら、操作が割り込み可能であり、非同期例外を受け取るかもしれないことに注意すること
9.4 チャネルに対する非同期例外の安全性
大部分の MVar を含むコードは takeMVar と putMVar の代わりに modifyMVar_ を使って非同期例外に対して安全に書ける
例題: 7.5 バッファ付きチャネル (chan2.hs)
code:haskell
readChan :: Chan a -> IO a
readChan (Chan readVar _) = do
stream <- takeMVar readVar -- <!>
Item val tail <- readMVar stream
putMVar readVar tail
return val
<!> で非同期例外が起こると、readVar は空のままになる
後からそのチャネルを読もうとするとデッドロック
modifyMVar を使って直す (chan3.hs)
code:haskell
readChan :: Chan a -> IO a
readChan (Chan readVar _) = do
modifyMVar readVar $ \stream -> do
Item val tail <- readMVar stream -- <!>
return (tail, val)
これでOK…ではない
<!> の readMVar の中身をみると、
code:haskell
readMVar :: MVar a -> IO a
readMVar m = do
a <- takeMVar m
-- <!>
putMVar m a
return a
<!> の位置で例外が起こると、MVarが空のままになる
readMVar を直す
modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b でもよいし
withMVar :: MVar a -> (a -> IO b) -> IO b もある
最も単純なのは mask を使うこと
code:haskell
readMVar :: MVar a -> IO a
readMVar m = mask_ $ do
a <- takeMVar m
putMVar m a
return a
mask_ は mask の restore 関数を渡さないバージョン
内部で例外ハンドリングをしない場合はこれでよい
writeChan を直す
code:haskell
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
newHole <- newEmptyMVar
oldHole <- takeMVar writeVar
putMVar oldHole (Item val newHole)
putMVar writeVar newHole
writeVar を modifyMVar_ を使って操作すればよさそう
writeChan を直す(失敗)
code:haskell
wrongWriteChan :: Chan a -> a -> IO ()
wrongWriteChan (Chan _ writeVar) val = do
newHole <- newEmptyMVar
modifyMVar_ writeVar $ \oldHole -> do
putMVar oldHole (Item val newHole) -- <1>
return newHole -- <2>
<1>と<2>の間で非同期例外が発生すると、oldHole に値が入った段階で writeVar は oldHole を指したままになってしまう。
先頭は空っぽというデータ構造の不変条件に違反する
writeChan を直す(解決)
code:haskell
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
newHole <- newEmptyMVar
mask_ $ do
oldHole <- takeMVar writeVar
putMVar oldHole (Item val newHole)
putMVar writeVar newHole
writeVar への操作を mask_ で包む
putVar は割り込み可能でない
2つのputVarの間で非同期例外が発生するリスクがない!
9.5 タイムアウト
非同期例外を用いたプログラミングの良い例題となる
code:haskell
timeout :: Int -> IO a -> IO (Maybe a)
timeout t m = ...
期待される振る舞い
1. t マイクロ秒以内に m が結果を返す(同期/非同期例外含め)ならば、 fmap Just m と同様
2. そうでなければ m に非同期例外 Timeout u が送られ、 Nothing を返す
u は Unique 型(何らかの一意な値)
複数の Timeout u の送出を区別するため
タイムアウトへの要求
t マイクロ秒は厳密に実時間である必要はない
m は現在のスレッドで実行されてほしい
m が内部で myThreadId などを呼ぶ場合、 timeout の有無で ThreadId が変化してはいけない
別のスレッドが throwTo を使って m に割り込めるようになっていなければならない
タイムアウトがネストしていても期待通りの振る舞いをしてほしい
タイムアウトの実装
cf. System.Timeout.timeout
code:haskell
timeout t m
| t < 0 = fmap Just m --┐
| t == 0 = return Nothing --┼─ <1>
| otherwise = do --┘
pid <- myThreadId --── <2>
u <- newUnique --┬─ <3>
let ex = Timeout u --┘
handleJust --── <4> (*)
(\e -> if e == ex then Just () else Nothing)
(\_ -> return Nothing)
(bracket (forkIO $ do threadDelay t -- (**)
throwTo pid ex))
(\tid -> throwTo tid ThreadKilled)
(\_ -> fmap Just m))
トップダウンに見ていくと
1. t の値の場合分け
2. 現在のスレッドID取得
3. ユニーク値の取得およびタイムアウト例外の作成
4. タイムアウト処理及びタイムアウトのハンドリング
例外ハンドラ (*)
タイムアウトの本体 (**)
タイムアウトの実装(*)
(*)部分へフォーカス
code:haskell
handleJust
(\e -> e == ex then Just () else Nothing) --── <1>
(\_ -> return Nothing) --── <2>
(bracket (forkIO $ do threadDelay t --┐
throwTo pid ex) --┤
(\tid -> throwTo tid ThreadKilled) --┼─ <3> (**)
(\_ -> fmap Just m)) --┘
code:haskell
handleJust :: Exception e
=> (e -> Maybe b) -- 例外の捕捉
-> (b -> IO a) -- 例外のハンドリング
-> IO a -- 実行するアクション
-> IO a
トップダウンに見ていくと
1. 用意したタイムアウト例外 ex が実行するアクションの中から投げられたら、その例外のみを捕捉する
2. ハンドリングの結果としては Nothing を返す
3. bracket の部分がハンドリングされる対象のアクションである(**)
タイムアウトの実装(**)
(**)部分へフォーカス
code:haskell
bracket (forkIO $ do threadDelay t --┐
throwTo pid ex) --┴─ <1>
(\tid -> throwTo tid ThreadKilled) --── <2>
(\_ -> fmap Just m) --── <3>
再掲: 9.3 bracket
code:haskell
bracket
:: IO a -- リソース獲得
-> (a -> IO b) -- リソース開放
-> (a -> IO c) -- リソースに対する計算
-> IO c -- 計算結果
1. t マイクロ秒待って親スレッドにタイムアウト例外を投げる子スレッドを作る(タイマー)
2. 子スレッドの ThreadId tid が手に入るので、ThreadKilled を送出してキルする
3. 本体の計算。引数 m を実行して Just で包む
タイムアウトの実装は正しい?
https://gyazo.com/3bd86a7343c187b0c0edf3b14d2fb9d8
ここまで。
9.6 非同期例外の捕捉
catch などの関数で非同期例外を捕捉したいとき、別の非同期例外が飛んできたらどうする?
catch を mask で包み、restore と対にすれば良い
code:haskell
mask $ \restore ->
restore action catch handler
実は、Haskellの例外ハンドラは暗黙的にマスクされている
暗黙のmaskの落とし穴
例外ハンドラから別の関数を末尾呼び出しすると、その関数は暗黙のmaskの内側に残ってしまう
悪い例: catch-mask.hs
実行すると、1回目のgetMaskingStateはUnmaskedを返すが、2回目ではMaskedInterruptibleになる
L14 の例外ハンドリング(MaskedInterruptible)中の loop 関数呼び出しが原因
良い例: catch-mask2.hs
Control.Exception.try を使って書く
例外ハンドラは try 内部に隠されたので、 mask 内部で再帰呼び出しされることが防がれた!
9.7 maskとforkIO
async 関数再訪
code:haskell
async :: IO a -> IO (Async a)
async action = do
m <- newEmptyMVar
t <- forkIO (do r <- try action; putMVar m r)
return (Async t m)
実は、try と putMVar の間に例外が発生すると、MVarは空のままになる
プログラムが Async の結果をwaitした場合にデッドロックしてしまう
mask で防げるのだが、 try の直前に例外が発生すると防ぎようがない
じゃあ forkIO も mask で包んじゃえばいいじゃん?
子プロセスってマスクされるものなの…?
forkIO の mask 対応
実は、folkIO は親スレッドのマスク状態を継承したスレッドを生成する
つまりこう書ける
code:haskell
async :: IO a -> IO (Async a)
async action = do
m <- newEmptyMVar
t <- mask $ \restore ->
forkIO (do r <- try (restore action); putMVar m r)
return (Async t m)
forkIO の一般化
「スレッドが完了した時に何らかのアクションを実行する」という形式は頻出
Control.Concurrent.forkFinally
code:haskell
forkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally action fun =
mask $ \restore ->
forkIO (do r <- try (restore action); fun r)
forkFinally を使う
code:haskell
async :: IO a -> IO (Async a)
async action = do
m <- newEmptyMVar
t <- forkFinally action (putMVar m)
return (Async t m)
forkIO (x \`finally\` y) という箇所をみつけたら forkFinally x (\_ -> y) と書いたほうが良い
9.8 非同期例外に関して
ここまで、timeout や Chan などの複雑な抽象化を見てきた
Haskellでプログラミングする上でこのレベルで非同期例外を扱うことは稀である
IOを含まないHaskellコードは概ね安全
bracket のような非同期例外安全な抽象化を使おう
MVar を使うなら modifyMVar 族を使おう
複雑な場合
(リソースなど)状態に依存するアクションが連続する場合
一連のアクションを mask に包んで非同期例外をポーリングするようにしよう
その中で割り込み可能な操作を見つけて、割り込み可能な操作が発生させる例外を正しくハンドリングするようにしよう
MVar などの状態表現の代わりにソフトウェアトランザクショナルメモリ(STM)を使おう
複数の操作を合成して、不可分な単位を作ることができる(10章にて)
非同期例外は便利
スタックオーバーフローやユーザ割り込み(Ctrl+C)など多くの例外を非同期例外として扱える
サードパーティ製コードであっても割り込みはちゃんとできる
例外は外部関数呼び出し(15.3.1で議論)
Haskellではスレッドが何もできずにただ死ぬということはなく、後始末をする機会や例外ハンドラを走らせる機会がある
近年の非同期例外のトピック
safe-exception モジュール
syocy さんの記事、9.8節の楽観的な話と対比してやっぱり非同期例外がツラいという話をされている