-
Notifications
You must be signed in to change notification settings - Fork 1
/
crypto-prices.py
61 lines (48 loc) · 1.45 KB
/
crypto-prices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import os
import time
from typing import List, Dict
from influxdb import InfluxDBClient
from poloniex import Poloniex
def get_ticker_data(polo: Poloniex, currency_pair: str) -> float:
"""
Retrieve the last price of a given currency pair from Poloniex.
Args:
polo (Poloniex): The Poloniex API object.
currency_pair (str): The currency pair (e.g., "USDT_BTC").
Returns:
float: The last price of the currency pair.
"""
ticker = polo.returnTicker()[currency_pair]
return round(float(ticker["last"]), 3)
def create_influxdb_client() -> InfluxDBClient:
"""
Create an InfluxDB client.
Returns:
InfluxDBClient: The InfluxDB client object.
"""
return InfluxDBClient(
os.getenv("INFLUX_HOSTNAME"),
8086,
os.getenv("INFLUX_USERNAME"),
os.getenv("INFLUX_PASSWORD"),
os.getenv("INFLUX_DATABASE"),
)
def main():
polo = Poloniex()
client = create_influxdb_client()
while True:
json_body: List[Dict] = [
{
"measurement": "cryptocurrency",
"fields": {
"USDT_BTC": get_ticker_data(polo, "USDT_BTC"),
"USDT_ETH": get_ticker_data(polo, "USDT_ETH"),
"USDT_XMR": get_ticker_data(polo, "USDT_XMR"),
},
}
]
print(json_body)
client.write_points(json_body)
time.sleep(5)
if __name__ == "__main__":
main()