Analyzing progress notifications #44

Open · wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions src/config.rs
@@ -101,6 +101,7 @@ impl Config {
debug!("reloaded configuration: {:#?}", state.config);

state.proc_macro_controller.on_config_change(&mut state.db, &state.config);
state.analysis_progress_controller.on_config_change(&state.config);
})
};

File renamed without changes.
106 changes: 106 additions & 0 deletions src/ide/analysis_progress.rs
@@ -0,0 +1,106 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use crate::config::Config;
use crate::lsp::ext::{DiagnosticsCalculationFinish, DiagnosticsCalculationStart};
use crate::server::client::Notifier;

/// A struct that allows tracking of proc macro requests.
#[derive(Clone)]
pub struct ProcMacroRequestTracker {
procmacro_request_submitted: Arc<AtomicBool>,
}

impl ProcMacroRequestTracker {
pub fn new() -> Self {
Self { procmacro_request_submitted: Arc::new(AtomicBool::new(false)) }
}

/// Signals that a request to the proc macro server was made during the current generation of
/// diagnostics.
pub fn register_procmacro_request(&self) {
self.procmacro_request_submitted.store(true, Ordering::SeqCst);
}

pub fn reset(&self) {
self.procmacro_request_submitted.store(false, Ordering::SeqCst);
}

pub fn get_did_submit_procmacro_request(&self) -> bool {
self.procmacro_request_submitted.load(Ordering::SeqCst)
}
}

#[derive(Clone)]
pub(crate) struct AnalysisProgressController {
state: Arc<Mutex<AnalysisProgressControllerState>>,
request_tracker: ProcMacroRequestTracker,
}

impl AnalysisProgressController {
pub fn on_config_change(&self, config: &Config) {
self.state.lock().unwrap().on_config_change(config)
}

pub fn try_start_analysis(&self) {
self.request_tracker.reset();
self.state.lock().unwrap().try_start_analysis()
}

pub fn try_stop_analysis(&self) {
self.state
.lock()
.unwrap()
.try_stop_analysis(self.request_tracker.get_did_submit_procmacro_request());
}
}

impl AnalysisProgressController {
pub fn new(notifier: Notifier, request_tracker: ProcMacroRequestTracker) -> Self {
Self {
request_tracker,
state: Arc::new(Mutex::new(AnalysisProgressControllerState::new(notifier))),
}
}
}

/// Controller used to send notifications to the client about analysis progress.
/// Uses information provided by other controllers (diagnostics controller, proc macro controller)
/// to assess whether diagnostics are in fact being calculated.
#[derive(Clone)]
pub struct AnalysisProgressControllerState {
notifier: Notifier,
/// Indicates that a notification was sent and analysis (i.e. macro expansion) is taking place.
analysis_in_progress: bool,
/// Loaded asynchronously from the config.
procmacros_enabled: Option<bool>,
}

impl AnalysisProgressControllerState {
pub fn new(notifier: Notifier) -> Self {
Self { notifier, analysis_in_progress: false, procmacros_enabled: None }
}

pub fn on_config_change(&mut self, config: &Config) {
self.procmacros_enabled = Some(config.enable_proc_macros);
}

fn try_start_analysis(&mut self) {
if !self.analysis_in_progress {
self.analysis_in_progress = true;
self.notifier.notify::<DiagnosticsCalculationStart>(());
}
}

fn try_stop_analysis(&mut self, did_submit_procmacro_request: bool) {
let config_not_loaded = self.procmacros_enabled.is_none();
if (!did_submit_procmacro_request
|| config_not_loaded
|| (self.procmacros_enabled == Some(false)))
&& self.analysis_in_progress
{
self.analysis_in_progress = false;
self.notifier.notify::<DiagnosticsCalculationFinish>(());
}
}
}
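
The heart of the new controller is the condition in try_stop_analysis. Below is a minimal, self-contained model of that condition for reviewers; should_send_finish and the tiny main harness are illustrative names, not part of this PR, and the real method additionally clears analysis_in_progress and notifies the client via Notifier.

// Simplified model of the condition in `try_stop_analysis`: the Finish
// notification fires only while analysis is in progress, and only if no proc
// macro request was submitted, the config has not loaded yet, or proc macros
// are disabled.
fn should_send_finish(
    analysis_in_progress: bool,
    did_submit_procmacro_request: bool,
    procmacros_enabled: Option<bool>, // None = config not loaded yet
) -> bool {
    let config_not_loaded = procmacros_enabled.is_none();
    (!did_submit_procmacro_request || config_not_loaded || procmacros_enabled == Some(false))
        && analysis_in_progress
}

fn main() {
    // Proc macros enabled and a request in flight: Finish is deferred.
    assert!(!should_send_finish(true, true, Some(true)));
    // No proc macro request submitted: Finish is sent once the workers join.
    assert!(should_send_finish(true, false, Some(true)));
    // Proc macros disabled: the request flag is irrelevant.
    assert!(should_send_finish(true, true, Some(false)));
}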
1 change: 1 addition & 0 deletions src/ide/mod.rs
@@ -1,3 +1,4 @@
pub mod analysis_progress;
pub mod code_actions;
pub mod completion;
pub mod formatter;
50 changes: 35 additions & 15 deletions src/lang/diagnostics/mod.rs
@@ -10,20 +10,21 @@ use tracing::{error, trace};

use self::project_diagnostics::ProjectDiagnostics;
use self::refresh::{clear_old_diagnostics, refresh_diagnostics};
use self::trigger::trigger;
use crate::ide::analysis_progress::AnalysisProgressController;
use crate::lang::diagnostics::file_batches::{batches, find_primary_files, find_secondary_files};
use crate::lang::lsp::LsProtoGroup;
use crate::server::client::Notifier;
use crate::server::panic::cancelled_anyhow;
use crate::server::schedule::thread::task_progress_monitor::TaskHandle;
use crate::server::schedule::thread::{self, JoinHandle, ThreadPriority};
use crate::server::trigger;
use crate::state::{State, StateSnapshot};

mod file_batches;
mod file_diagnostics;
mod lsp;
mod project_diagnostics;
mod refresh;
mod trigger;

/// Schedules refreshing of diagnostics in a background thread.
///
@@ -41,9 +42,10 @@ pub struct DiagnosticsController {

impl DiagnosticsController {
/// Creates a new diagnostics controller.
pub fn new(notifier: Notifier) -> Self {
let (trigger, receiver) = trigger();
let (thread, parallelism) = DiagnosticsControllerThread::spawn(receiver, notifier);
pub fn new(notifier: Notifier, analysis_progress_tracker: AnalysisProgressController) -> Self {
let (trigger, receiver) = trigger::trigger();
let (thread, parallelism) =
DiagnosticsControllerThread::spawn(receiver, notifier, analysis_progress_tracker);
Self {
trigger,
_thread: thread,
@@ -53,8 +55,7 @@ impl DiagnosticsController {

/// Schedules diagnostics refreshing on snapshot(s) of the current state.
pub fn refresh(&self, state: &State) {
let state_snapshots = StateSnapshots::new(state, &self.state_snapshots_props);
self.trigger.activate(state_snapshots);
self.trigger.activate(StateSnapshots::new(state, &self.state_snapshots_props));
}
}

@@ -64,6 +65,8 @@ struct DiagnosticsControllerThread {
notifier: Notifier,
pool: thread::Pool,
project_diagnostics: ProjectDiagnostics,
analysis_progress_controller: AnalysisProgressController,
worker_handles: Vec<TaskHandle>,
}

impl DiagnosticsControllerThread {
@@ -72,12 +75,15 @@ impl DiagnosticsControllerThread {
fn spawn(
receiver: trigger::Receiver<StateSnapshots>,
notifier: Notifier,
analysis_progress_controller: AnalysisProgressController,
) -> (JoinHandle, NonZero<usize>) {
let this = Self {
let mut this = Self {
receiver,
notifier,
analysis_progress_controller,
pool: thread::Pool::new(),
project_diagnostics: ProjectDiagnostics::new(),
worker_handles: Vec::new(),
};

let parallelism = this.pool.parallelism();
@@ -91,8 +97,11 @@ impl DiagnosticsControllerThread {
}

/// Runs diagnostics controller's event loop.
fn event_loop(&self) {
fn event_loop(&mut self) {
while let Some(state_snapshots) = self.receiver.wait() {
assert!(self.worker_handles.is_empty());
self.analysis_progress_controller.try_start_analysis();

if let Err(err) = catch_unwind(AssertUnwindSafe(|| {
self.diagnostics_controller_tick(state_snapshots);
})) {
@@ -103,20 +112,23 @@ impl DiagnosticsControllerThread {
error!("caught panic while refreshing diagnostics");
}
}

self.join_and_clear_workers();
self.analysis_progress_controller.try_stop_analysis();
}
}

/// Runs a single tick of the diagnostics controller's event loop.
#[tracing::instrument(skip_all)]
fn diagnostics_controller_tick(&self, state_snapshots: StateSnapshots) {
fn diagnostics_controller_tick(&mut self, state_snapshots: StateSnapshots) {
let (state, primary_snapshots, secondary_snapshots) = state_snapshots.split();

let primary_set = find_primary_files(&state.db, &state.open_files);
let primary: Vec<_> = primary_set.iter().copied().collect();
self.spawn_refresh_worker(&primary, primary_snapshots);
self.spawn_refresh_workers(&primary, primary_snapshots);

let secondary = find_secondary_files(&state.db, &primary_set);
self.spawn_refresh_worker(&secondary, secondary_snapshots);
self.spawn_refresh_workers(&secondary, secondary_snapshots);

let files_to_preserve: HashSet<Url> = primary
.into_iter()
@@ -131,11 +143,11 @@ impl DiagnosticsControllerThread {

/// Shortcut for spawning a worker task which does the boilerplate around cloning state parts
/// and catching panics.
fn spawn_worker(&self, f: impl FnOnce(ProjectDiagnostics, Notifier) + Send + 'static) {
fn spawn_worker(&mut self, f: impl FnOnce(ProjectDiagnostics, Notifier) + Send + 'static) {
let project_diagnostics = self.project_diagnostics.clone();
let notifier = self.notifier.clone();
let worker_fn = move || f(project_diagnostics, notifier);
self.pool.spawn(ThreadPriority::Worker, move || {
let worker_handle = self.pool.spawn_with_tracking(ThreadPriority::Worker, move || {
if let Err(err) = catch_unwind(AssertUnwindSafe(worker_fn)) {
if let Ok(err) = cancelled_anyhow(err, "diagnostics worker has been cancelled") {
trace!("{err:?}");
@@ -144,10 +156,11 @@ impl DiagnosticsControllerThread {
}
}
});
self.worker_handles.push(worker_handle);
}

/// Makes batches out of `files` and spawns workers to run [`refresh_diagnostics`] on them.
fn spawn_refresh_worker(&self, files: &[FileId], state_snapshots: Vec<StateSnapshot>) {
fn spawn_refresh_workers(&mut self, files: &[FileId], state_snapshots: Vec<StateSnapshot>) {
let files_batches = batches(files, self.pool.parallelism());
assert_eq!(files_batches.len(), state_snapshots.len());
for (batch, state) in zip(files_batches, state_snapshots) {
@@ -162,6 +175,13 @@ impl DiagnosticsControllerThread {
});
}
}

fn join_and_clear_workers(&mut self) {
for handle in self.worker_handles.iter() {
handle.join();
}
self.worker_handles.clear();
}
}

/// Holds multiple snapshots of the state.
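
The worker bookkeeping above assumes that Pool::spawn_with_tracking returns a TaskHandle whose join() blocks until the worker finishes; both come from task_progress_monitor, which this PR introduces elsewhere. A rough std-only analogue of the join-and-clear pattern, with Workers and its methods being illustrative names:

use std::thread::JoinHandle;

// Illustrative stand-in for the controller's bookkeeping: handles are pushed
// as workers are spawned, then joined and cleared once per tick, so analysis
// is only reported as finished after every diagnostics worker has completed.
struct Workers {
    handles: Vec<JoinHandle<()>>,
}

impl Workers {
    fn spawn(&mut self, f: impl FnOnce() + Send + 'static) {
        self.handles.push(std::thread::spawn(f));
    }

    fn join_and_clear(&mut self) {
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}

fn main() {
    let mut workers = Workers { handles: Vec::new() };
    workers.spawn(|| println!("refresh primary files"));
    workers.spawn(|| println!("refresh secondary files"));
    workers.join_and_clear(); // mirrors join_and_clear_workers() before try_stop_analysis()
    assert!(workers.handles.is_empty());
}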
26 changes: 23 additions & 3 deletions src/lang/proc_macros/client/mod.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::{Debug, Formatter};
use std::sync::{Mutex, MutexGuard};

use anyhow::{Context, Result, anyhow, ensure};
@@ -16,8 +17,10 @@ use scarb_proc_macro_server_types::methods::expand::{
pub use status::ClientStatus;
use tracing::error;

use crate::id_generator;
use crate::ide::analysis_progress::ProcMacroRequestTracker;

pub mod connection;
mod id_generator;
pub mod status;

#[derive(Debug)]
@@ -27,21 +30,26 @@ pub enum RequestParams {
Inline(ExpandInlineMacroParams),
}

#[derive(Debug)]
pub struct ProcMacroClient {
connection: ProcMacroServerConnection,
id_generator: id_generator::IdGenerator,
requests_params: Mutex<HashMap<RequestId, RequestParams>>,
error_channel: Sender<()>,
proc_macro_request_tracker: ProcMacroRequestTracker,
}

impl ProcMacroClient {
pub fn new(connection: ProcMacroServerConnection, error_channel: Sender<()>) -> Self {
pub fn new(
connection: ProcMacroServerConnection,
error_channel: Sender<()>,
proc_macro_request_tracker: ProcMacroRequestTracker,
) -> Self {
Self {
connection,
id_generator: Default::default(),
requests_params: Default::default(),
error_channel,
proc_macro_request_tracker,
}
}

@@ -146,6 +154,7 @@ impl ProcMacroClient {

match self.send_request_untracked::<M>(id, &params) {
Ok(()) => {
self.proc_macro_request_tracker.register_procmacro_request();
requests_params.insert(id, map(params));
}
Err(err) => {
@@ -162,6 +171,17 @@ impl ProcMacroClient {
}
}

impl Debug for ProcMacroClient {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProcMacroClient")
.field("connection", &self.connection)
.field("id_generator", &self.id_generator)
.field("requests_params", &self.requests_params)
.field("error_channel", &self.error_channel)
.finish_non_exhaustive()
}
}

pub struct Responses<'a> {
responses: MutexGuard<'a, VecDeque<RpcResponse>>,
requests: MutexGuard<'a, HashMap<RequestId, RequestParams>>,
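
The key behavioural change in the client is that a request is registered with the tracker only after a successful send. A self-contained model of the sharing that makes this visible to the progress controller, using a plain Arc<AtomicBool> in place of ProcMacroRequestTracker (the comments map each step to the real methods):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    // The tracker wraps an Arc'd flag, so the clone held by ProcMacroClient
    // and the one held by AnalysisProgressController observe the same value.
    let tracker = Arc::new(AtomicBool::new(false));
    let client_side = Arc::clone(&tracker);

    client_side.store(true, Ordering::SeqCst); // register_procmacro_request() on successful send
    assert!(tracker.load(Ordering::SeqCst)); // get_did_submit_procmacro_request() in try_stop_analysis()
    tracker.store(false, Ordering::SeqCst); // reset() at the start of the next analysis
}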
10 changes: 9 additions & 1 deletion src/lang/proc_macros/controller.rs
@@ -19,6 +19,7 @@ use super::client::connection::ProcMacroServerConnection;
use super::client::status::ClientStatus;
use super::client::{ProcMacroClient, RequestParams};
use crate::config::Config;
use crate::ide::analysis_progress::ProcMacroRequestTracker;
use crate::lang::db::AnalysisDatabase;
use crate::lang::proc_macros::db::ProcMacroGroup;
use crate::lang::proc_macros::plugins::proc_macro_plugin_suite;
@@ -53,17 +54,23 @@ pub struct ProcMacroClientController {
plugin_suite: Option<PluginSuite>,
initialization_retries: RateLimiter<NotKeyed, InMemoryState, QuantaClock>,
channels: ProcMacroChannels,
proc_macro_request_tracker: ProcMacroRequestTracker,
}

impl ProcMacroClientController {
pub fn channels(&mut self) -> ProcMacroChannels {
self.channels.clone()
}

pub fn new(scarb: ScarbToolchain, notifier: Notifier) -> Self {
pub fn new(
scarb: ScarbToolchain,
notifier: Notifier,
proc_macro_request_tracker: ProcMacroRequestTracker,
) -> Self {
Self {
scarb,
notifier,
proc_macro_request_tracker,
plugin_suite: Default::default(),
initialization_retries: RateLimiter::direct(
Quota::with_period(Duration::from_secs(
@@ -176,6 +183,7 @@ impl ProcMacroClientController {
self.channels.response_sender.clone(),
),
self.channels.error_sender.clone(),
self.proc_macro_request_tracker.clone(),
);

client.start_initialize();
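
What the diff does not show is where the tracker is created; presumably the server's startup code constructs a single ProcMacroRequestTracker and clones it into both controllers. A hedged sketch of that wiring, assuming the crate-internal module paths used in the imports above and that Notifier is cloneable; wire_controllers is an illustrative name:

use crate::ide::analysis_progress::{AnalysisProgressController, ProcMacroRequestTracker};
use crate::lang::diagnostics::DiagnosticsController;
use crate::server::client::Notifier;

// Hypothetical startup wiring: one ProcMacroRequestTracker clone goes to the
// proc macro side, the other to the progress controller, so requests the
// client registers are visible when try_stop_analysis checks the flag.
fn wire_controllers(notifier: Notifier) -> (AnalysisProgressController, DiagnosticsController) {
    let tracker = ProcMacroRequestTracker::new();
    let progress = AnalysisProgressController::new(notifier.clone(), tracker.clone());
    // The same `tracker` would also be handed to ProcMacroClientController::new.
    let diagnostics = DiagnosticsController::new(notifier, progress.clone());
    (progress, diagnostics)
}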
1 change: 1 addition & 0 deletions src/lib.rs
@@ -71,6 +71,7 @@ use crate::state::State;

mod config;
mod env_config;
mod id_generator;
mod ide;
mod lang;
pub mod lsp;