diff --git a/bin/adhoc-scripts/usePycurl.py b/bin/adhoc-scripts/usePycurl.py new file mode 100644 index 0000000000..249e3410f1 --- /dev/null +++ b/bin/adhoc-scripts/usePycurl.py @@ -0,0 +1,74 @@ +import time +import types +import sys +from pprint import pformat +from urllib.parse import urlencode +from Utils.Utilities import decodeBytesToUnicode + +from WMCore.Services.pycurl_manager import RequestHandler +from Utils.CertTools import getKeyCertFromEnv, getCAPathFromEnv + +def main(): + #uri = "https://cmsweb.cern.ch/couchdb/_all_dbs" + #uri = "https://cmsweb.cern.ch/couchdb/reqmgr_config_cache/6c075881a8454070ce3c1e8921cdb45e" + uri = "https://cmsweb.cern.ch/couchdb/reqmgr_config_cache/6c075881a8454070ce3c1e8921cdb45e/configFile" + + ### deal with headers + data = {} + verb = "GET" + incoming_headers = {} + encoder = True + contentType = None + data, headers = encodeParams(data, verb, incoming_headers, encoder, contentType) + + headers["Accept-Encoding"] = "gzip,deflate,identity" + ckey, cert = getKeyCertFromEnv() + capath = getCAPathFromEnv() + reqHandler = RequestHandler() + timeStart = time.time() + response, result = reqHandler.request(uri, data, headers, verb=verb, ckey=ckey, cert=cert, capath=capath) + timeEnd = time.time() + print(f"Uri: {uri}") + print(f"Request headers: {headers}") + print(f"Time: {timeEnd - timeStart}") + print(f"Response headers: {pformat(response.header)}") + + ### deal with the response object + decoder = True + result = decodeResult(result, decoder) + #print(f"Response: {result}") + + +def encodeParams(data, verb, incomingHeaders, encoder, contentType): + headers = {"Content-type": contentType if contentType else "application/json", + "User-Agent": "WMCore/usePycurl", + "Accept": "application/json"} + + incomingHeaders["Accept-Encoding"] = "gzip,identity" + headers.update(incomingHeaders) + + encoded_data = '' + if verb != 'GET' and data: + if isinstance(encoder, (types.MethodType, types.FunctionType)): + encoded_data = encoder(data) + elif encoder is False: + encoded_data = data + else: + encoded_data = self.encode(data) + headers["Content-Length"] = len(encoded_data) + elif verb != 'GET': + headers["Content-Length"] = 0 + elif verb == 'GET' and data: + # encode the data as a get string + encoded_data = urlencode(data, doseq=True) + return encoded_data, headers + +def decodeResult(result, decoder): + if isinstance(decoder, (types.MethodType, types.FunctionType)): + result = decoder(result) + elif decoder is not False: + result = decodeBytesToUnicode(result) + return result + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/src/python/WMCore/Cache/WMConfigCache.py b/src/python/WMCore/Cache/WMConfigCache.py index be8caf533a..26d1323224 100644 --- a/src/python/WMCore/Cache/WMConfigCache.py +++ b/src/python/WMCore/Cache/WMConfigCache.py @@ -11,6 +11,7 @@ from builtins import str from future.utils import with_metaclass from future.utils import viewkeys, viewvalues +import cherrypy import hashlib import logging @@ -98,10 +99,10 @@ def __init__(self, dbURL, couchDBName=None, id=None, rev=None, usePYCurl=True, self.dburl = dbURL self.detail = detail try: + cherrypy.log("AMR connecting to CouchServer with usePYCurl: %s" % usePYCurl) self.couchdb = CouchServer(self.dburl, usePYCurl=usePYCurl, ckey=ckey, cert=cert, capath=capath) - if self.dbname not in self.couchdb.listDatabases(): - self.createDatabase() + cherrypy.log("AMR connecting to database %s" % self.dbname) self.database = self.couchdb.connectDatabase(self.dbname) except Exception as ex: msg = "Error connecting to couch: %s\n" % str(ex) @@ -145,9 +146,13 @@ def connectUserGroup(self, groupname, username): _connectUserGroup_ """ + cherrypy.log("AMR setting group name to %s" % groupname) self.group = Group(name=groupname) + cherrypy.log("AMR setting couchdb for dburl %s and dbname %s" % (self.dburl, self.dbname)) self.group.setCouch(self.dburl, self.dbname) + cherrypy.log("AMR connecting to group backend") self.group.connect() + cherrypy.log("AMR making user for username %s" % username) self.owner = makeUser(groupname, username, couchUrl=self.dburl, couchDatabase=self.dbname) @@ -272,13 +277,17 @@ def loadByID(self, configID): Load a document from the server given its couchID """ try: + cherrypy.log("AMR getting document id %s" % configID) self.document = self.database.document(id=configID) if 'owner' in self.document: + cherrypy.log("AMR connectUserGroup") self.connectUserGroup(groupname=self.document['owner'].get('group', None), username=self.document['owner'].get('user', None)) if '_attachments' in self.document: + cherrypy.log("AMR loadAttachment") # Then we need to load the attachments for key in viewkeys(self.document['_attachments']): + cherrypy.log("AMR loading attachment for key %s" % key) self.loadAttachment(name=key) except CouchNotFoundError as ex: msg = "Document with id %s not found in couch\n" % (configID) @@ -301,7 +310,7 @@ def loadAttachment(self, name, overwrite=True): Load an attachment from the database and put it somewhere useful """ - + logging.info("AMR loading attachment for docID: %s and name: %s", self.document["_id"], name) attach = self.database.getAttachment(self.document["_id"], name) if not overwrite: @@ -538,11 +547,12 @@ def __str__(self): return self.document.__str__() def validate(self, configID): - try: # TODO: need to change to DataCache # self.loadDocument(configID = configID) + cherrypy.log("AMR loading ConfigCache for ID: %s" % configID) self.loadByID(configID=configID) + cherrypy.log("AMR ConfigCache loaded") except Exception as ex: raise ConfigCacheException("Failure to load ConfigCache while validating workload: %s" % str(ex)) diff --git a/src/python/WMCore/Database/CMSCouch.py b/src/python/WMCore/Database/CMSCouch.py index 99730b1d15..e6146cad6a 100644 --- a/src/python/WMCore/Database/CMSCouch.py +++ b/src/python/WMCore/Database/CMSCouch.py @@ -1021,7 +1021,7 @@ def deleteDatabase(self, dbname): raise RuntimeError(msg) return self.delete("/%s" % dbname) - def connectDatabase(self, dbname='database', create=True, size=1000): + def connectDatabase(self, dbname='database', create=False, size=1000): """ Return a Database instance, pointing to a database in the server. If the database doesn't exist create it if create is True. diff --git a/src/python/WMCore/JobStateMachine/ConfigureState.py b/src/python/WMCore/JobStateMachine/ConfigureState.py deleted file mode 100644 index b168166d21..0000000000 --- a/src/python/WMCore/JobStateMachine/ConfigureState.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python -""" -_ConfigureState_ - -Populate the states tables with all known states, and set the max retries for -each state. Default to one retry. -Create the CouchDB and associated views if needed. -""" - - - - -from WMCore.Database.CMSCouch import CouchServer -from WMCore.DataStructs.WMObject import WMObject - -class ConfigureState(WMObject): - def configure(self): - server = CouchServer(self.config.JobStateMachine.couchurl) - dbname = 'JSM/JobHistory' - if dbname not in server.listDatabases(): - server.createDatabase(dbname) diff --git a/src/python/WMCore/Services/pycurl_manager.py b/src/python/WMCore/Services/pycurl_manager.py index 72fb4338fe..4a2af76bad 100644 --- a/src/python/WMCore/Services/pycurl_manager.py +++ b/src/python/WMCore/Services/pycurl_manager.py @@ -44,7 +44,7 @@ from past.builtins import basestring from future.utils import viewitems - +import cherrypy # system modules import copy import json @@ -257,7 +257,7 @@ def set_opts(self, curl, url, params, headers, # we need to enable this header here, in case it has not been provided by upstream. thisHeaders.setdefault("Accept-Encoding", "gzip") curl.setopt(pycurl.HTTPHEADER, [encodeUnicodeToBytes("%s: %s" % (k, v)) for k, v in viewitems(thisHeaders)]) - + cherrypy.log("AMR url %s, thisHeaders %s" % (url, thisHeaders)) bbuf = BytesIO() hbuf = BytesIO() curl.setopt(pycurl.WRITEFUNCTION, bbuf.write) @@ -311,6 +311,8 @@ def request(self, url, params, headers=None, verb='GET', verbose=0, ckey=None, cert=None, capath=None, doseq=True, encode=False, decode=False, cainfo=None, cookie=None): """Fetch data for given set of parameters""" + cherrypy.log("AMR url: %s, params: %s, headers: %s, verb: %s" % (url, params, headers, verb)) + cherrypy.log(" AMR doseq: %s, encode: %s, decode: %s, cookie: %s" % (doseq, encode, decode, cookie)) curl = pycurl.Curl() bbuf, hbuf = self.set_opts(curl, url, params, headers, ckey, cert, capath, verbose, verb, doseq, encode, cainfo, cookie) @@ -318,6 +320,7 @@ def request(self, url, params, headers=None, verb='GET', if verbose: print(verb, url, params, headers) header = self.parse_header(hbuf.getvalue()) + cherrypy.log("AMR response headers: %s" % (header.header)) data = bbuf.getvalue() data = decompress(data, header.header) if header.status < 300: diff --git a/src/python/WMCore/WMSpec/StdSpecs/DQMHarvest.py b/src/python/WMCore/WMSpec/StdSpecs/DQMHarvest.py index 5b19a4605a..a77c1d0fe6 100644 --- a/src/python/WMCore/WMSpec/StdSpecs/DQMHarvest.py +++ b/src/python/WMCore/WMSpec/StdSpecs/DQMHarvest.py @@ -48,12 +48,16 @@ def validateSchema(self, schema): Standard DataProcessing schema validation. """ + import cherrypy + cherrypy.log("AMR calling DQMHarvest.validateSchema") DataProcessing.validateSchema(self, schema) + cherrypy.log("AMR calling DQMHarvest.validateConfigCacheExists") self.validateConfigCacheExists(configID=schema["DQMConfigCacheID"], configCacheUrl=schema['ConfigCacheUrl'], couchDBName=schema["CouchDBName"], getOutputModules=False) + cherrypy.log("AMR done with DQMHarvest.validateConfigCacheExists") @staticmethod def getWorkloadCreateArgs(): diff --git a/src/python/WMCore/WMSpec/StdSpecs/StdBase.py b/src/python/WMCore/WMSpec/StdSpecs/StdBase.py index 2f8e44f53a..44dc29ffd9 100644 --- a/src/python/WMCore/WMSpec/StdSpecs/StdBase.py +++ b/src/python/WMCore/WMSpec/StdSpecs/StdBase.py @@ -7,9 +7,10 @@ from __future__ import division from future.utils import viewitems from builtins import range, object - +import cherrypy import logging import json +from collections import deque from Utils.PythonVersion import PY3 from Utils.Utilities import decodeBytesToUnicodeConditional @@ -53,7 +54,8 @@ def __init__(self): # Internal parameters self.workloadName = None - self.config_cache = {} + # cache to be used for the workflow config IDs + self.config_cache = deque([], 10) return @@ -65,6 +67,7 @@ def __call__(self, workloadName, arguments): method and pull out any that are setup by this base class. """ self.workloadName = workloadName + cherrypy.log("AMR getWorkloadCreateArgs") argumentDefinition = self.getWorkloadCreateArgs() for arg in argumentDefinition: try: @@ -81,6 +84,7 @@ def __call__(self, workloadName, arguments): raise WMSpecFactoryException("parameter %s: %s" % (arg, str(ex))) # TODO: this replace can be removed in one year from now, thus March 2022 + cherrypy.log("AMR updating dbs") if hasattr(self, "dbsUrl"): self.dbsUrl = self.dbsUrl.replace("cmsweb.cern.ch", "cmsweb-prod.cern.ch") self.dbsUrl = self.dbsUrl.rstrip("/") @@ -90,6 +94,18 @@ def __call__(self, workloadName, arguments): # static copy of the skim mapping skimMap = {} + def getCachedConfigID(self, absoluteConfigID): + """ + Given a ConfigCacheID (including the url and db name), return it if it's + available in the memory cache. + :param absoluteConfigID: string with the configID url + :return: the config cache document, or None if not found + """ + for item in self.config_cache: + if absoluteConfigID == item['name']: + return item['configDoc'] + return + @staticmethod def calcEvtsPerJobLumi(ePerJob, ePerLumi, tPerEvent, requestedEvents=None): """ @@ -193,15 +209,17 @@ def determineOutputModules(self, scenarioFunc=None, scenarioArgs=None, scenarioArgs = scenarioArgs or {} outputModules = {} + cherrypy.log("AMR determineOutputModules for %s and %s" % (configCacheUrl, couchDBName)) + cacheKey = configCacheUrl + couchDBName + configDoc if configDoc is not None and configDoc != "": - if (configCacheUrl, couchDBName) in self.config_cache: - configCache = self.config_cache[(configCacheUrl, couchDBName)] - else: - configCache = ConfigCache(configCacheUrl, couchDBName, True) - self.config_cache[(configCacheUrl, couchDBName)] = configCache + configCache = self.getCachedConfigID(cacheKey) + if not configCache: + configCacheDB = ConfigCache(configCacheUrl, couchDBName) + configCacheDB.loadByID(configDoc) + # FIXME: this does not cache the attachment + self.config_cache[cacheKey] = configCache # TODO: need to change to DataCache # configCache.loadDocument(configDoc) - configCache.loadByID(configDoc) outputModules = configCache.getOutputModuleInfo() else: if 'outputs' in scenarioArgs and scenarioFunc in ["promptReco", "expressProcessing", "repack"]: @@ -235,6 +253,7 @@ def determineOutputModules(self, scenarioFunc=None, scenarioArgs=None, outputModules[moduleLabel] = {'dataTier': dataTier, 'primaryDataset': scenarioArgs.get('primaryDataset'), 'filterName': alcaSkim} + cherrypy.log("AMR determineOutputModules self.config_cache keys %s" % self.config_cache.keys()) return outputModules @@ -902,11 +921,16 @@ def factoryWorkloadConstruction(self, workloadName, arguments): if arguments.get('RequestType') == 'Resubmission': self.validateSchema(schema=arguments) else: + cherrypy.log("AMR running masterValidation") self.masterValidation(schema=arguments) + cherrypy.log("AMR running validateSchema") self.validateSchema(schema=arguments) + cherrypy.log("AMR running StdBase.__call__") workload = self.__call__(workloadName=workloadName, arguments=arguments) + cherrypy.log("AMR running validateWorkload") self.validateWorkload(workload) + self.config_cache.clear() return workload @@ -952,11 +976,15 @@ def validateConfigCacheExists(self, configID, configCacheUrl, couchDBName, if configID == '' or configID == ' ': self.raiseValidationException(msg="ConfigCacheID is invalid and cannot be loaded") - if (configCacheUrl, couchDBName) in self.config_cache: - configCache = self.config_cache[(configCacheUrl, couchDBName)] + cherrypy.log("AMR validateConfigCacheExists for %s and %s" % (configCacheUrl, couchDBName)) + cacheKey = configCacheUrl + couchDBName + configID + if cacheKey in self.config_cache: + cherrypy.log("AMR fetching doc from cache: %s" % configID) + configCache = self.config_cache[cacheKey] else: configCache = ConfigCache(dbURL=configCacheUrl, couchDBName=couchDBName, detail=getOutputModules) - self.config_cache[(configCacheUrl, couchDBName)] = configCache + self.config_cache[cacheKey] = configCache + cherrypy.log("AMR validateConfigCacheExists self.config_cache keys %s" % self.config_cache.keys()) try: # if detail option is set return outputModules diff --git a/src/python/WMCore/WMSpec/StdSpecs/TaskChain.py b/src/python/WMCore/WMSpec/StdSpecs/TaskChain.py index 7953b55fa0..3aa076ac89 100644 --- a/src/python/WMCore/WMSpec/StdSpecs/TaskChain.py +++ b/src/python/WMCore/WMSpec/StdSpecs/TaskChain.py @@ -84,7 +84,7 @@ }, """ from __future__ import division - +import cherrypy import json from builtins import range, object from future.utils import viewitems @@ -219,7 +219,7 @@ def __call__(self, workloadName, arguments): """ _call_ - Create a ReReco workload with the given parameters. + Create a TaskChain workload with the given parameters. """ StdBase.__call__(self, workloadName, arguments) self.workload = self.createWorkload() @@ -719,6 +719,7 @@ def validateSchema(self, schema): # Validate the existence of the configCache if task["ConfigCacheID"]: + cherrypy.log("AMR validating configCacheExists in TaskChain for id: %s" % task["ConfigCacheID"]) self.validateConfigCacheExists(configID=task['ConfigCacheID'], configCacheUrl=schema["ConfigCacheUrl"], couchDBName=schema["CouchDBName"],