本文整理汇总了Python中synapse.types.RoomAlias类的典型用法代码示例。如果您正苦于以下问题:Python RoomAlias类的具体用法?Python RoomAlias怎么用?Python RoomAlias使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RoomAlias类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: on_DELETE
def on_DELETE(self, request, room_alias):
dir_handler = self.handlers.directory_handler
try:
service = yield self.auth.get_appservice_by_req(request)
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_appservice_association(
service, room_alias
)
logger.info(
"Application service at %s deleted alias %s",
service.url,
room_alias.to_string()
)
defer.returnValue((200, {}))
except AuthError:
# fallback to default user behaviour if they aren't an AS
pass
requester = yield self.auth.get_user_by_req(request)
user = requester.user
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(
requester, user.to_string(), room_alias
)
logger.info(
"User %s deleted alias %s",
user.to_string(),
room_alias.to_string()
)
defer.returnValue((200, {}))
开发者ID:mebjas,项目名称:synapse,代码行数:35,代码来源:directory.py
示例2: on_DELETE
def on_DELETE(self, request, room_alias):
dir_handler = self.handlers.directory_handler
try:
service = yield self.auth.get_appservice_by_req(request)
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_appservice_association(service, room_alias)
logger.info("Application service at %s deleted alias %s", service.url, room_alias.to_string())
defer.returnValue((200, {}))
except AuthError:
# fallback to default user behaviour if they aren't an AS
pass
user, _, _ = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(user)
if not is_admin:
raise AuthError(403, "You need to be a server admin")
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(user.to_string(), room_alias)
logger.info("User %s deleted alias %s", user.to_string(), room_alias.to_string())
defer.returnValue((200, {}))
开发者ID:roblabla,项目名称:synapse,代码行数:25,代码来源:directory.py
示例3: setUp
def setUp(self):
self.mock_federation = Mock(spec=[
"make_query",
])
self.query_handlers = {}
def register_query_handler(query_type, handler):
self.query_handlers[query_type] = handler
self.mock_federation.register_query_handler = register_query_handler
hs = yield setup_test_homeserver(
http_client=None,
resource_for_federation=Mock(),
replication_layer=self.mock_federation,
)
hs.handlers = DirectoryHandlers(hs)
self.handler = hs.get_handlers().directory_handler
self.store = hs.get_datastore()
self.my_room = RoomAlias.from_string("#my-room:test")
self.your_room = RoomAlias.from_string("#your-room:test")
self.remote_room = RoomAlias.from_string("#another:remote")
开发者ID:0-T-0,项目名称:synapse,代码行数:25,代码来源:test_directory.py
示例4: on_PUT
def on_PUT(self, request, room_alias):
room_alias = RoomAlias.from_string(room_alias)
content = parse_json_object_from_request(request)
if "room_id" not in content:
raise SynapseError(400, 'Missing params: ["room_id"]',
errcode=Codes.BAD_JSON)
logger.debug("Got content: %s", content)
logger.debug("Got room name: %s", room_alias.to_string())
room_id = content["room_id"]
servers = content["servers"] if "servers" in content else None
logger.debug("Got room_id: %s", room_id)
logger.debug("Got servers: %s", servers)
# TODO(erikj): Check types.
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Room does not exist")
requester = yield self.auth.get_user_by_req(request)
yield self.handlers.directory_handler.create_association(
requester, room_alias, room_id, servers
)
defer.returnValue((200, {}))
开发者ID:DoubleMalt,项目名称:synapse,代码行数:30,代码来源:directory.py
示例5: on_GET
def on_GET(self, request, room_alias):
room_alias = RoomAlias.from_string(room_alias)
dir_handler = self.handlers.directory_handler
res = yield dir_handler.get_association(room_alias)
defer.returnValue((200, res))
开发者ID:mebjas,项目名称:synapse,代码行数:7,代码来源:directory.py
示例6: setUp
def setUp(self):
hs = yield setup_test_homeserver(self.addCleanup)
self.store = hs.get_datastore()
self.room = RoomID.from_string("!abcde:test")
self.alias = RoomAlias.from_string("#my-room:test")
开发者ID:DoubleMalt,项目名称:synapse,代码行数:7,代码来源:test_directory.py
示例7: read_config
def read_config(self, config):
self.enable_registration = bool(
strtobool(str(config["enable_registration"]))
)
if "disable_registration" in config:
self.enable_registration = not bool(
strtobool(str(config["disable_registration"]))
)
self.registrations_require_3pid = config.get("registrations_require_3pid", [])
self.allowed_local_3pids = config.get("allowed_local_3pids", [])
self.registration_shared_secret = config.get("registration_shared_secret")
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
self.trusted_third_party_id_servers = config["trusted_third_party_id_servers"]
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
self.invite_3pid_guest = (
self.allow_guest_access and config.get("invite_3pid_guest", False)
)
self.auto_join_rooms = config.get("auto_join_rooms", [])
for room_alias in self.auto_join_rooms:
if not RoomAlias.is_valid(room_alias):
raise ConfigError('Invalid auto_join_rooms entry %s' % (room_alias,))
self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
开发者ID:DoubleMalt,项目名称:synapse,代码行数:27,代码来源:registration.py
示例8: on_PUT
def on_PUT(self, request, room_alias):
content = parse_json_object_from_request(request)
if "room_id" not in content:
raise SynapseError(400, "Missing room_id key",
errcode=Codes.BAD_JSON)
logger.debug("Got content: %s", content)
room_alias = RoomAlias.from_string(room_alias)
logger.debug("Got room name: %s", room_alias.to_string())
room_id = content["room_id"]
servers = content["servers"] if "servers" in content else None
logger.debug("Got room_id: %s", room_id)
logger.debug("Got servers: %s", servers)
# TODO(erikj): Check types.
room = yield self.store.get_room(room_id)
if room is None:
raise SynapseError(400, "Room does not exist")
dir_handler = self.handlers.directory_handler
try:
# try to auth as a user
requester = yield self.auth.get_user_by_req(request)
try:
user_id = requester.user.to_string()
yield dir_handler.create_association(
user_id, room_alias, room_id, servers
)
yield dir_handler.send_room_alias_update_event(
requester,
user_id,
room_id
)
except SynapseError as e:
raise e
except Exception:
logger.exception("Failed to create association")
raise
except AuthError:
# try to auth as an application service
service = yield self.auth.get_appservice_by_req(request)
yield dir_handler.create_appservice_association(
service, room_alias, room_id, servers
)
logger.info(
"Application service at %s created alias %s pointing to %s",
service.url,
room_alias.to_string(),
room_id
)
defer.returnValue((200, {}))
开发者ID:rubo77,项目名称:synapse,代码行数:58,代码来源:directory.py
示例9: on_POST
def on_POST(self, request, room_identifier, txn_id=None):
requester = yield self.auth.get_user_by_req(
request,
allow_guest=True,
)
try:
content = parse_json_object_from_request(request)
except Exception:
# Turns out we used to ignore the body entirely, and some clients
# cheekily send invalid bodies.
content = {}
if RoomID.is_valid(room_identifier):
room_id = room_identifier
try:
remote_room_hosts = [
x.decode('ascii') for x in request.args[b"server_name"]
]
except Exception:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
handler = self.room_member_handler
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = yield handler.lookup_room_alias(room_alias)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
action="join",
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
content=content,
third_party_signed=content.get("third_party_signed", None),
)
defer.returnValue((200, {"room_id": room_id}))
开发者ID:DoubleMalt,项目名称:synapse,代码行数:43,代码来源:room.py
示例10: test_auto_create_auto_join_rooms_when_support_user_exists
def test_auto_create_auto_join_rooms_when_support_user_exists(self):
room_alias_str = "#room:test"
self.hs.config.auto_join_rooms = [room_alias_str]
self.store.is_support_user = Mock(return_value=True)
res = self.get_success(self.handler.register(localpart='support'))
rooms = self.get_success(self.store.get_rooms_for_user(res[0]))
self.assertEqual(len(rooms), 0)
directory_handler = self.hs.get_handlers().directory_handler
room_alias = RoomAlias.from_string(room_alias_str)
self.get_failure(directory_handler.get_association(room_alias), SynapseError)
开发者ID:matrix-org,项目名称:synapse,代码行数:11,代码来源:test_register.py
示例11: test_auto_create_auto_join_rooms
def test_auto_create_auto_join_rooms(self):
room_alias_str = "#room:test"
self.hs.config.auto_join_rooms = [room_alias_str]
res = self.get_success(self.handler.register(localpart='jeff'))
rooms = self.get_success(self.store.get_rooms_for_user(res[0]))
directory_handler = self.hs.get_handlers().directory_handler
room_alias = RoomAlias.from_string(room_alias_str)
room_id = self.get_success(directory_handler.get_association(room_alias))
self.assertTrue(room_id['room_id'] in rooms)
self.assertEqual(len(rooms), 1)
开发者ID:matrix-org,项目名称:synapse,代码行数:11,代码来源:test_register.py
示例12: _auto_join_rooms
def _auto_join_rooms(self, user_id):
"""Automatically joins users to auto join rooms - creating the room in the first place
if the user is the first to be created.
Args:
user_id(str): The user to join
"""
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
# try to create the room if we're the first real user on the server. Note
# that an auto-generated support user is not a real user and will never be
# the user to create the room
should_auto_create_rooms = False
is_support = yield self.store.is_support_user(user_id)
# There is an edge case where the first user is the support user, then
# the room is never created, though this seems unlikely and
# recoverable from given the support user being involved in the first
# place.
if self.hs.config.autocreate_auto_join_rooms and not is_support:
count = yield self.store.count_all_users()
should_auto_create_rooms = count == 1
for r in self.hs.config.auto_join_rooms:
try:
if should_auto_create_rooms:
room_alias = RoomAlias.from_string(r)
if self.hs.hostname != room_alias.domain:
logger.warning(
'Cannot create room alias %s, '
'it does not match server domain',
r,
)
else:
# create room expects the localpart of the room alias
room_alias_localpart = room_alias.localpart
# getting the RoomCreationHandler during init gives a dependency
# loop
yield self.hs.get_room_creation_handler().create_room(
fake_requester,
config={
"preset": "public_chat",
"room_alias_name": room_alias_localpart
},
ratelimit=False,
)
else:
yield self._join_user_to_room(fake_requester, r)
except ConsentNotGivenError as e:
# Technically not necessary to pull out this error though
# moving away from bare excepts is a good thing to do.
logger.error("Failed to join new user to %r: %r", r, e)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
开发者ID:matrix-org,项目名称:synapse,代码行数:54,代码来源:register.py
示例13: _join_user_to_room
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
)
开发者ID:rubo77,项目名称:synapse,代码行数:23,代码来源:register.py
示例14: setUp
def setUp(self):
hs = yield setup_test_homeserver()
# We can't test RoomStore on its own without the DirectoryStore, for
# management of the 'room_aliases' table
self.store = hs.get_datastore()
self.room = RoomID.from_string("!abcde:test")
self.alias = RoomAlias.from_string("#a-room-name:test")
self.u_creator = UserID.from_string("@creator:test")
yield self.store.store_room(self.room.to_string(),
room_creator_user_id=self.u_creator.to_string(),
is_public=True
)
开发者ID:heavenlyhash,项目名称:synapse,代码行数:15,代码来源:test_room.py
示例15: on_POST
def on_POST(self, request, room_identifier, txn_id=None):
requester = yield self.auth.get_user_by_req(
request,
allow_guest=True,
)
# the identifier could be a room alias or a room id. Try one then the
# other if it fails to parse, without swallowing other valid
# SynapseErrors.
identifier = None
is_room_alias = False
try:
identifier = RoomAlias.from_string(room_identifier)
is_room_alias = True
except SynapseError:
identifier = RoomID.from_string(room_identifier)
# TODO: Support for specifying the home server to join with?
if is_room_alias:
handler = self.handlers.room_member_handler
ret_dict = yield handler.join_room_alias(
requester.user,
identifier,
)
defer.returnValue((200, ret_dict))
else: # room id
msg_handler = self.handlers.message_handler
content = {"membership": Membership.JOIN}
if requester.is_guest:
content["kind"] = "guest"
yield msg_handler.create_and_send_event(
{
"type": EventTypes.Member,
"content": content,
"room_id": identifier.to_string(),
"sender": requester.user.to_string(),
"state_key": requester.user.to_string(),
},
token_id=requester.access_token_id,
txn_id=txn_id,
is_guest=requester.is_guest,
)
defer.returnValue((200, {"room_id": identifier.to_string()}))
开发者ID:Vutsuak16,项目名称:synapse,代码行数:46,代码来源:room.py
示例16: on_directory_query
def on_directory_query(self, args):
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(
400, "Room Alias is not hosted on this Home Server"
)
result = yield self.get_association_from_room_alias(
room_alias
)
if result is not None:
defer.returnValue({
"room_id": result.room_id,
"servers": result.servers,
})
else:
raise SynapseError(
404,
"Room alias %r not found" % (room_alias.to_string(),),
Codes.NOT_FOUND
)
开发者ID:0-T-0,项目名称:synapse,代码行数:22,代码来源:directory.py
示例17: create_room
def create_room(self, user_id, room_id, config):
""" Creates a new room.
Args:
user_id (str): The ID of the user creating the new room.
room_id (str): The proposed ID for the new room. Can be None, in
which case one will be created for you.
config (dict) : A dict of configuration options.
Returns:
The new room ID.
Raises:
SynapseError if the room ID was taken, couldn't be stored, or
something went horribly wrong.
"""
self.ratelimit(user_id)
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
raise SynapseError(400, "Invalid characters in room alias")
room_alias = RoomAlias.create(
config["room_alias_name"],
self.hs.hostname,
)
mapping = yield self.store.get_association_from_room_alias(
room_alias
)
if mapping:
raise SynapseError(400, "Room alias already taken")
else:
room_alias = None
invite_list = config.get("invite", [])
for i in invite_list:
try:
UserID.from_string(i)
except:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
is_public = config.get("visibility", None) == "public"
if room_id:
# Ensure room_id is the correct type
room_id_obj = RoomID.from_string(room_id)
if not self.hs.is_mine(room_id_obj):
raise SynapseError(400, "Room id must be local")
yield self.store.store_room(
room_id=room_id,
room_creator_user_id=user_id,
is_public=is_public
)
else:
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
room_id = None
while attempts < 5:
try:
random_string = stringutils.random_string(18)
gen_room_id = RoomID.create(
random_string,
self.hs.hostname,
)
yield self.store.store_room(
room_id=gen_room_id.to_string(),
room_creator_user_id=user_id,
is_public=is_public
)
room_id = gen_room_id.to_string()
break
except StoreError:
attempts += 1
if not room_id:
raise StoreError(500, "Couldn't generate a room ID.")
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
)
preset_config = config.get(
"preset",
RoomCreationPreset.PUBLIC_CHAT
if is_public
else RoomCreationPreset.PRIVATE_CHAT
)
raw_initial_state = config.get("initial_state", [])
initial_state = OrderedDict()
for val in raw_initial_state:
initial_state[(val["type"], val.get("state_key", ""))] = val["content"]
#.........这里部分代码省略.........
开发者ID:roblabla,项目名称:synapse,代码行数:101,代码来源:room.py
示例18: register
#.........这里部分代码省略.........
try:
int(localpart)
raise RegistrationError(
400,
"Numeric user IDs are reserved for guest users."
)
except ValueError:
pass
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
token = None
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
yield self.store.register(
user_id=user_id,
token=token,
password_hash=password_hash,
was_guest=was_guest,
make_guest=make_guest,
create_profile_with_localpart=(
# If the user was a guest then they already have a profile
None if was_guest else user.localpart
),
admin=admin,
)
if self.hs.config.user_directory_search_all_users:
profile = yield self.store.get_profileinfo(localpart)
yield self.user_directory_handler.handle_local_profile_change(
user_id, profile
)
else:
# autogen a sequential user ID
attempts = 0
token = None
user = None
while not user:
localpart = yield self._generate_user_id(attempts > 0)
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
yield self.check_user_id_not_appservice_exclusive(user_id)
if generate_token:
token = self.macaroon_gen.generate_access_token(user_id)
try:
yield self.store.register(
user_id=user_id,
token=token,
password_hash=password_hash,
make_guest=make_guest,
create_profile_with_localpart=user.localpart,
)
except SynapseError:
# if user id is taken, just generate another
user = None
user_id = None
token = None
attempts += 1
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
# try to create the room if we're the first user on the server
should_auto_create_rooms = False
if self.hs.config.autocreate_auto_join_rooms:
count = yield self.store.count_all_users()
should_auto_create_rooms = count == 1
for r in self.hs.config.auto_join_rooms:
try:
if should_auto_create_rooms:
room_alias = RoomAlias.from_string(r)
if self.hs.hostname != room_alias.domain:
logger.warning(
'Cannot create room alias %s, '
'it does not match server domain',
r,
)
else:
# create room expects the localpart of the room alias
room_alias_localpart = room_alias.localpart
# getting the RoomCreationHandler during init gives a dependency
# loop
yield self.hs.get_room_creation_handler().create_room(
fake_requester,
config={
"preset": "public_chat",
"room_alias_name": room_alias_localpart
},
ratelimit=False,
)
else:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
defer.returnValue((user_id, token))
开发者ID:gergelypolonkai,项目名称:synapse,代码行数:101,代码来源:register.py
示例19: handle_new_client_event
def handle_new_client_event(self, event, context, extra_users=[]):
# We now need to go and hit out to wherever we need to hit out to.
self.auth.check(event, auth_events=context.current_state)
yield self.maybe_kick_guest_users(event, context.current_state.values())
if event.type == EventTypes.CanonicalAlias:
# Check the alias is acually valid (at this time at least)
room_alias_str = event.content.get("alias", None)
if room_alias_str:
room_alias = RoomAlias.from_string(room_alias_str)
directory_handler = self.hs.get_handlers().directory_handler
mapping = yield directory_handler.get_association(room_alias)
if mapping["room_id"] != event.room_id:
raise SynapseError(
400,
"Room alias %s does not point to the room" % (
room_alias_str,
)
)
federation_handler = self.hs.get_handlers().federation_handler
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.INVITE:
event.unsigned["invite_room_state"] = [
{
"type": e.type,
"state_key": e.state_key,
"content": e.content,
"sender": e.sender,
}
for k, e in context.current_state.items()
if e.type in (
EventTypes.JoinRules,
EventTypes.CanonicalAlias,
EventTypes.RoomAvatar,
EventTypes.Name,
)
]
invitee = UserID.from_string(event.state_key)
if not self.hs.is_mine(invitee):
# TODO: Can we add signature from remote server in a nicer
# way? If we have been invited by a remote server, we need
# to get them to sign the event.
returned_invite = yield federation_handler.send_invite(
invitee.domain,
event,
)
event.unsigned.pop("room_state", None)
# TODO: Make sure the signatures actually are correct.
event.signatures.update(
returned_invite.signatures
)
if event.type == EventTypes.Redaction:
if self.auth.check_redaction(event, auth_events=context.current_state):
original_event = yield self.store.get_event(
event.redacts,
check_redacted=False,
get_prev_content=False,
allow_rejected=False,
allow_none=False
)
if event.user_id != original_event.user_id:
raise AuthError(
403,
"You don't have permission to redact events"
)
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
)
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
destinations = set()
for k, s in context.current_state.items():
try:
if k[0] == EventTypes.Member:
if s.content["membership"] == Membership.JOIN:
destinations.add(
UserID.from_string(s.state_key).domain
)
except SynapseError:
logger.warn(
"Failed to get destination from event %s", s.event_id
)
with PreserveLoggingContext():
# Don't block waiting on waking up all the listeners.
#.........这里部分代码省略.........
开发者ID:Vutsuak16,项目名称:synapse,代码行数:101,代码来源:_base.py
示例20: create_room
def create_room(self, user_id, room_id, config):
""" Creates a new room.
Args:
user_id (str): The ID of the user creating the new room.
room_id (str): The proposed ID for the new room. Can be None, in
which case one will be created for you.
config (dict) : A dict of configuration options.
Returns:
The new room ID.
Raises:
SynapseError if the room ID was taken, couldn't be stored, or
something went horribly wrong.
"""
self.ratelimit(user_id)
if "room_alias_name" in config:
room_alias = RoomAlias.create_local(
config["room_alias_name"],
self.hs
)
mapping = yield self.store.get_association_from_room_alias(
room_alias
)
if mapping:
raise SynapseError(400, "Room alias already taken")
else:
room_alias = None
invite_list = config.get("invite", [])
for i in invite_list:
try:
self.hs.parse_userid(i)
except:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
is_public = config.get("visibility", None) == "public"
if room_id:
# Ensure room_id is the correct type
room_id_obj = RoomID.from_string(room_id, self.hs)
if not room_id_obj.is_mine:
raise SynapseError(400, "Room id must be local")
yield self.store.store_room(
room_id=room_id,
room_creator_user_id=user_id,
is_public=is_public
)
else:
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
room_id = None
while attempts < 5:
try:
random_string = stringutils.random_string(18)
gen_room_id = RoomID.create_local(random_string, self.hs)
yield self.store.store_room(
room_id=gen_room_id.to_string(),
room_creator_user_id=user_id,
is_public=is_public
)
room_id = gen_room_id.to_string()
break
except StoreError:
attempts += 1
if not room_id:
raise StoreError(500, "Couldn't generate a room ID.")
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
)
user = self.hs.parse_userid(user_id)
creation_events = self._create_events_for_new_room(
user, room_id, is_public=is_public
)
room_member_handler = self.hs.get_handlers().room_member_handler
@defer.inlineCallbacks
def handle_event(event):
snapshot = yield self.store.snapshot_room(event)
logger.debug("Event: %s", event)
if event.type == RoomMemberEvent.TYPE:
yield room_member_handler.change_membership(
event,
do_auth=True
)
else:
yield self._on_new_room_event(
#.........这里部分代码省略.........
开发者ID:tjardick,项目名称:synapse,代码行数:101,代码来源:room.py
注:本文中的synapse.types.RoomAlias类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论