diff --git a/settings.gradle b/settings.gradle index 73deae276..58343d981 100644 --- a/settings.gradle +++ b/settings.gradle @@ -27,16 +27,17 @@ include 'sylph-connectors:sylph-mysql' include 'sylph-connectors:sylph-hdfs' include 'sylph-connectors:sylph-kafka08' include 'sylph-connectors:sylph-kafka09' -include 'sylph-connectors:sylph-hbase' +//include 'sylph-connectors:sylph-hbase' include 'sylph-connectors:sylph-elasticsearch6' include 'sylph-connectors:sylph-elasticsearch5' include 'sylph-connectors:sylph-clickhouse' +include 'sylph-connectors:spark-kafka' +include 'sylph-connectors:sylph-kudu' + //---- include 'sylph-dist' include 'sylph-parser' include 'sylph-docs' include 'sylph-yarn' -//include 'sylph-clickhouse' -//include 'sylph-elasticsearch5' - +include 'sylph-base-kafka' diff --git a/sylph-base-kafka/build.gradle b/sylph-base-kafka/build.gradle new file mode 100644 index 000000000..5920a14ab --- /dev/null +++ b/sylph-base-kafka/build.gradle @@ -0,0 +1,17 @@ +configurations.all { + resolutionStrategy { preferProjectModules() } +} + +dependencies { + compileOnly project(':sylph-runners:sylph-runner-flink') + + compileOnly project(":sylph-etl-api") + + compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { + exclude(module: 'flink-shaded-hadoop2') + } + + compile (group: 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: deps.flink){ + exclude(module: 'kafka-clients') + } +} diff --git a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java similarity index 58% rename from sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java rename to sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java index a1a625971..3223e6a84 100644 --- a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java +++ b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java @@ -17,20 +17,16 @@ import ideal.sylph.etl.Schema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.MapTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.types.Row; import java.io.IOException; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.List; import java.util.Map; +import static ideal.sylph.runner.flink.actuator.StreamSqlUtil.schemaToRowTypeInfo; + public class JsonSchema implements KeyedDeserializationSchema { @@ -42,36 +38,6 @@ public JsonSchema(Schema schema) this.rowTypeInfo = schemaToRowTypeInfo(schema); } - public static RowTypeInfo schemaToRowTypeInfo(Schema schema) - { - TypeInformation[] types = schema.getFieldTypes().stream().map(JsonSchema::getFlinkType) - .toArray(TypeInformation[]::new); - String[] names = schema.getFieldNames().toArray(new String[0]); - return new RowTypeInfo(types, names); - } - - private static TypeInformation getFlinkType(Type type) - { - if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == Map.class) { - Type[] arguments = ((ParameterizedType) 
type).getActualTypeArguments(); - Type valueType = arguments[1]; - TypeInformation valueInfo = getFlinkType(valueType); - return new MapTypeInfo<>(TypeExtractor.createTypeInfo(arguments[0]), valueInfo); - } - else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == List.class) { - TypeInformation typeInformation = getFlinkType(((ParameterizedType) type).getActualTypeArguments()[0]); - if (typeInformation.isBasicType() && typeInformation != Types.STRING) { - return Types.PRIMITIVE_ARRAY(typeInformation); - } - else { - return Types.OBJECT_ARRAY(typeInformation); - } - } - else { - return TypeExtractor.createTypeInfo(type); - } - } - @Override public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException diff --git a/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java new file mode 100644 index 000000000..19db1b763 --- /dev/null +++ b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.plugins.kafka.flink; + +import ideal.sylph.etl.SourceContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +public abstract class KafkaBaseSource +{ + private static final long serialVersionUID = 2L; + private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"}; + + public abstract FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets, + KeyedDeserializationSchema deserializationSchema, Properties properties); + + /** + * 初始化(driver阶段执行) + **/ + public DataStream createSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context) + { + requireNonNull(execEnv, "execEnv is null"); + requireNonNull(config, "config is null"); + String topics = config.getTopics(); + String groupId = config.getGroupid(); //消费者的名字 + String offsetMode = config.getOffsetMode(); //latest earliest + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.getBrokers()); //需要把集群的host 配置到程序所在机器 + //"enable.auto.commit" -> (false: 
java.lang.Boolean), //不自动提交偏移量 + // "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错 + // "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期 + properties.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误 + properties.put("auto.offset.reset", offsetMode); //latest earliest + + KeyedDeserializationSchema deserializationSchema = "json".equals(config.getValueType()) ? + new JsonSchema(context.getSchema()) : new RowDeserializer(); + + List topicSets = Arrays.asList(topics.split(",")); + //org.apache.flink.streaming.api.checkpoint.CheckpointedFunction + FlinkKafkaConsumerBase base = getKafkaConsumerBase(topicSets, deserializationSchema, properties); + return execEnv.addSource(base); + } + + private static class RowDeserializer + implements KeyedDeserializationSchema + { + @Override + public boolean isEndOfStream(Row nextElement) + { + return false; + } + + @Override + public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) + { + return Row.of( + topic, //topic + messageKey == null ? null : new String(messageKey, UTF_8), //key + new String(message, UTF_8), //message + partition, + offset + ); + } + + @Override + public TypeInformation getProducedType() + { + TypeInformation[] types = new TypeInformation[] { + TypeExtractor.createTypeInfo(String.class), + TypeExtractor.createTypeInfo(String.class), //createTypeInformation[String] + TypeExtractor.createTypeInfo(String.class), + Types.INT, + Types.LONG + }; + return new RowTypeInfo(types, KAFKA_COLUMNS); + } + } +} diff --git a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig08.java b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSourceConfig.java similarity index 88% rename from sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig08.java rename to sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSourceConfig.java index 4dda4459f..9f9304ac4 100644 --- a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig08.java +++ b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSourceConfig.java @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package ideal.sylph.plugins.kafka; +package ideal.sylph.plugins.kafka.flink; import ideal.sylph.annotation.Description; import ideal.sylph.annotation.Name; import ideal.sylph.etl.PluginConfig; -public class KafkaSourceConfig08 +public class KafkaSourceConfig extends PluginConfig { private static final long serialVersionUID = 2L; @@ -41,8 +41,8 @@ public class KafkaSourceConfig08 private String offsetMode = "latest"; @Name("zookeeper.connect") - @Description("this is kafka zk list") - private String zookeeper = "localhost:2181"; + @Description("this is kafka zk list, kafka08 and kafka09 Must need to set") + private String zookeeper = null; //"localhost:2181" @Name("value_type") @Description("this is kafka String value Type, use json") @@ -58,11 +58,6 @@ public String getBrokers() return brokers; } - public String getZookeeper() - { - return zookeeper; - } - public String getGroupid() { return groupid; @@ -73,10 +68,15 @@ public String getOffsetMode() return offsetMode; } + public String getZookeeper() + { + return zookeeper; + } + public String getValueType() { return valueType; } - private KafkaSourceConfig08() {} + private KafkaSourceConfig() {} } diff --git a/sylph-connectors/spark-kafka/build.gradle b/sylph-connectors/spark-kafka/build.gradle new file mode 100644 index 000000000..3813f0d9c --- /dev/null +++ b/sylph-connectors/spark-kafka/build.gradle @@ -0,0 +1,32 @@ +apply plugin: 'scala' + +dependencies { + //--------------------------------------------------spark---------------------------------------------------- + compileOnly(group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark) { + exclude(module: 'spark-core_2.11') + } + compileOnly(group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark) { + exclude(module: 'spark-core_2.11') + } + compileOnly(group: 'org.apache.spark', name: 'spark-core_2.11', version: deps.spark) { + exclude(module: 'hadoop-client') + } + compileOnly group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3' + + /** + * spark 结构化流 kafka专用 + * */ + compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: deps.spark + + /** + * spark streaming kafka 依赖 + * */ + compile(group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark) { + exclude(group: 'org.spark-project.spark') + exclude(group: 'org.scala-lang') + exclude(module: 'spark-tags_2.11') + exclude(module: 'slf4j-log4j12') + exclude(module: 'slf4j-api') + exclude(module: 'snappy-java') + } +} diff --git a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig.java b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/KafkaSourceConfig.java similarity index 97% rename from sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig.java rename to sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/KafkaSourceConfig.java index 4124eb763..f96d616d3 100644 --- a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/KafkaSourceConfig.java +++ b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/KafkaSourceConfig.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package ideal.sylph.plugins.kafka; +package ideal.sylph.plugins.kafka.spark; import ideal.sylph.annotation.Description; import ideal.sylph.annotation.Name; diff --git a/sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala similarity index 97% rename from sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala rename to sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala index 3f4e5648a..5543c013e 100644 --- a/sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala +++ b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/MyKafkaSource.scala @@ -16,9 +16,7 @@ package ideal.sylph.plugins.kafka.spark import ideal.sylph.annotation.{Description, Name, Version} -import ideal.sylph.etl.PluginConfig import ideal.sylph.etl.api.Source -import ideal.sylph.plugins.kafka.KafkaSourceConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema diff --git a/sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala similarity index 96% rename from sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala rename to sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala index af5cb448a..1e20241c8 100644 --- a/sylph-connectors/sylph-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala +++ b/sylph-connectors/spark-kafka/src/main/scala/ideal/sylph/plugins/kafka/spark/SocketSource.scala @@ -19,8 +19,7 @@ import java.util.Objects import ideal.sylph.annotation.{Description, Name, Version} import ideal.sylph.etl.PluginConfig -import ideal.sylph.etl.api.{Sink, Source, TransForm} -import org.apache.spark.api.java.JavaRDD +import ideal.sylph.etl.api.Source import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.types.{StringType, StructField, StructType} diff --git a/sylph-connectors/sylph-kafka/build.gradle b/sylph-connectors/sylph-kafka/build.gradle index 53cddafe7..c0d936905 100644 --- a/sylph-connectors/sylph-kafka/build.gradle +++ b/sylph-connectors/sylph-kafka/build.gradle @@ -1,36 +1,11 @@ -apply plugin: 'scala' - dependencies { compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { exclude(module: 'flink-shaded-hadoop2') } compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: deps.flink - //--------------------------------------------------spark---------------------------------------------------- - compileOnly(group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark) { - exclude(module: 'spark-core_2.11') - } - compileOnly(group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark) { - exclude(module: 'spark-core_2.11') - } - compileOnly(group: 'org.apache.spark', name: 'spark-core_2.11', version: deps.spark) { - exclude(module: 'hadoop-client') - } - compileOnly group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3' - - /** - * spark 结构化流 kafka专用 - * */ - compile group: 'org.apache.spark', name: 
'spark-sql-kafka-0-10_2.11', version: deps.spark + + compile project(':sylph-base-kafka') - /** - * spark streaming kafka 老流依赖 - * */ - compile(group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark) { - exclude(group: 'org.spark-project.spark') - exclude(group: 'org.scala-lang') - exclude(module: 'spark-tags_2.11') - exclude(module: 'slf4j-log4j12') - exclude(module: 'slf4j-api') - exclude(module: 'snappy-java') - } + testCompile project(':sylph-runners:sylph-runner-flink') + testCompile project(':sylph-spi') } \ No newline at end of file diff --git a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java b/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java index e50361a0d..c7c9f7659 100644 --- a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java +++ b/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java @@ -20,35 +20,28 @@ import ideal.sylph.annotation.Version; import ideal.sylph.etl.SourceContext; import ideal.sylph.etl.api.Source; -import ideal.sylph.plugins.kafka.KafkaSourceConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.types.Row; -import java.util.Arrays; import java.util.List; import java.util.Properties; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @Name(value = "kafka") @Version("1.0.0") @Description("this flink kafka source inputStream") public class KafkaSource + extends KafkaBaseSource implements Source> { private static final long serialVersionUID = 2L; - private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"}; - private final transient Supplier> loadStream; /** @@ -58,71 +51,21 @@ public KafkaSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, { requireNonNull(execEnv, "execEnv is null"); requireNonNull(config, "config is null"); - loadStream = Suppliers.memoize(() -> { - String topics = config.getTopics(); - String groupId = config.getGroupid(); //消费者的名字 - String offsetMode = config.getOffsetMode(); //latest earliest - - Properties properties = new Properties(); - properties.put("bootstrap.servers", config.getBrokers()); //需要把集群的host 配置到程序所在机器 - //"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量 - // "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错 - // "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期 - properties.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误 - properties.put("auto.offset.reset", offsetMode); //latest earliest - - KeyedDeserializationSchema deserializationSchema = "json".equals(config.getValueType()) ? 
- new JsonSchema(context.getSchema()) : new RowDeserializer(); - - List topicSets = Arrays.asList(topics.split(",")); - //org.apache.flink.streaming.api.checkpoint.CheckpointedFunction - DataStream stream = execEnv.addSource(new FlinkKafkaConsumer010( - topicSets, - deserializationSchema, - properties) - ); - return stream; - }); + loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config, context)); } @Override - public DataStream getSource() + public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets, KeyedDeserializationSchema deserializationSchema, Properties properties) { - return loadStream.get(); + return new FlinkKafkaConsumer010<>( + topicSets, + deserializationSchema, + properties); } - private static class RowDeserializer - implements KeyedDeserializationSchema + @Override + public DataStream getSource() { - @Override - public boolean isEndOfStream(Row nextElement) - { - return false; - } - - @Override - public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) - { - return Row.of( - topic, //topic - messageKey == null ? null : new String(messageKey, UTF_8), //key - new String(message, UTF_8), //message - partition, - offset - ); - } - - @Override - public TypeInformation getProducedType() - { - TypeInformation[] types = new TypeInformation[] { - TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(String.class), //createTypeInformation[String] - TypeExtractor.createTypeInfo(String.class), - Types.INT, - Types.LONG - }; - return new RowTypeInfo(types, KAFKA_COLUMNS); - } + return loadStream.get(); } } diff --git a/sylph-connectors/sylph-kafka/src/test/java/ideal/sylph/plugins/kafka/flink/KafkaSourceTest.java b/sylph-connectors/sylph-kafka/src/test/java/ideal/sylph/plugins/kafka/flink/KafkaSourceTest.java new file mode 100644 index 000000000..4a8ce90fe --- /dev/null +++ b/sylph-connectors/sylph-kafka/src/test/java/ideal/sylph/plugins/kafka/flink/KafkaSourceTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.kafka.flink; + +import ideal.sylph.parser.antlr.AntlrSqlParser; +import ideal.sylph.runner.flink.actuator.StreamSqlBuilder; +import ideal.sylph.spi.model.PipelinePluginManager; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; + +public class KafkaSourceTest +{ + private static final AntlrSqlParser sqlParser = new AntlrSqlParser(); + + public static StreamTableEnvironment getTableEnv() + { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment(); + execEnv.setParallelism(2); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv); + return tableEnv; + } + + @Test + public void createSource() + throws Exception + { + StreamTableEnvironment tableEnv = getTableEnv(); + String sql = "create input table tb1(\n" + + " _topic varchar,\n" + + " _message varchar\n" + + ") with (\n" + + " type = '" + KafkaSource.class.getName() + "',\n" + + " kafka_topic = 'N603_A_1,N603_A_2,N603_A_3,N603_A_4,N603_A_5,N603_A_7',\n" + + " \"auto.offset.reset\" = latest,\n" + + " kafka_broker = 'localhost:9092',\n" + + " kafka_group_id = 'streamload1'\n" + + ")"; + + StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, PipelinePluginManager.getDefault(), sqlParser); + streamSqlBuilder.buildStreamBySql(sql); + + Table kafka = tableEnv.sqlQuery("select * from tb1"); + tableEnv.toAppendStream(kafka, Row.class).print(); + + Assert.assertNotNull(tableEnv.execEnv().getStreamGraph().getJobGraph()); + } +} \ No newline at end of file diff --git a/sylph-connectors/sylph-kafka08/build.gradle b/sylph-connectors/sylph-kafka08/build.gradle index 1345186ee..b9ea8ec87 100644 --- a/sylph-connectors/sylph-kafka08/build.gradle +++ b/sylph-connectors/sylph-kafka08/build.gradle @@ -1,5 +1,7 @@ dependencies { + compile project(':sylph-base-kafka') + compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { exclude(module: 'flink-shaded-hadoop2') } diff --git a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java b/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java deleted file mode 100644 index a1a625971..000000000 --- a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright (C) 2018 The Sylph Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package ideal.sylph.plugins.kafka.flink; - -import ideal.sylph.etl.Schema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.types.Row; - -import java.io.IOException; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.List; -import java.util.Map; - -public class JsonSchema - implements KeyedDeserializationSchema -{ - private static final ObjectMapper MAPPER = new ObjectMapper(); - private final RowTypeInfo rowTypeInfo; - - public JsonSchema(Schema schema) - { - this.rowTypeInfo = schemaToRowTypeInfo(schema); - } - - public static RowTypeInfo schemaToRowTypeInfo(Schema schema) - { - TypeInformation[] types = schema.getFieldTypes().stream().map(JsonSchema::getFlinkType) - .toArray(TypeInformation[]::new); - String[] names = schema.getFieldNames().toArray(new String[0]); - return new RowTypeInfo(types, names); - } - - private static TypeInformation getFlinkType(Type type) - { - if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == Map.class) { - Type[] arguments = ((ParameterizedType) type).getActualTypeArguments(); - Type valueType = arguments[1]; - TypeInformation valueInfo = getFlinkType(valueType); - return new MapTypeInfo<>(TypeExtractor.createTypeInfo(arguments[0]), valueInfo); - } - else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == List.class) { - TypeInformation typeInformation = getFlinkType(((ParameterizedType) type).getActualTypeArguments()[0]); - if (typeInformation.isBasicType() && typeInformation != Types.STRING) { - return Types.PRIMITIVE_ARRAY(typeInformation); - } - else { - return Types.OBJECT_ARRAY(typeInformation); - } - } - else { - return TypeExtractor.createTypeInfo(type); - } - } - - @Override - public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) - throws IOException - { - @SuppressWarnings("unchecked") - Map map = MAPPER.readValue(message, Map.class); - String[] names = rowTypeInfo.getFieldNames(); - Row row = new Row(names.length); - for (int i = 0; i < names.length; i++) { - Object value = map.get(names[i]); - Class aClass = rowTypeInfo.getTypeAt(i).getTypeClass(); - if (aClass.isArray()) { - row.setField(i, MAPPER.convertValue(value, aClass)); - } - else { - row.setField(i, value); - } - } - return row; - } - - @Override - public boolean isEndOfStream(Row nextElement) - { - return false; - } - - @Override - public TypeInformation getProducedType() - { - return rowTypeInfo; - } -} diff --git a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java b/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java index 31de688c8..d67a87f48 100644 --- a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java +++ b/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java @@ -20,69 +20,40 @@ import ideal.sylph.annotation.Version; import ideal.sylph.etl.SourceContext; import ideal.sylph.etl.api.Source; -import 
ideal.sylph.plugins.kafka.KafkaSourceConfig08; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.types.Row; -import java.util.Arrays; import java.util.List; import java.util.Properties; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @Name(value = "kafka08") @Version("1.0.0") @Description("this flink kafka0.8 source inputStream") public class KafkaSource08 + extends KafkaBaseSource implements Source> { private static final long serialVersionUID = 2L; - private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"}; - private final transient Supplier> loadStream; + private final KafkaSourceConfig config; /** * 初始化(driver阶段执行) **/ - public KafkaSource08(StreamExecutionEnvironment execEnv, KafkaSourceConfig08 config, SourceContext context) + public KafkaSource08(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context) { requireNonNull(execEnv, "execEnv is null"); requireNonNull(config, "config is null"); - loadStream = Suppliers.memoize(() -> { - String topics = config.getTopics(); - String groupId = config.getGroupid(); //消费者的名字 - String offsetMode = config.getOffsetMode(); //latest earliest - - Properties properties = new Properties(); - properties.put("bootstrap.servers", config.getBrokers()); //需要把集群的host 配置到程序所在机器 - properties.put("zookeeper.connect", config.getZookeeper()); - //"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量 - // "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错 - // "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期 - properties.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误 - properties.put("auto.offset.reset", offsetMode); //latest earliest - - KeyedDeserializationSchema deserializationSchema = "json".equals(config.getValueType()) ? 
- new JsonSchema(context.getSchema()) : new RowDeserializer(); - - List topicSets = Arrays.asList(topics.split(",")); - //org.apache.flink.streaming.api.checkpoint.CheckpointedFunction - return execEnv.addSource(new FlinkKafkaConsumer08( - topicSets, - deserializationSchema, - properties) - ); - }); + this.config = config; + loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config, context)); } @Override @@ -91,38 +62,11 @@ public DataStream getSource() return loadStream.get(); } - private static class RowDeserializer - implements KeyedDeserializationSchema + @Override + public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets, KeyedDeserializationSchema deserializationSchema, Properties properties) { - @Override - public boolean isEndOfStream(Row nextElement) - { - return false; - } - - @Override - public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) - { - return Row.of( - topic, //topic - messageKey == null ? null : new String(messageKey, UTF_8), //key - new String(message, UTF_8), //message - partition, - offset - ); - } - - @Override - public TypeInformation getProducedType() - { - TypeInformation[] types = new TypeInformation[] { - TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(String.class), //createTypeInformation[String] - TypeExtractor.createTypeInfo(String.class), - Types.INT, - Types.LONG - }; - return new RowTypeInfo(types, KAFKA_COLUMNS); - } + //kafka08 kafka09 需要设置 zk + properties.put("zookeeper.connect", config.getZookeeper()); + return new FlinkKafkaConsumer08<>(topicSets, deserializationSchema, properties); } } diff --git a/sylph-connectors/sylph-kafka09/build.gradle b/sylph-connectors/sylph-kafka09/build.gradle index b7eac4a21..f03d93607 100644 --- a/sylph-connectors/sylph-kafka09/build.gradle +++ b/sylph-connectors/sylph-kafka09/build.gradle @@ -1,4 +1,6 @@ dependencies { + compile project(':sylph-base-kafka') + compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { exclude(module: 'flink-shaded-hadoop2') } diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java index 5b3d1dfcc..b03c60ac5 100644 --- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java +++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java @@ -18,133 +18,56 @@ import ideal.sylph.annotation.Description; import ideal.sylph.annotation.Name; import ideal.sylph.annotation.Version; -import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.SourceContext; import ideal.sylph.etl.api.Source; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.calcite.shaded.com.google.common.base.Supplier; -import org.apache.flink.calcite.shaded.com.google.common.base.Suppliers; +import org.apache.flink.shaded.guava18.com.google.common.base.Supplier; +import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; +import 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; -import java.util.Arrays; import java.util.List; import java.util.Properties; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @Name(value = "kafka09") @Version("1.0.0") @Description("this flink kafka source inputStream") public class KafkaSource09 + extends KafkaBaseSource implements Source> { private static final long serialVersionUID = 2L; - private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"}; + private final KafkaSourceConfig config; private final transient Supplier> loadStream; /** * 初始化(driver执行) **/ - public KafkaSource09(StreamTableEnvironment tableEnv, KafkaSource09Config config) + public KafkaSource09(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context) { - requireNonNull(tableEnv, "tableEnv is null"); + requireNonNull(execEnv, "execEnv is null"); requireNonNull(config, "config is null"); - loadStream = Suppliers.memoize(() -> { - String topics = config.topics; - - Properties properties = new Properties(); - properties.put("bootstrap.servers", config.brokers); //需要注意hosts问题 - //"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量 - // "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错 - // "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期 - properties.put("group.id", config.groupid); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误 - properties.put("auto.offset.reset", config.offsetMode); //latest earliest - properties.put("zookeeper.connect", config.zookeeper); - - List topicSets = Arrays.asList(topics.split(",")); - //org.apache.flink.streaming.api.checkpoint.CheckpointedFunction - DataStream stream = tableEnv.execEnv().addSource(new FlinkKafkaConsumer09( - topicSets, - new RowDeserializer(), - properties) - ); - return stream; - }); + this.config = config; + loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config, context)); } @Override - public DataStream getSource() + public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets, KeyedDeserializationSchema deserializationSchema, Properties properties) { - return loadStream.get(); - } - - private static class RowDeserializer - implements KeyedDeserializationSchema - { - @Override - public boolean isEndOfStream(Row nextElement) - { - return false; - } - - @Override - public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) - { - return Row.of( - topic, //topic - messageKey == null ? 
null : new String(messageKey, UTF_8), //key - new String(message, UTF_8), //message - partition, - offset - ); - } - - public TypeInformation getProducedType() - { - TypeInformation[] types = new TypeInformation[] { - TypeExtractor.createTypeInfo(String.class), - TypeExtractor.createTypeInfo(String.class), //createTypeInformation[String] - TypeExtractor.createTypeInfo(String.class), - Types.INT, - Types.LONG - }; - return new RowTypeInfo(types, KAFKA_COLUMNS); - } + //kafka08 kafka09 需要设置 zk + properties.put("zookeeper.connect", config.getZookeeper()); + return new FlinkKafkaConsumer09<>(topicSets, deserializationSchema, properties); } - public static class KafkaSource09Config - extends PluginConfig + @Override + public DataStream getSource() { - private static final long serialVersionUID = 2L; - - @Name("kafka_topic") - @Description("this is kafka topic list") - private String topics = "test1"; - - @Name("kafka_broker") - @Description("this is kafka broker list") - private String brokers = "localhost:9092"; - - @Name("zookeeper.connect") - @Description("this is kafka zk list") - private String zookeeper = "localhost:2181"; - - @Name("kafka_group_id") - @Description("this is kafka_group_id") - private String groupid = "sylph_streamSql_test1"; - - @Name("auto.offset.reset") - @Description("this is auto.offset.reset mode") - private String offsetMode = "latest"; - - private KafkaSource09Config() {} + return loadStream.get(); } } diff --git a/sylph-connectors/sylph-kudu/build.gradle b/sylph-connectors/sylph-kudu/build.gradle new file mode 100644 index 000000000..25168938e --- /dev/null +++ b/sylph-connectors/sylph-kudu/build.gradle @@ -0,0 +1,10 @@ + +dependencies { + compile 'org.apache.kudu:kudu-client:1.7.0' + + testCompile project(':sylph-runners:sylph-runner-flink') + testCompile project(':sylph-spi') + testCompile(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) { + exclude(module: 'flink-shaded-hadoop2') + } +} diff --git a/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java b/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java new file mode 100644 index 000000000..d0760c581 --- /dev/null +++ b/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java @@ -0,0 +1,225 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.kudu; + +import ideal.sylph.annotation.Description; +import ideal.sylph.annotation.Name; +import ideal.sylph.etl.PluginConfig; +import ideal.sylph.etl.Row; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.etl.api.RealTimeSink; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.SessionConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.List; +import java.util.function.Supplier; + +import static com.github.harbby.gadtry.base.Throwables.throwsException; +import static java.util.Objects.requireNonNull; + +@Name("kudu") +@Description("this sylph kudu sink") +public class KuduSink + implements RealTimeSink +{ + private static final Logger logger = LoggerFactory.getLogger(KuduSink.class); + private final String tableName; + private final String kuduHost; + private final List fieldNames; + private final KuduSinkConfig kuduSinkConfig; + + private KuduClient kuduClient; + private KuduSession kuduSession; + private KuduTable kuduTable; + + private final int maxBatchSize; + private int rowNumCnt = 0; + + private Supplier operationCreater; + + public KuduSink(SinkContext context, KuduSinkConfig kuduSinkConfig) + { + this.kuduSinkConfig = kuduSinkConfig; + this.tableName = requireNonNull(kuduSinkConfig.tableName, "kudu.table is null"); + this.kuduHost = requireNonNull(kuduSinkConfig.hosts, "kudu.hosts is null"); + this.fieldNames = context.getSchema().getFieldNames(); + + this.maxBatchSize = (int) kuduSinkConfig.batchSize; + + //--check write mode + getOperationCreater(kuduSinkConfig.mode, null); + } + + private static Supplier getOperationCreater(String mode, KuduTable kuduTable) + { + //INSERT OR UPSET OR UPDATE OR DELETE + switch (mode.toUpperCase()) { + case "INSERT": + return () -> kuduTable.newInsert(); + case "UPSET": + return () -> kuduTable.newUpsert(); + case "UPDATE": + return () -> kuduTable.newUpdate(); + case "DELETE": + return () -> kuduTable.newDelete(); + default: + throw new IllegalArgumentException(); + } + } + + @Override + public boolean open(long partitionId, long version) + throws Exception + { + this.kuduClient = new KuduClient.KuduClientBuilder(kuduHost).build(); + this.kuduSession = kuduClient.newSession(); + this.kuduTable = kuduClient.openTable(tableName); + this.operationCreater = getOperationCreater(kuduSinkConfig.mode, kuduTable); + + kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); + //kuduSession.setFlushInterval(); + kuduSession.setMutationBufferSpace(1024 * 1024 * 8); //8m + return true; + } + + @Override + public void process(Row row) + { + Operation operation = operationCreater.get(); + try { + for (int i = 0; i < fieldNames.size(); i++) { + appendColumn(operation, fieldNames.get(i), row.getField(i)); + } + + kuduSession.apply(operation); + // submit batch + if (rowNumCnt++ > maxBatchSize) { + rowNumCnt = 0; + kuduSession.flush(); //真正落地 + } + } + catch (IOException e) { + throwsException(e); + } + } + + private void appendColumn(Operation operation, String name, Object value) + { + ColumnSchema columnSchema = kuduTable.getSchema().getColumn(name); + + if (value == null) { + operation.getRow().setNull(name); + return; 
+ } + + Type kuduType = columnSchema.getType(); + switch (kuduType) { + case BINARY: + operation.getRow().addBinary(name, (byte[]) value); + break; + + case STRING: + operation.getRow().addString(name, String.valueOf(value)); + break; + case BOOL: + operation.getRow().addBoolean(name, (Boolean) value); + break; + + case INT8: + case INT16: + operation.getRow().addShort(name, (Short) value); + break; + + case INT32: + operation.getRow().addInt(name, (Integer) value); + break; + + case INT64: { + if (value instanceof Date) { + operation.getRow().addLong(name, ((Date) value).getTime()); + } + else if (value instanceof Time) { + operation.getRow().addLong(name, ((Time) value).getTime()); + } + else if (value instanceof Timestamp) { + operation.getRow().addLong(name, ((Timestamp) value).getTime()); + } + else { + operation.getRow().addLong(name, (Long) value); + } + break; + } + case DOUBLE: + operation.getRow().addDouble(name, (Double) value); + break; + case FLOAT: + operation.getRow().addFloat(name, (Float) value); + break; + + case DECIMAL: + operation.getRow().addDecimal(name, (BigDecimal) value); + break; + + default: + throw new IllegalStateException("不受支持的kudu类型:" + kuduType); + } + } + + @Override + public void close(Throwable errorOrNull) + { + try (KuduClient client = kuduClient) { + if (kuduSession != null) { + kuduSession.close(); + } + } + catch (IOException e) { + throwsException(e); + } + } + + public static class KuduSinkConfig + extends PluginConfig + { + @Name("kudu.hosts") + @Description("this is kudu cluster hosts, demo: slave01:7051,slave02:7051") + private String hosts; + + @Name("kudu.tableName") + @Description("this is kudu tableName") + private String tableName; + + @Name("kudu.mode") + @Description("this is kudu, INSERT OR UPSET OR UPDATE OR DELETE") + private String mode = "UPSET"; + + @Name("batchSize") + @Description("this is kudu write batchSize") + private long batchSize = 100; + } +} diff --git a/sylph-connectors/sylph-kudu/src/test/java/ideal/sylph/plugins/kudu/KuduSinkTest.java b/sylph-connectors/sylph-kudu/src/test/java/ideal/sylph/plugins/kudu/KuduSinkTest.java new file mode 100644 index 000000000..c250896c2 --- /dev/null +++ b/sylph-connectors/sylph-kudu/src/test/java/ideal/sylph/plugins/kudu/KuduSinkTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ideal.sylph.plugins.kudu; + +import com.github.harbby.gadtry.ioc.IocFactory; +import ideal.sylph.etl.Schema; +import ideal.sylph.etl.SinkContext; +import ideal.sylph.parser.antlr.AntlrSqlParser; +import ideal.sylph.parser.antlr.tree.CreateTable; +import ideal.sylph.runner.flink.actuator.StreamSqlBuilder; +import ideal.sylph.runner.flink.etl.FlinkNodeLoader; +import ideal.sylph.spi.NodeLoader; +import ideal.sylph.spi.model.PipelinePluginManager; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +import static ideal.sylph.runner.flink.actuator.StreamSqlUtil.getTableSchema; + +public class KuduSinkTest +{ + private static final AntlrSqlParser sqlParser = new AntlrSqlParser(); + + private final String kuduSinkSql = "create output table kudu(\n" + + " key varchar,\n" + + " value varchar\n" + + ") with (\n" + + " type = '" + KuduSink.class.getName() + "',\n" + + " kudu.hosts = 'localhost:7051',\n" + + " kudu.tableName = 'impala::a1.a1',\n" + + " batchSize = 100\n" + + ")"; + + public static StreamTableEnvironment getTableEnv() + { + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment(); + execEnv.setParallelism(2); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv); + return tableEnv; + } + + @Test + public void createKuduSinkTest() + throws ClassNotFoundException + { + CreateTable createStream = (CreateTable) sqlParser.createStatement(kuduSinkSql); + final String tableName = createStream.getName(); + Schema schema = getTableSchema(createStream); + + final Map withConfig = createStream.getWithConfig(); + final String driverClass = (String) withConfig.get("type"); + + final IocFactory iocFactory = IocFactory.create(binder -> binder.bind(SinkContext.class, new SinkContext() + { + @Override + public Schema getSchema() + { + return schema; + } + + @Override + public String getSinkTable() + { + return tableName; + } + })); + NodeLoader> loader = new FlinkNodeLoader(PipelinePluginManager.getDefault(), iocFactory); + + KuduSink kuduSink = (KuduSink) loader.getPluginInstance(Class.forName(driverClass), withConfig); + Assert.assertTrue(kuduSink != null); + } + + @Test + public void createKuduSink() + throws Exception + { + StreamTableEnvironment tableEnv = getTableEnv(); + + StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, PipelinePluginManager.getDefault(), sqlParser); + streamSqlBuilder.buildStreamBySql(kuduSinkSql); + + tableEnv.sqlUpdate("insert into kudu select 'key' as key, '' as `value`"); + Assert.assertNotNull(tableEnv.execEnv().getStreamGraph().getJobGraph()); + } +} \ No newline at end of file diff --git a/sylph-runners/flink/build.gradle b/sylph-runners/flink/build.gradle index 087104545..9feee10cd 100644 --- a/sylph-runners/flink/build.gradle +++ b/sylph-runners/flink/build.gradle @@ -31,10 +31,4 @@ dependencies { //colour compile group: 'org.fusesource.jansi', name: 'jansi', version: '1.17.1' - - testCompile project(':sylph-connectors:sylph-kafka') - testCompile project(':sylph-connectors:sylph-mysql') - testCompile project(':sylph-connectors:sylph-hdfs') - testCompile project(':sylph-connectors:sylph-hbase') - testCompile project(path: ':sylph-connectors:sylph-elasticsearch6', 
configuration: 'shadow') } \ No newline at end of file diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/etl/FlinkNodeLoader.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/etl/FlinkNodeLoader.java index 8f7940d00..a292b690d 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/etl/FlinkNodeLoader.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/etl/FlinkNodeLoader.java @@ -62,7 +62,7 @@ public UnaryOperator> loadSource(String driverStr, final Map driverClass = pluginManager.loadPluginDriver(driverStr, PipelinePlugin.PipelineType.source); checkState(Source.class.isAssignableFrom(driverClass), - "driverStr must is RealTimeSink.class or Sink.class"); + "The Source driver must is Source.class, But your " + driverClass); checkDataStreamRow(Source.class, driverClass); @SuppressWarnings("unchecked") final Source> source = (Source>) getPluginInstance(driverClass, config); @@ -98,7 +98,7 @@ public UnaryOperator> loadSink(String driverStr, final Map driverClass = pluginManager.loadPluginDriver(driverStr, PipelinePlugin.PipelineType.sink); checkState(RealTimeSink.class.isAssignableFrom(driverClass) || Sink.class.isAssignableFrom(driverClass), - "driverStr must is RealTimeSink.class or Sink.class"); + "The Sink driver must is RealTimeSink.class or Sink.class, But your " + driverClass); if (Sink.class.isAssignableFrom(driverClass)) { checkDataStreamRow(Sink.class, driverClass); } @@ -176,7 +176,7 @@ else if (driver instanceof TransForm) { private static Sink> loadRealTimeSink(RealTimeSink realTimeSink) { // or user stream.addSink(new FlinkSink(realTimeSink, stream.getType())); - return (Sink>) stream -> stream.addSink(new FlinkSink(realTimeSink, stream.getType())); + return (Sink>) stream -> stream.addSink(new FlinkSink(realTimeSink, stream.getType())).name(realTimeSink.getClass().getName()); } private static TransForm> loadRealTimeTransForm(RealTimeTransForm realTimeTransForm) diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/UDFJson.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/UDFJson.java index d27db5bae..976f659e9 100644 --- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/UDFJson.java +++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/UDFJson.java @@ -15,8 +15,9 @@ */ package ideal.sylph.runner.flink.udf; +import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.PathNotFoundException; +import com.jayway.jsonpath.Option; import com.jayway.jsonpath.ReadContext; import ideal.sylph.annotation.Name; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -31,7 +32,8 @@ public class UDFJson extends ScalarFunction { - private HashCache cache = new HashCache<>(); + private static final Configuration jsonConf = Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS); + private final HashCache cache = new HashCache<>(); /** * @return json string or null @@ -43,14 +45,8 @@ public String eval(String jsonString, String pathString) if (!pathString.startsWith("$")) { pathString = "$." 
+ pathString; } - ReadContext context = cache.computeIfAbsent(jsonString, JsonPath::parse); - - Object value = null; - try { - value = context.read(pathString); - } - catch (PathNotFoundException ignored) { - } + ReadContext context = cache.computeIfAbsent(jsonString, key -> JsonPath.using(jsonConf).parse(jsonString)); + Object value = context.read(pathString); if (value == null) { return null; diff --git a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JoinTest.java b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JoinTest.java index fa1578148..8208eb03b 100644 --- a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JoinTest.java +++ b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JoinTest.java @@ -16,7 +16,6 @@ package ideal.sylph.runner.flink.sqlTest; import ideal.sylph.etl.Collector; -import ideal.sylph.etl.Row; import ideal.sylph.etl.Schema; import ideal.sylph.etl.api.RealTimeTransForm; import ideal.sylph.etl.join.JoinContext; @@ -24,19 +23,25 @@ import ideal.sylph.parser.antlr.tree.CreateTable; import ideal.sylph.runner.flink.sql.FlinkSqlParser; import ideal.sylph.runner.flink.sqlTest.utils.PrintTableSink; -import ideal.sylph.runner.flink.sqlTest.utils.TestTableSource; +import ideal.sylph.runner.flink.table.SylphTableSource; import ideal.sylph.runner.flink.udf.TimeUtil; import ideal.sylph.spi.model.PipelinePluginManager; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; + import static com.google.common.base.Preconditions.checkState; /** @@ -60,9 +65,14 @@ public void init() tableEnv = TableEnvironment.getTableEnvironment(tableEnv.execEnv()); tableEnv.registerFunction("from_unixtime", new TimeUtil.FromUnixTime()); - TableSource tableSource = new TestTableSource(); - tableEnv.registerTableSource("tb1", tableSource); - tableEnv.registerTableSource("tb0", new TestTableSource()); + //---create stream source + TypeInformation[] fieldTypes = {Types.STRING(), Types.STRING(), Types.LONG()}; + String[] fieldNames = {"topic", "user_id", "time"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); + DataStream dataSource = execEnv.fromCollection(new ArrayList<>(), rowTypeInfo); + + tableEnv.registerTableSource("tb1", new SylphTableSource(rowTypeInfo, dataSource)); + tableEnv.registerTableSource("tb0", new SylphTableSource(rowTypeInfo, dataSource)); final AntlrSqlParser sqlParser = new AntlrSqlParser(); this.dimTable = (CreateTable) sqlParser.createStatement("create batch table users(id string, name string, city string) with(type = '" + JoinOperator.class.getName() + "')"); @@ -182,7 +192,7 @@ public JoinOperator(JoinContext context) } @Override - public void process(Row input, Collector collector) + public void process(ideal.sylph.etl.Row input, Collector collector) { } diff --git 
a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JsonPathUdfTest.java b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JsonPathUdfTest.java new file mode 100644 index 000000000..4bc9e0aad --- /dev/null +++ b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/JsonPathUdfTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ideal.sylph.runner.flink.sqlTest; + +import ideal.sylph.runner.flink.udf.UDFJson; +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class JsonPathUdfTest +{ + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ConcurrentMap result = new ConcurrentHashMap<>(); + + private StreamTableEnvironment tableEnv; + private Table table; + + @Before + public void init() + throws JsonProcessingException + { + String json = MAPPER.writeValueAsString(ImmutableMap.of("user_id", "uid_001", + "ip", "127.0.0.1", + "store", 12.0, + "key1", ImmutableMap.of("key2", 123) + )); + + StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment(); + execEnv.setParallelism(2); + tableEnv = TableEnvironment.getTableEnvironment(execEnv); + tableEnv.registerFunction("get_json_object", new UDFJson()); + table = tableEnv.sqlQuery("select '" + json + "' as message"); + } + + @Test + public void jsonPathUdfTestReturn123() + throws Exception + { + String jsonKey = "$.key1.key2"; + //Table table = tableEnv.sqlQuery("select cast(json['store'] as double) from tp , LATERAL TABLE(json_parser(message, 'store', 'ip')) as T(json) "); + Table table1 = tableEnv.sqlQuery("select get_json_object(message,'" + jsonKey + "') from " + table); + tableEnv.toAppendStream(table1, Row.class) + .addSink(new SinkFunction() + { + @Override + public void invoke(Row value, Context context) + throws Exception + { + result.put(jsonKey, (String) value.getField(0)); + } + }); + tableEnv.execEnv().execute(); + Assert.assertEquals("123", result.get(jsonKey)); + } + + @Test + public void jsonPathUdfTest() + throws Exception + { + String jsonKey = "$.key2.key2"; + result.put(jsonKey, "ok"); + Table table1 = tableEnv.sqlQuery("select get_json_object(message,'" + jsonKey + "') from " + table); + 
tableEnv.toAppendStream(table1, Row.class) + .addSink(new SinkFunction() + { + @Override + public void invoke(Row value, Context context) + throws Exception + { + if (value.getField(0) == null) { + result.remove(jsonKey); + } + } + }); + tableEnv.execEnv().execute(); + Assert.assertNull(result.get(jsonKey)); + } +} diff --git a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/SqlSplit.java b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/SqlSplit.java index de79924b2..3dc1ca288 100644 --- a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/SqlSplit.java +++ b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/SqlSplit.java @@ -27,7 +27,6 @@ public void splitTest1() { String code = "a1;a2;'12;34';\"a4;a8\";10"; String[] split = code.split(SQL_REGEX); - Assert.assertEquals(split.length, 5); Assert.assertArrayEquals(split, new String[] {"a1", "a2", "'12;34'", "\"a4;a8\"", "10"}); } } diff --git a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/TestStreamMode.java b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/TestStreamMode.java index 9afcca57c..221ca8ab9 100644 --- a/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/TestStreamMode.java +++ b/sylph-runners/flink/src/test/java/ideal/sylph/runner/flink/sqlTest/TestStreamMode.java @@ -15,7 +15,6 @@ */ package ideal.sylph.runner.flink.sqlTest; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; @@ -57,7 +56,6 @@ public void toRetractStreamTest() // this is global window Table table = tableEnv.sqlQuery("SELECT name, count(1) FROM (VALUES ('Bob'), ('Bob')) AS NameTable(name) GROUP BY name"); Assert.assertNotNull(tableEnv.toRetractStream(table, Row.class).print()); - //tableEnv.execEnv().execute(); } }
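
Reviewer note: the core of this patch pulls the shared Kafka wiring (consumer Properties, topic splitting, and the JsonSchema vs. RowDeserializer choice) into the new abstract KafkaBaseSource, so each version-specific connector only overrides getKafkaConsumerBase(). For readers who don't read Chinese, the inline comments in KafkaBaseSource.createSource() say roughly: "初始化(driver阶段执行)" = "initialization, runs in the driver"; the bootstrap.servers comment warns that the broker hosts must be resolvable from the machine running the job; the commented-out enable.auto.commit / session.timeout.ms / heartbeat.interval.ms lines are about not auto-committing offsets; and the group.id comment warns that each stream needs its own group.id or offset commits will fail. "kafka08 kafka09 需要设置 zk" in KafkaSource08 / KafkaSource09 means "kafka08 and kafka09 must set zookeeper.connect", which is why KafkaSourceConfig's zookeeper default is now null. The sketch below shows how a further connector could plug into the template; it is illustrative only — KafkaSource011, the kafka11 name and FlinkKafkaConsumer011 (from flink-connector-kafka-0.11) are not part of this patch, and the generic type parameters that the diff rendering dropped (DataStream<Row>, KeyedDeserializationSchema<Row>, List<String>) are written out in full here.

package ideal.sylph.plugins.kafka.flink;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.SourceContext;
import ideal.sylph.etl.api.Source;
import org.apache.flink.shaded.guava18.com.google.common.base.Supplier;
import org.apache.flink.shaded.guava18.com.google.common.base.Suppliers;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Properties;

import static java.util.Objects.requireNonNull;

@Name(value = "kafka11")
@Version("1.0.0")
@Description("hypothetical flink kafka 0.11 source built on KafkaBaseSource")
public class KafkaSource011
        extends KafkaBaseSource
        implements Source<DataStream<Row>>
{
    private static final long serialVersionUID = 2L;
    private final transient Supplier<DataStream<Row>> loadStream;

    public KafkaSource011(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context)
    {
        requireNonNull(execEnv, "execEnv is null");
        requireNonNull(config, "config is null");
        // properties, topic splitting and the JsonSchema/RowDeserializer choice all live in KafkaBaseSource
        loadStream = Suppliers.memoize(() -> this.createSource(execEnv, config, context));
    }

    @Override
    public FlinkKafkaConsumerBase<Row> getKafkaConsumerBase(List<String> topicSets,
            KeyedDeserializationSchema<Row> deserializationSchema, Properties properties)
    {
        // the only version-specific piece: which FlinkKafkaConsumer to instantiate
        return new FlinkKafkaConsumer011<>(topicSets, deserializationSchema, properties);
    }

    @Override
    public DataStream<Row> getSource()
    {
        return loadStream.get();
    }
}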
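
Reviewer note: the Spark Kafka sources (MyKafkaSource.scala, SocketSource.scala, KafkaSourceConfig.java) move out of sylph-kafka into the new spark-kafka module, and sylph-kafka keeps only the Flink dependencies plus the new compile dependency on sylph-base-kafka. The Chinese comments in the Gradle files translate as: "spark 结构化流 kafka专用" = "Kafka support for Spark Structured Streaming", and "spark streaming kafka 依赖" / "老流依赖" = "dependency for the older Spark Streaming (DStream) Kafka API".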
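
Reviewer note: KuduSink opens its session in MANUAL_FLUSH mode with an 8 MB mutation buffer and flushes once rowNumCnt exceeds batchSize; the comment "真正落地" marks the flush() call as the point where rows actually reach Kudu, and "不受支持的kudu类型" means "unsupported Kudu type". Below is a minimal, self-contained sketch of that batching pattern using only the kudu-client API already referenced by the sink; the master address, table and column names are placeholders borrowed from KuduSinkTest, not a required setup.

import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.SessionConfiguration;

public class KuduBatchWriteSketch
{
    public static void main(String[] args)
            throws KuduException
    {
        int maxBatchSize = 100;
        try (KuduClient client = new KuduClient.KuduClientBuilder("slave01:7051,slave02:7051").build()) {
            KuduTable table = client.openTable("impala::a1.a1");
            KuduSession session = client.newSession();
            // buffer operations client-side; nothing is written until flush()
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);

            int pending = 0;
            for (int i = 0; i < 1000; i++) {
                Insert insert = table.newInsert();
                insert.getRow().addString("key", "key_" + i);
                insert.getRow().addString("value", "value_" + i);
                session.apply(insert);
                if (++pending >= maxBatchSize) {   // same batching idea as KuduSink.process()
                    session.flush();
                    pending = 0;
                }
            }
            session.flush();   // drain the tail batch
            session.close();
        }
    }
}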
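
Reviewer note: UDFJson now builds its ReadContext through Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS), so a missing path evaluates to null instead of throwing the PathNotFoundException that previously had to be caught and ignored; JsonPathUdfTest exercises both the hit and the miss case. A minimal stand-alone check of that json-path behaviour, assuming only the jayway json-path dependency the runner already uses:

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;

public class JsonPathSuppressDemo
{
    public static void main(String[] args)
    {
        Configuration jsonConf = Configuration.defaultConfiguration()
                .addOptions(Option.SUPPRESS_EXCEPTIONS);
        ReadContext ctx = JsonPath.using(jsonConf).parse("{\"key1\":{\"key2\":123}}");

        System.out.println((Object) ctx.read("$.key1.key2"));   // 123
        System.out.println((Object) ctx.read("$.key2.key2"));   // null, no PathNotFoundException thrown
    }
}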