本文整理汇总了Python中txaws.ec2.client.Query类的典型用法代码示例。如果您正苦于以下问题:Python Query类的具体用法?Python Query怎么用?Python Query使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Query类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_handle_unicode_api_error
def test_handle_unicode_api_error(self):
"""
If an L{APIError} contains a unicode message, L{QueryAPI} is able to
protect itself from it.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def fail_execute(call):
raise APIError(400, code="LangError",
message=u"\N{HIRAGANA LETTER A}dvanced")
self.api.execute = fail_execute
def check(ignored):
errors = self.flushLoggedErrors()
self.assertEquals(0, len(errors))
self.assertTrue(request.finished)
self.assertTrue(request.response.startswith("LangError"))
self.assertEqual(400, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:25,代码来源:test_resource.py
示例2: test_handle_pass_params_to_call
def test_handle_pass_params_to_call(self):
"""
L{QueryAPI.handle} creates a L{Call} object with the correct
parameters.
"""
self.registry.add(TestMethod, "SomeAction", "1.2.3")
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint,
other_params={"Foo": "bar", "Version": "1.2.3"})
query.sign()
request = FakeRequest(query.params, endpoint)
def execute(call):
self.assertEqual({"Foo": "bar"}, call.get_raw_params())
self.assertIdentical(self.api.principal, call.principal)
self.assertEqual("SomeAction", call.action)
self.assertEqual("1.2.3", call.version)
self.assertEqual(request.id, call.id)
return "ok"
def check(ignored):
self.assertEqual("ok", request.response)
self.assertEqual(200, request.code)
self.api.execute = execute
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:28,代码来源:test_resource.py
示例3: test_handle_with_signature_version_1
def test_handle_with_signature_version_1(self):
"""SignatureVersion 1 is supported as well."""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint,
other_params={"SignatureVersion": "1"})
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignore):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:15,代码来源:test_resource.py
示例4: test_handle_with_unsupported_action
def test_handle_with_unsupported_action(self):
"""Only actions registered in the L{Registry} are supported."""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="FooBar", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual("InvalidAction - The action FooBar is not valid"
" for this web service.", request.response)
self.assertEqual(400, request.code)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:15,代码来源:test_resource.py
示例5: test_handle_with_dump_result
def test_handle_with_dump_result(self):
"""
L{QueryAPI.handle} serializes the action result with C{dump_result}.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", json.loads(request.response))
self.api.dump_result = json.dumps
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py
示例6: test_handle_with_unsupported_version
def test_handle_with_unsupported_version(self):
"""If signature versions is not supported an error is raised."""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual("InvalidSignature - SignatureVersion '2' "
"not supported", request.response)
self.assertEqual(403, request.code)
self.api.signature_versions = (1,)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:16,代码来源:test_resource.py
示例7: test_handle_with_endpoint_with_terminating_slash
def test_handle_with_endpoint_with_terminating_slash(self):
"""
Check signature should handle a URI with a terminating slash.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://endpoint/")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py
示例8: test_handle_with_deprecated_actions
def test_handle_with_deprecated_actions(self):
"""
L{QueryAPI.handle} supports the legacy 'actions' attribute.
"""
self.api.actions = ["SomeAction"]
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", request.response)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py
示例9: test_handle_with_post_method
def test_handle_with_post_method(self):
"""
L{QueryAPI.handle} forwards valid requests using the HTTP POST method
to L{QueryAPI.execute}.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri", method="POST")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py
示例10: test_handle_with_signature_sha1
def test_handle_with_signature_sha1(self):
"""
The C{HmacSHA1} signature method is supported, in which case the
signing using sha1 instead.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign(hash_type="sha1")
request = FakeRequest(query.params, endpoint)
def check(ignore):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py
示例11: test_handle_with_port_number
def test_handle_with_port_number(self):
"""
If the request Host header includes a port number, it's included
in the text that get signed when checking the signature.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://endpoint:1234")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py
示例12: test_handle_with_non_existing_user
def test_handle_with_non_existing_user(self):
"""
If no L{Principal} can be found with the given access key ID,
L{QueryAPI.handle} responds with HTTP status 400.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual("AuthFailure - No user with access key 'access'",
request.response)
self.assertEqual(401, request.code)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:18,代码来源:test_resource.py
示例13: test_handle_with_deprecated_actions_and_unsupported_action
def test_handle_with_deprecated_actions_and_unsupported_action(self):
"""
If the deprecated L{QueryAPI.actions} attribute is set, it will be
used for looking up supported actions.
"""
self.api.actions = ["SomeAction"]
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="FooBar", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual("InvalidAction - The action FooBar is not valid"
" for this web service.", request.response)
self.assertEqual(400, request.code)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:19,代码来源:test_resource.py
示例14: test_handle_with_parameter_error
def test_handle_with_parameter_error(self):
"""
If an error occurs while parsing the parameters, L{QueryAPI.handle}
responds with HTTP status 400.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
query.params.pop("Action")
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual("MissingParameter - The request must contain "
"the parameter Action", request.response)
self.assertEqual(400, request.code)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:19,代码来源:test_resource.py
示例15: test_handle
def test_handle(self):
"""
L{QueryAPI.handle} forwards valid requests to L{QueryAPI.execute}.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertTrue(request.finished)
self.assertEqual("data", request.response)
self.assertEqual("4", request.headers["Content-Length"])
self.assertEqual("text/plain", request.headers["Content-Type"])
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:19,代码来源:test_resource.py
示例16: test_handle_with_non_expired_signature
def test_handle_with_non_expired_signature(self):
"""
If the request contains an Expires parameter with a time that is after
the current time, everything is fine.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint,
other_params={"Expires": "2010-01-01T12:00:00Z"})
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.assertEqual("data", request.response)
self.assertEqual(200, request.code)
now = datetime(2009, 12, 31, tzinfo=tzutc())
self.api.get_utc_time = lambda: now
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:20,代码来源:test_resource.py
示例17: test_handle_with_custom_path
def test_handle_with_custom_path(self):
"""
If L{QueryAPI.path} is not C{None} it will be used in place of
the HTTP request path when calculating the signature.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://endpoint/path/")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
# Simulate a request rewrite, like apache would do
request.endpoint.path = "/"
def check(ignored):
self.assertTrue(request.finished)
self.assertEqual(200, request.code)
self.api.principal = TestPrincipal(creds)
self.api.path = "/path/"
return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:20,代码来源:test_resource.py
示例18: test_handle_with_timestamp_and_expires
def test_handle_with_timestamp_and_expires(self):
"""
If the request contains both Expires and Timestamp parameters,
an error is returned.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint,
other_params={"Timestamp": "2010-01-01T12:00:00Z",
"Expires": "2010-01-01T12:00:00Z"})
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual(
"InvalidParameterCombination - The parameter Timestamp"
" cannot be used with the parameter Expires",
request.response)
self.assertEqual(400, request.code)
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:22,代码来源:test_resource.py
示例19: test_handle_with_internal_error
def test_handle_with_internal_error(self):
"""
If an unknown error occurs while handling the request,
L{QueryAPI.handle} responds with HTTP status 500.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
query.sign()
request = FakeRequest(query.params, endpoint)
self.api.execute = lambda call: 1 / 0
def check(ignored):
errors = self.flushLoggedErrors()
self.assertEquals(1, len(errors))
self.assertTrue(request.finished)
self.assertEqual("Server error", request.response)
self.assertEqual(500, request.code)
self.api.principal = TestPrincipal(creds)
return self.api.handle(request).addCallback(check)
开发者ID:antisvin,项目名称:txAWS,代码行数:22,代码来源:test_resource.py
示例20: test_handle_with_expired_signature
def test_handle_with_expired_signature(self):
"""
If the request contains an Expires parameter with a time that is before
the current time, an error is returned.
"""
creds = AWSCredentials("access", "secret")
endpoint = AWSServiceEndpoint("http://uri")
query = Query(action="SomeAction", creds=creds, endpoint=endpoint,
other_params={"Expires": "2010-01-01T12:00:00Z"})
query.sign()
request = FakeRequest(query.params, endpoint)
def check(ignored):
self.flushLoggedErrors()
self.assertEqual(
"RequestExpired - Request has expired. Expires date is"
" 2010-01-01T12:00:00Z", request.response)
self.assertEqual(400, request.code)
now = datetime(2010, 1, 1, 12, 0, 1, tzinfo=UTC)
self.api.get_utc_time = lambda: now
return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:22,代码来源:test_resource.py
注:本文中的txaws.ec2.client.Query类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论