Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(executor): add new event checker for lock #16461

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 123 additions & 93 deletions src/query/service/src/pipelines/executor/executor_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ impl ExecutingGraph {
node.processor.name(),
event
);

#[cfg(debug_assertions)]
let mut need_check_edges_len = false;
#[cfg(debug_assertions)]
let before_edges_len = need_schedule_edges.len();

let processor_state = match event {
Event::Finished => {
if !matches!(state_guard_cache.as_deref(), Some(State::Finished)) {
Expand All @@ -420,7 +426,10 @@ impl ExecutingGraph {

State::Finished
}
Event::NeedData | Event::NeedConsume => State::Idle,
Event::NeedData | Event::NeedConsume => {
need_check_edges_len = true;
State::Idle
}
Event::Sync => {
schedule_queue.push_sync(ProcessorWrapper {
processor: node.processor.clone(),
Expand All @@ -438,6 +447,22 @@ impl ExecutingGraph {
};

node.trigger(&mut need_schedule_edges);

#[cfg(debug_assertions)]
{
if need_check_edges_len && need_schedule_edges.len() <= before_edges_len {
// unlock
drop(state_guard_cache);

return Err(ErrorCode::Internal(format!(
"Incorrect scheduling state, {}({}) has not generated any new events. details: {:?}",
node.processor.name(),
node.processor.id().index(),
graph.format_graph_node(node.processor.id())
)));
}
}

*state_guard_cache.unwrap() = processor_state;
}
}
Expand Down Expand Up @@ -878,103 +903,75 @@ impl RunningGraph {
self.0.finished_notify.clone()
}

pub fn format_graph_nodes(&self) -> String {
pub struct NodeDisplay {
id: usize,
name: String,
state: String,
details_status: Option<String>,
inputs_status: Vec<(&'static str, &'static str, &'static str)>,
outputs_status: Vec<(&'static str, &'static str, &'static str)>,
}
/// # Safety
pub unsafe fn format_graph_node(&self, node_index: NodeIndex) -> NodeDisplay {
let state = self.0.graph[node_index].state.lock().unwrap();
let inputs_status = self.0.graph[node_index]
.inputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

impl Debug for NodeDisplay {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match &self.details_status {
None => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.finish(),
Some(details_status) => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.field("details", details_status)
.finish(),
}
}
let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

let outputs_status = self.0.graph[node_index]
.outputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

NodeDisplay {
inputs_status,
outputs_status,
id: self.0.graph[node_index].processor.id().index(),
name: self.0.graph[node_index].processor.name(),
details_status: self.0.graph[node_index].processor.details_status(),
state: String::from(match *state {
State::Idle => "Idle",
State::Processing => "Processing",
State::Finished => "Finished",
}),
}
}

pub fn format_graph_nodes(&self) -> String {
let mut nodes_display = Vec::with_capacity(self.0.graph.node_count());

for node_index in self.0.graph.node_indices() {
unsafe {
let state = self.0.graph[node_index].state.lock().unwrap();
let inputs_status = self.0.graph[node_index]
.inputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

let outputs_status = self.0.graph[node_index]
.outputs_port
.iter()
.map(|x| {
let finished = match x.is_finished() {
true => "Finished",
false => "Unfinished",
};

let has_data = match x.has_data() {
true => "HasData",
false => "Nodata",
};

let need_data = match x.is_need_data() {
true => "NeedData",
false => "UnNeeded",
};

(finished, has_data, need_data)
})
.collect::<Vec<_>>();

nodes_display.push(NodeDisplay {
inputs_status,
outputs_status,
id: self.0.graph[node_index].processor.id().index(),
name: self.0.graph[node_index].processor.name(),
details_status: self.0.graph[node_index].processor.details_status(),
state: String::from(match *state {
State::Idle => "Idle",
State::Processing => "Processing",
State::Finished => "Finished",
}),
});
unsafe {
for node_index in self.0.graph.node_indices() {
nodes_display.push(self.format_graph_node(node_index));
}
}

Expand Down Expand Up @@ -1044,3 +1041,36 @@ impl Debug for ScheduleQueue {
}
}
}

pub struct NodeDisplay {
id: usize,
name: String,
state: String,
details_status: Option<String>,
inputs_status: Vec<(&'static str, &'static str, &'static str)>,
outputs_status: Vec<(&'static str, &'static str, &'static str)>,
}

impl Debug for NodeDisplay {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match &self.details_status {
None => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.finish(),
Some(details_status) => f
.debug_struct("Node")
.field("name", &self.name)
.field("id", &self.id)
.field("state", &self.state)
.field("inputs_status", &self.inputs_status)
.field("outputs_status", &self.outputs_status)
.field("details", details_status)
.finish(),
}
}
}
Loading