Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 60 additions & 36 deletions src/ruby/ext/grpc/rb_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -191,61 +191,85 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
grpc_call_details_destroy(&st->details);
}

/* call-seq:
server.request_call
struct server_request_call_args {
grpc_rb_server* server;
grpc_completion_queue* call_queue;
request_call_stack st;
};

static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
struct server_request_call_args* args =
(struct server_request_call_args*)value_args;

Requests notification of a new call on a server. */
static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_rb_server* s = NULL;
grpc_call* call = NULL;
grpc_event ev;
grpc_call_error err;
request_call_stack st;
VALUE result;
void* tag = (void*)&st;
grpc_completion_queue* call_queue =
grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline;
void* tag = (void*)&args->st;

args->call_queue = grpc_completion_queue_create_for_pluck(NULL);
grpc_request_call_stack_init(&args->st);

TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
}
grpc_request_call_stack_init(&st);
/* call grpc_server_request_call, then wait for it to complete using
* pluck_event */
err = grpc_server_request_call(s->wrapped, &call, &st.details, &st.md_ary,
call_queue, s->queue, tag);
grpc_call_error err = grpc_server_request_call(
args->server->wrapped, &call, &args->st.details, &args->st.md_ary,
args->call_queue, args->server->queue, tag);
if (err != GRPC_CALL_OK) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
"grpc_server_request_call failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
return Qnil;
}

ev = rb_completion_queue_pluck(s->queue, tag,
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
grpc_event ev = rb_completion_queue_pluck(
args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (!ev.success) {
grpc_request_call_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "request_call completion failed");
return Qnil;
}

/* build the NewServerRpc struct result */
deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
result = rb_struct_new(
grpc_rb_sNewServerRpc, grpc_rb_slice_to_ruby_string(st.details.method),
grpc_rb_slice_to_ruby_string(st.details.host),
rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
INT2NUM(deadline.tv_nsec / 1000)),
grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call, call_queue),
NULL);
grpc_request_call_stack_cleanup(&st);
gpr_timespec deadline =
gpr_convert_clock_type(args->st.details.deadline, GPR_CLOCK_REALTIME);
VALUE result =
rb_struct_new(grpc_rb_sNewServerRpc,
grpc_rb_slice_to_ruby_string(args->st.details.method),
grpc_rb_slice_to_ruby_string(args->st.details.host),
rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
INT2NUM(deadline.tv_nsec / 1000)),
grpc_rb_md_ary_to_h(&args->st.md_ary),
grpc_rb_wrap_call(call, args->call_queue), NULL);
args->call_queue = NULL;
return result;
}

static VALUE grpc_rb_server_request_call_ensure(VALUE value_args) {
struct server_request_call_args* args =
(struct server_request_call_args*)value_args;

if (args->call_queue) {
grpc_rb_completion_queue_destroy(args->call_queue);
}

grpc_request_call_stack_cleanup(&args->st);

return Qnil;
}

/* call-seq:
server.request_call

Requests notification of a new call on a server. */
static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_rb_server* s;

TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "destroyed!");
}

struct server_request_call_args args = {.server = s, .call_queue = NULL};

return rb_ensure(grpc_rb_server_request_call_try, (VALUE)&args,
grpc_rb_server_request_call_ensure, (VALUE)&args);
}

static VALUE grpc_rb_server_start(VALUE self) {
grpc_rb_server* s = NULL;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
Expand Down