Skip to content

Commit

Permalink
Merge pull request #72 from eea/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
olimpiurob authored Jul 9, 2024
2 parents 55e38f7 + b9c30ba commit 9b9f89d
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 478 deletions.
187 changes: 70 additions & 117 deletions Products/Reportek/ContentRegistryPingger.py
Original file line number Diff line number Diff line change
@@ -1,193 +1,146 @@
import logging
import pickle
import threading
import time

import requests
from BeautifulSoup import BeautifulSoup as bs
from config import (DEPLOYMENT_CDR, REPORTEK_DEPLOYMENT,
REDIS_DATABASE, REDIS_HOSTNAME, REDIS_PORT)
from constants import PING_ENVELOPES_REDIS_KEY
from config import DEPLOYMENT_CDR, REPORTEK_DEPLOYMENT

from Products.Reportek.rabbitmq import send_message

logger = logging.getLogger("Reportek")


class ContentRegistryPingger(object):

PING_STORE = None

if REPORTEK_DEPLOYMENT == DEPLOYMENT_CDR and REDIS_DATABASE:
import redis
try:
PING_STORE = redis.Redis(host=REDIS_HOSTNAME,
port=REDIS_PORT,
db=REDIS_DATABASE)
except Exception:
PING_STORE = None

def __init__(self, api_url, cr_rmq=False):
self.api_url = api_url
self.cr_rmq = cr_rmq

def _log_ping(self, success, message, url, ping_argument=None):
if not ping_argument or ping_argument == 'create':
action = 'update/create'
elif ping_argument == 'delete':
action = 'delete'
if not ping_argument or ping_argument == "create":
action = "update/create"
elif ping_argument == "delete":
action = "delete"
messageBody = self.content_registry_pretty_message(message)
if success:
logger.info(
'''Content Registry (%s) pingged OK for the %s of %s. '''
'''Response was: %s''' % (self.api_url, action, url,
messageBody))
"""Content Registry (%s) pingged OK for the %s of %s. """
"""Response was: %s"""
% (self.api_url, action, url, messageBody)
)
else:
logger.warning(
'''Content Registry (%s) ping unsuccessful for the %s '''
'''of %s. Response was: %s''' % (self.api_url, action, url,
messageBody))

def content_registry_ping(self, uris, ping_argument=None, envPathName=None,
wk=None):
""" Pings the Content Registry to harvest a new envelope almost
immediately after the envelope is released or revoked
with the name of the envelope's RDF output
"""Content Registry (%s) ping unsuccessful for the %s """
"""of %s. Response was: %s"""
% (self.api_url, action, url, messageBody)
)

def content_registry_ping(
self, uris, ping_argument=None, envPathName=None, wk=None
):
"""Pings the Content Registry to harvest a new envelope almost
immediately after the envelope is released or revoked
with the name of the envelope's RDF output
"""

def parse_uri(uri):
""" Use only http uris for CDR
"""
"""Use only http uris for CDR"""
if REPORTEK_DEPLOYMENT == DEPLOYMENT_CDR:
new_uri = uri.replace('https://', 'http://')
logger.info("Original uri: %s has been replaced with uri: %s"
% (uri, new_uri))
new_uri = uri.replace("https://", "http://")
logger.info(
"Original uri: %s has been replaced with uri: %s"
% (uri, new_uri)
)
uri = new_uri
return uri

allOk = True
ping_res = ''
ping_res = ""
http_code = None
if not ping_argument:
ping_argument = 'create'
if envPathName and self.PING_STORE and not self.cr_rmq:
ts = self._start_ping(envPathName, op=ping_argument)
ping_argument = "create"
for uri in uris:
uri = parse_uri(uri)
success, response = self._content_registry_ping(
uri, ping_argument=ping_argument)
ping_res = getattr(response, 'text', '')
http_code = getattr(response, 'status_code', None)
uri, ping_argument=ping_argument
)
ping_res = getattr(response, "text", "")
http_code = getattr(response, "status_code", None)
self._log_ping(success, ping_res, uri, ping_argument)
if wk:
msgs = {
True: "CR Ping successful for the {} of {} (HTTP status: {})".format(ping_argument, uri, http_code), # noqa
False: "CR Ping failed for the {} of {} (HTTP status: {})".format(ping_argument, uri, http_code) # noqa
True: """CR Ping successful for the {} of """
"""{} (HTTP status: {})""".format(
ping_argument, uri, http_code
),
False: """CR Ping failed for the {} of """
"""{} (HTTP status: {})""".format(
ping_argument, uri, http_code
),
}
wk.addEvent(msgs.get(success))
allOk = allOk and success
if envPathName and self.PING_STORE and not self.cr_rmq:
if not self._check_ping(envPathName, ts):
allOk = False
break
if envPathName and self.PING_STORE and not self.cr_rmq:
self._stop_ping(envPathName, ts)

return allOk, ping_res

def content_registry_ping_async(self, uris, ping_argument=None,
envPathName=None, wk=None):
""" Delegate this to fire and forget thread
don't keep the user (browser) waiting
def content_registry_ping_async(
self, uris, ping_argument=None, envPathName=None, wk=None
):
"""Delegate this to fire and forget thread
don't keep the user (browser) waiting
"""

pingger = threading.Thread(
target=ContentRegistryPingger.content_registry_ping,
name='contentRegistryPing',
name="contentRegistryPing",
args=(self, uris),
kwargs={'ping_argument': ping_argument,
'envPathName': envPathName,
'wk': wk})
kwargs={
"ping_argument": ping_argument,
"envPathName": envPathName,
"wk": wk,
},
)
pingger.setDaemon(True)
pingger.start()
return

def _content_registry_ping(self, uri, ping_argument=None):
params = {'uri': uri}
if ping_argument == 'create':
params['create'] = 'true'
elif ping_argument == 'delete':
params['delete'] = 'true'
if not getattr(self, 'cr_rmq', None):
params = {"uri": uri}
if ping_argument == "create":
params["create"] = "true"
elif ping_argument == "delete":
params["delete"] = "true"
if not getattr(self, "cr_rmq", None):
resp = requests.get(self.api_url, params=params)
if resp.status_code == 200:
return (True, resp)
else:
options = {}
options['create'] = ping_argument
options['service_to_ping'] = self.api_url
options['obj_url'] = uri
options["create"] = ping_argument
options["service_to_ping"] = self.api_url
options["obj_url"] = uri
resp = self.ping_RabbitMQ(options)
if resp:
return (True, 'Queued')
return (True, "Queued")

return (False, resp)

@classmethod
def content_registry_pretty_message(cls, message):
messageBody = ''
messageBody = ""
try:
if '<html' in message:
messageBody = bs(message).find('body').text
elif '<?xml' in message:
messageBody = bs(message).find('response').text
if "<html" in message:
messageBody = bs(message).find("body").text
elif "<?xml" in message:
messageBody = bs(message).find("response").text
except Exception:
messageBody = message

return messageBody

def _start_ping(self, envPathName, op='up'):
""" `envPingStatus` string containing the path of the envelope to work
on. `op` is the operation that will the envelope wil be pingged for
"""
# lock the store globaly?
# FIXME we'll try without a lock for the begining
# pingStoreLock.aquire()
ts = time.time()
val = {'op': op, 'ts': ts}
val = pickle.dumps(val)
# start no matter what. expect the other to stop when he sees dirty ts
self.PING_STORE.hset(PING_ENVELOPES_REDIS_KEY, envPathName, val)
return ts

def _check_ping(self, envPathName, ts):
envPingStatus = self.PING_STORE.hget(
PING_ENVELOPES_REDIS_KEY, envPathName)
envPingStatus = pickle.loads(envPingStatus)
# also check if a later task already finished and reset the ts
if envPingStatus['ts'] > ts or envPingStatus['ts'] == 0:
# got dirty, somebody else started doing stuff on this envelope
return False
return True

def _stop_ping(self, envPathName, ts):
envPingStatus = self.PING_STORE.hget(
PING_ENVELOPES_REDIS_KEY, envPathName)
if envPingStatus:
envPingStatus = pickle.loads(envPingStatus)
# not us! don't reset
if envPingStatus['ts'] != ts:
return
else:
envPingStatus['op'] = None
envPingStatus['ts'] = 0
else:
envPingStatus = {'op': None, 'ts': 0}
envPingStatus = pickle.dumps(envPingStatus)
self.PING_STORE.hset(PING_ENVELOPES_REDIS_KEY,
envPathName, envPingStatus)

def ping_RabbitMQ(self, options):
""" Ping the CR/SDS service via RabbitMQ
"""
"""Ping the CR/SDS service via RabbitMQ"""
msg = "{create}|{service_to_ping}|{obj_url}".format(**options)
try:
send_message(msg, queue="cr_queue")
Expand Down
67 changes: 0 additions & 67 deletions Products/Reportek/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@


import logging
from traceback import format_exception_only

# Zope imports
import Zope2
Expand All @@ -47,10 +46,6 @@
)
from Products.Reportek.config import (
DEPLOYMENT_BDR,
DEPLOYMENT_CDR,
REDIS_DATABASE,
REDIS_HOSTNAME,
REDIS_PORT,
REPORTEK_DEPLOYMENT,
)
from Products.Reportek import (
Expand Down Expand Up @@ -136,19 +131,6 @@ def create_reportek_objects(app):
repo_engine = ReportekEngine.ReportekEngine()
app._setObject(constants.ENGINE_ID, repo_engine)

if REPORTEK_DEPLOYMENT == DEPLOYMENT_CDR:
import threading

crPingger = repo_engine.contentRegistryPingger
if crPingger:
pingger = threading.Thread(
target=ping_remaining_envelopes,
name="pingRemainingEnvelopes",
args=(app, crPingger),
)
pingger.setDaemon(True)
pingger.start()

# Add converters folder
try:
converters = getattr(app, constants.CONVERTERS_ID)
Expand Down Expand Up @@ -250,55 +232,6 @@ def _strip_protocol_domain(full_url):
return "/".join(parts[i:]), "/".join(parts[:i])


def ping_remaining_envelopes(app, crPingger):
import pickle

import redis

if REDIS_DATABASE and REDIS_HOSTNAME and REDIS_PORT:
try:
rs = redis.Redis(
host=REDIS_HOSTNAME, port=REDIS_PORT, db=REDIS_DATABASE
)
envPathNames = rs.hkeys(constants.PING_ENVELOPES_REDIS_KEY)
except Exception as e:
lines = format_exception_only(e.__class__, e)
lines.insert(0, "Could not get to redis server")
logger.warn(". ".join(lines))
return

for envPathName in envPathNames:
# get this fresh on every iteration
envStatus = rs.hget(
constants.PING_ENVELOPES_REDIS_KEY, envPathName
)
envStatus = pickle.loads(envStatus)
if not envStatus["op"]:
continue
envPathOnly, proto_domain = _strip_protocol_domain(envPathName)
env = app.unrestrictedTraverse(envPathOnly, None)
uris = [envPathName + "/rdf"]
if not env:
logger.warning(
"Envelope {} no longer exists in the zope "
"environment. Setting cr ping argument to "
"'delete'".format(envPathName)
)
envStatus["op"] = "delete"
else:
innerObjsByMetatype = env._getObjectsForContentRegistry()
# as we are not called from browser there is no domain part in
# the absolute_url
uris.extend(
proto_domain + "/" + o.absolute_url(1)
for objs in innerObjsByMetatype.values()
for o in objs
)
crPingger.content_registry_ping(
uris, ping_argument=envStatus["op"], envPathName=envPathName
)


def add_index(name, catalog, meta_type, meta=False):
if name not in catalog.indexes():
if meta_type == "ZCTextIndex":
Expand Down
Loading

0 comments on commit 9b9f89d

Please sign in to comment.