-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathauth.py
162 lines (139 loc) · 5.07 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import streamlit as st
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from passlib.hash import bcrypt
import os
import jwt
import datetime
import logging
from dotenv import load_dotenv
# Run python-dotenv's loader before any os.getenv() call below reads the
# environment.
load_dotenv()
# Database setup: the connection URL is read from the DATABASE_URL environment
# variable (os.getenv returns None when it is unset). `Session` is a session
# factory bound to this engine; `Base` is the declarative base the ORM models
# in this module inherit from.
engine = create_engine(os.getenv('DATABASE_URL'))
Session = sessionmaker(bind=engine)
Base = declarative_base()
# JWT configuration: the signing secret is read from the JWT_SECRET environment
# variable (None when unset), tokens are encoded/decoded with HS256, and
# JWT_EXPIRATION_DELTA (5 minutes) is added to the issue time to form the
# 'exp' claim in create_token().
JWT_SECRET = os.getenv('JWT_SECRET')
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION_DELTA = datetime.timedelta(minutes=5)
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)

    # Login name: required and unique. register_user() lower-cases it before
    # storing.
    username = Column(
        String,
        nullable=False,
        unique=True,
    )

    # Hash produced by hash_password(); required.
    password_hash = Column(
        String,
        nullable=False,
    )
# Import-time side effect: issue CREATE TABLE for the models registered on
# Base (the `users` table above) against the configured engine.
# NOTE(review): SQLAlchemy's create_all defaults to skipping tables that
# already exist — confirm for the SQLAlchemy version in use.
Base.metadata.create_all(engine)
def hash_password(password):
    """Return the passlib bcrypt hash of *password*.

    Args:
        password: Plain-text password to hash.

    Returns:
        The value produced by ``bcrypt.hash``.
    """
    hashed = bcrypt.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored bcrypt hash.

    Args:
        plain_password: Password as entered by the user.
        hashed_password: Hash previously produced by ``hash_password``.

    Returns:
        The result of ``bcrypt.verify`` for the given pair.
    """
    matches = bcrypt.verify(plain_password, hashed_password)
    return matches
def register_user(username, password):
    """Create a new user account.

    The username is lower-cased before lookup and storage, so usernames are
    effectively case-insensitive.

    Args:
        username: Desired username.
        password: Plain-text password; only its hash is stored.

    Returns:
        True if the account was created, False if the (lower-cased) username
        is already taken.
    """
    username = username.lower()  # Convert username to lowercase
    session = Session()
    try:
        if session.query(User).filter_by(username=username).first():
            return False
        new_user = User(username=username, password_hash=hash_password(password))
        session.add(new_user)
        session.commit()
        return True
    finally:
        # Release the session on every path, including the "username taken"
        # early return and any exception raised by the query or commit.
        session.close()
def authenticate_user(username, password):
    """Validate a username/password pair.

    The username is lower-cased before lookup, matching how register_user()
    stores it.

    Args:
        username: Username as entered by the user.
        password: Plain-text password to check.

    Returns:
        The user's id when the credentials match, otherwise None.
    """
    username = username.lower()  # Convert username to lowercase
    session = Session()
    try:
        user = session.query(User).filter_by(username=username).first()
        if user is None:
            return None
        # Copy the needed values while the instance is still attached to the
        # session, so nothing is read from a detached object afterwards.
        user_id = user.id
        password_hash = user.password_hash
    finally:
        # Close the session even if the query raises.
        session.close()

    # The hash check runs after the session has been released.
    if verify_password(password, password_hash):
        return user_id
    return None
def create_token(user_id):
    """Build a signed JWT for *user_id*.

    The payload carries the ``user_id`` claim and an ``exp`` claim set to the
    current UTC time plus JWT_EXPIRATION_DELTA.

    Args:
        user_id: Identifier to embed in the token.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated and yields a naive
    # datetime; an aware UTC value denotes the same instant unambiguously.
    issued_at = datetime.datetime.now(datetime.timezone.utc)
    payload = {
        'user_id': user_id,
        'exp': issued_at + JWT_EXPIRATION_DELTA,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token):
    """Validate *token* and extract the user id it carries.

    Args:
        token: Encoded JWT to verify with JWT_SECRET / JWT_ALGORITHM.

    Returns:
        The ``user_id`` claim, or None when the token is expired, otherwise
        invalid, or does not contain a ``user_id`` claim.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        # Also covers jwt.ExpiredSignatureError, which subclasses
        # InvalidTokenError in PyJWT.
        return None
    # A correctly signed token without the claim is treated as invalid rather
    # than raising KeyError.
    return payload.get('user_id')
def login():
    """Render the login form and handle a submission.

    On a successful login the token, lower-cased username and user id are
    stored in ``st.session_state`` and a script tag that writes the token to
    ``sessionStorage`` is emitted.

    Returns:
        True when the Login button was pressed on this run and the
        credentials were accepted; False otherwise.
    """
    st.subheader("Login")
    username = st.text_input("Username", key="login_username")
    password = st.text_input("Password", type="password", key="login_password")

    # Nothing to do until the button is pressed on this run.
    if not st.button("Login"):
        return False

    user_id = authenticate_user(username, password)
    if not user_id:
        st.error("Invalid username or password")
        return False

    token = create_token(user_id)
    st.session_state.token = token
    st.session_state.user = username.lower()  # Store lowercase username
    st.session_state.user_id = user_id
    st.success(f"Logged in as {username.lower()}")
    logging.info(f"Authenticated user: {username.lower()}")

    # Emit a script tag meant to store the token in sessionStorage.
    # NOTE(review): whether Streamlit executes inline scripts rendered this
    # way is not established here — confirm.
    st.write("""
<script>
sessionStorage.setItem('jwt_token', '""" + token + """');
</script>
""", unsafe_allow_html=True)
    return True
def logout():
    """Clear the login state held in ``st.session_state`` and notify the user.

    Sets ``token``, ``user`` and ``user_id`` to None, emits a script tag that
    removes ``jwt_token`` from ``sessionStorage``, and shows a success message.
    """
    for field in ("token", "user", "user_id"):
        setattr(st.session_state, field, None)

    # Emit a script tag meant to clear the token from sessionStorage.
    clear_script = """
<script>
sessionStorage.removeItem('jwt_token');
</script>
"""
    st.write(clear_script, unsafe_allow_html=True)
    st.success("Logged out successfully")
def register():
    """Render the registration form and handle a submission.

    When the Register button is pressed, the entered credentials are passed
    to register_user() and a success or error message is shown. Blank
    usernames and empty passwords are rejected before any account is created.
    """
    st.subheader("Register")
    username = st.text_input("Username", key="register_username")
    password = st.text_input("Password", type="password", key="register_password")

    if not st.button("Register"):
        return

    # Guard against creating an account with a blank username or no password.
    if not username.strip() or not password:
        st.error("Username and password are required.")
        return

    if register_user(username, password):
        st.success("Registration successful. You can now log in.")
    else:
        st.error("Username already exists. Please choose a different username.")
def is_authenticated():
    """Report whether the current Streamlit session holds a valid token.

    If ``st.session_state`` already has a token, it is decoded; when valid, a
    fresh token is issued (extending the expiry) and stored. Otherwise a token
    is looked up in the ``jwt_token`` query parameter and, when valid, copied
    into the session state.

    Returns:
        True when a valid token was found, False otherwise.
    """
    has_session_token = 'token' in st.session_state and st.session_state.token

    if has_session_token:
        user_id = decode_token(st.session_state.token)
        if not user_id:
            logging.info("User not authenticated")
            return False

        # Valid token: issue a new one, extending the expiry.
        new_token = create_token(user_id)
        st.session_state.token = new_token
        # Emit a script tag meant to update the token in sessionStorage.
        st.write(f"""
<script>
sessionStorage.setItem('jwt_token', '{new_token}');
</script>
""", unsafe_allow_html=True)
        logging.info("Token refreshed")
        return True

    # No token in the session state: fall back to the `jwt_token` query
    # parameter. NOTE(review): the log message below says "sessionStorage",
    # but the value is read from st.query_params.
    token = st.query_params.get('jwt_token')
    user_id = decode_token(token) if token else None
    if user_id:
        st.session_state.token = token
        st.session_state.user_id = user_id
        logging.info("Token retrieved from sessionStorage")
        return True

    logging.info("No valid token found")
    return False
def authentication_required(func):
    """Decorator that runs *func* only for authenticated sessions.

    When is_authenticated() returns a falsy value, a warning is shown, the
    login form is rendered instead, and the wrapper returns None.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper with the same call signature that preserves *func*'s
        metadata (``__name__``, ``__doc__``, ``__wrapped__``).
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_authenticated():
            return func(*args, **kwargs)
        st.warning("Please log in to access this page.")
        login()
        return None

    return wrapper