Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add client/server example #170

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,5 @@ install(FILES include/udx.h DESTINATION include)
if(PROJECT_IS_TOP_LEVEL)
enable_testing()
add_subdirectory(test)
add_subdirectory(examples)
endif()
14 changes: 14 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
add_executable(server server.c)
add_executable(client client.c)

target_link_libraries(
server
PRIVATE
udx_static
)

target_link_libraries(
client
PRIVATE
udx_static
)
90 changes: 90 additions & 0 deletions examples/client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include <stdio.h>
#include <uv.h>
#include <stdlib.h>

#include "../include/udx.h"
#ifdef _WIN32
#include <process.h>
#else
#include <unistd.h>
#endif

static uv_loop_t loop;
static udx_t udx;

static udx_socket_t sock;
static udx_socket_send_t req;

static udx_stream_t stream;
static struct sockaddr_in dest_addr;

static size_t bytes_recv = 0;
static uint64_t started = 0;

static uint32_t client_id = 1;
static uint32_t server_id = 2;

static uv_timer_t timer;

static uint64_t
get_milliseconds () {
return uv_hrtime() / 1000000;
}

static void
on_uv_interval (uv_timer_t *handle) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
}

static void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
if (started == 0) {
started = get_milliseconds();
uv_timer_init(&loop, &timer);
uv_timer_start(&timer, on_uv_interval, 5000, 5000);
}

if (read_len < 0) {
printf("received %zu bytes in %llu ms\n", bytes_recv, get_milliseconds() - started);
printf("stream is done!\n");
exit(0);
}

bytes_recv += read_len;
}

static void
on_send (udx_socket_send_t *r, int status) {
udx_stream_init(&udx, &stream, client_id, NULL);
udx_stream_connect(&stream, &sock, server_id, (struct sockaddr *) &dest_addr);
udx_stream_read_start(&stream, on_read);
}

int
main (int argc, char **argv) {
if (argc < 2) return 1;

uv_ip4_addr(argv[1], 18081, &dest_addr);

uv_loop_init(&loop);

udx_init(&loop, &udx);

udx_socket_init(&udx, &sock);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18082, &addr);

udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);

client_id = (uint32_t) getpid();
server_id = client_id + 1;

uint32_t ids[2] = { client_id, server_id };

uv_buf_t buf = uv_buf_init((char *) ids, 8);
udx_socket_send(&req, &sock, &buf, 1, (struct sockaddr *) &dest_addr, on_send);

uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}
131 changes: 131 additions & 0 deletions examples/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#include <stdio.h>
#include <stdlib.h>

#include "../include/udx.h"

#define PUMP_BYTES (1024 * 1024 * 1024)

static uv_loop_t loop;
static udx_t udx;

static udx_socket_t sock;
static udx_stream_t stream;

static bool stream_is_active = false;
static bool stream_is_queued = false;
static uint32_t client_id = 0;
static uint32_t server_id = 0;
static size_t bytes_sent = 0;
static struct sockaddr_in dest_addr;

static uv_buf_t chunk;
static uv_buf_t empty = { .base = NULL, .len = 0 };

static bool printed_warning = false;

static void
pump_stream();

static void
on_close (udx_stream_t *stream, int status) {
printf("stream closed with status %i\n", status);

stream_is_active = false;
bytes_sent = 0;

if (stream_is_queued) {
stream_is_queued = false;
pump_stream();
}
}

static void
on_ack (udx_stream_write_t *req, int status, int unordered) {
free(req);
}

static void
on_ack_end (udx_stream_write_t *req, int status, int unordered) {
udx_stream_destroy(req->handle);
free(req);
}

static void
pump_writes () {
while (bytes_sent < PUMP_BYTES) {
udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
bytes_sent += chunk.len;

if (udx_stream_write(req, &stream, &chunk, 1, on_ack)) continue;

udx_stream_write_resume(&stream, pump_writes);
return;
}

udx_stream_write_t *req = malloc(sizeof(udx_stream_write_t));
udx_stream_write_end(req, &stream, &empty, 1, on_ack_end);
}

static void
pump_stream () {
stream_is_active = true;

char dst_ip[20];
uv_ip4_name(&dest_addr, dst_ip, 20);

printf("pumping %d bytes to stream to %s...\n", PUMP_BYTES, dst_ip);

udx_stream_init(&udx, &stream, server_id, on_close);
udx_stream_connect(&stream, &sock, client_id, (struct sockaddr *) &dest_addr);

pump_writes();
}

static void
on_recv (udx_socket_t *handle, ssize_t read_len, const uv_buf_t *buf, const struct sockaddr *from) {
if (read_len != 8) {
if (!printed_warning) {
printed_warning = true;
printf("warning: unknown packet received (%zd bytes)\n", read_len);
}
return;
}

printf("client requested streams...\n");

uint32_t *ids = (uint32_t *) buf->base;

client_id = *(ids++);
server_id = *(ids++);
dest_addr = *((struct sockaddr_in *) from);

if (stream_is_active) {
stream_is_queued = true;
udx_stream_destroy(&stream);
return;
}

pump_stream();
}

int
main (int argc, char **argv) {
uv_loop_init(&loop);

chunk.len = 16384;
chunk.base = calloc(1, chunk.len);

udx_init(&loop, &udx);

udx_socket_init(&udx, &sock);

struct sockaddr_in addr;
uv_ip4_addr("0.0.0.0", 18081, &addr);

udx_socket_bind(&sock, (struct sockaddr *) &addr, 0);

udx_socket_recv_start(&sock, on_recv);

uv_run(&loop, UV_RUN_DEFAULT);
return 0;
}
Loading