diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java index 8af5fffd01d6a..093e53e9feb97 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java @@ -537,12 +537,12 @@ public void closeFlightInfo(FlightInfo info, CallOption... options) { /** * Request the server to extend the lifetime of a query result set. * - * @param endpoint The result set partition. + * @param request The result set partition. * @param options Call options. * @return The new endpoint with an updated expiration time. */ - public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) { - Action action = new Action(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), endpoint.serialize().array()); + public FlightEndpoint renewFlightEndpoint(RenewFlightEndpointRequest request, CallOption... options) { + Action action = new Action(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType(), request.serialize().array()); Iterator results = doAction(action, options); if (!results.hasNext()) { throw CallStatus.INTERNAL diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java index c7fd978df2f48..6dc84c8d6f883 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightConstants.java @@ -36,8 +36,8 @@ public interface FlightConstants { "Close the given FlightInfo explicitly.\n" + "Request Message: FlightInfo to be closed\n" + "Response Message: N/A"); - ActionType REFRESH_FLIGHT_ENDPOINT = new ActionType("RefreshFlightEndpoint", + ActionType RENEW_FLIGHT_ENDPOINT = new ActionType("RenewFlightEndpoint", "Extend expiration time of the given FlightEndpoint.\n" + - "Request Message: FlightEndpoint to be refreshed\n" + - "Response Message: Refreshed FlightEndpoint"); + "Request Message: FlightEndpoint to be renewed\n" + + "Response Message: Renewed FlightEndpoint"); } diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/RenewFlightEndpointRequest.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/RenewFlightEndpointRequest.java new file mode 100644 index 0000000000000..ea233ff5ab0c2 --- /dev/null +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/RenewFlightEndpointRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Objects; + +import org.apache.arrow.flight.impl.Flight; + +/** A request to extend the expiration time of a FlightEndpoint. */ +public class RenewFlightEndpointRequest { + private final FlightEndpoint endpoint; + + public RenewFlightEndpointRequest(FlightEndpoint endpoint) { + this.endpoint = Objects.requireNonNull(endpoint); + } + + RenewFlightEndpointRequest(Flight.RenewFlightEndpointRequest proto) throws URISyntaxException { + this(new FlightEndpoint(proto.getEndpoint())); + } + + public FlightEndpoint getFlightEndpoint() { + return endpoint; + } + + Flight.RenewFlightEndpointRequest toProtocol() { + Flight.RenewFlightEndpointRequest.Builder b = Flight.RenewFlightEndpointRequest.newBuilder(); + b.setEndpoint(endpoint.toProtocol()); + return b.build(); + } + + /** + * Get the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing non-Flight services to still return Flight types. + */ + public ByteBuffer serialize() { + return ByteBuffer.wrap(toProtocol().toByteArray()); + } + + /** + * Parse the serialized form of this protocol message. + * + *

Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services. + * + * @param serialized The serialized form of the message, as returned by {@link #serialize()}. + * @return The deserialized message. + * @throws IOException if the serialized form is invalid. + */ + public static RenewFlightEndpointRequest deserialize(ByteBuffer serialized) throws IOException, URISyntaxException { + return new RenewFlightEndpointRequest(Flight.RenewFlightEndpointRequest.parseFrom(serialized)); + } +} diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java index 3933f9b17581e..504836b334e82 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeDoGetScenario.java @@ -71,7 +71,7 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl } // Check data - IntegrationAssertions.assertEquals(5, batches.size()); + IntegrationAssertions.assertEquals(3, batches.size()); try (final VectorSchemaRoot root = VectorSchemaRoot.create(ExpirationTimeProducer.SCHEMA, allocator)) { final VectorLoader loader = new VectorLoader(root); diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java index b58127a85f745..09088e594bea7 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeProducer.java @@ -38,6 +38,7 @@ import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.RenewFlightEndpointRequest; import org.apache.arrow.flight.Result; import org.apache.arrow.flight.Ticket; import org.apache.arrow.memory.BufferAllocator; @@ -164,7 +165,8 @@ public void doAction(CallContext context, Action action, StreamListener statuses.get(index).closed = true; } } else if (action.getType().equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) { - FlightEndpoint endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody())); + RenewFlightEndpointRequest request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody())); + FlightEndpoint endpoint = request.getFlightEndpoint(); int index = parseIndexFromTicket(endpoint.getTicket()); EndpointStatus status = statuses.get(index); if (status.closed) { diff --git a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRenewFlightEndpointScenario.java b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRenewFlightEndpointScenario.java index 9b6d783ad5785..6f47e2965a217 100644 --- a/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRenewFlightEndpointScenario.java +++ b/java/flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/ExpirationTimeRenewFlightEndpointScenario.java @@ -27,6 +27,7 @@ import org.apache.arrow.flight.FlightProducer; import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.RenewFlightEndpointRequest; import org.apache.arrow.memory.BufferAllocator; /** Test RenewFlightEndpoint. */ @@ -50,7 +51,7 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl continue; } Instant expiration = endpoint.getExpirationTime().get(); - FlightEndpoint renewed = client.renewFlightEndpoint(endpoint); + FlightEndpoint renewed = client.renewFlightEndpoint(new RenewFlightEndpointRequest(endpoint)); IntegrationAssertions.assertTrue("Renewed FlightEndpoint must have expiration time", renewed.getExpirationTime().isPresent()); diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java index aa9357c3a4863..c92888f6c92df 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightEndpointListener.java @@ -21,7 +21,7 @@ import org.apache.arrow.flight.FlightProducer; import org.apache.arrow.flight.Result; -/** Typed StreamListener for refreshFlightEndpoint. */ +/** Typed StreamListener for renewFlightEndpoint. */ public class FlightEndpointListener implements FlightProducer.StreamListener { private final FlightProducer.StreamListener listener; diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 63aee483b5773..c6987f1e06f8c 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -66,6 +66,7 @@ import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.RenewFlightEndpointRequest; import org.apache.arrow.flight.Result; import org.apache.arrow.flight.SchemaResult; import org.apache.arrow.flight.SyncPutListener; @@ -917,12 +918,12 @@ public CancelResult cancelQuery(FlightInfo info, CallOption... options) { /** * Request the server to extend the lifetime of a query result set. * - * @param endpoint The result set partition. + * @param request The result set partition. * @param options Call options. * @return The new endpoint with an updated expiration time. */ - public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) { - return client.refreshFlightEndpoint(endpoint, options); + public FlightEndpoint renewFlightEndpoint(RenewFlightEndpointRequest request, CallOption... options) { + return client.renewFlightEndpoint(request, options); } @Override diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java index 031e4eca12f8d..8f49782f0def2 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java @@ -62,6 +62,7 @@ import org.apache.arrow.flight.FlightProducer; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.PutResult; +import org.apache.arrow.flight.RenewFlightEndpointRequest; import org.apache.arrow.flight.Result; import org.apache.arrow.flight.SchemaResult; import org.apache.arrow.flight.Ticket; @@ -381,10 +382,10 @@ default void doAction(CallContext context, Action action, StreamListener return; } closeFlightInfo(info, context, new NoResultListener(listener)); - } else if (actionType.equals(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType())) { - final FlightEndpoint endpoint; + } else if (actionType.equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) { + final RenewFlightEndpointRequest request; try { - endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody())); + request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody())); } catch (IOException | URISyntaxException e) { listener.onError(CallStatus.INTERNAL .withDescription("Could not unpack FlightInfo: " + e) @@ -392,7 +393,7 @@ default void doAction(CallContext context, Action action, StreamListener .toRuntimeException()); return; } - refreshFlightEndpoint(endpoint, context, new FlightEndpointListener(listener)); + renewFlightEndpoint(request, context, new FlightEndpointListener(listener)); } else { throw CallStatus.INVALID_ARGUMENT .withDescription("Unrecognized request: " + action.getType()) @@ -909,14 +910,14 @@ void getStreamCrossReference(CommandGetCrossReference command, CallContext conte ServerStreamListener listener); /** - * Refresh the duration of the given endpoint. + * Renew the duration of the given endpoint. * - * @param endpoint The endpoint to refresh. + * @param request The endpoint to renew. * @param context Per-call context. * @param listener An interface for sending data back to the client. */ - default void refreshFlightEndpoint(FlightEndpoint endpoint, CallContext context, - StreamListener listener) { + default void renewFlightEndpoint(RenewFlightEndpointRequest request, CallContext context, + StreamListener listener) { listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException()); } diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java index 281664a6eccd1..e99b153a01bcf 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java @@ -1016,10 +1016,10 @@ public void testCloseInfo() { } @Test - public void testRefreshEndpoint() { + public void testRenewEndpoint() { FlightInfo info = sqlClient.getSqlInfo(); FlightRuntimeException fre = assertThrows(FlightRuntimeException.class, - () -> sqlClient.refreshFlightEndpoint(info.getEndpoints().get(0))); + () -> sqlClient.renewFlightEndpoint(new RenewFlightEndpointRequest(info.getEndpoints().get(0)))); assertEquals(FlightStatusCode.UNIMPLEMENTED, fre.status().code()); } }