Skip to content

Commit

Permalink
Java: refresh -> renew
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm committed Jun 23, 2023
1 parent 1135742 commit 374bfdd
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,12 +537,12 @@ public void closeFlightInfo(FlightInfo info, CallOption... options) {
/**
* Request the server to extend the lifetime of a query result set.
*
* @param endpoint The result set partition.
* @param request The result set partition.
* @param options Call options.
* @return The new endpoint with an updated expiration time.
*/
public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) {
Action action = new Action(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), endpoint.serialize().array());
public FlightEndpoint renewFlightEndpoint(RenewFlightEndpointRequest request, CallOption... options) {
Action action = new Action(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType(), request.serialize().array());
Iterator<Result> results = doAction(action, options);
if (!results.hasNext()) {
throw CallStatus.INTERNAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface FlightConstants {
"Close the given FlightInfo explicitly.\n" +
"Request Message: FlightInfo to be closed\n" +
"Response Message: N/A");
ActionType REFRESH_FLIGHT_ENDPOINT = new ActionType("RefreshFlightEndpoint",
ActionType RENEW_FLIGHT_ENDPOINT = new ActionType("RenewFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n" +
"Request Message: FlightEndpoint to be refreshed\n" +
"Response Message: Refreshed FlightEndpoint");
"Request Message: FlightEndpoint to be renewed\n" +
"Response Message: Renewed FlightEndpoint");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Objects;

import org.apache.arrow.flight.impl.Flight;

/** A request to extend the expiration time of a FlightEndpoint. */
public class RenewFlightEndpointRequest {
private final FlightEndpoint endpoint;

public RenewFlightEndpointRequest(FlightEndpoint endpoint) {
this.endpoint = Objects.requireNonNull(endpoint);
}

RenewFlightEndpointRequest(Flight.RenewFlightEndpointRequest proto) throws URISyntaxException {
this(new FlightEndpoint(proto.getEndpoint()));
}

public FlightEndpoint getFlightEndpoint() {
return endpoint;
}

Flight.RenewFlightEndpointRequest toProtocol() {
Flight.RenewFlightEndpointRequest.Builder b = Flight.RenewFlightEndpointRequest.newBuilder();
b.setEndpoint(endpoint.toProtocol());
return b.build();
}

/**
* Get the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
*/
public ByteBuffer serialize() {
return ByteBuffer.wrap(toProtocol().toByteArray());
}

/**
* Parse the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
*
* @param serialized The serialized form of the message, as returned by {@link #serialize()}.
* @return The deserialized message.
* @throws IOException if the serialized form is invalid.
*/
public static RenewFlightEndpointRequest deserialize(ByteBuffer serialized) throws IOException, URISyntaxException {
return new RenewFlightEndpointRequest(Flight.RenewFlightEndpointRequest.parseFrom(serialized));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl
}

// Check data
IntegrationAssertions.assertEquals(5, batches.size());
IntegrationAssertions.assertEquals(3, batches.size());
try (final VectorSchemaRoot root = VectorSchemaRoot.create(ExpirationTimeProducer.SCHEMA, allocator)) {
final VectorLoader loader = new VectorLoader(root);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -164,7 +165,8 @@ public void doAction(CallContext context, Action action, StreamListener<Result>
statuses.get(index).closed = true;
}
} else if (action.getType().equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) {
FlightEndpoint endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody()));
RenewFlightEndpointRequest request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody()));
FlightEndpoint endpoint = request.getFlightEndpoint();
int index = parseIndexFromTicket(endpoint.getTicket());
EndpointStatus status = statuses.get(index);
if (status.closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.memory.BufferAllocator;

/** Test RenewFlightEndpoint. */
Expand All @@ -50,7 +51,7 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl
continue;
}
Instant expiration = endpoint.getExpirationTime().get();
FlightEndpoint renewed = client.renewFlightEndpoint(endpoint);
FlightEndpoint renewed = client.renewFlightEndpoint(new RenewFlightEndpointRequest(endpoint));

IntegrationAssertions.assertTrue("Renewed FlightEndpoint must have expiration time",
renewed.getExpirationTime().isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.Result;

/** Typed StreamListener for refreshFlightEndpoint. */
/** Typed StreamListener for renewFlightEndpoint. */
public class FlightEndpointListener implements FlightProducer.StreamListener<FlightEndpoint> {
private final FlightProducer.StreamListener<Result> listener;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.SyncPutListener;
Expand Down Expand Up @@ -917,12 +918,12 @@ public CancelResult cancelQuery(FlightInfo info, CallOption... options) {
/**
* Request the server to extend the lifetime of a query result set.
*
* @param endpoint The result set partition.
* @param request The result set partition.
* @param options Call options.
* @return The new endpoint with an updated expiration time.
*/
public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) {
return client.refreshFlightEndpoint(endpoint, options);
public FlightEndpoint renewFlightEndpoint(RenewFlightEndpointRequest request, CallOption... options) {
return client.renewFlightEndpoint(request, options);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.RenewFlightEndpointRequest;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.Ticket;
Expand Down Expand Up @@ -381,18 +382,18 @@ default void doAction(CallContext context, Action action, StreamListener<Result>
return;
}
closeFlightInfo(info, context, new NoResultListener(listener));
} else if (actionType.equals(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType())) {
final FlightEndpoint endpoint;
} else if (actionType.equals(FlightConstants.RENEW_FLIGHT_ENDPOINT.getType())) {
final RenewFlightEndpointRequest request;
try {
endpoint = FlightEndpoint.deserialize(ByteBuffer.wrap(action.getBody()));
request = RenewFlightEndpointRequest.deserialize(ByteBuffer.wrap(action.getBody()));
} catch (IOException | URISyntaxException e) {
listener.onError(CallStatus.INTERNAL
.withDescription("Could not unpack FlightInfo: " + e)
.withCause(e)
.toRuntimeException());
return;
}
refreshFlightEndpoint(endpoint, context, new FlightEndpointListener(listener));
renewFlightEndpoint(request, context, new FlightEndpointListener(listener));
} else {
throw CallStatus.INVALID_ARGUMENT
.withDescription("Unrecognized request: " + action.getType())
Expand Down Expand Up @@ -909,14 +910,14 @@ void getStreamCrossReference(CommandGetCrossReference command, CallContext conte
ServerStreamListener listener);

/**
* Refresh the duration of the given endpoint.
* Renew the duration of the given endpoint.
*
* @param endpoint The endpoint to refresh.
* @param request The endpoint to renew.
* @param context Per-call context.
* @param listener An interface for sending data back to the client.
*/
default void refreshFlightEndpoint(FlightEndpoint endpoint, CallContext context,
StreamListener<FlightEndpoint> listener) {
default void renewFlightEndpoint(RenewFlightEndpointRequest request, CallContext context,
StreamListener<FlightEndpoint> listener) {
listener.onError(CallStatus.UNIMPLEMENTED.toRuntimeException());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,10 +1016,10 @@ public void testCloseInfo() {
}

@Test
public void testRefreshEndpoint() {
public void testRenewEndpoint() {
FlightInfo info = sqlClient.getSqlInfo();
FlightRuntimeException fre = assertThrows(FlightRuntimeException.class,
() -> sqlClient.refreshFlightEndpoint(info.getEndpoints().get(0)));
() -> sqlClient.renewFlightEndpoint(new RenewFlightEndpointRequest(info.getEndpoints().get(0))));
assertEquals(FlightStatusCode.UNIMPLEMENTED, fre.status().code());
}
}

0 comments on commit 374bfdd

Please sign in to comment.