Skip to main content

Skillber v1.0 is here!

Learn more

Concurrency in Python

Checking access...

Python offers three approaches to concurrency: threading (I/O-bound), multiprocessing (CPU-bound), and asyncio (I/O-bound async).

When to Use What

ApproachBest ForGIL Impact
threadingI/O-bound tasks (network, disk)Yes — limited CPU parallelism
multiprocessingCPU-bound tasks (computation)No — separate processes
asyncioI/O-bound with many connectionsSingle-threaded cooperative

Threading

import threading
import time
from concurrent.futures import ThreadPoolExecutor
def download(url):
"""Simulate downloading a file."""
print(f"Downloading {url}...")
time.sleep(1) # Simulate I/O
print(f"Finished {url}")
urls = ["http://example.com/1", "http://example.com/2", "http://example.com/3"]
# With futures (preferred)
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(download, urls))
# All 3 complete in ~1 second (not 3 seconds)
# Thread safety with Lock
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # Prevents race conditions
counter += 1
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 400000

Thread-Safe Data Structures

from queue import Queue
import threading
import time
# Queue — thread-safe producer-consumer
def producer(q):
for i in range(5):
time.sleep(0.1)
q.put(f"item-{i}")
print(f"Produced item-{i}")
def consumer(q):
while True:
item = q.get()
if item is None: # Poison pill
break
print(f"Consumed {item}")
q.task_done()
q = Queue()
producers = [threading.Thread(target=producer, args=(q,))]
consumers = [threading.Thread(target=consumer, args=(q,))]
for t in producers: t.start()
for t in consumers: t.start()
for t in producers: t.join()
q.put(None) # Signal consumers to stop
for t in consumers: t.join()

Multiprocessing

from multiprocessing import Pool, Process, Queue, cpu_count
import time
def is_prime(n):
"""Check if n is prime (CPU-intensive)."""
if n < 2:
return False
for i in range(2, int(n ** 0.5) + 1):
if n % i == 0:
return False
return True
# Using Pool (maps like map() but parallel)
numbers = [15485863, 15485867, 15485869, 15485917, 15485927]
start = time.time()
with Pool(processes=cpu_count()) as pool:
results = pool.map(is_prime, numbers)
elapsed = time.time() - start
print(f"Results: {results}")
print(f"Time with {cpu_count()} processes: {elapsed:.2f}s")
# Shared memory with Value/Array
from multiprocessing import Value, Array
def increment_shared(v):
for _ in range(100):
with v.get_lock():
v.value += 1
counter = Value("i", 0) # 'i' = signed int
processes = [Process(target=increment_shared, args=(counter,)) for _ in range(4)]
for p in processes: p.start()
for p in processes: p.join()
print(counter.value) # 400

Asyncio

import asyncio
import time
async def fetch_url(name, delay):
"""Simulate async HTTP request."""
print(f"Fetching {name}...")
await asyncio.sleep(delay) # Non-blocking wait
print(f"Completed {name}")
return f"Data from {name}"
async def main():
# Sequential (slow)
start = time.time()
r1 = await fetch_url("page1", 2)
r2 = await fetch_url("page2", 1)
seq_time = time.time() - start
# Concurrent (fast!)
start = time.time()
r3, r4 = await asyncio.gather(
fetch_url("page3", 2),
fetch_url("page4", 1),
)
conc_time = time.time() - start
print(f"\nSequential: {seq_time:.1f}s")
print(f"Concurrent: {conc_time:.1f}s") # ~2s vs ~3s
asyncio.run(main())

Async vs Sync Performance

import asyncio
import time
# Synchronous version
def sync_download_all():
def download(n):
time.sleep(0.5) # Blocking I/O
return n
return [download(i) for i in range(10)]
# Asynchronous version
async def async_download_all():
async def download(n):
await asyncio.sleep(0.5) # Non-blocking
return n
tasks = [download(i) for i in range(10)]
return await asyncio.gather(*tasks)
start = time.time()
sync_download_all()
print(f"Sync: {time.time() - start:.2f}s") # ~5s
start = time.time()
asyncio.run(async_download_all())
print(f"Async: {time.time() - start:.2f}s") # ~0.5s

Async Context Managers and Iteration

import asyncio
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, *args):
print("Releasing resource")
await asyncio.sleep(0.1)
async def work(self):
await asyncio.sleep(0.1)
return "Done"
async def use_resource():
async with AsyncResource() as res:
result = await res.work()
return result
# Async iteration
class AsyncRange:
def __init__(self, n):
self.n = n
self.i = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.i >= self.n:
raise StopAsyncIteration
await asyncio.sleep(0.1)
val = self.i
self.i += 1
return val
async def iterate_async():
async for num in AsyncRange(5):
print(num)

Choosing the Right Approach

import asyncio, threading, multiprocessing, time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_bound_task(item):
"""Network I/O, file I/O, database queries."""
time.sleep(0.5) # Simulated I/O
return item * 2
def cpu_bound_task(n):
"""Number crunching, data processing."""
total = 0
for i in range(n):
total += i ** 2
return total
# I/O-bound: threading or asyncio
with ThreadPoolExecutor(max_workers=4) as pool:
results = list(pool.map(io_bound_task, range(10)))
# ~1.25s vs ~5s sequential
# CPU-bound: multiprocessing
with ProcessPoolExecutor(max_workers=4) as pool:
results = list(pool.map(cpu_bound_task, [10_000_000] * 4))
# Faster than threading for CPU work

Key Takeaways

  • threading for I/O-bound work; multiprocessing for CPU-bound work
  • asyncio for high-concurrency I/O (many simultaneous connections)
  • concurrent.futures provides consistent API (ThreadPoolExecutor, ProcessPoolExecutor)
  • Use Lock (threading) or get_lock() (multiprocessing) to prevent race conditions
  • Queue for thread-safe producer-consumer patterns
  • Async functions use async def/await; run with asyncio.run()
  • asyncio.gather() runs tasks concurrently; asyncio.sleep() is non-blocking
  • The GIL limits threads for CPU work — use multiprocessing for that
  • Async context managers: __aenter__/__aexit__; async iteration: __aiter__/__anext__