diff --git a/examples/client.c b/examples/client.c index 050e52b2..dbe34ced 100644 --- a/examples/client.c +++ b/examples/client.c @@ -1,6 +1,6 @@ #include -#include #include +#include #include "../include/udx.h" #ifdef _WIN32 @@ -33,7 +33,7 @@ get_milliseconds () { static void on_uv_interval (uv_timer_t *handle) { - printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started); + printf("received %zu bytes in %lu ms\n", bytes_recv, get_milliseconds() - started); } static void @@ -45,7 +45,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { } if (read_len < 0) { - printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started); + printf("received %zu bytes in %lu ms\n", bytes_recv, get_milliseconds() - started); printf("stream is done!\n"); exit(0); } @@ -80,7 +80,7 @@ main (int argc, char **argv) { client_id = (uint32_t) getpid(); server_id = client_id + 1; - uint32_t ids[2] = { client_id, server_id }; + uint32_t ids[2] = {client_id, server_id}; uv_buf_t buf = uv_buf_init((char *) ids, 8); udx_socket_send(&req, &sock, &buf, 1, (struct sockaddr *) &dest_addr, on_send); diff --git a/src/io_posix.c b/src/io_posix.c index 4710e9f5..7d5f6dbe 100644 --- a/src/io_posix.c +++ b/src/io_posix.c @@ -148,8 +148,6 @@ udx__on_writable (udx_socket_t *socket) { npkts++; } - uint64_t time_sent = uv_hrtime() / 1e6; - int rc; if (adjust_ttl) uv_udp_set_ttl((uv_udp_t *) socket, ttl); @@ -178,7 +176,7 @@ udx__on_writable (udx_socket_t *socket) { for (int i = 0; i < nsent; i++) { udx_packet_t *pkt = batch[i]; // todo: set in confirm packet with uv_now() - pkt->time_sent = time_sent; + pkt->time_sent = uv_now(socket->udx->loop); udx__confirm_packet(batch[i]); } @@ -209,7 +207,7 @@ udx__on_writable (udx_socket_t *socket) { break; } // todo: set in confirm packet with uv_now() - pkt->time_sent = uv_hrtime() / 1e6; + pkt->time_sent = uv_now(socket->udx->loop); udx__confirm_packet(pkt); } #endif diff --git a/src/io_win.c b/src/io_win.c index 92208038..dc7618d4 100644 --- a/src/io_win.c +++ b/src/io_win.c @@ -106,8 +106,7 @@ udx__on_writable (udx_socket_t *socket) { udx__unshift_packet(pkt, socket); break; } - // todo: set in confirm packet with uv_now() - pkt->time_sent = uv_hrtime() / 1e6; + pkt->time_sent = uv_now(socket->udx->loop); udx__confirm_packet(pkt); } } diff --git a/src/udx.c b/src/udx.c index 68ac19e7..54c35bdf 100644 --- a/src/udx.c +++ b/src/udx.c @@ -57,16 +57,6 @@ typedef struct { uv_buf_t buf; } udx_pending_read_t; -static uint64_t -get_microseconds () { - return uv_hrtime() / 1000; -} - -static uint64_t -get_milliseconds () { - return get_microseconds() / 1000; -} - static inline uint32_t cubic_root (uint64_t a) { return (uint32_t) cbrt(a); @@ -487,7 +477,6 @@ on_bytes_acked (udx_stream_t *stream, udx_stream_write_t *w, size_t bytes, bool static void clear_outgoing_packets (udx_stream_t *stream) { - // todo: simplify with // We should make sure all existing packets do not send, and notify the user that they failed for (uint32_t seq = stream->remote_acked; seq != stream->seq; seq++) { @@ -645,7 +634,6 @@ get_stream (udx_socket_t *socket) { udx_packet_t * udx__shift_packet (udx_socket_t *socket) { - // debug_printf("in get packet\n"); while (socket->send_queue.len > 0) { udx_packet_t *pkt = udx__fifo_shift(&socket->send_queue); @@ -1007,7 +995,7 @@ static void rack_detect_loss (udx_stream_t *stream) { uint64_t timeout = 0; uint32_t reo_wnd = rack_update_reo_wnd(stream); - uint64_t now = 0; + uint64_t now = uv_now(stream->udx->loop); int resending = 0; int mtu_probes_lost = 0; @@ -1018,8 +1006,6 @@ rack_detect_loss (udx_stream_t *stream) { if (pkt == NULL || pkt->status != UDX_PACKET_STATE_INFLIGHT) continue; assert(pkt->transmits > 0); - if (!now) now = get_milliseconds(); - // debug_printf("%lu > %lu=%d\n", stream->rack_time_sent, pkt->time_sent, stream->rack_time_sent > pkt->time_sent); if (!rack_sent_after(stream->rack_time_sent, stream->rack_next_seq, pkt->time_sent, pkt->seq + 1)) { @@ -1070,7 +1056,7 @@ rack_detect_loss (udx_stream_t *stream) { static void ack_update (udx_stream_t *stream, uint32_t acked, bool is_limited) { - uint64_t time = get_milliseconds(); + uint64_t time = uv_now(stream->udx->loop); // also reset rto, since things are moving forward... stream->rto_timeout = time + stream->rto; @@ -1144,7 +1130,7 @@ ack_packet (udx_stream_t *stream, uint32_t seq, int sack) { stream->inflight -= pkt->size; } - const uint64_t time = get_milliseconds(); + const uint64_t time = uv_now(stream->udx->loop); const uint32_t rtt = (uint32_t) (time - pkt->time_sent); const uint32_t next = seq + 1; @@ -1562,6 +1548,10 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { } if (events & UV_WRITABLE) { + if (events & UV_READABLE) { + // compensate for potentially long-running read callbacks + uv_update_time(handle->loop); + } udx__on_writable(socket); if (socket->status & UDX_SOCKET_CLOSING && socket->send_queue.len == 0 && !check_if_streams_have_data(socket)) { udx__close_handles(socket); @@ -1857,7 +1847,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream stream->srtt = 0; stream->rttvar = 0; stream->rto = 1000; - stream->rto_timeout = get_milliseconds() + stream->rto; + stream->rto_timeout = uv_now(udx->loop) + stream->rto; stream->rack_timeout = 0; stream->rack_rtt_min = 0; @@ -1999,7 +1989,7 @@ udx_stream_check_timeouts (udx_stream_t *stream) { return 0; } - const uint64_t now = stream->inflight ? get_milliseconds() : 0; + const uint64_t now = stream->inflight ? uv_now(stream->udx->loop) : 0; if (stream->rack_timeout > 0 && now >= stream->rack_timeout) { rack_detect_loss(stream); @@ -2213,7 +2203,7 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t // if this is the first inflight packet, we should "restart" rto timer if (stream->inflight == 0) { - stream->rto_timeout = get_milliseconds() + stream->rto; + stream->rto_timeout = uv_now(stream->udx->loop) + stream->rto; } req->bytes_acked = 0; diff --git a/test/stream-multiple.c b/test/stream-multiple.c index 15315c69..03f98e31 100644 --- a/test/stream-multiple.c +++ b/test/stream-multiple.c @@ -111,7 +111,7 @@ main () { uv_run(&loop, UV_RUN_DEFAULT); for (int i = 0; i < NSTREAMS; i++) { - printf("%d: send_hash=%x receive_hash=%x sent_bytes=%lu recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read); + printf("%d: send_hash=%lx receive_hash=%lx sent_bytes=%d recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read); assert(sender[i].write_hash == receiver[i].read_hash); assert(receiver[i].nbytes_read == NBYTES_TO_SEND); } diff --git a/test/stream-relay.c b/test/stream-relay.c index a99bed34..a8e18599 100644 --- a/test/stream-relay.c +++ b/test/stream-relay.c @@ -53,7 +53,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { assert(buf->len == read_len); if (nbytes_read == 0) { - printf("read_len=%d\n", read_len); + printf("read_len=%ld\n", read_len); assert(memcmp(buf->base, "hello", 5) == 0); } read_hash = hash(read_hash, buf->base, read_len); diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c index bcacbdeb..5385f995 100644 --- a/test/stream-write-read-perf.c +++ b/test/stream-write-read-perf.c @@ -141,7 +141,7 @@ main () { // just for valgrind free(data); - printf("readhash=%x writehash=%x\n", read_hash, write_hash); + printf("readhash=%lx writehash=%lx\n", read_hash, write_hash); assert(read_hash == write_hash); return 0; }