Pub/Sub to BigQuery

#Cloud #Data #Analytics #PubSub #BigQuery #DataPipeline

Reliable Real-Time Data Ingestion: Pub/Sub to BigQuery, Minus the Data Loss

Building a production-grade streaming pipeline from Google Cloud Pub/Sub to BigQuery involves more upfront planning and rigor than cloud blog examples usually admit. In real systems, missed or duplicated messages cut straight into data integrity, and few things spiral faster than undetected analytics drift caused by a messy event sink.


Problem: Streaming Data, Real-World Failures

Transactional event data powers risk assessment, growth metrics, and alerting in most SaaS ecosystems. The trio of Pub/Sub (for ingestion scale), BigQuery (for low-latency SQL over petabyte-scale datasets), and custom logic in the middle forms the canonical Google Cloud event backbone.

Theory suggests wiring these up is “simple.” In practice, operational detail is everything:

  • Duplicate or dropped messages snarl analytic outputs.
  • Rate limits (not visible until hit) silently throttle or block inserts.
  • Uncaught poison events halt pipelines or lead to data drift over time.
  • Monitoring lags behind failures, which can go unacknowledged for hours.

Almost every data incident I’ve seen traces back to one of these.


Decompose the Pipeline: Pub/Sub, Event Processor, BigQuery

Diagram:

[ Producer(s) ]
      |
      v
[ Pub/Sub Topic ] ----> [ Dead-letter Topic ]
      |
      v
[ Consumer / Processor ]
      |
      v
[ BigQuery Table ]

Control points:

  • Pub/Sub: Delivery at-least-once, per docs (and reality). No free lunch on exactly-once.
  • Processing layer: Must enforce idempotency, manage backpressure, own retry semantics.
  • BigQuery: Deduplication possible during streaming inserts if insertId logic is correct.

Versioning note: Details below were stable as of Google Cloud SDK 469+ and Python client libraries google-cloud-pubsub>=2.18.4, google-cloud-bigquery>=3.11.4.
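
A minimal way to pin those client versions in your environment (assuming pip; adapt to your dependency tooling):

pip install "google-cloud-pubsub>=2.18.4" "google-cloud-bigquery>=3.11.4"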


Initial Setup (Topics, Subscription, Table)

Create Pub/Sub resources:

gcloud pubsub topics create my-events-topic
gcloud pubsub subscriptions create my-events-subscription \
    --topic=my-events-topic \
    --ack-deadline=60

Note: If you routinely process events >60s, increase --ack-deadline. Underestimating this is the main cause of repeated redeliveries in high-latency extracts.
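
For example, an existing subscription's deadline can be raised in place (Pub/Sub accepts values up to 600 seconds):

gcloud pubsub subscriptions update my-events-subscription \
    --ack-deadline=300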

Create BigQuery Table:

Schema should be explicit, and every event must include a globally unique identifier—ideally a UUID from the producer, never a simple hash.

bq mk --table my_dataset.my_events_table \
  event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING

Avoid schema looseness—reject JSON blobs when upstream teams “accidentally” mutate event structure.
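
On the producer side, that means minting the UUID at event-origination time and publishing it with the payload. A minimal sketch, assuming the topic created above and illustrative field values:

import json
import uuid
from datetime import datetime, timezone
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project", "my-events-topic")

event = {
    "event_id": str(uuid.uuid4()),  # globally unique, minted at origination time
    "event_timestamp": datetime.now(timezone.utc).isoformat(),
    "user_id": "user-123",
    "event_type": "signup",
    "event_payload": {"plan": "trial"},
}

# publish() returns a future; result() blocks until Pub/Sub assigns a message ID
future = publisher.publish(topic_path, data=json.dumps(event).encode("utf-8"))
print(future.result())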


Message Deduplication: Achieving Idempotency

Pub/Sub may redeliver messages, and in rare cases even after an explicit ack. It's the pipeline's job to tolerate this.

Non-negotiable: Use BigQuery streaming API’s insertId field, populated with your unique event_id. BigQuery drops subsequent inserts with the same insertId within a deduplication window (currently at least 1 minute).

Python Example: Streaming Insert with Deduplication

from google.cloud import pubsub_v1, bigquery
import json

bq = bigquery.Client()
table = "my_dataset.my_events_table"
subscriber = pubsub_v1.SubscriberClient()

def process_event(message):
    data = json.loads(message.data)
    payload = {
        "event_id": data["event_id"],
        "event_timestamp": data["event_timestamp"],
        "user_id": data["user_id"],
        "event_type": data["event_type"],
        "event_payload": json.dumps(data.get("event_payload", {}))
    }

    # row_ids maps to the streaming API's insertId, enabling best-effort deduplication
    errors = bq.insert_rows_json(
        table,
        [payload],
        row_ids=[payload["event_id"]]
    )

    if not errors:
        message.ack()
    else:
        print("BigQuery insert failed:", errors)
        message.nack()  # Redelivered for retry

subscription = subscriber.subscription_path("your-project", "my-events-subscription")
streaming_pull_future = subscriber.subscribe(subscription, callback=process_event)

# Block the main thread; without this the process exits and stops pulling
try:
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()

Side Note: There’s no deduplication if you leave insertId blank or just hash the event payload. Use true upstream-unique IDs generated at event origination time.


Backpressure: BigQuery Quotas and Pipeline Throttling

BigQuery streaming insert limits (subject to change—check doc version):

Unit                    Limit
Rows/sec/table          100,000
Rows/sec/project        1,000,000
Requests/sec/table      10,000

Symptoms of throttling:
BigQueryError: Exceeded rate limits: too many table insert requests per second for this table

Practical Countermeasures

  • Batch Writes: Aggregate up to ~500 rows per insert. Below 100, the network/computation overhead dominates.
  • Subscriber Flow Control: Limit both outstanding messages and bytes (linear scaling breaks otherwise); see the sketch after this list.
  • Exponential Backoff: For rateLimitExceeded or internalError, retry with (e.g.) capped 2^N s delays; a retry sketch follows the batch example below.
  • Load Smoothing: (If possible) Temporally shard producers, or use Dataflow’s windowing.
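
For subscriber flow control, the Python client accepts explicit caps on outstanding messages and bytes. A minimal sketch, reusing the subscriber, subscription path, and callback from the earlier example (limit values are illustrative):

from google.cloud.pubsub_v1.types import FlowControl

# Cap how much the client holds in flight before callbacks ack prior work
flow_control = FlowControl(max_messages=1000, max_bytes=100 * 1024 * 1024)

streaming_pull_future = subscriber.subscribe(
    subscription,
    callback=process_event,
    flow_control=flow_control,
)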

Example: Batch Insert with Buffer

import time
import threading

buffer, max_batch = [], 500      # entries are (row, message) pairs
last_flush = time.time()
lock = threading.Lock()          # subscriber callbacks can run concurrently

def buffer_events(message):
    global last_flush
    row = json.loads(message.data)  # assumes rows already match the table schema
    with lock:
        buffer.append((row, message))
        # Flush on batch size or 2s interval
        if len(buffer) >= max_batch or (time.time() - last_flush) > 2:
            rows = [r for r, _ in buffer]
            errors = bq.insert_rows_json(
                table,
                rows,
                row_ids=[r["event_id"] for r in rows]
            )
            if not errors:
                for _, msg in buffer:
                    msg.ack()   # ack every buffered message, not just the current one
                buffer.clear()
            last_flush = time.time()

Known issue: Batch size that’s too large increases chance of “all-or-nothing” failure due to a single bad row.
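
For the exponential backoff countermeasure above, a minimal sketch that retries a failed batch with capped 2^N-second delays plus jitter (treating every returned error as retryable is a simplification; in practice, bail out early on schema or serialization errors):

import random
import time

def insert_with_backoff(rows, max_attempts=6):
    for attempt in range(max_attempts):
        errors = bq.insert_rows_json(
            table,
            rows,
            row_ids=[row["event_id"] for row in rows]
        )
        if not errors:
            return True
        # Cap the delay at 32s and add jitter to avoid synchronized retries
        delay = min(2 ** attempt, 32) + random.random()
        print(f"Insert failed (attempt {attempt + 1}): {errors}; retrying in {delay:.1f}s")
        time.sleep(delay)
    return False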


Error Handling: Poison Messages, Retrying, Dead-Letter Strategies

Most production outages trace to unhandled data shape changes. Without a dead-letter queue (DLQ), a single poison event stalls the whole subscription.

Implementation:

  • Dead-letter topic: Any message failing insert >N times redirects for manual review.
  • Retries: Apply for transient errors, but not on serialization or schema mismatches.
  • Structured logging: Log offending event ID, trace, and error details (messageId, attributes, etc).

Configure Dead-letter in Pub/Sub:

gcloud pubsub topics create my-events-dead-letter-topic
gcloud pubsub subscriptions update my-events-subscription \
    --dead-letter-topic=my-events-dead-letter-topic \
    --max-delivery-attempts=5

Missed step: Not all client SDKs expose DLQ natively—hook these at infra level, not just inside handler code.
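
At the handler level, the retry-versus-poison split can look like this minimal sketch (it assumes the dead-letter policy above is attached to the subscription; classifying all BigQuery insert errors as transient is a simplification to keep the sketch short):

import logging

def process_with_dlq(message):
    try:
        data = json.loads(message.data)
        errors = bq.insert_rows_json(
            table,
            [data],
            row_ids=[data["event_id"]]
        )
        if errors:
            raise RuntimeError(errors)
        message.ack()
    except (json.JSONDecodeError, KeyError) as exc:
        # Poison message: log structured context for review, then nack.
        # After --max-delivery-attempts failed deliveries, Pub/Sub forwards
        # the message to the configured dead-letter topic.
        logging.error("Unparseable event message_id=%s attributes=%s error=%s",
                      message.message_id, dict(message.attributes), exc)
        message.nack()
    except Exception as exc:
        # Transient failure (rate limits, network blips): nack so Pub/Sub redelivers
        logging.warning("Transient insert failure: %s", exc)
        message.nack()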


Monitoring and Alerting: Don’t Trust “Green” Dashboards

Watch for:

  • Ack Latency (subscription/ack_message_request_count vs subscription/pull_request_count): Indicates lag.
  • Subscription Backlogs: Sudden spikes = pipeline blockages.
  • BigQuery Insert Error Rate: Spurious increases indicate schema drift or rate capping.
  • DLQ Depth: If this grows, someone’s broken the contract.

Google Cloud Monitoring can be configured with these metrics. Real-world practice: Log-based alerting with Cloud Logging sinks is more responsive when error patterns shift subtly.
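
As a concrete example, subscription backlog depth can be polled from Cloud Monitoring via the num_undelivered_messages metric. A minimal sketch using the google-cloud-monitoring client (an additional dependency; project and subscription names as before):

import time
from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"end_time": {"seconds": now}, "start_time": {"seconds": now - 600}}
)

# Backlog depth for the subscription over the last 10 minutes
results = client.list_time_series(
    request={
        "name": "projects/your-project",
        "filter": (
            'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages" '
            'AND resource.labels.subscription_id = "my-events-subscription"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    for point in series.points:
        print(point.interval.end_time, point.value.int64_value)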


Scaling Up: When to Use Dataflow

Manual pipelines are fine for <50k events/sec. Beyond that (or if strict exactly-once semantics are required), use Google Cloud Dataflow + Apache Beam. Dataflow abstracts checkpointing, retries, batching, deduplication, and DLQ—all via pipeline options.

Example: Minimal Apache Beam Pipeline

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Parse(beam.DoFn):
    def process(self, element):
        import json
        d = json.loads(element)
        yield {
           "event_id": d["event_id"],
           "event_timestamp": d["event_timestamp"],
           "user_id": d["user_id"],
           "event_type": d["event_type"],
           "event_payload": json.dumps(d.get("event_payload", {}))
        }

p = beam.Pipeline(options=PipelineOptions(streaming=True))
(p
    | beam.io.ReadFromPubSub(subscription="projects/my-project/subscriptions/my-events-subscription")
    | beam.Map(lambda x: x.decode("utf-8"))
    | beam.ParDo(Parse())
    | beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_events_table',
        schema='event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING',
        insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR')
)
p.run()

Pro-Tip: Dataflow pipelines are not always cheaper. For trivial scale, managed Batch jobs suffice.


Non-Obvious: Data Consistency Roach-Motel

Even with insertId deduplication, BigQuery doesn’t guarantee “no duplicates ever”—the deduplication guarantee holds for a rolling interval (60s+), not infinitely. For regulatory-grade pipelines, downstream validation is still required (usually via maintenance queries).
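
A minimal sketch of such a maintenance check, counting duplicate event_ids over the last day with the BigQuery client from earlier (the fully qualified table name is an assumption; schedule it and reconcile whatever it flags):

# Audit query: any event_id appearing more than once needs reconciliation
query = """
SELECT event_id, COUNT(*) AS n
FROM `my-project.my_dataset.my_events_table`
WHERE event_timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
GROUP BY event_id
HAVING n > 1
"""
for row in bq.query(query).result():
    print(row.event_id, row.n)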


Summary

A resilient Pub/Sub→BigQuery pipeline comprises:

  • Upstream-generated, unique event IDs;
  • Use of BigQuery streaming API’s insertId for deduplication;
  • Batching; explicit backoff and quota awareness;
  • Dead-letter topics for poison event isolation;
  • Metric-based monitoring, not just “it ran” logging.

Trade-off: Pure streaming introduces unavoidable complexity; for infrequent updates that don't need sub-second latency, consider hybrid microbatching plus audit queries.

Final note: anyone running these pipelines in production should budget for schema evolution, contract tests, and (periodically) offloading archived tables for cost and performance stability.

If you have questions, edge-case war stories, or tuning tips, share them with context and logs; debug output always speeds resolution.