-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsql.py
98 lines (74 loc) · 2.63 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Provides database functionality for CryptoCoinScraper.
"""
import os
import sys
import logging
import pandas as pd
import sqlalchemy
from sqlalchemy import Column, Float, ForeignKey, Integer, String, Time
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
import cryptocoinscraper
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Base = declarative_base()
Session = sessionmaker()
engine = None
class CryptoCurrency(Base):
__tablename__ = "cryptocurrencies"
id = Column(Integer, primary_key=True)
name = Column(String(255))
symbol = Column(String(15))
data = relationship('MarketData', backref='cryptocurrencies')
class MarketData(Base):
__tablename__ = 'marketdata'
id = Column(Integer, primary_key=True)
currency_id = Column(Integer, ForeignKey('cryptocurrencies.id'))
price = Column(Float)
circulatingSupply = Column(Float)
percentChange24h = Column(Float)
percentChange7d = Column(Float)
marketCap = Column(Float)
volume24h = Column(Float)
datetime = Column(Time)
def insert_dataframe(df: pd.DataFrame, filename: str):
"""
Insert a Pandas DataFrame into the database.
Args:
df (pd.DataFrame): the DataFrame to get data from.
filename (str): path to the database.
"""
Base.metadata.create_all(get_engine(filename))
with Session(bind=get_engine()) as session:
for i in range(len(df)):
row = df.iloc[i]
coin = session.query(CryptoCurrency).filter_by(name=row['name'], symbol=row['symbol']).first()
if coin is None:
coin = CryptoCurrency(name=row['name'], symbol=row['symbol'])
session.add(coin)
session.commit()
d = row.to_dict()
d.pop('name')
d.pop('symbol')
d.update({'currency_id': coin.id, 'datetime': row.datetime.time()})
data = MarketData(**d)
session.add(data)
session.commit()
def get_engine(filename=None):
"""
Get a SQLAlchemy engine; note that subsequent calls will use the
first `filename` argument that this function received.
"""
global engine
if engine is None:
engine = sqlalchemy.create_engine("sqlite+pysqlite:///{}".format(filename))
elif filename is not None:
logger.warning("ignoring the provided filename in get_engine(): {}".format(filename))
return engine
# simple tests
if __name__ == '__main__':
scraper = cryptocoinscraper.CoinMarketCap()
df = scraper.to_dataframe()
insert_dataframe(df, ":memory:")
print("Tests passed.")
sys.exit(os.EX_OK)