Quay lại bài viết
20 thg 5, 2026
22 min read

Amazon Kinesis Data Streams & Data Firehose: Từ ingestion real-time đến delivery tự động vào S3, Redshift, OpenSearch

Bạn đang xây dựng một hệ thống analytics cho ứng dụng e-commerce. Mỗi giây có hàng nghìn events đổ về — clickstream, page views, add-to-cart, purchases — và bạn cần đưa toàn bộ dữ liệu này vào S3 để chạy analytics với Athena, đồng thời đẩy vào OpenSearch để team product xem dashboard real-time.

Cách đơn giản nhất? Viết một consumer application tự poll data từ stream, transform, rồi ghi xuống S3. Nhưng rồi bạn phải lo quản lý batching (gom nhiều records thành một file lớn thay vì ghi từng record một), compression, format conversion, retry logic khi S3 trả lỗi, monitor throughput, scale consumer khi traffic tăng… Bạn tốn hàng tuần chỉ để xây “đường ống” chứ chưa chạm đến phần analytics thực sự.

Đây chính là bài toán mà combo Amazon Kinesis Data StreamsAmazon Data Firehose giải quyết. Kinesis Data Streams đảm nhận phần ingestion — nhận và lưu trữ luồng dữ liệu real-time với khả năng scale tuyến tính. Data Firehose đảm nhận phần delivery — tự động transform, convert format, buffer, và ghi dữ liệu xuống storage mà bạn không cần viết một dòng consumer code nào.

Bài viết này sẽ đi sâu vào cả hai service — từ kiến trúc shard của Kinesis Data Streams (rất giống Kafka partition), các limit cố định trên mỗi shard, fan-out modes, cho đến pipeline bên trong của Data Firehose: Lambda transformation, Glue format conversion, buffering internals, và lý do kỹ thuật vì sao Firehose phải buffer data trước khi ghi.


1. Kinesis Data Streams là gì?

Amazon Kinesis Data Streams (KDS) là một dịch vụ real-time data streaming được AWS quản lý hoàn toàn — cho phép các ứng dụng thu thập, xử lý, và phân tích luồng dữ liệu liên tục với độ trễ cực thấp (dưới 200ms).

Nếu bạn đã đọc bài viết về Apache Kafka, kiến trúc của Kinesis Data Streams sẽ rất quen thuộc. Cả hai đều dựa trên mô hình distributed commit log — dữ liệu được ghi tuần tự vào các log phân tán, consumer chủ động pull data và tự quản lý vị trí đang đọc. Sự khác biệt lớn nhất là Kinesis Data Streams là fully managed — bạn không cần quản lý broker, không cần lo ZooKeeper/KRaft, không cần tuning JVM heap hay disk I/O. AWS lo toàn bộ infrastructure bên dưới.

Một vài đặc điểm quan trọng:


2. Core Concepts

Vì Kinesis Data Streams và Kafka chia sẻ cùng một mô hình kiến trúc, cách nhanh nhất để hiểu Kinesis là ánh xạ các khái niệm tương đồng:

Kinesis Data StreamsApache KafkaGiải thích
StreamTopicKênh logic chứa dữ liệu. Ví dụ: stream clickstream, stream orders.
ShardPartitionĐơn vị song song hoá — mỗi shard là một log tuần tự, append-only. Dữ liệu được phân phối vào shard dựa trên partition key.
Data RecordMessage / RecordĐơn vị dữ liệu nhỏ nhất — bao gồm partition key, sequence number, và data blob (tối đa 1 MB).
Partition KeyPartition KeyChuỗi dùng để xác định record đi vào shard nào, thông qua hàm hash: MD5(partition_key) → shard. Records cùng partition key luôn vào cùng shard.
Sequence NumberOffsetSố định danh duy nhất, tăng dần cho mỗi record trong shard — giúp consumer biết mình đang đọc đến đâu.
KCL ApplicationConsumer GroupThư viện (Kinesis Client Library) giúp nhiều consumer phối hợp xử lý data từ stream — tự động phân chia shard cho các consumer instances, quản lý checkpoint (tương tự commit offset).

Điểm khác biệt quan trọng

Với Kafka, throughput của mỗi partition phụ thuộc vào phần cứng của broker — disk speed, network bandwidth, RAM. Bạn có thể nâng cấp broker hardware để tăng throughput per partition.

Với Kinesis, throughput của mỗi shard là con số cố định do AWS quy định — không thể thay đổi. Muốn tăng throughput? Cách duy nhất là thêm shard (horizontal scaling). Đây là điểm thiết kế cốt lõi mà bạn cần nắm trước khi đi tiếp.


3. Shard — Đơn vị cốt lõi của Kinesis

3.1. Shard limits — những con số cố định

Mỗi shard trong Kinesis Data Streams có throughput cố định và không thể thay đổi:

MetricLimit
Write throughput1 MB/s per shard
Write records1,000 records/s per shard
Record size tối đa1 MB per record
Read throughput (Shared)2 MB/s per shard — chia sẻ giữa tất cả consumer
Read throughput (Enhanced)2 MB/s per shard per consumer — mỗi consumer có throughput riêng

Điều này có nghĩa: nếu stream của bạn cần nhận 10 MB/s dữ liệu, bạn cần tối thiểu 10 shards (10 shards × 1 MB/s = 10 MB/s). Muốn 100 MB/s? 100 shards. Throughput tổng tăng tuyến tính với số lượng shard.

Giống Kafka, producer dùng partition key để xác định record đi vào shard nào: MD5(partition_key) tạo ra một hash value 128-bit, giá trị này được map vào hash key range của shard tương ứng. Mỗi shard sở hữu một dải hash key range riêng, và các dải này ghép lại phủ toàn bộ không gian hash.

3.2. Shared Fan-Out vs Enhanced Fan-Out

Kinesis cung cấp hai chế độ đọc dữ liệu, ảnh hưởng trực tiếp đến throughput và latency mà consumer nhận được:

Shared Fan-Out (mặc định) — consumer gọi API GetRecords để poll data từ shard. Tất cả consumer chia sẻ chung bandwidth 2 MB/s per shard. Nếu có 5 consumer cùng đọc một shard, mỗi consumer trung bình chỉ được ~400 KB/s. Latency trung bình khoảng 200ms do cơ chế polling.

const { KinesisClient, GetShardIteratorCommand, GetRecordsCommand } = require('@aws-sdk/client-kinesis') const client = new KinesisClient({ region: 'ap-southeast-1' }) const { ShardIterator } = await client.send( new GetShardIteratorCommand({ StreamName: 'clickstream', ShardId: 'shardId-000000000000', ShardIteratorType: 'LATEST', }) ) const { Records, NextShardIterator } = await client.send( new GetRecordsCommand({ ShardIterator, Limit: 100 }) )

Enhanced Fan-Out — consumer đăng ký SubscribeToShard và nhận data qua HTTP/2 push. Mỗi consumer được cấp riêng 2 MB/s per shard, không chia sẻ với ai. 5 consumer cùng đọc một shard? Mỗi consumer vẫn được đầy đủ 2 MB/s. Latency giảm xuống ~70ms vì data được push trực tiếp đến consumer thay vì chờ consumer poll.

Trước khi subscribe, bạn cần đăng ký một stream consumer — đây là identity đại diện cho consumer application của bạn. Mỗi stream consumer được cấp riêng 2 MB/s per shard, tách biệt hoàn toàn với các consumer khác:

const { KinesisClient, RegisterStreamConsumerCommand, SubscribeToShardCommand } = require('@aws-sdk/client-kinesis') const client = new KinesisClient({ region: 'ap-southeast-1' }) const { Consumer } = await client.send( new RegisterStreamConsumerCommand({ StreamARN: 'arn:aws:kinesis:ap-southeast-1:123456789012:stream/clickstream', ConsumerName: 'analytics-consumer', }) ) const response = await client.send( new SubscribeToShardCommand({ ConsumerARN: Consumer.ConsumerARN, ShardId: 'shardId-000000000000', StartingPosition: { Type: 'LATEST' }, }) ) for await (const event of response.EventStream) { if (event.SubscribeToShardEvent) { const { Records, MillisBehindLatest } = event.SubscribeToShardEvent } }
Tiêu chíShared Fan-OutEnhanced Fan-Out
Delivery modelPull (GetRecords API)Push (SubscribeToShard, HTTP/2)
Read throughput2 MB/s per shard — chia sẻ2 MB/s per shard per consumer
Latency~200ms~70ms
Số consumer tối đaSoft limit, nhưng bandwidth chia sẻTối đa 20 consumer per stream
Chi phí thêmKhôngPhụ phí per consumer per shard-hour + data
Khi nào dùngÍt consumer, latency không criticalNhiều consumer, cần low latency

3.3. Data Retention và Replay

Kinesis Data Streams lưu trữ dữ liệu trong stream theo cấu hình retention:

Trong khoảng thời gian retention, consumer có thể replay — đọc lại dữ liệu từ bất kỳ thời điểm nào. Điều này giống với Kafka: nếu consumer có bug và xử lý sai 2 giờ dữ liệu vừa qua, bạn fix bug rồi cho consumer đọc lại từ 2 giờ trước — không mất record nào.


4. Scaling Modes

Kinesis Data Streams cung cấp hai chế độ scaling, quyết định cách bạn quản lý shard:

4.1. Provisioned Mode

Bạn tự quyết định số lượng shard cần thiết và quản lý thủ công. Khi traffic tăng, bạn thực hiện shard splitting — tách 1 shard thành 2 shard mới (chia đôi hash key range). Khi traffic giảm, bạn merge 2 shard liền kề thành 1. Mỗi thao tác split/merge mất vài giây đến vài phút.

Thanh toán theo shard-hour — bạn trả tiền cho số shard mà bạn provision, bất kể có dùng hết throughput hay không.

4.2. On-Demand Mode

AWS tự động scale số lượng shard dựa trên traffic thực tế. Bạn không cần quản lý shard — Kinesis tự thêm hoặc bớt shard khi throughput thay đổi. Giới hạn mặc định: tự động scale lên đến 200 MB/s write (tương đương 200 shards) hoặc gấp đôi peak throughput đã đạt được trong 30 ngày gần nhất — tuỳ con số nào lớn hơn.

Thanh toán theo stream-hour + per GB dữ liệu ingested và retrieved.

Tiêu chíProvisionedOn-Demand
Quản lý shardThủ công (split/merge)Tự động
Scale speedVài giây đến vài phútTự động, vài phút
Giới hạn throughputTuỳ số shard bạn provision200 MB/s write hoặc 2× peak 30 ngày
Pricing modelPer shard-hour (~$0.015/shard-hour)Per stream-hour + per GB ($0.08/GB in, $0.04/GB out)
Khi nào dùngTraffic dự đoán được, muốn tối ưu chi phíTraffic bất thường, không muốn quản lý capacity

Lưu ý: On-Demand mode phù hợp khi bạn không biết trước traffic pattern. Nhưng nếu traffic ổn định và dự đoán được, Provisioned mode thường rẻ hơn đáng kể — giống cách EC2 Reserved Instance rẻ hơn On-Demand.


5. Amazon Data Firehose là gì?

Kinesis Data Streams giải quyết bài toán ingestion — nhận và lưu trữ luồng dữ liệu real-time. Nhưng bước tiếp theo — đưa dữ liệu từ stream xuống storage (S3, Redshift, OpenSearch) — vẫn cần bạn tự viết consumer code: poll data, transform, batch, ghi xuống, xử lý lỗi, retry…

Amazon Data Firehose loại bỏ hoàn toàn phần consumer code đó. Đây là một dịch vụ fully managed delivery pipeline — bạn chỉ cần cấu hình source và destination, Firehose tự động nhận data, transform (nếu cần), buffer, và ghi xuống storage. Không cần viết code, không cần quản lý server, không cần lo scaling.

Nếu hình dung Kinesis Data Streams như một đường cao tốc nơi dữ liệu chạy qua liên tục, thì Data Firehose là đội xe tải được cử ra để thu gom hàng từ cao tốc, đóng gói lại cho gọn gàng (transform, convert format), rồi chở đến kho đích (S3, Redshift, OpenSearch).

Một vài đặc điểm quan trọng:


6. Sources và Destinations

6.1. Sources — Dữ liệu đến từ đâu?

Firehose nhận data từ nhiều nguồn:

6.2. Destinations — Dữ liệu đi đến đâu?

AWS Destinations:

3rd-party Destinations:

Custom Destinations:

6.3. S3 Backup

Bất kể destination chính là gì, Firehose luôn hỗ trợ S3 backup — lưu một bản sao dữ liệu gốc (trước khi transform) vào S3 bucket riêng. Có ba tuỳ chọn:

Với destination là S3, khi enable All or Failed data, Firehose sẽ ghi cả raw data (dữ liệu gốc, trước transform) lẫn transformed data vào S3 — nhưng ở các prefix khác nhau. Điều này hữu ích cho audit trail và debugging khi transformation gặp lỗi.


7. Firehose Pipeline — Under the Hood

Bây giờ hãy đi sâu vào bên trong Firehose — dữ liệu đi qua những bước nào từ lúc nhận đến lúc ghi xuống storage.

7.1. Nhận và gom records

Firehose nhận data thông qua hai API:

Khi nhận data từ Kinesis Data Streams, Firehose tự động poll từ tất cả shards của stream và gom records lại. Bạn không cần quản lý quá trình này.

7.2. Lambda Transformation

Sau khi nhận records, Firehose có thể gọi một AWS Lambda function để transform data trước khi ghi xuống storage. Đây là bước tuỳ chọn — bạn chỉ cần cấu hình khi muốn biến đổi dữ liệu.

Cách hoạt động:

Firehose gom records vào một buffer nhỏ (configurable, 1–3 MB hoặc 60 giây — tuỳ điều kiện nào đến trước), rồi gửi batch đó sang Lambda function. Lambda nhận batch records, xử lý từng record, và trả về batch kết quả. Mỗi record trong kết quả được đánh dấu một trong ba trạng thái:

StatusÝ nghĩa
OkRecord đã được transform thành công → tiếp tục pipeline
DroppedRecord bị loại bỏ có chủ đích (ví dụ: filter invalid data) → không ghi
ProcessingFailedLambda không thể xử lý record → record được ghi vào S3 error bucket

Use cases phổ biến:

Nếu Lambda function bị lỗi hoặc timeout, Firehose sẽ retry tối đa 3 lần. Nếu vẫn thất bại, records bị đánh dấu ProcessingFailed và được ghi vào S3 error bucket — đảm bảo không mất dữ liệu.

7.3. Format Conversion — AWS Glue Data Catalog

Sau bước Lambda transformation (hoặc ngay sau khi nhận records nếu không có Lambda), Firehose có thể convert format dữ liệu từ JSON/CSV sang Apache Parquet hoặc Apache ORC — hai columnar storage formats (định dạng lưu trữ theo cột) được sử dụng rộng rãi trong analytics.

Tại sao cần columnar format?

Các định dạng truyền thống như JSON hay CSV lưu dữ liệu theo hàng — mỗi record lưu tất cả columns cạnh nhau. Khi query analytics chỉ cần 3 trong 50 columns, hệ thống vẫn phải đọc toàn bộ 50 columns của mỗi record rồi bỏ 47 columns không cần.

Columnar format lưu dữ liệu theo cột — tất cả giá trị của cùng một column được gom lại cạnh nhau trên disk. Khi query chỉ cần 3 columns, hệ thống chỉ đọc đúng 3 columns đó, bỏ qua hoàn toàn 47 columns còn lại. Kết quả:

Cách Firehose thực hiện conversion:

Firehose sử dụng schema từ AWS Glue Data Catalog — một dịch vụ metadata store tập trung của AWS, nơi bạn định nghĩa cấu trúc dữ liệu (tên column, kiểu dữ liệu). Firehose đọc schema này để biết cách deserialize JSON đầu vào (phân tích JSON thành các trường dữ liệu) rồi serialize lại thành Parquet/ORC (đóng gói dữ liệu theo định dạng cột).

Lưu ý: Format conversion yêu cầu dữ liệu đầu vào phải có cấu trúc đồng nhất — tất cả records phải khớp với schema trong Glue Catalog. Nếu record có field không đúng kiểu hoặc thiếu field required, record đó sẽ bị lỗi conversion.

7.4. Buffering — Tại sao phải buffer trước khi ghi?

Đây là phần quan trọng nhất để hiểu tại sao Firehose là near real-time chứ không phải real-time.

Thay vì ghi từng record riêng lẻ xuống storage ngay khi nhận được, Firehose gom nhiều records lại thành một batch lớn rồi mới ghi một lần. Quá trình gom này gọi là buffering, và bạn có thể cấu hình theo hai tiêu chí:

Firehose flush khi một trong hai điều kiện đến trước. Ví dụ: buffer size = 64 MB, buffer interval = 300 giây. Nếu sau 120 giây mà buffer đã đầy 64 MB → flush ngay. Nếu sau 300 giây mà buffer mới chỉ có 10 MB → vẫn flush.

Tại sao phải buffer thay vì ghi ngay từng record?

Có bốn lý do kỹ thuật:

1. Giảm chi phí API calls. S3 tính phí $0.005 per 1,000 PUT requests. Nếu bạn nhận 1 triệu records mỗi giờ và ghi từng record một, đó là 1,000,000 PUT requests = $5/giờ = $3,600/tháng. Nếu buffer gom 1,000 records thành 1 file, đó chỉ còn 1,000 PUT requests = $0.005/giờ = $3.6/tháng. Tiết kiệm ~99.9% chi phí S3 PUT.

2. Nén hiệu quả hơn. Thuật toán nén (compression) hoạt động bằng cách tìm patterns lặp lại trong dữ liệu. File càng lớn, càng nhiều data để tìm pattern, tỉ lệ nén càng cao. Một file 64 MB nén bằng Gzip có thể giảm xuống 10 MB (6:1). Nhưng 64,000 file nhỏ 1 KB mỗi file? Mỗi file quá nhỏ để thuật toán tìm được pattern → tỉ lệ nén gần như không đáng kể.

3. Tránh “small file problem”. Khi bạn chạy analytics query trên S3 bằng Amazon Athena hoặc Apache Spark, mỗi file S3 cần được mở, đọc header, parse, rồi đóng. 1,000 file lớn (mỗi file 64 MB) xử lý nhanh hơn rất nhiều so với 1,000,000 file nhỏ (mỗi file 64 KB) — vì overhead mở/đóng file chiếm phần lớn thời gian xử lý khi file quá nhỏ. Đây là vấn đề nổi tiếng trong big data gọi là small file problem.

4. Columnar format cần batch lớn. Apache Parquet và ORC lưu dữ liệu theo row group — một nhóm hàng được nén và lưu cùng nhau. Row group hiệu quả nhất khi có hàng nghìn đến hàng triệu rows. Nếu chỉ có 1 row per file, overhead metadata của Parquet có thể lớn hơn chính dữ liệu, và ưu thế nén columnar gần như mất hoàn toàn.

Buffer configUse caseDelivery latencyChi phíFile size
1 MB / 60sNear real-time monitoring, alerting~60 giâyCao hơnNhỏ
64 MB / 300sGeneral analytics pipeline~5 phútTrung bìnhTrung bình
128 MB / 900sCost-optimized data lake, batch analytics~15 phútThấp nhấtLớn nhất

Best practice: với S3 destination cho analytics, dùng buffer size 64–128 MB để tối ưu chi phí và query performance. Chỉ giảm xuống 1 MB khi cần near real-time delivery (ví dụ: monitoring dashboard cần data trong vòng 1 phút).

7.5. Ghi xuống Destination

Sau khi buffer đầy (hoặc hết thời gian), Firehose ghi batch data xuống destination:

Khi delivery thất bại sau tất cả retries, records được ghi vào S3 error bucket — đảm bảo không bao giờ mất dữ liệu, kể cả khi destination không khả dụng.

7.6. S3 Backup — Lưu giữ dữ liệu gốc

Ngoài S3 error bucket cho failed records, Firehose còn hỗ trợ source record backup — lưu bản sao dữ liệu gốc (trước khi Lambda transform và format conversion) vào một S3 bucket riêng biệt.

Tại sao cần? Ba lý do:


8. Lưu ý khi sử dụng Firehose


9. Kinesis Data Streams vs Data Firehose

Hai service này bổ trợ cho nhau, không phải cạnh tranh. Bảng so sánh dưới đây giúp phân biệt rõ vai trò của từng service:

Tiêu chíKinesis Data StreamsAmazon Data Firehose
Vai trò chínhReal-time data ingestion & processingManaged data delivery to storage
LatencyReal-time (~70–200ms)Near real-time (~60s+)
Quản lý capacityManual (Provisioned) hoặc auto (On-Demand)Hoàn toàn tự động
Data storageCó (24 giờ – 365 ngày)Không
Replay capabilityKhông
Consumer codeCần viết (KCL, Lambda, SDK)Không cần
Data transformationConsumer tự xử lýTích hợp Lambda + Glue format conversion
DestinationsBất kỳ (tuỳ consumer)S3, Redshift, OpenSearch, 3rd party, HTTP
ScalingThêm shard (Provisioned) hoặc autoHoàn toàn tự động, không cần cấu hình
PricingPer shard-hour hoặc per GB (On-Demand)Per GB ingested

Pattern phổ biến nhất: Kết hợp cả hai — Kinesis Data Streams → Data Firehose → S3. KDS nhận data real-time, cho phép multiple consumers xử lý (analytics real-time, alerting). Đồng thời, Firehose consume từ cùng KDS stream, tự động transform, convert format, buffer, và ghi xuống S3 data lake cho batch analytics. Bạn được cả hai thế giới: real-time processing lẫn automated delivery.


Tổng kết

Kinesis Data Streams và Data Firehose giải quyết hai bài toán khác nhau trong pipeline xử lý streaming data:

Nếu bạn đến từ thế giới Kafka, Kinesis Data Streams sẽ rất quen thuộc — và Data Firehose chính là phần mà trong Kafka bạn phải tự xây: consumer code, batching logic, format conversion, S3 writer. AWS đã đóng gói tất cả thành một managed service.

Liên quan