SQS Example in AWS CDK [SNS, Lambda, DLQ] - Complete Guide

Borislav Hadzhiev

Thu Apr 29 2021, 7 min read

Table of Contents

  1. Create an SQS queue in AWS CDK
  2. SQS Queue Event Source for a Lambda Function in AWS CDK
  3. Creating an SQS Dead Letter Queue in AWS CDK

Create an SQS queue in AWS CDK

In this article, we are going to create an SQS queue that receives messages from an SNS topic. Once our SQS queue receives a message, a lambda function is triggered.

If we fail to process any of the messages, they get delivered to a dead letter queue. The dead letter queue gets polled by another lambda function.

The code for this article is available on GitHub

Let's start by creating an SQS queue and subscribing it to an SNS topic.

lib/cdk-starter-stack.ts
import * as sns from '@aws-cdk/aws-sns';
import * as subs from '@aws-cdk/aws-sns-subscriptions';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // 👇 create queue
    const queue = new sqs.Queue(this, 'sqs-queue');

    // 👇 create sns topic
    const topic = new sns.Topic(this, 'sns-topic');

    // 👇 subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}

Let's go over what we did in the code snippet.

  1. we created an SQS queue by instantiating the Queue class.
  2. we created an SNS topic by instantiating the Topic class.
  3. we subscribed the SQS queue to the SNS topic. Now our SQS queue is a subscription target for messages we send via the SNS topic.
  4. we added an Output that lets us write the SNS topic ARN to a file on the local file system. We will use the SNS topic ARN to publish messages via the AWS CLI.

Let's deploy the stack:

shell
npx cdk deploy \
  --outputs-file ./cdk-outputs.json

After a successful deployment, a cdk-outputs.json file is created in the root directory of your project. The file includes the SNS topic ARN, which we'll use to test our application.

If we take a look at the SQS queue in the SQS management console, we can see that the queue has 0 messages available:

[Screenshot: SQS queue with no messages]

We configured the SQS queue as a target of an SNS topic. Let's publish a message to the SNS topic:

shell
aws sns publish \
  --subject "Just testing 🚀" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"

Take your SNS topic ARN from the cdk-outputs.json file and execute the command.
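
If you'd rather not copy the ARN by hand, a small helper can pull it out of the outputs file. The sketch below assumes the file has the shape written by the --outputs-file flag (an object keyed by stack name, each entry holding that stack's outputs). The findOutput helper, the stack name cdk-stack and the sample ARN are made up for illustration.

```typescript
// Shape of cdk-outputs.json: { [stackName]: { [outputName]: value } }
type CdkOutputs = Record<string, Record<string, string>>;

// Returns the first output called `outputName` found in any stack,
// or undefined when no stack exposes an output with that name.
function findOutput(outputsJson: string, outputName: string): string | undefined {
  const outputs = JSON.parse(outputsJson) as CdkOutputs;

  for (const stackName of Object.keys(outputs)) {
    const stackOutputs = outputs[stackName];
    if (Object.prototype.hasOwnProperty.call(stackOutputs, outputName)) {
      return stackOutputs[outputName];
    }
  }

  return undefined;
}

// Hypothetical file contents: stack name, account id and ARN are placeholders.
const sampleOutputs = JSON.stringify({
  'cdk-stack': {
    snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:example-topic',
  },
});

const topicArn = findOutput(sampleOutputs, 'snsTopicArn');
console.log(topicArn);
```

In a real project you would read the string from ./cdk-outputs.json instead of building it inline, then pass the result to the --topic-arn flag.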

If we check the SQS management console after executing the command, we can see that our SQS queue now has a message in it:

[Screenshot: SQS queue with one message]

Next, we're going to see how we can handle the messages in our queue with a Lambda function.

SQS Queue Event Source for a Lambda Function in AWS CDK

In order to configure AWS Lambda to poll for SQS messages as they arrive, we have to:

  • create a Lambda function
  • add SQS as an event source for the Lambda function

Let's add a lambda function that polls our SQS queue for messages:

lib/cdk-starter-stack.ts
import * as lambda from '@aws-cdk/aws-lambda';
import {SqsEventSource} from '@aws-cdk/aws-lambda-event-sources';
import {NodejsFunction} from '@aws-cdk/aws-lambda-nodejs';
import * as sns from '@aws-cdk/aws-sns';
import * as subs from '@aws-cdk/aws-sns-subscriptions';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // ... rest

    // 👇 create lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_14_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // 👇 add sqs queue as event source for lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );
  }
}

Let's go over what we did in the code snippet.

  1. we used the NodejsFunction construct to create a lambda function

  2. we added an SQS event source to our lambda function. The Lambda service now polls the queue on our behalf and invokes the function with a batch of messages once they are available. It uses long polling, so messages are usually picked up shortly after they arrive rather than on a fixed schedule.

    The batchSize prop we've passed to the SQS event source is the maximum number of records the lambda receives from the SQS queue in a single invocation.
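
To make batchSize more concrete, here is a rough model of how queued messages end up grouped into invocations. It is a simplification written for this article, not how the Lambda poller is implemented: the real service may hand over smaller batches, and toBatches is a hypothetical helper.

```typescript
// Splits `messages` into groups of at most `batchSize` items,
// mimicking the upper bound that batchSize puts on one invocation.
function toBatches<T>(messages: T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }

  const batches: T[][] = [];
  for (let i = 0; i < messages.length; i += batchSize) {
    batches.push(messages.slice(i, i + batchSize));
  }
  return batches;
}

// 25 queued messages with batchSize 10 give invocations of 10, 10 and 5 records
const queued = Array.from({length: 25}, (_, i) => `message-${i + 1}`);
const batchLengths = toBatches(queued, 10).map(batch => batch.length);
console.log(batchLengths); // [ 10, 10, 5 ]
```

The point of the model is only the upper bound: the handler should be written to cope with anywhere between 1 and batchSize records in event.Records.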

Create the lambda function under src/my-lambda/index.ts and add the following code to it:

src/my-lambda/index.ts
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages 👉', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}

We have a simple lambda function that prints the subject and message from the SQS records in the event.
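
The handler calls JSON.parse on record.body because, with the default subscription settings (raw message delivery turned off), SNS wraps each message in a JSON envelope that contains fields such as Subject and Message. The sketch below isolates that step with hand-written types and a made-up record, so it runs without the aws-lambda types. parseRecord is a hypothetical helper, and unlike the handler above it returns null for bodies that are not valid JSON instead of throwing.

```typescript
// Minimal stand-ins for the fields we use from an SQS record.
interface QueueRecord {
  messageId: string;
  body: string;
}

interface ParsedMessage {
  subject?: string;
  message?: string;
}

// Parses the SNS envelope stored in record.body.
// Returns null when the body cannot be parsed as a JSON object.
function parseRecord(record: QueueRecord): ParsedMessage | null {
  try {
    const envelope = JSON.parse(record.body) as {
      Subject?: string;
      Message?: string;
    };
    return {subject: envelope.Subject, message: envelope.Message};
  } catch (_err) {
    return null;
  }
}

// Made-up record that mimics an SNS notification delivered to SQS.
const sampleRecord: QueueRecord = {
  messageId: 'example-id',
  body: JSON.stringify({
    Type: 'Notification',
    Subject: 'Just testing',
    Message: 'Hello world',
  }),
};

console.log(parseRecord(sampleRecord));
// { subject: 'Just testing', message: 'Hello world' }
console.log(parseRecord({messageId: 'bad', body: 'not json'}));
// null
```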

If you want to read more about how to write lambda functions in TypeScript with CDK, check out my other article - Write TypeScript Lambda functions in AWS CDK - Complete Guide.

Let's execute a deployment:

shell
npx cdk deploy \
  --outputs-file ./cdk-outputs.json

If we take a look at the triggers section of our lambda function, we can see that the SQS queue is the trigger:

[Screenshot: SQS trigger on the lambda function]

Let's publish another message to the SNS topic. The topic delivers it to the queue, which gets polled by the lambda function:

shell
aws sns publish \
  --subject "Just testing 🚀" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"

If we open the CloudWatch logs of our Lambda function, we can see that it processed the SQS records:

[Screenshot: lambda logs showing the processed SQS records]

Creating an SQS Dead Letter Queue in AWS CDK

In order to configure a dead letter queue in AWS CDK, we are going to:

  • create a second SQS queue that will be the target for messages that weren't processed successfully
  • create a second lambda function that will handle records from the dead letter queue
  • add the dead letter queue as an event source for the lambda function. This way the lambda will poll for records in the dead letter queue and process them

I'll post a code snippet of the complete example with dead letter queue (including the parts we already covered):

lib/cdk-starter-stack.ts
import * as lambda from '@aws-cdk/aws-lambda';
import {SqsEventSource} from '@aws-cdk/aws-lambda-event-sources';
import {NodejsFunction} from '@aws-cdk/aws-lambda-nodejs';
import * as sns from '@aws-cdk/aws-sns';
import * as subs from '@aws-cdk/aws-sns-subscriptions';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as path from 'path';

export class CdkStarterStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // 👇 create DLQ lambda function
    const dlqLambda = new NodejsFunction(this, 'dlq-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_14_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/dlq-lambda/index.ts`),
    });

    // 👇 create dead letter queue
    const deadLetterQueue = new sqs.Queue(this, 'dead-letter-queue', {
      retentionPeriod: cdk.Duration.minutes(30),
    });

    // 👇 add dead letter queue as event source for dlq lambda function
    dlqLambda.addEventSource(new SqsEventSource(deadLetterQueue));

    // 👇 create queue
    const queue = new sqs.Queue(this, 'sqs-queue', {
      // 👇 set up DLQ
      deadLetterQueue: {
        queue: deadLetterQueue,
        maxReceiveCount: 1,
      },
    });

    // 👇 create sns topic
    const topic = new sns.Topic(this, 'sns-topic');

    // 👇 subscribe queue to topic
    topic.addSubscription(new subs.SqsSubscription(queue));

    // 👇 create lambda function
    const myLambda = new NodejsFunction(this, 'my-lambda', {
      memorySize: 1024,
      timeout: cdk.Duration.seconds(5),
      runtime: lambda.Runtime.NODEJS_14_X,
      handler: 'main',
      entry: path.join(__dirname, `/../src/my-lambda/index.ts`),
    });

    // 👇 add sqs queue as event source for Lambda
    myLambda.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
      }),
    );

    new cdk.CfnOutput(this, 'snsTopicArn', {
      value: topic.topicArn,
      description: 'The arn of the SNS topic',
    });
  }
}

Let's go over what we did in the code snippet:

  1. we created a second lambda function, which will poll for records in the dead letter queue
  2. we created a second SQS queue, which we've set up as a dead letter queue
  3. we've passed the deadLetterQueue prop on our first SQS queue. Once a message has been received maxReceiveCount times without being processed successfully, it is sent to the dead letter queue, which triggers our DLQ lambda. With maxReceiveCount set to 1, a single failed attempt is enough.

The code for our DLQ lambda will be the same as the code for our other lambda. In a real-world application, you would most likely do some logging and set up an alarm.
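
The interplay between failed attempts and maxReceiveCount can be sketched with a small in-memory model. This is only an illustration under simplified assumptions (a single message, no visibility timeout, no batching), and deliver is a hypothetical function that is not part of any AWS SDK: the message is handed to the handler up to maxReceiveCount times and ends up in the dead letter queue if every attempt throws.

```typescript
type Handler = (body: string) => void;

interface DeliveryResult {
  receiveCount: number; // how many times the handler received the message
  destination: 'processed' | 'dead-letter-queue';
}

// Simplified model of a queue with a dead letter queue attached.
function deliver(
  body: string,
  handler: Handler,
  maxReceiveCount: number,
): DeliveryResult {
  if (!Number.isInteger(maxReceiveCount) || maxReceiveCount < 1) {
    throw new RangeError('maxReceiveCount must be a positive integer');
  }

  for (let receiveCount = 1; receiveCount <= maxReceiveCount; receiveCount++) {
    try {
      handler(body);
      // The handler succeeded, so the message is removed from the queue.
      return {receiveCount, destination: 'processed'};
    } catch (_err) {
      // The handler failed: the message stays in the queue for another attempt.
    }
  }

  // Every allowed attempt failed, so the message moves to the DLQ.
  return {receiveCount: maxReceiveCount, destination: 'dead-letter-queue'};
}

const alwaysFails: Handler = () => {
  throw new Error('throwing an Error');
};

console.log(deliver('Hello world', alwaysFails, 1));
// { receiveCount: 1, destination: 'dead-letter-queue' }
```

A higher maxReceiveCount gives transient failures a chance to recover before the message is moved, at the cost of a longer delay before the DLQ lambda sees it.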

Create the dlq lambda at src/dlq-lambda/index.ts and add the following code to it:

src/dlq-lambda/index.ts
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  // console.log('event 👉', JSON.stringify(event, null, 2));

  const messages = event.Records.map(record => {
    const body = JSON.parse(record.body) as {Subject: string; Message: string};

    return {subject: body.Subject, message: body.Message};
  });

  console.log('messages 👉', JSON.stringify(messages, null, 2));

  return {
    body: JSON.stringify({messages}),
    statusCode: 200,
  };
}

In order to test our dead letter queue configuration, we have to update our first lambda at src/my-lambda/index.ts to throw an error. Update the code at src/my-lambda/index.ts to look like:

src/my-lambda/index.ts
import {APIGatewayProxyResultV2, SQSEvent} from 'aws-lambda';

export async function main(event: SQSEvent): Promise<APIGatewayProxyResultV2> {
  throw new Error('throwing an Error 💥');
}

Let's execute a deployment:

shell
npx cdk deploy \
  --outputs-file ./cdk-outputs.json

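After a successful deployment, if we look at the configuration of our first SQS queue, we can see that it now has a dead-letter queue attached to it. The dead letter queue is set on the queue itself, not on the lambda function's asynchronous invocation settings.

[Screenshot: dead letter queue configuration]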

The trigger for the DLQ lambda is the SQS dead letter queue:

[Screenshot: DLQ lambda trigger]

Now that our lambda function throws an error, let's publish another SNS message and test the SQS dead letter queue:

shell
aws sns publish \
  --subject "Just testing 🚀" \
  --message "Hello world" \
  --topic-arn "YOUR_SNS_TOPIC_ARN"

If we look at the CloudWatch logs of the first lambda, we can see that it threw an error:

[Screenshot: error thrown by the first lambda]

In a couple of seconds, the message will be sent to the dead letter queue and the DLQ lambda will be invoked. The DLQ lambda will simply print the record:

[Screenshot: message logged by the DLQ lambda]

At this point we know we've successfully set up a dead letter queue.

In this article, we created an SNS topic that publishes messages to an SQS queue. The SQS queue gets polled by a lambda function. If we fail to process any of the messages, they get delivered to a dead letter queue that gets polled by another lambda function.

Clean up

To delete the resources we've provisioned, execute the destroy command:

shell
npx cdk destroy
