Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ScopedJobThreadPtr class to be used with JobThread #4275

Open
wants to merge 1 commit into
base: 25.lts.1+
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion starboard/android/shared/audio_renderer_passthrough.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class AudioRendererPassthrough
// ensure the thread completes all tasks before |audio_track_bridge_| is
// invalidated.
std::unique_ptr<AudioTrackBridge> audio_track_bridge_;
std::unique_ptr<JobThread> audio_track_thread_;
starboard::shared::starboard::player::ScopedJobThreadPtr audio_track_thread_;
};

} // namespace shared
Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libaom/aom_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libaom/aom_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libdav1d/dav1d_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libdav1d/dav1d_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool stream_ended_ = false;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
const SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libde265/de265_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libde265/de265_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_ = false;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
5 changes: 2 additions & 3 deletions starboard/shared/libvpx/vpx_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));

// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/libvpx/vpx_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
bool error_occurred_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Decode-to-texture related state.
SbPlayerOutputMode output_mode_;
Expand Down
4 changes: 2 additions & 2 deletions starboard/shared/openh264/openh264_video_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ void VideoDecoder::Reset() {
SB_DCHECK(BelongsToCurrentThread());

if (decoder_thread_) {
// Wait to ensure all tasks are done before decoder_thread_ reset.
decoder_thread_->job_queue()->ScheduleAndWait(
decoder_thread_->job_queue()->Schedule(
std::bind(&VideoDecoder::TeardownCodec, this));
// Join the thread to ensure that all callbacks in process are finished.
decoder_thread_.reset();
}

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/openh264/openh264_video_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class VideoDecoder : public starboard::player::filter::VideoDecoder,
Mutex decode_target_mutex_;

// Working thread to avoid lengthy decoding work block the player thread.
std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;

// Openh264 decode handler.
ISVCDecoder* decoder_ = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StubAudioDecoder : public AudioDecoder, private JobQueue::JobOwner {
OutputCB output_cb_;
ErrorCB error_cb_;

std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;
Mutex decoded_audios_mutex_;
std::queue<scoped_refptr<DecodedAudio> > decoded_audios_;
scoped_refptr<InputBuffer> last_input_buffer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class StubVideoDecoder : public VideoDecoder, private JobQueue::JobOwner {
DecoderStatusCB decoder_status_cb_;
media::VideoStreamInfo video_stream_info_;

std::unique_ptr<starboard::player::JobThread> decoder_thread_;
starboard::player::ScopedJobThreadPtr decoder_thread_;
// std::set<> keeps frame timestamps sorted in ascending order.
std::set<int64_t> output_frame_timestamps_;
// Used to determine when to send kBufferFull in DecodeOneBuffer().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const int kJobThreadStackSize = 0;
std::unique_ptr<VideoDmpReader> s_video_dmp_reader;
std::unique_ptr<PlayerComponents> s_player_components;
int s_audio_sample_index;
std::unique_ptr<JobThread> s_job_thread;
starboard::shared::starboard::player::ScopedJobThreadPtr s_job_thread;
int64_t s_duration;

static void DeallocateSampleFunc(SbPlayer player,
Expand Down
34 changes: 32 additions & 2 deletions starboard/shared/starboard/player/job_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class JobThread {
explicit JobThread(const char* thread_name,
int64_t stack_size = 0,
SbThreadPriority priority = kSbThreadPriorityNormal);
~JobThread();

JobQueue* job_queue() { return job_queue_.get(); }
const JobQueue* job_queue() const { return job_queue_.get(); }
Expand Down Expand Up @@ -83,13 +82,44 @@ class JobThread {

return job_queue_->RemoveJobByToken(job_token);
}

private:
~JobThread();
static void* ThreadEntryPoint(void* context);
void RunLoop();

pthread_t thread_;
std::unique_ptr<JobQueue> job_queue_;

friend class ScopedJobThreadPtr;
};

// The ScopedJobThreadPtr class guarantees that the pointer to JobThread object
// is valid during JobThread destructor. This prevents issues of accessing nullified
// JobThread pointer, as per b/372515171
class ScopedJobThreadPtr {
alexanderbobrovnik marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit ScopedJobThreadPtr(JobThread* p = nullptr): job_thread_(p) {
}

~ScopedJobThreadPtr() {
delete job_thread_;
alexanderbobrovnik marked this conversation as resolved.
Show resolved Hide resolved
}

void reset(JobThread* p = nullptr) {
delete job_thread_;
job_thread_ = p;
}

JobThread* operator->() const {
return job_thread_;
}

explicit operator bool() const {
return job_thread_ != nullptr;
}

private:
JobThread* job_thread_;
};

} // namespace player
Expand Down
64 changes: 32 additions & 32 deletions starboard/shared/starboard/player/job_thread_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,35 @@ using ::testing::ElementsAre;
// Require at least millisecond-level precision.
constexpr int64_t kPrecisionUsec = 1000;

void ExecutePendingJobs(JobThread* job_thread) {
void ExecutePendingJobs(ScopedJobThreadPtr& job_thread) {
job_thread->ScheduleAndWait([]() {});
}

TEST(JobThreadTest, ScheduledJobsAreExecutedInOrder) {
std::vector<int> values;
JobThread job_thread{"JobThreadTests"};
job_thread.Schedule([&]() { values.push_back(1); });
job_thread.Schedule([&]() { values.push_back(2); });
job_thread.Schedule([&]() { values.push_back(3); });
job_thread.Schedule([&]() { values.push_back(4); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(5); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(6); }, 1 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(7); }, 2 * kPrecisionUsec);
job_thread.Schedule([&]() { values.push_back(8); }, 3 * kPrecisionUsec);
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
job_thread->Schedule([&]() { values.push_back(1); });
job_thread->Schedule([&]() { values.push_back(2); });
job_thread->Schedule([&]() { values.push_back(3); });
job_thread->Schedule([&]() { values.push_back(4); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(5); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(6); }, 1 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(7); }, 2 * kPrecisionUsec);
job_thread->Schedule([&]() { values.push_back(8); }, 3 * kPrecisionUsec);

// Sleep past the last scheduled job.
usleep(4 * kPrecisionUsec);

ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

EXPECT_THAT(values, ElementsAre(1, 2, 3, 4, 5, 6, 7, 8));
}

TEST(JobThreadTest, ScheduleAndWaitWaits) {
int64_t start = CurrentMonotonicTime();
std::atomic_bool job_1 = {false};
JobThread job_thread{"JobThreadTests"};
job_thread.ScheduleAndWait([&]() {
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
job_thread->ScheduleAndWait([&]() {
usleep(1 * kPrecisionUsec);
job_1 = true;
});
Expand All @@ -77,16 +77,16 @@ TEST(JobThreadTest, ScheduleAndWaitWaits) {
TEST(JobThreadTest, ScheduledJobsShouldNotExecuteAfterGoingOutOfScope) {
std::atomic_int counter = {0};
{
JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
std::function<void()> job = [&]() {
counter++;
job_thread.Schedule(job, 2 * kPrecisionUsec);
job_thread->Schedule(job, 2 * kPrecisionUsec);
};
job_thread.Schedule(job);
job_thread->Schedule(job);

// Wait for the job to run at least once and reschedule itself.
usleep(1 * kPrecisionUsec);
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);
}
int end_value = counter;
EXPECT_GE(counter, 1);
Expand All @@ -100,31 +100,31 @@ TEST(JobThreadTest, CanceledJobsAreCanceled) {
std::atomic_int counter_1 = {0}, counter_2 = {0};
JobQueue::JobToken job_token_1, job_token_2;

JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
std::function<void()> job_1 = [&]() {
counter_1++;
job_token_1 = job_thread.Schedule(job_1);
job_token_1 = job_thread->Schedule(job_1);
};
std::function<void()> job_2 = [&]() {
counter_2++;
job_token_2 = job_thread.Schedule(job_2);
job_token_2 = job_thread->Schedule(job_2);
};

job_token_1 = job_thread.Schedule(job_1);
job_token_2 = job_thread.Schedule(job_2);
job_token_1 = job_thread->Schedule(job_1);
job_token_2 = job_thread->Schedule(job_2);

// Wait for the scheduled jobs to at least run once.
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

// Cancel job 1 and grab the current counter values.
job_thread.ScheduleAndWait(
[&]() { job_thread.RemoveJobByToken(job_token_1); });
job_thread->ScheduleAndWait(
[&]() { job_thread->RemoveJobByToken(job_token_1); });
int checkpoint_1 = counter_1;
int checkpoint_2 = counter_2;

// Sleep and wait for pending jobs to run.
usleep(1 * kPrecisionUsec);
ExecutePendingJobs(&job_thread);
ExecutePendingJobs(job_thread);

// Job 1 should not have run again.
EXPECT_EQ(counter_1, checkpoint_1);
Expand All @@ -133,20 +133,20 @@ TEST(JobThreadTest, CanceledJobsAreCanceled) {
EXPECT_GT(counter_2, checkpoint_2);

// Cancel job 2 to avoid it scheduling itself during destruction.
job_thread.ScheduleAndWait(
[&]() { job_thread.RemoveJobByToken(job_token_2); });
job_thread->ScheduleAndWait(
[&]() { job_thread->RemoveJobByToken(job_token_2); });
}

TEST(JobThreadTest, QueueBelongsToCorrectThread) {
JobThread job_thread{"JobThreadTests"};
ScopedJobThreadPtr job_thread(new JobThread("JobThreadTests"));
JobQueue job_queue;

bool belongs_to_job_thread = false;
bool belongs_to_main_thread = false;

// Schedule in JobQueue owned by job thread.
job_thread.ScheduleAndWait([&]() {
belongs_to_job_thread = job_thread.BelongsToCurrentThread();
job_thread->ScheduleAndWait([&]() {
belongs_to_job_thread = job_thread->BelongsToCurrentThread();
belongs_to_main_thread = job_queue.BelongsToCurrentThread();
});

Expand All @@ -157,7 +157,7 @@ TEST(JobThreadTest, QueueBelongsToCorrectThread) {

// Schedule in JobQueue owned by main thread.
job_queue.Schedule([&]() {
belongs_to_job_thread = job_thread.BelongsToCurrentThread();
belongs_to_job_thread = job_thread->BelongsToCurrentThread();
belongs_to_main_thread = job_queue.BelongsToCurrentThread();
});

Expand Down
2 changes: 1 addition & 1 deletion starboard/shared/uwp/wasapi_audio_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class WASAPIAudioSink {
Mutex output_frames_mutex_;
std::queue<scoped_refptr<DecodedAudio>> pending_decoded_audios_;

std::unique_ptr<JobThread> job_thread_;
starboard::player::ScopedJobThreadPtr job_thread_;

starboard::ThreadChecker thread_checker_;
};
Expand Down
Loading
Loading