Last updated: Jan 26, 2024
Reading time · 7 min
We are going to create an SQS queue that receives messages from an SNS topic. Once our SQS queue receives a message, a Lambda function is triggered.
If we fail to process any of the messages, they get delivered to a dead letter queue. The dead letter queue gets polled by another lambda function.
Let's start by creating an SQS queue and subscribing it to an SNS topic.
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // create queue
    const queue = new sqs.Queue(this, 'sqs-queue');

    // create sns topic
    const topic = new sns.Topic(this, 'sns-topic');

    // subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}
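Let's go over what we did in the code sample:
We created an SQS queue.
We created an SNS topic.
We subscribed the queue to the topic, so messages published to the topic are delivered to the queue.
We created an output with the ARN of the SNS topic.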
Let's deploy the stack:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
After a successful deployment, a cdk-outputs.json file is created in the root directory of your project.
The file contains the SNS topic ARN which we'll use to test our application.
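If you would rather read the ARN programmatically than copy it by hand, a lookup along the following lines could work. This is a minimal sketch: it assumes the outputs file maps each stack name to an object of output names and values, and the helper name findStackOutput, the stack name, and the ARN in the sample data are hypothetical placeholders.

```typescript
// Assumed shape of cdk-outputs.json: { "<stack name>": { "<output name>": "<value>" } }
type StackOutputs = Record<string, Record<string, string>>;

// Hypothetical helper: returns the first output called `outputName`
// found in any stack of the parsed outputs file.
function findStackOutput(
  outputs: StackOutputs,
  outputName: string,
): string | undefined {
  for (const stackOutputs of Object.values(outputs)) {
    if (outputName in stackOutputs) {
      return stackOutputs[outputName];
    }
  }
  return undefined;
}

// Placeholder data standing in for the parsed file contents.
const sampleOutputs: StackOutputs = {
  'cdk-stack': {
    snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:example-topic',
  },
};

console.log(findStackOutput(sampleOutputs, 'snsTopicArn'));
```

In a real script you would pass in the result of parsing ./cdk-outputs.json instead of the sample object.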
If we take a look at the SQS queue in the SQS management console, we can see that the queue has 0 messages available:
We configured the SQS queue as a target of an SNS topic. Let's publish a message to the SNS topic.
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
Take your SNS topic ARN from the cdk-outputs.json file and run the command.
If we check the SQS management console after running the command, we can see that our SQS queue now has a message in it:
Next, we're going to see how we can handle the messages in our queue with a Lambda function.
In order to configure AWS Lambda to poll for SQS messages as they arrive, we have to:
create a Lambda function
add the SQS queue as an event source for the Lambda function
Let's add a lambda function that polls our SQS queue for messages:
import * as lambda from 'aws-cdk-lib/aws-lambda';
import {SqsEventSource} from 'aws-cdk-lib/aws-lambda-event-sources';
import {NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // ... rest

    // create lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // add sqs queue as event source for lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );
  }
}
Let's go over what we did in the code sample:
We used the NodejsFunction construct to create a lambda function.
We added an SQS event source to our lambda function. Lambda now polls the queue on our behalf and invokes the function with the messages it receives. It uses long polling, so messages are typically picked up shortly after they arrive.
The batchSize prop we've passed to the SQS event source is the maximum number of records the lambda will retrieve from the SQS queue for a single invocation.
Create the lambda function under src/my-lambda/index.ts and add the following code to it.
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}
We have a simple lambda function that prints the subject and message from the SQS records in the event.
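The handler calls JSON.parse on record.body because, when raw message delivery is disabled on the subscription (the default), SNS wraps each notification in a JSON envelope before handing it to SQS. The sketch below shows the parsing step in isolation. The SnsEnvelope interface lists only the fields used here, and the helper name and sample body are hypothetical.

```typescript
// Subset of the SNS notification envelope used in this example; the real
// envelope also carries fields such as MessageId, TopicArn and Timestamp.
interface SnsEnvelope {
  Type: string;
  Subject?: string;
  Message: string;
}

// Hypothetical helper: parses the body of an SQS record that was
// delivered through an SNS subscription without raw message delivery.
function parseSnsEnvelope(recordBody: string): {
  subject: string | undefined;
  message: string;
} {
  const envelope = JSON.parse(recordBody) as SnsEnvelope;
  return {subject: envelope.Subject, message: envelope.Message};
}

// Placeholder record body, trimmed to the relevant fields.
const sampleBody = JSON.stringify({
  Type: 'Notification',
  Subject: 'Just testing',
  Message: 'Hello world',
});

console.log(parseSnsEnvelope(sampleBody));
```

If you enable raw message delivery on the subscription, the record body is the published message itself and this unwrapping step no longer applies.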
If you want to read more about how to write Lambda functions in TypeScript with CDK, check out my other article - Write TypeScript Lambda functions in AWS CDK - Complete Guide.
Let's run the deploy command:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
If we take a look at the triggers section of our lambda function, we can see that the SQS queue is the trigger.
Let's publish another message to the SNS topic. The topic delivers it to the queue, which is polled by the lambda function.
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
If we open the CloudWatch logs of our Lambda function, we can see that it processed the SQS records.
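In order to configure a dead letter queue in AWS CDK, we are going to:
create a second SQS queue that serves as the dead letter queue
create a Lambda function that uses the dead letter queue as an event source
set the deadLetterQueue prop on our first SQS queue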
I'll post a code snippet of the complete example with a dead letter queue (including the parts we already covered).
import * as lambda from 'aws-cdk-lib/aws-lambda';
import {SqsEventSource} from 'aws-cdk-lib/aws-lambda-event-sources';
import {NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // create DLQ lambda function
    const dlqLambda = new NodejsFunction(this, 'dlq-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/dlq-lambda/index.ts`),
    });

    // create a dead letter queue
    const deadLetterQueue = new sqs.Queue(this, 'dead-letter-queue', {
      retentionPeriod: cdk.Duration.minutes(30),
    });

    // add dead letter queue as event source for dlq lambda function
    dlqLambda.addEventSource(new SqsEventSource(deadLetterQueue));

    // create queue
    const queue = new sqs.Queue(this, 'sqs-queue', {
      // set up DLQ
      deadLetterQueue: {
        queue: deadLetterQueue,
        maxReceiveCount: 1,
      },
    });

    // create an SNS topic
    const topic = new sns.Topic(this, 'sns-topic');

    // subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    // create a Lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // add sqs queue as event source for Lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}
Let's go over what we did in the code sample:
We created a Lambda function for the dead letter queue.
We created a second SQS queue, which serves as the dead letter queue, and added it as an event source for the DLQ lambda.
We set the deadLetterQueue prop on our first SQS queue. If a message has been received maxReceiveCount times without being processed successfully, it is sent to the dead letter queue, which triggers our DLQ lambda.
The code for our DLQ lambda will be the same as the code for our other Lambda. In a more real-world application, you would most likely do some logging and set up an alarm.
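To make the effect of maxReceiveCount more concrete, here is a small in-memory model of the redrive behaviour. It is only a sketch under simplifying assumptions: there is no visibility timeout or batching, the names simulateRedrive and RedriveResult are made up for this example, and it does not call any AWS API.

```typescript
interface RedriveResult {
  receiveCount: number; // how many times the consumer received the message
  deadLettered: boolean; // whether the message ended up in the dead letter queue
}

// Simplified model of a redrive policy: the message is handed to the consumer
// until it is processed successfully or has been received `maxReceiveCount`
// times, at which point it is moved to the dead letter queue.
function simulateRedrive(
  maxReceiveCount: number,
  consumer: () => void,
): RedriveResult {
  let receiveCount = 0;
  while (receiveCount < maxReceiveCount) {
    receiveCount += 1;
    try {
      consumer();
      // Processed successfully, so the message is deleted from the queue.
      return {receiveCount, deadLettered: false};
    } catch {
      // Processing failed, so the message becomes available to receive again.
    }
  }
  return {receiveCount, deadLettered: true};
}

const alwaysFails = () => {
  throw new Error('processing failed');
};

console.log(simulateRedrive(1, alwaysFails)); // { receiveCount: 1, deadLettered: true }
console.log(simulateRedrive(3, alwaysFails)); // { receiveCount: 3, deadLettered: true }
console.log(simulateRedrive(1, () => {})); // { receiveCount: 1, deadLettered: false }
```

With maxReceiveCount set to 1, as in our stack, a single failed attempt is enough to move the message to the dead letter queue. A higher value gives transient errors a chance to clear before the message is set aside.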
Create the DLQ lambda at src/dlq-lambda/index.ts and add the following code to it:
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  // console.log('event', JSON.stringify(event, null, 2));

  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}
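As a rough illustration of the logging a more real-world DLQ lambda might do, the sketch below builds a structured log entry from a single record. The QueueRecord interface is a hand-written subset of an SQS record, and the helper name and log fields are assumptions for this example rather than part of the tutorial's code. Unlike the handler above, it does not throw when the body is not valid JSON.

```typescript
// Hand-written subset of an SQS record, limited to the fields used below.
interface QueueRecord {
  messageId: string;
  body: string;
  attributes: {SentTimestamp: string}; // epoch milliseconds, as a string
}

// Hypothetical helper: turns a dead-lettered record into a structured log entry.
function describeDeadLetter(record: QueueRecord) {
  let payload: unknown;
  try {
    payload = JSON.parse(record.body);
  } catch {
    // Keep the raw text when the body is not JSON instead of failing.
    payload = record.body;
  }

  return {
    level: 'error',
    messageId: record.messageId,
    sentAt: new Date(Number(record.attributes.SentTimestamp)).toISOString(),
    payload,
  };
}

// Placeholder record with a body that is not JSON.
const sampleRecord: QueueRecord = {
  messageId: 'example-id',
  body: 'not json',
  attributes: {SentTimestamp: '1700000000000'},
};

console.log(JSON.stringify(describeDeadLetter(sampleRecord), null, 2));
```

Logging one JSON object per record tends to make the entries easier to search and to base an alarm on later.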
In order to test our dead letter queue configuration, we have to update our first lambda to throw an error. Update the code at src/my-lambda/index.ts to look like:
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  throw new Error('throwing an Error');
}
Let's run the deploy command:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
After a successful deployment, if we look at the Asynchronous invocation configuration for our first Lambda function, we can see that it now has a dead-letter queue attached to it.
The trigger for the DLQ lambda is the SQS dead letter queue:
Now that our lambda function throws an error, let's publish another SNS message and test the SQS dead letter queue.
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
If we look at the CloudWatch logs of the first lambda, we can see that it threw an error:
In a couple of seconds the message will be sent to the dead letter queue and the DLQ lambda will be invoked. The DLQ lambda will simply print the record:
At this point, we know we've successfully set up a dead letter queue.
To delete the resources we've provisioned, issue the destroy command.
npx aws-cdk destroy