diff --git a/build.zig b/build.zig index 44cc65d..4e00dd2 100644 --- a/build.zig +++ b/build.zig @@ -6,6 +6,7 @@ pub fn build(b: *std.Build) void { const mod = b.addModule("zentropy", .{ .root_source_file = b.path("src/root.zig"), .target = target, + .optimize = optimize, }); const exe = b.addExecutable(.{ @@ -36,6 +37,7 @@ pub fn build(b: *std.Build) void { const mod_tests = b.addTest(.{ .root_module = mod, }); + mod_tests.root_module.addImport("zentropy", mod); // A run step that will run the test executable. const run_mod_tests = b.addRunArtifact(mod_tests); diff --git a/src/root.zig b/src/root.zig index efe0ce9..834e659 100644 --- a/src/root.zig +++ b/src/root.zig @@ -1,7 +1,10 @@ const std = @import("std"); +pub const Client = @import("zentropy-client.zig").Client; + test { _ = @import("tests/KVStoreTests.zig"); _ = @import("tests/tcpTests.zig"); _ = @import("tests/unixSocketTest.zig"); + _ = @import("tests/clientTest.zig"); } diff --git a/src/tests/clientTest.zig b/src/tests/clientTest.zig new file mode 100644 index 0000000..7fa0e2c --- /dev/null +++ b/src/tests/clientTest.zig @@ -0,0 +1,156 @@ +const std = @import("std"); +const zentropy = @import("zentropy"); +const testing = std.testing; +const KVStore = @import("../KVStore.zig"); +const config = @import("../config.zig"); +const tcp = @import("../tcp.zig"); +const time = std.time; +const Thread = std.Thread; + +test "connect" { + const server = try Thread.spawn(.{}, startServer, .{}); + defer server.join(); + + try waitForServerStarted(); + + var client = try zentropy.Client.connect(.{}); + defer { + client.shutdown() catch unreachable; + client.deinit(); + waitForServerStopped() catch unreachable; + } +} + +test "set-get" { + const server = try Thread.spawn(.{}, startServer, .{}); + defer server.join(); + + try waitForServerStarted(); + + const ex1 = "example1"; + const ex2 = "example 2"; //with spaces + const val1 = "value1"; + const val2 = "value 2"; + + var client = try zentropy.Client.connect(.{}); + defer { + client.shutdown() catch unreachable; + client.deinit(); + waitForServerStopped() catch unreachable; + } + + try client.set(ex1, val1); + try client.set(ex2, val2); + + var value1_buf: [32]u8 = undefined; + var value2_buf: [32]u8 = undefined; + var value3_buf: [32]u8 = undefined; + + // simple .get + const value1 = try client.get(ex1, &value1_buf) orelse unreachable; + try testing.expectEqualSlices(u8, val1, value1); + const value2 = try client.get(ex2, &value2_buf) orelse unreachable; + try testing.expectEqualSlices(u8, val2, value2); + const value3 = try client.get("not existing", &value3_buf); + try testing.expect(value3 == null); + + // .getAlloc + const value1_alloc = try client.getAlloc(testing.allocator, ex1) orelse unreachable; + defer testing.allocator.free(value1_alloc); + try testing.expectEqualSlices(u8, val1, value1_alloc); + const value2_alloc = try client.getAlloc(testing.allocator, ex2) orelse unreachable; + defer testing.allocator.free(value2_alloc); + try testing.expectEqualSlices(u8, val2, value2_alloc); + const value3_alloc = try client.getAlloc(testing.allocator, "not existing"); + try testing.expect(value3_alloc == null); + + // .getSized + const value1_sized = try client.getSized(ex1, val1.len) orelse unreachable; + try testing.expectEqualSlices(u8, val1, &value1_sized); + const value2_sized = try client.getSized(ex2, val2.len) orelse unreachable; + try testing.expectEqualSlices(u8, val2, &value2_sized); + const value3_sized = try client.getSized("not existing", 5); + try testing.expect(value3_sized == null); +} + +test "exists" { + const server = try Thread.spawn(.{}, startServer, .{}); + defer server.join(); + + try waitForServerStarted(); + + var client = try zentropy.Client.connect(.{}); + defer { + client.shutdown() catch unreachable; + client.deinit(); + waitForServerStopped() catch unreachable; + } + + try testing.expect(!try client.exists("example1")); + try testing.expect(!try client.exists("example2")); + try client.set("example1", "value1"); + try testing.expect(try client.exists("example1")); + try testing.expect(!try client.exists("example2")); //double check + try client.set("example2", "value2"); + try testing.expect(try client.exists("example2")); +} + +test "delete" { + const server = try Thread.spawn(.{}, startServer, .{}); + defer server.join(); + + try waitForServerStarted(); + + var client = try zentropy.Client.connect(.{}); + defer { + client.shutdown() catch unreachable; + client.deinit(); + waitForServerStopped() catch unreachable; + } + + try testing.expect(!try client.delete("example1")); + try testing.expect(!try client.delete("example2")); + try client.set("example1", "value"); + try testing.expect(!try client.delete("example2")); + const val1 = try client.getAlloc(testing.allocator, "example1") orelse unreachable; + defer testing.allocator.free(val1); + try testing.expectEqualSlices(u8, "value", val1); + try testing.expect(try client.delete("example1")); + try testing.expect(!try client.delete("example1")); + const val2 = try client.getAlloc(testing.allocator, "example1"); + try testing.expect(val2 == null); +} + +fn waitForServerStarted() !void { + for (0..10000) |_| { + const client = zentropy.Client.connect(.{}) catch { + Thread.sleep(time.ns_per_us * 50); + continue; + }; + defer client.deinit(); + return; + } + return error.Timeout; +} + +fn waitForServerStopped() !void { + for (0..10000) |_| { + const client = zentropy.Client.connect(.{}) catch { + return; + }; + client.deinit(); + Thread.sleep(time.ns_per_us * 50); + continue; + } + + return error.Timeout; +} + +fn startServer() !void { + var stop_server = std.atomic.Value(bool).init(false); + var store = KVStore.init(testing.allocator); + defer store.deinit(); + var app_config = try config.load(testing.allocator); + defer app_config.deinit(testing.allocator); + try tcp.startServer(&store, &stop_server, &app_config); +} diff --git a/src/zentropy-client.zig b/src/zentropy-client.zig new file mode 100644 index 0000000..17d97fd --- /dev/null +++ b/src/zentropy-client.zig @@ -0,0 +1,203 @@ +const std = @import("std"); +const net = std.net; +const mem = std.mem; +const builtin = @import("builtin"); +const Io = std.Io; +const Reader = Io.Reader; +const Stream = net.Stream; +const Allocator = std.mem.Allocator; +const Config = @import("config.zig"); + +/// wrapper for connecting with Zentropy server +pub const Client = struct { + stream: net.Stream, + + const buffer_size = 4096; // this affects responses max size + + const ConnectError = error{ + BadPingResponse, + } || + net.TcpConnectToAddressError || + Io.Writer.Error || + Reader.Error || + net.IPv4ParseError; + + ///connects to the server with `config` parameters + pub fn connect(config: Config) ConnectError!Client { + const stream = try net.tcpConnectToAddress(.{ + .in = try .parse(config.bind_address, config.port), + }); + + //check for connectivity only in debug and release safe mode + switch (builtin.mode) { + .Debug, .ReleaseSafe => { + var buf: [32]u8 = undefined; + var writer = stream.writer(&buf); + try writer.interface.writeAll("PING"); + try writer.interface.flush(); + var reader = stream.reader(&buf); + const pong = try reader.file_reader.interface.takeArray(responses.pong.len); + + if (!mem.eql(u8, pong, responses.pong)) { + return error.BadPingResponse; + } + }, + else => {}, + } + + return Client{ + .stream = stream, + }; + } + + /// destroys connection + pub fn deinit(self: *const Client) void { + self.stream.close(); + } + + const SetError = error{ + ServerError, + } || + Io.Writer.Error || + Io.Reader.Error; + + pub fn set(self: *Client, key: []const u8, value: []const u8) SetError!void { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("SET \"{s}\" \"{s}\"", .{ key, value }); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + const result = try reader.file_reader.interface.takeByte(); // reading only 1 byte for micro boost in performance + try reader.file_reader.interface.discardAll(responses.ok.len - 1); //discarding rest of the result + + if (result != '+') { + return error.ServerError; + } + } + /// returns result slice pointing in `out` + pub fn get(self: *Client, key: []const u8, out: []u8) !?[]u8 { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("GET \"{s}\"", .{key}); + try writer.interface.flush(); + + var reader = self.stream.reader(out); + + const peek = try reader.file_reader.interface.peek(responses.none.len); + if (mem.eql(u8, peek, responses.none)) { + try reader.file_reader.interface.discardAll(responses.none.len); + return null; + } + const slice = try reader.file_reader.interface.takeDelimiter('\r'); + try reader.file_reader.interface.discardAll(1); //discard "\n" + + return slice; + } + + /// caller owns memory + pub fn getAlloc(self: *Client, gpa: Allocator, key: []const u8) !?[]u8 { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("GET \"{s}\"", .{key}); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + const peek = try reader.file_reader.interface.peek(responses.none.len); + if (mem.eql(u8, peek, responses.none)) { + try reader.file_reader.interface.discardAll(responses.none.len); + return null; + } + const slice = try reader.file_reader.interface.takeDelimiter('\r') orelse return null; + try reader.file_reader.interface.discardAll(1); //discard "\n" + + return try gpa.dupe(u8, slice); + } + + /// returns comptime known size string + pub fn getSized(self: *Client, key: []const u8, comptime size: comptime_int) !?[size]u8 { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("GET \"{s}\"", .{key}); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + + const peek = try reader.file_reader.interface.peek(responses.none.len); + if (mem.eql(u8, peek, responses.none)) { + try reader.file_reader.interface.discardAll(responses.none.len); + return null; + } + + const result = try reader.file_reader.interface.takeArray(size); + var output: [result.len]u8 = undefined; + output = result.*; + _ = try reader.file_reader.interface.discardShort(2); //discard "\r\n" + + return output; + } + + /// checks if key exists + pub fn exists(self: *Client, key: []const u8) !bool { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("EXISTS \"{s}\"", .{key}); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + const exists_byte = try reader.file_reader.interface.takeByte(); + try reader.file_reader.interface.discardAll(2); //discard "\r\n" + + return if (exists_byte == '1') true else false; + } + + /// deletes key, returns true if deleted + pub fn delete(self: *Client, key: []const u8) !bool { + var buf: [buffer_size]u8 = undefined; + var writer = self.stream.writer(&buf); + + try writer.interface.print("DELETE \"{s}\"", .{key}); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + + const peek = try reader.file_reader.interface.peek(responses.ok.len); + if (mem.eql(u8, peek, responses.ok)) { + try reader.file_reader.interface.discardAll(responses.ok.len); + return true; + } + try reader.file_reader.interface.discardAll(responses.not_deleted.len); + return false; + } + + const ShutdownError = error{ + BadResponse, + } || Io.Writer.Error || Io.Reader.Error; + + /// shuts down server + pub fn shutdown(self: *Client) ShutdownError!void { + var buf: [32]u8 = undefined; + var writer = self.stream.writer(&buf); + try writer.interface.writeAll("SHUTDOWN"); + try writer.interface.flush(); + + var reader = self.stream.reader(&buf); + const result = try reader.file_reader.interface.takeArray(responses.shutdown.len); + if (!mem.eql(u8, responses.shutdown, result)) { + return error.BadResponse; + } + } +}; + +const responses = struct { + pub const shutdown = "===SHUTDOWN===\r\n"; + pub const ok = "+OK\r\n"; + pub const not_deleted = "-NOT DELETED\r\n"; + pub const none = "NONE\r\n"; + pub const pong = "+PONG\r\n"; +};