-
Notifications
You must be signed in to change notification settings - Fork 0
/
BetrokkeneRelatieSyncer.py
96 lines (85 loc) · 4.91 KB
/
BetrokkeneRelatieSyncer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import logging
import time
import traceback
from datetime import datetime
from requests.exceptions import ConnectionError
from BetrokkeneRelatieFeedEventsCollector import BetrokkeneRelatieFeedEventsCollector
from BetrokkeneRelatieFeedEventsProcessor import BetrokkeneRelatieFeedEventsProcessor
from BetrokkeneRelatiesUpdater import BetrokkeneRelatiesUpdater
from EMInfraImporter import EMInfraImporter
from Exceptions.AgentMissingError import AgentMissingError
from Exceptions.AssetMissingError import AssetMissingError
from PostGISConnector import PostGISConnector
from ResourceEnum import colorama_table, ResourceEnum
from SyncTimer import SyncTimer
class BetrokkeneRelatieSyncer:
    """Runs the sync loop for the 'betrokkenerelaties' resource.

    Each cycle reads the current feed position from the params stored via the
    PostGIS connector, collects the feed events starting from that position
    and hands them to the events processor.
    """

    def __init__(self, postgis_connector: PostGISConnector, eminfra_importer: EMInfraImporter):
        """Wire up the collaborators used by the sync loop.

        :param postgis_connector: connector used to read and update the sync params.
        :param eminfra_importer: importer passed on to the events collector and processor.
        """
        self.postgis_connector: PostGISConnector = postgis_connector
        self.eminfra_importer: EMInfraImporter = eminfra_importer
        self.updater: BetrokkeneRelatiesUpdater = BetrokkeneRelatiesUpdater()
        self.events_collector: BetrokkeneRelatieFeedEventsCollector = BetrokkeneRelatieFeedEventsCollector(
            eminfra_importer=eminfra_importer)
        self.events_processor: BetrokkeneRelatieFeedEventsProcessor = BetrokkeneRelatieFeedEventsProcessor(
            postgis_connector, eminfra_importer=eminfra_importer)
        # Prefix that is prepended to the log messages of this syncer.
        self.color = colorama_table[ResourceEnum.betrokkenerelaties]

    def sync(self, connection, stop_when_fully_synced: bool = False):
        """Collect and process betrokkenerelaties feed events in an endless loop.

        :param connection: database connection passed to the connector and the
            events processor; it is rolled back when processing fails unexpectedly.
        :param stop_when_fully_synced: when True, return as soon as a cycle finds
            no events to process; otherwise keep polling forever.
        :return: None. Only returns when ``stop_when_fully_synced`` is True.

        Errors are logged and followed by a waiting period before the next
        cycle; they are not propagated to the caller.
        """
        while True:
            try:
                sync_allowed_by_time = SyncTimer.calculate_sync_allowed_by_time()
                if not sync_allowed_by_time:
                    logging.info(self.color + 'syncing is not allowed at this time. Trying again in 5 minutes')
                    time.sleep(300)
                    continue

                params = self.postgis_connector.get_params(connection)
                current_page = params['page_betrokkenerelaties']
                completed_event_id = params['event_uuid_betrokkenerelaties']
                page_size = params['pagesize']
                logging.info(self.color + f'starting a sync cycle for betrokkenerelaties, page: {str(current_page)} event_uuid: {str(completed_event_id)}')
                start = time.time()

                eventsparams_to_process = None
                try:
                    eventsparams_to_process = self.events_collector.collect_starting_from_page(
                        current_page, completed_event_id, page_size, resource='betrokkenerelaties')

                    total_events = sum(len(lists) for lists in eventsparams_to_process.event_dict.values())
                    if total_events == 0:
                        logging.info(self.color + 'The database is fully synced for betrokkenerelaties. Continuing keep up to date in 30 seconds')
                        # NOTE(review): datetime.utcnow() returns a naive datetime and is
                        # deprecated since Python 3.12; kept as-is because the stored
                        # value's expected form is not visible here.
                        self.postgis_connector.update_params(params={'last_update_utc_betrokkenerelaties': datetime.utcnow()},
                                                             connection=connection)
                        if stop_when_fully_synced:
                            break
                        time.sleep(30)  # wait 30 seconds to prevent overloading API
                        continue
                except ConnectionError:
                    logging.info(self.color + "failed connection, retrying in 1 minute")
                    time.sleep(60)
                    continue
                except Exception as err:
                    logging.error(err)
                    # The collector may have raised before returning anything; only
                    # summarise the fetched events when there is something to summarise.
                    if eventsparams_to_process is not None:
                        end = time.time()
                        self.log_eventparams(eventsparams_to_process.event_dict, round(end - start, 2), self.color)
                    time.sleep(30)
                    continue

                try:
                    self.events_processor.process_events(eventsparams_to_process, connection)
                except (AssetMissingError, AgentMissingError):
                    # A tuple is required here: `A or B` evaluates to just `A`, which
                    # would let AgentMissingError fall through to the generic handler.
                    logging.warning(self.color + 'Tried to add betrokkenerelaties but a source or target is missing. '
                                    'Trying again in 60 seconds to allow other feeds to create the missing objects.')
                    time.sleep(60)
                    continue
                except Exception as exc:
                    logging.error(exc)
                    connection.rollback()
                    time.sleep(30)
            except ConnectionError:
                logging.info(self.color + "failed connection, retrying in 1 minute")
                time.sleep(60)
            except Exception as err:
                # Format the exception instead of concatenating it: str + Exception
                # raises TypeError, which would escape this handler and end the loop.
                logging.error(f'{self.color}{err}')
                time.sleep(30)

    @staticmethod
    def log_eventparams(event_dict, timespan: float, color):
        """Log how many events were fetched, in total and per event type.

        :param event_dict: mapping of event type to a sized collection of events.
        :param timespan: number of seconds it took to fetch the events.
        :param color: prefix prepended to every log message.
        """
        total = sum(len(events) for events in event_dict.values())
        logging.info(color + f'fetched {total} betrokkenerelaties events to sync in {timespan} seconds')
        for event_type, events in event_dict.items():
            # Event types without any events are left out of the log.
            if len(events) > 0:
                logging.info(color + f'number of events of type {event_type}: {len(events)}')