本文整理汇总了Python中tests.api.base.api_client.get_data函数的典型用法代码示例。如果您正苦于以下问题:Python get_data函数的具体用法?Python get_data怎么用?Python get_data使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_data函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_api_get
def test_api_get(db, api_client, calendar):
e_data = {'title': 'subj', 'when': {'time': 1},
'calendar_id': calendar.public_id, 'location': 'InboxHQ'}
e_data2 = {'title': 'subj2', 'when': {'time': 1},
'calendar_id': calendar.public_id, 'location': 'InboxHQ'}
api_client.post_data('/events', e_data)
api_client.post_data('/events', e_data2)
event_list = api_client.get_data('/events')
event_ids = [event['id'] for event in event_list]
c1found = False
c2found = False
for c_id in event_ids:
event = api_client.get_data('/events/' + c_id)
if event['title'] == 'subj':
c1found = True
if event['title'] == 'subj2':
c2found = True
assert c1found
assert c2found
开发者ID:yodiyo,项目名称:sync-engine,代码行数:25,代码来源:test_events.py
示例2: test_account_expanded
def test_account_expanded(db, api_client, generic_account, gmail_account):
# Generic accounts expose a `server_settings` attribute
# Custom IMAP
api_client = new_api_client(db, generic_account.namespace)
resp_data = api_client.get_data('/account/?view=expanded')
assert resp_data['provider'] == 'custom'
assert 'server_settings' in resp_data
assert set(resp_data['server_settings']) == set({
'imap_host': 'imap.custom.com',
'smtp_host': 'smtp.custom.com',
'imap_port': 993,
'smtp_port': 587,
'ssl_required': True})
# Yahoo
yahoo_account = add_fake_yahoo_account(db.session)
api_client = new_api_client(db, yahoo_account.namespace)
resp_data = api_client.get_data('/account/?view=expanded')
assert resp_data['provider'] == 'yahoo'
assert 'server_settings' in resp_data
assert set(resp_data['server_settings']) == set({
'imap_host': 'imap.mail.yahoo.com',
'smtp_host': 'smtp.mail.yahoo.com',
'imap_port': 993,
'smtp_port': 587,
'ssl_required': True})
# Gmail accounts don't expose a `server_settings` attribute
api_client = new_api_client(db, gmail_account.namespace)
resp_data = api_client.get_data('/account/?view=expanded')
assert resp_data['provider'] == 'gmail'
assert 'server_settings' not in resp_data
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:32,代码来源:test_account.py
示例3: test_contacts_updated
def test_contacts_updated(api_client):
"""Tests that draft-contact associations are properly created and
updated."""
draft = {
'to': [{'email': '[email protected]'}, {'email': '[email protected]'}]
}
r = api_client.post_data('/drafts', draft)
assert r.status_code == 200
draft_id = json.loads(r.data)['id']
draft_version = json.loads(r.data)['version']
r = api_client.get_data('/[email protected]')
assert len(r) == 1
updated_draft = {
'to': [{'email': '[email protected]'}, {'email': '[email protected]'}],
'version': draft_version
}
r = api_client.put_data('/drafts/{}'.format(draft_id), updated_draft)
assert r.status_code == 200
r = api_client.get_data('/[email protected]')
assert len(r) == 1
r = api_client.get_data('/[email protected]')
assert len(r) == 0
r = api_client.get_data('/[email protected]')
assert len(r) == 1
开发者ID:rskumar,项目名称:sync-engine,代码行数:31,代码来源:test_drafts.py
示例4: test_delete_from_readonly_calendar
def test_delete_from_readonly_calendar(db, default_namespace, api_client):
add_fake_event(db.session, default_namespace.id,
calendar=db.session.query(Calendar).filter(
Calendar.namespace_id == default_namespace.id,
Calendar.read_only == True).first(), # noqa
read_only=True)
calendar_list = api_client.get_data('/calendars')
read_only_calendar = None
for c in calendar_list:
if c['read_only']:
read_only_calendar = c
break
events = api_client.get_data('/events?calendar_id={}'.format(
read_only_calendar['id']))
for event in events:
if event['read_only']:
read_only_event = event
break
assert read_only_calendar
assert read_only_event
e_id = read_only_event['id']
resp = api_client.delete('/events/{}'.format(e_id))
assert resp.status_code == 400
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:26,代码来源:test_calendars.py
示例5: test_update_draft
def test_update_draft(api_client):
with freeze_time(datetime.now()) as freezer:
original_draft = {"subject": "original draft", "body": "parent draft"}
r = api_client.post_data("/drafts", original_draft)
draft_public_id = json.loads(r.data)["id"]
version = json.loads(r.data)["version"]
assert version == 0
freezer.tick()
updated_draft = {"subject": "updated draft", "body": "updated draft", "version": version}
r = api_client.put_data("/drafts/{}".format(draft_public_id), updated_draft)
updated_public_id = json.loads(r.data)["id"]
updated_version = json.loads(r.data)["version"]
assert updated_public_id == draft_public_id
assert updated_version > 0
drafts = api_client.get_data("/drafts")
assert len(drafts) == 1
assert drafts[0]["id"] == updated_public_id
# Check that the thread is updated too.
thread = api_client.get_data("/threads/{}".format(drafts[0]["thread_id"]))
assert thread["subject"] == "updated draft"
assert thread["first_message_timestamp"] == drafts[0]["date"]
assert thread["last_message_timestamp"] == drafts[0]["date"]
开发者ID:busseyl,项目名称:sync-engine,代码行数:28,代码来源:test_drafts.py
示例6: test_api_get
def test_api_get(contacts_provider, contact_sync, db, api_client,
default_namespace):
contacts_provider.supply_contact('Contact One',
'[email protected]')
contacts_provider.supply_contact('Contact Two',
'[email protected]')
contact_sync.provider = contacts_provider
contact_sync.sync()
contact_list = api_client.get_data('/contacts')
contact_ids = [contact['id'] for contact in contact_list]
c1found = False
c2found = False
for c_id in contact_ids:
contact = api_client.get_data('/contacts/' + c_id)
if contact['name'] == 'Contact One':
c1found = True
if contact['name'] == 'Contact Two':
c2found = True
assert c1found
assert c2found
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:27,代码来源:test_contacts.py
示例7: test_events_are_condensed
def test_events_are_condensed(api_client, message):
"""
Test that multiple revisions of the same object are rolled up in the
delta response.
"""
ts = int(time.time() + 22)
cursor = get_cursor(api_client, ts)
# Modify a message, then modify it again
message_id = api_client.get_data('/messages/')[0]['id']
message_path = '/messages/{}'.format(message_id)
api_client.put_data(message_path, {'unread': True})
api_client.put_data(message_path, {'unread': False})
api_client.put_data(message_path, {'unread': True})
# Check that successive modifies are condensed.
sync_data = api_client.get_data('/delta?cursor={}'.format(cursor))
deltas = sync_data['deltas']
# A message modify propagates to its thread
message_deltas = [d for d in deltas if d['object'] == 'message']
assert len(message_deltas) == 1
delta = message_deltas[0]
assert delta['object'] == 'message' and delta['event'] == 'modify'
assert delta['attributes']['unread'] is True
开发者ID:rskumar,项目名称:sync-engine,代码行数:26,代码来源:test_delta_sync.py
示例8: test_account
def test_account(db, api_client, generic_account, gmail_account):
# Because we're using the generic_account namespace
api_client = new_api_client(db, generic_account.namespace)
resp_data = api_client.get_data('/account')
assert resp_data['id'] == generic_account.namespace.public_id
assert resp_data['object'] == 'account'
assert resp_data['account_id'] == generic_account.namespace.public_id
assert resp_data['email_address'] == generic_account.email_address
assert resp_data['name'] == generic_account.name
assert resp_data['organization_unit'] == 'folder'
assert 'sync_state' in resp_data
assert 'server_settings' not in resp_data
# Because we're using the gmail account namespace
api_client = new_api_client(db, gmail_account.namespace)
resp_data = api_client.get_data('/account')
assert resp_data['id'] == gmail_account.namespace.public_id
assert resp_data['provider'] == 'gmail'
assert resp_data['organization_unit'] == 'label'
assert 'sync_state' in resp_data
assert 'server_settings' not in resp_data
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:25,代码来源:test_account.py
示例9: test_api_expand_recurring_message
def test_api_expand_recurring_message(db, api_client, message,
recurring_event):
# This is a regression test for https://phab.nylas.com/T3556
# ("InflatedEvent should not be committed" exception in API").
event = recurring_event
event.message = message
db.session.commit()
events = api_client.get_data('/events?expand_recurring=false')
assert len(events) == 1
# Make sure the recurrence info is on the recurring event
for e in events:
if e['title'] == 'recurring-weekly':
assert e.get('recurrence') is not None
assert e.get('message_id') is not None
r = api_client.get_raw('/events?expand_recurring=true')
assert r.status_code == 200
all_events = api_client.get_data('/events?expand_recurring=true')
assert len(all_events) != 0
for event in all_events:
assert event['master_event_id'] is not None
assert 'message_id' not in event
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:26,代码来源:test_events_recurring.py
示例10: test_create_draft_with_attachments
def test_create_draft_with_attachments(api_client, attachments, example_draft):
attachment_ids = []
upload_path = '/files'
for filename, path in attachments:
data = {'file': (open(path, 'rb'), filename)}
r = api_client.post_raw(upload_path, data=data)
assert r.status_code == 200
attachment_id = json.loads(r.data)[0]['id']
attachment_ids.append(attachment_id)
first_attachment = attachment_ids.pop()
example_draft['file_ids'] = [first_attachment]
r = api_client.post_data('/drafts', example_draft)
assert r.status_code == 200
returned_draft = json.loads(r.data)
draft_public_id = returned_draft['id']
assert returned_draft['version'] == 0
example_draft['version'] = returned_draft['version']
assert len(returned_draft['files']) == 1
attachment_ids.append(first_attachment)
example_draft['file_ids'] = attachment_ids
r = api_client.put_data('/drafts/{}'.format(draft_public_id),
example_draft)
assert r.status_code == 200
returned_draft = json.loads(r.data)
assert len(returned_draft['files']) == 3
assert returned_draft['version'] == 1
example_draft['version'] = returned_draft['version']
# Make sure we can't delete the files now
for file_id in attachment_ids:
r = api_client.delete('/files/{}'.format(file_id))
assert r.status_code == 400
# Now remove the attachment
example_draft['file_ids'] = [first_attachment]
r = api_client.put_data('/drafts/{}'.format(draft_public_id),
example_draft)
draft_data = api_client.get_data('/drafts/{}'.format(draft_public_id))
assert len(draft_data['files']) == 1
assert draft_data['version'] == 2
example_draft['version'] = draft_data['version']
example_draft['file_ids'] = []
r = api_client.put_data('/drafts/{}'.format(draft_public_id),
example_draft)
draft_data = api_client.get_data('/drafts/{}'.format(draft_public_id))
assert r.status_code == 200
assert len(draft_data['files']) == 0
assert draft_data['version'] == 3
# now that they're not attached, we should be able to delete them
for file_id in attachment_ids:
r = api_client.delete('/files/{}'.format(file_id))
assert r.status_code == 200
开发者ID:brandoncc,项目名称:sync-engine,代码行数:58,代码来源:test_drafts.py
示例11: test_gmail_pagination
def test_gmail_pagination(db, default_account,
patch_crispin_client,
patch_handler_from_provider,
folder):
for i in range(10):
thread = add_fake_thread(db.session, default_account.namespace.id)
message = add_fake_message(db.session, default_account.namespace.id,
thread=thread,
from_addr=[{'name': '', 'email':
'{}@test.com'.format(str(i))}],
subject='hi',
g_msgid=i,
received_date=datetime.
datetime(2000 + i, 1, 1, 1, 0, 0))
add_fake_imapuid(db.session, default_account.id, message,
folder, i)
first_ten_messages_db = db.session.query(Message)\
.filter(Message.namespace_id ==
default_account.namespace.id). \
order_by(desc(Message.received_date)). \
limit(10).all()
api_client = new_api_client(db, default_account.namespace)
first_ten_messages_api = api_client.get_data('/messages/search?q=hi'
'&limit=10')
assert len(first_ten_messages_api) == len(first_ten_messages_db)
for db_message, api_message in zip(first_ten_messages_db,
first_ten_messages_api):
assert db_message.public_id == api_message['id']
imap_uids = db.session.query(ImapUid).join(Message) \
.filter(
ImapUid.message_id == Message.id,
Message.g_msgid != None).all()
uids = [uid.msg_uid for uid in imap_uids]
first_ten_threads_db = db.session.query(Thread) \
.join(Message) \
.join(ImapUid) \
.filter(ImapUid.account_id == default_account.id,
ImapUid.msg_uid.in_(uids),
Thread.id == Message.thread_id)\
.order_by(desc(Message.received_date)) \
.limit(10).all()
first_ten_threads_api = api_client.get_data('/threads/search?q=hi'
'&limit=10')
assert len(first_ten_threads_api) == len(first_ten_threads_db)
for db_thread, api_thread in zip(first_ten_threads_db,
first_ten_threads_api):
assert db_thread.public_id == api_thread['id']
开发者ID:Eagles2F,项目名称:sync-engine,代码行数:57,代码来源:test_searching.py
示例12: test_get_with_id
def test_get_with_id(api_client, uploaded_file_ids, filename):
# See comment in uploaded_file_ids()
if filename == 'piece-jointe.jpg':
filename = u'pièce-jointe.jpg'
elif filename == 'andra-moi-ennepe.txt':
filename = u'ἄνδρα μοι ἔννεπε'
in_file = api_client.get_data(u'/files?filename={}'.format(filename))[0]
data = api_client.get_data('/files/{}'.format(in_file['id']))
assert data['filename'] == filename
开发者ID:nohobby,项目名称:sync-engine,代码行数:9,代码来源:test_files.py
示例13: test_reject_incompatible_reply_thread_and_message
def test_reject_incompatible_reply_thread_and_message(db, api_client, message, thread, default_namespace):
alt_thread = add_fake_thread(db.session, default_namespace.id)
add_fake_message(db.session, default_namespace.id, alt_thread)
thread = api_client.get_data("/threads")[0]
alt_message_id = api_client.get_data("/threads")[1]["message_ids"][0]
alt_message = api_client.get_data("/messages/{}".format(alt_message_id))
assert thread["id"] != alt_message["thread_id"]
reply_draft = {"subject": "test reply", "reply_to_message_id": alt_message["id"], "thread_id": thread["id"]}
r = api_client.post_data("/drafts", reply_draft)
assert r.status_code == 400
开发者ID:busseyl,项目名称:sync-engine,代码行数:11,代码来源:test_drafts.py
示例14: test_gmail_message_search
def test_gmail_message_search(api_client, default_account,
patch_token_manager,
patch_gmail_search_response,
sorted_gmail_messages, is_streaming):
search_client = get_search_client(default_account)
assert isinstance(search_client, GmailSearchClient)
if is_streaming:
messages = api_client.get_data('/messages/search/streaming?q=blah%20blah%20blah')
else:
messages = api_client.get_data('/messages/search?q=blah%20blah%20blah')
assert_search_result(sorted_gmail_messages, messages)
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:13,代码来源:test_searching.py
示例15: test_gmail_search_unicode
def test_gmail_search_unicode(db, api_client, test_gmail_thread,
patch_token_manager,
patch_gmail_search_response,
default_account,
sorted_gmail_messages,
sorted_gmail_threads, is_streaming):
search_client = get_search_client(default_account)
assert isinstance(search_client, GmailSearchClient)
if is_streaming:
threads = api_client.get_data('/threads/search/streaming?q=存档')
else:
threads = api_client.get_data('/threads/search?q=存档')
assert_search_result(sorted_gmail_threads, threads)
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:15,代码来源:test_searching.py
示例16: test_get_invalid
def test_get_invalid(api_client, uploaded_file_ids):
data = api_client.get_data('/files/0000000000000000000000000')
assert data['message'].startswith("Couldn't find file")
data = api_client.get_data('/files/!')
assert data['message'].startswith("Invalid id")
data = api_client.get_data('/files/0000000000000000000000000/download')
assert data['message'].startswith("Couldn't find file")
data = api_client.get_data('/files/!/download')
assert data['message'].startswith("Invalid id")
r = api_client.delete('/files/0000000000000000000000000')
assert r.status_code == 404
r = api_client.delete('/files/!')
assert r.status_code == 400
开发者ID:nohobby,项目名称:sync-engine,代码行数:15,代码来源:test_files.py
示例17: test_resource_views
def test_resource_views(resource_name, db, api_client,
message, thread, event, folder, label, contact):
"""Exercises various tests for views, mostly related to
filtering. Note: this only tests views, it assumes the
resources are working as expected."""
elements = api_client.get_data('/{}'.format(resource_name))
count = api_client.get_data('/{}?view=count'.format(resource_name))
assert count["count"] == len(elements)
ids = api_client.get_data('/{}?view=ids'.format(resource_name))
for i, elem in enumerate(elements):
assert isinstance(ids[i], basestring), \
"&views=ids should return string"
assert elem["id"] == ids[i], "view=ids should preserve order"
开发者ID:yodiyo,项目名称:sync-engine,代码行数:16,代码来源:test_views.py
示例18: test_api_override_serialization
def test_api_override_serialization(db, api_client, default_namespace,
recurring_event):
event = recurring_event
override = Event(original_start_time=event.start,
master_event_uid=event.uid,
namespace_id=default_namespace.id,
calendar_id=event.calendar_id)
override.update(event)
override.uid = event.uid + "_" + event.start.strftime("%Y%m%dT%H%M%SZ")
override.master = event
override.master_event_uid = event.uid
override.cancelled = True
db.session.add(override)
db.session.commit()
filter = 'starts_after={}&ends_before={}'.format(
urlsafe(event.start.replace(hours=-1)),
urlsafe(event.start.replace(weeks=+1)))
events = api_client.get_data('/events?' + filter)
# We should have the base event and the override back, but no extras;
# this allows clients to do their own expansion, should they ever desire
# to experience the joy that is RFC 2445 section 4.8.5.4.
assert len(events) == 2
assert events[0].get('object') == 'event'
assert events[0].get('recurrence') is not None
assert events[1].get('object') == 'event'
assert events[1].get('status') == 'cancelled'
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:28,代码来源:test_events_recurring.py
示例19: test_conflicting_updates
def test_conflicting_updates(api_client):
original_draft = {
'subject': 'parent draft',
'body': 'parent draft'
}
r = api_client.post_data('/drafts', original_draft)
original_public_id = json.loads(r.data)['id']
version = json.loads(r.data)['version']
updated_draft = {
'subject': 'updated draft',
'body': 'updated draft',
'version': version
}
r = api_client.put_data('/drafts/{}'.format(original_public_id),
updated_draft)
assert r.status_code == 200
updated_public_id = json.loads(r.data)['id']
updated_version = json.loads(r.data)['version']
assert updated_version != version
conflicting_draft = {
'subject': 'conflicting draft',
'body': 'conflicting draft',
'version': version
}
r = api_client.put_data('/drafts/{}'.format(original_public_id),
conflicting_draft)
assert r.status_code == 409
drafts = api_client.get_data('/drafts')
assert len(drafts) == 1
assert drafts[0]['id'] == updated_public_id
开发者ID:brandoncc,项目名称:sync-engine,代码行数:33,代码来源:test_drafts.py
示例20: test_create_draft_replying_to_message
def test_create_draft_replying_to_message(api_client, message):
message = api_client.get_data("/messages")[0]
reply_draft = {"subject": "test reply", "body": "test reply", "reply_to_message_id": message["id"]}
r = api_client.post_data("/drafts", reply_draft)
data = json.loads(r.data)
assert data["reply_to_message_id"] == message["id"]
assert data["thread_id"] == message["thread_id"]
开发者ID:busseyl,项目名称:sync-engine,代码行数:7,代码来源:test_drafts.py
注:本文中的tests.api.base.api_client.get_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论