-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
79 lines (63 loc) · 2.28 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import datetime
import pytz
from flask import session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_utils.types import ChoiceType
from sqlalchemy.orm.exc import NoResultFound
from config import app
# Flask-SQLAlchemy handle bound to the application object imported from
# ``config``; the model classes in this module derive from ``db.Model``.
db = SQLAlchemy(app)
class Note(db.Model):
    """A text note with a classifier tag and a creation timestamp."""

    # Pairs handed to ChoiceType for the ``classifier`` column.
    classifier_choices = [
        ('light', 'Light'),
        ('moderate', 'Moderate'),
        ('severe', 'Severe'),
        ('important', 'Important'),
    ]

    id = db.Column(db.Integer, primary_key=True)
    text = db.Column(db.String(250))
    classifier = db.Column(ChoiceType(classifier_choices))
    created_date = db.Column(db.DateTime)

    def __repr__(self):
        return '<Note created[%r] text[%r]>' % (self.created_date, self.text)

    @staticmethod
    def save(text, classifier):
        """Create a note stamped with the current UTC time and commit it.

        Args:
            text: Body of the note (the column holds up to 250 characters).
            classifier: Value for the ``classifier`` column.

        Returns:
            The newly committed ``Note`` instance.
        """
        note = Note(
            text=text,
            classifier=classifier,
            created_date=datetime.datetime.now(tz=pytz.utc),
        )
        db.session.add(note)
        db.session.commit()
        return note

    @staticmethod
    def list_notes(days=1):
        """Return a query for notes created within the last ``days`` days.

        Args:
            days: Size of the look-back window, in days.

        Returns:
            A query over ``Note`` rows newer than the cutoff, ordered by
            ``created_date`` ascending (oldest first).
        """
        # ``save`` writes UTC timestamps, so the cutoff must be derived from
        # the same UTC clock. A naive local ``now()`` would shift the window
        # by the host's UTC offset.
        cutoff = datetime.datetime.now(tz=pytz.utc) - datetime.timedelta(days=days)
        return Note.query.filter(
            Note.created_date > cutoff
        ).order_by(Note.created_date.asc())
class Auth(db.Model):
    """Stored access/refresh tokens and their expiry for a user id."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(250))
    expiry_date = db.Column(db.DateTime)
    access_token = db.Column(db.String(250))
    refresh_token = db.Column(db.String(250))

    def __repr__(self):
        return '<Auth usr[%r] expiry[%r]>' % (self.user_id, self.expiry_date)

    @staticmethod
    def save(user_id, access_token, refresh_token, expiry_date):
        """Insert or update the ``Auth`` row for ``user_id`` and commit it.

        Args:
            user_id: Identifier used to look up an existing row.
            access_token: Value stored in ``access_token``.
            refresh_token: Value stored in ``refresh_token``.
            expiry_date: Timestamp in seconds (anything ``int()`` accepts);
                converted with ``datetime.fromtimestamp`` before storing.

        Returns:
            The committed ``Auth`` instance.
        """
        auth = Auth.get_auth(user_id)
        if auth is None:
            # No row yet for this user: start a fresh one.
            auth = Auth()
        auth.user_id = user_id
        auth.access_token = access_token
        auth.refresh_token = refresh_token
        auth.expiry_date = datetime.datetime.fromtimestamp(int(expiry_date))
        db.session.add(auth)
        db.session.commit()
        return auth

    @staticmethod
    def get_auth(user_id):
        """Return the ``Auth`` row for ``user_id``, or ``None`` if there is none.

        Any exception other than ``NoResultFound`` raised by ``.one()``
        propagates to the caller.
        """
        try:
            return Auth.query.filter(Auth.user_id == user_id).one()
        except NoResultFound:
            return None

    @staticmethod
    def is_auth_expired():
        """Report whether the expiry stored in the Flask session has passed.

        Returns:
            ``True`` when the session has no ``'expiry'`` value or when that
            timestamp is earlier than the current local time; ``False``
            otherwise.
        """
        expiry_timestamp = session.get('expiry')
        if expiry_timestamp is None:
            # Nothing recorded in the session: treat as expired instead of
            # letting ``int(None)`` raise TypeError.
            return True
        expires_at = datetime.datetime.fromtimestamp(int(expiry_timestamp))
        return expires_at < datetime.datetime.now()