Belajar dengan membaca secara berurutan

Task asyncio — Eksekusi Konkuren dengan gather, Task, dan Queue

Pelajari eksekusi paralel asyncio.gather plus urutan input, pola fire-and-await create_task, timeout wait_for, dan producer / consumer asyncio.Queue lewat contoh.

Berdasarkan async def dan await, artikel ini membahas cara menjalankan beberapa coroutine secara konkuren. asyncio.gather untuk "jalankan semua dan tunggu semua", asyncio.create_task untuk "jalankan sekarang, tunggu nanti", dan asyncio.Queue untuk producer / consumer — ketiganya mencakup hampir semua pola async yang akan kamu tulis di proyek nyata.

Tentang menjalankan kode di sini

async / await berfokus pada timing, tetapi runner di situs ini menyangga output print dan menampilkannya sekaligus setelah skrip selesai. Output real-time dan rasa waktu yang berlalu tidak akan sama dengan environment Python asli. Artikel ini menggunakan diagram untuk memperjelas perilaku internalnya, tetapi jika kamu ingin melihat aliran print secara langsung atau merasakan timing yang sebenarnya, jalankan di environment Python lokalmu dengan asyncio.run.

3 API utama asyncio
asyncio.gatherjalankan semua → tunggu semuacreate_task / wait_forjalankan sekarang, tunggu nantiasyncio.Queueteruskan nilai antar coroutine
gather = "jalankan semua dan tunggu semua". create_task / wait_for = "jalankan sekarang, tunggu nanti, dengan timeout opsional". Queue = "buffer FIFO untuk meneruskan nilai antar coroutine". Setiap bab artikel ini membahas satu per satu.

asyncio.gather — jalankan beberapa coroutine secara konkuren

"Memicu beberapa API secara konkuren dan baru lanjut setelah semua respons masuk" — kasus pemakaian async standar. Kode sinkron menjumlahkan waktu respons, tetapi gather selesai dalam waktu yang tunggal terlama.

await sekuensial vs gather — selisih waktu
A (1 detik)B (1 detik)C (1 detik)→ sekuensial = 3 detikgather:A, B, C mulai sekaligusSemua menunggu 1 detiksecara konkurenSemua selesai→ gather = 1 detik
await sekuensial memulai panggilan berikutnya hanya setelah yang sebelumnya selesai, jadi total = jumlah dari setiap task. gather memulai semuanya sekaligus dan menunggu secara konkuren, jadi total = yang paling lambat. Tiga penantian 1 detik berubah dari 3 detik → 1 detik.

asyncio.gather(coro1, coro2, ...) memulai setiap coroutine yang diteruskan sekaligus dan mengembalikan list hasil setelah semuanya selesai. List-nya kembali dalam urutan kamu meneruskannya — bukan urutan penyelesaian — sehingga input dan output tetap selaras.

import asyncio

async def fetch(name):
    await asyncio.sleep(1)        # Simulasi panggilan API dengan penantian 1 detik
    return f"{name} done"

# Picu 3 secara paralel → semua hasil dalam 1 detik (vs 3 detik sekuensial)
results = await asyncio.gather(
    fetch("A"),
    fetch("B"),
    fetch("C"),
)
print(results)                    # ['A done', 'B done', 'C done']  ← urutan input
Cara kerja gather — mulai bersama → tunggu semua → kembali dalam urutan input
gather(A, B, C)A berjalanawait beralih keluarB berjalanbergiliran → konkurenC berjalan→ semua selesai = list hasil
Tiga coroutine mulai sekaligus, menunggu sampai semua selesai, dan hasilnya kembali dalam urutan input sebagai list. Mereka maju secara konkuren saat titik await mereka bergiliran.
gather mengembalikan urutan input, bukan urutan penyelesaian
Input:gather(A, B, C)Berjalan:urutan selesai bisaB → A → CMengembalikan:[A_result, B_result, C_result]Urutan penyelesaiantidak dijaminUrutan inputtetap utuh
Urutan penyelesaian internal bisa berubah-ubah, tetapi list hasil mempertahankan urutan kamu meneruskannya. Untuk list urls sebagai input, kamu mendapatkan list hasil di indeks yang sama — mudah dipetakan kembali ke input nantinya.

Apa yang terjadi saat exception

Secara default, jika ada coroutine di dalam gather yang melempar exception, seluruh panggilan akan melempar dan dibatalkan. Untuk mengumpulkan exception sebagai gantinya, teruskan asyncio.gather(..., return_exceptions=True)objek exception kemudian akan masuk sebagai elemen list sehingga kamu bisa memeriksa tipenya setelah fakta. Berguna saat kamu ingin memanggil beberapa API dan mentolerir kegagalan parsial.

Jalankan 3 coroutine secara konkuren dengan asyncio.gather dan konfirmasi bahwa urutan penyelesaian bisa berbeda tetapi nilai kembalian tetap dalam urutan input. Setiap task menunggu dengan jumlah waktu berbeda agar kamu bisa menyaksikan urutan penyelesaian (B → A → C) vs urutan kembalian ([A, B, C]) menyimpang.

① Tambahkan import asyncio.

② Definisikan async def task(name, secs):await asyncio.sleep(secs) lalu return f"{name} done".

③ Jalankan dengan await asyncio.gather(task("A", 0.3), task("B", 0.1), task("C", 0.5)) dan simpan di results.

④ Print print("results:", results).

⑤ Print print("count:", len(results)).

(Jalankan dengan sukses dan penjelasan akan muncul.)

Python Editor

Jalankan kode untuk melihat output

create_task dan wait_for — jalankan sekarang, tunggu nanti, dengan timeout

asyncio.create_task(coro) membungkus coroutine dalam objek "Task" dan langsung memulainya. Berbeda dari "jalankan semua dan tunggu semua" milik gather, ini adalah pola async klasik "jalankan sekarang, kerjakan hal lain, lalu await task untuk mengumpulkan hasil nanti".

asyncio.wait_for(awaitable, timeout=N) adalah jaring pengaman: "lempar TimeoutError jika tidak selesai dalam N detik". Standarnya dikombinasikan dengan Task sebagai fail-safe saat Web API tidak merespons.

import asyncio

async def slow_api():
    await asyncio.sleep(2)
    return "response"

# Picu dengan create_task (Task langsung mulai berjalan)
task = asyncio.create_task(slow_api())

# Pekerjaan lain bisa terjadi selagi task berjalan di background
print("task fired, doing other work...")

# Kumpulkan hasil dengan await saat dibutuhkan
result = await task
print(result)                       # response

# wait_for menambahkan timeout (menyerah setelah 1 detik)
try:
    result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
    print("timeout!")               # respons 2 detik vs anggaran 1 detik → di sini
Lifecycle Task
pendingtepat setelah create_taskrunningloop sedang menjalankannyadoneselesai (return)
create_task membuat Task pending; loop memindahkannya ke running; akhirnya mencapai done. Tiga state ini adalah alur dasar.

Tiga cara mencapai "done" — return / exception / cancel

Ada 3 jalur menuju done: (1) penyelesaian normalreturn menghasilkan nilai, (2) exception — sesuatu dilempar di dalam, (3) canceltask.cancel() menginterupsinya. task.done() mengembalikan True untuk ketiga jalur, dan task.exception() mengekstrak exception jika ada.

create_task dan wait_for
asyncio.create_task( coroutine)Objek Task(berjalan di bg)await task→ resultasyncio.wait_for( task, timeout=N)≤ N detik → result> N detik→ TimeoutError
create_task membungkus coroutine dalam Task dan langsung memulainya. Task terus berjalan di background, dan await task mengumpulkan hasilnya. Tambahkan wait_for(task, timeout=N) sebagai jaring pengaman "menyerah setelah N detik".
gather vs create_task — kapan memakai yang mana
Ingin semua hasilsekaligusasyncio.gatherPicu, kerjakan hallain, await nantiasyncio.create_task
Pakai gather saat kamu tidak melanjutkan sampai semua hasil masuk. Pakai create_task saat kamu ingin memicunya, mengerjakan hal lain, lalu mengumpulkan nanti. Keduanya menjalankan secara konkuren — bagian itu sama.

Method objek Task yang berguna

Objek Task yang dikembalikan create_task mendukung operasi yang berguna: task.cancel() untuk menginterupsi, task.done() untuk memeriksa penyelesaian, task.result() untuk mengambil hasil Task yang selesai (melempar jika belum selesai), dan task.exception() untuk mengambil exception yang dilempar. Berguna untuk mengontrol pekerjaan background yang berjalan lama.

Picu dua Task dengan create_task, kerjakan hal lain di antaranya, lalu kumpulkan hasilnya.

① Tambahkan import asyncio.

② Definisikan async def task(name):await asyncio.sleep(0) untuk beralih keluar, lalu return f"{name} done".

③ Pakai asyncio.create_task(task("A")) dan asyncio.create_task(task("B")) untuk memicu 2 Task, simpan di t1 dan t2.

④ Selagi Task berjalan, print tasks fired.

⑤ Pakai await t1 dan await t2 untuk mengumpulkan setiap hasil dan print sebagai A: ◯ / B: ◯.

Python Editor

Jalankan kode untuk melihat output

Pakai asyncio.wait_for untuk mengatur timeout dan coba kasus dalam anggaran dan melebihi anggaran.

① Tambahkan import asyncio.

② Definisikan async def slow_task():await asyncio.sleep(0.5) untuk menunggu 0.5 detik, lalu return "response done".

await asyncio.wait_for(slow_task(), timeout=1.0)selesai dalam 1 detik, jadi sukses. Print sebagai success: ◯.

try: / except asyncio.TimeoutError: di sekitar await asyncio.wait_for(slow_task(), timeout=0.1)0.1 detik tidak cukup, jadi TimeoutError menyala. Tangkap dan print timeout!.

Python Editor

Jalankan kode untuk melihat output

asyncio.Queue — producer / consumer

"Saya ingin satu coroutine memasok nilai dan satu lagi mengonsumsinya" — pola yang sering muncul di scraping, pemrosesan job, penanganan stream, dan situasi lain di mana dua loop dengan kecepatan berbeda perlu cocok satu sama lain.

Producer → Queue → Consumer
Jatuhkan URLsatu per satuQueue(antrean tunggu)Tarik URL danambil setiap halamanJatuhkan jobsatu per satuQueue(antrean tunggu)Tarik job danproses berurutanJatuhkan data barusatu per satuQueue(antrean tunggu)Tarik data danjalankan analisisputgetputgetputget
Producer di sebelah kiri menjatuhkan nilai ke Queue; consumer di sebelah kanan menariknya keluar dan memprosesnya. Pola yang sama cocok untuk scraping halaman web, memproses job secara berurutan, menangani stream data masuk, dan banyak kasus lain.

asyncio.Queue adalah antrean async untuk meneruskan nilai antar coroutine (sebuah FIFO = First In First Out — nilai keluar dalam urutan kamu memasukkannya). Pakai await queue.put(value) untuk memasukkan dan await queue.get() untuk mengambil — dan saat antrean kosong / penuh, ia otomatis beralih ke task lain dan menunggu, menjaga semuanya tetap sederhana.

import asyncio

queue = asyncio.Queue()

# Masukkan nilai
await queue.put("item-1")
await queue.put("item-2")

# Keluarkan (FIFO = first in, first out)
print(await queue.get())            # item-1
print(await queue.get())            # item-2

# get() pada antrean kosong beralih ke task lain dan menunggu nilai
# print(await queue.get())          # ← berhenti di sini sampai ada yang put
producer / consumer dengan Queue
producerawait queue.put(item)asyncio.Queuebuffer FIFOconsumerawait queue.get()putget
Producer melakukan await queue.put(...) untuk memasukkan; consumer melakukan await queue.get() untuk menariknya. FIFO (first in, first out) mempertahankan urutan, dan put / get adalah async, jadi mereka otomatis beralih ke task lain saat antrean kosong / penuh.
State Queue dan perilaku await
await get()→ tunggu nilaiQueue kosongawait put(item)→ langsung masukkanawait get()→ langsung tarikQueue normal(0 < count < maxsize)await put(item)→ langsung masukkanawait get()→ langsung tarikQueue penuh(hanya dengan maxsize)await put(item)→ tunggu ruang
Kolom tengah menunjukkan state Queue, dan kolom kiri/kanan menunjukkan apa yang dilakukan await get() dan await put(). Hijau = langsung berlanjut / Kuning = beralih ke task lain dan menunggu — diberi kode warna untuk kejelasan. Inilah yang memungkinkan penantian tanpa polling.

Berhenti dengan rapi memakai sentinel

Di sisi consumer, while True: item = await queue.get() menunggu selamanya sampai ada sesuatu yang datang. Polanya adalah dengan producer mendorong penanda terminasi di akhir (biasanya None, atau sentinel kustom = objek penjaga khusus yang bisa kamu bedakan dari data nyata). Consumer keluar dari loop saat melihat penandanya.

Coba put dan get di dalam satu coroutine untuk mengonfirmasi dasar Queue dan FIFO (first in, first out).

① Tambahkan import asyncio dan buat asyncio.Queue kosong.

await queue.put("a"), "b", "c" — masukkan 3 nilai berurutan.

③ Panggil await queue.get() 3 kali dan kumpulkan nilai dalam list, lalu print sebagai pulled: ◯.

Python Editor

Jalankan kode untuk melihat output

Jalankan producer / consumer secara konkuren dengan gather

Queue benar-benar terbayar saat beberapa coroutine saling meneruskan nilai. Pisahkan "memasok" dan "mengonsumsi" menjadi coroutine terpisah dan jalankan secara konkuren dengan gatherawait put / await get bertindak sebagai titik switching, dan kedua bagian itu bertemu secara alami.

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(f"item-{i}")
    await queue.put(None)           # penanda terminasi

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:            # break pada penanda terminasi
            break
        print(f"processing: {item}")

queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# Output:
# processing: item-0
# processing: item-1
# processing: item-2
Timeline producer / consumer
producer:put("item-0")put("item-1")put("item-2")put(None)penanda terminasiconsumer:get() → item-0get() → item-1get() → item-2get() → None→ break
Setiap kali producer melakukan put, consumer yang menunggu di get tidak terblokir dan menerima nilai. put(None) terakhir adalah penanda terminasi sehingga consumer bisa keluar dengan rapi.

Bangun setup di mana producer memasukkan 3 item dan consumer mengumpulkannya ke dalam list. Pakai None sebagai penanda terminasi.

① Tambahkan import asyncio, buat asyncio.Queue kosong, dan list kosong results.

② Definisikan async def producer(queue): — put f"item-{i}" 3 kali, lalu put None sebagai penanda terminasi.

③ Definisikan async def consumer(queue, results):while True: panggil get; jika None maka break, jika tidak append ke results.

④ Jalankan dengan asyncio.gather(producer(queue), consumer(queue, results)) dan print sebagai count: ◯ / first: ◯ / last: ◯.

Python Editor

Jalankan kode untuk melihat output
QUIZ

Cek Pemahaman

Jawab setiap pertanyaan satu per satu.

Soal 1Apa urutan nilai kembalian dari asyncio.gather(task("A"), task("B"), task("C"))?

Soal 2Apa yang dikembalikan asyncio.create_task(coroutine)?

Soal 3Apa cara standar untuk menghentikan consumer di asyncio.Queue?