Pub/Sub to BigQuery

#Cloud #Data #Analytics #PubSub #BigQuery #DataPipeline

How to Build a Reliable, Real-Time Data Pipeline from Pub/Sub to BigQuery Without Losing a Single Message

Ingesting streaming data reliably into BigQuery is critical for real-time analytics and decision-making. Ensuring zero data loss and maintaining throughput under load distinguishes scalable pipelines from fragile ones.

Most tutorials gloss over failure modes. This guide covers the strategies that keep a Pub/Sub to BigQuery pipeline robust in production: exactly-once delivery, error handling, backpressure, and monitoring.


Why Pub/Sub to BigQuery Pipelines Are So Important

Google Cloud Pub/Sub is a fully-managed real-time messaging service designed to ingest massive amounts of streaming data. BigQuery, Google’s serverless data warehouse, provides lightning-fast SQL analytics at scale.

Together, they form a backbone for streaming analytics, powering fraud detection, IoT telemetry, user behavior tracking, and much more. But wiring them together reliably is harder than it looks. Without careful planning:

  • Messages can be lost or duplicated.
  • Backpressure can overwhelm downstream systems.
  • Error handling can become a nightmare.
  • Monitoring and alerting become unreliable.

Building a reliable real-time pipeline means guaranteeing exactly-once message delivery from Pub/Sub into BigQuery, handling backpressure gracefully, and quickly surfacing failures for remediation.


Core Challenges with Pub/Sub to BigQuery Pipelines

Before we jump into the "how", let's quickly cover the common pitfalls:

  • At-least-once delivery semantics: By default, Pub/Sub guarantees at-least-once delivery, not exactly-once, so message duplication is possible.
  • Idempotency: Your pipeline must tolerate duplicates to prevent data skew.
  • Backpressure and scaling: BigQuery streaming inserts have quotas and rate limits; overwhelming them leads to errors.
  • Error handling: Transient network or service errors should be retried; poison messages (bad data) should be quarantined without blocking the entire pipeline.
  • Monitoring: Visibility into pipeline health and message lag is essential to prevent silent failures.

Step 1: Setting Up Pub/Sub and BigQuery

Pub/Sub Topic and Subscription

Create a Pub/Sub topic to receive your streaming data:

gcloud pubsub topics create my-events-topic

Create a subscription for your pipeline to consume messages:

gcloud pubsub subscriptions create my-events-subscription \
  --topic=my-events-topic \
  --ack-deadline=60

Setting an ack-deadline higher than the expected processing time reduces premature message redelivery.

BigQuery Table Setup

Prepare a BigQuery dataset and table with a schema matching your event data. Here’s an example schema:

  • event_id (STRING) [unique message ID for deduplication]
  • event_timestamp (TIMESTAMP)
  • user_id (STRING)
  • event_type (STRING)
  • event_payload (STRING)

Creating a table:

bq mk --table my_dataset.my_events_table \
  event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING
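
If you prefer to stay in Python (matching the rest of the pipeline), a roughly equivalent sketch with the BigQuery client library, optionally partitioning the table on event_timestamp, might look like this ("your-project" is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()

schema = [
    bigquery.SchemaField("event_id", "STRING"),
    bigquery.SchemaField("event_timestamp", "TIMESTAMP"),
    bigquery.SchemaField("user_id", "STRING"),
    bigquery.SchemaField("event_type", "STRING"),
    bigquery.SchemaField("event_payload", "STRING"),
]

table = bigquery.Table("your-project.my_dataset.my_events_table", schema=schema)
# Optional: partition by event time so long-lived streaming tables stay cheap to query.
table.time_partitioning = bigquery.TimePartitioning(field="event_timestamp")
client.create_table(table)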

Step 2: Achieving Exactly-Once Delivery with Deduplication

Because Pub/Sub can deliver duplicates, you must handle idempotency on the BigQuery side.

  • Use a unique event_id (UUID or hash) generated when producing messages.
  • Use BigQuery’s insertId field during streaming inserts; BigQuery uses it for best-effort deduplication of rows that arrive with the same insertId within a short window, so treat it as a safety net rather than a strict guarantee.
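
On the producer side, a minimal sketch of generating the event_id and publishing, assuming events are JSON objects matching the schema above and "your-project" as a placeholder project ID:

import json
import uuid
from datetime import datetime, timezone

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('your-project', 'my-events-topic')

def publish_event(user_id, event_type, payload):
    event = {
        "event_id": str(uuid.uuid4()),  # unique ID, reused later as the BigQuery insertId
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "event_type": event_type,
        "event_payload": payload,
    }
    future = publisher.publish(topic_path, json.dumps(event).encode("utf-8"))
    return future.result()  # blocks until Pub/Sub confirms the publish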

Example: Streaming Insert with insertId

Using the BigQuery client libraries (Python example):

from google.cloud import bigquery
from google.cloud import pubsub_v1
import json

bq_client = bigquery.Client()
dataset_id = 'my_dataset'
table_id = 'my_events_table'

def callback(message):
    data = json.loads(message.data)
    row = {
        "event_id": data['event_id'],
        "event_timestamp": data['event_timestamp'],
        "user_id": data['user_id'],
        "event_type": data['event_type'],
        "event_payload": json.dumps(data['event_payload'])
    }

    errors = bq_client.insert_rows_json(
        f"{dataset_id}.{table_id}",
        [row],
        row_ids=[data['event_id']]  # insertId for deduplication
    )

    if errors:
        print(f"Failed to insert row: {errors}")
        message.nack()  # Let Pub/Sub redeliver
    else:
        message.ack()  # Acknowledge after successful insert

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('your-project', 'my-events-subscription')
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages...")
streaming_pull_future.result()  # Block the main thread so the subscriber keeps pulling

Key points:

  • row_ids param sets the insertId to your unique event_id.
  • Only ack the message if BigQuery insert succeeds.
  • Nack messages to retry failures.

Step 3: Handling Backpressure and Quotas

BigQuery streaming inserts are subject to quotas and rate limits, per table and per project, on rows per request, rows per second, and bytes per second. The exact limits depend on the region and on whether you populate insertId, and they change over time, so check the current BigQuery streaming-insert quota documentation when sizing your pipeline.

If your pipeline exceeds these limits, inserts are throttled or rejected.

Strategies:

  • Batch inserts: Buffer messages and insert in batches of ~500 rows.
  • Flow control in the subscriber: Limit the number of messages being processed concurrently (see the sketch after this list).
  • Exponential backoff: Retry transient errors with increasing delays (a retry sketch follows the batch example below).
  • Use Dataflow: For high-volume pipelines, Google Cloud Dataflow manages scaling, retries, checkpoints, and batch vs streaming writes under the hood.
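
The Python subscriber client supports flow control directly. A minimal sketch, reusing the callback from Step 2 (or the batching callback below); the limits shown are illustrative:

from google.cloud import pubsub_v1

# Cap outstanding work: at most 100 unacked messages (or ~10 MB) in flight at once.
flow_control = pubsub_v1.types.FlowControl(max_messages=100, max_bytes=10 * 1024 * 1024)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('your-project', 'my-events-subscription')

streaming_pull_future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=flow_control,
)
streaming_pull_future.result()  # Block while messages are processed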

Example: Batch insert with manual ack:

import threading

# Buffer (row, message) pairs so that every buffered message can be acked after
# a successful batch insert. Subscriber callbacks run on multiple threads, so
# guard the shared buffer with a lock.
buffer = []
buffer_lock = threading.Lock()
batch_size = 500

def flush(batch):
    rows = [row for row, _ in batch]
    errors = bq_client.insert_rows_json(
        f"{dataset_id}.{table_id}",
        rows,
        row_ids=[row['event_id'] for row in rows]
    )
    if errors:
        print(f"Batch insert errors: {errors}")
        for _, message in batch:
            message.nack()  # Redeliver the whole batch; insertId dedup helps absorb repeats
    else:
        for _, message in batch:
            message.ack()  # Ack every message in the batch, not just the last one

def callback(message):
    data = json.loads(message.data)
    row = {
        "event_id": data['event_id'],
        "event_timestamp": data['event_timestamp'],
        "user_id": data['user_id'],
        "event_type": data['event_type'],
        "event_payload": json.dumps(data['event_payload'])
    }

    with buffer_lock:
        buffer.append((row, message))
        if len(buffer) < batch_size:
            return
        batch = list(buffer)
        buffer.clear()

    flush(batch)

# Also flush the buffer on a timer (every few seconds) so a quiet stream does
# not leave messages buffered past the ack deadline.
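
For exponential backoff, a minimal retry wrapper around insert_rows_json might look like the sketch below. The attempt count and delays are illustrative, and the caller should nack (or leave unacked) the corresponding messages if all attempts fail so Pub/Sub redelivers them:

import random
import time

def insert_with_backoff(rows, max_attempts=5, base_delay=1.0):
    # Retry a streaming insert with exponentially increasing, jittered delays.
    for attempt in range(max_attempts):
        errors = bq_client.insert_rows_json(
            f"{dataset_id}.{table_id}",
            rows,
            row_ids=[row['event_id'] for row in rows]
        )
        if not errors:
            return True
        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
        print(f"Insert attempt {attempt + 1} failed: {errors}; retrying in {delay:.1f}s")
        time.sleep(delay)
    return False  # Give up; let Pub/Sub redeliver the messages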

Step 4: Graceful Error Handling and Poison Message Management

  • Transient errors: Retry streaming inserts with exponential backoff.
  • Serialization or schema errors: These may be poison messages that can't be processed.
  • Dead-letter topics (DLQ): Configure a dead-letter Pub/Sub topic for messages that repeatedly fail.
  • Logging and alerts: Log errors with context and send alerts on failure spikes.

Set up a dead-letter topic and configure your subscription:

gcloud pubsub topics create my-events-dead-letter-topic

gcloud pubsub subscriptions update my-events-subscription \
  --dead-letter-topic=my-events-dead-letter-topic \
  --max-delivery-attempts=5

After 5 failed delivery attempts, messages land in the dead-letter topic for manual inspection. Note that the Pub/Sub service account needs publish permission on the dead-letter topic and subscribe permission on the source subscription for dead lettering to work.
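
To inspect dead-lettered messages, attach a subscription to the dead-letter topic and pull from it. A minimal sketch, assuming a subscription named my-events-dead-letter-sub has been created on that topic:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
# Assumes a subscription created on the dead-letter topic, e.g.:
#   gcloud pubsub subscriptions create my-events-dead-letter-sub --topic=my-events-dead-letter-topic
dlq_path = subscriber.subscription_path('your-project', 'my-events-dead-letter-sub')

# Pull a handful of poison messages for inspection.
response = subscriber.pull(request={"subscription": dlq_path, "max_messages": 10})
for received in response.received_messages:
    print("Dead-lettered message:", received.message.data)

# Acknowledge them once inspected so they stop being redelivered.
ack_ids = [received.ack_id for received in response.received_messages]
if ack_ids:
    subscriber.acknowledge(request={"subscription": dlq_path, "ack_ids": ack_ids})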


Step 5: Monitoring and Alerting

Keep an eye on:

  • Subscription ack latency: High latency means slow processing.
  • Pub/Sub errors or throttling.
  • BigQuery streaming insert error rates.
  • Subscription message backlog.
  • Dead-letter topic messages.

Use Google Cloud Console dashboards or build custom alerting with:

  • Cloud Monitoring custom metrics.
  • Log-based metrics with alerts on error patterns.
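
As a starting point for custom checks, here is a sketch that reads the subscription backlog metric (num_undelivered_messages) with the Cloud Monitoring client library; the project ID and alert threshold are placeholders:

import time

from google.cloud import monitoring_v3

project_id = "your-project"  # placeholder
client = monitoring_v3.MetricServiceClient()

# Look at the last 5 minutes of the backlog metric for our subscription.
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"start_time": {"seconds": now - 300}, "end_time": {"seconds": now}}
)

results = client.list_time_series(
    request={
        "name": f"projects/{project_id}",
        "filter": (
            'metric.type="pubsub.googleapis.com/subscription/num_undelivered_messages" '
            'AND resource.labels.subscription_id="my-events-subscription"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)

for series in results:
    latest = series.points[0].value.int64_value  # points arrive most recent first
    if latest > 10000:  # illustrative threshold
        print(f"Backlog is high: {latest} undelivered messages")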

Bonus: Use Dataflow for Simplified, Scalable Pipelines

Google Cloud Dataflow automates many of these concerns:

  • Built-in exactly-once processing semantics.
  • Auto-scaling to handle load.
  • Native BigQuery streaming inserts with deduplication.
  • Dead-letter sinks for failed messages.

Here’s a very simple Dataflow pipeline in Python (Apache Beam):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class ParseAndFormat(beam.DoFn):
    def process(self, element):
        import json  # imported inside process() so the DoFn serializes cleanly to workers
        data = json.loads(element)
        yield {
            "event_id": data['event_id'],
            "event_timestamp": data['event_timestamp'],
            "user_id": data['user_id'],
            "event_type": data['event_type'],
            "event_payload": json.dumps(data['event_payload'])
        }

# streaming=True enables unbounded (streaming) execution; add runner, project,
# region, and temp_location options when submitting this to Dataflow.
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (p
     | "Read From PubSub" >> beam.io.ReadFromPubSub(subscription="projects/my-project/subscriptions/my-events-subscription").with_output_types(bytes)
     | "Decode" >> beam.Map(lambda x: x.decode("utf-8"))
     | "Parse and format" >> beam.ParDo(ParseAndFormat())
     | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            'my-project:my_dataset.my_events_table',
            schema='event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR')
    )

Final Thoughts

Building a perfectly reliable pipeline from Pub/Sub to BigQuery is challenging but absolutely achievable with a combination of:

  • Idempotent writes with BigQuery insertId.
  • Proper flow control and batched ingestion.
  • Thoughtful error handling with retries and dead-letter topics.
  • Continuous monitoring with alerting.
  • Optionally offloading complexity to managed services like Dataflow.

This infrastructure supercharges data-driven decision-making by keeping your analytical warehouse in sync with live event streams — no lost messages, no gaps in insight.


If you found this tutorial useful or have questions about implementing these patterns, feel free to reach out or leave a comment below!