Skip to content
Snippets Groups Projects
test_sync.py 49.1 KiB
Newer Older
  • Learn to ignore specific revisions
  •         recent_events = self.get_success(
                self.store.get_prev_events_for_room(private_room_id)
            )
            self.assertIn(private_call_event.event_id, recent_events)
    
            private_sync_result: SyncResult = self.get_success(
                self.sync_handler.wait_for_sync_for_user(
    
                    generate_sync_config(user2, use_state_after=self.use_state_after),
    
                )
            )
            priv_event_ids = []
            for event in private_sync_result.joined[0].timeline.events:
                priv_event_ids.append(event.event_id)
    
            self.assertIn(private_call_event.event_id, priv_event_ids)
    
    
        def test_push_rules_with_bad_account_data(self) -> None:
            """Some old accounts have managed to set a `m.push_rules` account data,
            which we should ignore in /sync response.
            """
    
            user = self.register_user("alice", "password")
    
            # Insert the bad account data.
            self.get_success(
                self.store.add_account_data_for_user(user, AccountDataTypes.PUSH_RULES, {})
            )
    
            sync_result: SyncResult = self.get_success(
                self.sync_handler.wait_for_sync_for_user(
    
                    generate_sync_config(user, use_state_after=self.use_state_after),
    
                    sync_version=SyncVersion.SYNC_V2,
                    request_key=generate_request_key(),
    
                )
            )
    
            for account_dict in sync_result.account_data:
                if account_dict["type"] == AccountDataTypes.PUSH_RULES:
                    # We should have lots of push rules here, rather than the bad
                    # empty data.
                    self.assertNotEqual(account_dict["content"], {})
                    return
    
            self.fail("No push rules found")
    
    
    def test_wait_for_future_sync_token(self) -> None:
        """Test that if we receive a token that is ahead of our current token,
        we'll wait until the stream position advances.

        This can happen if replication streams start lagging, and the client's
        previous sync request was serviced by a worker ahead of ours.
        """
        user = self.register_user("alice", "password")

        # We simulate a lagging stream by getting a stream ID from the ID gen
        # and then waiting to mark it as "persisted".
        presence_id_gen = self.store.get_presence_stream_id_gen()
        ctx_mgr = presence_id_gen.get_next()
        # Entering the async context manager allocates the ID; it is only
        # marked persisted when the context manager exits (below).
        stream_id = self.get_success(ctx_mgr.__aenter__())

        # Create the new token based on the stream ID above.
        current_token = self.hs.get_event_sources().get_current_token()
        since_token = current_token.copy_and_advance(StreamKeyType.PRESENCE, stream_id)

        # Kick the sync off as a raw deferred (rather than get_success) so we
        # can assert that it has *not* completed yet.
        sync_d = defer.ensureDeferred(
            self.sync_handler.wait_for_sync_for_user(
                create_requester(user),

                generate_sync_config(user, use_state_after=self.use_state_after),

                sync_version=SyncVersion.SYNC_V2,
                request_key=generate_request_key(),
                since_token=since_token,
                timeout=0,
            )
        )

        # This should block waiting for the presence stream to update
        self.pump()
        self.assertFalse(sync_d.called)

        # Marking the stream ID as persisted should unblock the request.
        self.get_success(ctx_mgr.__aexit__(None, None, None))

        # The sync should now complete promptly (within 1s of reactor time).
        self.get_success(sync_d, by=1.0)
    
    
        @parameterized.expand(
            [(key,) for key in StreamKeyType.__members__.values()],
            name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
        )
        def test_wait_for_invalid_future_sync_token(
            self, stream_key: StreamKeyType
        ) -> None:
    
            """Like the previous test, except we give a token that has a stream
            position ahead of what is in the DB, i.e. its invalid and we shouldn't
            wait for the stream to advance (as it may never do so).
    
            This can happen due to older versions of Synapse giving out stream
            positions without persisting them in the DB, and so on restart the
            stream would get reset back to an older position.
            """
            user = self.register_user("alice", "password")
    
    
            # Create a token and advance one of the streams.
    
            current_token = self.hs.get_event_sources().get_current_token()
    
            token_value = current_token.get_field(stream_key)
    
            # How we advance the streams depends on the type.
            if isinstance(token_value, int):
                since_token = current_token.copy_and_advance(stream_key, token_value + 1)
            elif isinstance(token_value, MultiWriterStreamToken):
                since_token = current_token.copy_and_advance(
                    stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
                )
            elif isinstance(token_value, RoomStreamToken):
                since_token = current_token.copy_and_advance(
                    stream_key, RoomStreamToken(stream=token_value.stream + 1)
                )
            else:
                raise Exception("Unreachable")
    
    
            sync_d = defer.ensureDeferred(
                self.sync_handler.wait_for_sync_for_user(
                    create_requester(user),
    
                    generate_sync_config(user, use_state_after=self.use_state_after),
    
                    sync_version=SyncVersion.SYNC_V2,
                    request_key=generate_request_key(),
                    since_token=since_token,
                    timeout=0,
                )
            )
    
            # We should return without waiting for the presence stream to advance.
            self.get_success(sync_d)
    
    
        user_id: str,
        device_id: Optional[str] = "device_id",
        filter_collection: Optional[FilterCollection] = None,
    
        use_state_after: bool = False,
    
        """Generate a sync config (with a unique request key).
    
        Args:
            user_id: user who is syncing.
            device_id: device that is syncing. Defaults to "device_id".
            filter_collection: filter to apply. Defaults to the default filter (ie,
    
                return everything, with a default limit)
            use_state_after: whether the `use_state_after` flag was set.
    
        """
        if filter_collection is None:
            filter_collection = Filtering(Mock()).DEFAULT_FILTER_COLLECTION
    
    
            filter_collection=filter_collection,
    
            use_state_after=use_state_after,
    
    
    
    class SyncStateAfterTestCase(tests.unittest.HomeserverTestCase):
        """Tests Sync Handler state behavior when using `use_state_after`."""

        servlets = [
            admin.register_servlets,
            knock.register_servlets,
            login.register_servlets,
            room.register_servlets,
        ]

        def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
            self.sync_handler = self.hs.get_sync_handler()
            self.store = self.hs.get_datastores().main

            # AuthBlocking reads from the hs' config on initialization. We need to
            # modify its config instead of the hs'
            self.auth_blocking = self.hs.get_auth_blocking()

        def test_initial_sync_multiple_deltas(self) -> None:
            """Test that if multiple state deltas have happened during processing of
            a full state sync we return the correct state"""

            user = self.register_user("user", "password")
            tok = self.login("user", "password")

            # Create a room as the user and set some custom state.
            joined_room = self.helper.create_room_as(user, tok=tok)

            first_state = self.helper.send_state(
                joined_room, event_type="m.test_event", body={"num": 1}, tok=tok
            )

            # Take a snapshot of the stream token, to simulate doing an initial sync
            # at this point.
            end_stream_token = self.hs.get_event_sources().get_current_token()

            # Send some state *after* the stream token
            self.helper.send_state(
                joined_room, event_type="m.test_event", body={"num": 2}, tok=tok
            )

            # Calculating the full state will return the first state, and not the
            # second.
            # (This drives the sync handler's internal helper directly, rather
            # than issuing a full /sync request.)
            state = self.get_success(
                self.sync_handler._compute_state_delta_for_full_sync(
                    room_id=joined_room,
                    sync_config=generate_sync_config(user, use_state_after=True),
                    batch=TimelineBatch(
                        prev_batch=end_stream_token, events=[], limited=True
                    ),
                    end_token=end_stream_token,
                    members_to_fetch=None,
                    timeline_state={},
                    joined=True,
                )
            )
            self.assertEqual(state[("m.test_event", "")], first_state["event_id"])

        def test_incremental_sync_multiple_deltas(self) -> None:
            """Test that if multiple state deltas have happened since an incremental
            state sync we return the correct state"""

            user = self.register_user("user", "password")
            tok = self.login("user", "password")

            # Create a room as the user and set some custom state.
            joined_room = self.helper.create_room_as(user, tok=tok)

            # Take a snapshot of the stream token, to simulate doing an incremental sync
            # from this point.
            since_token = self.hs.get_event_sources().get_current_token()

            self.helper.send_state(
                joined_room, event_type="m.test_event", body={"num": 1}, tok=tok
            )

            # Send some state *after* the stream token
            second_state = self.helper.send_state(
                joined_room, event_type="m.test_event", body={"num": 2}, tok=tok
            )

            # End token is taken *after* both state events, so the incremental
            # window [since_token, end_stream_token] covers both deltas.
            end_stream_token = self.hs.get_event_sources().get_current_token()

            # Calculating the incrementals state will return the second state, and not the
            # first.
            state = self.get_success(
                self.sync_handler._compute_state_delta_for_incremental_sync(
                    room_id=joined_room,
                    sync_config=generate_sync_config(user, use_state_after=True),
                    batch=TimelineBatch(
                        prev_batch=end_stream_token, events=[], limited=True
                    ),
                    since_token=since_token,
                    end_token=end_stream_token,
                    members_to_fetch=None,
                    timeline_state={},
                )
            )
            self.assertEqual(state[("m.test_event", "")], second_state["event_id"])


        def test_incremental_sync_lazy_loaded_no_timeline(self) -> None:
            """Test that lazy-loading with an empty timeline doesn't return the full
            state.

            There was a bug where an empty state filter would cause the DB to return
            the full state, rather than an empty set.
            """
            user = self.register_user("user", "password")
            tok = self.login("user", "password")

            # Create a room as the user and set some custom state.
            joined_room = self.helper.create_room_as(user, tok=tok)

            # Both tokens are taken at the same point, so there are no state
            # deltas in the requested window at all.
            since_token = self.hs.get_event_sources().get_current_token()
            end_stream_token = self.hs.get_event_sources().get_current_token()

            # members_to_fetch=set() (rather than None) selects the lazy-loading
            # code path with no members requested.
            state = self.get_success(
                self.sync_handler._compute_state_delta_for_incremental_sync(
                    room_id=joined_room,
                    sync_config=generate_sync_config(user, use_state_after=True),
                    batch=TimelineBatch(
                        prev_batch=end_stream_token, events=[], limited=True
                    ),
                    since_token=since_token,
                    end_token=end_stream_token,
                    members_to_fetch=set(),
                    timeline_state={},
                )
            )

            self.assertEqual(state, {})