本文整理汇总了Python中test.ably.restsetup.RestSetup类的典型用法代码示例。如果您正苦于以下问题:Python RestSetup类的具体用法?Python RestSetup怎么用?Python RestSetup使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RestSetup类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_with_key
def test_with_key(self):
    """Publish and read back a message through a token-only client.

    A first client requests a token; a second client is then created from
    that token alone (``key=None``) and is used to publish to a channel and
    check the first history item.
    """
    self.ably = RestSetup.get_ably_rest(
        use_binary_protocol=self.use_binary_protocol)
    token_details = self.ably.auth.request_token()
    assert isinstance(token_details, TokenDetails)

    # Second client: no key, only the token details obtained above.
    token_client = RestSetup.get_ably_rest(
        key=None,
        token_details=token_details,
        use_binary_protocol=self.use_binary_protocol,
    )
    channel_name = self.get_channel_name('test_request_token_with_key')
    token_client.channels[channel_name].publish('event', 'foo')

    first_item = token_client.channels[channel_name].history().items[0]
    assert first_item.data == 'foo'
开发者ID:ably,项目名称:ably-python,代码行数:13,代码来源:restauth_test.py
示例2: test_idempotent_rest_publishing
def test_idempotent_rest_publishing(self):
    """Check the default and explicitly set ``idempotent_rest_publishing``.

    The expected default depends on ``api_version``: ``False`` before 1.2,
    ``True`` from 1.2 onwards. Explicit ``True``/``False`` values passed to
    the client must be reflected unchanged in ``options``.
    """
    # Compare versions numerically. As plain strings the comparison is
    # lexicographic, so e.g. '1.10' < '1.2' would wrongly be True.
    major_minor = tuple(int(part) for part in api_version.split('.')[:2])

    # Test default value
    if major_minor < (1, 2):
        assert self.ably.options.idempotent_rest_publishing is False
    else:
        assert self.ably.options.idempotent_rest_publishing is True

    # Test setting value explicitly
    ably = RestSetup.get_ably_rest(idempotent_rest_publishing=True)
    assert ably.options.idempotent_rest_publishing is True

    ably = RestSetup.get_ably_rest(idempotent_rest_publishing=False)
    assert ably.options.idempotent_rest_publishing is False
开发者ID:ably,项目名称:ably-python,代码行数:13,代码来源:restchannelpublish_test.py
示例3: setUpClass
def setUpClass(cls):
    """Create the REST clients and post stats fixtures to ``/stats``.

    Three detailed per-minute entries are built around ``last_interval``
    (last year) and 120 minimal entries are built going back minute by
    minute from ``previous_interval`` (two years ago). All of them are
    posted in a single request.
    """
    RestSetup._RestSetup__test_vars = None
    cls.ably = RestSetup.get_ably_rest()
    cls.ably_text = RestSetup.get_ably_rest(use_binary_protocol=False)

    cls.last_year = datetime.now().year - 1
    cls.previous_year = datetime.now().year - 2
    cls.last_interval = datetime(cls.last_year, 2, 3, 15, 5)
    cls.previous_interval = datetime(cls.previous_year, 2, 3, 15, 5)
    previous_year_stats = 120

    def minute_id(moment):
        # All fixtures use 'minute' granularity.
        return Stats.to_interval_id(moment, 'minute')

    two_minutes_before = {
        'intervalId': minute_id(cls.last_interval - timedelta(minutes=2)),
        'inbound': {'realtime': {'messages': {'count': 50, 'data': 5000}}},
        'outbound': {'realtime': {'messages': {'count': 20, 'data': 2000}}},
    }
    one_minute_before = {
        'intervalId': minute_id(cls.last_interval - timedelta(minutes=1)),
        'inbound': {'realtime': {'messages': {'count': 60, 'data': 6000}}},
        'outbound': {'realtime': {'messages': {'count': 10, 'data': 1000}}},
    }
    at_last_interval = {
        'intervalId': minute_id(cls.last_interval),
        'inbound': {'realtime': {'messages': {'count': 70, 'data': 7000}}},
        'outbound': {'realtime': {'messages': {'count': 40, 'data': 4000}}},
        'persisted': {'presence': {'count': 20, 'data': 2000}},
        'connections': {'tls': {'peak': 20, 'opened': 10}},
        'channels': {'peak': 50, 'opened': 30},
        'apiRequests': {'succeeded': 50, 'failed': 10},
        'tokenRequests': {'succeeded': 60, 'failed': 20},
    }
    stats = [two_minutes_before, one_minute_before, at_last_interval]

    # One entry per minute; the inbound count equals the minute offset.
    previous_stats = [
        {
            'intervalId': minute_id(
                cls.previous_interval - timedelta(minutes=offset)),
            'inbound': {'realtime': {'messages': {'count': offset}}},
        }
        for offset in range(previous_year_stats)
    ]

    cls.ably.http.post('/stats', body=stats + previous_stats)
开发者ID:ably,项目名称:ably-python,代码行数:48,代码来源:reststats_test.py
示例4: setUp
def setUp(self):
    """Register two mocked GET endpoints and run a paginated query on each.

    ``ch1`` answers with a plain JSON body; ``ch2`` answers through a
    callback that also sets a ``link`` header with ``first`` and ``next``
    relations. The results are stored on ``self.paginated_result`` and
    ``self.paginated_result_with_headers``.
    """
    self.ably = RestSetup.get_ably_rest(use_binary_protocol=False)

    plain_url = 'http://rest.ably.io/channels/channel_name/ch1'
    linked_url = 'http://rest.ably.io/channels/channel_name/ch2'

    # Mocked responses
    # without headers
    responses.add(
        responses.GET,
        plain_url,
        body='[{"id": 0}, {"id": 1}]',
        status=200,
        content_type='application/json',
    )

    # with headers
    link_headers = {
        'link':
            '<http://rest.ably.io/channels/channel_name/ch2?page=1>; rel="first",'
            ' <http://rest.ably.io/channels/channel_name/ch2?page=2>; rel="next"'
    }
    linked_callback = self.get_response_callback(
        headers=link_headers,
        body='[{"id": 0}, {"id": 1}]',
        status=200)
    responses.add_callback(
        responses.GET,
        linked_url,
        linked_callback,
        content_type='application/json',
    )

    # start intercepting requests
    responses.start()

    def to_native(response):
        return response.to_native()

    self.paginated_result = PaginatedResult.paginated_query(
        self.ably.http,
        url=plain_url,
        response_processor=to_native)
    self.paginated_result_with_headers = PaginatedResult.paginated_query(
        self.ably.http,
        url=linked_url,
        response_processor=to_native)
开发者ID:ably,项目名称:ably-python,代码行数:34,代码来源:restpaginatedresult_test.py
示例5: test_without_permissions
def test_without_permissions(self):
    """Publishing with the third test key must raise 'not permitted'."""
    restricted_key = test_vars["keys"][2]
    ably = RestSetup.get_ably_rest(key=restricted_key["key_str"])
    channel = ably.channels['test_publish_without_permission']

    with pytest.raises(AblyException) as excinfo:
        channel.publish('foo', 'woop')

    error_message = excinfo.value.message
    assert 'not permitted' in error_message
开发者ID:ably,项目名称:ably-python,代码行数:7,代码来源:restchannels_test.py
示例6: test_time_accuracy
def test_time_accuracy(self):
    """The time reported by the client must be within 10 seconds of local time.

    The local clock is scaled by 1000 before the comparison (presumably
    because ``ably.time()`` reports milliseconds).
    """
    seconds = 10
    client = RestSetup.get_ably_rest(
        use_binary_protocol=self.use_binary_protocol)

    reported_time = client.time()
    actual_time = time.time() * 1000.0

    drift = abs(actual_time - reported_time)
    assert drift < seconds * 1000, "Time is not within %s seconds" % seconds
开发者ID:ably,项目名称:ably-python,代码行数:8,代码来源:resttime_test.py
示例7: test_key_name_and_secret_are_required
def test_key_name_and_secret_are_required(self):
    """``create_token_request`` must fail unless both key parts are given.

    With a client that has no key, calling it with nothing, with only
    ``key_name``, or with only ``key_secret`` must each raise
    ``AblyException`` matching "40101 401 No key specified".
    """
    ably = RestSetup.get_ably_rest(key=None, token='not a real token')

    incomplete_credentials = (
        {},
        {'key_name': self.key_name},
        {'key_secret': self.key_secret},
    )
    for kwargs in incomplete_credentials:
        with pytest.raises(AblyException, match="40101 401 No key specified"):
            ably.auth.create_token_request(**kwargs)
开发者ID:ably,项目名称:ably-python,代码行数:8,代码来源:resttoken_test.py
示例8: test_request_token_with_specified_key
def test_request_token_with_specified_key(self):
    """Request a token with the second test key and check its capability."""
    key = RestSetup.get_test_vars()["keys"][1]
    key_name = key["key_name"]
    key_secret = key["key_secret"]

    token_details = self.ably.auth.request_token(
        key_name=key_name,
        key_secret=key_secret,
    )

    self.assertIsNotNone(token_details.token, msg="Expected token")
    # The token's capability must equal the one recorded for the key.
    expected_capability = key.get("capability")
    self.assertEqual(
        expected_capability,
        token_details.capability,
        msg="Unexpected capability",
    )
开发者ID:vintasoftware,项目名称:ably-python,代码行数:8,代码来源:resttoken_test.py
示例9: test_time_without_key_or_token
def test_time_without_key_or_token(self):
    """``time()`` must work on a client built with no key and a dummy token.

    The reported time has to be within 10 seconds of the local clock,
    which is scaled by 1000 before the comparison.
    """
    seconds = 10
    client = RestSetup.get_ably_rest(
        key=None,
        token='foo',
        use_binary_protocol=self.use_binary_protocol,
    )

    reported_time = client.time()
    actual_time = time.time() * 1000.0

    drift = abs(actual_time - reported_time)
    assert drift < seconds * 1000, "Time is not within %s seconds" % seconds
开发者ID:ably,项目名称:ably-python,代码行数:9,代码来源:resttime_test.py
示例10: setUpClass
def setUpClass(cls):
    """Create a client and POST 20 messages to a fresh channel.

    The channel name and the messages path are kept on the class as
    ``cls.channel`` and ``cls.path``.
    """
    cls.ably = RestSetup.get_ably_rest()

    # Populate the channel (using the new api)
    cls.channel = cls.get_channel_name()
    cls.path = '/channels/%s/messages' % cls.channel

    for index in range(20):
        message = {
            'name': 'event%s' % index,
            'data': 'lorem ipsum %s' % index,
        }
        cls.ably.request('POST', cls.path, body=message)
开发者ID:ably,项目名称:ably-python,代码行数:9,代码来源:restrequest_test.py
示例11: test_client_id_null_for_anonymous_auth
def test_client_id_null_for_anonymous_auth(self):
    """Authorizing without a client id must leave ``client_id`` as ``None``.

    The client is built from the first test key's name and secret, passed
    separately instead of as a single ``key`` string.
    """
    first_key = test_vars["keys"][0]
    ably = RestSetup.get_ably_rest(
        key=None,
        key_name=first_key["key_name"],
        key_secret=first_key["key_secret"],
    )

    token = ably.auth.authorize()

    assert isinstance(token, TokenDetails)
    # Neither the token nor the auth object may carry a client id.
    assert token.client_id is None
    assert ably.auth.client_id is None
开发者ID:ably,项目名称:ably-python,代码行数:10,代码来源:restauth_test.py
示例12: test_publish_error
def test_publish_error(self):
    """Publishing with a subscribe-only capability must fail with 401/40160."""
    ably = RestSetup.get_ably_rest(
        use_binary_protocol=self.use_binary_protocol)

    # Authorize with a token whose capability only allows "subscribe".
    subscribe_only = {'capability': {"only_subscribe": ["subscribe"]}}
    ably.auth.authorize(token_params=subscribe_only)

    with pytest.raises(AblyException) as excinfo:
        ably.channels["only_subscribe"].publish()

    error = excinfo.value
    assert 401 == error.status_code
    assert 40160 == error.code
开发者ID:ably,项目名称:ably-python,代码行数:10,代码来源:restchannelpublish_test.py
示例13: test_query_time_param
def test_query_time_param(self):
    """With ``query_time=True`` a token request must use ``AblyRest.time``.

    Both ``AblyRest.time`` and ``Auth._timestamp`` are patched with
    wrapping mocks; after ``request_token()`` only the former may have
    been called.
    """
    ably = RestSetup.get_ably_rest(
        query_time=True,
        use_binary_protocol=self.use_binary_protocol,
    )
    original_timestamp = ably.auth._timestamp

    with patch('ably.rest.rest.AblyRest.time', wraps=ably.time) as server_time:
        with patch('ably.rest.auth.Auth._timestamp',
                   wraps=original_timestamp) as local_time:
            ably.auth.request_token()
            assert not local_time.called
            assert server_time.called
开发者ID:ably,项目名称:ably-python,代码行数:10,代码来源:restinit_test.py
示例14: test_when_auth_url_has_query_string
def test_when_auth_url_has_query_string(self):
    """``auth_params`` must be appended to an auth URL's existing query.

    The first recorded request URL has to end with
    ``?with=query&spam=eggs``.
    """
    url = 'http://www.example.com?with=query'
    headers = {'foo': 'bar'}
    extra_params = {'spam': 'eggs'}

    self.ably = RestSetup.get_ably_rest(key=None, auth_url=url)

    # Mocked auth endpoint returning a bare token string.
    responses.add(
        responses.GET,
        'http://www.example.com',
        body='token_string',
    )

    self.ably.auth.request_token(
        auth_url=url,
        auth_headers=headers,
        auth_params=extra_params,
    )

    requested_url = responses.calls[0].request.url
    assert requested_url.endswith('?with=query&spam=eggs')
开发者ID:ably,项目名称:ably-python,代码行数:11,代码来源:restauth_test.py
示例15: test_publish_message_with_wrong_client_id_on_implicit_identified_client
def test_publish_message_with_wrong_client_id_on_implicit_identified_client(self):
    """Publishing with a mismatched ``client_id`` must fail with 400/40012.

    A token is issued for a random client id and a token-only client is
    built from it; publishing with ``client_id='invalid'`` must then
    raise ``AblyException``.
    """
    token_client_id = uuid.uuid4().hex
    new_token = self.ably.auth.authorize(
        token_params={'client_id': token_client_id})

    new_ably = RestSetup.get_ably_rest(
        key=None,
        token=new_token.token,
        use_binary_protocol=self.use_binary_protocol,
    )
    channel_name = self.get_channel_name(
        'persisted:wrong_client_id_implicit_client')
    channel = new_ably.channels[channel_name]

    with pytest.raises(AblyException) as excinfo:
        channel.publish(name='publish', data='test', client_id='invalid')

    error = excinfo.value
    assert 400 == error.status_code
    assert 40012 == error.code
开发者ID:ably,项目名称:ably-python,代码行数:13,代码来源:restchannelpublish_test.py
示例16: test_client_id_null_until_auth
def test_client_id_null_until_auth(self):
    """``auth.client_id`` must be ``None`` before authorizing, then set.

    The client id is supplied only through ``default_token_params``.
    """
    client_id = uuid.uuid4().hex
    default_params = {'client_id': client_id}
    token_ably = RestSetup.get_ably_rest(default_token_params=default_params)

    # before auth, client_id is None
    assert token_ably.auth.client_id is None

    token = token_ably.auth.authorize()
    assert isinstance(token, TokenDetails)

    # after auth, client_id is defined
    assert token.client_id == client_id
    assert token_ably.auth.client_id == client_id
开发者ID:ably,项目名称:ably-python,代码行数:13,代码来源:restauth_test.py
示例17: test_token_request_can_be_used_to_get_a_token
def test_token_request_can_be_used_to_get_a_token(self):
    """A ``TokenRequest`` returned by an auth callback must yield a token.

    The token request is created with the key name and secret; a key-less
    client whose ``auth_callback`` returns it must then authorize and get
    ``TokenDetails``.
    """
    token_request = self.ably.auth.create_token_request(
        key_name=self.key_name,
        key_secret=self.key_secret,
    )
    assert isinstance(token_request, TokenRequest)

    # The callback ignores its argument and hands back the prepared request.
    ably = RestSetup.get_ably_rest(
        key=None,
        auth_callback=lambda token_params: token_request,
        use_binary_protocol=self.use_binary_protocol,
    )

    token = ably.auth.authorize()
    assert isinstance(token, TokenDetails)
开发者ID:ably,项目名称:ably-python,代码行数:14,代码来源:resttoken_test.py
示例18: test_when_not_renewable
def test_when_not_renewable(self):
    """A client with only a literal token must not try to renew it.

    After a first publish (one recorded attempt), the second publish is
    expected to raise ``AblyAuthException`` and no token request may have
    been recorded.
    """
    self.ably = RestSetup.get_ably_rest(
        key=None,
        token='token ID cannot be used to create a new token',
        use_binary_protocol=False,
    )

    self.ably.channels[self.channel].publish('evt', 'msg')
    assert 1 == self.publish_attempts

    publish = self.ably.channels[self.channel].publish
    expected_message = (
        "The provided token is not renewable and there is no means to generate a new token"
    )
    with pytest.raises(AblyAuthException, match=expected_message):
        publish('evt', 'msg')

    assert 0 == self.token_requests
开发者ID:ably,项目名称:ably-python,代码行数:14,代码来源:restauth_test.py
示例19: setUpClass
def setUpClass(cls):
    """Register ten devices and one subscription per device.

    Subscriptions cycle over three ``canpublish:`` channels. Afterwards
    the total number of entries across ``cls.channels`` must equal the
    number of devices.
    """
    cls.ably = RestSetup.get_ably_rest()

    # Register several devices for later use
    cls.devices = {}
    for _ in range(10):
        cls.save_device()

    # Register several subscriptions for later use
    cls.channels = {
        'canpublish:test1': [],
        'canpublish:test2': [],
        'canpublish:test3': [],
    }
    channel_cycle = itertools.cycle(cls.channels)
    for (_, device), channel in zip(cls.devices.items(), channel_cycle):
        cls.save_subscription(channel, device_id=device.id)

    subscriptions = list(itertools.chain.from_iterable(cls.channels.values()))
    assert len(subscriptions) == len(cls.devices)
开发者ID:ably,项目名称:ably-python,代码行数:14,代码来源:restpush_test.py
示例20: test_client_id_precedence
def test_client_id_precedence(self):
    """The ``client_id`` option must win over ``default_token_params``.

    Both are given different random values; the token, the auth object
    and the first history item of a published message must all carry the
    ``client_id`` option's value.
    """
    client_id = uuid.uuid4().hex
    overridden_client_id = uuid.uuid4().hex

    ably = RestSetup.get_ably_rest(
        use_binary_protocol=self.use_binary_protocol,
        client_id=client_id,
        default_token_params={'client_id': overridden_client_id},
    )

    token = ably.auth.authorize()
    assert token.client_id == client_id
    assert ably.auth.client_id == client_id

    channel_name = self.get_channel_name('test_client_id_precedence')
    channel = ably.channels[channel_name]
    channel.publish('test', 'data')

    first_item = channel.history().items[0]
    assert first_item.client_id == client_id
开发者ID:ably,项目名称:ably-python,代码行数:15,代码来源:restauth_test.py
注:本文中的test.ably.restsetup.RestSetup类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论