順番に読み進めながら学べます

asyncio 応用 — gather と Task と Queue で並行処理

asyncio.gatherの並行実行と入力順、create_taskの投げて後で待つ、wait_forのタイムアウト、asyncio.Queueのproducer/consumerを実例で学べます。

async defawaitの基本の上に、複数のコルーチンを並行に走らせる書き方を整理します。asyncio.gather「全部投げて全部待つ」asyncio.create_task「投げて、後で待つ」asyncio.Queue「producer / consumer」 — この 3 つで実プロジェクトの async 処理はほとんどカバーできます。

本環境での実行について

async / await は時間軸の挙動が肝心ですが、本環境のランナーはprintの出力をスクリプト完走後に一括表示する実装のため、リアルタイムな経過や体感速度は実機と同じには見えません。本記事は図で内部動作をわかりやすく解説しますが、printの流れや所要時間を実感したい場合はローカルの Python 環境(asyncio.runで実行)で動かすのがおすすめです。

asyncio 応用の 3 つの主要 API
asyncio.gather全部並行 → 全部待つcreate_task / wait_for投げて後で待つasyncio.Queue値の受け渡し
gatherは「全部投げて全部一気に待つ」、create_task / wait_forは「投げて、必要なときに待つ + タイムアウト」、Queueは「コルーチン間で値を受け渡す FIFO バッファ」。本記事の 3 章でこの 3 つを順に扱う。

asyncio.gather — 複数のコルーチンを並行に走らせる

「複数の API を並行に呼んで、全部のレスポンスが揃ってから次に進みたい」 — async の典型的な使いどころです。同期的に書くと API の応答時間が合計されますが、gatherを使えば最も遅い 1 件と同じ時間で済みます。

順次 await と gather の時間差
A (1 秒)B (1 秒)C (1 秒)→ 順次は計 3 秒gather:A, B, C 同時開始全員 1 秒並行で待つ全員完了→ gather は計 1 秒
順次 awaitは 1 つ完了してから次が始まるので合計 = 全タスクの和gather全部同時に開始し並行で待つので合計 = 最も遅い 1 件分で済む。3 件の 1 秒待ちなら 3 秒 → 1 秒に短縮。

asyncio.gather(コルーチン 1, コルーチン 2, ...)は、渡したコルーチンを全部同時に開始し、全員の完了を待ってから結果のリストを返す関数です。戻り値はコルーチンに渡した順に並んだ結果のリスト — 完了順ではないので、入力と出力の対応が崩れません。

import asyncio

async def fetch(name):
    await asyncio.sleep(1)        # API 呼び出しを 1 秒待ちで再現
    return f"{name} 完了"

# 3 件を並行に投げて → 1 秒で全部揃う (順次なら 3 秒)
results = await asyncio.gather(
    fetch("A"),
    fetch("B"),
    fetch("C"),
)
print(results)                    # ['A 完了', 'B 完了', 'C 完了']  ← 入力順
gather の動き — 同時開始 → 全員待つ → 入力順で返す
gather(A, B, C)A 実行await で他に切替B 実行切り替えながら並行C 実行→ 全員完了で結果リスト
3 つのコルーチンを同時に開始し、全員が完了するまで待ってから結果を入力順のリストで返す。各コルーチンのawait切り替わるタイミングで並行に進む。
gather の戻り値は完了順ではなく入力順
入力:gather(A, B, C)実行中:完了は B → A → Cの順だったとしても戻り値:[A_result, B_result, C_result]完了の前後は保証されない入力順は崩れない
内部では完了順序が前後することがあるが、戻り値のリストは渡した順を保つurlsのリストに対して結果も同じインデックスで対応するので、後の処理で入力との突き合わせが楽になる。

例外が出たときの挙動

gather 内の 1 つでも例外が出ると、既定では全体が例外を上げて中断します。例外も結果として収集したい場合はasyncio.gather(..., return_exceptions=True)を渡すと、例外オブジェクトもリストの要素として返るので、後で型判定して扱えます。複数の API を叩いて部分的な失敗を許容したいときに有用です。

3 つのコルーチンを asyncio.gather で並行実行し、完了順は前後しても戻り値は入力順であることを確認します。各タスクの sleep 時間を変えて、完了順 (B → A → C) と戻り値順 ([A, B, C]) のズレを観察してみます。

import asyncioを書いてください

async def task(name, secs):を定義してください — await asyncio.sleep(secs)で待ってから、f"{name} の結果"returnします

await asyncio.gather(task("A", 0.3), task("B", 0.1), task("C", 0.5))で 3 つを並行実行し、結果をresultsに入れてください

print("結果:", results)で表示してください

print("件数:", len(results))で結果の件数を表示してください

(正しく実行できれば解説が表示されます)

Python エディタ

コードを実行してください

create_task と wait_for — 投げて、後で待つ + タイムアウト

asyncio.create_task(コルーチン)は、コルーチンを「Task」というオブジェクトに包んで、その場で実行を開始する関数です。gatherのように「全部投げて全部待つ」ではなく、「先に投げておいて、必要になったタイミングでawait taskで結果を取り出す」という非同期の典型パターンに使えます。

asyncio.wait_for(awaitable, timeout=N)「N 秒以内に終わらなければ TimeoutError を上げる」という安全弁です。Web API が応答しないときの保険として、実プロジェクトでは Task 化と組み合わせるのが標準です。

import asyncio

async def slow_api():
    await asyncio.sleep(2)
    return "応答"

# create_task で投げる (Task はその場で開始)
task = asyncio.create_task(slow_api())

# Task は裏で進む間、別の処理ができる
print("Task 投げ済み、別の処理...")

# 必要なときに await で結果を回収
result = await task
print(result)                       # 応答

# wait_for でタイムアウト付き (1 秒で諦める)
try:
    result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
    print("タイムアウト!")           # 2 秒応答 vs 1 秒で諦め → こちらに入る
Task のライフサイクル
pendingcreate_task 直後runningループが実行中done完了 (return)
create_taskpending(待機中)の Task が生まれ、イベントループがrunning(実行中)に進める。最終的にdone(完了)に行き着く。これら 3 状態が基本の流れ。

done の内訳 — 正常 / 例外 / cancel

doneに到達するパスは 3 つあります: (1) 正常完了 = returnで値を返した、(2) 例外 = 内部で例外が上がって終了、(3) cancel = task.cancel()で中断。task.done()どのパスでも Trueを返し、task.exception()で例外を取り出せます。

create_task と wait_for の関係
asyncio.create_task( コルーチン)Task オブジェクト(裏で実行中)await task→ 結果asyncio.wait_for( task, timeout=N)N 秒以内 → 結果N 秒超過→ TimeoutError
create_taskはコルーチンをTask に包んで即時開始。Task は裏で進行し続け、await taskで結果を回収できる。wait_for(task, timeout=N)N 秒で諦める安全弁を付けられる。
gather と create_task の使い分け
全部の結果を一括で受け取りたいasyncio.gather投げて、別処理を挟んでから待ちたいasyncio.create_task
全部の結果が揃うまで何もしないならgather先に投げて、別の処理を挟んでから後で結果を回収したいならcreate_task。どちらも並行実行という点は同じ。

Task オブジェクトの便利メソッド

create_taskの戻り値の Task オブジェクトは、task.cancel()で中断、task.done()で完了したか確認、task.result()で完了済みの結果を取り出す(未完了なら例外)、task.exception()で例外を取り出す、といった操作が可能です。長時間走るバックグラウンド処理を制御したいときに役立ちます。

create_task で 2 つのタスクを投げて、間に別の処理を挟んでから結果を回収します

import asyncioを書いてください

async def task(name):を定義してください — 中でawait asyncio.sleep(0)で他のタスクに切り替えてから、f"{name} の結果"returnします

asyncio.create_task(task("A"))asyncio.create_task(task("B"))2 つの Task を起動し、t1t2に入れてください

④ Task が裏で進行している間に、タスク投げ済みと表示してください

await t1await t2でそれぞれの結果を取り出し、A: ◯ / B: ◯の形で表示してください

Python エディタ

コードを実行してください

asyncio.wait_for でタイムアウトを設定して、間に合う場合と間に合わない場合の両方を試します。

import asyncioを書いてください

async def slow_task():を定義してください — await asyncio.sleep(0.5)で 0.5 秒待ってから"応答完了"returnします

await asyncio.wait_for(slow_task(), timeout=1.0)1 秒以内に完了するので成功するはず — 結果を成功: ◯の形で表示してください

try: / except asyncio.TimeoutError:await asyncio.wait_for(slow_task(), timeout=0.1)を実行してください — 0.1 秒では間に合わないので TimeoutError が発生、捕捉してタイムアウト!と表示してください

Python エディタ

コードを実行してください

asyncio.Queue — producer / consumer

「投入する側」と「消費する側」を別のコルーチンに分けて書きたい — スクレイピング・ジョブ処理・ストリーム処理など、速度の違う 2 つの処理を噛み合わせる場面でよく出てきます。

投入側 → Queue → 消費側 のパターン
ページの URL を次々入れるQueue(待ち行列)URL を取り出してページを取得新しいデータを次々入れるQueue(待ち行列)データを取り出して計算するputgetputget
左の投入側が値を Queue に入れ、右の消費側が取り出して処理する。Web ページ収集・仕事の順次処理・刻々と入るデータの処理など、多くの場面に同じパターンが当てはまる。

asyncio.Queueコルーチン間で値を受け渡す非同期キュー(FIFO = First In First Out、先に入れた値から先に取り出すバッファ)です。await queue.put(値)で投入、await queue.get()で取り出し、キューが空 / 満杯のときは自動で他のタスクに切り替わって待ってくれるので、シンプルに書けます。

import asyncio

queue = asyncio.Queue()

# put で投入
await queue.put("item-1")
await queue.put("item-2")

# get で取り出し (FIFO = 先入れ先出し)
print(await queue.get())            # item-1
print(await queue.get())            # item-2

# 空のときに get すると、値が来るまで他に切り替わって待つ
# print(await queue.get())          # ← ここで他のコルーチンが put するまで一時停止
Queue でのproducer / consumer
producerawait queue.put(item)asyncio.QueueFIFO バッファconsumerawait queue.get()putget
producerawait queue.put(...)で投入、consumerawait queue.get()で取り出す。FIFO(先入れ先出し)で順序が保たれ、put / get は asyncなので空 / 満杯のときに自動で他のタスクに切り替わる。
Queue の状態と await の挙動
await get()→ 来るまで他に切替Queue 空await put(item)→ 即座に投入await get()→ 即座に取り出しQueue 通常(0 < 件数 < maxsize)await put(item)→ 即座に投入await get()→ 即座に取り出しQueue 満杯(maxsize 設定時のみ)await put(item)→ 空くまで他に切替
中央の Queue の状態によって、左のawait get()と右のawait put()の挙動が決まる。緑 = 即座に進む / 黄 = 他のタスクに切り替えて待つで色分けしてある。これによりポーリングなしで自然な待ち合わせが書ける。

終了マーカーで安全に止める

consumer 側のwhile True: item = await queue.get()は、何か取り出すまで永遠に待つので、「もうデータが無い」と伝える終了マーカー(典型的にはNone、またはsentinel = データと区別できる専用の番兵オブジェクト)を producer が最後に入れる、というパターンを使います。consumer は取り出した値が None なら break、で安全にループを抜けます。

1 つのコルーチンの中で put と get を試して、Queue の基本動作と FIFO(先入れ先出し)を確認します

import asyncioと空のasyncio.Queueを作ってください

await queue.put("a") "b" "c"の 3 件を順に投入してください

await queue.get()を 3 回呼んで取り出した値をリストにし、取り出し: ◯の形で表示してください

Python エディタ

コードを実行してください

producer / consumerを gather で並行実行

Queue の真価は複数のコルーチンの間で値を受け渡す場面で発揮されます。「投入する側」と「消費する側」を別のコルーチンに分けて gather で並行実行すると、await put / await getが切り替えポイントとして機能し、自然に噛み合います。

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(f"item-{i}")
    await queue.put(None)           # 終了マーカー

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:            # 終了マーカーを受けたら break
            break
        print(f"処理: {item}")

queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# 出力:
# 処理: item-0
# 処理: item-1
# 処理: item-2
producer / consumer の並行実行タイムライン
producer:put("item-0")put("item-1")put("item-2")put(None)終了マーカーconsumer:get() → item-0get() → item-1get() → item-2get() → None→ break
producerputするたびに、待っていたconsumergetが解除されて値を受け取る。最後のput(None)終了マーカーになり、consumer はbreakで安全にループを抜ける。

producerが 3 件 put し、consumerが取り出してリストに集める構成を試します。終了マーカーはNoneです。

import asyncioを書き、空のasyncio.Queueと空のリストresultsを用意してください

async def producer(queue):を定義してください — f"item-{i}"を 3 件 put し、最後に終了マーカーとしてNoneを put します

async def consumer(queue, results):を定義してください — while True:で get を呼び、Noneなら break、それ以外はresultsに append します

asyncio.gather(producer(queue), consumer(queue, results))で並行実行し、受信件数: ◯ / 最初: ◯ / 最後: ◯の形で表示してください

Python エディタ

コードを実行してください
QUIZ

理解度チェック

まずは1問ずつ答えてみましょう。

Q1asyncio.gather(task("A"), task("B"), task("C"))の戻り値の順序はどうなりますか?

Q2asyncio.create_task(コルーチン)の戻り値はどれですか?

Q3asyncio.Queueconsumerが止まるタイミングを作る一般的な方法はどれですか?