Skip to content

Commit

Permalink
版本迭代,bugfix修复 & 优化 & 新feature
Browse files Browse the repository at this point in the history
  • Loading branch information
lulu2panpan committed Jan 13, 2022
1 parent 1f8fb3f commit d402a37
Show file tree
Hide file tree
Showing 157 changed files with 4,553 additions and 1,076 deletions.
1 change: 0 additions & 1 deletion docker/admin/app.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ function start_app() {
sh /home/admin/bin/jdk8.sh
chmod 755 /home/admin/polardbx-binlog.standalone/bin/*
sudo -E su admin -c 'sh /home/admin/polardbx-binlog.standalone/bin/daemon.sh start'
sudo -E su admin -c 'sh /home/admin/polardbx-binlog.standalone/bin/daemon.sh start'
}

function stop_app() {
Expand Down
2 changes: 1 addition & 1 deletion docker/bin/cdc_log_cleaner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ clean_process_log() {
ls -lhtr $current_process_path|sed '1d'|while read LINE
do
db_path="$current_process_path/`echo $LINE|awk '{print $9}'`"
clean_db_log $db_path
clean_db_log $db_path $2

get_dir_size
if [ $use -lt $max_used ]; then
Expand Down
4 changes: 1 addition & 3 deletions polardbx-cdc-assemble/bin/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ defaultLog=$HOME/logs/polardbx-binlog/$TASK_NAME/default.log
cd $HOME

#Start Java Process
if [[ "$TASK_NAME" == "DAEMON" ]]; then
${JAVA} ${JAVA_OPTS} -classpath ${CLASSPATH}:. com.aliyun.polardbx.binlog.daemon.DaemonBootStrap "taskName=${TASK_NAME}" 1>>$defaultLog 2>&1 &
elif [[ "$TASK_NAME" == Dumper* ]]; then
if [[ "$TASK_NAME" == Dumper* ]]; then
${JAVA} ${JAVA_OPTS} -classpath ${CLASSPATH}:. com.aliyun.polardbx.binlog.dumper.DumperBootStrap "taskName=${TASK_NAME}" 1>>$defaultLog 2>&1 &
elif [[ "$TASK_NAME" == "TRANSFER" ]]; then
${JAVA} ${JAVA_OPTS} -classpath ${CLASSPATH}:. com.aliyun.polardbx.binlog.transfer.Main "taskName=${TASK_NAME}" 1>>$defaultLog 2>&1 &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ private void consume(ErosaConnection connection, BinlogPosition startPosition, S
handle.setEventHandler(handler);

processor.setHandle(handle);
processor.init(connection, startPosition.getFileName(), startPosition.getPosition(), false);
processor.init(connection, startPosition.getFileName(), startPosition.getPosition(), false,
mySqlInfo.getServerCharactorSet());
try {
processor.start();
} finally {
Expand All @@ -174,7 +175,7 @@ private BinlogPosition searchPosition(ErosaConnection connection, long requestTs
String searchFile = endPosition.getFileName();
while (true) {
searchTsoEventHandle.reset();
processor.init(connection.fork(), searchFile, 0, true);
processor.init(connection.fork(), searchFile, 0, true, mySqlInfo.getServerCharactorSet());
long binlogFileSize = connection.binlogFileSize(searchFile);
if (binlogFileSize == -1) {
//找不到这个文件,直接break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static String getXid(LogEvent event) {
return query.substring(XA_ROLLBACK.length()).trim();
}
}

if (event instanceof XaPrepareLogEvent) {
XaPrepareLogEvent xaPrepareLogEvent = (XaPrepareLogEvent) event;
}
Expand Down Expand Up @@ -187,26 +187,36 @@ public static boolean isSequenceEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.SEQUENCE_EVENT;
}

public static boolean isGCNEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.GCN_EVENT;
}

public static boolean isRowsQueryEvent(LogEvent event) {
return event.getHeader().getType() == LogEvent.ROWS_QUERY_LOG_EVENT;
}

/**
* DRDS / ip / trace-seq / subseq
*
* @return / 10 / 2/
* @return / 10 / 2/, serverId
*/
public static String buildTrace(RowsQueryLogEvent event) {
public static String[] buildTrace(RowsQueryLogEvent event) {
String query = event.getRowsQuery();
if (query.startsWith("/*DRDS")) {
String[] results = new String[2];
String[] primarySplitArray = StringUtils.split(query, "/");
String[] secondarySplitArray = StringUtils.split(primarySplitArray[2], "-");
String seq = secondarySplitArray.length < 2 ? "0" : secondarySplitArray[1];
String subSeq = null;
if (NumberUtils.isCreatable(primarySplitArray[3])) {
subSeq = primarySplitArray[3];
}
return buildTraceId(seq, subSeq);
String trace = buildTraceId(seq, subSeq);
results[0] = trace;
if (primarySplitArray.length > 4) {
results[1] = primarySplitArray[4];
}
return results;
}
return null;
}
Expand All @@ -218,6 +228,29 @@ public static String buildTraceId(String mainSeq, String subSeq) {
return main + sub;
}

/**
* / DRDS / ip / trace-seq / subseq / server id / * /
*
* @return / 10 / 2/
*/
public static long getServerIdFromRowQuery(RowsQueryLogEvent event) {
long serverId = 0L;
String query = event.getRowsQuery();
if (query.startsWith("/*DRDS")) {
String[] ps = StringUtils.split(query, "/");
final int serverIdIdx = 4;
if (ps.length >= serverIdIdx + 2) {
String serverIdStr = ps[serverIdIdx];
try {
serverId = Long.parseLong(serverIdStr);
} catch (Throwable e) {

}
}
}
return serverId;
}

public static String makeXid(Long tranId, String groupName) throws UnsupportedEncodingException {
StringBuffer sb = new StringBuffer();
sb.append("X'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class RuntimeContext {
*/
private boolean recovery;

private int serverId;
private long serverId;

private int lowerCaseTableNames;

Expand Down Expand Up @@ -192,11 +192,11 @@ public void setTopology(String topology) {
this.topology = topology;
}

public int getServerId() {
public long getServerId() {
return serverId;
}

public void setServerId(int serverId) {
public void setServerId(long serverId) {
this.serverId = serverId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.aliyun.polardbx.binlog.canal.binlog;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -393,6 +394,10 @@ public static String getJavaCharset(final int id) {
}

public static String getJavaCharset(String mysqlCharset) {
if (StringUtils.isBlank(mysqlCharset)) {
return mysqlCharset;
}

Entry entry = mysqlCharsetMap.get(mysqlCharset.toUpperCase());
if (entry == null) {
return mysqlCharset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void dump(SinkFunction sinkFunction) throws IOException, TableIdNotFoundE
decoder.handle(LogEvent.TABLE_MAP_EVENT);
decoder.handle(LogEvent.XID_EVENT);
decoder.handle(LogEvent.SEQUENCE_EVENT);
decoder.handle(LogEvent.GCN_EVENT);
decoder.handle(LogEvent.WRITE_ROWS_EVENT);
decoder.handle(LogEvent.UPDATE_ROWS_EVENT);
decoder.handle(LogEvent.DELETE_ROWS_EVENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.aliyun.polardbx.binlog.canal.binlog.event.FormatDescriptionLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.TableMapLogEvent;
import com.aliyun.polardbx.binlog.canal.core.model.ServerCharactorSet;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -35,6 +36,10 @@ public final class LogContext {

private LogPosition logPosition;

private long serverId;

private ServerCharactorSet serverCharactorSet;

public LogContext() {
this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
}
Expand All @@ -59,6 +64,14 @@ public final void setFormatDescription(FormatDescriptionLogEvent formatDescripti
this.formatDescription = formatDescription;
}

public ServerCharactorSet getServerCharactorSet() {
return serverCharactorSet;
}

public void setServerCharactorSet(ServerCharactorSet serverCharactorSet) {
this.serverCharactorSet = serverCharactorSet;
}

public final void putTable(TableMapLogEvent mapEvent) {
mapOfTable.put(Long.valueOf(mapEvent.getTableId()), mapEvent);
}
Expand All @@ -76,4 +89,12 @@ public void reset() {

mapOfTable.clear();
}

public long getServerId() {
return serverId;
}

public void setServerId(long serverId) {
this.serverId = serverId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.aliyun.polardbx.binlog.canal.binlog;

import com.aliyun.polardbx.binlog.canal.LogEventUtil;
import com.aliyun.polardbx.binlog.canal.binlog.event.AppendBlockLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.BeginLoadQueryLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.CreateFileLogEvent;
Expand All @@ -25,6 +26,7 @@
import com.aliyun.polardbx.binlog.canal.binlog.event.ExecuteLoadLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.ExecuteLoadQueryLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.FormatDescriptionLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.GcnLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.GtidLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.HeartbeatLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.IgnorableLogEvent;
Expand Down Expand Up @@ -54,6 +56,7 @@
import com.aliyun.polardbx.binlog.canal.binlog.event.mariadb.MariaGtidListLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.mariadb.MariaGtidLogEvent;
import com.aliyun.polardbx.binlog.canal.binlog.event.mariadb.StartEncryptionLogEvent;
import com.aliyun.polardbx.binlog.canal.core.model.ServerCharactorSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -122,6 +125,11 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.GCN_EVENT: {
GcnLogEvent event = new GcnLogEvent(header, buffer, descriptionEvent);
logPosition.position = header.getLogPos();
return event;
}
case LogEvent.QUERY_EVENT: {
QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
Expand All @@ -135,7 +143,9 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
return event;
}
case LogEvent.TABLE_MAP_EVENT: {
TableMapLogEvent mapEvent = new TableMapLogEvent(header, buffer, descriptionEvent);
ServerCharactorSet charactorSet = context.getServerCharactorSet();
TableMapLogEvent mapEvent = new TableMapLogEvent(header, buffer, descriptionEvent,
CharsetConversion.getJavaCharset(charactorSet.getCharacterSetServer()));
/* updating position in context */
logPosition.position = header.getLogPos();
context.putTable(mapEvent);
Expand All @@ -146,20 +156,29 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.UPDATE_ROWS_EVENT_V1: {
RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.DELETE_ROWS_EVENT_V1: {
RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.ROTATE_EVENT: {
Expand Down Expand Up @@ -304,27 +323,37 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
context.setServerId(LogEventUtil.getServerIdFromRowQuery(event));
return event;
}
case LogEvent.WRITE_ROWS_EVENT: {
RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.UPDATE_ROWS_EVENT: {
RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.DELETE_ROWS_EVENT: {
RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
if (context.getServerId() != 0L) {
event.setServerId(context.getServerId());
}
return event;
}
case LogEvent.GTID_LOG_EVENT:
Expand Down Expand Up @@ -471,4 +500,5 @@ public LogEvent decode(LogBuffer buffer, LogContext context) throws IOException
buffer.rewind();
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public abstract class LogEvent implements HandlerEvent {
*/
public static final int SEQUENCE_EVENT = 80;

/* DataNode 8.0 TSO EVENT */
public static final int GCN_EVENT = 105;

public static final int MARIA_EVENTS_BEGIN = 160;
/* New Maria event numbers start from here */
public static final int ANNOTATE_ROWS_EVENT = 160;
Expand Down Expand Up @@ -303,6 +306,8 @@ public abstract class LogEvent implements HandlerEvent {

protected byte[] newData;

protected Long traceServerId;

protected LogEvent(LogHeader header) {
this.header = header;
}
Expand Down Expand Up @@ -381,6 +386,8 @@ public static String getTypeName(final int type) {
return "Previous_gtids";
case SEQUENCE_EVENT:
return "Sequence";
case GCN_EVENT:
return "Gcn";
case XA_PREPARE_LOG_EVENT:
return "XA_Prepare";
default:
Expand Down Expand Up @@ -410,6 +417,13 @@ public final long getServerId() {
return header.getServerId();
}

/**
* Server ID of the server that created the event.
*/
public final void setServerId(long serverId) {
header.setServerId(serverId);
}

/**
* The position of the next event in the master binary log, in bytes from the beginning of the file. In a binlog
* that is not a relay log, this is just the position of the next event, in bytes from the beginning of the file. In
Expand Down Expand Up @@ -456,4 +470,12 @@ public String info() {
public String getCommitLogInfo() {
return null;
}

public Long getTraceServerId() {
return traceServerId;
}

public void setTraceServerId(Long traceServerId) {
this.traceServerId = traceServerId;
}
}
Loading

0 comments on commit d402a37

Please sign in to comment.