Data Pipeline with Pub/Sub, Dataflow, BigQuery
Project Overview
Build a real-time streaming pipeline that ingests e-commerce events, processes them with Apache Beam on Dataflow, and stores results in BigQuery for analysis.
Architecture
Mobile / Web App
      │
      ▼
Pub/Sub Topic (order-events)
      │
      ▼
Dataflow Streaming Pipeline (Apache Beam)
      │
      ├──► BigQuery (processed purchases)
      │
      └──► Cloud Storage (dead-letter, failed records)
Prerequisites
- A GCP project with billing enabled
- BigQuery, Dataflow, Pub/Sub, and Cloud Storage APIs enabled
- Python 3.9+ with apache-beam[gcp] installed
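The Python-side prerequisites can be checked with a short script before starting. This is a minimal sketch using only the standard library; the helper names python_ok and beam_installed are made up for illustration. It only checks that the apache_beam package can be found, not that the gcp extras are present or that the GCP APIs are enabled.

```python
import importlib.util
import sys


def python_ok(minimum=(3, 9)):
    """Return True if the running interpreter meets the minimum version."""
    return sys.version_info[:2] >= minimum


def beam_installed():
    """Return True if the apache_beam package can be located (it is not imported)."""
    return importlib.util.find_spec("apache_beam") is not None


if __name__ == "__main__":
    print(f"Python >= 3.9: {python_ok()}")
    print(f"apache_beam found: {beam_installed()}")
```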
Step 1: Create Pub/Sub Topic and Subscription
gcloud config set project <your-project-id>
gcloud pubsub topics create order-events
gcloud pubsub subscriptions create order-sub \
  --topic order-events \
  --ack-deadline 60 \
  --message-retention-duration 7d
Step 2: Set Up BigQuery Dataset and Table
bq mk --dataset <your-project-id>:analytics
bq mk \
  --table \
  --schema "event_id:STRING,user_id:STRING,amount:FLOAT,item_count:INTEGER,category:STRING,event_timestamp:TIMESTAMP,processing_time:TIMESTAMP" \
  analytics.purchases
Step 3: Create a Cloud Storage Bucket
gcloud storage buckets create gs://<your-project-id>-pipeline \
  --location us-central1
gcloud storage buckets create gs://<your-project-id>-deadletter \
  --location us-central1
Step 4: Write the Dataflow Pipeline
import json
import logging
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class ParseEvent(beam.DoFn):
    """Decode a Pub/Sub message into a dict; send undecodable messages to the dead-letter output."""

    def process(self, element):
        try:
            event = json.loads(element.decode("utf-8"))
            # Record when the pipeline handled the event (UTC).
            event["processing_time"] = datetime.now(timezone.utc).isoformat()
            yield event
        except Exception as e:
            logging.error(f"Parse error: {e}")
            yield beam.pvalue.TaggedOutput("deadletter", element)


class ValidatePurchase(beam.DoFn):
    """Pass events that have every required field and a positive numeric amount."""

    def process(self, element):
        required = ["event_id", "user_id", "amount", "event_timestamp"]
        amount = element.get("amount")
        # Check the type first so a non-numeric amount is dead-lettered
        # instead of raising TypeError inside the pipeline.
        if (
            all(k in element for k in required)
            and isinstance(amount, (int, float))
            and amount > 0
        ):
            yield element
        else:
            yield beam.pvalue.TaggedOutput("deadletter", element)


def run(pipeline_args=None):
    # When pipeline_args is None, PipelineOptions reads the command-line flags.
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)

    with beam.Pipeline(options=pipeline_options) as p:
        events = (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
                subscription="projects/<your-project-id>/subscriptions/order-sub"
            )
        )

        parsed = (
            events
            | "ParseJSON" >> beam.ParDo(ParseEvent()).with_outputs(
                "deadletter", main="valid"
            )
        )

        validated = (
            parsed.valid
            | "Validate" >> beam.ParDo(ValidatePurchase()).with_outputs(
                "deadletter", main="clean"
            )
        )

        (
            validated.clean
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table="<your-project-id>:analytics.purchases",
                schema="event_id:STRING,user_id:STRING,amount:FLOAT,"
                       "item_count:INTEGER,category:STRING,"
                       "event_timestamp:TIMESTAMP,processing_time:TIMESTAMP",
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )

        # Merge both dead-letter outputs: raw bytes from parsing, dicts from validation.
        # WriteToText support for unbounded (streaming) input varies by Beam version;
        # check the sink documentation for the release you pin.
        deadletter = (
            (parsed.deadletter, validated.deadletter)
            | "FlattenDeadletter" >> beam.Flatten()
            | "WriteDeadletter" >> beam.io.WriteToText(
                "gs://<your-project-id>-deadletter/failed-"
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
Step 5: Run the Pipeline on Dataflow
python pipeline.py \
  --runner DataflowRunner \
  --project <your-project-id> \
  --region us-central1 \
  --temp_location gs://<your-project-id>-pipeline/temp \
  --staging_location gs://<your-project-id>-pipeline/staging \
  --requirements_file requirements.txt
Create requirements.txt:
apache-beam[gcp]==2.58.0
Tip
Test locally first with --runner DirectRunner and a small sample of messages before deploying to Dataflow.
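Even before a DirectRunner run, the parse and validation rules from Step 4 can be exercised on a few sample payloads in plain Python. The sketch below mirrors those rules without Beam. The classify helper and the sample payloads are hypothetical, and it also treats a non-numeric amount as invalid, which is a choice made for this sketch.

```python
import json

REQUIRED = ("event_id", "user_id", "amount", "event_timestamp")


def classify(raw: bytes) -> str:
    """Return "clean" or "deadletter" for one raw message payload."""
    try:
        event = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return "deadletter"  # payload is not valid UTF-8 JSON
    if not isinstance(event, dict):
        return "deadletter"  # valid JSON, but not an object
    amount = event.get("amount")
    if (
        all(key in event for key in REQUIRED)
        and isinstance(amount, (int, float))
        and amount > 0
    ):
        return "clean"
    return "deadletter"  # missing field or non-positive amount


# Hypothetical sample payloads: one valid, three that should be rejected.
samples = [
    b'{"event_id": "e1", "user_id": "u1", "amount": 19.99, "event_timestamp": "2024-01-01T00:00:00"}',
    b'{"event_id": "e2", "user_id": "u2", "amount": -5, "event_timestamp": "2024-01-01T00:00:00"}',
    b'{"event_id": "e3", "user_id": "u3"}',
    b"not json",
]

for raw in samples:
    print(classify(raw), raw[:40])
```

Publishing the same payloads to the topic during a DirectRunner run gives a quick way to confirm that the pipeline routes them to the expected outputs.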
Step 6: Publish Test Messages
import json
import random
import uuid
from datetime import datetime, timezone

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("<your-project-id>", "order-events")

categories = ["electronics", "clothing", "home", "books"]
for i in range(100):
    event = {
        "event_id": str(uuid.uuid4()),
        "user_id": f"user_{random.randint(1000, 9999)}",
        "amount": round(random.uniform(10, 500), 2),
        "item_count": random.randint(1, 5),
        "category": random.choice(categories),
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
    }
    data = json.dumps(event).encode("utf-8")
    future = publisher.publish(topic_path, data)
    # result() blocks until the publish completes and returns the message ID.
    print(f"Published {future.result()}")
Save the script as publisher.py and run it:
python publisher.py
Step 7: Query Results in BigQuery
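To know what to expect from the query in this step, the same per-category aggregation can be reproduced in plain Python on a few rows. This is an illustrative sketch: the summarize helper and the sample rows are hypothetical, it ignores the one-hour time filter, and Python's round may differ from BigQuery's ROUND on exact ties.

```python
from collections import defaultdict


def summarize(rows):
    """Group rows by category and return tuples of
    (category, order_count, avg_order_value, total_items),
    sorted by order_count in descending order."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["category"]].append(row)

    result = []
    for category, items in groups.items():
        count = len(items)
        avg = round(sum(r["amount"] for r in items) / count, 2)
        total_items = sum(r["item_count"] for r in items)
        result.append((category, count, avg, total_items))
    return sorted(result, key=lambda r: r[1], reverse=True)


# Hypothetical rows shaped like the purchases table (only the columns the query uses).
rows = [
    {"category": "books", "amount": 10.0, "item_count": 1},
    {"category": "books", "amount": 20.0, "item_count": 2},
    {"category": "home", "amount": 50.0, "item_count": 3},
]
print(summarize(rows))
```

The query that follows computes the same figures in BigQuery over events from the last hour.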
SELECT
  category,
  COUNT(*) AS order_count,
  ROUND(AVG(amount), 2) AS avg_order_value,
  SUM(item_count) AS total_items
FROM analytics.purchases
WHERE event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY category
ORDER BY order_count DESC;
Step 8: Clean Up
# Stop the Dataflow job via console or
gcloud dataflow jobs list --region us-central1 --status=active
gcloud dataflow jobs cancel <job-id> --region us-central1

# Delete resources
gcloud pubsub topics delete order-events
gcloud pubsub subscriptions delete order-sub
bq rm -r --dataset analytics
gcloud storage rm -r gs://<your-project-id>-pipeline
gcloud storage rm -r gs://<your-project-id>-deadletter
Caution
Dataflow streaming jobs run continuously and incur costs until stopped. Always cancel the job when you finish the exercise.
Summary
You built a complete streaming data pipeline on GCP: Pub/Sub ingests events, Dataflow processes them with Apache Beam, and BigQuery stores the results for analytics. The combination of Pub/Sub, Dataflow, and BigQuery is a widely used architecture pattern for real-time data processing on GCP.