Note: The Vietnamese version of this article is available at the link below.

https://duongnt.com/elasticsearch-async-search-kotlin-vie

Async search with Elasticsearch and Kotlin

In the previous article, we explored sending search requests to Elasticsearch with the Elasticsearch Java API Client. But you might have noticed that we made all those requests synchronously. Today, we will try two different approaches to searching asynchronously. I recommend reading the linked article before this post.

You can download the sample code in this article from the link below.

https://github.com/duongntbk/elasticsearchclient-async-demo

Prerequisites

As in the last article, we need a local Elasticsearch cluster to test our code. You can set one up by following this guide. We will also use the same test data, so I won’t copy it here.

Making requests asynchronously

As we all know, the most common way to make an asynchronous, non-blocking request in Kotlin is with suspending functions and coroutines. But the Elasticsearch Java API Client is, you guessed it, written in Java, which has no concept of coroutines or suspending functions. So how can we call a Java function without blocking? Fortunately, the library provides a search function that returns a CompletableFuture, and we can wait for that future to complete without blocking the current thread.
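The core trick behind awaiting a CompletableFuture is to register a completion callback on the future and resume the coroutine from that callback, so no thread sits waiting. Below is a minimal sketch using only the Kotlin standard library and the JDK. The helpers awaitSketch and runSketch are hypothetical names for this illustration; runSketch is just a tiny stand-in for runBlocking so the example runs without extra dependencies. In real code you would use kotlinx.coroutines.future.await, which also handles cancellation.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical helper: suspends the calling coroutine until the future completes.
// No thread waits on the future; the coroutine is resumed from the completion callback.
// Simplified: unlike kotlinx.coroutines.future.await, cancellation is not supported.
suspend fun <T> CompletableFuture<T>.awaitSketch(): T = suspendCoroutine { cont ->
    whenComplete { value, error ->
        if (error == null) {
            cont.resume(value)
        } else {
            // Dependent stages wrap failures in CompletionException, so unwrap it for the caller.
            val cause = (error as? CompletionException)?.cause ?: error
            cont.resumeWithException(cause)
        }
    }
}

// Hypothetical stand-in for runBlocking: starts a coroutine and blocks the caller until it ends.
fun <T> runSketch(block: suspend () -> T): T {
    val latch = CountDownLatch(1)
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { result ->
        outcome = result
        latch.countDown()
    })
    latch.await()
    return outcome!!.getOrThrow()
}
```

For example, `runSketch { future.awaitSketch() }` returns the future's value once another thread completes it, and rethrows the original exception if the future fails.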

Install the necessary dependencies

To make asynchronous requests, we need a few more libraries than last time. You can see the whole list here; below are the new ones.

implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-core", "1.7.2")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-android", "1.7.2")
implementation("org.jetbrains.kotlinx", "kotlinx-coroutines-jdk8", "1.7.2")

Create an async client

The steps to create an async client are almost identical to those for a normal client. As you can see in the wrapper class ElasticsearchAsyncClientWrapper, the only difference is which object we initialize.

// Notice we create an "ElasticsearchAsyncClient" object instead of an "ElasticsearchClient" object
private val client: ElasticsearchAsyncClient = ElasticsearchAsyncClient(transport)

The ElasticsearchAsyncClient also provides a search method that receives a SearchRequest object and a Class<T> object (to specify how we should deserialize the response). However, instead of returning a SearchResponse<T>, it returns a CompletableFuture<SearchResponse<T>>.

Then, by calling kotlinx.coroutines.future.await on that CompletableFuture inside a suspending function, we can wait for the request to complete without blocking the current thread. All these steps are implemented in the search method of the wrapper class.

return client.search(request, tDocumentClass).await()

And to make the demonstration easier, I added a delay setting to the wrapper class. It specifies how many milliseconds to wait before returning the response to the caller.

if (delayInMs > 0) {
    delay(delayInMs) // non-blocking wait
}

Make two concurrent requests

Now let’s test making requests asynchronously. The wrapper object has a 2-second delay so that we can easily verify that our requests really run concurrently. We build two search requests using the buildQueryByClass function and then send them to the cluster in two different coroutines.

val deferredResponse1 = async(Dispatchers.IO) {
    val rs = client.search(request1, Footballer::class.java)
    println("Finish search 1 from thread: ${Thread.currentThread().name}")
    rs
}
val deferredResponse2 = async(Dispatchers.IO) {
    val rs = client.search(request2, Footballer::class.java)
    println("Finish search 2 from thread: ${Thread.currentThread().name}")
    rs
}

Then we wait for both requests to finish and print out the results, as well as the processing time.

awaitAll(deferredResponse1, deferredResponse2)
val response1 = deferredResponse1.await()
val response2 = deferredResponse2.await()

val processTime = stopWatch.elapsed(TimeUnit.MILLISECONDS)

println("Search 1 hits returned: ${response1.hits().total()?.value() ?: 0}")
println("Search 2 hits returned: ${response2.hits().total()?.value() ?: 0}")
println("Processing time: $processTime")

Below is the output (your thread name might be different).

Finish search 1 from thread: DefaultDispatcher-worker-2
Finish search 2 from thread: DefaultDispatcher-worker-2
Search 1 hits returned: 5
Search 2 hits returned: 5
Processing time: 2318

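As we can see, even though each individual request took 2 seconds, receiving both responses took only a bit more than 2 seconds. Moreover, because our requests are I/O-bound, both of them can be served from the same thread: while one request is suspended waiting for the cluster, the thread is free to work on the other. And because the requests are non-blocking, it is also much easier to set a timeout on them (for example with withTimeout from kotlinx.coroutines).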
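The timing is easy to reproduce without a cluster. The JDK-only sketch below is an illustration, not the article’s code: fakeSearch, timeBoth and withDeadline are hypothetical names, each fake "search request" is a CompletableFuture completed after a delay by CompletableFuture.delayedExecutor (Java 9+), and both requests are started before we wait. It also shows one way to bound a request at the future level with orTimeout; inside coroutines, withTimeout plays the same role.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for one search request: the hit count becomes available after delayMs.
// delayedExecutor schedules the completion, so no thread sleeps while the "request" is in flight.
fun fakeSearch(hits: Long, delayMs: Long): CompletableFuture<Long> =
    CompletableFuture.supplyAsync({ hits }, CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS))

// Starts both requests first, then waits for both; returns the hit counts and the elapsed milliseconds.
fun timeBoth(delayMs: Long): Pair<List<Long>, Long> {
    val start = System.nanoTime()
    val first = fakeSearch(5, delayMs)
    val second = fakeSearch(5, delayMs)
    CompletableFuture.allOf(first, second).join()
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    return listOf(first.join(), second.join()) to elapsedMs
}

// Bounds a request: the returned future fails with TimeoutException if no result arrives in time.
fun withDeadline(request: CompletableFuture<Long>, timeoutMs: Long): CompletableFuture<Long> =
    request.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
```

With a 500 ms delay per request, timeBoth should report roughly 500 ms rather than 1,000 ms, because the two waits overlap.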

Making async requests

So what is the difference between an async request and sending a request asynchronously? Let’s say we have a very complex query that takes a long time to run, perhaps a few minutes or even hours. In that case, even if we send the request asynchronously, it is likely to hit the timeout in our HTTP settings. And even if we could set a long enough timeout, it does not make sense to keep a connection open for that long. Instead, we can tell the Elasticsearch cluster to start the search, disconnect, and check back later for the result. That is exactly how an async request works.

Create an async search client

As can be seen in the wrapper class ElasticsearchAsyncSearchClientWrapper, we first create an ElasticsearchClient object and then call its asyncSearch function.

private val client: ElasticsearchAsyncSearchClient = ElasticsearchClient(transport).asyncSearch()

Send a SubmitRequest to Elasticsearch cluster

To start the search, we first need to send a SubmitRequest to the cluster. A SubmitRequest is very similar to a SearchRequest; the only difference is that we pass the query to a SubmitRequest.Builder instead of a SearchRequest.Builder. We will reuse the buildQueryByClass function here and also provide some additional settings to the builder.

val submitRequest = SubmitRequest.Builder()
    .index("footballer") // We still use the footballer index
    .query(buildQueryByClass().query())
    .waitForCompletionTimeout(Time.of { t -> t.time("0s") }) // Don't wait for the search to finish; return a search Id right away
    .keepAlive(Time.of { t -> t.time("30s") }) // How long the results of this search stay available
    .build()

There are other settings documented here. Below are some important ones in my opinion.

  • keepOnCompletion: only matters if a search finishes within the provided “wait_for_completion_timeout”. If true, the response is kept on the cluster, so it can still be retrieved until the “keep_alive” time is exceeded; by default, such results are not stored.
  • explain: include details about the score computation process in the search response.
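To summarize how waitForCompletionTimeout and keepOnCompletion interact, here is a simplified model written as plain Kotlin. It is not part of the client API: SubmitOutcome and expectedOutcome are hypothetical names, and the logic only encodes the documented behavior that a search finishing within wait_for_completion_timeout returns its results directly (with no Id unless keep_on_completion is true), while a search still running returns an Id for later retrieval until keep_alive expires.

```kotlin
// Simplified model (not part of the client API) of what a submit call hands back.
data class SubmitOutcome(val completeResultsInline: Boolean, val hasRetrievableId: Boolean)

fun expectedOutcome(searchMs: Long, waitForCompletionMs: Long, keepOnCompletion: Boolean): SubmitOutcome =
    if (searchMs <= waitForCompletionMs) {
        // Finished in time: results come back directly; they are stored (with an Id) only if requested.
        SubmitOutcome(completeResultsInline = true, hasRetrievableId = keepOnCompletion)
    } else {
        // Still running: the search continues on the cluster and the Id is needed to fetch it later,
        // until keep_alive expires.
        SubmitOutcome(completeResultsInline = false, hasRetrievableId = true)
    }
```

With the article’s waitForCompletionTimeout of 0s, any real search falls into the second branch.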

We can then send the SubmitRequest to the cluster via the wrapper and get back a SubmitResponse<T>. If the search finishes within the waitForCompletionTimeout, this response contains all the search results. Otherwise, it contains an Id that we can use to retrieve the results later.

Because we set the waitForCompletionTimeout to 0s, we will only receive an Id. Let’s verify it with the following code.

val submitResponse = client.submit(submitRequest, Footballer::class.java)
println("Id: ${submitResponse.id()}, IsPartial: ${submitResponse.isPartial}, IsRunning: ${submitResponse.isRunning}")

This should print the following to the console.

Id: <a random id>, IsPartial: true, IsRunning: true

As we can see, the search is still running.

Use the search Id to retrieve the results

To get the result, we need to send a GetAsyncSearchRequest to the cluster. In its simplest form, a GetAsyncSearchRequest only stores the search Id.

val responseRequest = GetAsyncSearchRequest.Builder().id(submitResponse.id()).build()

Then we can use the wrapper class to send that GetAsyncSearchRequest to the Elasticsearch cluster and get back a GetAsyncSearchResponse. From that response, we can check if the search has completed, and if so, print out the details.

val response = client.getResponse(responseRequest, Footballer::class.java)

if (!response.isRunning) {
    printResults(response.response().hits())
}

Below are the results (I reused the printResults function from the last article).

Hits: 5
Name: Bukayo Saka, Score: 4.787636
Name: Antony, Score: 4.145132
Name: Mahrez, Score: 3.7876358
Name: Sancho, Score: 2.1451323
Name: Vinicius Junior, Score: 2.1451323

As we can see, this is the same result we saw last time. Of course, we can also set up a loop that periodically calls getResponse and retries if the result is not ready yet.
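Such a loop could look like the sketch below. PollResult and pollUntilDone are hypothetical names, and fetch stands in for one call to the wrapper’s getResponse; the loop blocks between attempts with Thread.sleep, which matches the blocking ElasticsearchAsyncSearchClient used in this section. The attempt limit and interval are assumptions you would tune to the expected search duration and to keepAlive.

```kotlin
import java.util.concurrent.TimeoutException

// Hypothetical snapshot of one poll, mirroring GetAsyncSearchResponse.isRunning / response().
data class PollResult<T>(val isRunning: Boolean, val result: T?)

// Calls fetch until the search stops running, sleeping between attempts (blocking, like this section's client).
fun <T> pollUntilDone(maxAttempts: Int, intervalMs: Long, fetch: () -> PollResult<T>): T {
    require(maxAttempts > 0) { "maxAttempts must be positive" }
    repeat(maxAttempts) { attempt ->
        val poll = fetch()
        if (!poll.isRunning) {
            return poll.result ?: error("search finished without a result")
        }
        // Only wait if another attempt will follow.
        if (attempt < maxAttempts - 1) Thread.sleep(intervalMs)
    }
    throw TimeoutException("search still running after $maxAttempts attempts")
}
```

With the wrapper, fetch could be something like `{ client.getResponse(responseRequest, Footballer::class.java).let { PollResult(it.isRunning, it.response()) } }`; that mapping is untested against a real cluster.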

What about making an async request asynchronously?

Some people may wonder why we send the SubmitRequest and GetAsyncSearchRequest in a blocking way. Can we send them in a non-blocking way as well? We can indeed. All we need to do is replace the ElasticsearchAsyncSearchClient object with an ElasticsearchAsyncSearchAsyncClient object, which we can create from an ElasticsearchAsyncClient (quite a mouthful, aren’t they :D).

private val client: ElasticsearchAsyncSearchAsyncClient = ElasticsearchAsyncClient(transport).asyncSearch()

Then our wrapper’s submit and getResponse functions can call the new client, whose submit and get methods return a CompletableFuture that can be awaited inside a suspending function.

suspend fun <TDocument> submit(
    request: SubmitRequest,
    tDocumentClass: Class<TDocument>
): SubmitResponse<TDocument> = client.submit(request, tDocumentClass).await()

// And

suspend fun <TDocument> getResponse(
    request: GetAsyncSearchRequest,
    tDocumentClass: Class<TDocument>
): GetAsyncSearchResponse<TDocument> = client.get(request, tDocumentClass).await()

But I think this is overkill for the following reasons.

  • We normally want to send many normal search requests in a short time frame. In that case, it makes sense to use asynchronous calls because we don’t want to block too many threads at the same time.
  • In contrast, we hopefully only send a few big requests that need async search, so it is okay to block those threads.
  • With a short or 0s waitForCompletionTimeout, we should receive a SubmitResponse right away, so we won’t block threads for that long.
  • And when retrieving a GetAsyncSearchResponse, the actual search should have already finished, so we won’t block threads for too long in this case either.
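If you do go fully non-blocking with ElasticsearchAsyncSearchAsyncClient, the wait between polls should not block either. In coroutine code that would typically be a loop around the suspending getResponse with kotlinx.coroutines’ delay; the JDK-only sketch below shows the same idea at the CompletableFuture level, chaining polls with thenCompose and pausing with delayedExecutor (Java 9+) instead of Thread.sleep. AsyncPoll and pollAsync are hypothetical names.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical snapshot of one poll: whether the search is still running and, if not, its result.
data class AsyncPoll<T : Any>(val isRunning: Boolean, val result: T?)

// Polls without blocking: each attempt is chained onto the previous one, and the pause between
// attempts is scheduled by delayedExecutor rather than by sleeping on a thread.
fun <T : Any> pollAsync(
    attemptsLeft: Int,
    intervalMs: Long,
    fetch: () -> CompletableFuture<AsyncPoll<T>>
): CompletableFuture<T> =
    fetch().thenCompose<T> { poll ->
        val result = poll.result
        when {
            !poll.isRunning && result != null -> CompletableFuture.completedFuture(result)
            !poll.isRunning -> CompletableFuture.failedFuture<T>(IllegalStateException("search finished without a result"))
            attemptsLeft <= 1 -> CompletableFuture.failedFuture<T>(TimeoutException("search still running"))
            else -> CompletableFuture.runAsync({}, CompletableFuture.delayedExecutor(intervalMs, TimeUnit.MILLISECONDS))
                .thenCompose<T> { pollAsync(attemptsLeft - 1, intervalMs, fetch) }
        }
    }
```

With the new client, fetch could be something like `{ client.get(responseRequest, Footballer::class.java).thenApply { AsyncPoll(it.isRunning, it.response()) } }`; that mapping is untested against a real cluster.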

Conclusion

When I first read about all the clients that the Elasticsearch Java API Client provides, I was a bit confused by ElasticsearchAsyncClient and ElasticsearchAsyncSearchClient. Hopefully, this article can help you avoid the same confusion. As for ElasticsearchAsyncSearchAsyncClient, do you also consider it overkill? Or do you know of a good use case for it? Please let me know in the comments.

A software developer from Vietnam, currently living in Japan.
