
Actions


Waimak comes with a large set of predefined actions to help build complex Spark flows. You should be able to define the majority of your flows using combinations of actions found below.

You can find more complex and advanced actions on the Advanced Actions page.

All of the actions on this page will be present on a SparkDataFlow object by default.

Action Concepts

The definitions of the actions are straightforward; however, there are a couple of concepts to be aware of when using them:

Labels as Paths

A common concept in Waimak is to have labels form part of the underlying path when written to storage. For example, given tables table1 and table2 and a base path of /tables, it is advisable to write the data for the two tables out in the following structure:

table1: /tables/table1
table2: /tables/table2

Using this approach allows multiple labels to be read or written using a single action, requiring only a common base path and a list of labels to operate on.
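
For example, a single openParquet action (described below) could open both tables:

flow.openParquet("/tables")("table1", "table2")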

You can override this behaviour when opening labels by using the openFileCSV and openFileParquet actions to open direct paths.

You can control the names of the output labels used in the paths when writing labels by using the alias action.

Immutable Labels

The data underneath each label is designed to be immutable (should not change), and each transformation on the data should produce a new label. Whilst it is technically possible to transform an existing label without producing a new label using Interceptors, their use should be restricted to debugging and specific use-cases. There is no performance penalty for defining multiple intermediary labels instead of a single label.
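
For example, chaining simple transforms through intermediary labels (the labels and transformations here are illustrative) is preferred over mutating a single label:

flow.transform("table1")("filtered")(_.filter("column1 is not null"))
    .transform("filtered")("deduplicated")(_.dropDuplicates())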

Snapshotting

The open and write actions support an optional concept of snapshotting: a simple folder structure used to update entire datasets without losing previous dataset history.

For example, given a table table1, every time we write out the label we would use an increasing timestamp for the snapshot folder:

/path/to/tables/table1/snapshot_key=2018_02_12_10_59_21
/path/to/tables/table1/snapshot_key=2018_02_13_10_00_09

Using this technique we can update the data for a given label whilst retaining the option to roll back or explore historic snapshots. Snapshotting is supported by Opening Labels, Writing Labels and Committing labels with Commit and Push.
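
For example, to open the most recent snapshot of table1 above, you could pass the snapshot folder name to one of the open actions described below; a minimal sketch using openParquet:

flow.openParquet("/path/to/tables", snapshotFolder = Some("snapshot_key=2018_02_13_10_00_09"))("table1")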

Label Output Prefix

As described above, labels of Datasets often form part of the path of the underlying data. This can become an issue when opening Datasets that share the same label but are stored in different data locations. For example, opening the following labels may cause issues due to duplicate labels, depending on which open actions are used:

table1: /old_tables/table1
table1: /new_tables/table1

The table1 label will conflict in this case even though the two paths reference different data locations.

To solve this problem, actions where label conflicts are possible also take an optional outputPrefix. This puts a prefix on all labels opened using the action, e.g. an output prefix of old and a label of table1 will produce a label of old_table1.

As outputPrefix is an Option type, you will have to specify it as a Some value by passing your prefix string as: Some("prefix").

Note: An underscore is inserted between the output prefix and label.
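
For example, to open the two conflicting table1 labels above as old_table1 and new_table1 (assuming the data is stored as Parquet), you could use two openParquet actions with different output prefixes:

flow.openParquet("/old_tables", outputPrefix = Some("old"))("table1")
    .openParquet("/new_tables", outputPrefix = Some("new"))("table1")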

Action Definitions

This section details and shows examples of most of the predefined actions within Waimak. Advanced Actions can be found on the Advanced Actions page.

Opening Labels

The actions in this section deal with opening Datasets on a flow. They take no input labels and produce output labels.

Opening CSVs

There are two actions for opening CSVs. In both cases you can provide an optional options Map to pass custom reader options to Spark (see DataFrameReader).

Opening a single label from a direct path or glob:

    /**
     * Open a CSV file based on a complete path
     *
     * @param path    Complete path of the CSV file(s) (can include glob)
     * @param label   Label to attach to the dataset
     * @param options Options for the DataFrameReader
     * @return
     */
   def openFileCSV(path: String,
                   label: String, 
                   options: Map[String, String] = Map("header" -> "true", "inferSchema" -> "true")): SparkDataFlow

For example, you would open a single CSV file with path /files/1.csv as label table1 using the following action invocation:

flow.openFileCSV("/files/1.csv", "table1")

And you would open all CSVs in folder /files as label allfiles but with a custom separator | by using the following action invocation:

flow.openFileCSV("/files/*.csv", "allfiles",  Map("header" -> "true", "inferSchema" -> "true", "sep" -> "|"))

Opening multiple CSVs as multiple labels using Labels as Paths:

    /**
      * Opens CSV folders as data sets.
      *
      * @param basePath       Base path of all the labels
      * @param snapshotFolder Optional snapshot folder below table folder
      * @param outputPrefix   Optional prefix to attach to the dataset label
      * @param options        Options for the DataFrameReader
      * @param labels         List of labels/folders to open
      * @return
      */
    def openCSV(basePath: String, 
                snapshotFolder: Option[String] = None,
                outputPrefix: Option[String] = None, 
                options: Map[String, String] = Map("header" -> "true", "inferSchema" -> "true"))
               (labels: String*): SparkDataFlow

This action takes an optional snapshot folder and label output prefix.

For example, to open two folders of CSV files /files/table1/ and /files/table2/ sharing the same base path /files as different labels you would use the following action invocation:

flow.openCSV("/files")("table1", "table2")

Opening Parquets

The actions for opening Parquet files are very similar to those for opening CSVs, so there are also two actions for opening Parquet files. Again, in both cases you can provide an optional options Map to pass custom reader options to Spark (see DataFrameReader).

Opening a single label from a direct path or glob:

    /**
      * Open a Parquet path based on a complete path
      *
      * @param path    Complete path of the parquet file(s) (can include glob)
      * @param label   Label to attach to the dataset
      * @param options Options for the DataFrameReader
      * @return
      */
    def openFileParquet(path: String, label: String, options: Map[String, String] = Map()): SparkDataFlow

For example, you would open a single Parquet file with path /files/1.parquet as label table1 using the following action invocation:

flow.openFileParquet("/files/1.parquet", "table1")

And you would open all Parquets in folder /files as label allfiles with schema merging enabled by using the following action invocation:

flow.openFileParquet("/files/*.parquet", "allfiles",  Map("mergeSchema" -> "true"))

Opening multiple Parquets as multiple labels using Labels as Paths:

    /**
      * Opens parquet based folders using open(). See parent function for complete description.
      *
      * @param basePath       Base path of all the labels
      * @param snapshotFolder Optional snapshot folder below table folder
      * @param outputPrefix   Optional prefix to attach to the dataset label
      * @param options        Options for the DataFrameReader
      * @param labels         List of labels/folders to open
      * @return
      */
    def openParquet(basePath: String, 
                    snapshotFolder: Option[String] = None, 
                    outputPrefix: Option[String] = None, 
                    options: Map[String, String] = Map())
                   (labels: String*): SparkDataFlow

This action takes an optional snapshot folder and label output prefix.

For example, to open two folders of Parquet files /files/table1/ and /files/table2/ sharing the same base path /files as different labels you would use the following action invocation:

flow.openParquet("/files")("table1", "table2")

Opening Hive/Impala Tables

You can open a list of tables from a database in a Hive Metastore using the following action:

    /**
      * Opens multiple Hive/Impala tables. Table names become waimak lables, which can be prefixed.
      *
      * @param dbName       - name of the database that contains the table
      * @param outputPrefix - optional prefix for the waimak label
      * @param tables       - list of table names in Hive/Impala that will also become waimak labels
      * @return
      */
    def openTable(dbName: String, outputPrefix: Option[String] = None)(tables: String*): SparkDataFlow

The action takes a database name and an optional label output prefix. Note: Hive support must be enabled on the SparkSession for this action to work.

For example, to open tables table1 and table2 in database test_data you would use the following action invocation:

flow.openTable("test_data")("table1", "table2")

Generic Open Actions

There are two similar generic open actions that allow you to open Datasets in non-standard ways.

Opening a Label using the SparkFlowContext:

    /**
      * A generic action to open a dataset with a given label by providing a function that maps from
      * a [[SparkFlowContext]] object to a Dataset.
      * In most cases the user should use a more specialised open function
      *
      * @param label Label of the resulting dataset
      * @param open  Function that maps from a [[SparkFlowContext]] object to a Dataset.
      * @return
      */
    def open(label: String, open: SparkFlowContext => Dataset[_]): SparkDataFlow

The SparkFlowContext is a simple class that provides both a SparkSession and a FileSystem object. Hence, this action lets you open a Dataset with a given label by providing any function that takes a SparkFlowContext and produces a Dataset. With this you should be able to use any standard or external Spark read function to produce a Dataset.

For example, to open a list of Integers as a Dataset you could use the following action invocation:

      def listToDS(input: List[Int]): SparkFlowContext => Dataset[_] = {
        ctx =>
          import ctx.spark.implicits._
          input.toDF("col1")
      }

      emptyFlow.open("listdata", listToDS(List(1, 2, 3)))

Opening a Label using a DataFrameReader

    /**
      * A generic action to open a dataset with a given label by providing a function that maps from
      * a DataFrameReader object to a Dataset.
      * In most cases the user should use a more specialised open function
      *
      * @param label Label of the resulting dataset
      * @param open  Function that maps from a DataFrameReader object to a Dataset.
      * @return
      */
    def open(label: String, open: DataFrameReader => Dataset[_], options: Map[String, String]): SparkDataFlow

A slightly less generic but neater option is to use this action, which creates a DataFrameReader object with pre-applied options from which you can open a Dataset.

For example, this action can be used to open json files using the Spark API:

      val jsonReader: DataFrameReader => Dataset[_] = _.json("/input/file.json")
      emptyFlow.open("jsondata", jsonReader, Map.empty)

Transforming Labels

Transformation actions take one or more input labels and produce a single output label.

Generic Transform Actions

The most common and useful transformation actions are the generic transformation actions, allowing you to transform any number of input Datasets into a single Dataset using Spark Dataset functions:

    /**
      * Transforms 1 input DataSet to 1 output DataSet using function f, which is a scala function.
      *
      */
    def transform(a: String)(output: String)(f: Dataset[_] => Dataset[_]): SparkDataFlow

    // There are 12 transform actions, from one input label to twelve input labels

    /**
      * Transforms 12 input DataSets to 1 output DataSet using function f, which is a scala function.
      */
    def transform(a: String, b: String, c: String, d: String, e: String, g: String, h: String, i: String, k: String, l: String, n: String, o: String)
                 (output: String)
                 (f: (Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_], Dataset[_]) => Dataset[_]): SparkDataFlow

Note: Even though only two transform actions are listed here, there are twelve transform actions available, taking from one to twelve input labels, each producing a single output label.

The transform actions take a series of input labels, an output label and a function with the same number of arguments as input labels. The ordering of the input labels in the action matches the ordering of the underlying Dataset arguments in the transforming function f.

For example, to join two tables table1 and table2 on column column1 and produce a joined table joinedtable you would do the following:

flow.transform("table1", "table2")("joinedtable"){
  (table1, table2) => table1.join(table2, "column1")
}

Spark SQL Action

You can use the sql action to execute Spark SQL against any number of input labels to produce an output label:

    /**
      * Executes Spark sql. All input labels are automatically registered as sql tables.
      *
      * @param inputs      - required input labels
      * @param sqlQuery    - sql code that uses labels as table names
      * @param outputLabel - label of the output transformation
      * @param dropColumns - optional list of columns to drop after transformation
      * @return
      */
    def sql(input: String, inputs: String*)(outputLabel: String, sqlQuery: String, dropColumns: String*): SparkDataFlow

The sql action takes any number of input labels, an output label, an SQL query to execute and an optional list of columns to drop from the final output Dataset. The input labels will become the table names in the SQL query.

For example, to join two tables table1 and table2 on column column1 and produce a joined table joinedtable you would do the following:

flow.sql("table1", "table2")("joinedtable", "SELECT * FROM table1 t1 INNER JOIN table2 t2 ON t1.column1 = t2.column1")

Alias Label Action

You can use the alias action to create an alias for an existing label without changing the underlying Dataset. This is useful if you want to control the name of the output label written out:

    /**
      * Creates an alias for an existing label, it will point to the same DataSet. This can be used when reading table
      * with one name and saving it with another without any transformations.
      *
      * @param from
      * @param to
      * @return
      */
    def alias(from: String, to: String): SparkDataFlow

For example, to alias the label joinedtable as processoutput so that it can be written out under this label, you would use the following action invocation:

flow.alias("joinedtable", "processoutput")

Partition Sort Action

The partition sort action is a convenience action to help reduce the number of small files written out for a label: it repartitions a label by a list of columns and optionally sorts by a set of columns within those partitions:

     /**
      * Before writing out data with partition folders, to avoid lots of small files in each folder, DataSet needs
      * to be reshuffled. Optionally it can be sorted as well within each partition.
      *
      * This also can be used if you need to solve problem with Secondary Sort, use mapPartitions on the output.
      *
      * @param input
      * @param output
      * @param partitionCol - columns to repartition/shuffle input data set
      * @param sortCols     - optional sort withing partition columns
      * @return
      */
    def partitionSort(input: String, output: String)(partitionCol: String*)(sortCols: String*): SparkDataFlow
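
For example, to repartition label table1 by column1 and sort each partition by column2 (illustrative column names), producing the new label sortedtable, you would use the following action invocation:

flow.partitionSort("table1", "sortedtable")("column1")("column2")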

Debugging Labels

There are a set of actions that can be useful when debugging and inspecting labels during development and testing. The actions take input labels and do not produce an output label.

Show Action

The show action takes a label and calls the Spark show function on the underlying Dataset. The first 10 lines of the Dataset will be printed to the console.

    /**
      * Adds actions that prints to console first 10 lines of the input. Useful for debug and development purposes.
      *
      * @param label
      * @return
      */
    def show(label: String): SparkDataFlow

The usage of show is straightforward:

flow.show("table1")

Print Schema Action

The print schema action is similar to the show action, however it will print the underlying schema of a label's Dataset to the console:

    /**
      * Prints DataSet's schema to console.
      *
      * @param label
      * @return
      */
    def printSchema(label: String): SparkDataFlow

The usage of printSchema is also straightforward:

flow.printSchema("table1")

Debug as Table Action

The debug as table action registers all labels given in the action as Spark temporary views so they are available to query through Spark SQL after a flow has executed. This debug action is particularly useful in notebook applications like Zeppelin where Spark SQL queries are visualised.

    /**
      * In Zeppelin it is easier to debug and visualise data as Spark SQL tables. This action does no data transformations,
      * it only marks labels as SQL tables.
      *
      * @param labels - labels to mark.
      * @return
      */
    def debugAsTable(labels: String*): SparkDataFlow

For example, labels table1 and table2 can be marked as debug as table:

flow.debugAsTable("table1", "table2")

They can then be queried on the SparkSession after the flow has executed:

spark.sql("select * from table1 limit 10")

Intercepting Labels

Interceptor actions are a set of actions that mutably alter the state of a label without producing a new label. These actions go against the Immutable Labels concept, so it is advisable not to use them to apply business-logic transformations; however, they are often useful in the following scenarios:

  • Debugging and development: It is often useful to intercept specific labels during development and debugging e.g. to do a limit to make record counts more manageable
  • Performance Optimisation: Interceptor actions can be used to inject specific performance optimisations for certain labels e.g. a repartition before writing out a Dataset or a persist on a commonly used label

Interceptor actions can be added at any point in the flow and do not need to be next to or after the action producing the label they intercept. This allows you to keep your business-logic flow code and optimisation code separate.

Note: Transform interceptors are always performed on a label before cache interceptors on the same label regardless of the order the actions were declared on the flow.
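
As described above, an interceptor does not need to follow the action producing the label it intercepts. For example, the cache interceptor from the Cache as Parquet section below could be declared before the transform that produces the label; a sketch reusing the earlier join example:

flow.cacheAsParquet("joinedtable")
    .transform("table1", "table2")("joinedtable") {
      (table1, table2) => table1.join(table2, "column1")
    }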

In-place Transform Action

In-place transform interceptor actions are used to change the state of the Dataset for a given label by applying a Spark transformation. As mentioned above, it is not advisable to use these actions to do business-logic transformation, but they can be useful for debugging, development and applying performance optimisations:

    /**
      * Applies a transformation to the label's data set and replaces it.
      *
      * Multiple intercept actions can be chained. Like post -> post -> write.
      *
      * @param label
      * @param post
      * @return
      */
    def inPlaceTransform(label: String)(post: Dataset[_] => Dataset[_]): SparkDataFlow

For example, to transform label table1 and then apply an in-place transformation to reduce the data volume, you would do:

flow.transform("table1")("prunedtable")(_.select("column1"))
    .inPlaceTransform("prunedtable")(_.limit(10))

In this case, the resulting Dataset for label prunedtable would look like:

table1Dataset.select("column1").limit(10)

Spark Cache Action

The Spark cache actions are used to intercept a label and cache it using Spark's in-built caching mechanism.

There are two Spark cache actions: one for caching multiple labels and another for caching a single label, which opens up more configuration options. They are defined as follows:

    /**
      * Cache a single label using Spark's in-built caching mechanism
      *
      * @param label        the label to cache
      * @param partitions   optionally, the number of partitions to partition the dataset by before caching (will invoke a `.repartition` call)
      * @param storageLevel the `StorageLevel` to use
      */
    def sparkCacheSingle(label: String, partitions: Option[Int] = None, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): SparkDataFlow = {
      CacheMetadataExtension.addSparkCache(sparkDataFlow, label, partitions, storageLevel)
    }

    /**
      * Cache multiple labels using Spark's in-built caching mechanism
      *
      * @param labels - list of labels to cache
      */
    def sparkCache(labels: String*): SparkDataFlow = {
      if (labels.isEmpty) throw new DataFlowException(s"At least one label must be specified for sparkCache")

      labels.foldLeft(sparkDataFlow) { (flow, l) => flow.sparkCacheSingle(l) }
    }
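
For example, to cache labels table1 and table2 with the default storage level, or to cache a single label repartitioned into 10 partitions with a custom storage level (the latter requires importing org.apache.spark.storage.StorageLevel):

flow.sparkCache("table1", "table2")

flow.sparkCacheSingle("table1", partitions = Some(10), storageLevel = StorageLevel.MEMORY_ONLY)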

Cache as Parquet Action

The cache as Parquet actions are used to intercept a label and force it to be written to a temporary folder in Parquet (the temporary folder is specified when a new flow is created). The Parquet files are read back and the resulting Dataset is assigned to the intercepted label.

The main purpose of these actions is to simulate the behaviour of Spark's persist to disk. In flows with large amounts of forking and column pruning, Spark may cache a pruned version of a Dataset; if columns that were pruned from the cached Dataset are needed later in the flow, the Dataset must be recomputed.

The main use-case of these actions is therefore very specific; however, if you are seeing the same Dataset computed multiple times on a flow and Spark persistence does not prevent the re-computation, then these actions may benefit you.

There are two cache as Parquet functions, one for Datasets and one for partitioned Datasets, and they are defined as:

    /**
      * Creates a persistent snapshot into the staging folder of the spark data flow and substitutes the dataset
      * behind the label with the one opened from the stored version.
      *
      * It will not trigger for labels whose datasets are empty.
      *
      * @param labels - list of labels to snapshot
      * @return
      */
    def cacheAsParquet(labels: String*): SparkDataFlow

    /**
      * Creates a persistent snapshot into the staging folder of the spark data flow and substitutes the dataset
      * behind the label with the one opened from the stored version.
      *
      * It will not trigger for labels whose datasets are empty.
      *
      * @param labels - list of labels to snapshot
      * @return
      */
    def cacheAsPartitionedParquet(partitions: Seq[String], repartition: Boolean = true)(labels: String*): SparkDataFlow

For example, you can use the action to cache an existing label prunedtable:

flow.transform("table1")("prunedtable")(_.select("column1"))
    .cacheAsParquet("prunedtable")
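
Similarly, a sketch of the partitioned variant, partitioning the cached Parquet by the illustrative column column1:

flow.transform("table1")("prunedtable")(_.select("column1"))
    .cacheAsPartitionedParquet(Seq("column1"))("prunedtable")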

Writing Labels

The actions in this section deal with writing Datasets on a flow. They take one or more input labels and produce no output labels.

For actions used to cleanly commit labels in a production environment, see Committing labels with Commit and Push on the Advanced Actions page.

Writing Hive Tables

For creating simple Hive tables from labels, the writeHiveManagedTable action can be used. Underneath, this action uses the Spark saveAsTable function, with the data being written out to the default Hive warehouse location as specified in the hive-site configuration.

    /**
      * Writes out the dataset to a Hive-managed table. Data will be written out to the default hive warehouse
      * location as specified in the hive-site configuration.
      * Table metadata is generated from the dataset schema, and tables and schemas can be overwritten by setting
      * the optional overwrite flag to true.
      *
      * It is recommended to only use this action in non-production flows as it offers no mechanism for managing
      * snapshots or cleanly committing table definitions.
      *
      * @param database  - Hive database to create the table in
      * @param overwrite - Whether to overwrite existing data and recreate table schemas if they already exist
      * @param labels    - List of labels to create as Hive tables. They will all be created in the same database
      * @return
      */
    def writeHiveManagedTable(database: String, overwrite: Boolean = false)(labels: String*): SparkDataFlow

This action takes a database in which all the tables will be created, an optional overwrite boolean that allows tables to be recreated if they already exist, and a list of labels that will be created as Hive tables.

For example, to create tables table1 and table2 in the database test_db, you would invoke the action as follows:

flow.writeHiveManagedTable("test_db")("table1", "table2")

It is recommended to only use this action in non-production flows as it offers no mechanism for managing snapshots or cleanly committing table definitions.

Writing CSVs

There are two actions for writing CSV files, one for writing several labels and one for writing a single partitioned CSV. Both actions allow you to provide an optional options Map to pass custom writer options to Spark (see DataFrameWriter).

Writing non-partitioned CSVs:

    /**
      * Writes out data set as csv.
      *
      * @param basePath  - path in which folders will be created
      * @param labels    - labels whose data set will be written out
      * @param options   - list of options to apply to the dataframewriter
      * @param overwrite - whether to overwrite existing data
      * @param numFiles  - number of files to produce as output
      * @return
      */
    def writeCSV(basePath: String, options: Map[String, String] = Map.empty, overwrite: Boolean = false, numFiles: Option[Int] = Some(1))(labels: String*): SparkDataFlow

This action takes a base path to write all labels into, a list of labels to write out and several optional parameters: a Map of Spark writer options, a boolean to allow overwriting of existing data, and an Option allowing you to control how many individual CSV files are generated per label.

For example, to write the labels table1 and table2 into the path /output/ you would use the following action invocation:

flow.writeCSV("/output")("table1", "table2")

The resulting folder structure would look like:

table1: /output/table1
table2: /output/table2

Writing partitioned CSVs:

    /**
      * Writes out data set as csv, can have partitioned columns.
      *
      * @param basePath         - base path of the label, label will be added to it
      * @param repartition      - repartition dataframe on partition columns
      * @param options          - list of options to apply to the dataframewriter
      * @param label            - label whose data set will be written out
      * @param partitionColumns - optional list of partition columns, which will become partition folders
      * @return
      */
    def writePartitionedCSV(basePath: String, repartition: Boolean = true, options: Map[String, String] = Map.empty)(label: String, partitionColumns: String*): SparkDataFlow

The write partitioned CSV action requires a base path to write the label into, the label to write and a list of partition columns to partition the resulting CSV by, plus two optional parameters: a flag to force a repartition of the Dataset on the partition columns before it is written, and a Map of options to apply to the CSV writer.

For example, to write the label table1 as a CSV partitioned by column1 into the base path /output/ you would use the following action invocation:

flow.writePartitionedCSV("/output")("table1", "column1")

The resulting folder structure would look like:

table1: /output/table1/column1=.../

Writing Parquet Files

Like the write CSV actions, there are two actions for writing Parquet files: one for non-partitioned outputs and one for partitioned outputs:

Writing non-partitioned Parquet files:

    /**
      * Writes multiple datasets as parquet files into basePath. Names of the labels will become names of the folders
      * under the basePath.
      *
      * @param basePath  - path in which folders will be created
      * @param overwrite - if true than overwrite the existing data. By default it is false
      * @param labels    - labels to write as parquets, labels will become folder names
      * @return
      */
    def writeParquet(basePath: String, overwrite: Boolean = false)(labels: String*): SparkDataFlow

The write Parquet action requires a base path for labels to be written into, a list of labels to write as Parquet and an optional flag to allow existing label outputs to be overwritten.

For example, to write labels table1 and table2 into base path /output/ you would use the following action invocation:

flow.writeParquet("/output")("table1", "table2")

The resulting folder structure would look like:

table1: /output/table1
table2: /output/table2

Writing partitioned Parquet files:

There are two ways to write out partitioned parquet files: one to set partition columns and another to repartition into a set number of partitions without columns.

The first is defined in the following way:

    /**
      * Writes out data set as parquet, can have partitioned columns.
      *
      * @param label            - label whose data set will be written out
      * @param repartition      - repartition dataframe on partition columns
      * @param basePath         - base path of the label, label will be added to it
      * @param partitionColumns - optional list of partition columns, which will become partition folders
      * @return
      */
    def writePartitionedParquet(basePath: String, repartition: Boolean = true)(label: String, partitionColumns: String*): SparkDataFlow

The write partitioned Parquet action takes a base path to write a label into, a label to write, a list of partition columns to partition the output by and an optional flag to repartition the Dataset before it is written.

For example, to write label table1 partitioned by column column1 into folder /output/ you would use the following action invocation:

flow.writePartitionedParquet("/output")("table1", "column1")

The resulting folder structure would look like:

table1: /output/table1/column1=.../

The second is defined in the following way:

    /**
      * Writes out data set as parquet, repartitioned into a given number of partitions.
      *
      * @param label       - label whose data set will be written out
      * @param repartition - repartition dataframe by a number of partitions
      * @param basePath    - base path of the label, label will be added to it
      * @return
      */
    def writePartitionedParquet(basePath: String, repartition: Int)(label: String): SparkDataFlow

The write partitioned Parquet action takes a base path to write a label into, an integer to set the number of partitions to repartition the dataset by before it is written, and a label to write.

For example, to write label table1 repartitioned into 3 partitions into folder /output/ you would use the following action invocation:

flow.writePartitionedParquet("/output", 3)("table1")

The resulting folder structure would look like:

table1: /output/table1/

and would contain three parquet files.

Writing Named Files

In all of the write actions above the resulting files will be produced with a Spark-specific file name (e.g. part-00000-33b48309-0442-4e86-870f-f3070268107f-c000.snappy.parquet).

However, you can use the writeAsNamedFiles action to control the output filename of the produced files.

The writeAsNamedFiles action is defined as:

    /**
      * Write a file or files with a specific filename to a folder.
      * Allows you to control the final output filename without the Spark-generated part UUIDs.
      * Filename will be `$filenamePrefix.extension` if number of files is 1, otherwise
      * `$filenamePrefix.$fileNumber.extension` where file number is incremental and zero-padded.
      *
      * @param label          Label to write
      * @param basePath       Base path to write to
      * @param numberOfFiles  Number of files to generate
      * @param filenamePrefix Prefix of name of the file up to the filenumber and extension
      * @param format         Format to write (e.g. parquet, csv)
      *                       Default: parquet
      * @param options        Options to pass to the [[DataFrameWriter]]
      *                       Default: Empty map
      */
    def writeAsNamedFiles(label: String, basePath: String, numberOfFiles: Int, filenamePrefix: String, format: String = "parquet", options: Map[String, String] = Map.empty): SparkDataFlow

It takes a label to generate files for, a path in which the files will be written, the number of files to produce (label will be repartitioned by this value), a filename prefix which is the prefix before the file number and extension, an optional format which by default is "parquet", and an optional map of options passed to the DataFrameWriter.

The files will be produced with a name of $filenamePrefix.extension if only one file is generated, or $filenamePrefix.$fileNumber.extension where $fileNumber is sequential if multiple files are generated. The extension will include both the file output format and any compression extension if compression was used (e.g. .snappy.parquet or .csv.gz).

For example, to generate a single parquet file called result from label table1 into the folder /output you would use the following action invocation:

flow.writeAsNamedFiles("table1", "/output", 1, "result")

This would produce the file /output/result.parquet or /output/result.snappy.parquet depending on whether compression was enabled.

To generate two compressed CSV files with headers called result from label table1 into folder /output you would use the following action invocation:

flow.writeAsNamedFiles("table1", "/output", 2, "result", "csv", Map("header" -> "true", "compression" -> "gzip"))

This would produce the following files:

/output/result.1.csv.gz
/output/result.2.csv.gz

Generic Write Action

There is a generic write action you can use to write labels when the more specialised write actions provided do not fit your use case:

    /**
      * Base function for all write operation on current data flow, in most of the cases users should use more specialised one.
      *
      * @param label - label whose data set will be written out
      * @param pre   - dataset transformation function
      * @param dfr   - dataframe writer function
      */
    def write(label: String, pre: Dataset[_] => Dataset[_], dfr: DataFrameWriter[_] => Unit): SparkDataFlow

The write action takes a label to write, a function that defines a pre-write transformation on the Dataset and a function that defines the write operation on the Spark DataFrameWriter.

You should be able to express the majority of non-standard write operations using this action.

For example, to write 10 records from the label table1 as JSON to path /output/table1 you would use the following action invocation:

flow.write("table1", _.limit(10), _.json("/output/table1"))