Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gywndi committed Apr 28, 2021
1 parent b3a57f1 commit 199174e
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/main/java/net/gywn/binlog/BinlogHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ private void waitJobProcessing() {
int sleepMS = 1;
while (true) {

if (getJobCount() == 0) {
if (getCurrentJobCount() == 0) {
break;
}

Expand All @@ -536,7 +536,7 @@ private void waitJobProcessing() {
}
}

public int getJobCount() {
public int getCurrentJobCount() {
int currentJobs = 0;
for (BinlogHandlerWorker worker : binlogHandlerWorkers) {
currentJobs += worker.getJobCount();
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/gywn/binlog/BinlogHandlerWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void enqueue(final BinlogTransaction tx) {
queue.add(tx);
break;
} catch (Exception e) {
// logger.error("Enqueue error", e.getMessage());
// TODO: add metric to check enqueue error
UldraUtil.sleep(100);
}
}
Expand Down Expand Up @@ -154,6 +154,7 @@ private void transactionRollback(BinlogTransaction tx) {
}
}

// Current processing job + jobs in queue
public int getJobCount() {
return queue.size() + (processing ? 1 : 0);
}
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/net/gywn/binlog/BinlogServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import net.gywn.binlog.common.UldraConfig;
import net.gywn.binlog.common.UldraUtil;

/**
* @author chan
*
*/
@Data
public class BinlogServer {
private static final Logger logger = LoggerFactory.getLogger(BinlogServer.class);
Expand Down Expand Up @@ -96,6 +100,8 @@ public void start() {
binaryLogClient = new BinaryLogClient(binlogServerUrl, binlogServerPort, binlogServerUsername,
binlogServerPassword);
EventDeserializer eventDeserializer = new EventDeserializer();

// DATE_AND_TIME_AS_LONG_MICRO : calculate the time from this number to support microseconds.
eventDeserializer.setCompatibilityMode(DATE_AND_TIME_AS_LONG_MICRO, CHAR_AND_BINARY_AS_BYTE_ARRAY);
binaryLogClient.setEventDeserializer(eventDeserializer);
binaryLogClient.setServerId(binlogServerID);
Expand Down Expand Up @@ -137,7 +143,7 @@ public void start() {
public void run() {
while (true) {
try {
int currentJobCount = binlogHandler.getJobCount();
int currentJobCount = binlogHandler.getCurrentJobCount();
List<Binlog> binlogList = binlogHandler.getWorkerBinlogList();

Binlog binlog = null, lastBinlog = null;
Expand All @@ -154,6 +160,7 @@ public void run() {
lastBinlog = binlogHandler.getTargetBinlog();
}

// When processing more than the binlog position set for recovery, the recover mode is released.
if (binlogHandler.isRecovering() && !binlogHandler.isRecoveringPosition()) {
logger.info("Recover finished, target - {}", binlogHandler.getTargetBinlog());
binlogHandler.setRecovering(false);
Expand Down Expand Up @@ -191,6 +198,7 @@ public void onEvent(Event event) {
});
}

// TODO: Need to implement code to recover in case of replication failure
private void registerLifecycleListener() {
binaryLogClient.registerLifecycleListener(new LifecycleListener() {

Expand Down
4 changes: 1 addition & 3 deletions src/main/java/net/gywn/binlog/beans/Binlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
public class Binlog implements Comparable<Binlog> {
private static final Logger logger = LoggerFactory.getLogger(Binlog.class);

// TODO: GTID support
private String binlogFile;
private long binlogPosition;

public Binlog() {
}

public Binlog(final String binlogInfo) {
String[] info = binlogInfo.trim().split(":");
this.binlogFile = info[0];
Expand Down
1 change: 1 addition & 0 deletions src/main/java/net/gywn/binlog/beans/BinlogColumn.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BinlogColumn {
@Setter
private boolean rowKey = false;

// TODO: add default values
public BinlogColumn(final String name, final String type, final String charset, final boolean unsigned) {
this.name = name;
this.charset = charset;
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/net/gywn/binlog/beans/BinlogTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public class BinlogTransaction implements AutoCloseable {
private final List<BinlogOperation> binlogOperations = new ArrayList<BinlogOperation>();
private final String position;
private final Binlog binlog;
private boolean transactional = false;
private boolean recovering = false;

// TODO: set default transactional to true if target is not MySQL
private boolean transactional = false;

@Setter
private Connection connection;
Expand Down
74 changes: 54 additions & 20 deletions src/main/java/net/gywn/binlog/common/UldraUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
public class UldraUtil {
private static final Logger logger = LoggerFactory.getLogger(UldraUtil.class);
private static final CaseInsensitiveMap<String, String> charMap = new CaseInsensitiveMap<String, String>();

// TODO: Convert to mapping information for the entire character set
static {
charMap.put("euckr", "MS949");
charMap.put("utf8", "UTF-8");
Expand Down Expand Up @@ -83,6 +85,7 @@ public static String readFile(String path) {
return null;
}

// Convert to DATETIME(n)
private static String getMysqlDatetime(final Serializable serializable) {
logger.debug("getMysqlDatetime {}", serializable);
long time = (long) serializable;
Expand All @@ -91,13 +94,15 @@ private static String getMysqlDatetime(final Serializable serializable) {
return String.format("%s.%06d", format.format(new Date(time / 1000)), time % 1000000);
}

// Convert to TIMESTAMP(n)
private static String getMysqlTimestamp(final Serializable serializable) {
logger.debug("getMysqlTimestamp {}", serializable);
long time = (long) serializable;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return String.format("%s.%06d", format.format(new Date(time / 1000)), time % 1000000);
}

// Convert to DATE
private static String getMysqlDate(final Serializable serializable) {
logger.debug("getMysqlDate {}", serializable);
long time = (long) serializable;
Expand All @@ -106,6 +111,7 @@ private static String getMysqlDate(final Serializable serializable) {
return String.format("%s", format.format(new Date(time / 1000)));
}

// Convert to TIME
private static String getMysqlTime(final Serializable serializable) {
logger.debug("getMysqlTime {}", serializable);
long time = (long) serializable;
Expand All @@ -114,12 +120,48 @@ private static String getMysqlTime(final Serializable serializable) {
return String.format("%s.%06d", format.format(new Date(time / 1000)), time % 1000000);
}

// Convert to String
public static String getMysqlString(final Serializable serializable) {
logger.debug("java String type");
return (String) serializable;
}

// Convert to INT
public static String getMysqlInt(final Serializable serializable, final boolean isUnsigned) {
logger.debug("java Integer type, unsinged {}", isUnsigned);
return isUnsigned ? Integer.toUnsignedString((Integer) serializable) : serializable.toString();
}

// Convert to BIGINT
public static String getMysqlBigint(final Serializable serializable, final boolean isUnsigned) {
logger.debug("java Long type, unsinged {}", isUnsigned);
return isUnsigned ? Long.toUnsignedString((Long) serializable) : serializable.toString();
}

// Convert to CLOB
public static String toCharsetString(final byte[] byteArray, final String mysqlCharset) {
logger.debug("java Bytes type, charset to {}", mysqlCharset);
String javaCharset = charMap.get(mysqlCharset);
if (javaCharset != null) {
try {
return new String(byteArray, javaCharset);
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage());
}
}
return new String(byteArray);
}

public static String toString(final Serializable serializable, final BinlogColumn column) {
if (serializable == null) {
return null;
}

logger.debug("column type in mysql {}", column.getType());

// ==================================
// Datetime & Timestamp & Date & Time
// ==================================
switch (column.getType()) {
case "datetime":
return getMysqlDatetime(serializable);
Expand All @@ -131,40 +173,32 @@ public static String toString(final Serializable serializable, final BinlogColum
return getMysqlTime(serializable);
}

// ==================================
// String type
// ==================================
if (serializable instanceof String) {
logger.debug("java String type");
return (String) serializable;
return getMysqlString(serializable);
}

// ==================================
// Number type
// ==================================
if (serializable instanceof java.lang.Integer) {
logger.debug("java Integer type");
return column.isUnsigned() ? Integer.toUnsignedString((Integer) serializable) : serializable.toString();
return getMysqlInt(serializable, column.isUnsigned());
}

if (serializable instanceof java.lang.Long) {
logger.debug("java Long type, unsinged {}", column.isUnsigned());
return column.isUnsigned() ? Long.toUnsignedString((Long) serializable) : serializable.toString();
return getMysqlBigint(serializable, column.isUnsigned());
}

// ==================================
// CLOB type
// ==================================
if (serializable instanceof byte[] && column.getCharset() != null) {
logger.debug("java Bytes type");
return toCharsetString((byte[]) serializable, column.getCharset());
}

logger.debug("java {} type", serializable.getClass());
return serializable.toString();
}

public static String toCharsetString(final byte[] byteArray, final String mysqlCharset) {
logger.debug("toCharsetString->{}", mysqlCharset);
String javaCharset = charMap.get(mysqlCharset);
if (javaCharset != null) {
try {
return new String(byteArray, javaCharset);
} catch (UnsupportedEncodingException e) {
logger.error(e.getMessage());
}
}
return new String(byteArray);
}
}
15 changes: 12 additions & 3 deletions src/main/java/net/gywn/binlog/handler/OperationBinlogHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public enum OperationBinlogHandler {
@Override
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {

// Query is divided by the target tables defined on uldra-config.yml
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation,
binlogTransaction.isRecovering());
targetTable.getInsert().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand All @@ -42,8 +45,11 @@ public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
@Override
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {

// Query is divided by the target tables defined on uldra-config.yml
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation,
binlogTransaction.isRecovering());
targetTable.getUpdate().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand All @@ -52,8 +58,11 @@ public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
@Override
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {

// Query is divided by the target tables defined on uldra-config.yml
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation,
binlogTransaction.isRecovering());
targetTable.getDelete().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/gywn/binlog/handler/TargetHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface TargetHandler {
public void delete(final Connection connection, final TargetOperation operation) throws Exception;

public void softdel(final Connection connection, final TargetOperation operation) throws Exception;

public Map<String, String> selectByOld(final Connection connection, final TargetOperation operation) throws Exception;

public Map<String, String> selectByOld(final Connection connection, final TargetOperation operation)
throws Exception;
}

0 comments on commit 199174e

Please sign in to comment.