Q1asyncio.gather(task("A"), task("B"), task("C"))の戻り値の順序はどうなりますか?
asyncio 応用 — gather と Task と Queue で並行処理
asyncio.gatherの並行実行と入力順、create_taskの投げて後で待つ、wait_forのタイムアウト、asyncio.Queueのproducer/consumerを実例で学べます。
async defとawaitの基本の上に、複数のコルーチンを並行に走らせる書き方を整理します。asyncio.gatherで「全部投げて全部待つ」、asyncio.create_taskで「投げて、後で待つ」、asyncio.Queueで「producer / consumer」 — この 3 つで実プロジェクトの async 処理はほとんどカバーできます。
本環境での実行について
async / await は時間軸の挙動が肝心ですが、本環境のランナーはprintの出力をスクリプト完走後に一括表示する実装のため、リアルタイムな経過や体感速度は実機と同じには見えません。本記事は図で内部動作をわかりやすく解説しますが、printの流れや所要時間を実感したい場合はローカルの Python 環境(asyncio.runで実行)で動かすのがおすすめです。
asyncio.gather — 複数のコルーチンを並行に走らせる
「複数の API を並行に呼んで、全部のレスポンスが揃ってから次に進みたい」 — async の典型的な使いどころです。同期的に書くと API の応答時間が合計されますが、gatherを使えば最も遅い 1 件と同じ時間で済みます。
asyncio.gather(コルーチン 1, コルーチン 2, ...)は、渡したコルーチンを全部同時に開始し、全員の完了を待ってから結果のリストを返す関数です。戻り値はコルーチンに渡した順に並んだ結果のリスト — 完了順ではないので、入力と出力の対応が崩れません。
import asyncio
async def fetch(name):
await asyncio.sleep(1) # API 呼び出しを 1 秒待ちで再現
return f"{name} 完了"
# 3 件を並行に投げて → 1 秒で全部揃う (順次なら 3 秒)
results = await asyncio.gather(
fetch("A"),
fetch("B"),
fetch("C"),
)
print(results) # ['A 完了', 'B 完了', 'C 完了'] ← 入力順
awaitが切り替わるタイミングで並行に進む。urlsのリストに対して結果も同じインデックスで対応するので、後の処理で入力との突き合わせが楽になる。例外が出たときの挙動
gather 内の 1 つでも例外が出ると、既定では全体が例外を上げて中断します。例外も結果として収集したい場合はasyncio.gather(..., return_exceptions=True)を渡すと、例外オブジェクトもリストの要素として返るので、後で型判定して扱えます。複数の API を叩いて部分的な失敗を許容したいときに有用です。
create_task と wait_for — 投げて、後で待つ + タイムアウト
asyncio.create_task(コルーチン)は、コルーチンを「Task」というオブジェクトに包んで、その場で実行を開始する関数です。gatherのように「全部投げて全部待つ」ではなく、「先に投げておいて、必要になったタイミングでawait taskで結果を取り出す」という非同期の典型パターンに使えます。
asyncio.wait_for(awaitable, timeout=N)は「N 秒以内に終わらなければ TimeoutError を上げる」という安全弁です。Web API が応答しないときの保険として、実プロジェクトでは Task 化と組み合わせるのが標準です。
import asyncio
async def slow_api():
await asyncio.sleep(2)
return "応答"
# create_task で投げる (Task はその場で開始)
task = asyncio.create_task(slow_api())
# Task は裏で進む間、別の処理ができる
print("Task 投げ済み、別の処理...")
# 必要なときに await で結果を回収
result = await task
print(result) # 応答
# wait_for でタイムアウト付き (1 秒で諦める)
try:
result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
print("タイムアウト!") # 2 秒応答 vs 1 秒で諦め → こちらに入る
create_taskでpending(待機中)の Task が生まれ、イベントループがrunning(実行中)に進める。最終的にdone(完了)に行き着く。これら 3 状態が基本の流れ。done の内訳 — 正常 / 例外 / cancel
doneに到達するパスは 3 つあります: (1) 正常完了 = returnで値を返した、(2) 例外 = 内部で例外が上がって終了、(3) cancel = task.cancel()で中断。task.done()はどのパスでも Trueを返し、task.exception()で例外を取り出せます。
gather。先に投げて、別の処理を挟んでから後で結果を回収したいならcreate_task。どちらも並行実行という点は同じ。Task オブジェクトの便利メソッド
create_taskの戻り値の Task オブジェクトは、task.cancel()で中断、task.done()で完了したか確認、task.result()で完了済みの結果を取り出す(未完了なら例外)、task.exception()で例外を取り出す、といった操作が可能です。長時間走るバックグラウンド処理を制御したいときに役立ちます。
asyncio.Queue — producer / consumer
「投入する側」と「消費する側」を別のコルーチンに分けて書きたい — スクレイピング・ジョブ処理・ストリーム処理など、速度の違う 2 つの処理を噛み合わせる場面でよく出てきます。
asyncio.Queueはコルーチン間で値を受け渡す非同期キュー(FIFO = First In First Out、先に入れた値から先に取り出すバッファ)です。await queue.put(値)で投入、await queue.get()で取り出し、キューが空 / 満杯のときは自動で他のタスクに切り替わって待ってくれるので、シンプルに書けます。
import asyncio
queue = asyncio.Queue()
# put で投入
await queue.put("item-1")
await queue.put("item-2")
# get で取り出し (FIFO = 先入れ先出し)
print(await queue.get()) # item-1
print(await queue.get()) # item-2
# 空のときに get すると、値が来るまで他に切り替わって待つ
# print(await queue.get()) # ← ここで他のコルーチンが put するまで一時停止
await queue.put(...)で投入、consumerがawait queue.get()で取り出す。FIFO(先入れ先出し)で順序が保たれ、put / get は asyncなので空 / 満杯のときに自動で他のタスクに切り替わる。await get()と右のawait put()の挙動が決まる。緑 = 即座に進む / 黄 = 他のタスクに切り替えて待つで色分けしてある。これによりポーリングなしで自然な待ち合わせが書ける。終了マーカーで安全に止める
consumer 側のwhile True: item = await queue.get()は、何か取り出すまで永遠に待つので、「もうデータが無い」と伝える終了マーカー(典型的にはNone、またはsentinel = データと区別できる専用の番兵オブジェクト)を producer が最後に入れる、というパターンを使います。consumer は取り出した値が None なら break、で安全にループを抜けます。
producer / consumerを gather で並行実行
Queue の真価は複数のコルーチンの間で値を受け渡す場面で発揮されます。「投入する側」と「消費する側」を別のコルーチンに分けて gather で並行実行すると、await put / await getが切り替えポイントとして機能し、自然に噛み合います。
import asyncio
async def producer(queue):
for i in range(3):
await queue.put(f"item-{i}")
await queue.put(None) # 終了マーカー
async def consumer(queue):
while True:
item = await queue.get()
if item is None: # 終了マーカーを受けたら break
break
print(f"処理: {item}")
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# 出力:
# 処理: item-0
# 処理: item-1
# 処理: item-2
putするたびに、待っていたconsumerのgetが解除されて値を受け取る。最後のput(None)が終了マーカーになり、consumer はbreakで安全にループを抜ける。理解度チェック
まずは1問ずつ答えてみましょう。
Q2asyncio.create_task(コルーチン)の戻り値はどれですか?
Q3asyncio.Queueでconsumerが止まるタイミングを作る一般的な方法はどれですか?