Aggregation Pipeline
The aggregation pipeline transforms documents through a series of stages — like a data processing assembly line. Each stage passes its results to the next, enabling powerful analytics and data transformations.
Pipeline Concept
    Collection → $match  → $group  → $sort   → $project → Result
        ↓          ↓         ↓         ↓          ↓
     All docs   Filtered  Grouped   Sorted     Shaped
                  docs    results   results    output

Each stage is an array entry:
    db.orders.aggregate([
      { $match: { status: "completed" } },
      { $group: { _id: "$category", total: { $sum: "$amount" } } },
      { $sort: { total: -1 } },
      { $project: { _id: 0, category: "$_id", total: 1 } },
    ]);

Core Pipeline Stages
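Before the individual stages, a mental model may help: each stage can be pictured as a function that takes an array of documents and returns a new array. The plain JavaScript sketch below mimics the four-stage pipeline from the Pipeline Concept section in memory. The helper names (match, group, sortDesc, project, runPipeline) and the sample orders are made up for illustration; they are not MongoDB APIs, and the server does not work this way internally.

```javascript
// Hypothetical in-memory stand-ins for four stages; not MongoDB APIs.
const match = (predicate) => (docs) => docs.filter(predicate);

// Groups by keyField and sums sumField, like { $group: { _id, total: { $sum } } }.
const group = (keyField, sumField) => (docs) => {
  const totals = new Map();
  for (const doc of docs) {
    totals.set(doc[keyField], (totals.get(doc[keyField]) ?? 0) + doc[sumField]);
  }
  return [...totals].map(([_id, total]) => ({ _id, total }));
};

const sortDesc = (field) => (docs) => [...docs].sort((a, b) => b[field] - a[field]);

const project = (shape) => (docs) => docs.map(shape);

// A pipeline is the stages applied in order, each one feeding the next.
const runPipeline = (docs, stages) => stages.reduce((acc, stage) => stage(acc), docs);

// Made-up sample data.
const orders = [
  { status: "completed", category: "Clothing", amount: 50 },
  { status: "completed", category: "Electronics", amount: 750 },
  { status: "pending", category: "Electronics", amount: 900 },
  { status: "completed", category: "Clothing", amount: 30 },
];

const result = runPipeline(orders, [
  match((o) => o.status === "completed"),
  group("category", "amount"),
  sortDesc("total"),
  project(({ _id, total }) => ({ category: _id, total })),
]);

console.log(result);
// [ { category: "Electronics", total: 750 }, { category: "Clothing", total: 80 } ]
```

Because every stage only sees the output of the previous one, reordering stages changes both the result and the amount of work done.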
$match
Filters documents (like find()). Put $match early to reduce data flowing through the pipeline:
    // Only completed orders from 2024
    db.orders.aggregate([
      {
        $match: {
          status: "completed",
          createdAt: { $gte: ISODate("2024-01-01"), $lt: ISODate("2025-01-01") },
        },
      },
    ]);

$group
Groups documents by a key expression and applies accumulators:
    // Total sales per category
    db.orders.aggregate([
      {
        $group: {
          _id: "$category", // Group by category field
          totalSales: { $sum: "$amount" },
          avgOrder: { $avg: "$amount" },
          count: { $sum: 1 },
          minOrder: { $min: "$amount" },
          maxOrder: { $max: "$amount" },
        },
      },
    ]);

    // Result
    [
      { _id: "Electronics", totalSales: 15000, avgOrder: 750, count: 20, ... },
      { _id: "Clothing", totalSales: 5000, avgOrder: 50, count: 100, ... },
    ]

Common Accumulators
| Accumulator | Description | Example |
|---|---|---|
| $sum | Total or count | { $sum: "$amount" } or { $sum: 1 } |
| $avg | Average | { $avg: "$rating" } |
| $min | Minimum | { $min: "$price" } |
| $max | Maximum | { $max: "$price" } |
| $first | First value (after sort) | { $first: "$name" } |
| $last | Last value (after sort) | { $last: "$name" } |
| $push | Array of values | { $push: "$name" } |
| $addToSet | Unique array | { $addToSet: "$category" } |
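
$first and $last only give meaningful answers when documents reach $group in a known order, so they are usually paired with a preceding $sort. A minimal sketch, assuming the orders documents carry the customerId, createdAt, amount, and category fields used elsewhere in this guide; the output field names are illustrative. The typeof db guard is only there so the snippet also loads outside mongosh.

```javascript
const perCustomer = [
  // Oldest order first within each customer, so $first/$last are well defined
  { $sort: { customerId: 1, createdAt: 1 } },
  {
    $group: {
      _id: "$customerId",
      firstOrderAt: { $first: "$createdAt" }, // earliest, thanks to the sort
      lastOrderAt: { $last: "$createdAt" }, // latest
      amounts: { $push: "$amount" }, // every amount, duplicates kept
      categories: { $addToSet: "$category" }, // distinct values, order not guaranteed
    },
  },
];

// Runs only inside mongosh, where `db` is defined.
if (typeof db !== "undefined") {
  printjson(db.orders.aggregate(perCustomer).toArray());
}
```

The difference between $push and $addToSet shows up as soon as a customer has two orders in the same category: $push keeps both values, $addToSet keeps one.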
$sort
Sorts documents. A $sort at the start of the pipeline can use an index; otherwise the sort runs in memory, subject to the 100 MB per-stage limit:
    db.orders.aggregate([
      { $group: { _id: "$category", total: { $sum: "$amount" } } },
      { $sort: { total: -1 } }, // Highest total first
    ]);

    // Multi-field sort
    { $sort: { category: 1, total: -1 } }

$project
Shapes output — include, exclude, add computed fields:
    db.orders.aggregate([
      {
        $project: {
          _id: 0, // Exclude _id
          orderId: "$_id", // Rename _id to orderId
          customerName: 1, // Include as-is
          total: { $round: ["$total", 2] }, // Round to 2 decimals
          fullName: { $concat: ["$firstName", " ", "$lastName"] },
          orderYear: { $year: "$createdAt" },
          isExpensive: { $gte: ["$total", 100] },
        },
      },
    ]);

$limit and $skip
    db.orders.aggregate([
      { $match: { status: "completed" } },
      { $sort: { createdAt: -1 } },
      { $skip: 20 }, // Skip first 20 (pagination)
      { $limit: 10 }, // Return next 10
    ]);

$unwind
Deconstructs an array field — creates one document per array element:
    // Input document
    { name: "Laptop", tags: ["electronics", "computers", "sale"] }

    // After $unwind: "$tags"
    { name: "Laptop", tags: "electronics" }
    { name: "Laptop", tags: "computers" }
    { name: "Laptop", tags: "sale" }
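
The effect of this default form can be approximated in plain JavaScript with flatMap. This is only an in-memory sketch of the semantics (the unwind helper and the extra sample documents are hypothetical, not MongoDB APIs or data), but it shows one detail worth remembering: by default, documents whose array is missing, null, or empty produce no output at all. MongoDB's document form, { $unwind: { path: "$tags", preserveNullAndEmptyArrays: true } }, keeps such documents instead.

```javascript
// Hypothetical helper mimicking the default behaviour of { $unwind: "$<field>" }.
function unwind(docs, field) {
  return docs.flatMap((doc) => {
    const value = doc[field];
    if (value === undefined || value === null) return []; // missing or null: dropped
    if (!Array.isArray(value)) return [doc]; // non-array value: passed through as-is
    // One copy of the document per element; an empty array yields no copies.
    return value.map((element) => ({ ...doc, [field]: element }));
  });
}

const products = [
  { name: "Laptop", tags: ["electronics", "computers", "sale"] },
  { name: "Gift card", tags: [] },
  { name: "Mystery box" },
];

const unwound = unwind(products, "tags");
console.log(unwound); // three Laptop documents; the other two products are gone
```

Silently losing documents this way is a common source of totals that look too low after an $unwind.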
    // Practical: total quantity sold per product (where orders have an items array)
    db.orders.aggregate([
      { $unwind: "$items" },
      {
        $group: {
          _id: "$items.productId",
          totalSold: { $sum: "$items.quantity" },
        },
      },
    ]);

$lookup (JOIN)
Performs a left outer join with another collection:
    // Orders collection
    { _id: 1, customerId: "abc", items: ["prod1", "prod2"] }

    // Customers collection
    { _id: "abc", name: "Alice", email: "alice@example.com" }

    // Join orders with customers
    db.orders.aggregate([
      {
        $lookup: {
          from: "customers", // Foreign collection
          localField: "customerId", // Field in orders
          foreignField: "_id", // Field in customers
          as: "customer", // Output array field
        },
      },
      { $unwind: "$customer" }, // Deconstruct array
      {
        $project: {
          customerName: "$customer.name",
          customerEmail: "$customer.email",
          items: 1,
        },
      },
    ]);
    // Result (_id is included because $project keeps it unless it is set to 0)
    { _id: 1, customerName: "Alice", customerEmail: "alice@example.com", items: ["prod1", "prod2"] }

$addFields
Adds new fields while keeping all existing ones (an existing field with the same name is overwritten):
    db.orders.aggregate([
      {
        $addFields: {
          totalWithTax: { $multiply: ["$total", 1.1] },
          orderYear: { $year: "$createdAt" },
          orderMonth: { $month: "$createdAt" },
        },
      },
    ]);

Real-World Examples
Sales Dashboard
    db.orders.aggregate([
      // Step 1: Filter to last 30 days
      {
        $match: {
          createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
        },
      },

      // Step 2: Group by day
      {
        $group: {
          _id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
          dailyRevenue: { $sum: "$total" },
          orderCount: { $sum: 1 },
          avgOrderValue: { $avg: "$total" },
        },
      },

      // Step 3: Sort by date
      { $sort: { _id: 1 } },

      // Step 4: Calculate running total
      {
        $setWindowFields: {
          sortBy: { _id: 1 },
          output: {
            runningRevenue: {
              $sum: "$dailyRevenue",
              window: { documents: ["unbounded", "current"] },
            },
          },
        },
      },

      // Step 5: Format output
      {
        $project: {
          _id: 0,
          date: "$_id",
          dailyRevenue: { $round: ["$dailyRevenue", 2] },
          orderCount: 1,
          avgOrderValue: { $round: ["$avgOrderValue", 2] },
          runningRevenue: { $round: ["$runningRevenue", 2] },
        },
      },
    ]);

Customer Analysis
    db.orders.aggregate([
      // Unwind order items: one document per item
      { $unwind: "$items" },

      // Group by customer
      {
        $group: {
          _id: "$customerId",
          totalSpent: { $sum: "$items.price" },
          // Collect distinct order ids; { $sum: 1 } here would count items, not orders
          orderIds: { $addToSet: "$_id" },
          categories: { $addToSet: "$items.category" },
          firstPurchase: { $min: "$createdAt" },
          lastPurchase: { $max: "$createdAt" },
        },
      },

      // Lookup customer details
      { $lookup: { from: "customers", localField: "_id", foreignField: "_id", as: "info" } },
      { $unwind: "$info" },

      // Add computed fields ($dateDiff requires MongoDB 5.0 or later)
      {
        $addFields: {
          orderCount: { $size: "$orderIds" },
          avgOrderValue: { $divide: ["$totalSpent", { $size: "$orderIds" }] },
          daysSinceLastPurchase: {
            $dateDiff: { startDate: "$lastPurchase", endDate: "$$NOW", unit: "day" },
          },
          categoryCount: { $size: "$categories" },
        },
      },

      // Customer segmentation
      {
        $addFields: {
          segment: {
            $switch: {
              branches: [
                { case: { $gte: ["$totalSpent", 10000] }, then: "VIP" },
                { case: { $gte: ["$totalSpent", 5000] }, then: "Premium" },
                { case: { $gte: ["$totalSpent", 1000] }, then: "Regular" },
              ],
              default: "Occasional",
            },
          },
        },
      },

      // Sort by value
      { $sort: { totalSpent: -1 } },
    ]);

Data Export (JSON Report)
    db.orders.aggregate([
      { $match: { status: "completed" } },
      {
        $lookup: {
          from: "customers",
          localField: "customerId",
          foreignField: "_id",
          as: "customer",
        },
      },
      { $unwind: "$customer" },
      {
        $project: {
          _id: 0,
          orderDate: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
          customerEmail: "$customer.email",
          customerName: "$customer.name",
          items: {
            $map: {
              input: "$items",
              as: "item",
              in: { product: "$$item.name", qty: "$$item.quantity", price: "$$item.price" },
            },
          },
          total: { $round: ["$total", 2] },
        },
      },
      { $sort: { orderDate: -1 } },
    ]);

Pipeline Optimization
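One way to check whether the tips in this section pay off is to ask the server for the query plan instead of the results. The sketch below uses mongosh's db.collection.explain() helper; the pipeline itself is just an example built from fields used earlier in this guide. In the plan, an IXSCAN stage indicates that an index was used, while COLLSCAN means the whole collection was read. The typeof db guard only lets the snippet load outside mongosh.

```javascript
// Example pipeline to inspect; swap in the pipeline you want to analyse.
const recentCompleted = [
  { $match: { status: "completed" } },
  { $sort: { createdAt: -1 } },
  { $limit: 100 },
];

// Runs only inside mongosh, where `db` is defined.
if (typeof db !== "undefined") {
  const plan = db.orders.explain("executionStats").aggregate(recentCompleted);
  printjson(plan);
}
```

Comparing the plan before and after creating an index, or before and after moving a stage, shows directly whether the change had any effect.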
1. $match Early
Filter as early as possible to reduce documents flowing through:
    // ❌ Slower: orders of every status are grouped, and only the computed total is filtered
    db.orders.aggregate([
      { $group: { _id: "$category", total: { $sum: "$amount" } } },
      { $match: { total: { $gt: 10000 } } },
    ]);

    // ✅ Faster: filter on stored fields before grouping; the filter on the
    // computed total has to stay after $group
    db.orders.aggregate([
      { $match: { status: "completed" } },
      { $group: { _id: "$category", total: { $sum: "$amount" } } },
      { $match: { total: { $gt: 10000 } } },
    ]);

2. $sort Before $group
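If the sort field is indexed, a leading $sort followed by $limit can read the first documents straight from the index, so $group only processes that subset: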
    db.orders.aggregate([
      { $sort: { createdAt: -1 } }, // Uses index
      { $limit: 100 },
      { $group: { _id: "$category", recentCount: { $sum: 1 } } },
    ]);

3. Use Indexes
$match and $sort at the start of the pipeline can use indexes:
    // This can use an index on { status: 1, createdAt: -1 }
    db.orders.aggregate([
      { $match: { status: "completed" } },
      { $sort: { createdAt: -1 } },
      { $group: ... },
    ]);

Pipeline Limits
| Constraint | Limit |
|---|---|
| Memory per stage | 100 MB (default) |
| BSON document size | 16 MB |
| Pipeline stages | 1000 (MongoDB 5.0 and later) |
| $lookup results | 100 MB (if allowDiskUse: true, 1 GB) |
Enable disk use for large pipelines:
    db.orders.aggregate(
      [
        { $group: { _id: "$category", total: { $sum: "$amount" } } },
        { $sort: { total: -1 } },
      ],
      { allowDiskUse: true }
    );

Quick Reference
    // Common pipeline stages
    { $match: { status: "active" } }
    { $group: { _id: "$field", total: { $sum: 1 } } }
    { $sort: { field: -1 } }
    { $project: { field: 1, _id: 0 } }
    { $limit: 10 }
    { $unwind: "$arrayField" }
    { $lookup: { from: "collection", localField: "x", foreignField: "y", as: "result" } }
    { $addFields: { newField: { $multiply: ["$a", 2] } } }

    // Common expressions
    { $sum: "$field" }, { $avg: "$field" }
    { $year: "$date" }, { $month: "$date" }
    { $toUpper: "$field" }, { $concat: ["a", "b"] }
    { $cond: [if, then, else] }

Practice Exercises
Category report: Using an orders collection, write an aggregation that groups orders by category. Calculate total revenue, average order value, and order count per category. Sort by revenue descending. Only include categories with revenue over $1000.
Customer orders with $lookup: Join orders with customers using $lookup. For each customer, calculate total spent, order count, and most recent order date. Include customer name and email in the output.
Product popularity: Use $unwind on an orders items array, then group by product to find total quantity sold per product. Sort by popularity. Add a field calculating the percentage of total orders each product represents.
Time series analysis: Group orders by month using $dateToString. Calculate month-over-month growth percentage using $setWindowFields. Output: month, revenue, growth%.