SQS Lambda Integration for Seamless Data Processing

Efficient data processing is critical for modern applications, and combining Amazon Simple Queue Service (SQS) with AWS Lambda is a powerful way to achieve it. In this blog post, we’ll delve into how SQS and Lambda can be integrated to improve your data processing capabilities using the Serverless Framework and TypeScript.
By the end, you’ll have a comprehensive understanding of how this integration can enhance your application’s performance, scalability, and responsiveness.

Understanding Amazon SQS and AWS Lambda

Before we dive into the integration process, let’s understand the fundamentals of Amazon SQS and AWS Lambda.

Amazon Simple Queue Service (SQS)

Amazon SQS is a managed message queuing service designed to decouple components within an application. With SQS, components communicate asynchronously, which helps with scalability and fault tolerance. Messages are sent to a queue that acts as a buffer: the sender can keep producing messages while the recipient works through them at its own pace, so the recipient is not overloaded.

AWS Lambda

AWS Lambda offers serverless compute power by executing functions in response to events. It allows you to run code without provisioning or managing servers, leading to streamlined development and resource utilization. If you want to know more about AWS Lambda and how to create a Lambda function, you can read this guide on How to create a serverless function with nodejs on AWS Lambda.

Benefits of Integrating SQS and Lambda

Now, let’s explore the benefits that SQS and Lambda Integration can bring to your application.

Enhanced Scalability with SQS Lambda Integration

Consider an e-commerce platform like “E-Shop Express”. During sales events, the order volume surges. By integrating SQS and Lambda, “E-Shop Express” can automatically scale its order processing. As orders flood in, Lambda functions are triggered, ensuring every order is efficiently handled without overwhelming the system.

Improved Fault Tolerance through SQS Lambda Integration

In the digital realm, disruptions are unwelcome guests. But with SQS and Lambda, applications like “E-Shop Express” can maintain fault tolerance. If a Lambda function fails, the message is not deleted from the queue. It becomes visible again once the visibility timeout expires and is delivered to the function for another attempt, which prevents data loss.

Cost Optimization Enabled by SQS Lambda Integration

“Serverless” also means “cost-effective.” With AWS Lambda’s pay-as-you-go model and SQS’s resource-efficient queuing, applications maximize resource utilization. “E-Shop Express” pays only for compute time, resulting in a budget-friendly approach to handling varying workloads.

Real-time Responsiveness with SQS Lambda Integration

Imagine a customer placing an order on “E-Shop Express.” With SQS and Lambda, the order is processed promptly. Lambda functions swiftly handle the order, update inventory, and send confirmation emails, offering real-time updates to customers.

Best Practices for SQS Lambda Integration

To make the most of SQS and Lambda integration, consider these best practices:

Batch Processing for Efficient SQS Lambda Integration

By batching messages, applications like “E-Shop Express” reduce Lambda invocation overhead. With the Serverless Framework, the batchSize setting on the SQS event controls how many messages Lambda delivers to a single invocation, so one function call can process multiple orders.
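
As a rough sketch of what batch handling can look like, the handler below loops over every record in a batch and reports only the failed message IDs back to Lambda. It assumes partial batch responses are enabled on the event source (in the Serverless Framework, functionResponseType: ReportBatchItemFailures on the sqs event). The QueueRecord, QueueEvent and BatchResponse types and the handleOrder function are hypothetical stand-ins and are not taken from the example repository.

```typescript
// Minimal local types so the sketch is self-contained; in a real project the
// SQSEvent and SQSBatchResponse types from the aws-lambda typings cover this.
interface QueueRecord {
  messageId: string;
  body: string;
}
interface QueueEvent {
  Records: QueueRecord[];
}
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Hypothetical per-order logic: rejects orders that have no phone number.
const handleOrder = async (order: { phoneNumber?: string }): Promise<void> => {
  if (!order.phoneNumber) {
    throw new Error('Order is missing a phone number');
  }
};

// Processes every record in the batch and collects the IDs of those that failed,
// so only the failed messages become visible in the queue again.
const batchHandler = async (event: QueueEvent): Promise<BatchResponse> => {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of event.Records) {
    try {
      await handleOrder(JSON.parse(record.body));
    } catch {
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
};
```

Without partial batch responses, a single failing order would cause the whole batch to be redelivered, so orders that already succeeded would be processed again.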

Leveraging Dead Letter Queues in SQS Lambda Integration

In case of processing failures, dead letter queues provide a safety net. Messages that fail processing a configured number of times are moved to the dead letter queue, where “E-Shop Express” can capture the problematic messages, analyze them, and make necessary improvements.
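
Declaring a second queue is not enough for it to act as a dead letter queue: the main queue also needs a redrive policy that points at it. The fragment below is a minimal sketch of such a resource, reusing the queue names from the configuration shown later in this post. The maxReceiveCount of 3 is an assumed value, not one taken from the example repository.

```typescript
// Sketch of the main queue resource with a redrive policy attached.
// maxReceiveCount: 3 is an assumed value chosen for illustration.
const orderQueueWithRedrive = {
  Type: 'AWS::SQS::Queue',
  Properties: {
    QueueName: 'EShopQueue',
    VisibilityTimeout: 120,
    RedrivePolicy: {
      // Messages received this many times without being deleted move to the DLQ
      maxReceiveCount: 3,
      deadLetterTargetArn: { 'Fn::GetAtt': ['DeadLetterQueue', 'Arn'] },
    },
  },
};
```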

Effective Error Handling and Retries in SQS Lambda Integration

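TypeScript’s strong typing helps catch malformed data early, and applications should handle the remaining failures gracefully rather than ignore them. Implement retry mechanisms so that temporary hiccups don’t hinder order processing, and let errors that persist surface so the message can be retried or moved to the dead letter queue.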
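
One way to retry a transient failure inside a single invocation is a small helper with exponential backoff, sketched below. The names backoffDelayMs, sleep and withRetry, and the default delays, are assumptions made for illustration. This complements the redelivery that SQS already performs when an invocation fails and does not replace it.

```typescript
// Delay before the next try after a given attempt (1-based), doubling up to a cap.
const backoffDelayMs = (attempt: number, baseMs = 100, capMs = 2000): number =>
  Math.min(capMs, baseMs * Math.pow(2, attempt - 1));

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Runs an async operation, retrying on any error until maxAttempts is reached.
// The last error is rethrown so the caller (and SQS) still sees the failure.
const withRetry = async <T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseMs = 100,
): Promise<T> => {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        await sleep(backoffDelayMs(attempt, baseMs));
      }
    }
  }
  throw lastError;
};
```

Keep the total retry time well below the function timeout and the queue’s visibility timeout, otherwise the message may be redelivered while the first invocation is still retrying.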

Real World Example: E-Commerce Order Processing with SQS Lambda Integration

Let’s dive into “E-Shop Express” once more to understand how SQS and Lambda integration works in a real-world context.
For this example we are going to use the Serverless Framework with TypeScript. If you are not familiar with how to create a Lambda function using the Serverless Framework and TypeScript, please read this comprehensive guide.

The entire code base of this example can be found in this repository. We’ll focus on two essential parts: first we’ll dive into the configuration of our Serverless project, and then we’ll see how the Lambda functions are implemented.

Serverless Configuration

import type { AWS } from '@serverless/typescript';

import { orderNotifier, orderProcessor } from '@functions/orders';

const serverlessConfiguration: AWS = {
  ...,
  functions: { 
    orderNotifier,
    orderProcessor
  },
  ...,
  resources: {
    Resources: {
      OrderSQSQueue: {
        Type: 'AWS::SQS::Queue',
        Properties: {
          QueueName: 'EShopQueue',
          VisibilityTimeout: 120
        }
      },
      DeadLetterQueue: {
        Type: 'AWS::SQS::Queue',
        Properties: {
          QueueName: 'EShopDeadLetterQueue',
          VisibilityTimeout: 120
        }
      },
      OrderProcessorRole: {
        Type: 'AWS::IAM::Role',
        Properties: {
          RoleName: 'OrderProcessorRole',
          AssumeRolePolicyDocument: {
            Version: '2012-10-17',
            Statement: [
              {
                Effect: 'Allow',
                Principal: {
                  Service: 'lambda.amazonaws.com',
                },
                Action: 'sts:AssumeRole'
              }
            ]
          },
          Policies: [
            {
              PolicyName: 'OrderProcessorPolicy',
              PolicyDocument: {
                Version: '2012-10-17',
                Statement: [
                  {
                    Effect: 'Allow',
                    Action: [
                      'sqs:*'
                    ],
                    Resource: [
                      { 'Fn::GetAtt': ['OrderSQSQueue', 'Arn']}, 
                      { 'Fn::GetAtt': ['DeadLetterQueue', 'Arn']}
                    ]
                  },
                  {
                    Effect: "Allow",
                    Action: [
                      "logs:CreateLogGroup",
                      "logs:CreateLogStream",
                      "logs:PutLogEvents"
                    ],
                    Resource: ["arn:aws:logs:*:*:*"]
                  }
                ]
              }
            }
          ]
        }
      }
    }
  }
};

module.exports = serverlessConfiguration;

Let’s walk through the configuration of our project.
We have two functions: orderProcessor, which receives incoming orders and puts them in the queue, and orderNotifier, which is invoked with orders from the queue and handles them. This matches the handlers shown in the next section, where the processor sends to SQS and the notifier reads SQS records.
We have three resources: OrderSQSQueue, which acts as a waiting area for incoming orders; DeadLetterQueue, which is meant to catch orders that can’t be processed successfully; and OrderProcessorRole, which gives the orderProcessor function permission to work with the queues and logs.
The relation between the functions and resources is as follows:
orderProcessor places orders into OrderSQSQueue.
orderNotifier receives orders from OrderSQSQueue and handles them, including error handling.
OrderProcessorRole defines what the orderProcessor is allowed to do with the queues and logs.
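
The function definitions themselves are imported from @functions/orders and are not shown in this post. Based on the handlers in the next section, where process handles an API Gateway request and notify receives SQS records, they might look roughly like the sketch below. The handler paths, the HTTP route and the batchSize value are assumptions, not copied from the example repository.

```typescript
// Hypothetical function definitions; handler paths, the HTTP route and
// batchSize are assumptions made for illustration.
const orderFunctions = {
  orderProcessor: {
    handler: 'src/functions/orders/handler.process',
    events: [{ http: { method: 'post', path: 'orders' } }],
    environment: {
      // Ref on an AWS::SQS::Queue resolves to the queue URL read by sendMessage
      QUEUE_URL: { Ref: 'OrderSQSQueue' },
    },
  },
  orderNotifier: {
    handler: 'src/functions/orders/handler.notify',
    events: [
      {
        sqs: {
          // Lambda polls this queue and invokes the function with batches of messages
          arn: { 'Fn::GetAtt': ['OrderSQSQueue', 'Arn'] },
          batchSize: 10,
        },
      },
    ],
  },
};
```

The sqs event is what creates the link between the queue and the function: no polling code is needed in the handler itself.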

In the next section, let’s understand how these lambda functions were implemented. 

Lambda functions implementation

In this section we’ll explain how the orderProcessor and orderNotifier functions are implemented.

OrderProcessor implementation

// src/functions/orders/handler.ts

import type { ValidatedEventAPIGatewayProxyEvent } from '@libs/api-gateway';
import { formatJSONResponse } from '@libs/api-gateway';
import { middyfy } from '@libs/lambda';
import schema from './schema';
import { createLogger } from '@libs/logger';
import { sendMessage } from './../../services/sqsHelper'
...
const processHandler: ValidatedEventAPIGatewayProxyEvent<typeof schema> = async (event) => {
  try {
    // Typed request body (its shape is described by ./schema)
    const request = event.body
    logger.info('Receiving body parameters', { body: request });
    // Build the order payload that will travel through the queue
    const data = {
      id: Date.now(),
      orderStatus: 'Shipped',
      phoneNumber: request.phone_number
    };
    const Message = JSON.stringify(data);
    logger.info('Sending order details to the Queue', { Message })
    await sendMessage(Message);
    return formatJSONResponse({
      'message': 'Order processed successfully',
      ...data
    });
  } catch (error) {
    logger.error('Error occurred', { error })
    // Narrow the caught value before reading its message
    const message = error instanceof Error ? error.message : String(error);
    return formatJSONResponse({ message }, 500);
  }
};
...
const process = middyfy(processHandler);
export { process }

Let’s break this function down step by step:

  • Inside the try block, the code does the following:
    • Extracts the request from the event’s body. 
    • Logs information about the received body parameters.
    • Creates a data object with an ID, order status, and phone number based on the request.
    • Converts the data object into a JSON string and assigns it to the Message variable.
    • Logs information about sending order details to a queue.
    • Calls the sendMessage function with the JSON message.
    • Returns a JSON response with a success message and the data object.
  • In case of an error, the code enters the catch block:
    • Logs an error message.
    • Returns a JSON response with the error message and a status code of 500 (Internal Server Error)

Let’s take a look at how the sendMessage function is implemented.

import { 
    SQSClient,
    SendMessageCommand, ...
} from "@aws-sdk/client-sqs";

const region        = process.env.REGION
const QueueUrl      = process.env.QUEUE_URL
const delaySeconds  = process.env.DELAY_S || 1;

const sqsClient     = new SQSClient({ region });

export const sendMessage = async (MessageBody: string) => {
    try {
        const command: SendMessageCommand = new SendMessageCommand({
            QueueUrl,
            DelaySeconds: +delaySeconds,
            MessageBody
        })
        const response = await sqsClient.send(command)
        console.log(response);
        return response;
    } catch (error) {
        console.log('Error happened ...', error.message)
    }
}
...

The sendMessage function takes a single parameter, MessageBody, which is the string message to send to the SQS queue. Within the try block, a new SendMessageCommand is created with the QueueUrl, DelaySeconds and MessageBody. QueueUrl and DelaySeconds come from environment variables, and DelaySeconds defaults to 1. The command is then sent with sqsClient.send(command), and the response is logged and returned. If an error occurs while sending, it is caught and logged to the console. Note that the error is not rethrown, so sendMessage resolves with undefined and the calling handler still returns its success response.
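
If the caller should know that sending failed, one option is to log the error and rethrow it. The sketch below shows that pattern in isolation. SendFn, SendResult and sendOrThrow are hypothetical names, and the send argument stands in for a call such as sqsClient.send so the example runs without AWS credentials.

```typescript
// Minimal shape of a send result; the real SDK response has more fields.
interface SendResult {
  MessageId?: string;
}
type SendFn = (messageBody: string) => Promise<SendResult>;

// Wraps a send function so failures are logged and rethrown instead of swallowed.
const sendOrThrow = async (send: SendFn, messageBody: string): Promise<SendResult> => {
  try {
    return await send(messageBody);
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    console.error('Failed to send message to the queue:', reason);
    // Rethrow so the calling handler reaches its own catch block
    throw error;
  }
};
```

With this change, the catch block of the order handler would run and return the 500 response instead of reporting success for an order that never reached the queue.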

Note that we used AWS SDK v3’s @aws-sdk/client-sqs module for interacting with SQS, which is the recommended and up-to-date SDK for AWS services.

To summarize, this code is a Lambda function designed to receive an API Gateway event, process order details, log events, and send the order details to an SQS queue. This function is responsible for handling order processing.

OrderNotifier implementation

// src/functions/orders/handler.ts

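import type { SQSEvent } from 'aws-lambda';

export const notify = async (event: SQSEvent): Promise<void> => {
  try {
    for (const record of event.Records) {
      // Each SQS record carries the order as a JSON string in its body
      const orderDetails = JSON.parse(record.body);
    }
    logger.info('Reading event', { event })
    // process the order
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    logger.error('Error occurred', { message })
    // Rethrow so the batch is not deleted and SQS can deliver it again
    throw error;
  }
};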

Let’s break down this code step by step: 

  • Inside the try block, there’s a for...of loop that iterates through the Records property of the event. An SQS event can carry a batch of messages, so the function handles each record in turn.
  • Inside the loop, it parses each record.body as JSON into the orderDetails variable. This assumes that the records contain JSON-formatted data.
  • After the loop, it logs information about reading the entire event object using a logger.
  • After the log, you can implement any processing logic of your choice. For example, you can store the order details in a database or send a confirmation email or SMS to the customer.

The specific processing logic you implement will depend on your application’s requirements and the services it interacts with. 
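
Whatever the processing logic is, it usually helps to validate the parsed body before using it. The sketch below checks for the three fields that the order handler in this post puts on the queue. The OrderMessage interface, the parseOrderMessage name and the assumption that phoneNumber is a string are illustrative and not part of the example repository.

```typescript
// Shape of the message produced by the order handler in this post.
// phoneNumber is assumed to be a string.
interface OrderMessage {
  id: number;
  orderStatus: string;
  phoneNumber: string;
}

// Parses a record body and checks that the expected fields are present.
// Throws on malformed input so the message can be retried or dead-lettered.
const parseOrderMessage = (body: string): OrderMessage => {
  const parsed: unknown = JSON.parse(body);
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Order message must be a JSON object');
  }
  const { id, orderStatus, phoneNumber } = parsed as Record<string, unknown>;
  if (
    typeof id !== 'number' ||
    typeof orderStatus !== 'string' ||
    typeof phoneNumber !== 'string'
  ) {
    throw new Error('Order message is missing required fields');
  }
  return { id, orderStatus, phoneNumber };
};
```

Inside the loop, the call would be parseOrderMessage(record.body), which gives the rest of the function a typed order instead of an untyped JSON value.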

Conclusion

In our exploration of the orderProcessor and orderNotifier functions built with the Serverless Framework, we’ve looked at message queuing and processing with Amazon Simple Queue Service (SQS). Together, these two functions form a small order pipeline and give a practical view of how Lambda interacts with SQS.

In the orderProcessor function, we demonstrated how to receive an order through API Gateway and send its details to an SQS queue. By leveraging the Serverless Framework, we simplified the deployment and management of this function, making it easy to integrate into our application. The result is a steady flow of order information into the SQS queue, ready for further processing.

The orderNotifier function takes over once orders are in the SQS queue. Here, we showed how to read the records delivered by SQS and where to plug in the actual processing logic. Because Lambda scales the number of invocations with the volume of messages, this pattern can handle a high volume of orders while keeping processing timely and reliable.
