asyncioを使ってみよう
asyncioの概要
threadingモジュールは複数のスレッドを生成することで同時実行を実装し、multiprocessingモジュールはシステムプロセスを使用して同時実行を実装します。これに対して、asyncioは、アプリケーションの一部が協力して最適なタイミングでタスクを明示的に切り替えるシングルスレッドのシングルプロセスアプローチを使用します。 ほとんどの場合、このコンテキストスイッチングは、プログラムがデータの読み取りまたは書き込みの待機をブロックするときに発生しますが、asyncioには、システム信号を処理するために、あるコルーチンが別のコルーチンの完了を待機できるように、また、アプリケーションが作業内容を変更する理由となる可能性のある他のイベントを認識するために、特定の将来の時間に実行されるコードのスケジューリングのサポートも含まれています。
非同期処理で実行されるタスクをコルーチン関数、あるいは単にコルーチンといいます。
コルーチンはサブルーチンのより一般的な形式で、サブルーチンは決められた地点から入り、別の決められた地点から出るのに対して、コルーチンは多くの様々な地点から入る、出る、再開することができます。コルーチンは中断することができる関数と考えると理解が早いでしょう。
asyncioモジュールは、コルーチンを使用して並行アプリケーションを構築するためのツールを提供します。
コルーチンは async def で定義されます。
def func(): 関数 func() を定義する。
funcは関数オブジェクトを返す。
関数を呼び出す(実行する)ためにはfunc()
async def func(): コルーチン func() を定義する
funcは関数オブジェクトを返す。
func() と呼び出してもコルーチンオブジェクトを返すだけ
コルーチンの実行にはイベントループもしくはasyncio.run() を使う
Python 3.6 までは次のようにイベントループから実行します。
code: python
import asyncio
async def hello_world():
print("Hello")
await asyncio.sleep(1)
print("World!")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world())
loop.close()
Python 3.7 以降では単純にasyncio.run() にコルーチンを与えて実行することができます。
code: python
import asyncio
async def hello_world():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(hello_world())
これらの例にある、await は、この処理を終わるのを待つことを意味しています。その間、このコルーチンは中断されて、他のコルーチンが処理されます。await で指示した処理が終わると、コルーチンが再開されます。
非同期処理のHTTPサーバー/クライアントのための拡張ライブラリ aiohttp にあるサンプルコードが非同期処理を適用するための、わかりやすい例になります。 code: python
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
print(html)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
この例では、与えたURLからデータ(つまりHTML)を取得して、出力しています。
非同期同時実行の概念
他の同時実行モデルを使用するほとんどのプログラムは直線的に記述され、言語ランタイムまたはオペレーティングシステムの基盤となるスレッドまたはプロセス管理に依存して、必要に応じてコンテキストを変更します。 asyncioに基づくアプリケーションでは、コンテキストの変更を明示的に処理するアプリケーションコードが必要で、それを正しく行うための手法を使用するには、相互に関連するいくつかの概念を理解する必要があります。
asyncioが提供するフレームワークは、I/Oイベント、システムイベント、およびアプリケーションコンテキストの変更を効率的に処理するためのファーストクラスオブジェクトであるイベントループを中心としています。オペレーティングシステムの機能を効率的に利用するために、いくつかのループ実装が提供されています。通常、適切なデフォルトが自動的に選択されますが、アプリケーション内から特定のイベントループ実装を選択することもできます。これは、たとえば、一部のループクラスがネットワークI/Oの効率を犠牲にするような方法で外部プロセスのサポートを追加するWindowsで役立ちます。
アプリケーションはイベントループと明示的に対話して実行するコードを登録し、リソースが利用可能になったときにイベントループがアプリケーションコードに必要な呼び出しを行うようにします。たとえば、ネットワークサーバーはソケットを開き、入力イベントが発生したときに通知されるようにソケットを登録します。イベントループは、新しい着信接続があるとき、または読み取るデータがあるときにサーバーコードに警告します。アプリケーションコードは、現在のコンテキストでこれ以上作業を行うことができない短い期間の後に、再び制御をもたらすことが期待されます。たとえば、ソケットから読み取るデータがこれ以上ない場合、サーバーは制御をイベントループに戻す必要があります。
制御をイベントループに戻すメカニズムは、Pythonのコルーチン、つまり状態を失うことなく呼び出し元に制御を渡す特別な関数に依存しています。コルーチンはジェネレーター関数に似ており、実際、ジェネレーターを使用して、コルーチンオブジェクトのネイティブサポートなしで3.5より前のバージョンのPythonでコルーチンを実装できます。 asyncioは、コルーチンを直接書き込む代わりに、コールバックを使用してコードを記述するためのプロトコルとトランスポートのクラスベースの抽象化レイヤーも提供します。クラスベースモデルとコルーチンモデルの両方で、イベントループに再び入ることによってコンテキストを明示的に変更することは、Pythonのスレッド実装における暗黙的なコンテキスト変更の代わりになります。
Futureオブジェクトは、まだ完了していない作業の結果を表すデータ構造です。イベントループは、Futureオブジェクトが完了に設定されるのを監視できるため、アプリケーションの一部が別の部分が作業を終了するのを待つことができます。Futureオブジェクトに加えて、asyncioには、LockやSemaphoreなどの他の同時実行プリミティブが含まれています。
タスクは、コルーチンの実行をラップして管理する方法を知っているFutureのサブクラスです。タスクをイベントループでスケジュールして、必要なリソースが利用可能になったときに実行し、他のコルーチンで使用できる結果を生成できます。
コルーチンとの協調マルチタスク
コルーチンは、並行操作用に設計された言語構造です。 コルーチン関数は、呼び出されるとコルーチンオブジェクトを作成し、呼び出し元はコルーチンのsend()メソッドを使用して関数のコードを実行できます。 コルーチンは、別のコルーチンでawaitキーワードを使用して実行を一時停止できます。 一時停止している間、コルーチンの状態は維持され、次に目覚めたときに中断したところから再開できます。
コルーチンの開始
asyncioイベントループでコルーチンを開始する方法はいくつかあります。 最も簡単なのは、run_until_complete()を使用して、コルーチンを直接渡すことです。
code: async_coroutine.py
import asyncio
async def coroutine():
print('in coroutine')
event_loop = asyncio.get_event_loop()
try:
print('starting coroutine')
coro = coroutine()
print('entering event loop')
event_loop.run_until_complete(coro)
finally:
print('closing event loop')
event_loop.close()
最初のステップは、イベントループへの参照を取得することです。 デフォルトのループタイプを使用することも、特定のループクラスをインスタンス化することもできます。 この例では、デフォルトのループが使用されています。 run_until_complete()メソッドは、コルーチンオブジェクトでループを開始し、コルーチンが終了するとループを停止して戻ります。
code: bash
$ python asyncio_coroutine.py
starting coroutine
entering event loop
in coroutine
closing event loop
コルーチンからの戻り値
コルーチンの戻り値は、コルーチンを開始して待機するコードに返されます。
code: asyncio_coroutine_return.py
import asyncio
async def coroutine():
print('in coroutine')
return 'result'
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(
coroutine()
)
print('it returned: {!r}'.format(return_value))
finally:
event_loop.close()
この場合、run_until_complete()は、待機しているコルーチンの結果も返します。
code: bash
$ python asyncio_coroutine_return.py
in coroutine
it returned: 'result'
コルーチンの連鎖
1つのコルーチンが別のコルーチンを開始し、結果を待つことができます。 これにより、タスクを再利用可能な部分に簡単に分解できます。 次の例には、順番に実行する必要がある2つのフェーズがありますが、他の操作と同時に実行できます。
code: asyncio_coroutine_chain.py
import asyncio
async def outer():
print('in outer')
print('waiting for result1')
result1 = await phase1()
print('waiting for result2')
result2 = await phase2(result1)
return (result1, result2)
async def phase1():
print('in phase1')
return 'result1'
async def phase2(arg):
print('in phase2')
return f'result2 derived from {arg}'
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(outer())
print('return value: {!r}'.format(return_value))
finally:
event_loop.close()
新しいコルーチンをループに追加する代わりに、awaitキーワードが使用されます。これは、制御フローがすでにループによって管理されているコルーチンの内部にあるため、新しいコルーチンを管理するようにループに指示する必要がないためです。
code: bash
$ python asyncio_coroutine_chain.py
in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')
コルーチンの代替としてジェネレーター
コルーチン関数は、asyncioの設計の重要なコンポーネントです。 これらは、プログラムの一部の実行を停止し、その呼び出しの状態を保持し、後で状態に再び入るための言語構造を提供します。これらはすべて、並行性フレームワークの重要な機能です。
Python 3.5では、async defを使用してネイティブにコルーチンを定義し、awaitを使用して制御を生成するための新しい言語機能が導入され、asyncioの例では新しい機能を利用しています。 以前のバージョンのPython3は、@asyncio.coroutineデコレータでラップされたジェネレーター関数を使用し、から生成して同じ効果を実現できます。
code: asyncio_coroutine_generator.py
import asyncio
@asyncio.coroutine
def outer():
print('in outer')
print('waiting for result1')
result1 = yield from phase1()
print('waiting for result2')
result2 = yield from phase2(result1)
return (result1, result2)
@asyncio.coroutine
def phase1():
print('in phase1')
return 'result1'
@asyncio.coroutine
def phase2(arg):
print('in phase2')
return f'result2 derived from {arg}'
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(outer())
print('return value: {!r}'.format(return_value))
finally:
event_loop.close()
前の例では、ネイティブコルーチンの代わりにジェネレーター関数を使用してasyncio_coroutine_chain.pyを再現しています。
code: bash
$ python asyncio_coroutine_generator.py
in outer
waiting for result1
in phase1
waiting for result2
in phase2
return value: ('result1', 'result2 derived from result1')
通常の関数への呼び出しのスケジューリング
コルーチンとI / Oコールバックの管理に加えて、asyncioイベントループは、ループに保持されているタイマー値に基づいて、通常の関数への呼び出しをスケジュールできます。
コールバックを「すぐに」スケジュールする
コールバックのタイミングが重要でない場合は、call_soon()を使用して、ループの次の反復の呼び出しをスケジュールできます。 関数の後の余分な位置引数は、呼び出されたときにコールバックに渡されます。 キーワード引数をコールバックに渡すには、functoolsモジュールのpartial()関数を使用します。
code: asyncio_call_soon.py
import asyncio
import functools
def callback(arg, *, kwarg='default'):
print(f'callback invoked with {arg} and {kwarg}')
async def main(loop):
print('registering callbacks')
loop.call_soon(callback, 1)
wrapped = functools.partial(callback, kwarg='not default')
loop.call_soon(wrapped, 2)
await asyncio.sleep(0.1)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
print('closing event loop')
event_loop.close()
コールバックは、スケジュールされた順序で呼び出されます。
code: bash
$ python asyncio_call_soon.py
entering event loop
registering callbacks
callback invoked with 1 and default
callback invoked with 2 and not default
closing event loop
遅延のあるコールバックのスケジュール
コールバックを将来のある時点まで遅延させるためには、call_later()を使用します。 最初の引数は秒単位の遅延で、2番目の引数はコールバックです。
code: asyncio_call_later.py
import asyncio
def callback(n):
print(f'callback {n} invoked')
async def main(loop):
print('registering callbacks')
loop.call_later(0.2, callback, 1)
loop.call_later(0.1, callback, 2)
loop.call_soon(callback, 3)
await asyncio.sleep(0.4)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
print('closing event loop')
event_loop.close()
この例では、同じコールバック関数が、異なる引数を使用して複数の異なる時間にスケジュールされています。 最後のインスタンスは、call_soon()を使用して、タイムスケジュールされたインスタンスの前に引数3でコールバックが呼び出されるようにします。これは、"now"は通常、最小限の遅延を意味することを示しています。
code: bash
$ python asyncio_call_later.py
entering event loop
registering callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked
closing event loop
特定の時間のコールバックのスケジュール
特定の時間に発生するように呼び出しをスケジュールすることも可能です。 ループは、実時間ではなく単調な時計を使用して、"now"の値が決して後退しないようにします。 スケジュールされたコールバックの時間を選択するには、ループのtime()メソッドを使用してその時計の内部状態から開始する必要があります。
code: asyncio_call_at.py
import time
import asyncio
def callback(n, loop):
print('callback {} invoked at {}'.format(n, loop.time()))
async def main(loop):
now = loop.time()
print('clock time: {time.time()}')
print('loop time: {now}')
print('registering callbacks')
loop.call_at(now + 0.2, callback, 1, loop)
loop.call_at(now + 0.1, callback, 2, loop)
loop.call_soon(callback, 3, loop)
await asyncio.sleep(1)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
print('closing event loop')
event_loop.close()
ループに従った時間は、time.time()によって返される値と一致しないことに注意してください。
code: bash
$ python asyncio_call_at.py
entering event loop
clock time: {time.time()}
loop time: {now}
registering callbacks
callback 3 invoked at 0.082787363
callback 2 invoked at 0.185166158
callback 1 invoked at 0.285039865
closing event loop
非同期で結果を生成する
Futureオブジェクトは、まだ完了していない作業の結果を表します。 イベントループは、Futureオブジェクトの状態を監視して、それが完了したことを示すことができます。これにより、アプリケーションの一部が、別の部分が作業を終了するのを待つことができます。
Futureオブジェクトを待つ
Futureオブジェクトはコルーチンのように機能するため、コルーチンを待つのに役立つテクニックを使用して、Futureオブジェクトが完了としてマークされるのを待つこともできます。
次の例では、futureをイベントループのrun_until_complete()メソッドに渡します。
code: asyncio_future_event_loop.py
import asyncio
def mark_done(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
event_loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
print('scheduling mark_done')
event_loop.call_soon(mark_done, all_done, 'the result')
print('entering event loop')
result = event_loop.run_until_complete(all_done)
print('returned result: {!r}'.format(result))
finally:
print('closing event loop')
event_loop.close()
print('future result: {!r}'.format(all_done.result()))
set_result()が呼び出されると、Futureの状態はdoneに変わり、Futureインスタンスは、後で取得するためにメソッドに与えられた結果を保持します。
code: bash
$ python asyncio_future_event_loop.py
scheduling mark_done
entering event loop
setting future result to 'the result'
returned result: 'the result'
closing event loop
future result: 'the result'
この例のように、Futureをawaitキーワードとともに使用することもできます。
code: asyncio_future_await.py
import asyncio
def mark_done(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
async def main(loop):
all_done = asyncio.Future()
print('scheduling mark_done')
loop.call_soon(mark_done, all_done, 'the result')
result = await all_done
print('returned result: {!r}'.format(result))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
Futureの結果はawait()によって返されるため、通常のコルーチンとFutureインスタンスで同じコードを機能させることができる場合がよくあります。
code: bash
$ python asyncio_future_await.py
scheduling mark_done
setting future result to 'the result'
returned result: 'the result'
Furetureオブジェクトのコールバック
コルーチンのように機能することに加えて、Futureは完了時にコールバックを呼び出すことができます。 コールバックは、登録された順序で呼び出されます。
code: asyncio_future_callback.py
import asyncio
import functools
def callback(future, n):
print('{}: future done: {}'.format(n, future.result()))
async def register_callbacks(all_done):
print('registering callbacks on future')
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
await register_callbacks(all_done)
print('setting result of future')
all_done.set_result('the result')
event_loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
event_loop.run_until_complete(main(all_done))
finally:
event_loop.close()
コールバックは、1つの引数Futureインスタンスを予期する必要があります。 コールバックに追加の引数を渡すには、functools.partial()を使用してラッパーを作成します。
code: bash
$ python asyncio_future_callback.py
registering callbacks on future
setting result of future
1: future done: the result
2: future done: the result
タスクの同時実行
タスクは、イベントループと対話するための主要な方法の1つです。 タスクはコルーチンをラップし、完了したときに追跡します。 タスクはFutureのサブクラスであるため、他のコルーチンはそれらを待機でき、各コルーチンには、タスクの完了後に取得できる結果があります。
タスクの開始
タスクを開始するには、create_task()を使用してタスクインスタンスを作成します。 結果のタスクは、ループが実行されていてコルーチンが戻らない限り、イベントループによって管理される並行操作の一部として実行されます。
code: asyncio_create_task.py
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
print('waiting for {!r}'.format(task))
return_value = await task
print('task completed {!r}'.format(task))
print('return value: {!r}'.format(return_value))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
この例では、main()関数が終了する前に、タスクが結果を返すのを待ちます。
code: bash
$ python asyncio_create_task.py
creating task
waiting for <Task pending coro=<task_func() running at asyncio_create_task.py:3>>
in task_func
task completed <Task finished coro=<task_func() done, defined at asyncio_create_task.py:3> result='the result'>
return value: 'the result'
タスクのキャンセル
create_task()から返されたTaskオブジェクトを保持することにより、タスクが完了する前にタスクの操作をキャンセルすることができます。
code: asyncio_cancel_task.py
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
print('canceling task')
task.cancel()
print('canceled task {!r}'.format(task))
try:
await task
except asyncio.CancelledError:
print('caught error from canceled task')
else:
print('task result: {!r}'.format(task.result()))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
この例では、イベントループを開始する前に、タスクを作成してキャンセルします。 結果は、run_until_complete()からのCancelledError例外です。
code: bash
$ python asyncio_cancel_task.py
creating task
canceling task
canceled task <Task cancelling coro=<task_func() running at asyncio_cancel_task.py:3>>
caught error from canceled task
別の並行操作の待機中にタスクがキャンセルされた場合、待機しているポイントでCancelledError例外が発生することにより、タスクにキャンセルが通知されます。
code: asyncio_cancel_task2.py
import asyncio
async def task_func():
print('in task_func, sleeping')
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
print('task_func was canceled')
raise
return 'the result'
def task_canceller(t):
print('in task_canceller')
t.cancel()
print('canceled the task')
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
loop.call_soon(task_canceller, task)
try:
await task
except asyncio.CancelledError:
print('main() also sees task as canceled')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
例外をキャッチすると、必要に応じて、すでに実行された作業をクリーンアップする機会が提供されます。
code: bash
$ python asyncio_cancel_task2.py
creating task
in task_func, sleeping
in task_canceller
canceled the task
task_func was canceled
main() also sees task as canceled
コルーチンからのタスクの作成
sure_future()関数は、コルーチンの実行に関連付けられたタスクを返します。 次に、そのタスクインスタンスを他のコードに渡すことができます。他のコードは、元のコルーチンがどのように構築または呼び出されたかを知らなくても、それを待つことができます。
code: asyncio_ensure_future.py
import asyncio
async def wrapped():
print('wrapped')
return 'result'
async def inner(task):
print('inner: starting')
print('inner: waiting for {!r}'.format(task))
result = await task
print('inner: task returned {!r}'.format(result))
async def starter():
print('starter: creating task')
task = asyncio.ensure_future(wrapped())
print('starter: waiting for inner')
await inner(task)
print('starter: inner returned')
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
result = event_loop.run_until_complete(starter())
finally:
event_loop.close()
sure_future()に指定されたコルーチンは、何かがawait()を使用して実行を許可するまで開始されないことに注意してください。
code: bash
$ python asyncio_ensure_future.py
entering event loop
starter: creating task
starter: waiting for inner
inner: starting
inner: waiting for <Task pending coro=<wrapped() running at asyncio_ensure_future.py:3>>
wrapped
inner: task returned 'result'
starter: inner returned
制御構造を持つコルーチンの構成
一連のコルーチン間の線形制御フローは、Python組み込みの言語キーワードawaitを使用して簡単に管理できます。 asyncioのツールを使用して、1つのコルーチンが他の複数のコルーチンが並行して完了するのを待つことができるより複雑な構造も可能です。
複数のコルーチンを待つ
多くの場合、1つの操作を多くの部分に分割し、それらを別々に実行すると便利です。 たとえば、いくつかのリモートリソースをダウンロードしたり、リモートAPIにクエリを実行したりします。 実行の順序が重要ではなく、任意の数の操作が存在する可能性がある状況では、wait()を使用して、他のバックグラウンド操作が完了するまで1つのコルーチンを一時停止できます。
code: asyncio_wait.py
import asyncio
async def phase(i):
print(f'in phase {i}')
await asyncio.sleep(0.1 * i)
print(f'done with phase {i}')
return f'phase {i} result'
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting for phases to complete')
completed, pending = await asyncio.wait(phases)
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
code: bash
$ python asyncio_wait.py
starting main
waiting for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 0
done with phase 1
done with phase 2
wait()がタイムアウト値とともに使用された場合にのみ、保留中の操作が残ります。
code: asyncio_wait_timeout.py
import asyncio
async def phase(i):
print(f'in phase {i}')
try:
await asyncio.sleep(0.1 * i)
except asyncio.CancelledError:
print(f'phase {i} canceled')
raise
else:
print(f'done with phase {i}')
return f'phase {i} result'
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting 0.1 for phases to complete')
completed, pending = await asyncio.wait(phases, timeout=0.1)
print('{} completed and {} pending'.format(
len(completed), len(pending),
))
# タスクを終了せずに終了するときにエラーが発生しないように
# 残りのタスクをキャンセルする
if pending:
print('canceling tasks')
for t in pending:
t.cancel()
print('exiting main')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
これらの残りのバックグラウンド操作は、キャンセルするか、それらを待つことによって終了する必要があります。 イベントループが続く間、それらを保留のままにしておくと、それらはさらに実行されます。これは、操作全体が中止されたと見なされる場合は望ましくない場合があります。 プロセスの最後にそれらを保留のままにしておくと、警告が報告されます。
code: bash
$ python asyncio_wait_timeout.py
starting main
waiting 0.1 for phases to complete
in phase 0
in phase 1
in phase 2
done with phase 0
1 completed and 2 pending
canceling tasks
exiting main
phase 1 canceled
phase 2 canceled
コルーチンからの結果を収集
バックグラウンドフェーズが明確に定義されており、それらのフェーズの結果のみが重要である場合、gather()は複数の操作を待機するのに役立つ場合があります。
code: asyncio_gather.py
import asyncio
async def phase1():
print('in phase1')
await asyncio.sleep(2)
print('done with phase1')
return 'phase1 result'
async def phase2():
print('in phase2')
await asyncio.sleep(1)
print('done with phase2')
return 'phase2 result'
async def main():
print('starting main')
print('waiting for phases to complete')
results = await asyncio.gather(
phase1(),
phase2(),
)
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main())
finally:
event_loop.close()
収集によって作成されたタスクは公開されないため、キャンセルすることはできません。 戻り値は、バックグラウンド操作が実際に完了した順序に関係なく、gather()に渡された引数と同じ順序の結果のリストです。
code: bash
$ python asyncio_gather.py
starting main
waiting for phases to complete
in phase1
in phase2
done with phase2
done with phase1
バックグラウンド操作終了時の処理
as_completed()は、与えられたコルーチンのリストの実行を管理し、実行が終了すると一度に1つずつ結果を生成するジェネレーターです。 wait()と同様に、順序はas_completed()によって保証されませんが、他のアクションを実行する前に、すべてのバックグラウンド操作が完了するのを待つ必要はありません。
code: asyncio_as_completed.py
import asyncio
async def phase(i):
print(f'in phase {i}')
await asyncio.sleep(0.5 - (0.1 * i))
print(f'done with phase {i}')
return f'phase {i} result'
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting for phases to complete')
results = []
for next_to_complete in asyncio.as_completed(phases):
answer = await next_to_complete
print('received answer {!r}'.format(answer))
results.append(answer)
print('results: {!r}'.format(results))
return results
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
この例では、開始とは逆の順序で終了するいくつかのバックグラウンドフェーズを開始します。 ジェネレーターが消費されると、ループはawait()を使用してコルーチンの結果を待ちます。
code: bash
$ python asyncio_as_completed.py
starting main
waiting for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
同期プリミティブ
asyncioアプリケーションは通常、シングルスレッドプロセスとして実行されますが、それでも並行アプリケーションとして構築されます。 各コルーチンまたはタスクは、I / Oおよびその他の外部イベントからの遅延および割り込みに基づいて、予測できない順序で実行される場合があります。 安全な並行性をサポートするために、asyncioには、スレッドモジュールとマルチプロセッシングモジュールにあるのと同じ低レベルプリミティブのいくつかの実装が含まれています。
ロック
ロックを使用して、共有リソースへのアクセスを保護できます。 ロックの所有者のみがリソースを使用できます。 ロックを取得しようと複数回試行するとブロックされるため、一度に1つのホルダーしかありません。
code: async_lock.py
import asyncio
import functools
def unlock(lock):
print('callback releasing lock')
lock.release()
async def coro1(lock):
print('coro1 waiting for the lock')
async with lock:
print('coro1 acquired lock')
print('coro1 released lock')
async def coro2(lock):
print('coro2 waiting for the lock')
await lock.acquire()
try:
print('coro2 acquired lock')
finally:
print('coro2 released lock')
lock.release()
async def main(loop):
# 共有ロックを作成して取得します。
lock = asyncio.Lock()
print('acquiring the lock before starting coroutines')
await lock.acquire()
print('lock acquired: {}'.format(lock.locked()))
# ロックを解除するためのコールバックをスケジュールする
loop.call_later(0.1, functools.partial(unlock, lock))
# ロックを使用するコルーチンを実行
print('waiting for coroutines')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
この例のcoro2()のように、Lock()のacquire()メソッドは、await()を使用して直接呼び出すことができ、実行時にrelease()メソッドを呼び出すことができます。 また、coro1()のように、withawaitキーワード引数を使用して非同期コンテキストマネージャーとして使用することもできます。
code: bash
$ python asyncio_lock.py
acquiring the lock before starting coroutines
lock acquired: True
waiting for coroutines
coro2 waiting for the lock
coro1 waiting for the lock
callback releasing lock
coro2 acquired lock
coro2 released lock
coro1 acquired lock
coro1 released lock
Eventクラス
asyncio.Eventはthreading.Eventに基づいており、通知に関連付けられる特定の値を探すことなく、複数のコンシューマーが何かが発生するのを待つことができるようにするために使用されます。
code: asyncio_event.py
import asyncio
import functools
def set_event(event):
print('setting event in callback')
event.set()
async def coro1(event):
print('coro1 waiting for event')
await event.wait()
print('coro1 triggered')
async def coro2(event):
print('coro2 waiting for event')
await event.wait()
print('coro2 triggered')
async def main(loop):
# Create a shared event
event = asyncio.Event()
print(f'event start state: {event.is_set()}')
loop.call_later(
0.1, functools.partial(set_event, event)
)
print(f'event end state: {event.is_set()}')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
Lockと同様に、coro1()とcoro2()はどちらもイベントが設定されるのを待ちます。 違いは、両方ともイベント状態が変化するとすぐに開始でき、イベントオブジェクトの一意のホールドを取得する必要がないことです。
code: bash
$ python asyncio_event.py
event start state: False
coro2 waiting for event
coro1 waiting for event
setting event in callback
coro2 triggered
coro1 triggered
event end state: True
Conditionオブジェクト
ConditionオブジェクトはEventオブジェクトと同様に機能しますが、待機中のすべてのコルーチンに通知するのではなく、ウェイターの数がnotify()の引数で制御される点が異なります。
code: asyncio_condition.py
import asyncio
async def consumer(condition, n):
async with condition:
print(f'consumer {n} is waiting')
await condition.wait()
print(f'consumer {n} triggered')
print(f'ending consumer {n}')
async def manipulate_condition(condition):
print('starting manipulate_condition')
# 一時停止してコンシューマーが開始できるようにする
await asyncio.sleep(0.1)
for i in range(1, 3):
async with condition:
print(f'notifying {i} consumers')
condition.notify(n=i)
await asyncio.sleep(0.1)
async with condition:
print('notifying remaining consumers')
condition.notify_all()
print('ending manipulate_condition')
async def main(loop):
# Create a condition
condition = asyncio.Condition()
# Set up tasks watching the condition
consumers = [
consumer(condition, i)
for i in range(5)
]
# condition変数を操作するタスクをスケジュールする
loop.create_task(manipulate_condition(condition))
# コンシューマーが終了するのを待つ
await asyncio.wait(consumers)
event_loop = asyncio.get_event_loop()
try:
result = event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
この例では、条件の5つのコンシューマーを開始します。 それぞれがwait()メソッドを使用して、続行できるという通知を待ちます。 manage_condition()は、1つのコンシューマー、次に2つのコンシューマー、次に残りのすべてのコンシューマーに通知します。
code: bash
$ python asyncio_condition.py
starting manipulate_condition
consumer 2 is waiting
consumer 3 is waiting
consumer 0 is waiting
consumer 4 is waiting
consumer 1 is waiting
notifying 1 consumers
consumer 2 triggered
ending consumer 2
notifying 2 consumers
consumer 3 triggered
ending consumer 3
consumer 0 triggered
ending consumer 0
notifying remaining consumers
ending manipulate_condition
consumer 4 triggered
ending consumer 4
consumer 1 triggered
ending consumer 1
Queuesクラス
asyncio.Queueは、queue.Queueがスレッドに対して、またはmultiprocessing.Queueがプロセスに対して行うように、コルーチンに対して先入れ先出しのデータ構造を提供します。
code: async_queues.py
import asyncio
async def consumer(n, q):
print(f'consumer {n}: starting')
while True:
print(f'consumer {n}: waiting for item')
item = await q.get()
print(f'consumer {n}: has item {item}')
if item is None:
# None は終了させるためのシグナル
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print(f'consumer {n}: ending')
async def producer(q, num_workers):
print('producer: starting')
# ジョブをシミュレートするためにキューにいくつかの番号を追加
for i in range(num_workers * 3):
await q.put(i)
print(f'producer: added task {i} to the queue')
# キューにNoneエントリを追加して、
# コンシューマーに終了するように通知
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')
async def main(loop, num_consumers):
# 固定サイズのキューを作成して、
# コンシューマーがいくつかのアイテムを引き出すまで
# プロデューサーがブロックする
q = asyncio.Queue(maxsize=num_consumers)
# Scheduled the consumer tasks.
consumers = [
loop.create_task(consumer(i, q))
for i in range(num_consumers)
]
# プロデューサータスクをスケジュール
prod = loop.create_task(producer(q, num_consumers))
# すべてのコルーチンが終了するのを待つ
await asyncio.wait(consumers + prod) event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop, 2))
finally:
event_loop.close()
put()を使用したアイテムの追加またはget()を使用したアイテムの削除は、キューサイズが固定されている(追加をブロックしている)か、キューが空である(アイテムをフェッチする呼び出しをブロックしている)可能性があるため、どちらも非同期操作です。
code: bash
$ python asyncio_queueus.py
consumer 0: starting
consumer 0: waiting for item
consumer 1: starting
consumer 1: waiting for item
producer: starting
producer: added task 0 to the queue
producer: added task 1 to the queue
consumer 0: has item 0
consumer 1: has item 1
producer: added task 2 to the queue
producer: added task 3 to the queue
consumer 0: waiting for item
consumer 0: has item 2
producer: added task 4 to the queue
consumer 1: waiting for item
consumer 1: has item 3
producer: added task 5 to the queue
producer: adding stop signals to the queue
consumer 0: waiting for item
consumer 0: has item 4
consumer 1: waiting for item
consumer 1: has item 5
producer: waiting for queue to empty
consumer 0: waiting for item
consumer 0: waiting for item
consumer 0: has item None
consumer 0: ending
consumer 1: waiting for item
consumer 1: has item None
consumer 1: ending
producer: ending
Semaphoreクラス
セマフォ(Semaphre)には、取得や解放の呼び出しが行われるたびに加算/減算される内部カウンターがあります。コードのブロックをSemaphoreで保護し、Semaphoreのカウンターの初期値を2に設定したとします。最初のワーカーがSemaphoreを取得すると、カウンターの値に1が減算され、2つ目のワーカーがSemaphoreに到達するとカウンターの値がまた1だけ減算されてゼロ(0)になります。
この時点で、別のワーカーがSemaphoreに到達すると、実行が拒否されます。 セマフォは、リソースが過剰に使用されないように保護する有益な方法です。
次の例では、Semaphoreの単純なインスタンスを作成してから、そのセマフォを取得しようとする3つのワーカー関数を作成します。 このセマフォの初期値は2であるため、2つのワーカー関数がセマフォを正常に取得してから解放し、3番目のワーカーがセマフォを取得できるようにします。
code: async_semaphore.py
import asyncio
import time
async def myWorker(semaphore, i):
await semaphore.acquire()
print(f"Worker {i}: セマフォの取得に成功")
await asyncio.sleep(3)
print(f"Worker {i}: セマフォを開放")
semaphore.release()
async def main(loop):
mySemaphore = asyncio.Semaphore(value=2)
await asyncio.wait([myWorker(mySemaphore, 1),
myWorker(mySemaphore, 2),
myWorker(mySemaphore, 3)])
print("メイン・コルーチン")
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("すべてのワーカーが終了")
loop.close()
これを実行すると、最初に2つのワーカー(Worker1とWorker3)がセマフォを取得してから解放し、その後、3番目のワーカー(Worker2)が続き、セマフォを取得して開放しています。
セマフォのカウンタの初期値で与えた数が同時にセマフォを取得できるようになります。
code: bash
% python asyncio_semaphore.py
Worker 3: セマフォの取得に成功
Worker 1: セマフォの取得に成功
Worker 3: セマフォを開放
Worker 1: セマフォを開放
Worker 2: セマフォの取得に成功
Worker 2: セマフォを開放
メイン・コルーチン
すべてのワーカーが終了
制限付きセマフォ
通常のセマフォと制限付きセマフォ(Bounded Semaphore)の間には微妙な違いがあります。 制限付きセマフォは、取得よりも多くの開放を許可しないという点でのみ異なります。 値を超えると、ValueErrorが発生します。
code: async_boundedsemaphore.py
import asyncio
import time
async def myWorker(semaphore, i):
await semaphore.acquire()
print(f"Worker {i}: セマフォの取得に成功")
await asyncio.sleep(3)
print(f"Worker {i}: セマフォを開放")
semaphore.release()
print(f"Worker {i}: セマフォを開放 2回目")
semaphore.release()
async def main(loop):
mySemaphore = asyncio.BoundedSemaphore(value=2)
await asyncio.wait([myWorker(mySemaphore, 1),
myWorker(mySemaphore, 2),
myWorker(mySemaphore, 3)])
print("メイン・コルーチン")
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
print("すべてのワーカーが終了")
loop.close()
code: bash
% python asyncio_bounded_semaphore.py
Worker 3: セマフォの取得に成功
Worker 1: セマフォの取得に成功
Worker 3: セマフォを開放
Worker 3: セマフォを開放 2回目
Worker 1: セマフォを開放
Worker 2: セマフォの取得に成功
Worker 2: セマフォを開放
Worker 2: セマフォを開放 2回目
Task exception was never retrieved
future: <Task finished coro=<myWorker() done, defined at asyncio_bounded_semaphore.py:4> exception=ValueError('BoundedSemaphore released too many times')>
Traceback (most recent call last):
File "asyncio_bounded_semaphore.py", line 11, in myWorker
semaphore.release()
File "/Users/goichiiisaka/anaconda3/envs/parallel/lib/python3.7/asyncio/locks.py", line 506, in release
raise ValueError('BoundedSemaphore released too many times')
ValueError: BoundedSemaphore released too many times
Task exception was never retrieved
future: <Task finished coro=<myWorker() done, defined at asyncio_bounded_semaphore.py:4> exception=ValueError('BoundedSemaphore released too many times')>
Traceback (most recent call last):
File "asyncio_bounded_semaphore.py", line 9, in myWorker
semaphore.release()
File "/Users/goichiiisaka/anaconda3/envs/parallel/lib/python3.7/asyncio/locks.py", line 506, in release
raise ValueError('BoundedSemaphore released too many times')
ValueError: BoundedSemaphore released too many times
メイン・コルーチン
すべてのワーカーが終了
プロトコルクラスの抽象化を使用した非同期I/O
この時点まで、例はすべて、一度に1つの概念に焦点を合わせるために並行性とI / O操作を混在させることを避けてきました。 ただし、I / Oブロック時にコンテキストを切り替えることは、非同期の主な使用例の1つです。 このセクションでは、すでに紹介した並行性の概念に基づいて、ソケットセクションとソケットサーバーセクションで使用されている例と同様に、単純なエコーサーバーとクライアントを実装する2つのサンプルプログラムについて説明します。 クライアントはサーバーに接続し、データを送信してから、応答と同じデータを受信できます。 I / O操作が開始されるたびに、実行中のコードはイベントループへの制御を放棄し、I / Oの準備ができるまで他のタスクを実行できるようにします。
エコーサーバー
サーバーは、非同期とロギングを設定するために必要なモジュールをインポートすることから始め、次にイベントループオブジェクトを作成します。
code: asyncio_echo_server_protocol.py
import sys
import logging
import asyncio
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()
次に、クライアント通信を処理するためにasyncio.Protocolのサブクラスを定義します。 プロトコルオブジェクトのメソッドは、サーバーソケットに関連付けられたイベントに基づいて呼び出されます。
code: python
class EchoServer(asyncio.Protocol):
新しいクライアント接続ごとに、connection_made()の呼び出しがトリガーされます。 transport引数はasyncio.Transportのインスタンスであり、ソケットを使用して非同期I/Oを実行するための抽象化を提供します。 異なるタイプの通信は、すべて同じAPIを使用して、異なるトランスポート実装を提供します。 たとえば、ソケットを操作するためと、サブプロセスへのパイプを操作するための別々のTransportクラスがあります。 着信クライアントのアドレスは、実装固有のメソッドであるget_extra_info()を介してトランスポートから取得できます。
code: python
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log = logging.getLogger(
'EchoServer_{}_{}'.format(*self.address)
)
self.log.debug('connection accepted')
接続が確立された後、データがクライアントからサーバーに送信されると、プロトコルのdata_received()メソッドが呼び出され、データが処理のために渡されます。 データはバイト文字列として渡され、適切な方法でデコードするのはアプリケーションの責任です。 ここで結果がログに記録され、transport.write()を呼び出すことにより、応答がすぐにクライアントに返送されます。
code: python
def data_received(self, data):
self.log.debug('received {!r}'.format(data))
self.transport.write(data)
self.log.debug('sent {!r}'.format(data))
一部のTransportは、特別なファイルの終わりインジケータ(EOF)をサポートします。 EOFが検出されると、eof_received()メソッドが呼び出されます。 この実装では、EOFがクライアントに返送され、受信されたことを示します。 すべてのTransportが明示的なEOFをサポートしているわけではないため、このプロトコルは最初にTransportにEOFを送信しても安全かどうかを尋ねます。
code: python
def eof_received(self):
self.log.debug('received EOF')
if self.transport.can_write_eof():
self.transport.write_eof()
通常またはエラーの結果として接続が閉じられると、プロトコルのconnection_lost()メソッドが呼び出されます。 エラーが発生した場合、引数には適切な例外オブジェクトが含まれます。 それ以外の場合はNoneです。
code: python
def connection_lost(self, error):
if error:
self.log.error('ERROR: {}'.format(error))
else:
self.log.debug('closing')
super().connection_lost(error)
サーバーを起動するには2つのステップがあります。 最初に、アプリケーションはイベントループに、プロトコルクラスと、リッスンするホスト名とソケットを使用して新しいサーバーオブジェクトを作成するように指示します。 create_server()メソッドはコルーチンであるため、実際にサーバーを起動するには、結果をイベントループで処理する必要があります。 コルーチンを完了すると、イベントループに関連付けられたasyncio.Serverインスタンスが生成されます。
code: python
# サーバーを作成し、実際のイベントループを開始する前に、ループでコルーチンを終了させる
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
次に、イベントを処理してクライアントリクエストを処理するために、イベントループを実行する必要があります。 長時間実行されるサービスの場合、run_forever()メソッドがこれを行う最も簡単な方法です。 アプリケーションコードまたはプロセスのシグナリングのいずれかによってイベントループが停止すると、サーバーを閉じてソケットを適切にクリーンアップし、プログラムが終了する前にイベントループを閉じて他のコルーチンの処理を終了できます。
code: python
# すべての接続を処理するために、イベントループに永続的に入る
try:
event_loop.run_forever()
finally:
log.debug('closing server')
server.close()
event_loop.run_until_complete(server.wait_closed())
log.debug('closing event loop')
event_loop.close()
エコークライアント
Protocolクラスを使用してクライアントを構築することは、サーバーを構築することと非常に似ています。 コードは、非同期とロギングを設定するために必要なモジュールをインポートしてから、イベントループオブジェクトを作成することから始まります。 code: asyncio_echo_client_protocl.py
import sys
import logging
import functools
import asyncio
MESSAGES = [
b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()
クライアントプロトコルクラスは、サーバーと同じメソッドを定義しますが、実装は異なります。 クラスコンストラクターは、送信するメッセージのリストと、サーバーからの応答を受信してクライアントが作業サイクルを完了したことを通知するために使用するFutureインスタンスの2つの引数を受け入れます。
code: pyton
class EchoClient(asyncio.Protocol):
def __init__(self, messages, future):
super().__init__()
self.messages = messages
self.log = logging.getLogger('EchoClient')
self.f = future
クライアントがサーバーに正常に接続すると、すぐに通信を開始します。 基になるネットワークコードが複数のメッセージを1つの送信に結合する場合がありますが、メッセージのシーケンスは一度に1つずつ送信されます。 すべてのメッセージが使い果たされると、EOFが送信されます。
データはすべてすぐに送信されているように見えますが、実際には、Transportオブジェクトは送信データをバッファリングし、ソケットのバッファがデータを受信する準備ができたときに実際に送信するコールバックを設定します。 これはすべて透過的に処理されるため、I/O操作がすぐに行われているようにアプリケーションコードを記述できます。
code: python
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug(
'connecting to {} port {}'.format(*self.address)
)
# これは、送信されているメッセージの各部分を表示するのが難しくなることを除いて、
# transport.writelines()である可能性があります。
for msg in self.messages:
transport.write(msg)
self.log.debug('sending {!r}'.format(msg))
if transport.can_write_eof():
transport.write_eof()
サーバーからの応答を受信すると、ログに記録されます。
code: python
def data_received(self, data):
self.log.debug('received {!r}'.format(data))
また、ファイルの終わりマーカーを受信するか、サーバー側から接続を閉じると、ローカルトランスポートオブジェクトが閉じられ、結果を設定することで、将来のオブジェクトに完了のマークが付けられます。
code: python
def eof_received(self):
self.log.debug('received EOF')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
def connection_lost(self, exc):
self.log.debug('server closed connection')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
super().connection_lost(exc)
通常、プロトコルクラスは、接続を作成するためにイベントループに渡されます。 この場合、イベントループにはプロトコルコンストラクターに追加の引数を渡す機能がないため、クライアントクラスをラップし、送信するメッセージのリストとFutureインスタンスを渡すパーシャルを作成する必要があります。 次に、create_connection()を呼び出してクライアント接続を確立するときに、その新しいcallableがクラスの代わりに使用されます。
code: python
client_completed = asyncio.Future()
client_factory = functools.partial(
EchoClient,
messages=MESSAGES,
future=client_completed,
)
factory_coroutine = event_loop.create_connection(
client_factory,
*SERVER_ADDRESS,
)
クライアントの実行をトリガーするために、イベントループは、クライアントを作成するためのコルーチンで1回呼び出され、クライアントに指定されたFutureインスタンスで再度呼び出され、終了時に通信します。 このように2つの呼び出しを使用すると、クライアントプログラムで無限ループが発生するのを回避できます。このループは、サーバーとの通信が終了した後に終了する可能性があります。 コルーチンがクライアントを作成するのを待つために最初の呼び出しのみが使用された場合、すべての応答データを処理せず、サーバーへの接続を適切にクリーンアップしない可能性があります。
code: python
log.debug('waiting for client to complete')
try:
event_loop.run_until_complete(factory_coroutine)
event_loop.run_until_complete(client_completed)
finally:
log.debug('closing event loop')
event_loop.close()
出力
サーバーを1つのウィンドウで実行し、クライアントを別のウィンドウで実行すると、次の出力が生成されます。
code: bash
% python asyncio_echo_client_protocl.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoServer_::1_55405: connection accepted
EchoClient: sending b'in parts.'
EchoServer_::1_55405: received b'This is the message. It will be sent in parts.'
EchoServer_::1_55405: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_55405: received EOF
EchoServer_::1_55405: closing
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python asyncio_echo_client_protocl.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoServer_::1_55410: connection accepted
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoServer_::1_55410: received b'This is the message. '
EchoClient: sending b'It will be sent '
EchoServer_::1_55410: sent b'This is the message. '
EchoClient: sending b'in parts.'
EchoServer_::1_55410: received b'It will be sent in parts.'
EchoServer_::1_55410: sent b'It will be sent in parts.'
EchoClient: received b'This is the message. '
EchoServer_::1_55410: received EOF
EchoClient: received b'It will be sent in parts.'
EchoServer_::1_55410: closing
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python asyncio_echo_client_protocl.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoServer_::1_55415: connection accepted
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoServer_::1_55415: received b'This is the message. '
EchoClient: sending b'in parts.'
EchoServer_::1_55415: sent b'This is the message. '
EchoClient: received b'This is the message. '
EchoServer_::1_55415: received b'It will be sent in parts.'
EchoServer_::1_55415: sent b'It will be sent in parts.'
EchoClient: received b'It will be sent in parts.'
EchoServer_::1_55415: received EOF
EchoServer_::1_55415: closing
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
クライアントは常にメッセージを個別に送信しますが、クライアントが最初に実行されたときに、サーバーは1つの大きなメッセージを受信し、それをクライアントにエコーバックします。 これらの結果は、ネットワークのビジー状態と、すべてのデータが準備される前にネットワークバッファーがフラッシュされるかどうかに基づいて、後続の実行で異なります。
code: bash
$ python asyncio_echo_server_protocol.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
EchoServer_::1_55431: connection accepted
EchoServer_::1_55431: received b'This is the message. It will be sent in parts.'
EchoServer_::1_55431: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_55431: received EOF
EchoServer_::1_55431: closing
EchoServer_::1_55434: connection accepted
EchoServer_::1_55434: received b'This is the message. It will be sent in parts.'
EchoServer_::1_55434: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_55434: received EOF
EchoServer_::1_55434: closing
EchoServer_::1_55435: connection accepted
EchoServer_::1_55435: received b'This is the message. It will be sent in parts.'
EchoServer_::1_55435: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_55435: received EOF
EchoServer_::1_55435: closing
サブプロセスの操作
他のプログラムやプロセスを操作したり、既存のコードを書き直さずに利用したり、Python内から利用できないライブラリや機能にアクセスしたりする必要があることがよくあります。 ネットワークI/Oと同様に、asyncioには、別のプログラムを開始してからそれと通信するための2つの抽象化が含まれています。
サブプロセスでのプロトコル抽象化の使用
この例では、コルーチンを使用してプロセスを起動し、Unixコマンドdfを実行して、ローカルディスクの空き領域を見つけます。 subprocess_exec()を使用してプロセスを起動し、dfコマンドの出力を読み取って解析する方法を知っているプロトコルクラスに関連付けます。 プロトコルクラスのメソッドは、サブプロセスのI / Oイベントに基づいて自動的に呼び出されます。 stdin引数とstderr引数の両方がNoneに設定されているため、これらの通信チャネルは新しいプロセスに接続されていません。
code: async_subprocess_protocol.py
import asyncio
import functools
from dfprotocol import DFProtocol
async def run_df(loop):
print('in run_df')
cmd_done = asyncio.Future(loop=loop)
factory = functools.partial(DFProtocol, cmd_done)
proc = loop.subprocess_exec(
factory,
'df', '-hl',
stdin=None,
stderr=None,
)
try:
print('launching process')
transport, protocol = await proc
print('waiting for process to complete')
await cmd_done
finally:
transport.close()
return cmd_done.result()
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
run_df(event_loop)
)
finally:
event_loop.close()
if return_code:
print(f'error exit {return_code}')
else:
print('\nFree space:')
for r in results:
print('{Mounted:25}: {Avail}'.format(**r))
DFProtocolクラスは、パイプを介して別のプロセスと通信するクラスのAPIを定義するSubprocessProtocolから派生しています。 完了した引数は、呼び出し元がプロセスの終了を監視するために使用するFutureであることが期待されます。
code: dfprotocol.py
import asyncio
class DFProtocol(asyncio.SubprocessProtocol):
def __init__(self, done_future):
self.done = done_future
self.buffer = bytearray()
super().__init__()
def connection_made(self, transport):
print(f'process started {transport.get_pid()}')
self.transport = transport
def pipe_data_received(self, fd, data):
print(f'read {len(data)} bytes from {self.FD_NAMESfd}') if fd == 1:
self.buffer.extend(data)
def process_exited(self):
print('process exited')
return_code = self.transport.get_returncode()
print(f'return code {return_code}')
if not return_code:
cmd_output = bytes(self.buffer).decode()
results = self._parse_results(cmd_output)
else:
results = []
self.done.set_result((return_code, results))
def parse_results(output):
print('parsing results')
# 出力には1行のヘッダーがあり、すべて1ワード
# 残りの行はファイルシステムごとに1つで 列はヘッダーと一致する
# マウントポイントの名前に空白がないことが前提
if not output:
return []
lines = output.splitlines()
results = [
dict(zip(headers, line.split()))
for line in devices
]
return results
ソケット通信と同様に、connection_made()は、新しいプロセスへの入力チャネルが設定されるときに呼び出されます。 transport引数は、BaseSubprocessTransportのサブクラスのインスタンスです。 プロセスが入力を受信するように構成されている場合は、プロセスによって出力されたデータを読み取り、プロセスの入力ストリームにデータを書き込むことができます。
プロセスが出力を生成すると、pipe_data_received()が呼び出され、データが発行されたファイル記述子と実際のデータがパイプから読み取られます。 プロトコルクラスは、後の処理のために、プロセスの標準出力チャネルからの出力をバッファに保存します。
プロセスが終了すると、process_exited()が呼び出されます。 プロセスの終了コードは、get_returncode()を呼び出すことによってトランスポートオブジェクトから利用できます。 この場合、エラーが報告されていない場合、Futureインスタンスを介して返される前に、使用可能な出力がデコードおよび解析されます。 エラーが発生した場合、結果は空であると見なされます。 futureの結果を設定すると、プロセスが終了したことがrun_df()に通知されるため、プロセスはクリーンアップしてから結果を返します。
コマンド出力は、ヘッダー名を出力の各行の値にマッピングする一連の辞書に解析され、結果のリストが返されます。
run_df()コルーチンはrun_until_complete()を使用して実行され、結果が調べられ、各デバイスの空き領域が出力されます。
以下の出力は、実行された一連の手順と、それが実行されたシステム上のディスクの空き領域を示しています。
code: bash
% python asyncio_subprocess_protocol.py
in run_df
launching process
process started 31517
waiting for process to complete
read 520 bytes from stdout
process exited
return code 0
parsing results
Free space:
/ : 198Gi
/System/Volumes/VM : 198Gi
/System/Volumes/Preboot : 198Gi
/System/Volumes/Update : 198Gi
/System/Volumes/Data : 198Gi
コルーチンとストリームを使用したサブプロセスの呼び出し
コルーチンを使用してプロセスを直接実行するには、Protocolサブクラスを介してプロセスにアクセスする代わりに、create_subprocess_exec()を呼び出し、パイプに接続するstdout、stderr、およびstdinを指定します。 サブプロセスを生成するコルーチンの結果は、サブプロセスを操作したり、サブプロセスと通信したりするために使用できるProcessインスタンスです。
code: asyncio_subprocess_coroutine.py
import asyncio
import asyncio.subprocess
from dfprotocol import parse_results
async def run_df():
print('in run_df')
buffer = bytearray()
create = asyncio.create_subprocess_exec(
'df', '-hl',
stdout=asyncio.subprocess.PIPE,
)
print('launching process')
proc = await create
print(f'process started {proc.pid}')
while True:
line = await proc.stdout.readline()
print('read {!r}'.format(line))
if not line:
print('no more output from command')
break
buffer.extend(line)
print('waiting for process to complete')
await proc.wait()
return_code = proc.returncode
print(f'return code {return_code}')
if not return_code:
cmd_output = bytes(buffer).decode()
results = parse_results(cmd_output)
else:
results = []
return (return_code, results)
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
run_df()
)
finally:
event_loop.close()
if return_code:
print(f'error exit {return_code}')
else:
print('\nFree space:')
for r in results:
print('{Mounted:25}: {Avail}'.format(**r))
この例では、dfはコマンドライン引数以外の入力を必要としないため、次のステップはすべての出力を読み取ることです。 プロトコルでは、一度に読み取るデータの量を制御することはできません。 この例ではreadline()を使用していますが、read()を直接呼び出して、行指向ではないデータを読み取ることもできます。 コマンドの出力は、プロトコルの例と同様にバッファリングされるため、後で解析できます。
readline()メソッドは、プログラムが終了したために出力がなくなると、空のバイト文字列を返します。 プロセスが適切にクリーンアップされるようにするには、次のステップはプロセスが完全に終了するのを待つことです。
その時点で、終了ステータスを調べて、出力を解析するか、出力が生成されなかったためにエラーを処理するかを決定できます。 解析ロジックは前の例と同じですが、非表示にするプロトコルクラスがないため、スタンドアロン関数(ここには示されていません)にあります。
データが解析された後、結果と終了コードは発信者に返されます。
実装の変更はrun_df()で分離されているため、メインプログラムはプロトコルベースの例に似ています。
dfからの出力は一度に1行ずつ読み取ることができるため、プログラムの進行状況を示すためにエコーされます。 それ以外の場合、出力は前の例と同様になります。
code: bash
$ python asyncio_subprocess_coroutine.py
in run_df
launching process
process started 40792
read b'Filesystem Size Used Avail Capacity iused ifree %iused Mounted on\n'
read b'/dev/disk1s5s1 466Gi 14Gi 199Gi 7% 567557 4881885323 0% /\n'
read b'/dev/disk1s4 466Gi 5.0Gi 199Gi 3% 6 4882452874 0% /System/Volumes/VM\n'
read b'
no more output from command
waiting for process to complete
return code 0
parsing results
Free space:
/ : 199Gi
/System/Volumes/VM : 199Gi
/System/Volumes/Preboot : 199Gi
/System/Volumes/Update : 199Gi
/System/Volumes/Data : 199Gi
サブプロセスへのデータ送信
前の例は両方とも、2番目のプロセスからデータを読み取るために単一の通信チャネルのみを使用していました。 多くの場合、処理のためにデータをコマンドに送信する必要があります。 この例では、入力ストリーム内の文字を変換するためのUnixコマンドtrを実行するコルーチンを定義します。 この場合、trは小文字を大文字に変換するために使用されます。
to_upper()コルーチンは、引数としてイベントループと入力文字列を取ります。 tr[:lower:][:upper:]を実行する2番目のプロセスを生成します。
code: asyncio_subprocess_coroutine_write.py
import asyncio
import asyncio.subprocess
async def to_upper(input):
print('in to_upper')
create = asyncio.create_subprocess_exec(
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
print('launching process')
proc = await create
print('pid {}'.format(proc.pid))
print('communicating with process')
stdout, stderr = await proc.communicate(input.encode())
print('waiting for process to complete')
await proc.wait()
return_code = proc.returncode
print('return code {}'.format(return_code))
if not return_code:
results = bytes(stdout).decode()
else:
results = ''
return (return_code, results)
MESSAGE = """
This message will be converted
to all caps.
"""
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
to_upper(MESSAGE)
)
finally:
event_loop.close()
if return_code:
print('error exit {}'.format(return_code))
else:
print('Original: {!r}'.format(MESSAGE))
print('Changed : {!r}'.format(results))
次に、to_upper()は、プロセスのcommunicate()メソッドを使用して、入力文字列をコマンドに送信し、結果のすべての出力を非同期で読み取ります。 同じメソッドのsubprocess.Popenバージョンと同様に、communicate()は完全な出力バイト文字列を返します。 コマンドがメモリに快適に収まるよりも多くのデータを生成する可能性がある場合、入力を一度に生成できないか、出力を段階的に処理する必要があります。
communication()を呼び出す代わりに、プロセスのstdin、stdout、およびstderrハンドルを直接使用することができます。
I / Oが完了したら、プロセスが完全に終了するのを待つことで、プロセスが適切にクリーンアップされます。
プログラムの主要部分は、変換されるメッセージ文字列を確立し、to_upper()を実行するようにイベントループを設定して、結果を出力します。
出力には、操作のシーケンスと、単純なテキストメッセージがどのように変換されるかが表示されます。
code: bash
$ python asyncio_subprocess_coroutine_write.py
in to_upper
launching process
pid 41849
communicating with process
waiting for process to complete
return code 0
Original: '\nThis message will be converted\nto all caps.\n'
Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'
Unixシグナルの受信
Unixシステムのイベント通知は通常、アプリケーションを中断し、ハンドラーをトリガーします。 asyncioとともに使用すると、シグナルハンドラーのコールバックは、イベントループによって管理される他のコルーチンおよびコールバックとインターリーブされます。 これにより、中断される機能が少なくなり、その結果、不完全な操作をクリーンアップするためのセーフガードを提供する必要があります。
シグナルハンドラーは、コルーチンではなく、通常の呼び出し可能である必要があります。
code: asyncio_signals.py
import asyncio
import functools
import os
import signal
def signal_handler(name):
print('signal_handler({!r})'.format(name))
event_loop = asyncio.get_event_loop()
event_loop.add_signal_handler(
signal.SIGHUP,
functools.partial(signal_handler, name='SIGHUP'),
)
event_loop.add_signal_handler(
signal.SIGUSR1,
functools.partial(signal_handler, name='SIGUSR1'),
)
event_loop.add_signal_handler(
signal.SIGINT,
functools.partial(signal_handler, name='SIGINT'),
)
async def send_signals():
pid = os.getpid()
print('starting send_signals for {}'.format(pid))
print(f'sending {name}')
os.kill(pid, getattr(signal, name))
# シグナルがプログラムフローを中断しないため、
# シグナルハンドラを実行できるようにするための歩留まり調整
print('yielding control')
await asyncio.sleep(0.01)
return
try:
event_loop.run_until_complete(send_signals())
finally:
event_loop.close()
シグナルハンドラーは、add_signal_handler()を使用して登録されます。 最初の引数はシグナルで、2番目の引数はコールバックです。 コールバックには引数が渡されないため、引数が必要な場合は、関数をfunctools.partial()でラップできます。
このサンプルプログラムは、コルーチンを使用して、os.kill()を介してシグナルを自身に送信します。 各シグナルが送信された後、コルーチンはハンドラーを実行できるように制御を生成します。 通常のアプリケーションでは、アプリケーションコードがイベントループに戻る場所が多くなり、このような人為的な結果は必要ありません。
メインプログラムは、すべてのシグナルを送信するまでsend_signals()を実行します。
出力は、send_signals()がシグナルの送信後に制御を生成したときにハンドラーがどのように呼び出されるかを示しています。
code: bash
% python asyncio_signals.py
starting send_signals for 42591
sending SIGHUP
yielding control
signal_handler('SIGHUP')
sending SIGHUP
yielding control
signal_handler('SIGHUP')
sending SIGUSR1
yielding control
signal_handler('SIGUSR1')
sending SIGINT
yielding control
signal_handler('SIGINT')
asyncioを使用したデバッグ
asyncioにはいくつかの便利なデバッグ機能が組み込まれています。
まず、イベントループはログを使用して、実行時にステータスメッセージを発行します。これらの一部は、アプリケーションでロギングが有効になっている場合に使用できます。その他は、より多くのデバッグメッセージを発行するようにループに指示することでオンにできます。 set_debug()を呼び出して、デバッグを有効にするかどうかを示すブール値を渡します。
asyncioに基づいて構築されたアプリケーションは、制御を生成できない貪欲なコルーチンに非常に敏感であるため、イベントループに組み込まれた低速のコールバックを検出するためのサポートがあります。デバッグを有効にしてオンにし、ループのslow_callback_durationプロパティを警告が発行されるまでの秒数に設定して「slow」の定義を制御します。
最後に、asyncioを使用するアプリケーションがコルーチンやその他のリソースの一部をクリーンアップせずに終了する場合は、アプリケーションコードの一部を実行できない論理エラーがあることを意味している可能性があります。 ResourceWarning警告を有効にすると、プログラムの終了時にこれらのケースが報告されます。
code: asyncio_debug.py
import sys
import time
import argparse
import asyncio
import logging
import warnings
parser = argparse.ArgumentParser('debugging asyncio')
parser.add_argument(
'-v',
dest='verbose',
default=False,
action='store_true',
)
args = parser.parse_args()
logging.basicConfig(
level=logging.DEBUG,
format='%(levelname)7s: %(message)s',
stream=sys.stderr,
)
LOG = logging.getLogger('')
async def inner():
LOG.info('inner starting')
# Use a blocking sleep to simulate
# doing work inside the function.
time.sleep(0.1)
LOG.info('inner completed')
async def outer(loop):
LOG.info('outer starting')
await asyncio.ensure_future(loop.create_task(inner()))
LOG.info('outer completed')
event_loop = asyncio.get_event_loop()
if args.verbose:
LOG.info('enabling debugging')
# デバッグを有効にする
event_loop.set_debug(True)
# Make the threshold for "slow" tasks very very small for
# illustration. The default is 0.1, or 100 milliseconds.
# 説明のために、「遅い」タスクのしきい値を非常に小さくする
# デフォルトは0.1、つまり100ミリ秒
event_loop.slow_callback_duration = 0.001
# 非同期リソースの管理に関するすべての間違いを報告する
warnings.simplefilter('always', ResourceWarning)
LOG.info('entering event loop')
event_loop.run_until_complete(outer(event_loop))
デバッグを有効にせずに実行すると、このアプリケーションではすべてが正常に表示されます。
code: bash
$ python asyncio_debug.py
DEBUG: Using selector: KqueueSelector
INFO: entering event loop
INFO: outer starting
INFO: inner starting
INFO: inner completed
INFO: outer completed
デバッグをオンにすると、inner()が終了しても、設定されているslow_callback_durationよりも終了に時間がかかることや、プログラムの終了時にイベントループが適切に閉じられないことなど、問題のいくつかが明らかになります。
code: bash
% python asyncio_debug.py -v
DEBUG: Using selector: KqueueSelector
INFO: enabling debugging
INFO: entering event loop
INFO: outer starting
INFO: inner starting
INFO: inner completed
WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:24> result=None created at asyncio_debug.py:33> took 0.102 seconds
INFO: outer completed
参考
PEP 380 – Syntax for Delegating to a Subgenerator PEP 492 – Coroutines with async and await syntax PEP 3156 – Asynchronous IO Support Rebooted: the “asyncio” Module