From ce499111499e5fd5b3bccbd3f45790972edc097f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jarom=C3=ADr=20Wysoglad?= Date: Mon, 3 Aug 2020 11:51:15 +0200 Subject: [PATCH 1/2] Add AMQP list message type support. --- socket_snd_th.c | 92 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 61 insertions(+), 31 deletions(-) diff --git a/socket_snd_th.c b/socket_snd_th.c index 263a95e..495b62f 100644 --- a/socket_snd_th.c +++ b/socket_snd_th.c @@ -90,6 +90,63 @@ static int prepare_send_socket_inet(app_data_t *app) { return 0; } +static int process_message_binary(app_data_t *app, pn_data_t *body) { + pn_bytes_t b = pn_data_get_bytes(body); + if (b.start != NULL) { + int send_flags = app->socket_flags; + + ssize_t sent_bytes = sendto(app->send_sock, b.start, b.size, send_flags, + &app->sa, app->sa_len); + if (sent_bytes <= 0) { + // MSG_DONTWAIT is set + switch (errno) { + case EAGAIN: + // Normal backup + app->sock_would_block++; + break; + case EBADF: + case ENOTSOCK: + // sockfd is not a valid file descriptor + // TODO reopen socket + perror("SG Send"); + return 1; + break; + case ECONNREFUSED: + break; + default: + perror("SG Send"); + printf("%d ",errno); + return 1; + } + } else { + app->sock_sent++; + } + } + return 0; +} + +static int process_message_body(app_data_t *app, pn_data_t *body) { + int err = 0; + if (pn_data_type(body) == PN_LIST) { + size_t count = pn_data_get_list(body); + pn_data_enter(body); + for (size_t i = 0; i < count; i++) { + if (pn_data_next(body) && !err) { + err = process_message_body(app, body); + } + } + pn_data_exit(body); + } else if (pn_data_type(body) == PN_SYMBOL || + pn_data_type(body) == PN_STRING || + pn_data_type(body) == PN_BINARY) { + err = process_message_binary(app, body); + } else { + perror("Unexpected message datatype recieved."); + err = 1; + } + return err; +} + static int decode_message(app_data_t *app, pn_rwbytes_t data) { pn_message_t *m; @@ -105,36 +162,9 @@ static int decode_message(app_data_t *app, pn_rwbytes_t data) { if (!err) { pn_data_t *body = pn_message_body(m); if (pn_data_next(body)) { - pn_bytes_t b = pn_data_get_bytes(body); - if (b.start != NULL) { - int send_flags = app->socket_flags; - - ssize_t sent_bytes = sendto(app->send_sock, b.start, b.size, send_flags, - &app->sa, app->sa_len); - if (sent_bytes <= 0) { - // MSG_DONTWAIT is set - switch (errno) { - case EAGAIN: - // Normal backup - app->sock_would_block++; - break; - case EBADF: - case ENOTSOCK: - // sockfd is not a valid file descriptor - // TODO reopen socket - perror("SG Send"); - return 1; - break; - case ECONNREFUSED: - break; - default: - perror("SG Send"); - printf("%d ",errno); - return 1; - } - } else { - app->sock_sent++; - } + err = process_message_body(app, body); + if (err) { + return 1; } } } else { @@ -202,4 +232,4 @@ void *socket_snd_th(void *app_ptr) { pthread_cleanup_pop(1); return NULL; -} \ No newline at end of file +} From 9ff915fbf8d0acb09cfae7d50cc8c60317507a87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jarom=C3=ADr=20Wysoglad?= Date: Wed, 12 Aug 2020 17:31:14 +0200 Subject: [PATCH 2/2] Don't exit list processing on error. When encountering an error in one of the list nodes, continue processing the next node instead of exiting. --- socket_snd_th.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/socket_snd_th.c b/socket_snd_th.c index 495b62f..df243d5 100644 --- a/socket_snd_th.c +++ b/socket_snd_th.c @@ -131,8 +131,8 @@ static int process_message_body(app_data_t *app, pn_data_t *body) { size_t count = pn_data_get_list(body); pn_data_enter(body); for (size_t i = 0; i < count; i++) { - if (pn_data_next(body) && !err) { - err = process_message_body(app, body); + if (pn_data_next(body)) { + err += process_message_body(app, body); } } pn_data_exit(body);