diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 757e4ac827..eb4a539428 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -281,8 +281,12 @@ namespace eCAL iter->second->ApplyLocLayerParameter(process_id, topic_id, tlayer.type(), writer_par); } - // inform for local publisher connection - iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least one local subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_loc() > 0) + { + iter->second->ApplyLocPublication(process_id, topic_id, topic_info); + } } } @@ -331,8 +335,12 @@ namespace eCAL const std::string writer_par = tlayer.par_layer().SerializeAsString(); iter->second->ApplyExtLayerParameter(host_name, tlayer.type(), writer_par); } - // inform for external publisher connection - iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + // we only inform the subscriber when the publisher has already recognized at least one external subscriber + // this should avoid to set the "IsPublished" state before the publisher is able to send data + if (ecal_sample_.topic().connections_ext() > 0) + { + iter->second->ApplyExtPublication(host_name, process_id, topic_id, topic_info); + } } } diff --git a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp index dfba50e79d..2ffa34b74c 100644 --- a/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp +++ b/testing/ecal/pubsub_test/src/pubsub_receive_test.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -219,3 +219,97 @@ TEST(SUBSCRIBER, SporadicEmptyReceives) // finalize eCAL API EXPECT_EQ(0, eCAL::Finalize()); } + +TEST(PubSub, TestSubscriberSeen) +{ + // initialize eCAL API + EXPECT_EQ(0, eCAL::Initialize(0, nullptr, "subscriber_seen")); + + // enable data loopback + eCAL::Util::EnableLoopback(true); + + std::atomic subscriber_seen_at_publication_start(false); + std::atomic subscriber_seen_at_publication_end(false); + + std::atomic do_start_publication(false); + std::atomic publication_finished(false); + + // publishing thread + auto publisher_thread = [&]() { + eCAL::CPublisher pub("blob"); + pub.ShmSetAcknowledgeTimeout(500); + + int cnt(0); + const auto max_runs(1000); + while (eCAL::Ok()) + { + if (do_start_publication && cnt < max_runs) + { + if (cnt == 0) + { + subscriber_seen_at_publication_start = pub.IsSubscribed(); + } + + pub.Send(std::to_string(cnt)); + cnt++; + + if (cnt == max_runs) + { + subscriber_seen_at_publication_end = pub.IsSubscribed(); + publication_finished = true; + break; + } + } + } + }; + + // subscribing thread + auto subscriber_thread = [&]() { + eCAL::CSubscriber sub("blob"); + bool received(false); + auto max_lines(10); + auto receive_lambda = [&received, &max_lines](const char* /*topic_name_*/, const struct eCAL::SReceiveCallbackData* data_) + { + if (max_lines != 0) + { + // the final log should look like this + // ----------------------------------- + // Receiving 0 + // Receiving 1 + // Receiving 2 + // Receiving 3 + // Receiving 4 + // Receiving 5 + // Receiving 6 + // Receiving 7 + // Receiving 8 + // Receiving 9 + // ----------------------------------- + std::cout << "Receiving " << std::string(static_cast(data_->buf), data_->size) << std::endl; + max_lines--; + } + }; + sub.AddReceiveCallback(receive_lambda); + + while (eCAL::Ok() && !publication_finished) + { + //if (sub.IsPublished()) do_start_publication = true; + if (sub.GetPublisherCount() > 0) do_start_publication = true; + } + }; + + // create threads for publisher and subscriber + std::thread pub_thread(publisher_thread); + std::thread sub_thread(subscriber_thread); + + // join threads to the main thread + pub_thread.join(); + sub_thread.join(); + + // finalize eCAL API + eCAL::Finalize(); + + // check if the publisher has seen the subscriber + EXPECT_TRUE(subscriber_seen_at_publication_start); + EXPECT_TRUE(subscriber_seen_at_publication_end); +}