-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdbengine.py
More file actions
232 lines (204 loc) · 7.27 KB
/
dbengine.py
File metadata and controls
232 lines (204 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sqlite3
import threading
from typing import Any, Dict, Generator
from venues.abstract_venue import AbstractVenue
# Path of the SQLite database file; passed to sqlite3.connect() by
# init_db() and DBEngine.
dbname = "bandevents.db"
def init_db() -> None:
    """Create the ``artist``, ``venue`` and ``event`` tables if missing.

    Opens a short-lived connection to the database file named by the
    module-level ``dbname``, runs the schema script and closes the
    connection again. Every statement uses ``IF NOT EXISTS``, so calling
    this function repeatedly is harmless.
    """
    sql_schema = """
        CREATE TABLE IF NOT EXISTS artist (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL UNIQUE,
            playcount INTEGER
        );
        CREATE TABLE IF NOT EXISTS venue (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            city TEXT NOT NULL,
            country TEXT NOT NULL,
            UNIQUE (name, city, country)
        );
        CREATE TABLE IF NOT EXISTS event (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            venueid INTEGER NOT NULL,
            date TEXT NOT NULL,
            price TEXT,
            FOREIGN KEY (venueid) REFERENCES venue(id),
            UNIQUE (name, date, venueid)
        );
    """
    # A sqlite3 connection used as a context manager only commits or rolls
    # back; it does not close the connection. Close it explicitly so the
    # file handle is not leaked.
    conn = sqlite3.connect(dbname)
    try:
        cur = conn.cursor()
        try:
            cur.executescript(sql_schema)
            conn.commit()
        finally:
            cur.close()
    finally:
        conn.close()
class DBEngine(object):
    """Wrapper around a single shared SQLite connection to the events database.

    Methods that write (insert/delete) hold ``self.lock`` while they run;
    the read methods do not take the lock. Database errors are reported
    with ``print`` and swallowed, so most methods return an empty result
    instead of raising when a query fails.
    """

    def __init__(self) -> None:
        """Create the lock and open the database connection."""
        self.conn = None
        self.lock = threading.Lock()
        self.__first_run()

    def __first_run(self) -> None:
        """Open the connection to ``dbname`` unless one already exists."""
        if self.conn is None:
            # NOTE(review): connect() is called with default arguments; by
            # default sqlite3 only allows a connection to be used from the
            # thread that created it - confirm against the threading model.
            self.conn = sqlite3.connect(dbname)

    def close(self) -> None:
        """Close the underlying connection, if there is one."""
        with self.lock:
            if self.conn:
                self.conn.close()

    def get_conn(self) -> sqlite3.Connection:
        """Return the underlying ``sqlite3`` connection object."""
        return self.conn

    def plugin_create_venue_entity(self, venue: Dict[str, str]) -> None:
        """Insert a venue row unless an identical venue already exists.

        Args:
            venue: Mapping of ``venue`` table column names to values (the
                schema defines ``name``, ``city`` and ``country``). The
                original docstring gives ``Dogshome.eventSQLentity()`` as
                an example source of this mapping.

        Errors are printed, not raised.
        """
        # Values are bound as named parameters; only the column names, which
        # come from the mapping's keys, are interpolated into the statement.
        cols = ", ".join(venue.keys())
        placeholders = ":" + ", :".join(venue.keys())
        q = f"INSERT OR IGNORE INTO venue ({cols}) VALUES ({placeholders});"

        with self.lock:
            cur = None
            try:
                cur = self.conn.cursor()
                cur.execute(q, venue)
                self.conn.commit()
            except Exception as e:
                print(f"Couldn't create venue entity: {e}")
            finally:
                # cursor() itself may have failed, leaving cur unset.
                if cur is not None:
                    cur.close()

    def insert_venue_events(self, venue: "AbstractVenue", events: list[dict[str, Any]]) -> None:
        """Insert parsed events from a venue into the database.

        Each event's ``"venue"`` name is resolved to a venue row id (together
        with the venue object's city and country) and stored as ``venueid``.
        The caller's event dicts are left unmodified.

        Processing stops at the first event that fails; events inserted
        before the failure are still committed. Errors are printed, not
        raised.

        Args:
            venue: Venue object providing ``get_city()``, ``get_country()``
                and ``name``.
            events: Event mappings whose keys are ``event`` table column
                names plus a ``"venue"`` key holding the venue name.
        """
        with self.lock:
            cur = None
            try:
                cur = self.conn.cursor()
                for event in events:
                    venue_data = self.get_venue_by_name(
                        event["venue"],
                        venue.get_city(),
                        venue.get_country())
                    if venue_data is None:
                        raise LookupError(
                            f"Couldn't insert events into venue '{venue.name}'")

                    # venue -> venueid to match the SQL schema. Work on a
                    # copy so the caller's dict keeps its "venue" key.
                    row = {k: v for k, v in event.items() if k != "venue"}
                    row["venueid"] = venue_data[0]

                    cols = ", ".join(row.keys())
                    placeholders = ":" + ", :".join(row.keys())
                    q = f"INSERT OR IGNORE INTO event ({cols}) VALUES ({placeholders});"
                    cur.execute(q, row)
            except Exception as e:
                print(f"Couldn't insert events for venue '{venue.name}': {e}")
            finally:
                # Commit whatever was inserted, even after a failure, and
                # make sure the cursor is released if the commit raises.
                try:
                    self.conn.commit()
                finally:
                    if cur is not None:
                        cur.close()

    def insert_lastfm_artists(self, artist: str, playcount: int) -> None:
        """Insert an artist with its play count, replacing an existing row.

        Errors are printed, not raised.
        """
        q = "INSERT OR REPLACE INTO artist (name, playcount) VALUES (?, ?);"
        with self.lock:
            cur = None
            try:
                cur = self.conn.cursor()
                cur.execute(q, [artist, playcount])
                self.conn.commit()
            except Exception as e:
                print(f"Couldn't insert artist '{artist}' to LastFM table: {e}")
            finally:
                if cur is not None:
                    cur.close()

    def get_venues(self) -> list[Any]:
        """Return all venues as ``(id, name, city, country)`` rows.

        Returns an empty list when the query fails (the error is printed).
        """
        q = "SELECT id, name, city, country FROM venue"
        results = []
        cur = None
        try:
            cur = self.conn.cursor()
            # Cursor objects have no ``conn`` attribute; execute on the
            # cursor itself.
            results = cur.execute(q).fetchall()
        except Exception as e:
            print(f"Couldn't get venue data: {e}")
        finally:
            if cur is not None:
                cur.close()
        return results

    def get_venue_by_name(self, vname: str, city: str, country: str) -> Any:
        """Look up one venue by its name, city and country.

        Returns:
            The ``(id, name, city, country)`` row, or ``None`` when no venue
            matches or the query fails (the error is printed).

        Raises:
            Exception: If the fetched row does not have four columns.
        """
        q = "SELECT id, name, city, country FROM venue " \
            + "WHERE name = ? AND city = ? AND country = ? LIMIT 1;"
        row = None
        cur = None
        try:
            cur = self.conn.cursor()
            row = cur.execute(q, [vname, city, country]).fetchone()
        except Exception as e:
            print(f"Couldn't get venue '{vname}:{city}:{country}' by name: {e}")
        finally:
            if cur is not None:
                cur.close()

        # fetchone() yields None when nothing matched; callers test for it.
        if row is None:
            return None
        if len(row) != 4:
            raise Exception(f"Wrong number of arguments: {row}")
        return row

    def get_relevant_gigs(self) -> list[Any]:
        """Return events whose name contains a well-played artist's name.

        An artist matches when its lower-cased name appears as a
        space-delimited word sequence in the event name (with ``-`` and ``:``
        treated as spaces) and its play count is above 10.

        Returns:
            A list of ``sqlite3.Row`` objects with the keys ``date``,
            ``venue_name``, ``city``, ``event_name`` and
            ``matching_artists``, ordered by date. Empty when the query
            fails (the error is printed).
        """
        q = """SELECT
                e.date,
                v.name AS venue_name,
                v.city,
                e.name AS event_name,
                GROUP_CONCAT(DISTINCT a.name || ' (' || a.playcount || ')') AS matching_artists
            FROM event AS e
            JOIN venue AS v ON e.venueid = v.id
            JOIN artist AS a
                ON ' ' || LOWER(REPLACE(REPLACE(e.name, '-', ' '), ':', ' ')) || ' '
                LIKE '% ' || LOWER(a.name) || ' %'
            WHERE a.playcount > 10
            GROUP BY e.date, v.name, v.city
            ORDER BY e.date ASC;
        """
        gigs = []
        cur = None
        try:
            cur = self.conn.cursor()
            # Set the row factory on this cursor only; setting it on the
            # shared connection would change the row type returned by every
            # later query of the other methods.
            cur.row_factory = sqlite3.Row
            gigs = cur.execute(q).fetchall()
        except Exception as e:
            print(f"Couldn't get relevant gigs: {e}")
        finally:
            if cur is not None:
                cur.close()
        return gigs

    def get_artists(self) -> Generator[Dict[str, Any], None, None]:
        """Yield every artist as ``{"artist": name, "playcount": count}``.

        Errors are printed, not raised; the generator then simply ends.
        """
        q = "SELECT name, playcount FROM artist;"
        cur = None
        try:
            cur = self.conn.cursor()
            results = cur.execute(q)
            for artist, playcount in results.fetchall():
                yield {"artist": artist,
                       "playcount": playcount}
        except Exception as e:
            print(f"Couldn't get artists: {e}")
        finally:
            if cur is not None:
                cur.close()

    def get_artist(self, name: str) -> Generator[Dict[str, Any], None, None]:
        """Yield artists named ``name`` as ``{"artist": ..., "playcount": ...}``.

        The query is limited to five rows. Errors are printed, not raised;
        the generator then simply ends.
        """
        q = "SELECT name, playcount FROM artist " \
            + "WHERE name = ? LIMIT 5;"
        cur = None
        try:
            cur = self.conn.cursor()
            results = cur.execute(q, [name])
            for artist, playcount in results.fetchall():
                yield {"artist": artist,
                       "playcount": playcount}
        except Exception as e:
            print(f"Couldn't get artist '{name}': {e}")
        finally:
            if cur is not None:
                cur.close()

    def purge_old_events(self) -> None:
        """Delete events dated before today (as reported by SQLite).

        Errors are printed, not raised.
        """
        q = "DELETE FROM event " \
            + "WHERE strftime('%Y-%m-%d', date) < date('now');"
        with self.lock:
            cur = None
            try:
                cur = self.conn.cursor()
                cur.execute(q)
                self.conn.commit()
            except Exception as e:
                print(f"Couldn't purge old events: {e}")
            finally:
                if cur is not None:
                    cur.close()