Skip to content

Commit

Permalink
v0.3.0 (#44)
Browse files Browse the repository at this point in the history
* Fix wording - reported by @nathanr

* #15 - Retain Discovery Messages (#42)

* Fixes #15 - Retion discovery messages.

Improved documentation.

* Added ability to only run a single discovery job.

(Also, fixes an error if anyone ever made a discovery interval less then 0.)

* removed unused line.

* Implements #43 - Adds Status topic & Last will message.

* Fixes #40 - Adds random segment to clientid.

Also, fixes keep-alive messages.
  • Loading branch information
XtremeOwnageDotCom authored Sep 7, 2024
1 parent eb661aa commit 55fe188
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 11 deletions.
17 changes: 17 additions & 0 deletions Examples/Configuration/config.spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,26 @@ Overrides:
Enabled: false

HomeAssistant:
# Is home assistant mQTT-discovery enabled?
# If you want to manually create entities, or, just want your PDU data pushed to MQTT without home-assistant, change to false.
# (Optional) Default: false
DiscoveryEnabled: true

# Should discovery messages be retained? Strongly recommend leaving this to true!
# (Optional) Default: true
DiscoveryRetain: true

# This is the home-assistant discovery topic.
# Should prob leave this set as-is.
DiscoveryTopic: "homeassistant/discovery"

# How often should discovery messages be sent?
# 0 = Send Once at startup.
# Note, if you have DiscoveryRetain=true, the only benefit to running more discovery jobs- is to update the names of entityes
# based on labels set in the PDU.
# (Optional) Default: 0
DiscoveryInterval: 300

# Default expireAfter interval applied to all sensors. After this time- the sensor will be marked as unavailable.
SensorExpireAfterSeconds: 300

Expand Down
53 changes: 53 additions & 0 deletions rPDU2MQTT/Classes/MqttEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using HiveMQtt.Client;
using System.Diagnostics;
using System.Timers;
using Timer = System.Timers.Timer;

namespace rPDU2MQTT.Classes;

public class MqttEventHandler
{
private readonly HiveMQClient client;
private readonly Timer _timer;
public MqttEventHandler(HiveMQClient client)
{
this.client = client;
client.AfterConnect += Client_AfterConnect;
client.OnDisconnectSent += Client_OnDisconnectSent;
client.OnDisconnectReceived += Client_OnDisconnectReceived;

_timer = new Timer(TimeSpan.FromSeconds(10));
_timer.Elapsed += HealthTimer;
_timer.AutoReset = true; // Ensures the event fires repeatedly at the interval
_timer.Enabled = true; // Starts the timer
}

private void Client_OnDisconnectReceived(object? sender, HiveMQtt.Client.Events.OnDisconnectReceivedEventArgs e)
{
Log.Error("Received disconnect from broker.");
}

private void Client_OnDisconnectSent(object? sender, HiveMQtt.Client.Events.OnDisconnectSentEventArgs e)
{
Log.Information("Sending disconnect to broker");
}

private void HealthTimer(object? sender, ElapsedEventArgs e)
{
Log.Verbose("Timer.Tick()");
sendStatusOnline();

}

private void Client_AfterConnect(object? sender, HiveMQtt.Client.Events.AfterConnectEventArgs e)
{
Log.Information("MQTT Client Connected");
sendStatusOnline();
}


private void sendStatusOnline()
{
TaskManager.AddTask(() => client.PublishAsync(client.Options!.LastWillAndTestament!.Topic, "online", HiveMQtt.MQTT5.Types.QualityOfService.AtLeastOnceDelivery));
}
}
32 changes: 32 additions & 0 deletions rPDU2MQTT/Classes/TaskManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Collections.Concurrent;

namespace rPDU2MQTT.Classes;

/// <summary>
/// This class, exists to watch tasks until completion. Non-async code, which needs to execute a task, just passes the task off to this class.
/// </summary>
public static class TaskManager
{
private static readonly ConcurrentBag<Task> taskBag = new ConcurrentBag<Task>();

public static void AddTask(Action task)
{
Task newTask = Task.Run(task);
taskBag.Add(newTask);
}

public static void AddTask(Task task)
{
taskBag.Add(task);
}

public static void WaitForAllTasks()
{
foreach (var task in taskBag)
{
task.Wait();
}

taskBag.Clear(); // ConcurrentBag doesn't have a clear method, but assigning a new one is equivalent to clearing.
}
}
10 changes: 9 additions & 1 deletion rPDU2MQTT/Models/Config/HomeAssistantConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,15 @@ public class HomeAssistantConfig
/// <summary>
/// How often should discovery data be published?
/// </summary>
public int DiscoveryInterval { get; set; } = 300;
/// <remarks>
/// A value of 0, will run a single discovery, and not run a re-discovery until the application is restarted.
/// </remarks>
public int DiscoveryInterval { get; set; } = 0;

/// <summary>
/// Should discovery messages be retained?
/// </summary>
public bool DiscoveryRetain { get; set; } = true;

/// <summary>
/// Default expireAfter interval applied to all sensors. After this time- the sensor will be marked as unavailable.
Expand Down
1 change: 0 additions & 1 deletion rPDU2MQTT/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

//Ensure we can actually connect to MQTT.
var client = host.Services.GetRequiredService<IHiveMQClient>();
var logger = host.Services.GetRequiredService<ILogger<IHiveMQClient>>();

Log.Information($"Connecting to MQTT Broker at {client.Options.Host}:{client.Options.Port}");

Expand Down
3 changes: 2 additions & 1 deletion rPDU2MQTT/Services/baseTypes/baseDiscoveryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ protected Task PushDiscoveryMessage<T>(T sensor, CancellationToken cancellationT
var msg = new MQTT5PublishMessage(topic, QualityOfService.AtLeastOnceDelivery)
{
ContentType = "json",
PayloadAsString = System.Text.Json.JsonSerializer.Serialize<T>(sensor, this.jsonOptions)
PayloadAsString = System.Text.Json.JsonSerializer.Serialize<T>(sensor, this.jsonOptions),
Retain = cfg.HASS.DiscoveryRetain,
};

if (cfg.Debug.PrintDiscovery)
Expand Down
28 changes: 23 additions & 5 deletions rPDU2MQTT/Services/baseTypes/baseMQTTTService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ namespace rPDU2MQTT.Services.baseTypes;
/// </summary>
public abstract class baseMQTTTService : IHostedService, IDisposable
{
private readonly int interval;
private IHiveMQClient mqtt { get; init; }
private PeriodicTimer timer;
private PeriodicTimer? timer;
private Task timerTask = Task.CompletedTask;
protected Config cfg { get; }
protected PDU pdu { get; }
Expand All @@ -24,11 +23,16 @@ public abstract class baseMQTTTService : IHostedService, IDisposable
protected baseMQTTTService(MQTTServiceDependancies dependancies) : this(dependancies, dependancies.Cfg.PDU.PollInterval) { }
protected baseMQTTTService(MQTTServiceDependancies dependancies, int Interval)
{
interval = Interval;
mqtt = dependancies.Mqtt;
cfg = dependancies.Cfg;
pdu = dependancies.PDU;
timer = new PeriodicTimer(TimeSpan.FromSeconds(Interval));

// If the interval is 0, don't create a timer.
if (Interval <= 0)
timer = null;
else
timer = new PeriodicTimer(TimeSpan.FromSeconds(Interval));

jsonOptions = new System.Text.Json.JsonSerializerOptions
{
WriteIndented = true,
Expand All @@ -46,13 +50,22 @@ protected baseMQTTTService(MQTTServiceDependancies dependancies, int Interval)

public async Task StartAsync(CancellationToken cancellationToken)
{
if (timer is null)
{
Log.Information($"{GetType().Name} - performing single discovery.");
await Execute(cancellationToken);
return;

}

Log.Information($"{GetType().Name} is starting.");

timerTask = Task.Run(() => timerTaskExecution(cancellationToken).Wait());

//Kick off the first one manually.
await Execute(cancellationToken);

// Log message to indicate the service has been started.
Log.Information($"{GetType().Name} is running.");
}

Expand All @@ -75,9 +88,14 @@ public Task StopAsync(CancellationToken stoppingToken)
return Task.CompletedTask;
}

~baseMQTTTService()
{
Log.Debug($"{GetType().Name} has been finalized");
}

public void Dispose()
{
timer.Dispose();
timer?.Dispose();
}

/// <summary>
Expand Down
17 changes: 14 additions & 3 deletions rPDU2MQTT/Startup/ServiceConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ public static void Configure(HostBuilderContext context, IServiceCollection serv
ThrowError.TestRequiredConfigurationSection(cfg.MQTT.Connection.Host, "MQTT.Connection.Host");
ThrowError.TestRequiredConfigurationSection(cfg.MQTT.Connection.Host, "MQTT.Connection.Port");

var lwt = new LastWillAndTestament(
MQTTHelper.JoinPaths(cfg.MQTT.ParentTopic, "Status"), payload: "offline", HiveMQtt.MQTT5.Types.QualityOfService.AtLeastOnceDelivery, true);

var mqttBuilder = new HiveMQClientOptionsBuilder()
.WithBroker(cfg.MQTT.Connection.Host)
.WithPort(cfg.MQTT.Connection.Port!.Value)
.WithClientId(cfg.MQTT.ClientID ?? "rpdu2mqtt")
.WithAutomaticReconnect(true);
.WithClientId((cfg.MQTT.ClientID ?? "rpdu2mqtt") + Guid.NewGuid().ToString())
.WithAutomaticReconnect(true)
.WithKeepAlive(cfg.MQTT.KeepAlive)
.WithLastWillAndTestament(lwt);

if (cfg.MQTT.Credentials?.Username is not null)
mqttBuilder.WithUserName(cfg.MQTT.Credentials.Username);
Expand All @@ -43,7 +48,13 @@ public static void Configure(HostBuilderContext context, IServiceCollection serv
mqttBuilder.WithPassword(cfg.MQTT.Credentials.Password);

// Return new client, with options applied.
return new HiveMQClient(mqttBuilder.Build());
var x = new HiveMQClient(mqttBuilder.Build());

// While we are here- lets go ahead and create / bind the event handler.
services.AddSingleton<MqttEventHandler>(new MqttEventHandler(x));
return x;


});

//Configure Services
Expand Down

0 comments on commit 55fe188

Please sign in to comment.