Skip to content

Commit

Permalink
Create default log directory if not exists
Browse files Browse the repository at this point in the history
  • Loading branch information
calebbraun committed Dec 4, 2018
1 parent 8fd1cdc commit f6a764a
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions cassandra/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from cassandra.constants import TAG_CONFIG, SUPERVISOR_RANK
from cassandra.compfactory import create_component
import logging
import os


def bootstrap_mp(argvals):
Expand All @@ -52,21 +53,21 @@ def bootstrap_mp(argvals):

if argvals.logdir is None:
logdir = 'logs'
os.makedirs(logdir, exist_ok=True)
else:
logdir = argvals.logdir

logging.basicConfig(filename=f'{logdir}/cassandra-{rank}.log', level=argvals.loglvl,
filemode='w')
# print the location of the logs so that it will appear in the
# output file of batch jobs.
print(f'\nThis is Cassandra version {__version__}. Log output will be written to {logdir}.')


if rank == SUPERVISOR_RANK:
# Print the location of the logs so that it will appear in the
# output file of batch jobs.
print(f'\nThis is Cassandra version {__version__}. Log output will be written to {logdir}.')
my_assignment = distribute_assignments_supervisor(argvals)
else:
my_assignment = distribute_assignments_worker(argvals)


# my_assignment will be a dictionary of configuration sections assigned to
# this process. We need to create and initialize the components assigned to
# us. We also need to create a RAB.
Expand All @@ -78,7 +79,7 @@ def bootstrap_mp(argvals):
component = create_component(section, cap_tbl)
component.params.update(conf)
component.finalize_parsing()
comps.append(component)
comps.append(component)

# Next we need to compile a table of remote capabilities. To do this, each
# component needs to distribute its local capability table to all the other
Expand Down Expand Up @@ -126,7 +127,7 @@ def distribute_assignments_worker(argvals):
def distribute_assignments_supervisor(argvals):
"""Parse config file and distribute component assignments
:param argvals: Arguments structure parsed by argparse.
:param argvals: Arguments structure parsed by argparse.
:return: Dictionary of component definitions for components assigned to this
process (i.e., the supervisor).
Expand All @@ -150,15 +151,15 @@ def distribute_assignments_supervisor(argvals):

config = ConfigObj(argvals.ctlfile)

# Get list of section names
# Get list of section names
section_names = list(config.keys())
try:
# Global section goes to everyone. Also, it's required, so check for it
# here
section_names.remove('Global')
except ValueError as e:
raise RuntimeError("Config file must have a '[Global]' section") from e

section_weights = [config[s].get('mp.weight', 1.0) for s in section_names]
name_weight = zip(section_names, section_weights)
section_names = [s[0] for s in sorted(name_weight, key=lambda x:x[1], reverse=True)]
Expand All @@ -169,10 +170,10 @@ def distribute_assignments_supervisor(argvals):

assignments = []
for i in range(nproc):
assignments.append({'Global':config['Global']})
assignments.append({'Global': config['Global']})
for section in section_names:
assignments[nextrank][section] = config[section]
nextrank = (nextrank+1)%nproc
nextrank = (nextrank+1) % nproc

# Distribute these assignments to the workers
for r in range(nproc):
Expand All @@ -183,7 +184,7 @@ def distribute_assignments_supervisor(argvals):
logging.debug(f'supervisor assignment: {assignments[SUPERVISOR_RANK]}')
return assignments[SUPERVISOR_RANK]


def finalize(rab, thread):
"""Finalization procedure for mp calculations.
Expand All @@ -200,11 +201,10 @@ def finalize(rab, thread):
"""

logging.debug(f'{rab.comm.Get_rank()} entering finalize.')

rab.comm.barrier()

rab.shutdown()
thread.join()

# End of finalize()

0 comments on commit f6a764a

Please sign in to comment.