• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python api_client.get_raw函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.api.base.api_client.get_raw函数的典型用法代码示例。如果您正苦于以下问题:Python get_raw函数的具体用法?Python get_raw怎么用?Python get_raw使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_raw函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_expanded_message

def test_expanded_message(stub_message, api_client):
    """Check the ``view=expanded`` representation of a message.

    The expanded JSON is fetched twice: once from the single-message
    endpoint and once from the message listing. In both cases the entry
    for ``stub_message`` must carry a body, a ``headers`` dict with the
    threading headers, and the usual message keys.
    """
    # Keys every message object is expected to expose. (The original list
    # named "unread" twice; the duplicate added nothing to the check.)
    expected_keys = ("account_id", "to", "from", "files", "unread",
                     "date", "snippet")

    def _check_json_message(msg_dict):
        assert "body" in msg_dict
        assert msg_dict["object"] == "message"
        assert msg_dict["thread_id"] == stub_message.thread.public_id

        assert isinstance(msg_dict["headers"], dict)
        for header_name in ("In-Reply-To", "References", "Message-Id"):
            assert header_name in msg_dict["headers"]

        assert all(key in msg_dict for key in expected_keys)

    # /messages/<message_id>
    resp = api_client.get_raw(
        "/messages/{}?view=expanded".format(stub_message.public_id))
    assert resp.status_code == 200
    _check_json_message(json.loads(resp.data))

    # /messages/
    resp = api_client.get_raw("/messages/?view=expanded")
    assert resp.status_code == 200
    listed = [entry for entry in json.loads(resp.data)
              if entry["id"] == stub_message.public_id]

    # Without this guard the loop below would pass vacuously whenever the
    # listing failed to include the stub message at all.
    assert listed, "stub message missing from /messages/ listing"
    for message_json in listed:
        _check_json_message(message_json)
开发者ID:nohobby,项目名称:sync-engine,代码行数:28,代码来源:test_messages.py


示例2: test_expanded_message

def test_expanded_message(stub_message, api_client):
    """Verify that ``view=expanded`` message JSON includes headers.

    Both ``/messages/<id>`` and the ``/messages/`` listing are queried;
    the entry matching ``stub_message`` is validated in each response.
    """
    def _check_json_message(msg_dict):
        assert 'body' in msg_dict
        assert msg_dict['object'] == 'message'
        assert msg_dict['thread_id'] == stub_message.thread.public_id

        headers = msg_dict['headers']
        assert isinstance(headers, dict)
        assert 'In-Reply-To' in headers
        assert 'References' in headers
        assert 'Message-Id' in headers

        # 'unread' used to appear twice in this list; one entry suffices.
        valid_keys = ['account_id', 'to', 'from', 'files', 'unread',
                      'date', 'snippet']
        assert all(x in msg_dict for x in valid_keys)

    # /messages/<message_id>
    resp = api_client.get_raw(
        '/messages/{}?view=expanded'.format(stub_message.public_id))
    assert resp.status_code == 200
    _check_json_message(json.loads(resp.data))

    # /messages/
    resp = api_client.get_raw('/messages/?view=expanded')
    assert resp.status_code == 200

    seen_stub_message = False
    for message_json in json.loads(resp.data):
        if message_json['id'] == stub_message.public_id:
            seen_stub_message = True
            _check_json_message(message_json)

    # Fail loudly instead of passing when the listing never contained the
    # message this test is about.
    assert seen_stub_message, 'stub message missing from /messages/ listing'
开发者ID:yodiyo,项目名称:sync-engine,代码行数:30,代码来源:test_messages.py


示例3: test_search_endpoints

def test_search_endpoints(db, api_client, token_manager, search_response,
                          default_account):
    """Search returns 200 normally and 403 once the account is 'invalid'."""
    search_paths = ['/{}/search?q=queryme'.format(kind)
                    for kind in ('messages', 'threads')]

    def _assert_search_status(expected_status):
        # Hit the message search first, then the thread search.
        for path in search_paths:
            r = api_client.get_raw(path)
            assert r.status_code == expected_status

    # Message, thread search succeeds.
    _assert_search_status(200)

    default_account.sync_state = 'invalid'
    db.session.commit()

    # Message, thread search on an invalid account fails with an HTTP 403.
    _assert_search_status(403)
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:14,代码来源:test_invalid_account.py


示例4: test_invalid_gmail_account_search

def test_invalid_gmail_account_search(db, api_client, default_account,
                                      invalid_gmail_token,
                                      patch_gmail_search_response,
                                      sorted_gmail_messages, is_streaming):
    """Message search must answer 403 with a credentials-out-of-date message.

    ``is_streaming`` selects between the streaming and the regular search
    endpoint; the expected outcome is the same for both.
    """
    endpoint = ('/messages/search/streaming?' if is_streaming
                else '/messages/search?')
    response = api_client.get_raw(endpoint + 'q=blah%20blah%20blah')

    expected_fragment = ("This search can't be performed because the "
                         "account's credentials are out of date.")

    assert response.status_code == 403
    payload = json.loads(response.data)
    assert expected_fragment in payload['message']
开发者ID:nagyistge,项目名称:nylas-sync-engine,代码行数:15,代码来源:test_searching.py


示例5: test_thread_count

def test_thread_count(db, api_client, default_account):
    """The count view filtered by ``in=sent`` reports two threads.

    Two threads are created and each receives the same five messages, two
    of which are added with ``add_sent_category=True``.
    """
    namespace_id = default_account.namespace.id
    test_subject = "test_thread_view_count_with_category"

    # (received_date, extra keyword arguments) per message, in insert order.
    sent = {'add_sent_category': True}
    message_specs = [
        (datetime.datetime(2015, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2012, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2010, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2009, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2008, 1, 1, 0, 0, 0), {}),
    ]

    first_thread = add_fake_thread(db.session, namespace_id)
    second_thread = add_fake_thread(db.session, namespace_id)

    for thread in (first_thread, second_thread):
        for received, extra in message_specs:
            add_fake_message(db.session, namespace_id, thread,
                             subject=test_subject, received_date=received,
                             **extra)

    resp = api_client.get_raw('/threads/?view=count&in=sent')
    assert resp.status_code == 200
    assert json.loads(resp.data)['count'] == 2
开发者ID:busseyl,项目名称:sync-engine,代码行数:30,代码来源:test_threads.py


示例6: test_thread_sent_recent_date

def test_thread_sent_recent_date(db, api_client, default_account):
    """``last_message_sent_timestamp`` matches the newest sent message.

    One thread gets five messages; the two added with
    ``add_sent_category=True`` are dated 2012 and 2009, so the 2012 date
    is the expected value.
    """
    namespace_id = default_account.namespace.id
    test_subject = "test_thread_sent_recent_date"
    newest_sent = datetime.datetime(2012, 1, 1, 0, 0, 0)

    # (received_date, extra keyword arguments) per message, in insert order.
    sent = {'add_sent_category': True}
    message_specs = [
        (datetime.datetime(2015, 1, 1, 0, 0, 0), {}),
        (newest_sent, sent),
        (datetime.datetime(2010, 1, 1, 0, 0, 0), {}),
        (datetime.datetime(2009, 1, 1, 0, 0, 0), sent),
        (datetime.datetime(2008, 1, 1, 0, 0, 0), {}),
    ]

    only_thread = add_fake_thread(db.session, namespace_id)
    for received, extra in message_specs:
        add_fake_message(db.session, namespace_id, only_thread,
                         subject=test_subject, received_date=received,
                         **extra)

    resp = api_client.get_raw('/threads/')
    assert resp.status_code == 200

    for thread in json.loads(resp.data):  # should only be one
        sent_at = datetime.datetime.fromtimestamp(
            thread['last_message_sent_timestamp'])
        assert sent_at == newest_sent
开发者ID:busseyl,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py


示例7: test_thread_received_recent_date

def test_thread_received_recent_date(db, api_client, default_account):
    """``last_message_received_timestamp`` matches the expected date per thread.

    Expected dates are looked up by thread subject: "Test Thread 1" maps
    to the 2012 date and "Test Thread 2" to the 2015 date.
    """
    date_2015 = datetime.datetime(2015, 1, 1, 0, 0, 0)
    date_2012 = datetime.datetime(2012, 1, 1, 0, 0, 0)
    namespace_id = default_account.namespace.id

    # Thread 1: a sent-category message (2015) plus a plain one (2012).
    first_thread = add_fake_thread(db.session, namespace_id)
    add_fake_message(db.session, namespace_id, first_thread,
                     subject="Test Thread 1", received_date=date_2015,
                     add_sent_category=True)
    add_fake_message(db.session, namespace_id, first_thread,
                     subject="Test Thread 1", received_date=date_2012)

    # Thread 2: a single sent-category message (2015).
    second_thread = add_fake_thread(db.session, namespace_id)
    add_fake_message(db.session, namespace_id, second_thread,
                     subject="Test Thread 2", received_date=date_2015,
                     add_sent_category=True)

    expected_by_subject = {
        "Test Thread 1": date_2012,
        "Test Thread 2": date_2015,
    }

    resp = api_client.get_raw('/threads/')
    assert resp.status_code == 200

    for thread in json.loads(resp.data):
        received_at = datetime.datetime.fromtimestamp(
            thread['last_message_received_timestamp'])
        assert expected_by_subject[thread['subject']] == received_at
开发者ID:busseyl,项目名称:sync-engine,代码行数:31,代码来源:test_threads.py


示例8: test_api_expand_recurring_message

def test_api_expand_recurring_message(db, api_client, message,
                                      recurring_event):
    """Regression test for https://phab.nylas.com/T3556.

    ("InflatedEvent should not be committed" exception in the API.)
    Links ``message`` to the recurring event, then requests ``/events``
    with recurrence expansion both disabled and enabled.
    """
    recurring_event.message = message
    db.session.commit()

    collapsed = api_client.get_data('/events?expand_recurring=false')
    assert len(collapsed) == 1

    # Make sure the recurrence info is on the recurring event
    for item in collapsed:
        if item['title'] != 'recurring-weekly':
            continue
        assert item.get('recurrence') is not None
        assert item.get('message_id') is not None

    r = api_client.get_raw('/events?expand_recurring=true')
    assert r.status_code == 200

    expanded = api_client.get_data('/events?expand_recurring=true')
    assert len(expanded) != 0

    # Expanded occurrences point at their master event and carry no
    # message_id of their own.
    for occurrence in expanded:
        assert occurrence['master_event_id'] is not None
        assert 'message_id' not in occurrence
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:26,代码来源:test_events_recurring.py


示例9: test_empty_response_when_latest_cursor_given

def test_empty_response_when_latest_cursor_given(db, api_client,
                                                 default_namespace):
    """Streaming from a cursor 22 seconds in the future yields an empty body."""
    future_timestamp = int(time.time() + 22)
    cursor = get_cursor(api_client, future_timestamp, default_namespace)

    query = {'timeout': .1, 'cursor': cursor}
    r = api_client.get_raw(url_concat('/delta/streaming', query))

    assert r.status_code == 200
    body = r.data.strip()
    assert body == ''
开发者ID:rskumar,项目名称:sync-engine,代码行数:7,代码来源:test_streaming.py


示例10: test_rfc822_format

def test_rfc822_format(stub_message_from_raw, api_client, mime_message):
    """Test the API response when retrieving raw message contents.

    Requests the message with ``Accept: message/rfc822`` and compares the
    response body with the blockstore entry for the message's SHA-256.
    """
    message_url = '/messages/{}'.format(stub_message_from_raw.public_id)
    rfc822_headers = {'Accept': 'message/rfc822'}

    results = api_client.get_raw(message_url, headers=rfc822_headers)

    stored_raw = get_from_blockstore(stub_message_from_raw.data_sha256)
    assert results.data == stored_raw
开发者ID:yodiyo,项目名称:sync-engine,代码行数:7,代码来源:test_messages.py


示例11: test_expanded_threads

def test_expanded_threads(stub_message, api_client):
    """Check the ``view=expanded`` representation of a thread.

    The expanded thread JSON must embed ``messages`` and ``drafts``
    objects (without bodies) instead of ``message_ids``. It is checked on
    the single-thread endpoint and on the thread listing.
    """
    thread_public_id = stub_message.thread.public_id

    # Keys expected on every embedded message. (The original list named
    # "unread" twice; the duplicate added nothing to the check.)
    message_keys = ("account_id", "to", "from", "files", "unread",
                    "date", "snippet")
    draft_keys = ("account_id", "to", "from", "files", "unread",
                  "snippet", "date", "version", "reply_to_message_id")

    def _check_embedded(items, object_type, required_keys):
        for item in items:
            assert "body" not in item
            assert item["object"] == object_type
            assert item["thread_id"] == thread_public_id
            assert all(key in item for key in required_keys)

    def _check_json_thread(resp_dict):
        assert "message_ids" not in resp_dict
        assert "messages" in resp_dict
        assert "drafts" in resp_dict
        assert len(resp_dict["participants"]) == 3
        assert len(resp_dict["messages"]) == 2
        assert len(resp_dict["drafts"]) == 1

        _check_embedded(resp_dict["messages"], "message", message_keys)
        _check_embedded(resp_dict["drafts"], "draft", draft_keys)

    # /threads/<thread_id>
    resp = api_client.get_raw(
        "/threads/{}?view=expanded".format(thread_public_id))
    assert resp.status_code == 200
    _check_json_thread(json.loads(resp.data))

    # /threads/
    # The listing URL has no placeholder, so the stray .format() call that
    # used to be applied to it has been dropped.
    resp = api_client.get_raw("/threads/?view=expanded")
    assert resp.status_code == 200
    listed = [entry for entry in json.loads(resp.data)
              if entry["id"] == thread_public_id]

    # Without this guard the loop below would pass vacuously whenever the
    # listing failed to include the stub thread at all.
    assert listed, "stub thread missing from /threads/ listing"
    for thread_json in listed:
        _check_json_thread(thread_json)
开发者ID:nohobby,项目名称:sync-engine,代码行数:47,代码来源:test_messages.py


示例12: test_expanded_threads

def test_expanded_threads(stub_message, api_client, api_version):
    """Verify expanded thread JSON for the requested ``Api-Version``.

    With ``view=expanded`` a thread must embed body-less ``messages`` and
    ``drafts`` objects rather than ``message_ids``. Both the single-thread
    endpoint and the thread listing are exercised, each request carrying
    an ``Api-Version`` header set to ``api_version``.
    """
    thread_id = stub_message.thread.public_id
    headers = {'Api-Version': api_version}

    def _check_json_thread(resp_dict):
        assert 'message_ids' not in resp_dict
        assert 'messages' in resp_dict
        assert 'drafts' in resp_dict
        assert len(resp_dict['participants']) == 3
        assert len(resp_dict['messages']) == 2
        assert len(resp_dict['drafts']) == 1

        for msg_dict in resp_dict['messages']:
            assert 'body' not in msg_dict
            assert msg_dict['object'] == 'message'
            assert msg_dict['thread_id'] == thread_id
            # 'unread' used to appear twice in this list; once is enough.
            valid_keys = ['account_id', 'to', 'from', 'files', 'unread',
                          'date', 'snippet']
            assert all(x in msg_dict for x in valid_keys)

        for draft in resp_dict['drafts']:
            assert 'body' not in draft
            assert draft['object'] == 'draft'
            assert draft['thread_id'] == thread_id
            valid_keys = ['account_id', 'to', 'from', 'files', 'unread',
                          'snippet', 'date', 'version', 'reply_to_message_id']
            assert all(x in draft for x in valid_keys)

    # /threads/<thread_id>
    resp = api_client.get_raw(
        '/threads/{}?view=expanded'.format(thread_id), headers=headers)
    assert resp.status_code == 200
    _check_json_thread(json.loads(resp.data))

    # /threads/
    # This URL contains no placeholder, so the former no-op .format() call
    # on it has been removed.
    resp = api_client.get_raw('/threads/?view=expanded', headers=headers)
    assert resp.status_code == 200

    seen_stub_thread = False
    for thread_json in json.loads(resp.data):
        if thread_json['id'] == thread_id:
            seen_stub_thread = True
            _check_json_thread(thread_json)

    # Fail loudly instead of passing when the listing never contained the
    # thread this test is about.
    assert seen_stub_thread, 'stub thread missing from /threads/ listing'
开发者ID:busseyl,项目名称:sync-engine,代码行数:46,代码来源:test_messages.py


示例13: test_response_when_old_cursor_given

def test_response_when_old_cursor_given(db, api_client, default_namespace):
    """Every non-empty line streamed from cursor '0' passes format validation."""
    stream_url = url_concat('/delta/streaming',
                            {'timeout': .1, 'cursor': '0'})
    r = api_client.get_raw(stream_url)
    assert r.status_code == 200

    # The body is newline-delimited; blank lines are skipped.
    non_empty_lines = (line for line in r.data.split('\n') if line)
    for line in non_empty_lines:
        validate_response_format(line)
开发者ID:rskumar,项目名称:sync-engine,代码行数:8,代码来源:test_streaming.py


示例14: test_sender_and_participants

def test_sender_and_participants(stub_message, api_client):
    """Check participants on the default (non-expanded) thread view.

    The thread must list three participants and must not embed the
    ``messages`` / ``drafts`` objects that only ``view=expanded`` adds.
    """
    resp = api_client.get_raw(
        "/threads/{}".format(stub_message.thread.public_id))
    assert resp.status_code == 200

    resp_dict = json.loads(resp.data)
    assert len(resp_dict["participants"]) == 3

    # Not expanded, should only return IDs.
    # The expanded view's key is "messages" (plural); the previous check
    # for "message" could never fail.
    assert "messages" not in resp_dict
    assert "drafts" not in resp_dict
开发者ID:nohobby,项目名称:sync-engine,代码行数:10,代码来源:test_messages.py


示例15: test_exclude_and_include_object_types

def test_exclude_and_include_object_types(db, api_client, thread,
                                          default_namespace):
    """Check ``exclude_types`` / ``include_types`` filtering on the delta stream.

    A message from "Bob" is added to ``thread``; the streaming endpoint is
    then queried from cursor '0' with no filter, with message and contact
    deltas excluded, and with only message deltas included.
    """
    def _stream_object_types(extra_params=None):
        # Fetch /delta/streaming and return the 'object' field of every
        # non-empty newline-delimited JSON line in the body.
        params = {'timeout': .1, 'cursor': '0'}
        if extra_params:
            params.update(extra_params)
        r = api_client.get_raw(url_concat('/delta/streaming', params))
        assert r.status_code == 200
        return [json.loads(line)['object']
                for line in r.data.split('\n') if line != '']

    # The sender address was previously the e-mail obfuscation residue
    # '[email protected]', which is not an address; use a well-formed one
    # so the sender is a plausible contact.
    add_fake_message(
        db.session,
        default_namespace.id,
        thread,
        from_addr=[('Bob', 'bob@example.com')])

    # Check that we do get message and contact changes by default.
    object_types = _stream_object_types()
    assert 'message' in object_types
    assert 'contact' in object_types

    # And check that we don't get message/contact changes if we exclude them.
    object_types = _stream_object_types({'exclude_types': 'message,contact'})
    assert 'message' not in object_types
    assert 'contact' not in object_types

    # And check we only get message objects if we use include_types
    object_types = _stream_object_types({'include_types': 'message'})
    assert all(kind == 'message' for kind in object_types)
开发者ID:rskumar,项目名称:sync-engine,代码行数:41,代码来源:test_streaming.py


示例16: test_expanded_view

def test_expanded_view(db, api_client, thread, message, default_namespace):
    """Expanded streaming deltas carry extra attributes.

    Message deltas must include 'headers' and thread deltas must include
    'messages' in their 'attributes'.
    """
    query = {
        'timeout': .1,
        'cursor': '0',
        'include_types': 'message,thread',
        'view': 'expanded',
    }
    r = api_client.get_raw(url_concat('/delta/streaming', query))
    assert r.status_code == 200

    # One JSON document per non-empty line of the response body.
    deltas = [json.loads(line) for line in r.data.split('\n') if line != '']
    for delta in deltas:
        kind = delta['object']
        if kind == 'message':
            assert 'headers' in delta['attributes']
        elif kind == 'thread':
            assert 'messages' in delta['attributes']
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:13,代码来源:test_streaming.py


示例17: test_gracefully_handle_new_namespace

def test_gracefully_handle_new_namespace(db, api_client):
    """Streaming deltas for a freshly created namespace returns HTTP 200."""
    new_namespace = Namespace()
    new_account = GenericAccount()
    new_account.password = 'hunter2'
    new_namespace.account = new_account

    # Persist the namespace first, then its account, in one commit.
    for record in (new_namespace, new_account):
        db.session.add(record)
    db.session.commit()

    now = int(time.time())
    cursor = get_cursor(api_client, now, new_namespace)

    stream_path = '/n/{}/delta/streaming'.format(new_namespace.public_id)
    query = {'timeout': .1, 'cursor': cursor}
    r = api_client.get_raw(url_concat(stream_path, query))
    assert r.status_code == 200
开发者ID:rskumar,项目名称:sync-engine,代码行数:14,代码来源:test_streaming.py


示例18: label_client

def label_client(db, gmail_account):
    """Return an API client for the Gmail namespace with at least one label.

    Unlike generic_account, which always makes a new IMAP account,
    gmail_account reuses an existing Gmail account when there is one.
    "Creating" the same label more than once would therefore cause
    namespace conflicts, and labels can't be deleted and then re-created,
    so a label is only posted when the account has none yet.
    """
    api_client = new_api_client(db, gmail_account.namespace)

    existing_labels = json.loads(api_client.get_raw('/labels/').data)
    if existing_labels:
        return api_client

    api_client.post_data('/labels/', {"display_name": "Test_Label"})
    return api_client
开发者ID:busseyl,项目名称:sync-engine,代码行数:14,代码来源:test_folders_labels.py


示例19: test_read_endpoints

def test_read_endpoints(db, setup_account, api_client, default_account):
    """Read endpoints return 200 before and after the account turns 'invalid'.

    For every ``resource -> public_id`` pair in ``setup_account`` both the
    collection URL (``/<resource>s``) and the item URL are requested.
    """
    def _assert_reads_succeed():
        for resource, public_id in setup_account.items():
            collection_url = '/{}s'.format(resource)
            r = api_client.get_raw(collection_url)
            assert r.status_code == 200

            item_url = '{}/{}'.format(collection_url, public_id)
            r = api_client.get_raw(item_url)
            assert r.status_code == 200

    # Read operations succeed.
    _assert_reads_succeed()

    default_account.sync_state = 'invalid'
    db.session.commit()

    # Read operations on an invalid account also succeed.
    _assert_reads_succeed()
开发者ID:DrMoriarty,项目名称:sync-engine,代码行数:23,代码来源:test_invalid_account.py


示例20: test_sender_and_participants

def test_sender_and_participants(stub_message, api_client, api_version):
    """Check participants on the default thread view for an ``Api-Version``.

    The request carries an ``Api-Version`` header set to ``api_version``.
    The thread must list three participants and must not embed the
    ``messages`` / ``drafts`` objects that only ``view=expanded`` adds.
    """
    headers = {'Api-Version': api_version}
    thread_url = '/threads/{}'.format(stub_message.thread.public_id)

    resp = api_client.get_raw(thread_url, headers=headers)
    assert resp.status_code == 200

    resp_dict = json.loads(resp.data)
    assert len(resp_dict['participants']) == 3

    # Not expanded, should only return IDs.
    # The expanded view's key is 'messages' (plural); the previous check
    # for 'message' could never fail.
    assert 'messages' not in resp_dict
    assert 'drafts' not in resp_dict
开发者ID:busseyl,项目名称:sync-engine,代码行数:15,代码来源:test_messages.py



注:本文中的tests.api.base.api_client.get_raw函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api_client.post_data函数代码示例发布时间:2022-05-27
下一篇:
Python api_client.get_data函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap