Skip to content

Commit

Permalink
use a more compatible approach for non-blocking sockets.
Browse files Browse the repository at this point in the history
(instead of a linux-only flag)
  • Loading branch information
adriweb committed Sep 12, 2024
1 parent 8a37811 commit db450c1
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions example.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,58 @@ const zanzara = @import("src/zanzara.zig");
const DefaultClient = zanzara.mqtt4.DefaultClient;
const Subscribe = zanzara.mqtt4.packet.Subscribe;

const TcpConnectToAddressError = std.posix.SocketError || std.posix.ConnectError;

fn tcpConnectToAddressNonBlock(address: net.Address) TcpConnectToAddressError!net.Stream {
const sock_flags = std.posix.SOCK.STREAM | std.posix.SOCK.NONBLOCK;
const sockfd = try std.posix.socket(address.any.family, sock_flags, std.posix.IPPROTO.TCP);
errdefer net.Stream.close(.{ .handle = sockfd });

std.posix.connect(sockfd, &address.any, address.getOsSockLen()) catch |err| {
switch (err) {
error.WouldBlock => std.time.sleep(1 * std.time.ns_per_s), // Todo: handle this better
else => return err,
}
};

return net.Stream{ .handle = sockfd };
}

fn tcpConnectToHostNonBlock(allocator: std.mem.Allocator, name: []const u8, port: u16) net.TcpConnectToHostError!net.Stream {
const list = try net.getAddressList(allocator, name, port);
defer list.deinit();

if (list.addrs.len == 0) return error.UnknownHostName;

for (list.addrs) |addr| {
return tcpConnectToAddressNonBlock(addr) catch |err| switch (err) {
error.ConnectionRefused => {
continue;
},
else => return err,
};
}
return std.posix.ConnectError.ConnectionRefused;
}

pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();

const stream = try net.tcpConnectToHost(allocator, "mqtt.eclipseprojects.io", 1883);
const stream = try tcpConnectToHost(allocator, "mqtt.eclipseprojects.io", 1883);
const socket = stream.handle;
const writer = stream.writer();

var mqtt_buf: [2048]u8 = undefined;
var mqtt_buf: [32 * 2048]u8 = undefined;
var client = try DefaultClient.init(mqtt_buf[0 .. 1024 * 32], mqtt_buf[1024 * 32 ..]);

var client = try DefaultClient.init(mqtt_buf[0..1024], mqtt_buf[1024..]);
// See ConnectOpts for additional options
try client.connect(.{ .client_id = "zanzara" });

var read_buf: [2048]u8 = undefined;
var read_buf: [32 * 2048]u8 = undefined;

while (true) {
// We use os.MSG.DONTWAIT so the socket returns WouldBlock if no data is present
const bytes = std.posix.recv(socket, &read_buf, std.posix.MSG.DONTWAIT) catch |err|
const bytes = std.posix.recv(socket, &read_buf, 0) catch |err|
if (err == error.WouldBlock) 0 else return err;
var rest = read_buf[0..bytes];
while (true) {
Expand Down

0 comments on commit db450c1

Please sign in to comment.