Skip to content

Commit

Permalink
Support Venice Primitive keys (#83)
Browse files Browse the repository at this point in the history
* Support Venice Primitive keys & add tests

* Fix template reuse
  • Loading branch information
jogrogan authored Jan 10, 2025
1 parent 1dd5c65 commit 574be65
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 7 deletions.
5 changes: 3 additions & 2 deletions deploy/samples/venicedb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spec:
connector = venice
storeName = {{table}}
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = {{keys}}
key.fields-prefix = {{keyPrefix:}}
key.fields = {{keys:KEY}}
key.type = {{keyType:PRIMITIVE}}
value.fields-include: EXCEPT_KEY
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class K8sConnector implements Connector<Source> {
@Override
public Map<String, String> configure(Source source) throws SQLException {
Template.Environment env =
Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("database", source.database())
.with("table", source.table())
.with(source.options());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class K8sJobDeployer extends K8sYamlDeployer<Job> {
@Override
public List<String> specify(Job job) throws SQLException {
Function<SqlDialect, String> sql = job.sql();
Template.Environment env = Template.Environment.EMPTY.with("name",
Template.Environment env = new Template.SimpleEnvironment().with("name",
job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT))
.with("database", job.sink().database())
.with("schema", job.sink().schema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class K8sSourceDeployer extends K8sYamlDeployer<Source> {
@Override
public List<String> specify(Source source) throws SQLException {
Template.Environment env =
Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("database", source.database())
.with("schema", source.schema())
.with("table", source.table())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public SimpleTemplate(String template) {
public String render(Environment env) {
StringBuffer sb = new StringBuffer();
Pattern p =
Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\s*)*)\\s*\\}\\}");
Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]*))?\\s*((\\w+\\s*)*)\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.calcite.util.Litmus;
import org.apache.calcite.util.Pair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linkedin.hoptimator.Deployable;
Expand All @@ -44,6 +45,8 @@ public interface PipelineRel extends RelNode {

Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class);
String KEY_OPTION = "keys";
String KEY_PREFIX_OPTION = "keyPrefix";
String KEY_TYPE_OPTION = "keyType";
String KEY_PREFIX = "KEY_";

void implement(Implementor implementor) throws SQLException;
Expand Down Expand Up @@ -95,7 +98,8 @@ public void setSink(String database, List<String> path, RelDataType rowType, Map
this.sinkOptions = addKeysAsOption(options, rowType);
}

private Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
@VisibleForTesting
static Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
Map<String, String> newOptions = new LinkedHashMap<>(options);

RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
Expand All @@ -104,12 +108,15 @@ private Map<String, String> addKeysAsOption(Map<String, String> options, RelData
if (newOptions.containsKey(KEY_OPTION)) {
return newOptions;
}

String keyString = flattened.getFieldList().stream()
.map(x -> x.getName().replaceAll("\\$", "_"))
.filter(name -> name.startsWith(KEY_PREFIX))
.collect(Collectors.joining(";"));
if (!keyString.isEmpty()) {
newOptions.put(KEY_OPTION, keyString);
newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX);
newOptions.put(KEY_TYPE_OPTION, "RECORD");
}
return newOptions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.hoptimator.util;

import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestTemplate {

@Test
public void testRender() {
Template.Environment env = new Template.SimpleEnvironment()
.with("name", "name")
.with("nameUpper", "name")
.with("nameLower", "NAME")
.with("multiline", "1\n2\n3\n")
.with("multilineUpper", "a\nb\nc\n")
.with("other", "test");

String template = "{{keys:KEY}}\n"
+ "{{keyPrefix:}}\n"
+ "{{name:default}}\n"
+ "{{nameUpper toUpperCase}}\n"
+ "{{nameLower toLowerCase}}\n"
+ "{{multiline concat}}\n"
+ "{{multilineUpper concat toUpperCase}}\n"
+ "{{other unknown}}\n";

String renderedTemplate = new Template.SimpleTemplate(template).render(env);
List<String> renderedTemplates = Arrays.asList(renderedTemplate.split("\n"));
assertEquals(8, renderedTemplates.size());
assertEquals("KEY", renderedTemplates.get(0));
assertEquals("", renderedTemplates.get(1));
assertEquals("name", renderedTemplates.get(2));
assertEquals("NAME", renderedTemplates.get(3));
assertEquals("name", renderedTemplates.get(4));
assertEquals("123", renderedTemplates.get(5));
assertEquals("ABC", renderedTemplates.get(6));
assertEquals("test", renderedTemplates.get(7));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.hoptimator.util.planner;

import java.util.HashMap;
import java.util.Map;

import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.Test;

import static com.linkedin.hoptimator.util.planner.PipelineRel.Implementor.addKeysAsOption;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestPipelineRel {

@Test
public void testKeyOptions() {
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
RelDataTypeFactory.Builder primitiveKeyBuilder = new RelDataTypeFactory.Builder(typeFactory);
primitiveKeyBuilder.add("KEY", SqlTypeName.VARCHAR);
primitiveKeyBuilder.add("intField", SqlTypeName.INTEGER);
Map<String, String> keyOptions = addKeysAsOption(new HashMap<>(), primitiveKeyBuilder.build());
assertTrue(keyOptions.isEmpty());

RelDataTypeFactory.Builder keyBuilder = new RelDataTypeFactory.Builder(typeFactory);
keyBuilder.add("keyInt", SqlTypeName.INTEGER);
keyBuilder.add("keyString", SqlTypeName.VARCHAR);
RelDataTypeFactory.Builder recordBuilder = new RelDataTypeFactory.Builder(typeFactory);
recordBuilder.add("intField", SqlTypeName.INTEGER);
recordBuilder.add("KEY", keyBuilder.build());
keyOptions = addKeysAsOption(new HashMap<>(), recordBuilder.build());
assertEquals(3, keyOptions.size());
assertEquals("KEY_keyInt;KEY_keyString", keyOptions.get("keys"));
assertEquals("KEY_", keyOptions.get("keyPrefix"));
assertEquals("RECORD", keyOptions.get("keyType"));
}
}

0 comments on commit 574be65

Please sign in to comment.