20
20
#
21
21
#
22
22
import logging
23
- from typing import TYPE_CHECKING , Dict , Iterable , List , Mapping , Optional , Set , Tuple
23
+ from typing import (
24
+ TYPE_CHECKING ,
25
+ AbstractSet ,
26
+ Dict ,
27
+ Iterable ,
28
+ List ,
29
+ Mapping ,
30
+ Optional ,
31
+ Set ,
32
+ Tuple ,
33
+ )
24
34
25
35
from synapse .api import errors
26
- from synapse .api .constants import EduTypes , EventTypes
36
+ from synapse .api .constants import EduTypes , EventTypes , Membership
27
37
from synapse .api .errors import (
28
38
Codes ,
29
39
FederationDeniedError ,
38
48
wrap_as_background_process ,
39
49
)
40
50
from synapse .storage .databases .main .client_ips import DeviceLastConnectionInfo
51
+ from synapse .storage .databases .main .state_deltas import StateDelta
41
52
from synapse .types import (
42
53
DeviceListUpdates ,
43
54
JsonDict ,
@@ -222,129 +233,115 @@ async def get_user_ids_changed(
222
233
223
234
set_tag ("user_id" , user_id )
224
235
set_tag ("from_token" , str (from_token ))
225
- now_room_key = self .store .get_room_max_token ()
226
236
227
- room_ids = await self .store . get_rooms_for_user ( user_id )
237
+ now_token = self ._event_sources . get_current_token ( )
228
238
229
- changed = await self .get_device_changes_in_shared_rooms (
230
- user_id , room_ids , from_token
231
- )
239
+ # We need to work out all the different membership changes for the user
240
+ # and user they share a room with, to pass to
241
+ # `generate_sync_entry_for_device_list`. See its docstring for details
242
+ # on the data required.
232
243
233
- # Then work out if any users have since joined
234
- rooms_changed = self .store .get_rooms_that_changed (room_ids , from_token .room_key )
244
+ joined_room_ids = await self .store .get_rooms_for_user (user_id )
235
245
236
- member_events = await self .store .get_membership_changes_for_user (
237
- user_id , from_token .room_key , now_room_key
246
+ # Get the set of rooms that the user has joined/left
247
+ membership_changes = (
248
+ await self .store .get_current_state_delta_membership_changes_for_user (
249
+ user_id , from_key = from_token .room_key , to_key = now_token .room_key
250
+ )
238
251
)
239
- rooms_changed .update (event .room_id for event in member_events )
240
252
241
- stream_ordering = from_token .room_key .stream
253
+ # Check for newly joined or left rooms. We need to make sure that we add
254
+ # to newly joined in the case membership goes from join -> leave -> join
255
+ # again.
256
+ newly_joined_rooms : Set [str ] = set ()
257
+ newly_left_rooms : Set [str ] = set ()
258
+ for change in membership_changes :
259
+ # We check for changes in "joinedness", i.e. if the membership has
260
+ # changed to or from JOIN.
261
+ if change .membership == Membership .JOIN :
262
+ if change .prev_membership != Membership .JOIN :
263
+ newly_joined_rooms .add (change .room_id )
264
+ newly_left_rooms .discard (change .room_id )
265
+ elif change .prev_membership == Membership .JOIN :
266
+ newly_joined_rooms .discard (change .room_id )
267
+ newly_left_rooms .add (change .room_id )
268
+
269
+ # We now work out if any other users have since joined or left the rooms
270
+ # the user is currently in. First we filter out rooms that we know
271
+ # haven't changed recently.
272
+ rooms_changed = self .store .get_rooms_that_changed (
273
+ joined_room_ids , from_token .room_key
274
+ )
242
275
243
- possibly_changed = set (changed )
244
- possibly_left = set ()
276
+ # List of membership changes per room
277
+ room_to_deltas : Dict [str , List [StateDelta ]] = {}
278
+ # The set of event IDs of membership events (so we can fetch their
279
+ # associated membership).
280
+ memberships_to_fetch : Set [str ] = set ()
245
281
for room_id in rooms_changed :
246
- # Check if the forward extremities have changed. If not then we know
247
- # the current state won't have changed, and so we can skip this room.
248
- try :
249
- if not await self .store .have_room_forward_extremities_changed_since (
250
- room_id , stream_ordering
251
- ):
252
- continue
253
- except errors .StoreError :
254
- pass
255
-
256
- current_state_ids = await self ._state_storage .get_current_state_ids (
257
- room_id , await_full_state = False
282
+ # TODO: Only pull out membership events?
283
+ state_changes = await self .store .get_current_state_deltas_for_room (
284
+ room_id , from_token = from_token .room_key , to_token = now_token .room_key
258
285
)
259
-
260
- # The user may have left the room
261
- # TODO: Check if they actually did or if we were just invited.
262
- if room_id not in room_ids :
263
- for etype , state_key in current_state_ids .keys ():
264
- if etype != EventTypes .Member :
265
- continue
266
- possibly_left .add (state_key )
267
- continue
268
-
269
- # Fetch the current state at the time.
270
- try :
271
- event_ids = await self .store .get_forward_extremities_for_room_at_stream_ordering (
272
- room_id , stream_ordering = stream_ordering
273
- )
274
- except errors .StoreError :
275
- # we have purged the stream_ordering index since the stream
276
- # ordering: treat it the same as a new room
277
- event_ids = []
278
-
279
- # special-case for an empty prev state: include all members
280
- # in the changed list
281
- if not event_ids :
282
- log_kv (
283
- {"event" : "encountered empty previous state" , "room_id" : room_id }
284
- )
285
- for etype , state_key in current_state_ids .keys ():
286
- if etype != EventTypes .Member :
287
- continue
288
- possibly_changed .add (state_key )
289
- continue
290
-
291
- current_member_id = current_state_ids .get ((EventTypes .Member , user_id ))
292
- if not current_member_id :
293
- continue
294
-
295
- # mapping from event_id -> state_dict
296
- prev_state_ids = await self ._state_storage .get_state_ids_for_events (
297
- event_ids ,
298
- await_full_state = False ,
299
- )
300
-
301
- # Check if we've joined the room? If so we just blindly add all the users to
302
- # the "possibly changed" users.
303
- for state_dict in prev_state_ids .values ():
304
- member_event = state_dict .get ((EventTypes .Member , user_id ), None )
305
- if not member_event or member_event != current_member_id :
306
- for etype , state_key in current_state_ids .keys ():
307
- if etype != EventTypes .Member :
308
- continue
309
- possibly_changed .add (state_key )
310
- break
311
-
312
- # If there has been any change in membership, include them in the
313
- # possibly changed list. We'll check if they are joined below,
314
- # and we're not toooo worried about spuriously adding users.
315
- for key , event_id in current_state_ids .items ():
316
- etype , state_key = key
317
- if etype != EventTypes .Member :
286
+ for delta in state_changes :
287
+ if delta .event_type != EventTypes .Member :
318
288
continue
319
289
320
- # check if this member has changed since any of the extremities
321
- # at the stream_ordering, and add them to the list if so.
322
- for state_dict in prev_state_ids .values ():
323
- prev_event_id = state_dict .get (key , None )
324
- if not prev_event_id or prev_event_id != event_id :
325
- if state_key != user_id :
326
- possibly_changed .add (state_key )
327
- break
328
-
329
- if possibly_changed or possibly_left :
330
- possibly_joined = possibly_changed
331
- possibly_left = possibly_changed | possibly_left
332
-
333
- # Double check if we still share rooms with the given user.
334
- users_rooms = await self .store .get_rooms_for_users (possibly_left )
335
- for changed_user_id , entries in users_rooms .items ():
336
- if any (rid in room_ids for rid in entries ):
337
- possibly_left .discard (changed_user_id )
338
- else :
339
- possibly_joined .discard (changed_user_id )
290
+ room_to_deltas .setdefault (room_id , []).append (delta )
291
+ if delta .event_id :
292
+ memberships_to_fetch .add (delta .event_id )
293
+ if delta .prev_event_id :
294
+ memberships_to_fetch .add (delta .prev_event_id )
340
295
341
- else :
342
- possibly_joined = set ()
343
- possibly_left = set ()
296
+ # Fetch all the memberships for the membership events
297
+ event_id_to_memberships = await self .store .get_membership_from_event_ids (
298
+ memberships_to_fetch
299
+ )
300
+
301
+ joined_invited_knocked = (
302
+ Membership .JOIN ,
303
+ Membership .INVITE ,
304
+ Membership .KNOCK ,
305
+ )
344
306
345
- device_list_updates = DeviceListUpdates (
346
- changed = possibly_joined ,
347
- left = possibly_left ,
307
+ # We now want to find any user that have newly joined/invited/knocked,
308
+ # or newly left, similarly to above.
309
+ newly_joined_or_invited_or_knocked_users : Set [str ] = set ()
310
+ newly_left_users : Set [str ] = set ()
311
+ for _ , deltas in room_to_deltas .items ():
312
+ for delta in deltas :
313
+ # Get the prev/new memberships for the delta
314
+ new_membership = None
315
+ prev_membership = None
316
+ if delta .event_id :
317
+ m = event_id_to_memberships .get (delta .event_id )
318
+ if m is not None :
319
+ new_membership = m .membership
320
+ if delta .prev_event_id :
321
+ m = event_id_to_memberships .get (delta .prev_event_id )
322
+ if m is not None :
323
+ prev_membership = m .membership
324
+
325
+ # Check if a user has newly joined/invited/knocked, or left.
326
+ if new_membership in joined_invited_knocked :
327
+ if prev_membership not in joined_invited_knocked :
328
+ newly_joined_or_invited_or_knocked_users .add (delta .state_key )
329
+ newly_left_users .discard (delta .state_key )
330
+ elif prev_membership in joined_invited_knocked :
331
+ newly_joined_or_invited_or_knocked_users .discard (delta .state_key )
332
+ newly_left_users .add (delta .state_key )
333
+
334
+ # Now we actually calculate the device list entry with the information
335
+ # calculated above.
336
+ device_list_updates = await self .generate_sync_entry_for_device_list (
337
+ user_id = user_id ,
338
+ since_token = from_token ,
339
+ now_token = now_token ,
340
+ joined_room_ids = joined_room_ids ,
341
+ newly_joined_rooms = newly_joined_rooms ,
342
+ newly_joined_or_invited_or_knocked_users = newly_joined_or_invited_or_knocked_users ,
343
+ newly_left_rooms = newly_left_rooms ,
344
+ newly_left_users = newly_left_users ,
348
345
)
349
346
350
347
log_kv (
@@ -356,6 +353,88 @@ async def get_user_ids_changed(
356
353
357
354
return device_list_updates
358
355
356
+ @measure_func ("_generate_sync_entry_for_device_list" )
357
+ async def generate_sync_entry_for_device_list (
358
+ self ,
359
+ user_id : str ,
360
+ since_token : StreamToken ,
361
+ now_token : StreamToken ,
362
+ joined_room_ids : AbstractSet [str ],
363
+ newly_joined_rooms : AbstractSet [str ],
364
+ newly_joined_or_invited_or_knocked_users : AbstractSet [str ],
365
+ newly_left_rooms : AbstractSet [str ],
366
+ newly_left_users : AbstractSet [str ],
367
+ ) -> DeviceListUpdates :
368
+ """Generate the DeviceListUpdates section of sync
369
+
370
+ Args:
371
+ sync_result_builder
372
+ newly_joined_rooms: Set of rooms user has joined since previous sync
373
+ newly_joined_or_invited_or_knocked_users: Set of users that have joined,
374
+ been invited to a room or are knocking on a room since
375
+ previous sync.
376
+ newly_left_rooms: Set of rooms user has left since previous sync
377
+ newly_left_users: Set of users that have left a room we're in since
378
+ previous sync
379
+ """
380
+ # Take a copy since these fields will be mutated later.
381
+ newly_joined_or_invited_or_knocked_users = set (
382
+ newly_joined_or_invited_or_knocked_users
383
+ )
384
+ newly_left_users = set (newly_left_users )
385
+
386
+ # We want to figure out what user IDs the client should refetch
387
+ # device keys for, and which users we aren't going to track changes
388
+ # for anymore.
389
+ #
390
+ # For the first step we check:
391
+ # a. if any users we share a room with have updated their devices,
392
+ # and
393
+ # b. we also check if we've joined any new rooms, or if a user has
394
+ # joined a room we're in.
395
+ #
396
+ # For the second step we just find any users we no longer share a
397
+ # room with by looking at all users that have left a room plus users
398
+ # that were in a room we've left.
399
+
400
+ users_that_have_changed = set ()
401
+
402
+ # Step 1a, check for changes in devices of users we share a room
403
+ # with
404
+ users_that_have_changed = await self .get_device_changes_in_shared_rooms (
405
+ user_id ,
406
+ joined_room_ids ,
407
+ from_token = since_token ,
408
+ now_token = now_token ,
409
+ )
410
+
411
+ # Step 1b, check for newly joined rooms
412
+ for room_id in newly_joined_rooms :
413
+ joined_users = await self .store .get_users_in_room (room_id )
414
+ newly_joined_or_invited_or_knocked_users .update (joined_users )
415
+
416
+ # TODO: Check that these users are actually new, i.e. either they
417
+ # weren't in the previous sync *or* they left and rejoined.
418
+ users_that_have_changed .update (newly_joined_or_invited_or_knocked_users )
419
+
420
+ user_signatures_changed = await self .store .get_users_whose_signatures_changed (
421
+ user_id , since_token .device_list_key
422
+ )
423
+ users_that_have_changed .update (user_signatures_changed )
424
+
425
+ # Now find users that we no longer track
426
+ for room_id in newly_left_rooms :
427
+ left_users = await self .store .get_users_in_room (room_id )
428
+ newly_left_users .update (left_users )
429
+
430
+ # Remove any users that we still share a room with.
431
+ left_users_rooms = await self .store .get_rooms_for_users (newly_left_users )
432
+ for user_id , entries in left_users_rooms .items ():
433
+ if any (rid in joined_room_ids for rid in entries ):
434
+ newly_left_users .discard (user_id )
435
+
436
+ return DeviceListUpdates (changed = users_that_have_changed , left = newly_left_users )
437
+
359
438
async def on_federation_query_user_devices (self , user_id : str ) -> JsonDict :
360
439
if not self .hs .is_mine (UserID .from_string (user_id )):
361
440
raise SynapseError (400 , "User is not hosted on this homeserver" )
0 commit comments