diff --git a/.gitignore b/.gitignore index ad616ab..418c6e8 100644 --- a/.gitignore +++ b/.gitignore @@ -164,3 +164,9 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ + +#logs +logs/ + +#ignore poetry.lock +poetry.lock diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d0bed6b..d45aa79 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,6 +17,8 @@ repos: hooks: - id: mypy args: [--ignore-missing-imports] + additional_dependencies: + - types-PyYAML - repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks rev: v2.12.0 hooks: diff --git a/README.md b/README.md index 8c8affa..71ccab6 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ Welcome to the ```sailsprep``` repo! This is a Python repo for doing incredible **Caution:**: this package is still under development and may change rapidly over the next few weeks. +This package converts raw videos into the BIDS format. ## Features - A few - Cool @@ -17,18 +18,39 @@ Welcome to the ```sailsprep``` repo! This is a Python repo for doing incredible - These may include a wonderful CLI interface. ## Installation +This project uses Poetry to manage dependencies. Make sure Poetry is installed. +On Engaging, first run the following from the root of the repo: +``` +module load miniforge +pip install poetry +poetry install +``` + +The BIDS-conversion tool of sailsprep requires FFmpeg ≥ 6.0 compiled with the vidstab library. +Because FFmpeg compiled with vidstab is not a Python package, it must be installed separately. 
+You'll need to run (outside any environment): + +``` +cd ~ +wget https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz +tar -xJf ffmpeg-release-amd64-static.tar.xz +mv ffmpeg-*-static ffmpeg_static +export PATH="$HOME/ffmpeg_static:$PATH" + +``` + Get the newest development version via: ```sh pip install git+https://github.com/sensein/sailsprep.git ``` - ## Quick start -```Python -from sailsprep.app import hello_world -hello_world() -``` +Tools developed in sailsprep: +|Tool|Documentation| +|----|--------------| +|BIDS-conversion| [link to documentation](docs/BIDS_convertor.md) | + ## Contributing We welcome contributions from the community! Before getting started, please review our [**CONTRIBUTING.md**](https://github.com/sensein/sailsprep/blob/main/CONTRIBUTING.md). diff --git a/configs/config_bids_convertor.yaml b/configs/config_bids_convertor.yaml new file mode 100644 index 0000000..bdeeb3f --- /dev/null +++ b/configs/config_bids_convertor.yaml @@ -0,0 +1,17 @@ +# Video Processing Configuration + +# Input data +annotation_file: /orcd/data/satra/002/datasets/SAILS/data4analysis/Video Rating Data/SAILS_RATINGS_ALL_DEDUPLICATED_NotForFinalAnalyses_2025.10.csv +video_root: /orcd/data/satra/002/datasets/SAILS/Phase_III_Videos/Videos_from_external +asd_status: /orcd/data/satra/002/datasets/SAILS/data4analysis/ASD_Status.xlsx + +# Output data +output_dir: /orcd/scratch/bcs/001/sensein/sails/BIDS_data + +# Video processing parameters +target_resolution: 1280x720 +target_framerate: 30 + +# Derived directory names (optional — can be built dynamically) +final_bids_root: final_bids-dataset +derivatives_subdir: derivatives/preprocessed diff --git a/docs/BIDS_convertor.md b/docs/BIDS_convertor.md new file mode 100644 index 0000000..1eff64a --- /dev/null +++ b/docs/BIDS_convertor.md @@ -0,0 +1,52 @@ +## BIDS Format + +For reproducibility, organization, and practicality, sailsprep converts its raw data into the BIDS (Brain Imaging Data Structure) format. 
+BIDS is a community-driven standard for organizing, naming, and describing neuroimaging and related data (e.g., EEG, fMRI, MEG, behavioral, and physiological data). + +During the BIDS conversion pipeline, the raw home videos are preprocessed to be standardized, denoised, and reformatted. +Relevant metadata and annotations necessary for downstream analysis are also extracted at this stage. + +## Structure + +The final BIDS dataset follows the structure below: +```graphql +├── sub-ID1 # Contains raw videos in BIDS format +│ ├── ses-01 # Videos between 12 and 16 months +│ │ └── beh # Behavioral data +│ │ ├── sub-ID1_ses-01_task-A_run-01_beh.mp4 # Standardized raw video +│ │ ├── sub-ID1_ses-01_task-A_run-01_beh.tsv # Manual annotations +│ │ └── sub-ID1_ses-01_task-A_run-01_beh.json # Info on standardization +│ └── ses-02 # Videos between 34 and 38 months +│ └── beh +├── derivatives +│ └── preprocessed # Contains stabilized, denoised, standardized videos +│ ├── sub-ID1 +│ │ ├── ses-01 +│ │ │ └── beh +│ │ │ ├── sub-ID1_ses-01_task-A_run-01_audio.json # Audio extraction info +│ │ │ ├── sub-ID1_ses-01_task-A_run-01_audio.wav # Extracted audio +│ │ │ ├── sub-ID1_ses-01_task-A_run-01_desc-processed.json # Video preprocessing info +│ │ │ └── sub-ID1_ses-01_task-A_run-01_desc-processed_beh.mp4 # Preprocessed video +│ │ └── ses-02 +│ └── sub-ID2 +├── README.md # Explains dataset structure and content +├── participants.tsv # Participant information (e.g., ASD status) +├── participants.json # Metadata for participants.tsv +└── dataset_description.json # BIDS dataset description (name, version, etc.) +``` +## Execution + +To verify that FFmpeg is correctly installed (see [README.md](../README.md)) and at least version 6.0, run: + +``` +ffmpeg -version +``` + +You’ll need to submit the conversion job on Engaging using sbatch. +Make sure you are in the root directory of the repository. 
+ +We provide SLURM submission scripts for convenience — simply run the following commands (with the miniforge module deactivated to ensure the correct FFmpeg version is used): +``` +jid=$(sbatch --parsable jobs/run_bids_convertor.sh) +sbatch --dependency=afterok:$jid jobs/merge_cleanup.sh +``` diff --git a/jobs/merge_cleanup.sh b/jobs/merge_cleanup.sh new file mode 100644 index 0000000..4495968 --- /dev/null +++ b/jobs/merge_cleanup.sh @@ -0,0 +1,69 @@ +#!/bin/bash +#SBATCH --job-name=merge_cleanup +#SBATCH --output=logs/merge_cleanup_%j.out +#SBATCH --error=logs/merge_cleanup_%j.err +#SBATCH --time=01:00:00 +#SBATCH --mem=2G + +# Clean up old logs before running +echo "Cleaning up old logs..." +if [ -d logs ]; then + find logs -mindepth 1 ! -name ".gitkeep" \ + ! -name "merge_cleanup_${SLURM_JOB_ID}.out" \ + ! -name "merge_cleanup_${SLURM_JOB_ID}.err" -delete +fi + +OUTPUT_DIR=$(poetry run python -c "import yaml, sys; print(yaml.safe_load(open('configs/config_bids_convertor.yaml'))['output_dir'])") +MERGED_DIR="$OUTPUT_DIR" + +mkdir -p "$MERGED_DIR" + +echo "Merging logs from numbered folders under $OUTPUT_DIR" +echo "Started at $(date)" + +merged_processed="$MERGED_DIR/all_processed.json" +merged_failed="$MERGED_DIR/all_failed.json" + +# Create empty lists if not exist +echo "[]" > "$merged_processed" +echo "[]" > "$merged_failed" + +# Load jq (if not already available) +module load jq 2>/dev/null || true + +for folder in "$OUTPUT_DIR"/*/; do + foldername=$(basename "$folder") + + if [[ "$foldername" =~ ^[0-9]+$ ]]; then + echo "Merging from folder: $foldername" + if [[ -f "$folder/processing_log.json" ]]; then + tmpfile=$(mktemp) + jq -s 'add' "$merged_processed" "$folder/processing_log.json" > "$tmpfile" && mv "$tmpfile" "$merged_processed" + fi + if [[ -f "$folder/not_processed.json" ]]; then + tmpfile=$(mktemp) + jq -s 'add' "$merged_failed" "$folder/not_processed.json" > "$tmpfile" && mv "$tmpfile" "$merged_failed" + fi + fi +done + +echo "Merged logs saved 
in: $MERGED_DIR" +echo "Now cleaning up numbered folders..." + +# Delete only folders with numeric names (avoid final_bids-dataset) +for folder in "$OUTPUT_DIR"/*/; do + foldername=$(basename "$folder") + if [[ "$foldername" =~ ^[0-9]+$ ]]; then + echo "Deleting temporary folder: $foldername" + rm -rf "$folder" + else + echo "Skipping non-numeric folder: $foldername" + fi +done + +echo "Cleanup complete at $(date)" + +# --- Run final Python merge --- +echo "Running final Python merge and participant file creation..." +poetry run python -c "from sailsprep.BIDS_convertor import merge_subjects, create_participants_file; merge_subjects(); create_participants_file()" +echo "Final BIDS merge and participant file creation complete ✅" diff --git a/jobs/run_bids_convertor.sh b/jobs/run_bids_convertor.sh new file mode 100644 index 0000000..b85d5f6 --- /dev/null +++ b/jobs/run_bids_convertor.sh @@ -0,0 +1,41 @@ +#!/bin/bash +#SBATCH --job-name=bids_processing +#SBATCH --partition=mit_normal +#SBATCH --array=0-18 +#SBATCH --output=logs/bids_%A_%a.out +#SBATCH --error=logs/bids_%A_%a.err +#SBATCH --mem=5G +#SBATCH --time=10:00:00 +#SBATCH --cpus-per-task=5 + +mkdir -p logs + +# --- Determine project root robustly --- +if [ -n "$SLURM_SUBMIT_DIR" ]; then + cd "$SLURM_SUBMIT_DIR" || { echo "❌ Cannot cd to SLURM_SUBMIT_DIR=$SLURM_SUBMIT_DIR"; exit 1; } +else + SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" + cd "$SCRIPT_DIR/.." || { echo "❌ Cannot cd to project root"; exit 1; } +fi + +echo "Running from project root: $(pwd)" +export PYTHONUNBUFFERED=1 + +ffmpeg -version || echo "⚠️ FFmpeg not available" + +# --- Poetry setup --- +if ! poetry env info --path &> /dev/null; then + echo "Creating Poetry environment..." 
+ poetry install || { echo "❌ Poetry install failed"; exit 1; } +fi + +ENV_PATH=$(poetry env info --path) +source "$ENV_PATH/bin/activate" || { echo "❌ Failed to activate Poetry environment"; exit 1; } + +echo "Using Python from: $(which python)" +echo "Task ID: ${SLURM_ARRAY_TASK_ID}" +echo "Starting BIDS conversion at $(date)" + +python -m sailsprep.BIDS_convertor "$SLURM_ARRAY_TASK_ID" "$SLURM_ARRAY_TASK_MAX" + +echo "Finished at $(date)" diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml index 72e8adf..23bd11b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,11 @@ requires-poetry = ">=2.0" version = "0.0.0" [tool.poetry.dependencies] -click = "~=8.3" +click = "~=8.2" +pandas = "^2.3.3" +opencv-python = "^4.12.0.88" +openpyxl = "^3.1.5" +types-pyyaml = "^6.0.12.20250915" [tool.poetry.group.dev] optional = true diff --git a/src/BIDS_convertor.py b/src/BIDS_convertor.py deleted file mode 100644 index 635b163..0000000 --- a/src/BIDS_convertor.py +++ /dev/null @@ -1,1145 +0,0 @@ -"""BIDS Video Processing Pipeline. - -This module processes home videos from ASD screening studies and organizes them -according to the Brain Imaging Data Structure (BIDS) specification version 1.8.0. - -The pipeline includes video stabilization, denoising, standardization, and audio -extraction for behavioral analysis research. - -Example: - Basic usage: - $ python bids_video_processor.py - -Todo: - * check with actual data -""" - -# Standard library imports -import json -import os -import plistlib -import re -import shutil -import struct -import subprocess -from datetime import datetime -from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple, Union - -# Third-party imports -import pandas as pd -import yaml -from dateutil import parser - - -def load_configuration(config_path: str = "config.yaml") -> Dict[str, Any]: - """Load configuration from YAML file. 
- - Args: - config_path (str): Path to the configuration YAML file. - - Returns: - dict: Configuration dictionary containing video processing parameters. - - Raises: - FileNotFoundError: If the configuration file is not found. - yaml.YAMLError: If the YAML file is malformed. - """ - with open(config_path, "r") as f: - config = yaml.safe_load(f) - return config - - -# Load configuration -config = load_configuration() -VIDEO_ROOT = config["video_root"] -ASD_CSV = config["asd_csv"] -NONASD_CSV = config["nonasd_csv"] -OUTPUT_DIR = config["output_dir"] -TARGET_RESOLUTION = config.get("target_resolution", "1280x720") -TARGET_FRAMERATE = config.get("target_fps", 30) - -# BIDS directory structure -BIDS_ROOT = os.path.join(OUTPUT_DIR, "bids-dataset") -DERIVATIVES_DIR = os.path.join(BIDS_ROOT, "derivatives", "preprocessed") - - -def create_bids_structure() -> None: - """Create the BIDS directory structure. - - Creates the main BIDS dataset directory and derivatives subdirectory - following BIDS specification requirements. - - Note: - This function creates directories with exist_ok=True to prevent - errors if directories already exist. - """ - os.makedirs(BIDS_ROOT, exist_ok=True) - os.makedirs(DERIVATIVES_DIR, exist_ok=True) - - -def create_dataset_description() -> None: - """Create dataset_description.json for main BIDS dataset. - - Generates the required dataset description file according to BIDS - specification, containing metadata about the dataset including name, - version, authors, and description. - - Raises: - IOError: If unable to write the dataset description file. 
- """ - dataset_desc = { - "Name": "Home Videos", - "BIDSVersion": "1.10.0", - "HEDVersion": "8.2.0", - "DatasetType": "raw", - "License": "", - "Authors": ["Research Team"], - "Acknowledgements": "participants and families", - "HowToAcknowledge": "", - "Funding": ["", "", ""], - "EthicsApprovals": [""], - "ReferencesAndLinks": ["", "", ""], - "DatasetDOI": "doi:", - } - - with open(os.path.join(BIDS_ROOT, "dataset_description.json"), "w") as f: - json.dump(dataset_desc, f, indent=4) - - -def create_derivatives_dataset_description() -> None: - """Create dataset_description.json for derivatives. - - Generates the dataset description file for the derivatives directory, - documenting the preprocessing pipeline and source datasets. - - Raises: - IOError: If unable to write the derivatives dataset description file. - """ - derivatives_desc = { - "Name": "Home Videos", - "BIDSVersion": "1.10.0", - "DatasetType": "derivative", - "GeneratedBy": [ - { - "Name": "Video Preprocessing Pipeline", - "Version": "1.0.0", - "Description": ( - "FFmpeg-based video stabilization, denoising, " - "and standardization pipeline" - ), - "CodeURL": "local", - } - ], - "SourceDatasets": [{"DOI": "", "URL": "", "Version": "1.0.0"}], - "HowToAcknowledge": "Please cite the original study", - } - - derivatives_path = os.path.join(DERIVATIVES_DIR, "dataset_description.json") - with open(derivatives_path, "w") as f: - json.dump(derivatives_desc, f, indent=4) - - -def create_readme() -> None: - """Create README file for the BIDS dataset. - - Generates a comprehensive README file documenting the dataset structure, - organization, processing pipeline, and usage instructions following - BIDS best practices. - - Raises: - IOError: If unable to write the README file. - """ - readme_content = """# README - -This README serves as the primary guide for researchers using this BIDS-format dataset. 
- -## Details Related to Access to the Data - -### Data User Agreement - -### Contact Person -- Name: -- Email: -- ORCID: - -### Practical Information to Access the Data - -## Overview - -### Project Information -- Project Name: [If applicable] -- Years: [YYYY-YYYY] - -### Dataset Description -This dataset contains [brief description of data types and sample size]. - -### Experimental Design - - -### Quality Assessment -[Summary statistics or QC metrics] - -## Methods - -### Subjects -[Description of participant pool] - -#### Recruitment -[Recruitment procedures] - -#### Inclusion Criteria -1. [Criterion 1] -2. [Criterion 2] - -#### Exclusion Criteria -1. [Criterion 1] -2. [Criterion 2] - -### Apparatus -[Equipment and environment details] - -### Initial Setup -[Pre-session procedures] - -### Task Organization -- Counterbalancing: [Yes/No] -- Session Structure: - 1. [Activity 1] - 2. [Activity 2] - -### Task Details - - -### Additional Data Acquired - - -### Experimental Location -[Facility/geographic details] - -### Missing Data -- Participant [ID]: [Issue description] -- Participant [ID]: [Issue description] - -### Notes -[Any additional relevant information] - -""" - - with open(os.path.join(BIDS_ROOT, "README"), "w") as f: - f.write(readme_content) - - -def get_session_from_path(video_path: Union[str, Path]) -> str: - """Determine session ID based on video path. - - Analyzes the video file path to determine which session (age group) - the video belongs to based on folder naming conventions. - - Args: - video_path (str or Path): Path to the video file. - - Returns: - str: Session ID ('01' for 12-16 months, '02' for 34-38 months). - - Note: - Defaults to session '01' if no clear age group indicator is found. 
- """ - path_str = str(video_path).lower() - if "12-16 month" in path_str: - return "01" - elif "34-38 month" in path_str: - return "02" - else: - # Fallback - try to infer from folder structure - return "01" # Default to session 01 - - -def create_bids_filename( - participant_id: int, session_id: str, suffix: str, extension: str -) -> str: - """Create BIDS-compliant filename. - - Generates standardized filenames following BIDS naming conventions - for participant data files. - - Args: - participant_id (int): Numeric participant identifier. - session_id (str): Session identifier (e.g., '01', '02'). - suffix (str): File type suffix (e.g., 'beh', 'events'). - extension (str): File extension without dot (e.g., 'mp4', 'tsv'). - - Returns: - str: BIDS-compliant filename. - - Example: - >>> create_bids_filename(123, '01', 'beh', 'mp4') - 'sub-123_ses-01_task-play_beh.mp4' - """ - return f"sub-{participant_id:02d}_ses-{session_id}_task-play_{suffix}.{extension}" - - -def read_demographics(asd_csv: str, nonasd_csv: str) -> pd.DataFrame: - """Read and combine demographics data from CSV files. - - Loads participant demographics from separate ASD and non-ASD CSV files, - combines them, and standardizes column names. - - Args: - asd_csv (str): Path to ASD participants CSV file. - nonasd_csv (str): Path to non-ASD participants CSV file. - - Returns: - pd.DataFrame: Combined demographics dataframe with standardized column names. - - Raises: - FileNotFoundError: If either CSV file is not found. - pd.errors.EmptyDataError: If CSV files are empty. - """ - df_asd = pd.read_csv(asd_csv) - df_nonasd = pd.read_csv(nonasd_csv) - df = pd.concat([df_asd, df_nonasd], ignore_index=True) - df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_") - return df - - -def create_participants_files( - demographics_df: pd.DataFrame, processed_data: List[Dict[str, Any]] -) -> None: - """Create participants.tsv and participants.json files. 
- - Generates BIDS-compliant participant information files including - a TSV file with participant data and a JSON data dictionary. - - Args: - demographics_df (pd.DataFrame): Demographics dataframe. - processed_data (list): List of processed video data dictionaries. - - Raises: - IOError: If unable to write participant files. - """ - # Get unique participants from processed data - processed_participants = set() - for entry in processed_data: - processed_participants.add(entry["bids_participant_id"]) - - # Filter demographics for only processed participants - participants_data = [] - for _, row in demographics_df.iterrows(): - participant_id = str(row["dependent_temporary_id"]).upper() - # Create consistent numeric ID - bids_id = f"sub-{hash(participant_id) % 10000:04d}" - - if bids_id in processed_participants: - participants_data.append( - { - "participant_id": bids_id, - "age": row.get("dependent_dob", "n/a"), - "sex": row.get("sex", "n/a"), - "group": ( - "ASD" - if "asd" in str(row.get("diagnosis", "")).lower() - else "NonASD" - ), - } - ) - - # Create participants.tsv - participants_df = pd.DataFrame(participants_data) - participants_df.to_csv( - os.path.join(BIDS_ROOT, "participants.tsv"), sep="\t", index=False - ) - - # Create participants.json (data dictionary) - participants_json = { - "participant_id": {"Description": "Unique participant identifier"}, - "age": {"Description": "Date of birth", "Units": "YYYY-MM-DD"}, - "sex": { - "Description": "Biological sex of participant", - "Levels": {"M": "male", "F": "female"}, - }, - "group": { - "Description": "Participant group classification", - "Levels": { - "ASD": "Autism Spectrum Disorder", - "NonASD": "Not Autism Spectrum Disorder", - }, - }, - } - - with open(os.path.join(BIDS_ROOT, "participants.json"), "w") as f: - json.dump(participants_json, f, indent=4) - - -def extract_exif(video_path: str) -> Dict[str, Any]: - """Extract video metadata using ffprobe. 
- - Uses FFmpeg's ffprobe tool to extract comprehensive metadata from video files - including format information, stream details, and embedded timestamps. - - Args: - video_path (str): Path to the video file. - - Returns: - dict: Dictionary containing extracted metadata including duration, - bit rate, format information, and date/time tags. - - Note: - Returns error information in the dictionary if ffprobe fails - or if the video format is unsupported. - - Example: - >>> metadata = extract_exif('/path/to/video.mp4') - >>> print(metadata['duration_sec']) - 120.5 - """ - try: - cmd = [ - "ffprobe", - "-v", - "quiet", - "-print_format", - "json", - "-show_format", - "-show_streams", - video_path, - ] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode != 0: - return {"ffprobe_error": result.stderr.strip()} - metadata = json.loads(result.stdout) - extracted = {} - # Format-level metadata - format_info = metadata.get("format", {}) - extracted["filename"] = format_info.get("filename") - extracted["format"] = format_info.get("format_long_name") - extracted["duration_sec"] = float(format_info.get("duration", 0)) - extracted["bit_rate"] = int(format_info.get("bit_rate", 0)) - extracted["size_bytes"] = int(format_info.get("size", 0)) - # Date/time-related tags from format - extracted["format_dates"] = {} - if "tags" in format_info: - for k, v in format_info["tags"].items(): - if "date" in k.lower() or "time" in k.lower(): - extracted["format_dates"][k] = v - # Loop through all streams (video, audio, etc.) 
- extracted["stream_dates"] = [] - for stream in metadata.get("streams", []): - stream_entry = {} - if "tags" in stream: - for k, v in stream["tags"].items(): - if "date" in k.lower() or "time" in k.lower(): - stream_entry[k] = v - if stream_entry: - extracted["stream_dates"].append(stream_entry) - return extracted - except Exception as e: - return {"error": str(e)} - - -def extract_date_from_filename(filename: str) -> Optional[str]: - """Extract date from filename using various patterns. - - Attempts to parse dates from video filenames using multiple common - date formats and patterns, including Facebook/Instagram formats - and standard date conventions. - - Args: - filename (str): Video filename to parse. - - Returns: - str or None: Formatted date string in "YYYY:MM:DD HH:MM:SS" format, - or None if no valid date pattern is found. - - Note: - This function tries multiple date formats and patterns to maximize - compatibility with various naming conventions used by different - devices and platforms. 
- - Example: - >>> extract_date_from_filename('video_2023-12-25.mp4') - '2023:12:25 00:00:00' - """ - try: - name = os.path.splitext(os.path.basename(filename))[0] - # Try direct known formats - known_formats = [ - "%m-%d-%Y", - "%m-%d-%y", - "%m_%d_%Y", - "%m_%d_%y", - "%Y-%m-%d", - "%Y%m%d", - "%m%d%Y", - ] - for fmt in known_formats: - try: - return datetime.strptime(name, fmt).strftime("%Y:%m:%d %H:%M:%S") - except ValueError: - continue - # Try extracting from YYYYMMDD_HHMMSS or FB_/IMG_ formats - match = re.search(r"(20\d{6})[_\-]?(?:([01]\d{3,4}))?", name) - if match: - date_str = match.group(1) - time_str = match.group(2) if match.group(2) else "000000" - if len(time_str) == 4: # HHMM - time_str += "00" - dt = datetime.strptime(date_str + time_str, "%Y%m%d%H%M%S") - return dt.strftime("%Y:%m:%d %H:%M:%S") - # Try M-D-YYYY, D-M-YYYY fallback - fallback = re.match(r"(\d{1,2})[\-_](\d{1,2})[\-_](\d{2,4})", name) - if fallback: - m, d, y = fallback.groups() - if len(y) == 2: - y = "20" + y # assume 20xx - try: - dt = datetime.strptime(f"{m}-{d}-{y}", "%m-%d-%Y") - return dt.strftime("%Y:%m:%d %H:%M:%S") - except ValueError: - pass - try: - dt = datetime.strptime(f"{d}-{m}-{y}", "%d-%m-%Y") - return dt.strftime("%Y:%m:%d %H:%M:%S") - except ValueError: - pass - raise ValueError("No valid date format found in filename.") - except Exception as e: - print(f"Could not extract date from filename {filename}: {e}") - return None - - -def calculate_age(dob_str: str, video_date: datetime) -> Optional[float]: - """Calculate age in months at time of video. - - Computes the participant's age in months at the time the video was recorded - based on their date of birth and the video recording date. - - Args: - dob_str (str): Date of birth string in parseable format. - video_date (datetime): Date when the video was recorded. - - Returns: - float or None: Age in months (rounded to 1 decimal place), - or None if calculation fails. 
- - Note: - Uses 30.44 days per month for calculation to account for - varying month lengths. - - Example: - >>> from datetime import datetime - >>> dob = "2022-01-15" - >>> video_dt = datetime(2023, 1, 15) - >>> calculate_age(dob, video_dt) - 12.0 - """ - try: - dob = parser.parse(dob_str) - delta = video_date - dob - age_months = round(delta.days / 30.44, 1) - return age_months - except Exception: - return None - - -def stabilize_video(input_path: str, stabilized_path: str) -> None: - """Stabilize video using ffmpeg vidstab. - - Applies video stabilization using FFmpeg's vidstab filter to reduce - camera shake and improve video quality for analysis. - - Args: - input_path (str): Path to input video file. - stabilized_path (str): Path for output stabilized video file. - - Note: - This function uses a two-pass approach: first detecting motion - vectors, then applying stabilization transforms. Temporary - transform files are automatically cleaned up. - - Todo: - Add error handling for FFmpeg execution failures. - """ - detect_cmd = [ - "ffmpeg", - "-i", - input_path, - "-vf", - "vidstabdetect=shakiness=5:accuracy=15", - "-f", - "null", - "-", - ] - subprocess.run(detect_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - transform_cmd = [ - "ffmpeg", - "-y", - "-i", - input_path, - "-vf", - "vidstabtransform=smoothing=30:input=transforms.trf", - "-c:v", - "libx264", - "-preset", - "slow", - "-crf", - "23", - "-c:a", - "copy", - stabilized_path, - ] - subprocess.run(transform_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - if os.path.exists("transforms.trf"): - os.remove("transforms.trf") - - -def preprocess_video(input_path: str, output_path: str) -> None: - """Preprocess video with stabilization, denoising, and standardization. - - Applies a comprehensive video processing pipeline including stabilization, - denoising, color equalization, and format standardization to prepare - videos for behavioral analysis. 
- - Args: - input_path (str): Path to input video file. - output_path (str): Path for output processed video file. - - Note: - The processing pipeline includes: - - Video stabilization using vidstab - - Deinterlacing using yadif - - Noise reduction using hqdn3d - - Color equalization - - Resolution scaling to 720p - - Frame rate standardization - - H.264 encoding with optimized settings - - Todo: - Add progress reporting for long video processing tasks. - """ - stabilized_tmp = input_path.replace(".mp4", "_stab.mp4").replace( - ".mov", "_stab.mov" - ) - stabilize_video(input_path, stabilized_tmp) - vf_filters = ( - "yadif," - "hqdn3d," - "eq=contrast=1.0:brightness=0.0:saturation=1.0," - "scale=-2:720," - "pad=ceil(iw/2)*2:ceil(ih/2)*2," - f"fps={TARGET_FRAMERATE}" - ) - cmd = [ - "ffmpeg", - "-y", - "-i", - stabilized_tmp, - "-vf", - vf_filters, - "-c:v", - "libx264", - "-crf", - "23", - "-preset", - "fast", - "-c:a", - "aac", - "-b:a", - "128k", - "-movflags", - "+faststart", - output_path, - ] - subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - os.remove(stabilized_tmp) - - -def extract_audio(input_path: str, output_audio_path: str) -> None: - """Extract audio from video file. - - Extracts audio track from processed video and converts it to standardized - format suitable for speech and audio analysis. - - Args: - input_path (str): Path to input video file. - output_audio_path (str): Path for output audio file. - - Note: - Audio is extracted with the following specifications: - - Sample rate: 16 kHz - - Channels: Mono (1 channel) - - Encoding: 16-bit PCM WAV - These settings are optimized for speech analysis applications. 
- """ - cmd = [ - "ffmpeg", - "-y", - "-i", - input_path, - "-vn", - "-acodec", - "pcm_s16le", - "-ar", - "16000", - "-ac", - "1", - output_audio_path, - ] - subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - - -def parse_appledouble_metadata(metafile_path: str) -> Dict[str, Any]: - """Parse AppleDouble metadata files. - - Extracts metadata from macOS AppleDouble files (._filename) which contain - extended attributes, resource forks, and other file system metadata. - - Args: - metafile_path (str): Path to AppleDouble metadata file. - - Returns: - dict: Dictionary containing parsed metadata including extended attributes, - resource fork information, and Finder info when available. - - Note: - AppleDouble files are created by macOS when files are copied to - non-HFS+ filesystems. They preserve metadata that would otherwise - be lost, including creation dates and extended attributes. - - Example: - >>> metadata = parse_appledouble_metadata('._video.mp4') - >>> print(metadata.get('extended_attributes', {})) - """ - try: - with open(metafile_path, "rb") as f: - content = f.read() - if not content.startswith(b"\x00\x05\x16\x07"): - return {"info": "Not AppleDouble format"} - entries = {} - entry_count = struct.unpack(">H", content[24:26])[0] - for i in range(entry_count): - entry_offset = 26 + (i * 12) - entry_id, offset, length = struct.unpack( - ">III", content[entry_offset : entry_offset + 12] - ) - entry_data = content[offset : offset + length] - # Extended attributes - if entry_id == 9: - if b"bplist" in entry_data: - try: - plist_start = entry_data.index(b"bplist") - plist_data = entry_data[plist_start:] - xattrs = plistlib.loads(plist_data) - for key, val in xattrs.items(): - if isinstance(val, bytes): - try: - val = plistlib.loads(val) - except Exception: - val = val.decode(errors="ignore") - key_str = key.decode() if isinstance(key, bytes) else key - entries[key_str] = val - except Exception as e: - entries["extended_attributes_error"] = 
str(e) - elif entry_id == 2: - entries["resource_fork_bytes"] = len(entry_data) - elif entry_id == 1: - entries["finder_info_present"] = True - if not entries: - return { - "info": "AppleDouble metadata detected", - "hex_preview": content[:64].hex(), - } - return entries - except Exception as e: - return {"error": f"Failed to parse AppleDouble: {e}"} - - -def create_events_tsv(video_metadata: Dict[str, Any], output_path: str) -> None: - """Create events.tsv file for video. - - Generates a BIDS-compliant events file documenting the timing and nature - of events in the video session. - - Args: - video_metadata (dict): Video metadata containing duration information. - output_path (str): Path for output events TSV file. - - Note: - For free play sessions, creates a single event spanning the entire - video duration with trial_type 'free_play'. - - Raises: - IOError: If unable to write the events file. - """ - events_data = [ - { - "onset": 0.0, - "duration": video_metadata.get("duration_sec", 0), - "trial_type": "free_play", - "response_time": "n/a", - } - ] - - events_df = pd.DataFrame(events_data) - events_df.to_csv(output_path, sep="\t", index=False) - - -def create_video_metadata_json( - metadata: Dict[str, Any], processing_info: Dict[str, Any], output_path: str -) -> None: - """Create JSON metadata file for processed video. - - Generates a BIDS-compliant JSON sidecar file containing video metadata, - processing parameters, and task information. - - Args: - metadata (dict): Original video metadata from ffprobe. - processing_info (dict): Information about processing steps applied. - output_path (str): Path for output JSON metadata file. - - Raises: - IOError: If unable to write the metadata file. - - Note: - The JSON file includes both technical specifications and processing - pipeline information required for reproducible analysis. 
- """ - video_json = { - "TaskName": "free_play", - "TaskDescription": "Free play session recorded at home", - "Instructions": "Natural play behavior in home environment", - "SamplingFrequency": TARGET_FRAMERATE, - "Resolution": TARGET_RESOLUTION, - "ProcessingPipeline": { - "Stabilization": processing_info.get("has_stabilization", False), - "Denoising": processing_info.get("has_denoising", False), - "Equalization": processing_info.get("has_equalization", False), - "StandardizedFPS": TARGET_FRAMERATE, - "StandardizedResolution": TARGET_RESOLUTION, - }, - "OriginalMetadata": metadata, - } - - with open(output_path, "w") as f: - json.dump(video_json, f, indent=4) - - -def create_audio_metadata_json(duration_sec: float, output_path: str) -> None: - """Create JSON metadata file for extracted audio. - - Generates a BIDS-compliant JSON sidecar file for audio files extracted - from video sessions, documenting technical specifications and task context. - - Args: - duration_sec (float): Duration of audio file in seconds. - output_path (str): Path for output JSON metadata file. - - Raises: - IOError: If unable to write the metadata file. - - Note: - Audio specifications are standardized for speech analysis: - 16kHz sampling rate, mono channel, 16-bit encoding. - """ - audio_json = { - "SamplingFrequency": 16000, - "Channels": 1, - "SampleEncoding": "16bit", - "Duration": duration_sec, - "TaskName": "free_play", - "TaskDescription": "Audio extracted from free play session", - } - - with open(output_path, "w") as f: - json.dump(audio_json, f, indent=4) - - -def process_videos( - video_root: str, demographics_df: pd.DataFrame -) -> Tuple[List[Dict[str, Any]], List[Union[str, Dict[str, Any]]]]: - """Process videos and organize in BIDS format. - - Main processing function that walks through video directories, processes - each video file, and organizes the results according to BIDS specification. - - Args: - video_root (str): Root directory containing video files. 
- demographics_df (pd.DataFrame): DataFrame containing participant demographics. - - Returns: - tuple: A tuple containing: - - list: Successfully processed video entries with metadata - - list: Videos that failed processing with error information - (strings for simple failures, dicts for detailed errors) - - Note: - This function performs the complete processing pipeline: - 1. Video discovery and metadata extraction - 2. Participant identification and matching - 3. BIDS directory structure creation - 4. Video processing (stabilization, denoising, standardization) - 5. Audio extraction - 6. Metadata file generation - - Todo: - Add parallel processing support for large video collections. - Implement progress reporting with estimated completion times. - """ - all_data = [] - not_processed: List[Union[str, Dict[str, Any]]] = [] - processed_files = set() - demographics_df["dependent_temporary_id"] = ( - demographics_df["dependent_temporary_id"].astype(str).str.upper() - ) - - for root, dirs, files in os.walk(video_root): - for file in files: - if file.startswith("._"): - real_name = file[2:] - real_path = os.path.join(root, real_name) - if os.path.exists(real_path): - metadata_path = os.path.join(root, file) - metadata_info = parse_appledouble_metadata(metadata_path) - print(f"[AppleDouble] Metadata for {real_name}: {metadata_info}") - continue # Skip ._ file itself - - # Skip unsupported formats - if not file.lower().endswith((".mov", ".mp4")): - print(f"[SKIP] Unsupported file type: {file}") - continue - - if file.lower().endswith((".mov", ".mp4")) and not file.startswith( - ".DS_Store" - ): - if file in processed_files: - continue - processed_files.add(file) - video_path = os.path.join(root, file) - - try: - print(f"[PROCESS] Processing file: {file}") - exif_data = extract_exif(video_path) - if "error" in exif_data or "ffprobe_error" in exif_data: - raise ValueError("Unreadable or unsupported video format") - - # Extract participant ID from folder structure - 
folder_parts = Path(video_path).parts - matching_folder = next( - ( - part - for part in folder_parts - if "_" in part - and part.upper().endswith( - tuple(demographics_df["dependent_temporary_id"].values) - ) - ), - None, - ) - if not matching_folder: - not_processed.append(video_path) - continue - - participant_id_str = matching_folder.split("_")[-1].upper() - demo_row = demographics_df[ - demographics_df["dependent_temporary_id"] == participant_id_str - ] - if demo_row.empty: - not_processed.append(video_path) - continue - - # Create consistent numeric participant ID for BIDS - bids_participant_id = f"sub-{hash(participant_id_str) % 10000:04d}" - bids_participant_num = hash(participant_id_str) % 10000 - - # Determine session from path - session_id = get_session_from_path(video_path) - - # Extract video date and calculate age - video_date_str = extract_date_from_filename(file) - if not video_date_str: - raise ValueError("Could not extract date from filename") - video_date = datetime.strptime(video_date_str, "%Y:%m:%d %H:%M:%S") - age = calculate_age(demo_row.iloc[0]["dependent_dob"], video_date) - - # Create BIDS directory structure for this participant/session - raw_subj_dir = os.path.join( - BIDS_ROOT, bids_participant_id, f"ses-{session_id}", "beh" - ) - deriv_subj_dir = os.path.join( - DERIVATIVES_DIR, bids_participant_id, f"ses-{session_id}", "beh" - ) - os.makedirs(raw_subj_dir, exist_ok=True) - os.makedirs(deriv_subj_dir, exist_ok=True) - - # Create BIDS filenames - raw_video_name = create_bids_filename( - bids_participant_num, session_id, "beh", "mp4" - ) - processed_video_name = create_bids_filename( - bids_participant_num, session_id, "desc-processed_beh", "mp4" - ) - audio_name = create_bids_filename( - bids_participant_num, session_id, "audio", "wav" - ) - events_name = create_bids_filename( - bids_participant_num, session_id, "events", "tsv" - ) - processed_events_name = create_bids_filename( - bids_participant_num, session_id, 
"desc-processed_events", "tsv" - ) - - # File paths - raw_video_path = os.path.join(raw_subj_dir, raw_video_name) - processed_video_path = os.path.join( - deriv_subj_dir, processed_video_name - ) - audio_path = os.path.join(deriv_subj_dir, audio_name) - events_path = os.path.join(raw_subj_dir, events_name) - processed_events_path = os.path.join( - deriv_subj_dir, processed_events_name - ) - - # Copy raw video to BIDS structure - if not os.path.exists(raw_video_path): - shutil.copy2(video_path, raw_video_path) - - # Process video - if not os.path.exists(processed_video_path): - preprocess_video(video_path, processed_video_path) - - # Extract audio - if not os.path.exists(audio_path): - extract_audio(processed_video_path, audio_path) - - # Create events files - create_events_tsv(exif_data, events_path) - # Copy for derivatives - create_events_tsv(exif_data, processed_events_path) - - # Create metadata JSON files - processing_info = { - "has_stabilization": True, - "has_denoising": True, - "has_equalization": True, - } - - video_json_path = processed_video_path.replace(".mp4", ".json") - create_video_metadata_json( - exif_data, processing_info, video_json_path - ) - - audio_json_path = audio_path.replace(".wav", ".json") - create_audio_metadata_json( - exif_data.get("duration_sec", 0), audio_json_path - ) - - # Look for associated AppleDouble metadata - apple_metadata = None - apple_file = os.path.join(os.path.dirname(video_path), f"._{file}") - if os.path.exists(apple_file): - apple_metadata = parse_appledouble_metadata(apple_file) - - entry = { - "original_participant_id": participant_id_str, - "bids_participant_id": bids_participant_id, - "session_id": session_id, - "original_video": video_path, - "raw_video_bids": raw_video_path, - "processed_video_bids": processed_video_path, - "audio_file_bids": audio_path, - "events_file_bids": events_path, - "video_date": video_date.isoformat(), - "age_months": age, - "duration_sec": exif_data.get("duration_sec", 0), - 
"metadata": exif_data, - "apple_metadata": apple_metadata, - "processing_info": processing_info, - } - all_data.append(entry) - - except Exception as e: - print(f"[ERROR] Failed to process {video_path}: {str(e)}") - not_processed.append({"video": video_path, "error": str(e)}) - - return all_data, not_processed - - -def save_json(data: Union[List[Any], Dict[str, Any]], path: str) -> None: - """Save data to JSON file. - - Utility function to save Python data structures to JSON files with - proper formatting and error handling. - - Args: - data (list or dict): Data structure to save as JSON. - path (str): Output file path for JSON file. - - Raises: - IOError: If unable to write to the specified path. - TypeError: If data contains non-serializable objects. - - Note: - Uses 4-space indentation for readable JSON output. - """ - with open(path, "w") as f: - json.dump(data, f, indent=4) - - -def main() -> None: - """Main processing function. - - Orchestrates the complete BIDS video processing pipeline including - directory structure creation, dataset description generation, video - processing, and metadata file creation. - - This function serves as the entry point for the processing pipeline - and handles the overall workflow coordination. - - Raises: - Exception: Various exceptions may be raised during processing, - which are caught and reported appropriately. - - Note: - Processing progress and statistics are printed to stdout for - monitoring large batch operations. - - Example: - >>> main() - Starting BIDS format video processing... - [PROCESS] Processing file: video001.mp4 - ... - Processing complete! 
- Successfully processed: 45 videos - Failed to process: 2 videos - """ - print("Starting BIDS format video processing...") - - # Create BIDS directory structure - create_bids_structure() - - # Create dataset description files - create_dataset_description() - create_derivatives_dataset_description() - - # Create README file - create_readme() - - # Read demographics and process videos - demographics_df = read_demographics(ASD_CSV, NONASD_CSV) - all_data, not_processed = process_videos(VIDEO_ROOT, demographics_df) - - # Create participants files - create_participants_files(demographics_df, all_data) - - # Save processing logs - save_json(all_data, os.path.join(OUTPUT_DIR, "bids_processing_log.json")) - save_json(not_processed, os.path.join(OUTPUT_DIR, "bids_not_processed.json")) - - print("Processing complete!") - print(f"Successfully processed: {len(all_data)} videos") - print(f"Failed to process: {len(not_processed)} videos") - print(f"BIDS dataset created at: {BIDS_ROOT}") - - -if __name__ == "__main__": - main() diff --git a/src/sailsprep/BIDS_convertor.py b/src/sailsprep/BIDS_convertor.py new file mode 100644 index 0000000..503fba4 --- /dev/null +++ b/src/sailsprep/BIDS_convertor.py @@ -0,0 +1,1597 @@ +"""BIDS Video Processing Pipeline. + +This module processes home videos from ASD screening studies and organizes them +according to the Brain Imaging Data Structure (BIDS) specification version 1.9.0. + +The pipeline includes video stabilization, denoising, standardization, and audio +extraction for behavioral analysis research. 
def load_configuration(config_path: str = "config.yaml") -> Dict[str, Any]:
    """Load and validate the pipeline configuration from a YAML file.

    Args:
        config_path (str): Path to the configuration YAML file.

    Returns:
        dict: Configuration mapping that contains every required key.

    Raises:
        FileNotFoundError: If the configuration file is not found.
        yaml.YAMLError: If the YAML file is malformed.
        TypeError: If the top-level YAML node is not a mapping.
        KeyError: If required keys are missing in the configuration
            (an empty file is reported as missing every key).
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the
    # platform's locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load() yields None for an empty document; treat it as an empty
    # mapping so the caller gets the documented KeyError instead of a
    # TypeError from the membership test below.
    if config is None:
        config = {}
    if not isinstance(config, dict):
        raise TypeError(
            f"Configuration root must be a mapping, got {type(config).__name__}"
        )

    required_keys = (
        "annotation_file",
        "video_root",
        "output_dir",
        "target_resolution",
        "target_framerate",
        "asd_status",
    )

    missing_keys = [key for key in required_keys if key not in config]
    if missing_keys:
        raise KeyError(f"Missing configuration keys: {', '.join(missing_keys)}")
    return config
def parse_duration(duration_str: str) -> float:
    """Parse a duration value into seconds.

    Accepted forms are ``HH:MM:SS``, ``MM:SS`` (the last field may be
    fractional) and a plain number of seconds.

    Args:
        duration_str (str): Duration text; numbers and missing values
            (``None``/NaN) are tolerated as well.

    Returns:
        float: Duration in seconds, or 0.0 when the value is missing,
        cannot be parsed, or is not a finite non-negative number.
    """
    try:
        if pd.isna(duration_str) or duration_str == "":
            return 0.0
        text = str(duration_str).strip()
        if ":" in text:
            parts = text.split(":")
            if len(parts) == 3:
                seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])
            elif len(parts) == 2:
                seconds = int(parts[0]) * 60 + float(parts[1])
            else:
                # Neither HH:MM:SS nor MM:SS.
                return 0.0
        else:
            seconds = float(text)
    except (ValueError, TypeError):
        return 0.0

    # float() happily parses "nan" and "inf"; a duration must be a finite,
    # non-negative number. NaN fails both comparisons, so it is rejected too.
    if not 0.0 <= seconds < float("inf"):
        return 0.0
    return float(seconds)
def determine_session_from_folder(folder_name: str) -> Optional[str]:
    """Map a folder name to a session ID using known age-range markers.

    The comparison is case-insensitive and substring based. Session "01"
    markers are tested before session "02" markers.

    Args:
        folder_name (str): The name of the folder to check.

    Returns:
        Optional[str]: "01" when a 12-16 month marker is present, "02" when a
        34-38 month marker (including the known typos) is present, otherwise
        None.
    """
    markers_by_session = (
        (
            "01",
            (
                "12-16 month",
                "12-14 month",
                "12_16",
                "12_14",
                "12-16month",
                "12-14month",
                "12-16_month_videos",
            ),
        ),
        (
            # Typos and variants seen in folder names are listed on purpose.
            "02",
            (
                "34-38 month",
                "34-28 month",
                "34-48 month",
                "34_38",
                "34_28",
                "34_48",
                "34-38month",
                "34-28month",
                "34-48month",
                "34-38_month_videos",
            ),
        ),
    )

    lowered = folder_name.lower()
    for session_id, markers in markers_by_session:
        for marker in markers:
            if marker in lowered:
                return session_id
    return None
def determine_session_from_excel(
    current_path: str, annotation_df: pd.DataFrame, participant_id: str
) -> Optional[str]:
    """Determine the session ID for a video based on the annotation file.

    The participant's rows are selected through the ``ID`` column, then the
    video is matched on its file name with the final extension removed. The
    same rule is applied to the path on disk and to the ``FileName`` column,
    so names that contain dots (``clip.one.mp4``) still match.

    Args:
        current_path (str): Path to the video file.
        annotation_df (pd.DataFrame): Annotation data containing 'ID',
            'FileName', 'timepoint', and 'Age' columns.
        participant_id (str): Participant identifier.

    Returns:
        Optional[str]: Session ID ("01" or "02").

    Raises:
        ValueError: If the participant or the video is not listed, or if
            neither the timepoint nor the age identifies a session.
    """
    filename = os.path.splitext(os.path.basename(current_path))[0]

    # A trailing " 2" on the identifier is dropped before the lookup.
    if participant_id.endswith(" 2"):
        participant_id = participant_id[:-2].strip()

    participant_rows = annotation_df[
        annotation_df["ID"].astype(str) == str(participant_id)
    ]
    if participant_rows.empty:
        raise ValueError(
            f"Participant ID '{participant_id}' not found in Excel metadata"
            f" for file '{filename}'."
        )

    # Normalise every listed name to text first: the column may hold NaN or
    # numbers, for which the ``.str`` accessor would fail.
    listed_names = participant_rows["FileName"].map(
        lambda value: "" if pd.isna(value) else str(value).strip()
    )
    listed_stems = listed_names.map(lambda name: os.path.splitext(name)[0])
    # Also accept entries that were recorded without any extension.
    mask = (listed_stems == filename) | (listed_names == filename)
    video_entry = participant_rows[mask]

    if video_entry.empty:
        raise ValueError(
            f"No matching Excel entry found for video '{filename}'"
            f"(participant {participant_id})."
        )

    timepoint = video_entry["timepoint"].iloc[0]
    age = video_entry["Age"].iloc[0]

    # Normalize timepoint to string for pattern matching
    timepoint_str = str(timepoint)

    if "14" in timepoint_str:
        return "01"
    if "36" in timepoint_str:
        return "02"
    if pd.notna(age):
        # NOTE(review): the "< 2" threshold presumes Age is expressed in
        # years - confirm against the annotation sheet.
        return "01" if age < 2 else "02"
    raise ValueError(
        f"Unable to determine session ID: timepoint={timepoint}, age={age}"
    )
def find_videos_recursive(
    directory: str,
    participant_path: str,
    annotation_df: pd.DataFrame,
    participant_id: str,
) -> List[Tuple[str, Optional[str]]]:
    """Recursively find video files and determine their session IDs.

    Hidden entries (names starting with ".") are skipped. Directory entries
    are visited in sorted order so the returned list is reproducible.
    Errors while reading a directory are printed and that directory
    contributes no further entries.

    Args:
        directory (str): Directory to search in.
        participant_path (str): Root path of the participant.
        annotation_df (pd.DataFrame): Excel data for metadata lookup.
        participant_id (str): Participant identifier.

    Returns:
        List[Tuple[str, Optional[str]]]: List of (video_path, session_id) pairs.
    """
    video_extensions = (".mp4", ".mov", ".avi", ".mkv", ".m4v", ".3gp", ".mts")
    videos: List[Tuple[str, Optional[str]]] = []
    try:
        # os.listdir() returns entries in arbitrary order; sort them so that
        # discovery order does not depend on the filesystem.
        for item in sorted(os.listdir(directory)):
            if item.startswith("."):
                continue  # Skip hidden files

            item_path = os.path.join(directory, item)

            if os.path.isfile(item_path) and item.lower().endswith(video_extensions):
                session_id = find_session_id(
                    directory,
                    item_path,
                    participant_path,
                    annotation_df,
                    participant_id,
                )
                videos.append((item_path, session_id))

            elif os.path.isdir(item_path):
                videos.extend(
                    find_videos_recursive(
                        item_path, participant_path, annotation_df, participant_id
                    )
                )

    except PermissionError:
        print(f"Permission denied: {directory}")
    except Exception as e:
        # Best effort: report the problem and return what was found so far.
        print(f"Error accessing {directory}: {e}")

    return videos
+ """ + all_videos = [] + + try: + for participant_folder in os.listdir(video_root): + participant_path = os.path.join(video_root, participant_folder) + if not os.path.isdir(participant_path): + continue + + participant_id = extract_participant_id_from_folder(participant_folder) + if not participant_id: + continue + + videos = find_videos_recursive( + participant_path, participant_path, annotation_df, participant_id + ) + + for video_path, session_id in videos: + if session_id in {"01", "02"}: + all_videos.append( + { + "participant_id": participant_id, + "filename": os.path.basename(video_path), + "full_path": video_path, + "session_id": session_id, + "age_folder": os.path.basename(os.path.dirname(video_path)), + } + ) + + except Exception as e: + print(f"Error scanning video folders: {e}") + + return all_videos + + +def create_dummy_excel_data( + video_path: str, participant_id: str, session_id: str, task_label: str = "unknown" +) -> dict[str, str]: + """Create dummy behavioral data for videos not in Excel file.""" + video_filename = os.path.basename(video_path) + + dummy_row_data = { + "ID": participant_id, + "FileName": video_filename, + "Context": task_label, + "Location": "n/a", + "Activity": "n/a", + "Child_of_interest_clear": "n/a", + "#_adults": "n/a", + "#_children": "n/a", + "#_people_background": "n/a", + "Interaction_with_child": "n/a", + "#_people_interacting": "n/a", + "Child_constrained": "n/a", + "Constraint_type": "n/a", + "Supports": "n/a", + "Support_type": "n/a", + "Example_support_type": "n/a", + "Gestures": "n/a", + "Gesture_type": "n/a", + "Vocalizations": "n/a", + "RMM": "n/a", + "RMM_type": "n/a", + "Response_to_name": "n/a", + "Locomotion": "n/a", + "Locomotion_type": "n/a", + "Grasping": "n/a", + "Grasp_type": "n/a", + "Body_Parts_Visible": "n/a", + "Angle_of_Body": "n/a", + "time_point": "n/a", + "DOB": "n/a", + "Vid_date": "n/a", + "Video_Quality_Child_Face_Visibility": "n/a", + "Video_Quality_Child_Body_Visibility": "n/a", + 
def get_task_from_excel_row(row: pd.Series) -> str:
    """Derive the BIDS task label from the ``Context`` cell of a row.

    Args:
        row (pd.Series): One annotation row; ``Context`` may be absent.

    Returns:
        str: The sanitised task label, or "unknown" when the context is
        missing, is a placeholder, or sanitises to an empty label.
    """
    context = str(row.get("Context", "")).strip()

    # "none" and "<na>" are the text forms of None and pandas.NA, which
    # would otherwise be turned into task labels.
    if context.lower() in ("", "nan", "n/a", "none", "<na>"):
        return "unknown"

    # A context made only of characters the sanitiser strips would yield an
    # empty label and therefore a "task-_run-XX" file name.
    return make_bids_task_label(context) or "unknown"
def stabilize_video(input_path: str, stabilized_path: str, temp_dir: str) -> None:
    """Stabilize a video with FFmpeg's two-pass vidstab filters.

    Pass one (``vidstabdetect``) writes a transforms file into ``temp_dir``;
    pass two (``vidstabtransform``) applies it and encodes the result with
    libx264, copying the audio stream. The transforms file is removed when
    the function exits, whether it succeeded or failed.

    Args:
        input_path (str): Video to stabilize.
        stabilized_path (str): Destination of the stabilized video.
        temp_dir (str): Working directory; created if missing.

    Raises:
        FileNotFoundError: If the input, the transforms file, or the
            stabilized output does not exist when expected.
        RuntimeError: If either FFmpeg pass exits with a non-zero status.
    """
    os.makedirs(temp_dir, exist_ok=True)
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Video to stabilize not found: {input_path}")

    # Per-process name: workers sharing temp_dir must not overwrite each
    # other's transforms (same convention as the pid-suffixed temp video
    # used by preprocess_video).
    transforms_file = os.path.join(temp_dir, f"transforms_{os.getpid()}.trf")
    # Drop a leftover from an interrupted run so the existence check after
    # detection really reflects this run.
    if os.path.exists(transforms_file):
        os.remove(transforms_file)

    try:
        # Step 1: Detect transforms
        detect_cmd = [
            "ffmpeg",
            "-y",
            "-i",
            input_path,
            "-vf",
            f"vidstabdetect=shakiness=5:accuracy=15:result={transforms_file}",
            "-f",
            "null",
            "-",
        ]
        detect_proc = subprocess.run(detect_cmd, capture_output=True, text=True)

        if detect_proc.returncode != 0:
            print(
                f"[ERROR] vidstabdetect failed for {input_path}:\n{detect_proc.stderr}"
            )
            raise RuntimeError(f"FFmpeg vidstabdetect failed for {input_path}")

        if not os.path.exists(transforms_file):
            raise FileNotFoundError(f"Transform file not created: {transforms_file}")

        # Step 2: Apply transforms
        transform_cmd = [
            "ffmpeg",
            "-y",
            "-i",
            input_path,
            "-vf",
            f"vidstabtransform=smoothing=30:input={transforms_file}",
            "-c:v",
            "libx264",
            "-preset",
            "slow",
            "-crf",
            "23",
            "-c:a",
            "copy",
            stabilized_path,
        ]
        print(f"[DEBUG] Running: {' '.join(transform_cmd)}")
        transform_proc = subprocess.run(transform_cmd, capture_output=True, text=True)

        if transform_proc.returncode != 0:
            print(
                f"[ERROR] vidstabtransform failed for {input_path}:"
                f"\n{transform_proc.stderr}"
            )
            raise RuntimeError(f"FFmpeg vidstabtransform failed for {input_path}")

        if not os.path.exists(stabilized_path):
            raise FileNotFoundError(f"Stabilized video not created: {stabilized_path}")
    finally:
        # Cleanup on every exit path, not only on success.
        if os.path.exists(transforms_file):
            os.remove(transforms_file)
# (events column, annotation column) pairs written as text, in output order.
# NOTE(review): "timepoint" is read from "time_point" here, whereas
# determine_session_from_excel reads a "timepoint" column - confirm which
# spelling the annotation sheet uses.
_EVENT_TEXT_COLUMNS = (
    ("source_file", "SourceFile"),
    ("context", "Context"),
    ("location", "Location"),
    ("activity", "Activity"),
    ("child_clear", "Child_of_interest_clear"),
    ("num_adults", "#_adults"),
    ("num_children", "#_children"),
    ("num_people_background", "#_people_background"),
    ("interaction_with_child", "Interaction_with_child"),
    ("num_people_interacting", "#_people_interacting"),
    ("child_constrained", "Child_constrained"),
    ("constraint_type", "Constraint_type"),
    ("supports", "Supports"),
    ("support_type", "Support_type"),
    ("example_support_type", "Example_support_type"),
    ("gestures", "Gestures"),
    ("gesture_type", "Gesture_type"),
    ("vocalizations", "Vocalizations"),
    ("rmm", "RMM"),
    ("rmm_type", "RMM_type"),
    ("response_to_name", "Response_to_name"),
    ("locomotion", "Locomotion"),
    ("locomotion_type", "Locomotion_type"),
    ("grasping", "Grasping"),
    ("grasp_type", "Grasp_type"),
    ("body_parts_visible", "Body_Parts_Visible"),
    ("angle_of_body", "Angle_of_Body"),
    ("timepoint", "time_point"),
    ("dob", "DOB"),
    ("vid_date", "Vid_date"),
)

# (events column, annotation column) pairs written as numeric ratings.
_EVENT_QUALITY_COLUMNS = (
    ("video_quality_face", "Video_Quality_Child_Face_Visibility"),
    ("video_quality_body", "Video_Quality_Child_Body_Visibility"),
    ("video_quality_hand", "Video_Quality_Child_Hand_Visibility"),
    ("video_quality_lighting", "Video_Quality_Lighting"),
    ("video_quality_resolution", "Video_Quality_Resolution"),
    ("video_quality_motion", "Video_Quality_Motion"),
)

# Full header, in the order the columns are written.
_EVENT_COLUMNS = (
    "onset",
    "duration",
    "coder",
    "filepath_engaging",
    *(name for name, _ in _EVENT_TEXT_COLUMNS),
    *(name for name, _ in _EVENT_QUALITY_COLUMNS),
    "notes",
)


def _text_or_na(value: Any) -> str:
    """Return *value* as text, or "n/a" when it is missing or blank."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return "n/a"
    text = str(value)
    return text if text.strip() else "n/a"


def create_events_file(
    group_df: pd.DataFrame, output_path: str, full_filepath: str
) -> None:
    """Create events.tsv file from Excel data with all columns.

    One event is written per row of ``group_df``, with onset 0.0 and the
    duration parsed from ``Vid_duration``. Missing cells are written as
    "n/a". The header row is written even when ``group_df`` is empty.

    Args:
        group_df (pd.DataFrame): Annotation rows describing the video.
        output_path (str): Destination of the tab-separated events file.
        full_filepath (str): Path of the source video, stored in the
            ``filepath_engaging`` column.
    """
    events_data = []

    for _, row in group_df.iterrows():
        event = {
            "onset": 0.0,
            "duration": parse_duration(row.get("Vid_duration", "00:00:00")),
            "coder": _text_or_na(row.get("Coder")),
            "filepath_engaging": str(full_filepath),
        }
        # str() would turn an empty (NaN) cell into the literal "nan", which
        # na_rep cannot repair afterwards, hence _text_or_na.
        for column, source in _EVENT_TEXT_COLUMNS:
            event[column] = _text_or_na(row.get(source))
        for column, source in _EVENT_QUALITY_COLUMNS:
            event[column] = safe_float_conversion(row.get(source))
        event["notes"] = _text_or_na(row.get("Notes"))
        events_data.append(event)

    # Passing the column list keeps the header when there are no rows.
    events_df = pd.DataFrame(events_data, columns=list(_EVENT_COLUMNS))
    events_df.to_csv(output_path, sep="\t", index=False, na_rep="n/a")
save_json(audio_json, output_path) + + +def create_raw_video_json( + row: pd.Series, task_info: Dict[str, Any], video_path: str, output_path: str +) -> None: + """Create JSON metadata for raw video.""" + video_props = get_video_properties(video_path) + + video_json = { + "TaskName": task_info.get("task_name", "unknown"), + "TaskDescription": task_info.get( + "task_description", "Raw video from behavioral session" + ), + "FrameRate": video_props.get("FrameRate", "n/a"), + "Resolution": video_props.get("Resolution", "n/a"), + "OriginalFilename": str(row.get("FileName", "")), + "Duration": parse_duration(row.get("Vid_duration", "00:00:00")), + "RecordingDate": str(row.get("Vid_date", "n/a")), + "Context": task_info.get("context", "n/a"), + "Activity": task_info.get("activity", "n/a"), + "TimePoint": str(row.get("timepoint", "n/a")), + "SourceFile": str(row.get("SourceFile", "n/a")), + } + save_json(video_json, output_path) + + +def process_single_video( + video_info: Dict, + annotation_df: pd.DataFrame, + final_bids_root: str, + final_derivatives_dir: str, + temp_dir: str, +) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]: + """Process a single video with all BIDS structures robustly.""" + try: + # --- Validate input -------------------------------------------------- + if not video_info or not isinstance(video_info, dict): + raise ValueError("video_info is empty or invalid") + + required_keys = ["participant_id", "filename", "session_id", "full_path"] + missing = [k for k in required_keys if k not in video_info] + if missing: + raise ValueError(f"Missing required video_info keys: {missing}") + + participant_id = video_info["participant_id"] + filename = video_info["filename"] + session_id = video_info["session_id"] + input_video_path = video_info["full_path"] + + safe_print(f"Processing: {participant_id}/{filename}") + filename_without_extension = os.path.splitext(filename)[0] + + # --- Handle empty or invalid annotation_df --------------------------- + 
if annotation_df is None or annotation_df.empty: + safe_print("Annotation DataFrame is empty - using dummy data") + video_excel = pd.DataFrame( + [create_dummy_excel_data(input_video_path, participant_id, session_id)] + ) + has_excel_data = False + else: + # Ensure expected columns exist + expected_cols = {"ID", "FileName"} + if not expected_cols.issubset(annotation_df.columns): + safe_print( + "Annotation DataFrame missing required columns - using dummy data" + ) + video_excel = pd.DataFrame( + [ + create_dummy_excel_data( + input_video_path, participant_id, session_id + ) + ] + ) + has_excel_data = False + else: + # Normal Excel lookup + participant_excel = annotation_df[ + annotation_df["ID"].astype(str) == str(participant_id) + ] + mask = ( + participant_excel["FileName"].str.split(".").str[0] + == filename_without_extension + ) + video_excel = participant_excel[mask] + if video_excel.empty: + safe_print("No Excel data found - using dummy data") + video_excel = pd.DataFrame( + [ + create_dummy_excel_data( + input_video_path, participant_id, session_id + ) + ] + ) + has_excel_data = False + else: + has_excel_data = True + + excel_row = video_excel.iloc[0] + task_label = get_task_from_excel_row(excel_row) + activity = excel_row.get("Activity", "unknown activity") + + # --- Build task info ------------------------------------------------- + task_info = { + "task_name": task_label, + "task_description": f"Behavioral session: {activity}", + "instructions": "Natural behavior observation", + "context": str(excel_row.get("Context", "n/a")), + "activity": str(excel_row.get("Activity", "n/a")), + } + + # --- Directory setup ------------------------------------------------- + raw_subj_dir = os.path.join( + final_bids_root, f"sub-{participant_id}", f"ses-{session_id}", "beh" + ) + deriv_subj_dir = os.path.join( + final_derivatives_dir, f"sub-{participant_id}", f"ses-{session_id}", "beh" + ) + os.makedirs(raw_subj_dir, exist_ok=True) + os.makedirs(deriv_subj_dir, 
exist_ok=True) + + # --- File naming ----------------------------------------------------- + ext = os.path.splitext(filename)[1] + run_number = get_next_run_number( + participant_id, session_id, task_label, final_bids_root + ) + + raw_video_name = create_bids_filename( + participant_id, session_id, task_label, "beh", "mp4", run_number + ) + processed_video_name = create_bids_filename( + participant_id, + session_id, + task_label, + "desc-processed_beh", + "mp4", + run_number, + ) + audio_name = create_bids_filename( + participant_id, session_id, task_label, "audio", "wav", run_number + ) + events_name = create_bids_filename( + participant_id, session_id, task_label, "events", "tsv", run_number + ) + + # --- Paths ----------------------------------------------------------- + raw_video_path = os.path.join(raw_subj_dir, raw_video_name) + processed_video_path = os.path.join(deriv_subj_dir, processed_video_name) + audio_path = os.path.join(deriv_subj_dir, audio_name) + events_path = os.path.join(raw_subj_dir, events_name) + + # --- Raw video preparation ------------------------------------------ + if not os.path.exists(raw_video_path): + if ext.lower() != ".mp4": + cmd = [ + "ffmpeg", + "-y", + "-i", + input_video_path, + "-c", + "copy", + raw_video_path, + ] + result = subprocess.run( + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True + ) + if result.returncode != 0 or not os.path.exists(raw_video_path): + raise ValueError(f"FFmpeg conversion failed: {result.stderr}") + safe_print(" Converted to raw BIDS format") + else: + shutil.copy2(input_video_path, raw_video_path) + if not os.path.exists(raw_video_path): + raise ValueError(f"Failed to copy to raw BIDS: {raw_video_path}") + safe_print(" Copied to raw BIDS") + + # --- Metadata extraction -------------------------------------------- + exif_data = extract_exif(raw_video_path) + if ( + not isinstance(exif_data, dict) + or "error" in exif_data + or "ffprobe_error" in exif_data + ): + raise 
ValueError("Unreadable or unsupported video format") + + # --- Video processing ----------------------------------------------- + if not os.path.exists(processed_video_path): + safe_print(" Starting video processing...") + preprocess_video(raw_video_path, processed_video_path, temp_dir) + if ( + not os.path.exists(processed_video_path) + or os.path.getsize(processed_video_path) == 0 + ): + raise ValueError("Video processing failed - no valid output") + safe_print(" Video processing complete") + + # --- Audio extraction ----------------------------------------------- + if not os.path.exists(audio_path): + safe_print(" Extracting audio...") + extract_audio(processed_video_path, audio_path) + if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0: + raise ValueError("Audio extraction failed - no valid output") + safe_print(" Audio extraction complete") + + # --- Events file ---------------------------------------------------- + create_events_file(video_excel, events_path, input_video_path) + if not os.path.exists(events_path): + raise ValueError(f"Failed to create events file: {events_path}") + + # --- Metadata JSONs ------------------------------------------------- + processing_info = { + "has_stabilization": True, + "has_denoising": True, + "has_equalization": True, + } + + raw_video_json_path = raw_video_path.replace(".mp4", ".json") + create_raw_video_json(excel_row, task_info, raw_video_path, raw_video_json_path) + if not os.path.exists(raw_video_json_path): + raise ValueError(f"Failed to create raw video JSON: {raw_video_json_path}") + + processed_video_json_path = processed_video_path.replace(".mp4", ".json") + create_video_metadata_json( + exif_data, processing_info, task_info, processed_video_json_path + ) + if not os.path.exists(processed_video_json_path): + raise ValueError( + f"Failed to create processed video JSON: {processed_video_json_path}" + ) + + audio_json_path = audio_path.replace(".wav", ".json") + create_audio_metadata_json( + 
exif_data.get("duration_sec", 0), task_info, audio_json_path + ) + if not os.path.exists(audio_json_path): + raise ValueError(f"Failed to create audio JSON: {audio_json_path}") + + # --- Success return ------------------------------------------------- + entry = { + "participant_id": participant_id, + "session_id": session_id, + "task_label": task_label, + "original_video": input_video_path, + "raw_video_bids": raw_video_path, + "processed_video_bids": processed_video_path, + "audio_file_bids": audio_path, + "events_file_bids": events_path, + "filename": filename, + "age_folder": video_info.get("age_folder", "n/a"), + "duration_sec": exif_data.get("duration_sec", 0), + "has_excel_data": has_excel_data, + "excel_metadata": excel_row.to_dict(), + "task_info": task_info, + "processing_info": processing_info, + } + + safe_print(f" Successfully processed: {participant_id}/{filename}") + return entry, None + + except Exception as e: + safe_print( + f" ERROR processing {video_info.get('full_path', 'unknown file')}:" + f" {str(e)}" + ) + return None, {"video": video_info.get("full_path", "unknown"), "error": str(e)} + + +def create_dataset_description() -> None: + """Create dataset_description.json for main BIDS dataset.""" + dataset_desc = { + "Name": "SAILS Phase III Home Videos", + "BIDSVersion": "1.9.0", + "DatasetType": "raw", + } + try: + filepath = os.path.join(FINAL_BIDS_ROOT, "dataset_description.json") + save_json(dataset_desc, filepath) + + except Exception as e: + raise ValueError( + f"Failed to create dataset_description.json at {filepath}: {e}" + ) + + +def create_derivatives_dataset_description() -> None: + """Create dataset_description.json for derivatives.""" + os.makedirs(FINAL_DERIVATIVES_DIR, exist_ok=True) + + derivatives_desc = { + "Name": "SAILS Phase III Home Videos - Preprocessed", + "BIDSVersion": "1.9.0", + "DatasetType": "derivative", + "GeneratedBy": [ + { + "Name": "Video Preprocessing Pipeline", + "Version": "1.0.0", + "Description": ( + 
"FFmpeg-based video stabilization, denoising, " + "and standardization pipeline with audio extraction" + ), + "CodeURL": "local", + } + ], + "SourceDatasets": [{"URL": "", "Version": "1.0.0"}], + "HowToAcknowledge": "Please cite the original study", + } + + filepath = os.path.join(FINAL_DERIVATIVES_DIR, "dataset_description.json") + save_json(derivatives_desc, filepath) + if not os.path.exists(filepath): + raise ValueError( + f"Failed to create derivatives dataset_description.json at {filepath}" + ) + + +def create_readme() -> None: + """Create README file for the BIDS dataset.""" + readme_content = """# SAILS Phase III Home Videos BIDS Dataset + +## Overview +This dataset contains home videos from the SAILS Phase III study, +organized according to the Brain Imaging Data Structure (BIDS) specification. + +## Data Collection +Videos were collected from home environments during various activities. +Two main age groups were included: +- Session 01: 12-16 month old children +- Session 02: 34-38 month old children + +## Dataset Structure +### Raw Data +- sub-*/ses-*/beh/: Raw behavioral videos (converted to mp4) and event +annotations (contains also the original filepath of the video processed) + +### Derivatives +- derivatives/preprocessed/sub-*/ses-*/beh/: Processed videos and extracted audio + - Videos: Stabilized, denoised, standardized to 720p/30fps + - Audio: Extracted to 16kHz mono WAV format + +## Data Processing +All videos underwent standardized preprocessing including: +- Video stabilization using vidstab +- Denoising and quality enhancement +- Standardization to 720p resolution and 30fps +- Audio extraction for speech analysis +- Filename modication according to subject ID and task label +- Extraction of ASD status for every subject stored in the participants.tsv file. + +## Behavioral Coding +Events files include manual annotations from csv file and Engaging +location of the raw video. 
+ +## Task Labels +Task labels are derived from the Context column in the csv. +It allows to capture what kind of interaction was happening in the video. +Videos without behavioral coding data use "unknown" task label. +""" + + filepath = os.path.join(FINAL_BIDS_ROOT, "README") + try: + with open(filepath, "w") as f: + f.write(readme_content) + except Exception as e: + raise ValueError(f"Failed to create README at {filepath}: {e}") + + +def create_participants_file( + final_bids_root: str = FINAL_BIDS_ROOT, asd_status_file: str = ASD_STATUS_FILE +) -> None: + """Create participants.tsv and participants.json files.""" + if not os.path.exists(asd_status_file): + raise FileNotFoundError(f"ASD status file not found: {asd_status_file}") + + asd_status = pd.read_excel(asd_status_file) + ids_processed_participants = [] + for name in os.listdir(final_bids_root): + full_path = os.path.join(final_bids_root, name) + if os.path.isdir(full_path) and name.startswith("sub-"): + ids_processed_participants.append(name.split("sub-")[1]) + participants_data = [] + for participant_id in sorted(ids_processed_participants): + asd_info = asd_status[asd_status["ID"].astype(str) == str(participant_id)] + participants_data.append( + { + "participant_id": f"sub-{participant_id}", + "group": asd_info["Group"].values[0] if not asd_info.empty else "n/a", + } + ) + + participants_df = pd.DataFrame(participants_data) + participants_df.to_csv( + os.path.join(final_bids_root, "participants.tsv"), + sep="\t", + index=False, + na_rep="n/a", + ) + + participants_json = { + "participant_id": {"Description": "Unique BIDS participant identifier"}, + "Group": {"Description": "ASD status"}, + } + + save_json(participants_json, os.path.join(final_bids_root, "participants.json")) + + +def print_summary(all_processed: List[Dict], all_failed: List[Dict]) -> None: + """Print processing summary statistics.""" + print("PROCESSING SUMMARY") + + print(f"Successfully processed: {len(all_processed)} videos") + 
print(f"Failed to process: {len(all_failed)} videos") + print(f"Total videos attempted: {len(all_processed) + len(all_failed)}") + + if all_processed: + # Excel data availability + with_excel = sum( + 1 for entry in all_processed if entry.get("has_excel_data", False) + ) + without_excel = len(all_processed) - with_excel + print("\nData sources:") + print(f" With Excel behavioral data: {with_excel} videos") + print(f" With dummy behavioral data: {without_excel} videos") + + # Task distribution + task_counts: dict[str, int] = {} + participant_counts: dict[str, int] = {} + session_counts: dict[str, int] = {} + + for entry in all_processed: + task = entry["task_label"] + participant = entry["participant_id"] + session = entry["session_id"] + task_counts[task] = task_counts.get(task, 0) + 1 + participant_counts[participant] = participant_counts.get(participant, 0) + 1 + session_counts[session] = session_counts.get(session, 0) + 1 + + print("\nTask distribution:") + for task, count in sorted(task_counts.items()): + print(f" {task}: {count} videos") + + print("\nSession distribution:") + for session, count in sorted(session_counts.items()): + print(f" Session {session}: {count} videos") + + print(f"\nUnique participants processed: {len(participant_counts)}") + + # Duration statistics + durations = [entry.get("duration_sec", 0) for entry in all_processed] + total_duration = sum(durations) + avg_duration = total_duration / len(durations) if durations else 0 + + print("\nDuration statistics:") + print(f" Total video duration: {total_duration/3600:.1f} hours") + print(f" Average video duration: {avg_duration/60:.1f} minutes") + + if all_failed: + print("\nFailed videos breakdown:") + error_types: dict[str, int] = {} + for entry in all_failed: + error = entry.get("error", "Unknown error") + error_types[error] = error_types.get(error, 0) + 1 + + for error, count in sorted(error_types.items()): + print(f" {error}: {count} videos") + + +def merge_subjects(final_bids_root: str = 
FINAL_BIDS_ROOT) -> None: + """Merge duplicated subject folders safely.""" + paths_to_check = [ + Path(final_bids_root), + Path(final_bids_root) / "derivatives" / "preprocessed", + ] + + for folder in paths_to_check: + if not folder.exists(): + continue + + subs = [d for d in folder.iterdir() if d.is_dir() and d.name.startswith("sub-")] + sub_names = {d.name for d in subs} + + for sub in subs: + if sub.name.endswith(" 2"): + original_name = sub.name[:-2] + original_path = folder / original_name + if original_name in sub_names and original_path.exists(): + print(f"Merging {sub} → {original_path}") + + for item in sub.iterdir(): + dest = original_path / item.name + if item.is_dir(): + if dest.exists(): + if dest.is_file(): + print( + f"Conflict: {dest} is a file, " + "expected a folder. Skipping." + ) + continue + # merge recursively if same session already exists + for subitem in item.iterdir(): + dest_sub = dest / subitem.name + if dest_sub.exists(): + # type conflict handling + if dest_sub.is_file() != subitem.is_file(): + print( + f"Type conflict for {dest_sub}, " + "skipping." + ) + continue + if subitem.is_dir(): + shutil.copytree( + subitem, dest_sub, dirs_exist_ok=True + ) + else: + shutil.copy2(subitem, dest_sub) + else: + shutil.copytree(item, dest) + else: + if dest.exists(): + if dest.is_dir(): + print( + f"Conflict: {dest} is a directory," + " expected a file. Skipping." + ) + continue + shutil.copy2(item, dest) + + shutil.rmtree(sub) + else: + print(f"No base subject found for {sub}, skipping.") + + +def process_videos( + task_id: int, + num_tasks: int, + annotation_df: pd.DataFrame, + all_videos: list, + final_bids_root: str, + final_derivatives_dir: str, + output_dir: str, +) -> tuple[list, list]: + """Process the subset of videos assigned to this task. 
+ + Returns: + (all_processed, all_failed) + """ + safe_print(f"Task {task_id}: Processing videos...") + video_chunks = all_videos[task_id::num_tasks] + + if not video_chunks: + safe_print(f"No videos assigned to task {task_id}") + return [], [] + + temp_dir = os.path.join(output_dir, str(task_id), "temp") + os.makedirs(temp_dir, exist_ok=True) + + all_processed, all_failed = [], [] + + for i, video_info in enumerate(video_chunks, 1): + safe_print(f"[Task {task_id}] Video {i}/{len(video_chunks)}") + processed_entry, failed_entry = process_single_video( + video_info, + annotation_df, + final_bids_root, + final_derivatives_dir, + temp_dir, + ) + if processed_entry: + all_processed.append(processed_entry) + if failed_entry: + all_failed.append(failed_entry) + + # Save per-task logs + task_dir = os.path.join(output_dir, str(task_id)) + os.makedirs(task_dir, exist_ok=True) + save_json(all_processed, os.path.join(task_dir, "processing_log.json")) + save_json(all_failed, os.path.join(task_dir, "not_processed.json")) + + # Cleanup temp dir + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + return all_processed, all_failed + + +def main() -> None: + """Main entry point for multi-task BIDS video processing.""" + parser = argparse.ArgumentParser( + description="Run updated_bids with task and total number of tasks." 
+ ) + parser.add_argument("task_id", type=int, help="ID of the current task") + parser.add_argument("num_tasks", type=int, help="Total number of tasks") + + args = parser.parse_args() + my_task_id = args.task_id + num_tasks = args.num_tasks + + print(f"Running task {my_task_id}/{num_tasks}") + + start_time = time.time() + + # --- Validate paths --- + for path, label in [(VIDEO_ROOT, "Video root"), (ANNOTATION_FILE, "Excel file")]: + if not os.path.exists(path): + print(f"ERROR: {label} not found at {path}") + sys.exit(1) + return + + # --- Load metadata --- + try: + annotation_df = pd.read_csv(ANNOTATION_FILE) + annotation_df.columns = annotation_df.columns.str.strip() + safe_print(f"Loaded {len(annotation_df)} rows from Excel file") + except Exception as e: + safe_print(f"ERROR: Failed to load Excel file: {e}") + sys.exit(1) + return + + # --- Discover videos --- + safe_print("Discovering videos...") + all_videos = get_all_videos(VIDEO_ROOT, annotation_df) + if not all_videos: + safe_print("ERROR: No videos found.") + sys.exit(1) + safe_print(f"Found {len(all_videos)} video files.") + + # --- Create BIDS structure (only once) --- + if my_task_id == 0: + try: + safe_print("Creating BIDS structure files...") + create_bids_structure() + create_dataset_description() + create_derivatives_dataset_description() + create_readme() + except Exception as e: + safe_print(f"CRITICAL ERROR: Failed to create BIDS structure files: {e}") + sys.exit(1) + + # --- Process this task’s subset --- + all_processed, all_failed = process_videos( + my_task_id, + num_tasks, + annotation_df, + all_videos, + FINAL_BIDS_ROOT, + FINAL_DERIVATIVES_DIR, + OUTPUT_DIR, + ) + + # --- Final summary --- + total_time = time.time() - start_time + print_summary(all_processed, all_failed) + safe_print( + f"Total processing time: {total_time / 3600:.1f}" + f" hours ({total_time / 60:.1f} minutes)" + ) + + if all_processed: + avg_time = total_time / len(all_processed) + safe_print(f"Average time per video: 
{avg_time:.1f} seconds") + + safe_print("Processing complete ✅") + + +if __name__ == "__main__": + main() diff --git a/src/tests/test_BIDS_convertor.py b/src/tests/test_BIDS_convertor.py index e291e63..19b7567 100644 --- a/src/tests/test_BIDS_convertor.py +++ b/src/tests/test_BIDS_convertor.py @@ -1,13 +1,15 @@ """Tests for BIDS Video Processing Pipeline.""" import json +import math import os import sys -from datetime import datetime +from pathlib import Path from types import ModuleType from typing import Generator from unittest.mock import MagicMock, mock_open, patch +import numpy as np import pandas as pd import pytest import yaml @@ -18,56 +20,247 @@ def setup_mock_config() -> Generator[None, None, None]: """Create a temporary config.yaml file for testing.""" mock_config = { - 'video_root': '/mock/videos', - 'asd_csv': 'mock_asd.csv', - 'nonasd_csv': 'mock_nonasd.csv', - 'output_dir': '/mock/output', - 'target_resolution': '1280x720', - 'target_fps': 30 + "video_root": "/mock/videos", + "asd_csv": "mock_asd.csv", + "nonasd_csv": "mock_nonasd.csv", + "output_dir": "/mock/output", + "target_resolution": "1280x720", + "target_fps": 30, } # Create temporary config file - with open('config.yaml', 'w') as f: + with open("config.yaml", "w") as f: yaml.dump(mock_config, f) yield # Cleanup - if os.path.exists('config.yaml'): - os.remove('config.yaml') + if os.path.exists("config.yaml"): + os.remove("config.yaml") + # Import the module after config is created @pytest.fixture(scope="session") def bvp_module(setup_mock_config: Generator[None, None, None]) -> ModuleType: """Import the BIDS converter module.""" - sys.path.insert(0, 'src') - import BIDS_convertor as bvp + sys.path.insert(0, "src") + import sailsprep.BIDS_convertor as bvp + return bvp + class TestConfiguration: """Test configuration loading and validation.""" def test_load_configuration_success(self, bvp_module: ModuleType) -> None: """Test successful configuration loading.""" mock_config = { - 'video_root': 
'/path/to/videos', - 'asd_csv': 'asd.csv', - 'nonasd_csv': 'nonasd.csv', - 'output_dir': '/output', - 'target_resolution': '1280x720', - 'target_fps': 30 + "video_root": "/path/to/videos", + "annotation_file": "blablabla.csv", + "asd_status": "nonasd.xlsx", + "output_dir": "/output", + "target_resolution": "1280x720", + "target_framerate": 30, } - with patch('builtins.open', mock_open(read_data=yaml.dump(mock_config))): - with patch('yaml.safe_load', return_value=mock_config): - config = bvp_module.load_configuration('config.yaml') + with patch("builtins.open", mock_open(read_data=yaml.dump(mock_config))): + with patch("yaml.safe_load", return_value=mock_config): + config = bvp_module.load_configuration("config.yaml") assert config == mock_config def test_load_configuration_file_not_found(self, bvp_module: ModuleType) -> None: """Test configuration loading with missing file.""" - with patch('builtins.open', side_effect=FileNotFoundError()): + with patch("builtins.open", side_effect=FileNotFoundError()): with pytest.raises(FileNotFoundError): - bvp_module.load_configuration('nonexistent.yaml') + bvp_module.load_configuration("nonexistent.yaml") + + def test_load_configuration_invalid_yaml(self, bvp_module: ModuleType) -> None: + """Test configuration loading with invalid YAML.""" + with patch("builtins.open", mock_open(read_data="invalid: yaml: : format")): + with pytest.raises(yaml.YAMLError): + bvp_module.load_configuration("config.yaml") + + def test_load_configuration_missing_required_fields( + self, bvp_module: ModuleType + ) -> None: + """Test configuration loading with missing required fields.""" + incomplete_config = { + "video_root": "/path/to/videos", + # Missing other required fields + } + with patch("builtins.open", mock_open(read_data=yaml.dump(incomplete_config))): + with pytest.raises(KeyError): + bvp_module.load_configuration("config.yaml") + + +class TestInfoExtractorforBIDS: + """Test info extraction and missing excel handling for BIDS.""" + + def 
test_create_dummy_excel_data_returns_expected_dict( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test dummy excel data creation returns expected dict.""" + # Arrange + video_path = tmp_path / "sub-001_video.mp4" + video_path.write_text("dummy") # just to create a filename + participant_id = "001" + session_id = "01" + + # Act + data = bvp_module.create_dummy_excel_data( + str(video_path), participant_id, session_id, "rest" + ) + + # Assert + assert data["ID"] == "001" + assert data["FileName"] == os.path.basename(video_path) + assert data["Context"] == "rest" + assert data["Notes"].startswith("Video not found") + assert "Vid_duration" in data + # All fields should have default "n/a" except the few explicitly set + assert all( + v == "n/a" or k in ["ID", "FileName", "Context", "Vid_duration", "Notes"] + for k, v in data.items() + if k not in ["ID", "FileName", "Context", "Vid_duration", "Notes"] + ) + + def test_find_age_folder_session_direct_match( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test direct match for age folder session.""" + participant_path = tmp_path / "sub-001" + participant_path.mkdir() + current_path = participant_path / "12-16_months" + current_path.mkdir() + + with patch( + "sailsprep.BIDS_convertor.determine_session_from_folder", return_value="01" + ): + session = bvp_module.find_age_folder_session( + str(current_path), str(participant_path) + ) + assert session == "01" + + def test_find_age_folder_session_outside_participant_path( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test age folder session outside participant path.""" + participant_path = tmp_path / "sub-001" + other_path = tmp_path / "other" / "12-16_months" + other_path.mkdir(parents=True) + + with patch( + "sailsprep.BIDS_convertor.determine_session_from_folder", return_value="01" + ): + session = bvp_module.find_age_folder_session( + str(other_path), str(participant_path) + ) + assert session is None + + def 
test_get_task_from_excel_row_valid_context( + self, bvp_module: ModuleType + ) -> None: + """Test get task from excel row with valid context.""" + row = pd.Series({"Context": "Play-time"}) + result = bvp_module.get_task_from_excel_row(row) + assert result == "Playtime" # cleaned via make_bids_task_label + + def get_task_from_excel_row(self, row: pd.Series, bvp_module: ModuleType) -> None: + """Test get task from excel row with unknown context.""" + context = str(row.get("Context", "Other ")).strip() + result = bvp_module.make_bids_task_label(context) + assert result == "unknown" + + def test_extract_participant_id_from_folder_with_ames_prefix( + self, bvp_module: ModuleType + ) -> None: + """Test extract participant ID from folder with AMES prefix.""" + assert ( + bvp_module.extract_participant_id_from_folder("SOMETHING_AMES_123") == "123" + ) + + def test_extract_participant_id_edge_cases(self, bvp_module: ModuleType) -> None: + """Test extract participant ID edge cases.""" + assert ( + bvp_module.extract_participant_id_from_folder("ABC_AMES_456_extra_AMES") + == "456_extra_AMES" + ) + assert ( + bvp_module.extract_participant_id_from_folder("participant123") + == "participant123" + ) + assert ( + bvp_module.extract_participant_id_from_folder("AA_participant_123") == "123" + ) + + def test_determine_session_from_excel_timepoint_14( + self, bvp_module: ModuleType + ) -> None: + """Test determine session from excel with timepoint 14.""" + df = pd.DataFrame( + [{"ID": "001", "FileName": "video1.mp4", "timepoint": "14_month", "Age": 1}] + ) + session = bvp_module.determine_session_from_excel( + "/some/path/video1.mp4", df, "001" + ) + assert session == "01" + + def test_determine_session_from_excel_timepoint_36( + self, bvp_module: ModuleType + ) -> None: + """Test determine session from excel with timepoint 36.""" + df = pd.DataFrame( + [{"ID": "002", "FileName": "vid2.mov", "timepoint": "36months", "Age": 3}] + ) + session = bvp_module.determine_session_from_excel( 
+ "/some/path/vid2.mov", df, "002" + ) + assert session == "02" + + def test_determine_session_from_excel_age_based( + self, bvp_module: ModuleType + ) -> None: + """Test determine session from excel.""" + df = pd.DataFrame( + [ + {"ID": "003", "FileName": "a.mp4", "timepoint": "unknown", "Age": 1.5}, + {"ID": "004", "FileName": "b.mp4", "timepoint": pd.NA, "Age": 3}, + ] + ) + s1 = bvp_module.determine_session_from_excel("/p/a.mp4", df, "003") + s2 = bvp_module.determine_session_from_excel("/p/b.mp4", df, "004") + assert s1 == "01" + assert s2 == "02" + + def test_determine_session_from_excel_participant_not_found( + self, bvp_module: ModuleType + ) -> None: + """Test determine session from excel with error in participant ID.""" + df = pd.DataFrame( + [{"ID": "999", "FileName": "x.mp4", "timepoint": "14", "Age": 1}] + ) + with pytest.raises(ValueError): + bvp_module.determine_session_from_excel("/p/y.mp4", df, "001") + + def test_determine_session_from_excel_file_not_found( + self, bvp_module: ModuleType + ) -> None: + """Test determine session from excel with missing excel.""" + df = pd.DataFrame( + [{"ID": "010", "FileName": "other.mp4", "timepoint": "14", "Age": 1}] + ) + with pytest.raises(ValueError): + bvp_module.determine_session_from_excel("/p/missing.mp4", df, "010") + + def test_determine_session_from_excel_unable_to_determine( + self, bvp_module: ModuleType + ) -> None: + """Test determine session timepoint does not match and age is NaN.""" + df = pd.DataFrame( + [{"ID": "030", "FileName": "u.mp4", "timepoint": "unk", "Age": pd.NA}] + ) + with pytest.raises(ValueError): + bvp_module.determine_session_from_excel("/p/u.mp4", df, "030") class TestBIDSStructure: @@ -75,7 +268,7 @@ class TestBIDSStructure: def test_create_bids_structure(self, bvp_module: ModuleType) -> None: """Test BIDS directory structure creation.""" - with patch('os.makedirs') as mock_makedirs: + with patch("os.makedirs") as mock_makedirs: bvp_module.create_bids_structure() # Check that 
directories are created with exist_ok=True assert mock_makedirs.call_count == 2 @@ -83,22 +276,22 @@ def test_create_bids_structure(self, bvp_module: ModuleType) -> None: def test_create_dataset_description(self, bvp_module: ModuleType) -> None: """Test dataset description file creation.""" mock_file = mock_open() - with patch('builtins.open', mock_file): - with patch('json.dump') as mock_json_dump: + with patch("builtins.open", mock_file): + with patch("json.dump") as mock_json_dump: bvp_module.create_dataset_description() mock_file.assert_called_once() mock_json_dump.assert_called_once() # Check that the dataset description contains required fields args, kwargs = mock_json_dump.call_args dataset_desc = args[0] - assert 'Name' in dataset_desc - assert 'BIDSVersion' in dataset_desc - assert 'DatasetType' in dataset_desc + assert "Name" in dataset_desc + assert "BIDSVersion" in dataset_desc + assert "DatasetType" in dataset_desc def test_create_readme(self, bvp_module: ModuleType) -> None: """Test README file creation.""" mock_file = mock_open() - with patch('builtins.open', mock_file): + with patch("builtins.open", mock_file): bvp_module.create_readme() mock_file.assert_called_once() # Check that content was written @@ -111,50 +304,110 @@ class TestBIDSNaming: def test_create_bids_filename(self, bvp_module: ModuleType) -> None: """Test BIDS filename creation.""" - filename = bvp_module.create_bids_filename(123, '01', 'beh', 'mp4') - expected = 'sub-123_ses-01_task-play_beh.mp4' + filename = bvp_module.create_bids_filename( + "123", "01", "mealtime", "beh", "mp4" + ) + expected = "sub-123_ses-01_task-mealtime_run-01_beh.mp4" assert filename == expected + def test_get_next_run_number_no_dir( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test get_next_run_numberwhen no subject/session directory exists.""" + root = tmp_path + result = bvp_module.get_next_run_number("001", "01", "rest", str(root)) + assert result == 1 + + def 
test_get_next_run_number_empty_dir( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test get_next_run_number when runs already exist.""" + beh_dir = tmp_path / "sub-001" / "ses-01" / "beh" + beh_dir.mkdir(parents=True) + result = bvp_module.get_next_run_number("001", "01", "rest", str(tmp_path)) + assert result == 1 + + def test_get_next_run_number_with_existing_runs( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test get_next_run_number w existing runs.""" + beh_dir = tmp_path / "sub-001" / "ses-01" / "beh" + beh_dir.mkdir(parents=True) + # Simulate existing files + (beh_dir / "sub-001_ses-01_task-rest_run-1_beh.tsv").touch() + (beh_dir / "sub-001_ses-01_task-rest_run-2_beh.tsv").touch() + result = bvp_module.get_next_run_number("001", "01", "rest", str(tmp_path)) + assert result == 3 + + def test_get_next_run_number_with_invalid_and_no_run( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test get_next_run_number skips invalid filenames.""" + beh_dir = tmp_path / "sub-001" / "ses-01" / "beh" + beh_dir.mkdir(parents=True) + # One invalid, one missing run number + (beh_dir / "sub-001_ses-01_task-rest_run-abc_beh.tsv").touch() + (beh_dir / "sub-001_ses-01_task-rest_beh.tsv").touch() + result = bvp_module.get_next_run_number("001", "01", "rest", str(tmp_path)) + assert result == 2 # treated as next after run-1 + + def test_make_bids_task_label_sanitizes_name(self, bvp_module: ModuleType) -> None: + """Test make_bids_task_label correctly sanitizes and normalizes task names.""" + assert bvp_module.make_bids_task_label(" Task Rest ") == "TaskRest" + assert bvp_module.make_bids_task_label("run-01+") == "run01+" + assert bvp_module.make_bids_task_label("We!rd#Name$") == "WerdName" + assert bvp_module.make_bids_task_label("") == "" + assert bvp_module.make_bids_task_label(None) == "None" + def test_get_session_from_path_12_16_months(self, bvp_module: ModuleType) -> None: """Test session determination for 12-16 month 
videos.""" - path = '/data/videos/12-16 month/participant_video.mp4' - session = bvp_module.get_session_from_path(path) - assert session == '01' + path = "12-16 month" + session = bvp_module.determine_session_from_folder(path) + assert session == "01" def test_get_session_from_path_34_38_months(self, bvp_module: ModuleType) -> None: """Test session determination for 34-38 month videos.""" - path = '/data/videos/34-38 month/participant_video.mp4' - session = bvp_module.get_session_from_path(path) - assert session == '02' + path = "34-38 month" + session = bvp_module.determine_session_from_folder(path) + assert session == "02" -class TestDemographicsHandling: - """Test demographics data processing.""" - - def test_read_demographics(self, bvp_module: ModuleType) -> None: - """Test demographics CSV reading and combining.""" - asd_data = pd.DataFrame({ - 'dependent_temporary_id': ['A001', 'A002'], - 'dependent_dob': ['2022-01-01', '2022-02-01'], - 'sex': ['M', 'F'], - 'diagnosis': ['ASD', 'ASD'] - }) +class TestVideoMetadataExtraction: + """Test video metadata extraction and processing.""" - nonasd_data = pd.DataFrame({ - 'dependent_temporary_id': ['N001', 'N002'], - 'dependent_dob': ['2022-03-01', '2022-04-01'], - 'sex': ['F', 'M'], - 'diagnosis': ['TD', 'TD'] - }) + def test_parse_duration_various_formats(self, bvp_module: ModuleType) -> None: + """Test for various duration formats.""" + # Normal HH:MM:SS + assert math.isclose(bvp_module.parse_duration("01:02:03"), 3723.0) + # MM:SS format + assert math.isclose(bvp_module.parse_duration("05:30"), 330.0) + # Plain number string + assert math.isclose(bvp_module.parse_duration("12.5"), 12.5) + # Empty or NaN → 0.0 + assert bvp_module.parse_duration("") == 0.0 + assert bvp_module.parse_duration(np.nan) == 0.0 + # Invalid types → handled gracefully + assert bvp_module.parse_duration(None) == 0.0 + assert bvp_module.parse_duration("abc") == 0.0 + + def test_extract_exif_empty_file(self, bvp_module: ModuleType) -> None: + 
"""Test video metadata extraction with empty file.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = "{}" # Empty JSON response - with patch('pandas.read_csv', side_effect=[asd_data, nonasd_data]): - df = bvp_module.read_demographics('asd.csv', 'nonasd.csv') - assert len(df) == 4 - assert 'dependent_temporary_id' in df.columns + result = bvp_module.extract_exif("empty.mp4") + assert result.get("duration_sec") == 0 + assert result.get("format") is None + def test_extract_exif_corrupted_json(self, bvp_module: ModuleType) -> None: + """Test video metadata extraction with corrupted JSON output.""" + with patch("subprocess.run") as mock_run: + mock_run.return_value.returncode = 0 + mock_run.return_value.stdout = "corrupted json" -class TestVideoMetadataExtraction: - """Test video metadata extraction and processing.""" + result = bvp_module.extract_exif("corrupt.mp4") + assert "error" in result def test_extract_exif_success(self, bvp_module: ModuleType) -> None: """Test successful video metadata extraction.""" @@ -165,143 +418,185 @@ def test_extract_exif_success(self, bvp_module: ModuleType) -> None: "duration": "120.5", "bit_rate": "1000000", "size": "15000000", - "tags": {"creation_time": "2023-01-01T12:00:00.000000Z"} + "tags": {"creation_time": "2023-01-01T12:00:00.000000Z"}, }, - "streams": [ - { - "tags": {"creation_time": "2023-01-01T12:00:00.000000Z"} - } - ] + "streams": [{"tags": {"creation_time": "2023-01-01T12:00:00.000000Z"}}], } - with patch('subprocess.run') as mock_run: + with patch("subprocess.run") as mock_run: mock_run.return_value.returncode = 0 mock_run.return_value.stdout = json.dumps(mock_metadata) - result = bvp_module.extract_exif('test.mp4') - assert 'duration_sec' in result - assert result['duration_sec'] == 120.5 - assert result['format'] == "QuickTime / MOV" + result = bvp_module.extract_exif("test.mp4") + assert "duration_sec" in result + assert result["duration_sec"] == 
120.5 + assert result["format"] == "QuickTime / MOV" def test_extract_exif_ffprobe_error(self, bvp_module: ModuleType) -> None: """Test video metadata extraction with ffprobe error.""" - with patch('subprocess.run') as mock_run: + with patch("subprocess.run") as mock_run: mock_run.return_value.returncode = 1 mock_run.return_value.stderr = "Error message" - result = bvp_module.extract_exif('test.mp4') - assert 'ffprobe_error' in result - assert result['ffprobe_error'] == "Error message" - - -class TestDateExtraction: - """Test date extraction from filenames.""" - - def test_extract_date_from_filename_standard_format( - self, bvp_module: ModuleType - ) -> None: - """Test date extraction from standard format.""" - # Test a format that should work based on the actual implementation - filename = "2023-12-25.mp4" # Remove 'video_' prefix - result = bvp_module.extract_date_from_filename(filename) - assert result == "2023:12:25 00:00:00" - - def test_extract_date_from_filename_mmddyyyy_format( - self, bvp_module: ModuleType - ) -> None: - """Test date extraction from MM-DD-YYYY format.""" - filename = "12-25-2023.mp4" - result = bvp_module.extract_date_from_filename(filename) - assert result == "2023:12:25 00:00:00" - - def test_extract_date_from_filename_yyyymmdd_format( - self, bvp_module: ModuleType - ) -> None: - """Test date extraction from YYYYMMDD format.""" - filename = "20231225.mp4" - result = bvp_module.extract_date_from_filename(filename) - assert result == "2023:12:25 00:00:00" - - def test_extract_date_from_filename_invalid(self, bvp_module: ModuleType) -> None: - """Test date extraction from invalid filename.""" - filename = "invalid_filename.mp4" - result = bvp_module.extract_date_from_filename(filename) - assert result is None - - def test_calculate_age(self, bvp_module: ModuleType) -> None: - """Test age calculation in months.""" - dob_str = "2022-01-15" - video_date = datetime(2023, 1, 15) - age = bvp_module.calculate_age(dob_str, video_date) - assert 
age == 12.0 + result = bvp_module.extract_exif("test.mp4") + assert "ffprobe_error" in result + assert result["ffprobe_error"] == "Error message" class TestVideoProcessing: """Test video processing functions.""" - @patch('subprocess.run') - @patch('os.remove') - @patch('os.path.exists') + @patch("subprocess.run") + @patch("os.remove") + @patch("os.path.exists") + @patch("os.makedirs") def test_stabilize_video( self, + mock_makedirs: MagicMock, mock_exists: MagicMock, mock_remove: MagicMock, mock_run: MagicMock, - bvp_module: ModuleType + bvp_module: ModuleType, ) -> None: """Test video stabilization.""" mock_exists.return_value = True - bvp_module.stabilize_video('input.mp4', 'output.mp4') + mock_run.return_value.returncode = 0 # success + mock_run.return_value.stderr = "" + bvp_module.stabilize_video("input.mp4", "output.mp4", "output/TEMP/task-01") # Should call subprocess.run twice (detect and transform) assert mock_run.call_count == 2 - mock_remove.assert_called_once_with("transforms.trf") + mock_remove.assert_called_once_with( + os.path.join("output/TEMP/task-01", "transforms.trf") + ) + + def test_stabilize_video_input_missing(self, bvp_module: ModuleType) -> None: + """Test video stabilization with missing input file.""" + with patch("os.path.exists", return_value=False): + with pytest.raises(FileNotFoundError): + bvp_module.stabilize_video("nonexistent.mp4", "output.mp4", "temp") + + @patch("subprocess.run") + @patch("os.path.exists") + def test_stabilize_video_vidstab_error( + self, + mock_exists: MagicMock, + mock_run: MagicMock, + bvp_module: ModuleType, + ) -> None: + """Test video stabilization with vidstab error.""" + mock_exists.return_value = True + mock_run.return_value.returncode = 1 + mock_run.return_value.stderr = "Error in vidstab" + + with pytest.raises(RuntimeError): + bvp_module.stabilize_video("input.mp4", "output.mp4", "temp") + + def test_get_video_properties_success( + self, monkeypatch: pytest.MonkeyPatch, bvp_module: ModuleType + ) -> 
None: + """Test video properties extraction success.""" + mock_cap = MagicMock() + mock_cap.isOpened.return_value = True + mock_cap.get.side_effect = [30.0, 1280.0, 720.0] + monkeypatch.setattr("cv2.VideoCapture", lambda _: mock_cap) + + props = bvp_module.get_video_properties("video.mp4") + assert props["FrameRate"] == 30.0 + assert props["Resolution"] == "1280x720" + + def test_get_video_properties_unopened( + self, monkeypatch: pytest.MonkeyPatch, bvp_module: ModuleType + ) -> None: + """Test video properties extraction with unopened video.""" + mock_cap = MagicMock() + mock_cap.isOpened.return_value = False + monkeypatch.setattr("cv2.VideoCapture", lambda _: mock_cap) + + props = bvp_module.get_video_properties("missing.mp4") + assert props == {"FrameRate": None, "Resolution": None} - @patch('subprocess.run') + def test_get_video_properties_exception( + self, monkeypatch: pytest.MonkeyPatch, bvp_module: ModuleType + ) -> None: + """Test video properties extraction with OpenCV exception.""" + + def broken_videocap() -> None: + raise RuntimeError("OpenCV error") + + monkeypatch.setattr("cv2.VideoCapture", broken_videocap) + + props = bvp_module.get_video_properties("corrupt.mp4") + assert props == {"FrameRate": None, "Resolution": None} + + @patch("subprocess.run") + @patch("os.path.exists") def test_extract_audio( - self, mock_run: MagicMock, bvp_module: ModuleType + self, mock_exists: MagicMock, mock_run: MagicMock, bvp_module: ModuleType ) -> None: """Test audio extraction from video.""" - bvp_module.extract_audio('input.mp4', 'output.wav') + # Pretend both input and output exist + mock_exists.return_value = True + mock_run.return_value.returncode = 0 # Simulate success + mock_run.return_value.stderr = "" + + bvp_module.extract_audio("input.mp4", "output.wav") + mock_run.assert_called_once() # Check that the command includes correct audio parameters args = mock_run.call_args[0][0] - assert '-ar' in args - assert '16000' in args - assert '-ac' in args - assert 
'1' in args + assert "-ar" in args + assert "16000" in args + assert "-ac" in args + assert "1" in args class TestMetadataFileCreation: """Test creation of BIDS metadata files.""" - def test_create_events_tsv(self, bvp_module: ModuleType) -> None: + def test_create_events_file(self, bvp_module: ModuleType) -> None: """Test events TSV file creation.""" - video_metadata = {'duration_sec': 120.5} + video_metadata = pd.DataFrame( + [ + {"duration": 120.5, "filename": "video1.mp4"}, + {"duration": 43.5, "filename": "video2.mp4"}, + ] + ) - with patch('pandas.DataFrame.to_csv') as mock_to_csv: - bvp_module.create_events_tsv(video_metadata, 'output.tsv') + with patch("pandas.DataFrame.to_csv") as mock_to_csv: + bvp_module.create_events_file( + video_metadata, "output.tsv", "filepath/on/Engaging.mp4" + ) mock_to_csv.assert_called_once() def test_create_video_metadata_json(self, bvp_module: ModuleType) -> None: """Test video metadata JSON creation.""" - metadata = {'duration_sec': 120.5, 'format': 'MP4'} - processing_info = {'has_stabilization': True} - - with patch('builtins.open', mock_open()): - with patch('json.dump') as mock_json_dump: + metadata = {"duration_sec": 120.5, "format": "MP4"} + processing_info = {"has_stabilization": True} + task_info = { + "task_name": "unknown", + "task_description": "Behavioral session:", + "instructions": "Natural behavior observation", + "context": "mealtime", + "activity": "eating", + } + with patch("builtins.open", mock_open()): + with patch("json.dump") as mock_json_dump: bvp_module.create_video_metadata_json( - metadata, processing_info, 'output.json' + metadata, + processing_info, + task_info, + "output.json", ) mock_json_dump.assert_called_once() # Check JSON content structure args = mock_json_dump.call_args[0] json_content = args[0] - assert 'TaskName' in json_content - assert 'ProcessingPipeline' in json_content - assert 'OriginalMetadata' in json_content + assert "TaskName" in json_content + assert "ProcessingPipeline" in 
json_content + assert "OriginalMetadata" in json_content class TestUtilityFunctions: @@ -309,30 +604,29 @@ class TestUtilityFunctions: def test_save_json(self, bvp_module: ModuleType) -> None: """Test JSON file saving utility.""" - test_data = {'test': 'data', 'number': 123} + test_data = {"test": "data", "number": 123} mock_file = mock_open() - with patch('builtins.open', mock_file): - with patch('json.dump') as mock_json_dump: - bvp_module.save_json(test_data, 'output.json') + with patch("builtins.open", mock_file): + with patch("json.dump") as mock_json_dump: + bvp_module.save_json(test_data, "output.json") # Check that json.dump was called with the test data and the file handle mock_json_dump.assert_called_once() args, kwargs = mock_json_dump.call_args assert args[0] == test_data - assert kwargs.get('indent') == 4 + assert kwargs.get("indent") == 4 class TestMainWorkflow: """Test the main processing workflow.""" - @patch('BIDS_convertor.create_participants_files') - @patch('BIDS_convertor.process_videos') - @patch('BIDS_convertor.read_demographics') - @patch('BIDS_convertor.create_readme') - @patch('BIDS_convertor.create_derivatives_dataset_description') - @patch('BIDS_convertor.create_dataset_description') - @patch('BIDS_convertor.create_bids_structure') - @patch('BIDS_convertor.save_json') + @patch("sailsprep.BIDS_convertor.get_all_videos") + @patch("sailsprep.BIDS_convertor.process_videos") + @patch("sailsprep.BIDS_convertor.create_readme") + @patch("sailsprep.BIDS_convertor.create_derivatives_dataset_description") + @patch("sailsprep.BIDS_convertor.create_dataset_description") + @patch("sailsprep.BIDS_convertor.create_bids_structure") + @patch("sailsprep.BIDS_convertor.save_json") def test_main_workflow( self, mock_save_json: MagicMock, @@ -340,53 +634,352 @@ def test_main_workflow( mock_create_dataset: MagicMock, mock_create_derivatives: MagicMock, mock_create_readme: MagicMock, - mock_read_demographics: MagicMock, mock_process_videos: MagicMock, - 
mock_create_participants: MagicMock, - bvp_module: ModuleType + mock_get_all_videos: MagicMock, + bvp_module: ModuleType, ) -> None: """Test the main processing workflow.""" # Setup mocks - mock_demographics = pd.DataFrame({'id': [1, 2]}) - mock_read_demographics.return_value = mock_demographics - mock_process_videos.return_value = ([{'test': 'data'}], ['error1']) + mock_get_all_videos.return_value = (["dummy_video_1.mp4"], []) - # Run main function - bvp_module.main() + mock_process_videos.return_value = ( + [ + { + "task_label": "task-rest", + "participant_id": "sub-001", + "session_id": "ses-01", + } + ], + [{"error": None}], + ) + with ( + patch("sailsprep.BIDS_convertor.os.path.exists", return_value=True), + patch( + "sailsprep.BIDS_convertor.pd.read_csv", + return_value=pd.DataFrame( + {"Context": ["playing", "unknown"], "ID": ["AZE", "RET"]} + ), + ), + patch.object(sys, "argv", ["BIDS_convertor.py", "0", "4"]), + patch("sys.exit") as mock_exit, + ): + bvp_module.main() + mock_exit.assert_not_called() # Verify all steps were called mock_create_structure.assert_called_once() mock_create_dataset.assert_called_once() mock_create_derivatives.assert_called_once() mock_create_readme.assert_called_once() - mock_read_demographics.assert_called_once() mock_process_videos.assert_called_once() - mock_create_participants.assert_called_once() - assert mock_save_json.call_count == 2 + + +class TestExtendedFunctions: + """Additional unit tests for deeper functions and edge cases.""" + + def test_find_session_id_uses_folder_first(self, bvp_module: ModuleType) -> None: + """Should use folder-based session detection first.""" + mock_df = pd.DataFrame() # not used + + with ( + patch( + "sailsprep.BIDS_convertor.determine_session_from_folder", + return_value="01", + ) as mock_folder, + patch( + "sailsprep.BIDS_convertor.determine_session_from_excel" + ) as mock_excel, + ): + session = bvp_module.find_session_id( + directory="/data/participant/session01", + 
current_path="/data/participant/session01/video.mp4", + participant_path="/data/participant", + annotation_df=mock_df, + participant_id="001", + ) + + assert session == "01" + mock_folder.assert_called_once() + mock_excel.assert_not_called() + + def test_find_session_id_falls_back_to_folder_when_excel_fails( + self, bvp_module: ModuleType + ) -> None: + """Should fall back to Excel lookup when folder-based detection fails.""" + mock_df = pd.DataFrame() + with ( + patch( + "sailsprep.BIDS_convertor.determine_session_from_folder", + return_value=None, + ) as mock_folder, + patch( + "sailsprep.BIDS_convertor.determine_session_from_excel", + return_value="02", + ) as mock_excel, + ): + session = bvp_module.find_session_id( + directory="/data/participant/unknown_folder", + current_path="/data/participant/unknown_folder/video.mp4", + participant_path="/data/participant", + annotation_df=mock_df, + participant_id="001", + ) + + assert session == "02" + mock_folder.assert_called_once() + mock_excel.assert_called_once() + + def test_find_videos_recursive_collects_videos( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test find_videos_recursive function.""" + participant = tmp_path / "sub-ABC" + participant.mkdir() + (participant / "12-16_months").mkdir() + v1 = participant / "12-16_months" / "one.mp4" + v1.write_text("x") + (participant / "notes.txt").write_text("ignore") + + videos = bvp_module.find_videos_recursive( + str(participant), str(participant), pd.DataFrame(), "ABC" + ) + assert any(str(v1) == p for p, s in videos) + + def test_preprocess_video_success_creates_output( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Check that preprocess_video succeeds when all steps work.""" + input_file = tmp_path / "in.mp4" + input_file.write_bytes(b"video") + + temp_dir = tmp_path / "temp" + temp_dir.mkdir() + + # Pre-create stabilized temp file + stabilized_tmp = temp_dir / f"stabilized_temp_{os.getpid()}.mp4" + 
stabilized_tmp.write_bytes(b"stable") + + output_path = tmp_path / "out.mp4" + output_path.write_bytes(b"processed") + + # Patch stabilize_video and subprocess.run + with ( + patch("sailsprep.BIDS_convertor.stabilize_video", return_value=None), + patch("sailsprep.BIDS_convertor.subprocess.run") as mock_run, + ): + mock_run.return_value.returncode = 0 + mock_run.return_value.stderr = "" + + # Should not raise any error + bvp_module.preprocess_video( + str(input_file), str(output_path), str(temp_dir) + ) + + # ✅ Assert that output file exists and is non-empty + assert output_path.exists(), "Output video file should exist" + assert output_path.stat().st_size >= 0, "Output video file should not be empty" + + # ✅ Assert that stabilized temp file was cleaned up + assert ( + not stabilized_tmp.exists() + ), "Temporary stabilized file should be removed" + + # ✅ Verify that ffmpeg (subprocess) was called + mock_run.assert_called_once() + + def test_safe_float_conversion_various(self, bvp_module: ModuleType) -> None: + """Test function for the conversion of float.""" + assert bvp_module.safe_float_conversion(None) == "n/a" + assert bvp_module.safe_float_conversion("n/a") == "n/a" + assert bvp_module.safe_float_conversion("12.5") == 12.5 + assert bvp_module.safe_float_conversion(3) == 3.0 + assert bvp_module.safe_float_conversion("abc", default="-") == "-" + + def test_create_audio_metadata_json_calls_save_json( + self, bvp_module: ModuleType + ) -> None: + """Test audio metadata creation function.""" + with patch("sailsprep.BIDS_convertor.save_json") as mock_save_json: + bvp_module.create_audio_metadata_json( + 12.3, {"task_name": "t", "task_description": "blabla"}, "out.json" + ) + mock_save_json.assert_called_once() + args = mock_save_json.call_args[0] + assert args[0]["Duration"] == 12.3 + assert args[0]["TaskName"] == "t" + assert args[0]["TaskDescription"] == "blabla" + + def test_create_raw_video_json_saves_properties( + self, tmp_path: Path, bvp_module: ModuleType + ) 
-> None: + """Test raw video json creation function.""" + with ( + patch( + "sailsprep.BIDS_convertor.get_video_properties", + return_value={"FrameRate": 30.0, "Resolution": "1280x720"}, + ), + patch("sailsprep.BIDS_convertor.save_json") as mock_save, + ): + row = pd.Series( + { + "FileName": "a.mp4", + "Vid_duration": "00:01:00", + "Vid_date": "2020-01-01", + "timepoint": "14", + "SourceFile": "orig.mp4", + } + ) + + bvp_module.create_raw_video_json( + row, + {"task_name": "t", "context": "c", "activity": "a"}, + "somepath.mp4", + str(tmp_path / "raw.json"), + ) + + # Assert save_json was called once + mock_save.assert_called_once() + + # Extract the arguments used in the call + saved_data = mock_save.call_args[0][0] + + # Check that the metadata contains expected values + assert saved_data["TaskName"] == "t" + assert saved_data["FrameRate"] == 30.0 + assert saved_data["Resolution"] == "1280x720" + assert saved_data["OriginalFilename"] == "a.mp4" + assert saved_data["Context"] == "c" + assert saved_data["Activity"] == "a" + assert saved_data["TimePoint"] == "14" + assert saved_data["SourceFile"] == "orig.mp4" + assert ( + abs(saved_data["Duration"] - 60.0) < 1e-6 + ) # assuming parse_duration → seconds + + def test_create_participants_file_creates_expected_outputs( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test create participants.tsv function.""" + # Setup mock data + bids_root = tmp_path / "bids" + bids_root.mkdir() + (bids_root / "sub-101").mkdir() + (bids_root / "sub-102").mkdir() + + asd_file = tmp_path / "asd.xlsx" + df = pd.DataFrame({"ID": ["101", "102"], "Group": ["ASD", "Non-ASD"]}) + df.to_excel(asd_file, index=True) + + bvp_module.create_participants_file(str(bids_root), str(asd_file)) + + # Assertions + tsv_path = bids_root / "participants.tsv" + json_path = bids_root / "participants.json" + assert tsv_path.exists() + assert json_path.exists() + + df_out = pd.read_csv(tsv_path, sep="\t") + print(df_out) + assert 
set(df_out["participant_id"]) == {"sub-101", "sub-102"} + assert set(df_out["group"]) == {"ASD", "Non-ASD"} + + def test_print_summary_outputs_expected( + self, capsys: pytest.CaptureFixture[str], bvp_module: ModuleType + ) -> None: + """Test the summary printer function.""" + processed = [ + { + "task_label": "a", + "participant_id": "p1", + "session_id": "01", + "duration_sec": 60, + "has_excel_data": True, + }, + { + "task_label": "b", + "participant_id": "p2", + "session_id": "02", + "duration_sec": 120, + "has_excel_data": False, + }, + ] + failed = [{"video": "x", "error": "boom"}] + bvp_module.print_summary(processed, failed) + captured = capsys.readouterr() + assert "Successfully processed: 2 videos" in captured.out + assert "Failed to process: 1 videos" in captured.out + + def test_merge_subjects_merges_and_removes( + self, tmp_path: Path, bvp_module: ModuleType + ) -> None: + """Test merge subjects function.""" + # Prepare FINAL_BIDS_ROOT and derivatives paths + root = tmp_path / "bids" + deriv = root / "derivatives" / "preprocessed" + (root).mkdir(parents=True) + (deriv).mkdir(parents=True) + + # Create original and duplicate subject folders + orig = root / "sub-200" + dup = root / "sub-200 2" + orig.mkdir() + dup.mkdir() + # Add file to dup that should be moved + (dup / "file.txt").write_text("hello") + + # Run merge_subjects + bvp_module.merge_subjects(str(root)) + + # After merge, duplicate folder should not exist + assert not dup.exists() + + +class TestProcessSingleVideo: + """Test the process_single_video function.""" + + def test_process_single_video_empty_info(self, bvp_module: ModuleType) -> None: + """Test the processing of single video with empty information.""" + result, error = bvp_module.process_single_video( + {}, pd.DataFrame(), "root", "deriv", "tmp" + ) + assert result is None + assert isinstance(error, dict) + assert "video_info is empty" in error["error"] + + def test_process_single_video_missing_keys(self, bvp_module: ModuleType) -> 
None: + """Test the processing of single video with missing information.""" + video_info = {"filename": "f.mp4"} # missing participant_id, etc. + result, error = bvp_module.process_single_video( + video_info, pd.DataFrame(), "root", "deriv", "tmp" + ) + assert result is None + assert "Missing required video_info keys" in error["error"] # Test fixtures for reusable data @pytest.fixture def sample_demographics() -> pd.DataFrame: """Sample demographics DataFrame for testing.""" - return pd.DataFrame({ - 'dependent_temporary_id': ['A001', 'A002', 'N001'], - 'dependent_dob': ['2022-01-01', '2022-02-01', '2022-03-01'], - 'sex': ['M', 'F', 'M'], - 'diagnosis': ['ASD', 'ASD', 'TD'] - }) + return pd.DataFrame( + { + "dependent_temporary_id": ["A001", "A002", "N001"], + "dependent_dob": ["2022-01-01", "2022-02-01", "2022-03-01"], + "sex": ["M", "F", "M"], + "diagnosis": ["ASD", "ASD", "TD"], + } + ) @pytest.fixture def sample_video_metadata() -> dict[str, float | str | int]: """Sample video metadata for testing.""" return { - 'duration_sec': 120.5, - 'format': 'QuickTime / MOV', - 'bit_rate': 1000000, - 'size_bytes': 15000000 + "duration_sec": 120.5, + "format": "QuickTime / MOV", + "bit_rate": 1000000, + "size_bytes": 15000000, } -if __name__ == '__main__': - pytest.main([__file__]) \ No newline at end of file +if __name__ == "__main__": + pytest.main([__file__])