-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFetchApp.py
69 lines (51 loc) · 2.26 KB
/
FetchApp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from AppConfig import AppConfig
from PiiMasker import PiiMasker
from PostgresWriter import PostgresWriter
from SqsQueue import SqsQueue
from Validator import Validator
import logging
class FetchApp:
"""
FetchApp is the main application class that reads messages from an SQS queue,
masks the PII fields, and writes the data to a Postgres database. It uses the
AppConfig class to read the configuration file and initialize the components.
"""
def __init__(self):
self.app_config = AppConfig("config.ini")
log_format = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(level=logging.DEBUG, format=log_format)
def run(self):
# Initialize the queue, masker, writer and logging
try:
queue = SqsQueue(
self.app_config.get_sqs_queue_url(),
self.app_config.get_sqs_endpoint_url(),
self.app_config.get_sqs_region(),
self.app_config.get_sqs_access_key_id(),
self.app_config.get_sqs_secret_access_key(),
)
pii_masker = PiiMasker()
postgres_writer = PostgresWriter(
self.app_config.get_database_connection_string()
)
postgres_writer.connect()
logging.info(f"Startup successful")
except Exception as e:
logging.error(f"Startup error: {e}.\nExiting app due to error.")
return
# Process messages from the queue
# TODO: Remove PII data from log messages
for message in queue.read_message():
try:
logging.debug(f"Processing message: {message}")
if not Validator().is_valid_message(message):
logging.error(f"Skipping invalid message: {message}.")
continue
masked_message = pii_masker.mask_all(message)
logging.debug(f"Masked message: {masked_message}")
postgres_writer.write_user_logins(masked_message)
logging.debug(f"Wrote to Postgres: {masked_message}")
except Exception as e:
logging.error(f"Error processing message: {e} for message: {message}.")
logging.info("Finished processing all messages")
FetchApp().run()