Haskell による並列・並行プログラミング #13 10章 ソフトウェアトランザクショナルメモリ
担当: wado さん
10.4 STMを使ったマージ から
以下、メモ
前回まで
STM の導入
不可分に実行されるコード片
TMVar:TVarを使ったMVarの実現
Maybeを使う
ブロックを retry で実現する
10.4 マージ
orElse a b
a が retry すると a のエフェクトを捨てて b を実行
左偏性があるので公平性に影響する
AND的な表現は >>= で実現する
10.5 Async再考
MVar を TMVar で置き換える
waitSTM, waitCatchSTM の導入
waitEither, waitAny
orElse を使って簡潔に書ける
10.6 チャネル
STM導入の効果
実装が簡単になる
MVarでは実装ができなかった unGetChan を実装できる
StreamではなくTVarList
空のMVarの代わりにTNil構築子を使う
ブロッキングは retry 呼び出しで実現
API群はSTMモナドを返す
合成して外側で atomically を呼ぶ
非同期例外安全
throwSTM, catchSTM
IO モナドと違い、エフェクト m が例外を投げると、エフェクトはすべて破棄される
例外ハンドラ、bracket、mask などを気にしなくて良い
10.7 もう1つのチャネル実装
dupChan操作を諦めればもっと効率の良いチャネルを実装できる
キュー(2リストキュー)
値の書き込みがO(1)になる(償却計算量)
10.8 有界チャネル
単一項チャネル MVar / TMVar
十分な並行性が得られない
無限項チャネル Chan / TChan
書き込みが多いとメモリを使い潰す
有界チャネル
書き込み量の上限を持つ
危険性
デッドロックを含むプログラムが書けてしまう
2つのキューを2つのスレッドから扱う場合
それぞれを atomically で操作しているのが問題
10.9 STMでできないこと
STM の優位点
合成可能な不可分性
合成可能なブロッキング
単純なエラー処理
MVar の優位点
速度(常にではない)
公平性
公平性
トランザクションは任意の条件でブロックできる
FIFO順でブロックを解除することを保証できない
STM を用いた公平性の実装
ブロックされたスレッドをTVar (Maybe a)のリストで表現する
実装するには合成可能性を諦めて、操作をIOモナド内で行わなければならない
できないこと:スレッド間の多対多の通信
10.10 性能
トランザクションは readTVar / writeTVar のログを蓄積しながら動いている
writeTVar: トランザクションのエフェクトを破棄しやすくなっている
readTVar: ログの長さに関してO(n)の操作
ログをメモリの内容と比較し、一致すればトランザクションのエフェクトをメモリにコミット
一致しなければログを破棄される
経験則
単一のトランザクションで大量のTVarを読み出してはいけない
atomically $ mapM takeMVar ts よりは mapM (atomically . takeMVar) ts のほうが軽い
トランザクション内で高価な計算はできるだけ避けよ
retry
トランザクション中に読まれるTVarの数に対してO(n)
11章 並行性の高水準な抽象化
担当: maton
Presentation Mode
本章の目標
10章で導入したSTMを使い、Async APIをより洗練させる
スレッドの木を作れるようにする
親スレッドがKillされたら子スレッドはキャンセルされる
子スレッドがKillされたら親スレッドに通知される
すべてのスレッドにはKillされたときの例外をハンドリングする機会が与えられる
章立て
11.1 スレッド漏れの回避
11.2 対称型並行性コンビネータ
11.3 Functor インスタンスの追加
11.4 まとめ:Async API
11.1 スレッド漏れの回避
目標:現在のスレッドがKillされたら自動的にキャンセルされるようなAsyncを生成する
動機となる例
code:haskell
main = do
r1 <- wait a1
r2 <- wait a2
print (B.length r1, B.length r2)
上記のコードの a1 で例外が発生するとどうなる?
wait a1 でも例外が投げられ、例外が main の外へと伝搬する
一方で a2 は走ったままになる
main の外からは a2 に手が出せなくなっている→スレッド漏れ状態
とりわけ複数のスレッドを生成するときの開放処理は注意しないと抜けやすい
bracket による対処
やりたいこと
Asyncの生成
例外時にAsyncをキャンセルする
→bracket を使ったリソース獲得/開放パターンが使える
bracket (async io) cancel operation
code:haskell
main = do
cancel $ \a1 -> do
cancel $ \a2 -> do
r1 <- wait a1
r2 <- wait a2
print (B.length r1, B.length r2)
安全になった。しかしちょっと冗長
すっきりさせる
code:haskell
withAsync :: IO a -> (Async a -> IO b) -> IO b
withAsync io operation = bracket (async io) cancel operation
これを使うと…
code:haskell
r1 <- wait a1
r2 <- wait a2
print (B.length r1, B.length r2)
OK。ただし、a2 が先に失敗した場合には a1 は直ちにはキャンセルされない(対称性がない)。
11.2 対称型並行性コンビネータ
目標:2つのAsyncを走らせて、どちらかが失敗したらすぐに停止するようなコンビネータを作る
言い換えれば、どちらの完了も待つコンビネータ
waitBoth :: Async a -> Async b -> IO (a, b)
以前扱ったのは waitEither :: Async a -> Async b -> IO (Either a b)
waitBoth の実装
code:haskell
waitBoth :: Async a -> Async b -> IO (a, b)
waitBoth a1 a2 =
atomically $ do =
r1 <- waitSTM a1 orElse (do waitSTM a2; retry) -- (*1)
r2 <- waitSTM a2
return (a1, a2)
意外に複雑。なぜ?
STMアクションには「結果を返す」「例外を投げる」「リトライする」の3つの可能性がある
table:withBothの結果は対称的でなければならない
a1 \ a2 結果を返す 例外を投げる リトライする
結果を返す 結果を返す 例外を投げる リトライする
例外を投げる 例外を投げる 例外を投げる 例外を投げる
リトライする リトライする(*1) 例外を投げる(*1) リトライする(*1)
(*1) を実現するために、a1 がリトライする場合に orElse を使って a2 を試行する必要がある
waitBoth を使った抽象化
2つのIOアクションを走らせてどちらか一方でも失敗したら中断する対称な関数
code:haskell
concurrently :: IO a -> IO b -> IO (a, b)
concurrently ioa iob =
withAsync ioa $ \a ->
withAsync iob $ \b ->
waitBoth a b
geturl に適用すると
code:haskell
main = do
(r1,r2) <- concurrently
print (B.length r1, B.length r2)
URLのリストへの対応
code:haskell
main = do
xs <- foldr conc (return []) (map getURL sites)
print (map B.length xs)
where
conc ioa ioas = do
(a,as) <- concurrently ioa ioas
return (a:as)
気持ちとしては
conc getURL1 (conc getURL2 ... (conc getURLN (return [])) ... ) のような感じ
どちらか一方を待つ race
一方が結果を返すか例外を投げたら、もう一方はキャンセル
code:haskell
race :: IO a -> IO b -> IO (Either a b)
race ioa iob =
withAsync ioa $ \a ->
withAsync iob $ \b ->
waitEither a b
スレッドの木
race と concurrently を繰り返し呼ぶことでスレッドの木が組み上がる
親が例外を投げたり非同期例外を受け取ったら、子は再帰的にキャンセルされる
ある子が例外を受け取ったら兄弟はキャンセルされる
直接木構造を作るわけではないので木を管理する必要はない
11.2.1 race を使ったタイムアウト
かなり簡潔になる
code:haskell
timeout :: Int -> IO a -> IO (Maybe a)
timeout n m
| n < 0 = fmap Just m
| n == 0 = return Nothing
| otherwise = do
r <- race (threadDelay n) m
case r of
Left _ -> return Nothing -- タイムアウト
Right a -> return (Just a) -- m の結果
ただし、元々の実装と違って、
他のスレッドから ThrowTo で例外を投げても race が ThreadKilled を送出するので元々の例外をハンドルできない
タイムアウト時にも race が ThreadKilled を送出するのでタイムアウトの場合だけ特別な処理をすることができない
race は余分に1つスレッドを生成してしまう(元々の実装では現在のスレッドを使いまわしていた)
11.3 Functor インスタンスの追加
code:haskell
waitAny asyncs =
atomically $ foldr orElse retry $ map waitSTM asyncs
複数のAsyncが異なる結果の型を持つ場合に対応できない
Async の結果の型を特定の型へ変換できるようにサポートする
Async を Functor のインスタンスにする
code:haskell
class Functor f where
fmap :: (a -> b) -> f a -> f b
準備:データ型の変更
code:haskell
data Async a = Async ThreadId (TMVar (Either SomeException a))
TMVar は TVar を使って定義されているが、 TVar は Functor のインスタンスではない
code:haskell
data Async a = Async ThreadId (STM (Either SomeException a))
そこで、STMを使って定義することにする
STM は Functor のインスタンスを持つ
実はこの変更に伴うasyncの実装の変化は、readTMVar を移動させるだけで済む
(Diff を写す)
fmap の実装
code:haskell
instance Functor Async where
fmap f (Async t stm) = Async t stm'
where stm' = do
r <- stm
case r of
Left e -> return (Left e)
Right a -> return (Right (f a))
11.4 まとめ:Async API
ここまでのAsyncの実装で学んだこと
8章: 非同期な実行 async と待ち wait によるシンプルなAPI
forkIO, MVar
9章: 非同期コード中の例外の補足と呼び出し元への伝搬
mask + forkIO
10章: STM を用いた再実装
TMVar
本章
対称型コンビネータ waitEither, waitBoth
withAsync によるスレッド漏れの回避
高水準の対称型コンビネータ race と concurrently
完全なライブラリは async パッケージに
次回12章