diff --git a/.gitignore b/.gitignore index bf8580a..e88e1f3 100644 --- a/.gitignore +++ b/.gitignore @@ -46,7 +46,7 @@ doc/*.gz # PyCharm files .idea/ -# Output +# Output *.nc output/ batch-out/ @@ -66,5 +66,8 @@ build dist cassandra.egg-info +# ignore log dirs +logs/ + # raw hector data not committed with this package hector-outputstream*.csv diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 51cc131..88d6f99 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -1,14 +1,14 @@ #!/bin/env python -"""GCAM automation system package. +"""Cassandra model coupling framework -This package contains the components that implement the GCAM automation system, -along with utility functions that are common to all GCAM functional areas. -Several GCAM functional areas also have subpackages that implement calculations -specific to those areas; for example, gcam.water for water downscaling or -gcam.land for land use downscaling. The package also contains gcam_driver, a -stand-alone program for running the automation system. +This package contains the components that implement the Cassandra model +coupling framework. The package also contains cassandra_main.py, a +stand-alone program for running the framework. """ +import pkg_resources -__all__ = ['util', 'water', 'components'] +__version__ = pkg_resources.get_distribution('cassandra').version + +__all__ = ['util', 'components'] diff --git a/cassandra/cassandra_main.py b/cassandra/cassandra_main.py index efb3be6..616cd26 100755 --- a/cassandra/cassandra_main.py +++ b/cassandra/cassandra_main.py @@ -14,20 +14,36 @@ import re import threading import argparse +import logging +import os -def bootstrap_sp(cfgfile_name): +def bootstrap_sp(argvals): """ Bootstrap the multithreaded (single processing) version of the calculation. - :param cfgfile_name: Name of the configuration file + :param argvals: Command line arguments parsed by argparse. :return: (component-list, capability-table) """ from configobj import ConfigObj + from cassandra import __version__ from cassandra.compfactory import create_component + # Configure logger + if argvals.logdir is None: + logging.basicConfig(stream=sys.stdout, level=argvals.loglvl) + logging.info(f'This is Cassandra version {__version__}.') + else: + os.makedirs(argvals.logdir, exist_ok=True) + logging.basicConfig(filename=f'{argvals.logdir}/cassandra.log', + level=argvals.loglvl, filemode='w') + # Write to screen the location of the logging output + print(f'This is Cassandra version {__version__}. Output will be logged to {argvals.logdir}/cassandra.log') + + cfgfile_name = argvals.ctlfile + # initialize the structures that will receive the data we are # parsing from the file capability_table = {} @@ -53,12 +69,23 @@ def bootstrap_sp(cfgfile_name): def main(argvals): + + # Set the appropriate logging level + # NB: You MUST NOT call any logging functions until either bootstrap_mp + # or bootstrap_sp has been called. + if argvals.verbose: + argvals.loglvl = logging.DEBUG + elif argvals.quiet: + argvals.loglvl = logging.WARNING + else: + argvals.loglvl = logging.INFO + if argvals.mp: # See notes in mp.py about side effects of importing that module. from cassandra.mp import bootstrap_mp, finalize (component_list, cap_table) = bootstrap_mp(argvals) else: - (component_list, cap_table) = bootstrap_sp(argvals.ctlfile) + (component_list, cap_table) = bootstrap_sp(argvals) # We will look up "general" in the cap_table and process any # global parameters here, but in the current version we don't @@ -67,7 +94,7 @@ def main(argvals): threads = [] for component in component_list: - print(f"running {str(component.__class__)}") + logging.info(f"running {str(component.__class__)}") threads.append(component.run()) # Wait for all component threads to complete before printing end @@ -86,30 +113,29 @@ def main(argvals): # is still running. Once again take advantage of the fact that # if the RAB is present, it is always the first in the list. nfail = 0 - + if argvals.mp: reg_comps = component_list[1:] rab_comp = component_list[0] else: reg_comps = component_list rab_comp = None - + for component in reg_comps: if component.status != 1: from logging import error error(f'Component {str(component.__class__)} returned failure status\n') nfail += 1 - + if rab_comp is not None and rab_comp.status != 0: from logging import error error('RAB has crashed or is otherwise not running.') nfail += 1 - if nfail == 0: - print('\n****************All components completed successfully.') + logging.info('\n****************All components completed successfully.') else: - print(f'\n****************{nfail} components failed.') + logging.error(f'\n****************{nfail} components failed.') raise RuntimeError(f'{nfail} components failed.') # If this is a multiprocessing calculation, then we need to @@ -117,7 +143,7 @@ def main(argvals): if argvals.mp: finalize(component_list[0], threads[0]) - print("\nFIN.") + logging.info("\nFIN.") return nfail @@ -125,6 +151,12 @@ def main(argvals): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--mp', action='store_true', default=False, help='Use multiprocessing.') + parser.add_argument('-l', dest='logdir', + help='Directory for writing logfiles. Default = stdout for SP, "logs" for MP') + parser.add_argument('-v', dest='verbose', action='store_true', default=False, + help='Verbose mode: log output at DEBUG level (overrides -q).') + parser.add_argument('-q', dest='quiet', action='store_true', default=False, + help='Quiet mode: log output at WARNING level (overridden by -v).') parser.add_argument('ctlfile', help='Name of the configuration file for the calculation.') argvals = parser.parse_args() diff --git a/cassandra/components.py b/cassandra/components.py index 9da9820..603532f 100644 --- a/cassandra/components.py +++ b/cassandra/components.py @@ -39,15 +39,12 @@ # relevant python component. import os -import os.path import re import subprocess import threading -import tempfile +import logging import pkg_resources import pandas as pd -from sys import stdout -from sys import stderr from cassandra import util # This class is here to make it easy for a class to ignore failures to @@ -201,8 +198,7 @@ def run_component_wrapper(self): # eventually try to implement co-simulations. with self.condition: try: - import logging - logging.info(f'starting {self.__class__}') + logging.debug(f'starting {self.__class__}') rv = self.run_component() if not rv == 0: # possibly add some other error handling here. @@ -210,7 +206,7 @@ def run_component_wrapper(self): logging.error(msg) raise RuntimeError(msg) else: - logging.info(f"{self.__class__}: finished successfully.\n") + logging.debug(f"{self.__class__}: finished successfully.\n") self.status = 1 # set success condition except: @@ -220,7 +216,7 @@ def run_component_wrapper(self): finally: self.condition.notify_all() # release any waiting threads - logging.info(f'completed {self.__class__}') + logging.debug(f'completed {self.__class__}') # end of with block: lock on condition var released. def fetch(self, capability): @@ -273,7 +269,7 @@ def fetch(self, capability): # statement tries to obtain the lock. with self.condition: if self.status == 0: # component hasn't run yet. Wait on it - print(f"\twaiting on {self.__class__}\n") + logging.debug(f"\twaiting on {self.__class__}\n") self.condition.wait() # end of with block: lock is released @@ -388,8 +384,8 @@ def __init__(self, cap_tbl): # this is a reference copy, so any entries added to params will also appear in results. self.addresults('general', self.params) - print('General parameters as input:') - print(self.results['general']) + logging.debug('General parameters as input:') + logging.debug(self.results['general']) # We need to allow gcamutil access to these parameters, since it doesn't otherwise know how to find the # global params component. <- gross. we need a better way to do this. @@ -410,7 +406,7 @@ def run_component(self): if 'rgnconfig' in genrslt: rgnconfig = genrslt['rgnconfig'] else: - stdout.write('[GlobalParamsComponent]: Using default region mapping (14 region)') + logging.warning('[GlobalParamsComponent]: Using default region mapping (14 region)') rgnconfig = 'rgn14' genrslt['rgnconfig'] = util.abspath(rgnconfig, genrslt['inputdir']) @@ -501,7 +497,7 @@ def run_component(self): dbxmlfile = match.group(1) break - print(f"{self.__class__}: dbxmlfile = {dbxmlfile}") + logging.info(f"{self.__class__}: dbxmlfile = {dbxmlfile}") # The file spec is a relative path, starting from the # directory that contains the config file. dbxmlfile = os.path.join(self.workdir, dbxmlfile) @@ -510,7 +506,7 @@ def run_component(self): if not self.clobber: # This is not an error; it just means we can leave # the existing output in place and return it. - print("GcamComponent: results exist and no clobber. Skipping.") + logging.info("GcamComponent: results exist and no clobber. Skipping.") gcamrslt["changed"] = 0 # mark the cached results as clean return 0 else: @@ -531,7 +527,7 @@ def run_component(self): self.addresults('gcam-core', gcamrslt) # now we're ready to actually do the run. We don't check the return code; we let the run() method do that. - print(f"Running: {exe} -C{cfg} -L{logcfg}") + logging.info(f"Running: {exe} -C{cfg} -L{logcfg}") if logfile is None: return subprocess.call([exe, '-C'+cfg, '-L'+logcfg], cwd=self.workdir) @@ -893,7 +889,7 @@ class DummyComponent(ComponentBase): request_delays - list of time delays (ms) before each request is made finish_delay - delay (ms) before the component finalizes and exports except - Throw an exception with the parameter value just before - the component would have exited (this is used for testing + the component would have exited (this is used for testing error handling). """ diff --git a/cassandra/mp.py b/cassandra/mp.py index ad4a01e..8a2445d 100644 --- a/cassandra/mp.py +++ b/cassandra/mp.py @@ -28,10 +28,12 @@ """ from mpi4py import MPI +from cassandra import __version__ from cassandra.rab import RAB from cassandra.constants import TAG_CONFIG, SUPERVISOR_RANK from cassandra.compfactory import create_component import logging +import os def bootstrap_mp(argvals): @@ -49,14 +51,24 @@ def bootstrap_mp(argvals): world = MPI.COMM_WORLD rank = world.Get_rank() - logging.basicConfig(filename=f'logs/cassandra-{rank}.log', level=logging.DEBUG) - + if argvals.logdir is None: + logdir = 'logs' + else: + logdir = argvals.logdir + + os.makedirs(logdir, exist_ok=True) + + logging.basicConfig(filename=f'{logdir}/cassandra-{rank}.log', level=argvals.loglvl, + filemode='w') + if rank == SUPERVISOR_RANK: + # Print the location of the logs so that it will appear in the + # output file of batch jobs. + print(f'\nThis is Cassandra version {__version__}. Log output will be written to {logdir}.') my_assignment = distribute_assignments_supervisor(argvals) else: my_assignment = distribute_assignments_worker(argvals) - # my_assignment will be a dictionary of configuration sections assigned to # this process. We need to create and initialize the components assigned to # us. We also need to create a RAB. @@ -68,7 +80,7 @@ def bootstrap_mp(argvals): component = create_component(section, cap_tbl) component.params.update(conf) component.finalize_parsing() - comps.append(component) + comps.append(component) # Next we need to compile a table of remote capabilities. To do this, each # component needs to distribute its local capability table to all the other @@ -116,7 +128,7 @@ def distribute_assignments_worker(argvals): def distribute_assignments_supervisor(argvals): """Parse config file and distribute component assignments - :param argvals: Arguments structure parsed by argparse. + :param argvals: Arguments structure parsed by argparse. :return: Dictionary of component definitions for components assigned to this process (i.e., the supervisor). @@ -140,7 +152,7 @@ def distribute_assignments_supervisor(argvals): config = ConfigObj(argvals.ctlfile) - # Get list of section names + # Get list of section names section_names = list(config.keys()) try: # Global section goes to everyone. Also, it's required, so check for it @@ -148,7 +160,7 @@ def distribute_assignments_supervisor(argvals): section_names.remove('Global') except ValueError as e: raise RuntimeError("Config file must have a '[Global]' section") from e - + section_weights = [config[s].get('mp.weight', 1.0) for s in section_names] name_weight = zip(section_names, section_weights) section_names = [s[0] for s in sorted(name_weight, key=lambda x:x[1], reverse=True)] @@ -159,10 +171,10 @@ def distribute_assignments_supervisor(argvals): assignments = [] for i in range(nproc): - assignments.append({'Global':config['Global']}) + assignments.append({'Global': config['Global']}) for section in section_names: assignments[nextrank][section] = config[section] - nextrank = (nextrank+1)%nproc + nextrank = (nextrank+1) % nproc # Distribute these assignments to the workers for r in range(nproc): @@ -173,7 +185,7 @@ def distribute_assignments_supervisor(argvals): logging.debug(f'supervisor assignment: {assignments[SUPERVISOR_RANK]}') return assignments[SUPERVISOR_RANK] - + def finalize(rab, thread): """Finalization procedure for mp calculations. @@ -190,11 +202,10 @@ def finalize(rab, thread): """ logging.debug(f'{rab.comm.Get_rank()} entering finalize.') - + rab.comm.barrier() rab.shutdown() thread.join() # End of finalize() - diff --git a/cassandra/util.py b/cassandra/util.py index 6dcd182..233b3e9 100644 --- a/cassandra/util.py +++ b/cassandra/util.py @@ -7,6 +7,7 @@ import subprocess import tempfile import random +import logging # utility functions used in other gcam python code @@ -159,7 +160,7 @@ def gcam_query(batchqfiles, dbxmlfiles, inputdir, outfiles): # Display numbers up to 1024 seem to be safe. random.jumpahead(os.getpid()) # make sure that different instances have different rng states. disp = random.randint(1, 1024) - print('X display is: %d' % disp) + logging.info('X display is: %d' % disp) xvfb = subprocess.Popen(['Xvfb', ':%d' % disp, '-pn', '-audit', '4', '-screen', '0', '800x600x16']) try: ldlibpath = os.getenv('LD_LIBRARY_PATH') @@ -169,7 +170,7 @@ def gcam_query(batchqfiles, dbxmlfiles, inputdir, outfiles): ldlibpath = "LD_LIBRARY_PATH=%s:%s" % (ldlibpath, DBXMLlib) for (query, dbxml, output) in zip(qlist, dbxmllist, outlist): - print(query, output) + logging.info(query, output) # make a temporary file tempquery = None try: @@ -299,7 +300,7 @@ def abspath(filename, defaultpath=None, tag=None): """ - print('[%s]: default path= %s filename= %s' % (str(tag), str(defaultpath), str(filename))) + logging.info('[%s]: default path= %s filename= %s' % (str(tag), str(defaultpath), str(filename))) if filename[0] == '/': return filename diff --git a/setup.py b/setup.py index a121251..47b7f16 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_requirements(): setup( name='cassandra', - version='0.4.0', + version='0.5.0', description='A GCAM automation system', url='https://github.com/jgcri/cassandra', author='Robert Link; Caleb Braun',