Skip to content

Commit

Permalink
core: move send out of self
Browse files Browse the repository at this point in the history
  • Loading branch information
flowerinthenight committed Sep 29, 2024
1 parent 74c89d4 commit 6ce8266
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions src/zgroup.zig
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ pub fn Fleet(UserData: type) type {
msg.cmd = .join;
try self.setMsgSrcToOwn(msg);

try self.send(dst_ip, dst_port, buf, null);
try send(dst_ip, dst_port, buf, null);

switch (msg.cmd) {
.ack => {
Expand Down Expand Up @@ -950,7 +950,7 @@ pub fn Fleet(UserData: type) type {
continue;

msg.proto1 = self.getTerm();
self.send(ip, port, buf, null) catch continue;
send(ip, port, buf, null) catch continue;

if (msg.cmd != .ack) continue;

Expand Down Expand Up @@ -1071,7 +1071,7 @@ pub fn Fleet(UserData: type) type {
self.setTermAndN(msg);

ltm.reset();
self.send(ip, port, buf, 50_000) catch |err| {
send(ip, port, buf, 50_000) catch |err| {
log.err("[{d}] hb:send failed: {any}", .{ i, err });
fails += 1;
continue;
Expand Down Expand Up @@ -1260,7 +1260,7 @@ pub fn Fleet(UserData: type) type {
const n = self.getCounts();
msg.proto2 = n[0] + n[1];

try self.send(ip, port, buf, null);
try send(ip, port, buf, null);

// Handle join address protocol (ingress).
const cmdm: JoinCmd = @enumFromInt((msg.proto1 &
Expand Down Expand Up @@ -1381,28 +1381,6 @@ pub fn Fleet(UserData: type) type {
}
}

// Helper function for internal one-shot send/recv. The same message ptr is
// used for both request and response payloads. If `tm_us` is not null,
// default timeout will be 5s.
fn send(_: *Self, ip: []const u8, port: u16, msg: []u8, tm_us: ?u32) !void {
const addr = try std.net.Address.resolveIp(ip, port);
const sock = try std.posix.socket(
std.posix.AF.INET,
std.posix.SOCK.DGRAM | std.posix.SOCK.CLOEXEC,
0,
);

var tm: u32 = 1_000_000;
if (tm_us) |v| tm = v;

defer std.posix.close(sock);
try setReadTimeout(sock, tm);
try setWriteTimeout(sock, tm);
try std.posix.connect(sock, &addr.any, addr.getOsSockLen());
_ = try std.posix.write(sock, msg);
_ = try std.posix.recv(sock, msg, 0);
}

// Handle the isd_* infection protocol of the message payload.
// We are passing in an arena allocator here.
fn handleIsd(self: *Self, allocator: std.mem.Allocator, msg: *Message, force: bool) !void {
Expand Down Expand Up @@ -1841,7 +1819,7 @@ pub fn Fleet(UserData: type) type {
const ip = leader[0..sep];
const port = try std.fmt.parseUnsigned(u16, leader[sep + 1 ..], 10);

try self.send(ip, port, msg, null);
try send(ip, port, msg, null);
}

fn getTerm(self: *Self) u64 {
Expand Down Expand Up @@ -1924,6 +1902,28 @@ pub fn Fleet(UserData: type) type {
};
}

// Helper function for internal one-shot send/recv. The same message ptr is
// used for both request and response payloads. If `tm_us` is not null,
// default timeout will be used.
fn send(ip: []const u8, port: u16, msg: []u8, tm_us: ?u32) !void {
const addr = try std.net.Address.resolveIp(ip, port);
const sock = try std.posix.socket(
std.posix.AF.INET,
std.posix.SOCK.DGRAM | std.posix.SOCK.CLOEXEC,
0,
);

var tm: u32 = 1_000_000;
if (tm_us) |v| tm = v;

defer std.posix.close(sock);
try setReadTimeout(sock, tm);
try setWriteTimeout(sock, tm);
try std.posix.connect(sock, &addr.any, addr.getOsSockLen());
_ = try std.posix.write(sock, msg);
_ = try std.posix.recv(sock, msg, 0);
}

/// Converts an ip and port to a string with format ip:port, eg. "127.0.0.1:8080".
/// Caller is responsible for releasing the returned memory.
fn keyFromIpPort(allocator: std.mem.Allocator, ip: u32, port: u16) ![]const u8 {
Expand Down

0 comments on commit 6ce8266

Please sign in to comment.