Skip to content

Commit

Permalink
Add handling of ownership to frame
Browse files Browse the repository at this point in the history
  • Loading branch information
fkollmann committed Feb 17, 2024
1 parent 6c25142 commit 8672319
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 21 deletions.
60 changes: 48 additions & 12 deletions src/classes/zframe.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const c = @import("../czmq.zig").c;

pub const ZFrame = struct {
frame: *c.zframe_t,
frameOwned: bool = true,

/// Creates a frame based on the provided data.
///
Expand All @@ -24,7 +25,9 @@ pub const ZFrame = struct {
///
/// Example:
/// allocator.dupe(u8, frame.data());
pub fn data(self: *const ZFrame) []const u8 {
pub fn data(self: *const ZFrame) ![]const u8 {
if (!self.frameOwned) return error.FrameOwnershipLost;

const s = c.zframe_size(self.frame);
if (s <= 0) {
return "";
Expand All @@ -36,22 +39,39 @@ pub const ZFrame = struct {
}

/// Retrieves a size of data within the frame.
pub fn size(self: *const ZFrame) usize {
pub fn size(self: *const ZFrame) !usize {
if (!self.frameOwned) return error.FrameOwnershipLost;

return c.zframe_size(self.frame);
}

/// Creates a copy of the frame.
pub fn clone(self: *ZFrame) !ZFrame {
if (!self.frameOwned) return error.FrameOwnershipLost;

return ZFrame{
.frame = c.zframe_dup(self.frame) orelse return error.FrameAllocFailed,
};
}

/// When *receiving* frames of message, this function returns
/// true, if more frames are available to be received,
/// as part of a multi-part message.
pub fn hasMore(self: *ZFrame) !bool {
if (!self.frameOwned) return error.FrameOwnershipLost;

return c.zframe_more(self.frame) != 0;
}

/// Destroys the frame and performs clean up.
pub fn deinit(self: *ZFrame) void {
var d: ?*c.zframe_t = self.frame;
if (self.frameOwned) {
var d: ?*c.zframe_t = self.frame;

c.zframe_destroy(&d);
c.zframe_destroy(&d);

self.frameOwned = false;
}
}
};

Expand All @@ -61,28 +81,44 @@ test "ZFrame - create and destroy" {
var data = try ZFrame.init(msg);
defer data.deinit();

try std.testing.expectEqual(msg.len, data.size());
try std.testing.expectEqualStrings(msg, data.data());
try std.testing.expectEqual(msg.len, try data.size());
try std.testing.expectEqualStrings(msg, try data.data());

// create and test a clone
var clone = try data.clone();
defer clone.deinit();

try std.testing.expectEqual(msg.len, clone.size());
try std.testing.expectEqualStrings(msg, clone.data());
try std.testing.expectEqual(msg.len, try clone.size());
try std.testing.expectEqualStrings(msg, try clone.data());
}

test "ZFrame - empty and destroy" {
var data = try ZFrame.initEmpty();
defer data.deinit();

try std.testing.expectEqual(@as(usize, 0), data.size());
try std.testing.expectEqualStrings("", data.data());
try std.testing.expectEqual(@as(usize, 0), try data.size());
try std.testing.expectEqualStrings("", try data.data());

// create and test a clone
var clone = try data.clone();
defer clone.deinit();

try std.testing.expectEqual(@as(usize, 0), clone.size());
try std.testing.expectEqualStrings("", clone.data());
try std.testing.expectEqual(@as(usize, 0), try clone.size());
try std.testing.expectEqualStrings("", try clone.data());
}

test "ZFrame - ownership lost" {
var data = try ZFrame.initEmpty();
defer data.deinit();

try std.testing.expectEqual(true, data.frameOwned);
try std.testing.expectEqual(@as(usize, 0), try data.size());

// force loosing ownership
data.frameOwned = false;

try std.testing.expectError(error.FrameOwnershipLost, data.size());
try std.testing.expectError(error.FrameOwnershipLost, data.data());
try std.testing.expectError(error.FrameOwnershipLost, data.clone());
try std.testing.expectError(error.FrameOwnershipLost, data.hasMore());
}
59 changes: 50 additions & 9 deletions src/classes/zsocket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -195,33 +195,58 @@ pub const ZSocket = struct {

/// Send a frame to a socket.
///
/// Note: The frame will lose ownership and become invalid, unless `options.reuse` is true.
///
/// Example:
/// var frame = try ZFrame.init(data);
/// defer frame.deinit();
///
/// try socket.send(&frame, .{});
pub fn send(self: *ZSocket, frame: *const zframe.ZFrame, options: struct {
pub fn send(self: *ZSocket, frame: *zframe.ZFrame, options: struct {
/// Indicates that this frame is part of a multi-part message
/// and more frames will be sent.
///
/// On the receiving side, will cause `frame.hasMore()` to return true.
more: bool = false,

/// Indicates that the provided frame shall not loose
/// ownership over its data.
///
/// Important: This requires the frame data to live as long
/// as it takes for the frame to be sent.
/// Consider setting `dontwait` to true.
reuse: bool = false,

/// Do not wait for the frame to be sent, but return immediately.
dontwait: bool = false,
}) !void {
var f: ?*c.zframe_t = frame.frame;

var flags: c_int = c.ZFRAME_REUSE;
var flags: c_int = 0;
if (options.more) flags |= c.ZFRAME_MORE;
if (options.reuse) flags |= c.ZFRAME_REUSE;
if (options.dontwait) flags |= c.ZFRAME_DONTWAIT;

const result = c.zframe_send(&f, self.socket, flags);
if (result < 0) {
return error.SendFrameFailed;
}

// frame lost ownership of internal object
if (!options.reuse) {
frame.frameOwned = false;
}
}

/// Receive frame from socket, returns zframe_t object or NULL if the recv
/// was interrupted. Does a blocking recv, if you want to not block then use
/// Receive a single frame of a message from the socket.
/// Does a blocking recv, if you want to not block then use
/// zpoller or zloop.
///
/// The caller must invoke `deinit()` on the returned frame.
///
/// If receiving a multi-part message, then `frame.hasMore()` will return true
/// and the another receive call should be invoked.
///
/// Example:
/// var frame = try socket.receive();
/// defer frame.deinit();
Expand Down Expand Up @@ -274,15 +299,31 @@ test "ZSocket - bind and connect" {

var outgoingData = try zframe.ZFrame.init(msg);
defer outgoingData.deinit();
try std.testing.expectEqual(msg.len, outgoingData.size());
try std.testing.expectEqualStrings(msg, outgoingData.data());
try std.testing.expectEqual(true, outgoingData.frameOwned);
try std.testing.expectEqual(msg.len, try outgoingData.size());
try std.testing.expectEqualStrings(msg, try outgoingData.data());

// send the first frame
try outgoing.send(&outgoingData, .{ .dontwait = true, .reuse = true, .more = true });
try std.testing.expectEqual(true, outgoingData.frameOwned);

// send the second frame (reusing the previous one)
try outgoing.send(&outgoingData, .{ .dontwait = true });
try std.testing.expectEqual(false, outgoingData.frameOwned);

// receive the message
// receive the first frame of the message
var incomingData = try incoming.receive();
defer incomingData.deinit();

try std.testing.expectEqual(msg.len, incomingData.size());
try std.testing.expectEqualStrings(msg, incomingData.data());
try std.testing.expectEqual(msg.len, try incomingData.size());
try std.testing.expectEqualStrings(msg, try incomingData.data());
try std.testing.expectEqual(true, try incomingData.hasMore());

// receive the second frame
var incomingData2 = try incoming.receive();
defer incomingData2.deinit();

try std.testing.expectEqual(msg.len, try incomingData2.size());
try std.testing.expectEqualStrings(msg, try incomingData2.data());
try std.testing.expectEqual(false, try incomingData2.hasMore());
}

0 comments on commit 8672319

Please sign in to comment.