本文整理汇总了Python中telegram.Message类的典型用法代码示例。如果您正苦于以下问题:Python Message类的具体用法?Python Message怎么用?Python Message使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Message类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_webhook
def test_webhook(self):
    """Run the updater's webhook server on a random local port and poke it.

    The server is started with a cert/key pair that is not a real
    certificate, so it is served without SSL from a helper thread. The test
    then POSTs one update to the ``TOKEN`` path and asserts that
    ``self.received_message`` equals its text, checks that GET and HEAD
    requests to another path answer 200 with an empty body, and finally
    calls ``shutdown()`` twice.
    """
    print('Testing Webhook')
    self._setup_updater('', messages=0)
    self.updater.dispatcher.addTelegramMessageHandler(
        self.telegramHandlerTest)

    # Select random port for travis
    port = randrange(1024, 49152)
    self.updater.start_webhook(
        '127.0.0.1',
        port,
        url_path='TOKEN',
        cert='./tests/test_updater.py',
        key='./tests/test_updater.py',
    )
    sleep(0.5)

    # SSL-Wrapping will fail, so we start the server without SSL
    server_thread = Thread(target=self.updater.httpd.serve_forever)
    server_thread.start()

    # Now, we send an update to the server via urlopen
    message = Message(1, User(1, "Tester"), datetime.now(),
                      Chat(1, "group", title="Test Group"))
    message.text = "Webhook Test"
    update = Update(1)
    update.message = message

    # bytes() only takes an encoding on Python 3; fall back otherwise.
    try:
        body = bytes(update.to_json(), encoding='utf-8')
    except TypeError:
        body = bytes(update.to_json())

    headers = {'content-type': 'application/json'}
    headers['content-length'] = str(len(body))
    urlopen(Request('http://127.0.0.1:%d/TOKEN' % port,
                    data=body,
                    headers=headers))
    sleep(1)
    self.assertEqual(self.received_message, 'Webhook Test')

    print("Test other webhook server functionalities...")
    probe = Request('http://localhost:%d/webookhandler.py' % port)
    # First pass uses the default request method, second pass forces HEAD.
    for force_head in (False, True):
        if force_head:
            probe.get_method = lambda: 'HEAD'
        reply = urlopen(probe)
        self.assertEqual(b'', reply.read())
        self.assertEqual(200, reply.code)

    # Test multiple shutdown() calls
    for _ in range(2):
        self.updater.httpd.shutdown()
    self.assertTrue(True)
开发者ID:azigran,项目名称:python-telegram-bot,代码行数:60,代码来源:test_updater.py
示例2: test_conversation_handler
def test_conversation_handler(self, dp, bot, user1, user2):
    """Drive the conversation through its commands and check each state.

    The same ``Message`` object is reused; only its ``text`` (and later its
    ``from_user``) is changed between updates.
    """
    dp.add_handler(
        ConversationHandler(entry_points=self.entry_points, states=self.states,
                            fallbacks=self.fallbacks))

    message = Message(0, user1, None, self.group, text='/start', bot=bot)

    steps = (
        # User one, starts the state machine.
        ('/start', self.THIRSTY),
        # The user is thirsty and wants to brew coffee.
        ('/brew', self.BREWING),
        # An invalid command must leave the state untouched.
        ('/nothing', self.BREWING),
        # The state machine still works: pour the coffee.
        ('/pourCoffee', self.DRINKING),
    )
    for command, expected_state in steps:
        message.text = command
        dp.process_update(Update(update_id=0, message=message))
        assert self.current_state[user1.id] == expected_state

    # For another user, who did not start yet, no state must have been
    # recorded at all.
    message.from_user = user2
    dp.process_update(Update(update_id=0, message=message))
    with pytest.raises(KeyError):
        self.current_state[user2.id]
开发者ID:technoekat,项目名称:prodeda1,代码行数:31,代码来源:test_conversationhandler.py
示例3: test_conversation_handler_fallback
def test_conversation_handler_fallback(self, dp, bot, user1, user2):
    """Check the '/eat' fallback before and after the conversation starts.

    Before '/start' the fallback must not create any state for the user;
    once the user is brewing, the same command is expected to move the
    state back to ``self.THIRSTY``.
    """
    dp.add_handler(
        ConversationHandler(entry_points=self.entry_points, states=self.states,
                            fallbacks=self.fallbacks))

    # first check if fallback will not trigger start when not started
    message = Message(0, user1, None, self.group, text='/eat', bot=bot)
    dp.process_update(Update(update_id=0, message=message))
    with pytest.raises(KeyError):
        self.current_state[user1.id]

    steps = (
        # User starts the state machine.
        ('/start', self.THIRSTY),
        # The user is thirsty and wants to brew coffee.
        ('/brew', self.BREWING),
        # Now a fallback command is issued
        ('/eat', self.THIRSTY),
    )
    for command, expected_state in steps:
        message.text = command
        dp.process_update(Update(update_id=0, message=message))
        assert self.current_state[user1.id] == expected_state
开发者ID:technoekat,项目名称:prodeda1,代码行数:25,代码来源:test_conversationhandler.py
示例4: test_CommandHandler_commandList
def test_CommandHandler_commandList(self):
    """Check that a CommandHandler built from a list reacts to each command.

    Every command in the list must end up in ``self.received_message``; a
    command that is not in the list must leave it as ``None`` after
    ``self.reset()``.
    """
    self._setup_updater('', messages=0)
    self.updater.dispatcher.add_handler(
        CommandHandler(['foo', 'bar', 'spameggs'], self.telegramHandlerTest))

    bot = self.updater.bot
    sender = User(0, 'TestUser')
    update_queue = self.updater.start_polling(0.01)

    message = Message(0, sender, 0, None, text='/foo', bot=bot)

    # Update ids 0..2 go to the three accepted commands, in list order.
    for update_id, command in enumerate(('/foo', '/bar', '/spameggs')):
        message.text = command
        update_queue.put(Update(update_id, message=message))
        sleep(.1)
        self.assertEqual(self.received_message, command)

    self.reset()
    message.text = '/not_in_list'
    update_queue.put(Update(3, message=message))
    sleep(.1)
    self.assertTrue(self.received_message is None)
开发者ID:KenOokamiHoro,项目名称:python-telegram-bot,代码行数:28,代码来源:test_updater.py
示例5: test_conversation_timeout_keeps_extending
def test_conversation_timeout_keeps_extending(self, dp, bot, user1):
    """Check that every handled update pushes the conversation timeout back.

    Timeline (conversation_timeout=0.5):
        t=0    /start       (timeout=.5)
        t=.25  /brew        (timeout=.75)
        t=.5   original timeout
        t=.6   /pourCoffee  (timeout=1.1)
        t=.75  second timeout
        t=1.1  actual timeout
    """
    handler = ConversationHandler(entry_points=self.entry_points, states=self.states,
                                  fallbacks=self.fallbacks, conversation_timeout=0.5)
    dp.add_handler(handler)

    conversation_key = (self.group.id, user1.id)
    message = Message(0, user1, None, self.group, text='/start', bot=bot)

    def current_state():
        # State stored for this chat/user pair, or None once it is gone.
        return handler.conversations.get(conversation_key)

    def send(command):
        message.text = command
        dp.process_update(Update(update_id=0, message=message))

    def wait_and_tick(seconds):
        sleep(seconds)
        dp.job_queue.tick()

    send('/start')
    assert current_state() == self.THIRSTY
    wait_and_tick(0.25)  # t=.25
    assert current_state() == self.THIRSTY

    send('/brew')
    assert current_state() == self.BREWING
    wait_and_tick(0.35)  # t=.6
    assert current_state() == self.BREWING

    send('/pourCoffee')
    assert current_state() == self.DRINKING
    wait_and_tick(.4)  # t=1
    assert current_state() == self.DRINKING

    wait_and_tick(.1)  # t=1.1
    assert current_state() is None
开发者ID:technoekat,项目名称:prodeda1,代码行数:33,代码来源:test_conversationhandler.py
示例6: test_parse_caption_entity
def test_parse_caption_entity(self):
    """Check that a URL entity is cut out of a caption that starts with emoji.

    The caption is built from escaped code points followed by the URL; the
    entity's offset/length must select exactly the URL text.
    """
    escaped_caption = (b'\\U0001f469\\u200d\\U0001f469\\u200d\\U0001f467'
                       b'\\u200d\\U0001f467\\U0001f431http://google.com')
    caption = escaped_caption.decode('unicode-escape')

    url_entity = MessageEntity(type=MessageEntity.URL, offset=13, length=17)
    message = Message(
        1,
        self.from_user,
        self.date,
        self.chat,
        caption=caption,
        caption_entities=[url_entity],
    )

    assert message.parse_caption_entity(url_entity) == 'http://google.com'
开发者ID:ferisystem,项目名称:python-telegram-bot,代码行数:7,代码来源:test_message.py
示例7: de_json
def de_json(data, bot):
    """Build an ``Update`` from its decoded JSON representation.

    Args:
        data (dict): Decoded JSON of the update; nested objects are
            converted via the ``de_json`` of their own type.
        bot (telegram.Bot): Passed through to every nested ``de_json`` call.

    Returns:
        telegram.Update: The new instance, or ``None`` when ``data`` is
        empty or otherwise falsy.
    """
    if not data:
        return None

    data = super(Update, Update).de_json(data, bot)

    # (key, type) pairs, listed in the order the nested objects are decoded.
    nested_fields = (
        ('message', Message),
        ('edited_message', Message),
        ('inline_query', InlineQuery),
        ('chosen_inline_result', ChosenInlineResult),
        ('callback_query', CallbackQuery),
        ('shipping_query', ShippingQuery),
        ('pre_checkout_query', PreCheckoutQuery),
        ('channel_post', Message),
        ('edited_channel_post', Message),
    )
    for key, field_type in nested_fields:
        data[key] = field_type.de_json(data.get(key), bot)

    return Update(**data)
开发者ID:KenOokamiHoro,项目名称:python-telegram-bot,代码行数:26,代码来源:update.py
示例8: test_webhook_no_ssl
def test_webhook_no_ssl(self):
    """Start the webhook server without cert/key and deliver one update.

    The update is POSTed to the root path of the local server; afterwards
    ``self.received_message`` must equal the text of the message sent.
    """
    print("Testing Webhook without SSL")
    self.updater.bot = MockBot("", messages=0)
    self.updater.dispatcher.addTelegramMessageHandler(self.telegramHandlerTest)

    # Select random port for travis
    port = randrange(1024, 49152)
    self.updater.start_webhook("127.0.0.1", port)
    sleep(0.5)

    # Now, we send an update to the server via urlopen
    message = Message(1, User(1, "Tester 2"), datetime.now(), GroupChat(1, "Test Group 2"))
    message.text = "Webhook Test 2"
    update = Update(1)
    update.message = message

    # bytes() only takes an encoding on Python 3; fall back otherwise.
    try:
        body = bytes(update.to_json(), encoding="utf-8")
    except TypeError:
        body = bytes(update.to_json())

    headers = {"content-type": "application/json"}
    headers["content-length"] = str(len(body))
    urlopen(Request("http://127.0.0.1:%d/" % port, data=body, headers=headers))
    sleep(1)

    self.assertEqual(self.received_message, "Webhook Test 2")
开发者ID:gisho,项目名称:python-telegram-bot,代码行数:31,代码来源:test_updater.py
示例9: test_parse_entities_url_emoji
def test_parse_entities_url_emoji(self):
    """Check parse_entities() for a URL entity whose url holds non-ASCII.

    The entity spans the whole text; parsing must map the entity to that
    text and keep the entity's ``url`` attribute unchanged.
    """
    expected_url = b'http://github.com/?unicode=\\u2713\\U0001f469'.decode('unicode-escape')
    visible_text = 'some url'

    entity = MessageEntity(type=MessageEntity.URL, offset=0, length=8, url=expected_url)
    message = Message(
        1,
        self.from_user,
        self.date,
        self.chat,
        text=visible_text,
        entities=[entity],
    )

    assert message.parse_entities() == {entity: visible_text}
    # The only key of the parsed mapping is the entity itself.
    first_key = next(iter(message.parse_entities()))
    assert first_key.url == expected_url
开发者ID:ferisystem,项目名称:python-telegram-bot,代码行数:8,代码来源:test_message.py
示例10: test_parse_caption_entities
def test_parse_caption_entities(self):
    """Check parse_caption_entities() with and without a type filter.

    Two entities start at the same offset: a URL entity covering the whole
    URL and a BOLD entity covering only its first character.
    """
    escaped_caption = (b'\\U0001f469\\u200d\\U0001f469\\u200d\\U0001f467'
                       b'\\u200d\\U0001f467\\U0001f431http://google.com')
    caption = escaped_caption.decode('unicode-escape')

    url_entity = MessageEntity(type=MessageEntity.URL, offset=13, length=17)
    bold_entity = MessageEntity(type=MessageEntity.BOLD, offset=13, length=1)
    message = Message(
        1,
        self.from_user,
        self.date,
        self.chat,
        caption=caption,
        caption_entities=[bold_entity, url_entity],
    )

    only_urls = message.parse_caption_entities(MessageEntity.URL)
    assert only_urls == {url_entity: 'http://google.com'}

    everything = message.parse_caption_entities()
    assert everything == {url_entity: 'http://google.com', bold_entity: 'h'}
开发者ID:ferisystem,项目名称:python-telegram-bot,代码行数:9,代码来源:test_message.py
示例11: mockUpdate
def mockUpdate(self, text):
    """Return an ``Update`` with id 0 that carries ``text`` in a bare Message.

    The message is stored as ``edited_message`` when ``self.edited`` is
    truthy and as ``message`` otherwise.
    """
    message = Message(0, None, None, None)
    message.text = text

    update = Update(0)
    target_attr = 'edited_message' if self.edited else 'message'
    setattr(update, target_attr, message)
    return update
开发者ID:Clifford-Beta,项目名称:python-telegram-bot,代码行数:11,代码来源:test_updater.py
示例12: mock_update
def mock_update(self, text):
    """Return an ``Update`` with id 0 whose message carries ``text``.

    The message comes from a test user in a group chat and uses ``self`` as
    its bot. It is stored as ``edited_message`` when ``self.edited`` is
    truthy and as ``message`` otherwise.
    """
    message = Message(0, User(0, 'Testuser'), None, Chat(0, Chat.GROUP), bot=self)
    message.text = text

    update = Update(0)
    target_attr = 'edited_message' if self.edited else 'message'
    setattr(update, target_attr, message)
    return update
开发者ID:KenOokamiHoro,项目名称:python-telegram-bot,代码行数:11,代码来源:test_updater.py
示例13: test_webhook
def test_webhook(self):
    """Run the webhook server with a MockBot and check its HTTP behaviour.

    The cert/key pair handed to ``start_webhook`` is not a real certificate,
    so the server is served without SSL from a helper thread. One update is
    POSTed to the ``TOKEN`` path and must show up in
    ``self.received_message``; GET and HEAD requests to another path must
    answer 200 with an empty body; ``shutdown()`` is then called twice.
    """
    print("Testing Webhook")
    self.updater.bot = MockBot("", messages=0)
    self.updater.dispatcher.addTelegramMessageHandler(self.telegramHandlerTest)

    # Select random port for travis
    port = randrange(1024, 49152)
    self.updater.start_webhook(
        "127.0.0.1",
        port,
        url_path="TOKEN",
        cert="./tests/test_updater.py",
        key="./tests/test_updater.py",
    )
    sleep(0.5)

    # SSL-Wrapping will fail, so we start the server without SSL
    server_thread = Thread(target=self.updater.httpd.serve_forever)
    server_thread.start()

    # Now, we send an update to the server via urlopen
    message = Message(1, User(1, "Tester"), datetime.now(), GroupChat(1, "Test Group"))
    message.text = "Webhook Test"
    update = Update(1)
    update.message = message

    # bytes() only takes an encoding on Python 3; fall back otherwise.
    try:
        body = bytes(update.to_json(), encoding="utf-8")
    except TypeError:
        body = bytes(update.to_json())

    headers = {"content-type": "application/json"}
    headers["content-length"] = str(len(body))
    urlopen(Request("http://127.0.0.1:%d/TOKEN" % port, data=body, headers=headers))
    sleep(1)
    self.assertEqual(self.received_message, "Webhook Test")

    print("Test other webhook server functionalities...")
    probe = Request("http://localhost:%d/webookhandler.py" % port)
    # First pass uses the default request method, second pass forces HEAD.
    for force_head in (False, True):
        if force_head:
            probe.get_method = lambda: "HEAD"
        reply = urlopen(probe)
        self.assertEqual(b"", reply.read())
        self.assertEqual(200, reply.code)

    # Test multiple shutdown() calls
    for _ in range(2):
        self.updater.httpd.shutdown()
    self.assertTrue(True)
开发者ID:gisho,项目名称:python-telegram-bot,代码行数:53,代码来源:test_updater.py
示例14: test_webhook
def test_webhook(self):
    """Run the webhook server and talk to it through ``_send_webhook_msg``.

    The cert/key pair handed to ``start_webhook`` is not a real certificate,
    so the server is served without SSL from a helper thread. One update is
    sent to the ``TOKEN`` path and must show up in
    ``self.received_message``; two further requests to another path (the
    second one forcing HEAD) must answer 200 with an empty body;
    ``shutdown()`` is then called twice.
    """
    self._setup_updater('', messages=0)
    self.updater.dispatcher.addHandler(
        MessageHandler([], self.telegramHandlerTest))

    ip = '127.0.0.1'
    port = randrange(1024, 49152)  # Select random port for travis
    self.updater.start_webhook(
        ip,
        port,
        url_path='TOKEN',
        cert='./tests/test_updater.py',
        key='./tests/test_updater.py',
        webhook_url=None,
    )
    sleep(0.5)

    # SSL-Wrapping will fail, so we start the server without SSL
    server_thread = Thread(target=self.updater.httpd.serve_forever)
    server_thread.start()

    # Now, we send an update to the server via urlopen
    message = Message(1, User(1, "Tester"), datetime.now(),
                      Chat(1, "group", title="Test Group"))
    message.text = "Webhook Test"
    update = Update(1)
    update.message = message

    self._send_webhook_msg(ip, port, update.to_json(), 'TOKEN')
    sleep(1)
    self.assertEqual(self.received_message, 'Webhook Test')

    print("Test other webhook server functionalities...")
    # Same request twice: once with the helper's defaults, once forcing HEAD.
    for extra_kwargs in ({}, {'get_method': lambda: 'HEAD'}):
        reply = self._send_webhook_msg(ip, port, None, 'webookhandler.py',
                                       **extra_kwargs)
        self.assertEqual(b'', reply.read())
        self.assertEqual(200, reply.code)

    # Test multiple shutdown() calls
    for _ in range(2):
        self.updater.httpd.shutdown()
    self.assertTrue(True)
开发者ID:Gurzo,项目名称:python-telegram-bot,代码行数:50,代码来源:test_updater.py
示例15: decorator
def decorator(self, *args, **kwargs):
    """Post the request prepared by ``func`` and wrap the reply as a Message.

    ``func`` is called with the same arguments and must return ``(url,
    data)``. Truthy ``reply_to_message_id`` and ``reply_markup`` keyword
    arguments are copied into ``data`` before posting; a ``ReplyMarkup``
    instance is serialised with ``to_json()`` first.

    Returns:
        ``True`` when the post result is exactly ``True``, otherwise
        ``Message.de_json(result)``.

    Raises:
        TelegramError: if ``data`` holds no truthy ``chat_id``.
    """
    url, data = func(self, *args, **kwargs)

    if not data.get('chat_id'):
        raise TelegramError('Invalid chat_id')

    reply_to = kwargs.get('reply_to_message_id')
    if reply_to:
        data['reply_to_message_id'] = reply_to

    markup = kwargs.get('reply_markup')
    if markup:
        if isinstance(markup, ReplyMarkup):
            markup = markup.to_json()
        data['reply_markup'] = markup

    result = request.post(url, data)
    return result if result is True else Message.de_json(result)
开发者ID:peczony,项目名称:python-telegram-bot,代码行数:26,代码来源:bot.py
示例16: decorator
def decorator(self, *args, **kwargs):
    """Post the request prepared by ``func`` and wrap the reply as a Message.

    ``func`` is called with the same arguments and must return ``(url,
    data)``. Truthy ``reply_to_message_id``, ``disable_notification`` and
    ``reply_markup`` keyword arguments are copied into ``data`` before
    posting; a ``ReplyMarkup`` instance is serialised with ``to_json()``
    first. The ``timeout`` keyword argument is forwarded to the post call.

    Returns:
        ``True`` when the post result is exactly ``True``, otherwise
        ``Message.de_json(result)``.
    """
    url, data = func(self, *args, **kwargs)

    # Plain pass-through options: copied verbatim when truthy.
    for option in ('reply_to_message_id', 'disable_notification'):
        value = kwargs.get(option)
        if value:
            data[option] = value

    markup = kwargs.get('reply_markup')
    if markup:
        if isinstance(markup, ReplyMarkup):
            markup = markup.to_json()
        data['reply_markup'] = markup

    result = request.post(url, data, timeout=kwargs.get('timeout'))
    return result if result is True else Message.de_json(result)
开发者ID:Gurzo,项目名称:python-telegram-bot,代码行数:25,代码来源:bot.py
示例17: forwardMessage
def forwardMessage(self, chat_id, from_chat_id, message_id):
    """Use this method to forward messages of any kind.

    Args:
      chat_id:
        Unique identifier for the message recipient - User or GroupChat id.
      from_chat_id:
        Unique identifier for the chat where the original message was sent
        - User or GroupChat id.
      message_id:
        Unique message identifier.

    Only arguments with a truthy value are included in the request.

    Returns:
      A telegram.Message instance representing the message forwarded.

    Raises:
      TelegramError: if the API is not authenticated.
    """
    endpoint = "%s/forwardMessage" % (self.base_url)

    if not self.__auth:
        raise TelegramError({"message": "API must be authenticated."})

    candidates = (
        ("chat_id", chat_id),
        ("from_chat_id", from_chat_id),
        ("message_id", message_id),
    )
    payload = {key: value for key, value in candidates if value}

    raw_reply = self._requestUrl(endpoint, "POST", data=payload)
    parsed = self._parseAndCheckTelegram(raw_reply.decode())
    return Message.de_json(parsed)
开发者ID:kalloc,项目名称:python-telegram-bot,代码行数:33,代码来源:bot.py
示例18: decorator
def decorator(self, *args, **kwargs):
    """Post the request prepared by ``func`` and wrap the reply as a Message.

    ``func`` is called with the same arguments and must return ``(url,
    data)``. Truthy ``reply_to_message_id`` and ``reply_markup`` keyword
    arguments are copied into ``data`` before posting; a ``ReplyMarkup``
    instance is serialised with ``to_json()`` first.

    Returns:
        ``True`` when the parsed reply is exactly ``True``, otherwise
        ``Message.de_json(data)``.

    Raises:
        TelegramError: if no truthy ``chat_id`` is found, neither among the
            keyword arguments nor in the payload returned by ``func``.
    """
    url, data = func(self, *args, **kwargs)

    # A chat_id passed positionally never appears in kwargs, so checking
    # kwargs alone rejected valid calls. Also accept a chat_id found in the
    # payload. NOTE(review): assumes func copies chat_id into data - confirm
    # against the wrapped methods.
    if not (kwargs.get('chat_id') or data.get('chat_id')):
        raise TelegramError('Invalid chat_id.')

    reply_to_message_id = kwargs.get('reply_to_message_id')
    if reply_to_message_id:
        data['reply_to_message_id'] = reply_to_message_id

    reply_markup = kwargs.get('reply_markup')
    if reply_markup:
        if isinstance(reply_markup, ReplyMarkup):
            data['reply_markup'] = reply_markup.to_json()
        else:
            data['reply_markup'] = reply_markup

    json_data = Bot._requestUrl(url, 'POST', data=data)
    data = Bot._parseAndCheckTelegram(json_data)

    if data is True:
        return data

    return Message.de_json(data)
开发者ID:jacostag,项目名称:python-telegram-bot,代码行数:27,代码来源:bot.py
示例19: _post_message
def _post_message(url, data, kwargs, timeout=None, network_delay=2.):
    """Posts a message to the telegram servers.

    Truthy ``reply_to_message_id`` and ``reply_markup`` entries of
    ``kwargs`` are copied into ``data`` before posting; a ``ReplyMarkup``
    instance is serialised with ``to_json()`` first. ``timeout`` and
    ``network_delay`` are forwarded to the post call.

    Returns:
        telegram.Message, or ``True`` when the post result is exactly
        ``True``.

    Raises:
        TelegramError: if ``data`` holds no truthy ``chat_id``.
    """
    if not data.get('chat_id'):
        raise TelegramError('Invalid chat_id')

    reply_to = kwargs.get('reply_to_message_id')
    if reply_to:
        data['reply_to_message_id'] = reply_to

    markup = kwargs.get('reply_markup')
    if markup:
        if isinstance(markup, ReplyMarkup):
            markup = markup.to_json()
        data['reply_markup'] = markup

    result = request.post(url, data, timeout=timeout,
                          network_delay=network_delay)
    return result if result is True else Message.de_json(result)
开发者ID:sensitiveio,项目名称:python-telegram-bot,代码行数:28,代码来源:bot.py
示例20: de_json
def de_json(data):
    """Build a ``CallbackQuery`` from its decoded JSON representation.

    The ``from`` entry is decoded as a ``User`` and stored under
    ``from_user``; the ``message`` entry is decoded as a ``Message``. The
    dict passed in is updated in place before being expanded into the
    constructor.

    Returns:
        The new ``CallbackQuery``, or ``None`` when ``data`` is empty or
        otherwise falsy.
    """
    if data:
        sender = User.de_json(data.get('from'))
        data['from_user'] = sender
        data['message'] = Message.de_json(data.get('message'))
        return CallbackQuery(**data)
    return None
开发者ID:AlexR1712,项目名称:python-telegram-bot,代码行数:8,代码来源:callbackquery.py
注:本文中的telegram.Message类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论