Kinesis to S3: Real-World Practices for Scalable, Cost-Efficient Streaming Ingestion
Kinesis-to-S3 ingestion forms the backbone of modern AWS data lakes, yet naive implementations quickly become unsustainable. Excessive S3 PUTs, tiny files, and poorly partitioned data are common symptoms—fixing them requires experience beyond the AWS docs.
Operational Reality
A typical engineering request:
“We want app logs and analytics events flowing into S3, usable by Athena, with costs controlled and query latencies low.”
First attempt: trigger Lambda on Kinesis arrivals, write each record directly to S3. Fast to build—expensive to operate.
Problems surface fast:
- S3 request rates spike: thousands of tiny files per minute, each incurring PUT costs.
- Athena queries crawl: partition pruning is ineffective, data scattered.
- S3 browsing suffers: the bucket becomes a morass of unusably fine-grained objects.
Mitigating these issues involves batch processing, file compression, and temporal partitioning.
Smart Batching: The First Line of Defense
Rather than uploading each event individually, buffer records before writing to S3. For Lambda architectures, keep buffers in `/tmp` (512 MB of ephemeral storage by default, configurable up to 10 GB). For containerized consumers, use process memory or a disk mount.
Practical approach:
- Buffer size: Aggregate ≥5 MB (the S3 multipart minimum part size) or batch on a short timer (e.g., 60 seconds).
- Batch window: Use ingestion timestamps to group by time slices rather than invocation times—critical for late or re-ordered events.
Lambda Example (Python 3.11, boto3==1.28):
import base64
import gzip
import time
from io import BytesIO

import boto3

S3_KEY_FMT = 'stream-data/{partition}/part-{stamp}.json.gz'
BUFFER_LIMIT = 5 * 1024 * 1024   # ~5 MB: the S3 multipart minimum part size
TIME_WINDOW_SEC = 60             # flush at least once per minute

bucket = 'my-bucket'
s3 = boto3.client('s3')          # reuse the client across warm invocations

buffer, last_flush = [], time.time()


def flush_buffer():
    if not buffer:
        return
    # Partition by the start of the current batch window, not the flush time
    event_time = int(last_flush)
    partition = time.strftime('year=%Y/month=%m/day=%d/hour=%H', time.gmtime(event_time))
    key = S3_KEY_FMT.format(partition=partition, stamp=int(time.time()))
    buf = BytesIO()
    with gzip.GzipFile(fileobj=buf, mode='wb') as gz:
        for record in buffer:
            gz.write(record + b'\n')
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue())


def lambda_handler(event, context):
    global buffer, last_flush
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        buffer.append(payload)
    # Flush once the buffer is large enough or the time window has elapsed
    if (sum(len(x) for x in buffer) >= BUFFER_LIMIT
            or time.time() - last_flush > TIME_WINDOW_SEC):
        flush_buffer()
        buffer, last_flush = [], time.time()
Gotcha: Lambda’s in-memory state persists only within a single execution environment. A cold start means an empty buffer, and a retried batch can re-deliver records that were already flushed. This isn’t bulletproof for “exactly-once” semantics.
Compression: Simple, Effective
Do not store raw event data; compress before uploading. Even moderate JSON or CSV shrinks 5–10x under gzip, lowering storage and network costs and improving query speed (Athena natively reads `.gz` files).
- Gzip is often adequate. Snappy or Parquet for larger ETL jobs.
`put_object` sends the whole object in a single request; for larger batches, switch to multipart uploads (for example, boto3’s managed `upload_fileobj`) so you don’t have to hold the full compressed file in memory.
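If you do outgrow single-request uploads, one memory-friendlier pattern is to spool the compressed batch to a temporary file and hand it to boto3’s managed transfer, which switches to multipart parts automatically. A minimal sketch, with bucket and key as placeholders:

```python
import gzip
import tempfile

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')

def upload_compressed(records, bucket, key):
    # Compress into a temp file (lands in /tmp on Lambda) instead of RAM
    with tempfile.TemporaryFile() as tmp:
        with gzip.GzipFile(fileobj=tmp, mode='wb') as gz:
            for record in records:
                gz.write(record + b'\n')
        tmp.seek(0)
        # upload_fileobj uses multipart automatically above the threshold
        s3.upload_fileobj(
            tmp, bucket, key,
            Config=TransferConfig(multipart_threshold=8 * 1024 * 1024),
        )
```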
A related issue: Athena queries that span many partitions can time out when they have to open thousands of small files. Compressing and batching helps here too.
Partitioning: The Details Matter
Time-based partitioning is essential for efficient Athena, Redshift Spectrum, and lifecycle policies.
Partition pattern:
s3://my-bucket/stream-data/year=2024/month=06/day=24/hour=16/part-1719248493.json.gz
Partition boundaries should ideally reflect the event’s inherent timestamp, not the ingestion time—especially important for late-arriving telemetry.
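One way to honor event time in a custom consumer is to derive the partition from a timestamp inside the record itself. A minimal sketch, assuming each JSON event carries an epoch-seconds `ts` field (a hypothetical name; adjust to your schema):

```python
import json
import time

def partition_for(raw_record: bytes) -> str:
    """Build a Hive-style partition path from the event's own timestamp."""
    event = json.loads(raw_record)
    # 'ts' is a hypothetical field; fall back to ingestion time if it is missing
    ts = float(event.get('ts', time.time()))
    return time.strftime('year=%Y/month=%m/day=%d/hour=%H', time.gmtime(ts))

# partition_for(b'{"ts": 1719248493, "msg": "login"}')
# -> 'year=2024/month=06/day=24/hour=16'
```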
Non-obvious tip:
Build S3 keys from dynamic partition values rather than a static prefix; Kinesis Data Firehose and custom consumers both support this. Avoid dumping every file under one flat prefix, or LIST requests become slow and expensive and partition pruning has nothing to work with.
Managed Option: Kinesis Data Firehose
AWS Kinesis Data Firehose (as of June 2024) offloads much of the operational burden:
- Auto-batching: Configurable buffer hints (1–15 min, 1–128 MB).
- Compression: Gzip, Snappy, Parquet (schema evolution support is limited—beware).
- Partitioning: Dynamic S3 prefix substitution (`yyyy/MM/dd/HH` etc).
Sample config (CloudFormation snippet):
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
  BucketARN: arn:aws:s3:::my-bucket
  Prefix: "stream-data/!{timestamp:yyyy/MM/dd/HH}/"
  BufferingHints:
    IntervalInSeconds: 300
    SizeInMBs: 64
  CompressionFormat: GZIP
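On the producer side, applications write to a DirectPut stream through the Firehose API and let the service handle buffering, compression, and delivery. A minimal sketch, with the stream name as a placeholder:

```python
import json

import boto3

firehose = boto3.client('firehose')

def send_events(events, stream_name='my-delivery-stream'):
    # put_record_batch accepts up to 500 records (4 MiB total) per call;
    # append a newline so objects land in S3 as line-delimited JSON
    records = [{'Data': (json.dumps(e) + '\n').encode('utf-8')} for e in events]
    resp = firehose.put_record_batch(
        DeliveryStreamName=stream_name,
        Records=records,
    )
    if resp['FailedPutCount']:
        # Resend only the records that failed (simplified retry, no backoff)
        failed = [rec for rec, status in zip(records, resp['RequestResponses'])
                  if 'ErrorCode' in status]
        firehose.put_record_batch(DeliveryStreamName=stream_name, Records=failed)
```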
Known issue:
Delivery lag can reach the configured buffer interval (up to 15 minutes); immediate S3 availability is not guaranteed.
Quick Reference: Kinesis-to-S3 Pipeline Checklist
| Practice | Why | Key Details | Pitfalls |
| --- | --- | --- | --- |
| Batching | Reduce S3 costs, avoid small files | ≥5 MB/file or ≤60 s batches | Lambda buffer loss on cold start |
| Compression | Shrink storage, faster queries | GZIP (default), Parquet for analytics | Athena slows with too many small files |
| Partitioning | Enables Athena partition pruning | year=/month=/day=/hour= | Must sync external table partitions (see sketch below) |
| Firehose | Operational simplicity | Built-in batching, prefixing | Delivery lag, higher $/GB |
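For the partition-sync pitfall in the checklist, one option is to refresh Athena’s partition metadata after new hourly prefixes land. A minimal sketch, assuming a `stream_data` table in an `analytics` database and an existing query-results location (all hypothetical names):

```python
import boto3

athena = boto3.client('athena')

def refresh_partitions():
    # MSCK REPAIR TABLE scans the table's S3 location and registers any
    # Hive-style partitions (year=/month=/day=/hour=) Athena doesn't know yet
    athena.start_query_execution(
        QueryString='MSCK REPAIR TABLE stream_data',
        QueryExecutionContext={'Database': 'analytics'},
        ResultConfiguration={'OutputLocation': 's3://my-bucket/athena-results/'},
    )
```

Partition projection or a scheduled Glue crawler are alternatives that avoid running repair statements on a timer.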
Side Note:
If using Parquet for analytics, consider batch ETL in EMR/Spark or AWS Glue rather than converting directly at streaming ingestion; schema drift and file splitting are non-trivial at ingestion time.
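If that batch route fits, a compaction job can read the gzipped JSON batches and rewrite them as partitioned Parquet. A minimal PySpark sketch for an EMR or Glue job (the paths and the `ts` field are assumptions about the layout and schema above):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('json-to-parquet').getOrCreate()

# Spark reads .json.gz transparently; this is the raw prefix used earlier
raw = spark.read.json('s3://my-bucket/stream-data/')

# Derive partition columns from the event's own timestamp (hypothetical 'ts' field)
events = (raw
          .withColumn('event_dt', F.from_unixtime('ts'))
          .withColumn('year', F.date_format('event_dt', 'yyyy'))
          .withColumn('month', F.date_format('event_dt', 'MM'))
          .withColumn('day', F.date_format('event_dt', 'dd')))

# Coalesce into fewer, larger Parquet files per partition
(events.repartition('year', 'month', 'day')
       .write.mode('append')
       .partitionBy('year', 'month', 'day')
       .parquet('s3://my-bucket/stream-data-parquet/'))
```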
Summary:
Engineering an effective Kinesis-to-S3 pipeline is about more than wiring up events. Aggressive batching, disciplined file layout, and compression drop costs and improve analytic utility. Managed services like Firehose simplify many aspects, but come with trade-offs—balance autonomy, latency, and maintenance against billable cost and operational effort.
Questions on edge cases, or patterns for non-JSON data? Reach out. Real-world tuning always beats generic patterns.