Web Development · Python

Python asyncio in Production: The Pitfalls No One Warns You About

async/await syntax makes Python code look concurrent. The behavior when you get it wrong is harder to predict than most tutorials explain. Here's what breaks in production and how to fix it before it does.

Anurag Verma

The async/await syntax in Python looks like a direct upgrade over synchronous code. Mark a function async def, await the slow calls, and you get concurrency for free. That’s the mental model tutorials teach, and it’s mostly right for the happy path.

Where it breaks down is in the details: blocking calls that slip through, event loop management in frameworks that have their own opinions, CPU-bound work that async doesn’t help with, and task cancellation behavior that surprises almost everyone the first time they hit it.

None of these are reasons to avoid asyncio. They’re reasons to understand it before writing production code with it.

The Blocking Call Problem

The most common asyncio mistake is calling a synchronous blocking function inside an async context without wrapping it:

import asyncio
import time

async def fetch_data():
    time.sleep(2)  # This blocks the entire event loop for 2 seconds
    return "data"

async def main():
    results = await asyncio.gather(
        fetch_data(),
        fetch_data(),
        fetch_data(),
    )
    # These run sequentially, not concurrently — takes 6 seconds

time.sleep(2) never yields control to the event loop. It holds the thread, and the event loop runs on that thread, so your three “concurrent” coroutines run one after another.

The fix is either to use the async version (asyncio.sleep) or to run the blocking call in a thread pool:

import asyncio
import time

async def fetch_data_blocking():
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool, freeing the event loop
    await loop.run_in_executor(None, time.sleep, 2)
    return "data"
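
On Python 3.9+, asyncio.to_thread wraps the same thread-pool handoff in a single call. The sketch below is only an illustration: blocking_fetch and fetch_all are hypothetical names standing in for any synchronous call and its async caller, and the delay is shortened so the effect is quick to observe.

```python
import asyncio
import time

def blocking_fetch(delay: float) -> str:
    # Hypothetical stand-in for any synchronous, blocking call
    time.sleep(delay)
    return "data"

async def fetch_all(count: int = 3, delay: float = 0.2):
    start = time.perf_counter()
    # Each call runs in the default thread pool, so the sleeps overlap
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, delay) for _ in range(count))
    )
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = asyncio.run(fetch_all())
    print(results, f"{elapsed:.2f}s")
```

The elapsed time should land close to one delay rather than the sum of all three, because the event loop stays free while the worker threads sleep.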

The tricky part is that blocking calls aren’t always obvious. Database drivers, file I/O, image processing, and subprocess calls are all blocking unless you use async-specific variants or executors. A quick reference for common cases:

  • httpx.AsyncClient is async. requests is not.
  • asyncpg and aiosqlite are async. psycopg2 is not (psycopg 3 adds an async API).
  • aiofiles exposes an async interface by delegating file operations to a thread pool. open() and standard pathlib operations block.
  • asyncio.create_subprocess_exec is async. subprocess.run is not.

If you’re using a library that doesn’t have an async variant, run_in_executor is the right tool. If the blocking work is CPU-intensive, use ProcessPoolExecutor instead of the default ThreadPoolExecutor, because threads share a GIL.
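
One way to find blocking calls you did not know about is asyncio’s debug mode, which logs a warning whenever a single task step or callback holds the event loop longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below captures those warnings in a list; ListHandler, handler_with_hidden_blocking_call, and find_slow_steps are hypothetical names used only for illustration.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def handler_with_hidden_blocking_call():
    # Stand-in for a sync driver or library call buried in async code
    time.sleep(0.2)

def find_slow_steps():
    capture = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(capture)
    try:
        # debug=True makes the loop time every step it executes
        asyncio.run(handler_with_hidden_blocking_call(), debug=True)
    finally:
        logger.removeHandler(capture)
    return capture.messages

if __name__ == "__main__":
    for message in find_slow_steps():
        print(message)
```

Debug mode adds overhead, so it is better suited to staging or a sampled subset of production than to every process.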

CPU-Bound Work Is Worse Than You Think

Asyncio is concurrency via cooperative multitasking. Coroutines yield control to the event loop at await points. A CPU-bound function that never hits an await blocks the event loop for its entire duration, just like a blocking I/O call.

This is a common mistake when building AI or data-processing APIs with FastAPI:

import asyncio
from fastapi import FastAPI

app = FastAPI()

def heavy_cpu_work(data: list[float]) -> float:
    # Runs for 500ms of CPU time
    return sum(x**2 for x in data) / len(data)

@app.get("/compute")
async def compute(n: int = 1000000):
    data = list(range(n))
    # This blocks the event loop for 500ms — all other requests wait
    result = heavy_cpu_work(data)
    return {"result": result}

Under load, this kills throughput. Every 500ms of CPU work in a request blocks every other request from making progress.
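
To see the stall directly, you can run a heartbeat task next to the CPU work and record how late its ticks arrive. This is a rough sketch rather than a benchmark; burn_cpu, heartbeat, and worst_gap_during are hypothetical helpers, and the busy loop stands in for any CPU-bound function.

```python
import asyncio
import time

def burn_cpu(seconds: float) -> None:
    # Busy loop standing in for CPU-bound work; it never awaits
    deadline = time.perf_counter() + seconds
    while time.perf_counter() < deadline:
        pass

async def heartbeat(gaps: list, interval: float = 0.01) -> None:
    # Records the real time between ticks that should be `interval` apart
    last = time.perf_counter()
    while True:
        await asyncio.sleep(interval)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now

async def worst_gap_during(cpu_seconds: float) -> float:
    gaps = []
    ticker = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.05)   # let the heartbeat tick a few times
    burn_cpu(cpu_seconds)       # blocks the event loop, heartbeat included
    await asyncio.sleep(0.05)   # let the delayed tick arrive
    ticker.cancel()
    try:
        await ticker
    except asyncio.CancelledError:
        pass
    return max(gaps)

if __name__ == "__main__":
    print(f"worst gap: {asyncio.run(worst_gap_during(0.2)):.3f}s")
```

The worst gap comes out at least as long as the CPU burst, which is the delay every other request on the same loop would see.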

The correct pattern for CPU-bound work in an async API:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

def heavy_cpu_work(data: list[float]) -> float:
    return sum(x**2 for x in data) / len(data)

@app.get("/compute")
async def compute(n: int = 1000000):
    data = list(range(n))
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, heavy_cpu_work, data)
    return {"result": result}

ProcessPoolExecutor gets around the GIL by using separate processes. The tradeoff: process startup is expensive, data has to be serialized to move between processes, and shared state becomes a problem. For heavy compute, this is usually worth it. For lightweight work, the overhead costs more than the parallelism saves.
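
The serialization cost is easy to underestimate, since ProcessPoolExecutor pickles every argument and return value. A rough way to gauge it is to measure the pickled size of what you are about to send. In this sketch payload_bytes is a hypothetical helper, and the comparison assumes the worker could build the list itself from n instead of receiving it.

```python
import pickle

def payload_bytes(obj) -> int:
    # Approximate number of bytes that would cross the process boundary
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))

if __name__ == "__main__":
    n = 100_000
    print("sending the list:", payload_bytes(list(range(n))), "bytes")
    print("sending only n:  ", payload_bytes(n), "bytes")
```

When the input can be derived inside the worker (from an ID, a file path, or a size), passing that small value instead of the full data keeps the overhead down.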

Task Cancellation Is Not Instant

When you cancel an asyncio task, the cancellation doesn’t happen immediately:

import asyncio

async def long_running():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # This runs when the task is cancelled
        print("Cancelled! Cleaning up...")
        await asyncio.sleep(1)  # Cleanup work
        raise  # Must re-raise CancelledError

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task is done being cancelled")

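task.cancel() only requests cancellation. The CancelledError is raised inside the coroutine at the await where it is suspended, the next time the event loop runs the task. If a coroutine catches it without re-raising, the task continues running. This is almost always wrong. The pattern for cleanup is: catch CancelledError, do cleanup, then re-raise.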

In web frameworks, this matters for request cancellation. When a client disconnects, the server or framework may cancel the request handler coroutine. If your handler has started a database transaction or is mid-way through a file write, you need cleanup logic or you’ll leave things in an inconsistent state.

FastAPI and Starlette propagate cancellation correctly by default. What they can’t protect you from is cleanup logic in your code that swallows CancelledError.
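
The difference between swallowing and propagating cancellation can be shown in a few lines. In this sketch, swallows, cleans_up, and cancel_and_report are hypothetical names; try/finally is used for the cleanup path because it runs the cleanup and lets CancelledError propagate without an explicit re-raise.

```python
import asyncio

async def swallows():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass  # BUG: the cancellation request is lost here
    return "kept running"

async def cleans_up(log: list):
    try:
        await asyncio.sleep(10)
    finally:
        # Runs on cancellation too; CancelledError keeps propagating
        log.append("cleaned up")

async def cancel_and_report(coro):
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"

if __name__ == "__main__":
    log = []
    print(asyncio.run(cancel_and_report(swallows())))
    print(asyncio.run(cancel_and_report(cleans_up(log))), log)
```

The first task finishes normally with a return value even though it was cancelled, which is exactly the behavior a framework cannot correct for you.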

Sharing State Between Coroutines

Asyncio is single-threaded, which means two coroutines cannot run simultaneously. This makes many concurrency bugs impossible: you don’t need locks for most shared state because coroutines are interleaved, not truly parallel.

The exception is code that yields control in the middle of a multi-step operation:

import asyncio

shared_counter = 0

async def increment():
    global shared_counter
    value = shared_counter  # Read
    await asyncio.sleep(0)  # Yield: another coroutine can run here
    shared_counter = value + 1  # Write (based on a stale value)

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(shared_counter)  # Prints 1, not 100: every task read 0 before any wrote

The yield between read and write creates a window where another coroutine modifies the same value. If you have a multi-step operation that must be atomic, don’t put an await in the middle of it, or use asyncio.Lock:

counter_lock = asyncio.Lock()
shared_counter = 0

async def safe_increment():
    global shared_counter
    async with counter_lock:
        value = shared_counter
        await asyncio.sleep(0)  # Still safe — lock is held
        shared_counter = value + 1
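
Both variants can be compared side by side without globals. This is a sketch under the same assumptions as the snippets above; Counter and run_increments are hypothetical names, and the lock is created inside the running loop so the example behaves the same on older Python versions.

```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self):
        current = self.value
        await asyncio.sleep(0)  # other coroutines run between read and write
        self.value = current + 1

    async def safe_increment(self):
        async with self.lock:
            current = self.value
            await asyncio.sleep(0)  # still yields, but the lock is held
            self.value = current + 1

async def run_increments(safe: bool, n: int = 100) -> int:
    counter = Counter()  # created inside the running event loop
    step = counter.safe_increment if safe else counter.unsafe_increment
    await asyncio.gather(*(step() for _ in range(n)))
    return counter.value

if __name__ == "__main__":
    print("unsafe:", asyncio.run(run_increments(safe=False)))
    print("safe:  ", asyncio.run(run_increments(safe=True)))
```

The lock serializes the whole read-yield-write sequence, so the cost is that the increments no longer overlap at all. Keep the locked region as small as the invariant allows.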

Running Sync Code Alongside Async Code

One practical problem for teams migrating codebases is calling async code from synchronous contexts. You can’t await from a non-async function, and you can’t call asyncio.run() when an event loop is already running:

# This fails if called from inside an async context
def sync_function_that_needs_async():
    result = asyncio.run(some_async_function())  # RuntimeError if loop is running
    return result

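The cleanest solution in frameworks like FastAPI or Django (with async views) is to make the calling function async too. When that’s not possible (legacy code, external libraries), note that asyncio.get_event_loop().run_until_complete() is no escape hatch: it raises the same RuntimeError when the loop is already running, so it only works in a thread with no active loop. Safer options are to run the coroutine on a separate thread with its own loop, or to use a library like anyio that handles the interop more gracefully.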
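
For the legacy case, one possible bridge is to run the coroutine on a short-lived worker thread that gets its own event loop. This is a minimal sketch, not a general-purpose solution: run_coroutine_sync, add, and called_from_async are hypothetical names, and the helper blocks its caller (including a running event loop) until the coroutine finishes, so it must not be used for coroutines that depend on the caller’s loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coroutine_sync(coro_fn, *args):
    """Run an async function from sync code and return its result."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run is safe to call directly
        return asyncio.run(coro_fn(*args))
    # A loop is already running here, so use a fresh loop in another thread.
    # .result() blocks this thread until the coroutine completes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro_fn(*args)).result()

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b

async def called_from_async() -> int:
    # Simulates legacy sync code being invoked inside a running loop
    return run_coroutine_sync(add, 1, 2)

if __name__ == "__main__":
    print(run_coroutine_sync(add, 1, 2))
    print(asyncio.run(called_from_async()))
```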

For testing, pytest-asyncio handles the event loop management for async test functions:

import pytest

@pytest.mark.asyncio
async def test_fetch():
    result = await fetch_data()
    assert result is not None

Without pytest-asyncio (or similar), you end up wrapping every test in asyncio.run(), which works but is verbose.
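
If adding a plugin is not an option, the standard library has an equivalent in unittest.IsolatedAsyncioTestCase (Python 3.8+), which runs each async test method on its own event loop. A minimal sketch, with a stand-in fetch_data defined locally so the example is self-contained:

```python
import asyncio
import unittest

async def fetch_data():
    # Stand-in for the real coroutine under test
    await asyncio.sleep(0)
    return "data"

class FetchDataTest(unittest.IsolatedAsyncioTestCase):
    async def test_fetch(self):
        result = await fetch_data()
        self.assertEqual(result, "data")

if __name__ == "__main__":
    unittest.main()
```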

Timeouts and Graceful Degradation

Async code makes it easy to fire off multiple I/O calls. It makes it less obvious that you need timeouts on all of them:

import httpx

async def fragile_fetch():
    # timeout=None turns off httpx's default 5-second timeout,
    # so this hangs indefinitely if the server is slow or down
    async with httpx.AsyncClient(timeout=None) as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

async def robust_fetch():
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get("https://api.example.com/data")
            return response.json()
        except httpx.TimeoutException:
            return None  # Or raise, depending on your error handling strategy

asyncio.wait_for gives you timeouts for any coroutine:

async def with_timeout():
    try:
        result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
        return result
    except asyncio.TimeoutError:
        return default_value

In production, every external call should have an explicit timeout. The event loop will not time out for you.
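
Graceful degradation usually means giving each call its own deadline, so that one slow dependency costs you one field and not the whole response. The sketch below simulates this with asyncio.sleep; fake_fetch and fetch_all_with_timeouts are hypothetical names, and returning None for a timed-out source is just one possible policy.

```python
import asyncio

async def fake_fetch(name: str, delay: float) -> str:
    # Simulated I/O call; replace with a real client call
    await asyncio.sleep(delay)
    return f"{name} ok"

async def fetch_all_with_timeouts(delays: dict, timeout: float) -> dict:
    async def one(name: str, delay: float):
        try:
            value = await asyncio.wait_for(fake_fetch(name, delay), timeout)
        except asyncio.TimeoutError:
            value = None  # degrade: report the source as unavailable
        return name, value

    pairs = await asyncio.gather(*(one(n, d) for n, d in delays.items()))
    return dict(pairs)

if __name__ == "__main__":
    sources = {"fast": 0.0, "slow": 1.0}
    print(asyncio.run(fetch_all_with_timeouts(sources, timeout=0.05)))
```

Because wait_for cancels the coroutine it wraps when the deadline passes, the slow call does not keep running in the background after its result has been given up on.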

The Structured Concurrency Gap

Until Python 3.11, asyncio had no structured concurrency built in, and bare asyncio.create_task still has none. This means tasks can outlive the scope that created them, and errors in spawned tasks can be silently swallowed if you’re not careful:

async def main():
    task = asyncio.create_task(background_work())
    # If background_work raises and nothing awaits the task, the exception is
    # only logged ("Task exception was never retrieved") when the task is garbage-collected
    await do_other_stuff()
    # task is still running here, possibly forever

The anyio library (which underlies modern async frameworks including FastAPI) provides a task group abstraction that fixes this:

import anyio

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(background_work)
        tg.start_soon(other_work)
    # Both tasks complete (or fail) before we leave this block
    # If either raises, the other is cancelled and the exception propagates

For new async code in 2026, anyio task groups or asyncio.TaskGroup (Python 3.11+) are the right primitives. Bare asyncio.create_task without tracking the result leads to fire-and-forget tasks that you can’t reason about.

# Python 3.11+ TaskGroup
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coroutine())
        task2 = tg.create_task(another_coroutine())
    # Both done here; if either failed, the other was cancelled
    # and the errors are raised together as an ExceptionGroup

The practical upshot: if you’re seeing occasional unhandled exception warnings in production logs, you have untracked tasks. Find them and either await them properly or use task groups.
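
Where a task really does need to outlive its caller, you can at least track it. The sketch below keeps a strong reference to each task (the event loop itself only holds weak references) and collects failures in a done callback. TaskTracker, failing_job, and demo are hypothetical names; in a real service the callback would more likely log the error than append it to a list.

```python
import asyncio

class TaskTracker:
    def __init__(self):
        self.pending = set()
        self.errors = []

    def spawn(self, coro):
        task = asyncio.create_task(coro)
        self.pending.add(task)  # strong reference until the task finishes
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        self.pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None:
            self.errors.append(exc)

async def failing_job():
    raise ValueError("boom")

async def demo():
    tracker = TaskTracker()
    tracker.spawn(failing_job())
    tracker.spawn(asyncio.sleep(0))
    await asyncio.sleep(0.01)  # give both tasks time to finish
    return tracker

if __name__ == "__main__":
    tracker = asyncio.run(demo())
    print(tracker.errors, len(tracker.pending))
```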

asyncio is a genuinely good concurrency model for I/O-heavy Python workloads. The pitfalls above aren’t reasons to avoid it. They’re the curriculum for using it correctly, and many production incidents involving asyncio code come down to one of these patterns.
