Skip to content

Commit

Permalink
[ndnrtc-client] implemented file pipe frame source. now you can grab …
Browse files Browse the repository at this point in the history
…video from camera using ffmpeg and source it to ndnrtc-client: `ffmpeg -y -f avfoundation -pixel_format 0rgb -framerate 25 -i 0 -map 0:v -c copy -f rawvideo /tmp/camera` <-- specify `source = { name_/tmp/camera; type=pipe; };` in config file
  • Loading branch information
peetonn committed Sep 12, 2018
1 parent c1f0839 commit 4b2eb40
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 33 deletions.
25 changes: 23 additions & 2 deletions cpp/client/src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,34 @@ LocalStream Client::initLocalStream(const ProducerStreamParams &p)

if (p.type_ == ndnrtc::MediaStreamParams::MediaStreamTypeVideo)
{
LogDebug("") << "initializing video source at " << p.source_ << endl;
LogDebug("") << "initializing video source at " << p.source_.name_ << endl;

boost::shared_ptr<RawFrame> sampleFrame = sampleFrameForStream(p);

LogDebug("") << "source should support frames of size "
<< sampleFrame->getWidth() << "x" << sampleFrame->getHeight() << endl;
videoSource.reset(new VideoSource(io_, p.source_, sampleFrame));

boost::shared_ptr<IFrameSource> source;

if (p.source_.type_ == "file")
{
if (!FileFrameSource::checkSourceForFrame(p.source_.name_, *sampleFrame))
{
stringstream msg;
msg << "bad source (" << p.source_.name_ << ") for "
<< sampleFrame->getWidth() << "x" << sampleFrame->getHeight() << " video";
throw runtime_error(msg.str());
}
source.reset(new FileFrameSource(p.source_.name_));
}
else if (p.source_.type_ == "pipe")
{
source.reset(new PipeFrameSource(p.source_.name_));
}
else
throw runtime_error("Uknown source type "+p.source_.type_);

videoSource.reset(new VideoSource(io_, source, sampleFrame));
LogDebug("") << "video source initialized" << endl;

boost::shared_ptr<ndnrtc::LocalVideoStream> s =
Expand Down
10 changes: 9 additions & 1 deletion cpp/client/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,15 @@ int loadStreamParams(const Setting &s, ProducerStreamParams &params)
// ClientMediaStreamParams msp;
if (EXIT_SUCCESS == loadStreamParams(s, (ClientMediaStreamParams &)params))
{
s.lookupValue("source", params.source_);
// user may specify file source by simply providing "source = <filename>;"
// otherwise, treat source as a structure { <filename>, <type> }
if (!s.lookupValue("source", params.source_.name_))
{
const Setting &sourceSettings = s["source"];

sourceSettings.lookupValue("name", params.source_.name_);
sourceSettings.lookupValue("type", params.source_.type_);
}

if (params.type_ == MediaStreamParams::MediaStreamTypeAudio)
{
Expand Down
10 changes: 7 additions & 3 deletions cpp/client/src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,21 @@ class ClientMediaStreamParams : public ndnrtc::MediaStreamParams
class ProducerStreamParams : public ClientMediaStreamParams
{
public:
std::string source_;
typedef struct _Source {
std::string name_, type_;
} Source;
Source source_;

ProducerStreamParams() {}
ProducerStreamParams() : source_({"", "file"}) {}
ProducerStreamParams(const ProducerStreamParams &params) : ClientMediaStreamParams(params), source_(params.source_) {}

void getMaxResolution(unsigned int &width, unsigned int &height) const;

void write(std::ostream &os) const
{
os
<< "stream source: " << source_ << "; ";
<< "stream source: " << source_.name_ << " (type: "
<< source_.type_ << "); ";
ClientMediaStreamParams::write(os);
}
};
Expand Down
83 changes: 82 additions & 1 deletion cpp/client/src/frame-io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "frame-io.hpp"

Expand Down Expand Up @@ -213,7 +214,8 @@ IFrameSink &NanoMsgSink::operator<<(const RawFrame &frame)

//******************************************************************************
FileFrameSource::FileFrameSource(const string &path) : FileFrameStorage(path),
current_(0), readError_(false)
current_(0), readError_(false),
errorMsg_("")
{
openFile();
}
Expand Down Expand Up @@ -258,3 +260,82 @@ FILE *FileFrameSource::openFile_impl(string path)
{
return fopen(path.c_str(), "rb");
}

PipeFrameSource::PipeFrameSource(const std::string &path):pipe_(-1), pipePath_(path)
{
createReadPipe();
reopenReadPipe();

if (pipe_ < 0)
throw std::runtime_error("Couldn't open pipe "+pipePath_);
}

PipeFrameSource::~PipeFrameSource()
{
closePipe();
}

IFrameSource &PipeFrameSource::operator>>(RawFrame& frame) noexcept
{
uint8_t *buf = frame.getBuffer().get();
int bufferSize = frame.getFrameSizeInBytes();

int c = 0;

readError_ = false;
errorMsg_ = "";

do { // read frame data until all frame data read
int r = read(pipe_, buf+c, bufferSize-c);

if (r > 0)
c += r;
else if (r < 0)
{
std::stringstream ss;
ss << "Error reading from pipe " << pipePath_ << ": " << errno << " (" << strerror(errno) << ")";
readError_ = true;
errorMsg_ = ss.str();
}
else
reopenReadPipe();
} while (c < bufferSize);

return *this;
}

int
PipeFrameSource::createReadPipe()
{
int res = 0;

res = mkfifo(pipePath_.c_str(), 0644);

if (res < 0 && errno != EEXIST)
{
std::stringstream ss;
ss << "Error creating pipe " << pipePath_ << " error " << errno << "(" << strerror(errno) << ")";
throw std::runtime_error(ss.str());
}
else res = 0;

return res;
}

void
PipeFrameSource::reopenReadPipe()
{
// do {
if (pipe_ > 0)
close(pipe_);

pipe_ = open(pipePath_.c_str(), O_RDONLY);
sleep(0.5);
// } while (pipe_ < 0);
}

void
PipeFrameSource::closePipe()
{
close(pipe_);
}
36 changes: 34 additions & 2 deletions cpp/client/src/frame-io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ class IFrameSource
{
public:
virtual IFrameSource &operator>>(RawFrame &frame) noexcept = 0;
virtual std::string getName() const = 0;
virtual bool isEof() const = 0;
virtual void rewind() = 0;
virtual bool isError() const = 0;
virtual std::string getErrorMsg() const = 0;
};

class FileFrameSource : public IFrameSource, public FileFrameStorage
Expand All @@ -177,6 +182,8 @@ class FileFrameSource : public IFrameSource, public FileFrameStorage
FileFrameSource(const std::string &path);

IFrameSource &operator>>(RawFrame &frame) noexcept;
std::string getName() const { return path_; }

/**
* NOTE: will always return 'true' before any read call.
* proper use to read frames in a loop:
Expand All @@ -185,8 +192,9 @@ class FileFrameSource : public IFrameSource, public FileFrameStorage
* source >> frame;
* } while(source.isEof() && !source.isError());
*/
bool isEof() { return (feof(file_) != 0); }
bool isError() { return readError_; }
bool isEof() const { return (feof(file_) != 0); }
bool isError() const { return readError_; }
std::string getErrorMsg() const { return errorMsg_; }
void rewind()
{
::rewind(file_);
Expand All @@ -199,6 +207,30 @@ class FileFrameSource : public IFrameSource, public FileFrameStorage
FILE *openFile_impl(std::string path);
unsigned long current_;
bool readError_;
std::string errorMsg_;
};

class PipeFrameSource : public IFrameSource {
public:
PipeFrameSource(const std::string &path);
~PipeFrameSource();

IFrameSource &operator>>(RawFrame &frame) noexcept;
std::string getName() const { return pipePath_; }
bool isError() const { return readError_; }
std::string getErrorMsg() const { return errorMsg_; }
bool isEof() const { return false; }
void rewind() { /*do nothing*/ }

private:
std::string pipePath_;
int pipe_;
bool readError_;
std::string errorMsg_;

int createReadPipe();
void reopenReadPipe();
void closePipe();
};

#endif
23 changes: 7 additions & 16 deletions cpp/client/src/video-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,12 @@ using namespace boost::chrono;
using namespace boost::asio;

VideoSource::VideoSource(io_service &io_service,
const std::string &sourcePath,
const boost::shared_ptr<RawFrame> &frame) : io_(io_service), frame_(frame), isRunning_(false), framesSourced_(0),
const boost::shared_ptr<IFrameSource>& source,
const boost::shared_ptr<RawFrame> &frame) : io_(io_service), frame_(frame), source_(source),
isRunning_(false), framesSourced_(0),
nRewinds_(0)
{
assert(frame.get());

if (!FileFrameSource::checkSourceForFrame(sourcePath, *frame_))
{
stringstream msg;
msg << "bad source (" << sourcePath << ") for "
<< frame_->getWidth() << "x" << frame_->getHeight() << " video";
throw runtime_error(msg.str());
}

source_.reset(new FileFrameSource(sourcePath));
}

VideoSource::~VideoSource()
Expand Down Expand Up @@ -56,7 +47,7 @@ void VideoSource::start(const double &rate)

isRunning_ = generator_->isRunning();

LogInfo("") << "sourcing video from " << source_->getPath()
LogInfo("") << "sourcing video from " << source_->getName()
<< " started (capturing rate " << rate << ")" << endl;
}
else
Expand All @@ -70,7 +61,7 @@ void VideoSource::stop()
generator_->stop();
isRunning_ = false;

LogInfo("") << "sourcing video from " << source_->getPath()
LogInfo("") << "sourcing video from " << source_->getName()
<< " stopped" << endl;
}
}
Expand All @@ -86,13 +77,13 @@ double VideoSource::getMeanSourcingTimeMs()
void VideoSource::sourceFrame()
{
// LogTrace("") << "reading " << frame_->getWidth() << "x" << frame_->getHeight()
// << "frame from " << source_->getPath() << endl;
// << "frame from " << source_->getName() << endl;

do
{
if (source_->isEof())
{
LogDebug("") << "rewound source " << source_->getPath() << endl;
LogDebug("") << "rewound source " << source_->getName() << endl;
source_->rewind();
nRewinds_++;
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/client/src/video-source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class VideoSource
* @param rate Framerate (per second) for delivering frames (reading
* frames from a source file)
*/
VideoSource(boost::asio::io_service &io_service, const std::string &sourcePath,
VideoSource(boost::asio::io_service &io_service,
const boost::shared_ptr<IFrameSource>& source,
const boost::shared_ptr<RawFrame> &frame);
~VideoSource();

Expand All @@ -46,7 +47,7 @@ class VideoSource
private:
bool isRunning_;
unsigned int framesSourced_, nRewinds_;
boost::shared_ptr<FileFrameSource> source_;
boost::shared_ptr<IFrameSource> source_;
boost::shared_ptr<RawFrame> frame_;
std::vector<ndnrtc::IExternalCapturer *> capturers_;
boost::asio::io_service &io_;
Expand Down
10 changes: 5 additions & 5 deletions cpp/tests/default.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ produce = {
sample = 15; // sample freshness (audio, video delta)
sampleKey = 900; // key sample freshness (video key)
};
source = "camera.argb"; // file from where raw frames will
// be read. frame resolution should be
// equal to the maximum encoding resolution
// among threads
source = { // file from where raw frames will be read
name = "camera.argb";
type = "file"; // could be either "file" or "pipe"
};
sync = "sound";

threads = ({ // an array of streams all threads that will be published
Expand Down Expand Up @@ -62,7 +62,7 @@ produce = {
type = "video";
name = "desktop";
segment_size = 1000;
source = "desktop.argb";
source = "desktop.argb"; // this way of defining source defaults to a "file" type
freshness = 2000;
sync = "sound";

Expand Down
2 changes: 1 addition & 1 deletion cpp/tests/tests-helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ ClientParams sampleProducerParams()

msp.sessionPrefix_ = pcp.prefix_;
msp.streamName_ = "camera";
msp.source_ = "/tmp/camera.argb";
msp.source_ = {"/tmp/camera.argb", "file"};
msp.type_ = MediaStreamParams::MediaStreamTypeVideo;
msp.synchronizedStreamName_ = "mic";
msp.producerParams_.freshness_ = {15, 15, 900};
Expand Down

0 comments on commit 4b2eb40

Please sign in to comment.