-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPostgresWriter.py
77 lines (64 loc) · 2.12 KB
/
PostgresWriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import psycopg2
from datetime import date
class PostgresWriter:
"""
PostgresWriter is the class that writes user login data to the Postgres
database. It uses the psycopg2 library to connect to the database and write
the data.
TODO: Add connection pool, exponential backoff retry logic, and error
handling. Use secure method for getting user and password.
"""
LOGIN_QUERY = """
INSERT INTO user_logins(
user_id,
device_type,
masked_ip,
masked_device_id,
locale,
app_version,
create_date
) VALUES (
%s, %s, %s, %s, %s, %s, %s
);
"""
def __init__(self, connection_string):
self.connection_string = connection_string
self.connection = None
self.cursor = None
def connect(self):
self.connection = psycopg2.connect(self.connection_string)
self.cursor = self.connection.cursor()
def write_user_logins(self, data):
"""
CREATE TABLE IF NOT EXISTS user_logins(
user_id varchar(128),
device_type varchar(32),
masked_ip varchar(256),
masked_device_id varchar(256),
locale varchar(32),
app_version integer,
create_date date
);
"""
# Convert app_version to integer 2.3.4 becomes 20304
version_parts = data["app_version"].split(".")
version_parts = version_parts + [0] * (3 - len(version_parts))
major, minor, patch = map(int, version_parts)
app_version = major * 10000 + minor * 100 + patch
today = date.today()
if not self.cursor:
raise Exception("Database connection not initialized")
# Write data to Postgres
self.cursor.execute(
self.LOGIN_QUERY,
(
data["user_id"],
data["device_type"],
data["masked_ip"],
data["masked_device_id"],
data["locale"],
app_version,
today,
),
)
self.connection.commit()