From 3f01df1c3d6c44d0ce2a3c7abf91cc9b8a74c787 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Sat, 11 Nov 2023 21:25:04 +0100 Subject: [PATCH] add client/server example --- CMakeLists.txt | 1 + examples/CMakeLists.txt | 14 +++++ examples/client.c | 90 +++++++++++++++++++++++++++ examples/server.c | 131 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 236 insertions(+) create mode 100644 examples/CMakeLists.txt create mode 100644 examples/client.c create mode 100644 examples/server.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 375db9c1..f56d0326 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -113,4 +113,5 @@ install(FILES include/udx.h DESTINATION include) if(PROJECT_IS_TOP_LEVEL) enable_testing() add_subdirectory(test) + add_subdirectory(examples) endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 00000000..362516a5 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,14 @@ +add_executable(server server.c) +add_executable(client client.c) + +target_link_libraries( + server + PRIVATE + udx_static +) + +target_link_libraries( + client + PRIVATE + udx_static +) diff --git a/examples/client.c b/examples/client.c new file mode 100644 index 00000000..050e52b2 --- /dev/null +++ b/examples/client.c @@ -0,0 +1,90 @@ +#include +#include +#include + +#include "../include/udx.h" +#ifdef _WIN32 +#include +#else +#include +#endif + +static uv_loop_t loop; +static udx_t udx; + +static udx_socket_t sock; +static udx_socket_send_t req; + +static udx_stream_t stream; +static struct sockaddr_in dest_addr; + +static size_t bytes_recv = 0; +static uint64_t started = 0; + +static uint32_t client_id = 1; +static uint32_t server_id = 2; + +static uv_timer_t timer; + +static uint64_t +get_milliseconds () { + return uv_hrtime() / 1000000; +} + +static void +on_uv_interval (uv_timer_t *handle) { + printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started); +} + +static void +on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { + if (started == 0) { + started = get_milliseconds(); + uv_timer_init(&loop, &timer); + uv_timer_start(&timer, on_uv_interval, 5000, 5000); + } + + if (read_len < 0) { + printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started); + printf("stream is done!\n"); + exit(0); + } + + bytes_recv += read_len; +} + +static void +on_send (udx_socket_send_t *r, int status) { + udx_stream_init(&udx, &stream, client_id, NULL); + udx_stream_connect(&stream, &sock, server_id, (struct sockaddr *) &dest_addr); + udx_stream_read_start(&stream, on_read); +} + +int +main (int argc, char **argv) { + if (argc < 2) return 1; + + uv_ip4_addr(argv[1], 18081, &dest_addr); + + uv_loop_init(&loop); + + udx_init(&loop, &udx); + + udx_socket_init(&udx, &sock); + + struct sockaddr_in addr; + uv_ip4_addr("0.0.0.0", 18082, &addr); + + udx_socket_bind(&sock, (struct sockaddr *) &addr, 0); + + client_id = (uint32_t) getpid(); + server_id = client_id + 1; + + uint32_t ids[2] = { client_id, server_id }; + + uv_buf_t buf = uv_buf_init((char *) ids, 8); + udx_socket_send(&req, &sock, &buf, 1, (struct sockaddr *) &dest_addr, on_send); + + uv_run(&loop, UV_RUN_DEFAULT); + return 0; +} diff --git a/examples/server.c b/examples/server.c new file mode 100644 index 00000000..21a5cf29 --- /dev/null +++ b/examples/server.c @@ -0,0 +1,131 @@ +#include +#include + +#include "../include/udx.h" + +#define PUMP_BYTES (1024 * 1024 * 1024) + +static uv_loop_t loop; +static udx_t udx; + +static udx_socket_t sock; +static udx_stream_t stream; + +static bool stream_is_active = false; +static bool stream_is_queued = false; +static uint32_t client_id = 0; +static uint32_t server_id = 0; +static size_t bytes_sent = 0; +static struct sockaddr_in dest_addr; + +static uv_buf_t chunk; +static uv_buf_t empty = { .base = NULL, .len = 0 }; + +static bool printed_warning = false; + +static void +pump_stream(); + +static void +on_close (udx_stream_t *stream, int status) { + printf("stream closed with status %i\n", status); + + stream_is_active = false; + bytes_sent = 0; + + if (stream_is_queued) { + stream_is_queued = false; + pump_stream(); + } +} + +static void +on_ack (udx_stream_write_t *req, int status, int unordered) { + free(req); +} + +static void +on_ack_end (udx_stream_write_t *req, int status, int unordered) { + udx_stream_destroy(req->handle); + free(req); +} + +static void +pump_writes () { + while (bytes_sent < PUMP_BYTES) { + udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t)); + bytes_sent += chunk.len; + + if (udx_stream_write(req, &stream, &chunk, 1, on_ack)) continue; + + udx_stream_write_resume(&stream, pump_writes); + return; + } + + udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t)); + udx_stream_write_end(req, &stream, &empty, 1, on_ack_end); +} + +static void +pump_stream () { + stream_is_active = true; + + char dst_ip[20]; + uv_ip4_name(&dest_addr, dst_ip, 20); + + printf("pumping %d bytes to stream to %s...\n", PUMP_BYTES, dst_ip); + + udx_stream_init(&udx, &stream, server_id, on_close); + udx_stream_connect(&stream, &sock, client_id, (struct sockaddr *) &dest_addr); + + pump_writes(); +} + +static void +on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from) { + if (read_len != 8) { + if (!printed_warning) { + printed_warning = true; + printf("warning: unknown packet received (%zd bytes)\n", read_len); + } + return; + } + + printf("client requested streams...\n"); + + uint32_t *ids = (uint32_t *) buf->base; + + client_id = *(ids++); + server_id = *(ids++); + dest_addr = *((struct sockaddr_in *) from); + + if (stream_is_active) { + stream_is_queued = true; + udx_stream_destroy(&stream); + return; + } + + pump_stream(); +} + +int +main (int argc, char **argv) { + uv_loop_init(&loop); + + chunk.len = 16384; + chunk.base = calloc(1, chunk.len); + + udx_init(&loop, &udx); + + udx_socket_init(&udx, &sock); + + struct sockaddr_in addr; + uv_ip4_addr("0.0.0.0", 18081, &addr); + + udx_socket_bind(&sock, (struct sockaddr *) &addr, 0); + + udx_socket_recv_start(&sock, on_recv); + + uv_run(&loop, UV_RUN_DEFAULT); + return 0; +}