Add Venice support (#78)
* Add Venice support

* Fix insert to exclude non-specified fields for Venice

* Fix target column inserts

* Address feedback, rebase to work with structured Key type, add tests

* Address feedback
jogrogan authored Jan 6, 2025
1 parent 64b7b73 commit 70132fb
Showing 38 changed files with 836 additions and 118 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
.gradle
.idea/
/build
*/build/
*/*.iml
./models/external/
68 changes: 50 additions & 18 deletions Makefile
@@ -12,50 +12,63 @@ build:

bounce: build undeploy deploy deploy-samples deploy-config deploy-demo

# Integration tests expect K8s and Kafka to be running
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

clean:
./gradlew clean

deploy-config:
kubectl create configmap hoptimator-configmap --from-file=model.yaml=test-model.yaml --dry-run=client -o yaml | kubectl apply -f -

undeploy-config:
kubectl delete configmap hoptimator-configmap || echo "skipping"

deploy: deploy-config
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy

undeploy: undeploy-config
kubectl delete -f ./deploy || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"

quickstart: build deploy

deploy-demo: deploy
kubectl apply -f ./deploy/samples/demodb.yaml

undeploy-demo: undeploy
kubectl delete -f ./deploy/samples/demodb.yaml

deploy-samples: deploy
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
crds/kafkatopics.hoptimator.linkedin.com \
crds/sqljobs.hoptimator.linkedin.com
kubectl apply -f ./deploy/samples

deploy-dev-environment: deploy-config
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
undeploy-samples: undeploy
kubectl delete -f ./deploy/samples || echo "skipping"

deploy-flink:
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.9.0/
helm upgrade --install --atomic --set webhook.create=false flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

undeploy-flink:
kubectl delete flinkdeployments.flink.apache.org --all || echo "skipping"
kubectl delete flinksessionjobs.flink.apache.org --all || echo "skipping"
kubectl delete crd flinkdeployments.flink.apache.org || echo "skipping"
kubectl delete crd flinksessionjobs.flink.apache.org || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"

deploy-kafka: deploy deploy-flink
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./hoptimator-k8s/src/main/resources/
kubectl apply -f ./deploy/dev
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml

undeploy-dev-environment:
undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping"
kubectl delete strimzi -n kafka --all || echo "skipping"
kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping"
@@ -65,12 +78,31 @@ undeploy-dev-environment:
kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete -f ./hoptimator-k8s/src/main/resources/ || echo "skipping"
kubectl delete namespace kafka || echo "skipping"
helm uninstall flink-kubernetes-operator || echo "skipping"
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml || echo "skipping"

undeploy: undeploy-dev-environment
kubectl delete -f ./deploy || echo "skipping"
kubectl delete configmap hoptimator-configmap || echo "skipping"
# Deploys a Venice cluster in Docker and creates two test stores. The stores are not managed via K8s for now.
deploy-venice: deploy deploy-flink
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml up -d --wait
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store schemas/keySchema.avsc schemas/valueSchema.avsc
docker exec venice-client ./create-store.sh http://venice-controller:5555 venice-cluster0 test-store-1 schemas/keySchema.avsc schemas/valueSchema.avsc
kubectl apply -f ./deploy/samples/venicedb.yaml

undeploy-venice:
kubectl delete -f ./deploy/samples/venicedb.yaml || echo "skipping"
docker compose -f ./deploy/docker/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy

# Integration tests expect K8s, Kafka, and Venice to be running
integration-tests: deploy-dev-environment deploy-samples
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid
./gradlew intTest || kill `cat port-forward.pid`
kill `cat port-forward.pid`

generate-models:
./generate-models.sh
Expand All @@ -80,4 +112,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice integration-tests deploy-dev-environment undeploy-dev-environment generate-models release
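
With the new targets in place, a typical local loop (using only targets defined above) might be:

```
$ make deploy-venice       # Venice cluster in Docker + two test stores
$ make integration-tests   # waits for Kafka topics, then runs ./gradlew intTest
$ make undeploy-venice     # tear the Venice cluster back down
```
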
2 changes: 1 addition & 1 deletion README.md
@@ -54,7 +54,7 @@ The below setup will install a Kafka and Flink cluster within Kubernetes.

```
$ make install # build and install SQL CLI
$ make deploy-dev-environment # start local Kafka & Flink setups
$ make deploy-dev-environment # start all local dev setups
$ kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & # forward external Kafka port for use by SQL CLI
$ ./hoptimator # start the SQL CLI
> !intro
84 changes: 84 additions & 0 deletions deploy/docker/docker-compose-single-dc-setup.yaml
@@ -0,0 +1,84 @@
services:
zookeeper:
image: venicedb/apache-zookeeper:3.9.0
container_name: zookeeper
hostname: zookeeper
healthcheck:
test: ["CMD-SHELL", "echo ruok | nc zookeeper 2181"]
start_period: 10s
interval: 5s
timeout: 5s
retries: 5

kafka:
image: venicedb/apache-kafka:3.3.1
container_name: kafka
hostname: kafka
environment:
- ZOOKEEPER_ADDRESS=zookeeper:2181
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "bash -x bin/kafka-topics.sh --bootstrap-server localhost:9092 --list"]
start_period: 60s
interval: 5s
timeout: 20s
retries: 5

venice-controller:
image: venicedb/venice-controller:0.4.340
container_name: venice-controller
hostname: venice-controller
depends_on:
kafka:
condition: service_healthy
ports:
- 5555:5555
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-server:
image: venicedb/venice-server:0.4.340
container_name: venice-server
hostname: venice-server
depends_on:
venice-controller:
condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-router:
image: venicedb/venice-router:0.4.340
container_name: venice-router
hostname: venice-router
depends_on:
venice-server:
condition: service_healthy
ports:
- 7777:7777
healthcheck:
test: ["CMD-SHELL", "sleep 5"]
start_period: 20s
interval: 5s
timeout: 20s
retries: 5

venice-client:
image: venicedb/venice-client:0.4.340
container_name: venice-client
hostname: venice-client
tty: true
volumes:
- ./venice:/opt/venice/schemas
depends_on:
venice-router:
condition: service_healthy
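
The `venice-client` container mounts `./deploy/docker/venice` (relative to the repo root) at `/opt/venice/schemas`, which is where `create-store.sh` reads the Avro schemas below. Store creation is driven from the Makefile, e.g.:

```
$ docker exec venice-client ./create-store.sh \
    http://venice-controller:5555 venice-cluster0 test-store \
    schemas/keySchema.avsc schemas/valueSchema.avsc
```
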
11 changes: 11 additions & 0 deletions deploy/docker/venice/keySchema.avsc
@@ -0,0 +1,11 @@
{
"type": "record",
"name": "SampleTableKey",
"doc": "SampleTableKey",
"fields": [
{
"name": "id",
"type": "int"
}
]
}
23 changes: 23 additions & 0 deletions deploy/docker/venice/valueSchema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"type": "record",
"name": "SampleTableValue",
"doc": "SampleTableValue",
"fields": [
{
"name": "intField",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "stringField",
"type": [
"null",
"string"
],
"default": null
}
]
}
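
Together, the two schemas model rows keyed by an `int` id with two nullable value fields. In Avro's JSON encoding, union values are tagged with their branch type, so a sample key/value pair for `test-store` might look like:

```
{"id": 1}
{"intField": {"int": 42}, "stringField": {"string": "hello"}}
```
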
25 changes: 25 additions & 0 deletions deploy/samples/venicedb.yaml
@@ -0,0 +1,25 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Database
metadata:
name: venice-cluster0
spec:
schema: VENICE-CLUSTER0
url: jdbc:venice://cluster=venice-cluster0;router.url=http://localhost:7777
dialect: Calcite

---

apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
name: venice-template-cluster0
spec:
databases:
- venice-cluster0
connector: |
connector = venice
storeName = {{table}}
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = {{keys}}
value.fields-include: EXCEPT_KEY
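
The template is applied per-table: assuming `{{table}}` expands to the Venice store name and `{{keys}}` to the key fields with the `KEY_` prefix applied, the connector config rendered for `test-store` (keyed by `id` per the key schema above) would look roughly like:

```
connector = venice
storeName = test-store
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = KEY_id
value.fields-include: EXCEPT_KEY
```
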
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -29,3 +29,5 @@ slf4j-api = "org.slf4j:slf4j-api:1.7.30"
sqlline = "sqlline:sqlline:1.12.0"
commons-cli = "commons-cli:commons-cli:1.4"
quidem = "net.hydromatic:quidem:0.11"
venice = "com.linkedin.venice:venice-common:0.4.376"
venice-client = "com.linkedin.venice:venice-thin-client:0.4.376"
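
With these catalog entries, a module can pull in the Venice libraries by alias. A hypothetical dependencies block (the actual hoptimator-venice build file is not shown in this excerpt); note that because `venice` is also a prefix of `venice-client`, Gradle's type-safe accessor for the shorter alias goes through `asProvider()`:

```
dependencies {
  implementation libs.venice.asProvider()   // com.linkedin.venice:venice-common
  implementation libs.venice.client         // com.linkedin.venice:venice-thin-client
}
```
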
4 changes: 2 additions & 2 deletions hoptimator-avro/build.gradle
@@ -42,8 +42,8 @@ publishing {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
@@ -52,4 +52,4 @@ publishing {
}
}
}
}
}
@@ -20,6 +20,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Represents something required by a Table.
@@ -37,6 +40,7 @@
* for informational/debugging purposes.
*/
public abstract class Resource {
private static final Logger log = LoggerFactory.getLogger(Resource.class);
private final String template;
private final SortedMap<String, Supplier<String>> properties = new TreeMap<>();
private final List<Resource> inputs = new ArrayList<>();
@@ -345,6 +349,9 @@ private static String applyTransform(String value, String transform) {
case "concat":
res = res.replace("\n", "");
break;
default:
log.warn("Transformation function '{}' not found", f);
break;
}
}
return res;
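
This hunk adds a logged default case to the transform dispatch in `Resource`, so an unrecognized transformation function in a template surfaces as a warning rather than being silently skipped. A minimal sketch of the surrounding loop, assuming a pipe-separated transform list (only the `concat` case and the default branch appear verbatim above):

```
// Hypothetical reconstruction for illustration only.
private static String applyTransform(String value, String transform) {
  String res = value;
  for (String f : transform.split("\\|")) {
    switch (f.trim()) {
      case "concat":
        res = res.replace("\n", "");  // collapse multi-line values onto one line
        break;
      default:
        log.warn("Transformation function '{}' not found", f);
        break;
    }
  }
  return res;
}
```
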
1 change: 1 addition & 0 deletions hoptimator-cli/build.gradle
@@ -12,6 +12,7 @@ dependencies {
implementation project(':hoptimator-demodb')
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-kafka')
implementation project(':hoptimator-venice')
implementation project(':hoptimator-k8s')
implementation project(':hoptimator-util')
implementation libs.calcite.core
10 changes: 5 additions & 5 deletions hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.Scanner;

import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.jline.reader.Completer;

import com.linkedin.hoptimator.SqlDialect;
@@ -87,8 +87,8 @@ public void execute(String line, DispatchCallback dispatchCallback) {
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
try {
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
PipelineRel.Implementor plan = DeploymentService.plan(rel);
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
PipelineRel.Implementor plan = DeploymentService.plan(root);
sqlline.output(plan.sql().apply(SqlDialect.ANSI));
} catch (SQLException e) {
sqlline.error(e);
@@ -155,9 +155,9 @@ public void execute(String line, DispatchCallback dispatchCallback) {
}
String sql = split[1];
CalciteConnection conn = (CalciteConnection) sqlline.getConnection();
RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel;
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
try {
List<String> specs = DeploymentService.plan(rel).pipeline().specify();
List<String> specs = DeploymentService.plan(root).pipeline().specify();
specs.forEach(x -> sqlline.output(x + "\n\n---\n\n"));
} catch (SQLException e) {
sqlline.error(e);
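
Both commands now pass the whole `RelRoot` to `DeploymentService.plan(...)` instead of unwrapping it to a `RelNode` first. In Calcite, `RelRoot` carries the validated row type and the mapping between the tree's output and the declared column list, which the bare tree discards; preserving it is what lets inserts that name a subset of target columns plan correctly. A sketch of the distinction, using Calcite's public `RelRoot` API:

```
RelRoot root = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root;
RelNode rel = root.rel;                       // the tree alone; field mapping lost
RelDataType rowType = root.validatedRowType;  // declared output columns, in order
RelNode projected = root.project();           // tree with the field mapping applied
```
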
2 changes: 1 addition & 1 deletion hoptimator-cli/src/main/resources/intro.txt
@@ -4,7 +4,7 @@ Try:
> !tables
> !schemas
> create view foo as select * from ads.ad_clicks natural join profile.members;
> !yaml select * from foo
> !specify select * from foo
> !pipeline select * from foo

