Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re enable use all monitor (without device manager) #1360

Draft
wants to merge 1 commit into
base: revert-device-manager
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@
cli.enable_realtime_audio_transcription,
Arc::new(realtime_transcription_sender_clone), // Use the cloned sender
realtime_vision_sender_clone,
cli.use_all_monitors,
);

let result = tokio::select! {
Expand Down Expand Up @@ -602,7 +603,7 @@

let (audio_devices_tx, _) = broadcast::channel(100);

let realtime_vision_sender_clone = realtime_vision_sender_clone.clone();

Check warning on line 606 in screenpipe-server/src/bin/screenpipe-server.rs

View workflow job for this annotation

GitHub Actions / test-linux

unused variable: `realtime_vision_sender_clone`

Check warning on line 606 in screenpipe-server/src/bin/screenpipe-server.rs

View workflow job for this annotation

GitHub Actions / test-windows

unused variable: `realtime_vision_sender_clone`
// TODO: Add SSE stream for realtime audio transcription
let server = Server::new(
db_server,
Expand Down
4 changes: 4 additions & 0 deletions screenpipe-server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ pub struct Cli {
#[arg(long, default_value_t = false)]
pub capture_unfocused_windows: bool,

/// Automatically detect and use all monitors, including newly connected ones
#[arg(long, default_value_t = false)]
pub use_all_monitors: bool,

#[command(subcommand)]
pub command: Option<Command>,

Expand Down
99 changes: 94 additions & 5 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{DatabaseManager, VideoCapture};
use anyhow::Result;
use dashmap::DashMap;
use futures::future::join_all;
use tracing::{debug, error, info, warn};
use screenpipe_audio::realtime::RealtimeTranscriptionEvent;
use screenpipe_audio::vad_engine::VadSensitivity;
use screenpipe_audio::{
Expand All @@ -15,6 +14,7 @@ use screenpipe_audio::{start_realtime_recording, AudioStream};
use screenpipe_core::pii_removal::remove_pii;
use screenpipe_core::Language;
use screenpipe_vision::core::{RealtimeVisionEvent, WindowOcr};
use screenpipe_vision::monitor::list_monitors;
use screenpipe_vision::OcrEngine;
use std::collections::HashMap;
use std::path::PathBuf;
Expand All @@ -23,6 +23,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

#[allow(clippy::too_many_arguments)]
pub async fn start_continuous_recording(
Expand Down Expand Up @@ -52,9 +53,10 @@ pub async fn start_continuous_recording(
realtime_audio_enabled: bool,
realtime_transcription_sender: Arc<tokio::sync::broadcast::Sender<RealtimeTranscriptionEvent>>,
realtime_vision_sender: Arc<tokio::sync::broadcast::Sender<RealtimeVisionEvent>>,
use_all_monitors: bool,
) -> Result<()> {
debug!("Starting video recording for monitor {:?}", monitor_ids);
let video_tasks = if !vision_disabled {
let video_tasks = if !vision_disabled && !use_all_monitors {
monitor_ids
.iter()
.map(|&monitor_id| {
Expand Down Expand Up @@ -96,6 +98,85 @@ pub async fn start_continuous_recording(
})]
};

// Background task that keeps one `record_video` task running per connected monitor.
//
// It is only started when vision is enabled and `use_all_monitors` is set. Every
// 5 seconds it polls `list_monitors()`, spawns a recording task on `vision_handle`
// for each monitor id it is not already tracking, and aborts the tasks of monitors
// that are no longer listed.
//
// The loop has no exit condition, so the returned `JoinHandle` only resolves if the
// watcher task is aborted or panics.
//
// NOTE(review): an entry is removed from the map only when its monitor disappears
// from the list. If a `record_video` task ends on its own while the monitor is still
// connected, it is not restarted — confirm this is intended.
let monitor_watcher = if !vision_disabled && use_all_monitors {
    // Owned copies of everything the watcher needs, so the spawned future is `'static`.
    let vision_control_clone = Arc::clone(&vision_control);
    let db_clone = Arc::clone(&db);
    let output_path_clone = Arc::clone(&output_path);
    let ocr_engine_clone = Arc::clone(&ocr_engine);
    let ignored_windows_clone = ignored_windows.to_vec();
    let include_windows_clone = include_windows.to_vec();
    let languages_clone = languages.clone();
    let realtime_vision_sender_clone = realtime_vision_sender.clone();
    let vision_handle_clone = vision_handle.clone();

    Some(tokio::spawn(async move {
        // monitor id -> handle of the recording task spawned for that monitor
        let mut current_monitors = HashMap::new();

        loop {
            let available_monitors = list_monitors().await;

            // Start recording on monitors that are not tracked yet.
            for monitor in available_monitors.iter() {
                let monitor_id = monitor.id();

                // Single lookup: only a vacant slot means the monitor is new.
                if let std::collections::hash_map::Entry::Vacant(slot) =
                    current_monitors.entry(monitor_id)
                {
                    debug!("New monitor detected: {}", monitor_id);

                    let handle = vision_handle_clone.spawn({
                        // Per-task copies; the watcher keeps its own for later monitors.
                        let db_clone = Arc::clone(&db_clone);
                        let output_path_clone = Arc::clone(&output_path_clone);
                        let vision_control_clone = Arc::clone(&vision_control_clone);
                        let ocr_engine_clone = Arc::clone(&ocr_engine_clone);
                        let ignored_windows = ignored_windows_clone.clone();
                        let include_windows = include_windows_clone.clone();
                        let languages = languages_clone.clone();
                        let realtime_sender = realtime_vision_sender_clone.clone();

                        async move {
                            record_video(
                                db_clone,
                                output_path_clone,
                                fps,
                                vision_control_clone,
                                ocr_engine_clone,
                                monitor_id,
                                use_pii_removal,
                                &ignored_windows,
                                &include_windows,
                                video_chunk_duration,
                                languages,
                                capture_unfocused_windows,
                                realtime_sender,
                            )
                            .await
                        }
                    });

                    slot.insert(handle);
                }
            }

            // Stop and forget the tasks of monitors that are no longer listed.
            current_monitors.retain(|monitor_id, handle| {
                let still_connected = available_monitors
                    .iter()
                    .any(|m| m.id() == *monitor_id);
                if !still_connected {
                    debug!("Monitor removed: {}", monitor_id);
                    handle.abort();
                }
                still_connected
            });

            // Polling interval between monitor scans.
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }))
} else {
    None
};

let (whisper_sender, whisper_receiver, whisper_shutdown_flag) = if audio_disabled {
// Create a dummy channel if no audio devices are available, e.g. audio disabled
let (input_sender, _): (
Expand Down Expand Up @@ -163,6 +244,13 @@ pub async fn start_continuous_recording(
error!("Audio recording error: {:?}", e);
}

// Make sure to handle the monitor watcher task
if let Some(watcher) = monitor_watcher {
if let Err(e) = watcher.await {
error!("Monitor watcher task failed: {}", e);
}
}

// Shutdown the whisper channel
whisper_shutdown_flag.store(true, Ordering::Relaxed);
drop(whisper_sender_clone); // Close the sender channel
Expand All @@ -172,6 +260,7 @@ pub async fn start_continuous_recording(
// TODO: any additional cleanup like device controls to release

info!("Stopped recording");

Ok(())
}

Expand Down Expand Up @@ -220,10 +309,10 @@ async fn record_video(
fps,
video_chunk_duration,
new_chunk_callback,
Arc::clone(&ocr_engine),
(*ocr_engine).clone(),
monitor_id,
ignored_windows,
include_windows,
ignored_windows.to_vec().into(),
include_windows.to_vec().into(),
languages,
capture_unfocused_windows,
);
Expand Down
Loading
Loading