Newer
Older
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, LimitBlockingTypes, ServerNoticeMsgType
from synapse.api.errors import ResourceLimitError
from synapse.server_notices.resource_limits_server_notices import (
ResourceLimitsServerNotices,
)
from tests import unittest
class TestResourceLimitsServerNotices(unittest.HomeserverTestCase):
def make_homeserver(self, reactor, clock):
hs_config = self.default_config("test")
Amber Brown
committed
hs_config["server_notices"] = {
"system_mxid_localpart": "server",
"system_mxid_display_name": "test display name",
"system_mxid_avatar_url": None,
"room_name": "Server Notices",
}
hs = self.setup_test_homeserver(config=hs_config)
return hs
def prepare(self, reactor, clock, hs):
self.server_notices_sender = self.hs.get_server_notices_sender()
# relying on [1] is far from ideal, but the only case where
# ResourceLimitsServerNotices class needs to be isolated is this test,
# general code should never have a reason to do so ...
self._rlsn = self.server_notices_sender._server_notices[1]
if not isinstance(self._rlsn, ResourceLimitsServerNotices):
raise Exception("Failed to find reference to ResourceLimitsServerNotices")
self._rlsn._store.user_last_seen_monthly_active = Mock(
return_value=defer.succeed(1000)
)
self._send_notice = self._rlsn._server_notices_manager.send_notice
self._rlsn._server_notices_manager.send_notice = Mock()
self._rlsn._state.get_current_state = Mock(return_value=defer.succeed(None))
self._rlsn._store.get_events = Mock(return_value=defer.succeed({}))
self._send_notice = self._rlsn._server_notices_manager.send_notice
self.user_id = "@user_id:test"
# self.server_notices_mxid = "@server:test"
# self.server_notices_mxid_display_name = None
# self.server_notices_mxid_avatar_url = None
# self.server_notices_room_name = "Server Notices"
self._rlsn._server_notices_manager.get_notice_room_for_user = Mock(
returnValue=""
)
self._rlsn._store.add_tag_to_room = Mock()
self._rlsn._store.get_tags_for_room = Mock(return_value={})
self.hs.config.admin_contact = "mailto:user@test.com"
def test_maybe_send_server_notice_to_user_flag_off(self):
"""Tests cases where the flags indicate nothing to do"""
# test hs disabled case
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_not_called()
# Test when mau limiting disabled
Amber Brown
committed
self.hs.config.limit_usage_by_mau = False
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_not_called()
def test_maybe_send_server_notice_to_user_remove_blocked_notice(self):
"""Test when user has blocked notice, but should have it removed"""
self._rlsn._auth.check_auth_blocking = Mock()
type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType}
)
self._rlsn._store.get_events = Mock(
return_value=defer.succeed({"123": mock_event})
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
# Would be better to check the content, but once == remove blocking event
self._send_notice.assert_called_once()
def test_maybe_send_server_notice_to_user_remove_blocked_notice_noop(self):
"""
Test when user has blocked notice, but notice ought to be there (NOOP)
"""
self._rlsn._auth.check_auth_blocking = Mock(
)
type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType}
)
self._rlsn._store.get_events = Mock(
return_value=defer.succeed({"123": mock_event})
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_not_called()
def test_maybe_send_server_notice_to_user_add_blocked_notice(self):
"""
Test when user does not have blocked notice, but should have one
"""
self._rlsn._auth.check_auth_blocking = Mock(
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
# Would be better to check contents, but 2 calls == set blocking event
self.assertEqual(self._send_notice.call_count, 2)
def test_maybe_send_server_notice_to_user_add_blocked_notice_noop(self):
"""
Test when user does not have blocked notice, nor should they (NOOP)
"""
self._rlsn._auth.check_auth_blocking = Mock()
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_not_called()
def test_maybe_send_server_notice_to_user_not_in_mau_cohort(self):
"""
Test when user is not part of the MAU cohort - this should not ever
happen - but ...
"""
self._rlsn._auth.check_auth_blocking = Mock()
self._rlsn._store.user_last_seen_monthly_active = Mock(
return_value=defer.succeed(None)
)
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_not_called()
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def test_maybe_send_server_notice_when_alerting_suppressed_room_unblocked(self):
"""
Test that when server is over MAU limit and alerting is suppressed, then
an alert message is not sent into the room
"""
self.hs.config.mau_limit_alerting = False
self._rlsn._auth.check_auth_blocking = Mock(
side_effect=ResourceLimitError(
403, "foo", limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER
)
)
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self.assertTrue(self._send_notice.call_count == 0)
def test_check_hs_disabled_unaffected_by_mau_alert_suppression(self):
"""
Test that when a server is disabled, that MAU limit alerting is ignored.
"""
self.hs.config.mau_limit_alerting = False
self._rlsn._auth.check_auth_blocking = Mock(
side_effect=ResourceLimitError(
403, "foo", limit_type=LimitBlockingTypes.HS_DISABLED
)
)
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
# Would be better to check contents, but 2 calls == set blocking event
self.assertEqual(self._send_notice.call_count, 2)
def test_maybe_send_server_notice_when_alerting_suppressed_room_blocked(self):
"""
When the room is already in a blocked state, test that when alerting
is suppressed that the room is returned to an unblocked state.
"""
self.hs.config.mau_limit_alerting = False
self._rlsn._auth.check_auth_blocking = Mock(
side_effect=ResourceLimitError(
403, "foo", limit_type=LimitBlockingTypes.MONTHLY_ACTIVE_USER
)
)
self._rlsn._server_notices_manager.__is_room_currently_blocked = Mock(
return_value=defer.succeed((True, []))
)
mock_event = Mock(
type=EventTypes.Message, content={"msgtype": ServerNoticeMsgType}
)
self._rlsn._store.get_events = Mock(
return_value=defer.succeed({"123": mock_event})
)
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self._send_notice.assert_called_once()
class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store = self.hs.get_datastore()
self.server_notices_sender = self.hs.get_server_notices_sender()
self.server_notices_manager = self.hs.get_server_notices_manager()
self.event_source = self.hs.get_event_sources()
# relying on [1] is far from ideal, but the only case where
# ResourceLimitsServerNotices class needs to be isolated is this test,
# general code should never have a reason to do so ...
self._rlsn = self.server_notices_sender._server_notices[1]
if not isinstance(self._rlsn, ResourceLimitsServerNotices):
raise Exception("Failed to find reference to ResourceLimitsServerNotices")
self.hs.config.limit_usage_by_mau = True
self.hs.config.hs_disabled = False
self.hs.config.max_mau_value = 5
self.hs.config.server_notices_mxid = "@server:test"
self.hs.config.server_notices_mxid_display_name = None
self.hs.config.server_notices_mxid_avatar_url = None
self.hs.config.server_notices_room_name = "Test Server Notice Room"
self.user_id = "@user_id:test"
self.hs.config.admin_contact = "mailto:user@test.com"
def test_server_notice_only_sent_once(self):
self.store.get_monthly_active_count = Mock(return_value=1000)
self.store.user_last_seen_monthly_active = Mock(return_value=1000)
# Call the function multiple times to ensure we only send the notice once
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
self.get_success(self._rlsn.maybe_send_server_notice_to_user(self.user_id))
# Now lets get the last load of messages in the service notice room and
# check that there is only one server notice
room_id = self.get_success(
self.server_notices_manager.get_notice_room_for_user(self.user_id)
token = self.get_success(self.event_source.get_current_token())
events, _ = self.get_success(
self.store.get_recent_events_for_room(
room_id, limit=100, end_token=token.room_key
)
)
count = 0
for event in events:
if event.type != EventTypes.Message:
continue
if event.content.get("msgtype") != ServerNoticeMsgType:
continue
count += 1
self.assertEqual(count, 1)