• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python client.Query类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中txaws.ec2.client.Query的典型用法代码示例。如果您正苦于以下问题：Python Query类的具体用法？Python Query怎么用？Python Query使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Query类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_handle_unicode_api_error

    def test_handle_unicode_api_error(self):
        """
        If an L{APIError} contains a unicode message, L{QueryAPI} is able to
        protect itself from it.

        The request must still be finished with the error's status code, a
        response starting with the error code, and no logged errors.
        """
        creds = AWSCredentials("access", "secret")
        endpoint = AWSServiceEndpoint("http://uri")
        query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
        query.sign()
        request = FakeRequest(query.params, endpoint)

        def fail_execute(call):
            # Simulate an action failing with a non-ASCII error message.
            raise APIError(400, code="LangError",
                           message=u"\N{HIRAGANA LETTER A}dvanced")
        self.api.execute = fail_execute

        def check(ignored):
            errors = self.flushLoggedErrors()
            # Use assertEqual: the assertEquals alias is deprecated and the
            # rest of this method already uses the non-deprecated spelling.
            self.assertEqual(0, len(errors))
            self.assertTrue(request.finished)
            self.assertTrue(request.response.startswith("LangError"))
            self.assertEqual(400, request.code)

        self.api.principal = TestPrincipal(creds)
        return self.api.handle(request).addCallback(check)
开发者ID:lud4ik,项目名称:txAWS,代码行数:25,代码来源:test_resource.py


示例2: test_handle_pass_params_to_call

    def test_handle_pass_params_to_call(self):
        """
        The L{Call} that L{QueryAPI.handle} passes to C{execute} carries the
        raw parameters, principal, action, version and request id.
        """
        self.registry.add(TestMethod, "SomeAction", "1.2.3")
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(
            action="SomeAction", creds=credentials,
            endpoint=service_endpoint,
            other_params={"Foo": "bar", "Version": "1.2.3"})
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def fake_execute(call):
            # Only "Foo" is expected among the raw params; "Version" is
            # checked separately through call.version.
            self.assertEqual({"Foo": "bar"}, call.get_raw_params())
            self.assertIdentical(self.api.principal, call.principal)
            self.assertEqual("SomeAction", call.action)
            self.assertEqual("1.2.3", call.version)
            self.assertEqual(request.id, call.id)
            return "ok"

        def verify(_):
            # The value returned by execute becomes the response body.
            self.assertEqual("ok", request.response)
            self.assertEqual(200, request.code)

        self.api.execute = fake_execute
        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:28,代码来源:test_resource.py


示例3: test_handle_with_signature_version_1

    def test_handle_with_signature_version_1(self):
        """A request carrying C{SignatureVersion} 1 is handled normally."""
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(
            action="SomeAction", creds=credentials,
            endpoint=service_endpoint,
            other_params={"SignatureVersion": "1"})
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Success is the "data" payload served with a 200 status.
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:15,代码来源:test_resource.py


示例4: test_handle_with_unsupported_action

    def test_handle_with_unsupported_action(self):
        """
        An action that is not registered in the L{Registry} is rejected
        with an C{InvalidAction} error and HTTP status 400.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="FooBar", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("InvalidAction - The action FooBar is not valid"
                        " for this web service.")
            self.assertEqual(expected, request.response)
            self.assertEqual(400, request.code)

        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:15,代码来源:test_resource.py


示例5: test_handle_with_dump_result

    def test_handle_with_dump_result(self):
        """
        The action result is serialized by C{dump_result} before
        L{QueryAPI.handle} writes it to the response.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # With json.dumps installed as the serializer, the body must
            # decode back to the plain "data" result.
            decoded = json.loads(request.response)
            self.assertEqual("data", decoded)

        self.api.dump_result = json.dumps
        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py


示例6: test_handle_with_unsupported_version

    def test_handle_with_unsupported_version(self):
        """
        If the signature version of the request is not supported, an
        C{InvalidSignature} error is returned with HTTP status 403.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("InvalidSignature - SignatureVersion '2' "
                        "not supported")
            self.assertEqual(expected, request.response)
            self.assertEqual(403, request.code)

        # Restrict the API to version 1 only.
        self.api.signature_versions = (1,)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:16,代码来源:test_resource.py


示例7: test_handle_with_endpoint_with_terminating_slash

    def test_handle_with_endpoint_with_terminating_slash(self):
        """
        The signature check succeeds when the endpoint URI ends with a
        slash.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://endpoint/")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py


示例8: test_handle_with_deprecated_actions

    def test_handle_with_deprecated_actions(self):
        """
        The legacy C{actions} attribute is still honoured by
        L{QueryAPI.handle}.
        """
        self.api.actions = ["SomeAction"]
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:16,代码来源:test_resource.py


示例9: test_handle_with_post_method

    def test_handle_with_post_method(self):
        """
        Valid requests whose endpoint uses the HTTP POST method are
        forwarded by L{QueryAPI.handle} to L{QueryAPI.execute}.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri", method="POST")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py


示例10: test_handle_with_signature_sha1

    def test_handle_with_signature_sha1(self):
        """
        A request signed with the C{sha1} hash type is accepted.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        # Sign with sha1 instead of the default hash type.
        signed_query.sign(hash_type="sha1")
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py


示例11: test_handle_with_port_number

    def test_handle_with_port_number(self):
        """
        A request signed against an endpoint that includes a port number
        passes the signature check.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://endpoint:1234")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:17,代码来源:test_resource.py


示例12: test_handle_with_non_existing_user

    def test_handle_with_non_existing_user(self):
        """
        If no L{Principal} can be found with the given access key ID,
        L{QueryAPI.handle} responds with an C{AuthFailure} error and HTTP
        status 401.
        """
        creds = AWSCredentials("access", "secret")
        endpoint = AWSServiceEndpoint("http://uri")
        query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
        query.sign()
        request = FakeRequest(query.params, endpoint)

        def check(ignored):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            self.assertEqual("AuthFailure - No user with access key 'access'",
                             request.response)
            self.assertEqual(401, request.code)

        # No principal is assigned to self.api in this test.
        return self.api.handle(request).addCallback(check)
开发者ID:ArtRichards,项目名称:txaws,代码行数:18,代码来源:test_resource.py


示例13: test_handle_with_deprecated_actions_and_unsupported_action

    def test_handle_with_deprecated_actions_and_unsupported_action(self):
        """
        When the deprecated L{QueryAPI.actions} attribute is set, an action
        missing from it is rejected with C{InvalidAction} and status 400.
        """
        self.api.actions = ["SomeAction"]
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="FooBar", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("InvalidAction - The action FooBar is not valid"
                        " for this web service.")
            self.assertEqual(expected, request.response)
            self.assertEqual(400, request.code)

        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:19,代码来源:test_resource.py


示例14: test_handle_with_parameter_error

    def test_handle_with_parameter_error(self):
        """
        A request lacking the C{Action} parameter makes L{QueryAPI.handle}
        answer with a C{MissingParameter} error and HTTP status 400.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        # Drop the parameter after signing so the request arrives without it.
        signed_query.params.pop("Action")
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("MissingParameter - The request must contain "
                        "the parameter Action")
            self.assertEqual(expected, request.response)
            self.assertEqual(400, request.code)

        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:19,代码来源:test_resource.py


示例15: test_handle

    def test_handle(self):
        """
        A valid request is forwarded by L{QueryAPI.handle} to
        L{QueryAPI.execute}, and the request is finished with the result,
        the matching headers and a 200 status.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertTrue(request.finished)
            self.assertEqual("data", request.response)
            response_headers = request.headers
            # "4" is the length of the "data" body.
            self.assertEqual("4", response_headers["Content-Length"])
            self.assertEqual("text/plain", response_headers["Content-Type"])
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:19,代码来源:test_resource.py


示例16: test_handle_with_non_expired_signature

    def test_handle_with_non_expired_signature(self):
        """
        A request whose C{Expires} parameter lies after the current time is
        handled successfully.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(
            action="SomeAction", creds=credentials,
            endpoint=service_endpoint,
            other_params={"Expires": "2010-01-01T12:00:00Z"})
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            self.assertEqual("data", request.response)
            self.assertEqual(200, request.code)

        # Pin the API clock to a moment before the Expires timestamp.
        frozen_now = datetime(2009, 12, 31, tzinfo=tzutc())
        self.api.get_utc_time = lambda: frozen_now
        self.api.principal = TestPrincipal(credentials)
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:20,代码来源:test_resource.py


示例17: test_handle_with_custom_path

    def test_handle_with_custom_path(self):
        """
        When L{QueryAPI.path} is set, it is used instead of the HTTP
        request path when calculating the signature.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://endpoint/path/")
        signed_query = Query(action="SomeAction", creds=credentials,
                             endpoint=service_endpoint)
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)
        # Simulate a request rewrite, like apache would do: the path seen by
        # the request no longer matches the one that was signed.
        request.endpoint.path = "/"

        def verify(_):
            self.assertTrue(request.finished)
            self.assertEqual(200, request.code)

        self.api.principal = TestPrincipal(credentials)
        self.api.path = "/path/"
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:lud4ik,项目名称:txAWS,代码行数:20,代码来源:test_resource.py


示例18: test_handle_with_timestamp_and_expires

    def test_handle_with_timestamp_and_expires(self):
        """
        Supplying both C{Timestamp} and C{Expires} in one request yields an
        C{InvalidParameterCombination} error with HTTP status 400.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(
            action="SomeAction", creds=credentials,
            endpoint=service_endpoint,
            other_params={"Timestamp": "2010-01-01T12:00:00Z",
                          "Expires": "2010-01-01T12:00:00Z"})
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("InvalidParameterCombination - The parameter Timestamp"
                        " cannot be used with the parameter Expires")
            self.assertEqual(expected, request.response)
            self.assertEqual(400, request.code)

        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:22,代码来源:test_resource.py


示例19: test_handle_with_internal_error

    def test_handle_with_internal_error(self):
        """
        If an unknown error occurs while handling the request,
        L{QueryAPI.handle} responds with HTTP status 500.

        Exactly one error must be logged and the response body must be the
        generic "Server error" text.
        """
        creds = AWSCredentials("access", "secret")
        endpoint = AWSServiceEndpoint("http://uri")
        query = Query(action="SomeAction", creds=creds, endpoint=endpoint)
        query.sign()
        request = FakeRequest(query.params, endpoint)

        # Make the action blow up with an unexpected ZeroDivisionError.
        self.api.execute = lambda call: 1 / 0

        def check(ignored):
            errors = self.flushLoggedErrors()
            # Use assertEqual: the assertEquals alias is deprecated and the
            # rest of this method already uses the non-deprecated spelling.
            self.assertEqual(1, len(errors))
            self.assertTrue(request.finished)
            self.assertEqual("Server error", request.response)
            self.assertEqual(500, request.code)

        self.api.principal = TestPrincipal(creds)
        return self.api.handle(request).addCallback(check)
开发者ID:antisvin,项目名称:txAWS,代码行数:22,代码来源:test_resource.py


示例20: test_handle_with_expired_signature

    def test_handle_with_expired_signature(self):
        """
        A request whose C{Expires} parameter lies before the current time
        is rejected with a C{RequestExpired} error and HTTP status 400.
        """
        credentials = AWSCredentials("access", "secret")
        service_endpoint = AWSServiceEndpoint("http://uri")
        signed_query = Query(
            action="SomeAction", creds=credentials,
            endpoint=service_endpoint,
            other_params={"Expires": "2010-01-01T12:00:00Z"})
        signed_query.sign()
        request = FakeRequest(signed_query.params, service_endpoint)

        def verify(_):
            # Discard whatever was logged so it does not fail the test.
            self.flushLoggedErrors()
            expected = ("RequestExpired - Request has expired. Expires date is"
                        " 2010-01-01T12:00:00Z")
            self.assertEqual(expected, request.response)
            self.assertEqual(400, request.code)

        # Pin the API clock to one second past the Expires timestamp.
        frozen_now = datetime(2010, 1, 1, 12, 0, 1, tzinfo=UTC)
        self.api.get_utc_time = lambda: frozen_now
        deferred = self.api.handle(request)
        return deferred.addCallback(verify)
开发者ID:ArtRichards,项目名称:txaws,代码行数:22,代码来源:test_resource.py



注：本文中的txaws.ec2.client.Query类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python schema.Schema类代码示例发布时间:2022-05-27
下一篇:
Python base.BaseQuery类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap