Skip to content

Commit 2da21d9

Browse files
Allow partial sync after loading AOF with preamble (#2366)
The AOF preamble mechanism replaces the traditional AOF base file with an RDB snapshot during rewrite operations, which reduces I/O overhead and improves loading performance. However, when Valkey loads the RDB-formatted preamble from the base AOF file, it does not process the replication ID (replid) information stored in the RDB AUX fields. This omission has two consequences: * On a primary, it prevents the primary from accepting PSYNC continue requests after restarting with a preamble-enabled AOF file. * On a replica, it prevents the replica from performing a partial sync (forcing a full sync instead) after restarting with a preamble-enabled AOF file. To resolve this, this commit aligns the AOF preamble handling with the logic used for standalone RDB files, by storing the replication ID and replication offset in the AOF preamble and restoring them when loading the AOF file. Resolves #2677 --------- Signed-off-by: arthur.lee <liziang.arthur@bytedance.com> Signed-off-by: Arthur Lee <arthurkiller@users.noreply.github.com> Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
1 parent 7fbd4cb commit 2da21d9

File tree

5 files changed

+130
-35
lines changed

5 files changed

+130
-35
lines changed

src/aof.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,7 +1452,14 @@ int loadSingleAppendOnlyFile(char *filename) {
14521452

14531453
if (fseek(fp, 0, SEEK_SET) == -1) goto readerr;
14541454
rioInitWithFile(&rdb, fp);
1455-
if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != C_OK) {
1455+
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
1456+
int rdb_flags = RDBFLAGS_AOF_PREAMBLE;
1457+
int rsi_is_valid = 0;
1458+
if (iAmPrimary()) {
1459+
if (server.repl_backlog == NULL) createReplicationBacklog();
1460+
rdb_flags |= RDBFLAGS_FEED_REPL;
1461+
}
1462+
if (rdbLoadRio(&rdb, rdb_flags, &rsi) != C_OK) {
14561463
if (old_style)
14571464
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
14581465
filename);
@@ -1462,10 +1469,15 @@ int loadSingleAppendOnlyFile(char *filename) {
14621469
ret = AOF_FAILED;
14631470
goto cleanup;
14641471
} else {
1472+
/* Restore the replication ID / offset from the RDB file. */
1473+
rsi_is_valid = rdbRestoreOffsetFromSaveInfo(&rsi, true);
14651474
loadingAbsProgress(ftello(fp));
14661475
last_progress_report_size = ftello(fp);
14671476
if (old_style) serverLog(LL_NOTICE, "Reading the remaining AOF tail...");
14681477
}
1478+
/* If the AOF didn't contain replication info, it's not possible to
1479+
* support partial resync, so we can free the backlog to save memory. */
1480+
if (!rsi_is_valid && server.repl_backlog && listLength(server.replicas) == 0) freeReplicationBacklog();
14691481
}
14701482

14711483
/* Read the actual AOF file, in REPL format, command by command. */
@@ -2400,8 +2412,10 @@ int rewriteAppendOnlyFile(char *filename) {
24002412
startSaving(RDBFLAGS_AOF_PREAMBLE);
24012413

24022414
if (server.aof_use_rdb_preamble) {
2415+
rdbSaveInfo rsi, *rsiptr;
2416+
rsiptr = rdbPopulateSaveInfo(&rsi);
24032417
int error;
2404-
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
2418+
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, rsiptr) == C_ERR) {
24052419
errno = error;
24062420
goto werr;
24072421
}

src/rdb.c

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3952,3 +3952,41 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
39523952
}
39533953
return NULL;
39543954
}
3955+
3956+
/* Restore the replication ID and replication offset held in rsi into the
3957+
* server's replication state. Returns 1 if rsi carried a set replication ID, a replication offset other than -1 and a stream DB other than -1 (in which case the state below was updated), and 0 otherwise (nothing is changed). When is_aof_preamble is true, a non-primary node only creates a cached primary if it has neither server.primary nor server.cached_primary. rsi must not be NULL (asserted). */
3958+
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble) {
3959+
int rsi_is_valid = 0;
3960+
serverAssert(rsi != NULL);
3961+
if (rsi->repl_id_is_set && rsi->repl_offset != -1 && rsi->repl_stream_db != -1) {
3962+
/* The repl_stream_db check is needed because older implementations
3963+
* may wrongly save a repl_stream_db of -1 inside the RDB file; see
3964+
* the function rdbPopulateSaveInfo for more information. */
3965+
rsi_is_valid = 1;
3966+
if (!iAmPrimary()) {
3967+
memcpy(server.replid, rsi->repl_id, sizeof(server.replid));
3968+
server.primary_repl_offset = rsi->repl_offset;
3969+
if (!is_aof_preamble || (!server.primary && !server.cached_primary)) {
3970+
/* If this is a replica, create a cached primary from this
3971+
* information, in order to allow partial resynchronizations
3972+
* with primaries. For an AOF preamble, only cache the primary
3973+
* if there is no primary and no cached primary yet. */
3974+
replicationCachePrimaryUsingMyself();
3975+
selectDb(server.cached_primary, rsi->repl_stream_db);
3976+
}
3977+
} else {
3978+
/* If this is a primary, save the loaded replication info as the
3979+
* secondary ID and offset, so that replicas can perform partial
3980+
* resynchronizations. NOTE(review): the copy length below is sizeof(server.replid), not sizeof(server.replid2); presumably both buffers have the same size - confirm. */
3981+
memcpy(server.replid2, rsi->repl_id, sizeof(server.replid));
3982+
server.second_replid_offset = rsi->repl_offset + 1;
3983+
/* Rebase primary_repl_offset and the backlog offset by rsi->repl_offset; the backlog must already exist (asserted below). */
3984+
server.primary_repl_offset += rsi->repl_offset;
3985+
serverAssert(server.repl_backlog);
3986+
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
3987+
rebaseReplicationBuffer(rsi->repl_offset);
3988+
server.repl_no_replicas_since = time(NULL);
3989+
}
3990+
}
3991+
return rsi_is_valid;
3992+
}

src/rdb.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,5 +212,6 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s
212212
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
213213
ssize_t rdbSaveFunctions(rio *rdb);
214214
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
215+
int rdbRestoreOffsetFromSaveInfo(rdbSaveInfo *rsi, bool is_aof_preamble);
215216

216217
#endif

src/server.c

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6989,36 +6989,7 @@ void loadDataFromDisk(void) {
69896989
int rdb_load_ret = rdbLoad(server.rdb_filename, &rsi, rdb_flags);
69906990
if (rdb_load_ret == RDB_OK) {
69916991
serverLog(LL_NOTICE, "DB loaded from disk: %.3f seconds", (float)(ustime() - start) / 1000000);
6992-
6993-
/* Restore the replication ID / offset from the RDB file. */
6994-
if (rsi.repl_id_is_set && rsi.repl_offset != -1 &&
6995-
/* Note that older implementations may save a repl_stream_db
6996-
* of -1 inside the RDB file in a wrong way, see more
6997-
* information in function rdbPopulateSaveInfo. */
6998-
rsi.repl_stream_db != -1) {
6999-
rsi_is_valid = 1;
7000-
if (!iAmPrimary()) {
7001-
memcpy(server.replid, rsi.repl_id, sizeof(server.replid));
7002-
server.primary_repl_offset = rsi.repl_offset;
7003-
/* If this is a replica, create a cached primary from this
7004-
* information, in order to allow partial resynchronizations
7005-
* with primaries. */
7006-
replicationCachePrimaryUsingMyself();
7007-
selectDb(server.cached_primary, rsi.repl_stream_db);
7008-
} else {
7009-
/* If this is a primary, we can save the replication info
7010-
* as secondary ID and offset, in order to allow replicas
7011-
* to partial resynchronizations with primaries. */
7012-
memcpy(server.replid2, rsi.repl_id, sizeof(server.replid));
7013-
server.second_replid_offset = rsi.repl_offset + 1;
7014-
/* Rebase primary_repl_offset from rsi.repl_offset. */
7015-
server.primary_repl_offset += rsi.repl_offset;
7016-
serverAssert(server.repl_backlog);
7017-
server.repl_backlog->offset = server.primary_repl_offset - server.repl_backlog->histlen + 1;
7018-
rebaseReplicationBuffer(rsi.repl_offset);
7019-
server.repl_no_replicas_since = time(NULL);
7020-
}
7021-
}
6992+
rsi_is_valid = rdbRestoreOffsetFromSaveInfo(&rsi, false);
70226993
} else if (rdb_load_ret != RDB_NOT_EXIST) {
70236994
serverLog(LL_WARNING, "Fatal error loading the DB, check server logs. Exiting.");
70246995
exit(1);

tests/integration/replication-4.tcl

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ start_server {tags {"repl external:skip"}} {
266266
wait_for_sync $replica
267267
}
268268

269-
test {Data divergence can happen under default conditions} {
270-
$replica config set propagation-error-behavior ignore
269+
test {Data divergence can happen under default conditions} {
270+
$replica config set propagation-error-behavior ignore
271271
$master debug replicate fake-command-1
272272

273273
# Wait for replication to normalize
@@ -280,7 +280,7 @@ start_server {tags {"repl external:skip"}} {
280280
assert_equal [count_log_message 0 "== CRITICAL =="] 1
281281
}
282282

283-
test {Data divergence is allowed on writable replicas} {
283+
test {Data divergence is allowed on writable replicas} {
284284
$replica config set replica-read-only no
285285
$replica set number2 foo
286286
$master incrby number2 1
@@ -293,3 +293,74 @@ start_server {tags {"repl external:skip"}} {
293293
}
294294
}
295295
}
296+
297+
# Test that replication info is persisted in the AOF and loaded again after a server restart
298+
start_server {tags {"repl external:skip cluster:skip"} overrides {appendonly yes repl-ping-replica-period 10000 loglevel debug}} {
299+
start_server {overrides {appendonly yes repl-ping-replica-period 10000 loglevel debug}} {
300+
set primary_id -1
301+
set replica_id 0
302+
set primary [srv $primary_id client]
303+
set primary_host [srv $primary_id host]
304+
set primary_port [srv $primary_id port]
305+
set replica [srv $replica_id client]
306+
307+
$replica replicaof $primary_host $primary_port
308+
$primary config rewrite
309+
$replica config rewrite
310+
311+
for {set k 0} {$k < 100} {incr k} {
312+
$primary set foo_$k bar_$k
313+
}
314+
$primary set bar foo
315+
wait_for_value_to_propagate_to_replica $primary $replica "bar"
316+
317+
waitForBgrewriteaof $primary
318+
waitForBgrewriteaof $replica
319+
wait_for_ofs_sync $primary $replica
320+
assert_equal [status $primary sync_full] 1
321+
wait_for_log_messages $replica_id {"*sync: Finished with success*"} 0 100 100
322+
323+
# save the primary's replication ID and offset; the tests below compare against them
324+
set prev_replid [status $primary master_replid]
325+
set prev_repl_offset [status $primary master_repl_offset]
326+
327+
test {replica rewrite aof and load it after restart} {
328+
# persist current replication info
329+
$replica bgrewriteaof
330+
waitForBgrewriteaof $replica
331+
wait_for_ofs_sync $primary $replica
332+
set logfile [srv $replica_id stdout]
333+
set num_lines [lindex [exec wc -l $logfile] 0]
334+
335+
restart_server $replica_id true false true now
336+
set replica [srv $replica_id client]
337+
wait_for_ofs_sync $primary $replica
338+
wait_for_log_messages $replica_id {"*Primary accepted a Partial Resynchronization*"} $num_lines 100 100
339+
340+
assert_equal [status $replica master_replid] $prev_replid
341+
assert_equal [status $replica master_repl_offset] $prev_repl_offset
342+
assert_equal [status $primary sync_full] 1
343+
assert_equal [status $primary sync_partial_ok] 1
344+
}
345+
346+
test {primary rewrite aof and load it after restart} {
347+
# persist current replication info
348+
$primary bgrewriteaof
349+
waitForBgrewriteaof $primary
350+
wait_for_ofs_sync $primary $replica
351+
set prev_repl_offset [expr {[status $primary master_repl_offset] + 1}]
352+
set logfile [srv $replica_id stdout]
353+
set num_lines [lindex [exec wc -l $logfile] 0]
354+
355+
restart_server $primary_id true false true now
356+
set primary [srv $primary_id client]
357+
wait_for_ofs_sync $primary $replica
358+
wait_for_log_messages $replica_id {"*Primary accepted a Partial Resynchronization*"} $num_lines 100 100
359+
360+
assert_equal [status $primary master_replid2] $prev_replid
361+
assert_equal [status $primary second_repl_offset] $prev_repl_offset
362+
assert_equal [status $primary sync_full] 0
363+
assert_equal [status $primary sync_partial_ok] 1
364+
}
365+
}
366+
}

0 commit comments

Comments
 (0)