MongoDB to Redshift

#Data #Cloud #Analytics #MongoDB #Redshift #ETL

Efficient Migration of Real-Time Operational Data from MongoDB to Redshift

Operational pipelines frequently face a trade-off: MongoDB provides flexible, high-performance write operations for semi-structured data, but demanding analytics workloads strain the primary database, and MongoDB lacks the advanced SQL/DWH capabilities organizations expect. Redshift, meanwhile, handles OLAP queries at scale with efficient storage and native AWS ecosystem integration.

Below: a pipeline design—proven in production—to move transactional data from MongoDB 4.2+ into Redshift with minimal lag, strict consistency, and zero interruption to live systems.


Core Motivations

  • MongoDB strengths: rapid write throughput, schema-on-read design, tolerance for evolving JSON documents. Used for orders, session logs, IoT signals, etc.
  • Where it falters: aggregations degrade cluster performance, ad-hoc analytics starve operational workloads, BI tooling support is limited.
  • Redshift strengths: MPP (Massively Parallel Processing) for complex joins; columnar storage for fast scans; seamless connection to QuickSight, Athena, SageMaker, and other AWS services.

Critically, running analytics in Redshift not only preserves OLTP database health but also exposes workloads to enterprise-grade tooling—unavailable (or awkward) on MongoDB.


Practical Obstacles Encountered

  1. Data Latency: Hourly or daily ETL windows simply push staleness into decision-making.
  2. Consistency: Naive ingestion risks out-of-order or partial event capture (double-counting or missing updates).
  3. Schema Incompatibility: Mongo’s document shape is nontrivial to flatten—Redshift expects every row to fit a table. No native JSONB support.
  4. Continuous Operation: Business systems run 24/7; downtime windows are unacceptable.

Most “quick fix” ETL approaches break: batch dumps induce lock contention, and point-in-time replication is rarely sufficient.


Reference Architecture

MongoDB (change streams/oplog) ──► Kafka/Kinesis ──► S3 (staging, raw) ──► AWS Glue/Spark ──► Redshift (via COPY+upsert)

Components are loosely coupled to preserve event order and scale each stage independently.


1. Change Data Capture (CDC) from MongoDB

CDC relies on MongoDB 4.0+ Change Streams to observe inserts, updates, and deletes continuously and with low impact on the operational cluster.

Example (Node.js, native driver 4.x):

const { MongoClient } = require('mongodb');

async function monitor() {
  const client = new MongoClient('mongodb://mongo.prod.cluster:27017'); // driver 4.x uses the unified topology by default; the old option is no longer needed
  await client.connect();
  const coll = client.db('main').collection('orders');

  const stream = coll.watch([], { fullDocument: 'updateLookup' });

  stream.on('change', event => {
    // Forward event downstream (Kafka, etc)
    if (event.operationType === 'delete') {
      // Deletes carry only documentKey (no fullDocument): forward as a tombstone, not an error
      return;
    }
    if (!event.documentKey || !event.fullDocument) {
      console.error('Malformed event:', JSON.stringify(event));
      return;
    }
    // Example event shape logged for validation
    /*
      {
        "_id": {...},
        "operationType": "update",
        "fullDocument": {...},
        "documentKey": {...}
      }
    */
  });
}

monitor().catch(err => console.error(err));

Known gotcha: Change Streams require replica sets; standalone deployments aren’t eligible. Lag between primary and secondaries directly impacts event recency.

Alternative: For sharded clusters with legacy versions, Debezium (Kafka Connect) can tail MongoDB oplog but requires additional setup and careful checkpointing (see FAQ on handling crash recovery).
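
Whichever tap is used, crash recovery hinges on checkpointing. Below is a minimal sketch with PyMongo (the checkpoint file and connection string are illustrative, not part of the pipeline above): persist the stream's resume token after each forwarded event and hand it back via resume_after on restart.

# Sketch: persist the change stream resume token so the watcher can restart
# after a crash without missing or re-reading events. File path and URI are placeholders.
from bson.json_util import dumps, loads
from pymongo import MongoClient

CHECKPOINT_FILE = "orders_cdc.checkpoint"

def load_token():
    try:
        with open(CHECKPOINT_FILE) as f:
            return loads(f.read())
    except FileNotFoundError:
        return None  # first run: start from "now"

def save_token(token):
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(dumps(token))

client = MongoClient("mongodb://mongo.prod.cluster:27017")
coll = client["main"]["orders"]

with coll.watch([], full_document="updateLookup", resume_after=load_token()) as stream:
    for event in stream:
        # forward_downstream(event)  # e.g. publish to Kafka/Kinesis (hypothetical helper)
        save_token(stream.resume_token)  # checkpoint only after the event is safely forwarded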


2. Stream Changes: Buffer Layer

For durability and decoupling, pipe events into Kafka 2.x or AWS Kinesis Data Streams. Both provide order guarantees per partition/shard and durable retention (24+ hours).

  • Kafka: Better in hybrid/self-hosted setups; explicit offset management.
  • Kinesis: Simpler for pure-cloud/AWS; automatic scaling, integrates directly with Firehose and Lambda.

Trade-off note: Kinesis offers easier scaling but can introduce ~1 second batching lag.

Schema discipline: All CDC events must be enriched with op type, timestamp, and a unique document key in upstream producers to prevent ambiguity downstream.
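
As a concrete illustration of that discipline, here is a hedged producer sketch using boto3 and Kinesis (the stream name and record fields are assumptions): each raw change event is wrapped with the operation type, a producer timestamp, and the document key, and the key doubles as the partition key so per-document ordering is preserved.

# Sketch: enrich a raw change event and publish it to Kinesis.
# Stream name and record fields are illustrative placeholders.
import json
import time
import boto3

kinesis = boto3.client("kinesis")

def publish_change(event, stream_name="orders-cdc"):
    record = {
        "op": event["operationType"],                # insert / update / delete
        "ts": int(time.time() * 1000),               # producer-side timestamp, milliseconds
        "key": str(event["documentKey"]["_id"]),     # unique document key
        "doc": event.get("fullDocument"),            # None for deletes (tombstone)
    }
    # Partitioning by document key keeps all events for one document on one shard, in order
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(record, default=str).encode("utf-8"),
        PartitionKey=record["key"],
    )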


3. Transformation and Enrichment

AWS Glue (PySpark 3.x ETL jobs) or custom Spark jobs running on EMR handle denormalization:

  • Flatten nested documents: e.g., orders with embedded items
  • Type casting: Convert MongoDB’s flexible types to Redshift-compatible (ISO8601 to TIMESTAMP, numbers to DECIMAL(18,4), etc)
  • Upsert and deletion handling: Each event tagged with operationType; deletions must be propagated (i.e., create “tombstone” events).

Sample Glue script fragment:

# Glue job: process CDC JSON from S3
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Job bookmarks (see tip below) only take effect when the job is initialized and committed
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# CDC JSON to DataFrame
df = glueContext.create_dynamic_frame.from_catalog(database="cdc_db", table_name="orders_cdc")

# documentKey carries the _id plus any shard-key fields; order_id is assumed to be among them
flat_df = df.toDF().selectExpr(
    "documentKey.order_id as order_id",
    "fullDocument.customer_id as customer_id",
    "operationType",
    "fullDocument.created_at as created_at",
    "eventTime"
)

# Write back to S3 for Redshift ingestion (partitioned by date)
flat_df.write.parquet("s3://datalake/processed/orders/", mode="append")

job.commit()  # records the bookmark so already-processed objects are skipped on the next run

Non-obvious tip: Use Glue’s job bookmarks to avoid reprocessing previously handled CDC objects—reduces duplicate events with minimal bookkeeping.
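
The flattening bullet above glosses over embedded arrays. Continuing from the df defined in the fragment, here is a sketch (the fullDocument.items, sku, quantity, and price fields are assumptions, not taken from a real schema) of exploding embedded order items into one row per line item for a separate order_items table:

# Sketch: explode an embedded items array into one row per line item.
# The items/sku/quantity/price fields are assumptions about the document shape.
from pyspark.sql.functions import col, explode

items_df = (
    df.toDF()
      .select(
          col("documentKey.order_id").alias("order_id"),
          explode(col("fullDocument.items")).alias("item"),
      )
      .select(
          "order_id",
          col("item.sku").alias("sku"),
          col("item.quantity").cast("int").alias("quantity"),
          col("item.price").cast("decimal(18,4)").alias("price"),
      )
)

items_df.write.parquet("s3://datalake/processed/order_items/", mode="append")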


4. Loading into Redshift

Redshift’s COPY command is the workhorse for high-throughput ingestion from S3.

Example command (run via Lambda or cron):

COPY staging.orders
FROM 's3://datalake/processed/orders/YYYY/MM/DD/'
IAM_ROLE 'arn:aws:iam::<acct>:role/RedshiftCopyRole'
FORMAT AS PARQUET;

After loading to staging:

BEGIN;

-- Remove outdated versions of records for upsert
DELETE FROM analytics.orders a
USING staging.orders s
WHERE a.order_id = s.order_id;

INSERT INTO analytics.orders
SELECT * FROM staging.orders;

COMMIT;

Known issue: Redshift’s lack of a native “upsert” (until late 2023) means manual delete-then-insert is required. Watch for “row not found” warnings; expect a minor performance dip with keys that see high update rates.

For deletes: propagate deletion markers (e.g., operationType = 'delete') and perform explicit DELETE from analytics.orders as a separate step.
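
For orchestration from a Lambda or cron job, a hedged psycopg2 sketch of the whole load step follows (the cluster endpoint and credentials are placeholders, and the S3 prefix and IAM role mirror the COPY example above); tombstones are handled here by simply not re-inserting 'delete' rows, which is equivalent to the separate DELETE step described above.

# Sketch: run COPY + upsert as one transaction from Python (Lambda or cron).
# Endpoint, credentials, and the date prefix are placeholders.
import psycopg2

STATEMENTS = [
    """COPY staging.orders
       FROM 's3://datalake/processed/orders/YYYY/MM/DD/'
       IAM_ROLE 'arn:aws:iam::<acct>:role/RedshiftCopyRole'
       FORMAT AS PARQUET""",
    # upsert: drop stale versions of every staged key, then insert the fresh rows
    "DELETE FROM analytics.orders a USING staging.orders s WHERE a.order_id = s.order_id",
    # skipping 'delete' rows on insert propagates tombstones: the old row is gone and no new row lands
    "INSERT INTO analytics.orders SELECT * FROM staging.orders WHERE operationType <> 'delete'",
]

conn = psycopg2.connect(
    host="redshift-cluster.example.internal", port=5439,
    dbname="analytics", user="etl_user", password="...",
)
try:
    with conn.cursor() as cur:
        for sql in STATEMENTS:
            cur.execute(sql)
    conn.commit()  # one transaction: either the whole batch lands or none of it
finally:
    conn.close()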


Monitoring and Latency

  • Change Stream lag: Check rs.printReplicationInfo() output regularly.
  • End-to-end delay: Instrument Kafka/Kinesis (e.g., using CloudWatch metrics) and S3 ingestion times.
  • Redshift ingest errors: Expect transient “File not found in S3” or “Missing required fields” errors; these usually trace back to brittle source-side jobs, so fix those first.

Tip: Set up synthetic inserts into MongoDB and measure the time until they arrive in Redshift; automate alerting when end-to-end latency exceeds 5 minutes.
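
A hedged sketch of such a probe (connection details, credentials, and field names are placeholders): insert a canary document into MongoDB, then poll Redshift until it appears and report the elapsed time.

# Synthetic latency probe: write a canary document into MongoDB, then poll
# Redshift until it appears. All connection details and names are placeholders.
import time
import uuid
import psycopg2
from pymongo import MongoClient

def measure_latency(timeout_s=600, poll_s=15):
    canary_id = f"canary-{uuid.uuid4()}"

    # 1. Insert the canary into the operational store
    mongo = MongoClient("mongodb://mongo.prod.cluster:27017")
    mongo["main"]["orders"].insert_one({"order_id": canary_id, "customer_id": "synthetic"})
    started = time.time()

    # 2. Poll the warehouse until the canary lands (or the timeout is hit)
    conn = psycopg2.connect(host="redshift-cluster.example.internal", port=5439,
                            dbname="analytics", user="monitor", password="...")
    try:
        while time.time() - started < timeout_s:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM analytics.orders WHERE order_id = %s", (canary_id,))
                if cur.fetchone():
                    return time.time() - started  # end-to-end latency in seconds
            conn.rollback()  # end the read transaction so the next poll sees new commits
            time.sleep(poll_s)
        return None  # not delivered within the timeout: raise an alert
    finally:
        conn.close()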


Summary Table

Component        | Primary Function                | Notes
MongoDB          | Operational data source (OLTP)  | Replica set, >=4.0 required for Change Streams
Kafka/Kinesis    | Durable event buffer            | Partitioning by collection or documentId
AWS Glue/Spark   | Transform, flatten, enrich      | PySpark codebase; job bookmarks advised
Amazon S3        | Staging/raw data lake           | Partitioned by ingest date
Amazon Redshift  | Analytical warehouse (OLAP)     | Manual upsert management

Side Notes and Real-World Constraints

  • Schema drift: Expect document structure to evolve in MongoDB; build Glue scripts defensively (see the sketch after this list).
  • Switchover: To test, run the pipeline in parallel. Compare row counts and spot-check for “missing updates” using known _id values.
  • Alternative approaches: For lower throughput or POC, consider AWS DMS if strong consistency is not critical (but be aware that DMS struggles with advanced transformations).
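
On the schema-drift point, one defensive pattern (the column names and types below are assumptions): declare the columns the warehouse needs and substitute NULLs for anything the incoming documents no longer carry, so a drifted field degrades into an empty column instead of failing the Glue job.

# Sketch: defensive column selection for drifting document shapes.
# Missing fields become NULL columns instead of crashing the Glue job.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, lit

EXPECTED_COLUMNS = {
    "order_id": "string",
    "customer_id": "string",
    "created_at": "timestamp",
    "discount_code": "string",   # hypothetical newer field, absent in older documents
}

def select_defensively(df: DataFrame) -> DataFrame:
    projected = []
    for name, dtype in EXPECTED_COLUMNS.items():
        if name in df.columns:
            projected.append(col(name).cast(dtype).alias(name))
        else:
            projected.append(lit(None).cast(dtype).alias(name))
    return df.select(*projected)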

Closing Observations

No ETL pipeline for MongoDB → Redshift is turnkey—especially with near real-time requirements. Each step (CDC tap, event buffering, ETL, upsert) has edge cases and monitorable points of failure. Expect to iterate, but the outlined architecture handles high-velocity, mutable datasets without sacrificing performance or consistency.

Over time, further optimize the Spark transform jobs and incorporate automatic schema evolution support. Eventually, Redshift’s newer MERGE command may remove the need for manual upsert scripting; for now, workarounds remain necessary.

For playbooks, extended Glue job templates, or live troubleshooting logs, contact your cloud operations team or data engineering guild.