Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 6 additions & 36 deletions cinterop-c/MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cinterop-c/include/kgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ void kgrpc_server_set_batch_method_allocator(
kgrpc_batch_call_allocator allocator
);

/**
* Append an grpc_metadata entry to the given grpc_metadata_array.
*
* @return false if the array has not enough capacity, true otherwise.
*/
bool kgrpc_metadata_array_append(grpc_metadata_array *array, grpc_slice key, grpc_slice value);

#ifdef __cplusplus
}
Expand Down
13 changes: 13 additions & 0 deletions cinterop-c/src/kgrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <kgrpc.h>
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/server/server.h"
#include "grpc/support/string_util.h"

extern "C" {

Expand Down Expand Up @@ -53,6 +54,18 @@ void kgrpc_server_set_batch_method_allocator(
});
}

bool kgrpc_metadata_array_append(grpc_metadata_array *array, grpc_slice key, grpc_slice value) {
if (array->capacity - array->count <= 0) {
return false;
}
grpc_metadata entry = {
.key = key,
.value = value
};
array->metadata[array->count++] = entry;
return true;
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.rpc.grpc.client.ClientCallScope
import kotlinx.rpc.grpc.client.GrpcClient
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.StatusException
import kotlinx.rpc.grpc.internal.CallbackFuture
import kotlinx.rpc.grpc.client.ClientCallScope
import kotlinx.rpc.grpc.client.GrpcClient
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.descriptor.MethodType
import kotlinx.rpc.grpc.internal.Ready
import kotlinx.rpc.grpc.descriptor.methodType
import kotlinx.rpc.grpc.internal.CallbackFuture
import kotlinx.rpc.grpc.internal.Ready
import kotlinx.rpc.grpc.internal.singleOrStatus
import kotlinx.rpc.grpc.statusCode
import kotlinx.rpc.internal.utils.InternalRpcApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import kotlinx.coroutines.CompletableJob
import kotlinx.rpc.grpc.GrpcMetadata
import kotlinx.rpc.grpc.Status
import kotlinx.rpc.grpc.StatusCode
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.BatchResult
import kotlinx.rpc.grpc.internal.CompletionQueue
import kotlinx.rpc.grpc.descriptor.MethodDescriptor
import kotlinx.rpc.grpc.internal.destroyEntries
import kotlinx.rpc.grpc.internal.internalError
import kotlinx.rpc.grpc.internal.toByteArray
import kotlinx.rpc.grpc.internal.toGrpcByteBuffer
Expand Down Expand Up @@ -182,7 +183,7 @@ internal class NativeClientCall<Request, Response>(
if (!success) return

// send and receive initial headers to/from the server
sendAndReceiveInitialMetadata()
sendAndReceiveInitialMetadata(headers)
}

/**
Expand Down Expand Up @@ -251,13 +252,16 @@ internal class NativeClientCall<Request, Response>(
val statusCode = arena.alloc<grpc_status_code.Var>()
val statusDetails = arena.alloc<grpc_slice>()
val errorStr = arena.alloc<CPointerVar<ByteVar>>()

val trailingMetadata = arena.alloc<grpc_metadata_array>()
grpc_metadata_array_init(trailingMetadata.ptr)

val op = arena.alloc<grpc_op> {
op = GRPC_OP_RECV_STATUS_ON_CLIENT
data.recv_status_on_client.status = statusCode.ptr
data.recv_status_on_client.status_details = statusDetails.ptr
data.recv_status_on_client.error_string = errorStr.ptr
// TODO: trailing metadata
data.recv_status_on_client.trailing_metadata = null
data.recv_status_on_client.trailing_metadata = trailingMetadata.ptr
}

when (val callResult = cq.runBatch(this@NativeClientCall.raw, op.ptr, 1u)) {
Expand All @@ -266,11 +270,13 @@ internal class NativeClientCall<Request, Response>(
val details = statusDetails.toByteArray().toKString()
val kStatusCode = statusCode.value.toKotlin()
val status = Status(kStatusCode, details, null)
val trailers = GrpcMetadata()
val trailers = GrpcMetadata(trailingMetadata)

// cleanup
grpc_slice_unref(statusDetails.readValue())
if (errorStr.value != null) gpr_free(errorStr.value)
// the entries are owned by the call object, so we must only destroy the array
grpc_metadata_array_destroy(trailingMetadata.readValue())
arena.clear()

// set close info and try to close the call.
Expand All @@ -296,29 +302,37 @@ internal class NativeClientCall<Request, Response>(
}
}

private fun sendAndReceiveInitialMetadata() {
private fun sendAndReceiveInitialMetadata(headers: GrpcMetadata) {
// sending and receiving initial metadata
val arena = Arena()
val opsNum = 2uL
val ops = arena.allocArray<grpc_op>(opsNum.convert())

// turn given headers into a grpc_metadata_array.
val sendInitialMetadata: grpc_metadata_array = with(headers) {
arena.allocRawGrpcMetadata()
}

// send initial meta data to server
// TODO: initial metadata
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA
ops[0].data.send_initial_metadata.count = 0u
ops[0].data.send_initial_metadata.count = sendInitialMetadata.count
ops[0].data.send_initial_metadata.metadata = sendInitialMetadata.metadata

val meta = arena.alloc<grpc_metadata_array>()
// TODO: make metadata array an object (for lifecycle management)
grpc_metadata_array_init(meta.ptr)
val recvInitialMetadata = arena.alloc<grpc_metadata_array>()
grpc_metadata_array_init(recvInitialMetadata.ptr)
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA
ops[1].data.recv_initial_metadata.recv_initial_metadata = meta.ptr
ops[1].data.recv_initial_metadata.recv_initial_metadata = recvInitialMetadata.ptr

runBatch(ops, opsNum, cleanup = {
grpc_metadata_array_destroy(meta.ptr)
// we must not destroy the array itself, as it is cleared when clearing the arena.
sendInitialMetadata.destroyEntries()
// the entries are owned by the call object, so we must only destroy the array
grpc_metadata_array_destroy(recvInitialMetadata.readValue())
arena.clear()
}) {
val headers = GrpcMetadata(recvInitialMetadata)
safeUserCode("Failed to call onHeaders.") {
listener?.onHeaders(GrpcMetadata())
listener?.onHeaders(headers)
}
}
}
Expand Down Expand Up @@ -447,4 +461,3 @@ internal class NativeClientCall<Request, Response>(
}
}


Loading
Loading