Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub fn build(b: *std.Build) void {
const mod = b.addModule("zentropy", .{
.root_source_file = b.path("src/root.zig"),
.target = target,
.optimize = optimize,
});

const exe = b.addExecutable(.{
Expand Down Expand Up @@ -36,6 +37,7 @@ pub fn build(b: *std.Build) void {
const mod_tests = b.addTest(.{
.root_module = mod,
});
mod_tests.root_module.addImport("zentropy", mod);

// A run step that will run the test executable.
const run_mod_tests = b.addRunArtifact(mod_tests);
Expand Down
3 changes: 3 additions & 0 deletions src/root.zig
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
const std = @import("std");

pub const Client = @import("zentropy-client.zig").Client;

test {
_ = @import("tests/KVStoreTests.zig");
_ = @import("tests/tcpTests.zig");
_ = @import("tests/unixSocketTest.zig");
_ = @import("tests/clientTest.zig");
}
156 changes: 156 additions & 0 deletions src/tests/clientTest.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
const std = @import("std");
const zentropy = @import("zentropy");
const testing = std.testing;
const KVStore = @import("../KVStore.zig");
const config = @import("../config.zig");
const tcp = @import("../tcp.zig");
const time = std.time;
const Thread = std.Thread;

test "connect" {
const server = try Thread.spawn(.{}, startServer, .{});
defer server.join();

try waitForServerStarted();

var client = try zentropy.Client.connect(.{});
defer {
client.shutdown() catch unreachable;
client.deinit();
waitForServerStopped() catch unreachable;
}
}

test "set-get" {
const server = try Thread.spawn(.{}, startServer, .{});
defer server.join();

try waitForServerStarted();

const ex1 = "example1";
const ex2 = "example 2"; //with spaces
const val1 = "value1";
const val2 = "value 2";

var client = try zentropy.Client.connect(.{});
defer {
client.shutdown() catch unreachable;
client.deinit();
waitForServerStopped() catch unreachable;
}

try client.set(ex1, val1);
try client.set(ex2, val2);

var value1_buf: [32]u8 = undefined;
var value2_buf: [32]u8 = undefined;
var value3_buf: [32]u8 = undefined;

// simple .get
const value1 = try client.get(ex1, &value1_buf) orelse unreachable;
try testing.expectEqualSlices(u8, val1, value1);
const value2 = try client.get(ex2, &value2_buf) orelse unreachable;
try testing.expectEqualSlices(u8, val2, value2);
const value3 = try client.get("not existing", &value3_buf);
try testing.expect(value3 == null);

// .getAlloc
const value1_alloc = try client.getAlloc(testing.allocator, ex1) orelse unreachable;
defer testing.allocator.free(value1_alloc);
try testing.expectEqualSlices(u8, val1, value1_alloc);
const value2_alloc = try client.getAlloc(testing.allocator, ex2) orelse unreachable;
defer testing.allocator.free(value2_alloc);
try testing.expectEqualSlices(u8, val2, value2_alloc);
const value3_alloc = try client.getAlloc(testing.allocator, "not existing");
try testing.expect(value3_alloc == null);

// .getSized
const value1_sized = try client.getSized(ex1, val1.len) orelse unreachable;
try testing.expectEqualSlices(u8, val1, &value1_sized);
const value2_sized = try client.getSized(ex2, val2.len) orelse unreachable;
try testing.expectEqualSlices(u8, val2, &value2_sized);
const value3_sized = try client.getSized("not existing", 5);
try testing.expect(value3_sized == null);
}

test "exists" {
const server = try Thread.spawn(.{}, startServer, .{});
defer server.join();

try waitForServerStarted();

var client = try zentropy.Client.connect(.{});
defer {
client.shutdown() catch unreachable;
client.deinit();
waitForServerStopped() catch unreachable;
}

try testing.expect(!try client.exists("example1"));
try testing.expect(!try client.exists("example2"));
try client.set("example1", "value1");
try testing.expect(try client.exists("example1"));
try testing.expect(!try client.exists("example2")); //double check
try client.set("example2", "value2");
try testing.expect(try client.exists("example2"));
}

test "delete" {
const server = try Thread.spawn(.{}, startServer, .{});
defer server.join();

try waitForServerStarted();

var client = try zentropy.Client.connect(.{});
defer {
client.shutdown() catch unreachable;
client.deinit();
waitForServerStopped() catch unreachable;
}

try testing.expect(!try client.delete("example1"));
try testing.expect(!try client.delete("example2"));
try client.set("example1", "value");
try testing.expect(!try client.delete("example2"));
const val1 = try client.getAlloc(testing.allocator, "example1") orelse unreachable;
defer testing.allocator.free(val1);
try testing.expectEqualSlices(u8, "value", val1);
try testing.expect(try client.delete("example1"));
try testing.expect(!try client.delete("example1"));
const val2 = try client.getAlloc(testing.allocator, "example1");
try testing.expect(val2 == null);
}

fn waitForServerStarted() !void {
for (0..10000) |_| {
const client = zentropy.Client.connect(.{}) catch {
Thread.sleep(time.ns_per_us * 50);
continue;
};
defer client.deinit();
return;
}
return error.Timeout;
}

fn waitForServerStopped() !void {
for (0..10000) |_| {
const client = zentropy.Client.connect(.{}) catch {
return;
};
client.deinit();
Thread.sleep(time.ns_per_us * 50);
continue;
}

return error.Timeout;
}

fn startServer() !void {
var stop_server = std.atomic.Value(bool).init(false);
var store = KVStore.init(testing.allocator);
defer store.deinit();
var app_config = try config.load(testing.allocator);
defer app_config.deinit(testing.allocator);
try tcp.startServer(&store, &stop_server, &app_config);
}
203 changes: 203 additions & 0 deletions src/zentropy-client.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
const std = @import("std");
const net = std.net;
const mem = std.mem;
const builtin = @import("builtin");
const Io = std.Io;
const Reader = Io.Reader;
const Stream = net.Stream;
const Allocator = std.mem.Allocator;
const Config = @import("config.zig");

/// wrapper for connecting with Zentropy server
pub const Client = struct {
stream: net.Stream,

const buffer_size = 4096; // this affects responses max size

const ConnectError = error{
BadPingResponse,
} ||
net.TcpConnectToAddressError ||
Io.Writer.Error ||
Reader.Error ||
net.IPv4ParseError;

///connects to the server with `config` parameters
pub fn connect(config: Config) ConnectError!Client {
const stream = try net.tcpConnectToAddress(.{
.in = try .parse(config.bind_address, config.port),
});

//check for connectivity only in debug and release safe mode
switch (builtin.mode) {
.Debug, .ReleaseSafe => {
var buf: [32]u8 = undefined;
var writer = stream.writer(&buf);
try writer.interface.writeAll("PING");
try writer.interface.flush();
var reader = stream.reader(&buf);
const pong = try reader.file_reader.interface.takeArray(responses.pong.len);

if (!mem.eql(u8, pong, responses.pong)) {
return error.BadPingResponse;
}
},
else => {},
}

return Client{
.stream = stream,
};
}

/// destroys connection
pub fn deinit(self: *const Client) void {
self.stream.close();
}

const SetError = error{
ServerError,
} ||
Io.Writer.Error ||
Io.Reader.Error;

pub fn set(self: *Client, key: []const u8, value: []const u8) SetError!void {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("SET \"{s}\" \"{s}\"", .{ key, value });
try writer.interface.flush();

var reader = self.stream.reader(&buf);
const result = try reader.file_reader.interface.takeByte(); // reading only 1 byte for micro boost in performance
try reader.file_reader.interface.discardAll(responses.ok.len - 1); //discarding rest of the result

if (result != '+') {
return error.ServerError;
}
}
/// returns result slice pointing in `out`
pub fn get(self: *Client, key: []const u8, out: []u8) !?[]u8 {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("GET \"{s}\"", .{key});
try writer.interface.flush();

var reader = self.stream.reader(out);

const peek = try reader.file_reader.interface.peek(responses.none.len);
if (mem.eql(u8, peek, responses.none)) {
try reader.file_reader.interface.discardAll(responses.none.len);
return null;
}
const slice = try reader.file_reader.interface.takeDelimiter('\r');
try reader.file_reader.interface.discardAll(1); //discard "\n"

return slice;
}

/// caller owns memory
pub fn getAlloc(self: *Client, gpa: Allocator, key: []const u8) !?[]u8 {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("GET \"{s}\"", .{key});
try writer.interface.flush();

var reader = self.stream.reader(&buf);
const peek = try reader.file_reader.interface.peek(responses.none.len);
if (mem.eql(u8, peek, responses.none)) {
try reader.file_reader.interface.discardAll(responses.none.len);
return null;
}
const slice = try reader.file_reader.interface.takeDelimiter('\r') orelse return null;
try reader.file_reader.interface.discardAll(1); //discard "\n"

return try gpa.dupe(u8, slice);
}

/// returns comptime known size string
pub fn getSized(self: *Client, key: []const u8, comptime size: comptime_int) !?[size]u8 {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("GET \"{s}\"", .{key});
try writer.interface.flush();

var reader = self.stream.reader(&buf);

const peek = try reader.file_reader.interface.peek(responses.none.len);
if (mem.eql(u8, peek, responses.none)) {
try reader.file_reader.interface.discardAll(responses.none.len);
return null;
}

const result = try reader.file_reader.interface.takeArray(size);
var output: [result.len]u8 = undefined;
output = result.*;
_ = try reader.file_reader.interface.discardShort(2); //discard "\r\n"

return output;
}

/// checks if key exists
pub fn exists(self: *Client, key: []const u8) !bool {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("EXISTS \"{s}\"", .{key});
try writer.interface.flush();

var reader = self.stream.reader(&buf);
const exists_byte = try reader.file_reader.interface.takeByte();
try reader.file_reader.interface.discardAll(2); //discard "\r\n"

return if (exists_byte == '1') true else false;
}

/// deletes key, returns true if deleted
pub fn delete(self: *Client, key: []const u8) !bool {
var buf: [buffer_size]u8 = undefined;
var writer = self.stream.writer(&buf);

try writer.interface.print("DELETE \"{s}\"", .{key});
try writer.interface.flush();

var reader = self.stream.reader(&buf);

const peek = try reader.file_reader.interface.peek(responses.ok.len);
if (mem.eql(u8, peek, responses.ok)) {
try reader.file_reader.interface.discardAll(responses.ok.len);
return true;
}
try reader.file_reader.interface.discardAll(responses.not_deleted.len);
return false;
}

const ShutdownError = error{
BadResponse,
} || Io.Writer.Error || Io.Reader.Error;

/// shuts down server
pub fn shutdown(self: *Client) ShutdownError!void {
var buf: [32]u8 = undefined;
var writer = self.stream.writer(&buf);
try writer.interface.writeAll("SHUTDOWN");
try writer.interface.flush();

var reader = self.stream.reader(&buf);
const result = try reader.file_reader.interface.takeArray(responses.shutdown.len);
if (!mem.eql(u8, responses.shutdown, result)) {
return error.BadResponse;
}
}
};

const responses = struct {
pub const shutdown = "===SHUTDOWN===\r\n";
pub const ok = "+OK\r\n";
pub const not_deleted = "-NOT DELETED\r\n";
pub const none = "NONE\r\n";
pub const pong = "+PONG\r\n";
};