Skip to content

Commit

Permalink
major cleanup, new fetch/parse separation
Browse files Browse the repository at this point in the history
  • Loading branch information
leifj committed Nov 29, 2017
1 parent 04c733c commit 0377f68
Show file tree
Hide file tree
Showing 18 changed files with 1,186 additions and 1,103 deletions.
96 changes: 39 additions & 57 deletions src/pyff/builtins.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import print_function

"""Package that contains the basic set of pipes - functions that can be used to put together a processing pipeling
for pyFF.
"""

from __future__ import absolute_import, print_function

import base64
import hashlib
import json
Expand All @@ -19,13 +20,14 @@
from iso8601 import iso8601
from lxml.etree import DocumentInvalid

from pyff.constants import NS
from pyff.decorators import deprecated
from pyff.logs import log
from pyff.pipes import Plumbing, PipeException, PipelineCallback, pipe
from pyff.stats import set_metadata_info
from pyff.utils import total_seconds, dumptree, safe_write, root, duration2timedelta, xslt_transform, \
iter_entities, validate_document
from .constants import NS
from .decorators import deprecated
from .logs import log
from .pipes import Plumbing, PipeException, PipelineCallback, pipe
from .stats import set_metadata_info
from .utils import total_seconds, dumptree, safe_write, root, duration2timedelta, xslt_transform, validate_document
from .samlmd import iter_entities, annotate_entity, set_entity_attributes
from .fetch import Resource

try:
from cStringIO import StringIO
Expand Down Expand Up @@ -148,10 +150,10 @@ def fork(req, *opts):
if req.t is not None:
nt = deepcopy(req.t)

ip = Plumbing(pipeline=req.args, pid="{}.fork".format(req.plumbing.pid))
ip = Plumbing(pipeline=req.args, pid="%s.fork" % req.plumbing.pid)
# ip.process(req.md,t=nt)
ireq = Plumbing.Request(ip, req.md, nt)
ip._process(ireq)
ip.iprocess(ireq)

if req.t is not None and ireq.t is not None and len(root(ireq.t)) > 0:
if 'merge' in opts:
Expand Down Expand Up @@ -239,7 +241,7 @@ def _pipe(req, *opts):
"""
# req.process(Plumbing(pipeline=req.args, pid="%s.pipe" % req.plumbing.pid))
ot = Plumbing(pipeline=req.args, pid="{}.pipe".format(req.plumbing.id))._process(req)
ot = Plumbing(pipeline=req.args, pid="%s.pipe" % req.plumbing.id).iprocess(req)
req.done = False
return ot

Expand All @@ -252,7 +254,6 @@ def when(req, condition, *values):
:param req: The request
:param condition: The condition key
:param values: The condition values
:param opts: More Options (unused)
:return: None
The inner pipeline is executed if the at least one of the condition values is present for the specified key in
Expand All @@ -270,12 +271,9 @@ def when(req, condition, *values):
The condition operates on the state: if 'foo' is present in the state (with any value), then the something branch is
followed. If 'bar' is present in the state with the value 'bill' then the other branch is followed.
"""
# log.debug("condition key: %s" % repr(condition))
c = req.state.get(condition, None)
# log.debug("condition %s" % repr(c))
if c is not None:
if not values or _any(values, c):
return Plumbing(pipeline=req.args, pid="%s.when" % req.plumbing.id)._process(req)
if c is not None and (not values or _any(values, c)):
return Plumbing(pipeline=req.args, pid="%s.when" % req.plumbing.id).iprocess(req)
return req.t


Expand Down Expand Up @@ -357,7 +355,7 @@ def loadstats(req, *opts):
:param opts: Options: (none)
:return: None
"""
from stats import metadata
from .stats import metadata
_stats = None
try:
if 'json' in opts:
Expand Down Expand Up @@ -467,36 +465,17 @@ def load(req, *opts):

params.setdefault('as', url)

post = None
def _null(t):
return t

post = _null
if params['via'] is not None:
post = PipelineCallback(params['via'], req)

if "://" in url:
log.debug("load {} verify {} as {} via {}".format(url, params['verify'], params['as'], params['via']))
remotes.append((url, params['verify'], params['as'], post))
elif os.path.exists(url):
if os.path.isdir(url):
log.debug("directory {} verify {} as {} via {}".format(url, params['verify'], params['as'], params['via']))
req.md.load_dir(url, url=params['as'], validate=opts['validate'], post=post,
fail_on_error=opts['fail_on_error'], filter_invalid=opts['filter_invalid'])
elif os.path.isfile(url):
log.debug("file {} verify {} as {} via {}".format(url, params['verify'], params['as'], params['via']))
remotes.append(("file://%s" % url, params['verify'], params['as'], post))
else:
error = "Unknown file type for load: '{}'".format(url)
if opts['fail_on_error']:
raise PipeException(error)
log.error(error)
else:
error = "Don't know how to load '{}' as {} verify {} via {} (file does not exist?)".format(url,
params['as'],
params['verify'],
params['via'])
if opts['fail_on_error']:
raise PipeException(error)
log.error(error)
req.md.rm.add(Resource(url, post, **params))

req.md.fetch_metadata(remotes, **opts)
log.debug("Refreshing all resources")
req.md.reload()


def _select_args(req):
Expand Down Expand Up @@ -915,6 +894,7 @@ def validate(req, *opts):

return req.t


@pipe
def prune(req, *opts):
"""
Expand Down Expand Up @@ -1018,15 +998,15 @@ def certreport(req, *opts):
keysize = cdict['modulus'].bit_length()
cert = cdict['cert']
if keysize < error_bits:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-error",
"keysize too small",
"%s has keysize of %s bits (less than %s)" % (cert.getSubject(),
keysize,
error_bits))
log.error("%s has keysize of %s" % (eid, keysize))
elif keysize < warning_bits:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-warning",
"keysize small",
"%s has keysize of %s bits (less than %s)" % (cert.getSubject(),
Expand All @@ -1036,7 +1016,7 @@ def certreport(req, *opts):

notafter = cert.getNotAfter()
if notafter is None:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-error",
"certificate has no expiration time",
"%s has no expiration time" % cert.getSubject())
Expand All @@ -1046,23 +1026,24 @@ def certreport(req, *opts):
now = datetime.now()
dt = et - now
if total_seconds(dt) < error_seconds:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-error",
"certificate has expired",
"%s expired %s ago" % (cert.getSubject(), -dt))
log.error("%s expired %s ago" % (eid, -dt))
elif total_seconds(dt) < warning_seconds:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-warning",
"certificate about to expire",
"%s expires in %s" % (cert.getSubject(), dt))
log.warn("%s expires in %s" % (eid, dt))
except ValueError as ex:
req.md.annotate(entity_elt,
annotate_entity(entity_elt,
"certificate-error",
"certificate has unknown expiration time",
"%s unknown expiration time %s" % (cert.getSubject(), notafter))

req.md.store.update(entity_elt)
except Exception as ex:
log.error(ex)

Expand Down Expand Up @@ -1133,7 +1114,7 @@ def signcerts(req, *opts):
if req.t is None:
raise PipeException("Your pipeline is missing a select statement.")

for fp, pem in xmlsec.crypto.CertDict(req.t).iteritems():
for fp, pem in xmlsec.crypto.CertDict(req.t).items():
log.info("found signing cert with fingerprint %s" % fp)
return req.t

Expand Down Expand Up @@ -1191,12 +1172,12 @@ def finalize(req, *opts):
mdid = req.args.get('ID', 'prefix _')
if re.match('(\s)*prefix(\s)*', mdid):
prefix = re.sub('^(\s)*prefix(\s)*', '', mdid)
ID = now.strftime(prefix + "%Y%m%dT%H%M%SZ")
_id = now.strftime(prefix + "%Y%m%dT%H%M%SZ")
else:
ID = mdid
_id = mdid

if not e.get('ID'):
e.set('ID', ID)
e.set('ID', _id)

valid_until = str(req.args.get('validUntil', e.get('validUntil', None)))
if valid_until is not None and len(valid_until) > 0:
Expand All @@ -1210,7 +1191,7 @@ def finalize(req, *opts):
dt = dt.replace(tzinfo=None) # make dt "naive" (tz-unaware)
offset = dt - now
e.set('validUntil', dt.strftime("%Y-%m-%dT%H:%M:%SZ"))
except ValueError, ex:
except ValueError as ex:
log.error("Unable to parse validUntil: %s (%s)" % (valid_until, ex))

# set a reasonable default: 50% of the validity
Expand Down Expand Up @@ -1315,6 +1296,7 @@ def _setattr(req, *opts):

for e in iter_entities(req.t):
# log.debug("setting %s on %s" % (req.args,e.get('entityID')))
req.md.set_entity_attributes(e, req.args)
set_entity_attributes(e, req.args)
req.md.store.update(e)

return req.t
22 changes: 11 additions & 11 deletions src/pyff/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
Useful constants for pyFF. Mostly XML namespace declarations.
"""

import os
import sys
import pyconfig
import logging

Expand All @@ -30,14 +28,9 @@
'software': 'http://pyff.io/software',
'domain': 'http://pyff.io/domain'}

DIGESTS = ['sha1', 'md5', 'null']

EVENT_DROP_ENTITY = 'event.drop.entity'
EVENT_RETRY_URL = 'event.retry.url'
EVENT_IMPORTED_METADATA = 'event.imported.metadata'
EVENT_IMPORT_FAIL = 'event.import.failed'
EVENT_REPOSITORY_LIVE = 'event.repository.live'
PLACEHOLDER_ICON = 'data:image/gif;base64,R0lGODlhAQABAIABAP///wAAACH5BAEKAAEALAAAAAABAAEAAAICTAEAOw=='

DIGESTS = ['sha1', 'md5', 'null']

class Config(object):
google_api_key = pyconfig.setting("pyff.google_api_key", "google+api+key+not+set")
Expand All @@ -56,9 +49,16 @@ class Config(object):
aliases = pyconfig.setting("pyff.aliases", ATTRS)
base_dir = pyconfig.setting("pyff.base_dir", None)
proxy = pyconfig.setting("pyff.proxy", False)
store = pyconfig.setting("pyff.store", None)
allow_shutdown = pyconfig.setting("pyff.allow_shutdown", False)
modules = pyconfig.setting("pyff.modules", [])

cache_ttl = pyconfig.setting("pyff.cache.ttl", 300)
default_cache_duration = pyconfig.setting("pyff.default.cache_duration", "PT1H")
respect_cache_duration = pyconfig.setting("pyff.respect_cache_duration", True)
info_buffer_size = pyconfig.setting("pyff.info_buffer_size", 10)
worker_pool_size = pyconfig.setting("pyff.worker_pool_size", 10)
store_class = pyconfig.setting("pyff.store.class", "pyff.store:MemoryStore")
update_frequency = pyconfig.setting("pyff.update_frequency",600)
request_timeout = pyconfig.setting("pyff.request_timeout",10)
request_cache_time = pyconfig.setting("pyff.request_cache_time", 5)

config = Config()
23 changes: 19 additions & 4 deletions src/pyff/decorators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function

"""
Various decorators used in pyFF.
"""
Expand Down Expand Up @@ -40,7 +38,7 @@ def f_retry(*args, **kwargs):
try:
return f(*args, **kwargs)
except ex as e:
msg = "{}, Retrying in {:d} seconds...".format(str(e), mdelay)
msg = "%s, Retrying in %d seconds..." % (str(e), mdelay)
if logger:
logger.warn(msg)
else:
Expand Down Expand Up @@ -81,6 +79,7 @@ class _HashedSeq(list):
__slots__ = 'hashvalue'

def __init__(self, tup, thehash=hash):
super(_HashedSeq, self).__init__()
self[:] = tup
self.hashvalue = thehash(tup)

Expand All @@ -95,8 +94,24 @@ def _make_key(args, kwds, typed,
thetuple=tuple,
thetype=type,
thelen=len):
'Make a cache key from optionally typed positional and keyword arguments'
"""
:param args:
:param kwds:
:param typed:
:param kwd_mark:
:param fasttypes:
:param thesorted:
:param thetuple:
:param thetype:
:param thelen:
:return:
Make a cache key from optionally typed positional and keyword arguments
"""
key = args
sorted_items = dict()
if kwds:
sorted_items = thesorted(kwds.items())
key += kwd_mark
Expand Down
14 changes: 14 additions & 0 deletions src/pyff/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

__author__ = 'leifj'


class MetadataException(Exception):
pass


class MetadataExpiredException(MetadataException):
pass


class PyffException(Exception):
pass
Loading

0 comments on commit 0377f68

Please sign in to comment.