From a7953fe3ff29fc76e8d0d2f0a26548b2530efa66 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 16 Apr 2024 17:49:54 +0100 Subject: [PATCH 1/2] Try to cap incremental syncs from working on massive gaps --- syncapi/storage/shared/storage_consumer.go | 4 +-- syncapi/sync/requestpool.go | 41 +++++++++++++++++----- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/syncapi/storage/shared/storage_consumer.go b/syncapi/storage/shared/storage_consumer.go index 923ead9bdd..751bc6ce7b 100644 --- a/syncapi/storage/shared/storage_consumer.go +++ b/syncapi/storage/shared/storage_consumer.go @@ -508,7 +508,7 @@ func (d *Database) CleanSendToDeviceUpdates( // getMembershipFromEvent returns the value of content.membership iff the event is a state event // with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned. func getMembershipFromEvent(ctx context.Context, ev gomatrixserverlib.PDU, userID string, rsAPI api.SyncRoomserverAPI) (string, string) { - if ev.StateKey() == nil || *ev.StateKey() == "" { + if ev.Type() != "m.room.member" || ev.StateKey() == nil || *ev.StateKey() == "" { return "", "" } fullUser, err := spec.NewUserID(userID, true) @@ -520,7 +520,7 @@ func getMembershipFromEvent(ctx context.Context, ev gomatrixserverlib.PDU, userI return "", "" } - if ev.Type() != "m.room.member" || !ev.StateKeyEquals(string(*senderID)) { + if !ev.StateKeyEquals(string(*senderID)) { return "", "" } membership, err := ev.Membership() diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 5a92c70e1c..3c547dc2e0 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -397,13 +397,16 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } else { // Incremental sync + reasonablePositions := findReasonableIncrementalSyncWindow( + syncReq.Since, rp.Notifier.CurrentPosition(), + ) syncReq.Response.NextBatch = types.StreamingToken{ PDUPosition: withTransaction( syncReq.Since.PDUPosition, func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.PDUStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition, + syncReq.Since.PDUPosition, reasonablePositions.PDUPosition, ) }, ), @@ -412,7 +415,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.TypingStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition, + syncReq.Since.TypingPosition, reasonablePositions.TypingPosition, ) }, ), @@ -421,7 +424,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.ReceiptStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition, + syncReq.Since.ReceiptPosition, reasonablePositions.ReceiptPosition, ) }, ), @@ -430,7 +433,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.InviteStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition, + syncReq.Since.InvitePosition, reasonablePositions.InvitePosition, ) }, ), @@ -439,7 +442,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.SendToDeviceStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition, + syncReq.Since.SendToDevicePosition, reasonablePositions.SendToDevicePosition, ) }, ), @@ -448,7 +451,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.AccountDataStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition, + syncReq.Since.AccountDataPosition, reasonablePositions.AccountDataPosition, ) }, ), @@ -457,7 +460,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.NotificationDataStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition, + syncReq.Since.NotificationDataPosition, reasonablePositions.NotificationDataPosition, ) }, ), @@ -466,7 +469,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.DeviceListStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition, + syncReq.Since.DeviceListPosition, reasonablePositions.DeviceListPosition, ) }, ), @@ -475,7 +478,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. func(txn storage.DatabaseTransaction) types.StreamPosition { return rp.streams.PresenceStreamProvider.IncrementalSync( syncReq.Context, txn, syncReq, - syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition, + syncReq.Since.PresencePosition, reasonablePositions.PresencePosition, ) }, ), @@ -585,3 +588,23 @@ func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest, curre } return false } + +func findReasonableIncrementalSyncWindow(since, limit types.StreamingToken) types.StreamingToken { + const windowSize = 100 // TODO: reasonable number? + for s, l := range map[*types.StreamPosition]types.StreamPosition{ + &since.AccountDataPosition: limit.AccountDataPosition, + &since.DeviceListPosition: limit.DeviceListPosition, + &since.InvitePosition: limit.InvitePosition, + &since.NotificationDataPosition: limit.NotificationDataPosition, + &since.PDUPosition: limit.PDUPosition, + &since.PresencePosition: limit.PresencePosition, + &since.ReceiptPosition: limit.ReceiptPosition, + &since.SendToDevicePosition: limit.SendToDevicePosition, + &since.TypingPosition: limit.TypingPosition, + } { + if *s += windowSize; *s > l { + *s = l + } + } + return since +} From fb5e1abe71b56a600fda24661e678a65fa3ae5ca Mon Sep 17 00:00:00 2001 From: Till Faelligen <2353100+S7evinK@users.noreply.github.com> Date: Wed, 15 May 2024 11:02:23 +0200 Subject: [PATCH 2/2] Fix a bug where we would accidentally skip over some events --- syncapi/sync/requestpool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 3c547dc2e0..448b751ce9 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -400,6 +400,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. reasonablePositions := findReasonableIncrementalSyncWindow( syncReq.Since, rp.Notifier.CurrentPosition(), ) + // Also update the currentPos, which is used for the retry logic below. + // Otherwise we may skip over some events. + currentPos = reasonablePositions syncReq.Response.NextBatch = types.StreamingToken{ PDUPosition: withTransaction( syncReq.Since.PDUPosition,