Aprende leyendo en orden

Tasks de asyncio — Ejecución concurrente con gather, Task y Queue

Aprende ejecución concurrente y orden de entrada de asyncio.gather, create_task para disparar y esperar después, wait_for con timeout, y productor / consumidor con asyncio.Queue.

Sobre la base de async def y await, este artículo cubre cómo ejecutar varias corrutinas en concurrencia. asyncio.gather para "dispara todo y espera todo", asyncio.create_task para "dispara ahora, espera después" y asyncio.Queue para productor / consumidor — estos tres cubren prácticamente cualquier patrón async que vayas a escribir en proyectos reales.

Sobre la ejecución de código aquí

async / await es todo cuestión de tiempos, pero el runner de este sitio acumula la salida de print y la muestra toda de golpe cuando termina el script. La salida en tiempo real y la sensación del tiempo transcurrido no coincidirán con un entorno Python real. El artículo usa diagramas para dejar claro el comportamiento interno, pero si quieres ver el flujo de print en vivo o sentir los tiempos reales, ejecútalo en tu entorno Python local con asyncio.run.

Las 3 APIs principales de asyncio
asyncio.gatherejecuta todo → espera todocreate_task / wait_fordispara ahora, espera despuésasyncio.Queuepasa valores entre tareas
gather = "dispara todo y espera todo". create_task / wait_for = "dispara ahora, espera después, con timeout opcional". Queue = "búfer FIFO para pasar valores entre corrutinas". Cada capítulo de este artículo cubre una por turno.

asyncio.gather — ejecutar varias corrutinas en concurrencia

"Dispara varias APIs en concurrencia y solo avanza cuando estén todas las respuestas" — un caso de uso clásico de async. El código síncrono suma los tiempos de respuesta, pero gather termina en el tiempo de la más lenta.

await secuencial vs gather — diferencia de tiempo
A (1 seg)B (1 seg)C (1 seg)→ secuencial = 3 seggather:A, B, C arrancan a la vezTodas esperan 1 segen concurrenciaTodas listas→ gather = 1 seg
El await secuencial arranca la siguiente llamada solo cuando la anterior termina, así que el total = la suma de todas las tareas. gather las arranca todas a la vez y espera en concurrencia, así que el total = la más lenta. Tres esperas de 1 segundo pasan de 3 seg → 1 seg.

asyncio.gather(coro1, coro2, ...) arranca a la vez todas las corrutinas que le pases y devuelve una lista de resultados cuando todas terminan. La lista regresa en el orden en que las pasaste — no en orden de finalización — así que la entrada y la salida quedan alineadas.

import asyncio

async def fetch(name):
    await asyncio.sleep(1)        # Simula una llamada API con espera de 1 seg
    return f"{name} done"

# Dispara 3 en paralelo → todos los resultados en 1 seg (vs 3 seg secuencial)
results = await asyncio.gather(
    fetch("A"),
    fetch("B"),
    fetch("C"),
)
print(results)                    # ['A done', 'B done', 'C done']  ← orden de entrada
Cómo funciona gather — arrancan juntas → esperar a todas → devolver en orden de entrada
gather(A, B, C)A corriendoawait cambia fueraB corriendoturnándose → concurrenteC corriendo→ todas listas = lista de resultados
Tres corrutinas arrancan a la vez, esperamos hasta que todas terminen y los resultados regresan como lista en orden de entrada. Avanzan en concurrencia mientras sus puntos await se turnan.
gather devuelve en orden de entrada, no de finalización
Entrada:gather(A, B, C)Ejecutando:orden de fin puede serB → A → CDevuelve:[A_result, B_result, C_result]El orden de finno está garantizadoEl orden de entradase conserva
El orden interno de finalización puede mezclarse, pero la lista de resultados mantiene el orden en que las pasaste. Para una lista de entrada urls obtienes una lista de resultados en los mismos índices — fácil de mapear de vuelta a la entrada después.

Qué pasa con las excepciones

Por defecto, si alguna corrutina dentro de gather lanza, toda la llamada lanza y aborta. Para recolectar las excepciones en su lugar, pasa asyncio.gather(..., return_exceptions=True)los objetos de excepción regresan entonces como elementos de la lista, así puedes comprobar los tipos a posteriori. Útil cuando quieres llamar a varias APIs y tolerar fallos parciales.

Ejecuta 3 corrutinas en concurrencia con asyncio.gather y confirma que el orden de finalización puede diferir pero el valor de retorno se mantiene en orden de entrada. Cada tarea espera una cantidad de tiempo distinta para que veas cómo el orden de finalización (B → A → C) y el orden de retorno ([A, B, C]) se separan.

① Añade import asyncio.

② Define async def task(name, secs):await asyncio.sleep(secs) y luego return f"{name} done".

③ Ejecuta con await asyncio.gather(task("A", 0.3), task("B", 0.1), task("C", 0.5)) y guarda en results.

④ Imprime print("results:", results).

⑤ Imprime print("count:", len(results)).

(Ejecuta con éxito y aparecerá la explicación.)

Editor Python

Ejecutar el código para ver el resultado

create_task y wait_for — dispara ahora, espera después, con timeout

asyncio.create_task(coro) envuelve una corrutina en un objeto "Task" y la arranca de inmediato. A diferencia del "dispara todo y espera todo" de gather, este es el patrón clásico async de "dispara ahora, haz otro trabajo y luego await task para recoger el resultado más tarde".

asyncio.wait_for(awaitable, timeout=N) es una red de seguridad: "lanza TimeoutError si no termina en N segundos". Es estándar combinarlo con Tasks como salvaguarda cuando una Web API no responde.

import asyncio

async def slow_api():
    await asyncio.sleep(2)
    return "response"

# Dispárala con create_task (la Task arranca corriendo de inmediato)
task = asyncio.create_task(slow_api())

# Mientras la Task corre en segundo plano, puede pasar otro trabajo
print("task fired, doing other work...")

# Recoge el resultado con await cuando lo necesites
result = await task
print(result)                       # response

# wait_for añade un timeout (rendirse tras 1 seg)
try:
    result = await asyncio.wait_for(slow_api(), timeout=1.0)
except asyncio.TimeoutError:
    print("timeout!")               # respuesta de 2 seg vs presupuesto de 1 seg → aquí
Ciclo de vida de una Task
pendingjusto tras create_taskrunningel bucle la está ejecutandodoneterminada (return)
create_task crea una Task en estado pending; el bucle la pasa a running; finalmente alcanza done. Estos tres estados son el flujo básico.

Tres caminos a "done" — return / excepción / cancel

Hay 3 caminos a done: (1) finalización normalreturn produjo un valor, (2) excepción — algo lanzó dentro, (3) canceltask.cancel() la interrumpió. task.done() devuelve True para los tres caminos, y task.exception() extrae la excepción si la hay.

create_task y wait_for
asyncio.create_task( coroutine)Objeto Task(corriendo en bg)await task→ resultadoasyncio.wait_for( task, timeout=N)≤ N seg → resultado> N seg→ TimeoutError
create_task envuelve una corrutina en una Task y la arranca de inmediato. La Task sigue corriendo en segundo plano, y await task recoge el resultado. Añade wait_for(task, timeout=N) como red de seguridad "rendirse tras N segundos".
gather vs create_task — cuándo usar cada uno
Quiero todos losresultados a la vezasyncio.gatherDisparar, hacer otrotrabajo, await despuésasyncio.create_task
Usa gather cuando no avances hasta tener todos los resultados. Usa create_task cuando quieras dispararla, hacer otro trabajo y luego recoger después. Ambos ejecutan cosas en concurrencia — eso es lo mismo.

Métodos útiles del objeto Task

El objeto Task que devuelve create_task admite operaciones útiles: task.cancel() para interrumpir, task.done() para comprobar finalización, task.result() para obtener el resultado de una Task terminada (lanza si no está done) y task.exception() para obtener cualquier excepción lanzada. Útil para controlar trabajo en segundo plano de larga duración.

Dispara dos Tasks con create_task, haz otro trabajo en medio y luego recoge sus resultados.

① Añade import asyncio.

② Define async def task(name):await asyncio.sleep(0) para cambiar fuera, y luego return f"{name} done".

③ Usa asyncio.create_task(task("A")) y asyncio.create_task(task("B")) para disparar 2 Tasks, guardándolas en t1 y t2.

④ Mientras corren las Tasks, imprime tasks fired.

⑤ Usa await t1 y await t2 para recoger cada resultado e imprime como A: ◯ / B: ◯.

Editor Python

Ejecutar el código para ver el resultado

Usa asyncio.wait_for para fijar un timeout y prueba los casos dentro del presupuesto y por encima del presupuesto.

① Añade import asyncio.

② Define async def slow_task():await asyncio.sleep(0.5) para esperar 0.5 seg, y luego return "response done".

await asyncio.wait_for(slow_task(), timeout=1.0)termina dentro de 1 seg, así que tiene éxito. Imprime como success: ◯.

try: / except asyncio.TimeoutError: alrededor de await asyncio.wait_for(slow_task(), timeout=0.1)0.1 seg no es suficiente, así que se dispara TimeoutError. Captúralo e imprime timeout!.

Editor Python

Ejecutar el código para ver el resultado

asyncio.Queue — productor / consumidor

"Quiero que una corrutina alimente valores y otra los consuma" — un patrón frecuente en scraping, procesamiento de jobs, manejo de streams y otras situaciones donde dos bucles a velocidades distintas necesitan engranar.

Productor → Queue → Consumidor
Depositar URLsuna a unaQueue(cola de espera)Sacar URLs ydescargar cada páginaDepositar jobsuno a unoQueue(cola de espera)Sacar jobs yprocesarlos en ordenDepositar datosfrescos uno a unoQueue(cola de espera)Sacar datos yejecutar análisisputgetputgetputget
El productor a la izquierda deposita valores en la Queue; el consumidor a la derecha los saca y los procesa. El mismo patrón encaja en scraping de páginas web, procesar jobs en orden, manejar streams de datos entrantes y muchos otros casos.

asyncio.Queue es una cola async para pasar valores entre corrutinas (un FIFO = First In First Out — los valores salen en el orden en que los pones). Usa await queue.put(value) para insertar y await queue.get() para sacar — y cuando la cola está vacía / llena, automáticamente cambia a otra tarea y espera, manteniendo todo simple.

import asyncio

queue = asyncio.Queue()

# Pone valores
await queue.put("item-1")
await queue.put("item-2")

# Los saca (FIFO = first in, first out)
print(await queue.get())            # item-1
print(await queue.get())            # item-2

# get() en una cola vacía cambia a otra tarea y espera por un valor
# print(await queue.get())          # ← se pausa aquí hasta que alguien haga put
productor / consumidor con una Queue
productorawait queue.put(item)asyncio.Queuebúfer FIFOconsumidorawait queue.get()putget
El productor hace await queue.put(...) para insertar; el consumidor hace await queue.get() para sacar. FIFO (first in, first out) preserva el orden, y put / get son async, así que cambian automáticamente a otras tareas cuando la cola está vacía / llena.
Estado de la Queue y comportamiento de await
await get()→ esperar valorQueue vacíaawait put(item)→ inserta de inmediatoawait get()→ saca de inmediatoQueue normal(0 < count < maxsize)await put(item)→ inserta de inmediatoawait get()→ saca de inmediatoQueue llena(solo con maxsize)await put(item)→ esperar espacio
La columna del centro muestra el estado de la Queue, y las columnas izquierda/derecha muestran qué hacen await get() y await put(). Verde = avanza de inmediato / Amarillo = cambia a otra tarea y espera — codificado por color para mayor claridad. Esto es lo que permite esperar sin polling.

Para detener limpiamente, usa un sentinel

Del lado del consumidor, while True: item = await queue.get() espera para siempre a que llegue algo. El patrón es que el productor empuje un marcador de terminación al final (típicamente None, o un sentinel = un objeto guardián dedicado que distingues de los datos reales). El consumidor sale del bucle en cuanto ve el marcador.

Prueba put y get dentro de una sola corrutina para confirmar los fundamentos de Queue y FIFO (first in, first out).

① Añade import asyncio y crea una asyncio.Queue vacía.

await queue.put("a"), "b", "c" — inserta 3 valores en orden.

③ Llama a await queue.get() 3 veces y reúne los valores en una lista, luego imprime como pulled: ◯.

Editor Python

Ejecutar el código para ver el resultado

Ejecutar productor / consumidor en concurrencia con gather

Queue da realmente sus frutos cuando varias corrutinas se pasan valores de un lado a otro. Separa "alimentar" y "consumir" en corrutinas distintas y ejecútalas en concurrencia con gatherawait put / await get actúan como puntos de cambio, y las dos mitades engranan de forma natural.

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(f"item-{i}")
    await queue.put(None)           # marcador de terminación

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:            # rompe con el marcador de terminación
            break
        print(f"processing: {item}")

queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
# Salida:
# processing: item-0
# processing: item-1
# processing: item-2
Cronograma productor / consumidor
productor:put("item-0")put("item-1")put("item-2")put(None)marcador de terminaciónconsumidor:get() → item-0get() → item-1get() → item-2get() → None→ break
Cada vez que el productor hace un put, el consumidor que espera en get se desbloquea y recibe el valor. El último put(None) es el marcador de terminación para que el consumidor pueda salir limpiamente.

Construye una configuración en la que el productor pone 3 ítems y el consumidor los recolecta en una lista. Usa None como marcador de terminación.

① Añade import asyncio, crea una asyncio.Queue vacía y una lista vacía results.

② Define async def producer(queue): — pon f"item-{i}" 3 veces, y luego pon None como marcador de terminación.

③ Define async def consumer(queue, results):while True: llama a get; si None entonces break, si no añade a results.

④ Ejecuta con asyncio.gather(producer(queue), consumer(queue, results)) e imprime como count: ◯ / first: ◯ / last: ◯.

Editor Python

Ejecutar el código para ver el resultado
QUIZ

Verificación de conocimientos

Responde cada pregunta una a una.

Pregunta 1¿Cuál es el orden del valor de retorno de asyncio.gather(task("A"), task("B"), task("C"))?

Pregunta 2¿Qué devuelve asyncio.create_task(coroutine)?

Pregunta 3¿Cuál es la forma estándar de detener al consumidor en asyncio.Queue?