本文整理汇总了Python中synapse.server.HomeServer类的典型用法代码示例。如果您正苦于以下问题：Python HomeServer类的具体用法？Python HomeServer怎么用？Python HomeServer使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HomeServer类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
    """Build a HomeServer wired to a mock client resource and a mock profile handler.

    Token authentication is replaced by a stub that ignores the token and
    always resolves to ``myid`` (a name defined outside this method).
    """
    profile_handler_methods = [
        "get_displayname",
        "set_displayname",
        "get_avatar_url",
        "set_avatar_url",
    ]
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
    self.mock_handler = Mock(spec=profile_handler_methods)

    homeserver = HomeServer(
        "test",
        db_pool=None,
        http_client=None,
        resource_for_client=self.mock_resource,
        federation=Mock(),
        replication_layer=Mock(),
        datastore=None,
    )

    def _get_user_by_token(token=None):
        # The token is deliberately unused: every caller is treated as `myid`.
        return homeserver.parse_userid(myid)

    homeserver.get_auth().get_user_by_token = _get_user_by_token
    homeserver.get_handlers().profile_handler = self.mock_handler

    homeserver.register_servlets()
开发者ID:martindale,项目名称:synapse,代码行数:26,代码来源:test_profile.py
示例2: setUp
def setUp(self):
    """Build a HomeServer with a mock config, mock resource and mock profile handler.

    Request authentication is replaced by a stub that ignores the request and
    always resolves to ``myid`` (a name defined outside this method).
    """
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
    self.mock_handler = Mock(spec=[
        "get_displayname",
        "set_displayname",
        "get_avatar_url",
        "set_avatar_url",
    ])

    # Config mock carrying a single mock signing key.
    config = NonCallableMock()
    config.signing_key = [MockKey()]
    self.mock_config = config

    homeserver = HomeServer(
        "test",
        db_pool=None,
        http_client=None,
        resource_for_client=self.mock_resource,
        federation=Mock(),
        replication_layer=Mock(),
        datastore=None,
        config=self.mock_config,
    )

    def _get_user_by_req(request=None):
        # The request is deliberately unused: every caller is treated as `myid`.
        return homeserver.parse_userid(myid)

    homeserver.get_auth().get_user_by_req = _get_user_by_req
    homeserver.get_handlers().profile_handler = self.mock_handler

    homeserver.register_servlets()
开发者ID:alisheikh,项目名称:synapse,代码行数:30,代码来源:test_profile.py
示例3: setUp
def setUp(self):
    """Build a HomeServer from spec-restricted mocks and install a real FederationHandler."""
    self.hostname = "test"

    datastore_methods = [
        "persist_event",
        "store_room",
        "get_room",
    ]
    handler_names = [
        "room_member_handler",
        "federation_handler",
    ]

    homeserver = HomeServer(
        self.hostname,
        db_pool=None,
        datastore=NonCallableMock(spec_set=datastore_methods),
        resource_for_federation=NonCallableMock(),
        http_client=NonCallableMock(spec_set=[]),
        notifier=NonCallableMock(spec_set=["on_new_room_event"]),
        handlers=NonCallableMock(spec_set=handler_names),
    )

    # Keep references to the collaborators the tests inspect.
    self.datastore = homeserver.get_datastore()
    self.handlers = homeserver.get_handlers()
    self.notifier = homeserver.get_notifier()
    self.hs = homeserver

    # Replace the mocked federation handler attribute with the handler under test.
    self.handlers.federation_handler = FederationHandler(self.hs)
开发者ID:martindale,项目名称:synapse,代码行数:25,代码来源:test_federation.py
示例4: setUp
def setUp(self):
    """Build a HomeServer on mock HTTP/persistence layers and initialise HTTP replication."""
    self.mock_resource = MockHttpResource()

    http_client_methods = [
        "get_json",
        "put_json",
    ]
    self.mock_http_client = Mock(spec=http_client_methods)

    persistence_methods = [
        "get_current_state_for_context",
        "get_pdu",
        "persist_event",
        "update_min_depth_for_context",
        "prep_send_transaction",
        "delivered_txn",
        "get_received_txn_response",
        "set_received_txn_response",
    ]
    self.mock_persistence = Mock(spec=persistence_methods)
    # Looking up a received transaction response yields an already-fired
    # deferred carrying None.
    self.mock_persistence.get_received_txn_response.return_value = (
        defer.succeed(None)
    )

    self.mock_config = Mock()
    self.mock_config.signing_key = [MockKey()]

    self.clock = MockClock()

    homeserver = HomeServer(
        "test",
        resource_for_federation=self.mock_resource,
        http_client=self.mock_http_client,
        db_pool=None,
        datastore=self.mock_persistence,
        clock=self.clock,
        config=self.mock_config,
        keyring=Mock(),
    )

    self.federation = initialize_http_replication(homeserver)
    self.distributor = homeserver.get_distributor()
开发者ID:gitter-badger,项目名称:synapse,代码行数:33,代码来源:test_federation.py
示例5: setUp
def setUp(self):
    """Build a HomeServer on an in-memory datastore, register room servlets, create a room.

    Token authentication is stubbed to resolve to ``self.auth_user_id``, and
    the mocked rate limiter's ``send_message`` returns ``(True, 0)``.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
    self.auth_user_id = self.user_id

    state_handler = Mock(spec=["handle_new_event"])
    state_handler.handle_new_event.return_value = True

    persistence_service = Mock(spec=["get_latest_pdus_in_context"])
    persistence_service.get_latest_pdus_in_context.return_value = []

    homeserver = HomeServer(
        "red",
        db_pool=None,
        http_client=None,
        datastore=MemoryDataStore(),
        replication_layer=Mock(),
        state_handler=state_handler,
        persistence_service=persistence_service,
        ratelimiter=NonCallableMock(spec_set=[
            "send_message",
        ]),
        config=NonCallableMock(),
    )

    self.ratelimiter = homeserver.get_ratelimiter()
    self.ratelimiter.send_message.return_value = (True, 0)

    homeserver.get_handlers().federation_handler = Mock()

    def _get_user_by_token(token=None):
        # Reads self.auth_user_id at call time, so tests can switch users.
        return homeserver.parse_userid(self.auth_user_id)

    homeserver.get_auth().get_user_by_token = _get_user_by_token

    synapse.rest.room.register_servlets(homeserver, self.mock_resource)

    self.room_id = yield self.create_room_as(self.user_id)
开发者ID:martindale,项目名称:synapse,代码行数:35,代码来源:test_rooms.py
示例6: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer with ProfileHandlers and a mock federation layer.

    Query handlers registered on the mock federation are recorded in
    ``self.query_handlers``, keyed by query type.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    self.mock_federation = Mock(spec=[
        "make_query",
    ])

    self.query_handlers = {}

    def register_query_handler(query_type, handler):
        # Capture the handler so tests can invoke it directly.
        self.query_handlers[query_type] = handler

    self.mock_federation.register_query_handler = register_query_handler

    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    homeserver = HomeServer(
        "test",
        db_pool=pool,
        http_client=None,
        handlers=None,
        resource_for_federation=Mock(),
        replication_layer=self.mock_federation,
    )
    homeserver.handlers = ProfileHandlers(homeserver)

    self.store = homeserver.get_datastore()

    self.frank = homeserver.parse_userid("@1234ABCD:test")
    self.bob = homeserver.parse_userid("@4567:test")
    self.alice = homeserver.parse_userid("@alice:remote")

    yield self.store.create_profile(self.frank.localpart)

    self.handler = homeserver.get_handlers().profile_handler

    # TODO(paul): Icky signal declarings.. booo
    homeserver.get_distributor().declare("changed_presencelike_data")
开发者ID:gitter-badger,项目名称:synapse,代码行数:34,代码来源:test_profile.py
示例7: setUp
def setUp(self):
    """Prepare an in-memory SQLite pool and a DirectoryStore, plus a room ID and alias."""
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    homeserver = HomeServer(
        "test",
        db_pool=pool,
    )

    self.store = DirectoryStore(homeserver)

    self.room = homeserver.parse_roomid("!abcde:test")
    self.alias = homeserver.parse_roomalias("#my-room:test")
开发者ID:uroborus,项目名称:synapse,代码行数:12,代码来源:test_directory.py
示例8: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer, register REST servlets and two accounts.

    The clock mock reports fixed times, the rate limiter mock returns
    ``(True, 0)``, and the registration captcha is disabled on the config.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)

    persistence_service = Mock(spec=["get_latest_pdus_in_context"])
    persistence_service.get_latest_pdus_in_context.return_value = []

    self.mock_config = NonCallableMock()
    self.mock_config.signing_key = [MockKey()]

    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    clock_methods = [
        "call_later",
        "cancel_call_later",
        "time_msec",
        "time"
    ]
    homeserver = HomeServer(
        "test",
        db_pool=pool,
        http_client=None,
        replication_layer=Mock(),
        persistence_service=persistence_service,
        clock=Mock(spec=clock_methods),
        ratelimiter=NonCallableMock(spec_set=[
            "send_message",
        ]),
        config=self.mock_config,
    )

    self.ratelimiter = homeserver.get_ratelimiter()
    self.ratelimiter.send_message.return_value = (True, 0)

    homeserver.config.enable_registration_captcha = False

    homeserver.get_handlers().federation_handler = Mock()

    # Fixed clock readings for the mocked clock.
    homeserver.get_clock().time_msec.return_value = 1000000
    homeserver.get_clock().time.return_value = 1000

    synapse.rest.register.register_servlets(homeserver, self.mock_resource)
    synapse.rest.events.register_servlets(homeserver, self.mock_resource)
    synapse.rest.room.register_servlets(homeserver, self.mock_resource)

    # register an account; self.user_id is then overwritten with the
    # "user_id" value from the registration response
    self.user_id = "sid1"
    first_response = yield self.register(self.user_id)
    self.token = first_response["access_token"]
    self.user_id = first_response["user_id"]

    # register a 2nd account, handled the same way
    self.other_user = "other1"
    second_response = yield self.register(self.other_user)
    self.other_token = second_response["access_token"]
    self.other_user = second_response["user_id"]
开发者ID:esaul,项目名称:synapse,代码行数:53,代码来源:test_events.py
示例9: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer and collect the store, handlers, users and room used by tests."""
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    self.mock_config = Mock()
    self.mock_config.signing_key = [MockKey()]

    homeserver = HomeServer(
        "test",
        db_pool=pool,
        config=self.mock_config,
        resource_for_federation=Mock(),
        http_client=None,
    )

    # We can't test the RoomMemberStore on its own without the other event
    # storage logic
    self.store = homeserver.get_datastore()
    self.event_builder_factory = homeserver.get_event_builder_factory()
    self.handlers = homeserver.get_handlers()
    self.message_handler = self.handlers.message_handler

    self.u_alice = homeserver.parse_userid("@alice:test")
    self.u_bob = homeserver.parse_userid("@bob:test")

    # User elsewhere on another host
    self.u_charlie = homeserver.parse_userid("@charlie:elsewhere")

    self.room = homeserver.parse_roomid("!abc123:test")
开发者ID:alisheikh,项目名称:synapse,代码行数:28,代码来源:test_roommember.py
示例10: setup_test_homeserver
@defer.inlineCallbacks
def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
    """Set up a homeserver suitable for running tests against.

    Extra keyword arguments are passed to the HomeServer constructor. If no
    datastore is supplied, the homeserver is given a db pool backed by an
    in-memory sqlite db.

    The body yields on ``db_pool.prepare()`` and finishes with
    ``defer.returnValue``, both of which only work under
    ``defer.inlineCallbacks``; hence the decorator.

    Args:
        name: first positional argument handed to ``HomeServer``.
        datastore: datastore to use; when None an in-memory SQLite pool is
            prepared and passed as ``db_pool`` instead.
        config: config object; when None a ``Mock`` with the test defaults
            set below is used.
        **kargs: forwarded to ``HomeServer``. A ``MockClock`` is supplied as
            ``clock`` unless the caller provides one.

    Returns:
        A Deferred that fires with the constructed ``HomeServer``.
    """
    if config is None:
        config = Mock()
        config.signing_key = [MockKey()]
        config.event_cache_size = 1
        config.disable_registration = False
        config.macaroon_secret_key = "not even a little secret"
        config.server_name = "server.under.test"

    if "clock" not in kargs:
        kargs["clock"] = MockClock()

    if datastore is None:
        db_pool = SQLiteMemoryDbPool()
        yield db_pool.prepare()
        hs = HomeServer(
            name, db_pool=db_pool, config=config,
            version_string="Synapse/tests",
            database_engine=create_engine("sqlite3"),
            **kargs
        )
    else:
        hs = HomeServer(
            name, db_pool=None, datastore=datastore, config=config,
            version_string="Synapse/tests",
            database_engine=create_engine("sqlite3"),
            **kargs
        )

    # bcrypt is far too slow to be doing in unit tests
    def swap_out_hash_for_testing(old_build_handlers):
        """Wrap ``old_build_handlers`` so the auth handler it builds hashes with md5."""
        def build_handlers():
            handlers = old_build_handlers()
            auth_handler = handlers.auth_handler
            auth_handler.hash = lambda p: hashlib.md5(p).hexdigest()
            auth_handler.validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
            return handlers
        return build_handlers

    hs.build_handlers = swap_out_hash_for_testing(hs.build_handlers)

    defer.returnValue(hs)
开发者ID:Xe,项目名称:synapse,代码行数:46,代码来源:utils.py
示例11: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer with DirectoryHandlers and a mock federation layer.

    Query handlers registered on the mock federation are recorded in
    ``self.query_handlers``, keyed by query type.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    self.mock_federation = Mock(spec=[
        "make_query",
    ])

    self.query_handlers = {}

    def register_query_handler(query_type, handler):
        # Capture the handler so tests can invoke it directly.
        self.query_handlers[query_type] = handler

    self.mock_federation.register_query_handler = register_query_handler

    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    self.mock_config = Mock()
    self.mock_config.signing_key = [MockKey()]

    homeserver = HomeServer(
        "test",
        db_pool=pool,
        http_client=None,
        resource_for_federation=Mock(),
        replication_layer=self.mock_federation,
        config=self.mock_config,
    )
    homeserver.handlers = DirectoryHandlers(homeserver)

    self.handler = homeserver.get_handlers().directory_handler

    self.store = homeserver.get_datastore()

    self.my_room = homeserver.parse_roomalias("#my-room:test")
    self.your_room = homeserver.parse_roomalias("#your-room:test")
    self.remote_room = homeserver.parse_roomalias("#another:remote")
开发者ID:alisheikh,项目名称:synapse,代码行数:34,代码来源:test_directory.py
示例12: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer and collect the store, handlers, users and rooms used by tests."""
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    self.mock_config = Mock()
    self.mock_config.signing_key = [MockKey()]

    homeserver = HomeServer(
        "test",
        db_pool=pool,
        config=self.mock_config,
        resource_for_federation=Mock(),
        http_client=None,
    )

    self.store = homeserver.get_datastore()
    self.event_builder_factory = homeserver.get_event_builder_factory()
    self.handlers = homeserver.get_handlers()
    self.message_handler = self.handlers.message_handler

    self.u_alice = homeserver.parse_userid("@alice:test")
    self.u_bob = homeserver.parse_userid("@bob:test")

    self.room1 = homeserver.parse_roomid("!abc123:test")
    self.room2 = homeserver.parse_roomid("!xyx987:test")

    self.depth = 1
开发者ID:alisheikh,项目名称:synapse,代码行数:27,代码来源:test_stream.py
示例13: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer with JustPresenceHandlers and stubbed room membership.

    The room member handler is a bare mock whose ``get_rooms_for_user`` and
    ``get_room_members`` are driven by ``self.room_members``; the store's
    ``user_rooms_intersect`` is stubbed the same way. The presence handler's
    start/stop polling hooks are replaced with mocks.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    db_pool = SQLiteMemoryDbPool()
    yield db_pool.prepare()

    hs = HomeServer(
        "test",
        clock=MockClock(),
        db_pool=db_pool,
        handlers=None,
        resource_for_federation=Mock(),
        http_client=None,
    )
    hs.handlers = JustPresenceHandlers(hs)

    self.store = hs.get_datastore()

    # Mock the RoomMemberHandler
    room_member_handler = Mock(spec=[])
    hs.handlers.room_member_handler = room_member_handler

    # Some local users to test with
    self.u_apple = hs.parse_userid("@apple:test")
    self.u_banana = hs.parse_userid("@banana:test")
    self.u_clementine = hs.parse_userid("@clementine:test")

    yield self.store.create_presence(self.u_apple.localpart)
    yield self.store.set_presence_state(
        self.u_apple.localpart, {"state": ONLINE, "status_msg": "Online"}
    )

    self.handler = hs.get_handlers().presence_handler

    self.room_members = []

    def get_rooms_for_user(user):
        # Members of self.room_members are in the single room "a-room".
        if user in self.room_members:
            return defer.succeed(["a-room"])
        else:
            return defer.succeed([])
    room_member_handler.get_rooms_for_user = get_rooms_for_user

    def get_room_members(room_id):
        if room_id == "a-room":
            return defer.succeed(self.room_members)
        else:
            return defer.succeed([])
    room_member_handler.get_room_members = get_room_members

    def user_rooms_intersect(userlist):
        # Materialise the IDs as a list: on Python 3 ``map`` returns a
        # one-shot iterator, and repeated ``in`` tests against it would
        # consume it and give wrong answers after the first lookup.
        room_member_ids = [u.to_string() for u in self.room_members]
        shared = all(i in room_member_ids for i in userlist)
        return defer.succeed(shared)
    self.store.user_rooms_intersect = user_rooms_intersect

    self.mock_start = Mock()
    self.mock_stop = Mock()

    self.handler.start_polling_presence = self.mock_start
    self.handler.stop_polling_presence = self.mock_stop
开发者ID:uroborus,项目名称:synapse,代码行数:59,代码来源:test_presence.py
示例14: setUp
def setUp(self):
    """Build a HomeServer from spec-restricted mocks and install a real FederationHandler."""
    config = NonCallableMock()
    config.signing_key = [MockKey()]
    self.mock_config = config

    self.state_handler = NonCallableMock(spec_set=[
        "compute_event_context",
    ])

    self.auth = NonCallableMock(spec_set=[
        "check",
        "check_host_in_room",
    ])

    self.hostname = "test"

    datastore_methods = [
        "persist_event",
        "store_room",
        "get_room",
        "get_destination_retry_timings",
        "set_destination_retry_timings",
    ]
    handler_names = [
        "room_member_handler",
        "federation_handler",
    ]

    homeserver = HomeServer(
        self.hostname,
        db_pool=None,
        datastore=NonCallableMock(spec_set=datastore_methods),
        resource_for_federation=NonCallableMock(),
        http_client=NonCallableMock(spec_set=[]),
        notifier=NonCallableMock(spec_set=["on_new_room_event"]),
        handlers=NonCallableMock(spec_set=handler_names),
        config=self.mock_config,
        auth=self.auth,
        state_handler=self.state_handler,
        keyring=Mock(),
    )

    # Keep references to the collaborators the tests inspect.
    self.datastore = homeserver.get_datastore()
    self.handlers = homeserver.get_handlers()
    self.notifier = homeserver.get_notifier()
    self.hs = homeserver

    # Replace the mocked federation handler attribute with the handler under test.
    self.handlers.federation_handler = FederationHandler(self.hs)
开发者ID:alisheikh,项目名称:synapse,代码行数:44,代码来源:test_federation.py
示例15: setUp
def setUp(self):
    """Prepare an in-memory SQLite HomeServer and store one public room in its datastore."""
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    homeserver = HomeServer(
        "test",
        db_pool=pool,
    )

    # Room events need the full datastore, for persist_event() and
    # get_room_state()
    self.store = homeserver.get_datastore()
    self.event_factory = homeserver.get_event_factory()

    self.room = homeserver.parse_roomid("!abcde:test")

    room_id_str = self.room.to_string()
    yield self.store.store_room(
        room_id_str,
        room_creator_user_id="@creator:text",
        is_public=True
    )
开发者ID:alisheikh,项目名称:synapse,代码行数:19,代码来源:test_room.py
示例16: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer, register room servlets, create a room and set its topic path.

    Token authentication is stubbed to return a dict describing
    ``self.auth_user_id`` as a non-admin user with no device, and the
    datastore's ``insert_client_ip`` is replaced by a no-op.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)
    self.auth_user_id = self.user_id

    self.mock_config = NonCallableMock()
    self.mock_config.signing_key = [MockKey()]

    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    homeserver = HomeServer(
        "red",
        db_pool=pool,
        http_client=None,
        replication_layer=Mock(),
        ratelimiter=NonCallableMock(spec_set=[
            "send_message",
        ]),
        config=self.mock_config,
    )

    self.ratelimiter = homeserver.get_ratelimiter()
    self.ratelimiter.send_message.return_value = (True, 0)

    homeserver.get_handlers().federation_handler = Mock()

    def _get_user_by_token(token=None):
        # Reads self.auth_user_id at call time, so tests can switch users.
        user_info = {
            "user": homeserver.parse_userid(self.auth_user_id),
            "admin": False,
            "device_id": None,
        }
        return user_info

    homeserver.get_auth().get_user_by_token = _get_user_by_token

    def _insert_client_ip(*args, **kwargs):
        # Accept anything and report success without storing it.
        return defer.succeed(None)

    homeserver.get_datastore().insert_client_ip = _insert_client_ip

    synapse.rest.room.register_servlets(homeserver, self.mock_resource)

    # create the room
    self.room_id = yield self.create_room_as(self.user_id)
    self.path = "/rooms/%s/state/m.room.topic" % (self.room_id,)
开发者ID:alisheikh,项目名称:synapse,代码行数:43,代码来源:test_rooms.py
示例17: setUp
def setUp(self):
    """Build a HomeServer with a mock presence datastore and JustPresenceHandlers.

    ``has_presence_state`` reports True only for the localparts "apple" and
    "banana"; token authentication is stubbed to resolve to ``myid`` (a name
    defined outside this method) as a non-admin user with no device.
    """
    self.mock_resource = MockHttpResource(prefix=PATH_PREFIX)

    self.mock_config = Mock()
    self.mock_config.signing_key = [MockKey()]

    datastore_methods = [
        "has_presence_state",
        "get_presence_state",
        "allow_presence_visible",
        "is_presence_visible",
        "add_presence_list_pending",
        "set_presence_list_accepted",
        "del_presence_list",
        "get_presence_list",
        "insert_client_ip",
    ]
    homeserver = HomeServer(
        "test",
        db_pool=None,
        datastore=Mock(spec=datastore_methods),
        http_client=None,
        resource_for_client=self.mock_resource,
        resource_for_federation=self.mock_resource,
        config=self.mock_config,
    )
    homeserver.handlers = JustPresenceHandlers(homeserver)

    self.datastore = homeserver.get_datastore()

    def has_presence_state(user_localpart):
        known_localparts = ("apple", "banana",)
        return defer.succeed(
            user_localpart in known_localparts
        )

    self.datastore.has_presence_state = has_presence_state

    def _get_user_by_token(token=None):
        # The token is deliberately unused: every caller is treated as `myid`.
        user_info = {
            "user": homeserver.parse_userid(myid),
            "admin": False,
            "device_id": None,
        }
        return user_info

    room_member_handler = Mock(
        spec=[
            "get_rooms_for_user",
        ]
    )
    homeserver.handlers.room_member_handler = room_member_handler

    homeserver.get_auth().get_user_by_token = _get_user_by_token

    homeserver.register_servlets()

    self.u_apple = homeserver.parse_userid("@apple:test")
    self.u_banana = homeserver.parse_userid("@banana:test")
开发者ID:alisheikh,项目名称:synapse,代码行数:52,代码来源:test_presence.py
示例18: setUp
def setUp(self):
    """Build a HomeServer from spec-restricted mocks with real room-member and profile handlers.

    The federation handler is a mock, ``snapshot_room`` on the datastore
    returns ``self.snapshot``, and the rate limiter's ``send_message``
    returns ``(True, 0)``.
    """
    config = NonCallableMock()
    config.signing_key = [MockKey()]
    self.mock_config = config

    self.hostname = "red"

    datastore_methods = [
        "persist_event",
        "get_joined_hosts_for_room",
        "get_room_member",
        "get_room",
        "store_room",
        "snapshot_room",
    ]
    handler_names = [
        "room_member_handler",
        "profile_handler",
        "federation_handler",
    ]

    homeserver = HomeServer(
        self.hostname,
        db_pool=None,
        ratelimiter=NonCallableMock(spec_set=[
            "send_message",
        ]),
        datastore=NonCallableMock(spec_set=datastore_methods),
        resource_for_federation=NonCallableMock(),
        http_client=NonCallableMock(spec_set=[]),
        notifier=NonCallableMock(spec_set=["on_new_room_event"]),
        handlers=NonCallableMock(spec_set=handler_names),
        auth=NonCallableMock(spec_set=["check"]),
        state_handler=NonCallableMock(spec_set=["handle_new_event"]),
        config=self.mock_config,
    )

    self.federation = NonCallableMock(spec_set=[
        "handle_new_event",
        "get_state_for_room",
    ])

    # Keep references to the collaborators the tests inspect.
    self.datastore = homeserver.get_datastore()
    self.handlers = homeserver.get_handlers()
    self.notifier = homeserver.get_notifier()
    self.state_handler = homeserver.get_state_handler()
    self.distributor = homeserver.get_distributor()
    self.hs = homeserver

    self.handlers.federation_handler = self.federation

    # The signal is declared before the real handlers are constructed
    # (statement order kept from the original).
    self.distributor.declare("collect_presencelike_data")

    self.handlers.room_member_handler = RoomMemberHandler(self.hs)
    self.handlers.profile_handler = ProfileHandler(self.hs)
    self.room_member_handler = self.handlers.room_member_handler

    self.snapshot = Mock()
    self.datastore.snapshot_room.return_value = self.snapshot

    self.ratelimiter = homeserver.get_ratelimiter()
    self.ratelimiter.send_message.return_value = (True, 0)
开发者ID:gitter-badger,项目名称:synapse,代码行数:56,代码来源:test_room.py
示例19: setUp
def setUp(self):
    """Build a HomeServer with a mock datastore, DirectoryHandlers and a mock federation layer.

    Query handlers registered on the mock federation are recorded in
    ``self.query_handlers``; ``get_joined_hosts_for_room`` on the datastore
    always yields an empty host list.
    """
    self.mock_federation = Mock(spec=[
        "make_query",
    ])

    self.query_handlers = {}

    def register_query_handler(query_type, handler):
        # Capture the handler so tests can invoke it directly.
        self.query_handlers[query_type] = handler

    self.mock_federation.register_query_handler = register_query_handler

    datastore_methods = [
        "get_association_from_room_alias",
        "get_joined_hosts_for_room",
    ]
    homeserver = HomeServer(
        "test",
        datastore=Mock(spec=datastore_methods),
        http_client=None,
        resource_for_federation=Mock(),
        replication_layer=self.mock_federation,
    )
    homeserver.handlers = DirectoryHandlers(homeserver)

    self.handler = homeserver.get_handlers().directory_handler

    self.datastore = homeserver.get_datastore()

    def hosts(room_id):
        # No hosts are joined to any room.
        return defer.succeed([])

    self.datastore.get_joined_hosts_for_room.side_effect = hosts

    self.my_room = homeserver.parse_roomalias("#my-room:test")
    self.remote_room = homeserver.parse_roomalias("#another:remote")
开发者ID:martindale,项目名称:synapse,代码行数:31,代码来源:test_directory.py
示例20: setUp
def setUp(self):
    """Build a SQLite-backed HomeServer with JustPresenceHandlers and a mock HTTP client.

    Presence rows are created for two local users; a non-existent local user
    and a remote user ID are also prepared. The presence handler's start/stop
    polling hooks are replaced with mocks.
    """
    # NOTE(review): this body uses ``yield``, so as written it is a generator
    # function; presumably a decorator not visible here (e.g. twisted's
    # inlineCallbacks) drives it -- confirm.
    http_client = Mock(spec=[])
    self.mock_http_client = http_client
    self.mock_http_client.put_json = DeferredMockCallable()

    self.mock_federation_resource = MockHttpResource()

    pool = SQLiteMemoryDbPool()
    yield pool.prepare()

    self.mock_config = NonCallableMock()
    self.mock_config.signing_key = [MockKey()]

    homeserver = HomeServer(
        "test",
        clock=MockClock(),
        db_pool=pool,
        handlers=None,
        resource_for_client=Mock(),
        resource_for_federation=self.mock_federation_resource,
        http_client=self.mock_http_client,
        config=self.mock_config,
        keyring=Mock(),
    )
    homeserver.handlers = JustPresenceHandlers(homeserver)

    self.store = homeserver.get_datastore()

    # Some local users to test with
    self.u_apple = homeserver.parse_userid("@apple:test")
    self.u_banana = homeserver.parse_userid("@banana:test")

    yield self.store.create_presence(self.u_apple.localpart)
    yield self.store.create_presence(self.u_banana.localpart)

    # ID of a local user that does not exist
    self.u_durian = homeserver.parse_userid("@durian:test")

    # A remote user
    self.u_cabbage = homeserver.parse_userid("@cabbage:elsewhere")

    self.handler = homeserver.get_handlers().presence_handler

    self.mock_start = Mock()
    self.mock_stop = Mock()

    self.handler.start_polling_presence = self.mock_start
    self.handler.stop_polling_presence = self.mock_stop
开发者ID:winsontan520,项目名称:synapse,代码行数:45,代码来源:test_presence.py
注：本文中的synapse.server.HomeServer类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论