Using DynamoDB Streams To Update Denormalized Data
Why DynamoDB streams are the perfect use case for updating denormalized data.
One of the biggest concerns of using NoSQL (or DynamoDB) is normalizing vs denormalizing data.
NoSQL’s superpower is pre-joining data so we gain an unfair exponential advantage over SQL in terms of latency.
Instead of joining 4 or 5 tables of related data, I can pre-join this data in DynamoDB and query it all together in one fetch with 0 added latency.
So the main concern is when should you keep the data normalized vs denormalized?
Generally, immutable data should always be denormalized.
Immutable data is data that never (or rarely) changes, such as:
- name
- email
- phone
- product ID
- product ISBN
- etc
You can manage the data in multiple places in your database because it won’t change.
On the other hand, data that changes often should usually be normalized:
- prices
- dates
- user’s profile image
When you come across normalized data that needs to be updated in multiple different places in your database, how should you handle that?
Imagine an e-learning application that stores courses and instructors on a database table.
A course will usually have some denormalized information about the instructor teaching it.
Now imagine that an instructor changes their name (rare but can happen).
The instructor item itself could be modified without issues. But you would end up with several course items that are still storing an outdated instructor name.
Does DynamoDB provide a strategy for handling this issue?
Yes — and that’s a great use case for DynamoDB streams.
What Are DynamoDB Streams?
DynamoDB streams capture a time-ordered sequence of changes such as inserts, updates and deletes, made to items in a DynamoDB table.
This allows you to track and respond to these changes in real-time.
How this works under the hood is DynamoDB stores these change events in memory for up to 24 hours and makes them accessible to you via a stream.
You can then consume these streams using AWS Lambda or other AWS services to trigger actions like updating any other data in DynamoDB or even process external actions.
Demo: Updating Denormalized Data With Streams
In this demo, we’re going to look at how we can use a DynamoDB stream to update some data on our database.
We’ll create an instructor item, and some “courses” items.
We’ll then run an update command on the user’s name and have the denormalized name in the other course items update to provide a synchronized data behavior flow.
Let’s get started.
In the AWS console, let’s head to the DynamoDB service.
I’ve created two tables:
An instructor table to store instructor items
A course table to store course items
The instructors table contains 3 items, each with an instructorID and some decorator attributes (like name, email and title).
The courses table contains 4 items, each with a courseID and some decorator attributes (like instructorID, instructorName, length, level and course name).
Now, let’s write some code to update the instructor with the inst#101 ID.
I’ll use AWS Lambda for this code.
In AWS Lambda, create a new Node JS function (with DynamoDB write permissions) and write the following code:
import { DynamoDBClient, UpdateItemCommand } from "@aws-sdk/client-dynamodb";
const dynamoDbClient = new DynamoDBClient({ region: "us-east-1" });
export const handler = async (event) => {
const { instructorID, newInstructorName } = event; //use JSON.parse(event.body) instead of event, when calling from frontend)
const updateParams = {
TableName: "instructors",
Key: {
instructorID: { S: instructorID },
},
UpdateExpression: "SET instructorName = :newName",
ExpressionAttributeValues: {
":newName": { S: newInstructorName },
},
ReturnValues: "UPDATED_NEW",
};
try {
const data = await dynamoDbClient.send(new UpdateItemCommand(updateParams));
console.log(data);
} catch (error) {
console.error("Error updating instructor:", error);
}
};
The code above is straightforward: we accept two input parameters, namely an instructorID and a newInstructorName.
We build the update params with the table name, providing the primary key and the updateExpression — which is changing the instructorName value.
Save and deploy the function.
Before we run this, we need to attach a DynamoDB stream to Lambda.
Here’s how we can do this.
In this function page, near the top add a trigger.
On the new page, from the Trigger configuration, add DynamoDB for the source.
Add the instructors table as the DynamoDB table.
Add the trigger.
Next, we’ll write the code to capture DynamoDB update streams.
Create a new Lambda function called “update-instructor-stream” and write the following code:
import { DynamoDBClient, ScanCommand, UpdateItemCommand } from "@aws-sdk/client-dynamodb";
const dynamoDbClient = new DynamoDBClient({ region: "us-east-1" });
export const handler = async (event) => {
try {
for (const record of event.Records) {
if (record.eventName === 'MODIFY') {
const newImage = record.dynamodb.NewImage;
const oldImage = record.dynamodb.OldImage;
if (newImage.instructorName.S !== oldImage.instructorName.S) {
const instructorID = newImage.instructorID.S;
const updatedInstructorName = newImage.instructorName.S;
const scanParams = {
TableName: "courses",
};
const coursesData = await dynamoDbClient.send(new ScanCommand(scanParams));
const matchingCourses = coursesData.Items.filter((course) => course.instructorID.S === instructorID);
const updatePromises = matchingCourses.map(async (course) => {
const updateParams = {
TableName: "courses",
Key: {
courseID: course.courseID,
},
UpdateExpression: "SET instructorName = :newName",
ExpressionAttributeValues: {
":newName": { S: updatedInstructorName }
}
};
await dynamoDbClient.send(new UpdateItemCommand(updateParams));
});
await Promise.all(updatePromises);
}
}
}
} catch (error) {
console.error("Error processing DynamoDB stream:", error);
throw new Error("Failed to process the DynamoDB stream event.");
}
};
The code above captures a DynamoDB update stream event.
It checks if the instructorName of an item in the instructors table was changed.
If it is changed it will scan the courses table for all courses that are taught by the given instructor (with a matching instructorID) and update the instructorName attribute accordingly.
Let’s save and deploy this function.
We can now return to the first function (“update-instructor”) we wrote earlier.
Let’s run a test on this function and add a new name for instructorName. Here’s the JSON test data you can use:
{
"instructorID": "inst#101",
"newInstructorName": "John Dole"
}
Run the test.
You should now see the courses items in DynamoDB updated with the new instructorName value.
Conclusion
DynamoDB offers a powerful solution for managing denormalized data through its streams features.
By using DynamoDB streams, we can efficiently handle updates to normalized data, ensuring that any changes are propagated across related items in real-time.
This keeps our data synchronized and consistent without the complexity of manually updating the data yourself.
👋 My name is Uriel Bitton and I’m committed to helping you master Serverless, Cloud Computing, and AWS.
🚀 If you want to learn how to build serverless, scalable, and resilient applications, you can also follow me on Linkedin for valuable daily posts.
Thanks for reading and see you in the next one!