diff --git a/.gitignore b/.gitignore index b36d5e8e..3770de95 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__ *.log *.ipynb_checkpoints Data/* +*/Data/* *DS_Store diff --git a/ActivityAnalyses/gait_analysis.py b/ActivityAnalyses/gait_analysis.py new file mode 100644 index 00000000..825a4aca --- /dev/null +++ b/ActivityAnalyses/gait_analysis.py @@ -0,0 +1,435 @@ +""" + --------------------------------------------------------------------------- + OpenCap processing: gaitAnalysis.py + --------------------------------------------------------------------------- + + Copyright 2023 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import os +import sys +sys.path.append('../') + +import numpy as np +import pandas as pd +from scipy.signal import find_peaks + +from utilsKinematics import kinematics +from utilsProcessing import lowPassFilter +from utilsTRC import trc_2_dict + + +class gait_analysis(kinematics): + + def __init__(self, sessionDir, trialName, leg='auto', + lowpass_cutoff_frequency_for_coordinate_values=-1, + n_gait_cycles=-1): + + # Inherit init from kinematics class. + super().__init__( + sessionDir, + trialName, + lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values) + + # Marker data load and filter. + trcFilePath = os.path.join(sessionDir, + 'MarkerData', + '{}.trc'.format(trialName)) + self.markerDict = trc_2_dict(trcFilePath) + if lowpass_cutoff_frequency_for_coordinate_values > 0: + self.markerDict['markers'] = { + marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency_for_coordinate_values) + for marker_name, data in self.markerDict['markers'].items()} + + # Coordinate values. + self.coordinateValues = self.get_coordinate_values() + + # Segment gait cycles. + self.gaitEvents = self.segment_walking(n_gait_cycles=n_gait_cycles,leg=leg) + self.nGaitCycles = np.shape(self.gaitEvents['ipsilateralIdx'])[0] + + # Determine treadmill speed (0 if overground). + self.treadmillSpeed = self.compute_treadmill_speed() + + # Initialize variables to be lazy loaded. + self._comValues = None + self._R_world_to_gait = None + + # Compute COM trajectory. + def comValues(self): + if self._comValues is None: + self._comValues = self.get_center_of_mass_values() + return self._comValues + + # Compute gait frame. + def R_world_to_gait(self): + if self._R_world_to_gait is None: + self._R_world_to_gait = self.compute_gait_frame() + return self._R_world_to_gait + + def compute_scalars(self,scalarNames): + + # Verify that scalarNames are methods in gait_analysis. 
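+        # Each requested scalar name is resolved to a compute_<name> method below;
+        # passing scalarNames=None just lists the available compute_ methods and returns.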
+ method_names = [func for func in dir(self) if callable(getattr(self, func))] + possibleMethods = [entry for entry in method_names if 'compute_' in entry] + + if scalarNames is None: + print('No scalars defined, these methods are available:') + print(*possibleMethods) + return + + nonexistant_methods = [entry for entry in scalarNames if 'compute_' + entry not in method_names] + + if len(nonexistant_methods) > 0: + raise Exception(str(['compute_' + a for a in nonexistant_methods]) + ' does not exist in gait_analysis class.') + + scalarDict = {} + for scalarName in scalarNames: + thisFunction = getattr(self, 'compute_' + scalarName) + scalarDict[scalarName] = thisFunction() + + return scalarDict + + def compute_stride_length(self): + + leg,_ = self.get_leg() + + calc_position = self.markerDict['markers'][leg + '_calc_study'] + + # On treadmill, the stride length is the difference in ipsilateral + # calcaneus position at heel strike + treadmill speed * time. + strideLength = ( + np.linalg.norm( + calc_position[self.gaitEvents['ipsilateralIdx'][:,:1]] - + calc_position[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) + + self.treadmillSpeed * np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)])) + + # Average across all strides. + strideLength = np.mean(strideLength) + + return strideLength + + def compute_gait_speed(self): + + comValuesArray = np.vstack((self.comValues()['x'],self.comValues()['y'],self.comValues()['z'])).T + gait_speed = ( + np.linalg.norm( + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,:1]] - + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) / + np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + self.treadmillSpeed) + + # Average across all strides. + gait_speed = np.mean(gait_speed) + + return gait_speed + + def compute_cadence(self): + + # In steps per second. + cadence = 1/np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + + # Average across all strides. + cadence = np.mean(cadence) + + return cadence + + def compute_treadmill_speed(self, overground_speed_threshold=0.3): + + leg,_ = self.get_leg() + + foot_position = self.markerDict['markers'][leg + '_ankle_study'] + + stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2])) + startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int) + endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int) + + # Average instantaneous velocities. + dt = np.diff(self.markerDict['time'][:2])[0] + for i in range(self.nGaitCycles): + footVel = np.linalg.norm(np.mean(np.diff( + foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt) + + treadmillSpeed = np.mean(footVel) + + # Overground. + if treadmillSpeed < overground_speed_threshold: + treadmillSpeed = 0 + + return treadmillSpeed + + def compute_step_width(self): + + leg,contLeg = self.get_leg() + + # Get ankle joint center positions. + ankle_position_ips = ( + self.markerDict['markers'][leg + '_ankle_study'] + + self.markerDict['markers'][leg + '_mankle_study'])/2 + ankle_position_cont = ( + self.markerDict['markers'][contLeg + '_ankle_study'] + + self.markerDict['markers'][contLeg + '_mankle_study'])/2 + ankleVector = ( + ankle_position_cont[self.gaitEvents['contralateralIdx'][:,1]] - + ankle_position_ips[self.gaitEvents['ipsilateralIdx'][:,0]]) + + ankleVector_inGaitFrame = np.array( + [np.dot(ankleVector[i,:], self.R_world_to_gait()[i,:,:]) + for i in range(self.nGaitCycles)]) + + # Step width is z distance. 
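+        # The ankle-to-ankle vector was expressed in the gait frame above, whose z-axis
+        # is the mediolateral direction, so step width is its absolute z-component.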
+ stepWidth = np.abs(ankleVector_inGaitFrame[:,2]) + + # Average across all strides. + stepWidth = np.mean(stepWidth) + + return stepWidth + + def compute_single_support_time(self): + + singleSupportTime = np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) + + # Average across all strides. + singleSupportTime = np.mean(singleSupportTime) + + return singleSupportTime + + def compute_double_support_time(self): + + # Ipsilateral single support time - contralateral swing time. + doubleSupportTime = ( + np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) - + np.diff(self.gaitEvents['contralateralTime'][:,:2])) + + # Average across all strides. + doubleSupportTime = np.mean(doubleSupportTime) + + return doubleSupportTime + + def compute_gait_frame(self): + + # Create frame for each gait cycle with x: pelvis heading, + # z: average vector between ASIS during gait cycle, y: cross. + + # Pelvis center trajectory (for overground heading vector). + pelvisMarkerNames = ['r.ASIS_study','L.ASIS_study','r.PSIS_study','L.PSIS_study'] + pelvisMarkers = [self.markerDict['markers'][mkr] for mkr in pelvisMarkerNames] + pelvisCenter = np.mean(np.array(pelvisMarkers),axis=0) + + # Ankle trajectory (for treadmill heading vector). + leg = self.gaitEvents['ipsilateralLeg'] + if leg == 'l': leg='L' + anklePos = self.markerDict['markers'][leg + '_ankle_study'] + + # Vector from left ASIS to right ASIS (for mediolateral direction). + asisMarkerNames = ['L.ASIS_study','r.ASIS_study'] + asisMarkers = [self.markerDict['markers'][mkr] for mkr in asisMarkerNames] + asisVector = np.squeeze(np.diff(np.array(asisMarkers),axis=0)) + + # Heading vector per gait cycle. + # If overground, use pelvis center trajectory; treadmill: ankle trajectory. + if self.treadmillSpeed == 0: + x = np.diff(pelvisCenter[self.gaitEvents['ipsilateralIdx'][:,(0,2)],:],axis=1)[:,0,:] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + else: + x = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + x[i,:] = anklePos[self.gaitEvents['ipsilateralIdx'][i,2]] - \ + anklePos[self.gaitEvents['ipsilateralIdx'][i,1]] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + + # Mean ASIS vector over gait cycle. + z = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + z[i,:] = np.mean(asisVector[self.gaitEvents['ipsilateralIdx'][i,0]: \ + self.gaitEvents['ipsilateralIdx'][i,2]],axis=0) + z = z / np.linalg.norm(z,axis=1,keepdims=True) + + # Cross to get y. + y = np.cross(z,x) + + # 3x3xnSteps. + R_lab_to_gait = np.stack((x.T,y.T,z.T),axis=1).transpose((2, 0, 1)) + + return R_lab_to_gait + + def get_leg(self): + + if self.gaitEvents['ipsilateralLeg'] == 'r': + leg = 'r' + contLeg = 'L' + else: + leg = 'L' + contLeg = 'r' + + return leg, contLeg + + def get_coordinates_normalized_time(self): + + colNames = self.coordinateValues.columns + data = self.coordinateValues.to_numpy(copy=True) + coordValuesNorm = [] + for i in range(self.nGaitCycles): + coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]] + coordValuesNorm.append(np.stack([np.interp(np.linspace(0,100,101), + np.linspace(0,100,len(coordValues)),coordValues[:,i]) \ + for i in range(coordValues.shape[1])],axis=1)) + + coordinateValuesTimeNormalized = {} + # Average. + coordVals_mean = np.mean(np.array(coordValuesNorm),axis=0) + coordinateValuesTimeNormalized['mean'] = pd.DataFrame(data=coordVals_mean, columns=colNames) + + # Standard deviation. 
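+        # The standard deviation is only reported when more than two gait cycles are
+        # available; otherwise it is set to None.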
+ if self.nGaitCycles >2: + coordVals_sd = np.std(np.array(coordValuesNorm), axis=0) + coordinateValuesTimeNormalized['sd'] = pd.DataFrame(data=coordVals_sd, columns=colNames) + else: + coordinateValuesTimeNormalized['sd'] = None + + # Return to dataframe. + coordinateValuesTimeNormalized['indiv'] = [pd.DataFrame(data=d, columns=colNames) for d in coordValuesNorm] + + return coordinateValuesTimeNormalized + + def segment_walking(self, n_gait_cycles=-1, leg='auto', visualize=False): + + # n_gait_cycles = -1 finds all accessible gait cycles. Otherwise, it + # finds that many gait cycles, working backwards from end of trial. + + # Subtract sacrum from foot. + # It looks like the position-based approach will be more robust. + r_calc_rel_x = ( + self.markerDict['markers']['r_calc_study'] - + self.markerDict['markers']['r.PSIS_study'])[:,0] + r_toe_rel_x = ( + self.markerDict['markers']['r_toe_study'] - + self.markerDict['markers']['r.PSIS_study'])[:,0] + + # Repeat for left. + l_calc_rel_x = ( + self.markerDict['markers']['L_calc_study'] - + self.markerDict['markers']['L.PSIS_study'])[:,0] + l_toe_rel_x = ( + self.markerDict['markers']['L_toe_study'] - + self.markerDict['markers']['L.PSIS_study'])[:,0] + + # Find HS. + rHS, _ = find_peaks(r_calc_rel_x, prominence=0.3) + lHS, _ = find_peaks(l_calc_rel_x, prominence=0.3) + + # Find TO. + rTO, _ = find_peaks(-r_toe_rel_x, prominence=0.3) + lTO, _ = find_peaks(-l_toe_rel_x, prominence=0.3) + + if visualize: + import matplotlib.pyplot as plt + plt.close('all') + plt.figure(1) + plt.plot(self.markerDict['time'],r_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],r_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][rHS], r_calc_rel_x[rHS], color='red', label='rHS') + plt.scatter(self.markerDict['time'][rTO], r_toe_rel_x[rTO], color='blue', label='rTO') + plt.legend() + + plt.figure(2) + plt.plot(self.markerDict['time'],l_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],l_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][lHS], l_calc_rel_x[lHS], color='red', label='lHS') + plt.scatter(self.markerDict['time'][lTO], l_toe_rel_x[lTO], color='blue', label='lTO') + plt.legend() + + # Find the number of gait cycles for the foot of interest. + if leg=='auto': + # Find the last HS of either foot. + if rHS[-1] > lHS[-1]: + leg = 'r' + else: + leg = 'l' + + # Find the number of gait cycles for the foot of interest. + if leg == 'r': + hsIps = rHS + toIps = rTO + hsCont = lHS + toCont = lTO + elif leg == 'l': + hsIps = lHS + toIps = lTO + hsCont = rHS + toCont = rTO + + if len(hsIps)-1 < n_gait_cycles: + print('You requested {} gait cycles, but only {} were found. ' + 'Proceeding with this number.'.format(n_gait_cycles,len(hsIps)-1)) + n_gait_cycles = len(hsIps)-1 + if n_gait_cycles == -1: + n_gait_cycles = len(hsIps)-1 + print('Processing {} gait cycles.'.format(n_gait_cycles)) + + # Ipsilateral gait events: heel strike, toe-off, heel strike. + gaitEvents_ips = np.zeros((n_gait_cycles, 3),dtype=np.int) + # Contralateral gait events: toe-off, heel strike. + gaitEvents_cont = np.zeros((n_gait_cycles, 2),dtype=np.int) + if n_gait_cycles <1: + raise Exception('Not enough gait cycles found.') + + for i in range(n_gait_cycles): + # Ipsilateral HS, TO, HS. + gaitEvents_ips[i,0] = hsIps[-i-2] + gaitEvents_ips[i,2] = hsIps[-i-1] + + # Iterate in reverse through ipsilateral TO, finding the one that + # is within the range of gaitEvents_ips. 
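+            # The toIpsFound flag keeps the latest qualifying toe-off and prevents it
+            # from being overwritten by earlier ones.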
+ toIpsFound = False + for j in range(len(toIps)): + if toIps[-j-1] > gaitEvents_ips[i,0] and toIps[-j-1] < gaitEvents_ips[i,2] and not toIpsFound: + gaitEvents_ips[i,1] = toIps[-j-1] + toIpsFound = True + + # Contralateral TO, HS. + # Iterate in reverse through contralateral HS and TO, finding the + # one that is within the range of gaitEvents_ips + hsContFound = False + toContFound = False + for j in range(len(toCont)): + if toCont[-j-1] > gaitEvents_ips[i,0] and toCont[-j-1] < gaitEvents_ips[i,2] and not toContFound: + gaitEvents_cont[i,0] = toCont[-j-1] + toContFound = True + + for j in range(len(hsCont)): + if hsCont[-j-1] > gaitEvents_ips[i,0] and hsCont[-j-1] < gaitEvents_ips[i,2] and not hsContFound: + gaitEvents_cont[i,1] = hsCont[-j-1] + hsContFound = True + + # Making contralateral gait events optional. + if not toContFound or not hsContFound: + raise Warning('Could not find contralateral gait event within ipsilateral gait event range.') + gaitEvents_cont[i,0] = np.nan + gaitEvents_cont[i,1] = np.nan + + # Convert gaitEvents to times using self.markerDict['time']. + gaitEventTimes_ips = self.markerDict['time'][gaitEvents_ips] + gaitEventTimes_cont = self.markerDict['time'][gaitEvents_cont] + + gaitEvents = {'ipsilateralIdx':gaitEvents_ips, + 'contralateralIdx':gaitEvents_cont, + 'ipsilateralTime':gaitEventTimes_ips, + 'contralateralTime':gaitEventTimes_cont, + 'eventNamesIpsilateral':['HS','TO','HS'], + 'eventNamesContralateral':['TO','HS'], + 'ipsilateralLeg':leg} + + return gaitEvents + \ No newline at end of file diff --git a/ActivityAnalyses/y_balance_analysis.py b/ActivityAnalyses/y_balance_analysis.py new file mode 100644 index 00000000..39d0b992 --- /dev/null +++ b/ActivityAnalyses/y_balance_analysis.py @@ -0,0 +1,479 @@ +""" + --------------------------------------------------------------------------- + OpenCap processing: gaitAnalysis.py + --------------------------------------------------------------------------- + + Copyright 2023 Stanford University and the Authors + + Author(s): Antoine Falisse, Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import os +import sys +sys.path.append('../') + +import numpy as np +import pandas as pd +from scipy.signal import find_peaks + +from utilsKinematics import kinematics + + +class y_balance(kinematics): + + def __init__(self, session_dir, trial_name, leg='auto', + lowpass_cutoff_frequency_for_coordinate_values=-1, + n_gait_cycles=-1): + + # Inherit init from kinematics class. + super().__init__( + session_dir, + trial_name, + lowpass_cutoff_frequency_for_coordinate_values=lowpass_cutoff_frequency_for_coordinate_values) + + # Marker data load and filter. + self.marker_dict = self.get_marker_dict(session_dir, trial_name, + lowpass_cutoff_frequency=lowpass_cutoff_frequency_for_coordinate_values) + + # Coordinate values. + self.coordinateValues = self.get_coordinate_values() + + # Find what leg is moving. + self.leg = self.get_y_balance_leg() + + # Define reference frame. 
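+        # The frame has y up, x along the stance foot (calcaneus to toe) projected onto
+        # the ground plane, and z from their cross product (see compute_y_balance_frame).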
+ self.R_world_to_yBalance = self.compute_y_balance_frame() + + # Segment y balance. + self.events, self.distances, self.leg = self.segment_y_balance() + + + def get_y_balance_leg(self): + + calc_r = self.markerDict['markers']['r_calc_study'] + calc_l = self.markerDict['markers']['L_calc_study'] + + leg= {} + if np.std(calc_r) < np.std(calc_l): + leg['stance'] = 'r' + leg['swing'] = 'L' + else: + leg['stance'] = 'L' + leg['swing'] = 'r' + + return leg + + def compute_y_balance_frame(self): + # y up, x-> line between calc and 2metatarsal head projected onto xz plane, + # z -> cross(x,y) + + foot = self.markerDict['markers'][self.leg['stance'] + '_toe_study'] - \ + self.markerDict['markers'][self.leg['stance'] + '_calc_study'] + + x = np.mean(foot[:10,:] / np.linalg.norm(foot[:10,:],axis=1,keepdims=True)) + x[:,1] = 0 + + y = np.array([0,1,0]) + z = np.cross(x,y) + + R_lab_to_yBalance = np.array([x.T,y.T,z.T]) + + # TODO TEST + + return R_lab_to_yBalance + + def segment_y_balance(self): + + todo=1 + + # rotate marker data (make fxn in utils) + + # subtract stance toe from swing toe + + # Segment start (march back from forward max, forward max, + # 1st diag start, max + # 2nd diag start, max + # return distance and ind + + + + + + + + # Compute COM trajectory. + def comValues(self): + if self._comValues is None: + self._comValues = self.get_center_of_mass_values() + return self._comValues + + # Compute gait frame. + def R_world_to_gait(self): + if self._R_world_to_gait is None: + self._R_world_to_gait = self.compute_gait_frame() + return self._R_world_to_gait + + def compute_scalars(self,scalarNames): + + # Verify that scalarNames are methods in gait_analysis. + method_names = [func for func in dir(self) if callable(getattr(self, func))] + possibleMethods = [entry for entry in method_names if 'compute_' in entry] + + if scalarNames is None: + print('No scalars defined, these methods are available:') + print(*possibleMethods) + return + + nonexistant_methods = [entry for entry in scalarNames if 'compute_' + entry not in method_names] + + if len(nonexistant_methods) > 0: + raise Exception(str(['compute_' + a for a in nonexistant_methods]) + ' does not exist in gait_analysis class.') + + scalarDict = {} + for scalarName in scalarNames: + thisFunction = getattr(self, 'compute_' + scalarName) + scalarDict[scalarName] = thisFunction() + + return scalarDict + + def compute_stride_length(self): + + leg,_ = self.get_leg() + + calc_position = self.markerDict['markers'][leg + '_calc_study'] + + # On treadmill, the stride length is the difference in ipsilateral + # calcaneus position at heel strike + treadmill speed * time. + strideLength = ( + np.linalg.norm( + calc_position[self.gaitEvents['ipsilateralIdx'][:,:1]] - + calc_position[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) + + self.treadmillSpeed * np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)])) + + # Average across all strides. + strideLength = np.mean(strideLength) + + return strideLength + + def compute_gait_speed(self): + + comValuesArray = np.vstack((self.comValues()['x'],self.comValues()['y'],self.comValues()['z'])).T + gait_speed = ( + np.linalg.norm( + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,:1]] - + comValuesArray[self.gaitEvents['ipsilateralIdx'][:,2:3]], axis=2) / + np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + self.treadmillSpeed) + + # Average across all strides. + gait_speed = np.mean(gait_speed) + + return gait_speed + + def compute_cadence(self): + + # In steps per second. 
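+        # One stride spans consecutive ipsilateral heel strikes (columns 0 and 2 of
+        # ipsilateralTime), so this is the inverse of the stride time.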
+ cadence = 1/np.diff(self.gaitEvents['ipsilateralTime'][:,(0,2)]) + + # Average across all strides. + cadence = np.mean(cadence) + + return cadence + + def compute_treadmill_speed(self, overground_speed_threshold=0.3): + + leg,_ = self.get_leg() + + foot_position = self.markerDict['markers'][leg + '_ankle_study'] + + stanceTimeLength = np.round(np.diff(self.gaitEvents['ipsilateralIdx'][:,:2])) + startIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,:1]+.1*stanceTimeLength).astype(int) + endIdx = np.round(self.gaitEvents['ipsilateralIdx'][:,1:2]-.3*stanceTimeLength).astype(int) + + # Average instantaneous velocities. + dt = np.diff(self.markerDict['time'][:2])[0] + for i in range(self.nGaitCycles): + footVel = np.linalg.norm(np.mean(np.diff( + foot_position[startIdx[i,0]:endIdx[i,0],:],axis=0),axis=0)/dt) + + treadmillSpeed = np.mean(footVel) + + # Overground. + if treadmillSpeed < overground_speed_threshold: + treadmillSpeed = 0 + + return treadmillSpeed + + def compute_step_width(self): + + leg,contLeg = self.get_leg() + + # Get ankle joint center positions. + ankle_position_ips = ( + self.markerDict['markers'][leg + '_ankle_study'] + + self.markerDict['markers'][leg + '_mankle_study'])/2 + ankle_position_cont = ( + self.markerDict['markers'][contLeg + '_ankle_study'] + + self.markerDict['markers'][contLeg + '_mankle_study'])/2 + ankleVector = ( + ankle_position_cont[self.gaitEvents['contralateralIdx'][:,1]] - + ankle_position_ips[self.gaitEvents['ipsilateralIdx'][:,0]]) + + ankleVector_inGaitFrame = np.array( + [np.dot(ankleVector[i,:], self.R_world_to_gait()[i,:,:]) + for i in range(self.nGaitCycles)]) + + # Step width is z distance. + stepWidth = np.abs(ankleVector_inGaitFrame[:,2]) + + # Average across all strides. + stepWidth = np.mean(stepWidth) + + return stepWidth + + def compute_single_support_time(self): + + singleSupportTime = np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) + + # Average across all strides. + singleSupportTime = np.mean(singleSupportTime) + + return singleSupportTime + + def compute_double_support_time(self): + + # Ipsilateral single support time - contralateral swing time. + doubleSupportTime = ( + np.diff(self.gaitEvents['ipsilateralTime'][:,:2]) - + np.diff(self.gaitEvents['contralateralTime'][:,:2])) + + # Average across all strides. + doubleSupportTime = np.mean(doubleSupportTime) + + return doubleSupportTime + + def compute_gait_frame(self): + + # Create frame for each gait cycle with x: pelvis heading, + # z: average vector between ASIS during gait cycle, y: cross. + + # Pelvis center trajectory (for overground heading vector). + pelvisMarkerNames = ['r.ASIS_study','L.ASIS_study','r.PSIS_study','L.PSIS_study'] + pelvisMarkers = [self.markerDict['markers'][mkr] for mkr in pelvisMarkerNames] + pelvisCenter = np.mean(np.array(pelvisMarkers),axis=0) + + # Ankle trajectory (for treadmill heading vector). + leg = self.gaitEvents['ipsilateralLeg'] + if leg == 'l': leg='L' + anklePos = self.markerDict['markers'][leg + '_ankle_study'] + + # Vector from left ASIS to right ASIS (for mediolateral direction). + asisMarkerNames = ['L.ASIS_study','r.ASIS_study'] + asisMarkers = [self.markerDict['markers'][mkr] for mkr in asisMarkerNames] + asisVector = np.squeeze(np.diff(np.array(asisMarkers),axis=0)) + + # Heading vector per gait cycle. + # If overground, use pelvis center trajectory; treadmill: ankle trajectory. 
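+        # Overground, the pelvis displacement between the two heel strikes gives the
+        # heading; on a treadmill the pelvis stays roughly in place, so the ankle
+        # trajectory during swing is used instead.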
+ if self.treadmillSpeed == 0: + x = np.diff(pelvisCenter[self.gaitEvents['ipsilateralIdx'][:,(0,2)],:],axis=1)[:,0,:] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + else: + x = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + x[i,:] = anklePos[self.gaitEvents['ipsilateralIdx'][i,2]] - \ + anklePos[self.gaitEvents['ipsilateralIdx'][i,1]] + x = x / np.linalg.norm(x,axis=1,keepdims=True) + + # Mean ASIS vector over gait cycle. + z = np.zeros((self.nGaitCycles,3)) + for i in range(self.nGaitCycles): + z[i,:] = np.mean(asisVector[self.gaitEvents['ipsilateralIdx'][i,0]: \ + self.gaitEvents['ipsilateralIdx'][i,2]],axis=0) + z = z / np.linalg.norm(z,axis=1,keepdims=True) + + # Cross to get y. + y = np.cross(z,x) + + # 3x3xnSteps. + R_lab_to_gait = np.stack((x.T,y.T,z.T),axis=1).transpose((2, 0, 1)) + + return R_lab_to_gait + + def get_leg(self): + + if self.gaitEvents['ipsilateralLeg'] == 'r': + leg = 'r' + contLeg = 'L' + else: + leg = 'L' + contLeg = 'r' + + return leg, contLeg + + def get_coordinates_normalized_time(self): + + colNames = self.coordinateValues.columns + data = self.coordinateValues.to_numpy(copy=True) + coordValuesNorm = [] + for i in range(self.nGaitCycles): + coordValues = data[self.gaitEvents['ipsilateralIdx'][i,0]:self.gaitEvents['ipsilateralIdx'][i,2]] + coordValuesNorm.append(np.stack([np.interp(np.linspace(0,100,101), + np.linspace(0,100,len(coordValues)),coordValues[:,i]) \ + for i in range(coordValues.shape[1])],axis=1)) + + coordinateValuesTimeNormalized = {} + # Average. + coordVals_mean = np.mean(np.array(coordValuesNorm),axis=0) + coordinateValuesTimeNormalized['mean'] = pd.DataFrame(data=coordVals_mean, columns=colNames) + + # Standard deviation. + if self.nGaitCycles >2: + coordVals_sd = np.std(np.array(coordValuesNorm), axis=0) + coordinateValuesTimeNormalized['sd'] = pd.DataFrame(data=coordVals_sd, columns=colNames) + else: + coordinateValuesTimeNormalized['sd'] = None + + # Return to dataframe. + coordinateValuesTimeNormalized['indiv'] = [pd.DataFrame(data=d, columns=colNames) for d in coordValuesNorm] + + return coordinateValuesTimeNormalized + + def segment_y_balance(self, n_gait_cycles=-1, leg='auto', visualize=False): + + # n_gait_cycles = -1 finds all accessible gait cycles. Otherwise, it + # finds that many gait cycles, working backwards from end of trial. + + # Subtract sacrum from foot. + # It looks like the position-based approach will be more robust. + r_calc_rel_x = ( + self.markerDict['markers']['r_calc_study'] - + self.markerDict['markers']['r.PSIS_study'])[:,0] + r_toe_rel_x = ( + self.markerDict['markers']['r_toe_study'] - + self.markerDict['markers']['r.PSIS_study'])[:,0] + + # Repeat for left. + l_calc_rel_x = ( + self.markerDict['markers']['L_calc_study'] - + self.markerDict['markers']['L.PSIS_study'])[:,0] + l_toe_rel_x = ( + self.markerDict['markers']['L_toe_study'] - + self.markerDict['markers']['L.PSIS_study'])[:,0] + + # Find HS. + rHS, _ = find_peaks(r_calc_rel_x, prominence=0.3) + lHS, _ = find_peaks(l_calc_rel_x, prominence=0.3) + + # Find TO. 
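+        # Toe-off appears as a minimum of the toe position relative to the sacrum,
+        # so peaks are detected on the negated signal.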
+ rTO, _ = find_peaks(-r_toe_rel_x, prominence=0.3) + lTO, _ = find_peaks(-l_toe_rel_x, prominence=0.3) + + if visualize: + import matplotlib.pyplot as plt + plt.close('all') + plt.figure(1) + plt.plot(self.markerDict['time'],r_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],r_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][rHS], r_calc_rel_x[rHS], color='red', label='rHS') + plt.scatter(self.markerDict['time'][rTO], r_toe_rel_x[rTO], color='blue', label='rTO') + plt.legend() + + plt.figure(2) + plt.plot(self.markerDict['time'],l_toe_rel_x,label='toe') + plt.plot(self.markerDict['time'],l_calc_rel_x,label='calc') + plt.scatter(self.markerDict['time'][lHS], l_calc_rel_x[lHS], color='red', label='lHS') + plt.scatter(self.markerDict['time'][lTO], l_toe_rel_x[lTO], color='blue', label='lTO') + plt.legend() + + # Find the number of gait cycles for the foot of interest. + if leg=='auto': + # Find the last HS of either foot. + if rHS[-1] > lHS[-1]: + leg = 'r' + else: + leg = 'l' + + # Find the number of gait cycles for the foot of interest. + if leg == 'r': + hsIps = rHS + toIps = rTO + hsCont = lHS + toCont = lTO + elif leg == 'l': + hsIps = lHS + toIps = lTO + hsCont = rHS + toCont = rTO + + if len(hsIps)-1 < n_gait_cycles: + print('You requested {} gait cycles, but only {} were found. ' + 'Proceeding with this number.'.format(n_gait_cycles,len(hsIps)-1)) + n_gait_cycles = len(hsIps)-1 + if n_gait_cycles == -1: + n_gait_cycles = len(hsIps)-1 + print('Processing {} gait cycles.'.format(n_gait_cycles)) + + # Ipsilateral gait events: heel strike, toe-off, heel strike. + gaitEvents_ips = np.zeros((n_gait_cycles, 3),dtype=np.int) + # Contralateral gait events: toe-off, heel strike. + gaitEvents_cont = np.zeros((n_gait_cycles, 2),dtype=np.int) + if n_gait_cycles <1: + raise Exception('Not enough gait cycles found.') + + for i in range(n_gait_cycles): + # Ipsilateral HS, TO, HS. + gaitEvents_ips[i,0] = hsIps[-i-2] + gaitEvents_ips[i,2] = hsIps[-i-1] + + # Iterate in reverse through ipsilateral TO, finding the one that + # is within the range of gaitEvents_ips. + toIpsFound = False + for j in range(len(toIps)): + if toIps[-j-1] > gaitEvents_ips[i,0] and toIps[-j-1] < gaitEvents_ips[i,2] and not toIpsFound: + gaitEvents_ips[i,1] = toIps[-j-1] + toIpsFound = True + + # Contralateral TO, HS. + # Iterate in reverse through contralateral HS and TO, finding the + # one that is within the range of gaitEvents_ips + hsContFound = False + toContFound = False + for j in range(len(toCont)): + if toCont[-j-1] > gaitEvents_ips[i,0] and toCont[-j-1] < gaitEvents_ips[i,2] and not toContFound: + gaitEvents_cont[i,0] = toCont[-j-1] + toContFound = True + + for j in range(len(hsCont)): + if hsCont[-j-1] > gaitEvents_ips[i,0] and hsCont[-j-1] < gaitEvents_ips[i,2] and not hsContFound: + gaitEvents_cont[i,1] = hsCont[-j-1] + hsContFound = True + + # Making contralateral gait events optional. + if not toContFound or not hsContFound: + raise Warning('Could not find contralateral gait event within ipsilateral gait event range.') + gaitEvents_cont[i,0] = np.nan + gaitEvents_cont[i,1] = np.nan + + # Convert gaitEvents to times using self.markerDict['time']. 
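+        # Indexing the time vector with the integer event arrays yields event-time
+        # arrays of the same shape.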
+ gaitEventTimes_ips = self.markerDict['time'][gaitEvents_ips] + gaitEventTimes_cont = self.markerDict['time'][gaitEvents_cont] + + gaitEvents = {'ipsilateralIdx':gaitEvents_ips, + 'contralateralIdx':gaitEvents_cont, + 'ipsilateralTime':gaitEventTimes_ips, + 'contralateralTime':gaitEventTimes_cont, + 'eventNamesIpsilateral':['HS','TO','HS'], + 'eventNamesContralateral':['TO','HS'], + 'ipsilateralLeg':leg} + + return gaitEvents + \ No newline at end of file diff --git a/Examples/example_gait_analysis.py b/Examples/example_gait_analysis.py new file mode 100644 index 00000000..84b0853a --- /dev/null +++ b/Examples/example_gait_analysis.py @@ -0,0 +1,112 @@ +''' + --------------------------------------------------------------------------- + OpenCap processing: example_gait_analysis.py + --------------------------------------------------------------------------- + Copyright 2023 Stanford University and the Authors + + Author(s): Scott Uhlrich + + Licensed under the Apache License, Version 2.0 (the "License"); you may not + use this file except in compliance with the License. You may obtain a copy + of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Please contact us for any questions: https://www.opencap.ai/#contact + + This example shows how to run a kinematic analysis of gait data. It works + with either treadmill or overground gait. You can compute scalar metrics + as well as gait cycle-averaged kinematic curves. + +''' + +import os +import sys +sys.path.append("../") +sys.path.append("../ActivityAnalyses") + +from gait_analysis import gait_analysis +from utils import get_trial_id, download_trial +from utilsPlotting import plot_dataframe_with_shading + +# %% Paths. +baseDir = os.path.join(os.getcwd(), '..') +dataFolder = os.path.join(baseDir, 'Data') + +# %% User-defined variables. +# Select example: options are treadmill and overground. +example = 'treadmill' + +if example == 'treadmill': + session_id = '4d5c3eb1-1a59-4ea1-9178-d3634610561c' # 1.25m/s + trial_name = 'walk_1_25ms' + +elif example == 'overground': + session_id = 'b39b10d1-17c7-4976-b06c-a6aaf33fead2' + trial_name = 'gait_3' + +scalar_names = {'gait_speed','stride_length','step_width','cadence', + 'single_support_time','double_support_time'} + +# Select how many gait cycles you'd like to analyze. Select -1 for all gait +# cycles detected in the trial. +n_gait_cycles = -1 + +# Select lowpass filter frequency for kinematics data. +filter_frequency = 6 + +# %% Gait analysis. +# Get trial id from name. +trial_id = get_trial_id(session_id,trial_name) + +# Set session path. +sessionDir = os.path.join(dataFolder, session_id) + +# Download data. +trialName = download_trial(trial_id,sessionDir,session_id=session_id) + +# Init gait analysis. +gait_r = gait_analysis( + sessionDir, trialName, leg='r', + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_gait_cycles=n_gait_cycles) +gait_l = gait_analysis( + sessionDir, trialName, leg='l', + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_gait_cycles=n_gait_cycles) + +# Compute scalars and get time-normalized kinematic curves. 
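+# compute_scalars returns a dict keyed by scalar name; get_coordinates_normalized_time
+# returns a dict with 'mean', 'sd', and 'indiv' dataframes, each resampled to 101 points
+# spanning 0-100% of the gait cycle.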
+gaitResults = {} +gaitResults['scalars_r'] = gait_r.compute_scalars(scalar_names) +gaitResults['curves_r'] = gait_r.get_coordinates_normalized_time() +gaitResults['scalars_l'] = gait_l.compute_scalars(scalar_names) +gaitResults['curves_l'] = gait_l.get_coordinates_normalized_time() + +# %% Print scalar results. +print('\nRight foot gait metrics:') +print('(units: m and s)') +print('-----------------') +for key, value in gaitResults['scalars_r'].items(): + rounded_value = round(value, 2) + print(f"{key}: {rounded_value}") + +print('\nLeft foot gait metrics:') +print('(units: m and s)') +print('-----------------') +for key, value in gaitResults['scalars_l'].items(): + rounded_value = round(value, 2) + print(f"{key}: {rounded_value}") + + +# %% You can plot multiple curves, in this case we compare right and left legs. + +plot_dataframe_with_shading( + [gaitResults['curves_r']['mean'], gaitResults['curves_l']['mean']], + [gaitResults['curves_r']['sd'], gaitResults['curves_l']['sd']], + leg = ['r','l'], + xlabel = '% gait cycle', + title = 'kinematics (m or deg)', + legend_entries = ['right','left']) \ No newline at end of file diff --git a/Examples/example_walking_opensimAD.py b/Examples/example_walking_opensimAD.py index 944acc48..92b8bc7a 100644 --- a/Examples/example_walking_opensimAD.py +++ b/Examples/example_walking_opensimAD.py @@ -60,6 +60,8 @@ from utilsOpenSimAD import processInputsOpenSimAD, plotResultsOpenSimAD from mainOpenSimAD import run_tracking from utilsAuthentication import get_token +from utilsProcessing import segment_gait +from utils import get_trial_id, download_trial # %% OpenCap authentication. Visit https://app.opencap.ai/login to create an # account if you don't have one yet. @@ -76,6 +78,9 @@ # Insert the name of the trial you want to simulate. trial_name = 'walk_1_25ms' +# Insert the path to where you want the data to be downloaded. +dataFolder = os.path.join(baseDir, 'Data') + # Insert the type of activity you want to simulate. We have pre-defined settings # for different activities (more details above). Visit # ./UtilsDynamicSimulations/OpenSimAD/settingsOpenSimAD.py to see all available @@ -86,17 +91,27 @@ # Insert the time interval you want to simulate. It is recommended to simulate # trials shorter than 2s (more details above). Set to [] to simulate full trial. # We here selected a time window that corresponds to a full gait stride in order -# to use poeriodic constraints. -time_window = [5.7333333, 6.9333333] - -# Insert the speed of the treadmill in m/s. A positive value indicates that the -# subject is moving forward. You should ignore this parameter or set it to 0 if -# the trial was not measured on a treadmill. -treadmill_speed = 1.25 +# to use periodic constraints. You can use the gait segmentation function to +# automatically segment the gait cycle. Also insert the speed of the treadmill +# in m/s. A positive value indicates that the subject is moving forward. +# You should ignore this parameter or set it to 0 if the trial was not measured +# on a treadmill. You can also use the gait segmenter to automatically extract +# the treadmill speed. +segmentation_method = 'automatic' +if segmentation_method == 'automatic': + # Download the trial + download_trial(get_trial_id(session_id,trial_name), + os.path.join(dataFolder,session_id), + session_id=session_id) -# Insert the path to where you want the data to be downloaded. 
-dataFolder = os.path.join(baseDir, 'Data') + time_window, gaitObject = segment_gait( + session_id, trial_name, dataFolder, gait_cycles_from_end=3) + treadmill_speed = gaitObject.treadmillSpeed +else: + time_window = [5.7333333, 6.9333333] + treadmill_speed = 1.25 + # %% Sub-example 1: walking simulation with torque-driven model. # Insert a string to "name" you case. case = 'torque_driven' diff --git a/UtilsDynamicSimulations/OpenSimAD/utilsOpenSimAD.py b/UtilsDynamicSimulations/OpenSimAD/utilsOpenSimAD.py index ddd3072f..7cf4def3 100644 --- a/UtilsDynamicSimulations/OpenSimAD/utilsOpenSimAD.py +++ b/UtilsDynamicSimulations/OpenSimAD/utilsOpenSimAD.py @@ -37,8 +37,8 @@ from utils import (storage_to_numpy, storage_to_dataframe, download_kinematics, import_metadata, numpy_to_storage) -from utilsProcessing import (segmentSquats, segmentSTS, adjustMuscleWrapping, - generateModelWithContacts) +from utilsProcessing import (segment_squats, segment_STS, adjust_muscle_wrapping, + generate_model_with_contacts) from settingsOpenSimAD import get_setup # %% Filter numpy array. @@ -2179,10 +2179,10 @@ def processInputsOpenSimAD(baseDir, dataFolder, session_id, trial_name, # Prepare inputs for dynamic simulations. # Adjust muscle wrapping. - adjustMuscleWrapping(baseDir, dataFolder, session_id, + adjust_muscle_wrapping(baseDir, dataFolder, session_id, OpenSimModel=OpenSimModel, overwrite=overwrite) # Add foot-ground contacts to musculoskeletal model. - generateModelWithContacts(dataFolder, session_id, + generate_model_with_contacts(dataFolder, session_id, OpenSimModel=OpenSimModel, overwrite=overwrite) # Generate external function. generateExternalFunction(baseDir, dataFolder, session_id, @@ -2198,9 +2198,9 @@ def processInputsOpenSimAD(baseDir, dataFolder, session_id, trial_name, if (repetition is not None and (motion_type == 'squats' or motion_type == 'sit_to_stand')): if motion_type == 'squats': - times_window = segmentSquats(pathMotionFile, visualize=True) + times_window = segment_squats(pathMotionFile, visualize=True) elif motion_type == 'sit_to_stand': - _, _, times_window = segmentSTS(pathMotionFile, visualize=True) + _, _, times_window = segment_STS(pathMotionFile, visualize=True) time_window = times_window[repetition] settings['repetition'] = repetition else: diff --git a/utils.py b/utils.py index faf720f3..7f62b7bf 100644 --- a/utils.py +++ b/utils.py @@ -58,7 +58,7 @@ def get_created_at(trial): sessionJson['trials'].sort(key=get_created_at) return sessionJson - + # Returns a list of all sessions of the user. def get_user_sessions(): sessions = requests.get( @@ -136,6 +136,17 @@ def get_model_and_metadata(session_id, session_path): return modelName +def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): + metadataPath = os.path.join(sessionFolder,'sessionMetadata.yaml') + + if os.path.exists(metadataPath): + metadata = import_metadata(os.path.join(sessionFolder,'sessionMetadata.yaml')) + modelName = metadata['openSimModel'] + appendText + '.osim' + else: + raise Exception('Session metadata not found, could not identify OpenSim model.') + + return modelName + def get_motion_data(trial_id, session_path): trial = get_trial_json(trial_id) @@ -245,6 +256,32 @@ def download_kinematics(session_id, folder=None, trialNames=None): return loadedTrialNames, modelName +# Download pertinent trial data. 
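+# Downloads the scaled model and session metadata plus the trial's marker (.trc) and
+# kinematics (.mot) data into 'folder', and returns the trial name.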
+def download_trial(trial_id, folder, session_id=None): + + trial = get_trial_json(trial_id) + if session_id is None: + session_id = trial['session_id'] + + os.makedirs(folder,exist_ok=True) + + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + get_motion_data(trial_id,folder) + + return trial['name'] + + +# Get trial ID from name. +def get_trial_id(session_id,trial_name): + session = get_session_json(session_id) + + trial_id = [t['id'] for t in session['trials'] if t['name'] == trial_name] + + return trial_id[0] + # %% Storage file to numpy array. def storage_to_numpy(storage_file, excess_header_entries=0): """Returns the data from a storage file in a numpy format. Skips all lines @@ -359,7 +396,6 @@ def numpy_to_storage(labels, data, storage_file, datatype=None): f.close() - def download_videos_from_server(session_id,trial_id, isCalibration=False, isStaticPose=False, trial_name= None, session_path = None): @@ -595,4 +631,5 @@ def zipdir(path, ziph): # Write zip as a result to last trial for now if writeToDB: post_file_to_trial(session_zip,dynamic_ids[-1],tag='session_zip', - device_id='all') \ No newline at end of file + device_id='all') + \ No newline at end of file diff --git a/utilsKinematics.py b/utilsKinematics.py index 82615443..c3bcf996 100644 --- a/utilsKinematics.py +++ b/utilsKinematics.py @@ -20,16 +20,19 @@ import os import opensim +import utils import numpy as np import pandas as pd import scipy.interpolate as interpolate + from utilsProcessing import lowPassFilter +from utilsTRC import trc_2_dict class kinematics: - def __init__(self, dataDir, trialName, - modelName='LaiUhlrich2022_scaled', + def __init__(self, sessionDir, trialName, + modelName=None, lowpass_cutoff_frequency_for_coordinate_values=-1): self.lowpass_cutoff_frequency_for_coordinate_values = ( @@ -37,13 +40,25 @@ def __init__(self, dataDir, trialName, # Model. opensim.Logger.setLevelString('error') - modelPath = os.path.join(dataDir, 'OpenSimData', 'Model', + + modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + # Load model if specified, otherwise load the one that was on server + if modelName is None: + modelNameFromMetadata = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath,modelNameFromMetadata) + else: + modelPath = os.path.join(modelBasePath, '{}.osim'.format(modelName)) + + # make sure model exists + if not os.path.exists(modelPath): + raise Exception('Model path: ' + modelPath + ' does not exist.') + self.model = opensim.Model(modelPath) self.model.initSystem() # Motion file with coordinate values. - motionPath = os.path.join(dataDir, 'OpenSimData', 'Kinematics', + motionPath = os.path.join(sessionDir, 'OpenSimData', 'Kinematics', '{}.mot'.format(trialName)) # Create time-series table with coordinate values. @@ -53,6 +68,9 @@ def __init__(self, dataDir, trialName, tableProcessor.append(opensim.TabOpUseAbsoluteStateNames()) self.time = np.asarray(self.table.getIndependentColumn()) + # Initialize the state trajectory. We will set it in other functions + # if it is needed. + self._stateTrajectory = None # Filter coordinate values. 
if lowpass_cutoff_frequency_for_coordinate_values > 0: @@ -101,11 +119,7 @@ def __init__(self, dataDir, trialName, if not stateVariableNameStr in existingLabels: vec_0 = opensim.Vector([0] * self.table.getNumRows()) self.table.appendColumn(stateVariableNameStr, vec_0) - - # Set state trajectory - self.stateTrajectory = opensim.StatesTrajectory.createFromStatesTable( - self.model, self.table) - + # Number of muscles. self.nMuscles = 0 self.forceSet = self.model.getForceSet() @@ -120,15 +134,13 @@ def __init__(self, dataDir, trialName, self.coordinates = [self.coordinateSet.get(i).getName() for i in range(self.nCoordinates)] - # TODO: hard coded - # Translational coordinates. - columnTrLabels = [ - 'pelvis_tx', 'pelvis_ty', 'pelvis_tz'] + # Find rotational and translational coordinates. self.idxColumnTrLabels = [ - self.columnLabels.index(i) for i in columnTrLabels] + self.columnLabels.index(i) for i in self.coordinates if \ + self.coordinateSet.get(i).getMotionType() == 2] self.idxColumnRotLabels = [ - self.columnLabels.index(i) for i in self.columnLabels - if not i in columnTrLabels] + self.columnLabels.index(i) for i in self.coordinates if \ + self.coordinateSet.get(i).getMotionType() == 1] # TODO: hard coded self.rootCoordinates = [ @@ -142,7 +154,30 @@ def __init__(self, dataDir, trialName, 'elbow_flex_r', 'pro_sup_r', 'arm_flex_l', 'arm_add_l', 'arm_rot_l', 'elbow_flex_l', 'pro_sup_l'] - + + # Only set the state trajectory when needed because it is slow. + def stateTrajectory(self): + if self._stateTrajectory is None: + self._stateTrajectory = ( + opensim.StatesTrajectory.createFromStatesTable( + self.model, self.table)) + return self._stateTrajectory + + def get_marker_dict(self, session_dir, trial_name, + lowpass_cutoff_frequency = -1): + + trcFilePath = os.path.join(session_dir, + 'MarkerData', + '{}.trc'.format(trial_name)) + + markerDict = trc_2_dict(trcFilePath) + if lowpass_cutoff_frequency > 0: + markerDict['markers'] = { + marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) + for marker_name, data in markerDict['markers'].items()} + + return markerDict + def get_coordinate_values(self, in_degrees=True, lowpass_cutoff_frequency=-1): @@ -224,14 +259,14 @@ def get_muscle_tendon_lengths(self, lowpass_cutoff_frequency=-1): # Compute muscle-tendon lengths. 
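+        # The states trajectory is built lazily via stateTrajectory(), and the model is
+        # realized to the Position stage for each frame before querying muscle lengths.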
lMT = np.zeros((self.table.getNumRows(), self.nMuscles)) for i in range(self.table.getNumRows()): - self.model.realizePosition(self.stateTrajectory[i]) + self.model.realizePosition(self.stateTrajectory()[i]) if i == 0: muscleNames = [] for m in range(self.forceSet.getSize()): c_force_elt = self.forceSet.get(m) if 'Muscle' in c_force_elt.getConcreteClassName(): cObj = opensim.Muscle.safeDownCast(c_force_elt) - lMT[i,m] = cObj.getLength(self.stateTrajectory[i]) + lMT[i,m] = cObj.getLength(self.stateTrajectory()[i]) if i == 0: muscleNames.append(c_force_elt.getName()) @@ -253,7 +288,7 @@ def get_moment_arms(self, lowpass_cutoff_frequency=-1): dM = np.zeros((self.table.getNumRows(), self.nMuscles, self.nCoordinates)) for i in range(self.table.getNumRows()): - self.model.realizePosition(self.stateTrajectory[i]) + self.model.realizePosition(self.stateTrajectory()[i]) if i == 0: muscleNames = [] for m in range(self.forceSet.getSize()): @@ -280,7 +315,7 @@ def get_moment_arms(self, lowpass_cutoff_frequency=-1): coordinate = self.coordinateSet.get( self.coordinates.index(coord)) dM[i, m, c] = cObj.computeMomentArm( - self.stateTrajectory[i], coordinate) + self.stateTrajectory()[i], coordinate) # Clean numerical artefacts (ie, moment arms smaller than 1e-5 m). dM[np.abs(dM) < 1e-5] = 0 @@ -307,11 +342,11 @@ def compute_center_of_mass(self): self.com_values = np.zeros((self.table.getNumRows(),3)) self.com_speeds = np.zeros((self.table.getNumRows(),3)) for i in range(self.table.getNumRows()): - self.model.realizeVelocity(self.stateTrajectory[i]) + self.model.realizeVelocity(self.stateTrajectory()[i]) self.com_values[i,:] = self.model.calcMassCenterPosition( - self.stateTrajectory[i]).to_numpy() + self.stateTrajectory()[i]).to_numpy() self.com_speeds[i,:] = self.model.calcMassCenterVelocity( - self.stateTrajectory[i]).to_numpy() + self.stateTrajectory()[i]).to_numpy() def get_center_of_mass_values(self, lowpass_cutoff_frequency=-1): @@ -370,4 +405,4 @@ def get_center_of_mass_accelerations(self, lowpass_cutoff_frequency=-1): columns = ['time'] + ['x','y','z'] com_accelerations = pd.DataFrame(data=data, columns=columns) - return com_accelerations + return com_accelerations \ No newline at end of file diff --git a/utilsPlotting.py b/utilsPlotting.py index 92f01d2e..fe9ea28e 100644 --- a/utilsPlotting.py +++ b/utilsPlotting.py @@ -113,4 +113,97 @@ def plot_dataframe(dataframes, x=None, y=[], xlabel=None, ylabel=None, # Show plot (needed if running through terminal). 
plt.show() - \ No newline at end of file + + +def plot_dataframe_with_shading(mean_dataframe, sd_dataframe=None, y=None, + leg=None, xlabel=None, title=None, legend_entries=None): + if not isinstance(mean_dataframe, list): + mean_dataframe = [mean_dataframe] + + if sd_dataframe is not None: + if not isinstance(sd_dataframe, list): + sd_dataframe = [sd_dataframe] + + if not isinstance(leg, list): + leg = [leg] * len(mean_dataframe) + + if y is None: + y = [col for col in mean_dataframe[0].columns if col != 'time'] + + columns = [col for col in y if not any(sub in col for sub in ['_beta', 'mtp', 'time'])] + + if leg[0] == 'r': + columns = [col for col in columns if not col.endswith('_l')] + elif leg[0] == 'l': + columns = [col for col in columns if not col.endswith('_r')] + + num_columns = len(columns) + num_rows = (num_columns + 3) // 4 # Always 4 columns per row + + colormap = plt.cm.get_cmap('viridis', len(mean_dataframe)) + + fig, axes = plt.subplots(num_rows, 4, figsize=(12, 8)) + axes = axes.flatten() + + for i, column in enumerate(columns): + row = i // 4 + ax = axes[i] + + for j, (mean_df, sd_df) in enumerate(zip(mean_dataframe, sd_dataframe)): + if len(mean_dataframe) > 1: + color = colormap(j) + else: + color = 'black' + + if leg[j] is not None and (column.endswith('_r') or column.endswith('_l')): + col=column[:-2] + '_' + leg[j] + colLabel = column[:-2] + else: + col = column + colLabel = column + + mean_values = mean_df[col] + + if legend_entries is None: + thisLegend = [] + else: + thisLegend = legend_entries[j] + + ax.plot(mean_values, color=color, label=thisLegend) + + # Check if sd_df is not None before plotting + if sd_df is not None: + sd_column = col + if sd_column in sd_df.columns: + sd_values = sd_df[sd_column] + ax.fill_between( + range(len(mean_values)), + mean_values - sd_values, + mean_values + sd_values, + color=color, + alpha=0.3, + linewidth=0, # Remove surrounding line + ) + + + ax.set_xlabel(xlabel if row == num_rows - 1 else None, fontsize=12) + ax.set_ylabel(colLabel, fontsize=12) + + # Increase font size for axis labels + ax.tick_params(axis='both', which='major', labelsize=10) + + # Create the legend in the first subplot if legend_entries is provided + if legend_entries: + axes[0].legend() + + # Remove any unused subplots + if num_columns < num_rows * 4: + for i in range(num_columns, num_rows * 4): + fig.delaxes(axes[i]) + + # Title + if title is not None: + fig.suptitle(title) + + plt.tight_layout() + plt.show() diff --git a/utilsProcessing.py b/utilsProcessing.py index f2b6f0a2..91139c93 100644 --- a/utilsProcessing.py +++ b/utilsProcessing.py @@ -19,12 +19,16 @@ ''' import os +pathFile = os.path.dirname(os.path.realpath(__file__)) +import sys +sys.path.append(os.path.join(pathFile, 'ActivityAnalyses')) + import logging import opensim import numpy as np from scipy import signal import matplotlib.pyplot as plt -from utils import storage_to_dataframe +from utils import storage_to_dataframe, download_trial, get_trial_id def lowPassFilter(time, data, lowpass_cutoff_frequency, order=4): @@ -35,10 +39,25 @@ def lowPassFilter(time, data, lowpass_cutoff_frequency, order=4): return dataFilt +# %% Segment gait +def segment_gait(session_id, trial_name, data_folder, gait_cycles_from_end=0): + + # Segmentation is done in the gait_analysis class + from gait_analysis import gait_analysis + + gait = gait_analysis(os.path.join(data_folder,session_id), trial_name, + n_gait_cycles=-1) + heelstrikeTimes = gait.gaitEvents['ipsilateralTime'][gait_cycles_from_end,(0,2)].tolist() 
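+    # Rows of ipsilateralTime run from the last gait cycle backwards, so
+    # gait_cycles_from_end selects how far back from the end of the trial to segment;
+    # columns 0 and 2 are the two heel strikes bounding that cycle.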
+ + return heelstrikeTimes, gait + # %% Segment squats. -def segmentSquats(ikFilePath, pelvis_ty=None, timeVec=None, visualize=False, +def segment_squats(ikFilePath, pelvis_ty=None, timeVec=None, visualize=False, filter_pelvis_ty=True, cutoff_frequency=4, height=.2): + # TODO: eventually, this belongs in a squat_analysis class and should take + # the form of segment_gait + # Extract pelvis_ty if not given. if pelvis_ty is None and timeVec is None: ikResults = storage_to_dataframe(ikFilePath,headers={'pelvis_ty'}) @@ -97,10 +116,13 @@ def segmentSquats(ikFilePath, pelvis_ty=None, timeVec=None, visualize=False, from delayed start to corresponding periodic end in terms of vertical pelvis position. ''' -def segmentSTS(ikFilePath, pelvis_ty=None, timeVec=None, velSeated=0.3, +def segment_STS(ikFilePath, pelvis_ty=None, timeVec=None, velSeated=0.3, velStanding=0.15, visualize=False, filter_pelvis_ty=True, cutoff_frequency=4, delay=0.1): + # TODO: eventually, this belongs in a sts_analysis class and should take + # the form of segment_gait + # Extract pelvis_ty if not given. if pelvis_ty is None and timeVec is None: ikResults = storage_to_dataframe(ikFilePath,headers={'pelvis_ty'}) @@ -204,7 +226,7 @@ def segmentSTS(ikFilePath, pelvis_ty=None, timeVec=None, velSeated=0.3, # wrapping giving rise to bad muscle-tendon lengths and moment arms. Changes # are made for the gmax1, iliacus, and psoas. Changes are documented in # modelAdjustment.log. -def adjustMuscleWrapping( +def adjust_muscle_wrapping( baseDir, dataDir, subject, poseDetector='DefaultPD', cameraSetup='DefaultModel', OpenSimModel="LaiUhlrich2022", overwrite=False): @@ -480,7 +502,7 @@ def getMomentArms(model, poses, muscleName, coordinateForMomentArm): return momentArms # %% Generate model with contacts. -def generateModelWithContacts( +def generate_model_with_contacts( dataDir, subject, poseDetector='DefaultPD', cameraSetup='DefaultModel', OpenSimModel="LaiUhlrich2022", setPatellaMasstoZero=True, overwrite=False): diff --git a/utilsTRC.py b/utilsTRC.py new file mode 100644 index 00000000..c0902956 --- /dev/null +++ b/utilsTRC.py @@ -0,0 +1,298 @@ +"""Manages the movement and use of data files.""" + +import os +import warnings +from scipy.spatial.transform import Rotation as R + +import numpy as np +from numpy.lib.recfunctions import append_fields + +class TRCFile(object): + """A plain-text file format for storing motion capture marker trajectories. + TRC stands for Track Row Column. + + The metadata for the file is stored in attributes of this object. + + See + http://simtk-confluence.stanford.edu:8080/display/OpenSim/Marker+(.trc)+Files + for more information. + + """ + def __init__(self, fpath=None, **kwargs): + #path=None, + #data_rate=None, + #camera_rate=None, + #num_frames=None, + #num_markers=None, + #units=None, + #orig_data_rate=None, + #orig_data_start_frame=None, + #orig_num_frames=None, + #marker_names=None, + #time=None, + #): + """ + Parameters + ---------- + fpath : str + Valid file path to a TRC (.trc) file. + + """ + self.marker_names = [] + if fpath != None: + self.read_from_file(fpath) + else: + for k, v in kwargs.items(): + setattr(self, k, v) + + def read_from_file(self, fpath): + # Read the header lines / metadata. + # --------------------------------- + # Split by any whitespace. + # TODO may cause issues with paths that have spaces in them. + f = open(fpath) + # These are lists of each entry on the first few lines. + first_line = f.readline().split() + # Skip the 2nd line. 
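+        # Line 2 only holds the header field names; their values are parsed from line 3.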
+ f.readline() + third_line = f.readline().split() + fourth_line = f.readline().split() + f.close() + + # First line. + if len(first_line) > 3: + self.path = first_line[3] + else: + self.path = '' + + # Third line. + self.data_rate = float(third_line[0]) + self.camera_rate = float(third_line[1]) + self.num_frames = int(third_line[2]) + self.num_markers = int(third_line[3]) + self.units = third_line[4] + self.orig_data_rate = float(third_line[5]) + self.orig_data_start_frame = int(third_line[6]) + self.orig_num_frames = int(third_line[7]) + + # Marker names. + # The first and second column names are 'Frame#' and 'Time'. + self.marker_names = fourth_line[2:] + + len_marker_names = len(self.marker_names) + if len_marker_names != self.num_markers: + warnings.warn('Header entry NumMarkers, %i, does not ' + 'match actual number of markers, %i. Changing ' + 'NumMarkers to match actual number.' % ( + self.num_markers, len_marker_names)) + self.num_markers = len_marker_names + + # Load the actual data. + # --------------------- + col_names = ['frame_num', 'time'] + # This naming convention comes from OpenSim's Inverse Kinematics tool, + # when it writes model marker locations. + for mark in self.marker_names: + col_names += [mark + '_tx', mark + '_ty', mark + '_tz'] + dtype = {'names': col_names, + 'formats': ['int'] + ['float64'] * (3 * self.num_markers + 1)} + usecols = [i for i in range(3 * self.num_markers + 1 + 1)] + self.data = np.loadtxt(fpath, delimiter='\t', skiprows=5, dtype=dtype, + usecols=usecols) + self.time = self.data['time'] + + # Check the number of rows. + n_rows = self.time.shape[0] + if n_rows != self.num_frames: + warnings.warn('%s: Header entry NumFrames, %i, does not ' + 'match actual number of frames, %i, Changing ' + 'NumFrames to match actual number.' % (fpath, + self.num_frames, n_rows)) + self.num_frames = n_rows + + def __getitem__(self, key): + """See `marker()`. + + """ + return self.marker(key) + + def units(self): + return self.units + + def time(self): + this_dat = np.empty((self.num_frames, 1)) + this_dat[:, 0] = self.time + return this_dat + + def marker(self, name): + """The trajectory of marker `name`, given as a `self.num_frames` x 3 + array. The order of the columns is x, y, z. + + """ + this_dat = np.empty((self.num_frames, 3)) + this_dat[:, 0] = self.data[name + '_tx'] + this_dat[:, 1] = self.data[name + '_ty'] + this_dat[:, 2] = self.data[name + '_tz'] + return this_dat + + def add_marker(self, name, x, y, z): + """Add a marker, with name `name` to the TRCFile. + + Parameters + ---------- + name : str + Name of the marker; e.g., 'R.Hip'. + x, y, z: array_like + Coordinates of the marker trajectory. All 3 must have the same + length. 
+ + """ + if (len(x) != self.num_frames or len(y) != self.num_frames or len(z) != + self.num_frames): + raise Exception('Length of data (%i, %i, %i) is not ' + 'NumFrames (%i).', len(x), len(y), len(z), self.num_frames) + self.marker_names += [name] + self.num_markers += 1 + if not hasattr(self, 'data'): + self.data = np.array(x, dtype=[('%s_tx' % name, 'float64')]) + self.data = append_fields(self.data, + ['%s_t%s' % (name, s) for s in 'yz'], + [y, z], usemask=False) + else: + self.data = append_fields(self.data, + ['%s_t%s' % (name, s) for s in 'xyz'], + [x, y, z], usemask=False) + + def marker_at(self, name, time): + x = np.interp(time, self.time, self.data[name + '_tx']) + y = np.interp(time, self.time, self.data[name + '_ty']) + z = np.interp(time, self.time, self.data[name + '_tz']) + return [x, y, z] + + def marker_exists(self, name): + """ + Returns + ------- + exists : bool + Is the marker in the TRCFile? + + """ + return name in self.marker_names + + def write(self, fpath): + """Write this TRCFile object to a TRC file. + + Parameters + ---------- + fpath : str + Valid file path to which this TRCFile is saved. + + """ + f = open(fpath, 'w') + + # Line 1. + f.write('PathFileType 4\t(X/Y/Z) %s\n' % os.path.split(fpath)[0]) + + # Line 2. + f.write('DataRate\tCameraRate\tNumFrames\tNumMarkers\t' + 'Units\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames\n') + + # Line 3. + f.write('%.1f\t%.1f\t%i\t%i\t%s\t%.1f\t%i\t%i\n' % ( + self.data_rate, self.camera_rate, self.num_frames, + self.num_markers, self.units, self.orig_data_rate, + self.orig_data_start_frame, self.orig_num_frames)) + + # Line 4. + f.write('Frame#\tTime\t') + for imark in range(self.num_markers): + f.write('%s\t\t\t' % self.marker_names[imark]) + f.write('\n') + + # Line 5. + f.write('\t\t') + for imark in np.arange(self.num_markers) + 1: + f.write('X%i\tY%s\tZ%s\t' % (imark, imark, imark)) + f.write('\n') + + # Line 6. + f.write('\n') + + # Data. + for iframe in range(self.num_frames): + f.write('%i' % (iframe + 1)) + f.write('\t%.7f' % self.time[iframe]) + for mark in self.marker_names: + idxs = [mark + '_tx', mark + '_ty', mark + '_tz'] + f.write('\t%.7f\t%.7f\t%.7f' % tuple( + self.data[coln][iframe] for coln in idxs)) + f.write('\n') + + f.close() + + def add_noise(self, noise_width): + """ add random noise to each component of the marker trajectory + The noise mean will be zero, with the noise_width being the + standard deviation. + + noise_width : int + """ + for imarker in range(self.num_markers): + components = ['_tx', '_ty', '_tz'] + for iComponent in range(3): + # generate noise + noise = np.random.normal(0, noise_width, self.num_frames) + # add noise to each component of marker data. + self.data[self.marker_names[imarker] + components[iComponent]] += noise + + def rotate(self, axis, value): + """ rotate the data. + + axis : rotation axis + value : angle in degree + """ + for imarker in range(self.num_markers): + + temp = np.zeros((self.num_frames, 3)) + temp[:,0] = self.data[self.marker_names[imarker] + '_tx'] + temp[:,1] = self.data[self.marker_names[imarker] + '_ty'] + temp[:,2] = self.data[self.marker_names[imarker] + '_tz'] + + r = R.from_euler(axis, value, degrees=True) + temp_rot = r.apply(temp) + + self.data[self.marker_names[imarker] + '_tx'] = temp_rot[:,0] + self.data[self.marker_names[imarker] + '_ty'] = temp_rot[:,1] + self.data[self.marker_names[imarker] + '_tz'] = temp_rot[:,2] + + def offset(self, axis, value): + """ offset the data. 
+ + axis : rotation axis + value : offset in m + """ + for imarker in range(self.num_markers): + if axis.lower() == 'x': + self.data[self.marker_names[imarker] + '_tx'] += value + elif axis.lower() == 'y': + self.data[self.marker_names[imarker] + '_ty'] += value + elif axis.lower() == 'z': + self.data[self.marker_names[imarker] + '_tz'] += value + else: + raise ValueError("Axis not recognized") + +def trc_2_dict(pathFile, rotation=None): + # rotation is a dict, eg. {'y':90} with axis, angle for rotation + trc_dict = {} + trc_file = TRCFile(pathFile) + trc_dict['time'] = trc_file.time + trc_dict['marker_names'] = trc_file.marker_names + trc_dict['markers'] = {} + + if rotation != None: + for axis,angle in rotation.items(): + trc_file.rotate(axis,angle) + for count, marker in enumerate(trc_dict['marker_names']): + trc_dict['markers'][marker] = trc_file.marker(marker) + + return trc_dict
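+
+# Example usage (the file path and rotation below are illustrative):
+# trial = trc_2_dict('MarkerData/walk_1_25ms.trc', rotation={'y': 90})
+# r_calc = trial['markers']['r_calc_study']  # num_frames x 3 array of x, y, z positions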