Skip to main content

Skillber v1.0 is here!

Learn more

Standard Library Project

Checking access...

Apply the standard library — re, collections, datetime, argparse, pathlib — to build a log analyzer.

Project: Log File Analyzer

Create log_analyzer.py:

"""A powerful log file analyzer using standard library modules."""
import argparse
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from http import HTTPStatus
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class LogAnalyzer:
    """Analyze server log files written in the Apache "combined" format.

    Typical use: construct with a path, call ``parse()`` once, then query
    the summary methods or call ``generate_report()``.
    """

    # One named group per field of a combined-format line. The identity and
    # user fields (2nd and 3rd columns) are matched but not captured.
    APACHE_COMBINED = re.compile(
        r'(?P<ip>\S+) \S+ \S+ '
        r'\[(?P<time>[^\]]+)\] '
        r'"(?P<method>\S+) (?P<path>\S+) \S+" '
        r'(?P<status>\d{3}) (?P<size>\S+) '
        r'"(?P<referer>[^"]*)" '
        r'"(?P<ua>[^"]*)"'
    )

    # Preferred display labels for the most common error statuses.
    ERROR_PATTERNS = {
        "404": "Not Found",
        "500": "Internal Server Error",
        "403": "Forbidden",
        "401": "Unauthorized",
        "503": "Service Unavailable",
    }

    # Width, in characters, of the longest bar in the hourly histogram.
    _BAR_WIDTH = 40

    def __init__(self, log_path: str):
        """Remember the log location; nothing is read until ``parse()``."""
        self.log_path = Path(log_path)
        # Every successfully parsed line, as a dict keyed by regex group name.
        self.entries: List[Dict] = []
        # The subset of ``entries`` whose status is 4xx or 5xx.
        self.errors: List[Dict] = []

    @classmethod
    def _status_label(cls, status: str) -> str:
        """Return a human-readable label for a three-digit status string."""
        if status in cls.ERROR_PATTERNS:
            return cls.ERROR_PATTERNS[status]
        try:
            return HTTPStatus(int(status)).phrase
        except ValueError:
            # Three digits, but not a registered HTTP status code.
            return "Unknown"

    def _parse_line(self, line: str) -> Optional[Dict]:
        """Convert one raw log line to an entry dict, or None if it does not match."""
        match = self.APACHE_COMBINED.match(line)
        if match is None:
            return None
        entry = match.groupdict()
        try:
            entry["time"] = datetime.strptime(
                entry["time"], "%d/%b/%Y:%H:%M:%S %z"
            )
        except ValueError:
            entry["time"] = None
        try:
            entry["size"] = int(entry["size"])
        except ValueError:
            # "-" means no body was sent; other junk is treated the same way
            # instead of aborting the whole parse.
            entry["size"] = 0
        return entry

    def parse(self) -> int:
        """Parse the log file and return the number of recognised entries.

        Lines that do not match the combined format are skipped. Calling
        this method again re-reads the file from scratch.

        Raises:
            FileNotFoundError: if ``log_path`` does not exist.
        """
        if not self.log_path.exists():
            raise FileNotFoundError(f"Log not found: {self.log_path}")

        # Start fresh so a second parse() does not double-count entries.
        self.entries = []
        self.errors = []

        # Stream the file so large logs are never held in memory as one string.
        with self.log_path.open(encoding="utf-8", errors="ignore") as handle:
            for line in handle:
                entry = self._parse_line(line)
                if entry is None:
                    continue
                self.entries.append(entry)
                # Any 4xx/5xx response is an error, not only the labelled ones.
                if entry["status"][0] in "45":
                    self.errors.append(entry)
        return len(self.entries)

    def status_code_summary(self) -> Counter:
        """Count entries per status code."""
        return Counter(e["status"] for e in self.entries)

    def top_paths(self, n: int = 10) -> List[Tuple[str, int]]:
        """Return the ``n`` most requested paths with their hit counts."""
        return Counter(e["path"] for e in self.entries).most_common(n)

    def top_ips(self, n: int = 10) -> List[Tuple[str, int]]:
        """Return the ``n`` most active client IPs with their hit counts."""
        return Counter(e["ip"] for e in self.entries).most_common(n)

    def hourly_distribution(self) -> Dict[int, int]:
        """Return requests per hour of day, ordered by hour.

        Entries whose timestamp could not be parsed are ignored. The hour is
        taken as written in the log (its own UTC offset), not converted.
        """
        hourly = Counter(
            e["time"].hour for e in self.entries if e["time"] is not None
        )
        return dict(sorted(hourly.items()))

    def error_rate(self) -> float:
        """Return the percentage of entries with a 4xx/5xx status (0.0 if empty)."""
        if not self.entries:
            return 0.0
        return (len(self.errors) / len(self.entries)) * 100

    def response_size_stats(self) -> Dict[str, float]:
        """Return total/avg/max/min response size in bytes, or {} if no entries."""
        sizes = [e["size"] for e in self.entries]
        if not sizes:
            return {}
        total = sum(sizes)
        return {
            "total": total,
            "avg": total / len(sizes),
            "max": max(sizes),
            "min": min(sizes),
        }

    def generate_report(self, top_paths: int = 10, top_ips: int = 10) -> str:
        """Build the full plain-text analysis report.

        Args:
            top_paths: how many of the most requested paths to list.
            top_ips: how many of the most active IPs to list.

        Returns:
            The report as a single newline-joined string.
        """
        rule = "=" * 60
        lines = [
            rule,
            "LOG ANALYSIS REPORT",
            f"File: {self.log_path}",
            f"Generated: {datetime.now():%Y-%m-%d %H:%M:%S}",
            rule,
        ]

        # Overview
        lines.append(f"\nTotal entries: {len(self.entries)}")
        lines.append(f"Total errors: {len(self.errors)}")
        lines.append(f"Error rate: {self.error_rate():.1f}%")

        # Status codes
        lines.append("\n--- Status Codes ---")
        for status, count in self.status_code_summary().most_common():
            lines.append(f" {status} ({self._status_label(status)}): {count}")

        # Top paths
        lines.append(f"\n--- Top {top_paths} Paths ---")
        for path, count in self.top_paths(top_paths):
            lines.append(f" {count:5d} {path}")

        # Top IPs
        lines.append(f"\n--- Top {top_ips} IPs ---")
        for ip, count in self.top_ips(top_ips):
            lines.append(f" {count:5d} {ip}")

        # Hourly distribution
        lines.append("\n--- Hourly Distribution ---")
        hourly = self.hourly_distribution()
        if hourly:
            # Non-empty distribution implies the busiest hour has count >= 1.
            max_count = max(hourly.values())
            for hour in range(24):
                count = hourly.get(hour, 0)
                # Scale so the busiest hour gets a full-width bar.
                bar = "#" * (count * self._BAR_WIDTH // max_count)
                lines.append(f" {hour:02d}:00 {count:5d} {bar}")

        # Response sizes
        lines.append("\n--- Response Sizes ---")
        sizes = self.response_size_stats()
        if sizes:
            lines.append(f" Total: {sizes['total']:,} bytes")
            lines.append(f" Avg: {sizes['avg']:,.0f} bytes")
            lines.append(f" Max: {sizes['max']:,} bytes")
            lines.append(f" Min: {sizes['min']:,} bytes")

        lines.append("\n" + rule)
        return "\n".join(lines)


def main(argv: Optional[List[str]] = None) -> int:
    """Run the command-line interface.

    Args:
        argv: argument list to parse; ``None`` means use ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 if the analysis failed.
    """
    parser = argparse.ArgumentParser(
        description="Analyze Apache combined log files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
log_analyzer.py access.log
log_analyzer.py access.log --top-ips 20 --no-report
log_analyzer.py /var/log/apache2/access.log --output analysis.txt
""",
    )
    parser.add_argument("logfile", help="Path to log file")
    parser.add_argument("--top-paths", type=int, default=10,
                        help="Number of top paths to show")
    parser.add_argument("--top-ips", type=int, default=10,
                        help="Number of top IPs to show")
    parser.add_argument("--output", "-o",
                        help="Save report to file")
    parser.add_argument("--no-report", action="store_true",
                        help="Don't print report, just summary")
    args = parser.parse_args(argv)

    try:
        analyzer = LogAnalyzer(args.logfile)
        count = analyzer.parse()
        print(f"Parsed {count} log entries")

        # Build the report only when it will be printed and/or saved.
        report = None
        if args.output or not args.no_report:
            report = analyzer.generate_report(
                top_paths=args.top_paths, top_ips=args.top_ips
            )

        if args.no_report:
            print(f" Statuses: {dict(analyzer.status_code_summary().most_common())}")
            print(f" Error rate: {analyzer.error_rate():.1f}%")
        else:
            print(report)

        if args.output:
            # Explicit encoding keeps the saved file identical across platforms.
            Path(args.output).write_text(report, encoding="utf-8")
            print(f"\nReport saved: {args.output}")
    except PermissionError as exc:
        # The denied path may be the log file or the output file.
        denied = exc.filename or args.logfile
        print(f"Error: Permission denied: {denied}", file=sys.stderr)
        return 1
    except Exception as exc:
        # CLI boundary: report any other failure without a traceback.
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())

Sample Log Data

Save this as sample.log to test:

192.168.1.1 - - [10/Dec/2024:13:55:36 -0500] "GET /index.html HTTP/1.1" 200 2326 "https://example.com/" "Mozilla/5.0"
192.168.1.2 - - [10/Dec/2024:13:56:10 -0500] "GET /about HTTP/1.1" 200 1542 "-" "curl/7.68"
192.168.1.1 - - [10/Dec/2024:13:57:22 -0500] "POST /api/login HTTP/1.1" 401 128 "https://example.com/login" "Mozilla/5.0"
10.0.0.1 - - [10/Dec/2024:14:01:45 -0500] "GET /products HTTP/1.1" 200 4521 "-" "Python/3.9"
192.168.1.3 - - [10/Dec/2024:14:02:33 -0500] "GET /images/logo.png HTTP/1.1" 404 234 "https://example.com/" "Mozilla/5.0"
192.168.1.1 - - [10/Dec/2024:14:05:18 -0500] "GET /dashboard HTTP/1.1" 500 523 "https://example.com/login" "Mozilla/5.0"
10.0.0.2 - - [10/Dec/2024:14:10:00 -0500] "GET /index.html HTTP/1.1" 200 2326 "-" "wget/1.21"
192.168.1.2 - - [10/Dec/2024:14:15:22 -0500] "POST /api/data HTTP/1.1" 201 89 "https://example.com/" "Mozilla/5.0"

What You Practiced

| Module | Usage |
| --- | --- |
| re | Parsing Apache combined log format with named groups |
| collections | Counter for status/IP/path/hourly counts; defaultdict is imported for grouping (not yet used in the base version) |
| datetime | Parsing log timestamps, report generation timestamp |
| argparse | CLI with positional, optional, typed, and flag (store_true) arguments |
| pathlib | File existence check, reading/writing, path manipulation |
| typing | Type annotations on the analyzer's methods |

Extensions

  1. GeoIP lookup — Use an IP geolocation API to map IPs to countries
  2. Anomaly detection — Detect sudden spikes in traffic or error rates
  3. Real-time monitoring — Use watchdog to analyze logs as they’re written
  4. Visualization — Generate HTML report with charts using matplotlib
  5. Multi-file analysis — Accept glob patterns like logs/*.log