Skip to content

Commit

Permalink
Add bind() and connect() to socket
Browse files Browse the repository at this point in the history
  • Loading branch information
fkollmann committed Feb 17, 2024
1 parent 6590f4d commit b34950a
Showing 1 changed file with 86 additions and 17 deletions.
103 changes: 86 additions & 17 deletions src/classes/zsocket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,50 @@ const std = @import("std");

const c = @cImport({
@cInclude("czmq.h");
@cInclude("string.h");
});

pub const ZSocketType = enum(c_int) {
Pair = c.ZMQ_PAIR,
};

//
pub const ZSocket = struct {
allocator: std.mem.Allocator,
selfArena: std.heap.ArenaAllocator,
socket: *c.zsock_t,
type: []const u8,
endpoint: ?[]const u8,

pub fn init(allocator: std.mem.Allocator, socketType: ZSocketType) !ZSocket {
pub fn init(allocator: std.mem.Allocator, socketType: ZSocketType) !*ZSocket {
// try creating the socket, early
var s = c.zsock_new(@intFromEnum(socketType)) orelse return error.SocketCreateFailed;
errdefer c.zsock_destroy(&s);
errdefer {
var ss: ?*c.zsock_t = s;
c.zsock_destroy(&ss);
}

// create the managed object
var selfArena = std.heap.ArenaAllocator.init(allocator);
errdefer selfArena.deinit();
const selfAllocator = selfArena.allocator();

var r = try selfAllocator.create(ZSocket);
r.allocator = allocator;
r.selfArena = selfArena;
r.socket = s;
r.endpoint = null;

return ZSocket{
.allocator = allocator,
.socket = s,
};
// get the socket type as string
const typeStrZ = std.mem.span(c.zsock_type_str(s));

r.type = try selfAllocator.dupe(u8, typeStrZ[0..typeStrZ.len]); // copy to managed memory

// done
return r;
}

// Bind a socket to a formatted endpoint. For tcp:// endpoints, supports
// Bind a socket to a endpoint. For tcp:// endpoints, supports
// ephemeral ports, if you specify the port number as "*". By default
// zsock uses the IANA designated range from C000 (49152) to FFFF (65535).
// To override this range, follow the "*" with "[first-last]". Either or
Expand All @@ -34,40 +57,86 @@ pub const ZSocket = struct {
// tcp://127.0.0.1:! bind to random port from C000 to FFFF
// tcp://127.0.0.1:*[60000-] bind to first free port from 60000 up
// tcp://127.0.0.1:![-60000] bind to random port from C000 to 60000
// tcp://127.0.0.1:![55000-55999]
// bind to random port from 55000 to 55999
// tcp://127.0.0.1:![55000-55999] bind to random port from 55000 to 55999
//
// On success, returns the actual port number used, for tcp:// endpoints,
// and 0 for other transports. Note that when using
// ephemeral ports, a port may be reused by different services without
// clients being aware. Protocols that run on ephemeral ports should take
// this into account.
pub fn bind(self: *ZSocket, endpoint: []const u8) !u16 {
const endpointZ = try self.allocator.dupeZ(u8, endpoint);
defer self.allocator.free(endpointZ);
pub fn bind(self: *ZSocket, ep: []const u8) !u16 {
const epZ = try self.allocator.dupeZ(u8, ep);
defer self.allocator.free(epZ);

const result = c.zsock_bind(self.socket, "%s", &epZ[0]);

// TODO: better: `c.zsock_bind(self.socket, "%s", endpointZ)` for safety
const result = c.zsock_bind(self.socket, endpointZ);
if (result < 0) {
return error.SocketBindFailed;
}

// retrieve endpoint value
const selfAllocator = self.selfArena.allocator();

if (self.endpoint) |e| {
selfAllocator.free(e);
}

self.endpoint = try selfAllocator.dupe(u8, ep); // copy to managed memory

// done
return @intCast(result);
}

// Connect a socket to an endpoint
pub fn connect(self: *ZSocket, ep: []const u8) !void {
const epZ = try self.allocator.dupeZ(u8, ep);
defer self.allocator.free(epZ);

const result = c.zsock_connect(self.socket, "%s", &epZ[0]);
if (result < 0) {
return error.SocketConnectFailed;
}

// retrieve endpoint value
const selfAllocator = self.selfArena.allocator();

if (self.endpoint) |e| {
selfAllocator.free(e);
}

self.endpoint = try selfAllocator.dupe(u8, ep); // copy to managed memory
}

// Destroy the socket and clean up
pub fn deinit(self: *ZSocket) void {
var socket: ?*c.zsock_t = self.socket;

c.zsock_destroy(&socket);

// clean-up arena
var arena = self.selfArena; // prevent seg fault
arena.deinit();
}
};

test "ZSocket - bind and connect" {
const allocator = std.testing.allocator;

var s = try ZSocket.init(allocator, ZSocketType.Pair);
defer s.deinit();
// bind the incoming socket
var incoming = try ZSocket.init(allocator, ZSocketType.Pair);
defer incoming.deinit();

const port = try s.bind("tcp://127.0.0.1:!");
const port = try incoming.bind("tcp://127.0.0.1:!");
try std.testing.expect(port >= 0xC000);
try std.testing.expect(incoming.endpoint != null);

// connect to the socket
var outgoing = try ZSocket.init(allocator, ZSocketType.Pair);
defer outgoing.deinit();

const endpoint = try std.fmt.allocPrint(allocator, "tcp://127.0.0.1:{}", .{port});
defer allocator.free(endpoint);

try outgoing.connect(endpoint);
try std.testing.expect(outgoing.endpoint != null);
}

0 comments on commit b34950a

Please sign in to comment.