diff --git a/connectors/flink/README.md b/connectors/flink/README.md
index 30bd38bf2b5..0ea6318ad51 100644
--- a/connectors/flink/README.md
+++ b/connectors/flink/README.md
@@ -16,6 +16,7 @@ Official Delta Lake connector for [Apache Flink](https://flink.apache.org/).
 - [Bounded Mode](#bounded-mode)
 - [Continuous Mode](#continuous-mode)
 - [Examples](#delta-source-examples)
+- [SQL Support](#sql-support)
 - [Usage](#usage)
 - [Maven](#maven)
 - [SBT](#sbt)
@@ -41,6 +42,7 @@ Depending on the version of the connector you can use it with following Apache F
 | 0.4.x (Sink Only) | 1.12.0 <= X <= 1.14.5 |
 | 0.5.0 | 1.13.0 <= X <= 1.13.6 |
 | 0.6.0 | X >= 1.15.3 |
+| 2.5.0 | X >= 1.16.1 |
 
 ### APIs
 
@@ -48,8 +50,6 @@ See the [Java API docs](https://delta-io.github.io/connectors/latest/delta-flink
 ### Known limitations
 
-- The current version only supports Flink `Datastream` API. Support for Flink Table API / SQL, along with Flink Catalog's implementation for storing Delta table's metadata in an external metastore, are planned to be added in a future release.
-- For GCP Object Storage, the current version only supports reading. Writing to GCP Object Storage is not supported. This is due to Flink not supporting recoverable writes to GCS, which was added in Flink [1.15](https://issues.apache.org/jira/browse/FLINK-11838).
 - For Azure Blob Storage, the current version only supports reading. Writing to Azure Blob Storage is not supported by Flink due to [issue](https://issues.apache.org/jira/browse/FLINK-17444) with class shading and will probably be added along with [Azure Data Lake Store Gen 2 support](https://issues.apache.org/jira/browse/FLINK-18568).
 - For AWS S3 storage, in order to ensure concurrent transactional writes from different clusters, use [multi-cluster configuration guidelines](https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup). Please see [example](#3-sink-creation-with-multi-cluster-support-for-delta-standalone) for how to use this configuration in Flink Delta Sink.
@@ -311,27 +311,380 @@ public DataStream createContinuousDeltaSourceUserColumns(
 }
 ```
 
+## SQL Support
+Starting from version 2.5.0, the Delta connector can be used in Flink SQL jobs.
+Both the Delta Source and the Delta Sink can be used as Flink tables in SELECT and INSERT queries.
+
+The Flink/Delta SQL connector **must** be used together with a Delta Catalog. Executing SQL queries
+against a Delta table through the Flink SQL API without a Delta Catalog configured will cause the SQL job to fail.
+
+| Feature support                                | Notes                                                                                        |
+|------------------------------------------------|----------------------------------------------------------------------------------------------|
+| [CREATE CATALOG](#delta-catalog-configuration) | A Delta Catalog is required for Delta Flink SQL support.                                     |
+| [CREATE DATABASE](#create-database)            |                                                                                              |
+| [CREATE TABLE](#create-table)                  |                                                                                              |
+| [CREATE TABLE LIKE](#create-table-like)        | |
+| [ALTER TABLE](#alter-table)                    | Supports only altering table properties; column and partition changes are not supported.    |
+| [DROP TABLE](#drop-table)                      | Removes only the metastore entry, leaving the Delta table files on the filesystem untouched. |
+| [SQL SELECT](#select-query)                    | Supports both batch (default) and streaming modes.                                          |
+| [SQL INSERT](#insert-query)                    | Supports both batch and streaming modes.                                                     |
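+
+The sketch below strings these features together end to end; the catalog name, table name and table path
+are hypothetical, and every statement is described in detail in the sections that follow.
+```sql
+-- Register and use a Delta Catalog (decorating the default in-memory catalog).
+CREATE CATALOG delta_catalog WITH ('type' = 'delta-catalog');
+USE CATALOG delta_catalog;
+
+-- Create a Delta table backed by a hypothetical filesystem path.
+CREATE TABLE events (
+    id BIGINT,
+    data STRING
+  ) WITH (
+    'connector' = 'delta',
+    'table-path' = '/tmp/delta/events'
+);
+
+-- Write a few rows and read them back in batch mode.
+INSERT INTO events VALUES (1, 'a'), (2, 'b');
+SELECT * FROM events;
+```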
+
+### Delta Catalog
+The Delta log is the source of truth for Delta tables, and the Delta Catalog is the only
+Flink catalog implementation that enforces this.
+It is required for every interaction with Delta tables via the Flink SQL API. If you attempt to use
+any catalog other than the Delta Catalog, your SQL query will fail.
+
+At the same time, any other Flink connector (Kafka, Filesystem etc.) can be used with the Delta Catalog
+(so long as it doesn't have any restrictions of its own). This is achieved by the Delta Catalog acting
+as a proxy for non-Delta tables.
+
+For Delta tables, however, the Delta Catalog ensures that every DDL operation is reflected in the
+underlying Delta table. In other words, the Delta Catalog ensures that only valid Delta tables
+can be created and used by a Flink job.
+
+#### Decorated catalog
+The Delta Catalog acts as a wrapper around another [Catalog](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/) implementation.
+Currently, we support `in-memory` and `hive` decorated catalogs.
+The `in-memory` type is ephemeral and does not persist any data in an external metastore, which means
+it is bound to a single session.
+
+The `hive` type is based on Flink's Hive catalog, where metadata is persisted in an external Hive metastore.
+In this case, tables defined by user A can be used by user B.
+
+For Delta tables, only minimal information such as the database/table name, connector type
+and Delta table file path is stored in the metastore.
+No information about table properties or schema is stored in the metastore for Delta tables;
+the Delta Catalog stores those in the `_delta_log`.
+
+#### Delta Catalog Configuration
+A catalog is created and named by executing the following query:
+```sql
+CREATE CATALOG <catalog_name> WITH (
+  'type' = 'delta-catalog',
+  'catalog-type' = '<decorated-catalog-type>',
+  '<config_key_1>' = '<config_value_1>',
+  '<config_key_2>' = '<config_value_2>'
+);
+USE CATALOG <catalog_name>;
+```
+Replace `<catalog_name>` with your catalog's name.
+Replace `<decorated-catalog-type>` with the catalog implementation type that you want to use as the decorated catalog.
+Currently, only `in-memory` (default) and `hive` decorated catalogs are supported.
+
+The following properties can be set:
++ `type` - must be `delta-catalog`. This option is required by Flink.
++ `catalog-type` - an optional property that specifies the type of the decorated catalog. Allowed options are:
+  + `in-memory` - the default value if none is specified. Uses Flink's in-memory catalog as the decorated catalog.
+  + `hive` - uses Flink's Hive catalog as the decorated catalog.
+
+Any additional property defined in the DDL will be passed to the decorated catalog.
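+
+As a sketch of this pass-through behaviour (the catalog name, database name and path below are hypothetical),
+`default-database` and `hive-conf-dir` - both standard options of Flink's Hive catalog - are not interpreted
+by the Delta Catalog itself and are simply forwarded to the decorated Hive catalog. Verify the exact option
+names against the Flink Hive catalog documentation for your Flink version.
+```sql
+CREATE CATALOG my_delta_catalog WITH (
+  'type' = 'delta-catalog',
+  'catalog-type' = 'hive',
+  -- Forwarded, unchanged, to the decorated Flink Hive catalog.
+  'default-database' = 'analytics',
+  'hive-conf-dir' = '/opt/hive-conf'
+);
+USE CATALOG my_delta_catalog;
+```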
+
+#### Hive Metastore
+To create a Delta Catalog that is backed by the Hive catalog and uses the Hive catalog's `hadoop-conf-dir`
+option, run the query below:
+```sql
+CREATE CATALOG <catalog_name> WITH (
+  'type' = 'delta-catalog',
+  'catalog-type' = 'hive',
+  'hadoop-conf-dir' = '<path-to-hadoop-conf-dir>'
+);
+USE CATALOG <catalog_name>;
+```
+
+The logic for resolving configuration from `hadoop-conf-dir` depends on the [Flink Hive Catalog implementation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/hive_catalog/).
+The Flink Hive catalog expects `hadoop-conf-dir` to contain at least one of the following files:
+- core-site.xml
+- hdfs-site.xml
+- yarn-site.xml
+- mapred-site.xml
+
+The exact list of properties that have to be included in the configuration files depends on your
+Hive metastore endpoint/server. A minimal configuration that can be stored in the `core-site.xml` file is presented below:
+```xml
+<configuration>
+  <property>
+    <name>hive.metastore.uris</name>
+    <value>thrift://hive-metastore:9083</value>
+    <description>IP address (or fully-qualified domain name) and port of the metastore host</description>
+  </property>
+</configuration>
+```
+The `hive-metastore` hostname should resolve to the IP address of the Hive metastore service.
+
+In order to use the Hive Catalog with a Flink cluster, additional [Flink cluster configuration](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/#dependencies)
+is required. In a nutshell, you need to add the `flink-sql-connector-hive-x-x-x.jar` file to Flink's `lib` folder and
+provide the Hadoop dependencies by setting the `HADOOP_CLASSPATH` environment variable:
+`export HADOOP_CLASSPATH=$(hadoop classpath)`.
+
+Our connector was tested with:
+- `flink-sql-connector-hive-2.3.9_2.12-1.16.0.jar`
+- Hive 2.3.2 metastore
+- Hadoop 3.3.2
+
+It is recommended to use Hadoop version 3.3.2. With versions prior to 3.3.2 we have encountered many issues
+caused by incompatible class definitions between the Flink filesystem, the Flink Hive connector and Delta Standalone.
+At the moment, no tests have been conducted for Hadoop versions newer than 3.3.2.
+
+Examples of issues caused by an incompatible Hadoop version (via the `HADOOP_CLASSPATH` environment variable
+or a Hadoop dependency in the Flink job's pom.xml) while deploying Flink Delta SQL jobs:
+
+```
+Caused by: java.lang.IllegalArgumentException:
+Cannot invoke org.apache.commons.configuration2.AbstractConfiguration.setListDelimiterHandler on bean class
+'class org.apache.commons.configuration2.PropertiesConfiguration' - argument type mismatch -
+had objects of type "org.apache.commons.configuration2.convert.DefaultListDelimiterHandler"
+but expected signature "org.apache.commons.configuration2.convert.ListDelimiterHandler"
+```
+
+```
+Caused by: java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)' the class loader org.apache.flink.util.ChildFirstClassLoader @2486925f of the current class, org/apache/hadoop/fs/s3a/S3AFileSystem, and the class loader 'app' for the method's defining class, org/apache/hadoop/util/SemaphoredDelegatingExecutor, have different Class objects for the type com/google/common/util/concurrent/ListeningExecutorService used in the signature (org.apache.hadoop.fs.s3a.S3AFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2486925f, parent loader 'app'; org.apache.hadoop.util.SemaphoredDelegatingExecutor is in unnamed module of loader 'app')
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
+    at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
+```
+
+```
+java.lang.NoSuchMethodError: org.apache.hadoop.util.SemaphoredDelegatingExecutor.<init>(Lcom/google/common/util/concurrent/ListeningExecutorService;IZ)V
+    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
+    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
+    at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
+    at io.delta.standalone.internal.storage.DelegatingLogStore.write(DelegatingLogStore.scala:91)
+```
+
+#### Delta Catalog Table Cache
+As a performance optimization, the Delta Catalog automatically caches Delta tables,
+since these tables can be expensive to recompute.
+
+This cache has a default size of 100 tables and uses an LRU policy to evict old cached entries.
+You can change this value by adding `deltaCatalogTableCacheSize` to your Flink cluster's
+Hadoop configuration. Please note that this configuration has a global effect on every
+Delta Catalog instance running on your cluster. See the [Hadoop Configuration](#hadoop-configuration) section for details.
+
+### DDL commands
+#### CREATE DATABASE
+By default, the Delta Catalog will use the `default` database.
+Use the following example to create a separate database:
+
+```sql
+CREATE DATABASE custom_DB;
+USE custom_DB;
+```
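+
+Once a table has been created (see [CREATE TABLE](#create-table) below), it can be referenced either relative
+to the catalog and database selected with `USE CATALOG` / `USE`, or by its fully qualified
+`catalog.database.table` name, as in standard Flink SQL. The catalog, database and table names below are hypothetical:
+```sql
+-- Relative to the currently selected catalog and database.
+SELECT * FROM testTable;
+
+-- Fully qualified name of the same table.
+SELECT * FROM my_delta_catalog.custom_DB.testTable;
+```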
+
+#### CREATE TABLE
+To create a non-partitioned table, use the `CREATE TABLE` statement:
+```sql
+CREATE TABLE testTable (
+    id BIGINT,
+    data STRING
+  ) WITH (
+    'connector' = 'delta',
+    'table-path' = '<path>',
+    '<user-defined-table-property-name>' = '<value>',
+    '<user-defined-table-property-name>' = '<value>'
+);
+```
+
+To create a partitioned table, use `PARTITIONED BY`:
+```sql
+CREATE TABLE testTable (
+    id BIGINT,
+    data STRING,
+    part_a STRING,
+    part_b STRING
+  )
+  PARTITIONED BY (part_a, part_b)
+  WITH (
+    'connector' = 'delta',
+    'table-path' = '<path>',
+    '<user-defined-table-property-name>' = '<value>',
+    '<user-defined-table-property-name>' = '<value>'
+);
+```
+
+The Delta connector supports all Flink table schema [types](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#list-of-data-types).
+
+Currently, we do not support computed columns, metadata columns, primary keys or watermark definitions in
+the `CREATE TABLE` statement.
+
+The mandatory DDL options are:
++ `connector` - must be set to 'delta'.
++ `table-path` - the path (filesystem, S3 etc.) of your Delta table.
+  If the table doesn't exist on the file system, it will be created for you.
+
+In addition to the mandatory options, the DDL for a Delta table can accept other table properties.
+These properties will be persisted into the `_delta_log` of the created table. However, they will not be used
+by the Delta connector during processing.
+
+Properties NOT allowed as table properties defined in DDL:
++ job-specific properties such as:
+  + versionAsOf
+  + timestampAsOf
+  + startingTimestamp
+  + mode
+  + ignoreChanges
+  + ignoreDeletes
++ Delta Standalone log store configurations such as `delta.logStore.*` properties
++ Parquet format options such as `parquet.*`
+
+##### Creating the Delta table
+When executing `CREATE TABLE` for the Delta connector, there are two possible situations:
++ the Delta table does not exist under `table-path`
++ the Delta table already exists under `table-path`
+
+In the first case, the Delta Catalog will create the Delta table folder and initialize
+an empty (zero-row) Delta table with the schema defined in the DDL. Additionally, all table properties defined in the DDL,
+except `connector` and `table-path`, will be added to the Delta table metadata. On top of that, a metastore entry
+for the new table will be created.
+
+In the second case, where a `_delta_log` already exists under the specified `table-path`, the Delta Catalog will throw an exception when:
++ the DDL schema does not match the `_delta_log` schema,
++ the DDL partition definition does not match the partition definition from the `_delta_log`,
++ table properties from the DDL override existing table properties in the `_delta_log`.
+
+If all of the above checks pass, the Delta Catalog will add a metastore entry for the new table and will
+add the new table properties to the existing `_delta_log`.
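+
+As a sketch of the first case (the path and property name below are hypothetical), the following DDL creates a new,
+empty Delta table and persists one user-defined property into its `_delta_log`:
+```sql
+CREATE TABLE clickTable (
+    id BIGINT,
+    data STRING
+  ) WITH (
+    'connector' = 'delta',
+    'table-path' = '/tmp/delta/clickTable',
+    -- Persisted into _delta_log, but not interpreted by the connector during processing.
+    'userCustomProp' = 'myVal1'
+);
+```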
+
+#### CREATE TABLE LIKE
+To create a table with the same schema, partitioning and table properties as another table, use `CREATE TABLE LIKE`.
+
+```sql
+CREATE TABLE testTable (
+    id BIGINT,
+    data STRING
+  ) WITH (
+    'connector' = 'delta',
+    'table-path' = '<path>'
+);
+
+CREATE TABLE likeTestTable
+  WITH (
+    'connector' = 'delta',
+    'table-path' = '<new-path>'
+) LIKE testTable;
+```
+
+#### ALTER TABLE
+The Delta connector only supports:
++ altering the table name,
++ altering a table property value,
++ adding a new table property.
+
+```sql
+ALTER TABLE sourceTable SET ('userCustomProp'='myVal1');
+ALTER TABLE sourceTable RENAME TO newSourceTable;
+```
+
+#### DROP TABLE
+To delete a table, run:
+```sql
+DROP TABLE sample;
+```
+
+This operation will remove ONLY the metastore entry. No Delta table files will be removed.
+
+### Querying with SQL
+#### SELECT query
+The Delta connector supports both batch (default) and streaming reads for Flink jobs.
+To run a `SELECT` query in batch mode, execute:
+```sql
+SELECT * FROM testTable;
+```
+The query above will read all records from `testTable` and then stop. It is suitable for `BATCH` Flink jobs.
+
+To run a `SELECT` query in streaming mode, execute:
+```sql
+SELECT * FROM testTable /*+ OPTIONS('mode' = 'streaming') */;
+```
+The query above will read all records from `testTable` and will keep monitoring the underlying Delta table
+for any new data (appends).
+
+Both queries above read all columns of the Delta table. In order to read only a subset of columns,
+list those columns in the `SELECT` statement instead of using `*`, like so:
+```sql
+SELECT col1, col2, col3 FROM testTable;
+```
+
+For more details about the Flink `SELECT` statement, please see the [Flink SELECT documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/select/).
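+
+The job-specific options listed in the [CREATE TABLE](#create-table) section (for example `versionAsOf`) are,
+by analogy with the `mode` hint above, assumed to be passed per query as hints rather than as table properties;
+the sketch below rests on that assumption, so verify the exact option names against the connector's option
+reference before relying on it.
+```sql
+-- Hypothetical time-travel read of table version 10, assuming 'versionAsOf'
+-- is accepted as a query hint in the same way as 'mode'.
+SELECT * FROM testTable /*+ OPTIONS('versionAsOf' = '10') */;
+```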
+
+#### INSERT query
+To append new data to a Delta table with a Flink job, use the `INSERT INTO` statement.
+
+The following query inserts three rows into the Delta table called `sinkTable` and then stops.
+```sql
+INSERT INTO sinkTable VALUES
+    ('a', 'b', 1),
+    ('c', 'd', 2),
+    ('e', 'f', 3);
+```
+
+For the examples below it is assumed that `sourceTable` also refers to a Delta table (Delta connector).
+The following query inserts the entire content of the table called `sourceTable` into the Delta table `sinkTable` and then stops. The table schemas must match.
+```sql
+INSERT INTO sinkTable SELECT * FROM sourceTable;
+```
+
+The following query inserts the entire content of `sourceTable` into the Delta table `sinkTable` under the static partition `region = europe` and then stops.
+```sql
+INSERT INTO sinkTable PARTITION (region='europe') SELECT * FROM sourceTable;
+```
+
+The following query creates a continuous job that inserts the entire content of the table called `sourceTable` into the Delta table `sinkTable` and keeps monitoring `sourceTable` for new data.
+```sql
+INSERT INTO sinkTable SELECT * FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */;
+```
+
+### Hadoop Configuration
+The Delta connector resolves the Flink cluster's Hadoop configuration in order to use properties such as
+Delta log store options or the Delta Catalog cache size.
+
+For SQL jobs, the Delta connector resolves the Flink cluster Hadoop configuration from the following sources,
+where properties from later sources override those from earlier ones:
++ the `HADOOP_HOME` environment variable,
++ the hdfs-default.xml pointed to by the deprecated Flink config option `fs.hdfs.hdfsdefault`,
++ the `HADOOP_CONF_DIR` environment variable,
++ properties from the Flink cluster config prefixed with `flink.hadoop`.
+
+### SQL API Limitations
+The Delta connector currently supports only physical columns. Metadata and computed columns
+are currently not supported. For details please see the [Flink documentation on columns](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#columns).
+
+Other unsupported features:
++ Watermark definitions in the CREATE TABLE statement.
++ Primary key definitions in the CREATE TABLE statement.
++ Schema ALTER queries (adding or dropping columns), including partition columns.
++ Table and column comments.
+
 ## Usage
 
 You can add the Flink/Delta Connector library as a dependency using your favorite build tool. Please note that it expects the following packages to be provided:
 
+DataStream API only:
 - `delta-standalone`
 - `flink-parquet`
 - `flink-table-common`
 - `hadoop-client`
 
-Please see the following build files for more details.
+Additional libraries for the Table/SQL API:
+- `flink-clients`
+- `flink-table-planner_2.12`
+
+Additional libraries for AWS/S3 support:
+- enabling the flink-s3-fs-hadoop plugin on the Flink cluster ([details here](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins))
+- `hadoop-aws`
 
 ### Maven
 
+Please see the following build files for more details.
+
-Scala 2.12:
+#### Scala 2.12:
 
 ```xml
 <properties>
     <scala.main.version>2.12</scala.main.version>
-    <delta-connectors-version>0.6.0</delta-connectors-version>
+    <delta-connectors-version>2.5.0</delta-connectors-version>
     <flink-version>1.16.1</flink-version>
     <hadoop-version>3.1.0</hadoop-version>
@@ -357,28 +710,39 @@ Scala 2.12:
             <artifactId>flink-parquet_${scala.main.version}</artifactId>
             <version>${flink-version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop-version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>${flink-version}</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-runtime_${scala.main.version}</artifactId>
-            <version>${flink-version}</version>
-            <scope>provided</scope>
-        </dependency>
-    </dependencies>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime_${scala.main.version}</artifactId>
+            <version>${flink-version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop-version}</version>
+        </dependency>
+    </dependencies>
 ```
 
-### SBT
+#### SBT
 
 Please replace the versions of the dependencies with the ones you are using.