Reliable Real-Time Data Ingestion: Pub/Sub to BigQuery, Minus the Data Loss
Building a production-grade streaming pipeline from Google Cloud Pub/Sub to BigQuery involves more upfront planning and rigor than cloud blog examples usually admit. In real systems, missed or duplicated messages strike directly at data integrity; few things spiral faster than undetected analytics drift caused by a messy event sink.
Problem: Streaming Data, Real-World Failures
Transactional event data powers risk assessment, growth metrics, and alerting in most SaaS ecosystems. The trio of Pub/Sub (for ingestion scale), BigQuery (for low-latency SQL over petabyte-scale datasets), and custom logic in the middle forms the canonical Google Cloud event backbone.
Theory suggests wiring these up is “simple.” In practice, operational detail is everything:
- Duplicate or dropped messages snarl analytic outputs.
- Rate limits (not visible until hit) silently throttle or block inserts.
- Uncaught poison events halt pipelines or lead to data drift over time.
- Monitoring lags behind failures, which then go unacknowledged for hours.
Almost every data incident I’ve seen traces back to one of these.
Decompose the Pipeline: Pub/Sub, Event Processor, BigQuery
Diagram:
[ Producer(s) ]
       |
       v
[ Pub/Sub Topic ]
       |
       v
[ Consumer / Processor ] ----> [ Dead-letter Topic ]
       |
       v
[ BigQuery Table ]
Control points:
- Pub/Sub: Delivery at-least-once, per docs (and reality). No free lunch on exactly-once.
- Processing layer: Must enforce idempotency, manage backpressure, own retry semantics.
- BigQuery: Deduplication is possible during streaming inserts if the insertId logic is correct.
Versioning note: Details below were stable as of Google Cloud SDK 469+ and the Python client libraries google-cloud-pubsub>=2.18.4 and google-cloud-bigquery>=3.11.4.
Initial Setup (Topics, Subscription, Table)
Create Pub/Sub resources:
gcloud pubsub topics create my-events-topic
gcloud pubsub subscriptions create my-events-subscription \
--topic=my-events-topic \
--ack-deadline=60
Note: If you routinely take more than 60 seconds to process an event, increase --ack-deadline. Underestimating it is the main cause of repeated redeliveries in high-latency extracts.
Create BigQuery Table:
Schema should be explicit, and every event must include a globally unique identifier—ideally a UUID from the producer, never a simple hash.
bq mk --table my_dataset.my_events_table \
event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING
Avoid schema looseness: validate events and reject them when upstream teams "accidentally" mutate the event structure.
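A minimal contract-check sketch; the REQUIRED_FIELDS set and the validate_event helper are illustrative assumptions, not part of any library:

import uuid

# Hypothetical contract: the fields every producer has agreed to send.
REQUIRED_FIELDS = {"event_id", "event_timestamp", "user_id", "event_type"}

def validate_event(data: dict) -> bool:
    """Return False for events that violate the agreed schema."""
    if not REQUIRED_FIELDS.issubset(data):
        return False
    try:
        uuid.UUID(str(data["event_id"]))  # enforce producer-generated UUIDs
    except ValueError:
        return False
    return True

Events that fail this check should never reach BigQuery; route them to the dead-letter path described later.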
Message Deduplication: Achieving Idempotency
Pub/Sub may redeliver messages (rarely, even after an explicit ack). It is the pipeline's job to tolerate this.
Non-negotiable: Use the BigQuery streaming API's insertId field, populated with your unique event_id. BigQuery drops subsequent inserts with the same insertId within a deduplication window (currently at least one minute).
Python Example: Streaming Insert with Deduplication
from google.cloud import pubsub_v1, bigquery
import json

bq = bigquery.Client()
table = "my_dataset.my_events_table"
subscriber = pubsub_v1.SubscriberClient()

def process_event(message):
    data = json.loads(message.data)
    payload = {
        "event_id": data["event_id"],
        "event_timestamp": data["event_timestamp"],
        "user_id": data["user_id"],
        "event_type": data["event_type"],
        "event_payload": json.dumps(data.get("event_payload", {})),
    }
    # row_ids maps to insertId, so redelivered messages are deduplicated by BigQuery.
    errors = bq.insert_rows_json(
        table,
        [payload],
        row_ids=[payload["event_id"]],
    )
    if not errors:
        message.ack()
    else:
        print("BigQuery insert failed:", errors)
        message.nack()  # Redelivered and retried

subscription = subscriber.subscription_path("your-project", "my-events-subscription")
streaming_pull = subscriber.subscribe(subscription, callback=process_event)
streaming_pull.result()  # Block the main thread so the subscriber keeps pulling
Side Note: There’s no deduplication if you leave insertId blank or just hash the event payload. Use true upstream-unique IDs generated at event origination time.
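For illustration, a producer-side sketch that mints the UUID at origination time (the project and topic names are placeholders, and the publish_event helper is hypothetical):

import json
import uuid
from datetime import datetime, timezone

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("your-project", "my-events-topic")

def publish_event(user_id, event_type, payload):
    event = {
        "event_id": str(uuid.uuid4()),  # minted once, at the event's origin
        "event_timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "event_type": event_type,
        "event_payload": payload,
    }
    # publish() returns a future; resolve it if you need delivery confirmation.
    publisher.publish(topic_path, json.dumps(event).encode("utf-8"))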
Backpressure: BigQuery Quotas and Pipeline Throttling
BigQuery streaming insert limits (subject to change—check doc version):
| Quota | Limit |
| --- | --- |
| Rows per second, per table | 100,000 |
| Rows per second, per project | 1,000,000 |
| Requests per second, per table | 10,000 |
Symptoms of throttle:
BigQueryError: Exceeded rate limits: too many table insert requests per second for this table
Practical Countermeasures
- Batch Writes: Aggregate up to ~500 rows per insert. Below 100, the network/computation overhead dominates.
- Subscriber Flow Control: Limit both outstanding messages and bytes (linear scaling breaks otherwise).
- Exponential Backoff: For rateLimitExceeded or internalError, retry with capped 2^N-second delays (see the combined flow-control and backoff sketch after this list).
- Load Smoothing: Where possible, temporally shard producers or use Dataflow's windowing.
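A minimal sketch of the flow-control and backoff points above, reusing the bq client, table, and process_event callback from the earlier example; the limits and retry policy are illustrative assumptions, not tuned recommendations:

import random
import time

from google.cloud import pubsub_v1

# Cap outstanding messages and bytes so the subscriber cannot outrun BigQuery quotas.
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1000,
    max_bytes=50 * 1024 * 1024,
)
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path("your-project", "my-events-subscription")
subscriber.subscribe(subscription, callback=process_event, flow_control=flow_control)

def insert_with_backoff(rows, row_ids, max_attempts=6):
    """Retry streaming inserts with capped exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        errors = bq.insert_rows_json(table, rows, row_ids=row_ids)
        if not errors:
            return []
        # Assumption: treat reported errors as transient here; in practice,
        # inspect the error reason (e.g. rateLimitExceeded) before retrying.
        time.sleep(min(2 ** attempt, 32) + random.random())
    return errors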
Example: Batch Insert with Buffer
import json
import time

buffer, max_batch = [], 500  # holds (message, row) pairs awaiting flush
last_flush = time.time()

def buffer_events(message):
    global last_flush
    row = json.loads(message.data)  # assumed already flattened to the table schema
    buffer.append((message, row))
    # Flush on batch size or a 2-second interval
    if len(buffer) >= max_batch or (time.time() - last_flush) > 2:
        messages, rows = zip(*buffer)
        errors = bq.insert_rows_json(
            table,
            list(rows),
            row_ids=[r["event_id"] for r in rows],
        )
        if not errors:
            for m in messages:  # ack every buffered message, not just the current one
                m.ack()
        # On errors, un-acked messages are redelivered; insertId dedup absorbs the retry.
        buffer.clear()
        last_flush = time.time()
Known issue: Batch size that’s too large increases chance of “all-or-nothing” failure due to a single bad row.
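One way to soften that failure mode is to let BigQuery report bad rows individually; a sketch assuming the (message, row) buffering above (skip_invalid_rows is a real insert_rows_json option, the flush_batch helper is hypothetical):

def flush_batch(messages, rows):
    """Insert a batch but isolate failing rows instead of failing the whole batch."""
    errors = bq.insert_rows_json(
        table,
        rows,
        row_ids=[r["event_id"] for r in rows],
        skip_invalid_rows=True,  # valid rows are inserted; invalid ones are reported
    )
    failed_indexes = {e["index"] for e in errors}
    for i, message in enumerate(messages):
        if i in failed_indexes:
            message.nack()  # redelivered, and eventually dead-lettered
        else:
            message.ack()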
Error Handling: Poison Messages, Retrying, Dead-Letter Strategies
Most production outages trace to unhandled data shape changes. Without a dead-letter queue (DLQ), a single poison event stalls the whole subscription.
Implementation:
- Dead-letter topic: Any message failing insert >N times redirects for manual review.
- Retries: Apply for transient errors, but not on serialization or schema mismatches.
- Structured logging: Log the offending event ID, trace, and error details (messageId, attributes, etc.).
Configure Dead-letter in Pub/Sub:
gcloud pubsub topics create my-events-dead-letter-topic
gcloud pubsub subscriptions update my-events-subscription \
--dead-letter-topic=my-events-dead-letter-topic \
--max-delivery-attempts=5
Missed step: Not all client SDKs expose dead-letter configuration natively; hook it up at the infrastructure level, not just inside handler code.
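Inside the handler, the retry-versus-poison split can look like this sketch (to_row is a hypothetical mapping helper like the payload construction earlier; bq, table, and the subscription come from the previous examples):

import json
import logging

from google.cloud import pubsub_v1

dlq_publisher = pubsub_v1.PublisherClient()
dlq_topic = dlq_publisher.topic_path("your-project", "my-events-dead-letter-topic")

def process_event(message):
    try:
        row = to_row(json.loads(message.data))  # hypothetical schema mapping
    except (ValueError, KeyError) as exc:
        # Permanent failure: forward for manual review and ack, never retry.
        logging.error("Poison message %s: %s (attributes=%s)",
                      message.message_id, exc, dict(message.attributes))
        dlq_publisher.publish(dlq_topic, message.data, error=str(exc))
        message.ack()
        return
    errors = bq.insert_rows_json(table, [row], row_ids=[row["event_id"]])
    if errors:
        logging.warning("Transient insert failure for %s: %s", row["event_id"], errors)
        message.nack()  # retried until --max-delivery-attempts, then dead-lettered
    else:
        message.ack()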
Monitoring and Alerting: Don’t Trust “Green” Dashboards
Watch for:
- Ack Latency (subscription/ack_message_request_count vs subscription/pull_request_count): Indicates lag.
- Subscription Backlogs: Sudden spikes mean pipeline blockages.
- BigQuery Insert Error Rate: Spurious increases indicate schema drift or rate capping.
- DLQ Depth: If this grows, someone’s broken the contract.
Google Cloud Monitoring can be configured with these metrics. Real-world practice: Log-based alerting with Cloud Logging sinks is more responsive when error patterns shift subtly.
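As one concrete example, a sketch that polls the subscription backlog metric with the Cloud Monitoring client; the project, subscription name, and the 10,000-message threshold are assumptions:

import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {"end_time": {"seconds": now}, "start_time": {"seconds": now - 300}}  # last 5 minutes
)
results = client.list_time_series(
    request={
        "name": "projects/your-project",
        "filter": (
            'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages" '
            'AND resource.labels.subscription_id = "my-events-subscription"'
        ),
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    backlog = series.points[0].value.int64_value  # most recent point first
    if backlog > 10_000:
        print(f"Backlog alert: {backlog} undelivered messages")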
Scaling Up: When to Use Dataflow
Manual pipelines are fine below roughly 50k events/sec. Beyond that (or if strict exactly-once semantics are required), use Google Cloud Dataflow with Apache Beam. Dataflow abstracts checkpointing, retries, batching, deduplication, and dead-lettering, all via pipeline options.
Example: Minimal Apache Beam Pipeline
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class Parse(beam.DoFn):
    def process(self, element):
        d = json.loads(element)
        yield {
            "event_id": d["event_id"],
            "event_timestamp": d["event_timestamp"],
            "user_id": d["user_id"],
            "event_type": d["event_type"],
            "event_payload": json.dumps(d.get("event_payload", {})),
        }

p = beam.Pipeline(options=PipelineOptions(streaming=True))
(p
 | beam.io.ReadFromPubSub(subscription="projects/my-project/subscriptions/my-events-subscription")
 | beam.Map(lambda x: x.decode("utf-8"))
 | beam.ParDo(Parse())
 | beam.io.WriteToBigQuery(
     table="my-project:my_dataset.my_events_table",
     schema="event_id:STRING,event_timestamp:TIMESTAMP,user_id:STRING,event_type:STRING,event_payload:STRING",
     insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR"))
p.run()
Pro-Tip: Dataflow pipelines are not always cheaper. For trivial scale, scheduled batch load jobs into BigQuery suffice.
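For comparison, a hedged sketch of the batch-load alternative using the BigQuery client's load_table_from_json; the rows list and the scheduling are up to you, and this path avoids streaming-insert quotas entirely:

from google.cloud import bigquery

bq = bigquery.Client()
table = "my_dataset.my_events_table"

def load_batch(rows):
    """rows: a list of dicts already matching the table schema (assumption)."""
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job = bq.load_table_from_json(rows, table, job_config=job_config)
    job.result()  # wait for the load job; raises on failure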
Non-Obvious: Deduplication Has Fine Print
Even with insertId deduplication, BigQuery doesn’t guarantee “no duplicates ever”—the deduplication guarantee holds for a rolling interval (60s+), not infinitely. For regulatory-grade pipelines, downstream validation is still required (usually via maintenance queries).
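As an illustration of such a maintenance query, a sketch only (run it against tables or partitions that are no longer receiving streaming writes, since very recent rows may still sit in the streaming buffer):

from google.cloud import bigquery

bq = bigquery.Client()

# Keep exactly one row per event_id; assumes event_id is the upstream-unique key.
dedup_sql = """
CREATE OR REPLACE TABLE `my_dataset.my_events_table` AS
SELECT * EXCEPT(row_num)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) AS row_num
  FROM `my_dataset.my_events_table`
)
WHERE row_num = 1
"""
bq.query(dedup_sql).result()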
Summary
A resilient Pub/Sub→BigQuery pipeline comprises:
- Upstream-generated, unique event IDs;
- Use of BigQuery streaming API’s insertId for deduplication;
- Batching; explicit backoff and quota awareness;
- Dead-letter topics for poison event isolation;
- Metric-based monitoring, not just “it ran” logging.
Trade-off: Pure streaming introduces some unavoidable complexity; for infrequent updates that can tolerate a little latency, consider hybrid microbatching plus audit queries.
Final note: anyone running these pipelines in production should budget for schema evolution, contract tests, and (periodically) offloading archived tables for cost and performance stability.
Questions, edge-case war stories, or tuning tips: share them with context and logs. Debug output always speeds resolution.