Skip to content

Commit

Permalink
Move ES|QL helper to flavor-specific area, add async support
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez committed Mar 13, 2024
1 parent 797cca1 commit 6ab0fa1
Show file tree
Hide file tree
Showing 23 changed files with 186 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlClient;
import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BinaryResponse;

import java.io.IOException;
Expand All @@ -42,6 +43,5 @@ public interface EsqlAdapter<Result> {
/**
* Deserialize the raw http response returned by the server
*/
Result deserialize(ElasticsearchEsqlClient client, QueryRequest request, BinaryResponse response) throws IOException;

Result deserialize(ApiClient<ElasticsearchTransport, ?> client, QueryRequest request, BinaryResponse response) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,86 @@
package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlAsyncClient;
import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpUtils;
import co.elastic.clients.transport.endpoints.BinaryResponse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EsqlHelper {

public static <T> T query(ElasticsearchEsqlClient client, EsqlAdapter<T> deserializer, String query, Object... params)
throws IOException {
//----- Synchronous

QueryRequest request = QueryRequest.of(esql -> esql
.format(deserializer.format())
.columnar(deserializer.columnar())
.query(query)
.params(asFieldValues(params))
);

String json = JsonpUtils.toJsonString(request, client._jsonpMapper());
public static <T> T query(
ElasticsearchEsqlClient client, EsqlAdapter<T> adapter, String query, Object... params
) throws IOException {
QueryRequest request = buildRequest(adapter, query, params);
BinaryResponse response = client.query(request);
return adapter.deserialize(client, request, response);
}

public static <T> T query(ElasticsearchEsqlClient client, EsqlAdapter<T> adapter, QueryRequest request) throws IOException {
request = buildRequest(adapter, request);
BinaryResponse response = client.query(request);
return deserializer.deserialize(client, request, response);
return adapter.deserialize(client, request, response);
}

//----- Asynchronous

public static <T> CompletableFuture<T> queryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, String query, Object... params
) {
return doQueryAsync(client, adapter, buildRequest(adapter, query, params));
}

public static <T> CompletableFuture<T> queryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, QueryRequest request
) {
return doQueryAsync(client, adapter, buildRequest(adapter, request));
}

private static <T> CompletableFuture<T> doQueryAsync(
ElasticsearchEsqlAsyncClient client, EsqlAdapter<T> adapter, QueryRequest request
) {
return client
.query(request)
.thenApply(r -> {
try {
return adapter.deserialize(client, request, r);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

public static <T> T query(ElasticsearchEsqlClient client, EsqlAdapter<T> deserializer, QueryRequest request) throws IOException {
//----- Utilities

private static QueryRequest buildRequest(EsqlAdapter<?> adapter, String query, Object... params) {
return QueryRequest.of(esql -> esql
.format(adapter.format())
.columnar(adapter.columnar())
.query(query)
.params(asFieldValues(params))
);
}

QueryRequest esql = QueryRequest.of(q -> q
private static QueryRequest buildRequest(EsqlAdapter<?> adapter, QueryRequest request) {
return QueryRequest.of(q -> q
// Set/override format and columnar
.format(deserializer.format())
.columnar(deserializer.columnar())
.format(adapter.format())
.columnar(adapter.columnar())

.delimiter(request.delimiter())
.filter(request.filter())
.locale(request.locale())
.params(request.params())
.query(request.query())
);

// FIXME: set columnar and format form adapter
BinaryResponse response = client.query(request);
return deserializer.deserialize(client, request, response);
}

private static List<FieldValue> asFieldValues(Object... objects) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlClient;
import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.json.BufferingJsonGenerator;
import co.elastic.clients.json.BufferingJsonpMapper;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.JsonpUtils;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BinaryResponse;
import jakarta.json.stream.JsonParser;

Expand Down Expand Up @@ -63,7 +64,8 @@ public boolean columnar() {
}

@Override
public Iterable<T> deserialize(ElasticsearchEsqlClient client, QueryRequest request, BinaryResponse response) throws IOException {
public Iterable<T> deserialize(ApiClient<ElasticsearchTransport, ?> client, QueryRequest request, BinaryResponse response)
throws IOException {
JsonpMapper mapper = client._jsonpMapper();

if (!(mapper instanceof BufferingJsonpMapper)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package co.elastic.clients.elasticsearch._helpers.esql.jdbc;

import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch._helpers.esql.EsqlAdapterBase;
import co.elastic.clients.elasticsearch._helpers.esql.EsqlMetadata;
import co.elastic.clients.elasticsearch.esql.ElasticsearchEsqlClient;
import co.elastic.clients.elasticsearch.esql.QueryRequest;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BinaryResponse;
import jakarta.json.stream.JsonParser;

Expand All @@ -46,7 +47,8 @@ public boolean columnar() {
}

@Override
public ResultSet deserialize(ElasticsearchEsqlClient client, QueryRequest request, BinaryResponse response) throws IOException {
public ResultSet deserialize(ApiClient<ElasticsearchTransport, ?> client, QueryRequest request, BinaryResponse response)
throws IOException {
JsonpMapper mapper = client._jsonpMapper();
JsonParser parser = mapper.jsonProvider().createParser(response.content());
EsqlMetadata metadata = EsqlAdapterBase.readHeader(parser, mapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class ElasticsearchException extends RuntimeException {
@Nullable
private final TransportHttpClient.Response httpResponse;

public ElasticsearchException(String endpointId, ErrorResponse response, @Nullable TransportHttpClient.Response httpResponse) {
public ElasticsearchException(String endpointId, ErrorResponse response,
@Nullable TransportHttpClient.Response httpResponse) {
super("[" + endpointId + "] failed: [" + response.error().type() + "] " + response.error().reason());
this.response = response;
this.endpointId = endpointId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import co.elastic.clients.util.ObjectBuilderBase;
import co.elastic.clients.util.TaggedUnion;
import co.elastic.clients.util.TaggedUnionUtils;
import jakarta.json.Json;
import jakarta.json.stream.JsonGenerator;
import jakarta.json.stream.JsonParser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package co.elastic.clients.elasticsearch.esql;

import co.elastic.clients.ApiClient;
import co.elastic.clients.elasticsearch._helpers.esql.EsqlAdapter;
import co.elastic.clients.elasticsearch._helpers.esql.EsqlHelper;
import co.elastic.clients.elasticsearch._types.ErrorResponse;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.Endpoint;
Expand Down Expand Up @@ -98,4 +100,43 @@ public final CompletableFuture<BinaryResponse> query(
return query(fn.apply(new QueryRequest.Builder()).build());
}

/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param query
* the ES|QL query
* @param parameters
* values for query parameters, if any
*/
public final <T> CompletableFuture<T> query(EsqlAdapter<T> adapter, String query, Object... parameters) {
return EsqlHelper.queryAsync(this, adapter, query, parameters);
}

/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param request
* the ES|QL request
*/
public final <T> CompletableFuture<T> query(EsqlAdapter<T> adapter, QueryRequest request) {
return EsqlHelper.queryAsync(this, adapter, request);
}

/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param fn
* the ES|QL request builder
*/
public final <T> CompletableFuture<T> query(EsqlAdapter<T> adapter,
Function<QueryRequest.Builder, ObjectBuilder<QueryRequest>> fn) {
return EsqlHelper.queryAsync(this, adapter, fn.apply(new QueryRequest.Builder()).build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import co.elastic.clients.elasticsearch._types.ErrorResponse;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.Endpoint;
import co.elastic.clients.transport.JsonEndpoint;
import co.elastic.clients.transport.Transport;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.endpoints.BinaryResponse;
import co.elastic.clients.util.ObjectBuilder;
Expand Down Expand Up @@ -99,12 +101,44 @@ public final BinaryResponse query(Function<QueryRequest.Builder, ObjectBuilder<Q
return query(fn.apply(new QueryRequest.Builder()).build());
}

public final <T> T query(EsqlAdapter<T> adapter, String query, Object... parameters) throws IOException, ElasticsearchException {
/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param query
* the ES|QL query
* @param parameters
* values for query parameters, if any
*/
public final <T> T query(EsqlAdapter<T> adapter, String query, Object... parameters)
throws IOException, ElasticsearchException {
return EsqlHelper.query(this, adapter, query, parameters);
}

/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param request
* the ES|QL request
*/
public final <T> T query(EsqlAdapter<T> adapter, QueryRequest request) throws IOException, ElasticsearchException {
return EsqlHelper.query(this, adapter, request);
}

/**
* Executes an ES|QL request and adapts its result to a target type.
*
* @param adapter
* the ES|QL response adapter
* @param fn
* the ES|QL request builder
*/
public final <T> T query(EsqlAdapter<T> adapter, Function<QueryRequest.Builder, ObjectBuilder<QueryRequest>> fn)
throws IOException, ElasticsearchException {
return query(adapter, fn.apply(new QueryRequest.Builder()).build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package co.elastic.clients.elasticsearch._helpers.esql;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.ElasticsearchTestServer;
import co.elastic.clients.elasticsearch._helpers.esql.jdbc.ResultSetEsqlAdapter;
Expand All @@ -44,6 +45,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EsqlAdapterEndToEndTest extends Assertions {

Expand Down Expand Up @@ -142,6 +144,46 @@ public void objectsTest() throws Exception {

}

@Test
public void asyncObjects() throws Exception {

ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(esClient._transport(), esClient._transportOptions());


CompletableFuture<Iterable<EmpData>> future = asyncClient.esql().query(
ObjectsEsqlAdapter.of(EmpData.class),
"FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300",
// Testing parameters. Note that FROM and LIMIT do not accept parameters
"10042", "10002"
);

future.thenApply(result -> {
Iterator<EmpData> it = result.iterator();

{
EmpData emp = it.next();
assertEquals("10002", emp.empNo);
List<String> jobPositions = emp.jobPositions;
// In addition to the value, this tests that single strings are correctly deserialized as a list
assertEquals(Arrays.asList("Senior Team Lead"), emp.jobPositions);
}

{
EmpData emp = it.next();
assertEquals("10042", emp.empNo);
assertEquals(Arrays.asList("Architect", "Business Analyst", "Internship", "Junior Developer"), emp.jobPositions);

assertEquals("1993-03-21T00:00:00Z[UTC]",
DateTimeFormatter.ISO_DATE_TIME.format(emp.hireDate.toInstant().atZone(ZoneId.of("UTC")))
);
}

assertFalse(it.hasNext());
return null;
}
).get();
}

@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@JsonIgnoreProperties(ignoreUnknown = true)
public static class EmpData {
Expand Down

0 comments on commit 6ab0fa1

Please sign in to comment.