DynamoDB Streams
4 minute read
Introduction
DynamoDB Streams captures data modification events in a DynamoDB table. The stream records are written to a DynamoDB stream, which is an ordered flow of information about changes to items in a table. DynamoDB Streams records data in near-real time, enabling you to develop workflows that process these streams and respond based on their contents.
LocalStack supports DynamoDB Streams, allowing you to create and manage streams in a local environment. The supported APIs are available on our DynamoDB Streams coverage page, which provides information on the extent of DynamoDB Streams integration with LocalStack.
Getting started
This guide is designed for users new to DynamoDB Streams and assumes basic knowledge of the AWS CLI and our awslocal
wrapper script.
Start your LocalStack container using your preferred method. We will demonstrate the following process using LocalStack:
- A user adds an entry to a DynamoDB table.
- A new stream record is generated in DynamoDB Streams when an entry is added.
- This stream record triggers a Lambda function.
- If the record indicates a new entry in the DynamoDB table, the Lambda function extracts the data.
Create a DynamoDB table
You can create a DynamoDB table named BarkTable
using the CreateTable
API.
Run the following command to create the table:
$ awslocal dynamodb create-table \
--table-name BarkTable \
--attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
--key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
The BarkTable
has a stream enabled which you can trigger by associating a Lambda function with the stream.
You can notice that in the LatestStreamArn
field of the response:
...
"LatestStreamArn": "arn:aws:dynamodb:000000000000:us-east-1:table/BarkTable/stream/timestamp
...
Create a Lambda function
You can now create a Lambda function (publishNewBark
) to process stream records from BarkTable
.
Create a new file named index.js
with the following code:
'use strict';
var AWS = require("aws-sdk");
exports.handler = (event, context, callback) => {
event.Records.forEach((record) => {
console.log('Stream record: ', JSON.stringify(record, null, 2));
if (record.eventName == 'INSERT') {
var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
var params = {
Subject: 'A new bark from ' + who,
Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
};
}
});
callback(null, `Successfully processed ${event.Records.length} records.`);
};
You can now create a Lambda function using the CreateFunction
API.
Run the following command to create the Lambda function:
$ zip index.zip index.js
$ awslocal lambda create-function \
--function-name publishNewBark \
--zip-file fileb://index.zip \
--role roleARN \
--handler index.handler \
--timeout 50 \
--runtime nodejs16.x \
--role arn:aws:iam::000000000000:role/lambda-role
Invoke the Lambda function
To test the Lambda function, you can invoke it using the Invoke
API.
Create a new file named payload.json
with the following content:
{
"Records": [
{
"eventID": "7de3041dd709b024af6f29e4fa13d34c",
"eventName": "INSERT",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"ApproximateCreationDateTime": 1479499740,
"Keys": {
"Timestamp": {
"S": "2016-11-18:12:09:36"
},
"Username": {
"S": "John Doe"
}
},
"NewImage": {
"Timestamp": {
"S": "2016-11-18:12:09:36"
},
"Message": {
"S": "This is a bark from the Woofer social network"
},
"Username": {
"S": "John Doe"
}
},
"SequenceNumber": "13021600000000001596893679",
"SizeBytes": 112,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:000000000000:us-east-1 ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
}
]
}
Run the following command to invoke the Lambda function:
$ awslocal lambda invoke \
--function-name publishNewBark \
--payload file://payload.json \
--cli-binary-format raw-in-base64-out output.txt
In the output.txt
file, you should see the following output:
"Successfully processed 1 records."
Add event source mapping
To add the DynamoDB stream as an event source for the Lambda function, you need the stream ARN.
You can get the stream ARN using the DescribeTable
API.
Run the following command to get the stream ARN:
awslocal dynamodb describe-table --table-name BarkTable
You can now create an event source mapping using the CreateEventSourceMapping
API.
Run the following command to create the event source mapping:
awslocal lambda create-event-source-mapping \
--function-name publishNewBark \
--event-source arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2024-07-12T06:18:37.101 \
--batch-size 1 \
--starting-position TRIM_HORIZON
Make sure to replace the event-source
value with the stream ARN you obtained from the previous command.
You should see the following output:
{
"UUID": "7ae3426a-eda6-4c10-a596-100c59bd6787",
...
"EventSourceArn": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2024-07-12T06:18:37.101",
"FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:publishNewBark",
...
"FunctionResponseTypes": []
}
You can now test the event source mapping by adding an item to the BarkTable
table using the PutItem
API.
Run the following command to add an item to the table:
$ awslocal dynamodb put-item \
--table-name BarkTable \
--item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
You can find Lambda function being triggered in the LocalStack logs.
Inspect the stream
You can list the streams using the ListStreams
API.
Run the following command to list the streams:
awslocal dynamodbstreams list-streams
The following output shows the list of streams:
{
"Streams": [
{
"StreamArn": "arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2024-07-12T06:18:37.101",
"TableName": "BarkTable",
"StreamLabel": "2024-07-12T06:18:37.101"
}
]
}
You can also describe the stream using the DescribeStream
API.
Run the following command to describe the stream:
$ awslocal dynamodbstreams describe-stream --stream-arn arn:aws:dynamodb:us-east-1:000000000000:table/BarkTable/stream/2024-07-12T06:18:37.101
Replace the stream-arn
value with the stream ARN you obtained from the previous command.