SparkSession — Entry Point to Datasets

SparkSession is the entry point to developing Spark applications using the Dataset and (less preferred these days) DataFrame APIs.

You should use the builder method to create an instance of SparkSession.

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .master("local[*]")
  .appName("My Spark Application")
  .getOrCreate()

There is also a private, more direct API to create a SparkSession that requires a SparkContext and an optional SharedState (the state shared across SparkSession instances).

Note

SparkSession has replaced SQLContext as of Spark 2.0.0.

Implicits — SparkSession.implicits

The implicits object is a helper class with methods to convert objects to Datasets and DataFrames, and also comes with many Encoders for "primitive" types as well as the collections thereof.

Note

Import the implicits using import spark.implicits._ as follows:

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

It holds Encoders for Scala "primitive" types like Int, Double, String, and their products and collections.

It offers support for creating a Dataset from an RDD of any type (for which an encoder exists in scope), from case classes and tuples, and from Seq.

It also offers conversions from Scala’s Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.
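For example, with the implicits in scope, a local Seq converts directly to a Dataset or a DataFrame, and the $ interpolator gives a Column (the Person case class, names, and values below are made up for illustration):

import spark.implicits._

case class Person(name: String, age: Int)

val ds = Seq(Person("Alice", 30), Person("Bob", 25)).toDS // Dataset[Person]
val df = Seq((1, "one"), (2, "two")).toDF("id", "name")   // DataFrame with named columns
val col = $"name"                                         // Column from the $ interpolator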

Note
toDF cannot be called on RDDs of "primitive" types other than Int, Long, and String.

readStream

readStream: DataStreamReader

readStream returns a new DataStreamReader.
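For example, a minimal sketch of creating a streaming DataFrame with the socket source (the host and port are made up for illustration):

val lines = spark.readStream
  .format("socket")            // built-in TCP socket source (for testing)
  .option("host", "localhost") // hypothetical host
  .option("port", "9999")      // hypothetical port
  .load()                      // returns a streaming DataFrame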

emptyDataset

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset (assuming that future records will be of type T).

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

The LogicalPlan is LocalRelation.

createDataset methods

createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]

createDataset creates a Dataset from a local collection or a distributed RDD.

scala> val nums = spark.createDataset(1 to 5)
nums: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> nums.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
+-----+

The LogicalPlan is LocalRelation (for the input data collection) or LogicalRDD (for the input RDD[T]).

Creating Dataset With Single Long Column (range methods)

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The range family of methods creates a Dataset of Long numbers.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+
Note
The first three variants (that do not specify numPartitions explicitly) use SparkContext.defaultParallelism as the number of partitions (numPartitions).

Internally, range creates a new Dataset[Long] with Range logical plan and Encoders.LONG encoder.

emptyDataFrame

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame (with no rows and no columns).

It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).
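For example:

scala> spark.emptyDataFrame.count
res0: Long = 0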

createDataFrame method

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
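A minimal sketch (assuming spark-shell, where sc is the SparkContext; the column names and values are made up for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

val rowRDD = sc.parallelize(Seq(Row("Alice", 30), Row("Bob", 25)))

val people = spark.createDataFrame(rowRDD, schema)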

streams Attribute

streams: StreamingQueryManager

streams attribute gives access to StreamingQueryManager (through SessionState).

val spark: SparkSession = ...
spark.streams.active.foreach(println)

udf Attribute

udf: UDFRegistration

udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL queries.

val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")

scala> sql("select myUpper(value) from strs").show
+----------+
|UDF(value)|
+----------+
|         A|
|         B|
|         C|
+----------+

Internally, it uses SessionState.udf.

catalog Attribute

catalog attribute is an interface to the current catalog (of databases, tables, functions, table columns, and temporary views).

scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+

table method

table(tableName: String): DataFrame

table creates a DataFrame from the records in the tableName table (if it exists).

val df = spark.table("mytable")

streamingQueryManager Attribute

streamingQueryManager is…​

listenerManager Attribute

listenerManager is…​

ExecutionListenerManager

ExecutionListenerManager is…​

functionRegistry Attribute

functionRegistry is…​

experimentalMethods Attribute

experimentalMethods is…​

newSession method

newSession(): SparkSession

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a

sharedState Attribute

sharedState points at the current SharedState.

SharedState

SharedState represents the shared state across all active SQL sessions (i.e. SparkSession instances) by sharing CacheManager, SQLListener, and ExternalCatalog.

There are two implementations of SharedState:

  • org.apache.spark.sql.internal.SharedState (default)

  • org.apache.spark.sql.hive.HiveSharedState

You can select the SharedState for the active SparkSession using the spark.sql.catalogImplementation setting.

SharedState is created lazily, i.e. when it is first accessed after the SparkSession is created, which can happen when a new session is created or when the shared services are accessed. It is created with a SparkContext.

Creating SparkSession Instance

Caution
FIXME

Creating Datasets (createDataset methods)

createDataset[T: Encoder](data: Seq[T]): Dataset[T]
createDataset[T: Encoder](data: RDD[T]): Dataset[T]

// For Java
createDataset[T: Encoder](data: java.util.List[T]): Dataset[T]

createDataset is an experimental API to create a Dataset from a local Scala collection, i.e. Seq[T] or Java’s List[T], or an RDD[T].

val ints = spark.createDataset(0 to 9)
Note
You would rather not use createDataset directly since, with the Scala implicits in scope, the toDS method is available.

Accessing DataFrameReader (read method)

read: DataFrameReader

read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

val spark: SparkSession = ...
val dfReader: DataFrameReader = spark.read
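
For example, a sketch of loading a CSV file into a DataFrame (the path is hypothetical):

val people = spark.read
  .option("header", "true") // use the first line as column names
  .csv("/tmp/people.csv")   // hypothetical path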

Runtime Configuration (conf attribute)

conf: RuntimeConfig

conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
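
For example, a sketch of reading and overriding a runtime setting (spark.sql.shuffle.partitions is used only as an illustration):

val spark: SparkSession = ...
spark.conf.get("spark.sql.shuffle.partitions")      // read the current value
spark.conf.set("spark.sql.shuffle.partitions", "8") // override it for this session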

Caution
FIXME

sessionState

sessionState is a transient lazy value that represents the current SessionState.

Which implementation is used depends on the internal spark.sql.catalogImplementation setting:

  • org.apache.spark.sql.hive.HiveSessionState when the setting is hive

  • org.apache.spark.sql.internal.SessionState for in-memory.
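
A Hive-aware SessionState is selected by enabling Hive support on the builder, which sets spark.sql.catalogImplementation to hive (a minimal sketch, assuming the Hive classes are on the classpath):

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
  .enableHiveSupport() // selects the Hive-aware SessionState (and HiveSharedState)
  .getOrCreate()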

Executing SQL (sql method)

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement.

scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, sql creates a Dataset using the current SparkSession and a logical plan. The plan is created by parsing the input sqlText using ParserInterface.parsePlan (available via sessionState.sqlParser).

Caution
FIXME See Executing SQL Queries.

Creating SessionBuilder (builder method)

builder(): Builder

SparkSession.builder method creates a new SparkSession.Builder that you use to build a SparkSession using a fluent API.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder

Settings

spark.sql.catalogImplementation

spark.sql.catalogImplementation (default: in-memory) is an internal setting with two possible values: hive and in-memory.
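
You can check the value for the active SparkSession through the runtime configuration (a quick sketch; the setting is internal, so treat it as read-only):

// returns "in-memory" (default) or "hive"
spark.conf.get("spark.sql.catalogImplementation")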