スレッドベースの並列処理
マルチスレッドに向く問題
ネットワークプログラミングやデータ入出力に関連するプログラムなど、処理時間の多くをI/Oなど外部イベントの待機に費やすようなタスクは、一般的にスレッド化することで性能が向上します。しかし、シミュレーションなど膨大なCPU計算を必要とし、外部イベントの待機にほとんど時間をかけないようなタスクについては、スレッド化では性能向上が期待できません。
もう少し具体的な例を考えてみましょう。例えば、多數のURLからWebスクレイピングしているPythonコードについて考えてみます。各URLには、コンピューターのCPU処理時間よりはるかに大きいダウンロード時間があるため、シングルスレッドでの実装はI / Oバウンドになります。
ダウンロードリソースごとに新しいスレッドを生成することで、プログラムは複数のデータソースを並行してダウンロードすることができます。これは、ダウンロードタスクがそれ以前のWebページのダウンロードタスクの終了を待機していないことになります。この場合、プログラム性能はクライアント/サーバーの帯域幅制限に依存することになります。
別の例では、金融系アプリケーションの多くは、ほとんど数値処理に集中するため、CPUバウンドタスクとなります。また。モンテカルロシミュレーションのように、大規模な数値線形代数解や統計処理を伴うことがよくあり、この場合もPUバウンドタスクとなります。
threading モジュールの利用方法
Threadクラス
Python 標準モジュールである threading で提供される関数でよく使用されるものを列挙します。
threading.active_count(): 生存中(Alive)のThreadオブジェクトの数を返します。
threading.current_thread(): この関数を呼び出している処理のスレッドに対応する Thread オブジェクトを返します。
threading.enumerate(): threading.active_count()と同じ値
threading.get_native_id():スレッドIDを返します。非負の整数です。
Threadオブジェクトには次のメソッドやアトリビュートがあります。
start(): スレッドの活動を開始します。内部で run() を呼び出しています。
run():スレッドを実行します。
join():スレッドが終了するまで待機します。
native_id:スレッドIDを保持します。実行中のスレッドは非負の整数で開始前のスレッドはNoneがセットされています。
Threadオブジェクト
スレッドを使用する最も簡単な方法は、ターゲット関数を使用してThreadクラスをインスタンス化し、start()を呼び出すことです。これによりスレッドが機能するようになります。
code: 01_thread_staart.py
from threading import Thread
def worker():
print('Worker')
print('Main Starting')
threads = []
for _ in range(5):
task = Thread(target=worker)
threads.append(task)
task.start()
code: bash
$ python 01_thread_start.py
Main Starting
Worker
Worker
Worker
Worker
Worker
現在のスレッドを知る
引数を使用してスレッドを識別または名前を付けるのは面倒で不要です。 各Threadインスタンスには、スレッドの作成時に変更できるデフォルト値の名前があります。 スレッドの命名は、異なる操作を処理する複数のサービススレッドを持つサーバープロセスで便利です。
code: 02_thread_name.py
import threading
import time
import random
def worker():
print(threading.current_thread().getName(), 'Starting')
time.sleep(random.random())
print(threading.current_thread().getName(), 'Exiting')
def app():
print(threading.current_thread().getName(), 'Starting')
time.sleep(random.random())
print(threading.current_thread().getName(), 'Exiting')
main = threading.Thread(name='my_app', target=app)
worker1 = threading.Thread(name='worker', target=worker)
worker2 = threading.Thread(target=worker) # デフォルト名を使用
worker1.start()
worker2.start()
main.start()
出力には、各行の現在のスレッドの名前が含まれます。 スレッド名の列に"Thread-1"が含まれている行は、名前のないスレッドworker1に対応しています。
code: bash
$ python 02_thread_name.py
worker Starting
Thread-1 Starting
my_app Starting
Thread-1 Exiting
my_app Exiting
worker Exiting
プログラムのデバッグではprint()を使用せずにloggingモジュールを使用するようにしましょう。 loggingモジュールは、フォーマッタコード"%(threadName)s"を使用したすべてのログメッセージへのスレッド名の埋め込みをサポートしています。 ログメッセージにスレッド名を含めると、それらのメッセージをソースまでさかのぼることができます。 前述のコードは次のようになります。
code: 03_thread_name_logging.py
import threading
import time
import random
import logging
def worker():
logging.debug('Starting')
time.sleep(random.random())
logging.debug('Existing')
def app():
logging.debug('Starting')
time.sleep(random.random())
logging.debug('Existing')
logging.basicConfig(
level=logging.DEBUG,
)
main = threading.Thread(name='my_app', target=app)
worker1 = threading.Thread(name='worker', target=worker)
worker2 = threading.Thread(target=worker) # デフォルト名を使用
worker1.start()
worker2.start()
main.start()
loggingもスレッドセーフであるため、異なるスレッドからのメッセージは出力で区別されます。
code: bash
$ python 03_thread_name_logging.py
DEBUG (Thread-1 ) Starting DEBUG (Thread-1 ) Existing この後も、ログの設定は何度も使用することになるので、cutom_logging.py としてモジュールにしておきます。
code: custom_logging.py
import logging
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
デーモンスレッド
これまでサンプルプログラムは、すべてのスレッドが作業を完了するまで、暗黙的に終了を待っています。プログラムによっては、メインプログラムの終了をブロックせずに実行されるデーモンとしてスレッドを生成させたい場合があります。 デーモンスレッドの使用は、スレッドを中断する簡単な方法がない場合や、作業の途中でスレッドを停止させてもデータが失われたり破損したりしないサービスで便利になります。例えば、サービス監視ツールの場合の死活監視のタスクを生成するスレッドです。スレッドをデーモンとしてマークするには、Threadのコンストラクタに引数daemon=Trueを渡すか、インスタンスでset_daemon()メソッドを呼び出します。 daemon引数のデフォルトはFalseで、スレッドはデーモンではありません。
code: thread_daemon.py
import time
import random
import threading
from custom_logging import logging
def daemon():
logging.debug('Starting')
time.sleep(random.random())
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
time.sleep(random.random())
logging.debug('Exiting')
daemon_task = threading.Thread(name='daemon', target=daemon, daemon=True)
normal_task = threading.Thread(name='non-daemon', target=non_daemon)
code: 04_thread_daemon_start.py
from thread_daemon import daemon_task, normal_task
daemon_task.start()
normal_task.start()
デーモンスレッドが sleep()を呼び出してから復帰する前に、すべての非デーモンスレッド(メインスレッドを含む)が終了するため、出力にはデーモンスレッドからの"Exiting"のメッセージがありません。
code: bash
$ python 04_thread_daemon_start.py
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
デーモンスレッドが作業を完了するまで待つには、join()メソッドを使用します。
code: 05_thread_join.py
from thread_daemon import daemon_task, normal_task
from custom_logging import logging
daemon_task.start()
normal_task.start()
daemon_task.join()
normal_task.join()
このコードは、thread_daemon.py とほとんど同じで、join()メソッドの呼び出しの2行が末尾に追加されているだけです。
今度は、join()メソッドが呼び出されているので、デーモンスレッドが終了するのを待つことになり、デーモンスレッドの"Exiting"メッセージが出力されます。
code: bash
$ python 05_thread_join.py
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon ) Exiting
デフォルトでは、join()メソッドは無期限にブロックします。 スレッドが非アクティブになるのを待つ秒数を表すfloat値をjoin()に渡すこともできます。 この場合、スレッドがタイムアウト時間内に完了しない場合でも、join()は戻ってきます。
code: 06_thread_join_timeout.py
rom thread_daemon import daemon_task, normal_task
from custom_logging import logging
daemon_task.start()
normal_task.start()
daemon_task.join(0.1)
print(f'daemon_task is alive?: {daemon_task.is_alive()}')
normal_task.join()
渡されたタイムアウトはデーモンスレッドがスリープしている時間よりも短いときは、join()が戻った後もスレッドは動作しています。
code: bash
% python 06_thread_join_timeout.py
(daemon ) Starting
(non-daemon) Starting
daemon_task is alive?: True
(non-daemon) Exiting
すべてのスレッドを列挙
メインプロセスを終了する前にデーモンスレッドが完了したことを確認するために、すべてのデーモンスレッドへの明示的なハンドルを保持する必要はありません。 threading.enumerate()関数は、アクティブなスレッドインスタンスのリストを返します。 リストには現在のスレッドが含まれ、現在のスレッドに参加するとデッドロック状態が発生するため、スキップする必要があります。
code: 07_thread_enumerate.py
from custom_logging import logging
import time
import random
import threading
def worker():
pause = random.randint(1, 5) / 10
logging.debug(f'sleeping {pause:0.2}')
time.sleep(pause)
logging.debug('ending')
for _ in range(3):
task = threading.Thread(target=worker, daemon=True)
task.start()
main_thread = threading.main_thread()
for task in threading.enumerate():
if task is main_thread:
continue
logging.debug(f'joining {task.getName()}')
task.join()
ワーカーはランダムな時間スリープしているため、このコードからの出力は異なる場合があります。
code: bash
$ python 07_thread_enumerate.py
(Thread-1 ) sleeping 0.4
(Thread-2 ) sleeping 0.5
(Thread-3 ) sleeping 0.3
(MainThread) joining Thread-1
(Thread-3 ) ending
(Thread-1 ) ending
(MainThread) joining Thread-2
(Thread-2 ) ending
(MainThread) joining Thread-3
Threadクラスのサブクラス
起動時に、Threadはいくつかの基本的な初期化を実行してから、コンストラクターに渡されたターゲット関数を呼び出すrun()メソッドを呼び出します。 Threadのサブクラスを作成するには、run()をオーバーライドする必要があります。
code: 08_thread_subclass.py
from threading import Thread
from custom_logging import logging
class MyThread(Thread):
def run(self):
logging.debug('running')
for _ in range(5):
task = MyThread()
task.start()
code: bash
$ python 08_thread_subclass.py
(Thread-1 ) running
(Thread-2 ) running
(Thread-3 ) running
(Thread-4 ) running
(Thread-5 ) running
Threadコンストラクターに渡される引数*argsと**kwargsは、接頭辞「__」を使用してプライベート変数に保存されるため、サブクラスから簡単にアクセスすることはできません。 カスタムスレッドタイプに引数を渡すには、コンストラクターを再定義して、サブクラスに表示されるインスタンス属性に値を保存します。
code: 09_thread_subclass_args.py
import threading
from custom_logging import logging
class MyThreadWithArgs(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug(f'running with {self.args} and {self.kwargs}')
for _ in range(5):
task = MyThreadWithArgs(args=(i,), kwargs={'a': 1, 'b': 2})
task.start()
MyThreadWithArgsはThreadと同じAPIを使用しますが、スレッドの目的に直接関連するより多くの異なる引数を取ることができます。
code: bash
$ python 09_thread_subclass_args.py
(Thread-1 ) running with (0,) and {'a': 1, 'b': 2}
(Thread-2 ) running with (1,) and {'a': 1, 'b': 2}
(Thread-3 ) running with (2,) and {'a': 1, 'b': 2}
(Thread-4 ) running with (3,) and {'a': 1, 'b': 2}
(Thread-5 ) running with (4,) and {'a': 1, 'b': 2}
タイマースレッド
ThreadインスタンスのTimer()メソッドによってタイマーが提供されます。タイマーは遅延後に作業を開始し、その遅延時間内の任意の時点でキャンセルすることができます。
code: thread_timer.py
import time
import threading
from custom_logging import logging
def delayed():
logging.debug('worker running')
task1 = threading.Timer(3, delayed)
task1.setName('task1')
task2 = threading.Timer(2, delayed)
task2.setName('task2')
logging.debug('starting timers')
task1.start()
task2.start()
logging.debug(f'waiting before canceling {task2.getName()}')
time.sleep(1)
logging.debug(f'canceling {task2.getName()}')
task2.cancel()
logging.debug('done')
code: bash
$ python 10_thread_timer.py
(MainThread) starting timers
(MainThread) waiting before canceling task2
(MainThread) canceling task2
(MainThread) done
(task1 ) worker running
スレッドの同期
複数のスレッドを同期するためには次の方法があります。
スレッド間でシグナルを送受信
Lock/RLockオブジェクトを使用したロックの設定
Conditionオブジェクトの使用した条件設定
Barriersオブジェクトの使用したコントロールポイントの設定
Semaphoreオブジェクトの使用したリソースへの同時アクセス制御
スレッド間でのシグナル
複数のスレッドを使用するとき、別々の操作を同時に実行することになりますが、2つ以上のスレッドで操作を同期できることが重要になる場合があります。 Eventオブジェクトは、スレッド間で安全に通信するための簡単な方法です。 Eventは、呼び出し元がset()メソッドとclear()メソッドで制御できる内部フラグを管理します。 他のスレッドは、wait()を使用してフラグが設定されるまで一時停止し、続行が許可されるまで進行を効果的にブロックできます。
code: 11_thread_event_demo.py
import time
import threading
from custom_logging import logging
def wait_for_event(event):
logging.debug('wait_for_event starting')
event_is_set = event.wait()
logging.debug(f'event set: {event_is_set}')
def wait_for_event_timeout(event, task):
while not event.is_set():
logging.debug('wait_for_event_timeout starting')
event_is_set = event.wait(task)
logging.debug(f'event set: {event_is_set}')
if event_is_set:
logging.debug('processing event')
else:
logging.debug('doing other work')
event = threading.Event()
task1 = threading.Thread(
name='block',
target=wait_for_event,
args=(event,),
)
task1.start()
task2 = threading.Thread(
name='nonblock',
target=wait_for_event_timeout,
args=(event, 2),
)
task2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(3)
event.set()
logging.debug('Event is set')
wait()メソッドは、タイムアウトする前にイベントを待機する秒数を表す引数を取ることができます。 wait()メソッドはイベントが設定されているかどうかを示すブール値を返すので、呼び出し元はwait()が返された理由を確認できます。 is_set()メソッドは、ブロックされることなくイベントで個別に使用できます。
この例では、wait_for_event_timeout()は、無期限にブロックすることなくイベントステータスをチェックします。 wait_for_event()は、wait()の呼び出しをブロックします。これは、イベントステータスが変更されるまで戻りません。
code: bash
$ python 11_thread_event_demo.py
(block ) wait_for_event starting
(nonblock ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(nonblock ) event set: False
(nonblock ) doing other work
(nonblock ) wait_for_event_timeout starting
(MainThread) Event is set
(nonblock ) event set: True
(block ) event set: True
(nonblock ) processing event
ロックでリソースへのアクセスを制御
スレッドの操作を同期することに加えて、共有リソースへのアクセスを制御して、データの破損や欠落を防ぐことができることも重要です。 Pythonの組み込みデータ構造(リスト、辞書など)は、スレッドセーフです(Pythonの内部データ構造を更新するとき、データを保護するためにGILが使用され、更新の途中では解放されません)。 Pythonで実装された他のデータ構造、または整数や浮動小数点数などのより単純な型には、そのような保護がありません。 オブジェクトへの同時アクセスを防ぐには、Lockオブジェクトを使用します。
code: 12_thread_lock.py
import time
import random
import threading
from custom_logging import logging
class Counter:
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
logging.debug('Waiting for lock')
self.lock.acquire()
try:
logging.debug('Acquired lock')
self.value = self.value + 1
finally:
self.lock.release()
def worker(c):
for i in range(2):
pause = random.random()
logging.debug(f'Sleeping {pause:0.02}')
time.sleep(pause)
c.increment()
logging.debug('Done')
counter = Counter()
for _ in range(2):
task = threading.Thread(target=worker, args=(counter,))
task.start()
logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for task in threading.enumerate():
if task is not main_thread:
task.join()
logging.debug(f'Counter: {counter.value}')
この例では、worker()関数がCounterインスタンスをインクリメントします。これは、2つのスレッドが同時に内部状態を変更しないように、ロックを管理します。 Lockが使用しない場合、valueアトリビュートへの変更がうまくいかなくなる可能性があります。
code: bash
python 12_thread_lock.py
(Thread-1 ) Sleeping 0.59
(Thread-2 ) Sleeping 0.67
(MainThread) Waiting for worker threads
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Sleeping 0.99
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Sleeping 0.3
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Done
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Done
(MainThread) Counter: 4
別のスレッドが現在のスレッドを保持せずにロックを取得したかどうかを確認するには、acquire()にbloking=Falseを渡します。 次の例では、worker()が3回別々にロックを取得しようとし、そのために必要な試行回数をカウントします。 その間、lock_holder()はロックの保持と解放を繰り返し、各状態で短い一時停止を使用して負荷をシミュレートします。
code: 13_thread_nolock.py
import random
import time
import threading
from custom_logging import logging
def lock_holder(lock):
logging.debug('Starting')
while True:
lock.acquire()
try:
logging.debug('Holding')
time.sleep(random.random())
finally:
logging.debug('Not holding')
lock.release()
time.sleep(random.random())
def worker(lock):
logging.debug('Starting')
num_tries = 0
num_acquires = 0
while num_acquires < 3:
time.sleep(random.random())
logging.debug('Trying to acquire')
have_it = lock.acquire(0)
try:
num_tries += 1
if have_it:
logging.debug(f'Iteration {num_tries}: Acquired')
num_acquires += 1
else:
logging.debug(f'Iteration {num_tries}: Not acquired')
finally:
if have_it:
lock.release()
logging.debug(f'Done after {num_tries} iterations')
lock = threading.Lock()
holder = threading.Thread(
target=lock_holder,
args=(lock,),
name='LockHolder',
daemon=True,
)
holder.start()
worker = threading.Thread(
target=worker,
args=(lock,),
name='Worker',
)
worker.start()
ロックを3回別々に取得するには、worker()を3回以上繰り返す必要があります。
code: bash
% python 13_thread_noblock.py
(LockHolder) Starting
(LockHolder) Holding
(Worker ) Starting
(Worker ) Trying to acquire
(Worker ) Iteration 1: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 2: Acquired
(LockHolder) Holding
(Worker ) Trying to acquire
(Worker ) Iteration 3: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 4: Acquired
(LockHolder) Holding
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 5: Acquired
(Worker ) Done after 5 iterations
リエントラントロック
通常のLockオブジェクトは、同じスレッドであっても、複数回取得することはできません。 これにより、同じコールチェーン内の複数の関数がロックにアクセスした場合に、望ましくない副作用が発生する可能性があります。
code: 14_thread_lock_multi.py
import threading
lock = threading.Lock()
print('First lock :', lock.acquire())
print('Second lock:', lock.acquire(0))
この場合、最初の呼び出したacquire()によってロックが取得されます。2回目のacquire()呼び出しには、ブロックされないように、タイムアウト時間をゼロ(0)を与えます。
code: bash
$ python 14_thread_lock_multi.py
First lock : True
Second lock: False
同じスレッドからの個別のコードがロックを「再取得」する必要がある場合、Lock()ではなく、RLock()を使用します。
code: 15_thread_rlock_multi.py
import threading
lock = threading.RLock()
print('First lock :', lock.acquire())
print('Second lock:', lock.acquire(0))
前回のときと違い2度めのロックが取得できています。
code: bash
$ python 15_thread_rlock_multi.py
First lock : True
Second lock: True
コンテキストマネージャーとしてのロック
ロックはコンテキストマネージャーAPIを実装し、withステートメントと互換性があります。 withを使用すると、ロックを明示的に取得して解放する必要がなくなります。
code: 16_thread_lock_with.py
import threading
from custom_logging import logging
def worker_with(lock):
with lock:
logging.debug('Lock acquired via with')
def worker_no_with(lock):
lock.acquire()
try:
logging.debug('Lock acquired directly')
finally:
lock.release()
lock = threading.Lock()
worker1 = threading.Thread(target=worker_with, args=(lock,))
worker2 = threading.Thread(target=worker_no_with, args=(lock,))
worker1.start()
worker2.start()
同じ処理での実装の違いを理解するようにしましょう。
code: bash
% python 16_thread_lock_with.py
(Thread-1 ) Lock acquired via with
(Thread-2 ) Lock acquired directly
Conditionオブジェクトで同期
イベントの使用に加えて、スレッドを同期する別の方法は、Conditionオブジェクトを使用することです。 条件はロックを使用するため、共有リソースに関連付けることができ、複数のスレッドがリソースの更新を待機できるようになります。 この例では、consumer()スレッドは、条件が設定されるのを待ってから続行します。 producer()スレッドは、条件を設定し、他のスレッドに続行できることを通知しなければいけません。
code: 17_thread_condition.py
import time
import threading
from custom_logging import logging
def consumer(cond):
logging.debug('Starting consumer thread')
with cond:
cond.wait()
logging.debug('Resource is available to consumer')
def producer(cond):
logging.debug('Starting producer thread')
with cond:
logging.debug('Making resource available')
cond.notifyAll()
condition = threading.Condition()
consumer1 = threading.Thread(name='consumer1', target=consumer,
args=(condition,))
consumer2 = threading.Thread(name='consumer2', target=consumer,
args=(condition,))
producer = threading.Thread(name='producer', target=producer,
args=(condition,))
consumer1.start()
time.sleep(1)
consumer2.start()
time.sleep(1)
producer.start()
スレッドは、条件に関連付けられたロックを取得するためにを使用します。
code: bash
% python 17_thread_condition.py
(consumer1 ) Starting consumer thread
(consumer2 ) Starting consumer thread
(producer ) Starting producer thread
(producer ) Making resource available
(consumer1 ) Resource is available to consumer
(consumer2 ) Resource is available to consumer
明示的にacquire()メソッドとrelease()メソッドを使用することもできます。
Barriersでスレッドを同期
Barriersは、スレッドを同期する別の方法です。 Barriersはコントロールポイントを確立し、参加しているすべての「パーティ」がそのポイントに到達するまで、参加しているすべてのスレッドがブロックします。 これにより、スレッドは個別に起動し、すべて続行する準備ができるまで一時停止させることができます。
code: 18_thread_barrier.py
import time
import threading
def worker(barrier):
print(f'{threading.current_thread().name} '
f'waiting for barrier with {barrier.n_waiting} others')
worker_id = barrier.wait()
print(f'{threading.current_thread().name} after barrier {worker_id}')
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for task in threads:
print(f'{task.name} starting')
task.start()
time.sleep(0.1)
for task in threads:
task.join()
この例では、Barrierは3つのスレッドが待機するまでブロックするコントロールポイントを設定します。条件が満たされると、すべてのスレッドが同時にコントロールポイントを超えて解放されます。 wait()からの戻り値は、解放されているパーティの数を示し、一部のスレッドが共有リソースのクリーンアップなどのアクションを実行するのを制限するために使用することができます。
code: bash
$ python 18_thread_barrier.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
Barrierのabort()メソッドにより、待機中のすべてのスレッドがBrokenBarrierError例外を受け取ります。 これにより、wait()でブロックされている間に処理が停止した場合に、スレッドをクリーンアップできます。
code: 19_thread_barrier_abort.py
import time
import threading
def worker(barrier):
print(f'{threading.current_thread().name} '
f'waiting for barrier with {barrier.n_waiting} others')
worker_id = barrier.wait()
print(f'{threading.current_thread().name} after barrier {worker_id}')
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS+1) # 前のコードとの違いはここ!
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for task in threads:
print(f'{task.name} starting')
task.start()
time.sleep(0.1)
barrier.abort() # 前のコードとの違いはここ!
for task in threads:
task.join()
この例では、すべてのスレッドでの処理がブロックされるように、実際に開始されるよりも1つ多い参加スレッドを期待するようにBarrierを構成します。 abort()呼び出しは、ブロックされた各スレッドで例外を発生させます。
code: bash
% python 19_thread_barrier_abort.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-1 after barrier 1
worker-0 after barrier 0
リソースへの同時アクセスの制限
全体の数を制限しながら、一度に複数のワーカーがリソースにアクセスできるようにすると便利な場合があります。 たとえば、接続プールが固定数の同時接続をサポートしている場合や、ネットワークアプリケーションが固定数の同時ダウンロードをサポートしている場合などです。Semaphoreは、これらの接続を管理する方法のひとつです。
code: 20_thread_semaphore.py
import time
import threading
import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug(f'Running: {self.active}')
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug(f'Running: {self.active}')
def worker(s, pool):
logging.debug('Waiting to join the pool')
with s:
name = threading.current_thread().getName()
pool.makeActive(name)
time.sleep(0.1)
pool.makeInactive(name)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
task = threading.Thread(
target=worker,
name=str(i),
args=(s, pool),
)
task.start()
この例では、ActivePoolクラスは、特定の瞬間に実行できるスレッドを特定するための便利な方法として機能します。 実リソースプールは、接続またはその他の値を新しくアクティブなスレッドに割り当て、スレッドが完了すると値を再利用します。 ここでは、アクティブなスレッドの名前を保持するために使用され、最大2つが同時に実行されていることを示します。
code: bash
$ python 20_thread_semaphore.py
2021-01-14 16:17:37,125 (0 ) Waiting to join the pool
2021-01-14 16:17:37,125 (0 ) Running: '0' 2021-01-14 16:17:37,125 (1 ) Waiting to join the pool
2021-01-14 16:17:37,126 (1 ) Running: '0', '1' 2021-01-14 16:17:37,126 (2 ) Waiting to join the pool
2021-01-14 16:17:37,126 (3 ) Waiting to join the pool
2021-01-14 16:17:37,229 (0 ) Running: '1' 2021-01-14 16:17:37,229 (1 ) Running: []
2021-01-14 16:17:37,229 (2 ) Running: '2' 2021-01-14 16:17:37,229 (3 ) Running: '2', '3' 2021-01-14 16:17:37,330 (2 ) Running: '3' 2021-01-14 16:17:37,330 (3 ) Running: []
スレッド固有のデータ
複数のスレッドがそれらを使用できるようにロックする必要があるリソースもあれば、それらを所有していないスレッドから非表示になるように保護する必要があるリソースもあります。 threading.localクラスは、個別のスレッドのビューから値を非表示にできるオブジェクトを作成します。
code: 21_thread_local.py
import random
import threading
from custom_logging import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug(f'value={val}')
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
task = threading.Thread(target=worker, args=(local_data,))
task.start()
local_data.valueアトリビュートは、そのスレッドで設定されるまで、どのスレッドにも存在しません。
code: bash
% python 21_thread_local.py
(MainThread) No value yet
(MainThread) value=1000
(Thread-1 ) No value yet
(Thread-1 ) value=76
(Thread-2 ) No value yet
(Thread-2 ) value=93
すべてのスレッドが同じ値で始まるように設定を初期化するには、サブクラスを使用してコンストラクタで属性を設定します。
code: 22_thread_local_with_default.py
import random
import threading
from custom_logging import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug(f'value={val}')
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug(f'Initializing {self}')
self.value = value
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
task = threading.Thread(target=worker, args=(local_data,))
task.start()
__init__()は、デフォルト値を設定するために各スレッドで1回、同じオブジェクト(オブジェクトIDに注目)で呼び出されます。
code: bash
$ python 22_thread_local_with_default.py
(MainThread) Initializing <__main__.MyLocal object at 0x7fc6472dc7c0>
(MainThread) value=1000
(Thread-1 ) Initializing <__main__.MyLocal object at 0x7fc6472dc7c0>
(Thread-1 ) value=1000
(Thread-1 ) value=61
(Thread-2 ) Initializing <__main__.MyLocal object at 0x7fc6472dc7c0>
(Thread-2 ) value=1000
(Thread-2 ) value=7
スレッドには優先権がない
スレッドには優先権が与えられない(Non-Preemptive)ため、3つ以上のスレッドが実行しているとき、はじめにロックをリクエストしたスレッドよりも、あとからリクエストしたスレッドに奪われる可能性があります。
https://gyazo.com/9e31bce2eebac2e1eac99e8806d3907c
さらに、スレッドの切り替える制御はOS側にあるため、I/O処理主体のスレッドとCPU計算主体が同時に実行されているようなときなどでは、コンボイ効果(Convoy Effective)という非効率な状態になることがあります。
処理全体の効率を考えると、I/O処理主体のスレッドに優先的にロックを獲得できるようににすれば、I/O処理の間にCPU処理主体のスレッドが実行できて効率的です。逆に(結果的に)CPU処理主体のスレッドが優先的にロックを獲得することになれば、I/O処理主体のスレッドが残ることになり、無駄なI/O待ち時間が多くなることになります。
https://gyazo.com/07592be9434d26054abe9f9fea53ec57
マルチスレッドの例
CPUバウンドタスクでの例
次のコードは、リストに乱数を順番に追加するものです。
code: 50_demo_single.pyy
import random
def list_append(count, id, out_list):
for i in range(count):
out_list.append(random.random())
if __name__ == "__main__":
size = 100_000_000
out_list = list()
list_append(size, 0, out_list)
code: ipython
In 3: %time %run 50_threading_demo_single.py CPU times: user 21.7 s, sys: 1.86 s, total: 23.6 s
Wall time: 24 s
このコードをマルチスレッド化したものを次に示します。
code: 51_demo_thread.pyy
# thread_demo_multi.py
import random
def list_append(count, id, out_list):
for i in range(count):
out_list.append(random.random())
if __name__ == "__main__":
import threading
size = 100_000_000 # 追加する乱数の数
threads = 2 # 生成するスレッド数
# ジョブのリストを作成してから、
# 各スレッドをジョブリストに追加するスレッド数繰り返す
jobs = []
proc_size = int( size / threads )
for i in range(0, threads):
out_list = list()
job = threading.Thread(target=list_append(proc_size, i, out_list))
jobs.append(job)
# スレッドを開始
for j in jobs:
j.start()
# すべてのスレッドの終了を待つ
for j in jobs:
j.join()
code: ipython
In 4: %time %run 51_threading_demo_multi.py CPU times: user 21.3 s, sys: 2.1 s, total: 23.4 s
Wall time: 23.9 s
このコードは、乱数を追加したリストに対しては何も処理せず、treadingライブラリの使用方法を示しているだけです。
CPUバウンドなタスクではシングルスレッド実装で得られる以上の高速化はあまり期待できないことを示す目的もあります。
I/Oバウンドタスクでの例
このコードは リストsymbolsで与えた企業の株価をダウンロードしてCSVファイルとして保存するものです。
code: stock_downloader.py
from datetime import datetime
import pandas as pd
import pandas_datareader as pdr
from custom_logging import logging
def stock_download(symbol, start=None, end=None):
if start is None:
start=datetime(2010, 1, 1)
if end is None:
end=datetime(2021, 1, 1)
df = pdr.DataReader(symbol, 'yahoo', start, end)
df.to_csv(f'{symbol}.csv')
logging.debug(f'Downloaded {symbol}')
return df
これを呼び出してダウンロードするシリアル版のコードです。
code: 60_download_stock_sirial.py
from stock_downloader import stock_download
from custom_logging import logging
logging.getLogger().setLevel(logging.INFO)
for symbol in stock_symbols:
stock_download(symbol)
これをマルチスレッド版のコードは次のようになります。
code: 61_download_stock_thread.py
rom stock_downloader import stock_download
from queue import Queue
from threading import Thread
from custom_logging import logging
logging.getLogger().setLevel(logging.INFO)
class DownloadWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
symbol = self.queue.get()
try:
stock_download(symbol)
finally:
self.queue.task_done()
# ワーカースレッドと通信するためのキューを作成
queue = Queue()
# ワーカースレッドを作成 4thread
for x in range(4):
worker = DownloadWorker(queue)
# Setting daemon to True will let the main thread exit even though the workers are blocking
# デーモンモードではワーカーがブロックしている場合でも
# メインスレッドが終了する
worker.daemon = True
worker.start()
# ダウンロードする株価コードをキューに入れる
for symbol in stock_symbols:
queue.put(symbol)
# キューがすべてのタスクの処理を終了するのをメインスレッドが待機
queue.join()
ここで、Threadクラスを継承したDownloadWorkerクラスを定義、run()メソッドをオーバーライドさせています。このrun()メソッドでは、queueオブジェクトから株価コードを読み出すと、ダウンロードしCSVファイルに格納する処理を記述し、スレッドを終了します。
あとは、メインスレッドでqueueオブジェクトにダウンロードする株価コードをキューに入れるだけです。
code: python
In 1: %time %run download_stok_sirial.py CPU times: user 1.13 s, sys: 255 ms, total: 1.38 s
Wall time: 9.46 s
In 2: %time %run download_stok_thread.py CPU times: user 694 ms, sys: 97.4 ms, total: 791 ms
Wall time: 3 s
簡単な例ですが、約3.5倍速くなっていることが確認できます。
i/Oバウンドタスクではマルチスレッドでの並列知りは有効なことがわかります。
まとめ
threadingモジュールで複数のスレッドとデータを安全にやり取りするためには、Threadクラスを継承したサブクラスを定義し、スレッドが作業を監視するためのキューを作成する必要があります。
threading では、複数のスレッドを使用して並行性が実現されますが、GILにより、一度に実行できるスレッドは1つだけです。
実際のところ、新規開発のプロジェクトでは threading モジュールを使うことはあまりないかもしれません。
理由は、次のようなものがあります。
Python 3.2 で導入された concurrent.futures モジュールの方が便利で、簡単に記述できる
GILを回避できない
マルチノードで実行できない
参考