Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Try to cap incremental syncs from working on massive gaps #3374

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions syncapi/storage/shared/storage_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (d *Database) CleanSendToDeviceUpdates(
// getMembershipFromEvent returns the value of content.membership iff the event is a state event
// with type 'm.room.member' and state_key of userID. Otherwise, an empty string is returned.
func getMembershipFromEvent(ctx context.Context, ev gomatrixserverlib.PDU, userID string, rsAPI api.SyncRoomserverAPI) (string, string) {
if ev.StateKey() == nil || *ev.StateKey() == "" {
if ev.Type() != "m.room.member" || ev.StateKey() == nil || *ev.StateKey() == "" {
return "", ""
}
fullUser, err := spec.NewUserID(userID, true)
Expand All @@ -520,7 +520,7 @@ func getMembershipFromEvent(ctx context.Context, ev gomatrixserverlib.PDU, userI
return "", ""
}

if ev.Type() != "m.room.member" || !ev.StateKeyEquals(string(*senderID)) {
if !ev.StateKeyEquals(string(*senderID)) {
return "", ""
}
membership, err := ev.Membership()
Expand Down
44 changes: 35 additions & 9 deletions syncapi/sync/requestpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,19 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
} else {
// Incremental sync
reasonablePositions := findReasonableIncrementalSyncWindow(
syncReq.Since, rp.Notifier.CurrentPosition(),
)
// Also update the currentPos, which is used for the retry logic below.
// Otherwise we may skip over some events.
currentPos = reasonablePositions
syncReq.Response.NextBatch = types.StreamingToken{
PDUPosition: withTransaction(
syncReq.Since.PDUPosition,
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PDUStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PDUPosition, rp.Notifier.CurrentPosition().PDUPosition,
syncReq.Since.PDUPosition, reasonablePositions.PDUPosition,
)
},
),
Expand All @@ -412,7 +418,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.TypingStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.TypingPosition, rp.Notifier.CurrentPosition().TypingPosition,
syncReq.Since.TypingPosition, reasonablePositions.TypingPosition,
)
},
),
Expand All @@ -421,7 +427,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.ReceiptStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.ReceiptPosition, rp.Notifier.CurrentPosition().ReceiptPosition,
syncReq.Since.ReceiptPosition, reasonablePositions.ReceiptPosition,
)
},
),
Expand All @@ -430,7 +436,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.InviteStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.InvitePosition, rp.Notifier.CurrentPosition().InvitePosition,
syncReq.Since.InvitePosition, reasonablePositions.InvitePosition,
)
},
),
Expand All @@ -439,7 +445,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.SendToDeviceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.SendToDevicePosition, rp.Notifier.CurrentPosition().SendToDevicePosition,
syncReq.Since.SendToDevicePosition, reasonablePositions.SendToDevicePosition,
)
},
),
Expand All @@ -448,7 +454,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.AccountDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.AccountDataPosition, rp.Notifier.CurrentPosition().AccountDataPosition,
syncReq.Since.AccountDataPosition, reasonablePositions.AccountDataPosition,
)
},
),
Expand All @@ -457,7 +463,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.NotificationDataStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.NotificationDataPosition, rp.Notifier.CurrentPosition().NotificationDataPosition,
syncReq.Since.NotificationDataPosition, reasonablePositions.NotificationDataPosition,
)
},
),
Expand All @@ -466,7 +472,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.DeviceListPosition, rp.Notifier.CurrentPosition().DeviceListPosition,
syncReq.Since.DeviceListPosition, reasonablePositions.DeviceListPosition,
)
},
),
Expand All @@ -475,7 +481,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
func(txn storage.DatabaseTransaction) types.StreamPosition {
return rp.streams.PresenceStreamProvider.IncrementalSync(
syncReq.Context, txn, syncReq,
syncReq.Since.PresencePosition, rp.Notifier.CurrentPosition().PresencePosition,
syncReq.Since.PresencePosition, reasonablePositions.PresencePosition,
)
},
),
Expand Down Expand Up @@ -585,3 +591,23 @@ func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest, curre
}
return false
}

func findReasonableIncrementalSyncWindow(since, limit types.StreamingToken) types.StreamingToken {
const windowSize = 100 // TODO: reasonable number?
for s, l := range map[*types.StreamPosition]types.StreamPosition{
&since.AccountDataPosition: limit.AccountDataPosition,
&since.DeviceListPosition: limit.DeviceListPosition,
&since.InvitePosition: limit.InvitePosition,
&since.NotificationDataPosition: limit.NotificationDataPosition,
&since.PDUPosition: limit.PDUPosition,
&since.PresencePosition: limit.PresencePosition,
&since.ReceiptPosition: limit.ReceiptPosition,
&since.SendToDevicePosition: limit.SendToDevicePosition,
&since.TypingPosition: limit.TypingPosition,
} {
if *s += windowSize; *s > l {
*s = l
}
}
return since
}
Loading