diff --git a/README.md b/README.md index 1cfd9b5..c397ffc 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ MySQL input plugin for Embulk loads data by binlog. - **user**: MySQL user (string, required) - **table**: MySQL user (string, required) - **password**: MySQL password (string, required) +- **ssl_mode**: MySQL SSL mode. See [MySQL documentation](https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode) for details. (string, default: `DISABLED`) - **from_binlog_filename**: The beginning of MySQL binlog filename (string, required) - **from_binlog_position**: The beginning of MySQL binlog position (integer, required) - **to_binlog_filename**: The end of MySQL binlog filename (string, optional) if to_binlog_filename is provided and to_binlog_position is omitted, this plugin stop fetching date just after binlog rotation to this file. if to_binlog_filename is omitted, plugin stops at the end of binlog. diff --git a/src/main/java/org/embulk/input/mysql_binlog/MysqlBinlogClient.java b/src/main/java/org/embulk/input/mysql_binlog/MysqlBinlogClient.java index ebc4ae9..5f82f19 100644 --- a/src/main/java/org/embulk/input/mysql_binlog/MysqlBinlogClient.java +++ b/src/main/java/org/embulk/input/mysql_binlog/MysqlBinlogClient.java @@ -2,6 +2,7 @@ import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.network.SSLMode; import org.embulk.input.mysql_binlog.handler.*; import org.embulk.input.mysql_binlog.model.DbInfo; import org.slf4j.Logger; @@ -25,7 +26,7 @@ public void setConnecting(boolean connecting) { } - public MysqlBinlogClient(DbInfo dbInfo, String binlogFilename) { + public MysqlBinlogClient(DbInfo dbInfo, String binlogFilename, SSLMode sslMode) { client = new BinaryLogClient(dbInfo.getHost(), dbInfo.getPort(), dbInfo.getUser(), dbInfo.getPassword()); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( @@ -36,6 +37,7 @@ public MysqlBinlogClient(DbInfo dbInfo, String binlogFilename) { client.setBlocking(false); client.registerLifecycleListener(this); client.setHeartbeatInterval(client.getKeepAliveInterval() / 2); + client.setSSLMode(sslMode); } public void registerEventListener(BinlogEventHandler binlogEventHandler) { diff --git a/src/main/java/org/embulk/input/mysql_binlog/PluginTask.java b/src/main/java/org/embulk/input/mysql_binlog/PluginTask.java index 81b3fd1..e070904 100644 --- a/src/main/java/org/embulk/input/mysql_binlog/PluginTask.java +++ b/src/main/java/org/embulk/input/mysql_binlog/PluginTask.java @@ -1,5 +1,6 @@ package org.embulk.input.mysql_binlog; +import com.github.shyiko.mysql.binlog.network.SSLMode; import com.google.common.base.Optional; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; @@ -65,6 +66,10 @@ public interface PluginTask @ConfigDefault("\"disable\"") Ssl getSsl(); + @Config("ssl_mode") + @ConfigDefault("\"DISABLED\"") + SSLMode getSslMode(); + @Config("default_timezone") @ConfigDefault("\"UTC\"") String getDefaultTimezone(); diff --git a/src/main/java/org/embulk/input/mysql_binlog/manager/MysqlBinlogManager.java b/src/main/java/org/embulk/input/mysql_binlog/manager/MysqlBinlogManager.java index c67e3ec..20aa0fd 100644 --- a/src/main/java/org/embulk/input/mysql_binlog/manager/MysqlBinlogManager.java +++ b/src/main/java/org/embulk/input/mysql_binlog/manager/MysqlBinlogManager.java @@ -40,7 +40,7 @@ public MysqlBinlogManager(PluginTask task, PageBuilder pageBuilder, Schema schem this.setCurrentDdl(t.toDdl()); DbInfo dbInfo = MysqlBinlogClient.convertTaskToDbInfo(task); - this.client = new MysqlBinlogClient(dbInfo, getBinlogFilename()); + this.client = new MysqlBinlogClient(dbInfo, getBinlogFilename(), task.getSslMode()); this.registerHandler(); this.client.registerEventListener(handler); }