From a0c1d75e99b3dee00dd1891d1e04efe97da56c1d Mon Sep 17 00:00:00 2001 From: tanneberger Date: Fri, 25 Oct 2024 00:29:56 +0200 Subject: [PATCH] add functions to set serialize/deserialize functions --- .../posix/testing_tcp_ip_channel_server_callback.c | 4 +++- include/reactor-uc/encoding.h | 8 ++++++-- include/reactor-uc/network_channel.h | 8 +++++++- include/reactor-uc/platform/posix/tcp_ip_channel.h | 5 ++++- src/federated.c | 4 +++- src/platform/posix/tcp_ip_channel.c | 14 +++++++++++++- 6 files changed, 36 insertions(+), 7 deletions(-) diff --git a/examples/posix/testing_tcp_ip_channel_server_callback.c b/examples/posix/testing_tcp_ip_channel_server_callback.c index c6a4e50c..c922009e 100644 --- a/examples/posix/testing_tcp_ip_channel_server_callback.c +++ b/examples/posix/testing_tcp_ip_channel_server_callback.c @@ -5,7 +5,9 @@ #include TcpIpChannel channel; -void callback_handler(FederatedConnectionBundle *self, TaggedMessage *msg) { +void callback_handler(FederatedConnectionBundle *self, void *raw_msg) { + TaggedMessage* msg = raw_msg; + printf("Received message with connection number %i and content %s\n", msg->conn_id, (char *)msg->payload.bytes); channel.super.send(&channel.super, msg); } diff --git a/include/reactor-uc/encoding.h b/include/reactor-uc/encoding.h index 76fa5576..80ce6821 100644 --- a/include/reactor-uc/encoding.h +++ b/include/reactor-uc/encoding.h @@ -5,7 +5,9 @@ #include #include -int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t buffer_size) { +int encode_protobuf(const void *raw_message, unsigned char *buffer, size_t buffer_size) { + const TaggedMessage *message = raw_message; + // turing write buffer into pb_ostream buffer pb_ostream_t stream_out = pb_ostream_from_buffer(buffer, buffer_size); @@ -17,7 +19,9 @@ int encode_protobuf(const TaggedMessage *message, unsigned char *buffer, size_t return (int)stream_out.bytes_written; } -int decode_protobuf(TaggedMessage *message, const unsigned char *buffer, size_t buffer_size) { +int decode_protobuf(void *raw_message, const unsigned char *buffer, size_t buffer_size) { + TaggedMessage *message = raw_message; + pb_istream_t stream_in = pb_istream_from_buffer(buffer, buffer_size); if (!pb_decode(&stream_in, TaggedMessage_fields, message)) { diff --git a/include/reactor-uc/network_channel.h b/include/reactor-uc/network_channel.h index 1fd4b83a..2beeb6a8 100644 --- a/include/reactor-uc/network_channel.h +++ b/include/reactor-uc/network_channel.h @@ -12,17 +12,23 @@ typedef struct FederatedConnectionBundle FederatedConnectionBundle; typedef struct NetworkChannel NetworkChannel; +typedef int (*encode_message_hook)(const void *message, unsigned char *buffer, size_t buffer_size); +typedef int (*decode_message_hook)(void *message, const unsigned char *buffer, size_t buffer_size); + struct NetworkChannel { lf_ret_t (*bind)(NetworkChannel *self); lf_ret_t (*connect)(NetworkChannel *self); bool (*accept)(NetworkChannel *self); void (*close)(NetworkChannel *self); void (*register_callback)(NetworkChannel *self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message), + void (*receive_callback)(FederatedConnectionBundle *conn, void *message), FederatedConnectionBundle *conn); lf_ret_t (*send)(NetworkChannel *self, TaggedMessage *message); TaggedMessage *(*receive)(NetworkChannel *self); void (*free)(NetworkChannel *self); + + void (*register_encode_hook)(NetworkChannel *self, encode_message_hook encode_hook); + void (*register_decode_hook)(NetworkChannel *self, decode_message_hook decode_hook); }; #endif // REACTOR_UC_NETWORK_CHANNEL_H diff --git a/include/reactor-uc/platform/posix/tcp_ip_channel.h b/include/reactor-uc/platform/posix/tcp_ip_channel.h index e3491414..ef86ad91 100644 --- a/include/reactor-uc/platform/posix/tcp_ip_channel.h +++ b/include/reactor-uc/platform/posix/tcp_ip_channel.h @@ -42,7 +42,10 @@ struct TcpIpChannel { char receive_thread_stack[TCP_IP_CHANNEL_RECV_THREAD_STACK_SIZE]; FederatedConnectionBundle *federated_connection; - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *message); + void (*receive_callback)(FederatedConnectionBundle *conn, void *message); + + decode_message_hook decode_hook; + encode_message_hook encode_hook; }; void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family); diff --git a/src/federated.c b/src/federated.c index cfef3acd..3ed66340 100644 --- a/src/federated.c +++ b/src/federated.c @@ -112,7 +112,9 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare // Callback registered with the NetworkChannel. Is called asynchronously when there is a // a TaggedMessage available. -void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, TaggedMessage *msg) { +void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self, void *raw_msg) { + TaggedMessage *msg = raw_msg; + LF_DEBUG(FED, "Callback on FedConnBundle %p for message with tag=%" PRId64 ":%" PRIu32, self, msg->tag.time, msg->tag.microstep); assert(((size_t)msg->conn_id) < self->inputs_size); diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index 4a2ff5ed..d810ae53 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -205,7 +205,7 @@ void *TcpIpChannel_receive_thread(void *untyped_self) { } void TcpIpChannel_register_callback(NetworkChannel *untyped_self, - void (*receive_callback)(FederatedConnectionBundle *conn, TaggedMessage *msg), + void (*receive_callback)(FederatedConnectionBundle *conn, void *msg), FederatedConnectionBundle *conn) { int res; LF_INFO(NET, "TCP/IP registering callback thread"); @@ -238,6 +238,14 @@ void TcpIpChannel_register_callback(NetworkChannel *untyped_self, } } +void TcpIpChannel_register_decode_hook(NetworkChannel *self, decode_message_hook hook) { + ((TcpIpChannel *)self)->decode_hook = hook; +} + +void TcpIpChannel_register_encode_hook(NetworkChannel *self, encode_message_hook hook) { + ((TcpIpChannel *)self)->encode_hook = hook; +} + void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port, int protocol_family) { FD_ZERO(&self->set); @@ -260,9 +268,13 @@ void TcpIpChannel_ctor(TcpIpChannel *self, const char *host, unsigned short port self->super.receive = TcpIpChannel_receive; self->super.send = TcpIpChannel_send; self->super.register_callback = TcpIpChannel_register_callback; + self->super.register_decode_hook = TcpIpChannel_register_decode_hook; + self->super.register_encode_hook = TcpIpChannel_register_encode_hook; self->super.free = TcpIpChannel_free; self->receive_callback = NULL; self->federated_connection = NULL; + self->decode_hook = decode_protobuf; + self->encode_hook = encode_protobuf; } void TcpIpChannel_free(NetworkChannel *untyped_self) {