Note: see the link below for the English version of this article.
https://duongnt.com/sqs-services-integration-one
Enterprise Integration Patterns by Gregor Hohpe là một trong những cuốn sách về IT mà tôi thích nhất. Mặc dù được viết cách đây hơn 17 năm, phần lớn nội dung của cuốn sách này vẫn rất hữu ích. Và sử dụng message vẫn là một trong những phương pháp kết nối service hiệu quả nhất. Tuy nhiên, ngày nay chúng ta có thêm nhiều tool mới và thuận tiện hơn so với năm 2004. Thông thường, khi cần kết nối nhiều service với nhau thì tôi sẽ sử dụng Amazon Simple Queue Service (SQS) để làm kênh nhận và gửi message.
Bài này là phần 1 trong loạt 2 bài về kết nối service sử dụng message. Chúng ta sẽ tìm hiểu cách tương tác với SQS queue bằng C#.
Ưu điểm của việc kết nối service bằng message
Trong bài này, khái niệm message có nghĩa là một gói dữ liệu được trao đổi giữa các bộ phận của một hệ thống lớn. Phương pháp kết nối service bằng message có những ưu điểm dưới đây.
- Có tính asynchronous cao do việc gửi message không yêu cầu phía gửi và phía nhận phải online cùng thời điểm.
- Đảm bảo các service độc lập với nhau. Nguyên nhân là do ta có thể thay đổi message trong quá trình gửi mà không cần phía nhận hay phía gửi phải tự thay đổi format của mình.
- Ta có thể cấu hình để gửi 1 message tới tất cả các service, hoặc lựa chọn chỉ gửi message đó tới một vài service. Việc thay đổi này hoàn toàn không ảnh hưởng gì tới bản thân các service.
Xin trích dẫn cuốn Enterprise Integration Patterns.
Messaging can transfer packages of data frequently, immediately, reliably, and asynchronously, using customizable formats.
Tạm dịch.
Message cho phép ta gửi các gói dữ liệu một cách thường xuyên, tức thời, phi đồng bộ, với độ tin cậy cao và sử dụng format có thể tùy biến được.
Điểm qua một số giải pháp khác
Tất nhiên message không phải là giải pháp kết nối service duy nhất. Dưới đây là một vài phương pháp khác.
- Gửi file: mỗi service tự xuất dữ liệu ra file để các service khác import. Nhược điểm là chúng ta phải tự viết code để ghi/đọc/xóa file cũng như đảm bảo chúng không bị trùng lặp,… Giải pháp của ta càng phức tạp thì phương pháp này càng trở nên giống với sử dụng message.
- Dùng chung cơ sở dữ liệu: nhiều service cùng dùng chung một cơ sở dữ liệu. Từng service đều có thể truy cập dữ liệu của service khác khi cần. Nhược điểm là ta khó thiết kế cơ sở dữ liệu để phù hợp cho tất cả các service. Đồng thời cơ sở dữ liệu dễ trở thành nút cổ chai gây ảnh hưởng hiệu năng.
- API: khi một service cần thông tin từ một service khác, nó sẽ gọi service đó thông qua API. Nhược điểm là các service sẽ bị phụ thuộc vào nhau.
Cho dù vậy, ta cũng không thể khẳng định rằng message luôn là giải pháp tốt nhất. Mỗi khi gặp một bài toán mới, ta vẫn nên cân nhắc tất cả các giải pháp đã nói ở trên.
Amazon SQS thông thường
Thiết lập môi trường thử nghiệm
Để giảm độ phức tạp, code trong bài này sẽ sử dụng Alpine SQS, một phiên bản của Amazon SQS viết bằng Java và chạy trong container. Nhưng nếu đã có sẵn subscription cho AWS thì các bạn cũng có thể chạy code trên AWS.
Đầu tiên, chúng ta cần tải về image Docker và khởi động container. SQS của chúng ta sẽ chạy ở cổng 9324
, và giao diện để đọc message sẽ chạy ở cổng 9325
.
docker run --name alpine-sqs -p 9324:9324 -p 9325:9325 -d roribio16/alpine-sqs:latest
Sau đó, chúng ta sẽ tạo một message queue mới với tên gọi là helloworldqueue
. Chú ý là ở bước này ta cần cài AWS Command Line Interface (CLI), các bạn có thể tải bộ cài từ đường link này.
aws --endpoint-url http://localhost:9324 sqs create-queue --queue-name helloworldqueue
Các bạn cũng có thể thêm queue mới đó vào giao diện quản lý message bằng cách cập nhật file /opt/config/sqs-insight.conf
. Các trường key
, secretKey
và region
là bắt buộc phải có, nhưng Alpine SQS
không sử dụng chúng.
"endpoints": [
{
"key": "notValidKey",
"secretKey": "notValidSecret",
"region": "eu-central-1",
"url": "http://localhost:9324/queue/helloworldqueue"
}
]
Lúc này, các bạn có thể mở giao diện web và xem nội dung của queue từ đường link dưới.
http://localhost:9325
Tạo SQS client
Để tương tác với SQS, chúng ta cần cài bộ AWS SDK bằng lệnh dưới.
dotnet add package AWSSDK.SQS
Ta phải cung cấp URL và region của service SQS. Vì ta đang dùng Alpine SQS nên URL của service sẽ là localhost.
var sqsConfig = new AmazonSQSConfig
{
RegionEndpoint = RegionEndpoint.EUCentral1,
ServiceURL = "http://localhost:9324"
};
var sqsClient = new AmazonSQSClient(sqsConfig);
Nếu service SQS của bạn có mật khẩu thì ta cần cung cấp mật khẩu thông qua object thuộc lớp BasicAWSCredentials
.
var accessKey = "<access key của bạn>";
var secretKey = "<secret key của bạn>";
var cred = new BasicAWSCredentials(accessKey, secretKey);
var sqsClient = new AmazonSQSClient(cred, sqsConfig);
Gửi message lên SQS
Ta dùng hàm SendMessageAsync
để gửi message lên queue. Hàm này có 2 overload, nhưng ta sẽ dùng overload với argument thuộc lớp SendMessageRequest
. Ở mức tối thiểu, ta cần cung cấp URL của queue và nội dung message.
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
MessageBody = "Hello world!"
};
Sau đó ta dùng object sqsClient
đã tạo ở phần trước để gửi message lên queue.
await sqsClient.SendMessageAsync(sendMessageRequest);
Ta có thể dùng giao diện web để kiểm tra là message tồn tại trong queue.
Tuy nhiên, trong một project thực tế, chúng ta ít khi gửi message dưới dạng plaintext. Thông thường, ta sẽ gửi object đã được serialize.
public class Person
{
public string Name { get; set; }
public int Age { get; set; }
}
var person = new Person { Name = "John", Age = 30 };
var msg = JsonSerializer.Serialize(person);
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
MessageBody = msg
};
await sqsClient.SendMessageAsync(sendMessageRequest);
Một số property khác trong lớp SendMessageRequest
DelaySeconds
: đôi khi ta không muốn service khác có thể đọc message ngay sau khi gửi lên queue. Nếu giá trị property này lớn hơn0
, SQS sẽ đợi thời gian chờ trôi qua rồi mới cho service khác đọc message.sendMessageRequest.DelaySeconds = 30; // Ẩn message trong 30s đầu
MessageAttributes
: ta có thể dùng property này để gửi metadata cùng với message. Property này hữu ích khi ta cần gửi thông tin bổ sung, mà thông tin đó không phải là một phần của message. Lưu ý là tất cả các attribute của message đều phải có dạngstring
. Đáng tiếc là ta không thể xem được attribute từ giao diện web.var attrName = "DummyAttribute"; var attrValue = "DummyValue"; var additionalAttr = new MessageAttributeValue { DataType = typeof(string).Name, StringValue = attrValue }; sendMessageRequest.MessageAttributes[attrName] = additionalAttr; // MessageAttributes là một dictionary await sqsClient.SendMessageAsync(sendMessageRequest);
Gửi đồng thời nhiều message lên SQS
Nếu ta muốn gửi nhiều message cùng lúc thì việc gọi hàm SendMessageAsync
nhiều lần sẽ ảnh hưởng tới hiệu năng. Trong trường hợp này, ta có thể sử dụng hàm SendMessageBatchAsync
. Thay vì truyền vào SendMessageRequest
, ta cần truyền vào SendMessageBatchRequest
.
var sendMessageBatchRequest = new SendMessageBatchRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
Entries = new List<SendMessageBatchRequestEntry>
{
new SendMessageBatchRequestEntry { Id = "001", MessageBody = "msg 1" },
new SendMessageBatchRequestEntry { Id = "002", MessageBody = "msg 2" },
new SendMessageBatchRequestEntry { Id = "003", MessageBody = "msg 3" },
}
};
await sqsClient.SendMessageBatchAsync(sendMessageBatchRequest);
Chú ý là mỗi message đều có Id
độc nhất. Ta có thể xác nhận là 3 message đã được gửi lên queue.
Giống như SendMessageRequest
, SendMessageBatchRequestEntry
cũng có các propery DelaySeconds
, MessageSystemAttributes
,…
Đọc message từ SQS
Trước khi đọc message từ queue, chúng ta cần lưu ý một số điểm sau.
- Với thiết lập mặc định, thứ tự các message trong SQS chỉ là best-effort. Có nghĩa là mặc dù trong phần lớn các trường hợp, thứ tự đọc message là giống với thứ tự gửi message, nhưng điều này không được đảm bảo.
- Khác với queue thông thường, sau khi ta đọc 1 message từ SQS thì message đó không được xóa khỏi queue một cách tự động. Thay vào đó, ta phải tự gửi lệnh xóa message.
Ngoài ra, các bạn nên tắt giao diện web trước khi chạy code trong phần tiếp theo, vì giao diện web có thể ảnh hưởng tới quá trình test.
supervisorctl stop insight
Ta dùng hàm ReceiveMessageAsync
để đọc message từ SQS. Ta cần truyền vào một object thuộc lớp ReceiveMessageRequest
để cung cấp các thông tin cần thiết.
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
WaitTimeSeconds = 5,
VisibilityTimeout = 30,
MaxNumberOfMessages = 10
};
var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;
Dưới đây là một số property thường dùng.
QueueUrl
: URL của queue mà ta muốn đọc.MaxNumberOfMessages
: số message ta muốn lấy về. Lưu ý là SQS có thể trả về ít message hơn (nhưng không bao giờ trả về nhiều hơn). Giá trị ở đây cần nằm trong khoảng0 ~ 10
.VisibilityTimeout
: thời gian ẩn một message sau khi ta đọc nó. Mục đích là để tránh nhiều instance xử lý trùng một message.WaitTimeSeconds
: thời gian chờ tối đa nếu số message có thể tải về là ít hơnMaxNumberOfMessages
. Nếu có đủ message thì các message sẽ được trả về ngay mà không cần chờ.
Ở đây, message
có dạng List<Message>
, và ta có thể đọc nội dung từng message thông qua property Body
.
foreach (var message in messages)
{
Console.WriteLine(message.Body);
}
Hoặc ta có thể deserialize dữ liệu từ string.
var person = JsonSerializer.Deserialize<Person>(message.Body);
Đọc message attribute từ SQS
Trong một phần ở trên, chúng ta đã gửi một MessageAttribute
với tên gọi DummyAttribute
kèm với message. Bây giờ ta có thể đọc lại attribute đó từ message, nhưng ta phải bảo SQS trả về attribute cùng với message.
var receiveMessageRequest = new ReceiveMessageRequest
{
// ... Lược bỏ nhiều code
MessageAttributeNames = new List<string> { "DummyAttribute" } // Để đọc nhiều attribute, ta thêm tên của chúng vào list này
};
Sau đó ta gửi receiveMessageRequest
để đọc message từ queue như bình thường.
var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;
var message = messages.First(); // Ở đây chỉ xét trường hợp `messages` không rỗng
Ta có thể đọc attribute của message từ dictionary với tên gọi MessageAttributes
của object message
. Nhưng để tránh lỗi, ta nên dùng hàm TryGetValue
.
if (message.MessageAttributes.TryGetValue("DummyAttribute", out var attr))
{
Console.WriteLine($"Attribute value is: {attr.StringValue}");
}
Xóa message khỏi SQS
Như đã nói ở phần trước, việc đọc message từ SQS giống với lệnh peek hơn là lệnh pop. Tuy nhiên, sau khi đã xử lý xong một message thì ta nên xóa nó đi. Nếu không ta sẽ lặp lại việc xử lý message đó nhiều lần. Trong interface IAmazonSQS
có hàm DeleteMessageAsync
, nó nhận argument với kiểu là DeleteMessageRequest
.
public class DeleteMessageRequest : AmazonSQSRequest
{
// ... lược bỏ nhiều code
[AWSProperty(Required = true)]
public string QueueUrl { get; set; }
[AWSProperty(Required = true)]
public string ReceiptHandle { get; set; }
}
Giá trị của QueueUrl
là dễ hiểu, nhưng còn ReceiptHandle
thì sao? Với SQS, trước khi xóa một message thì ta phải đọc nó (ta không thể thu hồi lại một message cụ thể). Sau đó ta có thể dùng giá trị ReceiptHandle
của message để xóa nó đi.
var receiveMessageRequest = new ReceiveMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
WaitTimeSeconds = 5,
VisibilityTimeout = 30,
MaxNumberOfMessages = 1
};
var messages = (await sqsClient.ReceiveMessageAsync(receiveMessageRequest)).Messages;
var message = messages.First(); // Ở đây chỉ xét trường hợp `messages` không rỗng
var deleteMessageRequest = new DeleteMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
ReceiptHandle = message.ReceiptHandler
};
await sqsClient.DeleteMessageAsync(deleteMessageRequest);
Dù giá trị ReceiptHandle
ta cung cấp là không tồn tại thì DeleteMessageAsync
vẫn không trả về ngoại lệ.
Để xóa toàn bộ message trong queue, ta có thể dùng hàm PurgeQueueAsync
.
await sqsClient.PurgeQueueAsync("http://localhost:9324/queue/helloworldqueue"); // Xóa toàn bộ messages trong helloworldqueue
Queue đảm bảo first in first out (FIFO)
Trong phần lớn trường hợp, queue thông thường là đủ cho nhu cầu của ta. Nhưng SQS cũng hộ trợ cả queue đảm bảo tính FIFO. Ta có thể tạo kiểu queue này bằng lệnh dưới đây (phần đuôi .fifo
là bắt buộc).
aws --endpoint-url http://localhost:9324 sqs create-queue --queue-name helloworldqueue.fifo --attribute FifoQueue=true
Gửi message lên queue FIFO
Cách gửi message lên queue FIFO không khác nhiều so với cách gửi message lên queue bình thường, ta vẫn cần tạo SendMessageRequest
hoặc SendMessageBatchRequest
. Điểm khác biệt duy nhất là ta cần quan tâm tới 2 property mới: MessageGroupId
and MessageDeduplicationId
.
MessageGroupId
: chia các message trong queue FIFO ra thành nhiều nhóm khác nhau. Queue FIFO chỉ đảm bảo thứ tự trong từng nhóm.MessageDeduplicationId
: để tránh gửi lặp một message nhiều lần lên queue. Giá trị này phải là độc nhất trong từng nhóm. Nếu ta bật modeContentBasedDeduplication
cho queue thì ta không cần cung cấp giá trị này. SQS sẽ tự động sử dụng hash SHA-256 để phân biệt các message.
var sendMessageRequest = new SendMessageRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
MessageBody = msg,
MessageDeduplicationId = "<giá trị độc nhất cho message>",
MessageGroupId = "<id của group>"
};
// Hoặc
var sendMessageBatchRequest = new SendMessageBatchRequest
{
QueueUrl = "http://localhost:9324/queue/helloworldqueue",
Entries = new List<SendMessageBatchRequestEntry>
{
new SendMessageBatchRequestEntry
{ Id = "001", MessageBody = "msg 1", MessageDeduplicationId = "unique value 1", MessageGroupId = "<group id 1>"},
new SendMessageBatchRequestEntry
{ Id = "002", MessageBody = "msg 2", MessageDeduplicationId = "unique value 2", MessageGroupId = "<group id 1>"},
// Message thứ 3 ở trong group khác
new SendMessageBatchRequestEntry
{ Id = "003", MessageBody = "msg 3", MessageDeduplicationId = "unique value", MessageGroupId = "<group id 2>"},
}
};
Đọc message từ queue FIFO
Cách đọc message từ queue FIFO cũng không khác nhiều so với cách đọc message từ queue bình thường. Dưới đây là các điểm khác biệt.
- Queue FIFO đảm bảo giữ đúng thứ tự cho các message trong cùng một nhóm.
- Sau khi 1 message đã được đọc và ẩn đi (do giá trị
VisibilityTimeout
), ta không thể đọc các message trong cùng nhóm với message đó cho đến khi message đầu tiên hết bị ẩn hoặc được xóa. - Ta có thể đọc lại message dù nó đang bị ẩn bằng cách gửi kèm request giá trị
ReceiveRequestAttemptId
. Lúc này ta chỉ cẩn gửi request mới với cùngReceiveRequestAttemptId
.
Xóa message từ queue FIFO
Thao tác này giống hệt trường hợp queue thông thường.
Kết thúc
Amazon SQS giúp việc sử dụng message queue trở nên dễ dàng hơn. Ta sẽ sử dụng những kiến thức trong bài hôm nay để phân tích một bài toán giả định và xem message hoạt động thế nào trong thực tế. Hẹn gặp các bạn trong phần 2.