diff --git a/c_glib/arrow-flight-glib/client.cpp b/c_glib/arrow-flight-glib/client.cpp index 23f59c9da69ad..75b02ec25869f 100644 --- a/c_glib/arrow-flight-glib/client.cpp +++ b/c_glib/arrow-flight-glib/client.cpp @@ -570,7 +570,9 @@ gaflight_do_put_result_set_property(GObject *object, { auto result = static_cast( g_value_get_pointer(value)); - priv->writer = gaflight_stream_writer_new_raw(result->writer.release()); + std::shared_ptr writer = + std::move(result->writer); + priv->writer = gaflight_stream_writer_new_raw(&writer); priv->reader = gaflight_metadata_reader_new_raw(result->reader.release()); break; } @@ -983,10 +985,13 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader, } GAFlightStreamWriter * -gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer) +gaflight_stream_writer_new_raw( + std::shared_ptr *flight_writer) { - return GAFLIGHT_STREAM_WRITER( - g_object_new(GAFLIGHT_TYPE_STREAM_WRITER, "writer", flight_writer, nullptr)); + return GAFLIGHT_STREAM_WRITER(g_object_new(GAFLIGHT_TYPE_STREAM_WRITER, + "record-batch-writer", + flight_writer, + nullptr)); } GAFlightMetadataReader * diff --git a/c_glib/arrow-flight-glib/client.hpp b/c_glib/arrow-flight-glib/client.hpp index 888f87ecb5732..32ad35845aa12 100644 --- a/c_glib/arrow-flight-glib/client.hpp +++ b/c_glib/arrow-flight-glib/client.hpp @@ -30,7 +30,8 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader, GAFLIGHT_EXTERN GAFlightStreamWriter * -gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer); +gaflight_stream_writer_new_raw( + std::shared_ptr *flight_writer); GAFLIGHT_EXTERN GAFlightMetadataReader * diff --git a/c_glib/arrow-flight-glib/server.cpp b/c_glib/arrow-flight-glib/server.cpp index f7444918e90f6..e39fd97b0d06c 100644 --- a/c_glib/arrow-flight-glib/server.cpp +++ b/c_glib/arrow-flight-glib/server.cpp @@ -45,6 +45,9 @@ G_BEGIN_DECLS * client. Also allows reading application-defined metadata via the * Flight protocol. * + * #GAFlightMetadataWriter is a class for sending application-specific + * metadata back to client during an upload. + * * #GAFlightServerAuthSender is a class for sending messages to the * client during an authentication handshake. * @@ -290,6 +293,98 @@ gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader) return gaflight_descriptor_new_raw(&flight_descriptor); } +struct GAFlightMetadataWriterPrivate +{ + arrow::flight::FlightMetadataWriter *writer; +}; + +enum { + PROP_WRITER = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightMetadataWriter, + gaflight_metadata_writer, + G_TYPE_OBJECT) + +#define GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object) \ + static_cast( \ + gaflight_metadata_writer_get_instance_private(GAFLIGHT_METADATA_WRITER(object))) + +static void +gaflight_metadata_writer_finalize(GObject *object) +{ + auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object); + + delete priv->writer; + + G_OBJECT_CLASS(gaflight_metadata_writer_parent_class)->finalize(object); +} + +static void +gaflight_metadata_writer_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_WRITER: + priv->writer = + static_cast(g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflight_metadata_writer_init(GAFlightMetadataWriter *object) +{ +} + +static void +gaflight_metadata_writer_class_init(GAFlightMetadataWriterClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = gaflight_metadata_writer_finalize; + gobject_class->set_property = gaflight_metadata_writer_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer( + "writer", + nullptr, + nullptr, + static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_WRITER, spec); +} + +/** + * gaflight_metadata_writer_write: + * @writer: A #GAFlightMetadataWriter. + * @metadata: A #GArrowBuffer to be sent. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Writes metadata to the client. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 18.0.0 + */ +gboolean +gaflight_metadata_writer_write(GAFlightMetadataWriter *writer, + GArrowBuffer *metadata, + GError **error) +{ + auto flight_writer = gaflight_metadata_writer_get_raw(writer); + auto flight_metadata = garrow_buffer_get_raw(metadata); + return garrow::check(error, + flight_writer->WriteMetadata(*flight_metadata), + "[flight-metadata-writer][write]"); +} + struct GAFlightServerCallContextPrivate { arrow::flight::ServerCallContext *call_context; @@ -1034,6 +1129,34 @@ namespace gaflight { return arrow::Status::OK(); } + arrow::Status + DoPut(const arrow::flight::ServerCallContext &context, + std::unique_ptr reader, + std::unique_ptr writer) override + { + auto gacontext = gaflight_server_call_context_new_raw(&context); + auto gareader = gaflight_message_reader_new_raw(reader.release(), TRUE); + auto gawriter = gaflight_metadata_writer_new_raw(writer.release()); + GError *gerror = nullptr; + auto success = + gaflight_server_do_put(gaserver_, gacontext, gareader, gawriter, &gerror); + g_object_unref(gawriter); + g_object_unref(gareader); + g_object_unref(gacontext); + if (!success && !gerror) { + g_set_error(&gerror, + GARROW_ERROR, + GARROW_ERROR_UNKNOWN, + "GAFlightServerClass::do_put() returns FALSE but error isn't set"); + } + if (gerror) { + return garrow_error_to_status(gerror, + arrow::StatusCode::UnknownError, + "[flight-server][do-put]"); + } + return arrow::Status::OK(); + } + private: GAFlightServer *gaserver_; }; @@ -1228,6 +1351,35 @@ gaflight_server_do_get(GAFlightServer *server, return (*(klass->do_get))(server, context, ticket, error); } +/** + * gaflight_server_do_put: + * @server: A #GAFlightServer. + * @context: A #GAFlightServerCallContext. + * @reader: A #GAFlightMessageReader. + * @writer: A #GAFlightMetadataWriter. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Processes a stream of IPC payloads sent from a client. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 18.0.0 + */ +gboolean +gaflight_server_do_put(GAFlightServer *server, + GAFlightServerCallContext *context, + GAFlightMessageReader *reader, + GAFlightMetadataWriter *writer, + GError **error) +{ + auto klass = GAFLIGHT_SERVER_GET_CLASS(server); + if (!(klass && klass->do_put)) { + g_set_error(error, GARROW_ERROR, GARROW_ERROR_NOT_IMPLEMENTED, "not implemented"); + return false; + } + return klass->do_put(server, context, reader, writer, error); +} + G_END_DECLS arrow::flight::FlightDataStream * @@ -1257,6 +1409,20 @@ gaflight_message_reader_get_raw(GAFlightMessageReader *reader) return static_cast(flight_reader); } +GAFlightMetadataWriter * +gaflight_metadata_writer_new_raw(arrow::flight::FlightMetadataWriter *flight_writer) +{ + return GAFLIGHT_METADATA_WRITER( + g_object_new(GAFLIGHT_TYPE_METADATA_WRITER, "writer", flight_writer, nullptr)); +} + +arrow::flight::FlightMetadataWriter * +gaflight_metadata_writer_get_raw(GAFlightMetadataWriter *writer) +{ + auto priv = GAFLIGHT_METADATA_WRITER_GET_PRIVATE(writer); + return priv->writer; +} + GAFlightServerCallContext * gaflight_server_call_context_new_raw( const arrow::flight::ServerCallContext *flight_call_context) diff --git a/c_glib/arrow-flight-glib/server.h b/c_glib/arrow-flight-glib/server.h index 7e594febb172f..e3a469098b32c 100644 --- a/c_glib/arrow-flight-glib/server.h +++ b/c_glib/arrow-flight-glib/server.h @@ -65,6 +65,21 @@ GAFLIGHT_AVAILABLE_IN_14_0 GAFlightDescriptor * gaflight_message_reader_get_descriptor(GAFlightMessageReader *reader); +#define GAFLIGHT_TYPE_METADATA_WRITER (gaflight_metadata_writer_get_type()) +GAFLIGHT_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE( + GAFlightMetadataWriter, gaflight_metadata_writer, GAFLIGHT, METADATA_WRITER, GObject) +struct _GAFlightMetadataWriterClass +{ + GObjectClass parent_class; +}; + +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_metadata_writer_write(GAFlightMetadataWriter *writer, + GArrowBuffer *metadata, + GError **error); + #define GAFLIGHT_TYPE_SERVER_CALL_CONTEXT (gaflight_server_call_context_get_type()) GAFLIGHT_AVAILABLE_IN_5_0 G_DECLARE_DERIVABLE_TYPE(GAFlightServerCallContext, @@ -199,6 +214,7 @@ G_DECLARE_DERIVABLE_TYPE(GAFlightServer, gaflight_server, GAFLIGHT, SERVER, GObj * GAFlightServerClass: * @list_flights: A virtual function to implement `ListFlights` API. * @do_get: A virtual function to implement `DoGet` API. + * @do_put: A virtual function to implement `DoPut` API. * * Since: 5.0.0 */ @@ -218,6 +234,11 @@ struct _GAFlightServerClass GAFlightServerCallContext *context, GAFlightTicket *ticket, GError **error); + gboolean (*do_put)(GAFlightServer *server, + GAFlightServerCallContext *context, + GAFlightMessageReader *reader, + GAFlightMetadataWriter *writer, + GError **error); }; GAFLIGHT_AVAILABLE_IN_5_0 @@ -254,4 +275,12 @@ gaflight_server_do_get(GAFlightServer *server, GAFlightTicket *ticket, GError **error); +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_server_do_put(GAFlightServer *server, + GAFlightServerCallContext *context, + GAFlightMessageReader *reader, + GAFlightMetadataWriter *writer, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-flight-glib/server.hpp b/c_glib/arrow-flight-glib/server.hpp index ec4815751c8d8..f68eef83781ec 100644 --- a/c_glib/arrow-flight-glib/server.hpp +++ b/c_glib/arrow-flight-glib/server.hpp @@ -36,6 +36,14 @@ GAFLIGHT_EXTERN arrow::flight::FlightMessageReader * gaflight_message_reader_get_raw(GAFlightMessageReader *reader); +GAFLIGHT_EXTERN +GAFlightMetadataWriter * +gaflight_metadata_writer_new_raw(arrow::flight::FlightMetadataWriter *flight_writer); + +GAFLIGHT_EXTERN +arrow::flight::FlightMetadataWriter * +gaflight_metadata_writer_get_raw(GAFlightMetadataWriter *writer); + GAFLIGHT_EXTERN GAFlightServerCallContext * gaflight_server_call_context_new_raw( diff --git a/c_glib/arrow-glib/writer.cpp b/c_glib/arrow-glib/writer.cpp index b0321d51b3ba4..08af1c7976965 100644 --- a/c_glib/arrow-glib/writer.cpp +++ b/c_glib/arrow-glib/writer.cpp @@ -45,14 +45,14 @@ G_BEGIN_DECLS * batches in file format into output. */ -typedef struct GArrowRecordBatchWriterPrivate_ +struct GArrowRecordBatchWriterPrivate { std::shared_ptr record_batch_writer; -} GArrowRecordBatchWriterPrivate; + bool is_closed; +}; enum { - PROP_0, - PROP_RECORD_BATCH_WRITER + PROP_RECORD_BATCH_WRITER = 1, }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowRecordBatchWriter, @@ -111,6 +111,7 @@ garrow_record_batch_writer_init(GArrowRecordBatchWriter *object) { auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(object); new (&priv->record_batch_writer) std::shared_ptr; + priv->is_closed = false; } static void @@ -193,7 +194,27 @@ garrow_record_batch_writer_close(GArrowRecordBatchWriter *writer, GError **error auto arrow_writer = garrow_record_batch_writer_get_raw(writer); auto status = arrow_writer->Close(); - return garrow_error_check(error, status, "[record-batch-writer][close]"); + auto success = garrow_error_check(error, status, "[record-batch-writer][close]"); + if (success) { + auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(writer); + priv->is_closed = true; + } + return success; +} + +/** + * garrow_record_batch_writer_is_closed: + * @writer: A #GArrowRecordBatchWriter. + * + * Returns: %TRUE if the writer is closed, %FALSE otherwise. + * + * Since: 18.0.0 + */ +gboolean +garrow_record_batch_writer_is_closed(GArrowRecordBatchWriter *writer) +{ + auto priv = GARROW_RECORD_BATCH_WRITER_GET_PRIVATE(writer); + return priv->is_closed; } G_DEFINE_TYPE(GArrowRecordBatchStreamWriter, diff --git a/c_glib/arrow-glib/writer.h b/c_glib/arrow-glib/writer.h index 46bbdddec8c9d..cea8390d9028f 100644 --- a/c_glib/arrow-glib/writer.h +++ b/c_glib/arrow-glib/writer.h @@ -53,6 +53,10 @@ GARROW_AVAILABLE_IN_ALL gboolean garrow_record_batch_writer_close(GArrowRecordBatchWriter *writer, GError **error); +GARROW_AVAILABLE_IN_18_0 +gboolean +garrow_record_batch_writer_is_closed(GArrowRecordBatchWriter *writer); + #define GARROW_TYPE_RECORD_BATCH_STREAM_WRITER \ (garrow_record_batch_stream_writer_get_type()) GARROW_AVAILABLE_IN_ALL diff --git a/c_glib/test/flight/test-client.rb b/c_glib/test/flight/test-client.rb index 7eb093d3cab80..f1e3f31234ab4 100644 --- a/c_glib/test/flight/test-client.rb +++ b/c_glib/test/flight/test-client.rb @@ -84,4 +84,37 @@ def test_error end end end + + sub_test_case("#do_put") do + def test_success + client = ArrowFlight::Client.new(@location) + generator = Helper::FlightInfoGenerator.new + descriptor = generator.page_view_descriptor + table = generator.page_view_table + result = client.do_put(descriptor, table.schema) + writer = result.writer + writer.write_table(table) + writer.done_writing + reader = result.reader + metadata = reader.read + writer.close + assert_equal(["done", table], + [metadata.data.to_s, @server.uploaded_table]) + end + + def test_error + client = ArrowFlight::Client.new(@location) + generator = Helper::FlightInfoGenerator.new + descriptor = generator.page_view_descriptor + table = generator.page_view_table + result = client.do_put(descriptor, table.schema) + assert_raise(Arrow::Error::Invalid) do + writer = result.writer + writer.done_writing + reader = result.reader + reader.read + writer.close + end + end + end end diff --git a/c_glib/test/helper/flight-server.rb b/c_glib/test/helper/flight-server.rb index 8c47029d41791..80b8a5c96cf9f 100644 --- a/c_glib/test/helper/flight-server.rb +++ b/c_glib/test/helper/flight-server.rb @@ -34,6 +34,8 @@ def virtual_do_is_valid(context, token) class FlightServer < ArrowFlight::Server type_register + attr_reader :uploaded_table + private def virtual_do_list_flights(context, criteria) generator = FlightInfoGenerator.new @@ -54,5 +56,14 @@ def virtual_do_do_get(context, ticket) reader = Arrow::TableBatchReader.new(table) ArrowFlight::RecordBatchStream.new(reader) end + + def virtual_do_do_put(context, reader, writer) + @uploaded_table = reader.read_all + writer.write(Arrow::Buffer.new("done")) + if @uploaded_table.n_rows.zero? + raise Arrow::Error::Invalid.new("empty table") + end + true + end end end diff --git a/c_glib/test/test-file-writer.rb b/c_glib/test/test-file-writer.rb index 5f9c3c4e19aa9..06c9dfa25c7fc 100644 --- a/c_glib/test/test-file-writer.rb +++ b/c_glib/test/test-file-writer.rb @@ -34,6 +34,9 @@ def test_write_record_batch file_writer.write_record_batch(record_batch) ensure file_writer.close + assert do + file_writer.closed? + end end ensure output.close @@ -68,6 +71,9 @@ def test_write_table file_writer.write_table(table) ensure file_writer.close + assert do + file_writer.closed? + end end ensure output.close diff --git a/c_glib/test/test-stream-writer.rb b/c_glib/test/test-stream-writer.rb index 32754e20838b4..261732ae91e15 100644 --- a/c_glib/test/test-stream-writer.rb +++ b/c_glib/test/test-stream-writer.rb @@ -35,6 +35,9 @@ def test_write_record_batch stream_writer.write_record_batch(record_batch) ensure stream_writer.close + assert do + stream_writer.closed? + end end ensure output.close diff --git a/ruby/red-arrow-flight/lib/arrow-flight/client.rb b/ruby/red-arrow-flight/lib/arrow-flight/client.rb index ad45a4e403559..2750bcca589c8 100644 --- a/ruby/red-arrow-flight/lib/arrow-flight/client.rb +++ b/ruby/red-arrow-flight/lib/arrow-flight/client.rb @@ -47,5 +47,49 @@ def authenticate_basic(user, password, options=nil) end options end + + alias_method :do_put_raw, :do_put + # Upload data to a Flight described by the given descriptor. The + # caller must call `#close` on the returned stream once they are + # done writing. Note that it's automatically done when you use + # block. + # + # The reader and writer are linked; closing the writer will also + # close the reader. Use GArrowFlight::StreamWriter#done_writing to + # only close the write side of the channel. + # + # @param descriptor [GArrowFlight::Descriptor] Descriptor to be uploaded. + # @param schema [GArrow::Schema] Schema of uploaded data. + # @param options [ArrowFlight::CallOptions, Hash, nil] (nil) + # The options to be used. + # + # @yieldparam writer [GArrowFlight::StreamWriter] The writer to upload + # data to the given descriptor. + # + # This is closed automatically after the given block is finished. + # + # @yieldparam reader [GArrowFlight::MetadataReader] The reader to read + # metadata from the server. + # + # @return [Array, Object] + # The reader and the writer if block isn't given. + # + # The return value from block if block is given. + # + # @since 18.0.0 + def do_put(descriptor, schema, options=nil) + result = do_put_raw(descriptor, schema, options) + reader = result.reader + writer = result.writer + if block_given? + begin + yield(reader, writer) + ensure + writer.close unless writer.closed? + end + else + return reader, writer + end + end end end diff --git a/ruby/red-arrow-flight/test/helper/server.rb b/ruby/red-arrow-flight/test/helper/server.rb index 269bb5f3d7858..1ea4855897b09 100644 --- a/ruby/red-arrow-flight/test/helper/server.rb +++ b/ruby/red-arrow-flight/test/helper/server.rb @@ -21,6 +21,8 @@ module Helper class Server < ArrowFlight::Server type_register + attr_reader :uploaded_table + private def virtual_do_list_flights(context, criteria) generator = InfoGenerator.new @@ -35,5 +37,14 @@ def virtual_do_do_get(context, ticket) table = generator.page_view_table ArrowFlight::RecordBatchStream.new(table) end + + def virtual_do_do_put(context, reader, writer) + @uploaded_table = reader.read_all + writer.write(Arrow::Buffer.new("done")) + if @uploaded_table.n_rows.zero? + raise Arrow::Error::Invalid.new("empty table") + end + true + end end end diff --git a/ruby/red-arrow-flight/test/test-client.rb b/ruby/red-arrow-flight/test/test-client.rb index 850d6f45790c3..9f1ebbff81550 100644 --- a/ruby/red-arrow-flight/test/test-client.rb +++ b/ruby/red-arrow-flight/test/test-client.rb @@ -43,4 +43,35 @@ def test_do_get assert_equal(generator.page_view_table, reader.read_all) end + + def test_do_put_with_block + client = ArrowFlight::Client.new(@location) + generator = Helper::InfoGenerator.new + descriptor = generator.page_view_descriptor + table = generator.page_view_table + client.do_put(descriptor, table.schema) do |reader, writer| + writer.write_table(table) + writer.done_writing + metadata = reader.read + assert_equal(["done", table], + [metadata.data.to_s, @server.uploaded_table]) + end + end + + def test_do_put_without_block + client = ArrowFlight::Client.new(@location) + generator = Helper::InfoGenerator.new + descriptor = generator.page_view_descriptor + table = generator.page_view_table + reader, writer = client.do_put(descriptor, table.schema) + begin + writer.write_table(table) + writer.done_writing + metadata = reader.read + assert_equal(["done", table], + [metadata.data.to_s, @server.uploaded_table]) + ensure + writer.close + end + end end