Note: the Vietnamese version of this article is available at the link below.

https://duongnt.com/sqs-services-integration-one-vie

Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf is one of my favourite IT books. The most fascinating thing about it is that, despite being written 17 years ago, most of its content is still as relevant as ever. Using message channels (queues) is still one of the best ways to integrate multiple services. However, the tools themselves have changed a lot in the intervening period. Nowadays, I mostly use Amazon Simple Queue Service (SQS) as the message channel in my projects.

In this first part of a two-part series, we find out how to interact with an SQS queue using C#.

The advantage of messaging-based services integration

In this context, a message is a package of data that can be exchanged between different parts of a big system. Messaging-based integration has the following advantages.

  • It’s asynchronous by nature: sending a message does not require both the sender and the receiver to be online at the same time.
  • Messaging systems allow services to be independent of (decoupled from) each other. This is because a message can be transformed in transit, so both the sender and the receiver can keep their own formats.
  • We can switch between broadcasting a message to all receivers and sending it to just one or a few receivers. And we can make all these changes without touching either the receivers or the senders.

To quote the book Enterprise Integration Patterns:

Messaging can transfer packages of data frequently, immediately, reliably, and asynchronously, using customizable formats.

What about the other options?

Obviously, messaging is not the only integration style. Below are some alternatives.

  • File transfer: each service exports data into a file, which other services can import. The downside is that the developer must figure out how to write, read, and delete files, how to keep their names unique, and so on. At some point, this solution will turn into a version of messaging.
  • Shared database: multiple services store their data in a central, agreed-upon database. Any service can access other services’ data when necessary. The downsides are that it’s difficult to define one database schema suitable for all services, and that the database can easily become a bottleneck.
  • API: when a service needs information from another service, it calls the other service directly via an API. The downside of this method is that it couples the services together.

Of course, there is no guarantee that messaging is always the best solution. When faced with a new problem, it’s a good idea to keep the other options in mind.

Standard Amazon SQS

Set up a development environment

For simplicity’s sake, this article uses Alpine SQS, a containerized Java implementation of Amazon SQS. All the sample code was also tested on an actual Amazon SQS instance. If you have an AWS account, feel free to use that instead.

First, we need to pull the Docker image and start the container. Our SQS instance will run on port 9324, and the messages Web UI will run on port 9325.

docker run --name alpine-sqs -p 9324:9324 -p 9325:9325 -d roribio16/alpine-sqs:latest

Next, we will create a new message queue and name it helloworldqueue. Note that this step requires the AWS Command Line Interface (CLI); please download it from this link.

aws --endpoint-url http://localhost:9324 sqs create-queue --queue-name helloworldqueue

You can also add that new queue to the Web UI by adding a new endpoint to the file /opt/config/sqs-insight.conf. The fields key, secretKey and region are required by the Web UI, but are actually not used by Alpine SQS.

"endpoints": [
    {
        "key": "notValidKey",
        "secretKey": "notValidSecret",
        "region": "eu-central-1",
        "url": "http://localhost:9324/queue/helloworldqueue"
    }
]

Then you can access the Web UI and view the queue’s content at the following URL.

http://localhost:9325

[Screenshot: SQS Web UI]

Create an SQS client object

To interact with SQS, we need to install the AWS SDK by running the following command.

dotnet add package AWSSDK.SQS

We need to specify the region and the service URL of our SQS instance. Because we are using Alpine SQS, the service URL is on localhost.

using Amazon;
using Amazon.SQS;

var sqsConfig = new AmazonSQSConfig
{
    RegionEndpoint = RegionEndpoint.EUCentral1,
    ServiceURL = "http://localhost:9324" // local Alpine SQS endpoint
};

var sqsClient = new AmazonSQSClient(sqsConfig);

If your SQS instance requires credentials, you can provide them via a BasicAWSCredentials object (from the Amazon.Runtime namespace).

var accessKey = "<your access key>";
var secretKey = "<your secret key>";
var cred = new BasicAWSCredentials(accessKey, secretKey);

var sqsClient = new AmazonSQSClient(cred, sqsConfig);
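
The examples in this article hard-code the queue URL. As an alternative, the client can look the URL up by queue name. The snippet below is a minimal sketch, assuming the local Alpine SQS container from this article is running and that helloworldqueue has already been created; the dummy credentials are placeholders that the local emulator is assumed to ignore.

```csharp
using System;
using Amazon.Runtime;
using Amazon.SQS;

// Assumption: Alpine SQS is listening on localhost:9324 and does not validate credentials.
var sqsClient = new AmazonSQSClient(
    new BasicAWSCredentials("notValidKey", "notValidSecret"),
    new AmazonSQSConfig { ServiceURL = "http://localhost:9324" });

// Ask SQS for the URL of an existing queue instead of hard-coding it.
var response = await sqsClient.GetQueueUrlAsync("helloworldqueue");
Console.WriteLine(response.QueueUrl);
```

The returned URL can then be used wherever a QueueUrl property is expected.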

Send a message to SQS

We use the SendMessageAsync method to send a message to the queue. That method has two overloads, but we will use the one which takes a SendMessageRequest argument (from the Amazon.SQS.Model namespace). At a minimum, we must provide the queue URL and the message body.

var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    MessageBody = "Hello world!"
};

Then we can use the sqsClient object created in the previous section to send our message to the queue.

await sqsClient.SendMessageAsync(sendMessageRequest);

We can use the Web UI to verify that our message does exist in the queue.

[Screenshot: Message exists in the queue]

However, in a real project, we rarely want to send a plaintext message. Instead, we usually send a serialized object, here as JSON via System.Text.Json.

public class Person
{
    public string Name { get; set; }
    public int Age { get; set; }
}

var person = new Person { Name = "John", Age = 30 };
var msg = JsonSerializer.Serialize(person);

var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    MessageBody = msg
};

await sqsClient.SendMessageAsync(sendMessageRequest);

[Screenshot: Serialized object in the queue]

Other notable properties in SendMessageRequest class

  • DelaySeconds: sometimes when sending a message to the queue, we don’t want it to be available for consumption right away. When this property is greater than 0 (the maximum is 900 seconds, or 15 minutes), SQS will wait until the delay period has passed before making our message visible.
    sendMessageRequest.DelaySeconds = 30; // Wait 30 seconds before making our message visible
    
  • MessageAttributes: we can use this property to send metadata along with our message. This is helpful when our message needs additional information that is not part of the message body itself. Each attribute has a data type (SQS supports String, Number, and Binary); the example below uses String. Unfortunately, we cannot confirm message attributes via the Web UI.
    var attrName = "DummyAttribute";
    var attrValue = "DummyValue";
    var additionalAttr = new MessageAttributeValue
    {
        DataType = "String", // SQS data type name; Number values are also passed via StringValue
        StringValue = attrValue
    };
    sendMessageRequest.MessageAttributes[attrName] = additionalAttr; // MessageAttributes is a dictionary
    
    await sqsClient.SendMessageAsync(sendMessageRequest);
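
Both properties can also be set directly in the object initializer. The following is a minimal sketch of a request that combines a delay with a numeric attribute; the attribute name RetryCount is hypothetical and only serves as an illustration. It builds the request without sending it.

```csharp
using System;
using System.Collections.Generic;
using Amazon.SQS.Model;

var request = new SendMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    MessageBody = "Hello world!",
    DelaySeconds = 30, // hide the message for 30 seconds after sending
    MessageAttributes = new Dictionary<string, MessageAttributeValue>
    {
        // "RetryCount" is a made-up attribute name. A Number attribute
        // still carries its value as text in StringValue.
        ["RetryCount"] = new MessageAttributeValue { DataType = "Number", StringValue = "3" }
    }
};

Console.WriteLine(request.MessageAttributes["RetryCount"].DataType);
```

Assigning the whole dictionary up front avoids relying on MessageAttributes being pre-initialized by the SDK.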
    

Send messages in batch to SQS

If we want to send multiple messages to the queue at once, repeatedly calling SendMessageAsync might harm performance because each call is a separate round trip. In this case, SendMessageBatchAsync is a good alternative; it sends up to 10 messages per call. Instead of SendMessageRequest, we need to create a SendMessageBatchRequest object.

var sendMessageBatchRequest = new SendMessageBatchRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    Entries = new List<SendMessageBatchRequestEntry>
    {
        new SendMessageBatchRequestEntry { Id = "001", MessageBody = "msg 1" },
        new SendMessageBatchRequestEntry { Id = "002", MessageBody = "msg 2" },
        new SendMessageBatchRequestEntry { Id = "003", MessageBody = "msg 3" },
    }
};

await sqsClient.SendMessageBatchAsync(sendMessageBatchRequest);

Notice that each entry has an Id value that is unique within the batch. We can confirm that three messages exist in the queue.

[Screenshot: Three messages in the queue]

Similar to SendMessageRequest, SendMessageBatchRequestEntry also has properties such as DelaySeconds and MessageAttributes.
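
Because a single batch call accepts at most 10 entries, a longer list of messages has to be split first. The snippet below is a sketch of that splitting step only, using plain tuples so that it runs without the AWS SDK. The helper name ToBatches is hypothetical, and Enumerable.Chunk assumes .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// SQS accepts at most 10 entries per batch call, so larger sets must be split.
const int MaxBatchSize = 10;

// Hypothetical helper: pairs each body with an Id and groups the pairs into batches.
static List<List<(string Id, string Body)>> ToBatches(IEnumerable<string> bodies, int size) =>
    bodies
        .Select((body, index) => (Id: index.ToString("D3"), Body: body)) // Ids are unique within each batch
        .Chunk(size)
        .Select(chunk => chunk.ToList())
        .ToList();

var bodies = Enumerable.Range(1, 23).Select(i => $"msg {i}");
var batches = ToBatches(bodies, MaxBatchSize);

Console.WriteLine(string.Join(", ", batches.Select(b => b.Count))); // prints "10, 10, 3"
```

Each inner list can then be mapped to SendMessageBatchRequestEntry objects and sent with SendMessageBatchAsync. It is also worth checking the Failed list on the response, since a batch call can succeed as a whole while individual entries are rejected.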

Read messages from SQS

Before trying to read messages from the queue, we need to keep the following in mind.

  • By default, SQS only provides best-effort ordering. In other words, while messages are generally delivered in the same order as they are sent, this is not guaranteed.
  • Unlike a normal queue, a message is not automatically deleted from SQS after we read it. Instead, we have to explicitly delete it.

Also, I recommend turning off the Web UI by running the following command inside the container, because it might interfere with our test code.

supervisorctl stop insight

To read messages from SQS, we use the ReceiveMessageAsync method, passing a ReceiveMessageRequest object to provide all the necessary options.

var receiveMessageRequest = new ReceiveMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    WaitTimeSeconds = 5,
    VisibilityTimeout = 30,
    MaxNumberOfMessages = 10
};

var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;

Below are some frequently used options.

  • QueueUrl: the queue endpoint from which to receive messages.
  • MaxNumberOfMessages: try to read this many messages. Keep in mind that SQS might return fewer messages (but never more). This value must be between 1 and 10.
  • VisibilityTimeout: the duration (in seconds) to hide a message from other receivers after it has been received. This is so that multiple receivers won’t process the same message.
  • WaitTimeSeconds: the maximum duration (up to 20 seconds) to wait for a message to arrive when none is available. As soon as at least one message is available, the call returns, possibly with fewer than MaxNumberOfMessages messages.

Here, messages is a List<Message>, and we can access the body of each message via the Body property.

foreach (var message in messages)
{
    Console.WriteLine(message.Body);
}

Or we can deserialize the data if it is a serialized string.

var person = JsonSerializer.Deserialize<Person>(message.Body);

Read message attribute from SQS

Remember that in an earlier section, we wrote a message attribute called DummyAttribute to our message. We can read that attribute back, but we need to tell SQS to return it first.

var receiveMessageRequest = new ReceiveMessageRequest
{
    // omitted
    MessageAttributeNames = new List<string> { "DummyAttribute" } // To retrieve multiple attributes, add their names to this list
};

Then we can use the receiveMessageRequest to read messages from the queue like normal.

var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;
var message = messages.First(); // Assuming that 'messages' is not empty

After that, we can access all message attributes by name via the MessageAttributes dictionary in the message object. But to be safe, we should use the TryGetValue method.

if (message.MessageAttributes.TryGetValue("DummyAttribute", out var attr))
{
    Console.WriteLine($"Attribute value is: {attr.StringValue}");
}

Delete messages from SQS

As mentioned before, reading a message from SQS is more like a peek than a pop. However, it’s usually a good idea to delete a message after processing it. Otherwise, we will keep processing the same message again and again. Looking at the IAmazonSQS interface, we can see a DeleteMessageAsync method, which requires an argument of type DeleteMessageRequest.

public class DeleteMessageRequest : AmazonSQSRequest
{
    // omitted

    [AWSProperty(Required = true)]
    public string QueueUrl { get; set; }

    [AWSProperty(Required = true)]
    public string ReceiptHandle { get; set; }
}

The QueueUrl is easy enough, but what about ReceiptHandle? It turns out that to delete a message, we must retrieve it first (we can’t just recall a specific message). Then we can use the ReceiptHandle from the retrieved message to delete it.

var receiveMessageRequest = new ReceiveMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    WaitTimeSeconds = 5,
    VisibilityTimeout = 30,
    MaxNumberOfMessages = 1
};

var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;
var message = messages.First(); // Assuming that 'messages' is not empty

var deleteMessageRequest = new DeleteMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue",
    ReceiptHandle = message.ReceiptHandle
};

await sqsClient.DeleteMessageAsync(deleteMessageRequest);

If the ReceiptHandle we provide does not exist, DeleteMessageAsync will still run to completion instead of throwing an exception.

To delete all messages from the queue, we can use the PurgeQueueAsync method.

await sqsClient.PurgeQueueAsync("http://localhost:9324/queue/helloworldqueue"); // Delete all messages in helloworldqueue
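
Putting the receive and delete steps together, a typical consumer runs them in a loop and deletes a message only after it has been processed successfully. The following is a minimal sketch, assuming the local Alpine SQS container and helloworldqueue from this article; the dummy credentials and the Console.WriteLine stand-in for real processing are placeholders.

```csharp
using System;
using System.Collections.Generic;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;

const string queueUrl = "http://localhost:9324/queue/helloworldqueue";

// Assumption: the local emulator does not validate these dummy credentials.
var sqsClient = new AmazonSQSClient(
    new BasicAWSCredentials("notValidKey", "notValidSecret"),
    new AmazonSQSConfig { ServiceURL = "http://localhost:9324" });

var stopRequested = false;
Console.CancelKeyPress += (_, e) => { e.Cancel = true; stopRequested = true; }; // Ctrl+C ends the loop

while (!stopRequested)
{
    var response = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
    {
        QueueUrl = queueUrl,
        WaitTimeSeconds = 5,
        VisibilityTimeout = 30,
        MaxNumberOfMessages = 10
    });

    // Depending on the SDK version, Messages can be null when nothing was received.
    foreach (var message in response.Messages ?? new List<Message>())
    {
        try
        {
            Console.WriteLine($"Processing: {message.Body}"); // stand-in for real work
            await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
        }
        catch (Exception ex)
        {
            // The message was not deleted, so it becomes visible again after VisibilityTimeout.
            Console.Error.WriteLine($"Failed to process {message.MessageId}: {ex.Message}");
        }
    }
}
```

Deleting only after successful processing means a failed message is retried later, so the processing step should tolerate seeing the same message more than once.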

First in first out (FIFO) queue

In most cases, a standard queue with its best-effort ordering is good enough for our needs. But SQS also supports strict FIFO queues. We can create one with the following command (the .fifo suffix is mandatory).

aws --endpoint-url http://localhost:9324 sqs create-queue --queue-name helloworldqueue.fifo --attributes FifoQueue=true
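
The same queue can also be created from C#. This is a minimal sketch using CreateQueueAsync, again assuming the local Alpine SQS container with placeholder credentials.

```csharp
using System;
using System.Collections.Generic;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;

// Assumption: the local emulator does not validate these dummy credentials.
var sqsClient = new AmazonSQSClient(
    new BasicAWSCredentials("notValidKey", "notValidSecret"),
    new AmazonSQSConfig { ServiceURL = "http://localhost:9324" });

var createQueueRequest = new CreateQueueRequest
{
    QueueName = "helloworldqueue.fifo", // FIFO queue names must end with .fifo
    Attributes = new Dictionary<string, string>
    {
        ["FifoQueue"] = "true"
        // Other queue attributes, such as ContentBasedDeduplication, are set the same way.
    }
};

var response = await sqsClient.CreateQueueAsync(createQueueRequest);
Console.WriteLine(response.QueueUrl);
```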

Send messages to a FIFO queue

Sending messages to a FIFO queue is largely the same as sending to a standard queue: we still need to create a SendMessageRequest or SendMessageBatchRequest. The only difference is that we need to pay attention to two additional properties, MessageGroupId and MessageDeduplicationId.

  • MessageGroupId: further divide messages in a FIFO queue into multiple groups by assigning a MessageGroupId. Message order is only guaranteed within the same group.
  • MessageDeduplicationId: prevent the same message from being added multiple times to the queue. Messages sent with the same MessageDeduplicationId within the deduplication interval are treated as duplicates, so this value should be unique for each distinct message. If the ContentBasedDeduplication flag is enabled for our queue, we can skip this property, and the SHA-256 hash of the message body will be used instead.

var sendMessageRequest = new SendMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue.fifo", // the FIFO queue created above
    MessageBody = msg,
    MessageDeduplicationId = "<unique value for this message>",
    MessageGroupId = "<id for this group>"
};

// Or

var sendMessageBatchRequest = new SendMessageBatchRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue.fifo",
    Entries = new List<SendMessageBatchRequestEntry>
    {
        new SendMessageBatchRequestEntry
            { Id = "001", MessageBody = "msg 1", MessageDeduplicationId = "unique value 1", MessageGroupId = "<group id 1>"},
        new SendMessageBatchRequestEntry
            { Id = "002", MessageBody = "msg 2", MessageDeduplicationId = "unique value 2", MessageGroupId = "<group id 1>"},
        // The third message is in a different group
        new SendMessageBatchRequestEntry
            { Id = "003", MessageBody = "msg 3", MessageDeduplicationId = "unique value 3", MessageGroupId = "<group id 2>"},
    }
};
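
When ContentBasedDeduplication is not enabled, one option is to derive the MessageDeduplicationId from the message body ourselves. The snippet below is a sketch of that idea, not something the SDK provides; the helper name ComputeDeduplicationId is hypothetical, and SHA256.HashData and Convert.ToHexString assume .NET 5 or later. The resulting 64-character hex string fits within the 128-character limit for deduplication IDs.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: hashes the UTF-8 bytes of the body and returns lowercase hex.
static string ComputeDeduplicationId(string messageBody)
{
    var hash = SHA256.HashData(Encoding.UTF8.GetBytes(messageBody));
    return Convert.ToHexString(hash).ToLowerInvariant();
}

var first = ComputeDeduplicationId("msg 1");
var again = ComputeDeduplicationId("msg 1");
var other = ComputeDeduplicationId("msg 2");

Console.WriteLine(first.Length);   // prints 64
Console.WriteLine(first == again); // prints True
Console.WriteLine(first == other); // prints False
```

With this approach, two messages with identical bodies count as duplicates. If identical bodies should be delivered separately, the ID needs an extra distinguishing component.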

Read messages from a FIFO queue

Again, reading from a FIFO queue is mostly the same as reading from a standard queue. Below are some differences.

  • The order for messages from the same group is guaranteed.
  • After a message is returned and becomes invisible (due to the VisibilityTimeout), no message from the same group can be returned until that message becomes visible again, or is deleted.
  • By providing a ReceiveRequestAttemptId, we can retry a receive request (for example, after a network failure) and get the same messages back before they become visible again. We just need to send another request with the same ReceiveRequestAttemptId.
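
The last point can be sketched as follows: the same request object, carrying the same ReceiveRequestAttemptId, is sent a second time if the first attempt fails. This is only an illustration under stated assumptions. It uses the helloworldqueue.fifo queue and placeholder credentials from this article, it catches every exception for brevity, and whether the local emulator honours ReceiveRequestAttemptId has not been verified here.

```csharp
using System;
using System.Collections.Generic;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;

// Assumption: the local emulator does not validate these dummy credentials.
var sqsClient = new AmazonSQSClient(
    new BasicAWSCredentials("notValidKey", "notValidSecret"),
    new AmazonSQSConfig { ServiceURL = "http://localhost:9324" });

var request = new ReceiveMessageRequest
{
    QueueUrl = "http://localhost:9324/queue/helloworldqueue.fifo",
    WaitTimeSeconds = 5,
    VisibilityTimeout = 30,
    MaxNumberOfMessages = 10,
    ReceiveRequestAttemptId = Guid.NewGuid().ToString() // identifies this logical receive attempt
};

ReceiveMessageResponse response;
try
{
    response = await sqsClient.ReceiveMessageAsync(request);
}
catch (Exception ex)
{
    // Retry once with the same ReceiveRequestAttemptId to ask for the same set of messages.
    Console.Error.WriteLine($"First attempt failed: {ex.Message}");
    response = await sqsClient.ReceiveMessageAsync(request);
}

// Depending on the SDK version, Messages can be null when nothing was received.
foreach (var message in response.Messages ?? new List<Message>())
{
    Console.WriteLine(message.Body);
}
```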

Delete messages from a FIFO queue

This operation is identical to a standard queue.

Conclusion

Amazon SQS makes working with message queues simple. Armed with this knowledge, we will explore a sample scenario and see how services integration with messaging works in practice. Please check it out at the following link.

https://duongnt.com/sqs-services-integration-two/

About the author: a software developer from Vietnam who is currently living in Japan.
