apuntes Spark 20170720.txt
//// //// //// //// ////
http://spark.apache.org/docs/latest/rdd-programming-guide.html
--Linking with Spark
Finally, you need to import some Spark classes into your program. Add the following line:
from pyspark import SparkContext, SparkConf
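A minimal sketch of how these imports are typically used to create the context (the app name and the local master URL are placeholder assumptions):
conf = SparkConf().setAppName("apuntes").setMaster("local[*]")
sc = SparkContext(conf=conf)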
--External Datasets
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines. Here is an example invocation:
>>> distFile = sc.textFile("data.txt")
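The same guide then acts on distFile with dataset operations, for example summing the sizes of all the lines with map and reduce:
lineLengths = distFile.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)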
--Passing Functions to Spark
"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
--Printing elements of an RDD
You did not fully follow these points because print and println did not work for you: on a cluster, foreach(print) runs on the executors, so the output goes to the executors' stdout, not to the driver.
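A small PySpark sketch of the idea (my adaptation; rdd stands for any RDD, e.g. the pairs RDD below): bring a few elements to the driver and print there.
# take(n) returns the first n elements to the driver; collect() returns all of them (only safe for small data)
for element in rdd.take(10):
    print(element)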
--Working with Key-Value Pairs
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1)) # builds a (lineContent, 1) tuple for each line
counts = pairs.reduceByKey(lambda a, b: a + b)
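The guide notes you could then use counts.sortByKey() to sort the pairs alphabetically and counts.collect() to bring them back to the driver program as a list:
sortedCounts = counts.sortByKey()   # sort by the key (the line contents)
sortedCounts.collect()              # list of (line, count) tuples on the driver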
--Actions
saveAsTextFile(path) - Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
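A minimal sketch applying this action to the counts RDD above (the output directory is a placeholder):
counts.saveAsTextFile("counts_output")   # writes part-* text files into that directory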
//// //// //// //// ////
http://spark.apache.org/examples.html
--Word Count
//A very interesting program showing how to load a file from HDFS into Spark, write a file back to HDFS, and how to pick the words out of a line
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
--Pi Estimation
import random

# NUM_SAMPLES must be defined beforehand; this excerpt uses Python 2 (xrange, print).
def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(xrange(0, NUM_SAMPLES)) \
          .filter(inside).count()
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
--Text Search
from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
--Simple Data Operations
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
    "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "people") \
    .load()
# Looks at the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
//// //// //// //// ////
http://spark.apache.org/docs/latest/sql-programming-guide.html
--Datasets and DataFrames
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.
A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
The Dataset API is available in Scala and Java. Python does not have support for the Dataset API, but due to Python's dynamic nature many of its benefits are already available (i.e. you can naturally access the fields of a row by name, row.columnName).
A DataFrame is a Dataset organized into named columns.
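A tiny sketch of the "access by name" point (the names and values are made up):
from pyspark.sql import Row
row = Row(name="Alice", age=2)
row.name    # 'Alice' -- field accessed as an attribute
row["age"]  # 2 -- or by key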
--Starting Point: SparkSession
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
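A minimal sketch of the SparkSession entry point created in that example (the app name is a placeholder):
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("apuntes sql") \
    .getOrCreate()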
--Programmatically Specifying the Schema
str.strip([chars])
Return a copy of the string with the leading and trailing characters removed. The chars argument is a string specifying the set of characters to be removed. If omitted or None, the chars argument defaults to removing whitespace. The chars argument is not a prefix or suffix; rather, all combinations of its values are stripped:
>>> ' spacious '.strip()
'spacious'
>>> 'www.example.com'.strip('cmowz.')
'example'
# THE FOLLOWING PIECE OF CODE IS VERY IMPORTANT FOR THE TRANSFORMATION YOU HAVE TO DO IN THE BIG DATA EXPERT PROGRAM FINAL PROJECT, TO PICK UP THE CSV TABLE
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
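The guide's example then builds the schema explicitly and applies it, roughly as follows:
from pyspark.sql.types import StructType, StructField, StringType

# The schema is encoded in a string; every field is treated as a string column.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD of tuples and register the result as a temporary view.
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()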
<<<<<<<<
https://stackoverflow.com/questions/663171/is-there-a-way-to-substring-a-string-in-python
Is there a way to substring a string in Python?
>>> x = "Hello World!"
>>> x[2:]
'llo World!'
>>> x[:2]
'He'
>>> x[:-2]
'Hello Worl'
>>> x[-2:]
'd!'
>>> x[2:-2]
'llo Worl'
Python calls this concept "slicing" and it works on more than just strings; see the Python tutorial for a comprehensive introduction.
>>>>>>>>
<<<<<<<<
pyspark.sql.DataFrame.show (Python method, in pyspark.sql module)
show(n=20, truncate=True)
Prints the first n rows to the console.
Parameters:
n – Number of rows to show.
truncate – If set to True, truncates strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and aligns cells right.
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
>>> df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 2| Ali|
| 5| Bob|
+---+----+
>>>>>>>>
--Data Sources
Registering a DataFrame as a temporary view allows you to run SQL queries over its data.
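A short sketch of that point (df and the view name are placeholders for any DataFrame):
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people WHERE age > 20")
sqlDF.show()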
--Generic Load/Save Functions
pyspark.sql.DataFrameWriter.csv (Python method, in pyspark.sql module) # this function may be very useful for your project
pyspark.sql.DataFrameWriter.save (Python method, in pyspark.sql module)
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
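A hedged sketch of using DataFrameWriter.csv for the project (the path, options and df are placeholders):
df.write.csv("people_csv", header=True, mode="overwrite")
# equivalent long form through the generic writer:
df.write.format("csv").option("header", "true").mode("overwrite").save("people_csv")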
--Manually Specifying Options
Data sources are specified by their fully qualified name (i.e., org.apache.spark.sql.parquet), but for built-in sources you can also use their short names (json, parquet, jdbc, orc, libsvm, csv, text). DataFrames loaded from any data source type can be converted into other types using this syntax.
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
pyspark.sql.DataFrameReader.load (Python method, in pyspark.sql module)
pyspark.sql.DataFrameWriter.save (Python method, in pyspark.sql module)
--loading data programmatically ==> VERY IMPORTANT FOR LOADING FILES INTO DATAFRAMES, REGISTERING THEM AS TEMPORARY TABLES, AND OPERATING ON THOSE TABLES
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
--schema merging
//How to quickly create a DataFrame and, therefore, a table
from pyspark.sql import Row

sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
--hive tables
Find full example code at "examples/src/main/python/sql/hive.py" in the Spark repo.
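A minimal sketch of what hive.py does (the table name and query are placeholders; assumes a Hive-enabled Spark build):
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("hive notes") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
spark.sql("SELECT * FROM src").show()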
--JDBC to other databases
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
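A hedged sketch of a JDBC round trip with the generic reader/writer (the URL, table names and credentials are placeholders):
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://yourIP:yourPort/test") \
    .option("dbtable", "people") \
    .option("user", "yourUsername") \
    .option("password", "yourPassword") \
    .load()

jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://yourIP:yourPort/test") \
    .option("dbtable", "people_copy") \
    .option("user", "yourUsername") \
    .option("password", "yourPassword") \
    .save()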