Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do some minor formatting tweaks #1904

Merged
merged 4 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions libs/acn/DMPE131Inflator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
return true;
}

if (universe_iter == m_handlers.end())
if (universe_iter == m_handlers.end()) {
return true;
}

DMPHeader dmp_header = headers.GetDMPHeader();

Expand Down Expand Up @@ -104,16 +105,17 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,

if (address->Increment() != 1) {
OLA_INFO << "E1.31 DMP packet with increment " << address->Increment()
<< ", disarding";
<< ", disarding";
return true;
}

unsigned int length_remaining = pdu_len - available_length;
int start_code = -1;
if (e131_header.UsingRev2())
if (e131_header.UsingRev2()) {
start_code = static_cast<int>(address->Start());
else if (length_remaining && address->Number())
} else if (length_remaining && address->Number()) {
start_code = *(data + available_length);
}

// The only time we want to continue processing a non-0 start code is if it
// contains a Terminate message.
Expand All @@ -132,14 +134,16 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
// Reaching here means that we actually have new data and we should merge.
if (target_buffer && start_code == 0) {
unsigned int channels = std::min(length_remaining, address->Number());
if (e131_header.UsingRev2())
if (e131_header.UsingRev2()) {
target_buffer->Set(data + available_length, channels);
else
target_buffer->Set(data + available_length + 1, channels - 1);
} else {
target_buffer->Set(data + available_length + 1, channels - 1);
}
}

if (universe_iter->second.priority)
if (universe_iter->second.priority) {
*universe_iter->second.priority = universe_iter->second.active_priority;
}

// merge the sources
switch (universe_iter->second.sources.size()) {
Expand All @@ -155,9 +159,11 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
// HTP Merge
universe_iter->second.buffer->Reset();
std::vector<dmx_source>::const_iterator source_iter =
universe_iter->second.sources.begin();
for (; source_iter != universe_iter->second.sources.end(); ++source_iter)
universe_iter->second.sources.begin();
for (; source_iter != universe_iter->second.sources.end();
++source_iter) {
universe_iter->second.buffer->HTPMerge(source_iter->buffer);
}
universe_iter->second.closure->Run();
}
return true;
Expand All @@ -175,8 +181,9 @@ bool DMPE131Inflator::SetHandler(uint16_t universe,
ola::DmxBuffer *buffer,
uint8_t *priority,
ola::Callback0<void> *closure) {
if (!closure || !buffer)
if (!closure || !buffer) {
return false;
}

UniverseHandlers::iterator iter = m_handlers.find(universe);

Expand Down Expand Up @@ -265,38 +272,41 @@ bool DMPE131Inflator::TrackSourceIfRequired(
iter++;
}

if (sources.empty())
if (sources.empty()) {
universe_data->active_priority = 0;
}

for (iter = sources.begin(); iter != sources.end(); ++iter) {
if (iter->cid == headers.GetRootHeader().GetCid())
if (iter->cid == headers.GetRootHeader().GetCid()) {
break;
}
}

if (iter == sources.end()) {
// This is an untracked source
if (e131_header.StreamTerminated() ||
priority < universe_data->active_priority)
priority < universe_data->active_priority) {
return false;
}

if (priority > universe_data->active_priority) {
OLA_INFO << "Raising priority for universe " <<
e131_header.Universe() << " from " <<
static_cast<int>(universe_data->active_priority) << " to " <<
static_cast<int>(priority);
OLA_INFO << "Raising priority for universe " << e131_header.Universe()
<< " from " << static_cast<int>(universe_data->active_priority)
<< " to " << static_cast<int>(priority);
sources.clear();
universe_data->active_priority = priority;
}

if (sources.size() == MAX_MERGE_SOURCES) {
// TODO(simon): flag this in the export map
OLA_WARN << "Max merge sources reached for universe " <<
e131_header.Universe() << ", " <<
headers.GetRootHeader().GetCid().ToString() << " won't be tracked";
OLA_WARN << "Max merge sources reached for universe "
<< e131_header.Universe() << ", "
<< headers.GetRootHeader().GetCid().ToString()
<< " won't be tracked";
return false;
} else {
OLA_INFO << "Added new E1.31 source: " <<
headers.GetRootHeader().GetCid().ToString();
OLA_INFO << "Added new E1.31 source: "
<< headers.GetRootHeader().GetCid().ToString();
dmx_source new_source;
new_source.cid = headers.GetRootHeader().GetCid();
new_source.sequence = e131_header.Sequence();
Expand All @@ -311,19 +321,21 @@ bool DMPE131Inflator::TrackSourceIfRequired(
int8_t seq_diff = static_cast<int8_t>(e131_header.Sequence() -
iter->sequence);
if (seq_diff <= 0 && seq_diff > SEQUENCE_DIFF_THRESHOLD) {
OLA_INFO << "Old packet received, ignoring, this # " <<
static_cast<int>(e131_header.Sequence()) << ", last " <<
static_cast<int>(iter->sequence);
OLA_INFO << "Old packet received, ignoring, this # "
<< static_cast<int>(e131_header.Sequence()) << ", last "
<< static_cast<int>(iter->sequence);
return false;
}
iter->sequence = e131_header.Sequence();

if (e131_header.StreamTerminated()) {
OLA_INFO << "CID " << headers.GetRootHeader().GetCid().ToString() <<
" sent a termination for universe " << e131_header.Universe();
OLA_INFO << "CID " << headers.GetRootHeader().GetCid().ToString()
<< " sent a termination for universe "
<< e131_header.Universe();
sources.erase(iter);
if (sources.empty())
if (sources.empty()) {
universe_data->active_priority = 0;
}
// We need to trigger a merge here else the buffer will be stale, we keep
// the buffer as NULL though so we don't use the data.
return true;
Expand Down
82 changes: 41 additions & 41 deletions libs/acn/DMPE131Inflator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,58 +36,58 @@ class DMPE131Inflator: public DMPInflator {
friend class DMPE131InflatorTest;

public:
explicit DMPE131Inflator(bool ignore_preview):
DMPInflator(),
m_ignore_preview(ignore_preview) {
}
~DMPE131Inflator();
explicit DMPE131Inflator(bool ignore_preview):
DMPInflator(),
m_ignore_preview(ignore_preview) {
}
~DMPE131Inflator();

bool SetHandler(uint16_t universe, ola::DmxBuffer *buffer,
uint8_t *priority, ola::Callback0<void> *handler);
bool RemoveHandler(uint16_t universe);
bool SetHandler(uint16_t universe, ola::DmxBuffer *buffer,
uint8_t *priority, ola::Callback0<void> *handler);
bool RemoveHandler(uint16_t universe);

void RegisteredUniverses(std::vector<uint16_t> *universes);
void RegisteredUniverses(std::vector<uint16_t> *universes);

protected:
virtual bool HandlePDUData(uint32_t vector,
const HeaderSet &headers,
const uint8_t *data,
unsigned int pdu_len);
virtual bool HandlePDUData(uint32_t vector,
const HeaderSet &headers,
const uint8_t *data,
unsigned int pdu_len);

private:
typedef struct {
ola::acn::CID cid;
uint8_t sequence;
TimeStamp last_heard_from;
DmxBuffer buffer;
} dmx_source;
typedef struct {
ola::acn::CID cid;
uint8_t sequence;
TimeStamp last_heard_from;
DmxBuffer buffer;
} dmx_source;

typedef struct {
DmxBuffer *buffer;
Callback0<void> *closure;
uint8_t active_priority;
uint8_t *priority;
std::vector<dmx_source> sources;
} universe_handler;
typedef struct {
DmxBuffer *buffer;
Callback0<void> *closure;
uint8_t active_priority;
uint8_t *priority;
std::vector<dmx_source> sources;
} universe_handler;

typedef std::map<uint16_t, universe_handler> UniverseHandlers;
typedef std::map<uint16_t, universe_handler> UniverseHandlers;

UniverseHandlers m_handlers;
bool m_ignore_preview;
ola::Clock m_clock;
UniverseHandlers m_handlers;
bool m_ignore_preview;
ola::Clock m_clock;

bool TrackSourceIfRequired(universe_handler *universe_data,
const HeaderSet &headers,
DmxBuffer **buffer);
bool TrackSourceIfRequired(universe_handler *universe_data,
const HeaderSet &headers,
DmxBuffer **buffer);

// The max number of sources we'll track per universe.
static const uint8_t MAX_MERGE_SOURCES = 6;
// The max merge priority.
static const uint8_t MAX_E131_PRIORITY = 200;
// ignore packets that differ by less than this amount from the last one
static const int8_t SEQUENCE_DIFF_THRESHOLD = -20;
// expire sources after 2.5s
static const TimeInterval EXPIRY_INTERVAL;
// The max number of sources we'll track per universe.
static const uint8_t MAX_MERGE_SOURCES = 6;
// The max merge priority.
static const uint8_t MAX_E131_PRIORITY = 200;
// ignore packets that differ by less than this amount from the last one
static const int8_t SEQUENCE_DIFF_THRESHOLD = -20;
// expire sources after 2.5s
static const TimeInterval EXPIRY_INTERVAL;
};
} // namespace acn
} // namespace ola
Expand Down
Loading