Skip to content

Commit

Permalink
Merge pull request #46 from JGCRI/loggin
Browse files Browse the repository at this point in the history
Output messages now written through python loggers
  • Loading branch information
rplzzz authored Dec 4, 2018
2 parents 54a82f2 + 67b6c3e commit feae3b3
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 52 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ doc/*.gz
# PyCharm files
.idea/

# Output
# Output
*.nc
output/
batch-out/
Expand All @@ -66,5 +66,8 @@ build
dist
cassandra.egg-info

# ignore log dirs
logs/

# raw hector data not committed with this package
hector-outputstream*.csv
16 changes: 8 additions & 8 deletions cassandra/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!/bin/env python
"""GCAM automation system package.
"""Cassandra model coupling framework
This package contains the components that implement the GCAM automation system,
along with utility functions that are common to all GCAM functional areas.
Several GCAM functional areas also have subpackages that implement calculations
specific to those areas; for example, gcam.water for water downscaling or
gcam.land for land use downscaling. The package also contains gcam_driver, a
stand-alone program for running the automation system.
This package contains the components that implement the Cassandra model
coupling framework. The package also contains cassandra_main.py, a
stand-alone program for running the framework.
"""

import pkg_resources

__all__ = ['util', 'water', 'components']
__version__ = pkg_resources.get_distribution('cassandra').version

__all__ = ['util', 'components']
54 changes: 43 additions & 11 deletions cassandra/cassandra_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,36 @@
import re
import threading
import argparse
import logging
import os


def bootstrap_sp(cfgfile_name):
def bootstrap_sp(argvals):
"""
Bootstrap the multithreaded (single processing) version of the
calculation.
:param cfgfile_name: Name of the configuration file
:param argvals: Command line arguments parsed by argparse.
:return: (component-list, capability-table)
"""

from configobj import ConfigObj
from cassandra import __version__
from cassandra.compfactory import create_component

# Configure logger
if argvals.logdir is None:
logging.basicConfig(stream=sys.stdout, level=argvals.loglvl)
logging.info(f'This is Cassandra version {__version__}.')
else:
os.makedirs(argvals.logdir, exist_ok=True)
logging.basicConfig(filename=f'{argvals.logdir}/cassandra.log',
level=argvals.loglvl, filemode='w')
# Write to screen the location of the logging output
print(f'This is Cassandra version {__version__}. Output will be logged to {argvals.logdir}/cassandra.log')

cfgfile_name = argvals.ctlfile

# initialize the structures that will receive the data we are
# parsing from the file
capability_table = {}
Expand All @@ -53,12 +69,23 @@ def bootstrap_sp(cfgfile_name):


def main(argvals):

# Set the appropriate logging level
# NB: You MUST NOT call any logging functions until either bootstrap_mp
# or bootstrap_sp has been called.
if argvals.verbose:
argvals.loglvl = logging.DEBUG
elif argvals.quiet:
argvals.loglvl = logging.WARNING
else:
argvals.loglvl = logging.INFO

if argvals.mp:
# See notes in mp.py about side effects of importing that module.
from cassandra.mp import bootstrap_mp, finalize
(component_list, cap_table) = bootstrap_mp(argvals)
else:
(component_list, cap_table) = bootstrap_sp(argvals.ctlfile)
(component_list, cap_table) = bootstrap_sp(argvals)

# We will look up "general" in the cap_table and process any
# global parameters here, but in the current version we don't
Expand All @@ -67,7 +94,7 @@ def main(argvals):
threads = []

for component in component_list:
print(f"running {str(component.__class__)}")
logging.info(f"running {str(component.__class__)}")
threads.append(component.run())

# Wait for all component threads to complete before printing end
Expand All @@ -86,45 +113,50 @@ def main(argvals):
# is still running. Once again take advantage of the fact that
# if the RAB is present, it is always the first in the list.
nfail = 0

if argvals.mp:
reg_comps = component_list[1:]
rab_comp = component_list[0]
else:
reg_comps = component_list
rab_comp = None

for component in reg_comps:
if component.status != 1:
from logging import error
error(f'Component {str(component.__class__)} returned failure status\n')
nfail += 1

if rab_comp is not None and rab_comp.status != 0:
from logging import error
error('RAB has crashed or is otherwise not running.')
nfail += 1


if nfail == 0:
print('\n****************All components completed successfully.')
logging.info('\n****************All components completed successfully.')
else:
print(f'\n****************{nfail} components failed.')
logging.error(f'\n****************{nfail} components failed.')
raise RuntimeError(f'{nfail} components failed.')

# If this is a multiprocessing calculation, then we need to
# perform the finalization procedure
if argvals.mp:
finalize(component_list[0], threads[0])

print("\nFIN.")
logging.info("\nFIN.")

return nfail


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--mp', action='store_true', default=False, help='Use multiprocessing.')
parser.add_argument('-l', dest='logdir',
help='Directory for writing logfiles. Default = stdout for SP, "logs" for MP')
parser.add_argument('-v', dest='verbose', action='store_true', default=False,
help='Verbose mode: log output at DEBUG level (overrides -q).')
parser.add_argument('-q', dest='quiet', action='store_true', default=False,
help='Quiet mode: log output at WARNING level (overridden by -v).')
parser.add_argument('ctlfile', help='Name of the configuration file for the calculation.')

argvals = parser.parse_args()
Expand Down
28 changes: 12 additions & 16 deletions cassandra/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@
# relevant python component.

import os
import os.path
import re
import subprocess
import threading
import tempfile
import logging
import pkg_resources
import pandas as pd
from sys import stdout
from sys import stderr
from cassandra import util

# This class is here to make it easy for a class to ignore failures to
Expand Down Expand Up @@ -201,16 +198,15 @@ def run_component_wrapper(self):
# eventually try to implement co-simulations.
with self.condition:
try:
import logging
logging.info(f'starting {self.__class__}')
logging.debug(f'starting {self.__class__}')
rv = self.run_component()
if not rv == 0:
# possibly add some other error handling here.
msg = f"{self.__class__}: run_component returned error code {str(rv)}"
logging.error(msg)
raise RuntimeError(msg)
else:
logging.info(f"{self.__class__}: finished successfully.\n")
logging.debug(f"{self.__class__}: finished successfully.\n")

self.status = 1 # set success condition
except:
Expand All @@ -220,7 +216,7 @@ def run_component_wrapper(self):
finally:
self.condition.notify_all() # release any waiting threads

logging.info(f'completed {self.__class__}')
logging.debug(f'completed {self.__class__}')
# end of with block: lock on condition var released.

def fetch(self, capability):
Expand Down Expand Up @@ -273,7 +269,7 @@ def fetch(self, capability):
# statement tries to obtain the lock.
with self.condition:
if self.status == 0: # component hasn't run yet. Wait on it
print(f"\twaiting on {self.__class__}\n")
logging.debug(f"\twaiting on {self.__class__}\n")
self.condition.wait()
# end of with block: lock is released

Expand Down Expand Up @@ -388,8 +384,8 @@ def __init__(self, cap_tbl):
# this is a reference copy, so any entries added to params will also appear in results.
self.addresults('general', self.params)

print('General parameters as input:')
print(self.results['general'])
logging.debug('General parameters as input:')
logging.debug(self.results['general'])

# We need to allow gcamutil access to these parameters, since it doesn't otherwise know how to find the
# global params component. <- gross. we need a better way to do this.
Expand All @@ -410,7 +406,7 @@ def run_component(self):
if 'rgnconfig' in genrslt:
rgnconfig = genrslt['rgnconfig']
else:
stdout.write('[GlobalParamsComponent]: Using default region mapping (14 region)')
logging.warning('[GlobalParamsComponent]: Using default region mapping (14 region)')
rgnconfig = 'rgn14'
genrslt['rgnconfig'] = util.abspath(rgnconfig, genrslt['inputdir'])

Expand Down Expand Up @@ -501,7 +497,7 @@ def run_component(self):
dbxmlfile = match.group(1)
break

print(f"{self.__class__}: dbxmlfile = {dbxmlfile}")
logging.info(f"{self.__class__}: dbxmlfile = {dbxmlfile}")
# The file spec is a relative path, starting from the
# directory that contains the config file.
dbxmlfile = os.path.join(self.workdir, dbxmlfile)
Expand All @@ -510,7 +506,7 @@ def run_component(self):
if not self.clobber:
# This is not an error; it just means we can leave
# the existing output in place and return it.
print("GcamComponent: results exist and no clobber. Skipping.")
logging.info("GcamComponent: results exist and no clobber. Skipping.")
gcamrslt["changed"] = 0 # mark the cached results as clean
return 0
else:
Expand All @@ -531,7 +527,7 @@ def run_component(self):
self.addresults('gcam-core', gcamrslt)

# now we're ready to actually do the run. We don't check the return code; we let the run() method do that.
print(f"Running: {exe} -C{cfg} -L{logcfg}")
logging.info(f"Running: {exe} -C{cfg} -L{logcfg}")

if logfile is None:
return subprocess.call([exe, '-C'+cfg, '-L'+logcfg], cwd=self.workdir)
Expand Down Expand Up @@ -893,7 +889,7 @@ class DummyComponent(ComponentBase):
request_delays - list of time delays (ms) before each request is made
finish_delay - delay (ms) before the component finalizes and exports
except - Throw an exception with the parameter value just before
the component would have exited (this is used for testing
the component would have exited (this is used for testing
error handling).
"""

Expand Down
35 changes: 23 additions & 12 deletions cassandra/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
"""

from mpi4py import MPI
from cassandra import __version__
from cassandra.rab import RAB
from cassandra.constants import TAG_CONFIG, SUPERVISOR_RANK
from cassandra.compfactory import create_component
import logging
import os


def bootstrap_mp(argvals):
Expand All @@ -49,14 +51,24 @@ def bootstrap_mp(argvals):
world = MPI.COMM_WORLD
rank = world.Get_rank()

logging.basicConfig(filename=f'logs/cassandra-{rank}.log', level=logging.DEBUG)

if argvals.logdir is None:
logdir = 'logs'
else:
logdir = argvals.logdir

os.makedirs(logdir, exist_ok=True)

logging.basicConfig(filename=f'{logdir}/cassandra-{rank}.log', level=argvals.loglvl,
filemode='w')

if rank == SUPERVISOR_RANK:
# Print the location of the logs so that it will appear in the
# output file of batch jobs.
print(f'\nThis is Cassandra version {__version__}. Log output will be written to {logdir}.')
my_assignment = distribute_assignments_supervisor(argvals)
else:
my_assignment = distribute_assignments_worker(argvals)


# my_assignment will be a dictionary of configuration sections assigned to
# this process. We need to create and initialize the components assigned to
# us. We also need to create a RAB.
Expand All @@ -68,7 +80,7 @@ def bootstrap_mp(argvals):
component = create_component(section, cap_tbl)
component.params.update(conf)
component.finalize_parsing()
comps.append(component)
comps.append(component)

# Next we need to compile a table of remote capabilities. To do this, each
# component needs to distribute its local capability table to all the other
Expand Down Expand Up @@ -116,7 +128,7 @@ def distribute_assignments_worker(argvals):
def distribute_assignments_supervisor(argvals):
"""Parse config file and distribute component assignments
:param argvals: Arguments structure parsed by argparse.
:param argvals: Arguments structure parsed by argparse.
:return: Dictionary of component definitions for components assigned to this
process (i.e., the supervisor).
Expand All @@ -140,15 +152,15 @@ def distribute_assignments_supervisor(argvals):

config = ConfigObj(argvals.ctlfile)

# Get list of section names
# Get list of section names
section_names = list(config.keys())
try:
# Global section goes to everyone. Also, it's required, so check for it
# here
section_names.remove('Global')
except ValueError as e:
raise RuntimeError("Config file must have a '[Global]' section") from e

section_weights = [config[s].get('mp.weight', 1.0) for s in section_names]
name_weight = zip(section_names, section_weights)
section_names = [s[0] for s in sorted(name_weight, key=lambda x:x[1], reverse=True)]
Expand All @@ -159,10 +171,10 @@ def distribute_assignments_supervisor(argvals):

assignments = []
for i in range(nproc):
assignments.append({'Global':config['Global']})
assignments.append({'Global': config['Global']})
for section in section_names:
assignments[nextrank][section] = config[section]
nextrank = (nextrank+1)%nproc
nextrank = (nextrank+1) % nproc

# Distribute these assignments to the workers
for r in range(nproc):
Expand All @@ -173,7 +185,7 @@ def distribute_assignments_supervisor(argvals):
logging.debug(f'supervisor assignment: {assignments[SUPERVISOR_RANK]}')
return assignments[SUPERVISOR_RANK]


def finalize(rab, thread):
"""Finalization procedure for mp calculations.
Expand All @@ -190,11 +202,10 @@ def finalize(rab, thread):
"""

logging.debug(f'{rab.comm.Get_rank()} entering finalize.')

rab.comm.barrier()

rab.shutdown()
thread.join()

# End of finalize()

Loading

0 comments on commit feae3b3

Please sign in to comment.