Aggregation Pipeline

The aggregation pipeline transforms documents through a series of stages — like a data processing assembly line. Each stage passes its results to the next, enabling powerful analytics and data transformations.

Pipeline Concept

Collection (all docs) → $match (filtered docs) → $group (grouped results) → $sort (sorted results) → $project (shaped output) → Result

Each stage is one element of the array passed to aggregate(), and the stages run in order:

db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } },
{ $project: { _id: 0, category: "$_id", total: 1 } },
]);
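To make the "assembly line" idea concrete, here is a plain-JavaScript sketch that runs outside MongoDB: each stage is modeled as a function from an array of documents to a new array, and `aggregate` feeds each stage's output into the next. The helpers (`match`, `groupSum`, `sortDesc`, `project`, `aggregate`) and the sample orders are hypothetical stand-ins, not the MongoDB driver API.

```javascript
// Plain-JavaScript model of a pipeline (no MongoDB involved).
// Sample data is made up for illustration.
const orders = [
  { category: "Books", amount: 20, status: "completed" },
  { category: "Books", amount: 30, status: "pending" },
  { category: "Games", amount: 60, status: "completed" },
  { category: "Books", amount: 15, status: "completed" },
];

// Hypothetical stand-ins for $match, $group with $sum, $sort and $project.
// Each returns a stage: a function from an array of docs to a new array.
const match = (pred) => (docs) => docs.filter(pred);
const groupSum = (key, field) => (docs) => {
  const totals = new Map(); // group key -> running sum
  for (const d of docs) totals.set(d[key], (totals.get(d[key]) ?? 0) + d[field]);
  return [...totals].map(([id, total]) => ({ _id: id, total }));
};
const sortDesc = (field) => (docs) => [...docs].sort((a, b) => b[field] - a[field]);
const project = (fn) => (docs) => docs.map(fn);

// Run the stages in order: each stage receives the previous stage's output.
const aggregate = (docs, stages) => stages.reduce((acc, stage) => stage(acc), docs);

const result = aggregate(orders, [
  match((d) => d.status === "completed"),
  groupSum("category", "amount"),
  sortDesc("total"),
  project((d) => ({ category: d._id, total: d.total })),
]);
console.log(result);
```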

Core Pipeline Stages

$match

Filters documents (like find()). Put $match early to reduce data flowing through the pipeline:

// Only completed orders from 2024
db.orders.aggregate([
{ $match: {
status: "completed",
createdAt: { $gte: ISODate("2024-01-01"), $lt: ISODate("2025-01-01") },
}},
]);

$group

Groups documents by a key expression and applies accumulators:

// Total sales per category
db.orders.aggregate([
{ $group: {
_id: "$category", // Group by category field
totalSales: { $sum: "$amount" },
avgOrder: { $avg: "$amount" },
count: { $sum: 1 },
minOrder: { $min: "$amount" },
maxOrder: { $max: "$amount" },
}},
]);
// Result
[
{ _id: "Electronics", totalSales: 15000, avgOrder: 750, count: 20, ... },
{ _id: "Clothing", totalSales: 5000, avgOrder: 50, count: 100, ... },
]
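The following plain-JavaScript sketch (not MongoDB code; the `groupByCategory` helper and the sample orders are made up) shows what $group does conceptually: keep one running state per distinct _id value, update each accumulator as documents arrive, and derive $avg from the sum and count at the end. Unlike this Map-based version, MongoDB does not guarantee the order of $group output, so add a $sort when order matters.

```javascript
// Conceptual model of $group with $sum, $avg, $min and $max.
const orders = [
  { category: "Electronics", amount: 500 },
  { category: "Electronics", amount: 1000 },
  { category: "Clothing", amount: 40 },
  { category: "Clothing", amount: 60 },
];

function groupByCategory(docs) {
  const groups = new Map(); // _id value -> accumulator state
  for (const doc of docs) {
    const id = doc.category; // the _id expression "$category"
    const g = groups.get(id) ??
      { _id: id, totalSales: 0, count: 0, minOrder: Infinity, maxOrder: -Infinity };
    g.totalSales += doc.amount;                    // totalSales: { $sum: "$amount" }
    g.count += 1;                                  // count: { $sum: 1 }
    g.minOrder = Math.min(g.minOrder, doc.amount); // minOrder: { $min: "$amount" }
    g.maxOrder = Math.max(g.maxOrder, doc.amount); // maxOrder: { $max: "$amount" }
    groups.set(id, g);
  }
  // $avg can be derived once every input document has been seen.
  return [...groups.values()].map((g) => ({ ...g, avgOrder: g.totalSales / g.count }));
}

const result = groupByCategory(orders);
console.log(result);
```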

Common Accumulators

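Accumulator | Description | Example
$sum | Total or count | { $sum: "$amount" } or { $sum: 1 }
$avg | Average | { $avg: "$rating" }
$min | Minimum | { $min: "$price" }
$max | Maximum | { $max: "$price" }
$first | Value from the first document in the group (meaningful after a $sort) | { $first: "$name" }
$last | Value from the last document in the group (meaningful after a $sort) | { $last: "$name" }
$push | Array of all values, duplicates included | { $push: "$name" }
$addToSet | Array of distinct values (order not guaranteed) | { $addToSet: "$category" }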
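A small plain-JavaScript illustration of the last four accumulators, using made-up product data: $first and $last only mean something once the input is in a known order, $push keeps duplicates, and $addToSet does not. The JavaScript Set used here happens to keep insertion order, but MongoDB makes no ordering promise for $addToSet.

```javascript
// Conceptual model of $first, $last, $push and $addToSet over sorted input.
const products = [
  { name: "Mouse", category: "Accessories", price: 25 },
  { name: "Laptop", category: "Computers", price: 1200 },
  { name: "Cable", category: "Accessories", price: 5 },
];

// Roughly: { $sort: { price: 1 } } followed by a $group with _id: null
const sorted = [...products].sort((a, b) => a.price - b.price);
const summary = {
  cheapest: sorted[0].name,                 // { $first: "$name" }
  priciest: sorted[sorted.length - 1].name, // { $last: "$name" }
  names: sorted.map((p) => p.name),         // { $push: "$name" }: every value
  categories: [...new Set(sorted.map((p) => p.category))], // { $addToSet: "$category" }: distinct values
};
console.log(summary);
```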

$sort

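Sorts documents. A $sort at the start of the pipeline (or right after a leading $match) can use an index; otherwise MongoDB sorts in memory, and a sort that needs more than 100 MB fails unless the stage is allowed to spill to disk: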

db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }, // Highest total first
]);
// Multi-field sort: category ascending, then total descending within each category
{ $sort: { category: 1, total: -1 } }
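A multi-field sort compares on the first key and consults the next key only to break ties. The sketch below mirrors that with a hypothetical `comparatorFor` helper in plain JavaScript; it handles only numbers and strings, whereas MongoDB compares mixed types using its BSON comparison order.

```javascript
// Plain-JavaScript model of { $sort: { category: 1, total: -1 } }.
const rows = [
  { category: "Books", total: 10 },
  { category: "Games", total: 50 },
  { category: "Books", total: 40 },
  { category: "Games", total: 20 },
];

// Hypothetical helper: turns a sort spec into an Array.prototype.sort comparator.
function comparatorFor(spec) {
  const keys = Object.entries(spec); // insertion order is kept for non-numeric keys
  return (a, b) => {
    for (const [field, dir] of keys) {
      if (a[field] < b[field]) return -dir; // dir 1: a first; dir -1: b first
      if (a[field] > b[field]) return dir;
    }
    return 0; // equal on every key
  };
}

const sorted = [...rows].sort(comparatorFor({ category: 1, total: -1 }));
console.log(sorted);
```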

$project

Shapes output — include, exclude, add computed fields:

db.orders.aggregate([
{ $project: {
_id: 0, // Exclude _id
orderId: "$_id", // Rename _id to orderId
customerName: 1, // Include as-is
total: { $round: ["$total", 2] }, // Round to 2 decimals
fullName: { $concat: ["$firstName", " ", "$lastName"] },
orderYear: { $year: "$createdAt" },
isExpensive: { $gte: ["$total", 100] },
}},
]);

$limit and $skip

db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } },
{ $skip: 20 }, // Skip first 20 (pagination)
{ $limit: 10 }, // Return next 10
]);
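For page-based navigation, the $skip value is `(page - 1) * pageSize`; page 3 with 10 results per page gives the `$skip: 20` and `$limit: 10` shown above. This hypothetical `paginationStages` helper builds those stages in plain JavaScript. It adds `_id` as a tie-breaker in the default sort, assuming `createdAt` values can repeat; without a unique key in the sort, documents with equal values may shift between pages.

```javascript
// Hypothetical helper: builds $sort/$skip/$limit stages for a 1-based page number.
// $skip comes before $limit: skip the earlier pages, then cap the page size.
function paginationStages(page, pageSize, sort = { createdAt: -1, _id: -1 }) {
  if (!Number.isInteger(page) || page < 1) throw new RangeError("page must be a positive integer");
  if (!Number.isInteger(pageSize) || pageSize < 1) throw new RangeError("pageSize must be a positive integer");
  return [
    { $sort: sort }, // _id breaks ties so page boundaries stay stable
    { $skip: (page - 1) * pageSize },
    { $limit: pageSize },
  ];
}

const stages = paginationStages(3, 10);
console.log(JSON.stringify(stages));
// The stages could be spread into a pipeline after a $match stage.
```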

$unwind

Deconstructs an array field — creates one document per array element:

// Input document
{ name: "Laptop", tags: ["electronics", "computers", "sale"] }
// After $unwind: "$tags"
{ name: "Laptop", tags: "electronics" }
{ name: "Laptop", tags: "computers" }
{ name: "Laptop", tags: "sale" }
// Practical: total quantity sold per product (each order has an items array)
db.orders.aggregate([
{ $unwind: "$items" },
{ $group: {
_id: "$items.productId",
totalSold: { $sum: "$items.quantity" },
}},
]);
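The default behavior of $unwind can be sketched in plain JavaScript with `flatMap`. The `unwind` helper and sample orders below are hypothetical; the sketch follows MongoDB's documented rules that a missing field, null, or an empty array produces no output (unless `preserveNullAndEmptyArrays: true` is set), while a non-array value is treated as a one-element array.

```javascript
// Plain-JavaScript model of { $unwind: "$<field>" } with default options.
function unwind(docs, field) {
  return docs.flatMap((doc) => {
    const value = doc[field];
    if (value === undefined || value === null) return []; // missing or null: dropped
    if (!Array.isArray(value)) return [doc];              // non-array: one-element array
    return value.map((element) => ({ ...doc, [field]: element })); // empty array: dropped
  });
}

// Same idea as the $unwind + $group example: total quantity per product.
const orders = [
  { _id: 1, items: [{ productId: "p1", quantity: 2 }, { productId: "p2", quantity: 1 }] },
  { _id: 2, items: [{ productId: "p1", quantity: 3 }] },
  { _id: 3, items: [] }, // produces no documents
];
const totals = {};
for (const doc of unwind(orders, "items")) {
  totals[doc.items.productId] = (totals[doc.items.productId] ?? 0) + doc.items.quantity;
}
console.log(totals); // { p1: 5, p2: 1 }
```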

$lookup (JOIN)

Performs a left outer join with another collection:

// Orders collection
{ _id: 1, customerId: "abc", items: ["prod1", "prod2"] }
// Customers collection
{ _id: "abc", name: "Alice", email: "alice@example.com" }
// Join orders with customers
db.orders.aggregate([
{ $lookup: {
from: "customers", // Foreign collection
localField: "customerId", // Field in orders
foreignField: "_id", // Field in customers
as: "customer", // Output array field
}},
{ $unwind: "$customer" }, // Turn the array into an object; drops orders with no matching customer
{ $project: {
customerName: "$customer.name",
customerEmail: "$customer.email",
items: 1,
}},
]);
// Result (_id is included because $project does not exclude it)
{ _id: 1, customerName: "Alice", customerEmail: "alice@example.com", items: ["prod1", "prod2"] }
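The join semantics can be sketched in plain JavaScript: $lookup keeps every input document and attaches an array of matches (possibly empty), and the following $unwind then drops orders with no match unless `preserveNullAndEmptyArrays: true` is passed to it. The `lookup` helper and the second, unmatched order are hypothetical, and the sketch ignores MongoDB's extra matching rules for array-valued and null local fields.

```javascript
// Plain-JavaScript model of $lookup (left outer join) followed by $unwind.
const orders = [
  { _id: 1, customerId: "abc", items: ["prod1", "prod2"] },
  { _id: 2, customerId: "zzz", items: ["prod3"] }, // no matching customer
];
const customers = [{ _id: "abc", name: "Alice", email: "alice@example.com" }];

function lookup(docs, foreign, localField, foreignField, as) {
  // Build a table of foreign documents keyed by foreignField.
  const byKey = new Map();
  for (const f of foreign) {
    const list = byKey.get(f[foreignField]) ?? [];
    list.push(f);
    byKey.set(f[foreignField], list);
  }
  // Every input document is kept; unmatched ones get an empty array.
  return docs.map((d) => ({ ...d, [as]: byKey.get(d[localField]) ?? [] }));
}

const joined = lookup(orders, customers, "customerId", "_id", "customer");
// $unwind with default options: one document per match, none for an empty array.
const unwound = joined.flatMap((d) => d.customer.map((c) => ({ ...d, customer: c })));
console.log(joined.length, unwound.length); // 2 1
```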

$addFields

Adds new fields while keeping all existing ones (a new field with the same name as an existing field overwrites it):

db.orders.aggregate([
{ $addFields: {
totalWithTax: { $multiply: ["$total", 1.1] },
orderYear: { $year: "$createdAt" },
orderMonth: { $month: "$createdAt" },
}},
]);
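Conceptually, $addFields returns the input document merged with the computed fields, as in this plain-JavaScript sketch (the `addFields` helper and sample order are hypothetical). Two consequences show up: a computed field with an existing name replaces the old value, and every expression reads the input document, so a field added in one $addFields stage cannot be referenced until a later stage.

```javascript
// Plain-JavaScript model of $addFields (not the MongoDB API).
function addFields(doc, computeFields) {
  // Expressions receive the *input* document; new keys are merged on top,
  // so an existing key is replaced.
  return { ...doc, ...computeFields(doc) };
}

const order = { _id: 1, status: "new", total: 100, createdAt: new Date(Date.UTC(2024, 2, 15)) };
const out = addFields(order, (d) => ({
  totalWithTax: d.total * 1.1,               // { $multiply: ["$total", 1.1] }
  orderYear: d.createdAt.getUTCFullYear(),   // { $year: "$createdAt" }, evaluated in UTC
  orderMonth: d.createdAt.getUTCMonth() + 1, // { $month: ... } is 1-12; JS months are 0-11
  status: "reviewed",                        // existing field name: old value replaced
}));
console.log(out);
```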

Real-World Examples

Sales Dashboard

db.orders.aggregate([
// Step 1: Filter to last 30 days
{ $match: {
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
}},
// Step 2: Group by day
{ $group: {
_id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
dailyRevenue: { $sum: "$total" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total" },
}},
// Step 3: Sort by date
{ $sort: { _id: 1 } },
// Step 4: Calculate running total
{ $setWindowFields: {
sortBy: { _id: 1 },
output: {
runningRevenue: { $sum: "$dailyRevenue", window: { documents: ["unbounded", "current"] } },
},
}},
// Step 5: Format output
{ $project: {
_id: 0,
date: "$_id",
dailyRevenue: { $round: ["$dailyRevenue", 2] },
orderCount: 1,
avgOrderValue: { $round: ["$avgOrderValue", 2] },
runningRevenue: { $round: ["$runningRevenue", 2] },
}},
]);
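The window `["unbounded", "current"]` means "from the first document in sortBy order up to and including this one", which is a running total. Here is a plain-JavaScript sketch with made-up daily figures and a hypothetical `runningTotal` helper. It also relies on the fact that `"%Y-%m-%d"` strings sort lexicographically in date order, which is why sorting on the string _id works.

```javascript
// Plain-JavaScript model of $sum over the window ["unbounded", "current"].
const daily = [
  { _id: "2024-06-01", dailyRevenue: 100 },
  { _id: "2024-06-03", dailyRevenue: 50.5 },
  { _id: "2024-06-02", dailyRevenue: 200 },
];

function runningTotal(docs, sortKey, field, outField) {
  // Equivalent of sortBy: { [sortKey]: 1 } using string comparison.
  const sorted = [...docs].sort((a, b) =>
    a[sortKey] < b[sortKey] ? -1 : a[sortKey] > b[sortKey] ? 1 : 0);
  let sum = 0; // sum of every document seen so far, including the current one
  return sorted.map((d) => {
    sum += d[field];
    return { ...d, [outField]: sum };
  });
}

const withRunning = runningTotal(daily, "_id", "dailyRevenue", "runningRevenue");
console.log(withRunning.map((d) => d.runningRevenue)); // [ 100, 300, 350.5 ]
```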

Customer Analysis

db.orders.aggregate([
// Unwind order items
{ $unwind: "$items" },
// Group by customer
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$items.price" },
orderIds: { $addToSet: "$_id" }, // Distinct orders ($unwind repeats each order once per item)
categories: { $addToSet: "$items.category" },
firstPurchase: { $min: "$createdAt" },
lastPurchase: { $max: "$createdAt" },
}},
// Lookup customer details
{ $lookup: {
from: "customers",
localField: "_id",
foreignField: "_id",
as: "info",
}},
{ $unwind: "$info" },
// Add computed fields
{ $addFields: {
orderCount: { $size: "$orderIds" },
// orderCount is not visible yet inside this same stage, so compute the size again
avgOrderValue: { $divide: ["$totalSpent", { $size: "$orderIds" }] },
daysSinceLastPurchase: {
$dateDiff: { startDate: "$lastPurchase", endDate: "$$NOW", unit: "day" },
},
categoryCount: { $size: "$categories" },
}},
// Customer segmentation
{ $addFields: {
segment: {
$switch: {
branches: [
{ case: { $gte: ["$totalSpent", 10000] }, then: "VIP" },
{ case: { $gte: ["$totalSpent", 5000] }, then: "Premium" },
{ case: { $gte: ["$totalSpent", 1000] }, then: "Regular" },
],
default: "Occasional",
},
},
}},
// Sort by value
{ $sort: { totalSpent: -1 } },
]);
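$switch evaluates its branches in order and returns the first case that is true, so the thresholds must run from highest to lowest; with the order reversed, every customer at or above 1000 would land in "Regular". A plain-JavaScript sketch of the same logic (the `SEGMENTS` table and `segmentFor` function are hypothetical):

```javascript
// Plain-JavaScript model of the $switch segmentation: first matching branch wins.
const SEGMENTS = [
  { min: 10000, label: "VIP" },
  { min: 5000, label: "Premium" },
  { min: 1000, label: "Regular" },
];

function segmentFor(totalSpent) {
  for (const { min, label } of SEGMENTS) {
    // { case: { $gte: ["$totalSpent", min] }, then: label }
    if (totalSpent >= min) return label;
  }
  return "Occasional"; // default
}

console.log([12000, 5000, 999, 1000].map(segmentFor));
// [ 'VIP', 'Premium', 'Occasional', 'Regular' ]
```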

Data Export (JSON Report)

db.orders.aggregate([
{ $match: { status: "completed" } },
{ $lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer",
}},
{ $unwind: "$customer" },
{ $project: {
_id: 0,
orderDate: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
customerEmail: "$customer.email",
customerName: "$customer.name",
items: {
$map: {
input: "$items",
as: "item",
in: { product: "$$item.name", qty: "$$item.quantity", price: "$$item.price" },
},
},
total: { $round: ["$total", 2] },
}},
{ $sort: { orderDate: -1 } },
]);

Pipeline Optimization

1. $match Early

Filter as early as possible to reduce documents flowing through:

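// ❌ Slower: orders in every status reach $group; the $match on the computed total cannot run any earlier
db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gt: 10000 } } },
]);
// ✅ Faster: discard unneeded orders on a stored field before $group; the $match on total still follows $group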
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gt: 10000 } } },
]);

2. $sort Before $group

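When the sort field is indexed, a $sort at the start of the pipeline can read the index instead of sorting in memory, and the $limit right after it passes only the 100 most recent orders to $group:

db.orders.aggregate([
{ $sort: { createdAt: -1 } }, // Can use an index on createdAt
{ $limit: 100 },
{ $group: { _id: "$category", recentCount: { $sum: 1 } } },
]);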

3. Use Indexes

$match and $sort at the start of the pipeline can use indexes:

// This can use an index on { status: 1, createdAt: -1 }
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } },
{ $group: { _id: "$category", count: { $sum: 1 } } },
]);

Pipeline Limits

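Constraint | Limit
Memory per stage | 100 MB; a stage that needs more fails unless it is allowed to spill to disk
BSON document size | 16 MB for every document a stage outputs, including the final results
Pipeline stages | 1,000 per pipeline (MongoDB 5.0 and later)
$lookup output | Matches are stored in one array field, so they count toward the 16 MB document limit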

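Allow memory-heavy stages to write temporary files to disk instead of failing (on MongoDB 6.0 and later, the allowDiskUseByDefault server parameter enables this by default):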

db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } },
], { allowDiskUse: true });

Quick Reference

// Common pipeline stages
{ $match: { status: "active" } }
{ $group: { _id: "$field", total: { $sum: 1 } } }
{ $sort: { field: -1 } }
{ $project: { field: 1, _id: 0 } }
{ $limit: 10 }
{ $unwind: "$arrayField" }
{ $lookup: { from: "collection", localField: "x", foreignField: "y", as: "result" } }
{ $addFields: { newField: { $multiply: ["$a", 2] } } }
// Common expressions
{ $sum: "$field" }, { $avg: "$field" }
{ $year: "$date" }, { $month: "$date" }
{ $toUpper: "$field" }, { $concat: ["a", "b"] }
{ $cond: [{ $gte: ["$total", 100] }, "high", "low"] } // [if, then, else]

Practice Exercises

  1. Category report: Using an orders collection, write an aggregation that groups orders by category. Calculate total revenue, average order value, and order count per category. Sort by revenue descending. Only include categories with revenue over $1000.

  2. Customer orders with $lookup: Join orders with customers using $lookup. For each customer, calculate total spent, order count, and most recent order date. Include customer name and email in the output.

  3. Product popularity: Use $unwind on an orders items array, then group by product to find total quantity sold per product. Sort by popularity. Add a field calculating the percentage of total orders each product represents.

  4. Time series analysis: Group orders by month using $dateToString. Calculate month-over-month growth percentage using $setWindowFields. Output: month, revenue, growth%.
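For exercise 4, the growth figure is `(revenue - previousRevenue) / previousRevenue * 100`. The plain-JavaScript sketch below (hypothetical `monthOverMonth` helper, made-up revenue) shows only that arithmetic, returning null for the first month and for a zero previous month; inside the pipeline, one option for reading the previous month's revenue is the $shift window operator of $setWindowFields (MongoDB 5.0+).

```javascript
// Plain-JavaScript model of month-over-month growth (input already sorted by month).
function monthOverMonth(months) {
  return months.map((m, i) => {
    const prev = i > 0 ? months[i - 1].revenue : null;
    // No previous month, or a zero base, means growth is undefined: use null.
    const growth = prev === null || prev === 0 ? null : ((m.revenue - prev) * 100) / prev;
    return { month: m.month, revenue: m.revenue, growth };
  });
}

const report = monthOverMonth([
  { month: "2024-01", revenue: 1000 },
  { month: "2024-02", revenue: 1200 },
  { month: "2024-03", revenue: 900 },
]);
console.log(report);
```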