Concurrency in Python
Checking access...
Python offers three approaches to concurrency: threading (I/O-bound), multiprocessing (CPU-bound), and asyncio (I/O-bound async).
When to Use What
| Approach | Best For | GIL Impact |
|---|---|---|
| threading | I/O-bound tasks (network, disk) | Yes — limited CPU parallelism |
| multiprocessing | CPU-bound tasks (computation) | No — separate processes |
| asyncio | I/O-bound with many connections | Single-threaded cooperative |
Threading
import threadingimport timefrom concurrent.futures import ThreadPoolExecutor
def download(url):
    """Pretend to download ``url``.

    Prints a start message, blocks for one second to stand in for real
    network I/O, then prints a completion message. Returns ``None``.
    """
    print(f"Downloading {url}...")
    # Blocking wait: while this thread sleeps, other threads can run.
    time.sleep(1)
    print(f"Finished {url}")
urls = [
    "http://example.com/1",
    "http://example.com/2",
    "http://example.com/3",
]

# With futures (preferred): the executor owns the worker threads and the
# ``with`` block waits for them to finish before continuing.
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(download, urls))
# All 3 complete in ~1 second (not 3 seconds)

# Thread safety with Lock: shared state plus the lock that protects it.
counter = 0
lock = threading.Lock()
def increment():
    """Add 100000 to the module-level ``counter``, one step at a time.

    Every single ``+= 1`` is performed while holding the module-level
    ``lock``, so concurrent callers cannot interleave the read-modify-write.
    """
    global counter
    remaining = 100000
    while remaining > 0:
        with lock:  # Prevents race conditions
            counter += 1
        remaining -= 1
# Four threads, each running increment() against the same counter.
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
# Wait for every thread before reading the final value.
for t in threads:
    t.join()
print(counter)  # 400000

# === Thread-Safe Data Structures ===
from queue import Queueimport threadingimport time
# Queue — thread-safe producer-consumer
def producer(q):
    """Put ``item-0`` .. ``item-4`` on ``q``, pausing 0.1 s before each.

    After each ``put`` a "Produced ..." line is printed.
    """
    for index in range(5):
        time.sleep(0.1)
        q.put(f"item-{index}")
        print(f"Produced item-{index}")
def consumer(q):
    """Consume items from ``q`` until a ``None`` poison pill arrives.

    Each non-``None`` item is printed as "Consumed <item>". Every ``get()``
    — including the one that receives the poison pill — is matched by a
    ``task_done()`` call, so a ``q.join()`` elsewhere can complete once the
    queue has been drained.
    """
    while True:
        item = q.get()
        try:
            if item is None:  # Poison pill
                break
            print(f"Consumed {item}")
        finally:
            # Runs on the break path too; without it the sentinel would
            # stay counted as unfinished and q.join() would block forever.
            q.task_done()
q = Queue()
producers = [threading.Thread(target=producer, args=(q,))]
consumers = [threading.Thread(target=consumer, args=(q,))]

for t in producers:
    t.start()
for t in consumers:
    t.start()

# Only signal shutdown after every producer has finished enqueuing.
for t in producers:
    t.join()

# Signal consumers to stop: each consumer exits after taking one None, so
# enqueue one poison pill per consumer (a single pill would leave any
# additional consumers blocked in q.get() forever).
for _ in consumers:
    q.put(None)
for t in consumers:
    t.join()

# === Multiprocessing ===
from multiprocessing import Pool, Process, Queue, cpu_countimport time
def is_prime(n):
    """Check if n is prime (CPU-intensive).

    Uses trial division by every integer from 2 up to the square root of
    ``n``. Values below 2 are never prime.

    The bound is tested as ``i * i <= n`` in exact integer arithmetic
    rather than via ``n ** 0.5``: the float route raises ``OverflowError``
    for very large ints and is inexact above 2**53.

    Returns:
        ``True`` if ``n`` is prime, ``False`` otherwise.
    """
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True
# Using Pool (maps like map() but parallel)
numbers = [15485863, 15485867, 15485869, 15485917, 15485927]

# Worker processes started with the "spawn" method (the default on Windows
# and macOS) re-import this module. Without this guard each worker would
# try to create its own Pool while being imported.
if __name__ == "__main__":
    # perf_counter() is monotonic, so the measured duration cannot be
    # skewed by system clock adjustments.
    start = time.perf_counter()
    with Pool(processes=cpu_count()) as pool:
        results = pool.map(is_prime, numbers)
    elapsed = time.perf_counter() - start

    print(f"Results: {results}")
    print(f"Time with {cpu_count()} processes: {elapsed:.2f}s")

# Shared memory with Value/Array
from multiprocessing import Value, Array
def increment_shared(v):
    """Increment the shared value ``v`` 100 times.

    Each increment is done while holding the lock returned by
    ``v.get_lock()``, so the read-modify-write on ``v.value`` is not
    interleaved with other holders of the same lock.
    """
    steps_left = 100
    while steps_left > 0:
        with v.get_lock():
            v.value += 1
        steps_left -= 1
counter = Value("i", 0)  # 'i' = signed int

# Child processes started with the "spawn" method re-import this module.
# The guard keeps them from launching further processes during that import.
if __name__ == "__main__":
    processes = [
        Process(target=increment_shared, args=(counter,)) for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(counter.value)  # 400

# === Asyncio ===
import asyncioimport time
async def fetch_url(name, delay):
    """Simulate an async HTTP request.

    Prints a start message, awaits ``asyncio.sleep(delay)``, prints a
    completion message, and returns the string ``"Data from <name>"``.
    """
    print(f"Fetching {name}...")
    # Suspends only this coroutine; the event loop keeps running others.
    await asyncio.sleep(delay)
    print(f"Completed {name}")
    payload = f"Data from {name}"
    return payload
async def main():
    """Time two fetches run back-to-back against two run concurrently.

    Prints the elapsed time of each approach. Durations are measured with
    ``time.perf_counter()``, which is monotonic, so they are not affected
    by system clock adjustments. The fetched values are not used.
    """
    # Sequential (slow): the second fetch starts only after the first ends.
    start = time.perf_counter()
    await fetch_url("page1", 2)
    await fetch_url("page2", 1)
    seq_time = time.perf_counter() - start

    # Concurrent (fast!): gather() runs both coroutines at the same time,
    # so the total is roughly the longer of the two delays.
    start = time.perf_counter()
    await asyncio.gather(
        fetch_url("page3", 2),
        fetch_url("page4", 1),
    )
    conc_time = time.perf_counter() - start

    print(f"\nSequential: {seq_time:.1f}s")  # ~3s
    print(f"Concurrent: {conc_time:.1f}s")  # ~2s
# Start an event loop, run main() to completion, then close the loop.
asyncio.run(main())

# === Async vs Sync Performance ===
import asyncioimport time
# Synchronous version
def sync_download_all():
    """Run ten simulated downloads one after another.

    Each download blocks for 0.5 s, so the call takes about 5 s in total.
    Returns the list ``[0, 1, ..., 9]``.
    """
    def download(n):
        time.sleep(0.5)  # Blocking I/O
        return n

    fetched = []
    for job in range(10):
        fetched.append(download(job))
    return fetched
# Asynchronous version
async def async_download_all():
    """Run ten simulated downloads concurrently on the event loop.

    Each download awaits a 0.5 s non-blocking sleep; all ten are gathered
    together, so the call takes about 0.5 s. Returns ``[0, 1, ..., 9]`` in
    submission order.
    """
    async def download(n):
        await asyncio.sleep(0.5)  # Non-blocking
        return n

    pending = []
    for job in range(10):
        pending.append(download(job))
    return await asyncio.gather(*pending)
# Durations are measured with perf_counter(), which is monotonic and so
# unaffected by system clock adjustments.
start = time.perf_counter()
sync_download_all()
print(f"Sync: {time.perf_counter() - start:.2f}s")  # ~5s

start = time.perf_counter()
asyncio.run(async_download_all())
print(f"Async: {time.perf_counter() - start:.2f}s")  # ~0.5s

# === Async Context Managers and Iteration ===
import asyncio
class AsyncResource:
    """Toy resource for demonstrating ``async with``.

    Entering and leaving the context each print a message and await a
    0.1 s sleep; ``work()`` awaits 0.1 s and returns ``"Done"``.
    """

    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        # The object bound by ``async with ... as res`` is the resource itself.
        return self

    async def __aexit__(self, *args):
        # Returns None, so an exception raised in the body is not suppressed.
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def work(self):
        await asyncio.sleep(0.1)
        outcome = "Done"
        return outcome
async def use_resource():
    """Acquire an ``AsyncResource``, run its ``work()``, and return the result.

    The resource is released by ``async with`` before the value is handed
    back to the caller.
    """
    async with AsyncResource() as res:
        return await res.work()
# Async iteration
class AsyncRange:
    """Async iterator yielding ``0 .. n-1``, awaiting 0.1 s before each value.

    The object is its own iterator and keeps its position in ``self.i``,
    so it can be iterated only once.
    """

    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i < self.n:
            await asyncio.sleep(0.1)
            current = self.i
            self.i = current + 1
            return current
        # Exhausted: tells ``async for`` to stop.
        raise StopAsyncIteration
async def iterate_async():
    """Print each value produced by ``AsyncRange(5)`` using ``async for``."""
    async for value in AsyncRange(5):
        print(value)

# === Choosing the Right Approach ===
import asyncio, threading, multiprocessing, timefrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_bound_task(item):
    """Network I/O, file I/O, database queries.

    Blocks for 0.5 s to simulate the wait, then returns ``item * 2``.
    """
    time.sleep(0.5)  # Simulated I/O
    doubled = item * 2
    return doubled
def cpu_bound_task(n):
    """Number crunching, data processing.

    Returns the sum of ``i ** 2`` for every ``i`` in ``range(n)``.
    """
    return sum(i ** 2 for i in range(n))
# ProcessPoolExecutor workers must be able to import the main module; under
# the "spawn" start method that import re-runs module-level code. The guard
# keeps workers from re-running these demos during that import.
if __name__ == "__main__":
    # I/O-bound: threading or asyncio
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(io_bound_task, range(10)))
    # ~1.5s (10 tasks on 4 workers = 3 rounds of 0.5s) vs ~5s sequential

    # CPU-bound: multiprocessing
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(cpu_bound_task, [10_000_000] * 4))
    # Faster than threading for CPU work

# === Key Takeaways ===
- `threading` for I/O-bound work; `multiprocessing` for CPU-bound work
- `asyncio` for high-concurrency I/O (many simultaneous connections)
- `concurrent.futures` provides a consistent API (`ThreadPoolExecutor`, `ProcessPoolExecutor`)
- Use `Lock` (threading) or `get_lock()` (multiprocessing) to prevent race conditions
- `Queue` for thread-safe producer-consumer patterns
- Async functions use `async def` / `await`; run with `asyncio.run()`
- `asyncio.gather()` runs tasks concurrently; `asyncio.sleep()` is non-blocking
- The GIL limits threads for CPU work — use multiprocessing for that
- Async context managers: `__aenter__` / `__aexit__`; async iteration: `__aiter__` / `__anext__`