Python Async Programming: asyncio, Concurrency Patterns, and Performance Tips
Master Python's async/await with practical asyncio patterns, understand when to use threads vs async, and build high-performance concurrent applications with real-world examples.
Python's asyncio enables writing concurrent code that handles thousands of I/O operations efficiently. This guide covers async/await fundamentals, when to use async vs threading, and patterns for building responsive applications.
Async Basics: Understanding the Event Loop
import asyncio
import time

# ❌ Blocking (synchronous) code
def fetch_data_sync(n):
    """Simulates an API call"""
    time.sleep(2)  # Blocks the entire program
    return f"Data {n}"

def main_sync():
    start = time.time()
    results = []
    for i in range(5):
        results.append(fetch_data_sync(i))
    print(f"Total time: {time.time() - start:.2f}s")  # ~10 seconds

# ✅ Non-blocking (asynchronous) code
async def fetch_data_async(n):
    """Async API call"""
    await asyncio.sleep(2)  # Yields control to the event loop
    return f"Data {n}"

async def main_async():
    start = time.time()
    # Run all coroutines concurrently
    results = await asyncio.gather(
        fetch_data_async(0),
        fetch_data_async(1),
        fetch_data_async(2),
        fetch_data_async(3),
        fetch_data_async(4),
    )
    print(f"Total time: {time.time() - start:.2f}s")  # ~2 seconds

# Run the async entry point
asyncio.run(main_async())

# Key concepts:
# - Coroutine: an async function that can be paused and resumed
# - await: pause execution until the awaited coroutine completes
# - Event loop: schedules and runs coroutines
# - asyncio.gather(): run multiple coroutines concurrently
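Note that gather() only returns once every coroutine has finished. When you want to handle each result as soon as it arrives, asyncio.as_completed() yields awaitables in completion order instead. A minimal sketch; fetch_staggered and its delays are illustrative, not from the example above:

import asyncio
import time

async def fetch_staggered(n):
    # Illustrative delays: later calls finish sooner
    await asyncio.sleep(2 - n * 0.3)
    return f"Data {n}"

async def main():
    start = time.time()
    tasks = [asyncio.create_task(fetch_staggered(i)) for i in range(5)]
    # as_completed yields awaitables in finish order, not submission order
    for next_done in asyncio.as_completed(tasks):
        result = await next_done
        print(f"{result} arrived at {time.time() - start:.2f}s")

asyncio.run(main())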
Async Patterns for Real Applications
import asyncio

import aiohttp
import asyncpg

# 1. Concurrent HTTP requests
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# 2. Async context managers
class AsyncDatabase:
    async def __aenter__(self):
        self.conn = await asyncpg.connect('postgresql://localhost')
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

async def query_database():
    async with AsyncDatabase() as conn:
        result = await conn.fetch('SELECT * FROM users')
        return result

# 3. Async generators for streaming
async def fetch_logs_stream(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for line in response.content:
                yield line.decode('utf-8')

async def process_logs():
    async for log in fetch_logs_stream('http://api.example.com/logs'):
        print(log)
        if should_stop(log):  # should_stop is an application-defined check
            break

# 4. Async task queue with semaphore
async def worker(semaphore, session, queue, results):
    while True:
        url = await queue.get()
        if url is None:  # Sentinel: no more work
            break
        async with semaphore:  # Limit concurrent fetches
            result = await fetch_url(session, url)
        results.append(result)
        queue.task_done()

async def process_urls_with_limit(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    queue = asyncio.Queue()
    results = []

    # Add URLs to the queue
    for url in urls:
        await queue.put(url)

    async with aiohttp.ClientSession() as session:
        # Create workers
        workers = [
            asyncio.create_task(worker(semaphore, session, queue, results))
            for _ in range(max_concurrent)
        ]

        await queue.join()  # Wait until every URL has been processed

        # Stop workers with one sentinel each
        for _ in range(max_concurrent):
            await queue.put(None)

        await asyncio.gather(*workers)

    return results
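When you don't need a persistent queue, a semaphore wrapped around each request plus a single gather() gives the same concurrency cap with far less machinery. A minimal sketch; fetch_limited is an illustrative name, not part of the examples above:

import asyncio

import aiohttp

async def fetch_limited(session, semaphore, url):
    # The semaphore caps how many requests are in flight at once
    async with semaphore:
        async with session.get(url) as response:
            return await response.text()

async def fetch_all(urls, max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_limited(session, semaphore, url) for url in urls]
        return await asyncio.gather(*tasks)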
When to Use Async vs Threading vs Multiprocessing
import asyncio
import concurrent.futures
import multiprocessing

# Decision Matrix:

# 1. I/O-bound tasks (API calls, DB queries, file I/O)
# ✅ Use asyncio
async def io_bound_async():
    tasks = [asyncio.sleep(1) for _ in range(100)]
    await asyncio.gather(*tasks)
    # Completes in ~1 second, on a single thread

# 2. I/O-bound work with blocking libraries
# ✅ Use threading (when the library doesn't support async)
def io_bound_threading():
    import requests

    def fetch(url):
        return requests.get(url).text

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        urls = ['http://example.com'] * 100
        results = list(executor.map(fetch, urls))

# 3. CPU-bound tasks (data processing, calculations)
# ✅ Use multiprocessing
def cpu_bound_task(n):
    return sum(i * i for i in range(n))

def cpu_bound_multiprocessing():
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_bound_task, [10_000_000] * 4)
        # Uses multiple CPU cores

# ❌ Don't use asyncio for CPU-bound work: the GIL prevents
# pure-Python code from running in parallel, even across threads
async def cpu_bound_async_bad():  # This won't help!
    await asyncio.gather(*[
        asyncio.to_thread(cpu_bound_task, 10_000_000)
        for _ in range(4)
    ])

# Summary:
# asyncio: I/O-bound, many connections, single core
# threading: I/O-bound with blocking libs, shared state
# multiprocessing: CPU-bound, needs true parallelism, avoids the GIL
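If CPU-bound work has to live inside an async application, the usual escape hatch is to hand it to a process pool with loop.run_in_executor(), which sidesteps the GIL while the event loop stays responsive. A minimal sketch:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in a separate process, so the four
        # computations use multiple cores in parallel
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, cpu_bound_task, 10_000_000)
            for _ in range(4)
        ])
    print(results)

if __name__ == "__main__":  # Guard required for multiprocessing
    asyncio.run(main())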
Error Handling and Timeouts
import asyncio

import aiohttp

# fetch_data(...) below stands in for any awaitable I/O call

# 1. Timeouts (asyncio.timeout() requires Python 3.11+)
async def fetch_with_timeout(url, timeout=5):
    try:
        async with asyncio.timeout(timeout):
            # The operation must complete within `timeout` seconds
            result = await fetch_data(url)
            return result
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
        return None

# 2. Handle individual task failures
async def fetch_all_ignore_errors(urls):
    tasks = [fetch_with_timeout(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Separate successes from errors
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]

    return successful, failed

# 3. Retry with exponential backoff
async def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except aiohttp.ClientError:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt  # 1s, 2s, 4s
            print(f"Retry {attempt + 1} after {wait_time}s")
            await asyncio.sleep(wait_time)

# 4. Graceful shutdown
async def main():
    tasks = []
    try:
        # Create long-running tasks (worker is any long-running coroutine)
        tasks = [
            asyncio.create_task(worker(i))
            for i in range(10)
        ]
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # On Ctrl+C, asyncio.run() cancels the main task, so handle
        # CancelledError here rather than KeyboardInterrupt
        print("Shutting down gracefully...")
        for task in tasks:
            task.cancel()
        # Wait for cancellation to finish
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

# 5. Task groups (Python 3.11+)
async def main_with_taskgroup():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch_data(1))
        task2 = tg.create_task(fetch_data(2))
    # If any task fails, the remaining tasks are cancelled automatically
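On Python versions before 3.11, where asyncio.timeout() is unavailable, asyncio.wait_for() gives the same behavior. A sketch using the same fetch_data placeholder:

import asyncio

async def fetch_with_timeout_compat(url, timeout=5):
    # wait_for cancels the awaitable if it exceeds the timeout
    try:
        return await asyncio.wait_for(fetch_data(url), timeout=timeout)
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
        return None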
Performance Tips
- Use connection pooling with aiohttp.TCPConnector(limit=100) to reuse connections across requests (see the sketch after this list)
- Batch database queries with executemany() instead of issuing individual queries
- Profile with python -m cProfile or py-spy to find bottlenecks
- Use asyncio.create_task() for fire-and-forget background work, but keep a reference to the task so it isn't garbage-collected mid-flight
- Avoid blocking calls inside coroutines; wrap them with asyncio.to_thread() if needed
- Set timeouts on every external call so a slow peer can't hang the application forever
- Use asyncio.Queue for producer-consumer patterns
- Limit concurrent tasks with a Semaphore to prevent resource exhaustion
- Use asyncio.Event to coordinate between coroutines
- Consider uvloop, a drop-in event loop replacement that is typically 2-4x faster
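As a sketch of the first tip, a shared ClientSession with a tuned TCPConnector reuses TCP connections across requests; the limit values here are illustrative, not recommendations:

import asyncio

import aiohttp

async def main():
    # limit caps the total pooled connections; limit_per_host keeps
    # any single server from being flooded
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get("http://example.com") as response:
            print(response.status)

asyncio.run(main())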