Skip to content

Commit

Permalink
defer packet creation until write-ready event
Browse files Browse the repository at this point in the history
  • Loading branch information
jthomas43 committed Dec 5, 2023
1 parent ccf013e commit 69eff5d
Show file tree
Hide file tree
Showing 5 changed files with 582 additions and 536 deletions.
6 changes: 3 additions & 3 deletions examples/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ static size_t bytes_sent = 0;
static struct sockaddr_in dest_addr;

static uv_buf_t chunk;
static uv_buf_t empty = { .base = NULL, .len = 0 };
static uv_buf_t empty = {.base = NULL, .len = 0};

static bool printed_warning = false;

static void
pump_stream();
pump_stream ();

static void
on_close (udx_stream_t *stream, int status) {
Expand All @@ -46,7 +46,7 @@ on_ack (udx_stream_write_t *req, int status, int unordered) {

static void
on_ack_end (udx_stream_write_t *req, int status, int unordered) {
udx_stream_destroy(req->handle);
udx_stream_destroy(req->stream);
free(req);
}

Expand Down
29 changes: 17 additions & 12 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ extern "C" {
#define UDX_HEADER_MESSAGE 0b01000
#define UDX_HEADER_DESTROY 0b10000

#define UDX_STREAM_WRITE_WANT_DATA 0b001
#define UDX_STREAM_WRITE_WANT_STATE 0b010
#define UDX_STREAM_WRITE_WANT_DESTROY 0b100

typedef struct {
uint32_t seq;
} udx_cirbuf_val_t;
Expand Down Expand Up @@ -138,8 +142,6 @@ struct udx_socket_s {
uv_poll_t io_poll;

udx_fifo_t send_queue;
udx_fifo_t stream_queue; // when writing first fill this with all streams that need writing.
// todo: better data structure

udx_t *udx;
udx_cirbuf_t *streams_by_id; // for convenience
Expand Down Expand Up @@ -176,6 +178,7 @@ struct udx_stream_s {

int set_id;
int status;
int write_wanted;
int out_of_order;
int recovery; // number of packets to send before recovery finished
int deferred_ack;
Expand Down Expand Up @@ -232,11 +235,8 @@ struct udx_stream_s {
uint32_t rack_next_seq;
uint32_t rack_fack;

uint32_t pkts_waiting; // how many packets are added locally but not sent?
uint32_t pkts_inflight; // packets inflight to the other peer
uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?
uint32_t retransmits_waiting; // how many retransmits are waiting to be sent? if 0, then inflight iteration is faster
uint32_t seq_flushed; // highest seq that has been flushed. usually seq_flushed == seq
uint32_t pkts_inflight; // packets inflight to the other peer
uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?

// timestamps...
uint64_t rto_timeout;
Expand All @@ -257,8 +257,7 @@ struct udx_stream_s {
udx_cirbuf_t outgoing;
udx_cirbuf_t incoming;

udx_fifo_t inflight_queue;
udx_fifo_t retransmit_queue;
udx_fifo_t retransmit_queue; // udx_packet_t

udx_fifo_t unordered;
};
Expand All @@ -283,6 +282,10 @@ struct udx_packet_s {
struct sockaddr_storage dest;
int dest_len;

uint32_t fifo_gc; // for removing from inflight / retransmit queue
// udx_packet_t *prev; // alternative for inflight / retransmit queues
// udx_packet_t *next; // alternative for inflight / retransmit queues

// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned int bufs_len;
Expand All @@ -299,10 +302,12 @@ struct udx_socket_send_s {
};

struct udx_stream_write_s {
// todo: more consistent to have 'buf' immutable
// and have bytes instead be 'acked_bytes'
size_t bytes; // buf.len + size of payloads in flight
// immutable, original write
uv_buf_t buf;

size_t bytes_acked;
size_t bytes_inflight;

bool is_write_end;

udx_stream_t *stream;
Expand Down
3 changes: 3 additions & 0 deletions src/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ udx__confirm_packet (udx_packet_t *pkt);
void
udx__cancel_packet (udx_packet_t *pkt);

void
udx__initialize_stream_queue (udx_socket_t *socket);

#endif // UDX_INTERNAL_H
149 changes: 66 additions & 83 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,113 +101,95 @@ udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int ad

void
udx__on_writable (udx_socket_t *socket) {
/*
#ifdef UDX_PLATFORM_HAS_SENDMMSG
udx_fifo_t *fifo = &socket->send_queue;
#ifdef UDX_PLATFORM_HAS_SENDMMSG
bool finished = false;

while (fifo->len > 0) {
udx_packet_t *batch[UDX_SENDMMSG_BATCH_SIZE];
struct mmsghdr h[UDX_SENDMMSG_BATCH_SIZE];
while (!finished) {
udx_packet_t *batch[UDX_SENDMMSG_BATCH_SIZE];
struct mmsghdr h[UDX_SENDMMSG_BATCH_SIZE];

while (fifo->len > 0 && udx__fifo_peek(fifo) == NULL) {
udx__fifo_shift(fifo);
}
if (fifo->len == 0) {
return;
}
int npkts = 0;

int pkts = 0;
// todo: ttl

udx_packet_t *pkt = udx__fifo_peek(fifo);
int ttl = pkt->ttl;
bool adjust_ttl = ttl > 0 && socket->ttl != ttl;
int ttl = -1;
bool adjust_ttl;

while (pkts < UDX_SENDMMSG_BATCH_SIZE && fifo->len > 0) {
udx_packet_t *pkt = udx__fifo_shift(fifo);
while (npkts < UDX_SENDMMSG_BATCH_SIZE) {
udx_packet_t *pkt = udx__get_packet(socket);

if (pkt == NULL) continue;
// packet is null when descheduled after being acked
if (pkt->ttl != ttl) {
udx__fifo_undo(fifo);
break;
}
if (socket->family == 6 && pkt->dest.ss_family == AF_INET) {
addr_to_v6((struct sockaddr_in *) &(pkt->dest));
pkt->dest_len = sizeof(struct sockaddr_in6);
}
batch[pkts] = pkt;
struct mmsghdr *p = &h[pkts];
memset(p, 0, sizeof(*p));
p->msg_hdr.msg_name = &pkt->dest;
p->msg_hdr.msg_namelen = pkt->dest_len;
if (pkt == NULL) {
finished = true;
break;
}

p->msg_hdr.msg_iov = (struct iovec *) pkt->bufs;
p->msg_hdr.msg_iovlen = pkt->bufs_len;
if (ttl == -1) {
ttl = pkt->ttl;
adjust_ttl = ttl > 0 && socket->ttl != ttl;
}

pkts++;
if (pkt->ttl != ttl) {
udx__cancel_packet(pkt);
break;
}
uint64_t time_sent = uv_hrtime() / 1e6;

int rc;
if (socket->family == 6 && pkt->dest.ss_family == AF_INET) {
addr_to_v6((struct sockaddr_in *) &(pkt->dest));
pkt->dest_len = sizeof(struct sockaddr_in6);
}

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, ttl);
batch[npkts] = pkt;
struct mmsghdr *p = &h[npkts];
memset(p, 0, sizeof(*p));
p->msg_hdr.msg_name = &pkt->dest;
p->msg_hdr.msg_namelen = pkt->dest_len;

do {
rc = sendmmsg(socket->io_poll.io_watcher.fd, h, pkts, 0);
} while (rc == -1 && errno == EINTR);
p->msg_hdr.msg_iov = (struct iovec *) pkt->bufs;
p->msg_hdr.msg_iovlen = pkt->bufs_len;

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);
npkts++;
}

rc = rc == -1 ? uv_translate_sys_error(errno) : rc;
uint64_t time_sent = uv_hrtime() / 1e6;

int nsent = rc > 0 ? rc : 0;
int rc;

if (rc < 0 && rc != UV_EAGAIN) {
nsent = pkts; // something errored badly, assume all packets sent and lost
}
if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, ttl);

int unsent = pkts - nsent;
do {
rc = sendmmsg(socket->io_poll.io_watcher.fd, h, npkts, 0);
} while (rc == -1 && errno == EINTR);

while (unsent > 0) {
// restore an unsent packet
udx__fifo_undo(fifo);
if (udx__fifo_peek(fifo) != NULL) {
unsent--;
}
}
if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, socket->ttl);

// update packet status for sent packets
rc = rc == -1 ? uv_translate_sys_error(errno) : rc;

for (int i = 0; i < nsent; i++) {
udx_packet_t *pkt = batch[i];
int nsent = rc > 0 ? rc : 0;

assert(pkt->status == UDX_PACKET_SENDING);
pkt->status = UDX_PACKET_INFLIGHT;
pkt->transmits++;
pkt->time_sent = time_sent;
if (rc < 0 && rc != UV_EAGAIN) {
nsent = npkts; // something errored badly, assume all packets sent and lost
}

int type = pkt->type;
int unsent = npkts - nsent;

if (type & (UDX_PACKET_STREAM_SEND | UDX_PACKET_STREAM_DESTROY | UDX_PACKET_SEND)) {
udx__trigger_send_callback(pkt);
// TODO: watch for re-entry here!
}
// cancel the unsent packets in reverse order (last batched packet first)
for (int i = npkts; i > npkts - unsent; i--) {
__builtin_trap();
udx__cancel_packet(batch[i - 1]);
}

if (type & UDX_PACKET_FREE_ON_SEND) {
free(pkt);
}
}
for (int i = 0; i < nsent; i++) {
udx_packet_t *pkt = batch[i];
// todo: set in confirm packet with uv_now()
pkt->time_sent = time_sent;
udx__confirm_packet(batch[i]);
}

if (rc == UV_EAGAIN) {
break;
}
if (rc == UV_EAGAIN) {
finished = true;
}
*/
// #else /* no sendmmsg */
initialize_stream_queue(socket);
}
#else /* no sendmmsg */
while (true) {
udx_packet_t *pkt = udx__get_packet(socket);
if (pkt == NULL) break;
Expand All @@ -229,8 +211,9 @@ udx__on_writable (udx_socket_t *socket) {
udx__cancel_packet(pkt);
break;
}

// todo: set in confirm packet with uv_now()
pkt->time_sent = uv_hrtime() / 1e6;
udx__confirm_packet(pkt);
}
// #endif
#endif
}
Loading

0 comments on commit 69eff5d

Please sign in to comment.