The Pivotal Greenplum Database (GPDB) is an advanced, fully featured, open-source data warehouse. It provides powerful and rapid analytics on petabyte-scale data volumes. Uniquely geared toward big data analytics, Greenplum Database is powered by the world’s most advanced cost-based query optimizer, delivering high analytical query performance on large data volumes.
https://pivotal.io/pivotal-greenplum
The Pivotal Greenplum-Spark Connector provides high speed, parallel data transfer between Greenplum Database and Apache Spark clusters to support:
- Interactive data analysis
- In-memory analytics processing
- Batch ETL
Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing. http://spark.apache.org/
- Pre-requisites
- Using docker-compose
- Connect to Greenplum and Spark via Greenplum-Spark connector
- Read data from Greenplum into Spark
- Write data from Spark DataFrame into Greenplum - with JDBC
- Using pySpark
- docker-compose
- Greenplum-Spark connector
- Postgres JDBC driver - if you want to write data from Spark into Greenplum
Create a standalone Greenplum cluster with the following command, run from the github root directory. It builds a docker image with the Pivotal Greenplum binaries and downloads existing images such as the Spark master and worker. Initially, it may take some time to download the docker images.
$ ./runDocker.sh -t usecase1 -c up
Creating network "usecase1_default" with the default driver
Creating sparkmaster ... done
Creating gpdbsne ... done
Creating sparkworker ... done
...
The Spark UI will be running at http://localhost:8081 with one worker listed.
To access the Greenplum cluster, exec into a container:
$ docker exec -it gpdbsne /bin/bash
[root@d632f535db87 data]#
Follow this readme
In this example, we describe how to configure the Greenplum-Spark connector when you run spark-shell.
- Make sure you download greenplum-spark_2.11-1.6.0.jar or the latest jar from Pivotal Network.
- Connect to the Spark master docker image:
$ docker exec -it sparkmaster bash
- Run the command to start a spark shell that loads the Greenplum-Spark connector. This section assumes you have downloaded the latest greenplum-spark jar into the scripts subfolder of the github repo. The repo root directory is mounted by the docker images under the /code directory. You can also use scripts such as scripts/download_postgresql.sh to download binaries.
We also include the PostgreSQL JDBC driver (optional) in order to write data from Spark into Greenplum. The Greenplum-Spark connector will support write features in a future release, with parallel data transfer that performs significantly better than the JDBC driver.
root@master:/usr/spark-2.4.0# GSC_JAR=$(ls /code/scripts/greenplum-spark_2.11-*.jar)
root@master:/usr/spark-2.4.0# POSTGRES_JAR=$(ls /code/scripts/postgresql-*.jar)
root@master:/usr/spark-2.4.0# spark-shell --jars "${GSC_JAR},${POSTGRES_JAR}" --driver-class-path ${POSTGRES_JAR}
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
- Verify the Greenplum-Spark driver is successfully loaded by the Spark shell.
You can follow the example below to verify the Greenplum-Spark driver. The scala REPL confirms the driver is accessible by returning a res0 result.
scala> Class.forName("io.pivotal.greenplum.spark.GreenplumRelationProvider")
res0: Class[_] = class io.pivotal.greenplum.spark.GreenplumRelationProvider
Verify the JDBC driver is successfully loaded by the Spark shell.
You can follow the example below to verify the JDBC driver. The scala REPL confirms the driver is accessible by returning a res1 result.
scala> Class.forName("org.postgresql.Driver")
res1: Class[_] = class org.postgresql.Driver
In this section, we read data from Greenplum into Spark. It assumes the database and table are already created; see how to set up the GPDB database with the setup script.
- By default, you can run the command below to retrieve data from Greenplum with a single data partition in the Spark cluster. To paste the command, type :paste in the scala REPL, paste the code below, and then press Ctrl-D.
scala> :paste
// Entering paste mode (ctrl-D to finish)
// this gives a one-partition Dataset
val dataFrame = spark.read.format("greenplum")
.option("url", "jdbc:postgresql://gpdbsne/basic_db")
.option("user", "gpadmin")
.option("password", "pivotal")
.option("dbschema", "public")
.option("dbtable", "basictable")
.option("partitionColumn", "id")
.load()
// Exiting paste mode, now interpreting.
- You can verify the Spark DataFrame by running the commands dataFrame.printSchema and dataFrame.show().
scala> dataFrame.printSchema
root
|-- id: integer (nullable = false)
|-- value: string (nullable = true)
scala> dataFrame.show()
+---+--------+
| id| value|
+---+--------+
| 1| Alice|
| 3| Charlie|
| 5| Jim|
| 7| Jack|
| 9| Zim|
| 15| Jim|
| 11| Bob|
| 13| Eve|
| 17|Victoria|
| 25|Victoria|
| 27| Alice|
| 29| Charlie|
| 31| Zim|
| 19| Alice|
| 21| Charlie|
| 23| Jim|
| 33| Jim|
| 35| Eve|
| 43|Victoria|
| 45| Alice|
+---+--------+
only showing top 20 rows
scala> dataFrame.filter(dataFrame("id") > 40).show()
+---+--------+
| id| value|
+---+--------+
| 41| Jim|
| 43| Jack|
| 45| Zim|
| 47| Alice|
| 49| Charlie|
| 51| Jim|
| 53| Jack|
| 55| Bob|
| 57| Eve|
| 59| John|
| 61|Victoria|
| 63| Zim|
| 65| Bob|
| 67| Eve|
| 69| John|
| 71|Victoria|
| 73| Bob|
| 75| Alice|
| 77| Charlie|
| 79| Jim|
+---+--------+
only showing top 20 rows
scala> dataFrame.explain
== Physical Plan ==
*Scan GreenplumRelation(StructType(StructField(id,IntegerType,false), StructField(value,StringType,true)),[Lio.pivotal.greenplum.spark.GreenplumPartition;@738ed8f5,io.pivotal.greenplum.spark.GreenplumOptions@1cfb7450) [id#0,value#1]
- You can create a temporary view to cache the results from Greenplum and speed up in-memory processing in the Spark cluster. A global temporary view is tied to the system preserved database global_temp, and you must use the qualified name to refer to it, e.g. SELECT * FROM global_temp.view1. Temporary views in Spark SQL, by contrast, are session-scoped and disappear when the session that created them terminates; a session-scoped variant is sketched after the example.
scala>
// Register the DataFrame as a global temporary view
dataFrame.createGlobalTempView("tempdataFrame")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.tempdataFrame").show()
In this section, you write data from a Spark DataFrame into a Greenplum table by using the Greenplum-Spark connector.
Pre-requisites:
- Make sure your spark shell has loaded the Greenplum-Spark connector jar.
root@master:/usr/spark-2.1.0# GSC_JAR=$(ls /code/scripts/greenplum-spark_2.11-*.jar)
root@master:/usr/spark-2.1.0# POSTGRES_JAR=$(ls /code/scripts/postgresql-*.jar)
root@master:/usr/spark-2.1.0# spark-shell --jars "${GSC_JAR},${POSTGRES_JAR}" --driver-class-path ${POSTGRES_JAR}
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
- Determine the number of records in the "basictable" table by using the psql command.
$ docker exec -it gpdbsne /bin/bash
[root@d632f535db87 data]# psql -h localhost -U gpadmin -d basic_db -c "select count(*) from basictable"
count
-------
18432
(1 row)
- Configure the connector options (URL, credentials, schema, and table) and use the DataFrame write operation to write data from Spark into Greenplum. You can use different write modes; this example uses SaveMode.Append, and an Overwrite variant is sketched after the code.
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.SaveMode
val gscWriteOptionMap = Map(
"url" -> "jdbc:postgresql://gpdbsne/basic_db",
"user" -> "gpadmin",
"password" -> "changeme",
"password" -> "pivotal",
"dbschema" -> "public",
"dbtable" -> "basictable"
)
dataFrame.write.format("greenplum")
.options(gscWriteOptionMap)
.mode(SaveMode.Append)
.save()
// Exiting paste mode, now interpreting.
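To replace the table contents instead of appending, a variant using SaveMode.Overwrite is sketched below (check that your connector version supports overwrite before relying on it):
import org.apache.spark.sql.SaveMode
// Overwrite replaces the existing rows in the target table instead of appending
dataFrame.write.format("greenplum")
  .options(gscWriteOptionMap)
  .mode(SaveMode.Overwrite)
  .save()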
- Verify the write operation is successful by exec-ing into the GPDB container and running psql. The total number of records in the Greenplum table should now be 2x the original count.
$ docker exec -it gpdbsne /bin/bash
[root@d632f535db87 data]# psql -h localhost -U gpadmin -d basic_db -c "select count(*) from basictable" -w pivotal
psql: warning: extra command-line argument "pivotal" ignored
count
-------
 18432
(1 row)
In this section, you write data from a Spark DataFrame into a Greenplum table by using the JDBC driver.
Pre-requisites:
- Run the script scripts/download_postgresql.sh to download postgresql-42.2.5.jar.
- Make sure your spark shell has loaded the PostgreSQL jar.
root@master:/usr/spark-2.1.0# GSC_JAR=$(ls /code/scripts/greenplum-spark_2.11-*.jar)
root@master:/usr/spark-2.1.0# POSTGRES_JAR=$(ls /code/scripts/postgresql-*.jar)
root@master:/usr/spark-2.1.0# spark-shell --jars "${GSC_JAR},${POSTGRES_JAR}" --driver-class-path ${POSTGRES_JAR}
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
- Determine the number of records in the "basictable" table by using the psql command.
$ docker exec -it gpdbsne /bin/bash
[root@d632f535db87 data]# psql -h localhost -U gpadmin -d basic_db -c "select count(*) from basictable"
count
-------
18432
(1 row)
- Configure the JDBC URL and connection properties and use the DataFrame write operation to write data from Spark into Greenplum. You can use different write modes; this example appends to the existing table. An alternative that passes credentials via the connection properties is sketched after the code.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val jdbcUrl = s"jdbc:postgresql://gpdbsne/basic_db?user=gpadmin&password=pivotal"
val connectionProperties = new java.util.Properties()
dataFrame.write.mode("Append").jdbc(url = jdbcUrl, table = "basictable", connectionProperties = connectionProperties)
// Exiting paste mode, now interpreting.
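Credentials can also be supplied through the connection properties rather than embedded in the URL. A minimal sketch of this alternative, using the standard DataFrameWriter.jdbc API against the same table:
val jdbcUrl = "jdbc:postgresql://gpdbsne/basic_db"
val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", "gpadmin")
connectionProperties.setProperty("password", "pivotal")
// Append the DataFrame rows to the existing Greenplum table
dataFrame.write.mode("Append").jdbc(url = jdbcUrl, table = "basictable", connectionProperties = connectionProperties)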
- Verify the write operation is successful by exec-ing into the GPDB container and running psql. The total number of records in the Greenplum table should now be 2x the original count.
$ docker exec -it gpdbsne /bin/bash
[root@d632f535db87 data]# psql -h localhost -U gpadmin -d basic_db -c "select count(*) from basictable" -w pivotal
psql: warning: extra command-line argument "pivotal" ignored
count
-------
 18432
(1 row)
- Next, you can write the DataFrame data into a new Greenplum table via append mode.
scala> dataFrame.write.mode("Append").jdbc(url = jdbcUrl, table = "NEWTable", connectionProperties = connectionProperties)
- Run psql commands to verify the new table and its records; a Spark-side read-back check is sketched after the output.
[root@d632f535db87 scripts]# psql -h localhost -U gpadmin -d basic_db -c "\dt"
List of relations
Schema | Name | Type | Owner
--------+-----------------------------+-------+---------
public | basictable | table | gpadmin
public | newtable | table | gpadmin
public | spark_7ac1947b17a17725_0_41 | table | gpadmin
public | spark_7ac1947b17a17725_0_42 | table | gpadmin
(4 rows)
[root@d632f535db87 data]# psql -h localhost -U gpadmin -d basic_db -c "select count(*) from newtable" -w pivotal
psql: warning: extra command-line argument "pivotal" ignored
count
-------
18432
(1 row)
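You can also verify the round trip from the Spark side. A quick sketch that reads the new table back over JDBC and counts its rows (newtable is the lowercased name Greenplum created; jdbcUrl and connectionProperties are the values defined earlier):
scala>
// Read the newly created table back into Spark over JDBC and count the rows
val readBack = spark.read.jdbc(jdbcUrl, "newtable", connectionProperties)
readBack.count()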
The Greenplum-Spark connector uses the Greenplum gpfdist protocol to parallelize data transfer between the Greenplum and Spark clusters. Therefore, the connector provides better read throughput than a typical JDBC driver; a quick way to inspect the resulting Spark partitioning is sketched below.
For more details, see this link
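To see how a parallel read maps to Spark partitions, you can check the DataFrame from the read example above (a standard Spark API; with the connector, the partition count typically reflects the Greenplum segment layout):
scala> dataFrame.rdd.getNumPartitions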
MIT