Học bằng cách đọc theo thứ tự

asyncio nâng cao — Đồng thời với gather, Task và Queue

Học asyncio.gather chạy đồng thời và giữ thứ tự đầu vào, create_task để khởi rồi await sau, wait_for cho timeout, và asyncio.Queue cho producer / consumer qua ví dụ thật.

Xây trên async defawait, bài viết này trình bày cách chạy nhiều coroutine đồng thời. asyncio.gather cho "khởi tất cả và chờ tất cả", asyncio.create_task cho "khởi ngay, await sau", và asyncio.Queue cho producer / consumer — ba thứ này gần như bao phủ mọi mẫu async bạn sẽ viết trong dự án thật.

Về việc chạy code ở đây

async / await xoay quanh thời gian, nhưng runner của trang này gom output print lại và hiển thị một lần sau khi script chạy xong. Cảm giác output theo thời gian thực và thời gian trôi sẽ không giống môi trường Python thật. Bài viết dùng sơ đồ để làm rõ hành vi bên trong, nhưng nếu bạn muốn xem print xuất ra theo dòng chảy hoặc cảm nhận thời gian thực, hãy chạy trong môi trường Python local với asyncio.run.

3 API asyncio chính
asyncio.gatherchạy tất cả → await tất cảcreate_task / wait_forkhởi ngay, await sauasyncio.Queuechuyển giá trị qua lại
gather = "khởi tất cả và chờ tất cả". create_task / wait_for = "khởi ngay, await sau, có timeout tùy chọn". Queue = "buffer FIFO để chuyển giá trị giữa các coroutine". Mỗi chương của bài viết trình bày một cái theo lượt.

asyncio.gather — chạy nhiều coroutine đồng thời

"Khởi vài API đồng thời và chỉ tiếp tục khi mọi phản hồi đã về" — một use case async kinh điển. Code đồng bộ cộng dồn các thời gian phản hồi, nhưng gather xong trong thời gian của cái chậm nhất.

await tuần tự vs gather — chênh lệch thời gian
A (1 giây)B (1 giây)C (1 giây)→ tuần tự = 3 giâygather:A, B, C khởi cùng lúcTất cả chờ 1 giâyđồng thờiTất cả xong→ gather = 1 giây
await tuần tự chỉ bắt đầu lệnh gọi tiếp theo sau khi cái trước xong, nên tổng = tổng tất cả các tác vụ. gather khởi tất cả cùng lúc và chờ đồng thời, nên tổng = cái chậm nhất. Ba lệnh chờ 1 giây từ 3 giây → 1 giây.

asyncio.gather(coro1, coro2, ...) khởi mọi coroutine được truyền vào cùng lúctrả về một danh sách kết quả khi tất cả xong. Danh sách trả về theo thứ tự bạn truyền vào — không phải thứ tự hoàn thành — nên input và output luôn khớp.

import asyncio

async def fetch(name):
    await asyncio.sleep(1)        # Mô phỏng lệnh gọi API với chờ 1 giây
    return f"{name} xong"

# Khởi 3 song song → tất cả kết quả trong 1 giây (so với 3 giây tuần tự)
results = await asyncio.gather(
    fetch("A"),
    fetch("B"),
    fetch("C"),
)
print(results)                    # ['A xong', 'B xong', 'C xong']  ← thứ tự đầu vào
gather hoạt động — khởi cùng lúc → chờ tất cả → trả về theo thứ tự đầu vào
gather(A, B, C)A đang chạyawait chuyển raB đang chạyđang chuyển → đồng thờiC đang chạy→ tất cả xong = list kết quả
Ba coroutine khởi cùng lúc, chờ đến khi tất cả xong, và kết quả trả về theo thứ tự đầu vào dạng danh sách. Chúng tiến triển đồng thời khi các điểm await của chúng đổi lượt.
gather trả về thứ tự đầu vào, không phải thứ tự hoàn thành
Input:gather(A, B, C)Đang chạy:thứ tự xong có thể làB → A → CTrả về:[A_result, B_result, C_result]Thứ tự xongkhông được đảm bảoThứ tự đầu vàogiữ nguyên
Bên trong thứ tự hoàn thành có thể xáo trộn, nhưng danh sách kết quả giữ thứ tự bạn truyền vào. Đối với danh sách urls đầu vào bạn nhận một danh sách kết quả tại cùng index — dễ ánh xạ ngược về input sau.

Điều gì xảy ra khi có ngoại lệ

Mặc định, nếu bất kỳ coroutine nào bên trong gather raise, toàn bộ lệnh gọi raise và hủy bỏ. Để thu thập ngoại lệ thay vào đó, hãy truyền asyncio.gather(..., return_exceptions=True)các đối tượng ngoại lệ sẽ trở lại như là phần tử danh sách để bạn có thể kiểm tra type sau đó. Hữu ích khi bạn muốn gọi nhiều API và chấp nhận thất bại một phần.

Chạy 3 coroutine đồng thời với asyncio.gather và xác nhận thứ tự hoàn thành có thể khác nhưng giá trị trả về giữ nguyên thứ tự đầu vào. Mỗi tác vụ chờ một khoảng thời gian khác nhau để bạn có thể quan sát thứ tự hoàn thành (B → A → C) và thứ tự trả về ([A, B, C]) lệch nhau.

① Hãy thêm import asyncio.

② Hãy định nghĩa async def task(name, secs):await asyncio.sleep(secs) rồi return f"{name} xong".

③ Hãy chạy với await asyncio.gather(task("A", 0.3), task("B", 0.1), task("C", 0.5)) và lưu vào results.

④ Hãy in print("results:", results).

⑤ Hãy in print("count:", len(results)).

(Chạy thành công và phần giải thích sẽ hiện ra.)

Python Editor

Chạy code để xem đầu ra

create_task và wait_for — khởi ngay, await sau, có timeout

asyncio.create_task(coro) bọc một coroutine vào đối tượng "Task" và khởi nó ngay lập tức. Khác với "khởi tất cả và chờ tất cả" của gather, đây là mẫu async kinh điển "khởi ngay, làm việc khác, rồi await task để lấy kết quả sau".

asyncio.wait_for(awaitable, timeout=N)lưới an toàn: "raise TimeoutError nếu không xong trong N giây". Tiêu chuẩn là kết hợp với Task như một cơ chế dự phòng khi Web API không phản hồi.

import asyncio

async def slow_api():
    await asyncio.sleep(2)
    return "phản hồi"

# Khởi với create_task (Task bắt đầu chạy ngay)
task = asyncio.create_task(slow_api())

# Việc khác có thể diễn ra trong lúc task chạy ở phía sau
print("task đã khởi, làm việc khác...")

# Lấy kết quả với await khi bạn cần
result = await task
print(result)                       # phản hồi

# wait_for thêm timeout (bỏ cuộc sau 1 giây)
try:
    result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
    print("timeout!")               # 2 giây phản hồi vs 1 giây giới hạn → vào đây
Vòng đời Task
pendingngay sau create_taskrunningloop đang chạy nódoneđã xong (return)
create_task tạo một Task pending; loop đưa nó sang running; cuối cùng nó đến done. Ba trạng thái này là dòng chảy cơ bản.

Ba con đường đến "done" — return / exception / cancel

Có 3 con đường đến done: (1) hoàn thành bình thườngreturn đã tạo ra giá trị, (2) ngoại lệ — có gì đó raise bên trong, (3) canceltask.cancel() đã ngắt nó. task.done() trả về True cho cả ba con đường, và task.exception() lấy ra ngoại lệ nếu có.

create_task và wait_for
asyncio.create_task( coroutine)Đối tượng Task(chạy ở bg)await task→ kết quảasyncio.wait_for( task, timeout=N)≤ N giây → kết quả> N giây→ TimeoutError
create_task bọc một coroutine vào Task và khởi ngay lập tức. Task tiếp tục chạy ở phía sau, và await task lấy kết quả. Thêm wait_for(task, timeout=N) như một lưới an toàn "bỏ cuộc sau N giây".
gather vs create_task — khi nào dùng cái nào
Muốn tất cả kết quảcùng lúcasyncio.gatherKhởi, làm việckhác, await sauasyncio.create_task
Dùng gather khi bạn không tiếp tục cho đến khi mọi kết quả đã về. Dùng create_task khi bạn muốn khởi nó, làm việc khác, rồi lấy kết quả sau. Cả hai chạy mọi thứ đồng thời — phần đó giống nhau.

Các method hữu ích của đối tượng Task

Đối tượng Task được trả về bởi create_task hỗ trợ các thao tác hữu ích: task.cancel() để ngắt, task.done() để kiểm tra hoàn thành, task.result() để lấy kết quả của một Task đã xong (raise nếu chưa xong), và task.exception() để lấy ngoại lệ đã raise. Tiện cho kiểm soát công việc nền chạy lâu.

Khởi hai Task với create_task, làm việc khác ở giữa, rồi lấy kết quả của chúng.

① Hãy thêm import asyncio.

② Hãy định nghĩa async def task(name):await asyncio.sleep(0) để chuyển ra, rồi return f"{name} xong".

③ Hãy dùng asyncio.create_task(task("A"))asyncio.create_task(task("B")) để khởi 2 Task, lưu chúng vào t1t2.

④ Trong khi các Task chạy, hãy in tasks fired.

⑤ Hãy dùng await t1await t2 để lấy mỗi kết quả và in dạng A: ◯ / B: ◯.

Python Editor

Chạy code để xem đầu ra

Dùng asyncio.wait_for để đặt timeout và thử cả trường hợp trong giới hạn và quá giới hạn.

① Hãy thêm import asyncio.

② Hãy định nghĩa async def slow_task():await asyncio.sleep(0.5) để chờ 0.5 giây, rồi return "phản hồi xong".

await asyncio.wait_for(slow_task(), timeout=1.0)xong trong 1 giây, nên thành công. In dạng success: ◯.

try: / except asyncio.TimeoutError: quanh await asyncio.wait_for(slow_task(), timeout=0.1)0.1 giây không đủ, nên TimeoutError sẽ kích hoạt. Bắt nó và in timeout!.

Python Editor

Chạy code để xem đầu ra

asyncio.Queue — producer / consumer

"Tôi muốn một coroutine đẩy giá trị vào và một cái khác tiêu thụ chúng" — một mẫu thường gặp trong scraping, xử lý job, xử lý stream, và các tình huống khác nơi hai vòng lặp với tốc độ khác nhau cần ăn khớp.

Producer → Queue → Consumer
Đẩy URL vàotừng cái mộtQueue(hàng chờ)Lấy URL vàfetch mỗi trangĐẩy job vàotừng cái mộtQueue(hàng chờ)Lấy job vàxử lý theo thứ tựĐẩy dữ liệumới từng cáiQueue(hàng chờ)Lấy dữ liệu vàchạy phân tíchputgetputgetputget
Producer ở bên trái đẩy giá trị vào Queue; consumer ở bên phải lấy chúng ra và xử lý. Cùng mẫu này phù hợp với scraping web pages, xử lý jobs theo thứ tự, xử lý stream dữ liệu đến, và nhiều trường hợp khác.

asyncio.Queue là một queue async để chuyển giá trị giữa các coroutine (một FIFO = First In First Out — giá trị ra theo thứ tự bạn đẩy vào). Dùng await queue.put(value) để chèn và await queue.get() để lấy — và khi queue rỗng / đầy, nó tự động chuyển sang tác vụ khác và chờ, giữ mọi thứ đơn giản.

import asyncio

queue = asyncio.Queue()

# Đẩy giá trị vào
await queue.put("item-1")
await queue.put("item-2")

# Lấy chúng ra (FIFO = vào trước, ra trước)
print(await queue.get())            # item-1
print(await queue.get())            # item-2

# get() trên một queue rỗng chuyển sang tác vụ khác và chờ một giá trị
# print(await queue.get())          # ← tạm dừng ở đây cho đến khi ai đó put
producer / consumer với Queue
producerawait queue.put(item)asyncio.Queuebuffer FIFOconsumerawait queue.get()putget
Producer làm await queue.put(...) để chèn; consumer làm await queue.get() để lấy. FIFO (vào trước, ra trước) giữ thứ tự, và put / get là async, nên chúng tự động chuyển sang tác vụ khác khi queue rỗng / đầy.
Trạng thái Queue và hành vi await
await get()→ chờ giá trịQueue rỗngawait put(item)→ chèn ngayawait get()→ lấy ngayQueue bình thường(0 < count < maxsize)await put(item)→ chèn ngayawait get()→ lấy ngayQueue đầy(chỉ với maxsize)await put(item)→ chờ chỗ trống
Cột giữa cho thấy trạng thái của Queue, và các cột bên trái/phải cho thấy await get() và await put() làm gì. Xanh = tiếp tục ngay / Vàng = chuyển sang tác vụ khác và chờ — màu phân biệt rõ ràng. Đó là điều cho phép chờ không cần polling.

Dừng sạch với một sentinel

Ở phía consumer, while True: item = await queue.get() chờ mãi cho đến khi có gì đó đến. Mẫu là cho producer đẩy một marker kết thúc ở cuối (thường là None, hoặc một sentinel tùy chỉnh = một đối tượng bảo vệ chuyên dụng bạn có thể phân biệt với dữ liệu thật). Consumer thoát khỏi vòng lặp ngay khi thấy marker.

Thử put và get bên trong một coroutine để xác nhận cơ bản về Queue và FIFO (vào trước, ra trước).

① Hãy thêm import asyncio và tạo một asyncio.Queue rỗng.

await queue.put("a"), "b", "c" — chèn 3 giá trị theo thứ tự.

③ Gọi await queue.get() 3 lần và gom các giá trị vào một danh sách, rồi in dạng pulled: ◯.

Python Editor

Chạy code để xem đầu ra

Chạy producer / consumer đồng thời với gather

Queue thực sự đáng giá khi nhiều coroutine chuyển giá trị qua lại. Tách "đẩy" và "tiêu thụ" thành các coroutine riêng và chạy chúng đồng thời với gatherawait put / await get đóng vai trò là điểm chuyển đổi, và hai nửa ăn khớp tự nhiên.

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(f"item-{i}")
    await queue.put(None)           # marker kết thúc

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:            # break trên marker kết thúc
            break
        print(f"đang xử lý: {item}")

queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# Output:
# đang xử lý: item-0
# đang xử lý: item-1
# đang xử lý: item-2
timeline producer / consumer
producer:put("item-0")put("item-1")put("item-2")put(None)marker kết thúcconsumer:get() → item-0get() → item-1get() → item-2get() → None→ break
Mỗi lần producer thực hiện một put, consumer đang chờ trên get được mở khóa và nhận giá trị. put(None) cuối cùng là marker kết thúc để consumer có thể thoát ra sạch.

Xây dựng một thiết lập trong đó producer đẩy 3 item và consumer thu thập chúng vào một danh sách. Dùng None làm marker kết thúc.

① Hãy thêm import asyncio, tạo một asyncio.Queue rỗng, và một danh sách rỗng results.

② Hãy định nghĩa async def producer(queue): — đẩy f"item-{i}" 3 lần, rồi đẩy None làm marker kết thúc.

③ Hãy định nghĩa async def consumer(queue, results):while True: gọi get; nếu None thì break, ngược lại append vào results.

④ Chạy với asyncio.gather(producer(queue), consumer(queue, results)) và in dạng count: ◯ / first: ◯ / last: ◯.

Python Editor

Chạy code để xem đầu ra
QUIZ

Kiểm tra kiến thức

Hãy trả lời từng câu hỏi một.

Câu 1Thứ tự của giá trị trả về từ asyncio.gather(task("A"), task("B"), task("C")) là gì?

Câu 2asyncio.create_task(coroutine) trả về cái gì?

Câu 3Cách chuẩn để dừng consumer trong asyncio.Queue là gì?