Standard Library Project
Checking access...
Apply the standard library — re, collections, datetime, argparse, pathlib — to build a log analyzer.
Project: Log File Analyzer
Create log_analyzer.py:
"""A powerful log file analyzer using standard library modules."""
import argparse
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class LogAnalyzer:
    """Analyze server log files written in the Apache combined log format.

    Call :meth:`parse` first; every other method reports on the entries
    collected by the most recent call to :meth:`parse`.
    """

    # Apache "combined" log format, one named group per field of interest.
    # Lines that do not match this pattern are skipped by parse().
    APACHE_COMBINED = re.compile(
        r'(?P<ip>\S+) \S+ \S+ '
        r'\[(?P<time>[^\]]+)\] '
        r'"(?P<method>\S+) (?P<path>\S+) \S+" '
        r'(?P<status>\d{3}) (?P<size>\S+) '
        r'"(?P<referer>[^"]*)" '
        r'"(?P<ua>[^"]*)"'
    )

    # Preferred report labels for the most common error statuses. Any other
    # status falls back to the standard library's reason phrase.
    ERROR_PATTERNS = {
        "404": "Not Found",
        "500": "Internal Server Error",
        "403": "Forbidden",
        "401": "Unauthorized",
        "503": "Service Unavailable",
    }

    def __init__(self, log_path: str):
        """Remember which file to analyze; nothing is read until parse()."""
        self.log_path = Path(log_path)
        # Every parsed line, as a dict keyed by the regex group names.
        self.entries: List[Dict] = []
        # The subset of ``entries`` whose status code is 4xx or 5xx.
        self.errors: List[Dict] = []

    @staticmethod
    def _parse_time(raw: str) -> Optional[datetime]:
        """Convert a log timestamp to an aware datetime, or None if malformed."""
        try:
            return datetime.strptime(raw, "%d/%b/%Y:%H:%M:%S %z")
        except ValueError:
            return None

    @staticmethod
    def _parse_size(raw: str) -> int:
        """Convert the size field to an int; '-' or garbage counts as 0 bytes."""
        try:
            return int(raw)
        except ValueError:
            # Covers the conventional "-" placeholder as well as corrupt values,
            # so one bad line cannot abort the whole parse.
            return 0

    def _status_label(self, status: str) -> str:
        """Return a human-readable label for a status code string."""
        preferred = self.ERROR_PATTERNS.get(status)
        if preferred is not None:
            return preferred
        try:
            return HTTPStatus(int(status)).phrase
        except ValueError:
            # Three digits, but not a registered HTTP status (e.g. 299).
            return "Unknown"

    def parse(self) -> int:
        """Parse the log file and return the number of recognized entries.

        Each call starts from scratch, so parsing twice does not duplicate
        entries. Lines that do not match the combined log format are ignored.

        Raises:
            FileNotFoundError: if the log file does not exist.
        """
        self.entries = []
        self.errors = []

        if not self.log_path.exists():
            raise FileNotFoundError(f"Log not found: {self.log_path}")

        # Stream the file line by line so large logs are never held in memory
        # as one string. Undecodable bytes are dropped rather than failing.
        with self.log_path.open(encoding="utf-8", errors="ignore") as handle:
            for line in handle:
                match = self.APACHE_COMBINED.match(line)
                if match is None:
                    continue

                entry = match.groupdict()
                entry["time"] = self._parse_time(entry["time"])
                entry["size"] = self._parse_size(entry["size"])
                self.entries.append(entry)

                # Any 4xx/5xx response is an error, not only the statuses
                # that happen to have a label in ERROR_PATTERNS.
                if int(entry["status"]) >= 400:
                    self.errors.append(entry)

        return len(self.entries)

    def status_code_summary(self) -> Counter:
        """Count entries per status code (keys are status strings)."""
        return Counter(e["status"] for e in self.entries)

    def top_paths(self, n: int = 10) -> List[Tuple[str, int]]:
        """Return the ``n`` most requested paths as (path, count) pairs."""
        return Counter(e["path"] for e in self.entries).most_common(n)

    def top_ips(self, n: int = 10) -> List[Tuple[str, int]]:
        """Return the ``n`` most active client IPs as (ip, count) pairs."""
        return Counter(e["ip"] for e in self.entries).most_common(n)

    def hourly_distribution(self) -> Dict[int, int]:
        """Return requests per hour of day, sorted by hour.

        The hour is taken from the timestamp as logged (in the log's own UTC
        offset). Entries whose timestamp could not be parsed are left out.
        """
        hourly: Counter = Counter(
            e["time"].hour for e in self.entries if e["time"] is not None
        )
        return dict(sorted(hourly.items()))

    def error_rate(self) -> float:
        """Return the percentage of entries that are errors (0.0 if empty)."""
        if not self.entries:
            return 0.0
        return (len(self.errors) / len(self.entries)) * 100

    def response_size_stats(self) -> Dict[str, float]:
        """Return total/avg/max/min response size, or {} when nothing is parsed."""
        sizes = [e["size"] for e in self.entries]
        if not sizes:
            return {}
        total = sum(sizes)
        return {
            "total": total,
            "avg": total / len(sizes),
            "max": max(sizes),
            "min": min(sizes),
        }

    def generate_report(self, top_paths: int = 10, top_ips: int = 10) -> str:
        """Build the full plain-text analysis report.

        Args:
            top_paths: how many of the most requested paths to list.
            top_ips: how many of the most active IP addresses to list.

        Returns:
            The report as a single newline-joined string.
        """
        rule = "=" * 60
        lines = [
            rule,
            "LOG ANALYSIS REPORT",
            f"File: {self.log_path}",
            f"Generated: {datetime.now():%Y-%m-%d %H:%M:%S}",
            rule,
            # Overview
            f"\nTotal entries: {len(self.entries)}",
            f"Total errors: {len(self.errors)}",
            f"Error rate: {self.error_rate():.1f}%",
        ]

        lines.append("\n--- Status Codes ---")
        for status, count in self.status_code_summary().most_common():
            lines.append(f" {status} ({self._status_label(status)}): {count}")

        lines.append(f"\n--- Top {top_paths} Paths ---")
        for path, count in self.top_paths(top_paths):
            lines.append(f" {count:5d} {path}")

        lines.append(f"\n--- Top {top_ips} IPs ---")
        for ip, count in self.top_ips(top_ips):
            lines.append(f" {count:5d} {ip}")

        lines.append("\n--- Hourly Distribution ---")
        hourly = self.hourly_distribution()
        if hourly:
            # Every stored count is at least 1, so the busiest hour is never
            # zero and the division below is safe. Bars scale to 40 columns.
            busiest = max(hourly.values())
            for hour in range(24):
                count = hourly.get(hour, 0)
                bar = "█" * int((count / busiest) * 40)
                lines.append(f" {hour:02d}:00 {count:5d} {bar}")

        lines.append("\n--- Response Sizes ---")
        sizes = self.response_size_stats()
        if sizes:
            lines.append(f" Total: {sizes['total']:,} bytes")
            lines.append(f" Avg: {sizes['avg']:,.0f} bytes")
            lines.append(f" Max: {sizes['max']:,} bytes")
            lines.append(f" Min: {sizes['min']:,} bytes")

        lines.append("\n" + rule)
        return "\n".join(lines)


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: argument list to parse; ``None`` means use ``sys.argv[1:]``.

    Returns:
        0 on success, 1 if the log could not be read or the report could not
        be written. Pass the result to ``sys.exit`` to set the exit status.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Apache combined log files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  log_analyzer.py access.log\n"
            "  log_analyzer.py access.log --top-ips 20 --no-report\n"
            "  log_analyzer.py /var/log/apache2/access.log --output analysis.txt\n"
        ),
    )
    parser.add_argument("logfile", help="Path to log file")
    parser.add_argument("--top-paths", type=int, default=10,
                        help="Number of top paths to show")
    parser.add_argument("--top-ips", type=int, default=10,
                        help="Number of top IPs to show")
    parser.add_argument("--output", "-o", help="Save report to file")
    parser.add_argument("--no-report", action="store_true",
                        help="Don't print report, just summary")
    args = parser.parse_args(argv)

    analyzer = LogAnalyzer(args.logfile)
    try:
        count = analyzer.parse()
    except FileNotFoundError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    except PermissionError:
        print(f"Error: Permission denied reading {args.logfile}", file=sys.stderr)
        return 1
    except OSError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1

    print(f"Parsed {count} log entries")

    # The report is needed for printing and/or saving; build it once, and
    # only when one of those will actually happen.
    report = ""
    if args.output or not args.no_report:
        report = analyzer.generate_report(
            top_paths=args.top_paths, top_ips=args.top_ips
        )

    if args.no_report:
        print(f" Statuses: {dict(analyzer.status_code_summary().most_common())}")
        print(f" Error rate: {analyzer.error_rate():.1f}%")
    else:
        print(report)

    if args.output:
        try:
            # Explicit UTF-8: the report contains block characters that many
            # platform-default encodings cannot represent.
            Path(args.output).write_text(report, encoding="utf-8")
        except OSError as exc:
            print(f"Error: could not write {args.output}: {exc}", file=sys.stderr)
            return 1
        print(f"\nReport saved: {args.output}")

    return 0
if __name__ == "__main__":
    main()

Sample Log Data
Save this as sample.log to test:
192.168.1.1 - - [10/Dec/2024:13:55:36 -0500] "GET /index.html HTTP/1.1" 200 2326 "https://example.com/" "Mozilla/5.0"
192.168.1.2 - - [10/Dec/2024:13:56:10 -0500] "GET /about HTTP/1.1" 200 1542 "-" "curl/7.68"
192.168.1.1 - - [10/Dec/2024:13:57:22 -0500] "POST /api/login HTTP/1.1" 401 128 "https://example.com/login" "Mozilla/5.0"
10.0.0.1 - - [10/Dec/2024:14:01:45 -0500] "GET /products HTTP/1.1" 200 4521 "-" "Python/3.9"
192.168.1.3 - - [10/Dec/2024:14:02:33 -0500] "GET /images/logo.png HTTP/1.1" 404 234 "https://example.com/" "Mozilla/5.0"
192.168.1.1 - - [10/Dec/2024:14:05:18 -0500] "GET /dashboard HTTP/1.1" 500 523 "https://example.com/login" "Mozilla/5.0"
10.0.0.2 - - [10/Dec/2024:14:10:00 -0500] "GET /index.html HTTP/1.1" 200 2326 "-" "wget/1.21"
192.168.1.2 - - [10/Dec/2024:14:15:22 -0500] "POST /api/data HTTP/1.1" 201 89 "https://example.com/" "Mozilla/5.0"

What You Practiced
| Module | Usage |
|---|---|
| re | Parsing Apache combined log format with named groups |
| collections | Counter for status/IP/path counts; defaultdict available for grouping |
| datetime | Parsing log timestamps, report generation timestamp |
| argparse | CLI with positional, optional, typed, and flag arguments |
| pathlib | File existence check, reading/writing, path manipulation |
| typing | Type annotations on the analyzer's methods |
Extensions
- GeoIP lookup — Use an IP geolocation API to map IPs to countries
- Anomaly detection — Detect sudden spikes in traffic or error rates
- Real-time monitoring — Use `watchdog` to analyze logs as they’re written
- Visualization — Generate HTML report with charts using `matplotlib`
- Multi-file analysis — Accept glob patterns like `logs/*.log`