本文整理汇总了Python中tests.api.base.api_client.get_raw函数的典型用法代码示例。如果您正苦于以下问题:Python get_raw函数的具体用法?Python get_raw怎么用?Python get_raw使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_raw函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_expanded_message
def test_expanded_message(stub_message, api_client):
    """Check the JSON returned for messages when ``view=expanded`` is set.

    The expanded representation is requested both for a single message and
    for the message listing; in each case it must carry a body, a headers
    dict with the threading headers, and the usual message keys.
    """
    def _check_json_message(msg_dict):
        assert "body" in msg_dict
        assert msg_dict["object"] == "message"
        assert msg_dict["thread_id"] == stub_message.thread.public_id

        headers = msg_dict["headers"]
        assert isinstance(headers, dict)
        for header_name in ("In-Reply-To", "References", "Message-Id"):
            assert header_name in headers

        # Each required key is listed exactly once.
        valid_keys = ["account_id", "to", "from", "files", "unread",
                      "date", "snippet"]
        assert all(x in msg_dict for x in valid_keys)

    # /messages/<message_id>
    resp = api_client.get_raw(
        "/messages/{}?view=expanded".format(stub_message.public_id))
    assert resp.status_code == 200
    _check_json_message(json.loads(resp.data))

    # /messages/
    resp = api_client.get_raw("/messages/?view=expanded")
    assert resp.status_code == 200
    for message_json in json.loads(resp.data):
        # Only the stub message is known to have the threading headers.
        if message_json["id"] == stub_message.public_id:
            _check_json_message(message_json)
开发者ID:nohobby,项目名称:sync-engine,代码行数:28,代码来源:test_messages.py
示例2: test_expanded_message
def test_expanded_message(stub_message, api_client):
    """Check the JSON returned for messages when ``view=expanded`` is set.

    Requests the expanded view for one message and for the message listing,
    and verifies the body, the headers dict and the standard message keys.
    """
    def _check_json_message(msg_dict):
        assert 'body' in msg_dict
        assert msg_dict['object'] == 'message'
        assert msg_dict['thread_id'] == stub_message.thread.public_id

        headers = msg_dict['headers']
        assert isinstance(headers, dict)
        for header_name in ('In-Reply-To', 'References', 'Message-Id'):
            assert header_name in headers

        # Each required key is listed exactly once.
        valid_keys = ['account_id', 'to', 'from', 'files', 'unread',
                      'date', 'snippet']
        assert all(x in msg_dict for x in valid_keys)

    # /messages/<message_id>
    resp = api_client.get_raw(
        '/messages/{}?view=expanded'.format(stub_message.public_id))
    assert resp.status_code == 200
    _check_json_message(json.loads(resp.data))

    # /messages/
    resp = api_client.get_raw('/messages/?view=expanded')
    assert resp.status_code == 200
    for message_json in json.loads(resp.data):
        # Only the stub message is known to have the threading headers.
        if message_json['id'] == stub_message.public_id:
            _check_json_message(message_json)
开发者ID:yodiyo,项目名称:sync-engine,代码行数:30,代码来源:test_messages.py
示例3: test_search_endpoints
def test_search_endpoints(db, api_client, token_manager, search_response,
                          default_account):
    """Search returns 200 normally and 403 once the account is invalid."""
    def _assert_search_status(expected_status):
        # Both searchable collections must answer with the same status.
        for collection in ('messages', 'threads'):
            url = '/{}/search?q=queryme'.format(collection)
            response = api_client.get_raw(url)
            assert response.status_code == expected_status

    # Message, thread search succeeds.
    _assert_search_status(200)

    default_account.sync_state = 'invalid'
    db.session.commit()

    # Message, thread search on an invalid account fails with an HTTP 403.
    _assert_search_status(403)
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:14,代码来源:test_invalid_account.py
示例4: test_invalid_gmail_account_search
def test_invalid_gmail_account_search(db, api_client, default_account,
                                      invalid_gmail_token,
                                      patch_gmail_search_response,
                                      sorted_gmail_messages, is_streaming):
    """Searching with an invalid Gmail token yields a 403 and an explanation.

    ``is_streaming`` selects between the streaming and the regular search
    endpoint; both are expected to fail the same way.
    """
    search_url = ('/messages/search/streaming?q=blah%20blah%20blah'
                  if is_streaming
                  else '/messages/search?q=blah%20blah%20blah')
    response = api_client.get_raw(search_url)

    assert response.status_code == 403

    expected_fragment = ("This search can't be performed because the "
                         "account's credentials are out of date.")
    error_body = json.loads(response.data)
    assert expected_fragment in error_body['message']
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:15,代码来源:test_searching.py
示例5: test_thread_count
def test_thread_count(db, api_client, default_account):
    """The count view of ``/threads/`` filtered by ``in=sent`` reports 2.

    Two threads are created, each holding five messages of which two are
    added with ``add_sent_category=True``.
    """
    namespace_id = default_account.namespace.id
    test_subject = "test_thread_view_count_with_category"

    # (received_date, extra keyword arguments) for each message, in the
    # order they are added to every thread.
    sent = {'add_sent_category': True}
    message_specs = [
        (datetime.datetime(2015, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2012, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2010, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2009, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2008, 1, 1, 0, 0, 0), {}),
    ]

    # Both threads exist before any message is attached.
    fake_threads = [add_fake_thread(db.session, namespace_id)
                    for _ in range(2)]

    for fake_thread in fake_threads:
        for received_date, extra in message_specs:
            add_fake_message(db.session, namespace_id, fake_thread,
                             subject=test_subject,
                             received_date=received_date, **extra)

    resp = api_client.get_raw('/threads/?view=count&in=sent')
    assert resp.status_code == 200

    payload = json.loads(resp.data)
    assert payload['count'] == 2
开发者ID:busseyl,项目名称:sync-engine,代码行数:30,代码来源:test_threads.py
示例6: test_thread_sent_recent_date
def test_thread_sent_recent_date(db, api_client, default_account):
    """``last_message_sent_timestamp`` matches the newest sent message.

    One thread receives five messages; the two added with
    ``add_sent_category=True`` are dated 2012 and 2009, so 2012 is expected.
    """
    namespace_id = default_account.namespace.id
    test_subject = "test_thread_sent_recent_date"
    newest_sent_date = datetime.datetime(2012, 1, 1, 0, 0, 0)

    sent = {'add_sent_category': True}
    message_specs = [
        (datetime.datetime(2015, 1, 1, 0, 0, 0), {}),
        (newest_sent_date, sent),
        (datetime.datetime(2010, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2009, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2008, 1, 1, 0, 0, 0), {}),
    ]

    thread1 = add_fake_thread(db.session, namespace_id)
    for received_date, extra in message_specs:
        add_fake_message(db.session, namespace_id, thread1,
                         subject=test_subject,
                         received_date=received_date, **extra)

    resp = api_client.get_raw('/threads/')
    assert resp.status_code == 200

    # NOTE(review): fromtimestamp() converts to local time, so this
    # comparison presumably relies on the tests running in UTC -- confirm.
    for thread_json in json.loads(resp.data):  # should only be one
        sent_at = datetime.datetime.fromtimestamp(
            thread_json['last_message_sent_timestamp'])
        assert sent_at == newest_sent_date
开发者ID:busseyl,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py
示例7: test_thread_received_recent_date
def test_thread_received_recent_date(db, api_client, default_account):
    """Check ``last_message_received_timestamp`` for every listed thread.

    The expected date of each thread is recorded under its subject while
    the fixtures are built, then compared with what ``/threads/`` returns.
    """
    namespace_id = default_account.namespace.id
    date1 = datetime.datetime(2015, 1, 1, 0, 0, 0)
    date2 = datetime.datetime(2012, 1, 1, 0, 0, 0)
    expected_by_subject = {}

    # Thread 1: a sent-category message at date1 and a plain one at date2.
    thread1 = add_fake_thread(db.session, namespace_id)
    add_fake_message(db.session, namespace_id, thread1,
                     subject="Test Thread 1", received_date=date1,
                     add_sent_category=True)
    add_fake_message(db.session, namespace_id, thread1,
                     subject="Test Thread 1", received_date=date2)
    expected_by_subject["Test Thread 1"] = date2

    # Thread 2: a single sent-category message at date1.
    thread2 = add_fake_thread(db.session, namespace_id)
    add_fake_message(db.session, namespace_id, thread2,
                     subject="Test Thread 2", received_date=date1,
                     add_sent_category=True)
    expected_by_subject["Test Thread 2"] = date1

    resp = api_client.get_raw('/threads/')
    assert resp.status_code == 200

    for thread_json in json.loads(resp.data):
        expected = expected_by_subject[thread_json['subject']]
        received_at = datetime.datetime.fromtimestamp(
            thread_json['last_message_received_timestamp'])
        assert expected == received_at
开发者ID:busseyl,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py
示例8: test_api_expand_recurring_message
def test_api_expand_recurring_message(db, api_client, message,
                                      recurring_event):
    """Expanding a recurring event that is linked to a message must work.

    This is a regression test for https://phab.nylas.com/T3556
    ("InflatedEvent should not be committed" exception in API").
    """
    recurring_event.message = message
    db.session.commit()

    collapsed = api_client.get_data('/events?expand_recurring=false')
    assert len(collapsed) == 1

    # Make sure the recurrence info is on the recurring event
    for collapsed_event in collapsed:
        if collapsed_event['title'] != 'recurring-weekly':
            continue
        assert collapsed_event.get('recurrence') is not None
        assert collapsed_event.get('message_id') is not None

    # The expanded request itself has to succeed.
    raw = api_client.get_raw('/events?expand_recurring=true')
    assert raw.status_code == 200

    expanded = api_client.get_data('/events?expand_recurring=true')
    assert len(expanded) != 0

    # Every expanded occurrence points at its master and has no message_id.
    for occurrence in expanded:
        assert occurrence['master_event_id'] is not None
        assert 'message_id' not in occurrence
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:26,代码来源:test_events_recurring.py
示例9: test_empty_response_when_latest_cursor_given
def test_empty_response_when_latest_cursor_given(db, api_client,
                                                 default_namespace):
    """Streaming from a cursor taken 22 seconds ahead returns no deltas."""
    cursor_timestamp = int(time.time() + 22)
    cursor = get_cursor(api_client, cursor_timestamp, default_namespace)

    stream_params = {'timeout': .1, 'cursor': cursor}
    response = api_client.get_raw(url_concat('/delta/streaming',
                                             stream_params))

    assert response.status_code == 200
    # Ignoring surrounding whitespace, the body must be empty.
    assert response.data.strip() == ''
开发者ID:rskumar,项目名称:sync-engine,代码行数:7,代码来源:test_streaming.py
示例10: test_rfc822_format
def test_rfc822_format(stub_message_from_raw, api_client, mime_message):
    """Test the API response when retrieving raw message contents.

    Requesting a message with ``Accept: message/rfc822`` must return the
    same data as the blockstore holds under the message's SHA-256.
    """
    message_url = '/messages/{}'.format(stub_message_from_raw.public_id)
    accept_raw = {'Accept': 'message/rfc822'}

    response = api_client.get_raw(message_url, headers=accept_raw)

    returned_body = response.data
    stored_body = get_from_blockstore(stub_message_from_raw.data_sha256)
    assert returned_body == stored_body
开发者ID:yodiyo,项目名称:sync-engine,代码行数:7,代码来源:test_messages.py
示例11: test_expanded_threads
def test_expanded_threads(stub_message, api_client):
    """Check the JSON returned for threads when ``view=expanded`` is set.

    The expanded thread must embed its messages and drafts (without their
    bodies) instead of listing ``message_ids``. Both the single-thread
    endpoint and the thread listing are exercised.
    """
    thread_public_id = stub_message.thread.public_id

    def _check_json_thread(resp_dict):
        assert "message_ids" not in resp_dict
        assert "messages" in resp_dict
        assert "drafts" in resp_dict
        assert len(resp_dict["participants"]) == 3
        assert len(resp_dict["messages"]) == 2
        assert len(resp_dict["drafts"]) == 1

        # Each required key is listed exactly once.
        message_keys = ["account_id", "to", "from", "files", "unread",
                        "date", "snippet"]
        for msg_dict in resp_dict["messages"]:
            assert "body" not in msg_dict
            assert msg_dict["object"] == "message"
            assert msg_dict["thread_id"] == thread_public_id
            assert all(x in msg_dict for x in message_keys)

        draft_keys = ["account_id", "to", "from", "files", "unread",
                      "snippet", "date", "version", "reply_to_message_id"]
        for draft in resp_dict["drafts"]:
            assert "body" not in draft
            assert draft["object"] == "draft"
            assert draft["thread_id"] == thread_public_id
            assert all(x in draft for x in draft_keys)

    # /threads/<thread_id>
    resp = api_client.get_raw(
        "/threads/{}?view=expanded".format(thread_public_id))
    assert resp.status_code == 200
    _check_json_thread(json.loads(resp.data))

    # /threads/ -- the listing URL has no placeholder, so it is not formatted.
    resp = api_client.get_raw("/threads/?view=expanded")
    assert resp.status_code == 200
    for thread_json in json.loads(resp.data):
        if thread_json["id"] == thread_public_id:
            _check_json_thread(thread_json)
开发者ID:nohobby,项目名称:sync-engine,代码行数:47,代码来源:test_messages.py
示例12: test_expanded_threads
def test_expanded_threads(stub_message, api_client, api_version):
    """Check the expanded thread JSON for the given ``api_version``.

    The expanded thread must embed its messages and drafts (without their
    bodies) instead of listing ``message_ids``. Both the single-thread
    endpoint and the thread listing are requested with the ``Api-Version``
    header set.
    """
    thread_public_id = stub_message.thread.public_id

    def _check_json_thread(resp_dict):
        assert 'message_ids' not in resp_dict
        assert 'messages' in resp_dict
        assert 'drafts' in resp_dict
        assert len(resp_dict['participants']) == 3
        assert len(resp_dict['messages']) == 2
        assert len(resp_dict['drafts']) == 1

        # Each required key is listed exactly once.
        message_keys = ['account_id', 'to', 'from', 'files', 'unread',
                        'date', 'snippet']
        for msg_dict in resp_dict['messages']:
            assert 'body' not in msg_dict
            assert msg_dict['object'] == 'message'
            assert msg_dict['thread_id'] == thread_public_id
            assert all(x in msg_dict for x in message_keys)

        draft_keys = ['account_id', 'to', 'from', 'files', 'unread',
                      'snippet', 'date', 'version', 'reply_to_message_id']
        for draft in resp_dict['drafts']:
            assert 'body' not in draft
            assert draft['object'] == 'draft'
            assert draft['thread_id'] == thread_public_id
            assert all(x in draft for x in draft_keys)

    headers = {'Api-Version': api_version}

    # /threads/<thread_id>
    resp = api_client.get_raw(
        '/threads/{}?view=expanded'.format(thread_public_id),
        headers=headers)
    assert resp.status_code == 200
    _check_json_thread(json.loads(resp.data))

    # /threads/ -- the listing URL has no placeholder, so it is not formatted.
    resp = api_client.get_raw('/threads/?view=expanded', headers=headers)
    assert resp.status_code == 200
    for thread_json in json.loads(resp.data):
        if thread_json['id'] == thread_public_id:
            _check_json_thread(thread_json)
开发者ID:busseyl,项目名称:sync-engine,代码行数:46,代码来源:test_messages.py
示例13: test_response_when_old_cursor_given
def test_response_when_old_cursor_given(db, api_client, default_namespace):
    """Every non-empty line streamed from cursor ``'0'`` is well formed."""
    stream_params = {'timeout': .1, 'cursor': '0'}
    response = api_client.get_raw(url_concat('/delta/streaming',
                                             stream_params))
    assert response.status_code == 200

    # The stream is newline-delimited; blank lines carry no delta.
    for line in response.data.split('\n'):
        if not line:
            continue
        validate_response_format(line)
开发者ID:rskumar,项目名称:sync-engine,代码行数:8,代码来源:test_streaming.py
示例14: test_sender_and_participants
def test_sender_and_participants(stub_message, api_client):
    """Check the default (non-expanded) thread representation.

    The thread must list three participants and must not embed the
    ``messages`` / ``drafts`` collections.
    """
    resp = api_client.get_raw(
        "/threads/{}".format(stub_message.thread.public_id))
    assert resp.status_code == 200

    resp_dict = json.loads(resp.data)
    participants = resp_dict["participants"]
    assert len(participants) == 3

    # Not expanded, should only return IDs. The embedded collection is
    # keyed "messages" (plural); checking "message" could never fail.
    assert "messages" not in resp_dict
    assert "drafts" not in resp_dict
开发者ID:nohobby,项目名称:sync-engine,代码行数:10,代码来源:test_messages.py
示例15: test_exclude_and_include_object_types
def test_exclude_and_include_object_types(db, api_client, thread,
                                          default_namespace):
    """Check ``exclude_types`` / ``include_types`` on the streaming deltas.

    A message from a named sender is added first; the stream is then read
    from cursor ``'0'`` without a filter, with message and contact excluded,
    and with only message included.
    """
    def _streamed_object_types(**type_filter):
        # Request the stream once and return the 'object' of every delta.
        params = {'timeout': .1, 'cursor': '0'}
        params.update(type_filter)
        r = api_client.get_raw(url_concat('/delta/streaming', params))
        assert r.status_code == 200
        # Newline-delimited JSON; blank lines carry no delta.
        deltas = [json.loads(line) for line in r.data.split('\n')
                  if line != '']
        return [delta['object'] for delta in deltas]

    # A well-formed sender address is used here; presumably it is what
    # produces the contact delta asserted below -- TODO confirm.
    add_fake_message(
        db.session,
        default_namespace.id,
        thread,
        from_addr=[('Bob', 'bob@foocorp.com')])

    # Check that we do get message and contact changes by default.
    object_types = _streamed_object_types()
    assert 'message' in object_types
    assert 'contact' in object_types

    # And check that we don't get message/contact changes if we exclude them.
    object_types = _streamed_object_types(exclude_types='message,contact')
    assert 'message' not in object_types
    assert 'contact' not in object_types

    # And check we only get message objects if we use include_types
    object_types = _streamed_object_types(include_types='message')
    assert all(object_type == 'message' for object_type in object_types)
开发者ID:rskumar,项目名称:sync-engine,代码行数:41,代码来源:test_streaming.py
示例16: test_expanded_view
def test_expanded_view(db, api_client, thread, message, default_namespace):
    """Expanded streaming deltas carry headers / messages in attributes."""
    stream_params = {
        'timeout': .1,
        'cursor': '0',
        'include_types': 'message,thread',
        'view': 'expanded',
    }
    response = api_client.get_raw(url_concat('/delta/streaming',
                                             stream_params))
    assert response.status_code == 200

    # Newline-delimited JSON; blank lines carry no delta.
    deltas = [json.loads(line) for line in response.data.split('\n')
              if line != '']

    for delta in deltas:
        object_type = delta['object']
        if object_type == 'message':
            assert 'headers' in delta['attributes']
        elif object_type == 'thread':
            assert 'messages' in delta['attributes']
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:13,代码来源:test_streaming.py
示例17: test_gracefully_handle_new_namespace
def test_gracefully_handle_new_namespace(db, api_client):
    """Streaming deltas for a freshly committed namespace returns 200."""
    # Build a namespace backed by a new generic account and persist both.
    new_account = GenericAccount()
    new_namespace = Namespace()
    new_account.password = 'hunter2'
    new_namespace.account = new_account
    for record in (new_namespace, new_account):
        db.session.add(record)
    db.session.commit()

    cursor = get_cursor(api_client, int(time.time()), new_namespace)

    stream_path = '/n/{}/delta/streaming'.format(new_namespace.public_id)
    stream_url = url_concat(stream_path, {'timeout': .1, 'cursor': cursor})

    response = api_client.get_raw(stream_url)
    assert response.status_code == 200
开发者ID:rskumar,项目名称:sync-engine,代码行数:14,代码来源:test_streaming.py
示例18: label_client
def label_client(db, gmail_account):
    """Return an API client for the Gmail account with at least one label.

    Whereas calling generic_account always makes a new IMAP account,
    calling gmail_account checks first to see if there's an existing
    Gmail account and uses it if so. This can cause namespace
    conflicts if a label is "created" more than once. Since
    labels can't be deleted and then re-created, a new label is only
    made when the account has no labels yet.
    """
    client = new_api_client(db, gmail_account.namespace)

    labels_response = client.get_raw('/labels/')
    existing_labels = json.loads(labels_response.data)
    if not existing_labels:
        client.post_data('/labels/', {"display_name": "Test_Label"})

    return client
开发者ID:busseyl,项目名称:sync-engine,代码行数:14,代码来源:test_folders_labels.py
示例19: test_read_endpoints
def test_read_endpoints(db, setup_account, api_client, default_account):
    """Collection and item reads return 200 before and after invalidation."""
    def _assert_all_readable():
        # Hit the collection endpoint, then the single-object endpoint.
        for resource, public_id in setup_account.items():
            collection_url = '/{}s'.format(resource)
            collection_resp = api_client.get_raw(collection_url)
            assert collection_resp.status_code == 200

            item_url = '{}/{}'.format(collection_url, public_id)
            item_resp = api_client.get_raw(item_url)
            assert item_resp.status_code == 200

    # Read operations succeed.
    _assert_all_readable()

    default_account.sync_state = 'invalid'
    db.session.commit()

    # Read operations on an invalid account also succeed.
    _assert_all_readable()
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:23,代码来源:test_invalid_account.py
示例20: test_sender_and_participants
def test_sender_and_participants(stub_message, api_client, api_version):
    """Check the default (non-expanded) thread JSON for ``api_version``.

    The thread must list three participants and must not embed the
    ``messages`` / ``drafts`` collections.
    """
    headers = {'Api-Version': api_version}
    resp = api_client.get_raw(
        '/threads/{}'.format(stub_message.thread.public_id),
        headers=headers)
    assert resp.status_code == 200

    resp_dict = json.loads(resp.data)
    participants = resp_dict['participants']
    assert len(participants) == 3

    # Not expanded, should only return IDs. The embedded collection is
    # keyed 'messages' (plural); checking 'message' could never fail.
    assert 'messages' not in resp_dict
    assert 'drafts' not in resp_dict
开发者ID:busseyl,项目名称:sync-engine,代码行数:15,代码来源:test_messages.py
注:本文中的tests.api.base.api_client.get_raw函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论