forked from Paulcy10x/Twitter-Data-Analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: mysql_manager.py
99 lines (74 loc) · 2.53 KB
/
mysql_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
from pprint import pprint
from traceback import print_exc
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy import inspect
from clean_tweets_dataframe import CleanTweets
from tweets_preprocess import SADataPreparation
from utils import DataLoader
# Paths to the SQL schema files executed by create_tables().
LABLED_SCHEMA = "labled_schema.sql"
CLEANED_SCHEMA = "cleaned_schema.sql"
# CSV consumed by get_data(); presumably produced by an upstream
# preprocessing step — TODO confirm against the rest of the project.
CSV_PATH = "processed_tweet_data.csv"
# Visual separator printed between insert operations.
BANNER = "="*20
# Load DB credentials from a local JSON file; the connection string
# below requires the keys 'user', 'password', and 'host'.
with open("db_cred.json", 'r') as f:
    config = json.load(f)
# Connect to the database
connections_path = f"mysql+pymysql://{config['user']}:{config['password']}@{config['host']}/twitter_data"
engine = create_engine(connections_path)
# Create the tables
# Create the tables
def create_tables():
    """Create the labeled and cleaned tweet tables from their schema files.

    Executes each SQL schema file (LABLED_SCHEMA, CLEANED_SCHEMA) against
    the module-level engine. On failure a message and traceback are printed
    instead of propagating, so a caller is never aborted by this function.
    """
    try:
        with engine.connect() as conn:
            for schema_path in (LABLED_SCHEMA, CLEANED_SCHEMA):
                with open(schema_path) as file:
                    conn.execute(text(file.read()))
        print("Successfully created 2 tables")
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. Also: print_exc() writes the traceback to
        # stderr itself and returns None, so wrapping it in print() only
        # emitted a stray "None" line.
        print("Unable to create the Tables")
        print_exc()
# Read the data
# Read the data
def get_data(labled=True):
    """Load the raw tweet CSV, clean it, and optionally label it.

    Returns only the cleaned DataFrame when `labled` is False; otherwise
    returns a (cleaned_df, labeled_df) tuple.
    """
    raw_df = DataLoader('./', CSV_PATH).read_csv()
    print("Got the dataframe")
    cleand_df = CleanTweets().run_pipeline(raw_df)
    print("Done Cleaning")
    if not labled:
        return cleand_df
    labled_df = SADataPreparation().preprocess_data(cleand_df)
    return cleand_df, labled_df
# Populate the tables
# Populate the tables
def insert_data(df: pd.DataFrame, table_name):
    """Write `df` into the table `table_name`, replacing any existing rows.

    Uses pandas.DataFrame.to_sql with if_exists='replace' and no index
    column. On failure a message and traceback are printed instead of
    propagating, so a caller is never aborted by this function.
    """
    try:
        with engine.connect() as conn:
            df.to_sql(name=table_name, con=conn,
                      if_exists='replace', index=False)
        print(f"Done inserting to {table_name}")
        print(BANNER)
    except Exception:
        # Was a bare `except:` (also caught SystemExit/KeyboardInterrupt);
        # print_exc() prints the traceback itself and returns None, so the
        # old print(print_exc()) emitted a spurious "None".
        print("Unable to insert to table")
        print_exc()
# Implement Querying functions
# Implement Querying functions
def get_table_names():
    """Return the names of all tables in the connected database."""
    with engine.connect() as conn:
        return inspect(conn).get_table_names()
def get_labled_tweets():
    """Read the full 'tweets_information' table into a DataFrame."""
    with engine.connect() as conn:
        return pd.read_sql_table('tweets_information', con=conn)
def get_cleaned_tweets():
    """Read the full 'cleaned_tweets_information' table into a DataFrame."""
    with engine.connect() as conn:
        return pd.read_sql_table('cleaned_tweets_information', con=conn)
if __name__ == "__main__":
create_tables()
cleand_df, labled_df = get_data()
insert_data(labled_df, "tweets_information")
insert_data(cleand_df, "cleaned_tweets_information")