Skip to content

Commit

Permalink
Add Java support for proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm authored and kou committed Jun 21, 2023
1 parent bd2783d commit 68d32ba
Show file tree
Hide file tree
Showing 21 changed files with 1,279 additions and 16 deletions.
10 changes: 5 additions & 5 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,32 +439,32 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
"expiration_time:do_get",
description=("Ensure FlightEndpoint.expiration_time with "
"DoGet is working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:list_actions",
description=("Ensure FlightEndpoint.expiration_time related "
"pre-defined actions is working with ListActions "
"as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:cancel_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CancelFlightInfo are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:close_flight_info",
description=("Ensure FlightEndpoint.expiration_time and "
"CloseFlightInfo are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"expiration_time:refresh_flight_endpoint",
description=("Ensure FlightEndpoint.expiration_time and "
"RefreshFlightEndpoint are working as expected."),
skip={"Java", "JS", "C#", "Rust"},
skip={"JS", "C#", "Rust"},
),
Scenario(
"flight_sql",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

import org.apache.arrow.flight.impl.Flight;

/**
* The result of cancelling a FlightInfo.
*/
public class CancelFlightInfoResult {
private final CancelStatus status;

public CancelFlightInfoResult(CancelStatus status) {
this.status = status;
}

CancelFlightInfoResult(Flight.CancelFlightInfoResult proto) {
switch (proto.getStatus()) {
case CANCEL_STATUS_UNSPECIFIED:
status = CancelStatus.UNSPECIFIED;
break;
case CANCEL_STATUS_CANCELLED:
status = CancelStatus.CANCELLED;
break;
case CANCEL_STATUS_CANCELLING:
status = CancelStatus.CANCELLING;
break;
case CANCEL_STATUS_NOT_CANCELLABLE:
status = CancelStatus.NOT_CANCELLABLE;
break;
default:
throw new IllegalArgumentException("");
}
}

public CancelStatus getStatus() {
return status;
}

Flight.CancelFlightInfoResult toProtocol() {
Flight.CancelFlightInfoResult.Builder b = Flight.CancelFlightInfoResult.newBuilder();
switch (status) {
case UNSPECIFIED:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_UNSPECIFIED);
break;
case CANCELLED:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLED);
break;
case CANCELLING:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_CANCELLING);
break;
case NOT_CANCELLABLE:
b.setStatus(Flight.CancelStatus.CANCEL_STATUS_NOT_CANCELLABLE);
break;
default:
// Not possible
throw new AssertionError();
}
return b.build();
}

/**
* Get the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
*/
public ByteBuffer serialize() {
return ByteBuffer.wrap(toProtocol().toByteArray());
}

/**
* Parse the serialized form of this protocol message.
*
* <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
*
* @param serialized The serialized form of the message, as returned by {@link #serialize()}.
* @return The deserialized message.
* @throws IOException if the serialized form is invalid.
*/
public static CancelFlightInfoResult deserialize(ByteBuffer serialized) throws IOException {
return new CancelFlightInfoResult(Flight.CancelFlightInfoResult.parseFrom(serialized));
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CancelFlightInfoResult that = (CancelFlightInfoResult) o;
return status == that.status;
}

@Override
public int hashCode() {
return Objects.hash(status);
}

@Override
public String toString() {
return "CancelFlightInfoResult{" +
"status=" + status +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight;

/** The result of cancelling a FlightInfo. */
public enum CancelStatus {
/**
* The cancellation status is unknown. Servers should avoid using
* this value (send a NOT_FOUND error if the requested query is
* not known). Clients can retry the request.
*/
UNSPECIFIED,
/**
* The cancellation request is complete. Subsequent requests with
* the same payload may return CANCELLED or a NOT_FOUND error.
*/
CANCELLED,
/**
* The cancellation request is in progress. The client may retry
* the cancellation request.
*/
CANCELLING,
/**
* The query is not cancellable. The client should not retry the
* cancellation request.
*/
NOT_CANCELLABLE,
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.arrow.flight;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -489,6 +491,79 @@ public void getResult() {
}
}

/**
* Cancel execution of a distributed query.
*
* @param info The query to cancel.
* @param options Call options.
* @return The server response.
*/
public CancelFlightInfoResult cancelFlightInfo(FlightInfo info, CallOption... options) {
Action action = new Action(FlightConstants.CANCEL_FLIGHT_INFO.getType(), info.serialize().array());
Iterator<Result> results = doAction(action, options);
if (!results.hasNext()) {
throw CallStatus.INTERNAL
.withDescription("Server did not return a response")
.toRuntimeException();
}

CancelFlightInfoResult result;
try {
result = CancelFlightInfoResult.deserialize(ByteBuffer.wrap(results.next().getBody()));
} catch (IOException e) {
throw CallStatus.INTERNAL
.withDescription("Failed to parse server response: " + e)
.withCause(e)
.toRuntimeException();
}
results.forEachRemaining((ignored) -> {
});
return result;
}

/**
* Request the server to free resources associated with a query.
*
* @param info The query to close.
* @param options Call options.
*/
public void closeFlightInfo(FlightInfo info, CallOption... options) {
Action action = new Action(FlightConstants.CLOSE_FLIGHT_INFO.getType(), info.serialize().array());
Iterator<Result> results = doAction(action, options);
results.forEachRemaining((ignored) -> {
});
}

/**
* Request the server to extend the lifetime of a query result set.
*
* @param endpoint The result set partition.
* @param options Call options.
* @return The new endpoint with an updated expiration time.
*/
public FlightEndpoint refreshFlightEndpoint(FlightEndpoint endpoint, CallOption... options) {
Action action = new Action(FlightConstants.REFRESH_FLIGHT_ENDPOINT.getType(), endpoint.serialize().array());
Iterator<Result> results = doAction(action, options);
if (!results.hasNext()) {
throw CallStatus.INTERNAL
.withDescription("Server did not return a response")
.toRuntimeException();
}

FlightEndpoint result;
try {
result = FlightEndpoint.deserialize(ByteBuffer.wrap(results.next().getBody()));
} catch (IOException | URISyntaxException e) {
throw CallStatus.INTERNAL
.withDescription("Failed to parse server response: " + e)
.withCause(e)
.toRuntimeException();
}
results.forEachRemaining((ignored) -> {
});
return result;
}

/**
* Interface for writers to an Arrow data stream.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,18 @@ public interface FlightConstants {

FlightServerMiddleware.Key<ServerHeaderMiddleware> HEADER_KEY =
FlightServerMiddleware.Key.of("org.apache.arrow.flight.ServerHeaderMiddleware");

ActionType CANCEL_FLIGHT_INFO = new ActionType("CancelFlightInfo",
"Explicitly cancel a running FlightInfo.\n" +
"Request Message: FlightInfo to be canceled\n" +
"Response Message: ActionCancelFlightInfoResult");

ActionType CLOSE_FLIGHT_INFO = new ActionType("CloseFlightInfo",
"Close the given FlightInfo explicitly.\n" +
"Request Message: FlightInfo to be closed\n" +
"Response Message: N/A");
ActionType REFRESH_FLIGHT_ENDPOINT = new ActionType("RefreshFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n" +
"Request Message: FlightEndpoint to be refreshed\n" +
"Response Message: Refreshed FlightEndpoint");
}
Loading

0 comments on commit 68d32ba

Please sign in to comment.