Câu 1Thứ tự của giá trị trả về từ asyncio.gather(task("A"), task("B"), task("C")) là gì?
asyncio nâng cao — Đồng thời với gather, Task và Queue
Học asyncio.gather chạy đồng thời và giữ thứ tự đầu vào, create_task để khởi rồi await sau, wait_for cho timeout, và asyncio.Queue cho producer / consumer qua ví dụ thật.
Xây trên async def và await, bài viết này trình bày cách chạy nhiều coroutine đồng thời. asyncio.gather cho "khởi tất cả và chờ tất cả", asyncio.create_task cho "khởi ngay, await sau", và asyncio.Queue cho producer / consumer — ba thứ này gần như bao phủ mọi mẫu async bạn sẽ viết trong dự án thật.
Về việc chạy code ở đây
async / await xoay quanh thời gian, nhưng runner của trang này gom output print lại và hiển thị một lần sau khi script chạy xong. Cảm giác output theo thời gian thực và thời gian trôi sẽ không giống môi trường Python thật. Bài viết dùng sơ đồ để làm rõ hành vi bên trong, nhưng nếu bạn muốn xem print xuất ra theo dòng chảy hoặc cảm nhận thời gian thực, hãy chạy trong môi trường Python local với asyncio.run.
asyncio.gather — chạy nhiều coroutine đồng thời
"Khởi vài API đồng thời và chỉ tiếp tục khi mọi phản hồi đã về" — một use case async kinh điển. Code đồng bộ cộng dồn các thời gian phản hồi, nhưng gather xong trong thời gian của cái chậm nhất.
asyncio.gather(coro1, coro2, ...) khởi mọi coroutine được truyền vào cùng lúc và trả về một danh sách kết quả khi tất cả xong. Danh sách trả về theo thứ tự bạn truyền vào — không phải thứ tự hoàn thành — nên input và output luôn khớp.
import asyncio
async def fetch(name):
await asyncio.sleep(1) # Mô phỏng lệnh gọi API với chờ 1 giây
return f"{name} xong"
# Khởi 3 song song → tất cả kết quả trong 1 giây (so với 3 giây tuần tự)
results = await asyncio.gather(
fetch("A"),
fetch("B"),
fetch("C"),
)
print(results) # ['A xong', 'B xong', 'C xong'] ← thứ tự đầu vào
await của chúng đổi lượt.urls đầu vào bạn nhận một danh sách kết quả tại cùng index — dễ ánh xạ ngược về input sau.Điều gì xảy ra khi có ngoại lệ
Mặc định, nếu bất kỳ coroutine nào bên trong gather raise, toàn bộ lệnh gọi raise và hủy bỏ. Để thu thập ngoại lệ thay vào đó, hãy truyền asyncio.gather(..., return_exceptions=True) — các đối tượng ngoại lệ sẽ trở lại như là phần tử danh sách để bạn có thể kiểm tra type sau đó. Hữu ích khi bạn muốn gọi nhiều API và chấp nhận thất bại một phần.
create_task và wait_for — khởi ngay, await sau, có timeout
asyncio.create_task(coro) bọc một coroutine vào đối tượng "Task" và khởi nó ngay lập tức. Khác với "khởi tất cả và chờ tất cả" của gather, đây là mẫu async kinh điển "khởi ngay, làm việc khác, rồi await task để lấy kết quả sau".
asyncio.wait_for(awaitable, timeout=N) là lưới an toàn: "raise TimeoutError nếu không xong trong N giây". Tiêu chuẩn là kết hợp với Task như một cơ chế dự phòng khi Web API không phản hồi.
import asyncio
async def slow_api():
await asyncio.sleep(2)
return "phản hồi"
# Khởi với create_task (Task bắt đầu chạy ngay)
task = asyncio.create_task(slow_api())
# Việc khác có thể diễn ra trong lúc task chạy ở phía sau
print("task đã khởi, làm việc khác...")
# Lấy kết quả với await khi bạn cần
result = await task
print(result) # phản hồi
# wait_for thêm timeout (bỏ cuộc sau 1 giây)
try:
result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
print("timeout!") # 2 giây phản hồi vs 1 giây giới hạn → vào đây
create_task tạo một Task pending; loop đưa nó sang running; cuối cùng nó đến done. Ba trạng thái này là dòng chảy cơ bản.Ba con đường đến "done" — return / exception / cancel
Có 3 con đường đến done: (1) hoàn thành bình thường — return đã tạo ra giá trị, (2) ngoại lệ — có gì đó raise bên trong, (3) cancel — task.cancel() đã ngắt nó. task.done() trả về True cho cả ba con đường, và task.exception() lấy ra ngoại lệ nếu có.
Các method hữu ích của đối tượng Task
Đối tượng Task được trả về bởi create_task hỗ trợ các thao tác hữu ích: task.cancel() để ngắt, task.done() để kiểm tra hoàn thành, task.result() để lấy kết quả của một Task đã xong (raise nếu chưa xong), và task.exception() để lấy ngoại lệ đã raise. Tiện cho kiểm soát công việc nền chạy lâu.
asyncio.Queue — producer / consumer
"Tôi muốn một coroutine đẩy giá trị vào và một cái khác tiêu thụ chúng" — một mẫu thường gặp trong scraping, xử lý job, xử lý stream, và các tình huống khác nơi hai vòng lặp với tốc độ khác nhau cần ăn khớp.
asyncio.Queue là một queue async để chuyển giá trị giữa các coroutine (một FIFO = First In First Out — giá trị ra theo thứ tự bạn đẩy vào). Dùng await queue.put(value) để chèn và await queue.get() để lấy — và khi queue rỗng / đầy, nó tự động chuyển sang tác vụ khác và chờ, giữ mọi thứ đơn giản.
import asyncio
queue = asyncio.Queue()
# Đẩy giá trị vào
await queue.put("item-1")
await queue.put("item-2")
# Lấy chúng ra (FIFO = vào trước, ra trước)
print(await queue.get()) # item-1
print(await queue.get()) # item-2
# get() trên một queue rỗng chuyển sang tác vụ khác và chờ một giá trị
# print(await queue.get()) # ← tạm dừng ở đây cho đến khi ai đó put
await queue.put(...) để chèn; consumer làm await queue.get() để lấy. FIFO (vào trước, ra trước) giữ thứ tự, và put / get là async, nên chúng tự động chuyển sang tác vụ khác khi queue rỗng / đầy.Dừng sạch với một sentinel
Ở phía consumer, while True: item = await queue.get() chờ mãi cho đến khi có gì đó đến. Mẫu là cho producer đẩy một marker kết thúc ở cuối (thường là None, hoặc một sentinel tùy chỉnh = một đối tượng bảo vệ chuyên dụng bạn có thể phân biệt với dữ liệu thật). Consumer thoát khỏi vòng lặp ngay khi thấy marker.
Chạy producer / consumer đồng thời với gather
Queue thực sự đáng giá khi nhiều coroutine chuyển giá trị qua lại. Tách "đẩy" và "tiêu thụ" thành các coroutine riêng và chạy chúng đồng thời với gather — await put / await get đóng vai trò là điểm chuyển đổi, và hai nửa ăn khớp tự nhiên.
import asyncio
async def producer(queue):
for i in range(3):
await queue.put(f"item-{i}")
await queue.put(None) # marker kết thúc
async def consumer(queue):
while True:
item = await queue.get()
if item is None: # break trên marker kết thúc
break
print(f"đang xử lý: {item}")
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# Output:
# đang xử lý: item-0
# đang xử lý: item-1
# đang xử lý: item-2
put, consumer đang chờ trên get được mở khóa và nhận giá trị. put(None) cuối cùng là marker kết thúc để consumer có thể thoát ra sạch.Kiểm tra kiến thức
Hãy trả lời từng câu hỏi một.
Câu 2asyncio.create_task(coroutine) trả về cái gì?
Câu 3Cách chuẩn để dừng consumer trong asyncio.Queue là gì?