Skip to content

Commit

Permalink
Small robustness improvements (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp authored Sep 6, 2023
1 parent 046621e commit dd0badf
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 81 deletions.
38 changes: 27 additions & 11 deletions ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -81,19 +81,35 @@ public static void main(String[] args) {
final KSMLRunnerConfig config = mapper.readValue(configPath, KSMLRunnerConfig.class);
config.validate();
log.info("Using {} backend", config.getBackendConfig().getType());
Backend backend = config.getConfiguredBackend();
try (final Backend backend = config.getConfiguredBackend()) {
var shutdownHook = new Thread(() -> {
try {
log.debug("In KSML shutdown hook");
backend.close();
} catch (Exception e) {
log.error("Could not properly close the KSML backend", e);
}
});

if (Boolean.TRUE.equals(config.getKSMLRunnerKsmlConfig().getApplicationServerEnabled())) {
// Run with the REST server
HostInfo hostInfo = new HostInfo(config.getKSMLRunnerKsmlConfig().getApplicationServerHost(), config.getKSMLRunnerKsmlConfig().getApplicationServerPort());
Runtime.getRuntime().addShutdownHook(shutdownHook);

try (RestServer restServer = new RestServer(hostInfo)) {
restServer.start(backend.getQuerier());
if (Boolean.TRUE.equals(config.getKSMLRunnerKsmlConfig().getApplicationServerEnabled())) {
// Run with the REST server
HostInfo hostInfo = new HostInfo(config.getKSMLRunnerKsmlConfig().getApplicationServerHost(), config.getKSMLRunnerKsmlConfig().getApplicationServerPort());

try (RestServer restServer = new RestServer(hostInfo)) {
restServer.start(backend.getQuerier());
run(backend);
}
} else {
// Run without the REST server
run(backend);
}
} else {
// Run without the REST server
run(backend);

Runtime.getRuntime().removeShutdownHook(shutdownHook);
} catch (Exception e) {
log.error("An exception occurred while running KSML", e);
System.exit(2);
}
} catch (IOException e) {
log.error("An exception occurred while reading the configuration", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -235,6 +235,9 @@ private void createStreams() {
log.info("Creating StreamRunnerConfig...");

log.info("Creating AxualStreams...");

// TODO: create sane configuration here to allow for quick restart after crashes
// configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "vastewaarde");
axualStreams = new AxualStreams(configs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -73,7 +73,7 @@ public static DataSchema getSchema(String notationName, String schemaName, boole
}

if (!allowNull && schema == null) {
throw new KSMLExecutionException("Can not load schema: " + schemaName);
throw new KSMLExecutionException("Can not load " + (notationName != null ? notationName : "UNKNOWN") + " schema: " + schemaName);
}
return schema;
}
Expand Down
145 changes: 80 additions & 65 deletions ksml/src/main/java/io/axual/ksml/parser/UserTypeParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -70,7 +70,7 @@ private UserTypeParser() {
}

public static UserType parse(String type) {
UserType[] types = parseListOfTypesAndNotation(type, UserType.DEFAULT_NOTATION, true);
UserType[] types = parseListOfTypesAndNotation(type, UserType.DEFAULT_NOTATION);
if (types.length == 1) {
return types[0];
}
Expand All @@ -79,18 +79,18 @@ public static UserType parse(String type) {

// Parses a list of comma-separated user data types. If no comma is found, then the returned
// list only contains one dataType.
private static UserType[] parseListOfTypesAndNotation(String type, String defaultNotation, boolean allowOverrideNotation) {
private static UserType[] parseListOfTypesAndNotation(String type, String defaultNotation) {
if (type == null || type.isEmpty()) {
return new UserType[]{UNKNOWN};
}
type = type.trim();

String leftTerm = parseLeftMostTerm(type);
String remainder = type.substring(leftTerm.length()).trim();
UserType leftTermType = parseTypeAndNotation(leftTerm, defaultNotation, allowOverrideNotation);
UserType leftTermType = parseTypeAndNotation(leftTerm, defaultNotation);
var remainderTypes = new UserType[0];
if (remainder.startsWith(TYPE_SEPARATOR)) {
remainderTypes = parseListOfTypesAndNotation(remainder.substring(1), defaultNotation, false);
remainderTypes = parseListOfTypesAndNotation(remainder.substring(1), defaultNotation);
} else if (!remainder.isEmpty()) {
throw new KSMLParseException("Could not parse data type: " + type);
}
Expand All @@ -101,123 +101,138 @@ private static UserType[] parseListOfTypesAndNotation(String type, String defaul
return result;
}

private static UserType parseTypeAndNotation(String type, String defaultNotation, boolean allowOverrideNotation) {
var resultNotation = defaultNotation;
var typeNotation = defaultNotation;

var posColon = type.contains(NOTATION_SEPARATOR) ? type.indexOf(NOTATION_SEPARATOR) : type.length();
var posOpenRound = type.contains(ROUND_BRACKET_OPEN) ? type.indexOf(ROUND_BRACKET_OPEN) : type.length();
var posOpenSquare = type.contains(SQUARE_BRACKET_OPEN) ? type.indexOf(SQUARE_BRACKET_OPEN) : type.length();

// Extract any explicit notation from the type
if (posColon < posOpenRound && posColon < posOpenSquare) {
typeNotation = type.substring(0, type.indexOf(NOTATION_SEPARATOR));
type = type.substring(type.indexOf(NOTATION_SEPARATOR) + 1);
private record DecomposedType(String notation, String datatype) {
}

if (allowOverrideNotation) {
resultNotation = typeNotation.toUpperCase();
}
}
private static UserType parseTypeAndNotation(String composedType, String defaultNotation) {
final var decomposedType = decompose(composedType, defaultNotation);
final var datatype = decomposedType.datatype();
final var notation = decomposedType.notation();

// List type
if (type.startsWith(SQUARE_BRACKET_OPEN)) {
if (!type.endsWith(SQUARE_BRACKET_CLOSE)) {
throw new KSMLParseException("Error in data type: " + type);
if (datatype.startsWith(SQUARE_BRACKET_OPEN)) {
if (!datatype.endsWith(SQUARE_BRACKET_CLOSE)) {
throw new KSMLParseException("List type not properly closed: " + datatype);
}
var valueType = parseTypeAndNotation(type.substring(1, type.length() - 1), resultNotation, false);
return new UserType(valueType.notation(), new ListType(valueType.dataType()));
// Parse the type in brackets separately as the type of list elements. Notation overrides are not allowed
// for list elements. If specified (eg. "[avro:SomeSchema]") then the value notation is ignored.
var valueType = parseTypeAndNotation(datatype.substring(1, datatype.length() - 1), notation);
// Return the parsed value type with the above parsed notation
return new UserType(notation, new ListType(valueType.dataType()));
}

// Tuple type
if (type.startsWith(ROUND_BRACKET_OPEN)) {
if (!type.endsWith(ROUND_BRACKET_CLOSE)) {
throw new KSMLParseException("Error in data type: " + type);
if (datatype.startsWith(ROUND_BRACKET_OPEN)) {
if (!datatype.endsWith(ROUND_BRACKET_CLOSE)) {
throw new KSMLParseException("Tuple type not properly closed: " + datatype);
}
var valueTypes = parseListOfTypesAndNotation(type.substring(1, type.length() - 1), resultNotation, false);
return new UserType(resultNotation, new UserTupleType(valueTypes));
var valueTypes = parseListOfTypesAndNotation(datatype.substring(1, datatype.length() - 1), notation);
return new UserType(notation, new UserTupleType(valueTypes));
}

// enum(literal1,literal2,...)
if (type.startsWith(ENUM_TYPE + ROUND_BRACKET_OPEN) && type.endsWith(ROUND_BRACKET_CLOSE)) {
var literals = type.substring(ENUM_TYPE.length() + 1, type.length() - 1);
return new UserType(resultNotation, new EnumType(parseListOfLiterals(literals)));
if (datatype.startsWith(ENUM_TYPE + ROUND_BRACKET_OPEN) && datatype.endsWith(ROUND_BRACKET_CLOSE)) {
var literals = datatype.substring(ENUM_TYPE.length() + 1, datatype.length() - 1);
return new UserType(notation, new EnumType(parseListOfLiterals(literals)));
}

// union(type1,type2,...)
if (type.startsWith(UNION_TYPE + ROUND_BRACKET_OPEN) && type.endsWith(ROUND_BRACKET_CLOSE)) {
type = type.substring(UNION_TYPE.length() + 1, type.length() - 1);
return new UserType(resultNotation, new UnionType(parseListOfTypesAndNotation(type, resultNotation, true)));
if (datatype.startsWith(UNION_TYPE + ROUND_BRACKET_OPEN) && datatype.endsWith(ROUND_BRACKET_CLOSE)) {
var unionSubtypes = datatype.substring(UNION_TYPE.length() + 1, datatype.length() - 1);
return new UserType(notation, new UnionType(parseListOfTypesAndNotation(unionSubtypes, notation)));
}

// windowed(type)
if (type.startsWith(WINDOWED_TYPE + ROUND_BRACKET_OPEN) && type.endsWith(ROUND_BRACKET_CLOSE)) {
type = type.substring(WINDOWED_TYPE.length() + 1, type.length() - 1);
return new UserType(resultNotation, new WindowedType(parseType(type)));
if (datatype.startsWith(WINDOWED_TYPE + ROUND_BRACKET_OPEN) && datatype.endsWith(ROUND_BRACKET_CLOSE)) {
var windowedType = datatype.substring(WINDOWED_TYPE.length() + 1, datatype.length() - 1);
return new UserType(notation, new WindowedType(parseType(windowedType)));
}

// AVRO with schema
if (typeNotation.equalsIgnoreCase(AvroNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(AvroNotation.NOTATION_NAME, type, false);
if (notation.equalsIgnoreCase(AvroNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(AvroNotation.NOTATION_NAME, datatype, false);
if (!(schema instanceof StructSchema structSchema))
throw new KSMLParseException("Schema definition is not a STRUCT: " + type);
throw new KSMLParseException("AVRO schema is not a STRUCT: " + datatype);
return new UserType(AvroNotation.NOTATION_NAME, new StructType(structSchema));
}

// AVRO without schema
if (type.equalsIgnoreCase(AvroNotation.NOTATION_NAME)) {
if (datatype.equalsIgnoreCase(AvroNotation.NOTATION_NAME)) {
return new UserType(AvroNotation.NOTATION_NAME, AvroNotation.DEFAULT_TYPE);
}

// CSV with schema
if (typeNotation.equalsIgnoreCase(CsvNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(CsvNotation.NOTATION_NAME, type, false);
if (notation.equalsIgnoreCase(CsvNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(CsvNotation.NOTATION_NAME, datatype, false);
if (!(schema instanceof StructSchema structSchema))
throw new KSMLParseException("Schema definition is not a STRUCT: " + type);
throw new KSMLParseException("CSV schema is not a STRUCT: " + datatype);
return new UserType(CsvNotation.NOTATION_NAME, new StructType(structSchema));
}

// CSV without schema
if (type.equalsIgnoreCase(CsvNotation.NOTATION_NAME)) {
if (datatype.equalsIgnoreCase(CsvNotation.NOTATION_NAME)) {
return new UserType(CsvNotation.NOTATION_NAME, CsvNotation.DEFAULT_TYPE);
}

// JSON with schema
if (typeNotation.equalsIgnoreCase(JsonNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(JsonNotation.NOTATION_NAME, type, false);
if (notation.equalsIgnoreCase(JsonNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(JsonNotation.NOTATION_NAME, datatype, false);
if (!(schema instanceof StructSchema structSchema))
throw new KSMLParseException("Schema definition is not a STRUCT: " + type);
throw new KSMLParseException("JSON schema is not a STRUCT: " + datatype);
return new UserType(JsonNotation.NOTATION_NAME, new StructType(structSchema));
}

// JSON without schema
if (type.equalsIgnoreCase(JsonNotation.NOTATION_NAME)) {
if (datatype.equalsIgnoreCase(JsonNotation.NOTATION_NAME)) {
return new UserType(JsonNotation.NOTATION_NAME, JsonNotation.DEFAULT_TYPE);
}

// SOAP without schema
if (type.equalsIgnoreCase(SOAPNotation.NOTATION_NAME)) {
// SOAP with schema (not implemented yet)
if (notation.equalsIgnoreCase(SOAPNotation.NOTATION_NAME)) {
return new UserType(SOAPNotation.NOTATION_NAME, SOAPNotation.DEFAULT_TYPE);
}

// SOAP with schema (not implemented yet)
if (typeNotation.equalsIgnoreCase(SOAPNotation.NOTATION_NAME)) {
// SOAP without schema
if (datatype.equalsIgnoreCase(SOAPNotation.NOTATION_NAME)) {
return new UserType(SOAPNotation.NOTATION_NAME, SOAPNotation.DEFAULT_TYPE);
}

// XML with schema
if (notation.equalsIgnoreCase(XmlNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(XmlNotation.NOTATION_NAME, datatype, false);
if (!(schema instanceof StructSchema structSchema))
throw new KSMLParseException("XML schema is not a STRUCT: " + datatype);
return new UserType(XmlNotation.NOTATION_NAME, new StructType(structSchema));
}

// XML without schema
if (type.equalsIgnoreCase(XmlNotation.NOTATION_NAME)) {
if (datatype.equalsIgnoreCase(XmlNotation.NOTATION_NAME)) {
return new UserType(XmlNotation.NOTATION_NAME, XmlNotation.DEFAULT_TYPE);
}

// XML with schema
if (typeNotation.equalsIgnoreCase(XmlNotation.NOTATION_NAME)) {
var schema = SchemaLibrary.getSchema(XmlNotation.NOTATION_NAME, type, false);
if (!(schema instanceof StructSchema structSchema))
throw new KSMLParseException("Schema definition is not a STRUCT: " + type);
return new UserType(XmlNotation.NOTATION_NAME, new StructType(structSchema));
// Parse the type as an in-built primary data type and return it
return new UserType(notation, parseType(datatype));
}

// This method decomposes a user type into its components. User types are always of the form "notation:datatype".
private static DecomposedType decompose(String composedType, String defaultNotation) {
var posColon = composedType.contains(NOTATION_SEPARATOR) ? composedType.indexOf(NOTATION_SEPARATOR) : composedType.length();
var posOpenRound = composedType.contains(ROUND_BRACKET_OPEN) ? composedType.indexOf(ROUND_BRACKET_OPEN) : composedType.length();
var posOpenSquare = composedType.contains(SQUARE_BRACKET_OPEN) ? composedType.indexOf(SQUARE_BRACKET_OPEN) : composedType.length();

// Check if the user type contains a colon (user type is of the form "notation:datatype") AND that this colon
// appears before any brackets (ie. ignore tuples, lists and other types that use brackets). If so then parse
// it as a user type with explicit notation.
if (posColon < posOpenRound && posColon < posOpenSquare) {
final var parsedNotation = composedType.substring(0, composedType.indexOf(NOTATION_SEPARATOR));
final var parsedType = composedType.substring(composedType.indexOf(NOTATION_SEPARATOR) + 1);

// Return the parsed notation and datatype
return new DecomposedType(parsedNotation.toUpperCase(), parsedType);
}

return new UserType(resultNotation, parseType(type));
// Return the whole type string to be processed further, along with default notation
return new DecomposedType(defaultNotation, composedType);
}

private static String[] parseListOfLiterals(String literals) {
Expand Down

0 comments on commit dd0badf

Please sign in to comment.