Skip to main content

Skillber v1.0 is here!

Learn more

Advanced Standard Library

Checking access...

argparse — Command-Line Arguments

import argparse
parser = argparse.ArgumentParser(
description="Process some files."
)
# Positional argument (required)
parser.add_argument("input", help="Input file path")
# Optional arguments
parser.add_argument("-o", "--output", help="Output file path")
parser.add_argument("-v", "--verbose", action="store_true",
help="Increase verbosity")
parser.add_argument("--limit", type=int, default=10,
help="Number of results (default: 10)")
parser.add_argument("--format", choices=["json", "csv"],
default="json")
args = parser.parse_args()
print(f"Input: {args.input}")
print(f"Output: {args.output}")
print(f"Verbose: {args.verbose}")
print(f"Limit: {args.limit}")
Terminal window
# Usage
python script.py data.txt -o output.json --verbose --limit 20

subprocess — Running System Commands

import subprocess
# Run command, capture output
result = subprocess.run(
["ls", "-la"],
capture_output=True,
text=True,
check=True, # Raises CalledProcessError on non-zero exit
)
print(result.stdout)
print(result.returncode)
# Run with shell (caution!)
result = subprocess.run(
"ls -la | grep py",
shell=True,
capture_output=True,
text=True,
)
# Run and ignore output
subprocess.run(["mkdir", "-p", "new_dir"], check=True)
# Long-running command with timeout
try:
result = subprocess.run(
["sleep", "10"],
timeout=5,
check=True,
)
except subprocess.TimeoutExpired:
print("Command timed out")
# PIPE for streaming
proc = subprocess.Popen(
["tail", "-f", "log.txt"],
stdout=subprocess.PIPE,
text=True,
)
for line in proc.stdout:
print(line.strip())

threading and concurrent.futures

import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Simple threading
def worker(name):
print(f"Worker {name} starting")
time.sleep(1)
print(f"Worker {name} done")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join() # Wait for completion
# Thread pool
def fetch_url(url):
time.sleep(0.5)
return f"Data from {url}"
urls = ["http://example.com", "http://test.com", "http://demo.com"]
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(fetch_url, urls))
print(results)
# Thread-local data
thread_local = threading.local()
def set_value(value):
thread_local.my_data = value
print(f"Set: {value} on {threading.current_thread().name}")
threads = [
threading.Thread(target=set_value, args=("A",)),
threading.Thread(target=set_value, args=("B",)),
]
for t in threads:
t.start()
for t in threads:
t.join()

Caution

Python’s GIL (Global Interpreter Lock) limits CPU-bound threading. Use multiprocessing for CPU-intensive tasks, threading for I/O-bound tasks.

asyncio — Asynchronous Programming

import asyncio
import aiohttp # third-party, but standard pattern
async def fetch_data(url):
"""Async function with asyncio."""
print(f"Fetching {url}...")
await asyncio.sleep(1) # Simulate network I/O
return f"Data from {url}"
async def main():
# Run tasks concurrently
tasks = [
fetch_data("http://example.com/1"),
fetch_data("http://example.com/2"),
fetch_data("http://example.com/3"),
]
results = await asyncio.gather(*tasks)
print(results)
# Run the async event loop
asyncio.run(main())

multiprocessing — Parallel Processing

from multiprocessing import Pool, Process, cpu_count
def square(n):
return n ** 2
# Process pool
with Pool(processes=cpu_count()) as pool:
results = pool.map(square, range(10))
print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Process class
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"Process {self.name} running")
processes = [MyProcess(f"Worker-{i}") for i in range(4)]
for p in processes:
p.start()
for p in processes:
p.join()

logging — Advanced Logging

import logging
# Configure once at application start
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
logger.debug("Detailed debug info")
logger.info("Operation completed")
logger.warning("Resource running low")
logger.error("Failed to connect")
logger.critical("System shutting down")
# Exception logging
try:
1 / 0
except ZeroDivisionError:
logger.exception("Division error occurred") # Includes traceback
# Logger hierarchy
main_logger = logging.getLogger("app")
sub_logger = logging.getLogger("app.database")
sub_logger.info("DB query executed")

functools — Function Tools

from functools import wraps, lru_cache, partial, reduce
# lru_cache — memoization
@lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(50)) # Instant — cached
# partial — fix arguments
def power(base, exponent):
return base ** exponent
square = partial(power, exponent=2)
cube = partial(power, exponent=3)
print(square(5)) # 25
print(cube(5)) # 125
# reduce — accumulate
from functools import reduce
result = reduce(lambda a, b: a * b, [1, 2, 3, 4, 5])
print(result) # 120

contextlib — Context Manager Utilities

from contextlib import contextmanager, suppress, redirect_stdout
import io
# @contextmanager decorator
@contextmanager
def timed(name):
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{name} took {elapsed:.4f}s")
with timed("sleep"):
time.sleep(0.5)
# suppress — ignore specific exceptions
with suppress(FileNotFoundError):
open("missing.txt") # No error
# redirect_stdout
buffer = io.StringIO()
with redirect_stdout(buffer):
print("Captured output")
content = buffer.getvalue()

Key Takeaways

  • argparse handles CLI argument parsing with help generation
  • subprocess runs system commands; use check=True to catch failures
  • threading for I/O concurrency; multiprocessing for CPU parallelism
  • asyncio for async I/O with the async/await syntax
  • logging with hierarchy and multiple handlers
  • functools: lru_cache, partial, reduce, @wraps
  • contextlib: @contextmanager, suppress, redirect_stdout