forked from rbino/zanzara
-
Notifications
You must be signed in to change notification settings - Fork 0
/
example.zig
95 lines (81 loc) · 3.83 KB
/
example.zig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
const std = @import("std");
const net = std.net;
const os = std.os;
const zanzara = @import("src/zanzara.zig");
const DefaultClient = zanzara.mqtt4.DefaultClient;
const Subscribe = zanzara.mqtt4.packet.Subscribe;
const TcpConnectToAddressError = std.posix.SocketError || std.posix.ConnectError;
fn tcpConnectToAddressNonBlock(address: net.Address) TcpConnectToAddressError!net.Stream {
const sock_flags = std.posix.SOCK.STREAM | std.posix.SOCK.NONBLOCK;
const sockfd = try std.posix.socket(address.any.family, sock_flags, std.posix.IPPROTO.TCP);
errdefer net.Stream.close(.{ .handle = sockfd });
std.posix.connect(sockfd, &address.any, address.getOsSockLen()) catch |err| {
switch (err) {
error.WouldBlock => std.time.sleep(1 * std.time.ns_per_s), // Todo: handle this better
else => return err,
}
};
return net.Stream{ .handle = sockfd };
}
fn tcpConnectToHostNonBlock(allocator: std.mem.Allocator, name: []const u8, port: u16) net.TcpConnectToHostError!net.Stream {
const list = try net.getAddressList(allocator, name, port);
defer list.deinit();
if (list.addrs.len == 0) return error.UnknownHostName;
for (list.addrs) |addr| {
return tcpConnectToAddressNonBlock(addr) catch |err| switch (err) {
error.ConnectionRefused => {
continue;
},
else => return err,
};
}
return std.posix.ConnectError.ConnectionRefused;
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const stream = try tcpConnectToHostNonBlock(allocator, "mqtt.eclipseprojects.io", 1883);
const socket = stream.handle;
const writer = stream.writer();
var mqtt_buf: [32 * 2048]u8 = undefined;
var client = try DefaultClient.init(mqtt_buf[0 .. 1024 * 32], mqtt_buf[1024 * 32 ..]);
// See ConnectOpts for additional options
try client.connect(.{ .client_id = "zanzara" });
var read_buf: [32 * 2048]u8 = undefined;
while (true) {
const bytes = std.posix.recv(socket, &read_buf, 0) catch |err|
if (err == error.WouldBlock) 0 else return err;
var rest = read_buf[0..bytes];
while (true) {
// The driving force of the client is the client.feed() function
// This must be called periodically, either passing some data coming from the network
// or with an empty slice (if no incoming data is present) to allow the client to handle
// its periodic tasks, like pings etc.
const event = client.feed(rest);
switch (event.data) {
.incoming_packet => |p| {
switch (p) {
.connack => {
std.debug.print("Connected, sending subscriptions\n", .{});
// Subscribe to the topic we're publishing on
const topics = [_]Subscribe.Topic{
.{ .topic_filter = "zig/zanzara_in", .qos = .qos2 },
};
_ = try client.subscribe(&topics);
_ = try client.publish("zig/zanzara_out", "Howdy!", .{});
},
.publish => |pb| {
std.debug.print("Received publish on topic {s} with payload {s}\n", .{ pb.topic, pb.payload });
},
else => std.debug.print("Received packet: {}\n", .{p}),
}
},
.outgoing_buf => |b| try writer.writeAll(b), // Write pending stuff to the socket
.err => |e| std.debug.print("Error event: {}\n", .{e}),
.none => {},
}
rest = rest[event.consumed..];
if (rest.len == 0) break;
}
}
}