S3 to DynamoDB
#AWS #Cloud #Data #S3 #DynamoDB #Streaming

Efficient Streaming: AWS S3 to DynamoDB for Real-Time Analytics

Traditional batch ETL pipelines moving data from S3 to DynamoDB often introduce latency and increase operational overhead. In contrast, an event-driven streaming architecture offers sub-minute ingest times, which is essential for dashboards, anomaly detection, and operational analytics. Below is a practical approach to streaming new S3 data directly into DynamoDB, using only AWS managed services and the Python 3.9 Lambda runtime.


Architecture Summary

ASCII flow:

[S3 Bucket] --(PUT event)--> [Lambda Function] --(PutItem/BatchWrite)--> [DynamoDB Table]

This pipeline enables immediate data availability with minimal infrastructure management.

Key components:

  • S3 bucket with event notifications for object creation.
  • Lambda for event processing/transformation.
  • DynamoDB table as the low-latency, indexed data store.

S3 Event Notification Configuration

Goal: Trigger Lambda asynchronously on every data update.

  1. In AWS S3 console, enable Event Notifications at the bucket level.
  2. Set the event type to ObjectCreated:Put.
  3. Route the event to the target Lambda function. Use prefix and suffix filters to limit triggers to relevant objects, e.g., excluding backup files or irrelevant formats.

Example notification filter:

  • Prefix: incoming/
  • Suffix: .csv

Avoid excessive scope: an overly broad event configuration leads to noisy triggers and unnecessary Lambda invocations.
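
For reference, here is a minimal sketch of the matching notification configuration via the AWS CLI; the function name (S3ToDynamoIngest) and ARN are placeholders, and S3 must separately be granted invoke permission on the function (aws lambda add-permission) before this call succeeds:

aws s3api put-bucket-notification-configuration \
  --bucket your-bucket-name \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [{
      "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:S3ToDynamoIngest",
      "Events": ["s3:ObjectCreated:Put"],
      "Filter": {"Key": {"FilterRules": [
        {"Name": "prefix", "Value": "incoming/"},
        {"Name": "suffix", "Value": ".csv"}
      ]}}
    }]
  }'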


Lambda Function: Parsing and Ingest

Permissions:

  • s3:GetObject
  • dynamodb:BatchWriteItem / dynamodb:PutItem

Example IAM policy for the Lambda execution role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::your-bucket-name/incoming/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:BatchWriteItem",
        "dynamodb:PutItem"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/YourDynamoDBTable"
    }
  ]
}

Python Lambda (tested on boto3==1.26.158):

import json
import boto3
import csv
from urllib.parse import unquote_plus

dynamodb = boto3.resource('dynamodb')
s3_client = boto3.client('s3')
TABLE_NAME = 'YourDynamoDBTable'

def lambda_handler(event, context):
    table = dynamodb.Table(TABLE_NAME)
    errors = []

    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        print(f"Processing file {key} from bucket {bucket}")

        # Explicitly handle .csv only; support for .json left as an exercise
        if not key.endswith('.csv'):
            continue

        try:
            obj = s3_client.get_object(Bucket=bucket, Key=key)
        except Exception as e:
            # CloudWatch will aggregate these
            print(f"S3 get_object failed: {e}")
            errors.append(key)
            continue

        rows = obj['Body'].read().decode('utf-8').splitlines()
        reader = csv.DictReader(rows)

        # batch_writer chunks writes into 25-item BatchWriteItem calls;
        # overwrite_by_pkeys dedupes rows sharing the same primary key
        with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
            for row in reader:
                batch.put_item(Item={
                    'id': row['id'],
                    'name': row.get('name'),
                    'value': int(row.get('value', 0) or 0),
                    'timestamp': row.get('timestamp')
                })

    if errors:
        # Raising makes the async invocation retry and eventually hit the DLQ
        raise Exception(f"Failed object(s): {errors}")

    return {"statusCode": 200, "body": json.dumps("Processed")}

Practical notes:

  • For large files, consider splitting at upload time or streaming the object body instead of reading it whole (see the sketch after this list); Lambda memory and timeout constraints apply.
  • The above example assumes id is the primary key (string). Adjust types as per your table schema.
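
A minimal sketch of a streaming variant, assuming the same handler structure as above; it avoids loading the whole object into memory at once:

import codecs
import csv

# Inside the handler, replace the read().decode().splitlines() step:
obj = s3_client.get_object(Bucket=bucket, Key=key)

# StreamingBody.iter_lines() yields raw bytes per line; decode incrementally
lines = codecs.iterdecode(obj['Body'].iter_lines(), 'utf-8')
reader = csv.DictReader(lines)
for row in reader:
    ...  # same batch.put_item() logic as before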

Gotcha:
batch_writer() flushes in chunks of 25 items (the BatchWriteItem limit) and automatically re-queues unprocessed items, but it does not guarantee strict ordering.


DynamoDB Table Preparation

Table schema (example):

Attribute   Type    Key       Example Value
id          String  Hash PK   "101"
name        String            "Alice"
value       Number            45
timestamp   String            "2024-06-01T12:00Z"

Configuration tips:

  • Set ReadCapacityUnits and WriteCapacityUnits conservatively, then enable autoscaling.
  • For unpredictable workloads, use On-Demand mode.
  • Avoid hot partitions: if your IDs are sequential, consider a composite key design.
  • DAX accelerates read-heavy workloads but isn't essential unless you need microsecond-level read latency; DynamoDB alone already serves single-digit-millisecond reads.
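
As a starting point, a minimal on-demand table matching the schema above could be created like this (table name taken from the earlier examples):

aws dynamodb create-table \
  --table-name YourDynamoDBTable \
  --attribute-definitions AttributeName=id,AttributeType=S \
  --key-schema AttributeName=id,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST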

End-to-End Test

Sample CSV (incoming/sample_data.csv):

id,name,value,timestamp
101,Alice,23,2024-06-01T12:00:00Z
102,Bob,45,2024-06-01T12:05:00Z
103,Charlie,30,2024-06-01T12:10:00Z

Upload via AWS CLI:

aws s3 cp sample_data.csv s3://your-bucket-name/incoming/

Check Lambda logs in CloudWatch:

Processing file incoming/sample_data.csv from bucket your-bucket-name

Query DynamoDB:

aws dynamodb get-item --table-name YourDynamoDBTable --key '{"id": {"S": "101"}}'
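
If ingestion succeeded, the returned item should look roughly like this (DynamoDB JSON notation):

{
    "Item": {
        "id": {"S": "101"},
        "name": {"S": "Alice"},
        "value": {"N": "23"},
        "timestamp": {"S": "2024-06-01T12:00:00Z"}
    }
}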

The output mirrors the CSV values; items typically arrive less than 5 seconds after upload.


Production Considerations

  • Error handling: configure a dead-letter queue (DLQ) for failed asynchronous Lambda invocations; the DLQ target can be SQS or SNS (see the sketch after this list).
  • Data validation: Implement type checks and schema versioning in Lambda; reject or quarantine invalid payloads.
  • File size limits: Lambda's /tmp storage defaults to 512 MB and execution time is capped at 15 minutes. Step Functions are preferable for batched ingest, or preprocess large files client-side before S3 upload. For truly massive throughput, Kinesis Data Firehose offers managed buffering, but it adds latency.
  • Monitoring: Enable detailed Lambda metrics and set CloudWatch alarms on DynamoDB ThrottledRequests and Lambda ErrorCount.
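
A sketch of wiring an SQS DLQ to the function via the CLI; the function name and queue ARN are placeholders, and the execution role needs sqs:SendMessage on the queue:

aws lambda update-function-configuration \
  --function-name S3ToDynamoIngest \
  --dead-letter-config TargetArn=arn:aws:sqs:us-east-1:123456789012:ingest-dlq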

Non-obvious tip:
If multiple files can arrive simultaneously, design idempotency into Lambda (e.g., upsert instead of insert-only logic) to avoid duplicate writes.
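
To illustrate: put_item is already an upsert, which is what makes replayed events safe; an insert-only write would need a condition expression and fail on duplicates.

from botocore.exceptions import ClientError

# Upsert (default): re-processing the same file overwrites the item
table.put_item(Item={'id': '101', 'name': 'Alice', 'value': 23})

# Insert-only: rejects duplicates instead of overwriting them
try:
    table.put_item(
        Item={'id': '101', 'name': 'Alice', 'value': 23},
        ConditionExpression='attribute_not_exists(id)',
    )
except ClientError as e:
    if e.response['Error']['Code'] != 'ConditionalCheckFailedException':
        raise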


Trade-offs and Limitations

  • Eventual consistency: DynamoDB reads are eventually consistent by default, and cross-region replication via global tables typically lags by around a second.
  • Parquet and other columnar formats: parsing them in Lambda requires a pyarrow/pandas layer in the deployment package, and the 250 MB unzipped package size limit is another cap (see the sketch after this list).
  • Known Issue: On rare occasions, S3 event notifications can be delayed or dropped due to internal AWS issues (documented in AWS S3 Event Notifications caveats); always design for at-least-once delivery.
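
For illustration, a hypothetical Parquet branch might look like the following; it assumes a pyarrow layer is attached and reuses the obj/batch names from the handler above:

import io
import pyarrow.parquet as pq

# Read the whole Parquet object into memory, then convert rows to dicts
data = obj['Body'].read()
parquet_table = pq.read_table(io.BytesIO(data))
for row in parquet_table.to_pylist():
    # Note: values must be DynamoDB-compatible types; raw floats are
    # rejected by boto3 and need conversion to Decimal first
    batch.put_item(Item=row)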

This streaming architecture minimizes data latency from data lake to analytics engine and scales automatically as usage fluctuates, making it ideal for real-time analytics without the operational headaches of batch ETL. Kinesis Data Streams offers more flexibility when strict ordering is essential, but for most uses, this Lambda-based model is cost-effective and robust.

If your use case involves more complex transformation, consider embedding validation logic with libraries such as Pydantic for type enforcement.
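
A minimal validation sketch along those lines, assuming the CSV schema from the test above; invalid rows are logged and skipped rather than written:

from typing import Optional
from datetime import datetime
from pydantic import BaseModel, ValidationError

class RowModel(BaseModel):
    id: str
    name: str
    value: int
    timestamp: datetime

def validate_row(row: dict) -> Optional[RowModel]:
    try:
        return RowModel(**row)
    except ValidationError as e:
        print(f"Rejecting invalid row: {e}")  # quarantine or log as needed
        return None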

Summary Table: Service Limits and Bottlenecks

Service    Limit Key                 Default        Scalable
Lambda     Max timeout               15 min         No
Lambda     /tmp disk space           512 MB         Yes (up to 10 GB)
DynamoDB   WCU/RCU per partition     1000/3000      By partition
S3         Event notification delay  ~1 s typical   N/A

For most mid-volume workloads, this pattern works with minimal friction. Alternatives exist, namely Kinesis and Step Functions, for when you hit any of the hard limits above.

Known imperfection: Both Lambda and S3 event notifications have operational edge cases; test with burst and failure scenarios during UAT.


Questions or edge-case scenarios? Review CloudWatch logs closely; the pipeline rarely fails silently. Adjust the batch size in Lambda or the partition strategy in S3 as needed for your scale.