diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java index f746189e96..8de2acfdb0 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/BioSocketChannel.java @@ -138,6 +138,16 @@ public SocketAddress getRemoteSocketAddress() { if (socket != null) { return socket.getRemoteSocketAddress(); } + + return null; + } + + public SocketAddress getLocalSocketAddress() { + Socket socket = this.socket; + if (socket != null) { + return socket.getLocalSocketAddress(); + } + return null; } diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java index 782af21eec..878dc605b7 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/NettySocketChannel.java @@ -216,6 +216,10 @@ public SocketAddress getRemoteSocketAddress() { return channel != null ? channel.remoteAddress() : null; } + public SocketAddress getLocalSocketAddress() { + return channel != null ? channel.localAddress() : null; + } + public void close() { if (channel != null) { channel.close(); diff --git a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java index 67f967abe1..b67702aecb 100644 --- a/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java +++ b/driver/src/main/java/com/alibaba/otter/canal/parse/driver/mysql/socket/SocketChannel.java @@ -21,5 +21,7 @@ public interface SocketChannel { public SocketAddress getRemoteSocketAddress(); + public SocketAddress getLocalSocketAddress(); + public void close(); } diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java index 256b1ecc6b..4348dc2994 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlConnection.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.charset.Charset; import java.util.List; import java.util.concurrent.TimeUnit; @@ -254,10 +255,18 @@ public void dump(GTIDSet gtidSet, MultiStageCoprocessor coprocessor) throws IOEx private void sendRegisterSlave() throws IOException { RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket(); - cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress(); + SocketAddress socketAddress = connector.getChannel().getLocalSocketAddress(); + if (socketAddress == null || !(socketAddress instanceof InetSocketAddress)) { + return; + } + + InetSocketAddress address = (InetSocketAddress) socketAddress; + String host = address.getHostString(); + int port = address.getPort(); + cmd.reportHost = host; + cmd.reportPort = port; cmd.reportPasswd = authInfo.getPassword(); cmd.reportUser = authInfo.getUsername(); - cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port cmd.serverId = this.slaveId; byte[] cmdBody = cmd.toBytes(); diff --git a/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java index 8ca9b0447e..25c88764e3 100644 --- a/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java +++ b/parse/src/test/java/com/alibaba/otter/canal/parse/MysqlBinlogDumpPerformanceTest.java @@ -20,12 +20,12 @@ public class MysqlBinlogDumpPerformanceTest { public static void main(String args[]) { final MysqlEventParser controller = new MysqlEventParser(); - final EntryPosition startPosition = new EntryPosition("mysql-bin.001699", 120L, 100L); + final EntryPosition startPosition = new EntryPosition("mysql-bin.000007", 89796293L, 100L); controller.setConnectionCharset(Charset.forName("UTF-8")); controller.setSlaveId(3344L); controller.setDetectingEnable(false); controller.setFilterQueryDml(true); - controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3328), "root", "hello")); + controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("100.81.154.142", 3306), "canal", "canal")); controller.setMasterPosition(startPosition); controller.setEnableTsdb(false); controller.setDestination("example");