Skip to main content

Skillber v1.0 is here!

Learn more

Advanced Topics Project

Checking access...

Apply generators, decorators, and concurrency to build an async web scraper.

Project: Async Web Scraper

Create web_scraper.py:

"""An async web scraper demonstrating generators, decorators, and concurrency."""
import asyncio
import re
import time
from collections import Counter
from dataclasses import dataclass, field
from functools import wraps
from typing import Callable, List, Optional
from urllib.parse import urljoin, urlparse
# --- Rate Limiting Decorator ---
def rate_limit(period: float):
    """Decorator factory: space out calls to an async function.

    Every call to the decorated coroutine function is assigned a start slot,
    and consecutive slots are at least ``period`` seconds apart. The slot is
    reserved before the wrapper awaits anything, so calls launched together
    (for example through ``asyncio.gather``) queue up one behind another
    instead of all sleeping the same amount and firing at the same instant.

    Args:
        period: Minimum number of seconds between the starts of two
            consecutive calls.

    Returns:
        A decorator to apply to an ``async def`` function.
    """
    def decorator(func):
        # Earliest perf_counter() value at which the next call may start.
        # None means "never called yet": the zero point of perf_counter() is
        # unspecified, so a numeric sentinel could wrongly delay the first call.
        next_slot = [None]  # Mutable closure for state

        @wraps(func)
        async def wrapper(*args, **kwargs):
            now = time.perf_counter()
            start_at = now if next_slot[0] is None else max(now, next_slot[0])
            # Reserve the slot *before* the first await; there is no
            # suspension point between the read and the write, so no other
            # task can claim the same slot.
            next_slot[0] = start_at + period
            if start_at > now:
                await asyncio.sleep(start_at - now)
            return await func(*args, **kwargs)

        return wrapper

    return decorator
# --- Retry Decorator ---
def retry(max_attempts: int = 3, delay: float = 1.0):
    """Decorator factory: retry an async function when it raises.

    The wrapped coroutine function is awaited up to ``max_attempts`` times.
    After a failed attempt that is not the last one, the wrapper waits
    ``delay * attempt_number`` seconds (linear backoff) and tries again.
    When the final attempt fails, its exception is re-raised immediately,
    without a pointless extra wait.

    Args:
        max_attempts: Total number of attempts; must be at least 1.
        delay: Base wait in seconds, multiplied by the attempt number.

    Returns:
        A decorator to apply to an ``async def`` function.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # With zero attempts there would be no result and no exception to
        # propagate, so reject the configuration up front.
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    print(f" Retry {attempt}/{max_attempts}: {e}")
                    if attempt == max_attempts:
                        # Out of attempts: surface the last failure as-is.
                        raise
                    await asyncio.sleep(delay * attempt)  # Linear backoff

        return wrapper

    return decorator
# --- URL Generator ---
def url_generator(base_url: str, paths: List[str]):
    """Lazily join each entry of `paths` onto `base_url`.

    Yields one URL per path, in input order, as produced by
    urllib.parse.urljoin(base_url, path).
    """
    yield from (urljoin(base_url, relative) for relative in paths)
def sitemap_parser(sitemap_text: str):
    """Yield the meaningful entries of a line-oriented sitemap text.

    Each line is stripped of surrounding whitespace; lines that end up empty
    or that begin with "#" are skipped, everything else is yielded in order.
    """
    for raw_line in sitemap_text.splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        yield entry
# --- Data Models ---
@dataclass
class ScrapedPage:
    """Result of scraping a single page."""

    url: str  # URL the page was requested from
    title: str  # Page title; the scraper uses "" for failed pages
    status: int  # The scraper sets 200 for parsed pages and 0 for failures
    word_count: int  # Number of whitespace-separated words after tag removal
    links: List[str] = field(default_factory=list)  # href values found on the page
    error: Optional[str] = None  # Failure message; None means the scrape succeeded


# --- Async Fetcher ---
class AsyncWebScraper:
    """Async scraper that caps the number of simultaneous fetches.

    Fetching is simulated (no network access). Every scraped URL produces
    exactly one ScrapedPage in `results`, and the word counts of successful
    pages are accumulated per network location in `word_counts`.
    """

    # Maximum number of links kept per parsed page.
    MAX_LINKS = 20

    # Patterns are compiled once for the class rather than on every parse.
    # DOTALL lets a title span several lines; [^>]* tolerates tag attributes.
    _TITLE_RE = re.compile(r"<title\b[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
    _TAG_RE = re.compile(r"<[^>]+>")
    # The \1 backreference makes the closing quote match the opening one, so
    # an apostrophe inside a double-quoted href does not cut the value short.
    _HREF_RE = re.compile(r'href=(["\'])(.*?)\1', re.IGNORECASE)

    def __init__(self, max_concurrent: int = 5, delay: float = 0.1):
        """Create a scraper.

        Args:
            max_concurrent: Maximum number of fetches allowed to run at once.
            delay: Seconds each simulated fetch sleeps to imitate latency.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.delay = delay
        self.results: List[ScrapedPage] = []
        self.word_counts: Counter = Counter()

    async def fetch(self, session, url: str) -> str:
        """Fetch a URL (simulated) and return its HTML.

        `session` is accepted for API compatibility with a real HTTP client
        but is not used by the simulation.
        """
        async with self.semaphore:
            await asyncio.sleep(self.delay)  # Simulate network
            # In real code, use aiohttp.ClientSession.get(url)
            return f"<html><title>{url}</title><body>Content of {url}</body></html>"

    @classmethod
    def _extract_title(cls, html: str, fallback: str) -> str:
        """Return the whitespace-normalized <title> text, or `fallback` if absent or blank."""
        match = cls._TITLE_RE.search(html)
        if match is None:
            return fallback
        title = " ".join(match.group(1).split())
        return title or fallback

    @classmethod
    def _count_words(cls, html: str) -> int:
        """Return the number of whitespace-separated words left after removing tags."""
        return len(cls._TAG_RE.sub(" ", html).split())

    @classmethod
    def _extract_links(cls, html: str) -> List[str]:
        """Return up to MAX_LINKS quoted href values, in document order."""
        hrefs = [match.group(2) for match in cls._HREF_RE.finditer(html)]
        return hrefs[: cls.MAX_LINKS]

    def parse_html(self, url: str, html: str) -> ScrapedPage:
        """Parse HTML content into a ScrapedPage with status 200.

        The title falls back to `url` when the document has no usable
        <title>; the word count covers all text outside of tags.
        """
        return ScrapedPage(
            url=url,
            title=self._extract_title(html, fallback=url),
            status=200,
            word_count=self._count_words(html),
            links=self._extract_links(html),
        )

    async def scrape_one(self, session, url: str) -> ScrapedPage:
        """Scrape a single URL, turning any failure into an error result.

        Exactly one ScrapedPage is appended to `results` per call: the parsed
        page on success, or a status-0 page carrying the error text.
        """
        try:
            html = await self.fetch(session, url)
            page = self.parse_html(url, html)
            # Resolve the domain inside the try block: urlparse() can raise on
            # malformed URLs, and that must not leave a success entry behind.
            domain = urlparse(url).netloc
        except Exception as e:  # Deliberate best-effort: one bad URL must not abort the batch
            page = ScrapedPage(url=url, title="", status=0, word_count=0, error=str(e))
        else:
            self.word_counts[domain] += page.word_count
        self.results.append(page)
        return page

    async def scrape_many(self, urls: List[str]) -> List[ScrapedPage]:
        """Scrape multiple URLs concurrently; results follow the order of `urls`."""
        connector = type("FakeConnector", (), {})()  # Simulated connector
        # In real code: async with aiohttp.ClientSession() as session:
        tasks = [self.scrape_one(connector, url) for url in urls]
        return await asyncio.gather(*tasks)

    def report(self):
        """Print a summary of everything collected in `results` so far."""
        # Compare against None so a failure with an empty message still
        # counts as a failure.
        successful = [p for p in self.results if p.error is None]
        failed = [p for p in self.results if p.error is not None]
        print("\n=== Scraping Report ===")
        print(f"Total URLs: {len(self.results)}")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")
        if successful:
            total_words = sum(p.word_count for p in successful)
            print(f"Total words scraped: {total_words}")
            print(f"Top domains: {self.word_counts.most_common(5)}")
        if failed:
            print("\nFailed URLs:")
            for p in failed[:5]:
                print(f" ❌ {p.url}: {p.error}")
        print("\n--- Sample Results ---")
        for page in successful[:3]:
            print(f" ✓ {page.title} ({page.word_count} words)")
# --- Pipeline: Generator + Async ---
def read_urls_from_source(source: str) -> List[str]:
    """Turn `source` into the list of URLs to scrape.

    * A string starting with "http://" or "https://" is a single URL, even
      when it ends in ".txt".
    * Any other string ending in ".txt" is treated as the path of a text
      file, read as UTF-8 and parsed with sitemap_parser(). A missing file
      is reported on stdout and yields an empty list.
    * Anything else is returned unchanged as a single-item list.
    """
    is_url = source.startswith(("http://", "https://"))
    if source.endswith(".txt") and not is_url:
        try:
            # Explicit encoding: the platform default differs between
            # systems, which would make the same file parse differently.
            with open(source, encoding="utf-8") as f:
                return list(sitemap_parser(f.read()))
        except FileNotFoundError:
            print(f"File not found: {source}")
            return []
    return [source]
async def main():
    """Run the demo: show generated URLs, scrape the demo pages, print a report."""
    print("=== Async Web Scraper ===\n")
    # Demo URLs
    urls = [f"https://example.com/page{number}" for number in range(1, 7)]
    # Using generator
    print("URLs from generator:")
    for url in url_generator("https://docs.python.org", ["/3/", "/3/tutorial/", "/3/library/"]):
        print(f" {url}")
    # Scrape
    print("\nScraping...")
    delay = 0.05  # Single source of truth for the scraper and the estimate below
    scraper = AsyncWebScraper(max_concurrent=5, delay=delay)
    start = time.perf_counter()
    await scraper.scrape_many(urls)
    elapsed = time.perf_counter() - start
    scraper.report()
    # len(urls) * delay is the cost of fetching one page at a time, i.e. the
    # baseline the concurrent run is being compared against.
    print(f"\nTotal time: {elapsed:.2f}s (would be {len(urls) * delay:.2f}s if fetched one at a time)")
# Run the demo only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

What You Practiced

| Concept | Usage |
| --- | --- |
| Generators | url_generator() yields URLs; sitemap_parser() yields lines |
| Async/await | async def scrape_many(), await asyncio.gather() |
| Decorators | @rate_limit and @retry are defined to wrap async functions such as the fetcher (the sample code does not apply them yet) |
| Semaphore | asyncio.Semaphore(max_concurrent) limits concurrent requests |
| asyncio.gather | Runs all scrape tasks concurrently |
| Closure state | last_called = [0.0] for rate limiter state |
| Counter | Tracks word count per domain |

Extensions

  1. Real HTTP — Replace simulated fetch with aiohttp.ClientSession
  2. Crawler — Follow links discovered on each page (BFS/DFS)
  3. Robots.txt — Respect robots.txt with urllib.robotparser
  4. Caching — Cache responses to avoid re-fetching
  5. Streaming — Use aiofiles to stream results to disk as they arrive