Skip to content

Commit

Permalink
Lazily load streams in meta streams
Browse files Browse the repository at this point in the history
  • Loading branch information
jdecourval committed Oct 13, 2024
1 parent 208066e commit b638d81
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 33 deletions.
135 changes: 102 additions & 33 deletions server/streamreader/meta_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,86 @@ MetaStream::MetaStream(PcmStream::Listener* pcmListener, const std::vector<std::
: PcmStream(pcmListener, ioc, server_settings, uri), first_read_(true)
{
auto path_components = utils::string::split(uri.path, '/');

for (const auto& stream : streams)
{
addStream(stream);
}

updateActiveStream();
}

bool MetaStream::isAllowed(const PcmStream& stream) const
{
auto path_components = utils::string::split(uri_.path, '/');
for (const auto& component : path_components)
{
if (component.empty())
continue;

bool found = false;
for (const auto& stream : streams)
if (stream.getName() == component)
{
if (stream->getName() == component)
{
streams_.push_back(stream);
stream->addListener(this);
found = true;
break;
}
return true;
}
if (!found)
throw SnapException("Unknown stream: \"" + component + "\"");
}

if (streams_.empty())
throw SnapException("Meta stream '" + getName() + "' must contain at least one stream");
return false;
}

active_stream_ = streams_.front();
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
void MetaStream::addStream(std::shared_ptr<PcmStream> stream)
{
if (isAllowed(*stream))
{
stream->addListener(this);
streams_.push_back(std::move(stream));
updateActiveStream();
}
}

void MetaStream::removeStream(const PcmStream& stream)
{
auto iter = std::find_if(streams_.begin(), streams_.end(), [id = stream.getId()](const auto& s) { return s->getId() == id; });
if (iter != streams_.end())
{
streams_.erase(iter);
updateActiveStream();
}
}

void MetaStream::updateActiveStream()
{
auto compareStreamOrder = [this](const std::shared_ptr<PcmStream>& first, const std::shared_ptr<PcmStream>& second)
{
if (first->getName() == second->getName())
return false;

auto path_components = utils::string::split(uri_.path, '/');
for (const auto& component : path_components)
{
if (component == first->getName())
return true;
if (component == second->getName())
return false;
}
return false;
};

std::lock_guard<std::recursive_mutex> lock(active_mutex_);
if (!streams_.empty())
{
auto new_active = std::min_element(streams_.begin(), streams_.end(), compareStreamOrder);
if (!active_stream_ || active_stream_->getId() != ((*new_active)->getId()))
{
active_stream_ = *streams_.begin();
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
}
}
else
{
active_stream_ = nullptr;
resampler_ = nullptr;
}
}

MetaStream::~MetaStream()
{
Expand Down Expand Up @@ -133,7 +186,8 @@ void MetaStream::onStateChanged(const PcmStream* pcmStream, ReaderState state)
}
}

switch_stream(streams_.front());
if (!streams_.empty())
switch_stream(*streams_.begin());
setState(ReaderState::kIdle);
}

Expand Down Expand Up @@ -212,86 +266,100 @@ void MetaStream::onResync(const PcmStream* pcmStream, double ms)
void MetaStream::setShuffle(bool shuffle, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setShuffle(shuffle, std::move(handler));
if (active_stream_)
active_stream_->setShuffle(shuffle, std::move(handler));
}

void MetaStream::setLoopStatus(LoopStatus status, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setLoopStatus(status, std::move(handler));
if (active_stream_)
active_stream_->setLoopStatus(status, std::move(handler));
}

void MetaStream::setVolume(uint16_t volume, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setVolume(volume, std::move(handler));
if (active_stream_)
active_stream_->setVolume(volume, std::move(handler));
}

void MetaStream::setMute(bool mute, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setMute(mute, std::move(handler));
if (active_stream_)
active_stream_->setMute(mute, std::move(handler));
}

void MetaStream::setRate(float rate, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setRate(rate, std::move(handler));
if (active_stream_)
active_stream_->setRate(rate, std::move(handler));
}


// Control commands
void MetaStream::setPosition(std::chrono::milliseconds position, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->setPosition(position, std::move(handler));
if (active_stream_)
active_stream_->setPosition(position, std::move(handler));
}

void MetaStream::seek(std::chrono::milliseconds offset, ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->seek(offset, std::move(handler));
if (active_stream_)
active_stream_->seek(offset, std::move(handler));
}

void MetaStream::next(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->next(std::move(handler));
if (active_stream_)
active_stream_->next(std::move(handler));
}

void MetaStream::previous(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->previous(std::move(handler));
if (active_stream_)
active_stream_->previous(std::move(handler));
}

void MetaStream::pause(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->pause(std::move(handler));
if (active_stream_)
active_stream_->pause(std::move(handler));
}

void MetaStream::playPause(ResultHandler handler)
{
LOG(DEBUG, LOG_TAG) << "PlayPause\n";
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (active_stream_->getState() == ReaderState::kIdle)
play(handler);
else
active_stream_->playPause(std::move(handler));
if (active_stream_)
{
if (active_stream_->getState() == ReaderState::kIdle)
play(handler);
else
active_stream_->playPause(std::move(handler));
}
}

void MetaStream::stop(ResultHandler handler)
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
active_stream_->stop(std::move(handler));
if (active_stream_)
active_stream_->stop(std::move(handler));
}

void MetaStream::play(ResultHandler handler)
{
LOG(DEBUG, LOG_TAG) << "Play\n";
std::lock_guard<std::recursive_mutex> lock(mutex_);
if ((active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying))
if ((active_stream_) && (active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying))
return active_stream_->play(std::move(handler));

for (const auto& stream : streams_)
Expand All @@ -303,7 +371,8 @@ void MetaStream::play(ResultHandler handler)
}

// call play on the active stream to get the handler called
active_stream_->play(std::move(handler));
if (active_stream_)
active_stream_->play(std::move(handler));
}


Expand Down
6 changes: 6 additions & 0 deletions server/streamreader/meta_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class MetaStream : public PcmStream, public PcmStream::Listener
void start() override;
void stop() override;

void addStream(std::shared_ptr<PcmStream> stream);
void removeStream(const PcmStream& stream);

// Setter for properties
void setShuffle(bool shuffle, ResultHandler handler) override;
void setLoopStatus(LoopStatus status, ResultHandler handler) override;
Expand All @@ -67,6 +70,9 @@ class MetaStream : public PcmStream, public PcmStream::Listener
void play(ResultHandler handler) override;

protected:
bool isAllowed(const PcmStream& stream) const;
void updateActiveStream();

/// Implementation of PcmStream::Listener
void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
Expand Down
9 changes: 9 additions & 0 deletions server/streamreader/stream_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
{
if (s->getName() == stream->getName())
throw SnapException("Stream with name \"" + stream->getName() + "\" already exists");

if (auto meta = dynamic_cast<MetaStream*>(s.get()))
meta->addStream(stream);
}
streams_.push_back(stream);
}
Expand All @@ -161,6 +164,12 @@ void StreamManager::removeStream(const std::string& name)
{
(*iter)->stop();
streams_.erase(iter);

for (const auto& s : streams_)
{
if (auto meta = dynamic_cast<MetaStream*>(s.get()))
meta->removeStream(**iter);
}
}
}

Expand Down

0 comments on commit b638d81

Please sign in to comment.