Skip to content

Commit

Permalink
catch for cuda processing thread (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhreshold authored Apr 20, 2020
1 parent 4f6e688 commit 1704055
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 105 deletions.
142 changes: 38 additions & 104 deletions src/video/nvcodec/cuda_threaded_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ CUThreadedDecoder::CUThreadedDecoder(int device_id, AVCodecParameters *codecpar,
pkt_queue_{}, frame_queue_{},
run_(false), frame_count_(0), draining_(false),
tex_registry_(), nv_time_base_({1, 10000000}), frame_base_({1, 1000000}),
dec_ctx_(nullptr), bsf_ctx_(nullptr), width_(-1), height_(-1) {
dec_ctx_(nullptr), bsf_ctx_(nullptr), width_(-1), height_(-1),
error_status_(false), error_message_() {

// initialize bitstream filters
InitBitStreamFilter(codecpar, iformat);
Expand Down Expand Up @@ -114,6 +115,7 @@ void CUThreadedDecoder::SetCodecContext(AVCodecContext *dec_ctx, int width, int
}

void CUThreadedDecoder::Start() {
CheckErrorStatus();
if (run_.load()) return;

pkt_queue_.reset(new PacketQueue());
Expand All @@ -128,56 +130,36 @@ void CUThreadedDecoder::Start() {
LOG(FATAL) << "Problem creating video parser";
return;
}
// frame_in_use_.resize(kMaxOutputSurfaces, 0);
//CHECK(permits_.size() == 0);
//permits_.resize(kMaxOutputSurfaces);
// for (auto& p : permits_) {
// p.reset(new PermitQueue());
// p->Push(1);
// }
// LOG(INFO) << "permits initied.";
run_.store(true);
// launch worker threads
auto launcher_t = std::thread{&CUThreadedDecoder::LaunchThread, this};
std::swap(launcher_t_, launcher_t);
// auto converter_t = std::thread{&CUThreadedDecoder::ConvertThread, this};
// std::swap(converter_t_, converter_t);
}

void CUThreadedDecoder::Stop() {
if (run_.load()) {
pkt_queue_->SignalForKill();
run_.store(false);
frame_queue_->SignalForKill();
//buffer_queue_->SignalForKill();
reorder_queue_->SignalForKill();
//frame_order_->SignalForKill();
// for (auto& p : permits_) {
// if (p) p->SignalForKill();
// }
// frame_in_use_.clear();
}
if (launcher_t_.joinable()) {
launcher_t_.join();
}
// if (converter_t_.joinable()) {
// converter_t_.join();
// }
}

void CUThreadedDecoder::Clear() {
Stop();
frame_count_.store(0);
// reorder_buffer_.clear();
// for (auto& p : permits_) {
// if (p) p->SignalForKill();
// }
// permits_.clear();
// frame_in_use_.clear();
{
std::lock_guard<std::mutex> lock(pts_mutex_);
discard_pts_.clear();
}
error_status_.store(false);
{
std::lock_guard<std::mutex> lock(error_mutex_);
error_message_.clear();
}
}

CUThreadedDecoder::~CUThreadedDecoder() {
Expand Down Expand Up @@ -289,20 +271,6 @@ void CUThreadedDecoder::Push(AVPacketPtr pkt, NDArray buf) {
if (draining_.load()) return;
draining_.store(true);
}
// if (pkt) {
// // calculate frame number for output order
// auto frame_num = av_rescale_q(pkt->pts, AV_TIME_BASE_Q, dec_ctx_->time_base);
// frame_order_->Push(frame_num);
// }
// if (pkt) {
// if (last_pts_ < 0) {
// last_pts_ = pkt->pts;
// frame_order_->Push(last_pts_);
// } else {
// last_pts_ += pkt->duration;
// frame_order_->Push(last_pts_);
// }
// }

while (pkt_queue_->Size() > kMaxOutputSurfaces) {
// too many in queue to be processed, wait here
Expand All @@ -315,19 +283,35 @@ void CUThreadedDecoder::Push(AVPacketPtr pkt, NDArray buf) {
}

bool CUThreadedDecoder::Pop(NDArray *frame) {
CheckErrorStatus();
if (!frame_count_.load() && !draining_.load()) {
return false;
}
if (reorder_queue_->Size() < 1) {
return false;
}
int ret = reorder_queue_->Pop(frame);
CheckErrorStatus();
if (!ret) return false;
--frame_count_;
return true;
}

void CUThreadedDecoder::LaunchThread() {
try {
LaunchThreadImpl();
} catch (dmlc::Error error) {
RecordInternalError(error.what());
run_.store(false);
frame_queue_->SignalForKill(); // Unblock all consumers
} catch (std::exception& error) {
RecordInternalError(error.what());
run_.store(false);
frame_queue_->SignalForKill(); // Unblock all consumers
}
}

void CUThreadedDecoder::LaunchThreadImpl() {
ctx_.Push();
// LOG(INFO) << "LaunchThread, thread id: " << std::this_thread::get_id();
while (run_.load()) {
Expand Down Expand Up @@ -368,70 +352,20 @@ void CUThreadedDecoder::LaunchThread() {
}
}

// void CUThreadedDecoder::ConvertThread() {
// ctx_.Push();
// // LOG(INFO) << "convert thread, thread id: " << std::this_thread::get_id();;
// while (run_.load()) {
// bool ret;
// CUVIDPARSERDISPINFO *disp_info = nullptr;
// NDArray arr;
// ret = buffer_queue_->Pop(&disp_info);
// if (!ret) return;
// CHECK(disp_info != nullptr);
// // CUDA mem buffer
// ret = frame_queue_->Pop(&arr);
// CHECK(arr.defined());
// // LOG(INFO) << "COnvert thread, ndarray buffer get";
// uint8_t* dst_ptr = static_cast<uint8_t*>(arr.data_->dl_tensor.data);
// auto frame = CUMappedFrame(disp_info, decoder_, stream_);
// // conversion to usable format, RGB, resize, etc...
// auto input_width = decoder_.Width();
// auto input_height = decoder_.Height();
// auto& textures = tex_registry_.GetTexture(frame.get_ptr(),
// frame.get_pitch(),
// input_width,
// input_height,
// ScaleMethod_Linear,
// ChromaUpMethod_Linear);
// int64_t frame_pts = static_cast<int64_t>(frame.disp_info->timestamp);
// // (TODO @zhreshold) verify how to precisely align discard pts
// ProcessFrame(textures.chroma, textures.luma, dst_ptr, stream_, input_width, input_height, width_, height_);
// // bool no_skip = true;
// // {
// // std::lock_guard<std::mutex> lock(pts_mutex_);
// // auto it = discard_pts_.find(frame_pts);
// // no_skip = it == discard_pts_.end();
// // if (!no_skip) {
// // discard_pts_.erase(it);
// // }
// // }
// // if (no_skip) {
// // // only process frame when not indicated with discard flag
// // ProcessFrame(textures.chroma, textures.luma, dst_ptr, stream_, input_width, input_height, width_, height_);
// // }
// // auto frame_num = av_rescale_q(frame.disp_info->timestamp,
// // nv_time_base_, frame_base_);
// int64_t desired_pts;
// int fo_ret = frame_order_->Pop(&desired_pts);
// if (!fo_ret) return;
// if (desired_pts == frame_pts) {
// // queue top is current array
// reorder_queue_->Push(arr);
// } else {
// // store current arr to map
// reorder_buffer_[frame_pts] = arr;
// auto r = reorder_buffer_.find(static_cast<int64_t>(frame_pts));
// if (r != reorder_buffer_.end()) {
// // queue top is stored in map
// reorder_queue_->Push(r->second);
// reorder_buffer_.erase(r);
// }
// }

// // Output cleared, allow next decoding
// permits_[disp_info->picture_index]->Push(1);
// }
// }
void CUThreadedDecoder::CheckErrorStatus() {
if (error_status_.load()) {
std::lock_guard<std::mutex> lock(error_mutex_);
LOG(FATAL) << error_message_;
}
}

void CUThreadedDecoder::RecordInternalError(std::string message) {
{
std::lock_guard<std::mutex> lock(error_mutex_);
error_message_ = message;
}
error_status_.store(true);
}

} // namespace cuda
} // namespace decord
7 changes: 6 additions & 1 deletion src/video/nvcodec/cuda_threaded_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ class CUThreadedDecoder final : public ThreadedDecoderInterface {
int HandlePictureDecode_(CUVIDPICPARAMS* pic_params);
int HandlePictureDisplay_(CUVIDPARSERDISPINFO* disp_info);
void LaunchThread();
void ConvertThread();
void LaunchThreadImpl();
void RecordInternalError(std::string message);
void CheckErrorStatus();
void InitBitStreamFilter(AVCodecParameters *codecpar, AVInputFormat *iformat);

int device_id_;
Expand Down Expand Up @@ -102,6 +104,9 @@ class CUThreadedDecoder final : public ThreadedDecoderInterface {
// uint64_t decoded_cnt_;
std::unordered_set<int64_t> discard_pts_;
std::mutex pts_mutex_;
std::mutex error_mutex_;
std::atomic<bool> error_status_;
std::string error_message_;

DISALLOW_COPY_AND_ASSIGN(CUThreadedDecoder);
};
Expand Down

0 comments on commit 1704055

Please sign in to comment.