forked from spark-examples/pyspark-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyspark-column-functions.py
112 lines (87 loc) · 3.14 KB
/
pyspark-column-functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-
"""
PySpark Column-function examples: alias, asc/desc, cast, between, contains,
startswith/endswith, isNull, like, substr, when/otherwise, isin, and
nested-field access via getItem/getField.

author SparkByExamples.com
"""
from pyspark.sql import SparkSession

# Get (or create) the SparkSession used by every example below.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Sample rows: (first name, last name, id, gender).
# Gender is deliberately messy — None, empty string, and single letters —
# to exercise the null-handling examples further down.
data = [
    ("James", "Bond", "100", None),
    ("Ann", "Varsa", "200", 'F'),
    ("Tom Cruise", "XXX", "400", ''),
    ("Tom Brand", None, "400", 'M'),
]
columns = ["fname", "lname", "id", "gender"]
df = spark.createDataFrame(data, columns)
# alias() renames a column in the output; expr() evaluates a SQL expression.
from pyspark.sql.functions import expr
df.select(
    df.fname.alias("first_name"),
    df.lname.alias("last_name"),
    expr(" fname ||','|| lname").alias("fullName"),
).show()

# asc()/desc() pick the sort direction for sort().
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()

# cast() converts the string id column; printSchema() shows the new type.
df.select(df.fname, df.id.cast("int")).printSchema()

# between() keeps rows whose id lies in the given range.
df.filter(df.id.between(100, 300)).show()

# contains() does a substring match.
df.filter(df.fname.contains("Cruise")).show()

# startswith()/endswith() match a prefix / suffix of the column value.
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()

# isNull()/isNotNull() test for SQL NULL values (see eqNullSafe for
# null-safe equality, not demonstrated here).
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()
# like() does SQL-style pattern matching ('%' matches any character sequence);
# rlike() is the regex variant (not demonstrated here).
# Fix: the original chained select/filter but never invoked an action, so the
# lazy transformation was never executed and nothing was displayed — .show()
# triggers it, matching every other example in this file.
df.select(df.fname, df.lname, df.id) \
  .filter(df.fname.like("%om")) \
  .show()
# over() (window functions) is not demonstrated here.
# substr() extracts a slice of the string column; here the leading
# two characters of fname.
df.select(df.fname.substr(1, 2).alias("substr")).show()
# when()/otherwise() build a CASE WHEN expression mapping gender codes to labels.
from pyspark.sql.functions import when
# Fix: the original used `df.gender == None`. In Spark SQL, comparing a column
# to NULL with `=` yields NULL (never true), so the empty-string branch for a
# missing gender never fired and James Bond's row fell through to otherwise().
# Column.isNull() is the correct NULL test.
df.select(df.fname, df.lname,
          when(df.gender == "M", "Male")
          .when(df.gender == "F", "Female")
          .when(df.gender.isNull(), "")
          .otherwise(df.gender).alias("new_gender")
          ).show()
# isin() keeps rows whose id appears in the given list of values.
id_values = ["100", "200"]
df.select(df.fname, df.lname, df.id).filter(df.id.isin(id_values)).show()
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType

# Rows with nested data: a (fname, lname) struct, an array of languages,
# and a map of physical properties.
data = [
    (("James", "Bond"), ["Java", "C#"], {'hair': 'black', 'eye': 'brown'}),
    (("Ann", "Varsa"), [".NET", "Python"], {'hair': 'brown', 'eye': 'black'}),
    (("Tom Cruise", ""), ["Python", "Scala"], {'hair': 'red', 'eye': 'grey'}),
    (("Tom Brand", None), ["Perl", "Ruby"], {'hair': 'black', 'eye': 'blue'}),
]

# Explicit schema: struct column for the name, array column for languages,
# map column for properties.
name_type = StructType([
    StructField('fname', StringType(), True),
    StructField('lname', StringType(), True),
])
schema = StructType([
    StructField('name', name_type),
    StructField('languages', ArrayType(StringType()), True),
    StructField('properties', MapType(StringType(), StringType()), True),
])

df = spark.createDataFrame(data, schema)
df.printSchema()
# getItem() selects an array element by position or a map value by key;
# getField() reads a named field out of a struct or map column.
langs = df.languages
props = df.properties
name_col = df.name

df.select(langs.getItem(1)).show()
df.select(props.getItem("hair")).show()

df.select(props.getField("hair")).show()
df.select(name_col.getField("fname")).show()
#dropFields
#from pyspark.sql.functions import col
#df.withColumn("name1",col("name").dropFields(["fname"])).show()
#withField
#from pyspark.sql.functions import lit
#df.withColumn("name",df.name.withField("fname",lit("AA"))).show()
#from pyspark.sql import Row
#from pyspark.sql.functions import lit
#df = spark.createDataFrame([Row(a=Row(b=1, c=2))])
#df.withColumn('a', df['a'].withField('b', lit(3))).select('a.b').show()
#from pyspark.sql import Row
#from pyspark.sql.functions import col, lit
#df = spark.createDataFrame([
#Row(a=Row(b=1, c=2, d=3, e=Row(f=4, g=5, h=6)))])
#df.withColumn('a', df['a'].dropFields('b')).show()