Skip to content

Commit

Permalink
apacheGH-43814: [GLib][FlightRPC] Add GAFlightServerClass::do_put (a…
Browse files Browse the repository at this point in the history
…pache#43999)

### Rationale for this change

This is needed to write `DoPut` client tests.

### What changes are included in this PR?

The following features are also added:
* `GAFlightMetadataWriter`
* `garrow_record_batch_writer_is_closed()`
* Improve `ArrowFlight::Client#do_put` API in Ruby

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: apache#43814

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou authored Sep 7, 2024
1 parent 7d49420 commit 5549fa9
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 10 deletions.
13 changes: 9 additions & 4 deletions c_glib/arrow-flight-glib/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,9 @@ gaflight_do_put_result_set_property(GObject *object,
{
auto result = static_cast<arrow::flight::FlightClient::DoPutResult *>(
g_value_get_pointer(value));
priv->writer = gaflight_stream_writer_new_raw(result->writer.release());
std::shared_ptr<arrow::flight::FlightStreamWriter> writer =
std::move(result->writer);
priv->writer = gaflight_stream_writer_new_raw(&writer);
priv->reader = gaflight_metadata_reader_new_raw(result->reader.release());
break;
}
Expand Down Expand Up @@ -983,10 +985,13 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader,
}

GAFlightStreamWriter *
gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer)
gaflight_stream_writer_new_raw(
std::shared_ptr<arrow::flight::FlightStreamWriter> *flight_writer)
{
return GAFLIGHT_STREAM_WRITER(
g_object_new(GAFLIGHT_TYPE_STREAM_WRITER, "writer", flight_writer, nullptr));
return GAFLIGHT_STREAM_WRITER(g_object_new(GAFLIGHT_TYPE_STREAM_WRITER,
"record-batch-writer",
flight_writer,
nullptr));
}

GAFlightMetadataReader *
Expand Down
3 changes: 2 additions & 1 deletion c_glib/arrow-flight-glib/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader,

GAFLIGHT_EXTERN
GAFlightStreamWriter *
gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer);
gaflight_stream_writer_new_raw(
std::shared_ptr<arrow::flight::FlightStreamWriter> *flight_writer);

GAFLIGHT_EXTERN
GAFlightMetadataReader *
Expand Down
166 changes: 166 additions & 0 deletions c_glib/arrow-flight-glib/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ G_BEGIN_DECLS
* client. Also allows reading application-defined metadata via the
* Flight protocol.
*
* #GAFlightMetadataWriter is a class for sending application-specific
* metadata back to client during an upload.
*
* #GAFlightServerAuthSender is a class for sending messages to the
* client during an authentication handshake.
*
Expand Down Expand Up @@ -290,6 +293,98 @@ gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader)
return gaflight_descriptor_new_raw(&flight_descriptor);
}

struct GAFlightMetadataWriterPrivate
{
arrow::flight::FlightMetadataWriter *writer;
};

enum {
PROP_WRITER = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GAFlightMetadataWriter,
gaflight_metadata_writer,
G_TYPE_OBJECT)

#define GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object) \
static_cast<GAFlightMetadataWriterPrivate *>( \
gaflight_metadata_writer_get_instance_private(GAFLIGHT_METADATA_WRITER(object)))

static void
gaflight_metadata_writer_finalize(GObject *object)
{
auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object);

delete priv->writer;

G_OBJECT_CLASS(gaflight_metadata_writer_parent_class)->finalize(object);
}

static void
gaflight_metadata_writer_set_property(GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object);

switch (prop_id) {
case PROP_WRITER:
priv->writer =
static_cast<arrow::flight::FlightMetadataWriter *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
}
}

static void
gaflight_metadata_writer_init(GAFlightMetadataWriter *object)
{
}

static void
gaflight_metadata_writer_class_init(GAFlightMetadataWriterClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);

gobject_class->finalize = gaflight_metadata_writer_finalize;
gobject_class->set_property = gaflight_metadata_writer_set_property;

GParamSpec *spec;
spec = g_param_spec_pointer(
"writer",
nullptr,
nullptr,
static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_WRITER, spec);
}

/**
* gaflight_metadata_writer_write:
* @writer: A #GAFlightMetadataWriter.
* @metadata: A #GArrowBuffer to be sent.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Writes metadata to the client.
*
* Returns: %TRUE on success, %FALSE on error.
*
* Since: 18.0.0
*/
gboolean
gaflight_metadata_writer_write(GAFlightMetadataWriter *writer,
GArrowBuffer *metadata,
GError **error)
{
auto flight_writer = gaflight_metadata_writer_get_raw(writer);
auto flight_metadata = garrow_buffer_get_raw(metadata);
return garrow::check(error,
flight_writer->WriteMetadata(*flight_metadata),
"[flight-metadata-writer][write]");
}

struct GAFlightServerCallContextPrivate
{
arrow::flight::ServerCallContext *call_context;
Expand Down Expand Up @@ -1034,6 +1129,34 @@ namespace gaflight {
return arrow::Status::OK();
}

arrow::Status
DoPut(const arrow::flight::ServerCallContext &context,
std::unique_ptr<arrow::flight::FlightMessageReader> reader,
std::unique_ptr<arrow::flight::FlightMetadataWriter> writer) override
{
auto gacontext = gaflight_server_call_context_new_raw(&context);
auto gareader = gaflight_message_reader_new_raw(reader.release(), TRUE);
auto gawriter = gaflight_metadata_writer_new_raw(writer.release());
GError *gerror = nullptr;
auto success =
gaflight_server_do_put(gaserver_, gacontext, gareader, gawriter, &gerror);
g_object_unref(gawriter);
g_object_unref(gareader);
g_object_unref(gacontext);
if (!success && !gerror) {
g_set_error(&gerror,
GARROW_ERROR,
GARROW_ERROR_UNKNOWN,
"GAFlightServerClass::do_put() returns FALSE but error isn't set");
}
if (gerror) {
return garrow_error_to_status(gerror,
arrow::StatusCode::UnknownError,
"[flight-server][do-put]");
}
return arrow::Status::OK();
}

private:
GAFlightServer *gaserver_;
};
Expand Down Expand Up @@ -1228,6 +1351,35 @@ gaflight_server_do_get(GAFlightServer *server,
return (*(klass->do_get))(server, context, ticket, error);
}

/**
* gaflight_server_do_put:
* @server: A #GAFlightServer.
* @context: A #GAFlightServerCallContext.
* @reader: A #GAFlightMessageReader.
* @writer: A #GAFlightMetadataWriter.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Processes a stream of IPC payloads sent from a client.
*
* Returns: %TRUE on success, %FALSE on error.
*
* Since: 18.0.0
*/
gboolean
gaflight_server_do_put(GAFlightServer *server,
GAFlightServerCallContext *context,
GAFlightMessageReader *reader,
GAFlightMetadataWriter *writer,
GError **error)
{
auto klass = GAFLIGHT_SERVER_GET_CLASS(server);
if (!(klass && klass->do_put)) {
g_set_error(error, GARROW_ERROR, GARROW_ERROR_NOT_IMPLEMENTED, "not implemented");
return false;
}
return klass->do_put(server, context, reader, writer, error);
}

G_END_DECLS

arrow::flight::FlightDataStream *
Expand Down Expand Up @@ -1257,6 +1409,20 @@ gaflight_message_reader_get_raw(GAFlightMessageReader *reader)
return static_cast<arrow::flight::FlightMessageReader *>(flight_reader);
}

GAFlightMetadataWriter *
gaflight_metadata_writer_new_raw(arrow::flight::FlightMetadataWriter *flight_writer)
{
return GAFLIGHT_METADATA_WRITER(
g_object_new(GAFLIGHT_TYPE_METADATA_WRITER, "writer", flight_writer, nullptr));
}

arrow::flight::FlightMetadataWriter *
gaflight_metadata_writer_get_raw(GAFlightMetadataWriter *writer)
{
auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(writer);
return priv->writer;
}

GAFlightServerCallContext *
gaflight_server_call_context_new_raw(
const arrow::flight::ServerCallContext *flight_call_context)
Expand Down
29 changes: 29 additions & 0 deletions c_glib/arrow-flight-glib/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ GAFLIGHT_AVAILABLE_IN_14_0
GAFlightDescriptor *
gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader);

#define GAFLIGHT_TYPE_METADATA_WRITER (gaflight_metadata_writer_get_type())
GAFLIGHT_AVAILABLE_IN_18_0
G_DECLARE_DERIVABLE_TYPE(
GAFlightMetadataWriter, gaflight_metadata_writer, GAFLIGHT, METADATA_WRITER, GObject)
struct _GAFlightMetadataWriterClass
{
GObjectClass parent_class;
};

GAFLIGHT_AVAILABLE_IN_18_0
gboolean
gaflight_metadata_writer_write(GAFlightMetadataWriter *writer,
GArrowBuffer *metadata,
GError **error);

#define GAFLIGHT_TYPE_SERVER_CALL_CONTEXT (gaflight_server_call_context_get_type())
GAFLIGHT_AVAILABLE_IN_5_0
G_DECLARE_DERIVABLE_TYPE(GAFlightServerCallContext,
Expand Down Expand Up @@ -199,6 +214,7 @@ G_DECLARE_DERIVABLE_TYPE(GAFlightServer, gaflight_server, GAFLIGHT, SERVER, GObj
* GAFlightServerClass:
* @list_flights: A virtual function to implement `ListFlights` API.
* @do_get: A virtual function to implement `DoGet` API.
* @do_put: A virtual function to implement `DoPut` API.
*
* Since: 5.0.0
*/
Expand All @@ -218,6 +234,11 @@ struct _GAFlightServerClass
GAFlightServerCallContext *context,
GAFlightTicket *ticket,
GError **error);
gboolean (*do_put)(GAFlightServer *server,
GAFlightServerCallContext *context,
GAFlightMessageReader *reader,
GAFlightMetadataWriter *writer,
GError **error);
};

GAFLIGHT_AVAILABLE_IN_5_0
Expand Down Expand Up @@ -254,4 +275,12 @@ gaflight_server_do_get(GAFlightServer *server,
GAFlightTicket *ticket,
GError **error);

GAFLIGHT_AVAILABLE_IN_18_0
gboolean
gaflight_server_do_put(GAFlightServer *server,
GAFlightServerCallContext *context,
GAFlightMessageReader *reader,
GAFlightMetadataWriter *writer,
GError **error);

G_END_DECLS
8 changes: 8 additions & 0 deletions c_glib/arrow-flight-glib/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ GAFLIGHT_EXTERN
arrow::flight::FlightMessageReader *
gaflight_message_reader_get_raw(GAFlightMessageReader *reader);

GAFLIGHT_EXTERN
GAFlightMetadataWriter *
gaflight_metadata_writer_new_raw(arrow::flight::FlightMetadataWriter *flight_writer);

GAFLIGHT_EXTERN
arrow::flight::FlightMetadataWriter *
gaflight_metadata_writer_get_raw(GAFlightMetadataWriter *writer);

GAFLIGHT_EXTERN
GAFlightServerCallContext *
gaflight_server_call_context_new_raw(
Expand Down
31 changes: 26 additions & 5 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ G_BEGIN_DECLS
* batches in file format into output.
*/

typedef struct GArrowRecordBatchWriterPrivate_
struct GArrowRecordBatchWriterPrivate
{
std::shared_ptr<arrow::ipc::RecordBatchWriter> record_batch_writer;
} GArrowRecordBatchWriterPrivate;
bool is_closed;
};

enum {
PROP_0,
PROP_RECORD_BATCH_WRITER
PROP_RECORD_BATCH_WRITER = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GArrowRecordBatchWriter,
Expand Down Expand Up @@ -111,6 +111,7 @@ garrow_record_batch_writer_init(GArrowRecordBatchWriter *object)
{
auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(object);
new (&priv->record_batch_writer) std::shared_ptr<arrow::ipc::RecordBatchWriter>;
priv->is_closed = false;
}

static void
Expand Down Expand Up @@ -193,7 +194,27 @@ garrow_record_batch_writer_close(GArrowRecordBatchWriter *writer, GError **error
auto arrow_writer = garrow_record_batch_writer_get_raw(writer);

auto status = arrow_writer->Close();
return garrow_error_check(error, status, "[record-batch-writer][close]");
auto success = garrow_error_check(error, status, "[record-batch-writer][close]");
if (success) {
auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(writer);
priv->is_closed = true;
}
return success;
}

/**
* garrow_record_batch_writer_is_closed:
* @writer: A #GArrowRecordBatchWriter.
*
* Returns: %TRUE if the writer is closed, %FALSE otherwise.
*
* Since: 18.0.0
*/
gboolean
garrow_record_batch_writer_is_closed(GArrowRecordBatchWriter *writer)
{
auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(writer);
return priv->is_closed;
}

G_DEFINE_TYPE(GArrowRecordBatchStreamWriter,
Expand Down
4 changes: 4 additions & 0 deletions c_glib/arrow-glib/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ GARROW_AVAILABLE_IN_ALL
gboolean
garrow_record_batch_writer_close(GArrowRecordBatchWriter *writer, GError **error);

GARROW_AVAILABLE_IN_18_0
gboolean
garrow_record_batch_writer_is_closed(GArrowRecordBatchWriter *writer);

#define GARROW_TYPE_RECORD_BATCH_STREAM_WRITER \
(garrow_record_batch_stream_writer_get_type())
GARROW_AVAILABLE_IN_ALL
Expand Down
Loading

0 comments on commit 5549fa9

Please sign in to comment.