Advanced Topics Project
Checking access...
Apply generators, decorators, and concurrency to build an async web scraper.
Project: Async Web Scraper
Create web_scraper.py:
"""An async web scraper demonstrating generators, decorators, and concurrency."""
import asyncio
import re
import time
from collections import Counter
from dataclasses import dataclass, field
from functools import wraps
from typing import Callable, List, Optional
from urllib.parse import urljoin, urlparse
# --- Rate Limiting Decorator ---
def rate_limit(period: float):
    """Decorator: limit an async function to one call start per `period` seconds.

    Each invocation reserves the next free start slot *before* it awaits, so
    calls launched concurrently (e.g. through ``asyncio.gather``) are spaced
    ``period`` seconds apart instead of all waking up at the same instant.

    Args:
        period: Minimum number of seconds between the starts of two calls.

    Returns:
        A decorator that wraps a coroutine function; the wrapper awaits the
        wrapped function and returns its result unchanged.
    """

    def decorator(func):
        # Mutable closure cell: the earliest perf_counter() value at which the
        # next call may start. None means "no call yet", so the very first
        # call never waits regardless of where the clock's origin lies.
        next_slot = [None]

        @wraps(func)
        async def wrapper(*args, **kwargs):
            now = time.perf_counter()
            if next_slot[0] is None:
                start_at = now
            else:
                start_at = max(now, next_slot[0])
            # Reserve the slot before awaiting: there is no suspension point
            # between the read above and this write, so concurrent callers
            # each observe a distinct slot.
            next_slot[0] = start_at + period
            if start_at > now:
                await asyncio.sleep(start_at - now)
            return await func(*args, **kwargs)

        return wrapper

    return decorator
# --- Retry Decorator ---
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator: retry an async function when it raises.

    The wrapped coroutine is awaited up to `max_attempts` times. After a
    failed attempt that is not the last one, the wrapper waits
    ``delay * attempt_number`` seconds (linear backoff) before trying again.
    When the final attempt fails, its exception is re-raised immediately,
    without a trailing sleep.

    Args:
        max_attempts: Total number of attempts; must be at least 1.
        delay: Base delay in seconds, multiplied by the attempt number.

    Returns:
        A decorator that wraps a coroutine function.

    Raises:
        ValueError: If `max_attempts` is smaller than 1.
    """
    if max_attempts < 1:
        # With zero attempts there would be neither a result nor an exception
        # to propagate, so reject the configuration up front.
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    print(f" Retry {attempt}/{max_attempts}: {exc}")
                    if attempt == max_attempts:
                        # Out of attempts: propagate the last failure now.
                        raise
                    # Linear backoff: delay, 2 * delay, 3 * delay, ...
                    await asyncio.sleep(delay * attempt)

        return wrapper

    return decorator
# --- URL Generator ---
def url_generator(base_url: str, paths: List[str]):
    """Lazily produce absolute URLs by resolving each path against a base.

    Args:
        base_url: URL that every entry of `paths` is joined onto.
        paths: Paths (or URLs) to resolve with ``urljoin``.

    Yields:
        One joined URL per entry of `paths`, in the same order.
    """
    yield from (urljoin(base_url, relative) for relative in paths)
def sitemap_parser(sitemap_text: str):
    """Iterate over the URL entries of a sitemap-like text.

    Each line is stripped of surrounding whitespace; blank lines and lines
    whose first non-blank character is ``#`` are skipped.

    Args:
        sitemap_text: Text with one entry per line.

    Yields:
        The stripped content of every remaining line, in order.
    """
    for raw_line in sitemap_text.splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        yield entry
# --- Data Models ---
@dataclass
class ScrapedPage:
    """Outcome of scraping one page, successful or not."""

    # Address the page was requested from.
    url: str
    # Page title as extracted by the parser.
    title: str
    # Status code recorded for the request.
    status: int
    # Number of whitespace-separated words counted on the page.
    word_count: int
    # Link targets found on the page; every instance gets its own list.
    links: List[str] = field(default_factory=list)
    # Error message when scraping failed, otherwise None.
    error: Optional[str] = field(default=None)
# --- Async Fetcher ---
class AsyncWebScraper:
    """Async web scraper that bounds concurrency with a semaphore.

    Fetching is simulated: `fetch` sleeps for `delay` seconds and returns a
    small synthetic HTML document. Every scraped page (or failure) is
    collected in `results`, and `word_counts` accumulates the number of
    words seen per domain.
    """

    # Patterns are compiled once at class creation instead of on every parse.
    # The title may carry attributes and span several lines.
    _TITLE_RE = re.compile(
        r"<title(?:\s[^>]*)?>(.*?)</title>", re.IGNORECASE | re.DOTALL
    )
    _TAG_RE = re.compile(r"<[^>]+>")
    # The backreference forces the closing quote to match the opening one, so
    # href="/it's" is captured whole instead of being cut at the apostrophe.
    _HREF_RE = re.compile(r'href=(["\'])(.*?)\1', re.IGNORECASE)
    # Upper bound on the number of links kept per page.
    _MAX_LINKS = 20

    def __init__(self, max_concurrent: int = 5, delay: float = 0.1):
        """Set up the scraper.

        Args:
            max_concurrent: Maximum number of fetches in flight at once.
            delay: Simulated network latency of one fetch, in seconds.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.delay = delay
        self.results: List[ScrapedPage] = []
        self.word_counts: Counter = Counter()

    async def fetch(self, session, url: str) -> str:
        """Fetch a URL (simulated) while holding a semaphore slot.

        Args:
            session: Placeholder for an HTTP session; unused by the simulation.
            url: Address to fetch.

        Returns:
            A synthetic HTML document that mentions `url`.
        """
        async with self.semaphore:
            await asyncio.sleep(self.delay)  # Simulate network
            # In real code, use aiohttp.ClientSession.get(url)
            return f"<html><title>{url}</title><body>Content of {url}</body></html>"

    def parse_html(self, url: str, html: str) -> ScrapedPage:
        """Extract title, word count and links from an HTML string.

        Args:
            url: Address the HTML came from; used as the title fallback.
            html: Raw HTML text.

        Returns:
            A `ScrapedPage` with status 200, the whitespace-normalized title
            (or `url` when the title is missing or empty), the number of
            whitespace-separated words left after stripping tags, and at
            most 20 link targets.
        """
        title_match = self._TITLE_RE.search(html)
        # Collapse newlines/indentation inside the title; an absent or blank
        # title falls back to the URL.
        title = " ".join(title_match.group(1).split()) if title_match else ""
        if not title:
            title = url

        # Replace every tag with a space so adjacent words stay separated.
        text = self._TAG_RE.sub(" ", html)
        word_count = len(text.split())

        links = [match.group(2) for match in self._HREF_RE.finditer(html)]

        return ScrapedPage(
            url=url,
            title=title,
            status=200,
            word_count=word_count,
            links=links[: self._MAX_LINKS],  # Limit links
        )

    async def scrape_one(self, session, url: str) -> ScrapedPage:
        """Fetch and parse a single URL, recording the outcome.

        Failures are deliberately not propagated: any exception raised while
        fetching or parsing is turned into a `ScrapedPage` with status 0 and
        the exception text in `error`. No retry is performed here.

        Args:
            session: Session object forwarded to `fetch`.
            url: Address to scrape.

        Returns:
            The page that was appended to `results`.
        """
        try:
            html = await self.fetch(session, url)
            page = self.parse_html(url, html)
            self.results.append(page)
            self.word_counts[urlparse(url).netloc] += page.word_count
            return page
        except Exception as e:
            error_page = ScrapedPage(
                url=url, title="", status=0, word_count=0, error=str(e)
            )
            self.results.append(error_page)
            return error_page

    async def scrape_many(self, urls: List[str]) -> List[ScrapedPage]:
        """Scrape multiple URLs concurrently.

        Args:
            urls: Addresses to scrape.

        Returns:
            One `ScrapedPage` per input URL, in the order of `urls`.
        """
        connector = type("FakeConnector", (), {})()  # Simulated connector

        # In real code: async with aiohttp.ClientSession() as session:
        tasks = [self.scrape_one(connector, url) for url in urls]
        return await asyncio.gather(*tasks)

    def report(self):
        """Print a summary of everything collected in `results`."""
        successful = [p for p in self.results if not p.error]
        failed = [p for p in self.results if p.error]

        print("\n=== Scraping Report ===")
        print(f"Total URLs: {len(self.results)}")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")

        if successful:
            total_words = sum(p.word_count for p in successful)
            print(f"Total words scraped: {total_words}")
            print(f"Top domains: {self.word_counts.most_common(5)}")

        if failed:
            print("\nFailed URLs:")
            for p in failed[:5]:
                print(f" ❌ {p.url}: {p.error}")

        print("\n--- Sample Results ---")
        for page in successful[:3]:
            print(f" ✓ {page.title} ({page.word_count} words)")
# --- Pipeline: Generator + Async ---
def read_urls_from_source(source: str) -> List[str]:
    """Turn a source specification into a list of URLs.

    A source that starts with ``http://`` or ``https://`` is always treated
    as a single URL, even when it ends in ``.txt``. Any other source ending
    in ``.txt`` is read as a UTF-8 text file and parsed with
    `sitemap_parser`. Everything else is treated as a single URL.

    Args:
        source: A URL or the path of a ``.txt`` file listing URLs.

    Returns:
        The URLs to scrape; an empty list when the file does not exist.
    """
    is_url = source.startswith(("http://", "https://"))
    if is_url or not source.endswith(".txt"):
        # Single URL — just crawl it
        return [source]

    try:
        # Explicit encoding keeps decoding independent of the platform's
        # locale default.
        with open(source, encoding="utf-8") as f:
            return list(sitemap_parser(f.read()))
    except FileNotFoundError:
        print(f"File not found: {source}")
        return []
async def main():
    """Run the demo: list generated URLs, scrape the demo pages, print a report."""
    print("=== Async Web Scraper ===\n")

    # Demo URLs
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        "https://example.com/page4",
        "https://example.com/page5",
        "https://example.com/page6",
    ]

    # Using generator
    print("URLs from generator:")
    for url in url_generator(
        "https://docs.python.org", ["/3/", "/3/tutorial/", "/3/library/"]
    ):
        print(f" {url}")

    # Scrape
    print("\nScraping...")
    # Single source of truth for the simulated latency, reused in the
    # comparison printed below.
    delay = 0.05
    scraper = AsyncWebScraper(max_concurrent=5, delay=delay)

    start = time.perf_counter()
    await scraper.scrape_many(urls)
    elapsed = time.perf_counter() - start

    scraper.report()
    # len(urls) * delay is what fetching one page at a time would cost, so
    # label it as the sequential estimate.
    print(
        f"\nTotal time: {elapsed:.2f}s "
        f"(would be {len(urls) * delay:.2f}s sequentially)"
    )
if __name__ == "__main__":
    asyncio.run(main())

What You Practiced
| Concept | Usage |
|---|---|
| Generators | url_generator() yields URLs; sitemap_parser() yields lines |
| Async/await | async def scrape_many(), await asyncio.gather() |
| Decorators | @rate_limit and @retry are written to wrap async functions such as the fetcher (defined here, but not yet applied in the demo) |
| Semaphore | asyncio.Semaphore(max_concurrent) limits concurrent requests |
| asyncio.gather | Runs all scrape tasks concurrently |
| Closure state | last_called = [0.0] for rate limiter state |
| Counter | Tracks word count per domain |
Extensions
- Real HTTP — Replace the simulated fetch with `aiohttp.ClientSession`
- Crawler — Follow links discovered on each page (BFS/DFS)
- Robots.txt — Respect `robots.txt` with `urllib.robotparser`
- Caching — Cache responses to avoid re-fetching
- Streaming — Use `aiofiles` to stream results to disk as they arrive