-
Notifications
You must be signed in to change notification settings - Fork 0
/
big_query_utils.py
122 lines (113 loc) · 5.65 KB
/
big_query_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import os
import json
from google.oauth2 import service_account
import pandas as pd
from datetime import date
# ------------------------------------------------------------------------------------------------ #
# ------------------------------- PJB Sports Data BigQuery Project ------------------------------- #
# ------- https://console.cloud.google.com/bigquery?hl=en&project=pjb-sports-data&ws=!1m0 -------- #
# ------------------------------------------------------------------------------------------------ #
# Service-account bootstrap.
# If the JSON key file is present in the working directory, its contents are copied into the
# GOOGLE_CLOUD_API_KEY environment variable; otherwise that variable must already hold the
# JSON text of a service-account key.
_SERVICE_ACCOUNT_KEY_FILE = 'pjb-sports-data-4ad8cbc89360.json'
_SERVICE_ACCOUNT_ENV_VAR = 'GOOGLE_CLOUD_API_KEY'

if os.path.isfile(_SERVICE_ACCOUNT_KEY_FILE):
    with open(_SERVICE_ACCOUNT_KEY_FILE, encoding = 'utf-8') as key_file:
        os.environ[_SERVICE_ACCOUNT_ENV_VAR] = key_file.read()

if _SERVICE_ACCOUNT_ENV_VAR not in os.environ:
    # Same exception type as the bare environment lookup would raise, but with a message
    # that says what is missing and how to provide it.
    raise KeyError(
        f'{_SERVICE_ACCOUNT_ENV_VAR} is not set and {_SERVICE_ACCOUNT_KEY_FILE} was not found; '
        'provide the service-account key JSON through either one.'
    )

# Module-level credentials object used by every BigQuery helper in this file.
credentials = service_account.Credentials.from_service_account_info(
    json.loads(os.environ[_SERVICE_ACCOUNT_ENV_VAR])
)
print(
    f'Successfully connected to Google Cloud project "{credentials.project_id}" '
    f'using service account "{credentials.service_account_email}"...'
)
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ----------------------------------------- pandas-gbq: ------------------------------------------ #
# ----------------- https://cloud.google.com/bigquery/docs/pandas-gbq-migration ------------------ #
# ------------------------------------------------------------------------------------------------ #
def query_to_df(query: str):
    """Run ``query`` on BigQuery and return whatever ``pd.read_gbq`` returns for it.

    The query is executed in the project of the module-level ``credentials`` using the
    standard SQL dialect.

    NOTE(review): ``pd.read_gbq`` is deprecated in recent pandas releases in favour of
    ``pandas_gbq.read_gbq`` - confirm against the pinned pandas version before upgrading.
    """
    gbq_options = {
        'project_id': credentials.project_id,
        'dialect': 'standard',
        'credentials': credentials,
    }
    return pd.read_gbq(query, **gbq_options)
def add_df_rows_to_table(table: str, df: pd.DataFrame):
    """Append the rows of ``df`` to the BigQuery table ``table``.

    Nothing is uploaded when ``df`` has no rows. Before uploading, every ``.`` in a column
    name is replaced by ``_``. The rename is done on a copy, so the DataFrame passed in by
    the caller keeps its original column names.

    Args:
        table: Destination table name, passed straight to ``DataFrame.to_gbq``.
        df: Rows to append; column names are expected to be strings.

    NOTE(review): ``DataFrame.to_gbq`` is deprecated in recent pandas releases in favour of
    ``pandas_gbq.to_gbq`` - confirm against the pinned pandas version before upgrading.
    """
    if len(df.index) == 0:
        return
    # rename() returns a new frame instead of overwriting df.columns on the caller's object.
    upload_df = df.rename(columns = lambda col: col.replace('.', '_'))
    upload_df.to_gbq(table, project_id = credentials.project_id, if_exists = 'append', credentials = credentials)
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# -------------------------------------------- Tables -------------------------------------------- #
# ------------------------------------------------------------------------------------------------ #
# Maps each BigQuery table name to the list of columns treated as its primary key.
# Insertion order matters: `queries.duplicate_rows_check` emits one SELECT per entry, in this order.
table_primary_keys: dict[str, list[str]] = {
    'mlb.statcast_pitches': [
        'game_pk',
        'at_bat_number',
        'pitch_number',
    ],
    'mlb.fangraphs_injuries': [
        'playerId',
        'season',
        'date',
        'injurySurgery',
    ],
}
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------- Queries -------------------------------------------- #
# ------------------------------------------------------------------------------------------------ #
class queries:
    """Namespace of BigQuery SQL text for the `mlb` tables.

    Plain attributes hold complete statements. The static methods return a statement built
    from their arguments; argument values are interpolated into the SQL text as-is, so only
    pass trusted values.
    """

    # One SELECT per entry of `table_primary_keys`, joined with UNION ALL. Each SELECT reports
    # the table name and the number of primary-key combinations that occur more than once.
    # A generator expression is used so no loop variables are left behind as class attributes.
    duplicate_rows_check = ' UNION ALL '.join(
        f'''
SELECT
"{table_name}" `Table`,
COUNT(*) `Duplicate Rows`
FROM
(
SELECT
0 a
FROM
`{table_name}`
GROUP BY
{', '.join(primary_keys)}
HAVING
COUNT(*) > 1
)
'''
        for table_name, primary_keys in table_primary_keys.items()
    )

    # Row count per `game_year` in `mlb.statcast_pitches`, restricted to game_type = "R".
    statcast_pitches_by_year = '''
SELECT
game_year Year,
COUNT(*) `Big Query Pitches`
FROM
`mlb.statcast_pitches`
WHERE
game_type = "R"
GROUP BY
game_year
'''

    @staticmethod
    def existing_game_dates(year: int | None = None, game_type: str | None = None) -> str:
        """Return a query listing the distinct `game_date` values in `mlb.statcast_pitches`.

        Args:
            year: When an int, restricts the result to rows with this `game_year`.
            game_type: When a str, restricts the result to rows with this `game_type`.

        Arguments of any other type (including the default ``None``) add no filter.

        Returns:
            The SQL text, ordered by `game_date`.
        """
        conditions = []
        # bool is a subclass of int; it is excluded so True/False keep adding no filter.
        if isinstance(year, int) and not isinstance(year, bool):
            conditions.append(f'game_year = {year}')
        if isinstance(game_type, str):
            conditions.append(f'game_type = "{game_type}"')
        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ''
        return f'''
SELECT
DISTINCT game_date
FROM
`mlb.statcast_pitches`
{where_clause}
ORDER BY
game_date
'''

    # Deletes every row of `mlb.fangraphs_injuries` (the WHERE condition is always true).
    clear_fangraphs_injuries = '''
DELETE
FROM
`mlb.fangraphs_injuries`
WHERE
true
'''

    @staticmethod
    def statcast(start_date: date, end_date: date) -> str:
        """Return a query selecting all columns of `mlb.statcast_pitches` for a date range.

        Args:
            start_date: First `game_date` to include (inclusive).
            end_date: Last `game_date` to include (inclusive).

        Returns:
            The SQL text, with both dates formatted as YYYY-MM-DD.
        """
        first_day = start_date.strftime('%Y-%m-%d')
        last_day = end_date.strftime('%Y-%m-%d')
        return f'''
SELECT
*
FROM
`mlb.statcast_pitches` pitches
WHERE
game_date >= "{first_day}" AND game_date <= "{last_day}"
'''
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #
# ------------------------------------------------------------------------------------------------ #