Skip to content

Commit

Permalink
Feature/replace hrtime (#176)
Browse files Browse the repository at this point in the history
* copied perf test from sendmmsg branch

* trailing whitespace

* fix some warnings, add warnings to build

* cleanup warnings

* performance: uv_hrtime -> uv_now

* simpler on_uv_poll

* clean up warnings

* remove debug message

* revert unnecessary change

* clean up

---------

Co-authored-by: James Thomas <jthomas>
  • Loading branch information
jthomas43 authored Mar 2, 2024
1 parent 36a2216 commit 8d99c1d
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 33 deletions.
8 changes: 4 additions & 4 deletions examples/client.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <stdio.h>
#include <uv.h>
#include <stdlib.h>
#include <uv.h>

#include "../include/udx.h"
#ifdef _WIN32
Expand Down Expand Up @@ -33,7 +33,7 @@ get_milliseconds () {

static void
on_uv_interval (uv_timer_t *handle) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
printf("received %zu bytes in %lu ms\n", bytes_recv, get_milliseconds() - started);
}

static void
Expand All @@ -45,7 +45,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
}

if (read_len < 0) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
printf("received %zu bytes in %lu ms\n", bytes_recv, get_milliseconds() - started);
printf("stream is done!\n");
exit(0);
}
Expand Down Expand Up @@ -80,7 +80,7 @@ main (int argc, char **argv) {
client_id = (uint32_t) getpid();
server_id = client_id + 1;

uint32_t ids[2] = { client_id, server_id };
uint32_t ids[2] = {client_id, server_id};

uv_buf_t buf = uv_buf_init((char *) ids, 8);
udx_socket_send(&req, &sock, &buf, 1, (struct sockaddr *) &dest_addr, on_send);
Expand Down
6 changes: 2 additions & 4 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ udx__on_writable (udx_socket_t *socket) {
npkts++;
}

uint64_t time_sent = uv_hrtime() / 1e6;

int rc;

if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, ttl);
Expand Down Expand Up @@ -178,7 +176,7 @@ udx__on_writable (udx_socket_t *socket) {
for (int i = 0; i < nsent; i++) {
udx_packet_t *pkt = batch[i];
// todo: set in confirm packet with uv_now()
pkt->time_sent = time_sent;
pkt->time_sent = uv_now(socket->udx->loop);
udx__confirm_packet(batch[i]);
}

Expand Down Expand Up @@ -209,7 +207,7 @@ udx__on_writable (udx_socket_t *socket) {
break;
}
// todo: set in confirm packet with uv_now()
pkt->time_sent = uv_hrtime() / 1e6;
pkt->time_sent = uv_now(socket->udx->loop);
udx__confirm_packet(pkt);
}
#endif
Expand Down
3 changes: 1 addition & 2 deletions src/io_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ udx__on_writable (udx_socket_t *socket) {
udx__unshift_packet(pkt, socket);
break;
}
// todo: set in confirm packet with uv_now()
pkt->time_sent = uv_hrtime() / 1e6;
pkt->time_sent = uv_now(socket->udx->loop);
udx__confirm_packet(pkt);
}
}
30 changes: 10 additions & 20 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@ typedef struct {
uv_buf_t buf;
} udx_pending_read_t;

static uint64_t
get_microseconds () {
return uv_hrtime() / 1000;
}

static uint64_t
get_milliseconds () {
return get_microseconds() / 1000;
}

static inline uint32_t
cubic_root (uint64_t a) {
return (uint32_t) cbrt(a);
Expand Down Expand Up @@ -487,7 +477,6 @@ on_bytes_acked (udx_stream_t *stream, udx_stream_write_t *w, size_t bytes, bool

static void
clear_outgoing_packets (udx_stream_t *stream) {
// todo: simplify with

// We should make sure all existing packets do not send, and notify the user that they failed
for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) {
Expand Down Expand Up @@ -645,7 +634,6 @@ get_stream (udx_socket_t *socket) {

udx_packet_t *
udx__shift_packet (udx_socket_t *socket) {
// debug_printf("in get packet\n");

while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__fifo_shift(&socket->send_queue);
Expand Down Expand Up @@ -1007,7 +995,7 @@ static void
rack_detect_loss (udx_stream_t *stream) {
uint64_t timeout = 0;
uint32_t reo_wnd = rack_update_reo_wnd(stream);
uint64_t now = 0;
uint64_t now = uv_now(stream->udx->loop);

int resending = 0;
int mtu_probes_lost = 0;
Expand All @@ -1018,8 +1006,6 @@ rack_detect_loss (udx_stream_t *stream) {
if (pkt == NULL || pkt->status != UDX_PACKET_STATE_INFLIGHT) continue;
assert(pkt->transmits > 0);

if (!now) now = get_milliseconds();

// debug_printf("%lu > %lu=%d\n", stream->rack_time_sent, pkt->time_sent, stream->rack_time_sent > pkt->time_sent);

if (!rack_sent_after(stream->rack_time_sent, stream->rack_next_seq, pkt->time_sent, pkt->seq + 1)) {
Expand Down Expand Up @@ -1070,7 +1056,7 @@ rack_detect_loss (udx_stream_t *stream) {

static void
ack_update (udx_stream_t *stream, uint32_t acked, bool is_limited) {
uint64_t time = get_milliseconds();
uint64_t time = uv_now(stream->udx->loop);

// also reset rto, since things are moving forward...
stream->rto_timeout = time + stream->rto;
Expand Down Expand Up @@ -1144,7 +1130,7 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) {
stream->inflight -= pkt->size;
}

const uint64_t time = get_milliseconds();
const uint64_t time = uv_now(stream->udx->loop);
const uint32_t rtt = (uint32_t) (time - pkt->time_sent);
const uint32_t next = seq + 1;

Expand Down Expand Up @@ -1562,6 +1548,10 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
}

if (events & UV_WRITABLE) {
if (events & UV_READABLE) {
// compensate for potentially long-running read callbacks
uv_update_time(handle->loop);
}
udx__on_writable(socket);
if (socket->status & UDX_SOCKET_CLOSING && socket->send_queue.len == 0 && !check_if_streams_have_data(socket)) {
udx__close_handles(socket);
Expand Down Expand Up @@ -1857,7 +1847,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
stream->srtt = 0;
stream->rttvar = 0;
stream->rto = 1000;
stream->rto_timeout = get_milliseconds() + stream->rto;
stream->rto_timeout = uv_now(udx->loop) + stream->rto;
stream->rack_timeout = 0;

stream->rack_rtt_min = 0;
Expand Down Expand Up @@ -1999,7 +1989,7 @@ udx_stream_check_timeouts (udx_stream_t *stream) {
return 0;
}

const uint64_t now = stream->inflight ? get_milliseconds() : 0;
const uint64_t now = stream->inflight ? uv_now(stream->udx->loop) : 0;

if (stream->rack_timeout > 0 && now >= stream->rack_timeout) {
rack_detect_loss(stream);
Expand Down Expand Up @@ -2213,7 +2203,7 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t

// if this is the first inflight packet, we should "restart" rto timer
if (stream->inflight == 0) {
stream->rto_timeout = get_milliseconds() + stream->rto;
stream->rto_timeout = uv_now(stream->udx->loop) + stream->rto;
}

req->bytes_acked = 0;
Expand Down
2 changes: 1 addition & 1 deletion test/stream-multiple.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ main () {
uv_run(&loop, UV_RUN_DEFAULT);

for (int i = 0; i < NSTREAMS; i++) {
printf("%d: send_hash=%x receive_hash=%x sent_bytes=%lu recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read);
printf("%d: send_hash=%lx receive_hash=%lx sent_bytes=%d recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read);
assert(sender[i].write_hash == receiver[i].read_hash);
assert(receiver[i].nbytes_read == NBYTES_TO_SEND);
}
Expand Down
2 changes: 1 addition & 1 deletion test/stream-relay.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
assert(buf->len == read_len);

if (nbytes_read == 0) {
printf("read_len=%d\n", read_len);
printf("read_len=%ld\n", read_len);
assert(memcmp(buf->base, "hello", 5) == 0);
}
read_hash = hash(read_hash, buf->base, read_len);
Expand Down
2 changes: 1 addition & 1 deletion test/stream-write-read-perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ main () {

// just for valgrind
free(data);
printf("readhash=%x writehash=%x\n", read_hash, write_hash);
printf("readhash=%lx writehash=%lx\n", read_hash, write_hash);
assert(read_hash == write_hash);
return 0;
}

0 comments on commit 8d99c1d

Please sign in to comment.