Skip to content

Commit 253b16e

Browse files
committed
Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing
1 parent d3993dc commit 253b16e

File tree

5 files changed

+137
-112
lines changed

5 files changed

+137
-112
lines changed

.clang-format

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ AlignConsecutiveAssignments:
1212
AlignFunctionDeclarations: true
1313
AlignConsecutiveBitFields: true
1414
AlignConsecutiveMacros: true
15+
# PointerAlignment: Left
1516
BraceWrapping:
1617
AfterEnum: true
1718
AfterStruct: true

examples/examples_jobs_engine.h

Lines changed: 53 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ namespace examples::jobs_engine {
7777

7878
.m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}},
7979
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}},
80-
{JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(500)}},
80+
{JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(700)}},
8181
{JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}},
8282
{JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}};
8383

@@ -88,53 +88,52 @@ namespace examples::jobs_engine {
8888

8989
// create a cache server (with workers to simulate access to it)
9090
// (as an external engine outside the jobs engine for demo purposes)
91-
small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) {
92-
for (auto &i : items) {
93-
std::cout << "thread " << std::this_thread::get_id()
94-
<< " CACHE processing"
95-
<< " {" << i << "}" << "\n";
91+
small::worker_thread<std::pair<JobsEng::JobsID, Request>> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) {
92+
for (auto &[job_id, req] : items) {
93+
std::cout << "worker thread " << std::this_thread::get_id()
94+
<< " CACHE processing"
95+
<< " {" << req.first << ", " << req.second << ", jobid=" << job_id << "}"
96+
<< " time " << small::toISOString(small::timeNow())
97+
<< "\n";
9698

9799
// mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures)
98-
jobs.jobs_finished(i, (Response)i);
100+
jobs.jobs_finished(job_id, (Response)job_id);
99101
}
100102
// sleep long enough
101103
// no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server
102104
small::sleep(500);
103105
});
104106

107+
auto fn_print_item = [](auto item, std::string fn_type) {
108+
std::cout << "thread " << std::this_thread::get_id()
109+
<< std::setw(10) << fn_type
110+
<< " processing "
111+
<< "{"
112+
<< " jobid=" << std::setw(2) << item->m_id
113+
<< " type=" << std::setw(1) << (int)item->m_type
114+
<< " req.int=" << std::setw(2) << item->m_request.first << ","
115+
<< " req.str=\"" << item->m_request.second << "\""
116+
<< "}"
117+
<< " time " << small::toISOString(small::timeNow())
118+
<< "\n";
119+
};
120+
105121
// default processing used for job type 3 with custom delay in between requests
106122
// one request will succeed and one request will timeout for demo purposes
107-
jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
123+
jobs.config_default_function_processing([&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
108124
for (auto &item : jobs_items) {
109-
std::cout << "thread " << std::this_thread::get_id()
110-
<< " DEFAULT processing "
111-
<< "{"
112-
<< " type=" << (int)item->m_type
113-
<< " req.int=" << item->m_request.first << ","
114-
<< " req.str=\"" << item->m_request.second << "\""
115-
<< "}"
116-
<< " ref count " << item.use_count()
117-
<< " time " << small::toISOString(small::timeNow())
118-
<< "\n";
125+
fn_print_item(item, "DEFAULT");
119126
}
120127

121128
// set a custom delay (timeout for job3 is 500 ms)
122-
jobs_config.m_delay_next_request = std::chrono::milliseconds(1000);
129+
jobs_config.m_delay_next_request = std::chrono::milliseconds(500);
130+
small::sleep(500); // TODO remove this after delay works
123131
});
124132

125133
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
126134
jobs.config_jobs_function_processing(JobsType::kJobsType1, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
127135
for (auto &item : jobs_items) {
128-
std::cout << "thread " << std::this_thread::get_id()
129-
<< " JOB1 processing "
130-
<< "{"
131-
<< " type=" << (int)item->m_type
132-
<< " req.int=" << item->m_request.first << ","
133-
<< " req.str=\"" << item->m_request.second << "\""
134-
<< "}"
135-
<< " ref count " << item.use_count()
136-
<< " time " << small::toISOString(small::timeNow())
137-
<< "\n";
136+
fn_print_item(item, "JOB1");
138137

139138
// add 2 more children jobs for current one for database and server cache
140139
JobsEng::JobsID jobs_child_db_id{};
@@ -152,7 +151,7 @@ namespace examples::jobs_engine {
152151

153152
j.jobs_start(small::EnumPriorities::kNormal, jobs_child_db_id);
154153
// jobs_child_cache_id has no threads to execute, it has external executors
155-
cache_server.push_back(jobs_child_cache_id);
154+
cache_server.push_back({jobs_child_cache_id, item->m_request});
156155
}
157156
small::sleep(30); }, 5 /*param b*/);
158157

@@ -165,25 +164,16 @@ namespace examples::jobs_engine {
165164
// TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?)
166165

167166
// add specific function for job2
168-
jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
169-
bool first_job = true;
167+
jobs.config_jobs_function_processing(JobsType::kJobsType2, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
168+
static bool first_job = true;
170169
for (auto &item : jobs_items) {
171-
std::cout << "thread " << std::this_thread::get_id()
172-
<< " JOB2 processing "
173-
<< "{"
174-
<< " type=" << (int)item->m_type
175-
<< " req.int=" << item->m_request.first << ","
176-
<< " req.str=\"" << item->m_request.second << "\""
177-
<< "}"
178-
<< " ref count " << item.use_count()
179-
<< " time " << small::toISOString(small::timeNow())
180-
<< "\n";
170+
fn_print_item(item, "JOB2");
181171

182172
if (first_job) {
183173
// for type 2 only database children (for demo purposes no result will be used from database)
184174
auto ret = j.queue().push_back_and_start_child(item->m_id /*parent*/,
185-
small::EnumPriorities::kNormal,
186-
JobsType::kJobsDatabase,
175+
small::EnumPriorities::kNormal,
176+
JobsType::kJobsDatabase,
187177
item->m_request);
188178
if (!ret) {
189179
j.jobs_failed(item->m_id);
@@ -195,7 +185,21 @@ namespace examples::jobs_engine {
195185
first_job = false;
196186
}
197187
// TODO config to wait after request (even if it is not specified in the global config - so custom throttle)
198-
small::sleep(30); });
188+
small::sleep(30);
189+
jobs_config.m_delay_next_request = std::chrono::milliseconds(30);
190+
});
191+
192+
// add specific function for job2
193+
jobs.config_jobs_function_processing(JobsType::kJobsDatabase, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
194+
for (auto &item : jobs_items) {
195+
// simulate long db call
196+
small::sleep(200);
197+
198+
fn_print_item(item, "DATABASE");
199+
200+
// this job will be auto-finished
201+
}
202+
});
199203

200204
// TODO add function for database where demonstrate coalesce of 3 items (sleep 1000)
201205
// TODO add function for cache server - no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server
@@ -207,8 +211,8 @@ namespace examples::jobs_engine {
207211
std::vector<JobsEng::JobsID> jobs_ids;
208212

209213
// type3 one request will succeed and one request will timeout for demo purposes
210-
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id);
211-
jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id);
214+
jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id);
215+
jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id);
212216

213217
// type2 only the first request succeeds and waits for child the other fails from the start
214218
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType2, {2, "normal2"}, &jobs_id);
@@ -242,6 +246,8 @@ namespace examples::jobs_engine {
242246
auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished
243247
std::cout << "wait for with timeout, ret = " << static_cast<int>(ret) << " as timeout\n";
244248
jobs.wait(); // wait here for jobs to finish due to exit flag
249+
cache_server.signal_exit_force();
250+
cache_server.wait();
245251

246252
std::cout << "size = " << jobs.size() << "\n";
247253

include/impl/jobs_item_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ namespace small::jobsimpl {
101101
inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); }
102102

103103
inline bool is_state (const EnumJobsState &state) { return m_state.load() == state; }
104+
static bool is_state_complete (const EnumJobsState &state) { return state >= EnumJobsState::kFinished; }
104105

105106
inline bool is_state_inprogress () { return is_state(EnumJobsState::kInProgress); }
106107
inline void is_state_waitchildren () { return is_state(EnumJobsState::kWaitChildren); }

include/impl/jobs_queue_impl.h

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace small::jobsimpl {
1111
//
12-
// small queue helper class for jobs (parent caller must implement 'jobs_schedule', 'jobs_finished')
12+
// small queue helper class for jobs (parent caller must implement 'jobs_add', 'jobs_schedule', 'jobs_finished')
1313
//
1414
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT, typename JobsPrioT, typename ParentCallerT>
1515
class jobs_queue
@@ -94,17 +94,14 @@ namespace small::jobsimpl {
9494
// config job types
9595
// m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards
9696
//
97-
inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional<std::chrono::milliseconds> &jobs_timeout)
97+
inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group)
9898
{
9999
auto it_g = m_groups_queues.find(jobs_group);
100100
if (it_g == m_groups_queues.end()) {
101101
return false;
102102
}
103103

104104
m_types_queues[jobs_type] = &it_g->second;
105-
if (jobs_timeout) {
106-
m_types_timeouts[jobs_type] = *jobs_timeout;
107-
}
108105
return true;
109106
}
110107

@@ -562,8 +559,9 @@ namespace small::jobsimpl {
562559
jobs_item->m_id = id;
563560
m_jobs.emplace(id, jobs_item);
564561

565-
// add it to the timeout queue
566-
jobs_start_timeout(jobs_item);
562+
// call parent for extra processing
563+
m_parent_caller.jobs_add(jobs_item);
564+
567565
return id;
568566
}
569567

@@ -610,21 +608,6 @@ namespace small::jobsimpl {
610608
return ret;
611609
}
612610

613-
//
614-
// add it to the timeout queue
615-
//
616-
inline std::size_t jobs_start_timeout(std::shared_ptr<JobsItem> jobs_item)
617-
{
618-
std::unique_lock l(m_lock);
619-
620-
// only if job type has config a timeout
621-
auto it_timeout = m_types_timeouts.find(jobs_item->m_type);
622-
if (it_timeout == m_types_timeouts.end()) {
623-
return 0;
624-
}
625-
return m_timeout_queue.queue().push_delay_for(it_timeout->second, jobs_item->m_id);
626-
}
627-
628611
//
629612
// erase jobs item
630613
//
@@ -685,34 +668,18 @@ namespace small::jobsimpl {
685668
return count;
686669
}
687670

688-
//
689-
// inner thread function for timeout items
690-
// called from m_timeout_queue
691-
//
692-
using JobsQueueTimeout = small::time_queue_thread<JobsID, ThisJobsQueue>;
693-
friend JobsQueueTimeout;
694-
695-
inline std::size_t push_back(std::vector<JobsID> &&jobs_ids)
696-
{
697-
m_parent_caller.jobs_timeout(jobs_ids);
698-
return jobs_ids.size();
699-
}
700-
701671
private:
702672
//
703673
// members
704674
//
705-
mutable small::base_lock m_lock; // global locker
706-
std::atomic<JobsID> m_jobs_seq_id{}; // to get the next jobs id
707-
std::unordered_map<JobsID, std::shared_ptr<JobsItem>> m_jobs; // current jobs
708-
std::unordered_map<JobsGroupT, JobsQueue> m_groups_queues; // map of queues by group
709-
std::unordered_map<JobsTypeT, JobsQueue *> m_types_queues; // optimize to have queues by type (which reference queues by group)
710-
std::unordered_map<JobsTypeT, std::chrono::milliseconds> m_types_timeouts; // timeouts for types
675+
mutable small::base_lock m_lock; // global locker
676+
std::atomic<JobsID> m_jobs_seq_id{}; // to get the next jobs id
677+
std::unordered_map<JobsID, std::shared_ptr<JobsItem>> m_jobs; // current jobs
678+
std::unordered_map<JobsGroupT, JobsQueue> m_groups_queues; // map of queues by group
679+
std::unordered_map<JobsTypeT, JobsQueue *> m_types_queues; // optimize to have queues by type (which reference queues by group)
711680

712681
JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items
713682

714-
JobsQueueTimeout m_timeout_queue{*this}; // for timeout elements
715-
716683
ParentCallerT &m_parent_caller; // jobs engine
717684
};
718685
} // namespace small::jobsimpl

0 commit comments

Comments
 (0)