Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 13, 2025
1 parent e6a342d commit d3993dc
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 78 deletions.
105 changes: 71 additions & 34 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace examples::jobs_engine {
//
enum class JobsType
{
kJobsNone = 0,
kJobsType1,
kJobsType2,
kJobsType3,
Expand All @@ -48,8 +49,9 @@ namespace examples::jobs_engine {
kJobsGroupCache,
};

using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;
using Request = std::pair<int, std::string>;
using Response = int;
using JobsEng = small::jobs_engine<JobsType, Request, Response, JobsGroupType>;

auto jobs_function_processing = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items, JobsEng::JobsConfig::ConfigProcessing & /* config */) {
// this function is defined without the engine params (it is here just for the example)
Expand Down Expand Up @@ -86,16 +88,17 @@ namespace examples::jobs_engine {

// create a cache server (with workers to simulate access to it)
// (as an external engine outside the jobs engine for demo purposes)
small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [](auto &w /*this*/, const auto &items) {
// process item using the workers lock (not recommended)

small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) {
for (auto &i : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " processing cache {" << i << "}" << "\n";
<< " CACHE processing"
<< " {" << i << "}" << "\n";

// TODO mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures)
// mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures)
jobs.jobs_finished(i, (Response)i);
}
// sleep long enough
// no coalesce for demo purposes (sleep 500) so the 3rd parent item is finished due to the database and not the cache server
small::sleep(500);
});

Expand All @@ -120,10 +123,10 @@ namespace examples::jobs_engine {
});

// add specific function for job1 (calling the function from jobs instead of config allows passing the engine and an extra param)
jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
jobs.config_jobs_function_processing(JobsType::kJobsType1, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->m_type
<< " req.int=" << item->m_request.first << ","
Expand All @@ -132,22 +135,41 @@ namespace examples::jobs_engine {
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
// TODO add 2 more children jobs for current one for database and server cache
// TODO save somewhere in an unordered_map the database requests - the problem is that jobid is received after push_jobs
// TODO save type1 requests into a promises unordered_map
// TODO for type 2 only database (add another processing function)

// add 2 more children jobs for current one for database and server cache
JobsEng::JobsID jobs_child_db_id{};
JobsEng::JobsID jobs_child_cache_id{};

auto ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsDatabase, item->m_request, &jobs_child_db_id);
if (!ret) {
j.jobs_failed(item->m_id);
}
ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsCache, item->m_request, &jobs_child_cache_id);
if (!ret) {
j.jobs_failed(jobs_child_db_id);
j.jobs_failed(item->m_id);
}

j.jobs_start(small::EnumPriorities::kNormal, jobs_child_db_id);
// jobs_child_cache_id has no threads to execute, it has external executors
cache_server.push_back(jobs_child_cache_id);
}
small::sleep(30); }, 5 /*param b*/);

// TODO save type1 requests into a promises unordered_map and complete on finishing the job
// TODO add custom finish function for jobtype1 to complete the promises

// TODO save somewhere in an unordered_map the database requests (passes as request params for job type1)
// TODO if I wanted to use another job_server, how do I model it so that the job on one side stays in a state as if it had children, and
// TODO a request is made elsewhere, and when that one completes on finish (or in the processing function if it is a worker thread) it sets the state
// TODO set state works if there is only one dependency; if there are several, a custom function would be needed - childProcessing (whether or not it has children - or how do I make a dummy child - maybe with thread_count 0?)

// add specific function for job2
jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
bool first_job = true;
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB2 processing "
<< " JOB2 processing "
<< "{"
<< " type=" << (int)item->m_type
<< " req.int=" << item->m_request.first << ","
Expand All @@ -156,7 +178,21 @@ namespace examples::jobs_engine {
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
// TODO for type 2 only database children (add another processing function)

if (first_job) {
// for type 2 only database children (for demo purposes no result will be used from database)
auto ret = j.queue().push_back_and_start_child(item->m_id /*parent*/,
small::EnumPriorities::kNormal,
JobsType::kJobsDatabase,
item->m_request);
if (!ret) {
j.jobs_failed(item->m_id);
}
} else {
j.jobs_failed(item->m_id);
}

first_job = false;
}
// TODO config to wait after request (even if it is not specified in the global config - so custom throttle)
small::sleep(30); });
Expand All @@ -170,35 +206,36 @@ namespace examples::jobs_engine {
JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

// TODO create a promises/futures unordered_map for type1 requests and wait later
// type3 one request will succeed and one request will timeout for demo purposes
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id);

// type2: only the first request succeeds and waits for its child; the other fails from the start
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType2, {2, "normal2"}, &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high2"}, &jobs_id);

// show coalesce for children database requests
std::unordered_map<unsigned int, JobsEng::JobsID> web_requests;

// TODO type3 one request will succeed and one request will timeout for demo purposes
// push
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);

jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);

Request req = {6, "normal"};
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr);
// TODO create a promises/futures unordered_map for type1 requests and wait later
// push with multiple variants
jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, {11, "normal11"}, &jobs_id);

std::vector<std::shared_ptr<JobsEng::JobsItem>> jobs_items = {
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{7, "highest"}),
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{8, "highest"}),
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{12, "highest12"}),
};
jobs.queue().push_back_and_start(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.queue().push_back_and_start(small::EnumPriorities::kHighest, {std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids);

jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.queue().push_back_and_start_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);
jobs.queue().push_back_and_start(small::EnumPriorities::kLow, JobsType::kJobsType1, {13, "low13"}, nullptr);

Request req = {14, "normal14"};
jobs.queue().push_back(JobsType::kJobsType1, req, &jobs_id);
jobs.jobs_start(small::EnumPriorities::kNormal, jobs_id);

jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {115, "delay normal115"}, &jobs_id);

jobs.start_threads(3); // manual start threads
// manual start threads
jobs.start_threads(3);

small::sleep(50);
// jobs.signal_exit_force();
Expand Down
Loading

0 comments on commit d3993dc

Please sign in to comment.