Skip to content

Commit 3e82ff5

Browse files
committed
refactor: [PPT-2296] Source should sync state when new brokers are added
1 parent 888670e commit 3e82ff5

File tree

5 files changed

+237
-0
lines changed

5 files changed

+237
-0
lines changed

spec/broker_sync_spec.cr

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
require "./spec_helper"
2+
3+
module PlaceOS::Source
4+
describe "Broker State Sync" do
5+
it "callback is invoked when new broker is added after startup" do
6+
# Create initial broker
7+
test_broker
8+
9+
# Setup MQTT broker manager
10+
mqtt_manager = MqttBrokerManager.new
11+
12+
# Track if callback was invoked
13+
callback_invoked = false
14+
callback_broker_id = ""
15+
16+
mqtt_manager.on_broker_ready = ->(broker_id : String) {
17+
callback_invoked = true
18+
callback_broker_id = broker_id
19+
}
20+
21+
# Start the manager to mark startup as finished
22+
mqtt_manager.start
23+
24+
# Wait for startup to complete
25+
sleep 200.milliseconds
26+
27+
# Create a new broker after startup
28+
new_broker = PlaceOS::Model::Broker.new(
29+
name: "new-broker-#{Time.utc.to_unix}",
30+
host: ENV["MQTT_HOST"]?.presence || "mqtt",
31+
port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
32+
auth_type: :no_auth,
33+
).save!
34+
35+
# Trigger the broker creation event
36+
event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker)
37+
mqtt_manager.@event_channel.send(event)
38+
39+
# Wait for broker to be processed
40+
sleep 300.milliseconds
41+
42+
# Verify the broker was created successfully
43+
mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil
44+
45+
# Verify the callback was invoked
46+
callback_invoked.should be_true
47+
callback_broker_id.should eq new_broker.id.as(String)
48+
49+
# Cleanup
50+
mqtt_manager.stop
51+
new_broker.destroy
52+
end
53+
54+
it "callback is not invoked for brokers created during startup" do
55+
# Setup MQTT broker manager
56+
mqtt_manager = MqttBrokerManager.new
57+
58+
# Track if callback was invoked
59+
callback_invoked = false
60+
61+
mqtt_manager.on_broker_ready = ->(_broker_id : String) {
62+
callback_invoked = true
63+
}
64+
65+
# Create broker before starting (simulating existing broker)
66+
startup_broker = test_broker
67+
68+
# Start the manager (this will load existing brokers)
69+
mqtt_manager.start
70+
71+
# Wait for startup to complete
72+
sleep 200.milliseconds
73+
74+
# Verify the broker was loaded
75+
mqtt_manager.@publishers[startup_broker.id.as(String)]?.should_not be_nil
76+
77+
# Verify the callback was NOT invoked during startup
78+
callback_invoked.should be_false
79+
80+
# Cleanup
81+
mqtt_manager.stop
82+
end
83+
84+
it "resync_state only runs after initial sync completes" do
85+
mock_mappings_state = mock_state(module_id: "mod-test")
86+
mock_mappings = Mappings.new(mock_mappings_state)
87+
mock_publisher = MockManager.new
88+
89+
status_events = StatusEvents.new(mock_mappings, [mock_publisher] of PublisherManager)
90+
91+
# Before initial sync, resync should not run
92+
status_events.resync_state
93+
mock_publisher.messages.size.should eq 0
94+
95+
# Start to trigger initial sync
96+
spawn { status_events.start }
97+
98+
# Wait for initial sync
99+
sleep 300.milliseconds
100+
101+
# Clear messages from initial sync
102+
mock_publisher.messages.clear
103+
104+
# Now resync should work
105+
status_events.resync_state
106+
107+
# Wait for resync to process
108+
sleep 200.milliseconds
109+
110+
# Cleanup
111+
status_events.stop
112+
end
113+
114+
it "full integration: new broker receives state via resync" do
115+
# Create initial broker
116+
test_broker
117+
118+
# Setup mock publisher to track messages
119+
mock_publisher = MockManager.new
120+
publisher_managers = [mock_publisher] of PublisherManager
121+
122+
# Add MQTT broker manager
123+
mqtt_manager = MqttBrokerManager.new
124+
publisher_managers << mqtt_manager
125+
126+
# Mock data with a module that has proper mappings
127+
module_id = "mod-integration-test"
128+
status_key = "power"
129+
mock_mappings_state = mock_state(module_id: module_id)
130+
mock_mappings = Mappings.new(mock_mappings_state)
131+
132+
# Start application manager
133+
manager = Manager.new(publisher_managers, mock_mappings)
134+
manager.start
135+
136+
# Wait for initial sync to complete
137+
sleep 300.milliseconds
138+
139+
# Store module state in Redis
140+
Redis.open(url: REDIS_URL) do |client|
141+
client.set("status/#{module_id}/#{status_key}", "on".to_json)
142+
end
143+
144+
# Clear any messages from initial sync
145+
mock_publisher.messages.clear
146+
147+
# Create a new broker after startup
148+
new_broker = PlaceOS::Model::Broker.new(
149+
name: "integration-broker-#{Time.utc.to_unix}",
150+
host: ENV["MQTT_HOST"]?.presence || "mqtt",
151+
port: ENV["MQTT_PORT"]?.presence.try(&.to_i?) || 1883,
152+
auth_type: :no_auth,
153+
).save!
154+
155+
# Trigger the broker creation event
156+
event = Resource::Event(PlaceOS::Model::Broker).new(:created, new_broker)
157+
mqtt_manager.@event_channel.send(event)
158+
159+
# Wait for broker to be processed and state resync to occur
160+
sleep 500.milliseconds
161+
162+
# Verify the broker was created
163+
mqtt_manager.@publishers[new_broker.id.as(String)]?.should_not be_nil
164+
165+
# Verify the callback was wired up by the manager
166+
mqtt_manager.on_broker_ready.should_not be_nil
167+
168+
# Cleanup
169+
manager.stop
170+
new_broker.destroy
171+
end
172+
end
173+
end

src/source/manager.cr

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module PlaceOS::Source
3434
return if started?
3535
@started = true
3636

37+
# Start publisher managers first to establish connections
3738
Log.info { "registering Publishers" }
3839
publisher_managers.each(&.start)
3940

@@ -53,6 +54,16 @@ module PlaceOS::Source
5354
Log.info { "starting Driver router" }
5455
driver_router.start
5556

57+
# Setup callback for new broker connections to trigger state resync
58+
publisher_managers.each do |manager|
59+
if manager.is_a?(MqttBrokerManager)
60+
manager.on_broker_ready = ->(broker_id : String) {
61+
Log.info { "triggering state resync for new Broker<#{broker_id}>" }
62+
spawn { status_events.resync_state }
63+
}
64+
end
65+
end
66+
5667
Log.info { "listening for Module state events" }
5768
spawn { status_events.start }
5869

src/source/publishing/mqtt_broker_manager.cr

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ module PlaceOS::Source
1111

1212
Log = ::Log.for(self)
1313

14+
# Callback to trigger state sync when new brokers are added
15+
property on_broker_ready : Proc(String, Nil)?
16+
1417
class_getter instance : self { new }
1518

1619
# Broadcast a message to each MQTT Broker
@@ -61,6 +64,12 @@ module PlaceOS::Source
6164

6265
publisher.start
6366

67+
# Trigger state sync callback if this is a new broker after startup
68+
if startup_finished?
69+
Log.info { "new broker connected after startup, triggering state sync for Broker<#{broker_id}>" }
70+
on_broker_ready.try &.call(broker_id)
71+
end
72+
6473
Resource::Result::Success
6574
end
6675

src/source/publishing/mqtt_publisher.cr

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ module PlaceOS::Source
8585
password: broker.password,
8686
)
8787

88+
# Small delay to ensure connection is fully established
89+
sleep 100.milliseconds
90+
8891
close_channel = Channel(Nil).new(1)
8992

9093
repeating_task = Tasker.every((keep_alive // 3).seconds) do

src/source/status_events.cr

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module PlaceOS::Source
2222
private getter publisher_managers : Array(PublisherManager)
2323

2424
private property? stopped : Bool = true
25+
private property? initial_sync_complete : Bool = false
2526

2627
private getter sync_lock = Mutex.new(:reentrant)
2728

@@ -95,6 +96,46 @@ module PlaceOS::Source
9596
modules: mods_mapped.to_s,
9697
values: status_updated.to_s,
9798
} }
99+
self.initial_sync_complete = true
100+
end
101+
102+
# Trigger a state resync - useful when new brokers are added
103+
def resync_state
104+
return unless initial_sync_complete?
105+
106+
Log.info { "resyncing state for new broker connection" }
107+
108+
mods_mapped = 0_u64
109+
status_updated = 0_u64
110+
pattern = "broker_resync"
111+
112+
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
113+
modules.each do |mod|
114+
next unless mod
115+
mods_mapped += 1_u64
116+
module_id = mod.id.to_s
117+
store = PlaceOS::Driver::RedisStorage.new(module_id)
118+
store.each do |key, value|
119+
status_updated += 1_u64
120+
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value, timestamp: Time.utc})
121+
end
122+
123+
# Backpressure if event container is growing too fast
124+
if event_container.size >= MAX_CONTAINER_SIZE / 2
125+
until event_container.size < MAX_CONTAINER_SIZE / 4
126+
sleep 10.milliseconds
127+
end
128+
end
129+
rescue error
130+
Log.warn(exception: error) { "error resyncing #{mod.try(&.id)}" }
131+
end
132+
end
133+
134+
Log.info { {
135+
message: "state resync complete",
136+
modules: mods_mapped.to_s,
137+
values: status_updated.to_s,
138+
} }
98139
end
99140

100141
protected def handle_pevent(pattern : String, channel : String, payload : String)

0 commit comments

Comments
 (0)