Skip to content

Commit

Permalink
Merge pull request #63 from eea/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
olimpiurob authored May 2, 2024
2 parents 2ebac6f + 570b2ce commit 9c1817f
Show file tree
Hide file tree
Showing 6 changed files with 1,923 additions and 1,314 deletions.
53 changes: 31 additions & 22 deletions Products/Reportek/BdrAuthorizationMiddleware.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import logging
from time import time

from ZODB.PersistentMapping import PersistentMapping
from OFS.SimpleItem import SimpleItem
from plone.memoize import ram
from ZODB.PersistentMapping import PersistentMapping

import logging
logger = logging.getLogger("Reportek")

__all__ = [
'BdrAuthorizationMiddleware'
]
__all__ = ["BdrAuthorizationMiddleware"]


class BdrAuthorizationMiddleware(SimpleItem):

recheck_interval = 300

def __init__(self, url):
Expand All @@ -23,40 +20,52 @@ def __init__(self, url):
def setServiceRecheckInterval(self, seconds):
self.recheck_interval = seconds

@ram.cache(lambda *args, **kwargs: args[2] + str(time() // kwargs['recheck_interval'])) # noqa
def getUserCollectionPaths(self, username,
recheck_interval=recheck_interval):
@ram.cache(
lambda *args, **kwargs: args[2]
+ str(time() // kwargs["recheck_interval"])
)
def getUserCollectionPaths(
self, username, userdata=None, recheck_interval=recheck_interval
):
logger.debug(
"Get companies from middleware for ecas user: %s" % username)
accessiblePaths = self.FGASRegistryAPI.getCollectionPaths(username)
"Get companies from middleware for ecas user: %s" % username
)
accessiblePaths = self.FGASRegistryAPI.getCollectionPaths(
username, userdata=userdata
)
return accessiblePaths

def authorizedUser(self, username, path):
def authorizedUser(self, username, path, userdata=None):
if self.lockedCollection(path):
logger.warning("This collection is locked down: %s!" % path)
return False
accessiblePaths = self.getUserCollectionPaths(
username, recheck_interval=self.recheck_interval)
if path in accessiblePaths.get('paths'):
username, userdata=userdata, recheck_interval=self.recheck_interval
)
if path in accessiblePaths.get("paths"):
return "RW"
if path in accessiblePaths.get('prev_paths'):
if path in accessiblePaths.get("prev_paths"):
return "RO"

def lockDownCollection(self, path, user):
if path not in self.lockedDownCollections:
self.lockedDownCollections[path] = None
self.lockedDownCollections[path] = {'state': 'locked',
'ts': time(),
'user': user}
self.lockedDownCollections[path] = {
"state": "locked",
"ts": time(),
"user": user,
}

def unlockCollection(self, path, user):
if path not in self.lockedDownCollections:
# log unlock without lock
self.lockedDownCollections[path] = None
self.lockedDownCollections[path] = {'state': 'unlocked',
'ts': time(),
'user': user}
self.lockedDownCollections[path] = {
"state": "unlocked",
"ts": time(),
"user": user,
}

def lockedCollection(self, path):
lockedItem = self.lockedDownCollections.get(path)
return lockedItem and lockedItem['state'] == 'locked'
return lockedItem and lockedItem["state"] == "locked"
Loading

0 comments on commit 9c1817f

Please sign in to comment.