Advanced Standard Library
Checking access...
argparse — Command-Line Arguments
import argparse
parser = argparse.ArgumentParser( description="Process some files.")
# Positional argument (required)parser.add_argument("input", help="Input file path")
# Optional argumentsparser.add_argument("-o", "--output", help="Output file path")parser.add_argument("-v", "--verbose", action="store_true", help="Increase verbosity")parser.add_argument("--limit", type=int, default=10, help="Number of results (default: 10)")parser.add_argument("--format", choices=["json", "csv"], default="json")
args = parser.parse_args()
print(f"Input: {args.input}")print(f"Output: {args.output}")print(f"Verbose: {args.verbose}")print(f"Limit: {args.limit}")# Usagepython script.py data.txt -o output.json --verbose --limit 20subprocess — Running System Commands
import subprocess
# Run command, capture outputresult = subprocess.run( ["ls", "-la"], capture_output=True, text=True, check=True, # Raises CalledProcessError on non-zero exit)
print(result.stdout)print(result.returncode)
# Run with shell (caution!)result = subprocess.run( "ls -la | grep py", shell=True, capture_output=True, text=True,)
# Run and ignore outputsubprocess.run(["mkdir", "-p", "new_dir"], check=True)
# Long-running command with timeouttry: result = subprocess.run( ["sleep", "10"], timeout=5, check=True, )except subprocess.TimeoutExpired: print("Command timed out")
# PIPE for streamingproc = subprocess.Popen( ["tail", "-f", "log.txt"], stdout=subprocess.PIPE, text=True,)for line in proc.stdout: print(line.strip())threading and concurrent.futures
import threadingimport timefrom concurrent.futures import ThreadPoolExecutor
# Simple threadingdef worker(name): print(f"Worker {name} starting") time.sleep(1) print(f"Worker {name} done")
threads = []for i in range(3): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()
for t in threads: t.join() # Wait for completion
# Thread pooldef fetch_url(url): time.sleep(0.5) return f"Data from {url}"
urls = ["http://example.com", "http://test.com", "http://demo.com"]
with ThreadPoolExecutor(max_workers=3) as executor: results = list(executor.map(fetch_url, urls))
print(results)
# Thread-local datathread_local = threading.local()
def set_value(value): thread_local.my_data = value print(f"Set: {value} on {threading.current_thread().name}")
threads = [ threading.Thread(target=set_value, args=("A",)), threading.Thread(target=set_value, args=("B",)),]for t in threads: t.start()for t in threads: t.join()Caution
Python’s GIL (Global Interpreter Lock) limits CPU-bound threading. Use multiprocessing for CPU-intensive tasks, threading for I/O-bound tasks.
asyncio — Asynchronous Programming
import asyncioimport aiohttp # third-party, but standard pattern
async def fetch_data(url): """Async function with asyncio.""" print(f"Fetching {url}...") await asyncio.sleep(1) # Simulate network I/O return f"Data from {url}"
async def main(): # Run tasks concurrently tasks = [ fetch_data("http://example.com/1"), fetch_data("http://example.com/2"), fetch_data("http://example.com/3"), ]
results = await asyncio.gather(*tasks) print(results)
# Run the async event loopasyncio.run(main())multiprocessing — Parallel Processing
from multiprocessing import Pool, Process, cpu_count
def square(n): return n ** 2
# Process poolwith Pool(processes=cpu_count()) as pool: results = pool.map(square, range(10)) print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# Process classclass MyProcess(Process): def __init__(self, name): super().__init__() self.name = name
def run(self): print(f"Process {self.name} running")
processes = [MyProcess(f"Worker-{i}") for i in range(4)]for p in processes: p.start()for p in processes: p.join()logging — Advanced Logging
import logging
# Configure once at application startlogging.basicConfig( level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[ logging.FileHandler("app.log"), logging.StreamHandler(), ],)
logger = logging.getLogger(__name__)
logger.debug("Detailed debug info")logger.info("Operation completed")logger.warning("Resource running low")logger.error("Failed to connect")logger.critical("System shutting down")
# Exception loggingtry: 1 / 0except ZeroDivisionError: logger.exception("Division error occurred") # Includes traceback
# Logger hierarchymain_logger = logging.getLogger("app")sub_logger = logging.getLogger("app.database")sub_logger.info("DB query executed")functools — Function Tools
from functools import wraps, lru_cache, partial, reduce
# lru_cache — memoization@lru_cache(maxsize=128)def fibonacci(n): if n < 2: return n return fibonacci(n-1) + fibonacci(n-2)
print(fibonacci(50)) # Instant — cached
# partial — fix argumentsdef power(base, exponent): return base ** exponent
square = partial(power, exponent=2)cube = partial(power, exponent=3)
print(square(5)) # 25print(cube(5)) # 125
# reduce — accumulatefrom functools import reduceresult = reduce(lambda a, b: a * b, [1, 2, 3, 4, 5])print(result) # 120contextlib — Context Manager Utilities
from contextlib import contextmanager, suppress, redirect_stdoutimport io
# @contextmanager decorator@contextmanagerdef timed(name): import time start = time.perf_counter() try: yield finally: elapsed = time.perf_counter() - start print(f"{name} took {elapsed:.4f}s")
with timed("sleep"): time.sleep(0.5)
# suppress — ignore specific exceptionswith suppress(FileNotFoundError): open("missing.txt") # No error
# redirect_stdoutbuffer = io.StringIO()with redirect_stdout(buffer): print("Captured output")content = buffer.getvalue()Key Takeaways
argparsehandles CLI argument parsing with help generationsubprocessruns system commands; usecheck=Trueto catch failuresthreadingfor I/O concurrency;multiprocessingfor CPU parallelismasynciofor async I/O with theasync/awaitsyntaxloggingwith hierarchy and multiple handlersfunctools:lru_cache,partial,reduce,@wrapscontextlib:@contextmanager,suppress,redirect_stdout