# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
# Capitalizes the first letter of every word in a space-separated string
# (the result keeps a trailing space, as in the original example output).
def convertCase(s):
    resStr = ""
    arr = s.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
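# Quick plain-Python sanity check (added for illustration; not in the
# original script): the converter works before it is wrapped as a UDF.
assert convertCase("john jones") == "John Jones "  # note the trailing space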
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))
df.select(col("Seqno"), \
convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)
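
# A lambda is not required: the named function can be wrapped directly.
# (convertUDF2 is an illustrative addition, not part of the original script.)
convertUDF2 = udf(convertCase, StringType())
df.select(col("Seqno"), convertUDF2(col("Name")).alias("Name")) \
    .show(truncate=False)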

""" Using UDF with the @udf decorator """
@udf(returnType=StringType())
def upperCase(s):
    return s.upper()

# The decorator already makes upperCase a UDF; an equivalent explicit form:
upperCaseUDF = udf(lambda z: z.upper(), StringType())

df.withColumn("Curated Name", upperCase(col("Name"))) \
    .show(truncate=False)
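
# Optional vectorized alternative (illustrative sketch, not in the original
# script): a pandas UDF processes whole columns as pandas Series, avoiding
# per-row Python overhead. Uncomment if pandas and pyarrow are installed.
# from pyspark.sql.functions import pandas_udf
# import pandas as pd
#
# @pandas_udf(StringType())
# def upperCasePandas(s: pd.Series) -> pd.Series:
#     return s.str.upper()
#
# df.withColumn("Curated Name", upperCasePandas(col("Name"))).show(truncate=False)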
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
.show(truncate=False)
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
"where Name is not null and convertUDF(Name) like '%John%'") \
.show(truncate=False)
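
# The SQL-registered UDF can also be called from the DataFrame API via
# selectExpr (illustrative addition, not part of the original script).
df.selectExpr("Seqno", "convertUDF(Name) as Name") \
    .show(truncate=False)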
""" null check """
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders"),
('4',None)]
df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")
spark.udf.register("_nullsafeUDF", lambda str: convertCase(str) if not str is None else "" , StringType())
spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
.show(truncate=False)
spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
" where Name is not null and _nullsafeUDF(Name) like '%John%'") \
.show(truncate=False)
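
# DataFrame-API equivalent of the null-safe pattern (illustrative addition,
# not part of the original script): the Spark SQL docs recommend either a
# null-aware UDF or a conditional (CASE WHEN-style) guard, expressed here
# with when()/otherwise() so None rows never reach the Python function.
from pyspark.sql.functions import when
df2.select(col("Seqno"),
    when(col("Name").isNotNull(), convertUDF(col("Name"))).otherwise("").alias("Name")) \
    .show(truncate=False)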