本文整理汇总了Python中tests.util.base.api_client.get_data函数的典型用法代码示例。如果您正苦于以下问题:Python get_data函数的具体用法?Python get_data怎么用?Python get_data使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_data函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_events_are_condensed
def test_events_are_condensed(api_client):
"""Test that multiple revisions of the same object are rolled up in the
delta response."""
ts = int(time.time())
cursor = get_cursor(api_client, ts)
# Create, then modify a tag; then modify it again
tag = json.loads(api_client.post_data('/tags/', {'name': 'foo'}).data)
tag_id = tag['id']
api_client.put_data('/tags/{}'.format(tag_id), {'name': 'bar'})
api_client.put_data('/tags/{}'.format(tag_id), {'name': 'baz'})
# Modify a thread, then modify it again
thread_id = api_client.get_data('/threads/')[0]['id']
thread_path = '/threads/{}'.format(thread_id)
api_client.put_data(thread_path, {'add_tags': [tag_id]})
api_client.put_data(thread_path, {'remove_tags': [tag_id]})
sync_data = api_client.get_data('/delta?cursor={}'.format(cursor))
assert len(sync_data['deltas']) == 3
first_delta = sync_data['deltas'][0]
assert first_delta['object'] == 'tag' and first_delta['event'] == 'create'
# Check that successive modifies are condensed.
second_delta = sync_data['deltas'][1]
assert (second_delta['object'] == 'tag' and
second_delta['event'] == 'modify')
assert second_delta['attributes']['name'] == 'baz'
third_delta = sync_data['deltas'][2]
assert (third_delta['object'] == 'thread' and
third_delta['event'] == 'modify')
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:33,代码来源:test_delta_sync.py
示例2: test_contacts_updated
def test_contacts_updated(api_client):
"""Tests that draft-contact associations are properly created and
updated."""
draft = {"to": [{"email": "[email protected]"}, {"email": "[email protected]"}]}
r = api_client.post_data("/drafts", draft)
assert r.status_code == 200
draft_id = json.loads(r.data)["id"]
draft_version = json.loads(r.data)["version"]
r = api_client.get_data("/[email protected]")
assert len(r) == 1
updated_draft = {"to": [{"email": "[email protected]"}, {"email": "[email protected]"}], "version": draft_version}
r = api_client.put_data("/drafts/{}".format(draft_id), updated_draft)
assert r.status_code == 200
r = api_client.get_data("/[email protected]")
assert len(r) == 1
r = api_client.get_data("/[email protected]")
assert len(r) == 0
r = api_client.get_data("/[email protected]")
assert len(r) == 1
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:26,代码来源:test_drafts.py
示例3: test_create_tag
def test_create_tag(api_client, default_namespace):
ns_id = default_namespace.public_id
post_resp = api_client.post_data('/tags/', {'name': 'foo'})
assert post_resp.status_code == 200
tag_resp = json.loads(post_resp.data)
assert tag_resp['name'] == 'foo'
assert tag_resp['namespace_id'] == ns_id
tag_id = tag_resp['id']
# Check getting the tag
tag_data = api_client.get_data('/tags/{}'.format(tag_id))
assert tag_data['name'] == 'foo'
assert tag_data['namespace_id'] == ns_id
assert tag_data['id'] == tag_id
# Check listing the tag
assert 'foo' in [tag['name'] for tag in api_client.get_data('/tags/')]
# Make sure we can specify the namespace that we are creating the tag in
bad_ns_id = 0000000000000000000000000
tag_data = {'name': 'foo3', 'namespace_id': bad_ns_id}
put_resp = api_client.post_data('/tags/', tag_data)
assert put_resp.status_code == 400
assert 'foo3' not in [tag['name'] for tag in api_client.get_data('/tags/')]
开发者ID:dlitz,项目名称:inbox,代码行数:25,代码来源:test_tags.py
示例4: test_api_expand_recurring_before_after
def test_api_expand_recurring_before_after(db, api_client):
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
event = add_recurring_event(db, acct)
starts_after = event.start.replace(weeks=+15)
ends_before = starts_after.replace(days=+1)
recur = 'expand_recurring=true&starts_after={}&ends_before={}'.format(
urlsafe(starts_after), urlsafe(ends_before))
all_events = api_client.get_data('/events?' + recur, ns_id)
assert len(all_events) == 1
recur = 'expand_recurring=true&starts_after={}&starts_before={}'.format(
urlsafe(starts_after), urlsafe(ends_before))
all_events = api_client.get_data('/events?' + recur, ns_id)
assert len(all_events) == 1
recur = 'expand_recurring=true&ends_after={}&starts_before={}'.format(
urlsafe(starts_after), urlsafe(ends_before))
all_events = api_client.get_data('/events?' + recur, ns_id)
assert len(all_events) == 1
recur = 'expand_recurring=true&ends_after={}&ends_before={}'.format(
urlsafe(starts_after), urlsafe(ends_before))
all_events = api_client.get_data('/events?' + recur, ns_id)
assert len(all_events) == 1
开发者ID:Analect,项目名称:sync-engine,代码行数:27,代码来源:test_events_recurring.py
示例5: test_api_expand_recurring
def test_api_expand_recurring(db, api_client):
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
event = add_recurring_event(db, acct)
# 3 existing test events in database + 1 new one
events = api_client.get_data('/events?expand_recurring=false', ns_id)
assert len(events) == 4
# Make sure the recurrence info is on the recurring event
for e in events:
if e['title'] == 'recurring-weekly':
assert e.get('recurrence') is not None
thirty_weeks = event.start.replace(weeks=+30).isoformat()
starts_after = event.start.replace(days=-1).isoformat()
recur = 'expand_recurring=true&starts_after={}&ends_before={}'.format(
urllib.quote_plus(starts_after), urllib.quote_plus(thirty_weeks))
all_events = api_client.get_data('/events?' + recur, ns_id)
assert len(all_events) == 30
# the ordering should be correct
prev = all_events[0]['when']['start_time']
for e in all_events[1:]:
assert e['when']['start_time'] > prev
prev = e['when']['start_time']
events = api_client.get_data('/events?' + recur + '&limit=5', ns_id)
assert len(events) == 5
events = api_client.get_data('/events?' + recur + '&offset=5', ns_id)
assert events[0]['id'] == all_events[5]['id']
events = api_client.get_data('/events?' + recur + '&view=count', ns_id)
assert events.get('count') == 30
开发者ID:Analect,项目名称:sync-engine,代码行数:34,代码来源:test_events_recurring.py
示例6: test_send_existing_draft
def test_send_existing_draft(patch_smtp, api_client, example_draft):
r = api_client.post_data('/drafts', example_draft)
draft_public_id = json.loads(r.data)['id']
version = json.loads(r.data)['version']
r = api_client.post_data('/send',
{'draft_id': draft_public_id,
'version': version})
assert r.status_code == 200
# Test that the sent draft can't be sent again.
r = api_client.post_data('/send',
{'draft_id': draft_public_id,
'version': version})
assert r.status_code == 400
drafts = api_client.get_data('/drafts')
threads_with_drafts = api_client.get_data('/threads?tag=drafts')
assert not drafts
assert not threads_with_drafts
sent_threads = api_client.get_data('/threads?tag=sent')
assert len(sent_threads) == 1
message = api_client.get_data('/messages/{}'.format(draft_public_id))
assert message['object'] == 'message'
开发者ID:Analect,项目名称:sync-engine,代码行数:26,代码来源:test_sending.py
示例7: test_api_get
def test_api_get(contacts_provider, contact_sync, db, api_client, default_namespace):
contacts_provider.supply_contact("Contact One", "[email protected]")
contacts_provider.supply_contact("Contact Two", "[email protected]")
contact_sync.provider = contacts_provider
contact_sync.sync()
ns_id = default_namespace.public_id
contact_list = api_client.get_data("/contacts", ns_id)
contact_ids = [contact["id"] for contact in contact_list]
c1found = False
c2found = False
for c_id in contact_ids:
contact = api_client.get_data("/contacts/" + c_id, ns_id)
if contact["name"] == "Contact One":
c1found = True
if contact["name"] == "Contact Two":
c2found = True
assert c1found
assert c2found
开发者ID:htk,项目名称:sync-engine,代码行数:25,代码来源:test_contacts.py
示例8: test_drafts_filter
def test_drafts_filter(api_client, example_draft):
r = api_client.post_data('/drafts', example_draft)
public_id = json.loads(r.data)['id']
r = api_client.get_data('/drafts')
matching_saved_drafts = [draft for draft in r if draft['id'] == public_id]
thread_public_id = matching_saved_drafts[0]['thread_id']
reply_draft = {
'subject': 'test reply',
'body': 'test reply',
'thread_id': thread_public_id
}
r = api_client.post_data('/drafts', reply_draft)
_filter = '?thread_id=0000000000000000000000000'
results = api_client.get_data('/drafts' + _filter)
assert len(results) == 0
results = api_client.get_data('/drafts?thread_id={}'
.format(thread_public_id))
assert len(results) == 2
results = api_client.get_data('/drafts?offset={}&thread_id={}'
.format(1, thread_public_id))
assert len(results) == 1
开发者ID:dlitz,项目名称:inbox,代码行数:26,代码来源:test_drafts.py
示例9: test_delete_tag
def test_delete_tag(api_client, default_namespace, thread):
post_resp = api_client.post_data('/tags/', {'name': 'foo'})
tag_resp = json.loads(post_resp.data)
tag_id = tag_resp['id']
thread_id = api_client.get_data('/threads')[0]['id']
api_client.put_data('/threads/{}'.format(thread_id), {'add_tags': ['foo']})
del_resp = api_client.delete('/tags/' + tag_id)
assert del_resp.status_code == 200
tag_data = api_client.get_data('/tags/{}'.format(tag_id))
assert tag_data['message'] == 'No tag found'
thread = api_client.get_data('/threads/{}'.format(thread_id))
assert 'foo' not in [tag['name'] for tag in thread['tags']]
del_resp = api_client.delete('/tags/!' + tag_id)
assert del_resp.status_code == 400
assert json.loads(del_resp.data)['message'].startswith('Invalid id')
del_resp = api_client.delete('/tags/0000000000000000000000000')
assert del_resp.status_code == 404
del_resp = api_client.delete('/tags/inbox')
assert del_resp.status_code == 400
assert 'user-created' in json.loads(del_resp.data)['message']
开发者ID:raghuveerkancherla,项目名称:sync-engine,代码行数:26,代码来源:test_tags.py
示例10: test_api_get
def test_api_get(db, api_client, calendar):
e_data = {'title': 'subj', 'when': {'time': 1},
'calendar_id': calendar.public_id, 'location': 'InboxHQ'}
e_data2 = {'title': 'subj2', 'when': {'time': 1},
'calendar_id': calendar.public_id, 'location': 'InboxHQ'}
api_client.post_data('/events', e_data)
api_client.post_data('/events', e_data2)
event_list = api_client.get_data('/events')
event_ids = [event['id'] for event in event_list]
c1found = False
c2found = False
for c_id in event_ids:
event = api_client.get_data('/events/' + c_id)
if event['title'] == 'subj':
c1found = True
if event['title'] == 'subj2':
c2found = True
assert c1found
assert c2found
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:25,代码来源:test_events.py
示例11: test_api_participant_reply
def test_api_participant_reply(db, api_client, rsvp):
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
e_data = {
'title': 'Friday Office Party',
'when': {'time': 1407542195},
'participants': [{'email': '[email protected]'},
{'email': '[email protected]'},
{'email': '[email protected]'},
{'email': '[email protected]'},
{'email': '[email protected]'}]
}
e_resp = api_client.post_data('/events', e_data, ns_id)
e_resp_data = json.loads(e_resp.data)
assert len(e_resp_data['participants']) == 5
event_id = e_resp_data['id']
participants = e_resp_data['participants']
participant_id = participants[0]['id']
url = '/events/{}?'.format(event_id)
url += 'action=rsvp&participant_id={}&rsvp={}'.format(participant_id, rsvp)
e_resp_data = api_client.get_data(url, ns_id)
participants = e_resp_data['participants']
assert len(participants) == 5
assert participants[0]['status'] == rsvp
e_resp_data = api_client.get_data('/events/' + e_resp_data['id'], ns_id)
participants = e_resp_data['participants']
assert len(participants) == 5
assert participants[0]['status'] == rsvp
开发者ID:Dracophoenix1,项目名称:inbox,代码行数:34,代码来源:test_event_participants.py
示例12: test_delete_remote_draft
def test_delete_remote_draft(db, api_client):
from inbox.models.message import Message
# Non-Inbox created draft, therefore don't set inbox_uid
message = Message()
message.namespace_id = NAMESPACE_ID
message.thread_id = 1
message.received_date = datetime.utcnow()
message.size = len('')
message.is_draft = True
message.is_read = True
message.sanitized_body = ''
message.snippet = ''
db.session.add(message)
db.session.commit()
drafts = api_client.get_data('/drafts')
assert len(drafts) == 1
public_id = drafts[0]['id']
version = drafts[0]['version']
assert public_id == message.public_id and version == message.version
api_client.delete('/drafts/{}'.format(public_id),
{'version': version})
# Check that drafts were deleted
drafts = api_client.get_data('/drafts')
assert not drafts
开发者ID:apolmig,项目名称:inbox,代码行数:31,代码来源:test_drafts.py
示例13: test_contacts_updated
def test_contacts_updated(api_client):
"""Tests that draft-contact associations are properly created and
updated."""
draft = {
'to': [{'email': '[email protected]'}, {'email': '[email protected]'}]
}
r = api_client.post_data('/drafts', draft)
assert r.status_code == 200
draft_id = json.loads(r.data)['id']
draft_version = json.loads(r.data)['version']
r = api_client.get_data('/[email protected]')
assert len(r) == 1
updated_draft = {
'to': [{'email': '[email protected]'}, {'email': '[email protected]ample.com'}],
'version': draft_version
}
r = api_client.put_data('/drafts/{}'.format(draft_id), updated_draft)
assert r.status_code == 200
r = api_client.get_data('/[email protected]')
assert len(r) == 1
r = api_client.get_data('/[email protected]')
assert len(r) == 0
r = api_client.get_data('/[email protected]')
assert len(r) == 1
开发者ID:apolmig,项目名称:inbox,代码行数:31,代码来源:test_drafts.py
示例14: test_filter_calendar
def test_filter_calendar(db, api_client):
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
c_data = {'name': 'Holidays', 'description': 'Local Holidays'}
resp = api_client.post_data('/calendars', c_data, ns_id)
resp_data = json.loads(resp.data)
cal_id = resp_data['id']
_filter = "?filter=Holidays"
resp_data = api_client.get_data('/calendars' + _filter, ns_id)[0]
assert resp_data['namespace_id'] == ns_id
assert resp_data['name'] == c_data['name']
assert resp_data['description'] == 'Local Holidays'
assert resp_data['read_only'] is False
assert resp_data['object'] == 'calendar'
assert resp_data['event_ids'] == []
_filter = "?filter=Local%20Holidays"
resp_data = api_client.get_data('/calendars' + _filter, ns_id)
assert len(resp_data) == 1
cal = db.session.query(Calendar).filter_by(public_id=cal_id).one()
db.session.delete(cal)
db.session.commit()
开发者ID:apolmig,项目名称:inbox,代码行数:27,代码来源:test_calendars.py
示例15: test_api_get
def test_api_get(contacts_provider, contact_sync, db, api_client):
contacts_provider.supply_contact('Contact One',
'[email protected]')
contacts_provider.supply_contact('Contact Two',
'[email protected]')
contact_sync.provider_instance = contacts_provider
contact_sync.poll()
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
contact_list = api_client.get_data('/contacts', ns_id)
contact_ids = [contact['id'] for contact in contact_list]
c1found = False
c2found = False
for c_id in contact_ids:
contact = api_client.get_data('/contacts/' + c_id, ns_id)
if contact['name'] == 'Contact One':
c1found = True
if contact['name'] == 'Contact Two':
c2found = True
assert c1found
assert c2found
开发者ID:Dracophoenix1,项目名称:inbox,代码行数:28,代码来源:test_contacts.py
示例16: test_send
def test_send(api_client, example_draft, real_syncback_service, monkeypatch):
# We're not testing the actual SMTP sending or syncback here, so
# monkey-patch to make this run faster.
monkeypatch.setattr('inbox.sendmail.base.get_sendmail_client',
lambda *args, **kwargs: MockSMTPClient())
monkeypatch.setattr('inbox.actions.save_draft',
lambda *args, **kwargs: None)
monkeypatch.setattr('inbox.actions.delete_draft',
lambda *args, **kwargs: None)
r = api_client.post_data('/drafts', example_draft)
draft_public_id = json.loads(r.data)['id']
r = api_client.post_data('/send', {'draft_id': draft_public_id})
# TODO(emfree) do this more rigorously
gevent.sleep(1)
drafts = api_client.get_data('/drafts')
threads_with_drafts = api_client.get_data('/threads?tag=drafts')
assert not drafts
assert not threads_with_drafts
sent_threads = api_client.get_data('/threads?tag=sent')
assert len(sent_threads) == 1
message = api_client.get_data('/messages/{}'.format(draft_public_id))
assert message['state'] == 'sent'
assert message['object'] == 'message'
开发者ID:AmyWeiner,项目名称:inbox,代码行数:28,代码来源:test_drafts.py
示例17: test_move_to_read_only_calendar
def test_move_to_read_only_calendar(db, api_client):
acct = db.session.query(Account).filter_by(id=ACCOUNT_ID).one()
ns_id = acct.namespace.public_id
calendar_list = api_client.get_data('/calendars', ns_id)
read_only_calendar = None
writeable_calendar = None
writeable_event = None
for c in calendar_list:
if c['read_only']:
read_only_calendar = c
else:
writeable_calendar = c
for e_id in c['event_ids']:
e = api_client.get_data('/events/' + e_id, ns_id)
if not e['read_only']:
writeable_event = e
break
assert read_only_calendar
assert writeable_event
assert writeable_calendar
e_id = writeable_event['id']
e_data = {'calendar_id': read_only_calendar['id']}
resp = api_client.put_data('/events/' + e_id, e_data, ns_id)
assert resp.status_code != 200
开发者ID:apolmig,项目名称:inbox,代码行数:27,代码来源:test_calendars.py
示例18: test_get_with_id
def test_get_with_id(api_client, uploaded_file_ids, filename):
# See comment in uploaded_file_ids()
if filename == 'piece-jointe.jpg':
filename = u'pièce-jointe.jpg'
in_file = api_client.get_data(u'/files?filename={}'.format(filename))[0]
data = api_client.get_data('/files/{}'.format(in_file['id']))
assert data['filename'] == filename
开发者ID:Analect,项目名称:sync-engine,代码行数:7,代码来源:test_files.py
示例19: test_api_get
def test_api_get(contacts_provider, contact_sync, db, api_client,
default_namespace):
contacts_provider.supply_contact('Contact One',
'[email protected]')
contacts_provider.supply_contact('Contact Two',
'[email protected]')
contact_sync.provider = contacts_provider
contact_sync.sync()
ns_id = default_namespace.public_id
contact_list = api_client.get_data('/contacts', ns_id)
contact_ids = [contact['id'] for contact in contact_list]
c1found = False
c2found = False
for c_id in contact_ids:
contact = api_client.get_data('/contacts/' + c_id, ns_id)
if contact['name'] == 'Contact One':
c1found = True
if contact['name'] == 'Contact Two':
c2found = True
assert c1found
assert c2found
开发者ID:raghuveerkancherla,项目名称:sync-engine,代码行数:28,代码来源:test_contacts.py
示例20: test_update_draft
def test_update_draft(api_client):
original_draft = {"subject": "original draft", "body": "parent draft"}
r = api_client.post_data("/drafts", original_draft)
draft_public_id = json.loads(r.data)["id"]
version = json.loads(r.data)["version"]
assert version == 0
# Sleep so that timestamp on updated draft is different.
gevent.sleep(1)
updated_draft = {"subject": "updated draft", "body": "updated draft", "version": version}
r = api_client.put_data("/drafts/{}".format(draft_public_id), updated_draft)
updated_public_id = json.loads(r.data)["id"]
updated_version = json.loads(r.data)["version"]
assert updated_public_id == draft_public_id
assert updated_version > 0
drafts = api_client.get_data("/drafts")
assert len(drafts) == 1
assert drafts[0]["id"] == updated_public_id
# Check that the thread is updated too.
thread = api_client.get_data("/threads/{}".format(drafts[0]["thread_id"]))
assert thread["subject"] == "updated draft"
assert thread["first_message_timestamp"] == drafts[0]["date"]
assert thread["last_message_timestamp"] == drafts[0]["date"]
开发者ID:PriviPK,项目名称:privipk-sync-engine,代码行数:28,代码来源:test_drafts.py
注:本文中的tests.util.base.api_client.get_data函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论