feat: Add support for Delta Lake connector #531

Merged: 6 commits, Feb 2, 2024
Changes from 2 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.

- Various documentation of the CRD ([#510]).
- Helm: support labels in values.yaml ([#528]).
- Delta Lake connector ([#531]).

### Fixed

@@ -16,6 +17,7 @@ All notable changes to this project will be documented in this file.
[#510]: https://github.com/stackabletech/trino-operator/pull/510
[#526]: https://github.com/stackabletech/trino-operator/pull/526
[#528]: https://github.com/stackabletech/trino-operator/pull/528
[#531]: https://github.com/stackabletech/trino-operator/pull/531

## [23.11.0] - 2023-11-24

130 changes: 130 additions & 0 deletions deploy/helm/trino-operator/crds/crds.yaml
@@ -14265,6 +14265,8 @@ spec:
oneOf:
- required:
- blackHole
- required:
- deltaLake
- required:
- googleSheet
- required:
@@ -14281,6 +14283,134 @@ spec:
blackHole:
description: A [Black Hole](https://docs.stackable.tech/home/nightly/trino/usage-guide/catalogs/black-hole) connector.
type: object
deltaLake:
description: A [Delta Lake](https://docs.stackable.tech/home/nightly/trino/usage-guide/catalogs/delta-lake) connector.
properties:
hdfs:
description: Connection to an HDFS cluster. Please make sure that the underlying Hive metastore also has access to the HDFS.
nullable: true
properties:
configMap:
description: Name of the [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) providing information about the HDFS cluster.
type: string
required:
- configMap
type: object
metastore:
description: Mandatory connection to a Hive Metastore, which will be used as storage for metadata.
properties:
configMap:
description: Name of the [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) providing information about the Hive metastore.
type: string
required:
- configMap
type: object
s3:
description: Connection to an S3 store. Please make sure that the underlying Hive metastore also has access to the S3 store. Learn more about S3 configuration in the [S3 concept docs](https://docs.stackable.tech/home/nightly/concepts/s3).
nullable: true
oneOf:
- required:
- inline
- required:
- reference
properties:
inline:
description: Inline definition of an S3 connection.
properties:
accessStyle:
description: Which access style to use. Defaults to virtual hosted-style, as used by most data products. Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html).
enum:
- Path
- VirtualHosted
nullable: true
type: string
credentials:
description: If the S3 store uses authentication, you have to specify your S3 credentials. In most cases, a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) providing `accessKey` and `secretKey` is sufficient.
nullable: true
properties:
scope:
description: '[Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass).'
nullable: true
properties:
node:
default: false
description: The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. This will typically be the DNS name of the node.
type: boolean
pod:
default: false
description: The pod scope is resolved to the name of the Kubernetes Pod. This allows the secret to differentiate between StatefulSet replicas.
type: boolean
services:
default: []
description: The service scope allows Pod objects to specify custom scopes. This should typically correspond to Service objects that the Pod participates in.
items:
type: string
type: array
type: object
secretClass:
description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the S3 credentials.'
type: string
required:
- secretClass
type: object
host:
description: 'Hostname of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.'
nullable: true
type: string
port:
description: Port the S3 server listens on. If not specified, the product will determine the port to use.
format: uint16
minimum: 0.0
nullable: true
type: integer
tls:
description: If you want to use TLS when talking to S3, you can enable TLS-encrypted communication with this setting.
nullable: true
properties:
verification:
description: The verification method used to verify the certificates of the server and/or the client.
oneOf:
- required:
- none
- required:
- server
properties:
none:
description: Use TLS but don't verify certificates.
type: object
server:
description: Use TLS and a CA certificate to verify the server.
properties:
caCert:
description: CA cert to verify the server.
oneOf:
- required:
- webPki
- required:
- secretClass
properties:
secretClass:
description: Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. Note that a SecretClass does not need to have a key but can also work with just a CA certificate, so if you were provided with a CA cert but don't have access to the key, you can still use this method.
type: string
webPki:
description: Use TLS and the CA certificates trusted by the common web browsers to verify the server. This can be useful when you e.g. use public AWS S3 or other publicly available services.
type: object
type: object
required:
- caCert
type: object
type: object
required:
- verification
type: object
type: object
reference:
description: A reference to an S3Connection resource.
type: string
type: object
required:
- metastore
type: object
generic:
description: A [generic](https://docs.stackable.tech/home/nightly/trino/usage-guide/catalogs/generic) connector.
properties:
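To illustrate the fields the schema above adds, a TrinoCatalog spec using them might look roughly like the following sketch. The host, ConfigMap and SecretClass names (`my-hive`, `s3.example.com`, `s3-credentials-class`, `s3-ca-cert`) are placeholders, not values taken from this PR.

[source,yaml]
----
spec:
  connector:
    deltaLake:
      metastore:
        configMap: my-hive                     # discovery ConfigMap of the Hive metastore (placeholder)
      s3:
        inline:
          host: s3.example.com                 # S3 host without protocol or port (placeholder)
          port: 9000
          accessStyle: Path
          credentials:
            secretClass: s3-credentials-class  # SecretClass providing accessKey and secretKey (placeholder)
            scope:
              node: true                       # also resolve the node scope for the secret
          tls:
            verification:
              server:
                caCert:
                  secretClass: s3-ca-cert      # SecretClass providing the CA certificate (placeholder)
----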
34 changes: 34 additions & 0 deletions docs/modules/trino/pages/usage-guide/catalogs/delta-lake.adoc
@@ -0,0 +1,34 @@
= Delta Lake

https://delta.io/[Delta Lake] is an open-source storage framework that enables building a Lakehouse architecture with support for multiple compute engines.
It depends on a Hive metastore being present and makes use of the same metastore configMap used by the Hive connector.

== Example Delta Lake catalog configuration

[source,yaml]
----
apiVersion: trino.stackable.tech/v1alpha1
kind: TrinoCatalog
metadata:
# The name of the catalog as it will appear in Trino
name: delta
labels:
trino: simple-trino
spec:
connector:
# Specify deltaLake here when defining a Delta Lake catalog
deltaLake:
metastore:
configMap: simple-hive-derby
s3:
inline:
host: test-minio
port: 9000
accessStyle: Path
credentials:
secretClass: minio-credentials
----
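
As an alternative to the inline S3 definition above, the `s3` field can also point to an existing S3Connection resource via the `reference` variant defined in the CRD. A minimal sketch, assuming an S3Connection named `my-s3-connection` (a placeholder) already exists:

[source,yaml]
----
spec:
  connector:
    deltaLake:
      metastore:
        configMap: simple-hive-derby
      s3:
        # Reference an existing S3Connection resource instead of defining it inline.
        # "my-s3-connection" is a placeholder name.
        reference: my-s3-connection
----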

== Connect to S3 store or HDFS
The Delta Lake connector can connect to S3 or HDFS in the same way the xref:usage-guide/catalogs/hive.adoc[] connector does.
Please check that documentation on how to configure the access.
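For HDFS, the relevant fragment could look like the sketch below; `simple-hdfs` is a placeholder for the discovery ConfigMap name of your HDFS cluster.

[source,yaml]
----
spec:
  connector:
    deltaLake:
      metastore:
        configMap: simple-hive-derby
      hdfs:
        # Discovery ConfigMap of the HDFS cluster ("simple-hdfs" is a placeholder)
        configMap: simple-hdfs
----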
4 changes: 2 additions & 2 deletions docs/modules/trino/pages/usage-guide/catalogs/iceberg.adoc
@@ -1,6 +1,6 @@
= Apache Iceberg

Apache Iceberg is a format for huge analytic tables designed to address some of the scaling issues with traditional Hive tables. Iceberg depends on Hive metastore being present and makes use of the same metastore configMap used by the Hive connector.
Apache Iceberg is a format for huge analytic tables designed to address some of the scaling issues with traditional Hive tables. Iceberg depends on a Hive metastore being present and makes use of the same metastore configMap used by the Hive connector.

== Example Iceberg catalog configuration

@@ -29,5 +29,5 @@ spec:
----

== Connect to S3 store or HDFS
The `iceberg` connector can connect to S3 or HDFS in the same way the xref:usage-guide/catalogs/hive.adoc[] connector does.
The Iceberg connector can connect to S3 or HDFS in the same way the xref:usage-guide/catalogs/hive.adoc[] connector does.
Please check that documentation on how to configure the access.
1 change: 1 addition & 0 deletions docs/modules/trino/partials/nav.adoc
@@ -14,6 +14,7 @@
** xref:trino:usage-guide/overrides.adoc[]
** xref:trino:usage-guide/catalogs/index.adoc[]
*** xref:trino:usage-guide/catalogs/black-hole.adoc[]
*** xref:trino:usage-guide/catalogs/delta-lake.adoc[]
*** xref:trino:usage-guide/catalogs/generic.adoc[]
*** xref:trino:usage-guide/catalogs/google-sheets.adoc[]
*** xref:trino:usage-guide/catalogs/hive.adoc[]
10 changes: 8 additions & 2 deletions rust/crd/src/affinity.rs
@@ -19,7 +19,10 @@ pub fn get_affinity(
.iter()
.filter_map(|catalog| match &catalog.spec.connector {
TrinoCatalogConnector::Hive(hive) => Some(&hive.metastore.config_map),
TrinoCatalogConnector::Iceberg(iceberg) => Some(&iceberg.metastore.config_map),
TrinoCatalogConnector::Iceberg(iceberg) => Some(&iceberg.hive.metastore.config_map),
TrinoCatalogConnector::DeltaLake(delta_lake) => {
Some(&delta_lake.hive.metastore.config_map)
}
TrinoCatalogConnector::BlackHole(_)
| TrinoCatalogConnector::Generic(_)
| TrinoCatalogConnector::GoogleSheet(_)
@@ -42,7 +45,10 @@ pub fn get_affinity(
hive.hdfs.as_ref().map(|hdfs| &hdfs.config_map)
}
TrinoCatalogConnector::Iceberg(iceberg) => {
iceberg.hdfs.as_ref().map(|hdfs| &hdfs.config_map)
iceberg.hive.hdfs.as_ref().map(|hdfs| &hdfs.config_map)
}
TrinoCatalogConnector::DeltaLake(delta_lake) => {
delta_lake.hive.hdfs.as_ref().map(|hdfs| &hdfs.config_map)
}
TrinoCatalogConnector::BlackHole(_)
| TrinoCatalogConnector::Generic(_)
11 changes: 11 additions & 0 deletions rust/crd/src/catalog/delta_lake.rs
@@ -0,0 +1,11 @@
use super::hive::HiveConnector;
use serde::{Deserialize, Serialize};
use stackable_operator::schemars::{self, JsonSchema};

#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DeltaLakeConnector {
/// The Hive connector exposes the same config
#[serde(flatten)]
pub hive: HiveConnector,
}
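Because the Hive connector's fields are flattened into this struct with `#[serde(flatten)]`, a `deltaLake` catalog does not nest them under a `hive` key: `metastore`, `s3` and `hdfs` sit directly below `deltaLake`. A sketch with placeholder names:

[source,yaml]
----
connector:
  deltaLake:
    # HiveConnector fields, flattened to the top level of deltaLake
    metastore:
      configMap: simple-hive-derby
    hdfs:
      configMap: simple-hdfs   # placeholder name
----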
19 changes: 5 additions & 14 deletions rust/crd/src/catalog/iceberg.rs
@@ -1,20 +1,11 @@
use super::commons::{HdfsConnection, MetastoreConnection};
use super::hive::HiveConnector;
use serde::{Deserialize, Serialize};
use stackable_operator::{
commons::s3::S3ConnectionDef,
schemars::{self, JsonSchema},
};
use stackable_operator::schemars::{self, JsonSchema};

#[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct IcebergConnector {
/// Mandatory connection to a Hive Metastore, which will be used as a storage for metadata.
pub metastore: MetastoreConnection,
/// Connection to an S3 store.
/// Please make sure that the underlying Hive metastore also has access to the S3 store.
/// Learn more about S3 configuration in the [S3 concept docs](DOCS_BASE_URL_PLACEHOLDER/concepts/s3).
pub s3: Option<S3ConnectionDef>,
/// Connection to an HDFS cluster.
/// Please make sure that the underlying Hive metastore also has access to the HDFS.
pub hdfs: Option<HdfsConnection>,
/// The Hive connector exposes the same config
#[serde(flatten)]
pub hive: HiveConnector,
}
6 changes: 6 additions & 0 deletions rust/crd/src/catalog/mod.rs
@@ -1,5 +1,6 @@
pub mod black_hole;
pub mod commons;
pub mod delta_lake;
pub mod generic;
pub mod google_sheet;
pub mod hive;
@@ -22,6 +23,8 @@ use iceberg::IcebergConnector;
use tpcds::TpcdsConnector;
use tpch::TpchConnector;

use self::delta_lake::DeltaLakeConnector;

/// The TrinoCatalog resource can be used to define catalogs in Kubernetes objects.
/// Read more about it in the [Trino operator concept docs](DOCS_BASE_URL_PLACEHOLDER/trino/concepts)
/// and the [Trino operator usage guide](DOCS_BASE_URL_PLACEHOLDER/trino/usage-guide/catalogs/).
@@ -55,6 +58,9 @@ pub enum TrinoCatalogConnector {
/// A [Black Hole](DOCS_BASE_URL_PLACEHOLDER/trino/usage-guide/catalogs/black-hole) connector.
BlackHole(BlackHoleConnector),

/// A [Delta Lake](DOCS_BASE_URL_PLACEHOLDER/trino/usage-guide/catalogs/delta-lake) connector.
DeltaLake(DeltaLakeConnector),

/// A [Google sheets](DOCS_BASE_URL_PLACEHOLDER/trino/usage-guide/catalogs/google-sheets) connector.
GoogleSheet(GoogleSheetConnector),

5 changes: 5 additions & 0 deletions rust/operator-binary/src/catalog/config.rs
@@ -112,6 +112,11 @@ impl CatalogConfig {
.to_catalog_config(&catalog_name, catalog_namespace, client)
.await
}
TrinoCatalogConnector::DeltaLake(connector) => {
connector
.to_catalog_config(&catalog_name, catalog_namespace, client)
.await
}
TrinoCatalogConnector::GoogleSheet(connector) => {
connector
.to_catalog_config(&catalog_name, catalog_namespace, client)
47 changes: 47 additions & 0 deletions rust/operator-binary/src/catalog/delta_lake.rs
@@ -0,0 +1,47 @@
use super::{config::CatalogConfig, ExtendCatalogConfig, FromTrinoCatalogError, ToCatalogConfig};
use async_trait::async_trait;
use stackable_operator::client::Client;
use stackable_trino_crd::catalog::delta_lake::DeltaLakeConnector;

pub const CONNECTOR_NAME: &str = "delta_lake";

#[async_trait]
impl ToCatalogConfig for DeltaLakeConnector {
async fn to_catalog_config(
&self,
catalog_name: &str,
catalog_namespace: Option<String>,
client: &Client,
) -> Result<CatalogConfig, FromTrinoCatalogError> {
let mut config = CatalogConfig::new(catalog_name.to_string(), CONNECTOR_NAME);

// No authorization checks are enforced at the catalog level.
// We don't want the delta connector to prevent users from dropping tables.
// We also don't want the Delta Lake connector to make decisions about which user is allowed to do what.
// This decision should be done globally (for all catalogs) by OPA.
// See https://trino.io/docs/current/connector/delta-lake.html
config.add_property("delta.security", "allow-all");

self.hive
.metastore
.extend_catalog_config(&mut config, catalog_name, catalog_namespace.clone(), client)
.await?;

if let Some(ref s3) = self.hive.s3 {
s3.extend_catalog_config(&mut config, catalog_name, catalog_namespace.clone(), client)
.await?;
}

if let Some(ref hdfs) = self.hive.hdfs {
hdfs.extend_catalog_config(
&mut config,
catalog_name,
catalog_namespace.clone(),
client,
)
.await?;
}

Ok(config)
}
}
7 changes: 4 additions & 3 deletions rust/operator-binary/src/catalog/iceberg.rs
@@ -22,16 +22,17 @@ impl ToCatalogConfig for IcebergConnector {
// See https://trino.io/docs/current/connector/iceberg.html
config.add_property("iceberg.security", "allow-all");

self.metastore
self.hive
.metastore
.extend_catalog_config(&mut config, catalog_name, catalog_namespace.clone(), client)
.await?;

if let Some(ref s3) = self.s3 {
if let Some(ref s3) = self.hive.s3 {
s3.extend_catalog_config(&mut config, catalog_name, catalog_namespace.clone(), client)
.await?;
}

if let Some(ref hdfs) = self.hdfs {
if let Some(ref hdfs) = self.hive.hdfs {
hdfs.extend_catalog_config(
&mut config,
catalog_name,
1 change: 1 addition & 0 deletions rust/operator-binary/src/catalog/mod.rs
@@ -1,6 +1,7 @@
pub mod black_hole;
pub mod commons;
pub mod config;
pub mod delta_lake;
pub mod generic;
pub mod google_sheet;
pub mod hive;