Skip to content

Commit ac51687

Browse files
committed
Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing
1 parent 65a2cee commit ac51687

File tree

4 files changed

+68
-49
lines changed

4 files changed

+68
-49
lines changed

examples/examples_jobs_engine.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ namespace examples::jobs_engine {
127127

128128
// set a custom delay (timeout for job3 is 500 ms)
129129
jobs_config.m_delay_next_request = std::chrono::milliseconds(500);
130-
small::sleep(500); // TODO remove this after delay works
131130
});
132131

133132
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
@@ -184,8 +183,7 @@ namespace examples::jobs_engine {
184183

185184
first_job = false;
186185
}
187-
// TODO config to wait after request (even if it is not specified in the global config - so custom throttle)
188-
small::sleep(30);
186+
// config to wait after request (even if it is not specified in the global config - so custom throttle)
189187
jobs_config.m_delay_next_request = std::chrono::milliseconds(30);
190188
});
191189

include/impl/jobs_engine_thread_pool_impl.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ namespace small::jobsimpl {
7373
// even if here it is considered that there are items and something will be scheduled,
7474
// the actual check if work will still exists will be done in do_action of parent
7575
auto &stats = it->second;
76-
jobs_action_start(job_group, true, stats);
76+
jobs_action_start(job_group, true /*has items*/, std::chrono::milliseconds(0) /*delay*/, stats);
7777
}
7878

7979
// clang-format off
@@ -129,7 +129,7 @@ namespace small::jobsimpl {
129129
//
130130
// to trigger action (if needed for the new job group)
131131
//
132-
inline void jobs_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats)
132+
inline void jobs_action_start(const JobGroupT &job_group, const bool has_items, const std::chrono::milliseconds &delay_next_request, JobGroupStats &stats)
133133
{
134134
if (!has_items) {
135135
return;
@@ -141,14 +141,18 @@ namespace small::jobsimpl {
141141
bool needs_runners = stats.m_running < stats.m_threads_count;
142142
if (needs_runners) {
143143
++stats.m_running;
144-
m_workers.push_back(job_group);
144+
if (delay_next_request.count() > 0) {
145+
m_workers.push_back_delay_for(delay_next_request, job_group);
146+
} else {
147+
m_workers.push_back(job_group);
148+
}
145149
}
146150
}
147151

148152
//
149153
// job action ended
150154
//
151-
inline void jobs_action_end(const JobGroupT &job_group, const bool has_items)
155+
inline void jobs_action_end(const JobGroupT &job_group, const bool has_items, const std::chrono::milliseconds &delay_next_request)
152156
{
153157
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
154158
if (it == m_scheduler.end()) {
@@ -160,7 +164,7 @@ namespace small::jobsimpl {
160164
auto &stats = it->second;
161165
--stats.m_running;
162166

163-
jobs_action_start(job_group, has_items, stats);
167+
jobs_action_start(job_group, has_items, delay_next_request, stats);
164168
}
165169

166170
//
@@ -170,11 +174,21 @@ namespace small::jobsimpl {
170174
{
171175
for (auto job_group : items) {
172176

173-
bool has_items = false;
174-
m_parent_caller.do_action(job_group, &has_items);
177+
std::chrono::milliseconds delay_next_request{};
178+
179+
auto ret = m_parent_caller.do_action(job_group, delay_next_request);
180+
bool has_items = ret == small::EnumLock::kElement;
181+
182+
// has items does not mean that there are still items in the queue
183+
// it means that items were processed in this iteration
184+
// as such an execution will be scheduled no matter what
185+
// and next processing time will check to see if there are items or not
186+
187+
// for delay is important items -> delay -> next item -> delay ....
188+
// -> empty ... when new items arrive delay was already processed
175189

176190
// start another action
177-
jobs_action_end(job_group, has_items);
191+
jobs_action_end(job_group, has_items, delay_next_request);
178192
}
179193
}
180194

include/impl/jobs_queue_impl.h

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,17 @@ namespace small::jobsimpl {
517517
//
518518
// get job items
519519
//
520+
inline std::shared_ptr<JobsItem> *jobs_get(const JobsID &jobs_id)
521+
{
522+
std::unique_lock l(m_lock);
523+
524+
auto it_j = m_jobs.find(jobs_id);
525+
if (it_j == m_jobs.end()) {
526+
return nullptr;
527+
}
528+
return &it_j->second;
529+
}
530+
520531
inline std::vector<std::shared_ptr<JobsItem>> jobs_get(const std::vector<JobsID> &jobs_ids)
521532
{
522533
std::vector<std::shared_ptr<JobsItem>> jobs_items;
@@ -525,27 +536,15 @@ namespace small::jobsimpl {
525536
std::unique_lock l(m_lock);
526537

527538
for (auto &jobs_id : jobs_ids) {
528-
auto it_j = m_jobs.find(jobs_id);
529-
if (it_j == m_jobs.end()) {
530-
continue;
539+
auto *jobs_item = jobs_get(jobs_id);
540+
if (jobs_item) {
541+
jobs_items.push_back(*jobs_item);
531542
}
532-
jobs_items.push_back(it_j->second);
533543
}
534544

535545
return jobs_items; // will be moved
536546
}
537547

538-
inline std::shared_ptr<JobsItem> *jobs_get(const JobsID &jobs_id)
539-
{
540-
std::unique_lock l(m_lock);
541-
542-
auto it_j = m_jobs.find(jobs_id);
543-
if (it_j == m_jobs.end()) {
544-
return nullptr;
545-
}
546-
return &it_j->second;
547-
}
548-
549548
//
550549
// add jobs item
551550
//

include/jobs_engine.h

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -431,15 +431,11 @@ namespace small {
431431
}
432432

433433
//
434-
// inner thread function for executing items (should return if there are more items) (called from thread_pool)
434+
// get jobs to execute based on the group
435435
//
436-
friend small::jobsimpl::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine>;
437-
438-
inline EnumLock do_action(const JobsGroupT &jobs_group, bool *has_items)
436+
inline EnumLock get_group_jobs(const JobsGroupT &jobs_group, std::vector<JobsID> &vec_ids, typename JobsConfig::ConfigProcessing &group_config)
439437
{
440-
*has_items = false;
441-
442-
// get bulk_count
438+
// get bulk_count property
443439
auto it_cfg_grp = m_config.m_groups.find(jobs_group);
444440
if (it_cfg_grp == m_config.m_groups.end()) {
445441
return small::EnumLock::kExit;
@@ -448,7 +444,6 @@ namespace small {
448444
int bulk_count = std::max(it_cfg_grp->second.m_bulk_count, 1);
449445

450446
// delay request
451-
typename JobsConfig::ConfigProcessing group_config{};
452447
group_config.m_delay_next_request = it_cfg_grp->second.m_delay_next_request;
453448

454449
// get items to process
@@ -457,40 +452,53 @@ namespace small {
457452
return small::EnumLock::kExit;
458453
}
459454

460-
std::vector<JobsID> vec_ids;
461-
auto ret = q->wait_pop_front_for(std::chrono::nanoseconds(0), vec_ids, bulk_count);
455+
auto ret = q->wait_pop_front_for(std::chrono::nanoseconds(0), vec_ids, bulk_count);
456+
return ret;
457+
}
458+
459+
//
460+
// inner thread function for executing items (should return if there are more items) (called from thread_pool)
461+
//
462+
friend small::jobsimpl::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine>;
463+
464+
inline EnumLock do_action(const JobsGroupT &jobs_group, std::chrono::milliseconds &delay_next_request)
465+
{
466+
// get jobs for the group
467+
typename JobsConfig::ConfigProcessing group_config{}; // for delay request
468+
std::vector<JobsID> vec_ids;
469+
470+
auto ret = get_group_jobs(jobs_group, vec_ids, group_config);
462471
if (ret != small::EnumLock::kElement) {
463472
return ret;
464473
}
465474

466-
*has_items = true;
467-
468475
// split by type
469-
std::unordered_map<JobsTypeT, std::vector<std::shared_ptr<JobsItem>>> elems_by_type;
476+
std::unordered_map<JobsTypeT, std::vector<std::shared_ptr<JobsItem>>> jobs_in_progress_by_type;
470477
{
471478
// get jobs
472-
std::vector<std::shared_ptr<JobsItem>> jobs_items = m_queue.jobs_get(vec_ids);
479+
std::vector<std::shared_ptr<JobsItem>> jobs_items = queue().jobs_get(vec_ids);
473480
for (auto &jobs_item : jobs_items) {
474-
elems_by_type[jobs_item->m_type].reserve(jobs_items.size());
481+
jobs_in_progress_by_type[jobs_item->m_type].reserve(jobs_items.size());
475482

476483
// mark the item as in progress
477484
jobs_item->set_state_inprogress();
478-
// execute if it is still in progress (may be moved to higher states due to external factors like cancel, timeout, finish early due to other job, etc)
485+
// execute if it is still in progress
486+
// (may be moved to higher states due to external factors like cancel, timeout, finish early due to other job, etc)
479487
if (jobs_item->is_state_inprogress()) {
480-
elems_by_type[jobs_item->m_type].push_back(jobs_item);
488+
jobs_in_progress_by_type[jobs_item->m_type].push_back(jobs_item);
481489
}
482490
}
483491
}
484492

485493
// process specific job by type
486-
for (auto &[jobs_type, jobs] : elems_by_type) {
494+
for (auto &[jobs_type, jobs] : jobs_in_progress_by_type) {
487495
auto it_cfg_type = m_config.m_types.find(jobs_type);
488496
if (it_cfg_type == m_config.m_types.end()) {
489497
continue;
490498
}
491499

492500
// process specific jobs by type
493-
typename JobsConfig::ConfigProcessing type_config;
501+
typename JobsConfig::ConfigProcessing type_config{};
494502
it_cfg_type->second.m_function_processing(jobs, type_config);
495503

496504
// get the max for config
@@ -503,13 +511,13 @@ namespace small {
503511
}
504512

505513
// mark the item as in wait for children of finished
506-
// if in callback the state is set to failed, cancelled or timeout setting to finish wont succeed because if less value than those
514+
// if in callback the state is set to failed, cancelled or timeout
515+
// setting to finish wont succeed because is less value than those
507516
jobs_waitforchildren(jobs);
508517
}
509518

510-
// TODO group_config.m_delay_next_request
511-
// TODO for delay after requests use worker_thread delay item -> check if has_items should be set properly
512-
// TODO if delay is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue
519+
// delay request
520+
delay_next_request = group_config.m_delay_next_request.value_or(std::chrono::milliseconds(0));
513521

514522
return ret;
515523
}

0 commit comments

Comments
 (0)