Note: see the link below for the English version of this article.
https://duongnt.com/sqs-services-integration-two
Trong bài trước, chúng ta đã tìm hiểu cách sử dụng C# để đọc và ghi message vào Amazon SQS. Hôm nay, chúng ta sẽ dùng SQS để tích hợp nhiều service trong một bài toán giả định.
Chú ý là Amazon SQS chỉ hỗ trợ message với kích thước tối đa là 256KB. Nhưng ta giả sử là tất cả message trong hệ thống của ta đều nhỏ hơn mức đó.
Bài toán của chúng ta
Câu chuyện về một mạng xã hội
Giả định là chúng ta lập ra một công ty với tên gọi Beyond để phát triển một mạng xã hội gọi là Headtome. Headtome được viết bằng C#, nó cho phép người dùng chia sẻ dòng trạng thái và hình ảnh với cả thế giới. Như mọi mạng xã hội khác, chúng ta thực hiện phân tích ảnh mà người dùng tải lên. Bước phân tích này do một service độc lập đảm nhiệm. Đồng thời, chúng ta nén và lưu tất cả ảnh trong một service khác để thực hiện backup.
Tất cả service ở trên trao đổi dữ liệu dưới dạng JSON, nhưng mỗi service lại cần một định dạng riêng. Ảnh do người dùng tải lên có dạng byte array và dùng model RGB. Đây cũng là định dạng chính mà Headtome sử dụng.
{
"id": "<picture UUID>",
"user_id": "<user UUID>",
"data": "64756f6e67..."
}
Mặt khác, service phân tích cần dữ liệu dưới dạng mảng integer một chiều. Vì ảnh gốc dùng RGB, chúng ta cần đọc giá trị trong từng kênh màu rồi chuyển về dạng một chiều.
{
"id": "picture UUID",
"user_id": "<user UUID>",
"data": [17, 10, 27, 10,...]
}
Còn service lưu trữ lại sử dụng byte array với dữ liệu đã được nén.
{
"id": "<picture UUID>",
"user_id": "<user UUID>",
"data": "6c7579..."
}
Một vụ sáp nhập
Headtome là một thành công lớn. Nhờ đó, chúng ta có đủ vốn để mua lại một dịch vụ chia sẻ ảnh và video với tên gọi Rapidpound. Ta muốn tích hợp tất cả ảnh mà Rapidpound thu thập được vào cùng workflow mà Headtome đang sử dụng, nhưng ta gặp phải một số trở ngại. Rapidpound được viết bằng Python và sử dụng model HSV để lưu ảnh.
Giải pháp đơn giản nhất là ta sẽ thêm code vào Rapidpound để chuyển tất cả ảnh về đúng định dạng trước khi gửi chúng tới các service khác. Nhưng ta càng bổ sung nhiều code thì ta càng phải test, bảo trì, và sửa lỗi nhiều hơn. Hơn nữa, những chức năng mới này là điều mà lẽ ra Rapidpound không cần quan tâm. Ngoài ra, trong tương lai, ta có thể phải tích hợp Rapidpound với nhiều service khác nữa. Chả lẽ mỗi lần đó ta lại đi thay đổi mã nguồn?
Về phía Headtome, thiết kế ở trên cũng không phải là lý tưởng. Ta đang khiến Headtome bị coupling với service phân tích và service lưu trữ. Nếu như ta cần bổ sung thêm một service khác sử dụng dữ liệu của Headtome thì sao?
Phương án tích hợp sử dụng message
Dưới đây là một số thách thức ta gặp phải.
- Ta khó kiểm soát được service nào đang gửi dữ liệu tới service nào. Và việc thay đổi service gửi/service nhận là rất khó khăn.
- Các service khác nhau cần dữ liệu dưới dạng khác nhau. Bản thân việc chuyển đổi dữ liệu về đúng định dạng là không khó, nhưng nhớ được rằng service nào cần dùng chuyển đổi nào thì là tương đối phức tạp.
- Nếu một service gửi dữ liệu quá nhanh, nó có thể làm quá tải các service khác.
- Khi một service ngừng hoạt động, tất cả dữ liệu gửi cho nó sẽ bị mất.
Ta sẽ dùng Amazon SQS để giải quyết tất cả các vấn đề trên.
Bổ sung một service integrator trung gian
Thật may là chúng ta đã chuẩn bị cho ngày này ngay từ ban đầu. Thực ra Headtome không trực tiếp gửi dữ liệu cho service phân tích và service lưu trữ. Nó coi mỗi ảnh cùng metadata là một message và gửi tất cả message lên một SQS queue. Sau đó, integrator sẽ đọc message từ queue, chuyển nó sang định dạng đúng, gửi nó tới phía nhận, rồi xóa message đã được xử lý đi. Integrator có thể tự mình quyết định tốc độ xử lý message từ queue. Nếu lưu lượng message tăng cao, một vài message có thể bị tắc trong queue, nhưng cuối cùng chúng sẽ vẫn được integrator xử lý.
Để phân biệt message từ các nguồn khác nhau, ta cần bổ sung một Content Enricher vào trước SQS queue. Module này chỉ làm một nhiệm vụ duy nhất là gắn serviceId
vào từng message. Ví dụ dưới đây là message từ Headtome.
{
"id": "<picture UUID>",
"service_id": "<serviceId của Headtome>",
"user_id": "<user UUID>",
"data": "64756f6e67..."
}
Integrator sẽ sử dụng serviceId
để quyết định gửi message tới đâu. Chú ý là một message có thể được gửi tới nhiều hơn một đích đến. Trong ví dụ của ta, cả service phân tích lẫn service lưu trữ đều đăng ký nhận message từ Headtome.
Integrator bao gồm một database để lưu đăng ký, một Content-Based Router, và nhiều Message Translator. Content-Based Router đọc serviceId
của từng message và kết hợp nó với dữ liệu từ database để quyết định xem cần gửi message đó tới những đâu. Sau đó message sẽ được gửi tới Message Translator tương ứng, tại đây nó được chuyển sang đúng định dạng. Sau bước này, ta đã có thể gửi message thẳng tới đích đến. Nhưng để khiến hệ thống trở nên dễ mở rộng hơn, ta sẽ gửi các message tới SQS queue riêng biệt của từng service đích. Từ đó, các service này có thể đọc message từ queue với tốc độ tùy ý.
Xử lý message sai định dạng
Trong điều kiện lý tưởng, tất cả message đều có thể được chuyển sang định dạng cần thiết. Nhưng nếu ta nhận phải một message lỗi thì sao? Rõ ràng Message Translator là nơi lý tưởng nhất để xử lý các lỗi này. Mỗi khi nhận phải một message lỗi, Message Translator sẽ lưu nội dung message đó vào một database. Sau đó ta có thể bổ sung một Web UI để hiển thị thông tin về các message lỗi.
Ta cũng có phương án khác là sử dụng Invalid Message Channel, ở đây ta gửi tất cả message lỗi tới một queue mới. Sau đó, ta tạo một service mới để đọc dữ liệu từ queue đó và thực hiện các bước xử lý tiếp theo.
Các ưu điểm của giải pháp sử dụng message
- Vì giải pháp của ta là không đồng bộ nên dù một service ngừng hoạt động ta cũng không bị mất dữ liệu. Tất cả message gửi tới service đó sẽ vẫn nằm trong queue để đợi service chạy trở lại.
- Từng service có thể quyết định tốc độ đọc message từ queue. Ta thậm chí có thể bổ sung cơ chế tự động phát hiện service đang bị overload để giảm tốc độ xử lý message.
- Việc mở rộng mỗi service là rất đơn giản. Ta thậm chí không cần tới load-balancer mà chỉ cần tạo thêm instance của service đó và cho nó xử lý message từ cùng một queue.
- Admin có thể thay đổi record trong database lưu thông tin đăng ký để thay đổi service nào gửi message cho service nào.
- Nếu ta cần thay đổi định dạng dữ liệu mà một service đang sử dụng, ta chỉ cần thay đổi Message Translator mà không sợ ảnh hưởng gì đến các phần khác trong hệ thống.
Tích hợp Rapidpound vào hệ thống hiện tại
Với thiết kế ở trên, ta có thể nhanh chóng tích hợp Rapidpound vào hệ thống.
- Dùng Content Enricher để thêm
serviceId
vào các message từ Rapidpound. - Tạo record mới trong database để đăng ký cho service phân tích và service lưu trữ nhận message từ Rapidpound.
- Cập nhật Message Translator để nó có thể xử lý message từ Rapidpound.
Một số thay đổi khác
Thêm một service ở phía nhận, tương tự như service phân tích hay service lưu trữ.
- Tạo SQS queue cho service mới.
- Tạo Message Translator (nếu cần thiết).
- Cập nhật database lưu thông tin đăng ký. Ta có thể chọn nhận message từ cả Headtome lẫn Rapidpound, hoặc chỉ nhận message từ một trong hai service đó.
Xóa service phía nhận.
- Cập nhật database lưu thông tin đăng ký để xóa tất cả các record trỏ tới service đó.
- Xóa Message Translator và SQS queue của service.
Xóa service phía gửi.
- Cập nhật database lưu thông tin đăng ký để xóa tất cả các record đăng ký nhận message của service đó.
- Cập nhật các translator để bỏ phần code xử lý message từ service.
Như ta thấy, tất cả các thay đổi ở trên đều không ảnh hưởng gì tới tất cả các phần khác trong hệ thống.
Kết thúc
Tất cả các thiết kế ở trong bài đều được tham khảo từ cuốn Enterprise Integration Patterns by Gregor Hohpe. Bản tái bản lần thứ nhất của cuốn này đã được phát hành hè vừa rồi, tôi dự định sẽ đọc nó trong tương lai gần.
2 Thoughts on “Dùng Amazon SQS để kết nối các service – Phần 2”