Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 14, 2025
1 parent 253b16e commit 65a2cee
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 68 deletions.
84 changes: 60 additions & 24 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace small::jobsimpl {
JobsTypeT m_type{}; // job type
std::atomic<EnumJobsState> m_state{EnumJobsState::kNone}; // job state
std::atomic<int> m_progress{}; // progress 0-100 for state kInProgress
std::atomic_bool m_has_parents{}; // for dependencies relationships parent-child
std::atomic_bool m_has_children{}; // for dependencies relationships parent-child
std::vector<JobsID> m_parentIDs{}; // for dependencies relationships parent-child
std::vector<JobsID> m_childrenIDs{}; // for dependencies relationships parent-child
JobsRequestT m_request{}; // request needed for processing function
Expand All @@ -53,26 +55,30 @@ namespace small::jobsimpl {
jobs_item(jobs_item &&other) noexcept { operator=(other); };
jobs_item &operator=(const jobs_item &other)
{
m_id = other.m_id;
m_type = other.m_type;
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_parentIDs = other.m_parentIDs;
m_childrenIDs = other.m_childrenIDs;
m_request = other.m_request;
m_response = other.m_response;
m_id = other.m_id;
m_type = other.m_type;
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_has_parents = other.m_has_parents.load();
m_has_children = other.m_has_children.load();
m_parentIDs = other.m_parentIDs;
m_childrenIDs = other.m_childrenIDs;
m_request = other.m_request;
m_response = other.m_response;
return *this;
}
jobs_item &operator=(jobs_item &&other) noexcept
{
m_id = std::move(other.m_id);
m_type = std::move(other.m_type);
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_parentIDs = std::move(other.m_parentIDs);
m_childrenIDs = std::move(other.m_childrenIDs);
m_request = std::move(other.m_request);
m_response = std::move(other.m_response);
m_id = std::move(other.m_id);
m_type = std::move(other.m_type);
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_has_parents = other.m_has_parents.load();
m_has_children = other.m_has_children.load();
m_parentIDs = std::move(other.m_parentIDs);
m_childrenIDs = std::move(other.m_childrenIDs);
m_request = std::move(other.m_request);
m_response = std::move(other.m_response);
return *this;
}

Expand Down Expand Up @@ -100,21 +106,23 @@ namespace small::jobsimpl {
inline void set_state_failed () { set_state(EnumJobsState::kFailed); }
inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); }

inline bool is_state (const EnumJobsState &state) { return m_state.load() == state; }
static bool is_state_complete (const EnumJobsState &state) { return state >= EnumJobsState::kFinished; }

inline bool is_state_inprogress () { return is_state(EnumJobsState::kInProgress); }
inline void is_state_waitchildren () { return is_state(EnumJobsState::kWaitChildren); }
inline bool is_state_finished () { return is_state(EnumJobsState::kFinished); }
inline bool is_state_timeout () { return is_state(EnumJobsState::kTimeout); }
inline void is_state_failed () { return is_state(EnumJobsState::kFailed); }
inline void is_state_cancelled () { return is_state(EnumJobsState::kCancelled); }
inline EnumJobsState get_state () const { return m_state.load(); }
inline bool is_state (const EnumJobsState &state) const { return get_state() == state; }
inline bool is_complete () const { return is_state_complete(get_state()); }

inline bool is_state_inprogress () const { return is_state(EnumJobsState::kInProgress); }
inline void is_state_waitchildren () const { return is_state(EnumJobsState::kWaitChildren); }
inline bool is_state_finished () const { return is_state(EnumJobsState::kFinished); }
inline bool is_state_timeout () const { return is_state(EnumJobsState::kTimeout); }
inline void is_state_failed () const { return is_state(EnumJobsState::kFailed); }
inline void is_state_cancelled () const { return is_state(EnumJobsState::kCancelled); }
// clang-format on

//
// set job progress (can only increase)
//

inline void set_progress(const int &new_progress)
{
for (;;) {
Expand All @@ -127,6 +135,34 @@ namespace small::jobsimpl {
}
}
}

//
// add child
//
inline void add_child(const JobsID &child_jobs_id)
{
m_childrenIDs.push_back(child_jobs_id); // this should be set under locked area
m_has_children = true;
}

inline bool has_children() const
{
return m_has_children.load();
}

//
// add parent
//
inline void add_parent(const JobsID &parent_jobs_id)
{
m_parentIDs.push_back(parent_jobs_id); // this should be set under locked area
m_has_parents = true;
}

inline bool has_parents() const
{
return m_has_parents.load();
}
};

} // namespace small::jobsimpl
23 changes: 19 additions & 4 deletions include/impl/jobs_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ namespace small::jobsimpl {
if (ret) {
m_parent_caller.jobs_schedule(jobs_type, jobs_id);
} else {
// TODO maybe call m_parent.jobs_failed(jobs_id)?
// TODO call m_parent.jobs_failed(jobs_id)? // jobs_start should not be under lock then
jobs_erase(jobs_id);
}
return ret;
Expand All @@ -614,7 +614,23 @@ namespace small::jobsimpl {
inline void jobs_erase(const JobsID &jobs_id)
{
std::unique_lock l(m_lock);

auto jobs_item = jobs_get(jobs_id);
if (!jobs_item) {
// already deleted
return;
}
// if not a final state, set it to cancelled (in case it is executing at this point)
if (!JobsItem::is_state_complete((*jobs_item)->get_state())) {
(*jobs_item)->set_state_cancelled();
}

m_jobs.erase(jobs_id);

// delete all children
for (auto &child_jobs_id : (*jobs_item)->m_childrenIDs) {
jobs_erase(child_jobs_id);
}
}

//
Expand All @@ -640,9 +656,8 @@ namespace small::jobsimpl {
inline void jobs_parent_child(std::shared_ptr<JobsItem> parent_jobs_item, std::shared_ptr<JobsItem> child_jobs_item)
{
std::unique_lock l(m_lock);

parent_jobs_item->m_childrenIDs.push_back(child_jobs_item->m_id);
child_jobs_item->m_parentIDs.push_back(parent_jobs_item->m_id);
parent_jobs_item->add_child(child_jobs_item->m_id);
child_jobs_item->add_parent(parent_jobs_item->m_id);
}

private:
Expand Down
46 changes: 23 additions & 23 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ namespace small {
// config for an individual job type
struct ConfigJobsType
{
JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group)
std::optional<std::chrono::milliseconds> m_timeout{}; // if need to delay the next request processing to have some throtelling
bool m_has_function_processing{false}; // use default processing function
bool m_has_function_on_children_finished{false}; // use default function for children finished
bool m_has_function_finished{false}; // use default finished function
FunctionProcessing m_function_processing{}; // processing Function for jobs items
FunctionOnChildrenFinished m_function_on_children_finished{}; // function called for a parent when a child is finished
FunctionFinished m_function_finished{}; // function called when jobs items are finished
JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group)
std::optional<std::chrono::milliseconds> m_timeout{}; // if need to delay the next request processing to have some throtelling
bool m_has_function_processing{false}; // use default processing function
bool m_has_function_children_finished{false}; // use default function for children finished
bool m_has_function_finished{false}; // use default finished function
FunctionProcessing m_function_processing{}; // processing Function for jobs items
FunctionOnChildrenFinished m_function_children_finished{}; // function called for a parent when a child is finished
FunctionFinished m_function_finished{}; // function called when jobs items are finished
};

ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc)
FunctionProcessing m_default_function_processing{}; // default processing function
FunctionOnChildrenFinished m_default_function_on_children_finished{}; // default function to call for a parent when children are finished
FunctionFinished m_default_function_finished{}; // default function to call when jobs items are finished
std::unordered_map<JobsGroupT, ConfigJobsGroup> m_groups; // config by jobs group
std::unordered_map<JobsTypeT, ConfigJobsType> m_types; // config by jobs type
ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc)
FunctionProcessing m_default_function_processing{}; // default processing function
FunctionOnChildrenFinished m_default_function_children_finished{}; // default function to call for a parent when children are finished
FunctionFinished m_default_function_finished{}; // default function to call when jobs items are finished
std::unordered_map<JobsGroupT, ConfigJobsGroup> m_groups; // config by jobs group
std::unordered_map<JobsTypeT, ConfigJobsType> m_types; // config by jobs type

//
// add default processing function
Expand All @@ -74,10 +74,10 @@ namespace small {
apply_default_function_processing();
}

inline void config_default_function_on_children_finished(FunctionOnChildrenFinished function_on_children_finished)
inline void config_default_function_children_finished(FunctionOnChildrenFinished function_children_finished)
{
m_default_function_on_children_finished = function_on_children_finished;
apply_default_function_on_children_finished();
m_default_function_children_finished = function_children_finished;
apply_default_function_children_finished();
}

inline void config_default_function_finished(FunctionFinished function_finished)
Expand All @@ -99,14 +99,14 @@ namespace small {
it_f->second.m_function_processing = function_processing;
}

inline void config_jobs_function_on_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_on_children_finished)
inline void config_jobs_function_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_children_finished)
{
auto it_f = m_types.find(jobs_type);
if (it_f == m_types.end()) {
return;
}
it_f->second.m_has_function_on_children_finished = true;
it_f->second.m_function_on_children_finished = function_on_children_finished;
it_f->second.m_has_function_children_finished = true;
it_f->second.m_function_children_finished = function_children_finished;
}

inline void config_jobs_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished)
Expand All @@ -131,11 +131,11 @@ namespace small {
}
}

inline void apply_default_function_on_children_finished()
inline void apply_default_function_children_finished()
{
for (auto &[type, jobs_type_config] : m_types) {
if (jobs_type_config.m_has_function_on_children_finished == false) {
jobs_type_config.m_function_on_children_finished = m_default_function_on_children_finished;
if (jobs_type_config.m_has_function_children_finished == false) {
jobs_type_config.m_function_children_finished = m_default_function_children_finished;
}
}
}
Expand Down
Loading

0 comments on commit 65a2cee

Please sign in to comment.