Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: Pipelining support #197

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions contrib/ruby/ext/trilogy-ruby/cext.c
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,42 @@ static VALUE rb_trilogy_query(VALUE self, VALUE query)
return execute_read_query_response(ctx);
}

static VALUE rb_trilogy_pipelined_query(VALUE self, VALUE queries)
{
struct trilogy_ctx *ctx = get_open_ctx(self);

Check_Type(queries, T_ARRAY);

rb_encoding * encoding = rb_to_encoding(ctx->encoding);

long count = RARRAY_LEN(queries);
long index;
for (index = 0; index < count; index++) {
VALUE query = RARRAY_AREF(queries, index);
StringValue(query);

query = rb_str_export_to_enc(query, encoding);

int rc = trilogy_query_send(&ctx->conn, RSTRING_PTR(query), RSTRING_LEN(query));
if (rc == TRILOGY_AGAIN) {
rc = flush_writes(ctx);
}

if (rc < 0) {
handle_trilogy_error(ctx, rc, "trilogy_query_send");
}
}

VALUE results = rb_ary_new2(count);
for (index = 0; index < count; index++) {
ctx->conn.packet_parser.sequence_number = 1; // HACK
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the sequence number can really be assumed to be always 1. From what I could see it's the case, but we could also record the sequence number of each query.

Copy link
Contributor

@composerinteralia composerinteralia Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't be 1 if the query is long enough to span multiple MySQL packets (e.g. connection.query('x' * 0xFFFFFE) will write 2 MySQL packets, so we'd need the sequence number here to be 2).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I think we need to store these values in an array somewhere to restore them.

VALUE result = execute_read_query_response(ctx);
rb_ary_push(results, result);
}

return results;
}

static VALUE rb_trilogy_ping(VALUE self)
{
struct trilogy_ctx *ctx = get_open_ctx(self);
Expand Down Expand Up @@ -1141,6 +1177,7 @@ RUBY_FUNC_EXPORTED void Init_cext(void)
rb_define_method(Trilogy, "change_db", rb_trilogy_change_db, 1);
rb_define_alias(Trilogy, "select_db", "change_db");
rb_define_method(Trilogy, "query", rb_trilogy_query, 1);
rb_define_method(Trilogy, "pipelined_query", rb_trilogy_pipelined_query, 1);
rb_define_method(Trilogy, "ping", rb_trilogy_ping, 0);
rb_define_method(Trilogy, "escape", rb_trilogy_escape, 1);
rb_define_method(Trilogy, "close", rb_trilogy_close, 0);
Expand Down
36 changes: 36 additions & 0 deletions contrib/ruby/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,42 @@ def test_trilogy_set_server_option_multi_statement
assert_match(/trilogy_query_recv/, e.message)
end

def test_pipeline
client = new_tcp_client
create_test_table(client)
first, second = client.pipelined_query(["SELECT 1 AS a, 2 AS b", "SELECT 3 AS c, 4 AS d"])

assert_equal ["a", "b"], first.fields
assert_equal [[1, 2]], first.rows
assert_equal [{ "a" => 1, "b" => 2 }], first.each_hash.to_a
assert_equal [[1, 2]], first.to_a
assert_kind_of Float, first.query_time
assert_in_delta 0.1, first.query_time, 0.1

assert_equal ["c", "d"], second.fields
assert_equal [[3, 4]], second.rows
assert_equal [{ "c" => 3, "d" => 4 }], second.each_hash.to_a
assert_equal [[3, 4]], second.to_a
assert_kind_of Float, second.query_time
assert_in_delta 0.1, second.query_time, 0.1

results = client.pipelined_query([
"INSERT INTO trilogy_test (int_test) VALUES ('4')",
"INSERT INTO trilogy_test (int_test) VALUES ('3')",
"INSERT INTO trilogy_test (int_test) VALUES ('1')",
"SELECT * FROM trilogy_test",
])
assert_equal 4, results.size
3.times do |i|
assert_equal 1, results[i].affected_rows
assert_predicate results[i].fields, :empty?
end
result = results.last
assert_equal 3, result.rows.size
index = result.fields.index("int_test")
assert_equal [1, 3, 4], result.rows.map { |r| r[index] }.sort
end

def test_trilogy_query_result_object
client = new_tcp_client

Expand Down
Loading