diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h
index f0078fe..0f648e2 100644
--- a/examples/examples_jobs_engine.h
+++ b/examples/examples_jobs_engine.h
@@ -33,6 +33,7 @@ namespace examples::jobs_engine {
     //
     enum class JobsType {
+        kJobsNone = 0,
         kJobsType1,
         kJobsType2,
         kJobsType3,
@@ -48,8 +49,9 @@ namespace examples::jobs_engine {
         kJobsGroupCache,
     };
 
-    using Request = std::pair<int, std::string>;
-    using JobsEng = small::jobs_engine<JobsType, Request, JobsGroupType>;
+    using Request = std::pair<int, std::string>;
+    using Response = int;
+    using JobsEng = small::jobs_engine<JobsType, Request, Response, JobsGroupType>;
 
     auto jobs_function_processing = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items, JobsEng::JobsConfig::ConfigProcessing & /* config */) {
         // this function is defined without the engine params (it is here just for the example)
@@ -86,16 +88,17 @@ namespace examples::jobs_engine {
     // create a cache server (with workers to simulate access to it)
     // (as an external engine outside the jobs engine for demo purposes)
-    small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [](auto &w /*this*/, const auto &items) {
-        // process item using the workers lock (not recommended)
-
+    small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) {
         for (auto &i : items) {
             std::cout << "thread " << std::this_thread::get_id()
-                      << " processing cache {" << i << "}" << "\n";
+                      << " CACHE processing"
+                      << " {" << i << "}" << "\n";
 
-            // TODO mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures)
+            // mark the associated jobs id as succeeded (for demo purposes, to avoid creating other structures)
+            jobs.jobs_finished(i, (Response)i);
         }
         // sleep long enough
+        // no coalesce for demo purposes (sleep 500), so the 3rd parent item is finished due to the database and not the cache server
         small::sleep(500);
     });
@@ -120,10 +123,10 @@ namespace examples::jobs_engine {
     });
 
     // add specific function for job1 (calling the function from jobs instead of config allows to pass the engine and extra param)
-    jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
+    jobs.config_jobs_function_processing(JobsType::kJobsType1, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
         for (auto &item : jobs_items) {
             std::cout << "thread " << std::this_thread::get_id()
-                      << " JOB1 processing "
+                      << " JOB1 processing "
                       << "{"
                       << " type=" << (int)item->m_type
                       << " req.int=" << item->m_request.first << ","
@@ -132,22 +135,41 @@ namespace examples::jobs_engine {
                       << " ref count " << item.use_count()
                       << " time " << small::toISOString(small::timeNow()) << "\n";
 
-            // TODO add 2 more children jobs for current one for database and server cache
-            // TODO save somewhere in an unordered_map the database requests - the problem is that jobid is received after push_jobs
-            // TODO save type1 requests into a promises unordered_map
-            // TODO for type 2 only database (add another processing function)
+
+            // add 2 more children jobs for the current one, for database and server cache
+            JobsEng::JobsID jobs_child_db_id{};
+            JobsEng::JobsID jobs_child_cache_id{};
+
+            auto ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsDatabase, item->m_request, &jobs_child_db_id);
+            if (!ret) {
+                j.jobs_failed(item->m_id);
+            }
+            ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsCache, item->m_request, &jobs_child_cache_id);
+            if (!ret) {
+                j.jobs_failed(jobs_child_db_id);
+                j.jobs_failed(item->m_id);
+            }
+
+            j.jobs_start(small::EnumPriorities::kNormal, jobs_child_db_id);
+            // jobs_child_cache_id has no threads to execute it; it has external executors
+            cache_server.push_back(jobs_child_cache_id);
         }
         small::sleep(30);
     }, 5 /*param b*/);
 
+    // TODO save type1 requests into a promises unordered_map and complete them on finishing the job
+    // TODO add a custom finish function for jobtype1 to complete the promises
+
+    // TODO save somewhere in an unordered_map the database requests (passed as request params for job type1)
     // TODO if I wanted to use another job_server, how would I model it so that the job on one side stays in a state as if it had children, and
    // TODO a request is made elsewhere, and when that one finishes, on finish (or, if it is a worker thread, in the processing function) it does set state
    // TODO set state works if there is only one dependency; if there are several, a custom function would be needed - childProcessing (whether or not it has children - or how do I make a dummy child - maybe with thread_count 0?)
 
     // add specific function for job2
     jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
+        bool first_job = true;
         for (auto &item : jobs_items) {
             std::cout << "thread " << std::this_thread::get_id()
-                      << " JOB2 processing "
+                      << " JOB2 processing "
                       << "{"
                       << " type=" << (int)item->m_type
                       << " req.int=" << item->m_request.first << ","
@@ -156,7 +178,21 @@ namespace examples::jobs_engine {
                       << " ref count " << item.use_count()
                       << " time " << small::toISOString(small::timeNow()) << "\n";
-            // TODO for type 2 only database children (add another processing function)
+
+            if (first_job) {
+                // for type 2, only database children (for demo purposes no result will be used from the database)
+                auto ret = j.queue().push_back_and_start_child(item->m_id /*parent*/,
+                                                               small::EnumPriorities::kNormal,
+                                                               JobsType::kJobsDatabase,
+                                                               item->m_request);
+                if (!ret) {
+                    j.jobs_failed(item->m_id);
+                }
+            } else {
+                j.jobs_failed(item->m_id);
+            }
+
+            first_job = false;
         }
         // TODO config to wait after request (even if it is not specified in the global config - so custom throttle)
         small::sleep(30);
     });
@@ -170,35 +206,36 @@ namespace examples::jobs_engine {
     JobsEng::JobsID jobs_id{};
     std::vector<JobsEng::JobsID> jobs_ids;
 
-    // TODO create a promises/futures unordered_map for type1 requests and wait later
+    // type3: one request will succeed and one request will timeout, for demo purposes
+    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id);
+    jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id);
+
+    // type2: only the first request succeeds and waits for its child; the other fails from the start
+    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType2, {2, "normal2"}, &jobs_id);
+    jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high2"}, &jobs_id);
 
     // show coalesce for children database requests
     std::unordered_map web_requests;
 
-    // TODO type3 one request will succeed and one request will timeout for demo purposes
-    // push
-    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
-    jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
-
-    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
-    jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id);
-    jobs.queue().push_back_and_start(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);
-
-    Request req = {6, "normal"};
-    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr);
+    // TODO create a promises/futures unordered_map for type1 requests and wait later
+    // push with multiple variants
+    jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, {11, "normal11"}, &jobs_id);
 
     std::vector<std::shared_ptr<JobsEng::JobsItem>> jobs_items = {
-        std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{7, "highest"}),
-        std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{8, "highest"}),
+        std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{12, "highest12"}),
     };
     jobs.queue().push_back_and_start(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
-    jobs.queue().push_back_and_start(small::EnumPriorities::kHighest, {std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids);
-    jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
-    jobs.queue().push_back_and_start_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
-    jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);
+    jobs.queue().push_back_and_start(small::EnumPriorities::kLow, JobsType::kJobsType1, {13, "low13"}, nullptr);
+
+    Request req = {14, "normal14"};
+    jobs.queue().push_back(JobsType::kJobsType1, req, &jobs_id);
+    jobs.jobs_start(small::EnumPriorities::kNormal, jobs_id);
+
+    jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {115, "delay normal115"}, &jobs_id);
 
-    jobs.start_threads(3); // manual start threads
+    // manual start threads
+    jobs.start_threads(3);
 
     small::sleep(50);
     // jobs.signal_exit_force();
diff --git a/include/jobs_engine.h b/include/jobs_engine.h
index 7517c87..f24fae4 100644
--- a/include/jobs_engine.h
+++ b/include/jobs_engine.h
@@ -108,7 +108,7 @@ namespace small {
         // clear
         inline void clear()
         {
-            std::unique_lock l(m_queue);
+            std::unique_lock l(*this);
             m_queue.clear();
             m_thread_pool.clear();
         }
@@ -251,36 +251,69 @@ namespace small {
         //
         // set jobs state
         //
-        inline void jobs_progress(const JobsID &jobs_id, const int &progress)
+        inline bool jobs_progress(const JobsID &jobs_id, const int &progress)
         {
-            jobs_set_progress(jobs_id, progress);
+            return jobs_set_progress(jobs_id, progress);
         }
-        inline void jobs_finished(const JobsID &jobs_id)
+        inline bool jobs_response(const JobsID &jobs_id, const JobsResponseT &jobs_response)
         {
-            jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished);
+            return jobs_set_response(jobs_id, jobs_response);
         }
-        inline void jobs_finished(const std::vector<JobsID> &jobs_ids)
+        inline bool jobs_response(const JobsID &jobs_id, JobsResponseT &&jobs_response)
         {
-            jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFinished);
+            return jobs_set_response(jobs_id, std::forward<JobsResponseT>(jobs_response));
         }
-        inline void jobs_failed(const JobsID &jobs_id)
+        inline bool jobs_finished(const JobsID &jobs_id)
         {
-            jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed);
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished);
         }
-        inline void jobs_failed(const std::vector<JobsID> &jobs_ids)
+        inline bool jobs_finished(const std::vector<JobsID> &jobs_ids)
         {
-            jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFailed);
+            return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFinished);
+        }
+        inline bool jobs_finished(const JobsID &jobs_id, const JobsResponseT &jobs_response)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished, jobs_response);
+        }
+        inline bool jobs_finished(const JobsID &jobs_id, JobsResponseT &&jobs_response)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished, std::forward<JobsResponseT>(jobs_response));
         }
 
-        inline void jobs_cancelled(const JobsID &jobs_id)
+        inline bool jobs_failed(const JobsID &jobs_id)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed);
+        }
+        inline bool jobs_failed(const std::vector<JobsID> &jobs_ids)
+        {
+            return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFailed);
+        }
+        inline bool jobs_failed(const JobsID &jobs_id, const JobsResponseT &jobs_response)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed, jobs_response);
+        }
+        inline bool jobs_failed(const JobsID &jobs_id, JobsResponseT &&jobs_response)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed, std::forward<JobsResponseT>(jobs_response));
+        }
+
+        inline bool jobs_cancelled(const JobsID &jobs_id)
+        {
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled);
+        }
+        inline bool jobs_cancelled(const std::vector<JobsID> &jobs_ids)
+        {
+            return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kCancelled);
+        }
+        inline bool jobs_cancelled(const JobsID &jobs_id, const JobsResponseT &jobs_response)
         {
-            jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled);
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled, jobs_response);
         }
-        inline void jobs_cancelled(const std::vector<JobsID> &jobs_ids)
+        inline bool jobs_cancelled(const JobsID &jobs_id, JobsResponseT &&jobs_response)
         {
-            jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kCancelled);
+            return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled, std::forward<JobsResponseT>(jobs_response));
         }
 
         // clang-format off
@@ -469,81 +502,146 @@ namespace small {
         }
 
         //
-        // jobs states
+        // jobs area transform from id to jobs_item
         //
-        inline void jobs_set_progress(const JobsID &jobs_id, const int &progress)
+        inline bool jobs_set_progress(const JobsID &jobs_id, const int &progress)
         {
             auto *jobs_item = jobs_get(jobs_id);
             if (!jobs_item) {
-                return;
+                return false;
             }
+            return jobs_set_progress(*jobs_item, progress);
+        }
 
-            (*jobs_item)->set_progress(progress);
+        inline bool jobs_set_response(const JobsID &jobs_id, const JobsResponseT &jobs_response)
+        {
+            std::unique_lock l(*this);
+            auto *jobs_item = jobs_get(jobs_id);
+            if (!jobs_item) {
+                return false;
+            }
+            return jobs_set_response(*jobs_item, jobs_response);
+        }
 
-            if (progress == 100) {
-                jobs_finished(jobs_id);
+        inline bool jobs_set_response(const JobsID &jobs_id, JobsResponseT &&jobs_response)
+        {
+            std::unique_lock l(*this);
+            auto *jobs_item = jobs_get(jobs_id);
+            if (!jobs_item) {
+                return false;
             }
+            return jobs_set_response(*jobs_item, std::forward<JobsResponseT>(jobs_response));
         }
 
-        inline void jobs_waitforchildren(const std::vector<std::shared_ptr<JobsItem>> &jobs_items)
+        inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state)
         {
-            // set the jobs as waitforchildren only if there are children otherwise advance to finish
-            jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kTimeout);
+            auto *jobs_item = jobs_get(jobs_id);
+            if (!jobs_item) {
+                return false;
+            }
+            return jobs_set_state(*jobs_item, jobs_state);
         }
 
-        inline void jobs_timeout(const std::vector<JobsID> &jobs_ids)
+        inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state, const JobsResponseT &jobs_response)
         {
-            // set the jobs as timeout if it is not finished until now (called from queue)
-            jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kTimeout);
+            auto *jobs_item = jobs_get(jobs_id);
+            if (!jobs_item) {
+                return false;
+            }
+            jobs_set_response(*jobs_item, jobs_response);
+            return jobs_set_state(*jobs_item, jobs_state);
         }
 
-        //
-        // apply state
-        //
-        inline void jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state)
+        inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state, JobsResponseT &&jobs_response)
         {
             auto *jobs_item = jobs_get(jobs_id);
             if (!jobs_item) {
-                return;
+                return false;
             }
+            jobs_set_response(*jobs_item, std::forward<JobsResponseT>(jobs_response));
+            return jobs_set_state(*jobs_item, jobs_state);
+        }
 
-            auto ret = jobs_set_state(*jobs_item, jobs_state);
-            if (ret) {
-                jobs_completed({*jobs_item});
+        inline std::size_t jobs_timeout(const std::vector<JobsID> &jobs_ids)
+        {
+            // set the jobs as timeout if it is not finished until now (called from queue)
+            return jobs_set_state(jobs_get(jobs_ids), small::jobsimpl::EnumJobsState::kTimeout);
+        }
+
+        //
+        // jobs set progress
+        //
+        inline bool jobs_set_progress(const std::shared_ptr<JobsItem> &jobs_item, const int &progress)
+        {
+            jobs_item->set_progress(progress);
+            if (progress == 100) {
+                jobs_finished(jobs_item->m_id);
             }
+            return true;
         }
 
-        inline void jobs_set_state(const std::vector<JobsID> &jobs_ids, const small::jobsimpl::EnumJobsState &jobs_state)
+        //
+        // jobs set response
+        //
+        inline void jobs_set_response(std::shared_ptr<JobsItem> &jobs_item, const JobsResponseT &jobs_response)
         {
-            auto jobs_items = jobs_get(jobs_ids);
-            jobs_set_state(jobs_items, jobs_state);
+            std::unique_lock l(*this);
+            jobs_item->m_response = jobs_response;
+        }
+
+        inline void jobs_set_response(std::shared_ptr<JobsItem> &jobs_item, JobsResponseT &&jobs_response)
+        {
+            std::unique_lock l(*this);
+            jobs_item->m_response = std::move(jobs_response);
+        }
+
+        //
+        // jobs set states
+        //
+        inline std::size_t jobs_waitforchildren(const std::vector<std::shared_ptr<JobsItem>> &jobs_items)
+        {
+            // set the jobs as waitforchildren only if there are children otherwise advance to finish
+            return jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kTimeout);
+        }
+
+        //
+        // apply state
+        //
+        inline bool jobs_set_state(const std::shared_ptr<JobsItem> &jobs_item, const small::jobsimpl::EnumJobsState &jobs_state)
+        {
+            auto ret = jobs_apply_state(jobs_item, jobs_state);
+            if (ret) {
+                jobs_completed({jobs_item});
+            }
+            return ret;
         }
 
-        inline void jobs_set_state(const std::vector<std::shared_ptr<JobsItem>> &jobs_items, const small::jobsimpl::EnumJobsState &jobs_state)
+        inline std::size_t jobs_set_state(const std::vector<std::shared_ptr<JobsItem>> &jobs_items, const small::jobsimpl::EnumJobsState &jobs_state)
         {
             std::vector<std::shared_ptr<JobsItem>> changed_items;
             changed_items.reserve(jobs_items.size());
 
             for (auto &jobs_item : jobs_items) {
-                auto ret = jobs_set_state(jobs_item, jobs_state);
+                auto ret = jobs_apply_state(jobs_item, jobs_state);
                 if (ret) {
                     changed_items.push_back(jobs_item);
                 }
             }
 
             jobs_completed(changed_items);
+            return changed_items.size();
         }
 
-        inline std::size_t jobs_set_state(std::shared_ptr<JobsItem> jobs_item, small::jobsimpl::EnumJobsState jobs_state)
+        inline bool jobs_apply_state(std::shared_ptr<JobsItem> jobs_item, small::jobsimpl::EnumJobsState jobs_state)
         {
             // state is already the same
             if (jobs_item->is_state(jobs_state)) {
-                return 0;
+                return false;
             }
 
             // set the jobs as timeout if it is not finished until now
             if (jobs_state == small::jobsimpl::EnumJobsState::kTimeout && jobs_item->is_state_finished()) {
-                return 0;
+                return false;
            }
 
             // set the jobs as waitforchildren only if there are children otherwise advance to finish
@@ -555,7 +653,7 @@ namespace small {
             }
 
             jobs_item->set_state(jobs_state);
-            return jobs_item->is_state(jobs_state) ? 1 : 0;
+            return jobs_item->is_state(jobs_state);
         }
 
         //