Skip to content

Commit

Permalink
apacheGH-36408: [GLib][FlightSQL] Add support for INSERT/UPDATE/DELETE (
Browse files Browse the repository at this point in the history
apache#36409)

### Rationale for this change

We need the bindings of them to support INSERT/UPDATE/DELETE:

* `arrow::flight::sql::FlightSqlClient::ExecuteUpdate()`
* `arrow::flight::sql::FlightSqlServerBase::DoPutCommandStatementUpdate()`

### What changes are included in this PR?

The bindings of them.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#36408

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou authored Jul 1, 2023
1 parent f03943c commit 0061d79
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 0 deletions.
32 changes: 32 additions & 0 deletions c_glib/arrow-flight-sql-glib/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,38 @@ gaflightsql_client_execute(GAFlightSQLClient *client,
return gaflight_info_new_raw(flight_info.release());
}

/**
* gaflightsql_client_execute_update:
* @client: A #GAFlightSQLClient.
* @query: A query to be executed in the UTF-8 format.
* @options: (nullable): A #GAFlightCallOptions.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: The number of changed records.
*
* Since: 13.0.0
*/
gint64
gaflightsql_client_execute_update(GAFlightSQLClient *client,
const gchar *query,
GAFlightCallOptions *options,
GError **error)
{
auto flight_sql_client = gaflightsql_client_get_raw(client);
arrow::flight::FlightCallOptions flight_default_options;
auto flight_options = &flight_default_options;
if (options) {
flight_options = gaflight_call_options_get_raw(options);
}
auto result = flight_sql_client->ExecuteUpdate(*flight_options, query);
if (!garrow::check(error,
result,
"[flight-sql-client][execute-update]")) {
return 0;
}
return *result;
}

/**
* gaflightsql_client_do_get:
* @client: A #GAFlightClient.
Expand Down
7 changes: 7 additions & 0 deletions c_glib/arrow-flight-sql-glib/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ gaflightsql_client_execute(GAFlightSQLClient *client,
GAFlightCallOptions *options,
GError **error);

GARROW_AVAILABLE_IN_13_0
gint64
gaflightsql_client_execute_update(GAFlightSQLClient *client,
const gchar *query,
GAFlightCallOptions *options,
GError **error);

GARROW_AVAILABLE_IN_9_0
GAFlightStreamReader *
gaflightsql_client_do_get(GAFlightSQLClient *client,
Expand Down
101 changes: 101 additions & 0 deletions c_glib/arrow-flight-sql-glib/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,36 @@ gaflightsql_statement_query_get_query(GAFlightSQLStatementQuery *command)
}


G_DEFINE_TYPE(GAFlightSQLStatementUpdate,
gaflightsql_statement_update,
GAFLIGHTSQL_TYPE_COMMAND)

static void
gaflightsql_statement_update_init(GAFlightSQLStatementUpdate *object)
{
}

static void
gaflightsql_statement_update_class_init(GAFlightSQLStatementUpdateClass *klass)
{
}

/**
* gaflightsql_statement_update_get_query:
* @command: A #GAFlightSQLStatementUpdate.
*
* Returns: The query to be executed.
*
* Since: 13.0.0
*/
const gchar *
gaflightsql_statement_update_get_query(GAFlightSQLStatementUpdate *command)
{
auto statement_update = gaflightsql_statement_update_get_raw(command);
return statement_update->query.c_str();
}


G_DEFINE_TYPE(GAFlightSQLStatementQueryTicket,
gaflightsql_statement_query_ticket,
GAFLIGHTSQL_TYPE_COMMAND)
Expand Down Expand Up @@ -250,6 +280,29 @@ namespace gaflightsql {
return std::make_unique<gaflight::DataStream>(gastream);
}

arrow::Result<int64_t>
DoPutCommandStatementUpdate(
const arrow::flight::ServerCallContext &context,
const arrow::flight::sql::StatementUpdate& command) override {
auto gacontext = gaflight_server_call_context_new_raw(&context);
auto gacommand = gaflightsql_statement_update_new_raw(&command);
GError *gerror = nullptr;
auto n_changed_records =
gaflightsql_server_do_put_command_statement_update(gaserver_,
gacontext,
gacommand,
&gerror);
g_object_unref(gacommand);
g_object_unref(gacontext);
if (gerror) {
return garrow_error_to_status(
gerror,
arrow::StatusCode::UnknownError,
"[flight-sql-server][do-put-command-statement-update]");
}
return n_changed_records;
}

private:
GAFlightSQLServer *gaserver_;
};
Expand Down Expand Up @@ -381,6 +434,35 @@ gaflightsql_server_do_get_statement(GAFlightSQLServer *server,
return (*(klass->do_get_statement))(server, context, ticket, error);
}

/**
* gaflightsql_server_do_put_command_statement_update:
* @server: A #GAFlightServer.
* @context: A #GAFlightServerCallContext.
* @command: A #GAFlightSQLStatementUpdate.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: The number of changed records.
*
* Since: 13.0.0
*/
gint64
gaflightsql_server_do_put_command_statement_update(
GAFlightSQLServer *server,
GAFlightServerCallContext *context,
GAFlightSQLStatementUpdate *command,
GError **error)
{
auto klass = GAFLIGHTSQL_SERVER_GET_CLASS(server);
if (!(klass && klass->do_put_command_statement_update)) {
g_set_error(error,
GARROW_ERROR,
GARROW_ERROR_NOT_IMPLEMENTED,
"not implemented");
return 0;
}
return klass->do_put_command_statement_update(server, context, command, error);
}


G_END_DECLS

Expand All @@ -402,6 +484,25 @@ gaflightsql_statement_query_get_raw(GAFlightSQLStatementQuery *command)
return static_cast<const arrow::flight::sql::StatementQuery *>(priv->command);
}


GAFlightSQLStatementUpdate *
gaflightsql_statement_update_new_raw(
const arrow::flight::sql::StatementUpdate *flight_command)
{
return GAFLIGHTSQL_STATEMENT_UPDATE(
g_object_new(GAFLIGHTSQL_TYPE_STATEMENT_UPDATE,
"command", flight_command,
nullptr));
}

const arrow::flight::sql::StatementUpdate *
gaflightsql_statement_update_get_raw(GAFlightSQLStatementUpdate *command)
{
auto priv = GAFLIGHTSQL_COMMAND_GET_PRIVATE(command);
return static_cast<const arrow::flight::sql::StatementUpdate *>(priv->command);
}


GAFlightSQLStatementQueryTicket *
gaflightsql_statement_query_ticket_new_raw(
const arrow::flight::sql::StatementQueryTicket *flight_command)
Expand Down
30 changes: 30 additions & 0 deletions c_glib/arrow-flight-sql-glib/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ const gchar *
gaflightsql_statement_query_get_query(GAFlightSQLStatementQuery *command);


#define GAFLIGHTSQL_TYPE_STATEMENT_UPDATE (gaflightsql_statement_update_get_type())
G_DECLARE_DERIVABLE_TYPE(GAFlightSQLStatementUpdate,
gaflightsql_statement_update,
GAFLIGHTSQL,
STATEMENT_UPDATE,
GAFlightSQLCommand)
struct _GAFlightSQLStatementUpdateClass
{
GAFlightSQLCommandClass parent_class;
};

GARROW_AVAILABLE_IN_13_0
const gchar *
gaflightsql_statement_update_get_query(GAFlightSQLStatementUpdate *command);


#define GAFLIGHTSQL_TYPE_STATEMENT_QUERY_TICKET \
(gaflightsql_statement_query_ticket_get_type())
G_DECLARE_DERIVABLE_TYPE(GAFlightSQLStatementQueryTicket,
Expand Down Expand Up @@ -87,6 +103,8 @@ G_DECLARE_DERIVABLE_TYPE(GAFlightSQLServer,
* SQL query.
* @do_get_statement: A virtual function to implement `DoGetStatement` API
* that gets a #GAFlightDataStream containing the query results.
* @do_put_command_statement_update: A virtual function to implement
* `DoPutCommandStatementUpdate` API that executes an update SQL statement.
*
* Since: 9.0.0
*/
Expand All @@ -105,6 +123,11 @@ struct _GAFlightSQLServerClass
GAFlightServerCallContext *context,
GAFlightSQLStatementQueryTicket *ticket,
GError **error);
gint64 (*do_put_command_statement_update)(
GAFlightSQLServer *server,
GAFlightServerCallContext *context,
GAFlightSQLStatementUpdate *command,
GError **error);
};

GARROW_AVAILABLE_IN_9_0
Expand All @@ -122,5 +145,12 @@ gaflightsql_server_do_get_statement(
GAFlightServerCallContext *context,
GAFlightSQLStatementQueryTicket *ticket,
GError **error);
GARROW_AVAILABLE_IN_13_0
gint64
gaflightsql_server_do_put_command_statement_update(
GAFlightSQLServer *server,
GAFlightServerCallContext *context,
GAFlightSQLStatementUpdate *command,
GError **error);

G_END_DECLS
7 changes: 7 additions & 0 deletions c_glib/arrow-flight-sql-glib/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ const arrow::flight::sql::StatementQuery *
gaflightsql_statement_query_get_raw(
GAFlightSQLStatementQuery *command);

GAFlightSQLStatementUpdate *
gaflightsql_statement_update_new_raw(
const arrow::flight::sql::StatementUpdate *flight_command);
const arrow::flight::sql::StatementUpdate *
gaflightsql_statement_update_get_raw(
GAFlightSQLStatementUpdate *command);

GAFlightSQLStatementQueryTicket *
gaflightsql_statement_query_ticket_new_raw(
const arrow::flight::sql::StatementQueryTicket *flight_command);
Expand Down
14 changes: 14 additions & 0 deletions c_glib/test/flight-sql/test-client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,18 @@ def test_error
end
end
end

sub_test_case("#execute_update") do
def test_success
insert_sql = "INSERT INTO page_view_table VALUES (100, true)"
n_changed_records = @sql_client.execute_update(insert_sql)
assert_equal(1, n_changed_records)
end

def test_error
assert_raise(Arrow::Error::Invalid) do
@sql_client.execute_update("INSERT")
end
end
end
end
7 changes: 7 additions & 0 deletions c_glib/test/helper/flight-sql-server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,12 @@ def virtual_do_do_get_statement(context, command)
reader = Arrow::TableBatchReader.new(table)
ArrowFlight::RecordBatchStream.new(reader)
end

def virtual_do_do_put_command_statement_update(context, command)
unless command.query == "INSERT INTO page_view_table VALUES (100, true)"
raise Arrow::Error::Invalid.new("invalid SQL")
end
1
end
end
end

0 comments on commit 0061d79

Please sign in to comment.