Note: see the link below for the English version of this article.

https://duongnt.com/elasticsearch-async-search-kotlin

Async search với Elasticsearch và Kotlin

Trong bài trước, chúng ta đã tìm hiểu 2 phương pháp để tạo SearchRequest bằng library Elasticsearch Java API Client. Nhưng có lẽ các bạn cũng nhận ra là chúng ta gửi những request đó lên cluster Elasticsearch một cách đồng bộ. Hôm nay, chúng ta sẽ tìm hiểu 2 phương án để gửi request một cách không đồng bộ. Các bạn nên đọc qua bài trước để dễ hiểu hơn những khái niệm trong bài lần này.

Các bạn có thể tải code ví dụ trong bài từ đường link dưới đây.

https://github.com/duongntbk/elasticsearchclient-async-demo

Các bước chuẩn bị

Cũng giống như trong bài trước, chúng ta cần có một cluster Elasticsearch để chạy thử code. Các bạn có thể làm theo các bước setup trong hướng dẫn này. Chúng ta cũng sẽ dùng lại dữ liệu test trong bài trước nên tôi không copy lại chúng ở đây.

Gửi request một cách không đồng bộ

Như đã biết, trong Kotlin, phương pháp gửi request một cách không đồng bộ và non-blocking phổ biến nhất là dùng hàm suspending và coroutine. Nhưng thư viện Elasticsearch Java API Client lại được viết bằng Java, và trong Java không có khái niệm hàm suspending hay coroutine. Vậy ta phải gọi hàm Java một cách không đồng bộ bằng cách nào? Thật may là thư viện này có cung cấp hàm search với kiểu dữ liệu trả về là CompletableFuture, và ta có thể chờ nó chạy xong một cách non-blocking.

Cài các dependency

So với bài trước, ta cần cài thêm một vài thư viện để gửi request một cách không đồng bộ. Các bạn có thể xem danh sách đầy đủ tại đây. Dưới đây là các thư viện mới.

implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.7.2")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-android", "1.7.2")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-jdk8", "1.7.2")

Tạo client async

Các bước khởi tạo một client async gần như tương tự với cách tạo một client thông thường. Như ta thấy trong wrapper class ElasticsearchAsyncClientWrapper, điểm khác biệt duy nhất là ở object mà ta cần khởi tạo.

// Để ý là ta tạo object "ElasticsearchAsyncClient" thay vì object "ElasticsearchClient"
private val client: ElasticsearchAsyncClient = ElasticsearchAsyncClient(transport)

ElasticsearchAsyncClient cũng cung cấp một hàm search với tham số là object SearchRequestClass<T> (để quy định cách ta deserialize response). Tuy nhiên, thay vì trả về một SearchResponse<T>, hàm này trả về một CompletableFuture<SearchResponse<T>>.

Nhờ đó ta có thể gọi hàm kotlinx.coroutines.future.await trên CompletableFuture này để đợi request chạy xong mà không cần block thread hiện tại. Những bước này được implement trong hàm search của wrapper class.

return client.search(request, tDocumentClass).await()

Ở đây, tôi có thêm một delay vào wrapper class để giúp việc demo dễ dàng hơn. Ta có thể dùng thiết lập này để quy định đợi bao nhiêu mili giây trước khi trả response về cho client.

if (delayInMs > 0) {
    delay(delayInMs) // non-blocking wait
}

Gửi 2 request song song

Ta sẽ chạy thử việc gửi request một cách không đồng bộ tại đây. Wrapper class đã được thiết lập với thời gian delay là 2 giây để ta dễ thấy việc các request chạy song song. Ta tạo 2 search request bằng hàm buildQueryByClassgửi chúng lên cluster trong 2 coroutine khác nhau.

val deferredResponse1 = async(Dispatchers.IO) {
    val rs = client.search(request1, Footballer::class.java)
    println("Finish search 1 from thread: ${Thread.currentThread().name}")
    rs
}
val deferredResponse2 = async(Dispatchers.IO) {
    val rs = client.search(request2, Footballer::class.java)
    println("Finish search 2 from thread: ${Thread.currentThread().name}")
    rs
}

Sau đó, chúng ta đợi cho cả 2 request chạy xong và in ra kết quả cùng với tổng thời gian chạy.

awaitAll(deferredResponse1, deferredResponse2)
val response1 = deferredResponse1.await()
val response2 = deferredResponse2.await()

val processTime = stopWatch.elapsed(TimeUnit.MILLISECONDS)

println("Hits returned: ${response1.hits().total()?.value() ?: 0}")
println("Hits returned: ${response2.hits().total()?.value() ?: 0}")
println("Processing time: $processTime")

Kết quả sẽ như dưới đây (tên thread của bạn có thể khác tên thread trong ví dụ).

Finish search 1 from thread: DefaultDispatcher-worker-2
Finish search 2 from thread: DefaultDispatcher-worker-2
Search 1 hits returned: 5
Search 2 hits returned: 5
Processing time: 2318

Như ta thấy, mặc dù từng request chạy mất 2 giây, chúng ta chỉ mất hơn 2 giây một chút để nhận được cả 2 response. Hơn nữa, vì request của chúng ta là I/O bound, ta có thể dùng cùng một thread để xử lý chúng. Và vì việc gửi request không block thread, ta có thể thiết lập timeout cho chúng một cách dễ dàng hơn.

Gửi async request

Điểm khác biệt giữa async request và gửi request một cách không đồng bộ là gì? Giả sử ta có một query hết sức phức tạp và chạy tốn nhiều thời gian, tầm vài phút, thậm chí vài giờ. Trong trường hợp đó, dù ta có gửi request một cách không đồng bộ thì có lẽ nó cũng sẽ bị timeout bởi những thiết lập HTTP. Và dù ta có thể đặt giá trị timeout thật lớn thì việc giữ một kết nối ở trạng thái mở lâu như vậy cũng là không phù hợp. Thay vào đó, ta có thể bảo cluster Elasticsearch bắt đầu quá trình xử lý request, ngắt kết nối, rồi mở một kết nối mới sau đó một thời gian để lấy kết quả. Đó chính là cách hoạt động của async request.

Tạo client cho async search

Như ta thấy trong wrapper class ElasticsearchAsyncSearchClientWrapper, ta cần tạo một object ElasticsearchClient rồi gọi hàm asyncSearch trên nó.

private val client: ElasticsearchAsyncSearchClient = ElasticsearchClient(transport).asyncSearch()

Gửi SubmitRequest lên cluster Elasticsearch

Để bắt đầu quá trình tìm kiếm, ta cần gửi một SubmitRequest lên cluster. SubmitRequest rất giống với SearchRequest, điểm khác biệt duy nhất là ta truyền query vào SubmitRequest.Builder thay vì vào SearchRequest.Builder. Ta sẽ tái sử dụng hàm buildQueryByClass và thêm vào một vài thiết lập mới.

val submitRequest = SubmitRequest.Builder()
    .index("footballer") # Ta vẫn dùng index footballer
    .query(buildQueryByClass().query())
    .waitForCompletionTimeout(Time.of { t -> t.time("0s") }) # Không đợi SearchResponse mà nhận search Id rồi ngắt kết nối ngay
    .keepAlive(Time.of { t -> t.time("30s") }) # Thời gian lưu kết quả trên cluster
    .build()

Tất cả các thiết lập được lưu tại đây. Dưới đây là một vài thiết lập khác mà tôi thấy ta nên chú ý tới

  • keepOnCompletion: chỉ có ý nghĩa nếu cluster hoàn thành việc tìm kiếm trong giới hạn ta quy định tại “wait_for_completion”. Nếu là true thì response sẽ được lưu trên server cho đến hết keep_alive. Còn nếu không response sẽ bị xoá ngay.
  • explain: bổ sung thông tin về quá trình tính điểm vào response.

Ta sẽ gửi SubmitRequest lên cluster bằng wrapper và nhận về một SubmitResponse<T>. Nếu cluster hoàn thành xử lý query trước khi hết waitForCompletionTimeout thì response sẽ được lưu luôn trong SubmitResponse. Còn nếu không thì response này sẽ chỉ chứa một Id, và ta sẽ dùng Id này để lấy dữ liệu sau này.

Vì ta để giá trị waitForCompletionTimeout là 0s nên ta sẽ chỉ nhận được Id. Ta sẽ kiểm tra nó bằng đoạn code sau.

val submitResponse = client.submit(submitRequest, Footballer::class.java)
println("Id: ${submitResponse.id()}, IsPartial: ${submitResponse.isPartial}, IsRunning: ${submitResponse.isRunning}")

Giá trị in ra console sẽ như sau.

Id: <a random id>, IsPartial: true, IsRunning: true

Như đã thấy, quá trình tìm kiếm của của ta vẫn chưa chạy xong.

Dùng Id để lấy kết quả

Ta cần gửi GetAsyncSearchRequest lên cluster để lấy kết quả. Ở dạng đơn giản nhất, request này chỉ chứa Id đã nói ở phần trước.

val responseRequest = GetAsyncSearchRequest.Builder().id(submitResponse.id()).build()

Ta dùng wrapper class để gửi GetAsyncSearchRequest lên Elasticsearch cluster và nhận về GetAsyncSearchResponse. Từ response đó, ta có thể kiểm tra xem quá trình tìm kiếm đã xong hay chưa, và nếu đã xong thì ta sẽ in ra kết quả.

val response = client.getResponse(responseRequest, Footballer::class.java)

if (!response.isRunning) {
    printResults(response.response().hits())
}

Kết quả của đoạn code trên là như sau (tôi sử dụng lại hàm printResult của bài trước).

Hits: 5
Name: Bukayo Saka, Score: 4.787636
Name: Antony, Score: 4.145132
Name: Mahrez, Score: 3.7876358
Name: Sancho, Score: 2.1451323
Name: Vinicius Junior, Score: 2.1451323

Có thể thấy là kết quả của ta giống hệt trong bài trước. Tất nhiên ta cũng có thể dùng vòng lặp để định kỳ gọi getResponse và thử lại nếu như vẫn chưa có kết quả.

Có thể gửi async request một cách không đồng bộ hay không?

Chắc là sẽ có người tự hỏi vì sao ta lại gửi SubmitRequestGetAsyncSearchRequest một cách đồng bộ. Ta có thể gửi chúng một cách không đồng bộ hay không? Đúng là ta có thể làm như vậy. Ta chỉ cần thay object ElasticsearchAsyncSearchClient bằng object ElasticsearchAsyncSearchAsyncClient. Và ta có thể tạo client mới này từ một ElasticsearchAsyncClient.

private val client: ElasticsearchAsyncSearchAsyncClient = ElasticsearchAsyncClient(transport).asyncSearch()

Sau đó, ta có thể gọi hàm submit and getResponse bằng client mới này để nhận về CompletableFuture rồi await chúng trong hàm suspending.

suspend fun <TDocument> submit(
    request: SubmitRequest,
    tDocumentClass: Class<TDocument>
): SubmitResponse<TDocument> = client.submit(request, tDocumentClass).await()

// Và

suspend fun <TDocument> getResponse(
    request: GetAsyncSearchRequest,
    tDocumentClass: Class<TDocument>
): GetAsyncSearchResponse<TDocument> = client.get(request, tDocumentClass).await()

Nhưng theo tôi phương pháp này là không thật sự cần thiết vì một số lý do sau đây.

  • Ta hay gửi nhiều request thông thường trong một khoảng thời gian ngắn. Vì thế ta cần dùng hàm không đồng bộ để không block quá nhiều thread cùng lúc.
  • Ngược lại, ta chỉ nên gửi một vài request lớn với async request cùng lúc. Vì thế việc block những thread này không phải là vấn đề lớn.
  • Với giá trị waitForCompletionTimeout ngắn hoặc bằng 0, ta sẽ nhận lại SubmitResponse gần như ngay lập tức và sẽ không block thread lâu.
  • Và khi nhận về GetAsyncSearchResponse thì việc xử lý query đã xong nên ta cũng không phải block thread lâu trong trường hợp này.

Kết thúc

Khi tìm hiểu về các kiểu client mà Elasticsearch API Client hỗ trợ, tôi thấy dễ bị nhầm lẫn bởi ElasticsearchAsyncClient and ElasticsearchAsyncSearchClient. Hy vọng là bài viết hôm nay sẽ giúp các bạn hiểu rõ hơn sự khác biệt giữa 2 kiểu client này. Các bạn có cho rằng ElasticsearchAsyncSearchAsyncClient là hơi thừa hay không? Hay các bạn có tình huống cần tới nó? Hãy cho tôi biết trong phần bình luận.

A software developer from Vietnam and is currently living in Japan.

One Thought on “Async search với Elasticsearch và Kotlin”

Leave a Reply