From d307fe1c0f72591e89afa8223b8204308824a22f Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Tue, 31 Oct 2023 14:12:28 +0100 Subject: [PATCH] ack is updated just before sending --- include/udx.h | 1 + src/io_posix.c | 21 +++++++++++++++++++++ src/udx.c | 3 +++ 3 files changed, 25 insertions(+) diff --git a/include/udx.h b/include/udx.h index 9ab71d5f..e7c03918 100644 --- a/include/udx.h +++ b/include/udx.h @@ -269,6 +269,7 @@ struct udx_packet_s { udx_fifo_t *send_queue; // pointer to socket->send_queue uint32_t fifo_gc; // index into socket->send_queue + udx_stream_t *stream; // pointer to the stream if stream packet uint8_t transmits; uint16_t size; diff --git a/src/io_posix.c b/src/io_posix.c index 179929bc..574ebad8 100644 --- a/src/io_posix.c +++ b/src/io_posix.c @@ -15,6 +15,7 @@ #include "fifo.h" #include "internal.h" #include "io.h" +#include "endian.h" #if defined(__APPLE__) @@ -57,6 +58,22 @@ udx__get_link_mtu (const struct sockaddr *addr) { } #endif +static void +ensure_latest_stream_ack (udx_packet_t *pkt) { + if (pkt->stream == NULL) return; // not a stream + + uint32_t *i = (uint32_t *) pkt->header; + + i += 4; + + uint32_t packet_ack = *i; + uint32_t actual_ack = udx__swap_uint32_if_be(pkt->stream->ack); + + if (packet_ack != actual_ack) { + *i = actual_ack; + } +} + ssize_t udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, struct sockaddr *addr, int addr_len) { ssize_t size; @@ -137,6 +154,8 @@ udx__on_writable (udx_socket_t *socket) { pkt->dest_len = sizeof(struct sockaddr_in6); } + ensure_latest_stream_ack(pkt); + batch[pkts] = pkt; struct mmsghdr *p = &h[pkts]; memset(p, 0, sizeof(*p)); @@ -218,6 +237,8 @@ udx__on_writable (udx_socket_t *socket) { pkt->dest_len = sizeof(struct sockaddr_in6); } + ensure_latest_stream_ack(pkt); + ssize_t size = udx__sendmsg(socket, pkt->bufs, pkt->bufs_len, (struct sockaddr *) &(pkt->dest), pkt->dest_len); if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl); diff --git a/src/udx.c b/src/udx.c index 1f99f66c..762af688 100644 --- a/src/udx.c +++ b/src/udx.c @@ -498,6 +498,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_ pkt->dest = stream->remote_addr; pkt->dest_len = stream->remote_addr_len; pkt->send_queue = NULL; + pkt->stream = stream; pkt->bufs_len = 2; @@ -1148,6 +1149,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_ pkt->type = UDX_PACKET_STREAM_RELAY; pkt->header[3] = data_offset; pkt->seq = seq; + pkt->stream = NULL; pkt->send_queue = &relay->socket->send_queue; pkt->fifo_gc = udx__fifo_push(&relay->socket->send_queue, pkt); @@ -1579,6 +1581,7 @@ udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_ pkt->type = UDX_PACKET_SEND; pkt->ttl = ttl; pkt->ctx = req; + pkt->stream = NULL; if (dest->sa_family == AF_INET) { pkt->dest_len = sizeof(struct sockaddr_in);