Skip to content

Commit

Permalink
modify recover mode
Browse files Browse the repository at this point in the history
  • Loading branch information
gywndi committed Apr 24, 2021
1 parent 8589c25 commit b3a57f1
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
4 changes: 3 additions & 1 deletion src/main/java/net/gywn/binlog/BinlogHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class BinlogHandler {
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private Exception threadException;
private BinlogTransaction binlogTransaction = null;
private boolean recovering = true;

private boolean threadRunning = false;

// used for checking group key changed in binlog transaction level
Expand All @@ -84,6 +84,8 @@ public class BinlogHandler {
private Binlog currntBinlog;
@Setter
private Binlog targetBinlog;
@Setter
private boolean recovering = true;

public BinlogHandler(final UldraConfig uldraConfig) {
this.uldraConfig = uldraConfig;
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/net/gywn/binlog/BinlogServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class BinlogServer {
@Getter
private Exception threadException;

private boolean recovering = true;

public BinlogServer(final UldraConfig uldraConfig) {
this.uldraConfig = uldraConfig;
this.binlogServer = uldraConfig.getBinlogServer();
Expand Down Expand Up @@ -156,9 +154,9 @@ public void run() {
lastBinlog = binlogHandler.getTargetBinlog();
}

if (recovering && !binlogHandler.isRecoveringPosition()) {
if (binlogHandler.isRecovering() && !binlogHandler.isRecoveringPosition()) {
logger.info("Recover finished, target - {}", binlogHandler.getTargetBinlog());
recovering = false;
binlogHandler.setRecovering(false);
}

// flush binlog position info
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/net/gywn/binlog/beans/TargetOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ public class TargetOperation {
private final Map<String, String> keyMap;
private final TargetTable targetTable;
private final boolean isGroupKeyChanged;
private final boolean recovering;

public TargetOperation(final TargetTable targetTable, final BinlogOperation binlogOperation) {
public TargetOperation(final TargetTable targetTable, final BinlogOperation binlogOperation, final boolean recovering) {
this.targetTable = targetTable;
this.tableName = targetTable.getName();
this.recovering = recovering;
this.isGroupKeyChanged = binlogOperation.isGroupKeyChanged();
this.datMap = targetTable.getTargetDataMap(binlogOperation.getDatMap());
this.keyMap = targetTable.getTargetDataMap(binlogOperation.getKeyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public enum OperationBinlogHandler {
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation);
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
targetTable.getInsert().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand All @@ -43,7 +43,7 @@ public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation);
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
targetTable.getUpdate().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand All @@ -53,7 +53,7 @@ public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
public void executeBinlogOperation(final BinlogTransaction binlogTransaction,
final BinlogOperation binlogOperation, final TargetHandler targetHandler) throws Exception {
for (final TargetTable targetTable : binlogOperation.getBinlogTable().getTargetTables()) {
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation);
TargetOperation targetOperation = new TargetOperation(targetTable, binlogOperation, binlogTransaction.isRecovering());
targetTable.getDelete().executeUpdate(binlogTransaction, targetOperation, targetHandler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public void executeUpdate(final BinlogTransaction binlogTransaction, final Targe
return;
}

if (binlogTransaction.isRecovering()) {
if (targetOperation.isRecovering()) {
logger.debug("TargetOpType->INSERT->UPSERT");
targetHandler.upsert(binlogTransaction.getConnection(), targetOperation);
return;
}

logger.debug("TargetOpType->INSERT");
targetHandler.insert(binlogTransaction.getConnection(), targetOperation);
}
Expand All @@ -65,7 +66,7 @@ public void executeUpdate(final BinlogTransaction binlogTransaction, final Targe

if (targetOperation.isGroupKeyChanged()) {
logger.debug("TargetOpType->UPDATE->KEY_CHANGED");

// Fill by old image
Map<String, String> map = targetHandler.selectByOld(binlogTransaction.getConnection(), targetOperation);

Expand All @@ -81,7 +82,7 @@ public void executeUpdate(final BinlogTransaction binlogTransaction, final Targe
targetOperation.getDatMap().put(entry.getKey(), entry.getValue());
}
}

logger.debug("TargetOpType->INSERT->NO_ROWKEY->MERGED {}", targetOperation);
targetHandler.upsert(binlogTransaction.getConnection(), targetOperation);
targetHandler.delete(binlogTransaction.getConnection(), targetOperation);
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/net/gywn/binlog/handler/TargetHandlerMysql.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void insert(final Connection connection, final TargetOperation operation)
params.add(e.getValue());
}

String sql = String.format("insert into %s (%s) values (%s)", operation.getTableName(), sbCol.toString(),
sbVal.toString());
String sql = String.format("insert%s into %s (%s) values (%s)", getIgnore(operation), operation.getTableName(),
sbCol.toString(), sbVal.toString());

executeUpdate(connection, sql, params);
}
Expand Down Expand Up @@ -91,7 +91,7 @@ public void upsert(final Connection connection, final TargetOperation operation)
params.add(e.getValue());
}

String sql = String.format("insert ignore into %s (%s) values (%s) on duplicate key update %s",
String sql = String.format("insert%s into %s (%s) values (%s) on duplicate key update %s", getIgnore(operation),
operation.getTableName(), sbCol.toString(), sbVal.toString(), sbDup.toString());
executeUpdate(connection, sql, params);
}
Expand Down Expand Up @@ -120,8 +120,8 @@ public void update(final Connection connection, final TargetOperation operation)
params.add(e.getValue());
}

String sql = String.format("update ignore %s set %s where 1=1 %s", operation.getTableName(), sbSet.toString(),
sbWhe.toString());
String sql = String.format("update%s %s set %s where 1=1 %s", getIgnore(operation), operation.getTableName(),
sbSet.toString(), sbWhe.toString());
executeUpdate(connection, sql, params);

}
Expand All @@ -140,7 +140,8 @@ public void delete(final Connection connection, final TargetOperation operation)
params.add(e.getValue());
}

String sql = String.format("delete ignore from %s where 1=1 %s", operation.getTableName(), sbWhe.toString());
String sql = String.format("delete%s from %s where 1=1 %s", getIgnore(operation), operation.getTableName(),
sbWhe.toString());
executeUpdate(connection, sql, params);
}

Expand All @@ -167,8 +168,8 @@ public void softdel(final Connection connection, final TargetOperation operation
params.add(e.getValue());
}

String sql = String.format("update ignore %s set %s where 1=1 %s", operation.getTableName(), sbSet.toString(),
sbWhe.toString());
String sql = String.format("update%s %s set %s where 1=1 %s", getIgnore(operation), operation.getTableName(),
sbSet.toString(), sbWhe.toString());
executeUpdate(connection, sql, params);
}

Expand Down Expand Up @@ -248,7 +249,10 @@ private static void executeUpdate(final Connection connection, final String sql,
}
pstmt.executeUpdate();
pstmt.close();
}

private String getIgnore(final TargetOperation operation) {
return operation.isRecovering() ? " ignore" : "";
}

}

0 comments on commit b3a57f1

Please sign in to comment.