User Defined Functions.py
###### Transpose PySpark DataFrame for Apache Spark Distributed System
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
def TransposeDF(df, columns, pivotCol):
    columnsValue = list(map(lambda x: "'" + str(x) + "'," + str(x), columns))
    stackCols = ','.join(x for x in columnsValue)
    df_1 = df.selectExpr(pivotCol, "stack(" + str(len(columns)) + "," + stackCols + ")")\
             .select(pivotCol, "col0", "col1")
    final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))\
                   .withColumnRenamed("col0", pivotCol)
    return final_df
#First parameter: the input DataFrame.
#Second parameter: the sequence of input DataFrame columns to be transposed into rows.
#Third parameter: the pivot column.
df = TransposeDF(df, df.columns[1:], "AAPL_dateTime")
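#Minimal, self-contained sketch of what TransposeDF does, using a made-up toy DataFrame;
#the column names and values below are illustrative only and are not part of the original snippet.
spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [("2024-01-02", "185.6", "186.1"), ("2024-01-03", "184.2", "185.0")],
    ["AAPL_dateTime", "open", "close"],
)
#The wide columns "open"/"close" become rows, and the dates become columns.
TransposeDF(toy, toy.columns[1:], "AAPL_dateTime").show()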
###### Monte Carlo Simulation for Apache Spark Distributed System
import random
import time
from operator import add
#The function generates a random return on the investment, as a percentage, every year for the duration of a specified term.
#The function takes a seed value as a parameter.
#This value is used to reseed the random number generator, which ensures that the function doesn't get the same list of random numbers each time it runs.
#The random.normalvariate function ensures that random values occur across a normal distribution for the specified mean and standard deviation.
#The function increases the value of the portfolio by the growth amount, which could be positive or negative, and adds a yearly sum that represents further investment.
def grow(seed):
    random.seed(seed)
    portfolio_value = INVESTMENT_INIT
    for i in range(TERM):
        growth = random.normalvariate(MKT_AVG_RETURN, MKT_STD_DEV)
        portfolio_value += portfolio_value * growth + INVESTMENT_ANN
    return portfolio_value
#Specify some values for the function; the return statistics must be plain Python floats, so pull them out of the DataFrame with first().
INVESTMENT_INIT = 100000  # starting amount
INVESTMENT_ANN = 0  # new investment added each period
TERM = 252  # number of periods in the simulation
MKT_AVG_RETURN = df.select(avg('portfolio_daily_return')).first()[0]  # mean return per period
MKT_STD_DEV = df.select(stddev('portfolio_daily_return')).first()[0]  # standard deviation of the return per period
#Create many seeds to feed to the function
seeds = sc.parallelize([time.time() + i for i in range(10000)])
#Feed the RDD that contains the seeds to the grow function
results = seeds.map(grow)
#Aggregate the values in the RDD and display the average ending portfolio value
total = results.reduce(add)
final_mc = total / 10000.
print(final_mc)
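#Optional follow-up (an illustrative sketch, not part of the original snippet): instead of
#reporting only the average ending value, the distribution of all 10,000 simulated outcomes
#can be summarized in one pass with the RDD's built-in stats() helper.
summary = results.stats()  # count, mean, stdev, max and min of the simulated portfolio values
print(summary)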
###### Shapley Additive Explanations (SHAP) for small datasets
import shap
#Assumes clf is a fitted tree-based model and df is the pandas feature DataFrame it was trained on.
def shap_small():
    explainer = shap.TreeExplainer(clf)
    shap_values = explainer.shap_values(df)
    return shap_values
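#Illustrative usage sketch (an assumption, not part of the original snippet): it presumes clf is a
#fitted tree-based model (e.g. a scikit-learn RandomForestClassifier) and df is the pandas feature
#DataFrame it was trained on; the summary plot gives a quick global view of feature importance.
small_shap_values = shap_small()
shap.summary_plot(small_shap_values, df)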
###### Shapley Additive Explanations (SHAP) for Apache Spark Distributed System
# https://www.databricks.com/blog/2022/02/02/scaling-shap-calculations-with-pyspark-and-pandas-udf.html
import shap
import numpy as np
import pandas as pd
from typing import Iterator
from pyspark.sql.types import StructType, StructField, FloatType
#Assumes clf is a fitted tree-based model and columns_for_shap_calculation is the list of feature columns of df.
def calculate_shap(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    explainer = shap.TreeExplainer(clf)
    for X in iterator:
        yield pd.DataFrame(
            explainer.shap_values(np.array(X), check_additivity=False)[0],
            columns=columns_for_shap_calculation,
        )
#Build the output schema: one float column per feature.
return_schema = StructType()
for feature in columns_for_shap_calculation:
    return_schema = return_schema.add(StructField(feature, FloatType()))
#Repartition before the calculation: strike a balance between partitions that are small enough to parallelize the work and not so small that the overhead of creating them outweighs the benefit.
df = df.repartition(sc.defaultParallelism)
shap_values = df.mapInPandas(calculate_shap, schema=return_schema)
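#Hypothetical follow-up (not in the original snippet): with the SHAP values materialized as a Spark
#DataFrame, a common global feature-importance ranking is the mean absolute SHAP value per feature.
#It assumes the columns_for_shap_calculation list defined above.
from pyspark.sql import functions as F
mean_abs_shap = shap_values.select([F.avg(F.abs(F.col(c))).alias(c) for c in columns_for_shap_calculation])
mean_abs_shap.show()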