From c85c2a2e06986ff14970d7cb3fa80db08f81d236 Mon Sep 17 00:00:00 2001 From: TwinFan Date: Sun, 10 Dec 2023 23:50:26 +0100 Subject: [PATCH] Refactor: RealTraffic working --- Include/LTADSBEx.h | 4 +- Include/LTADSBHub.h | 7 +- Include/LTChannel.h | 13 +- Include/LTFSCharter.h | 2 +- Include/LTForeFlight.h | 8 +- Include/LTOpenGlider.h | 2 +- Include/LTOpenSky.h | 4 +- Include/LTRealTraffic.h | 46 +- .../xcdebugger/Breakpoints_v2.xcbkptlist | 66 --- Src/LTADSBEx.cpp | 14 +- Src/LTADSBHub.cpp | 7 +- Src/LTChannel.cpp | 28 +- Src/LTFSCharter.cpp | 4 +- Src/LTForeFlight.cpp | 15 +- Src/LTOpenGlider.cpp | 4 +- Src/LTOpenSky.cpp | 8 +- Src/LTRealTraffic.cpp | 437 +++++++----------- 17 files changed, 250 insertions(+), 419 deletions(-) diff --git a/Include/LTADSBEx.h b/Include/LTADSBEx.h index 59ab78d1..a285d8be 100644 --- a/Include/LTADSBEx.h +++ b/Include/LTADSBEx.h @@ -134,7 +134,7 @@ class ADSBExchangeConnection : public LTFlightDataChannel public: ADSBExchangeConnection (); std::string GetURL (const positionTy& pos) override; - bool ProcessFetchedData (mapLTFlightDataTy& fdMap) override; + bool ProcessFetchedData () override; std::string GetStatusText () const override; ///< return a human-readable staus // // shall data of this channel be subject to LTFlightData::DataSmoothing? // bool DoDataSmoothing (double& gndRange, double& airbRange) const override @@ -149,12 +149,10 @@ class ADSBExchangeConnection : public LTFlightDataChannel /// Process v2 data void ProcessV2 (JSON_Object* pJAc, LTFlightData::FDKeyTy& fdKey, - mapLTFlightDataTy& fdMap, const double tsCutOff, const double adsbxTime, const positionTy& viewPos); /// Process v1 data void ProcessV1 (JSON_Object* pJAc, LTFlightData::FDKeyTy& fdKey, - mapLTFlightDataTy& fdMap, const double tsCutOff, const double adsbxTime, const positionTy& viewPos); diff --git a/Include/LTADSBHub.h b/Include/LTADSBHub.h index 0325cb56..c823d853 100644 --- a/Include/LTADSBHub.h +++ b/Include/LTADSBHub.h @@ -44,9 +44,6 @@ class ADSBHubConnection : public LTFlightDataChannel { protected: - // the map of flight data, where we deliver our data to - mapLTFlightDataTy& fdMap; - std::thread thrStream; ///< thread for the ADSBHub stream TCPConnection tcpStream; ///< TCP connection to data.adsbhub.org:5002 volatile bool bStopThr=false; ///< stop signal to the thread @@ -80,13 +77,13 @@ class ADSBHubConnection : public LTFlightDataChannel public: /// Constructor - ADSBHubConnection (mapLTFlightDataTy& _fdMap); + ADSBHubConnection (); /// Destructor cleans up ~ADSBHubConnection () override; /// Invokes APRS thread, or returns URL to fetch current data from live.glidernet.org std::string GetURL (const positionTy&) override { return ""; } /// @brief Processes the fetched data - bool ProcessFetchedData (mapLTFlightDataTy&) override { return true; }; + bool ProcessFetchedData () override { return true; }; std::string GetStatusText () const override; ///< return a human-readable staus bool FetchAllData(const positionTy& pos) override; void DoDisabledProcessing() override { StreamClose(); } diff --git a/Include/LTChannel.h b/Include/LTChannel.h index 14392fe6..45cfabaf 100644 --- a/Include/LTChannel.h +++ b/Include/LTChannel.h @@ -69,24 +69,25 @@ class LTChannel THR_ENDED, ///< Thread has ended, but is not yet joined } ThrStatusTy; /// Thread's state - volatile ThrStatusTy bThrStatus = THR_NONE; + volatile ThrStatusTy eThrStatus = THR_NONE; private: bool bValid = true; ///< valid connection? int errCnt = 0; ///< number of errors tolerated public: + /// Constructor just sets initial values LTChannel (dataRefsLT ch, LTChannelType t, const char* chName) : pszChName(chName), eType(t), channel(ch) {} - virtual ~LTChannel () {} + virtual ~LTChannel (); ///< Destructor makes sure the thread is stopped - void Start (); ///< Start the channel, typically starts a separate thread - void Stop (bool bWaitJoin); ///< Stop the channel + virtual void Start (); ///< Start the channel, typically starts a separate thread + virtual void Stop (bool bWaitJoin); ///< Stop the channel bool isRunning () const ///< Is channel's thread running? { return thr.joinable(); } virtual bool shallRun () const; ///< all conditions met to continue the thread loop? /// Thread has ended but still needs to be joined - bool hasEnded () const { return bThrStatus == THR_ENDED; } + bool hasEnded () const { return eThrStatus == THR_ENDED; } private: void _Main(); ///< Thread main function, will call virtual Main() @@ -118,7 +119,7 @@ class LTChannel public: virtual bool FetchAllData (const positionTy& pos) = 0; - virtual bool ProcessFetchedData (mapLTFlightDataTy& fd) = 0; + virtual bool ProcessFetchedData () = 0; // TODO: Remove Disabled Processing, should be done during end of main thread / do something while disabled? virtual void DoDisabledProcessing () {} // TODO: Remove Close / (temporarily) close a connection, (re)open is with first call to FetchAll/ProcessFetchedData diff --git a/Include/LTFSCharter.h b/Include/LTFSCharter.h index 2c583c08..613ff98d 100644 --- a/Include/LTFSCharter.h +++ b/Include/LTFSCharter.h @@ -108,7 +108,7 @@ class FSCConnection : public LTFlightDataChannel void CleanupCurl () override; std::string GetURL (const positionTy& pos) override; void ComputeBody (const positionTy& pos) override; - bool ProcessFetchedData (mapLTFlightDataTy& fdMap) override; + bool ProcessFetchedData () override; bool FetchAllData(const positionTy& pos) override { return LTOnlineChannel::FetchAllData(pos); } // // shall data of this channel be subject to LTFlightData::DataSmoothing? // virtual bool DoDataSmoothing (double& gndRange, double& airbRange) const diff --git a/Include/LTForeFlight.h b/Include/LTForeFlight.h index 0c666e50..8ea15268 100644 --- a/Include/LTForeFlight.h +++ b/Include/LTForeFlight.h @@ -53,8 +53,6 @@ constexpr std::chrono::milliseconds FF_INTVL = std::chrono::milliseconds( class ForeFlightSender : public LTOutputChannel { protected: - // the map of flight data, data that we send out to ForeFlight - mapLTFlightDataTy& fdMap; // thread std::thread thrUdpSender; volatile bool bStopUdpSender = true; // tells thread to stop @@ -71,14 +69,14 @@ class ForeFlightSender : public LTOutputChannel std::chrono::steady_clock::time_point lastStartOfTraffic; public: - ForeFlightSender (mapLTFlightDataTy& _fdMap); - virtual ~ForeFlightSender (); + ForeFlightSender (); + ~ForeFlightSender () override; std::string GetURL (const positionTy&) override { return ""; } // don't need URL, no request/reply // interface called from LTChannel bool FetchAllData(const positionTy& pos) override; - bool ProcessFetchedData (mapLTFlightDataTy&) override { return true; } + bool ProcessFetchedData () override { return true; } void DoDisabledProcessing() override; void Close () override; diff --git a/Include/LTOpenGlider.h b/Include/LTOpenGlider.h index 86b708f1..4268b6fd 100644 --- a/Include/LTOpenGlider.h +++ b/Include/LTOpenGlider.h @@ -156,7 +156,7 @@ class OpenGliderConnection : public LTFlightDataChannel /// Invokes APRS thread, or returns URL to fetch current data from live.glidernet.org std::string GetURL (const positionTy& pos) override; /// @brief Processes the fetched data - bool ProcessFetchedData (mapLTFlightDataTy& fdMap) override; + bool ProcessFetchedData () override; std::string GetStatusText () const override; ///< return a human-readable staus bool FetchAllData(const positionTy& pos) override { return LTOnlineChannel::FetchAllData(pos); } void DoDisabledProcessing() override { Cleanup(); } diff --git a/Include/LTOpenSky.h b/Include/LTOpenSky.h index 9cad7b95..f2a2715a 100644 --- a/Include/LTOpenSky.h +++ b/Include/LTOpenSky.h @@ -63,7 +63,7 @@ class OpenSkyConnection : public LTFlightDataChannel public: OpenSkyConnection (); std::string GetURL (const positionTy& pos) override; - bool ProcessFetchedData (mapLTFlightDataTy& fdMap) override; + bool ProcessFetchedData () override; std::string GetStatusText () const override; ///< return a human-readable staus // // shall data of this channel be subject to LTFlightData::DataSmoothing? // bool DoDataSmoothing (double& gndRange, double& airbRange) const override @@ -117,7 +117,7 @@ class OpenSkyAcMasterdata : public LTACMasterdataChannel OpenSkyAcMasterdata (); ///< Constructor sets channel, name, and URLs public: std::string GetURL (const positionTy& pos) override; ///< Returns the master data or route URL to query - bool ProcessFetchedData (mapLTFlightDataTy& fdMap) override; ///< Process received master or route data + bool ProcessFetchedData () override; ///< Process received master or route data protected: void Main () override; ///< virtual thread main function bool ProcessMasterData (JSON_Object* pJAc); ///< Process received aircraft master data diff --git a/Include/LTRealTraffic.h b/Include/LTRealTraffic.h index 9aa28968..682d3d97 100644 --- a/Include/LTRealTraffic.h +++ b/Include/LTRealTraffic.h @@ -166,26 +166,21 @@ class RealTrafficConnection : public LTFlightDataChannel std::recursive_mutex rtMutex; // RealTraffic connection status volatile rtStatusTy status = RT_STATUS_NONE; - // the map of flight data, where we deliver our data to - mapLTFlightDataTy& fdMap; - // tcp connection to send current position - std::thread thrTcpServer; - TCPConnection tcpPosSender; - volatile bool bStopTcp = false; - volatile bool thrTcpRunning = false; + // TCP connection to send current position + std::thread thrTcpServer; ///< thread of the TCP listening thread (short-lived) + TCPConnection tcpPosSender; ///< TCP connection to communicate with RealTraffic + /// Status of the separate TCP listening thread + volatile ThrStatusTy eTcpThrStatus = THR_NONE; // current position which serves as center positionTy posCamera; - // udp thread and its sockets - std::thread thrUdpListener; + // UDP sockets UDPReceiver udpTrafficData; #if APL == 1 || LIN == 1 // the self-pipe to shut down the UDP listener thread gracefully SOCKET udpPipe[2] = { INVALID_SOCKET, INVALID_SOCKET }; #endif - volatile bool bStopUdp = false; - volatile bool thrUdpRunning = false; double lastReceivedTime = 0.0; // copy of simTime // map of last received datagrams for duplicate detection std::map mapDatagrams; @@ -195,16 +190,15 @@ class RealTrafficConnection : public LTFlightDataChannel double tsAdjust = 0.0; public: - RealTrafficConnection (mapLTFlightDataTy& _fdMap); - virtual ~RealTrafficConnection (); + RealTrafficConnection (); + + void Stop (bool bWaitJoin) override; ///< Stop the UDP listener gracefully std::string GetURL (const positionTy&) override { return ""; } // don't need URL, no request/reply // interface called from LTChannel - bool FetchAllData(const positionTy& pos) override; - bool ProcessFetchedData (mapLTFlightDataTy&) override { return true; } - void DoDisabledProcessing() override; - void Close () override; + bool FetchAllData(const positionTy& /*pos*/) override { return false; } + bool ProcessFetchedData () override { return false; } // SetValid also sets internal status void SetValid (bool _valid, bool bMsg = true) override; // // shall data of this channel be subject to LTFlightData::DataSmoothing? @@ -229,14 +223,10 @@ class RealTrafficConnection : public LTFlightDataChannel protected: void Main () override; ///< virtual thread main function - // Start/Stop - bool StartConnections (); - bool StopConnections (); - // MARK: TCP - void tcpConnection (); - static void tcpConnectionS (RealTrafficConnection* me) { me->tcpConnection();} - bool StopTcpConnection (); + void tcpConnection (); ///< main function of TCP listening thread, lives only until TCP connection established + void StartTcpConnection (); ///< start the TCP listening thread + void StopTcpConnection (); ///< stop the TCP listening thread void SendMsg (const char* msg); ///< Send and log a message to RealTraffic void SendTime (long long ts); ///< Send a timestamp to RealTraffic @@ -244,13 +234,7 @@ class RealTrafficConnection : public LTFlightDataChannel void SendPos (const positionTy& pos, double speed_m); ///< Send position/speed info for own ship to RealTraffic void SendUsersPlanePos(); ///< Send user's plane's position/speed to RealTraffic - // MARK: UDP - // UDP Listen: the main function for receiving UDP broadcasts - void udpListen (); - // just a wrapper to call a member function - static void udpListenS (RealTrafficConnection* me) { me->udpListen();} - bool StopUdpConnection (); - + // MARK: Data Processing // Process received datagrams bool ProcessRecvedTrafficData (const char* traffic); bool ProcessRTTFC (LTFlightData::FDKeyTy& fdKey, const std::vector& tfc); ///< Process a RTTFC type message diff --git a/LiveTraffic.xcodeproj/xcuserdata/birger.xcuserdatad/xcdebugger/Breakpoints_v2.xcbkptlist b/LiveTraffic.xcodeproj/xcuserdata/birger.xcuserdatad/xcdebugger/Breakpoints_v2.xcbkptlist index c58cb623..e96eec2e 100644 --- a/LiveTraffic.xcodeproj/xcuserdata/birger.xcuserdatad/xcdebugger/Breakpoints_v2.xcbkptlist +++ b/LiveTraffic.xcodeproj/xcuserdata/birger.xcuserdatad/xcdebugger/Breakpoints_v2.xcbkptlist @@ -3,70 +3,4 @@ uuid = "C8095CDD-1C39-4D01-99B0-6AD4723390BD" type = "1" version = "2.0"> - - - - - - - - - - - - - - - - - - diff --git a/Src/LTADSBEx.cpp b/Src/LTADSBEx.cpp index bb23870d..ceb0b95b 100644 --- a/Src/LTADSBEx.cpp +++ b/Src/LTADSBEx.cpp @@ -57,7 +57,7 @@ std::string ADSBExchangeConnection::GetURL (const positionTy& pos) } // update shared flight data structures with received flight data -bool ADSBExchangeConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) +bool ADSBExchangeConnection::ProcessFetchedData () { // some things depend on the key type const char* sERR = keyTy == ADSBEX_KEY_EXCHANGE ? ADSBEX_ERR : ADSBEX_RAPID_ERR; @@ -163,9 +163,9 @@ bool ADSBExchangeConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) // Process the details, depends on version detected try { if (ver == 2) - ProcessV2(pJAc, fdKey, fdMap, tsCutOff, adsbxTime, viewPos); + ProcessV2(pJAc, fdKey, tsCutOff, adsbxTime, viewPos); else if (ver == 1) - ProcessV1(pJAc, fdKey, fdMap, tsCutOff, adsbxTime, viewPos); + ProcessV1(pJAc, fdKey, tsCutOff, adsbxTime, viewPos); } catch(const std::system_error& e) { LOG_MSG(logERR, ERR_LOCK_ERROR, "mapFd", e.what()); } catch(...) { @@ -184,7 +184,6 @@ bool ADSBExchangeConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) // Process v2 data void ADSBExchangeConnection::ProcessV2 (JSON_Object* pJAc, LTFlightData::FDKeyTy& fdKey, - mapLTFlightDataTy& fdMap, const double tsCutOff, const double adsbxTime, const positionTy& viewPos) @@ -282,7 +281,7 @@ void ADSBExchangeConnection::ProcessV2 (JSON_Object* pJAc, // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls @@ -333,7 +332,6 @@ void ADSBExchangeConnection::ProcessV2 (JSON_Object* pJAc, // Process v1 data void ADSBExchangeConnection::ProcessV1 (JSON_Object* pJAc, LTFlightData::FDKeyTy& fdKey, - mapLTFlightDataTy& fdMap, const double tsCutOff, const double /*adsbxTime*/, const positionTy& viewPos) @@ -368,7 +366,7 @@ void ADSBExchangeConnection::ProcessV1 (JSON_Object* pJAc, // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls @@ -496,7 +494,7 @@ void ADSBExchangeConnection::Main () tNextWakeup += std::chrono::seconds(dataRefs.GetFdRefreshIntvl()); // if enabled fetch data and process it - if (FetchAllData(pos) && ProcessFetchedData(mapFd)) + if (FetchAllData(pos) && ProcessFetchedData()) // reduce error count if processed successfully // as a chance to appear OK in the long run DecErrCnt(); diff --git a/Src/LTADSBHub.cpp b/Src/LTADSBHub.cpp index 8bcaa446..0122949d 100644 --- a/Src/LTADSBHub.cpp +++ b/Src/LTADSBHub.cpp @@ -46,9 +46,8 @@ constexpr int ADSBHUB_TIMEOUT_S = 60; ///< ADSBHub sends s // // Constructor -ADSBHubConnection::ADSBHubConnection (mapLTFlightDataTy& _fdMap) : -LTFlightDataChannel(DR_CHANNEL_ADSB_HUB, ADSBHUB_NAME), -fdMap(_fdMap) +ADSBHubConnection::ADSBHubConnection () : +LTFlightDataChannel(DR_CHANNEL_ADSB_HUB, ADSBHUB_NAME) { // purely informational urlName = ADSBHUB_CHECK_NAME; @@ -621,7 +620,7 @@ void ADSBHubConnection::ProcessPlaneData () // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls diff --git a/Src/LTChannel.cpp b/Src/LTChannel.cpp index 8275ce6e..d72c69dc 100644 --- a/Src/LTChannel.cpp +++ b/Src/LTChannel.cpp @@ -127,11 +127,19 @@ double jag_n_nan (const JSON_Array *array, size_t idx) //MARK: LTChannel // + +// Destructor makes sure the thread is stopped +LTChannel::~LTChannel () +{ + Stop(true); +} + + // Start the channel, typically starts a separate thread void LTChannel::Start () { if (!isRunning()) { - bThrStatus = THR_STARTING; + eThrStatus = THR_STARTING; thr = std::thread(<Channel::_Main, this); } } @@ -140,12 +148,12 @@ void LTChannel::Start () void LTChannel::Stop (bool bWaitJoin) { if (isRunning()) { - if (bThrStatus < THR_STOP) - bThrStatus = THR_STOP; // indicate to the thread that it has to end itself + if (eThrStatus < THR_STOP) + eThrStatus = THR_STOP; // indicate to the thread that it has to end itself if (bWaitJoin) { thr.join(); // wait for the thread to actually end thr = std::thread(); - bThrStatus = THR_NONE; + eThrStatus = THR_NONE; } } } @@ -156,7 +164,7 @@ bool LTChannel::shallRun () const { return !bFDMainStop // stop flag for all LT processing - && bThrStatus <= THR_RUNNING // thread is not signalled to stop + && eThrStatus <= THR_RUNNING // thread is not signalled to stop && IsValid() // channel valid? && dataRefs.IsChannelEnabled(channel); // channel enabled? } @@ -165,11 +173,11 @@ bool LTChannel::shallRun () const // Thread main function, just calls virtual Main() void LTChannel::_Main() { - bThrStatus = THR_RUNNING; + eThrStatus = THR_RUNNING; LOG_MSG(logDEBUG, "%s: Thread starts", pszChName); Main(); LOG_MSG(logDEBUG, "%s: Thread ends", pszChName); - bThrStatus = THR_ENDED; + eThrStatus = THR_ENDED; } @@ -736,16 +744,16 @@ bool LTFlightDataEnable() listFDC.clear(); // load live feed readers (in order of priority) - listFDC.emplace_back(new RealTrafficConnection(mapFd)); + listFDC.emplace_back(new RealTrafficConnection()); listFDC.emplace_back(new OpenSkyConnection); - listFDC.emplace_back(new ADSBHubConnection(mapFd)); + listFDC.emplace_back(new ADSBHubConnection()); listFDC.emplace_back(new ADSBExchangeConnection); listFDC.emplace_back(new OpenGliderConnection); listFDC.emplace_back(new FSCConnection); // load online master data connections listFDC.emplace_back(new OpenSkyAcMasterdata); // load other channels - listFDC.emplace_back(new ForeFlightSender(mapFd)); + listFDC.emplace_back(new ForeFlightSender()); // Success only if there are still connections left return listFDC.size() > 0; diff --git a/Src/LTFSCharter.cpp b/Src/LTFSCharter.cpp index c5ace238..5f159f3b 100644 --- a/Src/LTFSCharter.cpp +++ b/Src/LTFSCharter.cpp @@ -222,7 +222,7 @@ void FSCConnection::ComputeBody (const positionTy& pos) } // update shared flight data structures with received flight data -bool FSCConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) +bool FSCConnection::ProcessFetchedData () { // any a/c filter defined for debugging purposes? std::string acFilter ( dataRefs.GetDebugAcFilter() ); @@ -378,7 +378,7 @@ bool FSCConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls diff --git a/Src/LTForeFlight.cpp b/Src/LTForeFlight.cpp index 1162ae29..e5d65e72 100644 --- a/Src/LTForeFlight.cpp +++ b/Src/LTForeFlight.cpp @@ -28,9 +28,8 @@ // // Constructor doesn't do much -ForeFlightSender::ForeFlightSender (mapLTFlightDataTy& _fdMap) : -LTOutputChannel(DR_CHANNEL_FORE_FLIGHT_SENDER, FOREFLIGHT_NAME), -fdMap(_fdMap) +ForeFlightSender::ForeFlightSender () : +LTOutputChannel(DR_CHANNEL_FORE_FLIGHT_SENDER, FOREFLIGHT_NAME) { // purely informational urlName = FF_CHECK_NAME; @@ -201,19 +200,19 @@ void ForeFlightSender::udpSend() // from here on access to fdMap guarded by a mutex std::unique_lock lock (mapFdMutex, std::try_to_lock); if (lock) { - if (!fdMap.empty()) { + if (!mapFd.empty()) { // just starting with a new round? if (lastKey == LTFlightData::FDKeyTy()) lastStartOfTraffic = now; // next key to send? (shall have an actual a/c) mapLTFlightDataTy::const_iterator mapIter; - for (mapIter = fdMap.upper_bound(lastKey); - mapIter != fdMap.cend() && !mapIter->second.hasAc(); + for (mapIter = mapFd.upper_bound(lastKey); + mapIter != mapFd.cend() && !mapIter->second.hasAc(); mapIter++); // something left? - if (mapIter != fdMap.cend()) { + if (mapIter != mapFd.cend()) { // send that plane's info SendTraffic(mapIter->second); // wake up soon again for the rest @@ -268,7 +267,7 @@ void ForeFlightSender::SendAllTraffic () std::lock_guard mapFdLock (mapFdMutex); // loop over all flight data objects - for (const std::pair& fdPair: fdMap) + for (const std::pair& fdPair: mapFd) { // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one diff --git a/Src/LTOpenGlider.cpp b/Src/LTOpenGlider.cpp index cdee8ced..281453da 100644 --- a/Src/LTOpenGlider.cpp +++ b/Src/LTOpenGlider.cpp @@ -168,7 +168,7 @@ std::string OpenGliderConnection::GetURL (const positionTy& pos) /// @endcode /// We are not doing full XML parsing, but just search for `` -bool OpenGliderConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) +bool OpenGliderConnection::ProcessFetchedData () { char buf[100]; @@ -231,7 +231,7 @@ bool OpenGliderConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) // get the fd object from the map // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls diff --git a/Src/LTOpenSky.cpp b/Src/LTOpenSky.cpp index 064c0145..4244f788 100644 --- a/Src/LTOpenSky.cpp +++ b/Src/LTOpenSky.cpp @@ -136,7 +136,7 @@ std::string OpenSkyConnection::GetURL (const positionTy& pos) // update shared flight data structures with received flight data // "a4d85d","UJC11 ","United States",1657226901,1657226901,-90.2035,38.8157,2758.44,false,128.1,269.54,-6.5,null,2895.6,"4102",false,0 -bool OpenSkyConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) +bool OpenSkyConnection::ProcessFetchedData () { char buf[100]; @@ -248,7 +248,7 @@ bool OpenSkyConnection::ProcessFetchedData (mapLTFlightDataTy& fdMap) // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls @@ -370,7 +370,7 @@ void OpenSkyAcMasterdata::Main () // if there is something to request, fetch the data and process it if (requ && - FetchAllData(positionTy()) && ProcessFetchedData(mapFd)) { + FetchAllData(positionTy()) && ProcessFetchedData()) { // reduce error count if processed successfully // as a chance to appear OK in the long run DecErrCnt(); @@ -410,7 +410,7 @@ std::string OpenSkyAcMasterdata::GetURL (const positionTy& /*pos*/) } // process each master data line read from OpenSky -bool OpenSkyAcMasterdata::ProcessFetchedData (mapLTFlightDataTy& /*fdMap*/) +bool OpenSkyAcMasterdata::ProcessFetchedData () { // If the requested data wasn't found we shall never try this request again if (httpResponse == HTTP_NOT_FOUND || diff --git a/Src/LTRealTraffic.cpp b/Src/LTRealTraffic.cpp index c1600180..d7afdc81 100644 --- a/Src/LTRealTraffic.cpp +++ b/Src/LTRealTraffic.cpp @@ -35,9 +35,8 @@ // // Constructor doesn't do much -RealTrafficConnection::RealTrafficConnection (mapLTFlightDataTy& _fdMap) : -LTFlightDataChannel(DR_CHANNEL_REAL_TRAFFIC_ONLINE, REALTRAFFIC_NAME), -fdMap(_fdMap) +RealTrafficConnection::RealTrafficConnection () : +LTFlightDataChannel(DR_CHANNEL_REAL_TRAFFIC_ONLINE, REALTRAFFIC_NAME) { //purely information urlName = RT_CHECK_NAME; @@ -45,64 +44,33 @@ fdMap(_fdMap) urlPopup = RT_CHECK_POPUP; } -// Destructor makes sure we are cleaned up -RealTrafficConnection::~RealTrafficConnection () +// Stop the UDP listener gracefully +void RealTrafficConnection::Stop (bool bWaitJoin) { - StopConnections(); -} + if (isRunning()) { + if (eThrStatus < THR_STOP) + eThrStatus = THR_STOP; // indicate to the thread that it has to end itself -// Does not actually fetch data (the UDP thread does that) but -// 1. Starts the connections -// 2. updates the RealTraffic local server with out current position and time -// 3. cleans up map of datagrams for duplicate check -bool RealTrafficConnection::FetchAllData(const positionTy& pos) -{ - // store camera position for later calculations - posCamera = pos; - - // if we are invalid or disabled we should shut down - if (!IsValid() || !IsEnabled()) { - return StopConnections(); - } - - // need to start up? - if (status != RT_STATUS_CONNECTED_FULL && - !StartConnections()) - return false; - - // do anything only in a normal status - if (IsConnected()) - { - // Send current position and time - SendXPSimTime(); - SendUsersPlanePos(); - - // cleanup map of last datagrams - CleanupMapDatagrams(); - - // map is empty? That only happens if we don't receive data - // continuously - if (mapDatagrams.empty()) - // se Udp status to unavailable, but keep listener running - SetStatusUdp(false, false); +#if APL == 1 || LIN == 1 + // Mac/Lin: Try writing something to the self-pipe to stop gracefully + if (udpPipe[1] == INVALID_SOCKET || + write(udpPipe[1], "STOP", 4) < 0) + { + // if the self-pipe didn't work: +#endif + // close all connections, this will also break out of all + // blocking calls for receiving message and hence terminate the threads + udpTrafficData.Close(); +#if APL == 1 || LIN == 1 + } +#endif } - return true; -} - -// if channel is disabled make sure all connections are closed -void RealTrafficConnection::DoDisabledProcessing() -{ - StopConnections(); + // Parent class processing: Wait for the thread to join + LTFlightDataChannel::Stop(bWaitJoin); } -// closes all connections -void RealTrafficConnection::Close() -{ - StopConnections(); -} - // sets the status and updates global text to show elsewhere void RealTrafficConnection::SetStatus(rtStatusTy s) { @@ -195,7 +163,7 @@ void RealTrafficConnection::SetStatusUdp(bool bEnable, bool _bStopUdp) } else { // Disable - also disconnect, otherwise restart wouldn't work if (_bStopUdp) - StopUdpConnection(); + eThrStatus = THR_STOP; // set status switch (status) { @@ -289,87 +257,168 @@ void RealTrafficConnection::Main () { // This is a communication thread's main function, set thread's name and C locale ThreadSettings TS ("LT_RT", LC_ALL_MASK); -} - - -// starts all networking threads -bool RealTrafficConnection::StartConnections() -{ - // don't start if we shall stop - if (status == RT_STATUS_STOPPING) - return false; - // set startup status - if (status == RT_STATUS_NONE) + // Top-level exception handling + try { + // set startup status SetStatus(RT_STATUS_STARTING); - - // *** TCP server for RealTraffic to connect to *** - if (!thrTcpRunning && !tcpPosSender.IsConnected()) { - if (thrTcpServer.joinable()) - thrTcpServer.join(); - thrTcpRunning = true; - thrTcpServer = std::thread (tcpConnectionS, this); + + // Start the TCP listening thread, that waits for an incoming TCP connection from the RealTraffic app + posCamera = dataRefs.GetViewPos(); + StartTcpConnection(); + // Next time we should send a position update + std::chrono::time_point tNextPos = + std::chrono::steady_clock::now() + std::chrono::seconds(dataRefs.GetFdRefreshIntvl()); + + // --- UDP Listener --- + + // Open the UDP port + udpTrafficData.Open (RT_LOCALHOST, + DataRefs::GetCfgInt(DR_CFG_RT_TRAFFIC_PORT), + RT_NET_BUF_SIZE); + int maxSock = (int)udpTrafficData.getSocket() + 1; +#if APL == 1 || LIN == 1 + // the self-pipe to shut down the UDP socket gracefully + if (pipe(udpPipe) < 0) + throw NetRuntimeError("Couldn't create pipe"); + fcntl(udpPipe[0], F_SETFL, O_NONBLOCK); + maxSock = std::max(maxSock, udpPipe[0]+1); +#endif + + // --- Main Loop --- + + while (shallRun() && udpTrafficData.isOpen() && IsConnecting()) + { + // wait for a UDP datagram on either socket (traffic, weather) + fd_set sRead; + FD_ZERO(&sRead); + FD_SET(udpTrafficData.getSocket(), &sRead); // check our sockets +#if APL == 1 || LIN == 1 + FD_SET(udpPipe[0], &sRead); +#endif + // We specify a timeout, which will really rarely trigger, + // but this way we make sure that we send our position every once in a while even with no traffic around + struct timeval timeout = { dataRefs.GetFdRefreshIntvl(), 0 }; + int retval = select(maxSock, &sRead, NULL, NULL, &timeout); + + // short-cut if we are to shut down (return from 'select' due to closed socket) + if (!shallRun()) break; + + // select call failed??? + if(retval == -1) + throw NetRuntimeError("'select' failed"); + + // select successful - traffic data + if (retval > 0 && FD_ISSET(udpTrafficData.getSocket(), &sRead)) + { + // read UDP datagram + long rcvdBytes = udpTrafficData.recv(); + + // received something? + if (rcvdBytes > 0) + { + // yea, we received something! + SetStatusUdp(true, false); + + // have it processed + ProcessRecvedTrafficData(udpTrafficData.getBuf()); + } + else + retval = -1; + } + + // handling of errors, both from select and from recv + if (retval < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) { + // not just a normal timeout? + char sErr[SERR_LEN]; + strerror_s(sErr, sizeof(sErr), errno); + LOG_MSG(logERR, ERR_UDP_RCVR_RCVR, ChName(), + sErr); + // increase error count...bail out if too bad + if (!IncErrCnt()) { + SetStatusUdp(false, true); + break; + } + } + + // --- Maintenance Activities --- + + // If we are connected via TCP to RealTraffic + if (tcpPosSender.IsConnected()) { + // Send current position and time every once in a while + if (std::chrono::steady_clock::now() > tNextPos) { + SendXPSimTime(); + SendUsersPlanePos(); + tNextPos = std::chrono::steady_clock::now() + std::chrono::seconds(dataRefs.GetFdRefreshIntvl()); + } + } + // Not connected by TCP, are we still listening and waiting? + else if (eTcpThrStatus != THR_RUNNING) { + // Not running...so make sure it restarts for us to have a chance to get a connection + StopTcpConnection(); + StartTcpConnection(); + } + + // cleanup map of last datagrams + CleanupMapDatagrams(); + // map is empty? That only happens if we don't receive data continuously + if (mapDatagrams.empty()) + // set UDP status to unavailable, but keep listener running + SetStatusUdp(false, false); + } + + } catch (const std::exception& e) { + LOG_MSG(logERR, ERR_TOP_LEVEL_EXCEPTION, e.what()); + IncErrCnt(); + } catch (...) { + LOG_MSG(logERR, ERR_TOP_LEVEL_EXCEPTION, "(unknown type)"); + IncErrCnt(); } - // *** UDP data listener *** - if (!thrUdpRunning) { - if (thrUdpListener.joinable()) - thrUdpListener.join(); - thrUdpRunning = true; - thrUdpListener = std::thread (udpListenS, this); + // Let's make absolutely sure that any connection is really closed + // once we return from this thread + if (udpTrafficData.isOpen()) + udpTrafficData.Close(); +#if APL == 1 || LIN == 1 + // close the self-pipe sockets + for (SOCKET &s: udpPipe) { + if (s != INVALID_SOCKET) close(s); + s = INVALID_SOCKET; } - - // looks ok - return true; -} - -// stop the UDP listening thread -bool RealTrafficConnection::StopConnections() -{ - // not running? - if (status == RT_STATUS_NONE) - return true; - - // tell the threads to stop now - SetStatus(RT_STATUS_STOPPING); +#endif - // stop both TCP and UDP + // Make sure the TCP listener is down StopTcpConnection(); - StopUdpConnection(); // Clear the list of historic time stamp differences dequeTS.clear(); // stopped SetStatus(RT_STATUS_NONE); - return true; -} +} // // MARK: TCP Connection // +// main function of TCP listening thread, lives only until TCP connection established void RealTrafficConnection::tcpConnection () { // This is a communication thread's main function, set thread's name and C locale ThreadSettings TS ("LT_RT_TCP", LC_ALL_MASK); - - // sanity check: return in case of wrong status - if (!IsConnecting()) { - thrTcpRunning = false; - return; - } + eTcpThrStatus = THR_RUNNING; // port to use is configurable int tcpPort = DataRefs::GetCfgInt(DR_CFG_RT_LISTEN_PORT); try { - bStopTcp = false; tcpPosSender.Open (RT_LOCALHOST, tcpPort, RT_NET_BUF_SIZE); + LOG_MSG(logDEBUG, "RealTraffic: Listening on port %d for TCP connection by RealTraffic App", tcpPort); if (tcpPosSender.listenAccept()) { // so we did accept a connection! + LOG_MSG(logDEBUG, "RealTraffic: Accepted TCP connection from RealTraffic App"); SetStatusTcp(true, false); // send our simulated time and first position SendXPSimTime(); @@ -378,7 +427,7 @@ void RealTrafficConnection::tcpConnection () else { // short-cut if we are to shut down (return from 'select' due to closed socket) - if (!bStopTcp) { + if (eTcpThrStatus < THR_STOP) { // not forced to shut down...report other problem SHOW_MSG(logERR,ERR_RT_CANTLISTEN); SetStatusTcp(false, true); @@ -397,17 +446,28 @@ void RealTrafficConnection::tcpConnection () // We make sure that, once leaving this thread, there is no // open listener (there might be a connected socket, though) #if IBM - if (!bStopTcp) // already closed if stop flag set, avoid rare crashes if called in parallel + if (eTcpThrStatus < THR_STOP) // already closed if stop flag set, avoid rare crashes if called in parallel #endif tcpPosSender.CloseListenerOnly(); - thrTcpRunning = false; + eTcpThrStatus = THR_ENDED; } -bool RealTrafficConnection::StopTcpConnection () + +// start the TCP listening thread +void RealTrafficConnection::StartTcpConnection () +{ + if (!thrTcpServer.joinable()) { + eTcpThrStatus = THR_STARTING; + thrTcpServer = std::thread (&RealTrafficConnection::tcpConnection, this); + } +} + +// stop the TCP listening thread +void RealTrafficConnection::StopTcpConnection () { // close all connections, this will also break out of all // blocking calls for receiving message and hence terminate the threads - bStopTcp = true; + eTcpThrStatus = THR_STOP; tcpPosSender.Close(); // wait for threads to finish (if I'm not myself this thread...) @@ -415,9 +475,8 @@ bool RealTrafficConnection::StopTcpConnection () if (thrTcpServer.joinable()) thrTcpServer.join(); thrTcpServer = std::thread(); + eTcpThrStatus = THR_NONE; } - - return true; } @@ -502,154 +561,6 @@ void RealTrafficConnection::SendUsersPlanePos() } - -// -// MARK: UDP Listen Thread - Traffic -// - -// runs in a separate thread, listens for UDP traffic and -// forwards that to the flight data -void RealTrafficConnection::udpListen () -{ - // This is a communication thread's main function, set thread's name and C locale - ThreadSettings TS ("LT_RT_UDP", LC_ALL_MASK); - - // sanity check: return in case of wrong status - if (!IsConnecting()) { - thrUdpRunning = false; - return; - } - - int port = 0; - try { - // Open the UDP port - bStopUdp = false; - udpTrafficData.Open (RT_LOCALHOST, - port = DataRefs::GetCfgInt(DR_CFG_RT_TRAFFIC_PORT), - RT_NET_BUF_SIZE); - int maxSock = (int)udpTrafficData.getSocket() + 1; -#if APL == 1 || LIN == 1 - // the self-pipe to shut down the UDP socket gracefully - if (pipe(udpPipe) < 0) - throw NetRuntimeError("Couldn't create pipe"); - fcntl(udpPipe[0], F_SETFL, O_NONBLOCK); - maxSock = std::max(maxSock, udpPipe[0]+1); -#endif - - // return from the thread when requested - // (not checking for weather socker...not essential) - while (udpTrafficData.isOpen() && IsConnecting() && !bStopUdp) - { - // wait for a UDP datagram on either socket (traffic, weather) - fd_set sRead; - FD_ZERO(&sRead); - FD_SET(udpTrafficData.getSocket(), &sRead); // check our sockets -#if APL == 1 || LIN == 1 - FD_SET(udpPipe[0], &sRead); -#endif - int retval = select(maxSock, &sRead, NULL, NULL, NULL); - - // short-cut if we are to shut down (return from 'select' due to closed socket) - if (bStopUdp) - break; - - // select call failed??? - if(retval == -1) - throw NetRuntimeError("'select' failed"); - - // select successful - traffic data - if (retval > 0 && FD_ISSET(udpTrafficData.getSocket(), &sRead)) - { - // read UDP datagram - long rcvdBytes = udpTrafficData.recv(); - - // received something? - if (rcvdBytes > 0) - { - // yea, we received something! - SetStatusUdp(true, false); - - // have it processed - ProcessRecvedTrafficData(udpTrafficData.getBuf()); - } - else - retval = -1; - } - - // short-cut if we are to shut down - if (bStopUdp) - break; - - // handling of errors, both from select and from recv - if (retval < 0 && (errno != EAGAIN && errno != EWOULDBLOCK)) { - // not just a normal timeout? - char sErr[SERR_LEN]; - strerror_s(sErr, sizeof(sErr), errno); - LOG_MSG(logERR, ERR_UDP_RCVR_RCVR, ChName(), - sErr); - // increase error count...bail out if too bad - if (!IncErrCnt()) { - SetStatusUdp(false, true); - break; - } - } - } - } - catch (std::runtime_error& e) { - // exception...can only really happen in UDPReceiver::Open - LOG_MSG(logERR, ERR_UDP_SOCKET_CREAT, ChName(), - RT_LOCALHOST, port, - e.what()); - // invalidate the channel - SetStatusUdp(false, true); - SetValid(false, true); - } - - // Let's make absolutely sure that any connection is really closed - // once we return from this thread -#if APL == 1 || LIN == 1 - udpTrafficData.Close(); - // close the self-pipe sockets - for (SOCKET &s: udpPipe) { - if (s != INVALID_SOCKET) close(s); - s = INVALID_SOCKET; - } -#else - if (!bStopUdp) { // already closed if stop flag set, avoid rare crashes if called in parallel - udpTrafficData.Close(); - } -#endif - thrUdpRunning = false; -} - -bool RealTrafficConnection::StopUdpConnection () -{ - bStopUdp = true; -#if APL == 1 || LIN == 1 - // Mac/Lin: Try writing something to the self-pipe to stop gracefully - if (udpPipe[1] == INVALID_SOCKET || - write(udpPipe[1], "STOP", 4) < 0) - { - // if the self-pipe didn't work: -#endif - // close all connections, this will also break out of all - // blocking calls for receiving message and hence terminate the threads - udpTrafficData.Close(); -#if APL == 1 || LIN == 1 - } -#endif - - // wait for thread to finish if I'm not this thread myself - if (std::this_thread::get_id() != thrUdpListener.get_id()) { - if (thrUdpListener.joinable()) - thrUdpListener.join(); - thrUdpListener = std::thread(); - } - - return true; -} - - // MARK: Traffic // Process received traffic data. // We keep this a bit flexible to be able to work with different formats @@ -694,6 +605,10 @@ bool RealTrafficConnection::ProcessRecvedTrafficData (const char* traffic) if ((!acFilter.empty() && (fdKey != acFilter))) return true; // silently + // *** Replace 'null' *** + std::for_each(tfc.begin(), tfc.end(), + [](std::string& s){ if (s == "null") s.clear(); }); + // *** Process different formats **** // There are 3 formats we are _really_ interested in: RTTFC, AITFC, and XTRAFFICPSX @@ -801,7 +716,7 @@ bool RealTrafficConnection::ProcessRTTFC (LTFlightData::FDKeyTy& fdKey, // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls @@ -955,7 +870,7 @@ bool RealTrafficConnection::ProcessAITFC (LTFlightData::FDKeyTy& fdKey, // get the fd object from the map, key is the transpIcao // this fetches an existing or, if not existing, creates a new one - LTFlightData& fd = fdMap[fdKey]; + LTFlightData& fd = mapFd[fdKey]; // also get the data access lock once and for all // so following fetch/update calls only make quick recursive calls