All posts
Backend & APIs

Python Async Programming: asyncio, Concurrency Patterns, and Performance Tips

13 min
Share:
Python · Asyncio · Concurrency · Performance · async/await

Master Python's async/await with practical asyncio patterns, understand when to use threads vs async, and build high-performance concurrent applications with real-world examples.

Python's asyncio enables writing concurrent code that handles thousands of I/O operations efficiently. This guide covers async/await fundamentals, when to use async vs threading, and patterns for building responsive applications.

Async Basics: Understanding the Event Loop

1import asyncio
2import time
3
4# ❌ Blocking (synchronous) code
def fetch_data_sync(n, delay=2):
    """Simulate a blocking API call.

    Args:
        n: Identifier embedded in the returned payload.
        delay: Seconds to block for. Defaults to 2, the value the
            walkthrough's timing comments assume.

    Returns:
        The string ``"Data <n>"``.
    """
    # time.sleep() blocks the calling thread: nothing else in the program
    # can run until it returns.
    time.sleep(delay)
    return f"Data {n}"
9
def main_sync():
    """Fetch five items one after another and print the elapsed wall time."""
    began = time.time()
    # Every call blocks for its full duration, so the waits add up.
    results = [fetch_data_sync(index) for index in range(5)]
    elapsed = time.time() - began
    print(f"Total time: {elapsed:.2f}s")  # ~10 seconds
16
17# ✅ Non-blocking (asynchronous) code
async def fetch_data_async(n, delay=2):
    """Simulate a non-blocking API call.

    Args:
        n: Identifier embedded in the returned payload.
        delay: Seconds to wait for. Defaults to 2, the value the
            walkthrough's timing comments assume.

    Returns:
        The string ``"Data <n>"``.
    """
    # asyncio.sleep() suspends only this coroutine and hands control back to
    # the event loop, so other coroutines can run during the wait.
    await asyncio.sleep(delay)
    return f"Data {n}"
22
async def main_async():
    """Fetch five items concurrently and print the elapsed wall time."""
    began = time.time()
    # gather() schedules all five coroutines at once; their waits overlap,
    # so the total is roughly the duration of a single call.
    pending = (fetch_data_async(index) for index in range(5))
    results = await asyncio.gather(*pending)
    elapsed = time.time() - began
    print(f"Total time: {elapsed:.2f}s")  # ~2 seconds
34
# Entry point: asyncio.run() creates an event loop, runs main_async() to
# completion on it, and closes the loop afterwards.
asyncio.run(main_async())
37
38# Key concepts:
39# - Coroutine: async function that can be paused/resumed
40# - await: Pause execution until coroutine completes
41# - Event loop: Manages execution of coroutines
42# - asyncio.gather(): Run multiple coroutines concurrently

Async Patterns for Real Applications

1import aiohttp
2import asyncio
3
4# 1. Concurrent HTTP requests
async def fetch_url(session, url):
    """GET ``url`` through ``session`` and return the response body as text."""
    # The response is released when the ``async with`` block exits, so the
    # body has to be read inside it.
    async with session.get(url) as response:
        body = await response.text()
    return body
8
async def fetch_multiple_urls(urls):
    """Fetch every URL in ``urls`` concurrently over one shared session.

    Returns the response bodies in the same order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        pending = []
        for url in urls:
            pending.append(fetch_url(session, url))
        # gather() preserves argument order in its result list.
        return await asyncio.gather(*pending)
14
15# 2. Async context managers
class AsyncDatabase:
    """Async context manager that opens a database connection on entry and
    closes it on exit.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``. Defaults to
            ``'postgresql://localhost'``.
    """

    def __init__(self, dsn='postgresql://localhost'):
        self.dsn = dsn
        self.conn = None  # set by __aenter__

    async def __aenter__(self):
        # Imported here because the surrounding snippet never imports
        # asyncpg at module level.
        import asyncpg

        self.conn = await asyncpg.connect(self.dsn)
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the ``async with``
        # body still propagates after the connection is closed.
        await self.conn.close()
23
async def query_database():
    """Open a connection via AsyncDatabase, fetch all rows of ``users``, and
    return them."""
    async with AsyncDatabase() as conn:
        # Returning from inside the block still runs __aexit__, so the
        # connection is closed before the caller sees the rows.
        return await conn.fetch('SELECT * FROM users')
28
29# 3. Async generators for streaming
async def fetch_logs_stream(url):
    """Async generator yielding the response body of ``url`` line by line,
    each line decoded as UTF-8."""
    # Both context managers stay open for as long as the generator is being
    # consumed; they are released when it finishes or is closed.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        async for raw_line in response.content:
            yield raw_line.decode('utf-8')
35
async def process_logs():
    """Print streamed log lines until ``should_stop`` says to stop.

    ``should_stop`` is not defined in this snippet; it is presumably a
    predicate taking one log line (confirm against the full program).
    """
    from contextlib import aclosing  # Python 3.10+

    # Breaking out of ``async for`` does not close an async generator by
    # itself; without aclosing() the HTTP session inside fetch_logs_stream
    # stays open until the generator is finalized at some later point.
    async with aclosing(fetch_logs_stream('http://api.example.com/logs')) as logs:
        async for log in logs:
            print(log)
            if should_stop(log):
                break
41
42# 4. Async task queue with semaphore
async def worker(semaphore, queue, results, session=None):
    """Consume URLs from ``queue`` until a ``None`` sentinel arrives.

    Args:
        semaphore: Bounds how many fetches are in flight at once.
        queue: ``asyncio.Queue`` of URLs, terminated by ``None``.
        results: List that each fetched body is appended to.
        session: Optional aiohttp session to reuse. When omitted, the worker
            opens its own session and closes it on exit.
    """
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()
    try:
        while True:
            url = await queue.get()
            try:
                if url is None:
                    break
                # Hold the semaphore only for the fetch itself, not for the
                # worker's whole lifetime.
                async with semaphore:
                    result = await fetch_url(session, url)
                results.append(result)
            finally:
                # Pair every get() with a task_done(), including on failure,
                # so a failed fetch cannot leave queue.join() waiting forever.
                queue.task_done()
    finally:
        if owns_session:
            await session.close()
52
async def process_urls_with_limit(urls, max_concurrent=10):
    """Fetch ``urls`` using a pool of ``max_concurrent`` queue workers.

    Returns the list the workers appended their results to (completion
    order, not input order).
    """
    limiter = asyncio.Semaphore(max_concurrent)
    pending = asyncio.Queue()
    collected = []

    # Enqueue all the work up front.
    for url in urls:
        await pending.put(url)

    # Start the worker pool.
    pool = []
    for _ in range(max_concurrent):
        pool.append(asyncio.create_task(worker(limiter, pending, collected)))

    # Block until every queued URL has been marked done.
    await pending.join()

    # One None sentinel per worker tells each of them to exit its loop.
    sentinels_left = max_concurrent
    while sentinels_left > 0:
        await pending.put(None)
        sentinels_left -= 1

    await asyncio.gather(*pool)
    return collected

When to Use Async vs Threading vs Multiprocessing

1import asyncio
2import threading
3import multiprocessing
4import time
5
6# Decision Matrix:
7
8# 1. I/O-Bound Tasks (API calls, DB queries, file I/O)
9# ✅ Use asyncio
async def io_bound_async():
    """Run 100 one-second sleeps concurrently on a single thread."""
    # All the sleeps overlap on the event loop.
    # Completes in ~1 second, single thread
    await asyncio.gather(*(asyncio.sleep(1) for _ in range(100)))
14
15# 2. I/O-Bound with blocking libraries
16# ✅ Use threading (when library doesn't support async)
def io_bound_threading():
    """Fetch the same URL 100 times with a pool of 10 threads.

    Demonstrates the fallback for blocking libraries (here ``requests``)
    that have no async API.
    """
    # concurrent.futures is not imported at the top of this snippet, so it is
    # brought into scope here, next to the existing local ``requests`` import.
    import concurrent.futures
    import requests

    def fetch(url):
        return requests.get(url).text

    urls = ['http://example.com'] * 100
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # list() drains the lazy map() iterator, which is also where an
        # exception raised inside a worker thread would surface.
        results = list(executor.map(fetch, urls))
25
26# 3. CPU-Bound Tasks (data processing, calculations)
27# ✅ Use multiprocessing
def cpu_bound_task(n):
    """Return the sum of squares of the integers 0 .. n-1 (pure CPU work)."""
    total = 0
    for value in range(n):
        total += value * value
    return total
30
def cpu_bound_multiprocessing():
    """Run four copies of cpu_bound_task in a process pool."""
    workloads = [10000000] * 4
    # Pool() with no argument sizes itself to the machine's CPU count.
    with multiprocessing.Pool() as process_pool:
        results = process_pool.map(cpu_bound_task, workloads)
    # Uses multiple CPU cores
35
36# ❌ Don't use asyncio for CPU-bound (GIL prevents parallelism)
async def cpu_bound_async_bad():  # This won't help!
    """Anti-pattern: push CPU-bound work onto threads from asyncio.

    The four jobs run in worker threads, but they are pure-Python
    computation, so the threads take turns instead of running in parallel.
    """
    threaded_jobs = []
    for _ in range(4):
        threaded_jobs.append(asyncio.to_thread(cpu_bound_task, 10000000))
    await asyncio.gather(*threaded_jobs)
42
43# Summary:
44# asyncio: I/O-bound, many connections, single core
45# threading: I/O-bound with blocking libs, shared state
46# multiprocessing: CPU-bound, need parallelism, no GIL

Error Handling and Timeouts

1import asyncio
2
3# 1. Timeouts
async def fetch_with_timeout(url, timeout=5):
    """Fetch ``url``, giving up after ``timeout`` seconds.

    Returns the fetched result, or None (after printing a notice) when the
    deadline passes first.
    """
    try:
        # The enclosed await is cancelled and a timeout error raised if it
        # has not finished within ``timeout`` seconds.
        async with asyncio.timeout(timeout):
            return await fetch_data(url)
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
    return None
13
14# 2. Handle individual task failures
async def fetch_all_ignore_errors(urls):
    """Fetch all ``urls`` concurrently without letting one failure abort the rest.

    Returns:
        A ``(successful, failed)`` tuple: ``successful`` holds the values
        returned by fetch_with_timeout (which may include None for requests
        that timed out), ``failed`` holds the exception objects.
    """
    results = await asyncio.gather(
        *(fetch_with_timeout(url) for url in urls),
        return_exceptions=True,
    )

    successful, failed = [], []
    for outcome in results:
        # Test against BaseException: asyncio.CancelledError does not derive
        # from Exception, so an Exception check would file a cancelled fetch
        # under "successful".
        if isinstance(outcome, BaseException):
            failed.append(outcome)
        else:
            successful.append(outcome)

    return successful, failed
24
25# 3. Retry with exponential backoff
async def fetch_with_retry(url, max_retries=3):
    """Fetch ``url``, retrying on aiohttp client errors with exponential backoff.

    Args:
        url: Passed through to fetch_data.
        max_retries: Total number of attempts (not additional retries).

    Returns:
        Whatever fetch_data returns on the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        aiohttp.ClientError: Re-raised from the final failed attempt.
    """
    if max_retries < 1:
        # Without this guard the loop body never runs and the function would
        # return None without attempting a single fetch.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except aiohttp.ClientError:
            if attempt == max_retries - 1:
                raise
            # Doubles each time: 1s, 2s, 4s, ... There is no wait after the
            # final attempt, so the default of 3 attempts sleeps 1s then 2s.
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1} after {wait_time}s")
            await asyncio.sleep(wait_time)
36
37# 4. Graceful shutdown
async def main():
    """Run ten long-lived worker tasks and tear them down cleanly on exit.

    Under asyncio.run(), Ctrl+C normally reaches this coroutine as
    asyncio.CancelledError (the main task is cancelled) rather than as
    KeyboardInterrupt, so both are handled here and then re-raised.
    """
    # Create long-running tasks
    tasks = [asyncio.create_task(worker(i)) for i in range(10)]
    try:
        await asyncio.gather(*tasks)
    except (KeyboardInterrupt, asyncio.CancelledError):
        print("Shutting down gracefully...")
        raise  # never swallow cancellation; the caller must observe it
    finally:
        # Runs on every exit path, including one worker failing, so no
        # sibling task is left running in the background.
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to finish; return_exceptions=True stops
        # each task's CancelledError from masking the original exception.
        await asyncio.gather(*tasks, return_exceptions=True)
54
55# 5. Task groups (Python 3.11+)
async def main_with_taskgroup():
    """Run two fetches under an asyncio.TaskGroup (Python 3.11+)."""
    async with asyncio.TaskGroup() as group:
        first = group.create_task(fetch_data(1))
        second = group.create_task(fetch_data(2))
    # Leaving the block waits for both tasks. If either one fails, the other
    # is cancelled and the failure is re-raised in an ExceptionGroup.

Performance Tips

  • Use connection pooling with aiohttp.TCPConnector(limit=100)
  • Batch database queries with executemany() instead of individual queries
  • Profile with python -m cProfile or py-spy for bottlenecks
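  • Use asyncio.create_task() for fire-and-forget background tasks, but keep a reference to the returned Task (e.g. in a set) — the event loop holds only weak references, so an unreferenced task can be garbage-collected before it finishes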
  • Avoid blocking calls - wrap with asyncio.to_thread() if needed
  • Set appropriate timeouts to prevent hanging forever
  • Use asyncio.Queue for producer-consumer patterns
  • Limit concurrent tasks with Semaphore to prevent resource exhaustion
  • Use asyncio.Event for coordination between coroutines
  • Consider uvloop for 2-4x faster event loop (drop-in replacement)