• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python crypto_helpers.fetch_crypto_keys函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.unit.common.middleware.crypto.crypto_helpers.fetch_crypto_keys函数的典型用法代码示例。如果您正苦于以下问题:Python fetch_crypto_keys函数的具体用法?Python fetch_crypto_keys怎么用?Python fetch_crypto_keys使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了fetch_crypto_keys函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_get_multiple_keys

 def test_get_multiple_keys(self):
     env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
     mutliple_keys = self.crypto_context.get_multiple_keys(env)
     self.assertEqual(
         [fetch_crypto_keys(),
          fetch_crypto_keys(key_id={'secret_id': 'myid'})],
         mutliple_keys)
开发者ID:mahak,项目名称:swift,代码行数:7,代码来源:test_crypto_utils.py


示例2: _test_PUT_with_etag_override_in_headers

    def _test_PUT_with_etag_override_in_headers(self, override_etag):
        # verify handling of another middleware's
        # container-update-override-etag in headers
        plaintext = b'FAKE APP'
        plaintext_etag = md5hex(plaintext)

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'Etag': plaintext_etag,
                'X-Object-Sysmeta-Container-Update-Override-Etag':
                    override_etag}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
        req_hdrs = self.app.headers[0]

        # verify encrypted etag for container update
        self.assertIn(
            'X-Object-Sysmeta-Container-Update-Override-Etag', req_hdrs)
        parts = req_hdrs[
            'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1)
        self.assertEqual(2, len(parts))
        cont_key = fetch_crypto_keys()['container']

        # extract crypto_meta from end of etag for container update
        param = parts[1].strip()
        crypto_meta_tag = 'swift_meta='
        self.assertTrue(param.startswith(crypto_meta_tag), param)
        actual_meta = json.loads(
            urlparse.unquote_plus(param[len(crypto_meta_tag):]))
        self.assertEqual(Crypto().cipher, actual_meta['cipher'])
        self.assertEqual(fetch_crypto_keys()['id'], actual_meta['key_id'])

        cont_etag_iv = base64.b64decode(actual_meta['iv'])
        self.assertEqual(FAKE_IV, cont_etag_iv)
        exp_etag = encrypt(override_etag.encode('ascii'),
                           cont_key, cont_etag_iv)
        self.assertEqual(exp_etag, base64.b64decode(parts[0]))
开发者ID:mahak,项目名称:swift,代码行数:48,代码来源:test_encrypter.py


示例3: test_encrypt_header_val

    def test_encrypt_header_val(self):
        # Prepare key and Crypto instance
        object_key = fetch_crypto_keys()['object']

        # - Normal string can be crypted
        encrypted = encrypter.encrypt_header_val(Crypto(), 'aaa', object_key)
        # sanity: return value is 2 item tuple
        self.assertEqual(2, len(encrypted))
        crypted_val, crypt_info = encrypted
        expected_crypt_val = base64.b64encode(
            encrypt('aaa', object_key, FAKE_IV))
        expected_crypt_info = {
            'cipher': 'AES_CTR_256', 'iv': 'This is an IV123'}
        self.assertEqual(expected_crypt_val, crypted_val)
        self.assertEqual(expected_crypt_info, crypt_info)

        # - Empty string raises a ValueError for safety
        with self.assertRaises(ValueError) as cm:
            encrypter.encrypt_header_val(Crypto(), '', object_key)

        self.assertEqual('empty value is not acceptable',
                         cm.exception.message)

        # - None also raises a ValueError for safety
        with self.assertRaises(ValueError) as cm:
            encrypter.encrypt_header_val(Crypto(), None, object_key)

        self.assertEqual('empty value is not acceptable',
                         cm.exception.message)
开发者ID:jgmerritt,项目名称:swift,代码行数:29,代码来源:test_encrypter.py


示例4: test_headers_case

    def test_headers_case(self):
        body = 'fAkE ApP'
        req = Request.blank('/v1/a/c/o', body='FaKe')
        req.environ[CRYPTO_KEY_CALLBACK] = fetch_crypto_keys
        plaintext_etag = md5hex(body)
        body_key = os.urandom(32)
        enc_body = encrypt(body, body_key, FAKE_IV)
        hdrs = self._make_response_headers(
            len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)

        hdrs.update({
            'x-Object-mEta-ignoRes-caSe': 'thIs pArt WilL bE cOol',
        })
        self.app.register(
            'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)

        status, headers, app_iter = req.call_application(self.decrypter)
        self.assertEqual(status, '200 OK')
        expected = {
            'Etag': '7f7837924188f7b511a9e3881a9f77a8',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
            'encrypt me, too',
            'X-Object-Meta-Test': 'encrypt me',
            'Content-Length': '8',
            'X-Object-Meta-Ignores-Case': 'thIs pArt WilL bE cOol',
            'X-Object-Sysmeta-Test': 'do not encrypt me',
            'Content-Type': 'text/plain',
        }
        self.assertEqual(dict(headers), expected)
        self.assertEqual('fAkE ApP', ''.join(app_iter))
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:30,代码来源:test_decrypter.py


示例5: _test_request_success

    def _test_request_success(self, method, body):
        env = {'REQUEST_METHOD': method,
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank('/v1/a/c/o', environ=env)
        plaintext_etag = md5hex(body)
        body_key = os.urandom(32)
        enc_body = encrypt(body, body_key, FAKE_IV)
        hdrs = self._make_response_headers(
            len(enc_body), plaintext_etag, fetch_crypto_keys(), body_key)

        # there shouldn't be any x-object-meta- headers, but if there are
        # then the decrypted header will win where there is a name clash...
        hdrs.update({
            'x-object-meta-test': 'unexpected, overwritten by decrypted value',
            'x-object-meta-distinct': 'unexpected but distinct from encrypted'
        })
        self.app.register(
            method, '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
        resp = req.get_response(self.decrypter)
        self.assertEqual('200 OK', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])
        self.assertEqual('text/plain', resp.headers['Content-Type'])
        self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
        self.assertEqual('unexpected but distinct from encrypted',
                         resp.headers['x-object-meta-distinct'])
        self.assertEqual('do not encrypt me',
                         resp.headers['x-object-sysmeta-test'])
        self.assertEqual(
            'encrypt me, too',
            resp.headers['X-Object-Sysmeta-Container-Update-Override-Etag'])
        self.assertNotIn('X-Object-Sysmeta-Crypto-Body-Meta', resp.headers)
        self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', resp.headers)
        return resp
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:33,代码来源:test_decrypter.py


示例6: test_GET_missing_key_callback

 def test_GET_missing_key_callback(self):
     # Do not provide keys, and do not set override flag
     env = {'REQUEST_METHOD': 'GET'}
     req = Request.blank('/v1/a/c/o', environ=env)
     body = 'FAKE APP'
     enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
     hdrs = self._make_response_headers(
         len(body), md5hex('not the body'), fetch_crypto_keys(), 'not used')
     self.app.register(
         'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
     resp = req.get_response(self.decrypter)
     self.assertEqual('500 Internal Error', resp.status)
     self.assertEqual('Unable to retrieve encryption keys.',
                      resp.body)
     self.assertIn('missing callback',
                   self.decrypter.logger.get_lines_for_level('error')[0])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:16,代码来源:test_decrypter.py


示例7: test_GET_multipart_content_type

    def test_GET_multipart_content_type(self):
        # *just* having multipart content type shouldn't trigger the mime doc
        # code path
        body_key = os.urandom(32)
        plaintext = 'Cwm fjord veg balks nth pyx quiz'
        plaintext_etag = md5hex(plaintext)
        ciphertext = encrypt(plaintext, body_key, FAKE_IV)

        # register request with fake swift
        hdrs = self._make_response_headers(
            len(ciphertext), plaintext_etag, fetch_crypto_keys(), body_key)
        hdrs['content-type'] = \
            'multipart/byteranges;boundary=multipartboundary'
        self.app.register('GET', '/v1/a/c/o', HTTPOk, body=ciphertext,
                          headers=hdrs)

        # issue request
        env = {'REQUEST_METHOD': 'GET',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank('/v1/a/c/o', environ=env)
        resp = req.get_response(self.decrypter)

        self.assertEqual('200 OK', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])
        self.assertEqual(len(plaintext), int(resp.headers['Content-Length']))
        self.assertEqual('multipart/byteranges;boundary=multipartboundary',
                         resp.headers['Content-Type'])
        self.assertEqual(plaintext, resp.body)
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:28,代码来源:test_decrypter.py


示例8: test_GET_unencrypted_data

 def test_GET_unencrypted_data(self):
     # testing case of an unencrypted object with encrypted metadata from
     # a later POST
     env = {'REQUEST_METHOD': 'GET',
            CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
     req = Request.blank('/v1/a/c/o', environ=env)
     body = 'FAKE APP'
     obj_key = fetch_crypto_keys()['object']
     hdrs = {'Etag': md5hex(body),
             'content-type': 'text/plain',
             'content-length': len(body),
             'x-object-transient-sysmeta-crypto-meta-test':
                 base64.b64encode(encrypt('encrypt me', obj_key, FAKE_IV)) +
                 ';swift_meta=' + get_crypto_meta_header(),
             'x-object-sysmeta-test': 'do not encrypt me'}
     self.app.register('GET', '/v1/a/c/o', HTTPOk, body=body, headers=hdrs)
     resp = req.get_response(self.decrypter)
     self.assertEqual(body, resp.body)
     self.assertEqual('200 OK', resp.status)
     self.assertEqual(md5hex(body), resp.headers['Etag'])
     self.assertEqual('text/plain', resp.headers['Content-Type'])
     # POSTed user meta was encrypted
     self.assertEqual('encrypt me', resp.headers['x-object-meta-test'])
     # PUT sysmeta was not encrypted
     self.assertEqual('do not encrypt me',
                      resp.headers['x-object-sysmeta-test'])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:26,代码来源:test_decrypter.py


示例9: do_test

        def do_test(method, plain_etags, expected_plain_etags=None):
            env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
            match_header_value = ", ".join(plain_etags)
            req = Request.blank(
                "/v1/a/c/o", environ=env, method=method, headers={match_header_name: match_header_value}
            )
            app = FakeSwift()
            app.register(method, "/v1/a/c/o", HTTPOk, {})
            resp = req.get_response(encrypter.Encrypter(app, {}))
            self.assertEqual("200 OK", resp.status)

            self.assertEqual(1, len(app.calls), app.calls)
            self.assertEqual(method, app.calls[0][0])
            actual_headers = app.headers[0]

            # verify the alternate etag location has been specified
            if match_header_value and match_header_value != "*":
                self.assertIn("X-Backend-Etag-Is-At", actual_headers)
                self.assertEqual("X-Object-Sysmeta-Crypto-Etag-Mac", actual_headers["X-Backend-Etag-Is-At"])

            # verify etags have been supplemented with masked values
            self.assertIn(match_header_name, actual_headers)
            actual_etags = set(actual_headers[match_header_name].split(", "))
            key = fetch_crypto_keys()["object"]
            masked_etags = [
                '"%s"' % base64.b64encode(hmac.new(key, etag.strip('"'), hashlib.sha256).digest())
                for etag in plain_etags
                if etag not in ("*", "")
            ]
            expected_etags = set((expected_plain_etags or plain_etags) + masked_etags)
            self.assertEqual(expected_etags, actual_etags)
            # check that the request environ was returned to original state
            self.assertEqual(set(plain_etags), set(req.headers[match_header_name].split(", ")))
开发者ID:notmyname,项目名称:swift,代码行数:33,代码来源:test_encrypter.py


示例10: test_POST_req

    def test_POST_req(self):
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'POST',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'x-object-meta-test': 'encrypt me',
                'x-object-meta-test2': '',
                'x-object-sysmeta-test': 'do not encrypt me'}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        key = fetch_crypto_keys()['object']
        self.app.register('POST', '/v1/a/c/o', HTTPAccepted, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('202 Accepted', resp.status)
        self.assertNotIn('Etag', resp.headers)

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('POST', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # user meta is encrypted
        self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', key)
        # unless it had no value
        self.assertEqual('', req_hdrs['X-Object-Meta-Test2'])

        # sysmeta is not encrypted
        self.assertEqual('do not encrypt me',
                         req_hdrs['X-Object-Sysmeta-Test'])
开发者ID:jgmerritt,项目名称:swift,代码行数:27,代码来源:test_encrypter.py


示例11: _test_etag_is_at_not_duplicated

    def _test_etag_is_at_not_duplicated(self, method):
        # verify only one occurrence of X-Object-Sysmeta-Crypto-Etag-Mac in
        # X-Backend-Etag-Is-At
        key = fetch_crypto_keys()['object']
        env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank(
            '/v1/a/c/o', environ=env, method=method,
            headers={'If-Match': '"an etag"',
                     'If-None-Match': '"another etag"'})
        self.app.register(method, '/v1/a/c/o', HTTPOk, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('200 OK', resp.status)

        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(method, self.app.calls[0][0])
        actual_headers = self.app.headers[0]
        self.assertIn('X-Backend-Etag-Is-At', actual_headers)
        self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                         actual_headers['X-Backend-Etag-Is-At'])

        self.assertIn('"%s"' % base64.b64encode(
            hmac.new(key, 'an etag', hashlib.sha256).digest()),
            actual_headers['If-Match'])
        self.assertIn('"another etag"', actual_headers['If-None-Match'])
        self.assertIn('"%s"' % base64.b64encode(
            hmac.new(key, 'another etag', hashlib.sha256).digest()),
            actual_headers['If-None-Match'])
开发者ID:jgmerritt,项目名称:swift,代码行数:27,代码来源:test_encrypter.py


示例12: test_GET_container_json

    def test_GET_container_json(self):
        content_type_1 = u'\uF10F\uD20D\uB30B\u9409'
        content_type_2 = 'text/plain; param=foo'
        pt_etag1 = 'c6e8196d7f0fff6444b90861fe8d609d'
        pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10'
        key = fetch_crypto_keys()['container']

        obj_dict_1 = {"bytes": 16,
                      "last_modified": "2015-04-14T23:33:06.439040",
                      "hash": encrypt_and_append_meta(
                          pt_etag1.encode('utf-8'), key),
                      "name": "testfile",
                      "content_type": content_type_1}

        obj_dict_2 = {"bytes": 24,
                      "last_modified": "2015-04-14T23:33:06.519020",
                      "hash": encrypt_and_append_meta(
                          pt_etag2.encode('utf-8'), key),
                      "name": "testfile2",
                      "content_type": content_type_2}

        listing = [obj_dict_1, obj_dict_2]
        fake_body = json.dumps(listing)

        resp = self._make_cont_get_req(fake_body, 'json')

        self.assertEqual('200 OK', resp.status)
        body = resp.body
        self.assertEqual(len(body), int(resp.headers['Content-Length']))
        body_json = json.loads(body)
        self.assertEqual(2, len(body_json))
        obj_dict_1['hash'] = pt_etag1
        self.assertDictEqual(obj_dict_1, body_json[0])
        obj_dict_2['hash'] = pt_etag2
        self.assertDictEqual(obj_dict_2, body_json[1])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:35,代码来源:test_decrypter.py


示例13: test_get_keys_with_crypto_meta

    def test_get_keys_with_crypto_meta(self):
        # verify that key_id from crypto_meta is passed to fetch_crypto_keys
        keys = fetch_crypto_keys()
        mock_fetch_crypto_keys = mock.MagicMock(return_value=keys)
        env = {CRYPTO_KEY_CALLBACK: mock_fetch_crypto_keys}
        key_id = {'secret_id': '123'}
        keys = self.crypto_context.get_keys(env, key_id=key_id)
        self.assertDictEqual(fetch_crypto_keys(), keys)
        mock_fetch_crypto_keys.assert_called_with(key_id={'secret_id': '123'})

        # but it's ok for there to be no crypto_meta
        keys = self.crypto_context.get_keys(env, key_id={})
        self.assertDictEqual(fetch_crypto_keys(), keys)
        mock_fetch_crypto_keys.assert_called_with(key_id={})
        keys = self.crypto_context.get_keys(env)
        self.assertDictEqual(fetch_crypto_keys(), keys)
        mock_fetch_crypto_keys.assert_called_with(key_id=None)
开发者ID:mahak,项目名称:swift,代码行数:17,代码来源:test_crypto_utils.py


示例14: _test_bad_key

    def _test_bad_key(self, method):
        # use bad key
        def bad_fetch_crypto_keys():
            keys = fetch_crypto_keys()
            keys['object'] = 'bad key'
            return keys

        env = {'REQUEST_METHOD': method,
               CRYPTO_KEY_CALLBACK: bad_fetch_crypto_keys}
        req = Request.blank('/v1/a/c/o', environ=env)
        body = 'FAKE APP'
        key = fetch_crypto_keys()['object']
        enc_body = encrypt(body, key, FAKE_IV)
        hdrs = self._make_response_headers(
            len(body), md5hex(body), fetch_crypto_keys(), 'not used')
        self.app.register(method, '/v1/a/c/o', HTTPOk, body=enc_body,
                          headers=hdrs)
        return req.get_response(self.decrypter)
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:18,代码来源:test_decrypter.py


示例15: do_test

        def do_test(method, plain_etags, expected_plain_etags=None):
            env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
            match_header_value = ', '.join(plain_etags)
            req = Request.blank(
                '/v1/a/c/o', environ=env, method=method,
                headers={match_header_name: match_header_value})
            app = FakeSwift()
            app.register(method, '/v1/a/c/o', HTTPOk, {})
            resp = req.get_response(encrypter.Encrypter(app, {}))
            self.assertEqual('200 OK', resp.status)

            self.assertEqual(1, len(app.calls), app.calls)
            self.assertEqual(method, app.calls[0][0])
            actual_headers = app.headers[0]

            # verify the alternate etag location has been specified
            if match_header_value and match_header_value != '*':
                self.assertIn('X-Backend-Etag-Is-At', actual_headers)
                self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                                 actual_headers['X-Backend-Etag-Is-At'])

            # verify etags have been supplemented with masked values
            self.assertIn(match_header_name, actual_headers)
            actual_etags = set(actual_headers[match_header_name].split(', '))
            # masked values for secret_id None
            key = fetch_crypto_keys()['object']
            masked_etags = [
                '"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
                    key, wsgi_to_bytes(etag.strip('"')),
                    hashlib.sha256).digest()))
                for etag in plain_etags if etag not in ('*', '')]
            # masked values for secret_id myid
            key = fetch_crypto_keys(key_id={'secret_id': 'myid'})['object']
            masked_etags_myid = [
                '"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
                    key, wsgi_to_bytes(etag.strip('"')),
                    hashlib.sha256).digest()))
                for etag in plain_etags if etag not in ('*', '')]
            expected_etags = set((expected_plain_etags or plain_etags) +
                                 masked_etags + masked_etags_myid)
            self.assertEqual(expected_etags, actual_etags)
            # check that the request environ was returned to original state
            self.assertEqual(set(plain_etags),
                             set(req.headers[match_header_name].split(', ')))
开发者ID:mahak,项目名称:swift,代码行数:44,代码来源:test_encrypter.py


示例16: _test_GET_with_bad_crypto_meta_for_object_body

 def _test_GET_with_bad_crypto_meta_for_object_body(self, bad_crypto_meta):
     # use bad iv for object body
     env = {'REQUEST_METHOD': 'GET',
            CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
     req = Request.blank('/v1/a/c/o', environ=env)
     body = 'FAKE APP'
     key = fetch_crypto_keys()['object']
     enc_body = encrypt(body, key, FAKE_IV)
     hdrs = self._make_response_headers(
         len(body), md5hex(body), fetch_crypto_keys(), 'not used')
     hdrs['X-Object-Sysmeta-Crypto-Body-Meta'] = \
         get_crypto_meta_header(crypto_meta=bad_crypto_meta)
     self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
                       headers=hdrs)
     resp = req.get_response(self.decrypter)
     self.assertEqual('500 Internal Error', resp.status)
     self.assertEqual('Error decrypting object', resp.body)
     self.assertIn('Error decrypting object',
                   self.decrypter.logger.get_lines_for_level('error')[0])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:19,代码来源:test_decrypter.py


示例17: test_GET_missing_etag_crypto_meta

 def test_GET_missing_etag_crypto_meta(self):
     env = {'REQUEST_METHOD': 'GET',
            CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
     req = Request.blank('/v1/a/c/o', environ=env)
     body = 'FAKE APP'
     key = fetch_crypto_keys()['object']
     enc_body = encrypt(body, key, FAKE_IV)
     hdrs = self._make_response_headers(
         len(body), md5hex(body), fetch_crypto_keys(), 'not used')
     # simulate missing crypto meta from encrypted etag
     hdrs['X-Object-Sysmeta-Crypto-Etag'] = \
         base64.b64encode(encrypt(md5hex(body), key, FAKE_IV))
     self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
                       headers=hdrs)
     resp = req.get_response(self.decrypter)
     self.assertEqual('500 Internal Error', resp.status)
     self.assertIn('Error decrypting header', resp.body)
     self.assertIn('Error decrypting header X-Object-Sysmeta-Crypto-Etag',
                   self.decrypter.logger.get_lines_for_level('error')[0])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:19,代码来源:test_decrypter.py


示例18: test_GET_error_in_key_callback

    def test_GET_error_in_key_callback(self):
        def raise_exc():
            raise Exception('Testing')

        env = {'REQUEST_METHOD': 'GET',
               CRYPTO_KEY_CALLBACK: raise_exc}
        req = Request.blank('/v1/a/c/o', environ=env)
        body = 'FAKE APP'
        enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
        hdrs = self._make_response_headers(
            len(body), md5hex(body), fetch_crypto_keys(), 'not used')
        self.app.register(
            'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
        resp = req.get_response(self.decrypter)
        self.assertEqual('500 Internal Error', resp.status)
        self.assertEqual('Unable to retrieve encryption keys.',
                         resp.body)
        self.assertIn('from callback: Testing',
                      self.decrypter.logger.get_lines_for_level('error')[0])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:19,代码来源:test_decrypter.py


示例19: test_get_keys_missing_key_for_default_required_list

 def test_get_keys_missing_key_for_default_required_list(self):
     bad_keys = dict(fetch_crypto_keys())
     bad_keys.pop('object')
     with self.assertRaises(HTTPException) as cm:
         self.crypto_context.get_keys(
             {CRYPTO_KEY_CALLBACK: lambda: bad_keys})
     self.assertIn('500 Internal Error', cm.exception.message)
     self.assertIn("Missing key for 'object'",
                   self.fake_logger.get_lines_for_level('error')[0])
     self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:10,代码来源:test_crypto_utils.py


示例20: test_bad_object_key_for_default_required_list

 def test_bad_object_key_for_default_required_list(self):
     bad_keys = dict(fetch_crypto_keys())
     bad_keys['object'] = 'the minor key'
     with self.assertRaises(HTTPException) as cm:
         self.crypto_context.get_keys(
             {CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys})
     self.assertIn('500 Internal Error', cm.exception.message)
     self.assertIn("Bad key for 'object'",
                   self.fake_logger.get_lines_for_level('error')[0])
     self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
开发者ID:jgmerritt,项目名称:swift,代码行数:10,代码来源:test_crypto_utils.py



注:本文中的test.unit.common.middleware.crypto.crypto_helpers.fetch_crypto_keys函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python helpers.FakeSwift类代码示例发布时间:2022-05-27
下一篇:
Python unit.FakeLogger类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap