Skip to content

Commit

Permalink
Add PluginConfig to configure the parameters of the connector through…
Browse files Browse the repository at this point in the history
… injection

other: Optimize part of the code
  • Loading branch information
ideal committed Sep 12, 2018
1 parent 6a28611 commit e1a9d3a
Show file tree
Hide file tree
Showing 37 changed files with 447 additions and 292 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ subprojects{
excludes(["**/*.properties","**/*.sql"])
//include "**/*.java"
}
assemble.dependsOn 'licenseMain','licenseTest'
//assemble.dependsOn 'licenseMain','licenseTest'
//licenseMain.includes
//license.mapping('javascript', 'JAVADOC_STYLE')
}
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ java -version

./gradlew -v

./gradlew clean checkstyle assemble test "$@"
./gradlew clean checkstyle licenseMain licenseTest assemble test "$@"
24 changes: 18 additions & 6 deletions ideal-common/src/main/java/ideal/common/jvm/JVMLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,23 @@

public final class JVMLauncher<R extends Serializable>
{
private VmCallable<R> callable;
private final VmCallable<R> callable;
private final Collection<URL> userJars;
private final Consumer<String> consoleHandler;
private final boolean depThisJvm;

private Process process;
private Collection<URL> userJars;
private Consumer<String> consoleHandler;

public JVMLauncher(VmCallable<R> callable, Consumer<String> consoleHandler, Collection<URL> userJars)
public JVMLauncher(
VmCallable<R> callable,
Consumer<String> consoleHandler,
Collection<URL> userJars,
boolean depThisJvm)
{
this.callable = callable;
this.userJars = userJars;
this.consoleHandler = consoleHandler;
this.depThisJvm = depThisJvm;
}

public VmFuture<R> startAndGet()
Expand Down Expand Up @@ -113,7 +120,12 @@ private List<String> buildMainArg(int port)
ops.add("-classpath");
//ops.add(System.getProperty("java.class.path"));
String userSdkJars = getUserAddClasspath(); //编译时还需要 用户的额外jar依赖
ops.add(System.getProperty("java.class.path") + ":" + userSdkJars);
if (depThisJvm) {
ops.add(System.getProperty("java.class.path") + ":" + userSdkJars);
}
else {
ops.add(userSdkJars);
}

String javaLibPath = System.getProperty("java.library.path");
if (javaLibPath != null) {
Expand All @@ -131,8 +143,8 @@ public static void main(String[] args)
VmFuture<? extends Serializable> future;

try (ObjectInputStreamProxy ois = new ObjectInputStreamProxy(System.in)) {
VmCallable<? extends Serializable> callable = (VmCallable<? extends Serializable>) ois.readObject();
System.out.println("vm start init ok ...");
VmCallable<? extends Serializable> callable = (VmCallable<? extends Serializable>) ois.readObject();
future = new VmFuture<>(callable.call());
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private JVMLaunchers() {}
public static class VmBuilder<T extends Serializable>
{
private VmCallable<T> callable;
private boolean depThisJvm = true;
private Consumer<String> consoleHandler;
private final List<URL> tmpJars = new ArrayList<>();

Expand All @@ -48,6 +49,12 @@ public VmBuilder<T> setConsole(Consumer<String> consoleHandler)
return this;
}

public VmBuilder<T> notDepThisJvmClassPath()
{
depThisJvm = false;
return this;
}

public VmBuilder<T> addUserURLClassLoader(URLClassLoader vmClassLoader)
{
ClassLoader classLoader = vmClassLoader;
Expand All @@ -67,7 +74,7 @@ public VmBuilder<T> addUserjars(Collection<URL> jars)
public JVMLauncher<T> build()
{
requireNonNull(consoleHandler, "setConsole(Consumer<String> consoleHandler) not setting");
return new JVMLauncher<T>(callable, consoleHandler, tmpJars);
return new JVMLauncher<T>(callable, consoleHandler, tmpJars, depThisJvm);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public boolean open(long partitionId, long version)
String sql = "insert into mysql_table_sink values(?,?,?)";
try {
Class.forName("com.mysql.jdbc.Driver");
this.connection = DriverManager.getConnection(config.getJdbcUrl(), config.getUser(), config.getPassword());
this.connection = DriverManager.getConnection(config.jdbcUrl, config.user, config.password);
this.statement = connection.prepareStatement(sql);
}
catch (SQLException | ClassNotFoundException e) {
Expand Down Expand Up @@ -103,33 +103,25 @@ public void close(Throwable errorOrNull)
}

public static class MysqlConfig
implements PluginConfig
extends PluginConfig
{
@Name("jdbc.url")
@Name("url")
@Description("this is mysql jdbc url")
private String jdbcUrl;
private final String jdbcUrl;

@Name("jdbc.user")
@Name("userName")
@Description("this is mysql userName")
private String user;
private final String user;

@Name("jdbc.password")
@Description("this is mysql userName")
private String password;

public String getJdbcUrl()
{
return jdbcUrl;
}

public String getPassword()
{
return password;
}
@Name("password")
@Description("this is mysql password")
private final String password;

public String getUser()
private MysqlConfig(String jdbcUrl, String user, String password)
{
return user;
this.jdbcUrl = jdbcUrl;
this.user = user;
this.password = password;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,4 @@ class PrintSink extends RealTimeSink {
* partition级别的资源释放
**/
override def close(errorOrNull: Throwable): Unit = {}

override def driverInit(optionMap: java.util.Map[String, Object]): Unit = {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.api.Source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
Expand All @@ -36,59 +37,50 @@
import java.util.Properties;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

@Name(value = "kafka")
@Version("1.0.0")
@Description("this flink kafka source inputStream")
public class KafkaSource
implements Source<StreamTableEnvironment, DataStream<Row>>
implements Source<DataStream<Row>>
{
private static final long serialVersionUID = 2L;
private static final String[] KAFKA_COLUMNS = new String[] {"_topic", "_key", "_message", "_partition", "_offset"};

private transient java.util.Map<String, Object> optionMap;
private transient StreamTableEnvironment tableEnv;
private final transient Supplier<DataStream<Row>> loadStream;

/**
* 初始化(driver阶段执行)
**/
@Override
public void driverInit(
StreamTableEnvironment tableEnv,
java.util.Map<String, Object> optionMap)
public KafkaSource(StreamTableEnvironment tableEnv, KafkaSourceConfig config)
{
this.optionMap = optionMap;
this.tableEnv = tableEnv;
}
requireNonNull(tableEnv, "tableEnv is null");
requireNonNull(config, "config is null");
loadStream = Suppliers.memoize(() -> {
String topics = config.topics;
String brokers = config.brokers; //需要把集群的host 配置到程序所在机器
String groupid = config.groupid; //消费者的名字
String offset = config.offsetMode; //latest earliest

private final transient Supplier<DataStream<Row>> loadStream = Suppliers.memoize(() -> {
String topics = (String) optionMap.get("kafka_topic");
String brokers = (String) optionMap.get("kafka_broker"); //需要把集群的host 配置到程序所在机器
String groupid = (String) optionMap.get("kafka_group_id"); //消费者的名字
String offset = (String) optionMap.getOrDefault("auto.offset.reset", "latest"); //latest earliest

Properties properties = new Properties();
properties.put("bootstrap.servers", brokers);
//"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量
// "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
properties.put("group.id", groupid); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
properties.put("auto.offset.reset", offset); //latest earliest

List<String> topicSets = Arrays.asList(topics.split(","));
//org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
DataStream<Row> stream = FlinkEnvUtil.getFlinkEnv(tableEnv).addSource(new FlinkKafkaConsumer010<Row>(
topicSets,
new RowDeserializer(),
properties)
);
//------------registerDataStream--------------
String tableName = (String) optionMap.getOrDefault("table_name", null);
if (tableName != null) {
tableEnv.registerDataStream(tableName, stream, String.join(",", KAFKA_COLUMNS) + ",proctime.proctime");
}
return stream;
});
Properties properties = new Properties();
properties.put("bootstrap.servers", brokers);
//"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量
// "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
properties.put("group.id", groupid); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
properties.put("auto.offset.reset", offset); //latest earliest

List<String> topicSets = Arrays.asList(topics.split(","));
//org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
DataStream<Row> stream = FlinkEnvUtil.getFlinkEnv(tableEnv).addSource(new FlinkKafkaConsumer010<Row>(
topicSets,
new RowDeserializer(),
properties)
);
return stream;
});
}

@Override
public DataStream<Row> getSource()
Expand Down Expand Up @@ -129,4 +121,28 @@ public TypeInformation<Row> getProducedType()
return new RowTypeInfo(types, KAFKA_COLUMNS);
}
}

public static class KafkaSourceConfig
extends PluginConfig
{
private static final long serialVersionUID = 2L;

@Name("kafka_topic")
@Description("this is kafka topic list")
private String topics;

@Name("kafka_broker")
@Description("this is kafka broker list")
private String brokers;

@Name("kafka_group_id")
@Description("this is kafka_group_id")
private String groupid;

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode")
private String offsetMode = "latest";

private KafkaSourceConfig() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package ideal.sylph.plugins.flink.source

import java.util
import java.util.Random
import java.util.concurrent.TimeUnit

Expand All @@ -37,28 +36,12 @@ import scala.util.parsing.json.JSONObject
@Description("this flink test source inputStream")
@Version("1.0.0")
@SerialVersionUID(2L) //使用注解来制定序列化id
class TestSource extends Source[StreamTableEnvironment, DataStream[Row]] {

@transient private var optionMap: java.util.Map[String, Object] = _
@transient private var tableEnv: StreamTableEnvironment = _
class TestSource(@transient private val tableEnv: StreamTableEnvironment) extends Source[DataStream[Row]] {

@transient private lazy val loadStream: DataStream[Row] = {
val stream = FlinkEnvUtil.getFlinkEnv(tableEnv).addSource(new MyDataSource)
val tableName = optionMap.getOrDefault("table_name", null).asInstanceOf[String]
if (tableName != null) {
tableEnv.registerDataStream(tableName, stream)
}
stream
FlinkEnvUtil.getFlinkEnv(tableEnv).addSource(new MyDataSource)
}

/**
* 初始化(driver阶段执行)
**/
override def driverInit(tableEnv: StreamTableEnvironment,
optionMap: util.Map[String, Object]): Unit = {
this.optionMap = optionMap
this.tableEnv = tableEnv
}

override def getSource(): DataStream[Row] = loadStream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,4 @@ class SqlWindow extends TransForm[DataStream[Row]] {

tableEnv.toAppendStream(result2, classOf[Row])
}

override def driverInit(optionMap: java.util.Map[String, Object]): Unit = {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ class TestTrans extends RealTimeTransForm {
**/
override def close(errorOrNull: Throwable): Unit = {}

override def driverInit(optionMap: java.util.Map[String, Object]): Unit = {
}

/**
* driver 上运行
*/
Expand Down
Loading

0 comments on commit e1a9d3a

Please sign in to comment.