Commit
Merge pull request #81 from harbby/dev
Dev
harbby authored Mar 29, 2019
2 parents 4552ec3 + 413ea10 commit c177d3a
Showing 27 changed files with 755 additions and 450 deletions.
9 changes: 5 additions & 4 deletions settings.gradle
@@ -27,16 +27,17 @@ include 'sylph-connectors:sylph-mysql'
include 'sylph-connectors:sylph-hdfs'
include 'sylph-connectors:sylph-kafka08'
include 'sylph-connectors:sylph-kafka09'
include 'sylph-connectors:sylph-hbase'
//include 'sylph-connectors:sylph-hbase'
include 'sylph-connectors:sylph-elasticsearch6'
include 'sylph-connectors:sylph-elasticsearch5'
include 'sylph-connectors:sylph-clickhouse'
include 'sylph-connectors:spark-kafka'
include 'sylph-connectors:sylph-kudu'

//----
include 'sylph-dist'
include 'sylph-parser'
include 'sylph-docs'
include 'sylph-yarn'
//include 'sylph-clickhouse'
//include 'sylph-elasticsearch5'

include 'sylph-base-kafka'

17 changes: 17 additions & 0 deletions sylph-base-kafka/build.gradle
@@ -0,0 +1,17 @@
configurations.all {
resolutionStrategy { preferProjectModules() }
}

dependencies {
compileOnly project(':sylph-runners:sylph-runner-flink')

compileOnly project(":sylph-etl-api")

compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) {
exclude(module: 'flink-shaded-hadoop2')
}

compile (group: 'org.apache.flink', name: 'flink-connector-kafka-base_2.11', version: deps.flink){
exclude(module: 'kafka-clients')
}
}
@@ -17,20 +17,16 @@

import ideal.sylph.etl.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

import static ideal.sylph.runner.flink.actuator.StreamSqlUtil.schemaToRowTypeInfo;

public class JsonSchema
implements KeyedDeserializationSchema<Row>
{
@@ -42,36 +38,6 @@ public JsonSchema(Schema schema)
this.rowTypeInfo = schemaToRowTypeInfo(schema);
}

public static RowTypeInfo schemaToRowTypeInfo(Schema schema)
{
TypeInformation<?>[] types = schema.getFieldTypes().stream().map(JsonSchema::getFlinkType)
.toArray(TypeInformation<?>[]::new);
String[] names = schema.getFieldNames().toArray(new String[0]);
return new RowTypeInfo(types, names);
}

private static TypeInformation<?> getFlinkType(Type type)
{
if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == Map.class) {
Type[] arguments = ((ParameterizedType) type).getActualTypeArguments();
Type valueType = arguments[1];
TypeInformation<?> valueInfo = getFlinkType(valueType);
return new MapTypeInfo<>(TypeExtractor.createTypeInfo(arguments[0]), valueInfo);
}
else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == List.class) {
TypeInformation<?> typeInformation = getFlinkType(((ParameterizedType) type).getActualTypeArguments()[0]);
if (typeInformation.isBasicType() && typeInformation != Types.STRING) {
return Types.PRIMITIVE_ARRAY(typeInformation);
}
else {
return Types.OBJECT_ARRAY(typeInformation);
}
}
else {
return TypeExtractor.createTypeInfo(type);
}
}

@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
throws IOException
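The schemaToRowTypeInfo and getFlinkType helpers removed above are now pulled in from StreamSqlUtil (see the added static import of StreamSqlUtil.schemaToRowTypeInfo). As a rough sketch of the mapping that logic performs, assuming the moved code keeps the behavior of the removed version and using made-up field names, a schema with fields (user_id: String, tags: List<Integer>, props: Map<String, Long>) resolves to Flink type information along these lines:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class SchemaMappingSketch
{
    public static void main(String[] args)
    {
        RowTypeInfo rowType = new RowTypeInfo(
                new TypeInformation<?>[] {
                        Types.STRING,                                // user_id: plain types go through TypeExtractor
                        Types.PRIMITIVE_ARRAY(Types.INT),            // tags: basic non-String element type -> primitive array
                        new MapTypeInfo<>(Types.STRING, Types.LONG)  // props: Map<K, V> -> MapTypeInfo
                },
                new String[] {"user_id", "tags", "props"});
        System.out.println(rowType); // e.g. Row(user_id: String, tags: int[], props: Map<String, Long>)
    }
}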
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka.flink;

import ideal.sylph.etl.SourceContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

public abstract class KafkaBaseSource
{
private static final long serialVersionUID = 2L;
private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"};

public abstract FlinkKafkaConsumerBase<Row> getKafkaConsumerBase(List<String> topicSets,
KeyedDeserializationSchema<Row> deserializationSchema, Properties properties);

/**
* Initialization (runs in the driver stage)
**/
public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context)
{
requireNonNull(execEnv, "execEnv is null");
requireNonNull(config, "config is null");
String topics = config.getTopics();
String groupId = config.getGroupid(); // consumer group name
String offsetMode = config.getOffsetMode(); // latest or earliest

Properties properties = new Properties();
properties.put("bootstrap.servers", config.getBrokers()); //需要把集群的host 配置到程序所在机器
//"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量
// "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
properties.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
properties.put("auto.offset.reset", offsetMode); //latest earliest

KeyedDeserializationSchema<Row> deserializationSchema = "json".equals(config.getValueType()) ?
new JsonSchema(context.getSchema()) : new RowDeserializer();

List<String> topicSets = Arrays.asList(topics.split(","));
//org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
FlinkKafkaConsumerBase<Row> base = getKafkaConsumerBase(topicSets, deserializationSchema, properties);
return execEnv.addSource(base);
}

private static class RowDeserializer
implements KeyedDeserializationSchema<Row>
{
@Override
public boolean isEndOfStream(Row nextElement)
{
return false;
}

@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
{
return Row.of(
topic, //topic
messageKey == null ? null : new String(messageKey, UTF_8), //key
new String(message, UTF_8), //message
partition,
offset
);
}

@Override
public TypeInformation<Row> getProducedType()
{
TypeInformation<?>[] types = new TypeInformation<?>[] {
TypeExtractor.createTypeInfo(String.class),
TypeExtractor.createTypeInfo(String.class), //createTypeInformation[String]
TypeExtractor.createTypeInfo(String.class),
Types.INT,
Types.LONG
};
return new RowTypeInfo(types, KAFKA_COLUMNS);
}
}
}
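KafkaBaseSource leaves getKafkaConsumerBase abstract so that each Kafka version module can plug in its own Flink consumer. A minimal sketch of what a concrete implementation might look like, assuming the stock Flink Kafka 0.10 connector (the class name Kafka010SourceSketch is hypothetical, not a class from this repository):

package ideal.sylph.plugins.kafka.flink;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.util.List;
import java.util.Properties;

// Hypothetical subclass for the Kafka 0.10 connector; the real connector modules may differ.
public class Kafka010SourceSketch
        extends KafkaBaseSource
{
    @Override
    public FlinkKafkaConsumerBase<Row> getKafkaConsumerBase(List<String> topicSets,
            KeyedDeserializationSchema<Row> deserializationSchema, Properties properties)
    {
        // FlinkKafkaConsumer010 takes the topic list, a deserialization schema and the consumer properties
        return new FlinkKafkaConsumer010<>(topicSets, deserializationSchema, properties);
    }
}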
@@ -13,13 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka;
package ideal.sylph.plugins.kafka.flink;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;

public class KafkaSourceConfig08
public class KafkaSourceConfig
extends PluginConfig
{
private static final long serialVersionUID = 2L;
@@ -41,8 +41,8 @@ public class KafkaSourceConfig08
private String offsetMode = "latest";

@Name("zookeeper.connect")
@Description("this is kafka zk list")
private String zookeeper = "localhost:2181";
@Description("this is kafka zk list, kafka08 and kafka09 Must need to set")
private String zookeeper = null; //"localhost:2181"

@Name("value_type")
@Description("this is kafka String value Type, use json")
@@ -58,11 +58,6 @@ public String getBrokers()
return brokers;
}

public String getZookeeper()
{
return zookeeper;
}

public String getGroupid()
{
return groupid;
@@ -73,10 +68,15 @@ public String getOffsetMode()
return offsetMode;
}

public String getZookeeper()
{
return zookeeper;
}

public String getValueType()
{
return valueType;
}

private KafkaSourceConfig08() {}
private KafkaSourceConfig() {}
}
32 changes: 32 additions & 0 deletions sylph-connectors/spark-kafka/build.gradle
@@ -0,0 +1,32 @@
apply plugin: 'scala'

dependencies {
//--------------------------------------------------spark----------------------------------------------------
compileOnly(group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark) {
exclude(module: 'spark-core_2.11')
}
compileOnly(group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark) {
exclude(module: 'spark-core_2.11')
}
compileOnly(group: 'org.apache.spark', name: 'spark-core_2.11', version: deps.spark) {
exclude(module: 'hadoop-client')
}
compileOnly group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'

/**
* Kafka support for Spark Structured Streaming
* */
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: deps.spark

/**
* Spark Streaming Kafka dependency
* */
compile(group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark) {
exclude(group: 'org.spark-project.spark')
exclude(group: 'org.scala-lang')
exclude(module: 'spark-tags_2.11')
exclude(module: 'slf4j-log4j12')
exclude(module: 'slf4j-api')
exclude(module: 'snappy-java')
}
}
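The spark-sql-kafka-0-10 dependency above is what backs reading Kafka through Spark Structured Streaming, while spark-streaming-kafka-0-10 covers the DStream API. A minimal sketch of the Structured Streaming side, with placeholder broker and topic names (none of these values come from this repository):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StructuredKafkaReadSketch
{
    public static void main(String[] args) throws Exception
    {
        SparkSession spark = SparkSession.builder()
                .appName("structured-kafka-read")
                .getOrCreate();

        // Each Kafka record arrives with columns such as key, value, topic, partition and offset
        Dataset<Row> stream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker1:9092") // placeholder broker list
                .option("subscribe", "demo_topic")                 // placeholder topic
                .load();

        stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
                .writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}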
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka;
package ideal.sylph.plugins.kafka.spark;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
@@ -16,9 +16,7 @@
package ideal.sylph.plugins.kafka.spark

import ideal.sylph.annotation.{Description, Name, Version}
import ideal.sylph.etl.PluginConfig
import ideal.sylph.etl.api.Source
import ideal.sylph.plugins.kafka.KafkaSourceConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -19,8 +19,7 @@ import java.util.Objects

import ideal.sylph.annotation.{Description, Name, Version}
import ideal.sylph.etl.PluginConfig
import ideal.sylph.etl.api.{Sink, Source, TransForm}
import org.apache.spark.api.java.JavaRDD
import ideal.sylph.etl.api.Source
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}
33 changes: 4 additions & 29 deletions sylph-connectors/sylph-kafka/build.gradle
@@ -1,36 +1,11 @@
apply plugin: 'scala'

dependencies {
compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) {
exclude(module: 'flink-shaded-hadoop2')
}
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: deps.flink
//--------------------------------------------------spark----------------------------------------------------
compileOnly(group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark) {
exclude(module: 'spark-core_2.11')
}
compileOnly(group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark) {
exclude(module: 'spark-core_2.11')
}
compileOnly(group: 'org.apache.spark', name: 'spark-core_2.11', version: deps.spark) {
exclude(module: 'hadoop-client')
}
compileOnly group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.7.3'

/**
* Kafka support for Spark Structured Streaming
* */
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: deps.spark

compile project(':sylph-base-kafka')

/**
* Spark Streaming (legacy DStream) Kafka dependency
* */
compile(group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark) {
exclude(group: 'org.spark-project.spark')
exclude(group: 'org.scala-lang')
exclude(module: 'spark-tags_2.11')
exclude(module: 'slf4j-log4j12')
exclude(module: 'slf4j-api')
exclude(module: 'snappy-java')
}
testCompile project(':sylph-runners:sylph-runner-flink')
testCompile project(':sylph-spi')
}
