Back to Blog
PythonAsyncioConcurrency

Python Asyncio: The Zen of Not Waiting

  May 2025   14 min read   Sushil Kumar

Imagine you’re a chef. You put a pot of water on the stove to boil. Do you stand there staring at it for 10 minutes? Or do you chop vegetables, prep the sauce, and check back when it bubbles?

That choice — stand and wait vs. do something useful while waiting — is exactly what Python’s concurrency tools are about.

The Default: Python Does One Thing at a Time

Out of the box, Python runs your code sequentially in a single thread. Every statement finishes before the next one starts.

import time

def fetch_user():
    time.sleep(2)        # simulates a slow API call
    return "Alice"

def fetch_orders():
    time.sleep(2)        # another slow API call
    return ["order1", "order2"]

start = time.time()
user   = fetch_user()    # waits 2 seconds
orders = fetch_orders()  # waits another 2 seconds
print(f"Done in {time.time() - start:.1f}s")  # → 4.0s

Both calls are independent — neither needs the other to finish first. Yet we waited 4 seconds when 2 would have been enough. That’s the problem concurrency solves.

Three Tools, Three Philosophies

Python gives you three ways to do multiple things “at once”:

ToolMechanismBest forGIL?
threadingMultiple threads, one processI/O-bound tasksLimited by it
multiprocessingMultiple processesCPU-bound tasksBypasses it
asyncioSingle thread, cooperativeI/O-bound, high concurrencyNot relevant

Let’s understand each one properly.


Threads — Concurrent but Constrained

A thread is a lightweight unit of execution inside your process. All threads share the same memory, so they can read and write the same variables.

import threading
import time

results = {}

def fetch_user():
    time.sleep(2)
    results["user"] = "Alice"

def fetch_orders():
    time.sleep(2)
    results["orders"] = ["order1", "order2"]

start = time.time()
t1 = threading.Thread(target=fetch_user)
t2 = threading.Thread(target=fetch_orders)

t1.start()   # start both
t2.start()
t1.join()    # wait for both to finish
t2.join()

print(f"Done in {time.time() - start:.1f}s")  # → ~2.0s ✓
print(results)

Both calls run concurrently and we’re done in ~2 seconds instead of 4.

The GIL Problem

Here’s the catch. CPython has a Global Interpreter Lock (GIL) — a mutex that allows only one thread to execute Python bytecode at a time.

Think of it like a single microphone in a meeting room. Multiple people (threads) are present, but only one can speak (execute Python) at a time. While one person is waiting for an answer from another room (I/O), they put the mic down, and someone else can pick it up.

This means:

Rule of thumb: Use threads for I/O-bound work where tasks spend most of their time waiting — not for heavy computation.

Processes — True Parallelism

For CPU-heavy work (image processing, number crunching, ML inference), you need multiprocessing. Each process gets its own Python interpreter and its own GIL, so they run truly in parallel on separate CPU cores.

from multiprocessing import Process, Queue
import time

def crunch_numbers(result_queue, label):
    total = sum(range(10_000_000))   # CPU-intensive
    result_queue.put((label, total))

if __name__ == "__main__":
    q = Queue()
    start = time.time()

    p1 = Process(target=crunch_numbers, args=(q, "worker-1"))
    p2 = Process(target=crunch_numbers, args=(q, "worker-2"))
    p1.start(); p2.start()
    p1.join();  p2.join()

    print(f"Done in {time.time() - start:.1f}s")
    print(q.get(), q.get())

The tradeoff: processes are expensive to create (each spawns a new Python interpreter) and don’t share memory — you pass data via queues or pipes.


Asyncio — The Elegant Solution for I/O

asyncio takes a completely different approach. Instead of many threads or processes, it uses one thread and one event loop that switches between tasks extremely fast — giving the illusion of parallelism.

Back to the chef analogy:

The key insight: for I/O-bound work, your program spends most of its time waiting (for network, disk, database). That waiting time is wasted if you’re doing nothing. Asyncio fills that gap.


Coroutines — Functions That Can Pause

A coroutine is a special function that can pause itself mid-execution, let other code run, and later resume from exactly where it stopped.

You define one with async def:

import asyncio

async def make_coffee():
    print("Starting coffee...")
    await asyncio.sleep(3)   # pause here for 3 seconds (simulates waiting)
    print("Coffee ready!")
    return "☕"
Important: Calling make_coffee() does NOT run it. It returns a coroutine object — a recipe, not a result. You must await it or schedule it as a task to actually execute it.
# Wrong — this does nothing:
result = make_coffee()   # just creates the object, prints nothing

# Right — this runs it:
result = await make_coffee()          # inside another async function
asyncio.run(make_coffee())            # at the top level

The await Keyword — Yielding Control

await does two things simultaneously:

  1. Suspends the current coroutine until the awaited operation finishes
  2. Hands control back to the event loop so other coroutines can run

This is the crucial difference from a regular time.sleep():

# Blocking — freezes everything for 3 seconds:
import time
time.sleep(3)

# Non-blocking — pauses THIS coroutine, lets others run:
await asyncio.sleep(3)

Think of await as saying: “I’ll be busy for a bit — event loop, go do something else, and come back to me when I’m ready.”


The Event Loop — The Traffic Controller

The event loop is the heart of asyncio. It’s a scheduler that:

  1. Maintains a queue of ready coroutines
  2. Picks the next ready one
  3. Runs it until it hits an await
  4. While it’s waiting, runs another coroutine
  5. When the wait completes, puts it back in the ready queue

Here’s what actually happens step by step when you run two coroutines concurrently:

Event Loop Timeline:
─────────────────────────────────────────────────
t=0.0s   Loop picks Task A → runs → hits `await sleep(2)` → pauses A
t=0.0s   Loop picks Task B → runs → hits `await sleep(2)` → pauses B
t=2.0s   Both sleeps done → resumes A → resumes B
t=2.0s   Both finish — total time: ~2 seconds (not 4!)
─────────────────────────────────────────────────

You start the event loop with asyncio.run():

async def main():
    await some_coroutine()

asyncio.run(main())   # creates loop → runs main → closes loop

asyncio.gather() — Running Many at Once

asyncio.gather() is how you run multiple coroutines concurrently and collect all their results:

import asyncio
import time

async def fetch_data(name, delay):
    print(f"  → Fetching {name}...")
    await asyncio.sleep(delay)       # simulate network call
    print(f"  ✓ {name} done")
    return f"data from {name}"

async def main():
    start = time.time()

    results = await asyncio.gather(
        fetch_data("users",   2),
        fetch_data("orders",  2),
        fetch_data("products",2),
    )

    elapsed = time.time() - start
    print(f"\nAll done in {elapsed:.1f}s")   # → ~2.0s, not 6.0s!
    print(results)

asyncio.run(main())

Visual execution timeline:

Sequential (6s total):
users:    ████████ (2s)
orders:             ████████ (2s)
products:                     ████████ (2s)
          0    1    2    3    4    5    6

asyncio.gather (~2s total):
users:    ████████ (2s)
orders:   ████████ (2s)
products: ████████ (2s)
          0    1    2

All three run during the same 2-second window.

Error handling: By default, if one coroutine in gather() raises an exception, the others are cancelled. Pass return_exceptions=True to collect errors as values instead of stopping everything.

Common Pitfall 1: Forgetting await

This is the most common mistake. It’s silent — no error, the coroutine just never runs:

async def send_email():
    await asyncio.sleep(1)
    print("Email sent!")

async def main():
    send_email()   # ❌ creates coroutine object, does nothing
    await send_email()              # ✅ runs it
    asyncio.create_task(send_email())  # ✅ schedules it as a background task

Python 3.8+ will emit a RuntimeWarning: coroutine 'send_email' was never awaited — but only as a warning, not an error. Always watch for it.


Common Pitfall 2: Blocking the Event Loop

Asyncio uses a single thread. If you call a regular blocking function inside a coroutine, you freeze the entire event loop — nothing else can run:

import time, asyncio, requests

async def bad_fetch(url):
    response = requests.get(url)    # ❌ BLOCKS the whole event loop!
    return response.text

async def good_fetch(url):
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:   # ✅ non-blocking
            return await response.text()

The rule: inside an async def function, never call synchronous blocking code (time.sleep, requests.get, file reads with plain open). Use their async equivalents instead.

If you absolutely must call blocking code (e.g., a legacy library), offload it to a thread:

import asyncio

def slow_legacy_function():
    import time
    time.sleep(2)
    return "done"

async def main():
    loop = asyncio.get_event_loop()
    # runs in a thread pool — won't block the event loop
    result = await loop.run_in_executor(None, slow_legacy_function)
    print(result)

When to Use What — Quick Guide

Is the bottleneck CPU computation (matrix math, compression, ML)?
  └── YES → multiprocessing
  └── NO (it's mostly waiting — network, DB, disk)
        ├── Need simple parallelism, small number of tasks?
        │     └── threading
        └── High concurrency (100s of connections), modern codebase?
              └── asyncio  ← usually the best choice for I/O today
ScenarioBest Tool
Scraping 1000 URLsasyncio + aiohttp
Resizing 500 imagesmultiprocessing
10 simultaneous DB queriesasyncio + async DB driver
Calling a blocking legacy librarythreading or run_in_executor
Simple scripts with a few parallel tasksthreading

Putting It All Together

Here’s a complete, real-world async script that fetches multiple API endpoints concurrently, handles errors gracefully, and shows timing:

import asyncio
import time

# Simulated async API calls
async def fetch(endpoint, delay, should_fail=False):
    await asyncio.sleep(delay)
    if should_fail:
        raise ValueError(f"{endpoint} returned 500")
    return {"endpoint": endpoint, "data": f"response from {endpoint}"}

async def main():
    start = time.time()

    tasks = [
        fetch("/users",    2),
        fetch("/orders",   1.5),
        fetch("/products", 1, should_fail=True),   # this one will fail
    ]

    # return_exceptions=True means errors become values, not crashes
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for result in results:
        if isinstance(result, Exception):
            print(f"  ✗ Error: {result}")
        else:
            print(f"  ✓ {result['endpoint']}: {result['data']}")

    print(f"\nTotal time: {time.time() - start:.1f}s")   # ~2.0s

asyncio.run(main())

Output:

  ✓ /users: response from /users
  ✓ /orders: response from /orders
  ✗ Error: /products returned 500

Total time: 2.0s

Three “API calls”, one failure handled cleanly, all done in the time of the slowest single call.


Summary

Where to go next: Once you're comfortable with these basics, explore asyncio.create_task() for fire-and-forget background work, asyncio.Queue for producer-consumer patterns, and aiohttp / httpx for real async HTTP clients.
Previous Your AI Is Only as Smart as Your Prompt All Posts Next Getting Started with Amazon Connect