Skip to content

Commit

Permalink
add branch dev:
Browse files Browse the repository at this point in the history
Dev is used for new feature development and will be merged regularly into master
  • Loading branch information
ideal committed Sep 11, 2018
1 parent 5b10d6c commit 6a28611
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.Row;
import ideal.sylph.etl.api.RealTimeSink;
import org.slf4j.Logger;
Expand All @@ -27,9 +28,6 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import static java.util.Objects.requireNonNull;

@Name("mysql")
@Description("this is mysql Sink, if table not execit ze create table")
Expand All @@ -38,20 +36,15 @@ public class MysqlSink
{
private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);

private String url;
private String userName;
private String password;
private final MysqlConfig config;

private Connection connection;
private PreparedStatement statement;
private int num = 0;

@Override
public void driverInit(Map<String, Object> optionMap)
public MysqlSink(MysqlConfig mysqlConfig)
{
this.url = (String) requireNonNull(optionMap.get("url"), "url is not setting");
this.userName = (String) requireNonNull(optionMap.get("userName"), "userName is not setting");
this.password = (String) requireNonNull(optionMap.get("password"), "password is not setting");
this.config = mysqlConfig;
}

@Override
Expand All @@ -60,7 +53,7 @@ public boolean open(long partitionId, long version)
String sql = "insert into mysql_table_sink values(?,?,?)";
try {
Class.forName("com.mysql.jdbc.Driver");
this.connection = DriverManager.getConnection(url, userName, password);
this.connection = DriverManager.getConnection(config.getJdbcUrl(), config.getUser(), config.getPassword());
this.statement = connection.prepareStatement(sql);
}
catch (SQLException | ClassNotFoundException e) {
Expand Down Expand Up @@ -108,4 +101,35 @@ public void close(Throwable errorOrNull)
logger.error("close connection fail", e);
}
}

public static class MysqlConfig
implements PluginConfig
{
@Name("jdbc.url")
@Description("this is mysql jdbc url")
private String jdbcUrl;

@Name("jdbc.user")
@Description("this is mysql userName")
private String user;

@Name("jdbc.password")
@Description("this is mysql userName")
private String password;

public String getJdbcUrl()
{
return jdbcUrl;
}

public String getPassword()
{
return password;
}

public String getUser()
{
return user;
}
}
}
20 changes: 20 additions & 0 deletions sylph-etl-api/src/main/java/ideal/sylph/etl/PluginConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.etl;

public interface PluginConfig
{
}

0 comments on commit 6a28611

Please sign in to comment.