diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb index e46d14c..f5f3b9f 100644 --- a/lib/mqtt/client.rb +++ b/lib/mqtt/client.rb @@ -332,14 +332,11 @@ def publish(topic, payload = '', retain = false, qos = 0) :payload => payload ) - # Send the packet - res = send_packet(packet) - - return if qos.zero? + queue = qos.zero? ? nil : wait_for_puback(packet.id) - queue = Queue.new + res = send_packet(packet) - wait_for_puback packet.id, queue + return unless queue deadline = current_time + @ack_timeout @@ -488,9 +485,9 @@ def receive_packet Thread.current[:parent].raise(exp) end - def wait_for_puback(id, queue) + def wait_for_puback(id) @pubacks_semaphore.synchronize do - @pubacks[id] = queue + @pubacks[id] = Queue.new end end @@ -502,7 +499,7 @@ def handle_packet(packet) @last_ping_response = current_time elsif packet.class == MQTT::Packet::Puback @pubacks_semaphore.synchronize do - @pubacks[packet.id] << packet + @pubacks[packet.id] << packet if @pubacks[packet.id] end end # Ignore all other packets diff --git a/spec/mqtt_client_spec.rb b/spec/mqtt_client_spec.rb index c09af57..28a701f 100644 --- a/spec/mqtt_client_spec.rb +++ b/spec/mqtt_client_spec.rb @@ -641,11 +641,11 @@ def inject_puback(packet) @injected_pubacks[packet.id] = packet end - def wait_for_puback(id, queue) + def wait_for_puback(id) packet = @injected_pubacks.fetch(id) { return super } - queue << packet + Queue.new << packet end end @@ -745,6 +745,20 @@ def wait_for_puback(id, queue) expect(client).to receive(:send_packet) { |packet| expect(packet.id).to eq(2) } client.publish "topic", "message", false, 1 end + + it "does not crash when receiving a PUBACK for a packet it never sent" do + expect { client.send(:handle_packet, MQTT::Packet::Puback.new(:id => 666)) }.to_not raise_error + end + + it "does not crash with QoS 1 when the broker sends the PUBACK instantly" do + allow(client).to receive(:send_packet).and_wrap_original do |send_packet, packet, *args, **kwargs, &block| + send_packet.call(packet, *args, **kwargs, &block).tap do + client.send(:handle_packet, MQTT::Packet::Puback.new(:id => packet.id)) + end + end + + expect { client.publish("topic", "message", false, 1) }.to_not raise_error + end end describe "when calling the 'subscribe' method" do