Borislav Hadzhiev
Reading time · 7 min
We are going to create an SQS queue that receives messages from an SNS topic. Once our SQS queue receives a message, a lambda function is triggered.
If we fail to process any of the messages, they get delivered to a dead letter queue. The dead letter queue gets polled by another lambda function.
Let's start by creating an SQS queue and subscribing it to an SNS topic.
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // create queue
    const queue = new sqs.Queue(this, 'sqs-queue');

    // create sns topic
    const topic = new sns.Topic(this, 'sns-topic');

    // subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}
Let's go over what we did in the code sample:
We created an SQS queue.
We created an SNS topic.
We subscribed the queue to the topic.
We created an Output that will allow us to write the SNS topic ARN to a file on the local file system. We will use the SNS topic ARN to publish messages via the AWS CLI.
Let's deploy the stack:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
After a successful deployment, a cdk-outputs.json file is created in the root directory of your project. The file contains the SNS topic ARN, which we'll use to test our application.
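The outputs file is plain JSON keyed first by stack name and then by output name. As a minimal sketch of reading the ARN programmatically, the helper below searches every stack in the parsed file for a given output. The helper name findOutput, the stack name cdk-stack and the ARN value are placeholders made up for illustration; only the snsTopicArn output name comes from our stack.

```typescript
// Shape of the file written by `cdk deploy --outputs-file`:
// { "<stack name>": { "<output name>": "<value>" } }
type CdkOutputs = Record<string, Record<string, string>>;

// Hypothetical helper (not part of CDK): find an output value by name,
// regardless of which stack it belongs to.
function findOutput(outputs: CdkOutputs, outputName: string): string {
  for (const stackOutputs of Object.values(outputs)) {
    const value = stackOutputs[outputName];
    if (value !== undefined) {
      return value;
    }
  }
  throw new Error(`Output "${outputName}" not found`);
}

// Example file contents; the stack name, account id and ARN are placeholders.
const sampleFile = `{
  "cdk-stack": {
    "snsTopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic-EXAMPLE"
  }
}`;

const topicArn = findOutput(JSON.parse(sampleFile) as CdkOutputs, 'snsTopicArn');
console.log(topicArn);
```

In a real script you would read ./cdk-outputs.json from disk instead of using the inline sample string.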
If we take a look at the SQS queue in the SQS management console, we can see that the queue has 0 messages available:
We configured the SQS queue as a target of an SNS topic. Let's publish a message to the SNS topic:
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
Take your SNS topic ARN from the cdk-outputs.json file and run the command.
If we check the SQS management console after running the command, we can see that our SQS queue now has a message in it:
Next, we're going to see how we can handle the messages in our queue with a Lambda function.
In order to configure AWS Lambda to poll for SQS messages as they arrive, we have to:
create a lambda function
add the SQS queue as an event source for the lambda function
Let's add a lambda function that polls our SQS queue for messages:
import * as lambda from 'aws-cdk-lib/aws-lambda';
import {SqsEventSource} from 'aws-cdk-lib/aws-lambda-event-sources';
import {NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // ... rest

    // create lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // add sqs queue as event source for lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );
  }
}
Let's go over what we did in the code sample:
We used the NodejsFunction construct to create a lambda function.
We added an SQS event source to our lambda function. Lambda now polls the queue for messages as they arrive. By default it uses long polling, with each poll waiting up to 20 seconds for messages.
The batchSize prop we've passed to the SQS event source is the maximum number of records the lambda will retrieve from the SQS queue in a single batch.
Create the lambda function under src/my-lambda/index.ts and add the following code to it:
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}
We have a simple lambda function that prints the subject and message from the SQS records in the event.
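The handler can read Subject and Message from record.body because, unless raw message delivery is enabled on the subscription, SNS wraps each notification in a JSON envelope before delivering it to SQS. The sketch below shows one possible way to parse that envelope defensively, so a non-JSON body does not crash the handler. The QueueRecord type and the parseSnsEnvelope helper are hypothetical names used only for illustration; in the Lambda itself you would keep using the SQSEvent types from 'aws-lambda'.

```typescript
// Minimal local type so the sketch is self-contained; a real SQS record
// carries more fields than these two.
interface QueueRecord {
  messageId: string;
  body: string;
}

// The two SNS envelope fields the handler reads. Subject is optional
// because it is only present when the publisher supplied one.
interface SnsEnvelope {
  Subject?: string;
  Message: string;
}

interface ParsedMessage {
  subject: string | undefined;
  message: string;
}

// Hypothetical helper: returns null instead of throwing when the body is
// not a JSON object with a string Message field.
function parseSnsEnvelope(record: QueueRecord): ParsedMessage | null {
  try {
    const body = JSON.parse(record.body) as Partial<SnsEnvelope>;
    if (typeof body.Message !== 'string') {
      return null;
    }
    return {subject: body.Subject, message: body.Message};
  } catch (_err) {
    return null;
  }
}

// A trimmed-down example of what SNS places in the SQS record body.
const sampleRecord: QueueRecord = {
  messageId: 'example-id',
  body: JSON.stringify({
    Type: 'Notification',
    Subject: 'Just testing',
    Message: 'Hello world',
  }),
};

console.log(parseSnsEnvelope(sampleRecord));
console.log(parseSnsEnvelope({messageId: 'raw', body: 'Hello world'}));
```

Returning null for unparseable bodies is a design choice made for this sketch. Depending on your use case, you might prefer to throw so the message is retried and eventually lands in a dead letter queue.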
If you want to read more about how to write lambda functions in TypeScript with CDK, check out my other article - Write TypeScript Lambda functions in AWS CDK - Complete Guide.
Let's run the deploy command:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
If we take a look at the triggers section of our lambda function, we can see that the SQS queue is the trigger:
Let's publish another message to the SNS topic. SNS delivers the message to the queue, which is polled by the lambda function:
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
If we open the CloudWatch logs of our Lambda function, we can see that it processed the SQS records:
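In order to configure a dead letter queue in AWS CDK, we are going to:
create a lambda function that will be invoked with the messages from the dead letter queue
create a second SQS queue, which will be our dead letter queue
add the dead letter queue as an event source for the DLQ lambda function
set the dead letter queue on our first SQS queue
I'll post a code snippet of the complete example with a dead letter queue (including the parts we already covered):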
import * as lambda from 'aws-cdk-lib/aws-lambda';
import {SqsEventSource} from 'aws-cdk-lib/aws-lambda-event-sources';
import {NodejsFunction} from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cdk from 'aws-cdk-lib';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // create DLQ lambda function
    const dlqLambda = new NodejsFunction(this, 'dlq-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/dlq-lambda/index.ts`),
    });

    // create dead letter queue
    const deadLetterQueue = new sqs.Queue(this, 'dead-letter-queue', {
      retentionPeriod: cdk.Duration.minutes(30),
    });

    // add dead letter queue as event source for dlq lambda function
    dlqLambda.addEventSource(new SqsEventSource(deadLetterQueue));

    // create queue
    const queue = new sqs.Queue(this, 'sqs-queue', {
      // set up DLQ
      deadLetterQueue: {
        queue: deadLetterQueue,
        maxReceiveCount: 1,
      },
    });

    // create sns topic
    const topic = new sns.Topic(this, 'sns-topic');

    // subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    // create lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // add sqs queue as event source for Lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}
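Let's go over what we did in the code sample:
We created a lambda function that will process the messages from the dead letter queue.
We created a dead letter queue with a retention period of 30 minutes and added it as an event source for the DLQ lambda.
We set the deadLetterQueue prop on our first SQS queue. Once a message has been received maxReceiveCount times without being processed successfully, it is sent to the dead letter queue, which triggers our DLQ lambda.
Create the DLQ lambda at src/dlq-lambda/index.ts and add the following code to it: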
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  // console.log('event', JSON.stringify(event, null, 2));

  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}
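To build some intuition for the maxReceiveCount setting on the first queue, here is a toy in-memory model of a redrive policy. It is only an illustration and not how SQS is implemented: the simulateRedrive function and its types are made up for this sketch, and it ignores visibility timeouts and batching. It does show the key behavior: a message that keeps failing is handed to the handler at most maxReceiveCount times and is then moved to the dead letter queue.

```typescript
interface RedriveResult {
  processed: string[];
  deadLettered: string[];
  handlerCalls: number;
}

// Hypothetical simulation: `handler` stands in for the Lambda function and
// signals failure by throwing.
function simulateRedrive(
  bodies: string[],
  maxReceiveCount: number,
  handler: (body: string) => void,
): RedriveResult {
  const pending = bodies.map(body => ({body, receiveCount: 0}));
  const result: RedriveResult = {processed: [], deadLettered: [], handlerCalls: 0};

  while (pending.length > 0) {
    const message = pending.shift();
    if (message === undefined) {
      break;
    }

    // Receiving the message again would exceed maxReceiveCount,
    // so it goes to the dead letter queue instead.
    if (message.receiveCount >= maxReceiveCount) {
      result.deadLettered.push(message.body);
      continue;
    }

    message.receiveCount += 1;
    result.handlerCalls += 1;
    try {
      handler(message.body);
      // Success: the message is deleted from the queue.
      result.processed.push(message.body);
    } catch (_err) {
      // Failure: the message becomes available on the queue again.
      pending.push(message);
    }
  }

  return result;
}

// A handler that fails for one specific message body.
const failing = (body: string): void => {
  if (body === 'bad') {
    throw new Error('cannot process message');
  }
};

const outcome = simulateRedrive(['ok', 'bad'], 1, failing);
console.log(outcome);
```

With maxReceiveCount set to 1, as in our stack, a failing message is tried once and then dead-lettered. A higher value gives the main lambda more attempts before the DLQ lambda sees the message.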
In order to test our dead letter queue configuration, we have to update our first lambda at src/my-lambda/index.ts to throw an error. Update the code at src/my-lambda/index.ts to look like:
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  throw new Error('throwing an Error');
}
Let's run the deploy command:
npx aws-cdk deploy --outputs-file ./cdk-outputs.json
After a successful deployment, if we look at the dead-letter queue settings of our first SQS queue in the SQS management console, we can see that it now has a dead-letter queue attached to it.
The trigger for the DLQ lambda is the SQS dead letter queue:
Now that our lambda function throws an error, let's publish another SNS message and test the SQS dead letter queue:
aws sns publish \
  --subject "Just testing" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"
If we look at the CloudWatch logs of the first lambda, we can see that it threw an error:
In a couple of seconds, the message will be sent to the dead letter queue and the DLQ lambda will be invoked. The DLQ lambda will simply print the record:
At this point we know we've successfully set up a dead letter queue.
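In our test the first lambda throws unconditionally, so every record in the batch fails together. A real handler usually fails only for some records. One option, sketched below, is to catch errors per record and return the IDs of the failed messages so that only those are retried and eventually dead-lettered. This sketch assumes the reportBatchItemFailures option is enabled on the SqsEventSource, which our stack does not do. The processBatch helper and the local types are hypothetical names used for illustration.

```typescript
// Minimal local types so the sketch is self-contained.
interface QueueRecord {
  messageId: string;
  body: string;
}

// Response shape Lambda expects when partial batch responses are enabled.
interface BatchResponse {
  batchItemFailures: {itemIdentifier: string}[];
}

// Hypothetical helper: runs handleRecord for each record and collects the
// message IDs of the records for which it threw.
function processBatch(
  records: QueueRecord[],
  handleRecord: (record: QueueRecord) => void,
): BatchResponse {
  const batchItemFailures: {itemIdentifier: string}[] = [];

  for (const record of records) {
    try {
      handleRecord(record);
    } catch (_err) {
      batchItemFailures.push({itemIdentifier: record.messageId});
    }
  }

  return {batchItemFailures};
}

// Example: the second record fails, the first one succeeds.
const batch: QueueRecord[] = [
  {messageId: 'a', body: 'fine'},
  {messageId: 'b', body: 'broken'},
];

const response = processBatch(batch, record => {
  if (record.body === 'broken') {
    throw new Error('cannot process record');
  }
});

console.log(JSON.stringify(response));
```

The handler would return this response object instead of throwing, so successfully processed records are not retried along with the failed ones.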
To delete the resources we've provisioned, issue the destroy command:
npx aws-cdk destroy