• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python pubnub_asyncio.PubNubAsyncio类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pubnub.pubnub_asyncio.PubNubAsyncio的典型用法代码示例。如果您正苦于以下问题:Python PubNubAsyncio类的具体用法?Python PubNubAsyncio怎么用?Python PubNubAsyncio使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了PubNubAsyncio类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_single_channel_group

def test_single_channel_group(event_loop):
    """Grant read/write on one channel group, then audit the same group.

    Both the grant and the audit responses are expected to report the
    'channel-group' level with read and write enabled and manage disabled.
    This is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)
    client.config.uuid = "test-pam-asyncio-uuid"
    group = "test-pam-asyncio-cg"

    # Grant read + write on the group.
    grant_call = client.grant().channel_groups(group).write(True).read(True)
    granted = yield from grant_call.future()

    assert isinstance(granted.result, PNAccessManagerGrantResult)
    assert granted.result.level == 'channel-group'
    assert granted.result.groups[group].read_enabled == 1
    assert granted.result.groups[group].write_enabled == 1
    assert granted.result.groups[group].manage_enabled == 0

    # Audit the same group and expect identical permissions.
    audit_call = client.audit().channel_groups(group)
    audited = yield from audit_call.future()

    assert isinstance(audited.result, PNAccessManagerAuditResult)
    assert audited.result.level == 'channel-group'
    assert audited.result.groups[group].read_enabled == 1
    assert audited.result.groups[group].write_enabled == 1
    assert audited.result.groups[group].manage_enabled == 0

    client.stop()
开发者ID:,项目名称:,代码行数:28,代码来源:


示例2: test_success

def test_success(event_loop):
    """Issue a delete-messages request on "my-ch" and fail if it reports an error.

    The request is built with ``start(123)`` and ``end(456)``. This is a
    generator-based coroutine (``yield from``); whatever decorator drives it
    is not visible in this excerpt.

    Raises:
        AssertionError: if the returned status reports an error.
    """
    pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop)

    try:
        res = yield from pubnub.delete_messages().channel("my-ch").start(123).end(456).future()

        if res.status.is_error():
            raise AssertionError()
    finally:
        # Every other test here stops the client when done; this one never
        # did. Stopping in ``finally`` also covers the failure path.
        pubnub.stop()
开发者ID:pubnub,项目名称:python,代码行数:7,代码来源:test_history_delete.py


示例3: test_single_channel_with_auth

def test_single_channel_with_auth(event_loop):
    """Grant read/write on one channel for one auth key, then audit it.

    Both responses are expected to list the auth key under the channel with
    read and write enabled and manage disabled. This is a generator-based
    coroutine (``yield from``); whatever decorator drives it is not visible
    in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)
    client.config.uuid = "test-pam-asyncio-uuid"
    channel = "test-pam-asyncio-ch"
    key = "test-pam-asyncio-auth"

    # Grant read + write on the channel, scoped to the auth key.
    grant_call = client.grant().channels(channel).write(True).read(True).auth_keys(key)
    granted = yield from grant_call.future()

    assert isinstance(granted.result, PNAccessManagerGrantResult)
    assert granted.result.channels[channel].auth_keys[key].read_enabled == 1
    assert granted.result.channels[channel].auth_keys[key].write_enabled == 1
    assert granted.result.channels[channel].auth_keys[key].manage_enabled == 0

    # Audit the same channel/auth-key pair.
    audit_call = client.audit().channels(channel).auth_keys(key)
    audited = yield from audit_call.future()

    assert isinstance(audited.result, PNAccessManagerAuditResult)
    assert audited.result.channels[channel].auth_keys[key].read_enabled == 1
    assert audited.result.channels[channel].auth_keys[key].write_enabled == 1
    assert audited.result.channels[channel].auth_keys[key].manage_enabled == 0

    client.stop()
开发者ID:,项目名称:,代码行数:29,代码来源:


示例4: test_multiple_channels

def test_multiple_channels(event_loop):
    """Grant read/write on two channels at once, then audit both.

    For each response, every channel must report ``read_enabled`` and
    ``write_enabled`` as ``True`` and ``manage_enabled`` as ``False``.
    This is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)
    client.config.uuid = "test-pam-asyncio-uuid"
    first = "test-pam-asyncio-ch1"
    second = "test-pam-asyncio-ch2"

    # Checked flag by flag, and for each flag channel 1 before channel 2.
    expected_flags = (
        ("read_enabled", True),
        ("write_enabled", True),
        ("manage_enabled", False),
    )

    grant_call = client.grant().channels([first, second]).write(True).read(True)
    granted = yield from grant_call.future()

    assert isinstance(granted.result, PNAccessManagerGrantResult)
    for flag, expected in expected_flags:
        for channel in (first, second):
            assert getattr(granted.result.channels[channel], flag) is expected

    audit_call = client.audit().channels([first, second])
    audited = yield from audit_call.future()

    assert isinstance(audited.result, PNAccessManagerAuditResult)
    for flag, expected in expected_flags:
        for channel in (first, second):
            assert getattr(audited.result.channels[channel], flag) is expected

    client.stop()
开发者ID:,项目名称:,代码行数:33,代码来源:


示例5: test_not_permitted

def test_not_permitted(event_loop):
    """Publish with the secret key cleared and expect an HTTP 403 client error.

    ``ch`` is not defined in this function; it is resolved from the enclosing
    scope, which is not visible in this excerpt. This is a generator-based
    coroutine (``yield from``).
    """
    config = pnconf_pam_copy()
    config.secret_key = None
    client = PubNubAsyncio(config, custom_event_loop=event_loop)

    publish_call = client.publish().channel(ch).message("hey")
    yield from assert_server_side_error_yield(publish_call, "HTTP Client Error (403")
    client.stop()
开发者ID:,项目名称:,代码行数:7,代码来源:


示例6: test_multiple_channels

def test_multiple_channels(event_loop):
    """Set a state dict on two channels, then read it back from both.

    The set-state response must echo the stored values, and the get-state
    response must return the same values under each channel. This is a
    generator-based coroutine (``yield from``); whatever decorator drives it
    is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf, custom_event_loop=event_loop)
    first = 'test-state-asyncio-ch1'
    second = 'test-state-asyncio-ch2'
    client.config.uuid = 'test-state-asyncio-uuid'
    payload = {"name": "Alex", "count": 5}

    set_call = client.set_state().channels([first, second]).state(payload)
    stored = yield from set_call.future()

    assert stored.result.state['name'] == "Alex"
    assert stored.result.state['count'] == 5

    get_call = client.get_state().channels([first, second])
    fetched = yield from get_call.future()

    assert fetched.result.channels[first]['name'] == "Alex"
    assert fetched.result.channels[second]['name'] == "Alex"
    assert fetched.result.channels[first]['count'] == 5
    assert fetched.result.channels[second]['count'] == 5

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:25,代码来源:test_state.py


示例7: test_multiple_channel_groups_with_auth

def test_multiple_channel_groups_with_auth(event_loop):
    """Grant read/write on two channel groups for one auth key, then audit.

    The grant is scoped to the auth key; the audit request names only the
    two groups. In both responses each group must list the auth key with
    ``read_enabled`` and ``write_enabled`` ``True`` and ``manage_enabled``
    ``False``. This is a generator-based coroutine (``yield from``); whatever
    decorator drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)
    client.config.uuid = "my_uuid"
    first_group = "test-pam-asyncio-cg1"
    second_group = "test-pam-asyncio-cg2"
    key = "test-pam-asyncio-auth"

    # Checked flag by flag, and for each flag group 1 before group 2.
    expected_flags = (
        ("read_enabled", True),
        ("write_enabled", True),
        ("manage_enabled", False),
    )

    grant_call = (client.grant()
                  .channel_groups([first_group, second_group])
                  .write(True)
                  .read(True)
                  .auth_keys(key))
    granted = yield from grant_call.future()

    assert isinstance(granted.result, PNAccessManagerGrantResult)
    for flag, expected in expected_flags:
        for group in (first_group, second_group):
            assert getattr(granted.result.groups[group].auth_keys[key], flag) is expected

    audit_call = client.audit().channel_groups([first_group, second_group])
    audited = yield from audit_call.future()

    assert isinstance(audited.result, PNAccessManagerAuditResult)
    for flag, expected in expected_flags:
        for group in (first_group, second_group):
            assert getattr(audited.result.groups[group].auth_keys[key], flag) is expected

    client.stop()
开发者ID:,项目名称:,代码行数:35,代码来源:


示例8: test_publish_envelope

def test_publish_envelope(event_loop):
    """Publish 'hey' to channel 'blah' and expect a non-error AsyncioEnvelope.

    This is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_copy(), custom_event_loop=event_loop)

    publish_call = client.publish().message('hey').channel('blah')
    response = yield from publish_call.future()

    assert isinstance(response, AsyncioEnvelope)
    assert not response.is_error()

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:7,代码来源:test_invocations.py


示例9: test_error_non_serializable

def test_error_non_serializable(event_loop):
    """Publish a function object and expect a "not JSON serializable" client error.

    ``ch`` is not defined in this function; it is resolved from the enclosing
    scope, which is not visible in this excerpt. This is a generator-based
    coroutine (``yield from``).
    """
    client = PubNubAsyncio(pnconf_copy(), custom_event_loop=event_loop)

    def method():
        # Deliberately empty: only used as a payload the publish call is
        # expected to reject.
        pass

    publish_call = client.publish().channel(ch).message(method)
    yield from assert_client_side_error(publish_call, "not JSON serializable")
    client.stop()
开发者ID:,项目名称:,代码行数:8,代码来源:


示例10: test_publish_envelope_raises

def test_publish_envelope_raises(event_loop):
    """Publish with ``corrupted_keys`` and expect an error object from ``future()``.

    The awaited value must be a ``PubNubAsyncioException`` flagged as an
    error, whose wrapped value carries status code 400. This is a
    generator-based coroutine (``yield from``); whatever decorator drives it
    is not visible in this excerpt.
    """
    client = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)

    publish_call = client.publish().message('hey').channel('blah')
    outcome = yield from publish_call.future()

    assert isinstance(outcome, PubNubAsyncioException)
    assert outcome.is_error()
    assert outcome.value()._status_code == 400

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:8,代码来源:test_invocations.py


示例11: test_publish_super_admin_call

def test_publish_super_admin_call(event_loop):
    """Run two publish calls on a client built from ``pnconf_pam_copy()``.

    The first publishes "hey" to ``ch`` (resolved from the enclosing scope,
    not visible in this excerpt). The second publishes a message containing
    special characters to "foo.bar" with storage requested and a metadata
    dict. Nothing is asserted on the responses. This is a generator-based
    coroutine (``yield from``).
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)

    plain_call = client.publish().channel(ch).message("hey")
    yield from plain_call.future()

    metadata = {'name': 'alex'}
    rich_call = (client.publish()
                 .channel("foo.bar")
                 .message("hey^&#$")
                 .should_store(True)
                 .meta(metadata))
    yield from rich_call.future()

    client.stop()
开发者ID:,项目名称:,代码行数:9,代码来源:


示例12: test_publish_mixed_via_post

def test_publish_mixed_via_post(event_loop):
    """Run ``assert_success_publish_post`` for four payload types together.

    A string, an int, a bool and a list of strings are each scheduled with
    ``asyncio.ensure_future`` and awaited together with ``asyncio.gather``.
    This is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_copy(), custom_event_loop=event_loop)

    # Scheduled in the same order as the payloads are listed.
    payloads = ("hi", 5, True, ["hi", "hi2", "hi3"])
    pending = [
        asyncio.ensure_future(assert_success_publish_post(client, payload))
        for payload in payloads
    ]
    yield from asyncio.gather(*pending)

    client.stop()
开发者ID:,项目名称:,代码行数:9,代码来源:


示例13: test_time

def test_time(event_loop):
    """Call ``time()`` and check the result is a positive int-convertible value.

    The result's ``date_time()`` must also be an instance of ``date``.
    This is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf, custom_event_loop=event_loop)

    time_call = client.time()
    server_time = yield from time_call.result()

    assert int(server_time) > 0
    assert isinstance(server_time.date_time(), date)

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:9,代码来源:test_time.py


示例14: test_publish_future_raises_pubnub_error

def test_publish_future_raises_pubnub_error(event_loop):
    """Publish with ``corrupted_keys`` via ``result()`` and expect PubNubException.

    The raised exception must mention 'Invalid Subscribe Key' and carry
    status code 400. This is a generator-based coroutine (``yield from``);
    whatever decorator drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)

    with pytest.raises(PubNubException) as caught:
        publish_call = client.publish().message('hey').channel('blah')
        yield from publish_call.result()

    assert 'Invalid Subscribe Key' in str(caught.value)
    assert caught.value._status_code == 400

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:10,代码来源:test_invocations.py


示例15: test_error_invalid_key

def test_error_invalid_key(event_loop):
    """Publish with a fake publish key and expect an "Invalid Key" server error.

    The configuration is built inline with subscribing disabled. ``ch`` is
    not defined in this function; it is resolved from the enclosing scope,
    which is not visible in this excerpt. This is a generator-based
    coroutine (``yield from``).
    """
    config = PNConfiguration()
    config.publish_key = "fake"
    config.subscribe_key = "demo"
    config.enable_subscribe = False

    client = PubNubAsyncio(config, custom_event_loop=event_loop)

    publish_call = client.publish().channel(ch).message("hey")
    yield from assert_server_side_error_yield(publish_call, "Invalid Key")
    client.stop()
开发者ID:,项目名称:,代码行数:10,代码来源:


示例16: test_publish_envelope_raises_lower_level_error

def test_publish_envelope_raises_lower_level_error(event_loop):
    """Close the connector, publish via ``future()``, and expect an error object.

    The awaited value must be a ``PubNubAsyncioException`` flagged as an
    error whose wrapped value stringifies to 'Session is closed'. This is a
    generator-based coroutine (``yield from``); whatever decorator drives it
    is not visible in this excerpt.
    """
    client = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)

    # Close the private connector first so the publish call fails.
    client._connector.close()

    publish_call = client.publish().message('hey').channel('blah')
    outcome = yield from publish_call.future()

    assert isinstance(outcome, PubNubAsyncioException)
    assert outcome.is_error()
    assert str(outcome.value()) == 'Session is closed'

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:11,代码来源:test_invocations.py


示例17: test_publish_future_raises_lower_level_error

def test_publish_future_raises_lower_level_error(event_loop):
    """Close the connector, publish via ``result()``, and expect RuntimeError.

    The raised exception's text must contain 'Session is closed'. This is a
    generator-based coroutine (``yield from``); whatever decorator drives it
    is not visible in this excerpt.
    """
    client = PubNubAsyncio(corrupted_keys, custom_event_loop=event_loop)

    # Close the private connector first so the publish call fails.
    client._connector.close()

    with pytest.raises(RuntimeError) as caught:
        publish_call = client.publish().message('hey').channel('blah')
        yield from publish_call.result()

    assert 'Session is closed' in str(caught.value)

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:11,代码来源:test_invocations.py


示例18: test_where_now_super_admin_call

def test_where_now_super_admin_call(event_loop):
    """Call ``where_now`` for the client's own uuid and expect PNWhereNowResult.

    The client is built from ``pnconf_pam_copy()``. This is a generator-based
    coroutine (``yield from``); whatever decorator drives it is not visible
    in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)

    uuid = 'test-where-now-asyncio-uuid'
    client.config.uuid = uuid

    where_now_call = client.where_now().uuid(uuid)
    outcome = yield from where_now_call.result()
    assert isinstance(outcome, PNWhereNowResult)

    client.stop()
开发者ID:,项目名称:,代码行数:12,代码来源:


示例19: test_super_call

def test_super_call(event_loop):
    """Walk a channel group through add, list, remove-channel, remove-group, list.

    Channel, group and auth values contain special characters ('|', '%',
    ',', '.'). Each step only checks the type of the returned result. This
    is a generator-based coroutine (``yield from``); whatever decorator
    drives it is not visible in this excerpt.
    """
    client = PubNubAsyncio(pnconf_pam_copy(), custom_event_loop=event_loop)

    channel = "channel-groups-torna|do-ch"
    group = "channel-groups-torna|do-cg"
    client.config.auth = "h.e|l%l,0"

    # Step 1: add the channel to the group.
    add_call = client.add_channel_to_channel_group().channels(channel).channel_group(group)
    added = yield from add_call.future()
    assert isinstance(added.result, PNChannelGroupsAddChannelResult)

    # Step 2: list the group's channels.
    list_call = client.list_channels_in_channel_group().channel_group(group)
    listed = yield from list_call.future()
    assert isinstance(listed.result, PNChannelGroupsListResult)

    # Step 3: remove the channel from the group.
    remove_channel_call = (client.remove_channel_from_channel_group()
                           .channel_group(group)
                           .channels(channel))
    removed_channel = yield from remove_channel_call.future()
    assert isinstance(removed_channel.result, PNChannelGroupsRemoveChannelResult)

    # Step 4: remove the group itself.
    remove_group_call = client.remove_channel_group().channel_group(group)
    removed_group = yield from remove_group_call.future()
    assert isinstance(removed_group.result, PNChannelGroupsRemoveGroupResult)

    # Step 5: list again after removal.
    relist_call = client.list_channels_in_channel_group().channel_group(group)
    relisted = yield from relist_call.future()
    assert isinstance(relisted.result, PNChannelGroupsListResult)

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:30,代码来源:test_channel_groups.py


示例20: test_state_super_admin_call

def test_state_super_admin_call(event_loop):
    """Set and get state on two channels, checking only the result types.

    The client is built from ``pnconf_pam_copy()``. This is a generator-based
    coroutine (``yield from``); whatever decorator drives it is not visible
    in this excerpt.
    """
    config = pnconf_pam_copy()
    client = PubNubAsyncio(config, custom_event_loop=event_loop)
    first = "test-state-asyncio-ch1"
    second = "test-state-asyncio-ch2"
    client.config.uuid = "test-state-asyncio-uuid"
    payload = {"name": "Alex", "count": 5}

    set_call = client.set_state().channels([first, second]).state(payload)
    stored = yield from set_call.future()
    assert isinstance(stored.result, PNSetStateResult)

    get_call = client.get_state().channels([first, second])
    fetched = yield from get_call.future()
    assert isinstance(fetched.result, PNGetStateResult)

    client.stop()
开发者ID:pubnub,项目名称:python,代码行数:15,代码来源:test_state.py



注:本文中的pubnub.pubnub_asyncio.PubNubAsyncio类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pubnub_tornado.PubNubTornado类代码示例 发布时间:2022-05-25
下一篇:
Python pubnub.Pubnub类代码示例 发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap