Tips on various topics learned while troubleshooting issues

Record Aggregation

Record aggregation is enabled by default, which means that multiple records can be aggregated into one, as stated before, to make more efficient use of the available bandwidth and reduce cost.

In our architecture, for resilience, we required each read operation to consume exactly one record, so we had to write and read exactly one record at a time. For this reason we had to explicitly disable aggregation on the KPL side. Otherwise the behavior would have been inconsistent: the KCL would receive one message containing several records when we wanted exactly one.
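
To illustrate why aggregation breaks a one-record-per-read assumption, here is a toy model in Python. It is not the KPL itself: `to_stream_records` and `max_per_aggregate` are hypothetical names, and the real KPL packs records by size and buffering time rather than by a fixed count. In the real KPL, the switch is the `AggregationEnabled` configuration property (`setAggregationEnabled(false)` on `KinesisProducerConfiguration`).

```python
from typing import List


def to_stream_records(user_records: List[bytes],
                      aggregation_enabled: bool,
                      max_per_aggregate: int = 3) -> List[List[bytes]]:
    """Toy model: return the stream records a producer would put.

    Each stream record is modeled as the list of user records it carries.
    max_per_aggregate is a made-up limit used only for this illustration.
    """
    if not aggregation_enabled:
        # One user record per stream record: what our consumer expected.
        return [[record] for record in user_records]
    # Pack consecutive user records into shared stream records.
    return [user_records[i:i + max_per_aggregate]
            for i in range(0, len(user_records), max_per_aggregate)]


events = [b"a", b"b", b"c", b"d"]
aggregated = to_stream_records(events, aggregation_enabled=True)
plain = to_stream_records(events, aggregation_enabled=False)
print(len(aggregated), len(plain))  # 2 4
```

With aggregation on, the consumer sees two stream records carrying four user records; with it off, it sees four stream records with one user record each.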

We eventually spotted the issue and traced it to aggregation by inspecting the records put into the stream with the AWS CLI (https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html).

Checkpoint mechanism

For each Amazon Kinesis Data Streams application, the KCL uses a unique Amazon DynamoDB table to keep track of the application's state. The KCL uses the application name as the name of the DynamoDB table it creates, so each application name must be unique. By default, the KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second.

Each row in the DynamoDB table represents a shard that is being processed by your application. In addition to the shard ID, each row also includes checkpoint, checkpointSubSequenceNumber, leaseCounter, leaseKey, leaseOwner, ownerSwitchesSinceCheckpoint and parentShardId. The most relevant are checkpoint and leaseKey, which respectively give the position of the last message consumed and the ID of the shard handled by the related shard processor.
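
As a rough sketch of how these rows can be read, the Python snippet below models two lease rows as dictionaries and extracts the checkpoint for each shard. The rows, the worker names and the sequence numbers are made up for illustration, and `checkpoints_by_shard` is a hypothetical helper, not part of the KCL.

```python
from typing import Dict, List

# Made-up rows shaped like the lease table attributes listed above.
lease_rows: List[Dict[str, object]] = [
    {
        "leaseKey": "shardId-000000000000",
        "checkpoint": "1001",
        "checkpointSubSequenceNumber": 0,
        "leaseCounter": 42,
        "leaseOwner": "worker-a",
        "ownerSwitchesSinceCheckpoint": 0,
        "parentShardId": None,
    },
    {
        "leaseKey": "shardId-000000000001",
        "checkpoint": "2002",
        "checkpointSubSequenceNumber": 0,
        "leaseCounter": 17,
        "leaseOwner": "worker-b",
        "ownerSwitchesSinceCheckpoint": 1,
        "parentShardId": None,
    },
]


def checkpoints_by_shard(rows: List[Dict[str, object]]) -> Dict[str, str]:
    """Map each shard ID (leaseKey) to the last committed checkpoint."""
    return {str(row["leaseKey"]): str(row["checkpoint"]) for row in rows}


positions = checkpoints_by_shard(lease_rows)
print(positions["shardId-000000000001"])  # 2002
```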

KCL checkpoint mechanism in detail:

  • at startup, if no checkpoint is available in the KCL table, the KCL uses "LATEST" as the default starting position; otherwise it uses the available checkpoint. The default value can be overridden with one of the following: AT_TIMESTAMP, TRIM_HORIZON, AT/AFTER_SEQUENCE_NUMBER or LATEST.
  • for each message consumed by the KCL, you need to commit the checkpoint explicitly. This is necessary to track the last message consumed and keep the application state consistent.
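
The two points above can be sketched in Python as follows. This is a minimal in-memory model under our own assumptions: `CheckpointStore`, `starting_position` and `process` are hypothetical names standing in for the KCL table and the record processor, not KCL APIs.

```python
from typing import Dict, List, Optional, Tuple


class CheckpointStore:
    """In-memory stand-in for the KCL table (hypothetical)."""

    def __init__(self) -> None:
        self._checkpoints: Dict[str, str] = {}

    def get(self, shard_id: str) -> Optional[str]:
        return self._checkpoints.get(shard_id)

    def commit(self, shard_id: str, sequence_number: str) -> None:
        self._checkpoints[shard_id] = sequence_number


def starting_position(store: CheckpointStore, shard_id: str,
                      default: str = "LATEST") -> Tuple[str, str]:
    """Use the stored checkpoint if there is one, else the default."""
    checkpoint = store.get(shard_id)
    if checkpoint is None:
        return ("INITIAL_POSITION", default)
    return ("CHECKPOINT", checkpoint)


def process(store: CheckpointStore, shard_id: str,
            records: List[Tuple[str, bytes]]) -> List[bytes]:
    """Handle records one by one, committing a checkpoint after each."""
    handled = []
    for sequence_number, payload in records:
        handled.append(payload)                  # application logic goes here
        store.commit(shard_id, sequence_number)  # explicit checkpoint
    return handled


store = CheckpointStore()
shard = "shardId-000000000000"
before = starting_position(store, shard)
handled = process(store, shard, [("101", b"x"), ("102", b"y")])
after = starting_position(store, shard)
print(before, after)  # ('INITIAL_POSITION', 'LATEST') ('CHECKPOINT', '102')
```

On the first start there is no checkpoint, so the default applies; after a restart the reader resumes from the last committed sequence number.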

Possible checkpoint values:

AT_TIMESTAMP: the approximate time at which the event was put into the stream. Use this option if you want to find specific events to process based on their timestamp.

TRIM_HORIZON: the oldest events that are still in the stream shards, before they are automatically trimmed (the default retention is 24 hours, and it can be extended up to 365 days).

LATEST: the newest events in the stream, ignoring all the past events. Use this option if you start a new application that you want to process in real time immediately.

AT/AFTER_SEQUENCE_NUMBER: the sequence number is usually the checkpoint that you keep while you process the events. These checkpoints let you process the events reliably, even when a reader fails or when you deploy a new version of it and want to continue processing without losing any event. The difference between AT and AFTER is where reading resumes: AT starts from the record with the given sequence number, AFTER starts from the record right after it. Which one you need depends on whether you saved the checkpoint before or after you successfully processed the event.
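
To make the difference between these values concrete, here is a small Python model of a shard as a list of retained records. It is only a sketch: `records_from` is a hypothetical helper, the sequence numbers and timestamps are made up, and the model shows only which already-retained records a reader would see from each position.

```python
from typing import List, Optional, Tuple

# (sequence number, arrival timestamp, payload); all values are made up.
Record = Tuple[int, float, str]


def records_from(shard: List[Record], position: str,
                 sequence_number: Optional[int] = None,
                 timestamp: Optional[float] = None) -> List[str]:
    """Return the retained payloads visible to a reader starting at position."""
    if position == "TRIM_HORIZON":
        selected = shard              # oldest retained record onwards
    elif position == "LATEST":
        selected = []                 # only records put after the reader starts
    elif position == "AT_TIMESTAMP":
        if timestamp is None:
            raise ValueError("AT_TIMESTAMP needs a timestamp")
        selected = [r for r in shard if r[1] >= timestamp]
    elif position in ("AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER"):
        if sequence_number is None:
            raise ValueError(position + " needs a sequence number")
        if position == "AT_SEQUENCE_NUMBER":
            selected = [r for r in shard if r[0] >= sequence_number]
        else:
            selected = [r for r in shard if r[0] > sequence_number]
    else:
        raise ValueError("unknown position: " + position)
    return [r[2] for r in selected]


shard = [(1, 10.0, "a"), (2, 20.0, "b"), (3, 30.0, "c")]
print(records_from(shard, "TRIM_HORIZON"))                              # ['a', 'b', 'c']
print(records_from(shard, "LATEST"))                                    # []
print(records_from(shard, "AT_TIMESTAMP", timestamp=20.0))              # ['b', 'c']
print(records_from(shard, "AT_SEQUENCE_NUMBER", sequence_number=2))     # ['b', 'c']
print(records_from(shard, "AFTER_SEQUENCE_NUMBER", sequence_number=2))  # ['c']
```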

Resharding a Kinesis Data Stream

Resharding lets you increase or decrease the number of shards in a stream in order to adapt to changes in the rate of data flowing through the stream.
The KCL tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of the resharding, the KCL discovers the new shards and populates new rows in the table.

What happens during the resharding process?

1) The KPL starts to produce new messages on the new shards

2) The KCL continues to consume from the old (parent) shards until no more messages are available, and then marks them as "SHARD_END". After that, the KCL starts to consume from the new shards. This preserves the order in which the data arrived on the stream.
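
The ordering in the two steps above can be sketched as a small Python simulation. It is a simplified model, not KCL code: `consume_after_reshard` is a hypothetical function, the shard IDs and payloads are made up, and the `SHARD_END` marker reflects our understanding of the value the KCL writes in the checkpoint column of a fully consumed shard.

```python
from typing import Dict, List


def consume_after_reshard(parent_id: str,
                          parent_records: List[str],
                          child_records: Dict[str, List[str]],
                          checkpoints: Dict[str, str]) -> List[str]:
    """Drain the closed parent shard first, then read the child shards."""
    consumed: List[str] = []
    # Step 1: finish everything still sitting on the parent shard.
    consumed.extend(parent_records)
    # Step 2: mark the parent as fully consumed (assumed sentinel value).
    checkpoints[parent_id] = "SHARD_END"
    # Step 3: only now start on the shards created by the resharding.
    for shard_id in sorted(child_records):
        consumed.extend(child_records[shard_id])
    return consumed


checkpoints: Dict[str, str] = {}
order = consume_after_reshard(
    "shardId-000000000000",
    ["old-1", "old-2"],
    {"shardId-000000000001": ["new-1"], "shardId-000000000002": ["new-2"]},
    checkpoints,
)
print(order)  # ['old-1', 'old-2', 'new-1', 'new-2']
```

Every record written before the resharding is handled before any record written to the new shards.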

Resharding is the only way to scale the throughput of a Kinesis stream.

TECH NOTE: a potential side effect of increasing the number of shards: by default the KCL table is provisioned with 10 RCU and 10 WCU, which may not be enough to handle the increased throughput. Consider raising those values accordingly.
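
As a back-of-the-envelope aid, the Python sketch below estimates the write capacity the KCL table may need as the shard count grows. It rests on assumptions of ours: each shard row is written once per checkpoint and once per lease renewal, each write costs one WCU (rows well under 1 KB), and the interval values are illustrative rather than KCL defaults. `estimated_table_wcu` is a hypothetical helper.

```python
import math


def estimated_table_wcu(shards: int,
                        checkpoint_interval_s: float,
                        lease_renewal_interval_s: float) -> int:
    """Rough WCU estimate: one write per checkpoint and per lease renewal."""
    if shards < 0 or checkpoint_interval_s <= 0 or lease_renewal_interval_s <= 0:
        raise ValueError("shards must be >= 0 and intervals must be > 0")
    writes_per_shard_per_s = (1 / checkpoint_interval_s
                              + 1 / lease_renewal_interval_s)
    return math.ceil(shards * writes_per_shard_per_s)


# Illustrative values: checkpoint every second, renew the lease every 5 seconds.
small = estimated_table_wcu(4, 1.0, 5.0)
large = estimated_table_wcu(16, 1.0, 5.0)
print(small, large)  # 5 20
```

Under these assumptions, 4 shards fit within the default 10 WCU, while 16 shards would need roughly twice that.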

Disclaimer

Expedia Group Inc. published this content on 12 October 2021 and is solely responsible for the information contained therein. Distributed by Public, unedited and unaltered, on 12 October 2021 13:31:10 UTC.