Skip to content

Commit

Permalink
Feature/refactor outgoing p2 (#172)
Browse files Browse the repository at this point in the history
* copied perf test from sendmmsg branch

* trailing whitespace

* fix some warnings, add warnings to build

* cleanup warnings

* rename udx_stream / udx_socket variables to 'stream' and 'socket' instead of 'handle'. handle now reserved for libuv uv_handle_t types. also fixes some debug code

* fix windows compile

* unfinished obviously

* missed file. still unfinished.

* defer packet creation until write-ready event

* forgot io_win

* free packet created on relay slow path, removed debug code

* edge case: packet arrives for a socket that is reading but not yet connected.

* PR comments + cleanup

* don't set write_wanted in response to packet being acked

* clean up dead code / comments

* dead comment
  • Loading branch information
jthomas43 authored Dec 27, 2023
1 parent 6577bfd commit 33fb62d
Show file tree
Hide file tree
Showing 8 changed files with 820 additions and 734 deletions.
6 changes: 3 additions & 3 deletions examples/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ static size_t bytes_sent = 0;
static struct sockaddr_in dest_addr;

static uv_buf_t chunk;
static uv_buf_t empty = { .base = NULL, .len = 0 };
static uv_buf_t empty = {.base = NULL, .len = 0};

static bool printed_warning = false;

static void
pump_stream();
pump_stream ();

static void
on_close (udx_stream_t *stream, int status) {
Expand All @@ -46,7 +46,7 @@ on_ack (udx_stream_write_t *req, int status, int unordered) {

static void
on_ack_end (udx_stream_write_t *req, int status, int unordered) {
udx_stream_destroy(req->handle);
udx_stream_destroy(req->stream);
free(req);
}

Expand Down
146 changes: 78 additions & 68 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,27 @@ extern "C" {
#define UDX_STREAM_DESTROYED_REMOTE 0b01000000000
#define UDX_STREAM_CLOSED 0b10000000000

#define UDX_PACKET_WAITING 1
#define UDX_PACKET_SENDING 2
#define UDX_PACKET_INFLIGHT 3
#define UDX_PACKET_STATE_UNCOMMITTED 0
#define UDX_PACKET_STATE_INFLIGHT 1
#define UDX_PACKET_STATE_RETRANSMIT 2

#define UDX_PACKET_STREAM_RELAY 0b0
#define UDX_PACKET_STREAM_STATE 0b00001
#define UDX_PACKET_STREAM_WRITE 0b00010
#define UDX_PACKET_STREAM_SEND 0b00100
#define UDX_PACKET_STREAM_DESTROY 0b01000
#define UDX_PACKET_SEND 0b10000
#define UDX_PACKET_TYPE_STREAM_RELAY 0b00000
#define UDX_PACKET_TYPE_STREAM_STATE 0b00001
#define UDX_PACKET_TYPE_STREAM_WRITE 0b00010
#define UDX_PACKET_TYPE_STREAM_SEND 0b00100
#define UDX_PACKET_TYPE_STREAM_DESTROY 0b01000
#define UDX_PACKET_TYPE_SOCKET_SEND 0b10000

#define UDX_HEADER_DATA 0b00001
#define UDX_HEADER_END 0b00010
#define UDX_HEADER_SACK 0b00100
#define UDX_HEADER_MESSAGE 0b01000
#define UDX_HEADER_DESTROY 0b10000

#define UDX_STREAM_WRITE_WANT_DATA 0b001
#define UDX_STREAM_WRITE_WANT_STATE 0b010
#define UDX_STREAM_WRITE_WANT_DESTROY 0b100

typedef struct {
uint32_t seq;
} udx_cirbuf_val_t;
Expand All @@ -88,9 +92,8 @@ typedef struct udx_stream_s udx_stream_t;
typedef struct udx_packet_s udx_packet_t;

typedef struct udx_socket_send_s udx_socket_send_t;

typedef struct udx_stream_write_s udx_stream_write_t;
typedef struct udx_stream_send_s udx_stream_send_t;
typedef struct udx_stream_write_s udx_stream_write_t;

typedef enum {
UDX_LOOKUP_FAMILY_IPV4 = 1,
Expand All @@ -102,17 +105,17 @@ typedef struct udx_lookup_s udx_lookup_t;
typedef struct udx_interface_event_s udx_interface_event_t;

typedef void (*udx_socket_send_cb)(udx_socket_send_t *req, int status);
typedef void (*udx_socket_recv_cb)(udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from);
typedef void (*udx_socket_close_cb)(udx_socket_t *handle);
typedef void (*udx_socket_recv_cb)(udx_socket_t *socket, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from);
typedef void (*udx_socket_close_cb)(udx_socket_t *socket);

typedef int (*udx_stream_firewall_cb)(udx_stream_t *handle, udx_socket_t *socket, const struct sockaddr *from);
typedef void (*udx_stream_read_cb)(udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf);
typedef void (*udx_stream_drain_cb)(udx_stream_t *handle);
typedef void (*udx_stream_remote_changed_cb)(udx_stream_t *handle);
typedef int (*udx_stream_firewall_cb)(udx_stream_t *stream, udx_socket_t *socket, const struct sockaddr *from);
typedef void (*udx_stream_read_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf);
typedef void (*udx_stream_drain_cb)(udx_stream_t *stream);
typedef void (*udx_stream_remote_changed_cb)(udx_stream_t *stream);
typedef void (*udx_stream_ack_cb)(udx_stream_write_t *req, int status, int unordered);
typedef void (*udx_stream_send_cb)(udx_stream_send_t *req, int status);
typedef void (*udx_stream_recv_cb)(udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf);
typedef void (*udx_stream_close_cb)(udx_stream_t *handle, int status);
typedef void (*udx_stream_recv_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf);
typedef void (*udx_stream_close_cb)(udx_stream_t *stream, int status);

typedef void (*udx_lookup_cb)(udx_lookup_t *handle, int status, const struct sockaddr *addr, int addr_len);

Expand All @@ -135,8 +138,9 @@ struct udx_s {
};

struct udx_socket_s {
uv_udp_t socket;
uv_udp_t handle;
uv_poll_t io_poll;

udx_fifo_t send_queue;

udx_t *udx;
Expand Down Expand Up @@ -174,8 +178,9 @@ struct udx_stream_s {

int set_id;
int status;
int write_wanted;
int out_of_order;
int recovery;
int recovery; // number of packets to send before recovery finished
int deferred_ack;

bool hit_high_watermark;
Expand Down Expand Up @@ -230,11 +235,8 @@ struct udx_stream_s {
uint32_t rack_next_seq;
uint32_t rack_fack;

uint32_t pkts_waiting; // how many packets are added locally but not sent?
uint32_t pkts_inflight; // packets inflight to the other peer
uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?
uint32_t retransmits_waiting; // how many retransmits are waiting to be sent? if 0, then inflight iteration is faster
uint32_t seq_flushed; // highest seq that has been flushed
uint32_t pkts_inflight; // packets inflight to the other peer
uint32_t pkts_buffered; // how many (data) packets received but not processed (out of order)?

// timestamps...
uint64_t rto_timeout;
Expand All @@ -255,6 +257,8 @@ struct udx_stream_s {
udx_cirbuf_t outgoing;
udx_cirbuf_t incoming;

udx_fifo_t retransmit_queue; // udx_packet_t

udx_fifo_t unordered;
};

Expand All @@ -266,9 +270,7 @@ struct udx_packet_s {
int ttl;
int is_retransmit;

udx_fifo_t *send_queue; // pointer to socket->send_queue
uint32_t fifo_gc; // index into socket->send_queue
udx_stream_t *stream; // pointer to the stream if stream packet
udx_stream_t *stream; // pointer to the stream if stream packet

uint8_t transmits;
bool is_mtu_probe;
Expand All @@ -280,6 +282,10 @@ struct udx_packet_s {
struct sockaddr_storage dest;
int dest_len;

uint32_t fifo_gc; // for removing from inflight / retransmit queue
// udx_packet_t *prev; // alternative for inflight / retransmit queues
// udx_packet_t *next; // alternative for inflight / retransmit queues

// just alloc it in place here, easier to manage
char header[UDX_HEADER_SIZE];
unsigned int bufs_len;
Expand All @@ -288,27 +294,31 @@ struct udx_packet_s {

struct udx_socket_send_s {
udx_packet_t pkt;
udx_socket_t *handle;
udx_socket_t *socket;

udx_socket_send_cb on_send;

void *data;
};

struct udx_stream_write_s {
size_t bytes; // buf.len + size of payloads in flight
// immutable, original write
uv_buf_t buf;

size_t bytes_acked;
size_t bytes_inflight;

bool is_write_end;

udx_stream_t *handle;
udx_stream_t *stream;
udx_stream_ack_cb on_ack;

void *data;
};

struct udx_stream_send_s {
udx_packet_t pkt;
udx_stream_t *handle;
udx_stream_t *stream;

udx_stream_send_cb on_send;

Expand Down Expand Up @@ -339,114 +349,114 @@ struct udx_interface_event_s {
};

int
udx_init (uv_loop_t *loop, udx_t *handle);
udx_init (uv_loop_t *loop, udx_t *udx);

int
udx_socket_init (udx_t *handle, udx_socket_t *socket);
udx_socket_init (udx_t *udx, udx_socket_t *socket);

int
udx_socket_get_send_buffer_size (udx_socket_t *handle, int *value);
udx_socket_get_send_buffer_size (udx_socket_t *socket, int *value);

int
udx_socket_set_send_buffer_size (udx_socket_t *handle, int value);
udx_socket_set_send_buffer_size (udx_socket_t *socket, int value);

int
udx_socket_get_recv_buffer_size (udx_socket_t *handle, int *value);
udx_socket_get_recv_buffer_size (udx_socket_t *socket, int *value);

int
udx_socket_set_recv_buffer_size (udx_socket_t *handle, int value);
udx_socket_set_recv_buffer_size (udx_socket_t *socket, int value);

int
udx_socket_get_ttl (udx_socket_t *handle, int *ttl);
udx_socket_get_ttl (udx_socket_t *socket, int *ttl);

int
udx_socket_set_ttl (udx_socket_t *handle, int ttl);
udx_socket_set_ttl (udx_socket_t *socket, int ttl);

int
udx_socket_bind (udx_socket_t *handle, const struct sockaddr *addr, unsigned int flags);
udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int flags);

int
udx_socket_getsockname (udx_socket_t *handle, struct sockaddr *name, int *name_len);
udx_socket_getsockname (udx_socket_t *socket, struct sockaddr *name, int *name_len);

int
udx_socket_send (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, udx_socket_send_cb cb);
udx_socket_send (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, udx_socket_send_cb cb);

int
udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, int ttl, udx_socket_send_cb cb);
udx_socket_send_ttl (udx_socket_send_t *req, udx_socket_t *socket, const uv_buf_t bufs[], unsigned int bufs_len, const struct sockaddr *addr, int ttl, udx_socket_send_cb cb);

int
udx_socket_recv_start (udx_socket_t *handle, udx_socket_recv_cb cb);
udx_socket_recv_start (udx_socket_t *socket, udx_socket_recv_cb cb);

int
udx_socket_recv_stop (udx_socket_t *handle);
udx_socket_recv_stop (udx_socket_t *socket);

int
udx_socket_close (udx_socket_t *handle, udx_socket_close_cb cb);
udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb);

// only exposed here as a convenience / debug tool - the udx instance uses this automatically
int
udx_check_timeouts (udx_t *handle);
udx_check_timeouts (udx_t *udx);

int
udx_stream_init (udx_t *udx, udx_stream_t *handle, uint32_t local_id, udx_stream_close_cb close_cb);
udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb);

int
udx_stream_get_mtu (udx_stream_t *handle, uint16_t *mtu);
udx_stream_get_mtu (udx_stream_t *stream, uint16_t *mtu);

int
udx_stream_get_seq (udx_stream_t *handle, uint32_t *seq);
udx_stream_get_seq (udx_stream_t *stream, uint32_t *seq);

int
udx_stream_set_seq (udx_stream_t *handle, uint32_t seq);
udx_stream_set_seq (udx_stream_t *stream, uint32_t seq);

int
udx_stream_get_ack (udx_stream_t *handle, uint32_t *ack);
udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack);

int
udx_stream_set_ack (udx_stream_t *handle, uint32_t ack);
udx_stream_set_ack (udx_stream_t *stream, uint32_t ack);

int
udx_stream_connect (udx_stream_t *handle, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr);
udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr);

int
udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb remote_changed_cb);

int
udx_stream_relay_to (udx_stream_t *handle, udx_stream_t *destination);
udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination);

int
udx_stream_firewall (udx_stream_t *handle, udx_stream_firewall_cb firewall_cb);
udx_stream_firewall (udx_stream_t *stream, udx_stream_firewall_cb firewall_cb);

int
udx_stream_recv_start (udx_stream_t *handle, udx_stream_recv_cb cb);
udx_stream_recv_start (udx_stream_t *stream, udx_stream_recv_cb cb);

int
udx_stream_recv_stop (udx_stream_t *handle);
udx_stream_recv_stop (udx_stream_t *stream);

int
udx_stream_read_start (udx_stream_t *handle, udx_stream_read_cb cb);
udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb);

int
udx_stream_read_stop (udx_stream_t *handle);
udx_stream_read_stop (udx_stream_t *stream);

// only exposed here as a convenience / debug tool - the udx instance uses this automatically
int
udx_stream_check_timeouts (udx_stream_t *handle);
udx_stream_check_timeouts (udx_stream_t *stream);

int
udx_stream_send (udx_stream_send_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb);
udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb);

int
udx_stream_write_resume (udx_stream_t *handle, udx_stream_drain_cb drain_cb);
udx_stream_write_resume (udx_stream_t *stream, udx_stream_drain_cb drain_cb);

int
udx_stream_write (udx_stream_write_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);
udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);

int
udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *handle, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);
udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb);

int
udx_stream_destroy (udx_stream_t *handle);
udx_stream_destroy (udx_stream_t *stream);

int
udx_lookup (uv_loop_t *loop, udx_lookup_t *req, const char *host, unsigned int flags, udx_lookup_cb cb);
Expand Down
11 changes: 4 additions & 7 deletions src/debug.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,15 @@ debug_print_outgoing (udx_stream_t *stream) {
continue;
}

if (pkt->status == UDX_PACKET_INFLIGHT) {
if (pkt->status == UDX_PACKET_STATE_INFLIGHT) {
debug_printf("I");
continue;
}
if (pkt->status == UDX_PACKET_SENDING) {
debug_printf("S");
continue;
}
if (pkt->status == UDX_PACKET_WAITING) {
debug_printf("W");
if (pkt->status == UDX_PACKET_STATE_RETRANSMIT) {
debug_printf("R");
continue;
}
assert(false && "should only be inflight or retransmitting");
}
debug_printf("\n");
}
Expand Down
11 changes: 9 additions & 2 deletions src/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

#include "../include/udx.h"

#define UDX_PACKET_CALLBACK (UDX_PACKET_STREAM_SEND | UDX_PACKET_STREAM_DESTROY | UDX_PACKET_SEND)
#define UDX_PACKET_FREE_ON_SEND (UDX_PACKET_STREAM_STATE | UDX_PACKET_STREAM_DESTROY)
#define UDX_PACKET_CALLBACK (UDX_PACKET_TYPE_STREAM_SEND | UDX_PACKET_TYPE_STREAM_DESTROY | UDX_PACKET_TYPE_SOCKET_SEND)
#define UDX_PACKET_FREE_ON_SEND (UDX_PACKET_TYPE_STREAM_STATE | UDX_PACKET_TYPE_STREAM_DESTROY | UDX_PACKET_TYPE_STREAM_RELAY)

#define UDX_UNUSED(x) ((void) (x))

Expand Down Expand Up @@ -36,4 +36,11 @@ udx__trigger_send_callback (udx_packet_t *packet);
void
udx__close_handles (udx_socket_t *socket);

udx_packet_t *
udx__shift_packet (udx_socket_t *socket);
void
udx__confirm_packet (udx_packet_t *pkt);
void
udx__unshift_packet (udx_packet_t *pkt, udx_socket_t *socket);

#endif // UDX_INTERNAL_H
Loading

0 comments on commit 33fb62d

Please sign in to comment.