diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 545e852..bfada80 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -2,12 +2,12 @@ name: Rust CI and release on: push: - branches: + branches: - '**' tags: - '*' pull_request: - branches: + branches: - '**' jobs: @@ -24,7 +24,7 @@ jobs: - name: Set up Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.77.1 + toolchain: 1.81.0 override: true - name: Display Rust and Cargo version diff --git a/Cargo.toml b/Cargo.toml index ede0135..45b20b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,8 @@ authors = ["Cacophony Developers "] [dependencies] framebuffer = "0.3.1" -bmp = "0.5.0" -rppal = "0.17.0" -chrono = "0.4.31" +rppal = "0.19.0" +chrono = { version = "0.4.38", features = ["serde"] } byteorder = "1.4.3" thread-priority = "1.1.0" argh = "0.1.10" @@ -20,17 +19,16 @@ mdns-sd = "0.11.0" crc = "3.0.1" log = "0.4.19" simplelog = "0.12.1" -flate2 = "1.0.26" +flate2 = "1.0.33" nom = "7.1.3" serde = { version = "1.0.183", features = ["derive"] } toml = "0.8.8" notify = { version = "6.1.1", default-features = false } -chrono-tz = "0.9.0" +chrono-tz = "0.10.0" signal-hook = "0.3.17" sun-times = "0.2.0" -triangulate = "0.2.0" +louvre = "0.2.1" rustbus = "0.19.3" -lazy_static = "1.4.0" sha256 = "1.5.0" [target.aarch64-unknown-linux-musl] @@ -51,3 +49,6 @@ assets = [ ] maintainer-scripts = "_releases/scripts" revision = "" + +[profile.release] +lto = true diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..90c3849 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +use_field_init_shorthand = true +use_small_heuristics = "Max" diff --git a/src/camera_transfer_state.rs b/src/camera_transfer_state.rs new file mode 100644 index 0000000..827f493 --- /dev/null +++ b/src/camera_transfer_state.rs @@ -0,0 +1,750 @@ +use crate::cptv_frame_dispatch::FRAME_BUFFER; +use crate::dbus_attiny_i2c::{exit_cleanly, process_interrupted}; + +use crate::device_config::{check_for_device_config_changes, DeviceConfig}; +use crate::event_logger::{LoggerEvent, LoggerEventKind, WakeReason}; +use crate::frame_socket_server::FrameSocketServerMessage; +use crate::program_rp2040::program_rp2040; +use crate::recording_state::RecordingMode; +use crate::save_audio::save_audio_file_to_disk; +use crate::save_cptv::save_cptv_file_to_disk; +use crate::utils::u8_slice_as_u16_slice; +use crate::{RecordingState, AUDIO_SHEBANG, EXPECTED_RP2040_FIRMWARE_VERSION, FRAME_LENGTH}; +use byteorder::{BigEndian, ByteOrder, LittleEndian}; +use chrono::{DateTime, Utc}; +use chrono_tz::Tz::Pacific__Auckland; +use crc::{Crc, CRC_16_XMODEM}; +use log::{error, info, warn}; +use rppal::gpio::Trigger; +use rppal::spi::{BitOrder, Bus, Mode, Polarity, SlaveSelect, Spi}; +use rustbus::connection::Timeout; +use rustbus::{get_system_bus_path, DuplexConn}; +use std::ops::Not; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::Arc; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use std::{process, thread}; + +pub const CAMERA_CONNECT_INFO: u8 = 0x1; +pub const CAMERA_RAW_FRAME_TRANSFER: u8 = 0x2; +pub const CAMERA_BEGIN_FILE_TRANSFER: u8 = 0x3; +pub const CAMERA_RESUME_FILE_TRANSFER: u8 = 0x4; +pub const CAMERA_END_FILE_TRANSFER: u8 = 0x5; +pub const CAMERA_BEGIN_AND_END_FILE_TRANSFER: u8 = 0x6; +pub const CAMERA_GET_MOTION_DETECTION_MASK: u8 = 0x7; +pub const CAMERA_SEND_LOGGER_EVENT: u8 = 0x8; + +pub struct CameraHandshakeInfo { + pub radiometry_enabled: bool, + pub is_recording: bool, + pub firmware_version: u32, + pub camera_serial: String, +} + +#[repr(u8)] +#[derive(Copy, Clone, PartialEq, Debug)] +pub enum ExtTransferMessage { + CameraConnectInfo = 0x1, + CameraRawFrameTransfer = 0x2, + BeginFileTransfer = 0x3, + ResumeFileTransfer = 0x4, + EndFileTransfer = 0x5, + BeginAndEndFileTransfer = 0x6, + GetMotionDetectionMask = 0x7, + SendLoggerEvent = 0x8, +} + +impl TryFrom for ExtTransferMessage { + type Error = (); + + fn try_from(value: u8) -> Result { + use ExtTransferMessage::*; + match value { + 0x1 => Ok(CameraConnectInfo), + 0x2 => Ok(CameraRawFrameTransfer), + 0x3 => Ok(BeginFileTransfer), + 0x4 => Ok(ResumeFileTransfer), + 0x5 => Ok(EndFileTransfer), + 0x6 => Ok(BeginAndEndFileTransfer), + 0x7 => Ok(GetMotionDetectionMask), + 0x8 => Ok(SendLoggerEvent), + _ => Err(()), + } + } +} + +pub fn enter_camera_transfer_loop( + initial_config: DeviceConfig, + mut dbus_conn: DuplexConn, + spi_speed_mhz: u32, + device_config_change_channel_rx: Receiver, + restart_rp2040_channel_tx: Sender, + sig_term: Arc, + camera_handshake_channel_tx: Sender, + restart_rp2040_ack: Arc, + mut recording_state: RecordingState, +) { + let spi_speed = spi_speed_mhz * 1_000_000; + // rPi3 can handle 12Mhz (@600Mhz), may need to back it off a little to have some slack. + info!("Initialising SPI at {}Mhz", spi_speed_mhz); + let mut spi = match Spi::new(Bus::Spi0, SlaveSelect::Ss0, spi_speed, Mode::Mode3) { + Ok(spi) => spi, + Err(e) => { + error!("Failed to get SPI0: {e}"); + process::exit(1); + } + }; + if spi.set_bits_per_word(8).is_err() { + error!("Failed to set SPI bits per word"); + process::exit(1); + }; + if spi.set_bit_order(BitOrder::MsbFirst).is_err() { + error!("Failed to set SPI output bit order"); + process::exit(1); + }; + if spi.set_ss_polarity(Polarity::ActiveLow).is_err() { + error!("Failed to set SPI SS polarity"); + process::exit(1); + } + let gpio = match rppal::gpio::Gpio::new() { + Err(e) => { + error!("Failed to get GPIO: {e}"); + process::exit(1); + } + Ok(gpio) => gpio, + }; + let mut pin = match gpio.get(7) { + Ok(pin) => pin.into_input(), + Err(e) => { + error!( + "Failed to get pi ping interrupt pin ({e}), \ + is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?" + ); + process::exit(1); + } + }; + pin.clear_interrupt() + .map_err(|e| { + error!("Unable to clear pi ping interrupt pin: {e}"); + process::exit(1); + }) + .unwrap(); + + // NOTE: `rppal` now has a `debounce` option here which may be worth exploring. + pin.set_interrupt(Trigger::RisingEdge, None) + .map_err(|e| { + error!("Unable to set pi ping interrupt: {e}"); + process::exit(1); + }) + .unwrap(); + // 65K buffer that we won't fully use at the moment. + let mut raw_read_buffer = [0u8; 65535]; + let mut got_first_frame = false; + let mut file_download: Option> = None; + let header_length = 18; + let mut return_payload_buf = [0u8; 32 + 104]; + + // This sequence is used to synchronise the return payload start on the rp2040, since + // it seems to have a fair bit of slop/offsetting. + return_payload_buf[0..4].copy_from_slice(&[1, 2, 3, 4]); + let mut part_count = 0; + let mut start = Instant::now(); + let crc_check = Crc::::new(&CRC_16_XMODEM); + let max_size: usize = raw_read_buffer.len(); + let mut device_config: DeviceConfig = initial_config; + let mut rp2040_needs_reset = false; + let mut sent_reset_request = false; + let mut header_integrity_check_has_failed = false; + let mut got_startup_info = false; + let mut radiometry_enabled = false; + let mut firmware_version = 0; + let mut lepton_serial_number = String::from(""); + let mut is_audio_device = device_config.is_audio_device(); + info!("Waiting for messages from rp2040"); + 'transfer: loop { + check_for_device_config_changes( + &device_config_change_channel_rx, + &mut device_config, + &mut is_audio_device, + &mut rp2040_needs_reset, + ); + if is_audio_device { + maybe_make_test_audio_recording( + &mut dbus_conn, + &restart_rp2040_channel_tx, + &mut recording_state, + ); + } + if !recording_state.is_recording() && rp2040_needs_reset { + let date = chrono::Local::now(); + warn!("Requesting reset of rp2040 at {}", date.with_timezone(&Pacific__Auckland)); + rp2040_needs_reset = false; + got_startup_info = false; + is_audio_device = device_config.is_audio_device(); + + if !sent_reset_request { + sent_reset_request = true; + let _ = restart_rp2040_channel_tx.send(true); + } + + if restart_rp2040_ack.load(Ordering::Relaxed) { + // Reset this atomic bool if the reset has been processed by the frame server thread. + sent_reset_request = false; + restart_rp2040_ack.store(false, Ordering::Relaxed); + } + } + let poll_result = pin.poll_interrupt(true, Some(Duration::from_millis(2000))); + if let Ok(_pin_level) = poll_result { + if _pin_level.is_some() { + { + drop(pin); + let output_pin = match gpio.get(7) { + Ok(pin) => pin.into_output_low(), + Err(e) => { + error!( + "Failed to get pi ping interrupt pin ({e}), \ + is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?" + ); + process::exit(1); + } + }; + drop(output_pin); + pin = match gpio.get(7) { + Ok(pin) => pin.into_input(), + Err(e) => { + error!( + "Failed to get pi ping interrupt pin ({e}), \ + is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?" + ); + process::exit(1); + } + }; + + pin.clear_interrupt() + .map_err(|e| { + error!("Unable to clear pi ping interrupt pin: {e}"); + process::exit(1); + }) + .unwrap(); + pin.set_interrupt(Trigger::RisingEdge, None) + .map_err(|e| { + error!("Unable to set pi ping interrupt: {e}"); + process::exit(1); + }) + .unwrap(); + } + + spi.read(&mut raw_read_buffer[..2066]).unwrap(); + { + let header_slice = &raw_read_buffer[..header_length]; + let transfer_type = header_slice[0]; + let transfer_type_dup = header_slice[1]; + let mut num_bytes = LittleEndian::read_u32(&header_slice[2..6]) as usize; + let num_bytes_dup = LittleEndian::read_u32(&header_slice[6..10]) as usize; + num_bytes = num_bytes.min(max_size); + let crc_from_remote = LittleEndian::read_u16(&header_slice[10..12]); + let crc_from_remote_dup = LittleEndian::read_u16(&header_slice[12..14]); + let crc_from_remote_inv = LittleEndian::read_u16(&header_slice[14..16]); + let crc_from_remote_inv_dup = LittleEndian::read_u16(&header_slice[16..=17]); + + let num_bytes_check = num_bytes == num_bytes_dup; + let header_crc_check = crc_from_remote == crc_from_remote_dup + && crc_from_remote_inv_dup == crc_from_remote_inv + && crc_from_remote_inv.not() == crc_from_remote; + let transfer_type_check = transfer_type == transfer_type_dup; + if !num_bytes_check || !header_crc_check || !transfer_type_check { + // Just log the *first* time the header integrity check fails in a session. + if !header_integrity_check_has_failed { + header_integrity_check_has_failed = true; + warn!("Header integrity check failed {:?}", &header_slice[..]); + // We still need to make sure we read out all the bytes? + } + + LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); + LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); + spi.write(&return_payload_buf).unwrap(); + if process_interrupted(&sig_term, &mut dbus_conn) { + break 'transfer; + } + continue 'transfer; + } + if num_bytes == 0 { + // warn!("zero-sized payload"); + LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); + LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); + spi.write(&return_payload_buf).unwrap(); + if process_interrupted(&sig_term, &mut dbus_conn) { + break 'transfer; + } + continue 'transfer; + } + if transfer_type < CAMERA_CONNECT_INFO + || transfer_type > CAMERA_SEND_LOGGER_EVENT + { + warn!("unknown transfer type {}", transfer_type); + LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); + LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); + spi.write(&return_payload_buf).unwrap(); + if process_interrupted(&sig_term, &mut dbus_conn) { + break 'transfer; + } + continue 'transfer; + } + + if transfer_type != CAMERA_RAW_FRAME_TRANSFER { + if transfer_type == CAMERA_BEGIN_FILE_TRANSFER { + start = Instant::now(); + } + let chunk = &raw_read_buffer[header_length..header_length + num_bytes]; + // Write back the crc we calculated. + let crc = crc_check.checksum(chunk); + LittleEndian::write_u16(&mut return_payload_buf[4..6], crc); + LittleEndian::write_u16(&mut return_payload_buf[6..8], crc); + + if transfer_type == CAMERA_CONNECT_INFO { + info!("Got camera connect info {:?}", chunk); + // Write all the info we need about the device: + device_config.write_to_slice(&mut return_payload_buf[8..]); + info!("Sending camera device config to rp2040"); + } else if transfer_type == CAMERA_GET_MOTION_DETECTION_MASK { + let piece_number = &chunk[0]; + //info!("Got request for mask piece number {}", piece_number); + let piece = device_config.mask_piece(*piece_number as usize); + + let mut piece_with_length = [0u8; 101]; + piece_with_length[0] = piece.len() as u8; + piece_with_length[1..1 + piece.len()].copy_from_slice(piece); + let crc = crc_check.checksum(&piece_with_length[0..1 + piece.len()]); + LittleEndian::write_u16(&mut return_payload_buf[8..10], crc); + //info!("Sending piece({}) {:?}", piece.len(), piece_with_length); + return_payload_buf[10..10 + piece.len() + 1] + .copy_from_slice(&piece_with_length); + } + // Always write the return buffer + spi.write(&return_payload_buf).unwrap(); + if crc == crc_from_remote { + match transfer_type { + CAMERA_CONNECT_INFO => { + radiometry_enabled = LittleEndian::read_u32(&chunk[0..4]) == 2; + firmware_version = LittleEndian::read_u32(&chunk[4..8]); + lepton_serial_number = + format!("{}", LittleEndian::read_u32(&chunk[8..12])); + got_startup_info = true; + if firmware_version != EXPECTED_RP2040_FIRMWARE_VERSION { + exit_cleanly(&mut dbus_conn); + info!("Unsupported firmware version, expected {}, got {}. Will reprogram RP2040.", EXPECTED_RP2040_FIRMWARE_VERSION, firmware_version); + let e = program_rp2040(); + if e.is_err() { + warn!("Failed to reprogram rp2040: {}", e.unwrap_err()); + panic!("Exit"); + } + process::exit(0); + } + + let recording_mode = + if LittleEndian::read_u32(&chunk[12..16]) != 0 { + RecordingMode::Audio + } else { + RecordingMode::Thermal + }; + recording_state.set_mode(recording_mode); + info!( + "Got startup info: \ + radiometry enabled: {radiometry_enabled}, \ + firmware version: {firmware_version}, \ + lepton serial #{lepton_serial_number} \ + recording mode {:?}", + recording_mode + ); + + if device_config.use_low_power_mode() + && !radiometry_enabled + && !device_config.is_audio_device() + { + exit_cleanly(&mut dbus_conn); + error!( + "Low power mode is currently only supported on \ + lepton sensors with radiometry or audio device, exiting." + ); + panic!("Exit"); + } + // Terminate any existing file download. + let in_progress_file_transfer = file_download.take(); + if let Some(file) = in_progress_file_transfer { + warn!( + "Aborting in progress file transfer with {} bytes", + file.len() + ); + } + let _ = camera_handshake_channel_tx.send( + FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: false, + }, + ); + } + CAMERA_SEND_LOGGER_EVENT => { + let event_kind = LittleEndian::read_u16(&chunk[0..2]); + let mut event_timestamp = + LittleEndian::read_u64(&chunk[2..2 + 8]); + let event_payload = LittleEndian::read_u64(&chunk[10..18]); + if let Ok(mut event_kind) = + LoggerEventKind::try_from(event_kind) + { + if let Some(mut time) = + DateTime::from_timestamp_micros(event_timestamp as i64) + { + if let LoggerEventKind::SetAlarm(alarm_time) = + &mut event_kind + { + if DateTime::from_timestamp_micros( + event_payload as i64, + ) + .is_some() + { + *alarm_time = event_payload; + } else { + warn!( + "Wakeup alarm from event was invalid {}", + event_payload + ); + } + } else if let LoggerEventKind::Rp2040MissedAudioAlarm( + alarm_time, + ) = &mut event_kind + { + if DateTime::from_timestamp_micros( + event_payload as i64, + ) + .is_some() + { + *alarm_time = event_payload; + } else { + warn!( + "Missed alarm from event was invalid {}", + event_payload + ); + } + } else if let LoggerEventKind::ToldRpiToWake(reason) = + &mut event_kind + { + if let Ok(wake_reason) = + WakeReason::try_from(event_payload as u8) + { + *reason = wake_reason; + } else { + warn!( + "Told rpi to wake invalid reason {}", + event_payload + ); + } + } else if let LoggerEventKind::RtcCommError = + &mut event_kind + { + if event_timestamp == 0 { + time = chrono::Local::now().with_timezone(&Utc); + event_timestamp = + time.timestamp_micros() as u64; + } + } + let payload_json = + if let LoggerEventKind::SavedNewConfig = event_kind + { + // If we get saved new config, the rp2040 would have just been + // restarted after the config change, so we can log the current + // config in relation to that event. + let json_inner = format!( + r#""continuous-recorder": {}, + "use-low-power-mode": {}, + "start-recording": "{:?}", + "stop-recording": "{:?}", + "location": "({}, {}, {})" + "#, + device_config.is_continuous_recorder(), + device_config.use_low_power_mode(), + device_config.recording_window().0, + device_config.recording_window().1, + device_config.lat_lng().0, + device_config.lat_lng().1, + device_config + .location_altitude() + .unwrap_or(0.0) + ); + let json = + String::from(format!("{{{}}}", json_inner)); + Some(json) + } else { + None + }; + info!( + "Got logger event {:?} at {}", + event_kind, + time.with_timezone(&Pacific__Auckland) + ); + let event = + LoggerEvent::new(event_kind, event_timestamp); + event.log(&mut dbus_conn, payload_json); + } else { + warn!( + "Event had invalid timestamp {}", + event_timestamp + ); + } + } else { + warn!("Unknown logger event kind {}", event_kind); + } + } + CAMERA_BEGIN_FILE_TRANSFER => { + if file_download.is_some() { + warn!("Trying to begin file without ending current"); + } + info!("Begin file transfer"); + // Open new file transfer + part_count += 1; + // If we have to grow this Vec once it gets big it can be slow and interrupt the transfer. + // TODO: Should really be able to recover from that though! + let mut file = Vec::with_capacity(150_000_000); + file.extend_from_slice(&chunk); + file_download = Some(file); + let _ = camera_handshake_channel_tx.send( + FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: true, + }, + ); + } + CAMERA_RESUME_FILE_TRANSFER => { + if let Some(file) = &mut file_download { + // Continue current file transfer + //println!("Continue file transfer"); + if part_count % 100 == 0 { + let megabytes_per_second = (file.len() + chunk.len()) + as f32 + / Instant::now() + .duration_since(start) + .as_secs_f32() + / (1024.0 * 1024.0); + info!( + "Transferring part #{} {:?} for {} bytes, {}MB/s", + part_count, + Instant::now().duration_since(start), + file.len() + chunk.len(), + megabytes_per_second + ); + } + part_count += 1; + file.extend_from_slice(&chunk); + let _ = camera_handshake_channel_tx.send( + FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: true, + }, + ); + } else { + warn!("Trying to continue file with no open file"); + if !got_startup_info { + if recording_state + .safe_to_restart_rp2040(&mut dbus_conn) + { + let date = chrono::Local::now(); + error!( + "1) Requesting reset of rp2040 to \ + force handshake, {}", + date.format("%Y-%m-%d--%H:%M:%S") + ); + let _ = restart_rp2040_channel_tx.send(true); + } + } + } + } + CAMERA_END_FILE_TRANSFER => { + // End current file transfer + if !file_download.is_some() { + warn!("Trying to end file with no open file"); + } + if let Some(mut file) = file_download.take() { + // Continue current file transfer + let megabytes_per_second = (file.len() + chunk.len()) + as f32 + / Instant::now().duration_since(start).as_secs_f32() + / (1024.0 * 1024.0); + info!( + "End file transfer, took {:?} for {} bytes, {}MB/s", + Instant::now().duration_since(start), + file.len() + chunk.len(), + megabytes_per_second + ); + part_count = 0; + file.extend_from_slice(&chunk); + let shebang = LittleEndian::read_u16(&file[0..2]); + if shebang == AUDIO_SHEBANG { + save_audio_file_to_disk(file, device_config.clone()); + } else { + save_cptv_file_to_disk(file, device_config.output_dir()) + } + let _ = camera_handshake_channel_tx.send( + FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: false, + }, + ); + } else { + warn!("Trying to end file with no open file"); + } + } + CAMERA_BEGIN_AND_END_FILE_TRANSFER => { + if file_download.is_some() { + info!( + "Trying to begin (and end) file without ending current" + ); + } + // Open and end new file transfer + part_count = 0; + let mut file = Vec::new(); + file.extend_from_slice(&chunk); + let shebang = LittleEndian::read_u16(&file[0..2]); + if shebang == AUDIO_SHEBANG { + save_audio_file_to_disk(file, device_config.clone()); + } else { + save_cptv_file_to_disk(file, device_config.output_dir()) + } + let _ = camera_handshake_channel_tx.send( + FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: false, + }, + ); + } + CAMERA_GET_MOTION_DETECTION_MASK => { + // Already handled + } + _ => { + if num_bytes != 0 { + warn!("Unhandled transfer type, {:#x}", transfer_type) + } + } + } + } else { + warn!("Crc check failed, remote was notified and will re-transmit"); + } + } else { + spi.read(&mut raw_read_buffer[2066..num_bytes + header_length]) + .map_err(|e| { + error!("SPI read error: {:?}", e); + process::exit(1); + }) + .unwrap(); + // Frame + let mut frame = [0u8; FRAME_LENGTH]; + BigEndian::write_u16_into( + u8_slice_as_u16_slice( + &raw_read_buffer[header_length..header_length + FRAME_LENGTH], + ), + &mut frame, + ); + let back = FRAME_BUFFER.get_back().lock().unwrap(); + back.replace(Some(frame)); + if !got_first_frame { + got_first_frame = true; + info!( + "Got first frame from rp2040, got startup info {}", + got_startup_info + ); + } + // FIXME: Should is_recording bit only be set in high power mode? + let is_recording = + crc_from_remote == 1 && !device_config.use_low_power_mode(); + recording_state.set_is_recording(is_recording); + if !got_startup_info { + warn!("Requesting reset of rp2040 to force handshake"); + rp2040_needs_reset = true; + } else if !rp2040_needs_reset { + FRAME_BUFFER.swap(); + let _ = camera_handshake_channel_tx.send(FrameSocketServerMessage { + camera_handshake_info: Some(CameraHandshakeInfo { + radiometry_enabled, + is_recording: recording_state.is_recording(), + firmware_version, + camera_serial: lepton_serial_number.clone(), + }), + camera_file_transfer_in_progress: false, + }); + } + } + } + } + } + if process_interrupted(&sig_term, &mut dbus_conn) { + break 'transfer; + } + } +} + +fn maybe_make_test_audio_recording( + dbus_conn: &mut DuplexConn, + restart_rp2040_channel_tx: &Sender, + recording_state: &mut RecordingState, +) { + // If the rp2040 is making a recording, and a user test audio recording was requested, + // do nothing. + + // If the user requested a test audio recording, trigger the test audio recording, and + // launch a thread to track when that recording has completed. + if recording_state.user_requested_test_audio_recording() { + recording_state.sync_state_from_attiny(dbus_conn); + if !recording_state.is_recording() + && recording_state.request_test_audio_recording_from_rp2040(dbus_conn) + { + let _ = restart_rp2040_channel_tx.send(true); + info!("Telling rp2040 to take test recording and restarting"); + let mut inner_recording_state = recording_state.clone(); + let _ = thread::spawn(move || { + let dbus_session_path = get_system_bus_path().unwrap_or_else(|e| { + error!("Error getting system DBus: {}", e); + process::exit(1); + }); + match DuplexConn::connect_to_bus(dbus_session_path, true) { + Err(e) => { + error!("Error connecting to system DBus: {}", e); + process::exit(1); + } + Ok(mut conn) => { + let _unique_name = conn.send_hello(Timeout::Infinite); + if _unique_name.is_err() { + error!( + "Error getting handshake with system DBus: {}", + _unique_name.err().unwrap() + ); + process::exit(1); + } else { + loop { + // Re-sync our internal rp2040 state once every 1-2 seconds until + // we see that the state has entered taking_test_audio_recording. + inner_recording_state.sync_state_from_attiny(&mut conn); + let sleep_duration_ms = + if inner_recording_state.is_recording() { 2000 } else { 1000 }; + if inner_recording_state.is_taking_test_audio_recording() { + break; + } + sleep(Duration::from_millis(sleep_duration_ms)); + } + loop { + // Now wait until we've exited taking_test_audio_recording. + inner_recording_state.sync_state_from_attiny(&mut conn); + let sleep_duration_ms = + if inner_recording_state.is_recording() { 2000 } else { 1000 }; + if !inner_recording_state.is_taking_test_audio_recording() { + inner_recording_state.finished_taking_test_recording(); + break; + } + sleep(Duration::from_millis(sleep_duration_ms)); + } + } + } + } + }); + } + } +} diff --git a/src/cptv_frame_dispatch.rs b/src/cptv_frame_dispatch.rs new file mode 100644 index 0000000..a657139 --- /dev/null +++ b/src/cptv_frame_dispatch.rs @@ -0,0 +1,25 @@ +use crate::double_buffer::DoubleBuffer; +use crate::socket_stream::SocketStream; +use crate::FRAME_LENGTH; + +pub type Frame = [u8; FRAME_LENGTH]; +pub static FRAME_BUFFER: DoubleBuffer = DoubleBuffer::new(); +pub fn get_frame(is_recording: bool) -> Option<[u8; 39040]> { + let fb = { FRAME_BUFFER.get_front().lock().unwrap().take() }; + if let Some(mut fb) = fb { + if is_recording { + // Write recording flag into unused telemetry. + fb[639] = 1; + } else { + fb[638] = 0; + } + return Some(fb); + } + None +} +pub fn send_frame(fb: [u8; 39040], stream: &mut SocketStream) -> bool { + if let Err(_) = stream.write_all(&fb) { + return false; + } + stream.flush().is_ok() +} diff --git a/src/dbus_attiny_i2c.rs b/src/dbus_attiny_i2c.rs new file mode 100644 index 0000000..90249fa --- /dev/null +++ b/src/dbus_attiny_i2c.rs @@ -0,0 +1,186 @@ +use crate::EXPECTED_ATTINY_FIRMWARE_VERSION; +use byteorder::{BigEndian, ByteOrder}; +use crc::{Algorithm, Crc}; +use log::error; +use rustbus::connection::Timeout; +use rustbus::{DuplexConn, MessageBuilder, MessageType}; +use std::process; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +const CRC_AUG_CCITT: Algorithm = Algorithm { + width: 16, + poly: 0x1021, + init: 0x1D0F, + refin: false, + refout: false, + xorout: 0x0000, + check: 0x0000, + residue: 0x0000, +}; + +pub fn dbus_read_attiny_command(conn: &mut DuplexConn, command: u8) -> Result { + dbus_attiny_command(conn, command, None) +} + +pub fn dbus_write_attiny_command( + conn: &mut DuplexConn, + command: u8, + value: u8, +) -> Result { + dbus_attiny_command(conn, command, Some(value)) +} + +pub fn dbus_attiny_command( + conn: &mut DuplexConn, + command: u8, + value: Option, +) -> Result { + let max_attempts = 10; + let retry_delay = Duration::from_secs(2); + + for attempt in 1..=max_attempts { + match dbus_attiny_command_attempt(conn, command, value) { + Ok(result) => return Ok(result), + Err(e) => { + if attempt == max_attempts { + return Err("Max attempts reached: failed to execute i2c dbus command"); + } + error!( + "Attempt {}/{} failed: {}. Retrying in {:?}...", + attempt, max_attempts, e, retry_delay + ); + std::thread::sleep(retry_delay); + } + } + } + Err("Failed to execute dbus command after maximum retries") +} + +pub fn dbus_attiny_command_attempt( + conn: &mut DuplexConn, + command: u8, + value: Option, +) -> Result { + let is_read_command = value.is_none(); + let mut payload: Vec = vec![command]; + if let Some(value) = value { + payload.push(value); + } + let crc = Crc::::new(&CRC_AUG_CCITT).checksum(&payload); + payload.push(0); + payload.push(0); + let len = payload.len(); + BigEndian::write_u16(&mut payload[len - 2..], crc); + let mut call = MessageBuilder::new() + .call("Tx") + .with_interface("org.cacophony.i2c") + .on("/org/cacophony/i2c") + .at("org.cacophony.i2c") + .build(); + call.body.push_param(0x25u8).unwrap(); + call.body.push_param(payload).unwrap(); + if is_read_command { + call.body.push_param(3i32).unwrap(); // Num bytes to receive including CRC validation code + } else { + call.body.push_param(0i32).unwrap(); // Receive nothing if this was a write. + } + call.body.push_param(1000i32).unwrap(); // Timeout ms + let id = conn.send.send_message(&call).unwrap().write_all().unwrap(); + let mut attempts = 0; + // Now wait for the reply that matches our call id + loop { + if let Ok(message) = + conn.recv.get_next_message(Timeout::Duration(Duration::from_millis(10))) + { + match message.typ { + MessageType::Reply => { + let reply_id = message.dynheader.response_serial.unwrap(); + if reply_id == id { + // Looks like the first 4 bytes are a u32 with the length of the following bytes + // which can be ignored. + return if is_read_command { + let response = &message.get_buf()[4..][0..3]; + let crc = Crc::::new(&CRC_AUG_CCITT).checksum(&response[0..1]); + let received_crc = BigEndian::read_u16(&response[1..=2]); + if received_crc != crc { + Err("CRC Mismatch") + } else { + Ok(response[0]) + } + } else { + // Check that the written value was actually written correctly. + let set_value = dbus_attiny_command(conn, command, None); + match set_value { + Ok(new_value) => { + if new_value != value.unwrap() { + Err("Failed setting state on attiny") + } else { + Ok(0) + } + } + Err(e) => Err(e), + } + }; + } + } + MessageType::Error => { + let reply_id = message.dynheader.response_serial.unwrap(); + if reply_id == id { + return Err("Dbus error"); + } + } + _ => {} + } + } + attempts += 1; + if attempts == 100 { + return Err("Timed out waiting for response from Dbus service"); + } + } +} + +pub fn exit_cleanly(conn: &mut DuplexConn) { + let _ = dbus_write_attiny_command(conn, 0x07, 0x00); +} + +pub fn process_interrupted(term: &Arc, conn: &mut DuplexConn) -> bool { + if term.load(Ordering::Relaxed) { + // We got terminated - before we exit, clean up the tc2-agent ready state register + exit_cleanly(conn); + true + } else { + false + } +} + +pub fn read_tc2_agent_state(conn: &mut DuplexConn) -> Result { + dbus_read_attiny_command(conn, 0x07) +} + +pub fn read_attiny_firmware_version(conn: &mut DuplexConn) -> Result { + dbus_read_attiny_command(conn, 0x01) +} + +pub fn exit_if_attiny_version_is_not_as_expected(dbus_conn: &mut DuplexConn) { + let version = read_attiny_firmware_version(dbus_conn); + match version { + Ok(version) => match version { + EXPECTED_ATTINY_FIRMWARE_VERSION => {} + _ => { + error!( + "Mismatched attiny firmware version, expected {}, got {}", + EXPECTED_ATTINY_FIRMWARE_VERSION, version + ); + exit_cleanly(dbus_conn); + process::exit(1); + } + }, + Err(msg) => { + error!("{}", msg); + exit_cleanly(dbus_conn); + process::exit(1); + } + } +} diff --git a/src/dbus_audio.rs b/src/dbus_audio.rs new file mode 100644 index 0000000..7f81fe8 --- /dev/null +++ b/src/dbus_audio.rs @@ -0,0 +1,89 @@ +use crate::RecordingState; +use log::error; +use rustbus::connection::dispatch_conn::{HandleEnvironment, HandleResult, Matches}; +use rustbus::connection::Timeout; +use rustbus::message_builder::MarshalledMessage; +use rustbus::{get_system_bus_path, DispatchConn, DuplexConn}; +use std::{process, thread}; +use thread_priority::{ThreadBuilderExt, ThreadPriority}; + +// TC2-Agent dbus audio service +type MyHandleEnv<'a, 'b> = HandleEnvironment; + +fn default_handler( + _recording_state_ctx: &mut RecordingState, + _matches: Matches, + _msg: &MarshalledMessage, + _env: &mut MyHandleEnv, +) -> HandleResult<()> { + Ok(None) +} + +fn audio_handler( + recording_state_ctx: &mut RecordingState, + _matches: Matches, + msg: &MarshalledMessage, + _env: &mut MyHandleEnv, +) -> HandleResult<()> { + if msg.dynheader.member.as_ref().unwrap() == "testaudio" { + let message = if !recording_state_ctx.is_taking_test_audio_recording() { + recording_state_ctx.request_test_audio_recording(); + "Asked for a test recording" + } else { + "Already making a test recording" + }; + let mut resp = msg.dynheader.make_response(); + resp.body.push_param(message)?; + Ok(Some(resp)) + } else if msg.dynheader.member.as_ref().unwrap() == "audiostatus" { + let mut response = msg.dynheader.make_response(); + let status = recording_state_ctx.get_audio_status(); + response.body.push_param(if recording_state_ctx.is_in_audio_mode() { 1 } else { 0 })?; + response.body.push_param(status as u8)?; + Ok(Some(response)) + } else { + Ok(None) + } +} +#[derive(PartialEq)] +pub enum AudioStatus { + Ready = 1, + WaitingToTakeTestRecording = 2, + TakingTestRecording = 3, + Recording = 4, +} + +pub fn setup_dbus_test_audio_recording_service(recording_state: &RecordingState) { + // set up dbus service for handling messages between managementd and tc2-agent about when + // to make test audio recordings. + let recording_state = recording_state.clone(); + let session_path = get_system_bus_path().unwrap(); + let _dbus_thread = thread::Builder::new().name("dbus-service".to_string()).spawn_with_priority( + ThreadPriority::Max, + move |_| { + let mut dbus_conn = + DuplexConn::connect_to_bus(session_path, false).unwrap_or_else(|e| { + error!("Error connecting to system DBus: {}", e); + process::exit(1); + }); + let _name = dbus_conn.send_hello(Timeout::Infinite).unwrap_or_else(|e| { + error!("Error getting handshake with system DBus: {}", e); + process::exit(1); + }); + dbus_conn + .send + .send_message(&mut rustbus::standard_messages::request_name( + "org.cacophony.TC2Agent".into(), + rustbus::standard_messages::DBUS_NAME_FLAG_REPLACE_EXISTING, + )) + .unwrap() + .write_all() + .unwrap(); + + let mut dispatch_conn = + DispatchConn::new(dbus_conn, recording_state, Box::new(default_handler)); + dispatch_conn.add_handler("/org/cacophony/TC2Agent", Box::new(audio_handler)); + dispatch_conn.run().unwrap(); + }, + ); +} diff --git a/src/detection_mask.rs b/src/detection_mask.rs index e847bdf..cba08fb 100644 --- a/src/detection_mask.rs +++ b/src/detection_mask.rs @@ -1,13 +1,19 @@ -#[derive(Debug, PartialEq, Clone)] +use std::fmt::{Debug, Formatter}; + +#[derive(PartialEq, Clone)] pub struct DetectionMask { inner: [u8; 2400], } +impl Debug for DetectionMask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DetectionMask").field("has_masking", &self.has_masking()).finish() + } +} + impl DetectionMask { pub fn new(mask: Option<[u8; 2400]>) -> DetectionMask { - DetectionMask { - inner: mask.unwrap_or([0u8; 2400]), - } + DetectionMask { inner: mask.unwrap_or([0u8; 2400]) } } #[allow(unused)] pub fn is_masked_at_pos(&self, x: usize, y: usize) -> bool { @@ -25,6 +31,10 @@ impl DetectionMask { self.inner[i >> 3] |= 1 << (i % 8); } + pub fn has_masking(&self) -> bool { + self.inner.iter().any(|x| *x != 0x00) + } + #[inline(always)] #[allow(unused)] pub fn is_masked_at_index(&self, index: usize) -> bool { diff --git a/src/device_config.rs b/src/device_config.rs index 5b2457a..d7b7d44 100644 --- a/src/device_config.rs +++ b/src/device_config.rs @@ -1,3 +1,4 @@ +use core::fmt; use std::collections::HashMap; // Read camera config file use crate::detection_mask::DetectionMask; @@ -5,17 +6,22 @@ use byteorder::{LittleEndian, WriteBytesExt}; use chrono::{ DateTime, Duration, FixedOffset, Local, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc, }; -use log::{error, info}; +use log::{error, info, warn}; +use louvre::triangulate; +use notify::event::{AccessKind, AccessMode}; +use notify::{Event, EventKind, RecursiveMode, Watcher}; use serde::de::Error; use serde::{Deserialize, Deserializer}; -use std::fs; +use std::fmt::{Debug, Formatter}; use std::io::{Cursor, Write}; use std::ops::Add; +use std::path::Path; use std::str::FromStr; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::{fs, process}; use sun_times::sun_times; use toml::value::Offset; use toml::Value; -use triangulate::{ListFormat, Polygon}; fn default_constant_recorder() -> bool { false @@ -47,17 +53,11 @@ fn default_activate_thermal_throttler() -> bool { } fn default_recording_start_time() -> AbsRelTime { - AbsRelTime { - relative_time_seconds: Some(-(60 * 30)), - absolute_time: None, - } + AbsRelTime { relative_time_seconds: Some(-(60 * 30)), absolute_time: None } } fn default_recording_stop_time() -> AbsRelTime { - AbsRelTime { - relative_time_seconds: Some(60 * 30), - absolute_time: None, - } + AbsRelTime { relative_time_seconds: Some(60 * 30), absolute_time: None } } #[derive(Debug)] @@ -66,11 +66,11 @@ struct TimeUnit(char); #[derive(Debug)] struct NumberString(String, Option, bool); -fn sign(p1: (f32, f32), p2: (f32, f32), p3: (f32, f32)) -> f32 { +fn sign(p1: (f64, f64), p2: (f64, f64), p3: (f64, f64)) -> f64 { (p1.0 - p3.0) * (p2.1 - p3.1) - (p2.0 - p3.0) * (p1.1 - p3.1) } -fn point_in_triangle(triangle: ((f32, f32), (f32, f32), (f32, f32)), point: (f32, f32)) -> bool { +fn point_in_triangle(triangle: ((f64, f64), (f64, f64), (f64, f64)), point: (f64, f64)) -> bool { let d1 = sign(point, triangle.0, triangle.1); let d2 = sign(point, triangle.1, triangle.2); let d3 = sign(point, triangle.2, triangle.0); @@ -78,15 +78,15 @@ fn point_in_triangle(triangle: ((f32, f32), (f32, f32), (f32, f32)), point: (f32 let has_neg = (d1 < 0.) || (d2 < 0.) || (d3 < 0.); let has_pos = (d1 > 0.) || (d2 > 0.) || (d3 > 0.); - return !(has_neg && has_pos); + !(has_neg && has_pos) } fn deserialize_mask_regions<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, { - let masks: toml::map::Map = Deserialize::deserialize(deserializer)?; - let mut regions: HashMap> = HashMap::new(); + let masks: toml::map::Map = Deserialize::deserialize(deserializer)?; + let mut regions: HashMap> = HashMap::new(); for (label, mask_region) in masks { let mut region = Vec::new(); match mask_region { @@ -106,8 +106,8 @@ where let mut y; for (idx, el) in coord.iter().enumerate() { let el_val = match &el { - Value::Float(float_val) => Some(*float_val as f32), - Value::Integer(int_val) => Some(*int_val as f32), + Value::Float(float_val) => Some(*float_val as f64), + Value::Integer(int_val) => Some(*int_val as f64), _ => { error!("Region '{}'[{}].{}: Unsupported coordinate value, expected Float or Integer", label, i, if idx == 0 {'x'} else { 'y' }); None @@ -131,10 +131,7 @@ where } } } - _ => error!( - "Region '{}': Must be an array of [[x, y], ...] coordinates", - label - ), + _ => error!("Region '{}': Must be an array of [[x, y], ...] coordinates", label), } regions.insert(label.clone(), region); } @@ -143,19 +140,14 @@ where let w = 160.0; let h = 120.0; for (_label, polygon) in regions { - let mut triangulated_indices: Vec = Vec::new(); - polygon - .triangulate( - triangulate::formats::IndexedListFormat::new(&mut triangulated_indices) - .into_fan_format(), - ) - .expect("Triangulation failed"); - for corners in triangulated_indices.chunks_exact(3) { + let mut polygon = polygon.concat(); + let (polygon, triangulated_indices) = triangulate(&mut polygon, 2); + for corners in triangulated_indices.chunks_exact(6) { // Map each triangle into the frame space, then do 'point-in triangle checks for each pixel of the frame. triangles.push(( - (polygon[corners[0]][0] * w, polygon[corners[0]][1] * h), - (polygon[corners[1]][0] * w, polygon[corners[1]][1] * h), - (polygon[corners[2]][0] * w, polygon[corners[2]][1] * h), + (polygon[corners[0]] * w, polygon[corners[1]] * h), + (polygon[corners[2]] * w, polygon[corners[3]] * h), + (polygon[corners[4]] * w, polygon[corners[5]] * h), )); } } @@ -163,7 +155,7 @@ where for y in 0..120 { for x in 0..160 { for triangle in &triangles { - if point_in_triangle(*triangle, (x as f32, y as f32)) { + if point_in_triangle(*triangle, (x as f64, y as f64)) { mask.set_pos(x, y); } } @@ -278,10 +270,7 @@ where if absolute_time.is_none() && relative_time_seconds.is_none() { Err(Error::custom(format!("Failed to parse window time: {}", s))) } else { - Ok(AbsRelTime { - absolute_time, - relative_time_seconds, - }) + Ok(AbsRelTime { absolute_time, relative_time_seconds }) } } @@ -339,15 +328,9 @@ struct LocationSettings { longitude: Option, altitude: Option, - #[serde( - deserialize_with = "timestamp_to_u64", - default = "default_location_timestamp" - )] + #[serde(deserialize_with = "timestamp_to_u64", default = "default_location_timestamp")] timestamp: Option, - #[serde( - deserialize_with = "location_accuracy_to_f32", - default = "default_location_accuracy" - )] + #[serde(deserialize_with = "location_accuracy_to_f32", default = "default_location_accuracy")] accuracy: Option, } @@ -362,16 +345,32 @@ fn timezone_offset_seconds() -> i32 { // devices' GPS coordinates to work out correct absolute start/end recording window times. let now = Local::now(); let local_tz = now.timezone(); - local_tz - .offset_from_utc_datetime(&now.naive_utc()) - .local_minus_utc() + local_tz.offset_from_utc_datetime(&now.naive_utc()).local_minus_utc() } -#[derive(Debug, PartialEq, Clone)] +#[derive(PartialEq, Clone)] pub struct AbsRelTime { absolute_time: Option, relative_time_seconds: Option, } +impl Debug for AbsRelTime { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let absolute_time = self.absolute_time.clone(); + let relative_time = self.relative_time_seconds.clone(); + if let Some(time) = absolute_time { + return f + .debug_struct("AbsoluteTime") + .field("hour", &time.hour) + .field("min", &time.min) + .finish(); + } + if let Some(time) = relative_time { + return f.debug_struct("RelativeOffset").field("secs", &time).finish(); + } + Err(fmt::Error::default()) + } +} + impl AbsRelTime { pub fn time_offset(&self) -> (bool, i32) { // Absolute or relative time in seconds in the day @@ -474,9 +473,7 @@ pub struct AudioSettings { impl Default for AudioSettings { fn default() -> Self { - AudioSettings { - audio_mode: default_audio_mode(), - } + AudioSettings { audio_mode: default_audio_mode() } } } @@ -492,10 +489,7 @@ where if let Ok(mode) = AudioMode::from_str(&audio_mode_raw) { Ok(mode) } else { - Err(Error::custom(format!( - "Failed to parse audio mode: {}", - audio_mode_raw - ))) + Err(Error::custom(format!("Failed to parse audio mode: {}", audio_mode_raw))) } } @@ -578,13 +572,7 @@ impl DeviceConfig { } pub fn device_name(&self) -> &[u8] { - self.device_info - .as_ref() - .unwrap() - .name - .as_ref() - .unwrap() - .as_bytes() + self.device_info.as_ref().unwrap().name.as_ref().unwrap().as_bytes() } pub fn lat_lng(&self) -> (f32, f32) { @@ -626,11 +614,7 @@ impl DeviceConfig { pub fn is_continuous_recorder(&self) -> bool { self.recording_settings.constant_recorder - || (self - .recording_window - .start_recording - .absolute_time - .is_some() + || (self.recording_window.start_recording.absolute_time.is_some() && self.recording_window.stop_recording.absolute_time.is_some() && self.recording_window.start_recording == self.recording_window.stop_recording) } @@ -652,12 +636,15 @@ impl DeviceConfig { "No location set for this device. To enter recording mode, a location must be set." ); // TODO: Event log error? - std::process::exit(1); + process::exit(1); } if !device_config.is_registered() { - error!("This device is not yet registered. To enter recording mode the device must be named assigned to a project."); + error!( + "This device is not yet registered. \ + To enter recording mode the device must be named assigned to a project." + ); // TODO: Event log error? - std::process::exit(1); + process::exit(1); } info!("Got config {:?}", device_config); if device_config.audio_info.audio_mode != AudioMode::AudioOnly { @@ -688,17 +675,11 @@ impl DeviceConfig { start_offset = 86_400 + start_offset; } let (window_start, window_end) = if !is_absolute_start || !is_absolute_end { - let location = self - .location - .as_ref() - .expect("Relative recording windows require a location"); + let location = + self.location.as_ref().expect("Relative recording windows require a location"); let (lat, lng) = ( - location - .latitude - .expect("Relative recording windows require a valid latitude"), - location - .longitude - .expect("Relative recording windows require a valid longitude"), + location.latitude.expect("Relative recording windows require a valid latitude"), + location.longitude.expect("Relative recording windows require a valid longitude"), ); let altitude = location.altitude; let yesterday_utc = *now_utc - Duration::days(1); @@ -711,13 +692,9 @@ impl DeviceConfig { .unwrap(); let yesterday_sunset = yesterday_sunset.naive_utc() + Duration::seconds(start_offset as i64); - let (today_sunrise, today_sunset) = sun_times( - now_utc.date(), - lat as f64, - lng as f64, - altitude.unwrap_or(0.0) as f64, - ) - .unwrap(); + let (today_sunrise, today_sunset) = + sun_times(now_utc.date(), lat as f64, lng as f64, altitude.unwrap_or(0.0) as f64) + .unwrap(); let today_sunrise = today_sunrise.naive_utc() + Duration::seconds(end_offset as i64); let today_sunset = today_sunset.naive_utc() + Duration::seconds(start_offset as i64); let tomorrow_utc = *now_utc + Duration::days(1); @@ -833,47 +810,17 @@ impl DeviceConfig { "Next recording window will start in {}h{}m and end in {}h{}m, window duration {}h{}m", starts_in_hours, starts_in_mins, ends_in_hours, ends_in_mins, window_hours, window_mins ); - - info!( - "Next recording window will end in {}h{}m, window duration {}h{}m", - ends_in_hours, ends_in_mins, window_hours, window_mins - ); } pub fn time_is_in_recording_window(&self, date_time_utc: &NaiveDateTime) -> bool { if self.is_continuous_recorder() { return true; } let (start_time, end_time) = self.next_recording_window(date_time_utc); - let starts_in = start_time - *date_time_utc; - let starts_in_hours = starts_in.num_hours(); - let starts_in_mins = starts_in.num_minutes() - (starts_in_hours * 60); - let ends_in = end_time - *date_time_utc; - let ends_in_hours = ends_in.num_hours(); - let ends_in_mins = ends_in.num_minutes() - (ends_in_hours * 60); - let window = end_time - start_time; - let window_hours = window.num_hours(); - let window_mins = window.num_minutes() - (window_hours * 60); - if start_time > *date_time_utc && end_time > *date_time_utc { - info!( - "Recording will start in {}h{}m and end in {}h{}m, window duration {}h{}m", - starts_in_hours, - starts_in_mins, - ends_in_hours, - ends_in_mins, - window_hours, - window_mins - ); - } else if end_time > *date_time_utc { - info!( - "Recording will end in {}h{}m, window duration {}h{}m", - ends_in_hours, ends_in_mins, window_hours, window_mins - ); - } *date_time_utc >= start_time && *date_time_utc <= end_time } pub fn is_audio_device(&self) -> bool { - return self.audio_info.audio_mode != AudioMode::Disabled; + self.audio_info.audio_mode != AudioMode::Disabled } pub fn write_to_slice(&self, output: &mut [u8]) { @@ -913,10 +860,8 @@ impl DeviceConfig { buf.write_i32::(start_seconds_offset).unwrap(); buf.write_u8(if end_is_abs { 1 } else { 0 }).unwrap(); buf.write_i32::(end_seconds_offset).unwrap(); - buf.write_u8(if self.is_continuous_recorder() { 1 } else { 0 }) - .unwrap(); - buf.write_u8(if self.use_low_power_mode() { 1 } else { 0 }) - .unwrap(); + buf.write_u8(if self.is_continuous_recorder() { 1 } else { 0 }).unwrap(); + buf.write_u8(if self.use_low_power_mode() { 1 } else { 0 }).unwrap(); let device_name = self.device_name(); let device_name_length = device_name.len().min(63); @@ -924,3 +869,83 @@ impl DeviceConfig { buf.write(&device_name[0..device_name_length]).unwrap(); } } + +pub fn watch_local_config_file_changes( + mut current_config: DeviceConfig, + config_tx: &Sender, +) { + let config_tx = config_tx.clone(); + let mut watcher = notify::recommended_watcher(move |res| match res { + Ok(Event { kind, .. }) => { + match kind { + EventKind::Access(AccessKind::Close(AccessMode::Write)) => { + // File got written to + // Send event to + match DeviceConfig::load_from_fs() { + Ok(config) => { + // Send to rp2040 to write via a channel somehow. + // Maybe we have to restart the rp2040 here so that it re-handshakes + // and picks up the new info + if config != current_config { + current_config = config; + warn!("Config updated"); + let _ = config_tx.send(current_config.clone()); + } + } + Err(msg) => { + error!("Load config error: {}", msg); + process::exit(1); + } + } + } + _ => {} + } + } + Err(e) => error!("file watch error for /etc/cacophony/config.toml: {:?}", e), + }) + .map_err(|e| { + error!("{}", e); + process::exit(1); + }) + .unwrap(); + // Add a path to be watched. All files and directories at that path and + // below will be monitored for changes. + watcher + .watch(Path::new("/etc/cacophony/config.toml"), RecursiveMode::NonRecursive) + .map_err(|e| { + error!("File watcher setup error: {e}"); + process::exit(1); + }) + .unwrap(); +} + +// FIXME: Use RecordingModeState here? +pub fn check_for_device_config_changes( + device_config_change_channel_rx: &Receiver, + device_config: &mut DeviceConfig, + is_audio_device: &mut bool, + rp2040_needs_reset: &mut bool, +) { + // Check once per frame to see if the config file may have been changed + let updated_config = device_config_change_channel_rx.try_recv(); + match updated_config { + Ok(config) => { + // NOTE: Defer this till a time we know the rp2040 isn't writing to flash memory. + if *device_config != config { + info!("Config updated, should update rp2040"); + *device_config = config; + if !*is_audio_device { + //will update after reset request if in audio mode + *is_audio_device = device_config.is_audio_device(); + } + } + *rp2040_needs_reset = true; + } + Err(kind) => match kind { + TryRecvError::Empty => {} + TryRecvError::Disconnected => { + warn!("Disconnected from config file watcher channel"); + } + }, + } +} diff --git a/src/double_buffer.rs b/src/double_buffer.rs index 95321e7..e0a853c 100644 --- a/src/double_buffer.rs +++ b/src/double_buffer.rs @@ -1,4 +1,4 @@ -use crate::Frame; +use crate::cptv_frame_dispatch::Frame; use std::cell::RefCell; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; diff --git a/src/frame_socket_server.rs b/src/frame_socket_server.rs new file mode 100644 index 0000000..82563f9 --- /dev/null +++ b/src/frame_socket_server.rs @@ -0,0 +1,266 @@ +use crate::camera_transfer_state::CameraHandshakeInfo; +use crate::cptv_frame_dispatch; +use crate::program_rp2040::program_rp2040; +use crate::recording_state::{RecordingMode, RecordingState}; +use crate::socket_stream::{get_socket_address, SocketStream}; +use crate::telemetry::{read_telemetry, Telemetry}; +use log::{debug, error, info, warn}; +use rppal::gpio::OutputPin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::{Receiver, RecvTimeoutError}; +use std::sync::Arc; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use std::{process, thread}; +use thread_priority::{ThreadBuilderExt, ThreadPriority}; + +pub struct FrameSocketServerMessage { + pub(crate) camera_handshake_info: Option, + pub(crate) camera_file_transfer_in_progress: bool, +} + +fn restart_rp2040_if_requested( + restart_rp2040_channel_rx: &Receiver, + run_pin: &mut OutputPin, + restart_rp2040_ack: &mut Arc, +) { + // Check if we need to reset rp2040 because of a config change + if let Ok(_) = restart_rp2040_channel_rx.try_recv() { + restart_rp2040_ack.store(true, Ordering::Relaxed); + info!("Restarting rp2040"); + if !run_pin.is_set_high() { + run_pin.set_high(); + sleep(Duration::from_millis(1000)); + } + run_pin.set_low(); + sleep(Duration::from_millis(1000)); + run_pin.set_high(); + } +} + +pub fn spawn_frame_socket_server_thread( + restart_rp2040_channel_rx: Receiver, + camera_handshake_channel_rx: Receiver, + serve_frames_via_wifi: bool, + mut run_pin: OutputPin, + mut restart_rp2040_ack: Arc, + recording_state: &RecordingState, +) { + let recording_state = recording_state.clone(); + let _ = thread::Builder::new().name("frame-socket".to_string()).spawn_with_priority( + ThreadPriority::Max, + move |result| { + let address = get_socket_address(serve_frames_via_wifi); + let management_address = "/var/spool/managementd".to_string(); + // Spawn a thread which can output the frames, converted to rgb grayscale + // This is printed out from within the spawned thread. + if result.is_err() { + error!( + "Thread must have permissions to run with realtime priority, run as root user" + ); + process::exit(1); + } + + let mut reconnects = 0; + let mut prev_frame_num = None; + let mut sockets: [(String, bool, Option); 2] = + [(address, serve_frames_via_wifi, None), (management_address, false, None)]; + loop { + restart_rp2040_if_requested( + &restart_rp2040_channel_rx, + &mut run_pin, + &mut restart_rp2040_ack, + ); + let mut recv_timeout_ms = 10; + info!("Connecting to frame sockets"); + let mut ms_elapsed = 0; + loop { + restart_rp2040_if_requested( + &restart_rp2040_channel_rx, + &mut run_pin, + &mut restart_rp2040_ack, + ); + if recording_state.recording_mode() == RecordingMode::Thermal { + for (address, use_wifi, stream) in + sockets.iter_mut().filter(|(_, _, stream)| stream.is_none()) + { + let stream_connection: Option = + SocketStream::from_address(&address, *use_wifi).ok(); + if stream_connection.is_some() { + println!("Connected to {}", &address); + } + *stream = stream_connection; + } + + let connections = + sockets.iter().filter(|(_, _, stream)| stream.is_some()).count(); + if connections == 0 { + sleep(Duration::from_millis(1000)); + continue; + } + } + + handle_payload_from_frame_acquire_thread( + camera_handshake_channel_rx + .recv_timeout(Duration::from_millis(recv_timeout_ms)), + &mut sockets, + &mut ms_elapsed, + &mut reconnects, + &mut prev_frame_num, + &recording_state, + &mut recv_timeout_ms, + ); + } + } + }, + ); +} + +fn handle_payload_from_frame_acquire_thread( + result: Result, + sockets: &mut [(String, bool, Option); 2], + ms_elapsed: &mut u64, + reconnects: &mut usize, + prev_frame_num: &mut Option, + recording_state: &RecordingState, + recv_timeout_ms: &mut u64, +) { + match result { + Ok(FrameSocketServerMessage { + camera_handshake_info: + Some(CameraHandshakeInfo { + radiometry_enabled, + is_recording, + firmware_version, + camera_serial, + }), + camera_file_transfer_in_progress: false, + }) => { + let model = if radiometry_enabled { "lepton3.5" } else { "lepton3" }; + let header = format!( + "ResX: 160\n\ + ResX: 160\n\ + ResY: 120\n\ + FrameSize: 39040\n\ + Model: {model}\n\ + Brand: flir\n\ + FPS: 9\n\ + Firmware: DOC-AI-v0.{firmware_version}\n\ + CameraSerial: {camera_serial}\n\n", + ); + for (_, _, stream) in sockets.iter_mut().filter(|(_, use_wifi, stream)| { + stream.is_some() && !use_wifi && !stream.as_ref().unwrap().sent_header + }) { + let stream = stream.as_mut().expect("Never fails, because we filtered already."); + if stream.write_all(header.as_bytes()).is_err() { + warn!("Failed sending header info"); + } + // Clear existing + if stream.write_all(b"clear").is_err() { + warn!("Failed clearing buffer"); + } + let _ = stream.flush(); + stream.sent_header = true; + } + + if *reconnects > 0 { + info!("Got frame connection"); + *prev_frame_num = None; + *reconnects = 0; + } + let s = Instant::now(); + let mut telemetry: Option = None; + let frame_data = cptv_frame_dispatch::get_frame(is_recording); + if let Some(fb) = frame_data { + telemetry = Some(read_telemetry(&fb)); + for (address, use_wifi, stream) in + sockets.iter_mut().filter(|(_, _, stream)| stream.is_some()) + { + let sent = + cptv_frame_dispatch::send_frame(fb, stream.as_mut().expect("Never fails")); + if !sent { + warn!( + "Send to {} failed", + if *use_wifi { "tc2-frames server" } else { address } + ); + let _ = stream.take().expect("Never fails").shutdown().is_ok(); + } + } + } + let e = s.elapsed().as_secs_f32(); + if e > 0.1 { + info!("socket send took {}s", e); + } + if let Some(telemetry) = telemetry { + if let Some(prev_frame_num) = prev_frame_num { + if !telemetry.ffc_in_progress { + if telemetry.frame_num != *prev_frame_num + 1 { + // NOTE: Frames can be missed when the raspberry pi + // blocks the thread with the + // unix socket in `thermal-recorder`. + debug!( + "Missed {} frames after {}s on", + telemetry.frame_num - (*prev_frame_num + 1), + telemetry.msec_on as f32 / 1000.0 + ); + } + } + } + *prev_frame_num = Some(telemetry.frame_num); + if telemetry.frame_num % 2700 == 0 { + info!("Got frame #{}", telemetry.frame_num); + } + } + *ms_elapsed = 0; + } + Ok(FrameSocketServerMessage { + camera_handshake_info: None, + camera_file_transfer_in_progress: true, + }) => { + // There's a file transfer in progress, and we got a recording mode change? + *ms_elapsed = 0; + match recording_state.recording_mode() { + RecordingMode::Audio => { + *recv_timeout_ms = 1000; + for (address, use_wifi, stream) in + sockets.iter_mut().filter(|(_, _, stream)| stream.is_some()) + { + info!( + "Shutting down socket '{}'", + if *use_wifi { "tc2-frames server" } else { address } + ); + let _ = stream.take().unwrap().shutdown().is_ok(); + } + } + RecordingMode::Thermal => { + *recv_timeout_ms = 10; + } + } + } + _ => { + if recording_state.recording_mode() == RecordingMode::Thermal { + const NUM_ATTEMPTS_BEFORE_REPROGRAM: usize = 20; + *ms_elapsed += *recv_timeout_ms; + if *ms_elapsed > 10_000 { + *ms_elapsed = 0; + *reconnects += 1; + if *reconnects == NUM_ATTEMPTS_BEFORE_REPROGRAM { + match program_rp2040() { + Ok(()) => process::exit(0), + Err(e) => { + error!("Failed to reprogram RP2040: {e}"); + process::exit(1); + } + } + } else { + info!( + "-- #{reconnects} waiting to connect to rp2040 \ + (reprogram RP2040 after {} more attempts)", + NUM_ATTEMPTS_BEFORE_REPROGRAM - *reconnects + ); + } + } + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 3583e2a..e90258e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,72 +1,51 @@ +extern crate core; + +mod camera_transfer_state; +mod cptv_frame_dispatch; mod cptv_header; +mod dbus_attiny_i2c; +mod dbus_audio; mod detection_mask; mod device_config; mod double_buffer; mod event_logger; +mod frame_socket_server; mod mode_config; -mod service; +mod program_rp2040; +mod recording_state; +mod save_audio; +mod save_cptv; mod socket_stream; mod telemetry; mod utils; -use byteorder::{BigEndian, ByteOrder, LittleEndian}; -use chrono::NaiveDateTime; -use rppal::spi::BitOrder; -use rppal::{ - gpio::{Gpio, Trigger}, - spi::{Bus, Mode, Polarity, SlaveSelect, Spi}, -}; -use rustbus::connection::dispatch_conn::DispatchConn; +use rppal::gpio::Gpio; use std::fs; -use std::io; -use std::ops::Not; -use std::path::Path; use std::process; -use std::process::Command; -use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; -use std::sync::mpsc::{channel, TryRecvError}; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::sleep; -use std::time::Instant; use std::{thread, time::Duration}; use thread_priority::ThreadBuilderExt; use thread_priority::*; -use crate::cptv_header::{decode_cptv_header_streaming, CptvHeader}; -use crate::device_config::DeviceConfig; -use crate::double_buffer::DoubleBuffer; -use crate::event_logger::{LoggerEvent, LoggerEventKind, WakeReason}; -use crate::mode_config::ModeConfig; -use crate::service::AgentService; -use crate::socket_stream::{get_socket_address, SocketStream}; -use crate::telemetry::{read_telemetry, Telemetry}; -use crate::utils::u8_slice_as_u16_slice; -use crate::ExtTransferMessage::{BeginAndEndFileTransfer, BeginFileTransfer, CameraConnectInfo, CameraRawFrameTransfer, EndFileTransfer, GetMotionDetectionMask, ResumeFileTransfer, SendLoggerEvent}; -use crc::{Algorithm, Crc, CRC_16_XMODEM}; -use log::{error, info, warn}; -use notify::event::{AccessKind, AccessMode}; -use notify::{Event, EventKind, RecursiveMode, Watcher}; +use log::{error, info}; use rustbus::connection::Timeout; -use rustbus::{get_system_bus_path, DuplexConn, MessageBuilder, MessageType}; +use rustbus::{get_system_bus_path, DuplexConn}; use simplelog::*; -use std::io::Write; -pub mod tc2_agent_state { - pub const NOT_READY: u8 = 0x00; - pub const READY: u8 = 1 << 1; - //taking an audio or thermal recording - pub const RECORDING: u8 = 1 << 2; - pub const TEST_AUDIO_RECORDING: u8 = 1 << 3; - //an audio recording has been requested from thermal mode - pub const TAKE_AUDIO: u8 = 1 << 4; - pub const OFFLOAD: u8 = 1 << 5; - //requested to boot into thermal mode from audio mode - pub const THERMAL_MODE: u8 = 1 << 6; - //is no audio mode flag as it isn't needed at this point - // pub const THERMAL_MODE: u8 = 1 << 7; -} +use crate::camera_transfer_state::enter_camera_transfer_loop; +use crate::dbus_attiny_i2c::exit_if_attiny_version_is_not_as_expected; +use crate::dbus_audio::setup_dbus_test_audio_recording_service; +use crate::device_config::watch_local_config_file_changes; +use crate::device_config::DeviceConfig; +use crate::frame_socket_server::spawn_frame_socket_server_thread; +use crate::mode_config::ModeConfig; +use crate::program_rp2040::check_if_rp2040_needs_programming; +use crate::recording_state::RecordingState; const AUDIO_SHEBANG: u16 = 1; @@ -76,355 +55,27 @@ const EXPECTED_ATTINY_FIRMWARE_VERSION: u8 = 1; const SEGMENT_LENGTH: usize = 9760; const FRAME_LENGTH: usize = SEGMENT_LENGTH * 4; -pub type Frame = [u8; FRAME_LENGTH]; -pub static FRAME_BUFFER: DoubleBuffer = DoubleBuffer::new(); -fn get_frame(is_recording: bool) -> Option<[u8; 39040]> { - let fb = { FRAME_BUFFER.get_front().lock().unwrap().take() }; - if let Some(mut fb) = fb { - if is_recording { - // Write recording flag into unused telemetry. - fb[639] = 1; - } else { - fb[638] = 0; - } - return Some(fb); - } - return None; -} -fn send_frame(fb: [u8; 39040], stream: &mut SocketStream) -> bool { - if let Err(_) = stream.write_all(&fb) { - return false; - } - stream.flush().is_ok() -} - -fn save_cptv_file_to_disk(cptv_bytes: Vec, output_dir: &str) { - let output_dir = String::from(output_dir); - thread::spawn(move || match decode_cptv_header_streaming(&cptv_bytes) { - Ok(header) => match header { - CptvHeader::V2(header) => { - info!("Saving CPTV file with header {:?}", header); - let recording_date_time = NaiveDateTime::from_timestamp_millis(header.timestamp as i64 / 1000).unwrap_or(chrono::Local::now().naive_local()); - if fs::metadata(&output_dir).is_err() { - fs::create_dir(&output_dir).expect(&format!("Failed to create output directory {}", output_dir)); - } - let path = format!("{}/{}.cptv", output_dir, recording_date_time.format("%Y-%m-%d--%H-%M-%S")); - // If the file already exists, don't re-save it. - let is_existing_file = match fs::metadata(&path) { - Ok(metadata) => metadata.len() as usize == cptv_bytes.len(), - Err(_) => false, - }; - if !is_existing_file { - match fs::write(&path, &cptv_bytes) { - Ok(()) => { - info!("Saved CPTV file {}", path); - } - Err(e) => { - error!("Failed writing CPTV file to storage at {}, reason: {}", path, e); - } - } - - // NOTE: For debug purposes, we may want to also save the CPTV file locally for inspection. - // let path = format!( - // "{}/{}.cptv", - // "/home/pi", - // recording_date_time.format("%Y-%m-%d--%H-%M-%S") - // ); - // match fs::write(&path, &cptv_bytes) { - // Ok(()) => { - // info!("Saved CPTV file {}", path); - // } - // Err(e) => { - // error!( - // "Failed writing CPTV file to storage at {}, reason: {}", - // path, e - // ); - // } - // } - } else { - error!("File {} already exists, discarding duplicate", path); - } - } - _ => error!("Unsupported CPTV file format, discarding file"), - }, - Err(e) => { - error!("Invalid CPTV file: ({:?}), discarding", e); - } - }); -} - -fn wav_header(audio_bytes: &Vec) -> [u8; 44] { - let mut header: [u8; 44] = [0u8; 44]; - let mut cursor = 0; - for b in "RIFF".bytes() { - header[cursor] = b; - cursor += 1; - } - let file_size = (audio_bytes.len() - 12 + 36) as u32; - for b in file_size.to_le_bytes() { - header[cursor] = b; - cursor += 1; - } - - for b in "WAVEfmt".bytes() { - header[cursor] = b; - cursor += 1; - } - - header[cursor] = 32; - cursor += 1; - header[cursor] = 16; - cursor += 4; - - // PCM - header[cursor] = 1; - cursor += 2; - - // channels - header[cursor] = 1; - cursor += 2; - - let sr = LittleEndian::read_u16(&audio_bytes[10..12]) as u32; - // SR - for b in sr.to_le_bytes() { - header[cursor] = b; - cursor += 1; - } - - let sr = sr * 2; - for b in sr.to_le_bytes() { - header[cursor] = b; - cursor += 1; - } - - header[cursor] = 16 / 2; - cursor += 2; - - for b in 16u16.to_le_bytes() { - header[cursor] = b; - cursor += 1; - } - - for b in "data".bytes() { - header[cursor] = b; - cursor += 1; - } - - for b in ((audio_bytes.len() - 12) as u32).to_le_bytes() { - header[cursor] = b; - cursor += 1; - } - return header; -} - -fn save_audio_file_to_disk(audio_bytes: Vec, output_dir: &str) { - let output_dir = String::from(output_dir); - - thread::spawn(move || { - let header = wav_header(&audio_bytes); - let timestamp = LittleEndian::read_u64(&audio_bytes[2..10]); - let recording_date_time = NaiveDateTime::from_timestamp_millis(timestamp as i64 / 1000).unwrap_or(chrono::Local::now().naive_local()); - info!("Saving Audio file with header "); - if fs::metadata(&output_dir).is_err() { - fs::create_dir(&output_dir).expect(&format!("Failed to create output directory {}", output_dir)); - } - let path: String = format!("{}/{}.wav", output_dir, recording_date_time.format("%Y%m%d-%H%M%S")); - // If the file already exists, don't re-save it. - let is_existing_file = match fs::metadata(&path) { - Ok(metadata) => metadata.len() as usize == audio_bytes.len() - 12, - Err(_) => false, - }; - if !is_existing_file { - match fs::write(&path, &header) { - Ok(()) => { - info!("Saved Audio file header {} bytes are {}", path, header.len()); - } - Err(e) => { - error!("Failed writing Audio file to storage at {}, reason: {}", path, e); - } - } - - let mut f = fs::OpenOptions::new().append(true).create(false).open(&path).expect("Unable to open file"); - match f.write_all(&audio_bytes[12..]) { - Ok(()) => { - info!("Saved Audio file {} bytes are {}", path, audio_bytes.len()); - } - Err(e) => { - error!("Failed writing Audio file to storage at {}, reason: {}", path, e); - } - } - } else { - error!("File {} already exists, discarding duplicate", path); - } - }); -} - -const CAMERA_CONNECT_INFO: u8 = 0x1; - -const CAMERA_RAW_FRAME_TRANSFER: u8 = 0x2; - -const CAMERA_BEGIN_FILE_TRANSFER: u8 = 0x3; - -const CAMERA_RESUME_FILE_TRANSFER: u8 = 0x4; - -const CAMERA_END_FILE_TRANSFER: u8 = 0x5; - -const CAMERA_BEGIN_AND_END_FILE_TRANSFER: u8 = 0x6; - -const CAMERA_GET_MOTION_DETECTION_MASK: u8 = 0x7; -const CAMERA_SEND_LOGGER_EVENT: u8 = 0x8; -#[repr(u8)] -#[derive(Copy, Clone, PartialEq, Debug)] -pub enum ExtTransferMessage { - CameraConnectInfo = 0x1, - CameraRawFrameTransfer = 0x2, - BeginFileTransfer = 0x3, - ResumeFileTransfer = 0x4, - EndFileTransfer = 0x5, - BeginAndEndFileTransfer = 0x6, - GetMotionDetectionMask = 0x7, - SendLoggerEvent = 0x8, -} - -impl TryFrom for ExtTransferMessage { - type Error = (); - - fn try_from(value: u8) -> Result { - match value { - 0x1 => Ok(CameraConnectInfo), - 0x2 => Ok(CameraRawFrameTransfer), - 0x3 => Ok(BeginFileTransfer), - 0x4 => Ok(ResumeFileTransfer), - 0x5 => Ok(EndFileTransfer), - 0x6 => Ok(BeginAndEndFileTransfer), - 0x7 => Ok(GetMotionDetectionMask), - 0x8 => Ok(SendLoggerEvent), - _ => Err(()), - } - } -} - -fn read_tc2_agent_state(conn: &mut DuplexConn) -> Result { - dbus_read_attiny_command(conn, 0x07) -} - -fn read_attiny_recording_flag(conn: &mut DuplexConn) -> bool { - read_tc2_agent_state(conn).map_or(false, |x| x & tc2_agent_state::RECORDING == tc2_agent_state::RECORDING) -} -fn safe_to_restart_rp2040(conn: &mut DuplexConn) -> bool { - !read_attiny_recording_flag(conn) -} - -fn read_attiny_firmware_version(conn: &mut DuplexConn) -> Result { - dbus_read_attiny_command(conn, 0x01) -} - -fn set_attiny_tc2_agent_test_audio_rec(conn: &mut DuplexConn) -> Result { - let state = read_tc2_agent_state(conn); - if let Ok(state) = state { - if (state & tc2_agent_state::RECORDING) == tc2_agent_state::RECORDING { - Err("Already recording so not doing test rec") - } else { - let res = dbus_write_attiny_command(conn, 0x07, state | tc2_agent_state::TEST_AUDIO_RECORDING); - if res.is_ok() { - Ok(state | tc2_agent_state::TEST_AUDIO_RECORDING) - } else { - Err(res.unwrap_err()) - } - } - } else { - Err("Failed reading ready state from attiny") - } -} - -fn set_attiny_tc2_agent_ready(conn: &mut DuplexConn) -> Result<(), &'static str> { - let state = read_tc2_agent_state(conn); - if let Ok(state) = state { - dbus_write_attiny_command(conn, 0x07, state | tc2_agent_state::READY).map(|_| ()) - } else { - Err("Failed reading ready state from attiny") - } -} - -use rustbus::connection::dispatch_conn::HandleEnvironment; -use rustbus::connection::dispatch_conn::HandleResult; -use rustbus::connection::dispatch_conn::Matches; -use rustbus::message_builder::MarshalledMessage; - -// TC2-Agent dbus audio service -type MyHandleEnv<'a, 'b> = HandleEnvironment<&'b mut AgentService, ()>; - -fn default_handler(_c: &mut &mut AgentService, _matches: Matches, _msg: &MarshalledMessage, _env: &mut MyHandleEnv) -> HandleResult<()> { - Ok(None) -} - -fn audio_handler(_c: &mut &mut AgentService, _matches: Matches, msg: &MarshalledMessage, _env: &mut MyHandleEnv) -> HandleResult<()> { - if msg.dynheader.member.as_ref().unwrap() == "testaudio" { - let message; - - if RP2040_STATE.load(Ordering::Relaxed) & tc2_agent_state::TEST_AUDIO_RECORDING == 0 { - TAKE_TEST_AUDIO.store(true, Ordering::Relaxed); - message = "Asked for a test recording"; - } else { - message = "Already making a test recording"; - } - - let mut resp = msg.dynheader.make_response(); - resp.body.push_param(message).unwrap(); - return Ok(Some(resp)); - } else if msg.dynheader.member.as_ref().unwrap() == "audiostatus" { - let status; - - let mut resp = msg.dynheader.make_response(); - - let state = RP2040_STATE.load(Ordering::Relaxed); - - if state & (tc2_agent_state::TEST_AUDIO_RECORDING | tc2_agent_state::RECORDING) == (tc2_agent_state::TEST_AUDIO_RECORDING | tc2_agent_state::RECORDING) { - status = AudioStatus::TakingTestRecoding; - } else if state & tc2_agent_state::TEST_AUDIO_RECORDING == tc2_agent_state::TEST_AUDIO_RECORDING { - status = AudioStatus::WaitingToRecord; - } else if state & tc2_agent_state::RECORDING == tc2_agent_state::RECORDING { - status = AudioStatus::Recording; - } else if TAKE_TEST_AUDIO.load(Ordering::Relaxed) { - status = AudioStatus::WaitingToRecord; - } else { - status = AudioStatus::Ready; - } - resp.body.push_param(RP2040_MODE.load(Ordering::Relaxed)).unwrap(); - resp.body.push_param(status as u8).unwrap(); - Ok(Some(resp)) - } else { - Ok(None) - } -} -pub enum AudioStatus { - Ready = 1, - WaitingToRecord = 2, - TakingTestRecoding = 3, - Recording = 4, -} - -use lazy_static::lazy_static; - -lazy_static! { - static ref TAKE_TEST_AUDIO: Arc = Arc::new(AtomicBool::new(false)); -} -lazy_static! { - static ref RP2040_STATE: Arc = Arc::new(AtomicU8::new(2)); -} - -lazy_static! { - static ref RP2040_MODE: Arc = Arc::new(AtomicU8::new(0)); -} const VERSION: &str = env!("CARGO_PKG_VERSION"); fn main() { let log_config = ConfigBuilder::default().set_time_level(LevelFilter::Off).build(); - TermLogger::init(LevelFilter::Info, log_config, TerminalMode::Mixed, ColorChoice::Auto).unwrap(); + TermLogger::init(LevelFilter::Info, log_config, TerminalMode::Mixed, ColorChoice::Auto) + .unwrap(); + + println!( + "\n=========\nStarting thermal camera 2 agent {}, run with --help to see options.\n", + VERSION + ); + check_if_rp2040_needs_programming(); + + let (serve_frames_via_wifi, spi_speed_mhz) = { + let startup_mode_config: ModeConfig = argh::from_env(); + let serve_frames_via_wifi = startup_mode_config.use_wifi; + let spi_speed = startup_mode_config.spi_speed_mhz; + (serve_frames_via_wifi, spi_speed) + }; - println!("\n=========\nStarting thermal camera 2 agent {}, run with --help to see options.\n", VERSION); - let config: ModeConfig = argh::from_env(); let device_config = DeviceConfig::load_from_fs(); if device_config.is_err() { error!("Load config error: {}", device_config.err().unwrap()); @@ -444,887 +95,86 @@ fn main() { process::exit(1); }); - //set up dbus service - let _ = thread::Builder::new().name("dbus-service".to_string()).spawn_with_priority(ThreadPriority::Max, move |_| { - let mut dbus_conn = DuplexConn::connect_to_bus(session_path, false).unwrap_or_else(|e| { - error!("Error connecting to system DBus: {}", e); - std::process::exit(1); - }); - let _name = dbus_conn.send_hello(rustbus::connection::Timeout::Infinite).unwrap_or_else(|e| { - error!("Error getting handshake with system DBus: {}", e); - process::exit(1); - }); - dbus_conn - .send - .send_message(&mut rustbus::standard_messages::request_name("org.cacophony.TC2Agent".into(), rustbus::standard_messages::DBUS_NAME_FLAG_REPLACE_EXISTING)) - .unwrap() - .write_all() - .unwrap(); + let mut recording_state = RecordingState::new(); + let _dbus_audio_thread = setup_dbus_test_audio_recording_service(&recording_state); - let mut ctx: AgentService = AgentService {}; - - let dh = Box::new(default_handler); - - let mut dpcon = DispatchConn::new(dbus_conn, &mut ctx, dh); - let th = Box::new(audio_handler); - - dpcon.add_handler("/org/cacophony/TC2Agent", th); - dpcon.run().unwrap(); - }); - - // Check if the file indicating that the RP2040 needs to be programmed. - // This is used to save time when setting up cameras so it will program the RP2040 instead of trying to connect first. - let program_rp2040_file = Path::new("/etc/cacophony/program_rp2040"); - if program_rp2040_file.exists() { - println!("Program RP2040 because /etc/cacophony/program_rp2040 exists"); - let e = program_rp2040(); - if e.is_err() { - warn!("Failed to reprogram RP2040: {}", e.unwrap_err()); - process::exit(1); - } - fs::remove_file(program_rp2040_file).unwrap(); - process::exit(0); - } - - let mut current_config = device_config.unwrap(); + let current_config = device_config.unwrap(); let initial_config = current_config.clone(); - let (config_tx, config_rx) = channel(); - let mut watcher = notify::recommended_watcher(move |res| match res { - Ok(Event { kind, .. }) => { - match kind { - EventKind::Access(AccessKind::Close(AccessMode::Write)) => { - // File got written to - // Send event to - match DeviceConfig::load_from_fs() { - Ok(config) => { - // Send to rp2040 to write via a channel somehow. Maybe we have to restart the rp2040 here so that it re-handshakes and - // picks up the new info - if config != current_config { - current_config = config; - warn!("Config updated"); - let _ = config_tx.send(current_config.clone()); - } - } - Err(msg) => { - error!("Load config error: {}", msg); - process::exit(1); - } - } - } - _ => {} - } - } - Err(e) => error!("file watch error for /etc/cacophony/config.toml: {:?}", e), - }) - .unwrap(); - // Add a path to be watched. All files and directories at that path and - // below will be monitored for changes. - watcher.watch(Path::new("/etc/cacophony/config.toml"), RecursiveMode::NonRecursive).unwrap(); + let (device_config_change_channel_tx, device_config_change_channel_rx) = channel(); + watch_local_config_file_changes(current_config, &device_config_change_channel_tx); - let term = Arc::new(AtomicBool::new(false)); - signal_hook::flag::register(signal_hook::consts::SIGTERM, Arc::clone(&term)).unwrap(); - signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&term)).unwrap(); + // NOTE: This handles gracefully exiting the process if ctrl-c etc is pressed + // while running in an interactive terminal. + let sig_term_state = Arc::new(AtomicBool::new(false)); + signal_hook::flag::register(signal_hook::consts::SIGTERM, sig_term_state.clone()).unwrap(); + signal_hook::flag::register(signal_hook::consts::SIGINT, sig_term_state.clone()).unwrap(); + exit_if_attiny_version_is_not_as_expected(&mut dbus_conn); // We want real-time priority for all the work we do. let handle = thread::Builder::new() .name("frame-acquire".to_string()) .spawn_with_priority(ThreadPriority::Max, move |result| { - assert!(result.is_ok(), "Thread must have permissions to run with realtime priority, run as root user"); - - // 65K buffer that we won't fully use at the moment. - let mut raw_read_buffer = [0u8; 65535]; - - //let spi_speed = 30_000_000; // rPi4 can handle this in PIO mode - let spi_speed = config.spi_speed * 1_000_000; - // rPi3 can handle 12Mhz (@600Mhz), may need to back it off a little to have some slack. - info!("Initialising SPI at {}Mhz", config.spi_speed); - let mut spi = Spi::new(Bus::Spi0, SlaveSelect::Ss0, spi_speed, Mode::Mode3).unwrap(); - spi.set_bits_per_word(8).unwrap(); - spi.set_bit_order(BitOrder::MsbFirst).unwrap(); - spi.set_ss_polarity(Polarity::ActiveLow).unwrap(); - - let address = get_socket_address(&config); - let management_address = "/var/spool/managementd".to_string(); - - // For some reason when running periph.io host.ini function, needed to use the I2C in the attiny-controller, - // it 'holds/exports' some of the GPIO pins, so we manually 'release/unexport' them with the following. + assert!( + result.is_ok(), + "Thread must have permissions to run with realtime priority, run as root user" + ); + + // For some reason when running periph.io host.ini function, + // needed to use the I2C in the attiny-controller, it 'holds/exports' some of the + // GPIO pins, so we manually 'release/unexport' them with the following. let gpio_number = "7"; let _ = fs::write("/sys/class/gpio/unexport", gpio_number); let gpio = Gpio::new().unwrap(); - let mut pin = gpio.get(7).expect("Failed to get pi ping interrupt pin, is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?").into_input(); let mut run_pin = gpio.get(23).unwrap().into_output_high(); if !run_pin.is_set_high() { info!("Setting run pin high to enable rp2040"); run_pin.set_high(); sleep(Duration::from_millis(1000)); } - pin.clear_interrupt().expect("Unable to clear pi ping interrupt pin"); - pin.set_interrupt(Trigger::RisingEdge).expect("Unable to set pi ping interrupt"); - - let (tx, rx) = channel(); - let (restart_tx, restart_rx) = channel(); - - // Used to indicate that a reset request was received and processed by the frame-socket thread. - let cross_thread_signal = Arc::new(AtomicBool::new(false)); - let cross_thread_signal_2 = cross_thread_signal.clone(); - let _ = thread::Builder::new().name("frame-socket".to_string()).spawn_with_priority(ThreadPriority::Max, move |result| { - // Spawn a thread which can output the frames, converted to rgb grayscale - // This is printed out from within the spawned thread. - assert!(result.is_ok(), "Thread must have permissions to run with realtime priority, run as root user"); - - let mut reconnects = 0; - let mut prev_frame_num = None; - let mut sockets: [(String, bool, Option); 2] = [(address, config.use_wifi, None), (management_address, false, None)]; - loop { - if let Ok(_) = restart_rx.try_recv() { - cross_thread_signal_2.store(true, Ordering::Relaxed); - info!("Restarting rp2040"); - if !run_pin.is_set_high() { - run_pin.set_high(); - sleep(Duration::from_millis(1000)); - } - - run_pin.set_low(); - sleep(Duration::from_millis(1000)); - run_pin.set_high(); - } - let mut recv_audio_mode = false; - let mut recv_timeout_ms = 10; - info!("Connecting to frame sockets"); - let mut ms_elapsed = 0; - loop { - // Check if we need to reset rp2040 because of a config change - if let Ok(_) = restart_rx.try_recv() { - cross_thread_signal_2.store(true, Ordering::Relaxed); - info!("Restarting rp2040"); - if !run_pin.is_set_high() { - run_pin.set_high(); - sleep(Duration::from_millis(1000)); - } - - run_pin.set_low(); - sleep(Duration::from_millis(1000)); - run_pin.set_high(); - } - if !recv_audio_mode { - for (address, use_wifi, stream) in sockets.iter_mut().filter(|(_, _, stream)| stream.is_none()) { - let stream_connection: Option = SocketStream::from_address(&address, *use_wifi).ok(); - if stream_connection.is_some() { - println!("Connected to {}", &address); - } - *stream = stream_connection; - } - - let connections = sockets.iter().filter(|(_, _, stream)| stream.is_some()).count(); - - if connections == 0 { - sleep(Duration::from_millis(1000)); - continue; - } - } - - let result = rx.recv_timeout(Duration::from_millis(recv_timeout_ms)); - match result { - Ok((Some((radiometry_enabled, is_recording, firmware_version, camera_serial)), None, None)) => { - let model = if radiometry_enabled { "lepton3.5" } else { "lepton3" }; - let header = format!("ResX: 160\nResX: 160\nResY: 120\nFrameSize: 39040\nModel: {}\nBrand: flir\nFPS: 9\nFirmware: DOC-AI-v0.{}\nCameraSerial: {}\n\n", model, firmware_version, camera_serial); - for (_, _, stream) in sockets.iter_mut().filter(|(_, use_wifi, stream)| stream.is_some() && !use_wifi && !stream.as_ref().unwrap().sent_header) { - let stream = stream.as_mut().unwrap(); - if let Err(_) = stream.write_all(header.as_bytes()) { - warn!("Failed sending header info"); - } - - // Clear existing - if let Err(_) = stream.write_all(b"clear") { - warn!("Failed clearing buffer"); - } - let _ = stream.flush(); - stream.sent_header = true; - } - - if reconnects > 0 { - info!("Got frame connection"); - prev_frame_num = None; - reconnects = 0; - } - let s = Instant::now(); - let mut telemetry: Option = None; - let frame_data = get_frame(is_recording); - if let Some(fb) = frame_data { - telemetry = Some(read_telemetry(&fb)); - for (address, use_wifi, stream) in sockets.iter_mut().filter(|(_, _, stream)| stream.is_some()) { - let sent = send_frame(fb, stream.as_mut().unwrap()); - if !sent { - warn!("Send to {} failed", if *use_wifi { "tc2-frames server" } else { address }); - let _ = stream.take().unwrap().shutdown().is_ok(); - } - } - } - let e = s.elapsed().as_secs_f32(); - if e > 0.1 { - info!("socket send took {}s", e); - } - if let Some(telemetry) = telemetry { - if let Some(prev_frame_num) = prev_frame_num { - if !telemetry.ffc_in_progress { - if telemetry.frame_num != prev_frame_num + 1 { - // NOTE: Frames can be missed when the raspberry pi blocks the thread with the unix socket in `thermal-recorder`. - // println!("===="); - // println!("Missed {} frames after {}s on", telemetry.frame_num - (prev_frame_num + 1), telemetry.msec_on as f32 / 1000.0); - } - } - } - prev_frame_num = Some(telemetry.frame_num); - if telemetry.frame_num % 2700 == 0 { - info!("Got frame #{}", telemetry.frame_num); - } - } - ms_elapsed = 0; - } - Ok((None, Some(_transfer_in_progress), Some(_connect_audio_mode))) => { - ms_elapsed = 0; - recv_audio_mode = _connect_audio_mode; - if recv_audio_mode { - recv_timeout_ms = 1000; - for (address, use_wifi, stream) in sockets.iter_mut().filter(|(_, _, stream)| stream.is_some()) { - info!("Shutting down socket {}", if *use_wifi { "tc2-frames server" } else { address }); - let _ = stream.take().unwrap().shutdown().is_ok(); - } - } else { - recv_timeout_ms = 10; - } - } - Ok((None, Some(_transfer_in_progress), None)) => { - ms_elapsed = 0; - } - _ => { - if recv_audio_mode { - continue; - } - const NUM_ATTEMPTS_BEFORE_REPROGRAM: usize = 20; - ms_elapsed += recv_timeout_ms; - if ms_elapsed > 10000 { - ms_elapsed = 0; - reconnects += 1; - if reconnects == NUM_ATTEMPTS_BEFORE_REPROGRAM { - let e = program_rp2040(); - if e.is_err() { - warn!("Failed to reprogram RP2040: {}", e.unwrap_err()); - process::exit(1); - } - process::exit(0); - } else { - info!("-- #{reconnects} waiting to connect to rp2040 (reprogram RP2040 after {} more attempts)", NUM_ATTEMPTS_BEFORE_REPROGRAM - reconnects); - } - } - } - } - } - } - }); - - info!("Waiting to for messages from rp2040"); - // Poke register 0x07 of the attiny letting the rp2040 know that we're ready: - //let mut dbus_conn = Some(dbus_conn); - - match set_attiny_tc2_agent_ready(&mut dbus_conn) { - Ok(_) => {} - Err(msg) => { - error!("{}", msg); - process::exit(1); - } - } - let version = read_attiny_firmware_version(&mut dbus_conn); - match version { - Ok(version) => match version { - EXPECTED_ATTINY_FIRMWARE_VERSION => {} - _ => { - error!("Mismatched attiny firmware version, expected {}, got {}", EXPECTED_ATTINY_FIRMWARE_VERSION, version); - exit_cleanly(&mut dbus_conn); - process::exit(1); - } - }, - Err(msg) => { - error!("{}", msg); - exit_cleanly(&mut dbus_conn); - process::exit(1); - } - } - if !initial_config.use_low_power_mode() || safe_to_restart_rp2040(&mut dbus_conn) { + let (camera_handshake_channel_tx, camera_handshake_channel_rx) = channel(); + let (restart_rp2040_channel_tx, restart_rp2040_channel_rx) = channel(); + + // Used to indicate that a reset request was received and processed by the + // frame-socket thread. + let restart_rp2040_ack = Arc::new(AtomicBool::new(false)); + + // The frame socket server takes frames from the main camera transfer loop, + // and serves them to various consumers of frames. + // For mostly historical reasons, it's also the thread that handles actually restarting + // the rp2040 – but it would perhaps be cleaner to handle this in a separate thread? + spawn_frame_socket_server_thread( + restart_rp2040_channel_rx, + camera_handshake_channel_rx, + serve_frames_via_wifi, + run_pin, + restart_rp2040_ack.clone(), + &recording_state, + ); + recording_state.set_ready(&mut dbus_conn); + if !initial_config.use_low_power_mode() || !recording_state.is_recording() { // NOTE: Always reset rp2040 on startup if it's safe to do so. - let _ = restart_tx.send(true); - } - - let mut got_first_frame = false; - let mut file_download: Option> = None; - let header_length = 18; - let mut return_payload_buf = [0u8; 32 + 104]; - - // This sequence is used to synchronise the return payload start on the rp2040, since - // it seems to have a fair bit of slop/offsetting. - return_payload_buf[0..4].copy_from_slice(&[1, 2, 3, 4]); - let mut part_count = 0; - let mut start = Instant::now(); - let crc_check = Crc::::new(&CRC_16_XMODEM); - let max_size: usize = raw_read_buffer.len(); - let mut device_config: DeviceConfig = initial_config; - let mut rp2040_needs_reset = false; - let mut sent_reset_request = false; - let mut has_failed = false; - let mut got_startup_info = false; - let mut radiometry_enabled = false; - let mut firmware_version = 0; - let mut lepton_serial_number = String::from(""); - let mut taking_test_recording = false; - let mut test_audio_state_thread: Option> = None; - let mut is_audio = device_config.is_audio_device(); - let mut is_recording = false; - 'transfer: loop { - // Check once per frame to see if the config file may have been changed - let updated_config = config_rx.try_recv(); - match updated_config { - Ok(config) => { - // NOTE: Defer this till a time we know the rp2040 isn't writing to flash memory. - if device_config != config { - info!("Config updated, should update rp2040"); - device_config = config; - if !is_audio { - //will update after reset request if in audio mode - is_audio = device_config.is_audio_device(); - } - } - rp2040_needs_reset = true; - } - Err(kind) => match kind { - TryRecvError::Empty => {} - TryRecvError::Disconnected => { - warn!("Disconnected from config file watcher channel"); - } - }, - } - if is_audio { - if taking_test_recording { - if test_audio_state_thread.is_none() || (test_audio_state_thread.is_some() && test_audio_state_thread.as_mut().unwrap().is_finished()) { - taking_test_recording = false; - test_audio_state_thread = None; - is_recording = false; - } - } else if TAKE_TEST_AUDIO.load(Ordering::Relaxed) { - //if already recording dont take test rec - is_recording = read_attiny_recording_flag(&mut dbus_conn); - if is_recording { - RP2040_STATE.store(tc2_agent_state::RECORDING, Ordering::Relaxed); - } else if let Ok(state) = set_attiny_tc2_agent_test_audio_rec(&mut dbus_conn) { - if safe_to_restart_rp2040(&mut dbus_conn) { - let _ = restart_tx.send(true); - taking_test_recording = true; - info!("Telling rp2040 to take test recording and restarting"); - } - RP2040_STATE.store(state, Ordering::Relaxed); - } - if is_recording || taking_test_recording { - test_audio_state_thread = Some(thread::spawn(move || { - let thread_dbus = DuplexConn::connect_to_bus(session_path, true); - if thread_dbus.is_err() { - error!("Error connecting to system DBus: {}", thread_dbus.err().unwrap()); - } else { - let mut thread_dbus = thread_dbus.unwrap(); - let _unique_name = thread_dbus.send_hello(Timeout::Infinite); - if _unique_name.is_err() { - error!("Error getting handshake with system DBus: {}", _unique_name.err().unwrap()); - } else { - //be a bit lazy if doing a recording - let sleep_duration = if is_recording { 2000 } else { 1000 }; - loop { - if let Ok(state) = read_tc2_agent_state(&mut thread_dbus) { - RP2040_STATE.store(state, Ordering::Relaxed); - if state & (tc2_agent_state::RECORDING | tc2_agent_state::TEST_AUDIO_RECORDING) == 0 { - break; - } - } else { - warn!("error reading tc2 agent state"); - } - - sleep(Duration::from_millis(sleep_duration)); - } - } - } - })); - } - TAKE_TEST_AUDIO.store(false, Ordering::Relaxed); - } - } - if !is_recording && rp2040_needs_reset { - let date = chrono::Local::now(); - - error!("4) Requesting reset of rp2040 due to config change, {}", date.format("%Y-%m-%d--%H:%M:%S")); - rp2040_needs_reset = false; - got_startup_info = false; - is_audio = device_config.is_audio_device(); - - if !sent_reset_request { - sent_reset_request = true; - let _ = restart_tx.send(true); - } - - if cross_thread_signal.load(Ordering::Relaxed) { - sent_reset_request = false; - cross_thread_signal.store(false, Ordering::Relaxed); - } - } - let poll_result = pin.poll_interrupt(true, Some(Duration::from_millis(2000))); - if let Ok(_pin_level) = poll_result { - if _pin_level.is_some() { - { - drop(pin); - let output_pin = gpio.get(7).expect("Failed to get pi ping interrupt pin, is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?").into_output_low(); - drop(output_pin); - pin = gpio.get(7).expect("Failed to get pi ping interrupt pin, is 'dtoverlay=spi0-1cs,cs0_pin=8' set in your config.txt?").into_input(); - pin.clear_interrupt().expect("Unable to clear pi ping interrupt pin"); - pin.set_interrupt(Trigger::RisingEdge).expect("Unable to set pi ping interrupt"); - } - - spi.read(&mut raw_read_buffer[..2066]).unwrap(); - { - let header_slice = &raw_read_buffer[..header_length]; - let transfer_type = header_slice[0]; - let transfer_type_dup = header_slice[1]; - let mut num_bytes = LittleEndian::read_u32(&header_slice[2..6]) as usize; - let num_bytes_dup = LittleEndian::read_u32(&header_slice[6..10]) as usize; - num_bytes = num_bytes.min(max_size); - let crc_from_remote = LittleEndian::read_u16(&header_slice[10..12]); - let crc_from_remote_dup = LittleEndian::read_u16(&header_slice[12..14]); - let crc_from_remote_inv = LittleEndian::read_u16(&header_slice[14..16]); - let crc_from_remote_inv_dup = LittleEndian::read_u16(&header_slice[16..=17]); - - let num_bytes_check = num_bytes == num_bytes_dup; - let header_crc_check = crc_from_remote == crc_from_remote_dup && crc_from_remote_inv_dup == crc_from_remote_inv && crc_from_remote_inv.not() == crc_from_remote; - let transfer_type_check = transfer_type == transfer_type_dup; - if !num_bytes_check || !header_crc_check || !transfer_type_check { - if !has_failed { - has_failed = true; - warn!("Header integrity check failed {:?}", &header_slice[..]); - // We still need to make sure we read out all the bytes? - } - - // Well, this is super tricky if this is a raw frame transfer that got garbled. - // Do we care about this case? In the case where the pi wants frames, it should just restart - // the rp2040 on startup. - LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); - LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); - spi.write(&return_payload_buf).unwrap(); - if process_interrupted(&term, &mut dbus_conn) { - break 'transfer; - } - continue 'transfer; - } - if num_bytes == 0 { - // warn!("zero-sized payload"); - LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); - LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); - spi.write(&return_payload_buf).unwrap(); - if process_interrupted(&term, &mut dbus_conn) { - break 'transfer; - } - continue 'transfer; - } - if transfer_type < CAMERA_CONNECT_INFO || transfer_type > CAMERA_SEND_LOGGER_EVENT { - warn!("unknown transfer type {}", transfer_type); - LittleEndian::write_u16(&mut return_payload_buf[4..6], 0); - LittleEndian::write_u16(&mut return_payload_buf[6..8], 0); - spi.write(&return_payload_buf).unwrap(); - if process_interrupted(&term, &mut dbus_conn) { - break 'transfer; - } - continue 'transfer; - } - - if transfer_type != CAMERA_RAW_FRAME_TRANSFER { - if transfer_type == CAMERA_BEGIN_FILE_TRANSFER { - start = Instant::now(); - } - let chunk = &raw_read_buffer[header_length..header_length + num_bytes]; - // Write back the crc we calculated. - let crc = crc_check.checksum(chunk); - LittleEndian::write_u16(&mut return_payload_buf[4..6], crc); - LittleEndian::write_u16(&mut return_payload_buf[6..8], crc); - - if transfer_type == CAMERA_CONNECT_INFO { - info!("Got camera connect info {:?}", chunk); - // Write all the info we need about the device: - device_config.write_to_slice(&mut return_payload_buf[8..]); - info!("Sending camera device config to rp2040"); - } else if transfer_type == CAMERA_GET_MOTION_DETECTION_MASK { - let piece_number = &chunk[0]; - //info!("Got request for mask piece number {}", piece_number); - let piece = device_config.mask_piece(*piece_number as usize); - - let mut piece_with_length = [0u8; 101]; - piece_with_length[0] = piece.len() as u8; - piece_with_length[1..1 + piece.len()].copy_from_slice(piece); - let crc = crc_check.checksum(&piece_with_length[0..1 + piece.len()]); - LittleEndian::write_u16(&mut return_payload_buf[8..10], crc); - //info!("Sending piece({}) {:?}", piece.len(), piece_with_length); - return_payload_buf[10..10 + piece.len() + 1].copy_from_slice(&piece_with_length); - } - // Always write the return buffer - spi.write(&return_payload_buf).unwrap(); - if crc == crc_from_remote { - match transfer_type { - CAMERA_CONNECT_INFO => { - radiometry_enabled = LittleEndian::read_u32(&chunk[0..4]) == 2; - firmware_version = LittleEndian::read_u32(&chunk[4..8]); - lepton_serial_number = format!("{}", LittleEndian::read_u32(&chunk[8..12])); - got_startup_info = true; - if firmware_version != EXPECTED_RP2040_FIRMWARE_VERSION { - exit_cleanly(&mut dbus_conn); - info!("Unsupported firmware version, expected {}, got {}. Will reprogram RP2040.", EXPECTED_RP2040_FIRMWARE_VERSION, firmware_version); - let e = program_rp2040(); - if e.is_err() { - warn!("Failed to reprogram rp2040: {}", e.unwrap_err()); - panic!("Exit"); - } - process::exit(0); - } - - let audio_mode = LittleEndian::read_u32(&chunk[12..16]) > 0; - RP2040_MODE.store(if audio_mode { 1 } else { 0 }, Ordering::Relaxed); - info!("Got startup info: radiometry enabled: {}, firmware version: {}, lepton serial #{} audio mode {}", radiometry_enabled, firmware_version, lepton_serial_number, audio_mode); - - if device_config.use_low_power_mode() && !radiometry_enabled && !device_config.is_audio_device() { - exit_cleanly(&mut dbus_conn); - error!("Low power mode is currently only supported on lepton sensors with radiometry or audio device, exiting."); - panic!("Exit"); - } - // Terminate any existing file download. - let in_progress_file_transfer = file_download.take(); - if let Some(file) = in_progress_file_transfer { - warn!("Aborting in progress file transfer with {} bytes", file.len()); - } - let _ = tx.send((None, Some(false), Some(audio_mode))); - } - CAMERA_SEND_LOGGER_EVENT => { - let event_kind = LittleEndian::read_u16(&chunk[0..2]); - let mut event_timestamp = LittleEndian::read_u64(&chunk[2..2 + 8]); - let event_payload = LittleEndian::read_u64(&chunk[10..18]); - if let Ok(mut event_kind) = LoggerEventKind::try_from(event_kind) { - if let Some(mut time) = NaiveDateTime::from_timestamp_micros(event_timestamp as i64) { - if let LoggerEventKind::SetAlarm(alarm_time) = &mut event_kind { - if NaiveDateTime::from_timestamp_micros(event_payload as i64).is_some() { - *alarm_time = event_payload; - } else { - warn!("Wakeup alarm from event was invalid {}", event_payload); - } - } else if let LoggerEventKind::Rp2040MissedAudioAlarm(alarm_time) = &mut event_kind { - if NaiveDateTime::from_timestamp_micros(event_payload as i64).is_some() { - *alarm_time = event_payload; - } else { - warn!("Missed alarm from event was invalid {}", event_payload); - } - } else if let LoggerEventKind::ToldRpiToWake(reason) = &mut event_kind { - if let Ok(wake_reason) = WakeReason::try_from(event_payload as u8) { - *reason = wake_reason; - } else { - warn!("Told rpi to wake invalid reason {}", event_payload); - } - } else if let LoggerEventKind::RtcCommError = &mut event_kind { - if event_timestamp == 0 { - time = chrono::Local::now().naive_local(); - event_timestamp = time.timestamp_micros() as u64; - } - } - let payload_json = if let LoggerEventKind::SavedNewConfig = event_kind { - // If we get saved new config, the rp2040 would have just been - // restarted after the config change, so we can log the current - // config in relation to that event. - let json_inner = format!( - r#""continuous-recorder": {}, "use-low-power-mode": {}, "start-recording": "{:?}", "stop-recording": "{:?}", "location": "({}, {}, {})""#, - device_config.is_continuous_recorder(), - device_config.use_low_power_mode(), - device_config.recording_window().0, - device_config.recording_window().1, - device_config.lat_lng().0, - device_config.lat_lng().1, - device_config.location_altitude().unwrap_or(0.0) - ); - let json = String::from(format!("{{{}}}", json_inner)); - Some(json) - } else { - None - }; - info!("Got logger event {:?} at {}", event_kind, time); - let event = LoggerEvent::new(event_kind, event_timestamp); - event.log(&mut dbus_conn, payload_json); - } else { - warn!("Event had invalid timestamp {}", event_timestamp); - } - } else { - warn!("Unknown logger event kind {}", event_kind); - } - } - CAMERA_BEGIN_FILE_TRANSFER => { - if file_download.is_some() { - warn!("Trying to begin file without ending current"); - } - info!("Begin file transfer"); - // Open new file transfer - - part_count += 1; - // If we have to grow this Vec once it gets big it can be slow and interrupt the transfer. - // TODO: Should really be able to recover from that though! - let mut file = Vec::with_capacity(150_000_000); - file.extend_from_slice(&chunk); - file_download = Some(file); - let _ = tx.send((None, Some(true), None)); - } - CAMERA_RESUME_FILE_TRANSFER => { - if let Some(file) = &mut file_download { - // Continue current file transfer - //println!("Continue file transfer"); - if part_count % 100 == 0 { - let megabytes_per_second = (file.len() + chunk.len()) as f32 / Instant::now().duration_since(start).as_secs_f32() / (1024.0 * 1024.0); - info!("Transferring part #{} {:?} for {} bytes, {}MB/s", part_count, Instant::now().duration_since(start), file.len() + chunk.len(), megabytes_per_second); - } - part_count += 1; - file.extend_from_slice(&chunk); - let _ = tx.send((None, Some(true), None)); - } else { - warn!("Trying to continue file with no open file"); - if !got_startup_info { - if safe_to_restart_rp2040(&mut dbus_conn) { - let date = chrono::Local::now(); - error!("1) Requesting reset of rp2040 to force handshake, {}", date.format("%Y-%m-%d--%H:%M:%S")); - let _ = restart_tx.send(true); - } - } - } - } - CAMERA_END_FILE_TRANSFER => { - // End current file transfer - if !file_download.is_some() { - warn!("Trying to end file with no open file"); - } - if let Some(mut file) = file_download.take() { - // Continue current file transfer - let megabytes_per_second = (file.len() + chunk.len()) as f32 / Instant::now().duration_since(start).as_secs_f32() / (1024.0 * 1024.0); - info!("End file transfer, took {:?} for {} bytes, {}MB/s", Instant::now().duration_since(start), file.len() + chunk.len(), megabytes_per_second); - part_count = 0; - file.extend_from_slice(&chunk); - let shebang = LittleEndian::read_u16(&file[0..2]); - if shebang == AUDIO_SHEBANG { - save_audio_file_to_disk(file, device_config.output_dir()); - } else { - save_cptv_file_to_disk(file, device_config.output_dir()) - } - let _ = tx.send((None, Some(false), None)); - } else { - warn!("Trying to end file with no open file"); - } - } - CAMERA_BEGIN_AND_END_FILE_TRANSFER => { - if file_download.is_some() { - info!("Trying to begin (and end) file without ending current"); - } - // Open and end new file transfer - part_count = 0; - let mut file = Vec::new(); - file.extend_from_slice(&chunk); - let shebang = u8_slice_as_u16_slice(&file[0..2]); - if shebang[0] == AUDIO_SHEBANG { - save_audio_file_to_disk(file, device_config.output_dir()); - } else { - save_cptv_file_to_disk(file, device_config.output_dir()) - } - let _ = tx.send((None, Some(false), None)); - } - CAMERA_GET_MOTION_DETECTION_MASK => { - // Already handled - } - _ => { - if num_bytes != 0 { - warn!("Unhandled transfer type, {:#x}", transfer_type) - } - } - } - } else { - warn!("Crc check failed, remote was notified and will re-transmit"); - } - } else { - spi.read(&mut raw_read_buffer[2066..num_bytes + header_length]).unwrap(); - // Frame - let mut frame = [0u8; FRAME_LENGTH]; - BigEndian::write_u16_into(u8_slice_as_u16_slice(&raw_read_buffer[header_length..header_length + FRAME_LENGTH]), &mut frame); - let back = FRAME_BUFFER.get_back().lock().unwrap(); - back.replace(Some(frame)); - if !got_first_frame { - got_first_frame = true; - info!("Got first frame from rp2040, got startup info {}", got_startup_info); - } - is_recording = crc_from_remote == 1 && !device_config.use_low_power_mode(); - if !got_startup_info { - error!("1) Requesting reset of rp2040 to force handshake"); - rp2040_needs_reset = true; - } else if !rp2040_needs_reset { - FRAME_BUFFER.swap(); - let _ = tx.send((Some((radiometry_enabled, is_recording, firmware_version, lepton_serial_number.clone())), None, None)); - } - } - } - } - } - if process_interrupted(&term, &mut dbus_conn) { - break 'transfer; - } + let _ = restart_rp2040_channel_tx.send(true); } + enter_camera_transfer_loop( + initial_config, + dbus_conn, + spi_speed_mhz, + device_config_change_channel_rx, + restart_rp2040_channel_tx, + sig_term_state, + camera_handshake_channel_tx, + restart_rp2040_ack, + recording_state, + ); info!("Exiting gracefully"); Ok::<(), Error>(()) }) .unwrap(); if let Err(e) = handle.join() { - eprintln!("Thread panicked: {:?}", e); - std::process::exit(1); - } -} - -pub const CRC_AUG_CCITT: Algorithm = Algorithm { - width: 16, - poly: 0x1021, - init: 0x1D0F, - refin: false, - refout: false, - xorout: 0x0000, - check: 0x0000, - residue: 0x0000, -}; - -fn program_rp2040() -> io::Result<()> { - let bytes = std::fs::read("/etc/cacophony/rp2040-firmware.elf").expect("firmware file should exist at /etc/cacophony/rp2040-firmware.elf"); // Vec - let hash = sha256::digest(&bytes); - let expected_hash = EXPECTED_RP2040_FIRMWARE_HASH.trim(); - if hash != expected_hash { - return Err(io::Error::new(io::ErrorKind::Other, format!("rp2040-firmware.elf does not match expected hash. Expected: '{}', Calculated: '{}'", expected_hash, hash))); - } - let status = Command::new("tc2-hat-rp2040").arg("--elf").arg("/etc/cacophony/rp2040-firmware.elf").status()?; - - if !status.success() { - return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed")); - } - - info!("Updated RP2040 firmware."); - Ok(()) -} - -fn dbus_read_attiny_command(conn: &mut DuplexConn, command: u8) -> Result { - dbus_attiny_command(conn, command, None) -} - -fn dbus_write_attiny_command(conn: &mut DuplexConn, command: u8, value: u8) -> Result { - dbus_attiny_command(conn, command, Some(value)) -} - -fn dbus_attiny_command(conn: &mut DuplexConn, command: u8, value: Option) -> Result { - let max_attempts = 10; - let retry_delay = std::time::Duration::from_secs(2); - - for attempt in 1..=max_attempts { - match dbus_attiny_command_attempt(conn, command, value) { - Ok(result) => return Ok(result), - Err(e) => { - if attempt == max_attempts { - return Err("Max attempts reached: failed to execute i2c dbus command"); - } - eprintln!("Attempt {}/{} failed: {}. Retrying in {:?}...", attempt, max_attempts, e, retry_delay); - std::thread::sleep(retry_delay); - } - } - } - Err("Failed to execute dbus command after maximum retries") -} - -fn dbus_attiny_command_attempt(conn: &mut DuplexConn, command: u8, value: Option) -> Result { - let is_read_command = value.is_none(); - let mut payload: Vec = vec![command]; - if let Some(value) = value { - payload.push(value); - } - let crc = Crc::::new(&CRC_AUG_CCITT).checksum(&payload); - payload.push(0); - payload.push(0); - let len = payload.len(); - BigEndian::write_u16(&mut payload[len - 2..], crc); - let mut call = MessageBuilder::new().call("Tx").with_interface("org.cacophony.i2c").on("/org/cacophony/i2c").at("org.cacophony.i2c").build(); - call.body.push_param(0x25u8).unwrap(); - call.body.push_param(payload).unwrap(); - if is_read_command { - call.body.push_param(3i32).unwrap(); // Num bytes to receive including CRC validation code - } else { - call.body.push_param(0i32).unwrap(); // Receive nothing if this was a write. - } - call.body.push_param(1000i32).unwrap(); // Timeout ms - let id = conn.send.send_message(&call).unwrap().write_all().unwrap(); - let mut attempts = 0; - // Now wait for the reply that matches our call id - loop { - if let Ok(message) = conn.recv.get_next_message(Timeout::Duration(Duration::from_millis(10))) { - match message.typ { - MessageType::Reply => { - let reply_id = message.dynheader.response_serial.unwrap(); - if reply_id == id { - // Looks like the first 4 bytes are a u32 with the length of the following bytes - // which can be ignored. - return if is_read_command { - let response = &message.get_buf()[4..][0..3]; - let crc = Crc::::new(&CRC_AUG_CCITT).checksum(&response[0..1]); - let received_crc = BigEndian::read_u16(&response[1..=2]); - if received_crc != crc { - Err("CRC Mismatch") - } else { - Ok(response[0]) - } - } else { - // Check that the written value was actually written correctly. - let set_value = dbus_attiny_command(conn, command, None); - match set_value { - Ok(new_value) => { - if new_value != value.unwrap() { - Err("Failed setting state on attiny") - } else { - Ok(0) - } - } - Err(e) => Err(e), - } - }; - } - } - MessageType::Error => { - let reply_id = message.dynheader.response_serial.unwrap(); - if reply_id == id { - return Err("Dbus error"); - } - } - _ => {} - } - } - attempts += 1; - if attempts == 100 { - return Err("Timed out waiting for response from Dbus service"); - } - } -} -fn exit_cleanly(conn: &mut DuplexConn) { - let _ = dbus_write_attiny_command(conn, 0x07, 0x00); -} - -fn process_interrupted(term: &Arc, conn: &mut DuplexConn) -> bool { - if term.load(Ordering::Relaxed) { - // We got terminated - before we exit, clean up the tc2-agent ready state register - exit_cleanly(conn); - true - } else { - false + error!("Thread panicked: {:?}", e); + process::exit(1); } } diff --git a/src/mode_config.rs b/src/mode_config.rs index c0025d2..fcd79d4 100644 --- a/src/mode_config.rs +++ b/src/mode_config.rs @@ -1,6 +1,6 @@ use argh::FromArgs; -fn default_spi_speed() -> u32 { +fn default_spi_speed_mhz() -> u32 { 12 } @@ -13,6 +13,6 @@ pub struct ModeConfig { pub(crate) use_wifi: bool, /// raspberry pi SPI speed in Mhz, defaults to 12 - #[argh(option, default = "default_spi_speed()")] - pub(crate) spi_speed: u32, + #[argh(option, default = "default_spi_speed_mhz()")] + pub(crate) spi_speed_mhz: u32, } diff --git a/src/program_rp2040.rs b/src/program_rp2040.rs new file mode 100644 index 0000000..38f30bb --- /dev/null +++ b/src/program_rp2040.rs @@ -0,0 +1,60 @@ +use crate::EXPECTED_RP2040_FIRMWARE_HASH; +use log::{error, info}; +use std::path::Path; +use std::process::Command; +use std::{fs, io, process}; + +pub fn program_rp2040() -> io::Result<()> { + let bytes: Vec = fs::read("/etc/cacophony/rp2040-firmware.elf") + .expect("firmware file should exist at /etc/cacophony/rp2040-firmware.elf"); + let hash = sha256::digest(&bytes); + let expected_hash = EXPECTED_RP2040_FIRMWARE_HASH.trim(); + if hash != expected_hash { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "rp2040-firmware.elf does not match \ + expected hash. Expected: '{}', Calculated: '{}'", + expected_hash, hash + ), + )); + } + let status = Command::new("tc2-hat-rp2040") + .arg("--elf") + .arg("/etc/cacophony/rp2040-firmware.elf") + .status()?; + + if !status.success() { + return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed")); + } + + info!("Updated RP2040 firmware."); + Ok(()) +} + +pub fn check_if_rp2040_needs_programming() { + // Check if the file indicating that the RP2040 needs to be programmed. + // This is used to save time when setting up cameras + // so it will program the RP2040 instead of trying to connect first. + let program_rp2040_file = Path::new("/etc/cacophony/program_rp2040"); + if program_rp2040_file.exists() { + println!("Program RP2040 because /etc/cacophony/program_rp2040 exists"); + match program_rp2040() { + Ok(()) => match fs::remove_file(program_rp2040_file) { + Ok(()) => process::exit(0), + Err(e) => { + error!( + "Failed to remove 'program_rp2040' \ + file after successful reprogram: {}", + e + ); + process::exit(1); + } + }, + Err(e) => { + error!("Failed to reprogram RP2040: {e}"); + process::exit(1); + } + } + } +} diff --git a/src/recording_state.rs b/src/recording_state.rs new file mode 100644 index 0000000..f96231e --- /dev/null +++ b/src/recording_state.rs @@ -0,0 +1,227 @@ +use crate::dbus_attiny_i2c::{dbus_write_attiny_command, read_tc2_agent_state}; +use crate::dbus_audio::AudioStatus; +use log::error; +use rustbus::DuplexConn; +use std::process; +use std::sync::atomic::{AtomicU8, Ordering}; +use std::sync::Arc; +use std::thread::sleep; +use std::time::Duration; + +mod tc2_agent_state { + pub const NOT_READY: u8 = 0b0000_0000; + /// tc2-agent is ready to accept files or recording streams from the rp2040 + pub const READY: u8 = 0b0000_0010; + /// taking an audio or thermal recording, not safe to reboot rp2040 + pub const RECORDING: u8 = 0b0000_0100; + /// Requested test audio recording. Cleared by rp2040 when test audio recording + /// is completed + pub const REQUESTED_TEST_AUDIO_RECORDING: u8 = 0b0000_1000; + + /// FIXME: Unclear what this is for, or if it is really used? + #[allow(unused)] + pub const TAKE_AUDIO: u8 = 0b0001_0000; + + /// FIXME: Not used anywhere in either tc2-agent or tc2-firmware: remove? + #[allow(unused)] + pub const OFFLOAD: u8 = 0b0010_0000; + + #[allow(unused)] + pub const THERMAL_MODE: u8 = 0b0100_0000; +} + +#[repr(u8)] +enum TestRecordingState { + NotRequested = 0, + UserRequested = 1, + Rp2040Requested = 2, +} + +#[repr(u8)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum RecordingMode { + Thermal = 0, + Audio = 1, +} + +#[derive(Clone)] +struct RecordingModeState { + inner: Arc, +} + +impl RecordingModeState { + pub fn new() -> Self { + Self { inner: Arc::new(AtomicU8::new(RecordingMode::Thermal as u8)) } + } + + pub fn is_in_audio_mode(&self) -> bool { + self.inner.load(Ordering::Relaxed) == RecordingMode::Audio as u8 + } + + #[allow(unused)] + pub fn is_in_thermal_mode(&self) -> bool { + self.inner.load(Ordering::Relaxed) == RecordingMode::Thermal as u8 + } + + pub fn set_mode(&mut self, mode: RecordingMode) { + self.inner.store(mode as u8, Ordering::Relaxed); + } +} + +#[derive(Clone)] +pub struct RecordingState { + rp2040_recording_state_inner: Arc, + test_recording_state_inner: Arc, + recording_mode_state: RecordingModeState, +} + +impl RecordingState { + pub fn new() -> Self { + Self { + rp2040_recording_state_inner: Arc::new(AtomicU8::new(tc2_agent_state::NOT_READY)), + test_recording_state_inner: Arc::new(AtomicU8::new(0)), + recording_mode_state: RecordingModeState::new(), + } + } + + pub fn set_mode(&mut self, mode: RecordingMode) { + self.recording_mode_state.set_mode(mode); + } + + pub fn is_recording(&self) -> bool { + self.rp2040_recording_state_inner.load(Ordering::Relaxed) & tc2_agent_state::RECORDING + == tc2_agent_state::RECORDING + } + + pub fn sync_state_from_attiny(&mut self, conn: &mut DuplexConn) -> u8 { + let state = read_tc2_agent_state(conn); + if let Ok(state) = state { + self.set_state(state); + state + } else { + error!("Failed reading ready state from attiny"); + process::exit(1); + } + } + + pub fn set_is_recording(&mut self, is_recording: bool) { + let state = self.rp2040_recording_state_inner.load(Ordering::Relaxed); + let new_state = + if is_recording { tc2_agent_state::RECORDING } else { !tc2_agent_state::RECORDING }; + while !self + .rp2040_recording_state_inner + .compare_exchange(state, state & new_state, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + sleep(Duration::from_micros(1)); + } + } + + pub fn is_in_audio_mode(&self) -> bool { + self.recording_mode_state.is_in_audio_mode() + } + + pub fn recording_mode(&self) -> RecordingMode { + if self.recording_mode_state.is_in_audio_mode() { + RecordingMode::Audio + } else { + RecordingMode::Thermal + } + } + + pub fn is_taking_test_audio_recording(&self) -> bool { + self.get_audio_status() == AudioStatus::TakingTestRecording + } + + #[allow(unused)] + pub fn is_waiting_to_take_test_audio_recording(&self) -> bool { + self.get_audio_status() == AudioStatus::WaitingToTakeTestRecording + } + + pub fn finished_taking_test_recording(&mut self) { + self.test_recording_state_inner + .store(TestRecordingState::NotRequested as u8, Ordering::Relaxed); + let state = self.rp2040_recording_state_inner.load(Ordering::Relaxed); + while !self + .rp2040_recording_state_inner + .compare_exchange( + state, + state + & !(tc2_agent_state::RECORDING + | tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING), + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + sleep(Duration::from_micros(1)); + } + } + + pub fn get_audio_status(&self) -> AudioStatus { + let state = self.rp2040_recording_state_inner.load(Ordering::Relaxed); + if state & (tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING | tc2_agent_state::RECORDING) + == (tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING | tc2_agent_state::RECORDING) + { + AudioStatus::TakingTestRecording + } else if state & tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING + == tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING + { + AudioStatus::WaitingToTakeTestRecording + } else if state & tc2_agent_state::RECORDING == tc2_agent_state::RECORDING { + AudioStatus::Recording + } else if self.user_requested_test_audio_recording() { + AudioStatus::WaitingToTakeTestRecording + } else { + AudioStatus::Ready + } + } + + pub(crate) fn set_state(&mut self, new_state: u8) { + self.rp2040_recording_state_inner.store(new_state, Ordering::Relaxed); + } + + pub fn request_test_audio_recording(&mut self) { + self.test_recording_state_inner + .store(TestRecordingState::UserRequested as u8, Ordering::Relaxed); + } + + pub fn user_requested_test_audio_recording(&self) -> bool { + self.test_recording_state_inner.load(Ordering::Relaxed) + == TestRecordingState::UserRequested as u8 + } + + pub fn merge_state_to_attiny(&mut self, state_bits_to_set: u8, conn: &mut DuplexConn) { + let state = self.sync_state_from_attiny(conn); + let new_state = state | state_bits_to_set; + dbus_write_attiny_command(conn, 0x07, new_state) + .map(|_| ()) + .or_else(|msg: &str| -> Result<(), String> { + error!("{}", msg); + process::exit(1); + }) + .ok(); + self.set_state(new_state); + } + + pub fn set_ready(&mut self, conn: &mut DuplexConn) { + self.merge_state_to_attiny(tc2_agent_state::READY, conn); + } + + pub fn safe_to_restart_rp2040(&mut self, conn: &mut DuplexConn) -> bool { + self.sync_state_from_attiny(conn); + !self.is_recording() + } + + pub fn request_test_audio_recording_from_rp2040(&mut self, conn: &mut DuplexConn) -> bool { + self.sync_state_from_attiny(conn); + if self.is_recording() { + false + } else { + self.merge_state_to_attiny(tc2_agent_state::REQUESTED_TEST_AUDIO_RECORDING, conn); + self.test_recording_state_inner + .store(TestRecordingState::Rp2040Requested as u8, Ordering::Relaxed); + true + } + } +} diff --git a/src/save_audio.rs b/src/save_audio.rs new file mode 100644 index 0000000..cdeb6a9 --- /dev/null +++ b/src/save_audio.rs @@ -0,0 +1,156 @@ +use crate::device_config::DeviceConfig; +use byteorder::LittleEndian; +use byteorder::{ByteOrder, WriteBytesExt}; +use chrono::{DateTime, Utc}; +use log::{error, info}; +use std::io::Cursor; +use std::io::Write; +use std::process::{Command, Stdio}; +use std::{fs, thread}; +use thread_priority::{ThreadBuilderExt, ThreadPriority}; + +fn wav_header(audio_length: usize, sample_rate: u32) -> [u8; 44] { + let header_inner = [0u8; 44]; + let bits_per_sample = 16; + let bytes_per_block = bits_per_sample / 8; + let bytes_per_second: u32 = sample_rate * bytes_per_block as u32; + let num_channels = 1; + let format_pcm = 1; + let file_size = (audio_length + (header_inner.len() - 8)) as u32; + + let mut cursor = Cursor::new(header_inner); + + // RIFF header (12 bytes) + cursor.write_all(b"RIFF").unwrap(); + // Overall file size minus 8 bytes + cursor.write_u32::(file_size).unwrap(); + cursor.write_all(b"WAVE").unwrap(); + + // fmt block (24 bytes) + cursor.write_all(b"fmt ").unwrap(); + // Size of format data after this point (minus "fmt " and 16u32, i.e. 8 bytes) + cursor.write_u32::(16).unwrap(); + cursor.write_u16::(format_pcm).unwrap(); + cursor.write_u16::(num_channels).unwrap(); + cursor.write_u32::(sample_rate).unwrap(); + cursor.write_u32::(bytes_per_second).unwrap(); + cursor.write_u16::(bytes_per_block).unwrap(); + cursor.write_u16::(bits_per_sample).unwrap(); + + // Beginning of data block / end of header (8 bytes) + cursor.write_all(b"data").unwrap(); + cursor.write_u32::(audio_length as u32).unwrap(); + cursor.into_inner() +} + +pub fn save_audio_file_to_disk(mut audio_bytes: Vec, device_config: DeviceConfig) { + //let output_dir = String::from(device_config.output_dir()); + let output_dir = String::from("/home/pi/temp"); + let _ = thread::Builder::new().name("audio-transcode".to_string()).spawn_with_priority( + ThreadPriority::Min, + move |_| { + // Reclaim some memory + audio_bytes.shrink_to_fit(); + let timestamp = LittleEndian::read_u64(&audio_bytes[2..10]); + let recording_date_time = DateTime::from_timestamp_millis(timestamp as i64 / 1000) + .unwrap_or(chrono::Local::now().with_timezone(&Utc)) + .with_timezone(&chrono::Local); + info!("Saving AAC file"); + if !fs::exists(&output_dir).unwrap_or(false) { + fs::create_dir(&output_dir) + .expect(&format!("Failed to create AAC output directory {}", output_dir)); + } + + let output_path: String = + format!("{}/{}.aac", output_dir, recording_date_time.format("%Y-%m-%d--%H-%M-%S")); + // If the file already exists, don't re-save it. + if !fs::exists(&output_path).unwrap_or(false) { + let recording_date_time = + format!("recordingDateTime={}", recording_date_time.to_rfc3339()); + let latitude = format!("latitude={}", device_config.lat_lng().0); + let longitude = format!("longitude={}", device_config.lat_lng().1); + let altitude = + format!("locAltitude={}", device_config.location_altitude().unwrap_or(0.0)); + let location_accuracy = + format!("locAccuracy={}", device_config.location_accuracy().unwrap_or(0.0)); + let location_timestamp = + format!("locTimestamp={}", device_config.location_timestamp().unwrap_or(0)); + let device_id = format!("deviceId={}", device_config.device_id()); + let sample_rate = LittleEndian::read_u16(&audio_bytes[10..12]) as u32; + let duration = format!( + "duration={}", + audio_bytes[12..].len() as f32 / sample_rate as f32 / 2.0 + ); + + // Now transcode with ffmpeg – we create an aac stream in an m4a wrapper in order + // to support adding metadata tags. + let mut cmd = Command::new("ffmpeg") + .arg("-i") + .arg("pipe:0") + .arg("-codec:a") + .arg("aac") + .arg("-q:a") + .arg("1.2") // VBR, more appropriate for audio with lots of nothing + // Faster perceptual coder that should give faster + + // better results at the higher bitrates we're using. + .arg("-aac_coder") + .arg("fast") + .arg("-movflags") + .arg("faststart") // Move the metadata to the beginning of file + .arg("-movflags") + .arg("+use_metadata_tags") // Allow custom metadata tags + .arg("-map_metadata") // Keep existing metadata? + .arg("0") + .arg("-metadata") + .arg(recording_date_time) + .arg("-metadata") + .arg(duration) + .arg("-metadata") + .arg(latitude) + .arg("-metadata") + .arg(longitude) + .arg("-metadata") + .arg(device_id) + .arg("-metadata") + .arg(altitude) + .arg("-metadata") + .arg(location_timestamp) + .arg("-metadata") + .arg(location_accuracy) + .arg("-f") + .arg("mp4") + .arg(output_path.clone()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .expect("Failed to spawn ffmpeg process"); + + let mut stdin = cmd.stdin.take().expect("Failed to open stdin"); + thread::spawn(move || { + // Write wav to stdin: + let audio_bytes = &audio_bytes[12..]; + stdin.write_all(&wav_header(audio_bytes.len(), sample_rate)).unwrap(); + stdin.write_all(&audio_bytes).unwrap(); + }); + match cmd.wait() { + Ok(exit_status) => { + if exit_status.success() { + info!("Saved AAC file {}", output_path); + } else { + error!("Failed transcoding {} to AAC", output_path); + } + } + Err(e) => { + error!( + "Failed invoking ffmpeg to transcode {}, reason: {}", + output_path, e + ); + } + } + } else { + error!("File {} already exists, discarding duplicate", output_path); + } + }, + ); +} diff --git a/src/save_cptv.rs b/src/save_cptv.rs new file mode 100644 index 0000000..3cc25c8 --- /dev/null +++ b/src/save_cptv.rs @@ -0,0 +1,82 @@ +use crate::cptv_header::{decode_cptv_header_streaming, CptvHeader}; +use chrono::{DateTime, Utc}; +use flate2::read::GzEncoder; +use flate2::read::MultiGzDecoder; +use flate2::Compression; +use log::{error, info}; +use std::io::prelude::*; +use std::{fs, thread}; +use thread_priority::{ThreadBuilderExt, ThreadPriority}; + +pub fn save_cptv_file_to_disk(cptv_bytes: Vec, output_dir: &str) { + let output_dir = String::from(output_dir); + let _ = thread::Builder::new().name("cptv-save".to_string()).spawn_with_priority( + ThreadPriority::Min, + move |_| match decode_cptv_header_streaming(&cptv_bytes) { + Ok(header) => match header { + CptvHeader::V2(header) => { + info!("Saving CPTV file with header {:?}", header); + let recording_date_time = + DateTime::from_timestamp_millis(header.timestamp as i64 / 1000) + .unwrap_or(chrono::Local::now().with_timezone(&Utc)) + .with_timezone(&chrono::Local); + if !fs::exists(&output_dir).unwrap_or(false) { + fs::create_dir(&output_dir) + .expect(&format!("Failed to create output directory {}", output_dir)); + } + let path = format!( + "{}/{}.cptv", + output_dir, + recording_date_time.format("%Y-%m-%d--%H-%M-%S") + ); + let decoder = MultiGzDecoder::new(&cptv_bytes[..]); + let mut encoder = GzEncoder::new(decoder, Compression::default()); + let mut cptv_bytes = Vec::new(); + encoder.read_to_end(&mut cptv_bytes).unwrap(); + // If the file already exists, don't re-save it. + let is_existing_file = match fs::metadata(&path) { + Ok(metadata) => metadata.len() as usize == cptv_bytes.len(), + Err(_) => false, + }; + if !is_existing_file { + match fs::write(&path, &cptv_bytes) { + Ok(()) => { + info!("Saved CPTV file {}", path); + } + Err(e) => { + error!( + "Failed writing CPTV file to storage at {}, reason: {}", + path, e + ); + } + } + + // NOTE: For debug purposes, we may want to also save the CPTV file locally for inspection. + // let path = format!( + // "{}/{}.cptv", + // "/home/pi", + // recording_date_time.format("%Y-%m-%d--%H-%M-%S") + // ); + // match fs::write(&path, &cptv_bytes) { + // Ok(()) => { + // info!("Saved CPTV file {}", path); + // } + // Err(e) => { + // error!( + // "Failed writing CPTV file to storage at {}, reason: {}", + // path, e + // ); + // } + // } + } else { + error!("File {} already exists, discarding duplicate", path); + } + } + _ => error!("Unsupported CPTV file format, discarding file"), + }, + Err(e) => { + error!("Invalid CPTV file: ({:?}), discarding", e); + } + }, + ); +} diff --git a/src/service.rs b/src/service.rs deleted file mode 100644 index 9611450..0000000 --- a/src/service.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct AgentService; diff --git a/src/socket_stream.rs b/src/socket_stream.rs index 872432b..f2509d4 100644 --- a/src/socket_stream.rs +++ b/src/socket_stream.rs @@ -1,4 +1,3 @@ -use crate::ModeConfig; use log::info; use std::io; use std::io::Write; @@ -23,14 +22,8 @@ impl SocketStream { } else { TcpStream::connect(address).map(|stream| { stream.set_nodelay(true).unwrap(); - stream - .set_write_timeout(Some(Duration::from_millis(1500))) - .unwrap(); - SocketStream { - unix: None, - tcp: Some(stream), - sent_header: false, - } + stream.set_write_timeout(Some(Duration::from_millis(1500))).unwrap(); + SocketStream { unix: None, tcp: Some(stream), sent_header: false } }) } } @@ -66,10 +59,10 @@ impl SocketStream { } } -pub fn get_socket_address(config: &ModeConfig) -> String { +pub fn get_socket_address(serve_frames_via_wifi: bool) -> String { let address = { // Find the socket address - let address = if config.use_wifi { + let address = if serve_frames_via_wifi { // Scan for servers on port 34254. use mdns_sd::{ServiceDaemon, ServiceEvent}; // Create a daemon @@ -98,10 +91,10 @@ pub fn get_socket_address(config: &ModeConfig) -> String { } else { Some("/var/run/lepton-frames".to_string()) }; - if config.use_wifi && address.is_none() { + if serve_frames_via_wifi && address.is_none() { panic!("t2c-frames service not found on local network"); } - let address = if config.use_wifi { + let address = if serve_frames_via_wifi { format!("{}:34254", address.unwrap()) } else { address.unwrap() diff --git a/src/telemetry.rs b/src/telemetry.rs index b7ff5b8..6882f41 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -1,7 +1,8 @@ +use crate::cptv_frame_dispatch::Frame; use crate::utils::u8_slice_as_u16_slice; -use crate::Frame; use byteorder::{BigEndian, ByteOrder, LittleEndian}; +#[allow(unused)] pub struct Telemetry { pub frame_num: u32, pub msec_on: u32, @@ -19,10 +20,5 @@ pub fn read_telemetry(frame: &Frame) -> Telemetry { let status_bits = LittleEndian::read_u32(&buf[6..10]); let ffc_state = (status_bits >> 4) & 0b11; let ffc_in_progress = ffc_state == 0b10; - Telemetry { - frame_num, - msec_on, - ffc_in_progress, - msec_since_last_ffc, - } + Telemetry { frame_num, msec_on, ffc_in_progress, msec_since_last_ffc } }