Skip to content

Commit

Permalink
chore(executor): add new event checker for lock
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Sep 18, 2024
1 parent cbbd4e3 commit 17d62c7
Showing 1 changed file with 123 additions and 93 deletions.
216 changes: 123 additions & 93 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ impl ExecutingGraph {
node.processor.name(),
event
);

#[cfg(debug_assertions)]
let mut need_check_edges_len = false;
#[cfg(debug_assertions)]
let before_edges_len = need_schedule_edges.len();

let processor_state = match event {
Event::Finished => {
if !matches!(state_guard_cache.as_deref(), Some(State::Finished)) {
Expand All @@ -420,7 +426,10 @@ impl ExecutingGraph {

State::Finished
}
Event::NeedData | Event::NeedConsume => State::Idle,
Event::NeedData | Event::NeedConsume => {
need_check_edges_len = true;
State::Idle
}
Event::Sync => {
schedule_queue.push_sync(ProcessorWrapper {
processor: node.processor.clone(),
Expand All @@ -438,6 +447,22 @@ impl ExecutingGraph {
};

node.trigger(&mut need_schedule_edges);

#[cfg(debug_assertions)]
{
if need_check_edges_len && need_schedule_edges.len() <= before_edges_len {
// unlock
drop(state_guard_cache);

return Err(ErrorCode::Internal(format!(
"Incorrect scheduling state, {}({}) has not generated any new events. details: {:?}",
node.processor.name(),
node.processor.id().index(),
graph.format_graph_node(node.processor.id())
)));
}
}

*state_guard_cache.unwrap() = processor_state;
}
}
Expand Down Expand Up @@ -878,103 +903,75 @@ impl RunningGraph {
self.0.finished_notify.clone()
}

pub fn format_graph_nodes(&self) -> String {
pub struct NodeDisplay {
id: usize,
name: String,
state: String,
details_status: Option<String>,
inputs_status: Vec<(&'static str, &'static str, &'static str)>,
outputs_status: Vec<(&'static str, &'static str, &'static str)>,
}
/// # Safety
pub unsafe fn format_graph_node(&self, node_index: NodeIndex) -> NodeDisplay {
let state = self.0.graph[node_index].state.lock().unwrap();
let inputs_status = self.0.graph[node_index]
.inputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

impl Debug for NodeDisplay {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match &self.details_status {
None => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.finish(),
Some(details_status) => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.field("details", details_status)
.finish(),
}
}
let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

let outputs_status = self.0.graph[node_index]
.outputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

NodeDisplay {
inputs_status,
outputs_status,
id: self.0.graph[node_index].processor.id().index(),
name: self.0.graph[node_index].processor.name(),
details_status: self.0.graph[node_index].processor.details_status(),
state: String::from(match *state {
State::Idle => "Idle",
State::Processing => "Processing",
State::Finished => "Finished",
}),
}
}

pub fn format_graph_nodes(&self) -> String {
let mut nodes_display = Vec::with_capacity(self.0.graph.node_count());

for node_index in self.0.graph.node_indices() {
unsafe {
let state = self.0.graph[node_index].state.lock().unwrap();
let inputs_status = self.0.graph[node_index]
.inputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

let outputs_status = self.0.graph[node_index]
.outputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

nodes_display.push(NodeDisplay {
inputs_status,
outputs_status,
id: self.0.graph[node_index].processor.id().index(),
name: self.0.graph[node_index].processor.name(),
details_status: self.0.graph[node_index].processor.details_status(),
state: String::from(match *state {
State::Idle => "Idle",
State::Processing => "Processing",
State::Finished => "Finished",
}),
});
unsafe {
for node_index in self.0.graph.node_indices() {
nodes_display.push(self.format_graph_node(node_index));
}
}

Expand Down Expand Up @@ -1044,3 +1041,36 @@ impl Debug for ScheduleQueue {
}
}
}

pub struct NodeDisplay {
id: usize,
name: String,
state: String,
details_status: Option<String>,
inputs_status: Vec<(&'static str, &'static str, &'static str)>,
outputs_status: Vec<(&'static str, &'static str, &'static str)>,
}

impl Debug for NodeDisplay {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match &self.details_status {
None => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.finish(),
Some(details_status) => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.field("details", details_status)
.finish(),
}
}
}

0 comments on commit 17d62c7

Please sign in to comment.