Python asyncio in Production: The Pitfalls No One Warns You About
async/await syntax makes Python code look concurrent. The behavior when you get it wrong is harder to predict than most tutorials explain. Here's what breaks in production and how to fix it before it does.
Anurag Verma
The async/await syntax in Python looks like a direct upgrade over synchronous code. Mark a function async def, await the slow calls, and you get concurrency for free. That’s the mental model tutorials teach, and it’s mostly right for the happy path.
Where it breaks down is in the details: blocking calls that slip through, event loop management in frameworks that have their own opinions, CPU-bound work that async doesn’t help with, and task cancellation behavior that surprises almost everyone the first time they hit it.
None of these are reasons to avoid asyncio. They’re reasons to understand it before writing production code with it.
The Blocking Call Problem
The most common asyncio mistake is calling a synchronous blocking function inside an async context without wrapping it:
import asyncio
import time

async def fetch_data():
    time.sleep(2)  # This blocks the entire event loop for 2 seconds
    return "data"

async def main():
    results = await asyncio.gather(
        fetch_data(),
        fetch_data(),
        fetch_data(),
    )
    # These run sequentially, not concurrently: takes 6 seconds
time.sleep(2) never yields to the event loop. It holds the thread, which means it holds the event loop, so your three “concurrent” coroutines run one after another.
The fix is either to use the async version (asyncio.sleep) or to run the blocking call in a thread pool:
import asyncio
import time

async def fetch_data_blocking():
    # get_running_loop() is the supported way to get the loop inside a coroutine
    loop = asyncio.get_running_loop()
    # Run blocking code in the default thread pool, freeing the event loop
    result = await loop.run_in_executor(None, time.sleep, 2)
    return result
The tricky part is that blocking calls aren’t always obvious. Database drivers, file I/O, image processing, subprocess calls — all blocking unless you use async-specific variants or executors. A list of what’s safe:
httpx.AsyncClient is async. requests is not.
asyncpg and aiosqlite are async. psycopg2 is not (use psycopg 3, which has async support).
aiofiles is async. open() and standard pathlib operations are not.
asyncio.create_subprocess_exec is async. subprocess.run is not.
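One way to catch blocking calls that are not on any list is asyncio's debug mode, which logs every callback or task step that holds the loop longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below is a minimal illustration; handler_with_hidden_blocking_call is a hypothetical name, and time.sleep stands in for whatever blocking library call slipped through.

```python
import asyncio
import logging
import time


async def handler_with_hidden_blocking_call() -> str:
    # Hypothetical handler: time.sleep stands in for any blocking library call.
    time.sleep(0.3)
    return "done"


def run_in_debug_mode() -> str:
    # debug=True makes the loop log, via the "asyncio" logger, any step that
    # runs longer than loop.slow_callback_duration (0.1 seconds by default).
    logging.basicConfig(level=logging.WARNING)
    return asyncio.run(handler_with_hidden_blocking_call(), debug=True)
```

Debug mode adds overhead, so it is usually something to enable in development or staging rather than on every production worker. Setting the environment variable PYTHONASYNCIODEBUG=1 turns it on without code changes.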
If you’re using a library that doesn’t have an async variant, run_in_executor is the right tool. If the blocking work is CPU-intensive, use ProcessPoolExecutor instead of the default ThreadPoolExecutor, because threads share a GIL.
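On Python 3.9+, asyncio.to_thread is a shorter spelling of the thread-pool version of this pattern. A minimal sketch, assuming a hypothetical sync-only function blocking_fetch, shows three blocking calls overlapping instead of queueing:

```python
import asyncio
import time


def blocking_fetch(delay: float) -> str:
    # Hypothetical stand-in for a call into a sync-only library.
    time.sleep(delay)
    return "data"


async def fetch_all(delay: float = 0.3) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in the default thread pool; the event loop stays free.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_fetch, delay) for _ in range(3))
    )
    return results, time.perf_counter() - start
```

With the default delay, the three calls should finish in roughly one delay rather than three, because each time.sleep runs on its own worker thread.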
CPU-Bound Work Is Worse Than You Think
Asyncio is concurrency via cooperative multitasking. Coroutines yield control to the event loop at await points. A CPU-bound function that never hits an await blocks the event loop for its entire duration, just like a blocking I/O call.
This is a common mistake when building AI or data-processing APIs with FastAPI:
import asyncio
from fastapi import FastAPI

app = FastAPI()

def heavy_cpu_work(data: list[float]) -> float:
    # Runs for 500ms of CPU time
    return sum(x**2 for x in data) / len(data)

@app.get("/compute")
async def compute(n: int = 1000000):
    data = list(range(n))
    # This blocks the event loop for 500ms, so all other requests wait
    result = heavy_cpu_work(data)
    return {"result": result}
Under load, this kills throughput. Every 500ms of CPU work in a request blocks every other request from making progress.
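The stall is easy to observe without a web framework. In this sketch (all names are illustrative, not from any library), a heartbeat coroutine asks to sleep for 10 ms at a time and records how long each sleep really took, while busy_work spins on the CPU without ever awaiting:

```python
import asyncio
import time


async def heartbeat(gaps: list[float], interval: float = 0.01) -> None:
    # Records how long each "sleep for `interval`" actually took.
    while True:
        start = time.perf_counter()
        await asyncio.sleep(interval)
        gaps.append(time.perf_counter() - start)


def busy_work(seconds: float) -> None:
    # Spins for roughly `seconds`; stands in for CPU-bound request handling.
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass


async def measure_stall(seconds: float = 0.2) -> float:
    gaps: list[float] = []
    task = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times
    busy_work(seconds)         # no await inside, so the loop is frozen
    await asyncio.sleep(0.05)  # give the heartbeat a chance to wake up
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return max(gaps)  # the longest observed sleep, in seconds
```

The longest recorded gap comes out at least as long as the busy period, even though the heartbeat only asked for 10 ms. In an API server, that gap is latency added to every other in-flight request.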
The correct pattern for CPU-bound work in an async API:
import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

def heavy_cpu_work(data: list[float]) -> float:
    return sum(x**2 for x in data) / len(data)

@app.get("/compute")
async def compute(n: int = 1000000):
    data = list(range(n))
    # get_running_loop() is the supported way to get the loop inside a coroutine
    loop = asyncio.get_running_loop()
    # The work runs in a separate process; the event loop keeps serving requests
    result = await loop.run_in_executor(executor, heavy_cpu_work, data)
    return {"result": result}
ProcessPoolExecutor gets around the GIL by using separate processes. The tradeoff: process startup is expensive, data has to be serialized to move between processes, and shared state becomes a problem. For heavy compute, this is usually worth it. For lightweight work, the overhead costs more than the parallelism saves.
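The serialization cost is worth measuring before committing to a design. ProcessPoolExecutor pickles arguments and return values, so a rough sketch like the one below compares the bytes that cross the process boundary when sending a whole list versus only its length. heavy_cpu_work_from_n is a hypothetical variant of the function above that builds its input inside the worker.

```python
import pickle


def heavy_cpu_work_from_n(n: int) -> float:
    # Hypothetical variant: builds its input inside the worker process,
    # so only the integer `n` has to be pickled and sent.
    return sum(x**2 for x in range(n)) / n


def payload_sizes(n: int) -> tuple[int, int]:
    # Bytes pickled when sending the full list versus just its length.
    full_list = len(pickle.dumps(list(range(n))))
    just_n = len(pickle.dumps(n))
    return full_list, just_n
```

When the input can be derived from a small description (an ID, a file path, a range), passing the description and loading the data in the worker avoids most of the transfer cost.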
Task Cancellation Is Not Instant
When you cancel an asyncio task, the cancellation doesn’t happen immediately:
import asyncio

async def long_running():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # This runs when the task is cancelled
        print("Cancelled! Cleaning up...")
        await asyncio.sleep(1)  # Cleanup work
        raise  # Must re-raise CancelledError

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.5)
    task.cancel()  # Only requests cancellation; the task is not stopped yet
    try:
        await task  # Waits for the cleanup above to finish
    except asyncio.CancelledError:
        print("Task is done being cancelled")
A CancelledError is injected at the next await point. If a coroutine catches it without re-raising, the task continues running. This is almost always wrong. The pattern for cleanup is: catch CancelledError, do cleanup, then re-raise.
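To make the failure mode concrete, here is a small sketch of a coroutine that swallows the cancellation. The function names are illustrative only.

```python
import asyncio


async def swallows_cancel(log: list[str]) -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cancel swallowed")  # no re-raise: the task keeps going
    await asyncio.sleep(0.01)
    log.append("still running")
    return "finished anyway"


async def demo() -> tuple[bool, str, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(swallows_cancel(log))
    await asyncio.sleep(0.01)  # let the task reach its first await
    task.cancel()
    result = await task  # no CancelledError is raised here
    return task.cancelled(), result, log
```

Here task.cancelled() is False and the task returns a normal result, so whoever called cancel() has no indication that the request was ignored.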
In web frameworks, this matters for request cancellation. When a client disconnects, the framework cancels the request handler coroutine. If your handler has started a database transaction or is mid-way through a file write, you need cleanup logic or you’ll leave things in an inconsistent state.
FastAPI and Starlette propagate cancellation correctly by default. What they can’t protect you from is cleanup logic in your code that swallows CancelledError.
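A sketch of what that cleanup can look like for a transaction, under the assumption of a hypothetical FakeTransaction class (real drivers have their own APIs, and many expose an async context manager that does this for you):

```python
import asyncio


class FakeTransaction:
    """Hypothetical stand-in for a real database transaction."""

    def __init__(self) -> None:
        self.state = "open"

    async def commit(self) -> None:
        self.state = "committed"

    async def rollback(self) -> None:
        await asyncio.sleep(0)  # cleanup may itself need to await
        self.state = "rolled back"


async def handler(tx: FakeTransaction) -> None:
    try:
        await asyncio.sleep(10)  # simulated slow work inside the transaction
        await tx.commit()
    except asyncio.CancelledError:
        await tx.rollback()  # undo partial work
        raise  # re-raise so the task really ends up cancelled


async def simulate_disconnect() -> tuple[str, bool]:
    tx = FakeTransaction()
    task = asyncio.create_task(handler(tx))
    await asyncio.sleep(0.01)  # let the handler start
    task.cancel()  # stands in for the framework reacting to a disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return tx.state, task.cancelled()
```

The transaction ends up rolled back and the task still reports as cancelled, which is the combination the framework expects.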
Sharing State Between Coroutines
Asyncio is single-threaded, which means two coroutines cannot run simultaneously. This makes many concurrency bugs impossible: you don’t need locks for most shared state because coroutines are interleaved, not truly parallel.
The exception is code that yields control in the middle of a multi-step operation:
import asyncio

shared_counter = 0

async def increment():
    global shared_counter
    value = shared_counter      # Read
    await asyncio.sleep(0)      # Yield: another coroutine can run here
    shared_counter = value + 1  # Write (may be based on stale value)

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(shared_counter)  # Prints 1, not 100: every coroutine read 0 before any of them wrote
The yield between read and write creates a window where another coroutine modifies the same value. If you have a multi-step operation that must be atomic, don’t put an await in the middle of it, or use asyncio.Lock:
counter_lock = asyncio.Lock()
shared_counter = 0

async def safe_increment():
    global shared_counter
    async with counter_lock:
        value = shared_counter
        await asyncio.sleep(0)  # Still safe: lock is held
        shared_counter = value + 1
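The three variants (racy, locked, and no await between read and write) can be compared side by side. This is a self-contained sketch; the Counter class and run helper are illustrative names, and the lock is created inside the running loop, which avoids loop-binding surprises on Python versions before 3.10.

```python
import asyncio


class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def racy_increment(self) -> None:
        value = self.value
        await asyncio.sleep(0)  # other coroutines run between read and write
        self.value = value + 1

    async def locked_increment(self) -> None:
        async with self.lock:
            value = self.value
            await asyncio.sleep(0)  # safe: nobody else can enter this block
            self.value = value + 1

    async def atomic_increment(self) -> None:
        self.value += 1  # read and write with no await in between
        await asyncio.sleep(0)


async def run(method_name: str, n: int = 100) -> int:
    counter = Counter()  # created inside the running loop
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(n)))
    return counter.value
```

The locked and atomic versions both reach 100. The atomic version is cheaper, so the lock is only worth it when the await in the middle cannot be moved.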
Running Sync Code Alongside Async Code
One practical problem for teams migrating codebases is calling async code from synchronous contexts. You can’t await from a non-async function, and you can’t call asyncio.run() when an event loop is already running:
# This fails if called from inside an async context
def sync_function_that_needs_async():
    result = asyncio.run(some_async_function())  # RuntimeError if loop is running
    return result
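The cleanest solution in frameworks like FastAPI or Django (with async views) is to make the calling function async too. When that’s not possible (legacy code, external libraries), the options depend on where the synchronous code runs. asyncio.get_event_loop().run_until_complete() only works when no loop is running in the current thread; called from inside a running loop, it raises the same RuntimeError as asyncio.run(). If the synchronous code runs in a worker thread, asyncio.run_coroutine_threadsafe() can submit the coroutine to the loop running in another thread, and a library like anyio handles this interop more gracefully.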
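One standard-library option for the legacy case, assuming the synchronous code can run in a worker thread while the event loop runs in another, is asyncio.run_coroutine_threadsafe. The sketch below uses illustrative function names; only the asyncio calls are real APIs.

```python
import asyncio


async def some_async_function() -> str:
    await asyncio.sleep(0.01)
    return "async result"


def sync_function_in_worker_thread(loop: asyncio.AbstractEventLoop) -> str:
    # Runs in a worker thread, so blocking on .result() does not stall the loop.
    future = asyncio.run_coroutine_threadsafe(some_async_function(), loop)
    return future.result(timeout=5)


async def main() -> str:
    loop = asyncio.get_running_loop()
    # The legacy sync code is pushed to a thread and handed the running loop.
    return await asyncio.to_thread(sync_function_in_worker_thread, loop)
```

Calling future.result() from the event loop's own thread would deadlock, because the loop could never run the coroutine it is waiting on. The pattern only works when the sync caller is on a different thread.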
For testing, pytest-asyncio handles the event loop management for async test functions:
import pytest

@pytest.mark.asyncio
async def test_fetch():
    result = await fetch_data()
    assert result is not None
Without pytest-asyncio (or similar), you end up wrapping every test in asyncio.run(), which works but is verbose.
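If adding a plugin is not an option, the standard library's unittest.IsolatedAsyncioTestCase (Python 3.8+) is one such alternative. A minimal sketch, with fetch_data as a hypothetical coroutine under test:

```python
import asyncio
import unittest


async def fetch_data() -> str:
    # Hypothetical coroutine under test.
    await asyncio.sleep(0)
    return "data"


class FetchDataTest(unittest.IsolatedAsyncioTestCase):
    # Each async test method runs in an event loop managed by the test case.
    async def test_fetch(self) -> None:
        result = await fetch_data()
        self.assertIsNotNone(result)
```

Saved as a test module, this runs with python -m unittest and needs no third-party packages.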
Timeouts and Graceful Degradation
Async code makes it easy to fire off multiple I/O calls. It makes it less obvious that you need timeouts on all of them:
import httpx

async def fragile_fetch():
    # httpx applies a 5-second default timeout; timeout=None disables it,
    # so this call can hang indefinitely if the server is slow or down
    async with httpx.AsyncClient(timeout=None) as client:
        response = await client.get("https://api.example.com/data")
        return response.json()

async def robust_fetch():
    # An explicit timeout documents the limit instead of relying on a default
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get("https://api.example.com/data")
            return response.json()
        except httpx.TimeoutException:
            return None  # Or raise, depending on your error handling strategy
asyncio.wait_for gives you timeouts for any coroutine:
async def with_timeout():
    try:
        result = await asyncio.wait_for(some_coroutine(), timeout=5.0)
        return result
    except asyncio.TimeoutError:
        return default_value
In production, every external call should have an explicit timeout. The event loop will not time out for you.
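For graceful degradation across several calls, one approach is to give each call its own timeout and fallback so that a single slow dependency does not fail the whole response. This is a sketch under assumptions: with_fallback, fake_source, and fetch_dashboard are hypothetical names, and asyncio.sleep stands in for real network I/O.

```python
import asyncio
from typing import Any, Awaitable


async def with_fallback(awaitable: Awaitable[Any], timeout: float, default: Any) -> Any:
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        return default  # degrade instead of failing the whole request


async def fake_source(name: str, delay: float) -> str:
    # Hypothetical upstream call; the delay simulates network latency.
    await asyncio.sleep(delay)
    return name


async def fetch_dashboard() -> list:
    return await asyncio.gather(
        with_fallback(fake_source("profile", 0.01), timeout=0.2, default=None),
        with_fallback(fake_source("recommendations", 5.0), timeout=0.2, default=None),
    )
```

The fast source returns its value and the slow one is cancelled after 0.2 seconds and replaced by None. Whether a fallback is acceptable for a given call is an application decision.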
The Structured Concurrency Gap
Before Python 3.11, asyncio had no structured concurrency built in, and bare asyncio.create_task still provides none. Tasks can outlive the scope that created them, and errors in spawned tasks can be silently swallowed if you’re not careful:
async def main():
    task = asyncio.create_task(background_work())
    # If background_work raises an exception and you don't await task,
    # the exception only shows up in the log ("Task exception was never retrieved")
    await do_other_stuff()
    # task may still be running here, possibly forever
The anyio library (which underlies modern async frameworks including FastAPI) provides a task group abstraction that fixes this:
import anyio

async def main():
    async with anyio.create_task_group() as tg:
        tg.start_soon(background_work)
        tg.start_soon(other_work)
    # Both tasks complete (or fail) before we leave this block
    # If either raises, the other is cancelled and the exception propagates
For new async code in 2026, anyio task groups or Python 3.11+’s asyncio.TaskGroup are the right primitives. Bare asyncio.create_task without tracking the result leads to fire-and-forget tasks that you can’t reason about.
# Python 3.11+ TaskGroup
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coroutine())
        task2 = tg.create_task(another_coroutine())
    # Both done, exceptions handled cleanly
The practical upshot: if you’re seeing occasional unhandled exception warnings in production logs, you have untracked tasks. Find them and either await them properly or use task groups.
asyncio is a genuinely good concurrency model for I/O-heavy Python workloads. The pitfalls above aren’t reasons to avoid it — they’re the curriculum for using it correctly. Most production incidents involving asyncio code come down to one of these patterns.