• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python unpaddedbase64.encode_base64函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中unpaddedbase64.encode_base64函数的典型用法代码示例。如果您正苦于以下问题:Python encode_base64函数的具体用法?Python encode_base64怎么用?Python encode_base64使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了encode_base64函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: response_json_object

    def response_json_object(self):
        verify_keys = {}
        for key in self.config.signing_key:
            verify_key_bytes = key.verify_key.encode()
            key_id = "%s:%s" % (key.alg, key.version)
            verify_keys[key_id] = {
                u"key": encode_base64(verify_key_bytes)
            }

        old_verify_keys = {}
        for key_id, key in self.config.old_signing_keys.items():
            verify_key_bytes = key.encode()
            old_verify_keys[key_id] = {
                u"key": encode_base64(verify_key_bytes),
                u"expired_ts": key.expired_ts,
            }

        tls_fingerprints = self.config.tls_fingerprints

        json_object = {
            u"valid_until_ts": self.valid_until_ts,
            u"server_name": self.config.server_name,
            u"verify_keys": verify_keys,
            u"old_verify_keys": old_verify_keys,
            u"tls_fingerprints": tls_fingerprints,
        }
        for key in self.config.signing_key:
            json_object = sign_json(
                json_object,
                self.config.server_name,
                key,
            )
        return json_object
开发者ID:rubo77,项目名称:synapse,代码行数:33,代码来源:local_key_resource.py


示例2: sign_json

def sign_json(json_object, signature_name, signing_key):
    """Sign the JSON object. Stores the signature in json_object["signatures"].

    Args:
        json_object (dict): The JSON object to sign.
        signature_name (str): The name of the signing entity.
        signing_key (syutil.crypto.SigningKey): The key to sign the JSON with.

    Returns:
        The modified, signed JSON object."""

    signatures = json_object.pop("signatures", {})
    unsigned = json_object.pop("unsigned", None)

    message_bytes = encode_canonical_json(json_object)
    signed = signing_key.sign(message_bytes)
    signature_base64 = encode_base64(signed.signature)

    key_id = "%s:%s" % (signing_key.alg, signing_key.version)
    signatures.setdefault(signature_name, {})[key_id] = signature_base64

    # logger.debug("SIGNING: %s %s %s", signature_name, key_id, message_bytes)

    json_object["signatures"] = signatures
    if unsigned is not None:
        json_object["unsigned"] = unsigned

    return json_object
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:28,代码来源:sign.py


示例3: check_event_content_hash

def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
    """Check whether the hash for this PDU matches the contents"""
    name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
    logger.debug("Expecting hash: %s", encode_base64(expected_hash))

    # some malformed events lack a 'hashes'. Protect against it being missing
    # or a weird type by basically treating it the same as an unhashed event.
    hashes = event.get("hashes")
    if not isinstance(hashes, dict):
        raise SynapseError(400, "Malformed 'hashes'", Codes.UNAUTHORIZED)

    if name not in hashes:
        raise SynapseError(
            400,
            "Algorithm %s not in hashes %s" % (
                name, list(hashes),
            ),
            Codes.UNAUTHORIZED,
        )
    message_hash_base64 = hashes[name]
    try:
        message_hash_bytes = decode_base64(message_hash_base64)
    except Exception:
        raise SynapseError(
            400,
            "Invalid base64: %s" % (message_hash_base64,),
            Codes.UNAUTHORIZED,
        )
    return message_hash_bytes == expected_hash
开发者ID:matrix-org,项目名称:synapse,代码行数:29,代码来源:event_signing.py


示例4: read_certificate_from_disk

    def read_certificate_from_disk(self, require_cert_and_key):
        """
        Read the certificates and private key from disk.

        Args:
            require_cert_and_key (bool): set to True to throw an error if the certificate
                and key file are not given
        """
        if require_cert_and_key:
            self.tls_private_key = self.read_tls_private_key()
            self.tls_certificate = self.read_tls_certificate()
        elif self.tls_certificate_file:
            # we only need the certificate for the tls_fingerprints. Reload it if we
            # can, but it's not a fatal error if we can't.
            try:
                self.tls_certificate = self.read_tls_certificate()
            except Exception as e:
                logger.info(
                    "Unable to read TLS certificate (%s). Ignoring as no "
                    "tls listeners enabled.", e,
                )

        self.tls_fingerprints = list(self._original_tls_fingerprints)

        if self.tls_certificate:
            # Check that our own certificate is included in the list of fingerprints
            # and include it if it is not.
            x509_certificate_bytes = crypto.dump_certificate(
                crypto.FILETYPE_ASN1, self.tls_certificate
            )
            sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest())
            sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints)
            if sha256_fingerprint not in sha256_fingerprints:
                self.tls_fingerprints.append({u"sha256": sha256_fingerprint})
开发者ID:matrix-org,项目名称:synapse,代码行数:34,代码来源:tls.py


示例5: select_pdus

def select_pdus(cursor):
    cursor.execute(
        "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC"
    )

    ids = cursor.fetchall()

    pdu_tuples = store._get_pdu_tuples(cursor, ids)

    pdus = [Pdu.from_pdu_tuple(p) for p in pdu_tuples]

    reference_hashes = {}

    for pdu in pdus:
        try:
            if pdu.prev_pdus:
                print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
                for pdu_id, origin, hashes in pdu.prev_pdus:
                    ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)]
                    hashes[ref_alg] = encode_base64(ref_hsh)
                    store._store_prev_pdu_hash_txn(cursor,  pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh)
                print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus
            pdu = add_event_pdu_content_hash(pdu)
            ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu)
            reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh)
            store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh)

            for alg, hsh_base64 in pdu.hashes.items():
                print alg, hsh_base64
                store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64))

        except:
            print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus
开发者ID:0-T-0,项目名称:synapse,代码行数:33,代码来源:hash_history.py


示例6: add_event_hashes

    def add_event_hashes(self, event_ids):
        hashes = yield self.get_event_reference_hashes(event_ids)
        hashes = {
            e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"}
            for e_id, h in hashes.items()
        }

        defer.returnValue(list(hashes.items()))
开发者ID:matrix-org,项目名称:synapse,代码行数:8,代码来源:signatures.py


示例7: encode_signing_key_base64

def encode_signing_key_base64(key):
    """Encode a signing key as base64
    Args:
        key (SigningKey): A signing key to encode.
    Returns:
        base64 encoded string.
    """
    return encode_base64(key.encode())
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:8,代码来源:key.py


示例8: encode_verify_key_base64

def encode_verify_key_base64(key):
    """Encode a verify key as base64
    Args:
        key (VerifyKey): A signing key to encode.
    Returns:
        base64 encoded string.
    """
    return encode_base64(key.encode())
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:8,代码来源:key.py


示例9: get_server_verify_key_v2_direct

    def get_server_verify_key_v2_direct(self, server_name, key_ids):
        keys = {}

        for requested_key_id in key_ids:
            if requested_key_id in keys:
                continue

            (response, tls_certificate) = yield fetch_server_key(
                server_name, self.hs.tls_server_context_factory,
                path=(b"/_matrix/key/v2/server/%s" % (
                    urllib.quote(requested_key_id),
                )).encode("ascii"),
            )

            if (u"signatures" not in response
                    or server_name not in response[u"signatures"]):
                raise KeyLookupError("Key response not signed by remote server")

            if "tls_fingerprints" not in response:
                raise KeyLookupError("Key response missing TLS fingerprints")

            certificate_bytes = crypto.dump_certificate(
                crypto.FILETYPE_ASN1, tls_certificate
            )
            sha256_fingerprint = hashlib.sha256(certificate_bytes).digest()
            sha256_fingerprint_b64 = encode_base64(sha256_fingerprint)

            response_sha256_fingerprints = set()
            for fingerprint in response[u"tls_fingerprints"]:
                if u"sha256" in fingerprint:
                    response_sha256_fingerprints.add(fingerprint[u"sha256"])

            if sha256_fingerprint_b64 not in response_sha256_fingerprints:
                raise KeyLookupError("TLS certificate not allowed by fingerprints")

            response_keys = yield self.process_v2_response(
                from_server=server_name,
                requested_ids=[requested_key_id],
                response_json=response,
            )

            keys.update(response_keys)

        yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(
                    self.store_keys,
                    server_name=key_server_name,
                    from_server=server_name,
                    verify_keys=verify_keys,
                )
                for key_server_name, verify_keys in keys.items()
            ],
            consumeErrors=True
        ).addErrback(unwrapFirstError))

        defer.returnValue(keys)
开发者ID:rubo77,项目名称:synapse,代码行数:57,代码来源:keyring.py


示例10: render_GET

    def render_GET(self, request):
        err, args = get_args(request, ("public_key",))
        if err:
            return json.dumps(err)

        pubKey = self.sydent.keyring.ed25519.verify_key
        pubKeyBase64 = encode_base64(pubKey.encode())

        return json.dumps({'valid': args["public_key"] == pubKeyBase64})
开发者ID:matrix-org,项目名称:sydent,代码行数:9,代码来源:pubkeyservlets.py


示例11: select_v1_keys

def select_v1_keys(connection):
    cursor = connection.cursor()
    cursor.execute("SELECT server_name, key_id, verify_key FROM server_signature_keys")
    rows = cursor.fetchall()
    cursor.close()
    results = {}
    for server_name, key_id, verify_key in rows:
        results.setdefault(server_name, {})[key_id] = encode_base64(verify_key)
    return results
开发者ID:DoubleMalt,项目名称:synapse,代码行数:9,代码来源:convert_server_keys.py


示例12: event_id

    def event_id(self):
        # We have to import this here as otherwise we get an import loop which
        # is hard to break.
        from synapse.crypto.event_signing import compute_event_reference_hash

        if self._event_id:
            return self._event_id
        self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
        return self._event_id
开发者ID:matrix-org,项目名称:synapse,代码行数:9,代码来源:__init__.py


示例13: test_sign_and_verify

 def test_sign_and_verify(self):
     self.assertIn('signatures', self.signed)
     self.assertIn('Alice', self.signed['signatures'])
     self.assertIn('mock:test', self.signed['signatures']['Alice'])
     self.assertEqual(
         self.signed['signatures']['Alice']['mock:test'],
         encode_base64(b'x_______')
     )
     self.assertEqual(self.sigkey.signed_bytes, b'{"foo":"bar"}')
     verify_signed_json(self.signed, 'Alice', self.verkey)
开发者ID:matrix-org,项目名称:python-signedjson,代码行数:10,代码来源:test_sign.py


示例14: response_json_object

    def response_json_object(self):
        verify_keys = {}
        for key in self.config.signing_key:
            verify_key_bytes = key.verify_key.encode()
            key_id = "%s:%s" % (key.alg, key.version)
            verify_keys[key_id] = {
                u"key": encode_base64(verify_key_bytes)
            }

        old_verify_keys = {}
        for key in self.config.old_signing_keys:
            key_id = "%s:%s" % (key.alg, key.version)
            verify_key_bytes = key.encode()
            old_verify_keys[key_id] = {
                u"key": encode_base64(verify_key_bytes),
                u"expired_ts": key.expired,
            }

        x509_certificate_bytes = crypto.dump_certificate(
            crypto.FILETYPE_ASN1,
            self.config.tls_certificate
        )

        sha256_fingerprint = sha256(x509_certificate_bytes).digest()

        json_object = {
            u"valid_until_ts": self.valid_until_ts,
            u"server_name": self.config.server_name,
            u"verify_keys": verify_keys,
            u"old_verify_keys": old_verify_keys,
            u"tls_fingerprints": [{
                u"sha256": encode_base64(sha256_fingerprint),
            }]
        }
        for key in self.config.signing_key:
            json_object = sign_json(
                json_object,
                self.config.server_name,
                key,
            )
        return json_object
开发者ID:0-T-0,项目名称:synapse,代码行数:41,代码来源:local_key_resource.py


示例15: add_event_hashes

    def add_event_hashes(self, event_ids):
        hashes = yield self.get_event_reference_hashes(
            event_ids
        )
        hashes = [
            {
                k: encode_base64(v) for k, v in h.items()
                if k == "sha256"
            }
            for h in hashes
        ]

        defer.returnValue(zip(event_ids, hashes))
开发者ID:Vutsuak16,项目名称:synapse,代码行数:13,代码来源:signatures.py


示例16: response_json_object

    def response_json_object(server_config):
        verify_keys = {}
        for key in server_config.signing_key:
            verify_key_bytes = key.verify_key.encode()
            key_id = "%s:%s" % (key.alg, key.version)
            verify_keys[key_id] = encode_base64(verify_key_bytes)

        x509_certificate_bytes = crypto.dump_certificate(
            crypto.FILETYPE_ASN1,
            server_config.tls_certificate
        )
        json_object = {
            u"server_name": server_config.server_name,
            u"verify_keys": verify_keys,
            u"tls_certificate": encode_base64(x509_certificate_bytes)
        }
        for key in server_config.signing_key:
            json_object = sign_json(
                json_object,
                server_config.server_name,
                key,
            )

        return json_object
开发者ID:roblabla,项目名称:synapse,代码行数:24,代码来源:server_key_resource.py


示例17: _get_latest_event_ids_and_hashes_in_room

    def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
        sql = (
            "SELECT e.event_id, e.depth FROM events as e "
            "INNER JOIN event_forward_extremities as f "
            "ON e.event_id = f.event_id "
            "AND e.room_id = f.room_id "
            "WHERE f.room_id = ?"
        )

        txn.execute(sql, (room_id,))

        results = []
        for event_id, depth in txn.fetchall():
            hashes = self._get_event_reference_hashes_txn(txn, event_id)
            prev_hashes = {
                k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
            }
            results.append((event_id, prev_hashes, depth))

        return results
开发者ID:matrix-org,项目名称:synapse,代码行数:20,代码来源:event_federation.py


示例18: main

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin
    )
    args = parser.parse_args()
    logging.basicConfig()

    event_json = dictobj(json.load(args.input_json))

    algorithms = {"sha256": hashlib.sha256}

    for alg_name in event_json.hashes:
        if check_event_content_hash(event_json, algorithms[alg_name]):
            print("PASS content hash %s" % (alg_name,))
        else:
            print("FAIL content hash %s" % (alg_name,))

    for algorithm in algorithms.values():
        name, h_bytes = compute_event_reference_hash(event_json, algorithm)
        print("Reference hash %s: %s" % (name, encode_base64(h_bytes)))
开发者ID:DoubleMalt,项目名称:synapse,代码行数:21,代码来源:check_event_hash.py


示例19: check_event_content_hash

def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
    """Check whether the hash for this PDU matches the contents"""
    name, expected_hash = compute_content_hash(event, hash_algorithm)
    logger.debug("Expecting hash: %s", encode_base64(expected_hash))
    if name not in event.hashes:
        raise SynapseError(
            400,
            "Algorithm %s not in hashes %s" % (
                name, list(event.hashes),
            ),
            Codes.UNAUTHORIZED,
        )
    message_hash_base64 = event.hashes[name]
    try:
        message_hash_bytes = decode_base64(message_hash_base64)
    except:
        raise SynapseError(
            400,
            "Invalid base64: %s" % (message_hash_base64,),
            Codes.UNAUTHORIZED,
        )
    return message_hash_bytes == expected_hash
开发者ID:roblabla,项目名称:synapse,代码行数:22,代码来源:event_signing.py


示例20: read_config

    def read_config(self, config):
        self.tls_certificate = self.read_tls_certificate(
            config.get("tls_certificate_path")
        )
        self.tls_certificate_file = config.get("tls_certificate_path")

        self.no_tls = config.get("no_tls", False)

        if self.no_tls:
            self.tls_private_key = None
        else:
            self.tls_private_key = self.read_tls_private_key(
                config.get("tls_private_key_path")
            )

        self.tls_dh_params_path = self.check_file(
            config.get("tls_dh_params_path"), "tls_dh_params"
        )

        self.tls_fingerprints = config["tls_fingerprints"]

        # Check that our own certificate is included in the list of fingerprints
        # and include it if it is not.
        x509_certificate_bytes = crypto.dump_certificate(
            crypto.FILETYPE_ASN1,
            self.tls_certificate
        )
        sha256_fingerprint = encode_base64(sha256(x509_certificate_bytes).digest())
        sha256_fingerprints = set(f["sha256"] for f in self.tls_fingerprints)
        if sha256_fingerprint not in sha256_fingerprints:
            self.tls_fingerprints.append({u"sha256": sha256_fingerprint})

        # This config option applies to non-federation HTTP clients
        # (e.g. for talking to recaptcha, identity servers, and such)
        # It should never be used in production, and is intended for
        # use only when running tests.
        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
            "use_insecure_ssl_client_just_for_testing_do_not_use"
        )
开发者ID:DoubleMalt,项目名称:synapse,代码行数:39,代码来源:tls.py



注:本文中的unpaddedbase64.encode_base64函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python untangle.parse函数代码示例发布时间:2022-05-27
下一篇:
Python unpaddedbase64.decode_base64函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap