diff --git a/neurobooth_os/__init__.py b/neurobooth_os/__init__.py index e15de168..6753fb38 100644 --- a/neurobooth_os/__init__.py +++ b/neurobooth_os/__init__.py @@ -1,3 +1,3 @@ """Neurobooth OS""" -__version__ = "0.0.33" +__version__ = "0.0.34" diff --git a/neurobooth_os/tasks/MOT/frame.py b/neurobooth_os/tasks/MOT/frame.py index 5bb98776..754dc044 100755 --- a/neurobooth_os/tasks/MOT/frame.py +++ b/neurobooth_os/tasks/MOT/frame.py @@ -178,6 +178,7 @@ class ClickInfo(NamedTuple): class TrialResult(NamedTuple): """Contains information about subject performance during the trial""" trial_type: str + trial_count: int n_circles: int n_targets: int n_correct: int @@ -209,6 +210,7 @@ def __init__( super().__init__(window) self.task = task self._allow_clicks = True + self._log_results = True # Time-related properties of the stimulus self.flash_duration = trial_param.flash_duration @@ -256,6 +258,8 @@ def __init__( self.result_status: str = 'click' def run(self) -> None: + self.result_status = 'click' + self.completed = False self.__current_message = self.animation_message self.send_marker(self.start_marker) self.send_marker(f"number targets:{self.n_targets}") @@ -286,6 +290,9 @@ def run(self) -> None: "click the dots that flashed." ) self.result_status = 'timeout' + self.completed = True + if self._log_results: + self.task.log_result(self.results()) self.update_score(-sum([c.correct for c in self.click_info])) self.click_info = [] self.run() # Repeat the trial @@ -293,6 +300,8 @@ def run(self) -> None: self.send_marker(self.end_marker) self.completed = True + if self._log_results: + self.task.log_result(self.results()) wait(0.5) def send_marker(self, marker: str) -> None: @@ -469,6 +478,7 @@ def results(self) -> TrialResult: click_duration=max([0, *[c.time for c in self.click_info]]), state='aborted' if not self.completed else self.result_status, trial_type=self.trial_type, + trial_count=self.trial_count, ) @@ -479,6 +489,7 @@ class ExampleFrame(TrialFrame): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._allow_clicks = False + self._log_results = False # Disable the message at the top/bottom of the stimulus area self.click_message = '' @@ -502,6 +513,7 @@ def __init__(self, *args, max_attempts: int = 2, **kwargs): :param max_attempts: The maximum number of practice attempts. """ super().__init__(*args, **kwargs) + self._log_results = False # This class will handle result logging after calling super.run() # Set a different message to display at the bottom of the stimulus area self.click_message = f'Click the {self.n_targets} dots that were cyan' @@ -526,10 +538,12 @@ def run(self) -> None: n_correct = sum([c.correct for c in self.click_info]) if (n_correct < self.n_targets) and (i < self.max_attempts-1): self.result_status = 'repeat' + self.task.log_result(self.results()) self.present_alert( "Let's try again.\n" f"When the movement stops, click the {self.n_targets} dots that flashed." ) else: self.present_alert(f"You got {n_correct} of {self.n_targets} dots correct.") + self.task.log_result(self.results()) break diff --git a/neurobooth_os/tasks/MOT/task.py b/neurobooth_os/tasks/MOT/task.py index bf48e044..62bc51e5 100644 --- a/neurobooth_os/tasks/MOT/task.py +++ b/neurobooth_os/tasks/MOT/task.py @@ -85,6 +85,8 @@ def __init__( self.stimulus_params = [continue_message, practice_chunks, test_chunks] self._init_frame_sequence(*self.stimulus_params) + self.results: List[TrialResult] = [] + @classmethod def asset_path(cls, asset: Union[str, os.PathLike]) -> str: """ @@ -110,7 +112,8 @@ def _create_frame(self, params: FrameParameters) -> MOTFrame: self.trial_count += 1 return TrialFrame(self.win, self, self.trial_count, params) elif params.trial_type == 'practice': - return PracticeFrame(self.win, self, 0, params) + self.trial_count += 1 + return PracticeFrame(self.win, self, self.trial_count, params) elif params.trial_type == 'example': return ExampleFrame(self.win, self, 0, params) else: @@ -122,7 +125,7 @@ def _create_frame(self, params: FrameParameters) -> MOTFrame: raise MOTException(f'Unexpected frame parameter type: {type(params)}') def _create_chunk(self, chunk: FrameChunk) -> List[MOTFrame]: - self.trial_count = 0 # Keep track of how many test trial frames are in a chunk + self.trial_count = 0 # Keep track of how many test/practice trial frames are in a chunk return [self._create_frame(params) for params in chunk.frames] def _init_frame_sequence( @@ -184,10 +187,18 @@ def run(self, prompt=True, last_task=False, subj_id=None, **kwargs): self.present_complete(last_task) return self.events - @staticmethod - def run_chunk(chunk: List[MOTFrame]) -> None: + def run_chunk(self, chunk: List[MOTFrame]) -> None: for frame in chunk: - frame.run() + try: + frame.run() + except TaskAborted as e: + # Log results for aborted frame, then propagate the exception + if isinstance(frame, (PracticeFrame, TrialFrame)): + self.log_result(frame.results()) + raise e + + def log_result(self, result: TrialResult): + self.results.append(result) @staticmethod def chunk_click_duration(chunk: List[MOTFrame]) -> float: @@ -214,12 +225,7 @@ def save_csv(self, data: pd.DataFrame, name: str) -> None: self.task_files.append(fname) def save_results(self): - all_frames: List[MOTFrame] = [*chain(*self.practice_chunks), *chain(*self.test_chunks)] - results: List[TrialResult] = [ - frame.results() for frame in all_frames - if isinstance(frame, TrialFrame) and frame.trial_type in ('test', 'practice') - ] - results_df = pd.DataFrame(results, columns=TrialResult._fields) + results_df = pd.DataFrame(self.results, columns=TrialResult._fields) test_results = results_df.loc[(results_df['trial_type'] == 'test') & (results_df['state'] == 'click')] total_targets = test_results['n_targets'].sum() @@ -234,6 +240,8 @@ def save_results(self): self.save_csv(results_df, 'results') self.save_csv(outcome_df, 'outcomes') + self.results = [] # Clear log of results in case the task repeats! + if __name__ == "__main__": from psychopy import monitors