-
Notifications
You must be signed in to change notification settings - Fork 0
/
ghs_hex_plot.py
316 lines (259 loc) · 14.9 KB
/
ghs_hex_plot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""
NOTE: This code WILL NOT work outside of a Hex environment. It is designed to be used within the Hex ecosystem.
You must have a Hex account and be logged in to run this code, along with a valid datasource set up to use the data.
See the dbschema.sql for setting up a Snowflake datasource in Hex here: https://hex.tech/docs/snowflake
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import hextoolkit
import matplotlib.dates as mdates
import datetime
from dateutil.relativedelta import relativedelta
import itertools
# Establish connection using HexToolkit.
# 'Contributor Stats' must match the name of the data connection configured
# in the Hex workspace (see the module docstring / dbschema.sql note).
hex_snowflake_conn = hextoolkit.get_data_connection('Contributor Stats')
# Snowpark session used below to run SQL against the Snowflake datasource.
session = hex_snowflake_conn.get_snowpark_session()
def _escape_sql_literal(value: str) -> str:
    """Escape embedded single quotes so *value* can sit inside a SQL string literal.

    NOTE(review): string-built SQL remains injection-prone; prefer bind
    parameters if the Snowpark connector supports them for this query.
    """
    return value.replace("'", "''")


def _sql_in_list(values: list) -> str:
    """Render *values* as the interior of a SQL ``IN ('a', 'b')`` list."""
    return "', '".join(_escape_sql_literal(v) for v in values)


def _add_per_contributor_metrics(df_agg: pd.DataFrame, denominator) -> pd.DataFrame:
    """Add '<metric>_per_contributor' columns by dividing each per-day metric.

    *denominator* is either a scalar (overall distinct-contributor count) or a
    per-row Series (the 'contributor_count' column of an aggregated frame).
    """
    for metric in ['commits_per_day', 'prs_per_day', 'changed_lines_per_day', 'review_comments_per_day']:
        df_agg[f"{metric}_per_contributor"] = df_agg[metric] / denominator
    return df_agg


def create_and_process_dataframe(dict_inputs: dict, selected_metric: str) -> dict:
    """Query contributor stats from Snowflake and aggregate per the UI inputs.

    Parameters
    ----------
    dict_inputs : dict
        Hex selector values. Recognized keys (all optional):
        'repo_aggregated', 'repo_topic_aggregated' (bools),
        'multiselect_contributors', 'multiselect_repo_names',
        'multiselect_repo_topics' (lists of names), 'start_date'
        (datetime.date; defaults to one year ago).
    selected_metric : str
        Column name in "contributor_stats" used as the headline metric.

    Returns
    -------
    dict
        {"df": plottable DataFrame with an 'overall_median' column added,
         "contributors_list": contributor names when a contributor filter
         was applied, else []}
    """
    dict_results: dict = {"df": None, "contributors_list": None}

    # Selector values, with safe defaults.
    repo_aggregated_view = dict_inputs.get("repo_aggregated", False)
    repo_topic_aggregated_view = dict_inputs.get("repo_topic_aggregated", False)
    username_filter = dict_inputs.get("multiselect_contributors", [])
    repo_filter = dict_inputs.get("multiselect_repo_names", [])
    topic_filter = dict_inputs.get("multiselect_repo_topics", [])
    start_date_date = dict_inputs.get("start_date", datetime.date.today() - relativedelta(years=1))
    start_date_str = start_date_date.strftime('%Y-%m-%d')

    # Build WHERE conditions from whichever filters are populated. Values are
    # quote-escaped before interpolation so names containing "'" cannot break
    # (or inject into) the query.
    query_conditions = []
    if username_filter:
        query_conditions.append(f'cs."contributor_name" IN (\'{_sql_in_list(username_filter)}\')')
    if repo_filter:
        query_conditions.append(f'cs."repo" IN (\'{_sql_in_list(repo_filter)}\')')
    if topic_filter:
        query_conditions.append(f'rt."repo_topic" IN (\'{_sql_in_list(topic_filter)}\')')
    conditions_str = " AND ".join(query_conditions) if query_conditions else "1=1"

    query_template = f"""
        SELECT
            cs."{selected_metric}" AS "selected_metric",
            rt."repo_topic",
            cs.*
        FROM "contributor_stats" cs
        LEFT JOIN "repo_topics" rt ON cs."repo" = rt."repo_name"
        WHERE cs."stats_beginning" >= '{start_date_str}' AND ({conditions_str})
        ORDER BY cs."stats_beginning" ASC, cs."repo";
    """

    # Execute the query and normalize the timestamp column.
    df = session.sql(query_template).to_pandas()
    df['stats_beginning'] = pd.to_datetime(df['stats_beginning'])

    # Record the contributors present when a contributor filter is active.
    # NOTE(review): this lists every contributor in the result set, not only
    # those named in the filter — preserved from the original behavior.
    if username_filter:
        contributors_list = df['contributor_name'].unique().tolist()
    else:
        contributors_list = []
    dict_results["contributors_list"] = contributors_list

    # Shared median aggregations for the per-day / per-PR metrics.
    median_metrics = {m: 'median' for m in [
        'commits_per_day', 'prs_per_day', 'changed_lines_per_day',
        'review_comments_per_day', 'avg_pr_duration', 'avg_code_movement_per_pr']}

    if topic_filter and not repo_topic_aggregated_view:
        # Topic selected but not aggregated: keep one row per contributor per
        # week, counting the distinct repos each contributor touched.
        agg_columns = {selected_metric: 'median', 'repo': 'nunique', **median_metrics}
        df_agg: pd.DataFrame = df.groupby(['repo_topic', 'stats_beginning', 'contributor_name']).agg(agg_columns).reset_index()
        df_agg = df_agg.rename(columns={'repo': 'repo_count'})
        # Per-contributor metrics divide by the overall contributor count
        # (a scalar), unlike the aggregated branches below.
        contributor_count: int = df_agg['contributor_name'].nunique()
        df = _add_per_contributor_metrics(df_agg, contributor_count)
    elif topic_filter and repo_topic_aggregated_view:
        # Fully aggregate by topic: one row per (repo_topic, week).
        agg_dict = {selected_metric: 'median', 'contributor_name': 'nunique', 'repo': 'nunique', **median_metrics}
        df_agg = df.groupby(['repo_topic', 'stats_beginning']).agg(agg_dict).reset_index()
        df_agg = df_agg.rename(columns={'contributor_name': 'contributor_count', 'repo': 'repo_count'})
        df = _add_per_contributor_metrics(df_agg, df_agg['contributor_count'])
    elif repo_aggregated_view:
        # Aggregate by repo: one row per (repo, week).
        agg_dict = {selected_metric: 'median', 'contributor_name': 'nunique', **median_metrics}
        df_agg = df.groupby(['repo', 'stats_beginning']).agg(agg_dict).reset_index()
        df_agg = df_agg.rename(columns={'contributor_name': 'contributor_count'})
        df = _add_per_contributor_metrics(df_agg, df_agg['contributor_count'])
    else:
        # No aggregation requested and no topic filter: the LEFT JOIN on
        # repo_topics can duplicate rows when a repo has several topics, so
        # keep the first occurrence per (contributor, repo, week).
        if not repo_topic_aggregated_view:
            df = df.drop_duplicates(subset=['contributor_name', 'repo', 'stats_beginning'], keep='first')

    # Overall median of the selected metric across the final frame, repeated
    # on every row so plot labels can read it from any group.
    overall_median: float = df[selected_metric].median()
    df['overall_median'] = overall_median
    dict_results["df"] = df
    return dict_results
def plot_data(dict_data: dict, dict_inputs: dict, selected_metric:str):
    """Render a time-series line chart for one metric.

    dict_data carries the plottable frame ("df"), an optional contributor
    list ("contributors_list") and a plot title ("title"); dict_inputs
    carries the Hex selector values. Returns the label suffix
    (" per contributor" or "") applied to the plotted metric.
    """
    df: pd.DataFrame = dict_data.get("df")
    contributors_list: list = dict_data.get("contributors_list", [])
    title: str = dict_data.get("title", "Productivity")
    start_date_str: str = dict_inputs["start_date"].strftime('%Y-%m-%d')
    repo_aggregated_view: bool = dict_inputs.get("repo_aggregated", False)
    repo_topic_aggregated_view: bool = dict_inputs.get("repo_topic_aggregated", False)
    repo_topic_list: list = dict_inputs.get("multiselect_repo_topics", [])
    label_suffix: str = ""

    fig, ax = plt.subplots(figsize=(15, 6))

    # A single zipped iterator hands out one (linestyle, marker, color, width)
    # tuple per plotted line; tab20 supplies 20 distinct colors.
    style_iter = zip(
        itertools.cycle(['-', '--', '-.', ':']),
        itertools.cycle(['o', '^', 's', '*', '+', 'x', 'D', '|', '_']),
        itertools.cycle(plt.cm.tab20(np.linspace(0, 1, 20))),
        itertools.cycle([2]),
    )

    plot_lines: list = []
    line_labels: list = []

    # In aggregated views, prefer the "per contributor" variant of the
    # metric when the frame provides it.
    plot_metric = selected_metric
    if repo_topic_aggregated_view or repo_aggregated_view:
        per_contributor_metric = f"{selected_metric}_per_contributor"
        if per_contributor_metric in df.columns:
            plot_metric = per_contributor_metric
            label_suffix = " per contributor"

    # Choose the grouping column and legend heading for the active view.
    if repo_topic_aggregated_view:
        group_key, legend_title = 'repo_topic', 'Repo Topic'
    elif repo_aggregated_view:
        group_key, legend_title = 'repo', 'Repo'
    else:
        group_key, legend_title = 'contributor_name', 'Contributor Name'

    # One line per group, labeled with the group's counts/medians.
    for key, group in df.groupby(group_key):
        linestyle, marker, color, width = next(style_iter)
        if repo_topic_aggregated_view:
            label = f"{key}{label_suffix} - Contributors: {group['contributor_count'].iloc[0]}, Repos: {group['repo_count'].iloc[0]}, Median: {group['overall_median'].iloc[0]}"
        elif repo_aggregated_view:
            label = f"{key}{label_suffix} (Contributors: {group['contributor_count'].iloc[0]}), Median: {round(group['overall_median'].iloc[0],2)}"
        else:
            label = f"{key}, Peer group median: {round(group['overall_median'].iloc[0],2)}"
        line, = ax.plot(group['stats_beginning'], group[plot_metric],
                        linestyle=linestyle, marker=marker, color=color,
                        linewidth=width, label=label)
        plot_lines.append(line)
        line_labels.append(key)

    # Titles and axis labels.
    ytitle: str = f'{selected_metric}{label_suffix} since {start_date_str}'
    ax.set_title(f'{title} {label_suffix}', color='black')
    ax.set_xlabel('Date', color='black')
    ax.set_ylabel(ytitle)

    # Monthly ticks rendered as ISO dates.
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
    ax.xaxis.set_major_locator(mdates.MonthLocator())
    ax.tick_params(axis='x', colors='black')
    ax.tick_params(axis='y', colors='black')

    # Legend outside the axes, on the right.
    ax.legend(title=legend_title, bbox_to_anchor=(1.05, 1), loc='upper left', edgecolor='black')

    # White backgrounds for both the figure and the plot area.
    fig.patch.set_facecolor('white')
    ax.set_facecolor('white')

    # Rotate date ticks to avoid overlap, then tighten the layout and show.
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    return label_suffix
def _summarize_names(prefix: str, names: list, limit: int) -> str:
    """Return '' when *names* is empty, a full comma-separated listing when
    below *limit*, or the first *limit* names suffixed with '... (etc)'."""
    if not names:
        return ""
    if len(names) < limit:
        return prefix + ", ".join(names)
    return prefix + ", ".join(names[:limit]) + "... (etc)"


# We store a list of dictionaries, including all our DFs, so we can look at
# the table later if we want. Entry format:
# {"title": str, "stats_title": str, "df": pd.DataFrame, "contributors_list": list}
list_dict_dfs: list = []
if start_date_date is not None:
    for metric in gdict_inputs.get("metric_selector"):
        # Create and process the DataFrame for the current metric.
        dict_results: dict = create_and_process_dataframe(gdict_inputs, metric)
        df = dict_results.get("df", pd.DataFrame())
        contributors_list: list = dict_results.get("contributors_list", [])
        if not df.empty:
            # Describe the active filters for the plot title. Fix: the
            # repo-topic summary previously emitted BOTH the full and the
            # truncated listing when the topic count reached the cap;
            # _summarize_names emits exactly one form per filter.
            MAX_NAMES_TO_LIST: int = 8
            date_str: str = start_date_date.strftime('%Y-%m-%d')
            parts: list = [
                _summarize_names(f"since {date_str} in repos ", multiselect_repo_names, MAX_NAMES_TO_LIST),
                _summarize_names(f"since {date_str} in repo topic ", multiselect_repo_topics, MAX_NAMES_TO_LIST),
                _summarize_names("\nfiltered by contributors ", contributors_list, MAX_NAMES_TO_LIST),
            ]
            message = f"Plot of {metric} " + " ".join(filter(None, parts))
            dict_results["title"] = message
            dict_results["stats_title"] = " ".join(filter(None, parts))
            list_dict_dfs.append(dict_results.copy())
            print(message)
            # Plot the data for the current metric.
            plot_data(dict_results, gdict_inputs, metric)
        else:
            print(f"No data to plot for {metric} based on the selected filters.")
else:
    print("Skipped running due to lack of inputs")