Optionally download files to EBS rather than use S3FS (#116)
* First attempt at downloading files

* fix string reading of metadata

* remove files when you're done with them

* my code runs too fast

* use fields

* make channels into a list

* make directory for files to land in

* port to python2
bethac07 authored Nov 10, 2020
1 parent 82dc8c6 commit c4a9f83
Showing 6 changed files with 131 additions and 5 deletions.
1 change: 1 addition & 0 deletions config.py
@@ -18,6 +18,7 @@
MACHINE_TYPE = ['m4.xlarge']
MACHINE_PRICE = 0.10
EBS_VOL_SIZE = 30 # In GB. Minimum allowed is 22.
DOWNLOAD_FILES = 'False'

# DOCKER INSTANCE RUNNING ENVIRONMENT:
DOCKER_CORES = 1 # Number of CellProfiler processes to run inside a docker container
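The new DOWNLOAD_FILES setting is a string rather than a boolean, which is why the workers below test DOWNLOAD_FILES.lower() == 'true'. A minimal sketch of how the flag is meant to flow from config.py through the task-definition environment into a worker; only the variable name comes from the diff, the surrounding plumbing is assumed:

```python
import os

# config.py (assumed usage): opt in by setting the string 'True', not a Python bool
DOWNLOAD_FILES = 'True'

# run.py passes the value into the ECS task definition as an environment variable;
# cp-worker.py then reads it back out, defaulting to False when it is absent
DOWNLOAD_FILES = os.environ.get('DOWNLOAD_FILES', False)
if DOWNLOAD_FILES and str(DOWNLOAD_FILES).lower() == 'true':
    print('Worker will download inputs to local EBS instead of reading them over S3FS')
```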
4 changes: 4 additions & 0 deletions python2worker/Dockerfile
@@ -48,6 +48,10 @@ RUN \
RUN \
pip install watchtower==0.8.0

# Install pandas for optional file downloading

RUN pip install pandas

# SETUP NEW ENTRYPOINT

RUN mkdir -p /home/ubuntu/
62 changes: 59 additions & 3 deletions python2worker/cp-worker.py
@@ -35,6 +35,13 @@
NECESSARY_STRING = False
else:
NECESSARY_STRING = os.environ['NECESSARY_STRING']
if 'DOWNLOAD_FILES' not in os.environ:
DOWNLOAD_FILES = False
else:
DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES']

localIn = '/home/ubuntu/local_input'


#################################
# CLASS TO HANDLE THE SQS QUEUE
@@ -159,8 +166,54 @@ def runCellProfiler(message):
logger.removeHandler(watchtowerlogger)
return 'SUCCESS'
except KeyError: #Returned if that folder does not exist
pass

pass

csv_name = os.path.join(DATA_ROOT,message['data_file'])

# Optional- download files
if DOWNLOAD_FILES:
if DOWNLOAD_FILES.lower() == 'true':
printandlog('Figuring which files to download', logger)
import pandas
s3 = boto3.resource('s3')
if not os.path.exists(localIn):
os.mkdir(localIn)
csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
csv_in=csv_in.astype('str')
#Figure out what metadata fields we need in this experiment, as a dict
if type(message['Metadata'])==dict:
filter_dict = message['Metadata']
else:
filter_dict = {}
for eachMetadata in message['Metadata'].split(','):
filterkey, filterval = eachMetadata.split('=')
filter_dict[filterkey] = filterval
#Filter our CSV to just the rows CellProfiler will process, so that we can download only what we need
for eachfilter in filter_dict.keys():
csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]]
#Figure out the actual file names and get them
channel_list = [x.split('FileName_')[1] for x in csv_in.columns if 'FileName' in x]
count = 0
printandlog('Downloading files', logger)
for channel in channel_list:
for field in range(csv_in.shape[0]):
full_old_file_name = os.path.join(list(csv_in['PathName_'+channel])[field],list(csv_in['FileName_'+channel])[field])
prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:]
new_file_name = os.path.join(localIn,prefix_on_bucket)
if not os.path.exists(os.path.split(new_file_name)[0]):
os.makedirs(os.path.split(new_file_name)[0])
printandlog('made directory '+os.path.split(new_file_name)[0],logger)
s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name)
count +=1
printandlog('Downloaded '+str(count)+' files',logger)
local_csv_name = os.path.join(localIn,os.path.split(csv_name)[1])
if not os.path.exists(local_csv_name):
csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True)
csv_in.to_csv(local_csv_name,index=False)
print('Wrote updated CSV')
csv_name = local_csv_name

# Build and run CellProfiler command
cpDone = localOut + '/cp.is.done'
cp2 = False
@@ -173,7 +226,7 @@ def runCellProfiler(message):
cmdstem = 'cellprofiler -c -r '
if message['pipeline'][-3:]!='.h5':
cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone
cmd += ' --data-file=%(DATA)s/%(FL)s '
cmd += ' --data-file='+csv_name+' '
cmd += '-g %(Metadata)s'
else:
cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
@@ -189,6 +242,9 @@ def runCellProfiler(message):
# Get the outputs and move them to S3
if os.path.isfile(cpDone):
time.sleep(30)
if os.path.exists(localIn):
import shutil
shutil.rmtree(localIn, ignore_errors=True)
mvtries=0
while mvtries <3:
try:
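The heart of the new code path: filter the load-data CSV down to the rows matched by this job's Metadata grouping, then download only the referenced images from S3 into /home/ubuntu/local_input. A standalone sketch of that step, assuming the bucket name, mount point, CSV name, and Metadata string shown in the comments (the real worker takes them from the SQS message and its environment):

```python
import os
import boto3
import pandas

DATA_ROOT = '/home/ubuntu/bucket'                      # assumed S3FS mount point of the bucket
AWS_BUCKET = 'my-example-bucket'                       # assumed bucket name
localIn = '/home/ubuntu/local_input'
metadata = 'Metadata_Plate=Plate1,Metadata_Well=A01'   # assumed grouping string from the message

csv_in = pandas.read_csv(os.path.join(DATA_ROOT, 'load_data.csv')).astype('str')

# 'Metadata' may arrive as a dict or as a comma-separated key=value string
filter_dict = dict(pair.split('=') for pair in metadata.split(','))
for key, value in filter_dict.items():
    csv_in = csv_in[csv_in[key] == value]

# Download only the images the filtered rows reference, one FileName/PathName pair per channel
s3 = boto3.resource('s3')
channels = [col.split('FileName_')[1] for col in csv_in.columns if 'FileName' in col]
for channel in channels:
    for _, row in csv_in.iterrows():
        full_path = os.path.join(row['PathName_' + channel], row['FileName_' + channel])
        key = full_path.split(DATA_ROOT)[1][1:]        # S3 key relative to the bucket root
        local_path = os.path.join(localIn, key)
        if not os.path.exists(os.path.dirname(local_path)):
            os.makedirs(os.path.dirname(local_path))
        s3.meta.client.download_file(AWS_BUCKET, key, local_path)
```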
7 changes: 6 additions & 1 deletion run.py
@@ -126,7 +126,11 @@ def generate_task_definition(AWS_PROFILE):
{
"name": "NECESSARY_STRING",
"value": NECESSARY_STRING
}
},
{
"name": "DOWNLOAD_FILES",
"value": DOWNLOAD_FILES
}
]
return task_definition

@@ -440,6 +444,7 @@ def startCluster():

# Step 6: Monitor the creation of the instances until all are present
status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=requestInfo['SpotFleetRequestId'])
time.sleep(15) # This is now too fast, so sometimes the spot fleet request history throws an error!
while len(status['ActiveInstances']) < CLUSTER_MACHINES:
# First check to make sure there's not a problem
errorcheck = ec2client.describe_spot_fleet_request_history(SpotFleetRequestId=requestInfo['SpotFleetRequestId'], EventType='error', StartTime=thistime)
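The time.sleep(15) added before the monitoring loop works around a race: describe_spot_fleet_request_history can fail if it is queried immediately after the fleet request is created (the "my code runs too fast" fix). A hedged sketch of an equivalent loop; the retry-on-ClientError guard is an illustrative alternative to the fixed sleep, not what the commit itself does, and the request id and cluster size are assumed:

```python
import time
import datetime
import boto3
from botocore.exceptions import ClientError

ec2client = boto3.client('ec2')
request_id = 'sfr-00000000-0000-0000-0000-000000000000'   # assumed spot fleet request id
CLUSTER_MACHINES = 4                                       # assumed cluster size
start_time = datetime.datetime.utcnow()

time.sleep(15)   # give the fleet request a moment to register before polling its history
while True:
    status = ec2client.describe_spot_fleet_instances(SpotFleetRequestId=request_id)
    if len(status['ActiveInstances']) >= CLUSTER_MACHINES:
        break
    try:
        errors = ec2client.describe_spot_fleet_request_history(
            SpotFleetRequestId=request_id, EventType='error', StartTime=start_time)
        if errors['HistoryRecords']:
            print('Spot fleet reported errors:', errors['HistoryRecords'])
            break
    except ClientError:
        pass        # history not available yet; just wait and poll again
    time.sleep(20)
```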
4 changes: 4 additions & 0 deletions worker/Dockerfile
@@ -45,6 +45,10 @@ RUN python3.8 -m pip install boto3

RUN python3.8 -m pip install watchtower

# Install pandas for optional file downloading

RUN python3.8 -m pip install pandas

# SETUP NEW ENTRYPOINT

RUN mkdir -p /home/ubuntu/
58 changes: 57 additions & 1 deletion worker/cp-worker.py
@@ -35,6 +35,13 @@
NECESSARY_STRING = False
else:
NECESSARY_STRING = os.environ['NECESSARY_STRING']
if 'DOWNLOAD_FILES' not in os.environ:
DOWNLOAD_FILES = False
else:
DOWNLOAD_FILES = os.environ['DOWNLOAD_FILES']

localIn = '/home/ubuntu/local_input'


#################################
# CLASS TO HANDLE THE SQS QUEUE
@@ -161,12 +168,58 @@ def runCellProfiler(message):
except KeyError: #Returned if that folder does not exist
pass

csv_name = os.path.join(DATA_ROOT,message['data_file'])

# Optional- download files
if DOWNLOAD_FILES:
if DOWNLOAD_FILES.lower() == 'true':
printandlog('Figuring which files to download', logger)
import pandas
s3 = boto3.resource('s3')
if not os.path.exists(localIn):
os.mkdir(localIn)
csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
csv_in=csv_in.astype('str')
#Figure out what metadata fields we need in this experiment, as a dict
if type(message['Metadata'])==dict:
filter_dict = message['Metadata']
else:
filter_dict = {}
for eachMetadata in message['Metadata'].split(','):
filterkey, filterval = eachMetadata.split('=')
filter_dict[filterkey] = filterval
#Filter our CSV to just the rows CellProfiler will process, so that we can download only what we need
for eachfilter in filter_dict.keys():
csv_in = csv_in[csv_in[eachfilter] == filter_dict[eachfilter]]
#Figure out the actual file names and get them
channel_list = [x.split('FileName_')[1] for x in csv_in.columns if 'FileName' in x]
count = 0
printandlog('Downloading files', logger)
for channel in channel_list:
for field in range(csv_in.shape[0]):
full_old_file_name = os.path.join(list(csv_in['PathName_'+channel])[field],list(csv_in['FileName_'+channel])[field])
prefix_on_bucket = full_old_file_name.split(DATA_ROOT)[1][1:]
new_file_name = os.path.join(localIn,prefix_on_bucket)
if not os.path.exists(os.path.split(new_file_name)[0]):
os.makedirs(os.path.split(new_file_name)[0])
printandlog('made directory '+os.path.split(new_file_name)[0],logger)
s3.meta.client.download_file(AWS_BUCKET,prefix_on_bucket,new_file_name)
count +=1
printandlog('Downloaded '+str(count)+' files',logger)
local_csv_name = os.path.join(localIn,os.path.split(csv_name)[1])
if not os.path.exists(local_csv_name):
csv_in = pandas.read_csv(os.path.join(DATA_ROOT,message['data_file']))
csv_in.replace(DATA_ROOT,localIn,regex=True, inplace=True)
csv_in.to_csv(local_csv_name,index=False)
print('Wrote updated CSV')
csv_name = local_csv_name

# Build and run CellProfiler command
cpDone = localOut + '/cp.is.done'
cmdstem = 'cellprofiler -c -r '
if message['pipeline'][-3:]!='.h5':
cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone
cmd += ' --data-file=%(DATA)s/%(FL)s '
cmd += ' --data-file='+csv_name+' '
cmd += '-g %(Metadata)s'
else:
cmd = cmdstem + '-p %(DATA)s/%(PL)s -i %(DATA)s/%(IN)s -o %(OUT)s -d ' + cpDone + ' -g %(Metadata)s'
@@ -182,6 +235,9 @@ def runCellProfiler(message):
# Get the outputs and move them to S3
if os.path.isfile(cpDone):
time.sleep(30)
if os.path.exists(localIn):
import shutil
shutil.rmtree(localIn, ignore_errors=True)
mvtries=0
while mvtries <3:
try:
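Both workers finish the optional path the same way: write a copy of the load-data CSV with every DATA_ROOT path rewritten to the local input directory, point CellProfiler's --data-file flag at that copy, and delete the local files once the run completes to free the EBS volume. A minimal sketch of that rewrite-and-cleanup step, with the mount point, CSV name, and pipeline name assumed:

```python
import os
import shutil
import pandas

DATA_ROOT = '/home/ubuntu/bucket'                     # assumed S3FS mount point
localIn = '/home/ubuntu/local_input'
csv_name = os.path.join(DATA_ROOT, 'load_data.csv')   # assumed load-data CSV

# Rewrite the bucket paths so CellProfiler reads the locally downloaded copies
local_csv_name = os.path.join(localIn, os.path.split(csv_name)[1])
if not os.path.exists(local_csv_name):
    csv_in = pandas.read_csv(csv_name)
    csv_in.replace(DATA_ROOT, localIn, regex=True, inplace=True)
    csv_in.to_csv(local_csv_name, index=False)

# The command is then built around the rewritten CSV (illustrative, not the full command)
cmd = 'cellprofiler -c -r -p pipeline.cppipe --data-file=' + local_csv_name

# ... once CellProfiler signals it is done, reclaim the EBS space ...
if os.path.exists(localIn):
    shutil.rmtree(localIn, ignore_errors=True)
```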
