
Data Pipeline with Pub/Sub, Dataflow, BigQuery

Project Overview

Build a real-time streaming pipeline that ingests e-commerce events, processes them with Apache Beam on Dataflow, and stores results in BigQuery for analysis.

Architecture

Mobile / Web App
        │
        ▼
Pub/Sub Topic (order-events)
        │  subscription: order-sub
        ▼
Dataflow Streaming Pipeline (Apache Beam)
        ├──► BigQuery (processed purchases)
        └──► Cloud Storage (dead-letter, failed records)

Prerequisites

  • A GCP project with billing enabled
  • BigQuery, Dataflow, Pub/Sub, and Cloud Storage APIs enabled
  • The Google Cloud CLI (gcloud and bq) installed and authenticated
  • Python 3.9+ with apache-beam[gcp] installed

Step 1: Create Pub/Sub Topic and Subscription

Terminal window
gcloud config set project <your-project-id>
gcloud pubsub topics create order-events
gcloud pubsub subscriptions create order-sub \
--topic order-events \
--ack-deadline 60 \
--message-retention-duration 7d

Step 2: Set Up BigQuery Dataset and Table

Terminal window
bq mk --dataset <your-project-id>:analytics
bq mk \
--table \
--schema "event_id:STRING,user_id:STRING,amount:FLOAT,item_count:INTEGER,category:STRING,event_timestamp:TIMESTAMP,processing_time:TIMESTAMP" \
analytics.purchases
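The inline schema string packs each column as `name:TYPE`. `bq mk --schema` also accepts a path to a JSON schema file instead, so if you prefer to keep the schema in version control, a minimal stdlib sketch like the one below can convert one form into the other. The function name `schema_to_json` is hypothetical, and marking every column `NULLABLE` is an assumption that matches the default mode for inline definitions.

```python
import json

SCHEMA = (
    "event_id:STRING,user_id:STRING,amount:FLOAT,item_count:INTEGER,"
    "category:STRING,event_timestamp:TIMESTAMP,processing_time:TIMESTAMP"
)


def schema_to_json(schema: str) -> list[dict]:
    """Convert a 'name:TYPE,...' string into BigQuery's JSON schema layout."""
    columns = []
    for column in schema.split(","):
        name, col_type = column.strip().split(":")
        # Assumption: every column is NULLABLE, like the inline definition's default.
        columns.append({"name": name, "type": col_type, "mode": "NULLABLE"})
    return columns


if __name__ == "__main__":
    print(json.dumps(schema_to_json(SCHEMA), indent=2))
```

Saving the printed output as `schema.json` should let you pass `--schema schema.json` to `bq mk` in place of the inline string.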

Step 3: Create a Cloud Storage Bucket

Terminal window
gcloud storage buckets create gs://<your-project-id>-pipeline \
--location us-central1
gcloud storage buckets create gs://<your-project-id>-deadletter \
--location us-central1
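Every step refers to the same handful of resources by name, so deriving all of them from one project ID helps keep the placeholders consistent. The `resource_names` helper below is a hypothetical sketch; the `projects/<project>/topics/<topic>` and `<project>:<dataset>.<table>` formats are the ones Pub/Sub and `WriteToBigQuery` expect, and `my-demo-project` is a made-up ID.

```python
def resource_names(project_id: str) -> dict:
    """Derive every resource name used in this tutorial from one project ID."""
    return {
        "topic": f"projects/{project_id}/topics/order-events",
        "subscription": f"projects/{project_id}/subscriptions/order-sub",
        "table": f"{project_id}:analytics.purchases",
        "temp_location": f"gs://{project_id}-pipeline/temp",
        "staging_location": f"gs://{project_id}-pipeline/staging",
        "deadletter_bucket": f"gs://{project_id}-deadletter",
    }


if __name__ == "__main__":
    # Hypothetical project ID, for illustration only.
    for key, value in resource_names("my-demo-project").items():
        print(f"{key:18} {value}")
```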

Step 4: Write the Dataflow Pipeline

pipeline.py
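import argparse
import json
import logging
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms import window

# Columns of analytics.purchases: used as the BigQuery schema and to drop unknown keys.
SCHEMA = (
    "event_id:STRING,user_id:STRING,amount:FLOAT,item_count:INTEGER,"
    "category:STRING,event_timestamp:TIMESTAMP,processing_time:TIMESTAMP"
)
FIELDS = [column.split(":")[0] for column in SCHEMA.split(",")]


class ParseEvent(beam.DoFn):
    """Decodes a Pub/Sub payload into a dict; failures go to the dead-letter output."""

    def process(self, element):
        try:
            event = json.loads(element.decode("utf-8"))
            # Raises TypeError (and is dead-lettered) if the payload is not a JSON object.
            event["processing_time"] = datetime.now(timezone.utc).isoformat()
            yield event
        except Exception as e:
            logging.error(f"Parse error: {e}")
            yield beam.pvalue.TaggedOutput("deadletter", element)


class ValidatePurchase(beam.DoFn):
    """Keeps events with all required fields and a positive numeric amount."""

    def process(self, element):
        required = ["event_id", "user_id", "amount", "event_timestamp"]
        amount = element.get("amount")
        if (all(k in element for k in required)
                and isinstance(amount, (int, float)) and amount > 0):
            # Keep only schema columns so unexpected keys don't fail the BigQuery insert.
            yield {k: element.get(k) for k in FIELDS}
        else:
            yield beam.pvalue.TaggedOutput("deadletter", element)


def to_deadletter_line(element):
    """Renders a failed record (raw bytes or a dict) as one line of text."""
    if isinstance(element, bytes):
        return element.decode("utf-8", errors="replace")
    return json.dumps(element, default=str)


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--subscription",
                        default="projects/<your-project-id>/subscriptions/order-sub")
    parser.add_argument("--output_table",
                        default="<your-project-id>:analytics.purchases")
    parser.add_argument("--deadletter_path",
                        default="gs://<your-project-id>-deadletter/failed/")
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    # Make module-level imports and globals available on the Dataflow workers.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        events = p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            subscription=known_args.subscription
        )
        parsed = events | "ParseJSON" >> beam.ParDo(ParseEvent()).with_outputs(
            "deadletter", main="valid"
        )
        validated = parsed.valid | "Validate" >> beam.ParDo(
            ValidatePurchase()
        ).with_outputs("deadletter", main="clean")
        (
            validated.clean
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=known_args.output_table,
                schema=SCHEMA,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )
        # Dead letters: fileio.WriteToFiles supports unbounded input once it is
        # windowed, so failed records are flushed to GCS in one-minute windows.
        (
            (parsed.deadletter, validated.deadletter)
            | "FlattenDeadletter" >> beam.Flatten()
            | "ToText" >> beam.Map(to_deadletter_line)
            | "WindowDeadletter" >> beam.WindowInto(window.FixedWindows(60))
            | "WriteDeadletter" >> fileio.WriteToFiles(
                path=known_args.deadletter_path,
                file_naming=fileio.default_file_naming("failed", ".txt"),
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()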
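Because the parse and validation rules decide which records reach BigQuery, it can help to exercise them without Beam or any GCP resources. The sketch below reimplements the checks from `ParseEvent` and `ValidatePurchase` as plain functions (the names `parse`, `is_valid_purchase`, and `route` are hypothetical): the same required fields and positive-amount rule, plus a guard that `amount` is numeric, since a string amount would otherwise raise a `TypeError` on comparison. For simplicity it keeps the raw payload for every dead-lettered record.

```python
import json
from datetime import datetime, timezone

REQUIRED = ["event_id", "user_id", "amount", "event_timestamp"]


def parse(payload: bytes):
    """Return the decoded event dict, or None if the payload is not a JSON object."""
    try:
        event = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict):
        return None
    event["processing_time"] = datetime.now(timezone.utc).isoformat()
    return event


def is_valid_purchase(event: dict) -> bool:
    """All required fields present and a positive numeric amount."""
    amount = event.get("amount")
    return (
        all(k in event for k in REQUIRED)
        and isinstance(amount, (int, float))
        and amount > 0
    )


def route(payloads):
    """Split raw Pub/Sub payloads into (clean, deadletter) lists."""
    clean, deadletter = [], []
    for payload in payloads:
        event = parse(payload)
        if event is not None and is_valid_purchase(event):
            clean.append(event)
        else:
            deadletter.append(payload)
    return clean, deadletter


if __name__ == "__main__":
    samples = [
        b'{"event_id": "e1", "user_id": "u1", "amount": 25.0, "event_timestamp": "2024-01-01T00:00:00Z"}',
        b'{"event_id": "e2", "user_id": "u2", "amount": -5, "event_timestamp": "2024-01-01T00:00:00Z"}',
        b"not json",
    ]
    clean, deadletter = route(samples)
    print(len(clean), "clean,", len(deadletter), "dead-lettered")  # 1 clean, 2 dead-lettered
```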

Step 5: Run the Pipeline on Dataflow

Terminal window
python pipeline.py \
--runner DataflowRunner \
--project <your-project-id> \
--region us-central1 \
--temp_location gs://<your-project-id>-pipeline/temp \
--staging_location gs://<your-project-id>-pipeline/staging \
--requirements_file requirements.txt

Create requirements.txt (it is passed to --requirements_file above), pinning the same Beam version you installed locally:

apache-beam[gcp]==2.58.0
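The launch flags can also be assembled in Python, for example from a small driver script or test harness. The helper below is a hypothetical sketch that builds the same flag list as the command above; because `run()` in `pipeline.py` passes its argument list on to Beam's `PipelineOptions`, the result could be handed to it directly, e.g. `run(dataflow_args("my-demo-project"))`, where the project ID is made up.

```python
def dataflow_args(project_id: str, region: str = "us-central1") -> list[str]:
    """Build the DataflowRunner flags from Step 5 for a given project."""
    bucket = f"gs://{project_id}-pipeline"
    return [
        "--runner", "DataflowRunner",
        "--project", project_id,
        "--region", region,
        "--temp_location", f"{bucket}/temp",
        "--staging_location", f"{bucket}/staging",
        "--requirements_file", "requirements.txt",
    ]


if __name__ == "__main__":
    # Print the equivalent shell command for a hypothetical project ID.
    print(" ".join(["python", "pipeline.py", *dataflow_args("my-demo-project")]))
```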

Tip

Test locally first with --runner DirectRunner and a small sample of messages before deploying to Dataflow.
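For a local DirectRunner run it is worth including a few bad records so both the BigQuery and dead-letter outputs get exercised. The generator below is a hypothetical sketch that mirrors the event shape from `publisher.py` and deliberately appends three payloads that should be dead-lettered; a seeded `random.Random` keeps the generated IDs, users, and amounts reproducible.

```python
import json
import random
import uuid
from datetime import datetime, timezone

CATEGORIES = ["electronics", "clothing", "home", "books"]


def sample_payloads(n_valid: int = 5, seed: int = 42) -> list[bytes]:
    """Return n_valid good events followed by three payloads that should fail."""
    rng = random.Random(seed)
    payloads = []
    for _ in range(n_valid):
        event = {
            # uuid4() draws from os.urandom, so derive IDs from the seeded RNG instead.
            "event_id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
            "user_id": f"user_{rng.randint(1000, 9999)}",
            "amount": round(rng.uniform(10, 500), 2),
            "item_count": rng.randint(1, 5),
            "category": rng.choice(CATEGORIES),
            "event_timestamp": datetime.now(timezone.utc).isoformat(),
        }
        payloads.append(json.dumps(event).encode("utf-8"))
    # Bad records: invalid JSON, a non-positive amount, and a missing user_id.
    payloads.append(b"{broken json")
    payloads.append(json.dumps({"event_id": "bad-1", "user_id": "user_1", "amount": 0,
                                "event_timestamp": "2024-01-01T00:00:00Z"}).encode("utf-8"))
    payloads.append(json.dumps({"event_id": "bad-2", "amount": 12.5,
                                "event_timestamp": "2024-01-01T00:00:00Z"}).encode("utf-8"))
    return payloads
```

Publishing each payload with `publisher.publish(topic_path, payload)`, as in Step 6, should send three records to the dead-letter output.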

Step 6: Publish Test Messages

publisher.py
import json
import random
import uuid
from datetime import datetime, timezone

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<your-project-id>", "order-events")
categories = ["electronics", "clothing", "home", "books"]

for _ in range(100):
    event = {
        "event_id": str(uuid.uuid4()),
        "user_id": f"user_{random.randint(1000, 9999)}",
        "amount": round(random.uniform(10, 500), 2),
        "item_count": random.randint(1, 5),
        "category": random.choice(categories),
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    data = json.dumps(event).encode("utf-8")
    future = publisher.publish(topic_path, data)
    # result() blocks until Pub/Sub accepts the message and returns its message ID.
    print(f"Published message {future.result()}")
Terminal window
python publisher.py

Step 7: Query Results in BigQuery

SELECT
category,
COUNT(*) AS order_count,
ROUND(AVG(amount), 2) AS avg_order_value,
SUM(item_count) AS total_items
FROM analytics.purchases
WHERE event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY category
ORDER BY order_count DESC;
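To sanity-check the query, the same aggregation can be computed over events you kept locally, for example by appending each `event` dict to a list in `publisher.py`. The sketch below is a stdlib-only equivalent of the GROUP BY (the function name `summarize` is hypothetical). It skips the one-hour time filter, and it uses Python's `round`, which rounds exact halves to even, whereas BigQuery's `ROUND` rounds them away from zero, so the last digit can differ on ties.

```python
from collections import defaultdict


def summarize(events: list[dict]) -> list[dict]:
    """Per-category order count, average amount, and total items, busiest first."""
    groups = defaultdict(list)
    for event in events:
        groups[event["category"]].append(event)
    rows = []
    for category, items in groups.items():
        rows.append({
            "category": category,
            "order_count": len(items),
            "avg_order_value": round(sum(e["amount"] for e in items) / len(items), 2),
            "total_items": sum(e["item_count"] for e in items),
        })
    # Equivalent of ORDER BY order_count DESC.
    return sorted(rows, key=lambda r: r["order_count"], reverse=True)
```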

Step 8: Clean Up

Terminal window
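# Stop the Dataflow job (or stop it from the Cloud Console).
# "drain" instead of "cancel" finishes processing in-flight messages first.
gcloud dataflow jobs list --region us-central1 --status=active
gcloud dataflow jobs cancel <job-id> --region us-central1
# Delete resources (the subscription first, then the topic it is attached to)
gcloud pubsub subscriptions delete order-sub
gcloud pubsub topics delete order-events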
bq rm -r --dataset analytics
gcloud storage rm -r gs://<your-project-id>-pipeline
gcloud storage rm -r gs://<your-project-id>-deadletter

Caution

Dataflow streaming jobs run continuously and incur costs until stopped. Always cancel the job when you finish the exercise.

Summary

You built a complete streaming data pipeline on GCP: Pub/Sub ingests events, Dataflow processes them with Apache Beam, BigQuery stores the validated results for analytics, and malformed or invalid records land in a Cloud Storage dead-letter bucket for inspection. Pub/Sub → Dataflow → BigQuery is a widely used reference architecture for real-time analytics on Google Cloud.