本文整理汇总了Python中twisted.internet.ssl.Certificate类的典型用法代码示例。如果您正苦于以下问题:Python Certificate类的具体用法?Python Certificate怎么用?Python Certificate使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Certificate类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, certificate):
if not isinstance(certificate, Certificate):
if os.path.isfile(certificate):
certificate = Certificate.loadPEM(open(certificate).read())
else:
certificate = Certificate.loadPEM(certificate)
self._original = certificate
self.certificate = certificate.dumpPEM()
开发者ID:UfSoft,项目名称:SSHgD,代码行数:8,代码来源:creds.py
示例2: connect
def connect(self, host=None, port=None, cert=None, key=None):
'''
Connect to another portal somewhere. If retry is set, will attempt to reconnect
with the target continuously. As of the time of this writing, you cannot stop a
polling connection without taking down the portal.
:param retry: continuously attempt to connect on drops or rejections
:type retry: bool.
'''
host = host if host else self.host
port = port if port else self.port
cert = cert if cert else self.certCa
key = key if key else self.keyPrivate # ???
# the first term is the name the server is using in the cert (for now)
ctx = optionsForClientTLS(u"pds.production", Certificate.loadPEM(cert), PrivateCertificate.loadPEM(key))
factory = RiffleClientFactory()
SSL4ClientEndpoint(reactor, host, port, ctx,).connect(factory)
print 'Connecting to ' + host + ':' + str(port)
avatar = yield factory.login(self)
defer.returnValue(Levy(avatar))
开发者ID:SejalChauhan,项目名称:Paradrop,代码行数:25,代码来源:riffle.py
示例3: remote_login
def remote_login(self, client):
# print 'Remote login!'
peerCertificate = Certificate.peerFromTransport(self.broker.transport)
pdid = peerCertificate.getSubject().commonName.decode('utf-8')
avatar, logout = yield self.portal.login(pdid, client)
avatar = pb.AsReferenceable(avatar, "perspective")
# Formerly in _cbLogin, moved here to make the deferred chain cleaner
puid = avatar.processUniqueID()
# only call logout once, whether the connection is dropped (disconnect)
# or a logout occurs (cleanup), and be careful to drop the reference to
# it in either case
logout = [logout]
def maybeLogout():
if not logout:
return
fn = logout[0]
del logout[0]
fn()
self.broker._localCleanup[puid] = maybeLogout
self.broker.notifyOnDisconnect(maybeLogout)
defer.returnValue(avatar)
开发者ID:SejalChauhan,项目名称:Paradrop,代码行数:27,代码来源:riffle.py
示例4: main
def main(self, reactor, options):
certificates_path = FilePath(options["certificates-directory"])
ca = Certificate.loadPEM(
certificates_path.child(b"cluster.crt").getContent())
# This is a hack; from_path should be more
# flexible. https://clusterhq.atlassian.net/browse/FLOC-1865
control_credential = ControlCredential.from_path(
certificates_path, b"service")
top_service = MultiService()
persistence = ConfigurationPersistenceService(
reactor, options["data-path"])
persistence.setServiceParent(top_service)
cluster_state = ClusterStateService(reactor)
cluster_state.setServiceParent(top_service)
api_service = create_api_service(
persistence, cluster_state, serverFromString(
reactor, options["port"]),
rest_api_context_factory(ca, control_credential))
api_service.setServiceParent(top_service)
amp_service = ControlAMPService(
reactor, cluster_state, persistence, serverFromString(
reactor, options["agent-port"]),
amp_server_context_factory(ca, control_credential))
amp_service.setServiceParent(top_service)
return main_for_service(reactor, top_service)
开发者ID:Kaffa-MY,项目名称:flocker,代码行数:26,代码来源:script.py
示例5: get_configuration
def get_configuration(options):
"""
Load and validate the configuration in the file specified by the given
options.
:param DatasetAgentOptions options: The dataset agent options specifying
the location of the configuration.
:return: A ``dict`` representing the configuration loaded from the file.
"""
agent_config = options[u'agent-config']
configuration = yaml.safe_load(agent_config.getContent())
validate_configuration(configuration=configuration)
configuration['control-service'].setdefault('port', 4524)
path = agent_config.parent()
# This is a hack; from_path should be more
# flexible. https://clusterhq.atlassian.net/browse/FLOC-1865
configuration['ca-certificate'] = Certificate.loadPEM(
path.child(b"cluster.crt").getContent())
configuration['node-credential'] = NodeCredential.from_path(path, b"node")
return configuration
开发者ID:WUMUXIAN,项目名称:flocker,代码行数:25,代码来源:script.py
示例6: getCertificate
def getCertificate(self, subject):
log.msg(format='Retreving certificate for %(name)s',
name=subject)
certPath = self.publicPath.child(subject)
if not certPath.exists():
raise CertificateNotFound
cert = Certificate.loadPEM(certPath.getContent())
return defer.succeed(cert)
开发者ID:tomprince,项目名称:deed,代码行数:8,代码来源:authority.py
示例7: main
def main(reactor):
pemBytes = FilePath(b"ca-private-cert.pem").getContent()
certificateAuthority = Certificate.loadPEM(pemBytes)
myCertificate = PrivateCertificate.loadPEM(pemBytes)
serverEndpoint = SSL4ServerEndpoint(
reactor, 4321, myCertificate.options(certificateAuthority)
)
serverEndpoint.listen(Factory.forProtocol(ReportWhichClient))
return Deferred()
开发者ID:damouse,项目名称:pdservertemp,代码行数:9,代码来源:whichclient.py
示例8: getTlsAuthority_
def getTlsAuthority_(self, startTlsCaCert):
if startTlsCaCert is None:
return None
authorities = [str(cert) for cert in pem.parse_file(startTlsCaCert)]
if len(authorities) != 1:
raise Exception(
("The provided CA cert file, '{0}', "
"contained {1} certificates. It must contain exactly one.").format(
startTlsCaCert, len(authorities)))
return Certificate.loadPEM(authorities[0])
开发者ID:cwaldbieser,项目名称:txcas,代码行数:10,代码来源:ldap_cred_checker.py
示例9: open
def open(self, port=None, cert=None):
'''
Listen for connections on the given port.
'''
port = port if port else self.port
cert = cert if cert else self.certCa
ca = Certificate.loadPEM(cert)
myCertificate = PrivateCertificate.loadPEM(cert)
SSL4ServerEndpoint(reactor, port, myCertificate.options(ca)).listen(RiffleServerFactory(self))
开发者ID:SejalChauhan,项目名称:Paradrop,代码行数:11,代码来源:riffle.py
示例10: start_ssl_cmd_server
def start_ssl_cmd_server():
with open(settings["Agent_Cert"], 'r') as certfile:
certdata = certfile.read()
if settings["Agent_Priv_Key"] != settings["Agent_Cert"]:
with open(settings.get("Agent_Priv_Key"), 'r') as keyfile:
certdata += keyfile.read()
with open(settings.get("Broker_Cert"), 'r') as f:
authdata = f.read()
certificate = PrivateCertificate.loadPEM(certdata)
authority = Certificate.loadPEM(authdata)
factory = Factory.forProtocol(CommandHandler)
reactor.listenSSL(int(settings.get("Command_Port")), factory, certificate.options(authority))
开发者ID:caedm,项目名称:cabs,代码行数:12,代码来源:cabsagent.py
示例11: fromFilePath
def fromFilePath(cls, filePath):
privatePath = filePath.child('private')
publicPath = filePath.child('public')
csrPath = filePath.child('csr')
issuerPath = filePath.child('issuer')
if issuerPath.exists():
issuer = issuerPath.getContent()
key = KeyPair.loadPEM(privatePath.child(issuer).getContent())
cert = Certificate.loadPEM(publicPath.child(issuer).getContent())
store = cls(publicPath, privatePath, csrPath, key, cert, issuer)
return store
开发者ID:tomprince,项目名称:deed,代码行数:13,代码来源:authority.py
示例12: start_ssl
def start_ssl(self):
log.debug("Enabling SSL with PKey: %s, Cert: %s", self.pkey, self.cert)
check_ssl_keys()
with open(configmanager.get_config_dir(self.cert)) as cert:
certificate = Certificate.loadPEM(cert.read()).original
with open(configmanager.get_config_dir(self.pkey)) as pkey:
private_key = KeyPair.load(pkey.read(), FILETYPE_PEM).original
options = CertificateOptions(privateKey=private_key, certificate=certificate, method=SSL.SSLv23_METHOD)
options.getContext().set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
self.socket = reactor.listenSSL(self.port, self.site, options)
log.info("Serving on %s:%s view at https://127.0.0.1:%s", "0.0.0.0", self.port, self.port)
开发者ID:Kash-Krishna,项目名称:SharkByte,代码行数:13,代码来源:server.py
示例13: _create_tls_client_context
def _create_tls_client_context(config, cbdir, log):
"""
Create a CertificateOptions object for use with TLS listening endpoints.
"""
# server hostname: The expected name of the remote host.
hostname = config['hostname']
# explicit trust (certificate) root
ca_certs = None
if 'ca_certificates' in config:
log.info("TLS client using explicit trust ({cnt_certs} certificates)", cnt_certs=len(config['ca_certificates']))
ca_certs = []
for cert_fname in [os.path.abspath(os.path.join(cbdir, x)) for x in (config['ca_certificates'])]:
cert = crypto.load_certificate(
crypto.FILETYPE_PEM,
six.u(open(cert_fname, 'r').read())
)
log.info("TLS client trust root CA certificate loaded from '{fname}'", fname=cert_fname)
ca_certs.append(cert)
ca_certs = OpenSSLCertificateAuthorities(ca_certs)
else:
log.info("TLS client using platform trust")
# client key/cert to use
client_cert = None
if 'key' in config:
if 'certificate' not in config:
raise Exception('TLS client key present, but certificate missing')
key_fname = os.path.abspath(os.path.join(cbdir, config['key']))
with open(key_fname, 'r') as f:
private_key = KeyPair.load(f.read(), format=crypto.FILETYPE_PEM)
log.info("Loaded client TLS key from '{key_fname}'", key_fname=key_fname)
cert_fname = os.path.abspath(os.path.join(cbdir, config['certificate']))
with open(cert_fname, 'r') as f:
cert = Certificate.loadPEM(f.read(),)
log.info("Loaded client TLS certificate from '{cert_fname}' (cn='{cert_cn}', sha256={cert_sha256}..)",
cert_fname=cert_fname,
cert_cn=cert.getSubject().CN,
cert_sha256=cert.digest('sha256')[:12])
client_cert = PrivateCertificate.fromCertificateAndKeyPair(cert, private_key)
else:
if 'certificate' in config:
log.warn('TLS client certificate present, but key is missing')
# create TLS client context
ctx = optionsForClientTLS(hostname, trustRoot=ca_certs, clientCertificate=client_cert)
return ctx
开发者ID:FirefighterBlu3,项目名称:crossbar,代码行数:51,代码来源:endpoint.py
示例14: create_agent
def create_agent(ca_cert, client_cert, client_key):
ca_certificate = Certificate.loadPEM(FilePath(ca_cert).getContent())
client_certificate = PrivateCertificate.loadPEM(
FilePath(client_cert).getContent() + b"\n" +
FilePath(client_key).getContent())
customPolicy = BrowserLikePolicyForHTTPSWithClientCertificate(
trustRoot=ca_certificate,
clientCertificate=client_certificate)
pool = HTTPConnectionPool(reactor, persistent=True)
agent = Agent(reactor, customPolicy, pool=pool)
return agent
开发者ID:Seekscale,项目名称:smbproxy,代码行数:14,代码来源:ssl_agent.py
示例15: from_path
def from_path(cls, path):
"""
:param FilePath path: Directory where private key and certificate are
stored.
"""
if not path.isdir():
raise PathError(
b"Path {path} is not a directory.".format(path=path.path)
)
certPath = path.child(certificate_filename)
keyPath = path.child(key_filename)
if not certPath.isfile():
raise PathError(
b"Certificate file {path} does not exist.".format(
path=certPath.path)
)
if not keyPath.isfile():
raise PathError(
b"Private key file {path} does not exist.".format(
path=keyPath.path)
)
try:
certFile = certPath.open()
except IOError:
raise PathError(
(b"Certificate file {path} could not be opened. "
b"Check file permissions.").format(
path=certPath.path)
)
try:
keyFile = keyPath.open()
except IOError:
raise PathError(
(b"Private key file {path} could not be opened. "
b"Check file permissions.").format(
path=keyPath.path)
)
certificate = Certificate.load(
certFile.read(), format=crypto.FILETYPE_PEM)
keypair = FlockerKeyPair(
keypair=KeyPair.load(keyFile.read(), format=crypto.FILETYPE_PEM)
)
return cls(path=path, certificate=certificate, keypair=keypair)
开发者ID:ALSEDLAH,项目名称:flocker,代码行数:50,代码来源:_ca.py
示例16: setUp
def setUp(self):
description = yield self._httpbin_process.server_description(
reactor)
self.baseurl = URL(scheme=u"https",
host=description.host,
port=description.port).asText()
root = trustRootFromCertificates(
[Certificate.loadPEM(description.cacert)],
)
self.agent = Agent(
reactor,
contextFactory=BrowserLikePolicyForHTTPS(root),
)
self.pool = HTTPConnectionPool(reactor, False)
开发者ID:jameshilliard,项目名称:treq,代码行数:16,代码来源:test_treq_integration.py
示例17: certsFromBundle
def certsFromBundle(path, x509=False):
PEM_RE = re.compile(
"-----BEGIN CERTIFICATE-----\r?.+?\r?"
"-----END CERTIFICATE-----\r?\n?""",
re.DOTALL)
if not os.path.isfile(path):
log.warn("Attempted to load non-existent certificate bundle path %s"
% path)
return []
pems = FilePath(path).getContent()
cstr = [match.group(0) for match in PEM_RE.finditer(pems)]
certs = [Certificate.loadPEM(cert) for cert in cstr]
if x509:
certs = [cert.original for cert in certs]
return certs
开发者ID:leapcode,项目名称:leap_pycommon,代码行数:16,代码来源:http.py
示例18: test_chainCerts
def test_chainCerts(self):
"""
L{chainCerts} loads all but the first cert in a file.
"""
data = FilePath(__file__).sibling('data').child('certs')
cert1 = data.child('cert1.pem').getContent()
cert2 = data.child('cert2.pem').getContent()
cert3 = data.child('cert3.pem').getContent()
expected = [
Certificate.loadPEM(cert) for cert in [cert2, cert3]]
chain = chainCerts(cert1 + '\n' + cert2 + '\n' + cert3)
self.assertEqual(len(chain), 2)
self.assertEqual(
chain[0].digest('sha256'), expected[0].digest('sha256'))
self.assertEqual(
chain[1].digest('sha256'), expected[1].digest('sha256'))
开发者ID:fusionapp,项目名称:fusion-util,代码行数:16,代码来源:test_cert.py
示例19: start_ssl
def start_ssl(self):
check_ssl_keys()
log.debug('Enabling SSL with PKey: %s, Cert: %s', self.pkey, self.cert)
with open(configmanager.get_config_dir(self.cert)) as cert:
certificate = Certificate.loadPEM(cert.read()).original
with open(configmanager.get_config_dir(self.pkey)) as pkey:
private_key = KeyPair.load(pkey.read(), FILETYPE_PEM).original
options = CertificateOptions(privateKey=private_key, certificate=certificate, method=SSL.SSLv23_METHOD)
ctx = options.getContext()
ctx.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
ctx.use_certificate_chain_file(configmanager.get_config_dir(self.cert))
self.socket = reactor.listenSSL(self.port, self.site, options, interface=self.interface)
ip = self.socket.getHost().host
ip = '[%s]' % ip if is_ipv6(ip) else ip
log.info('Serving at https://%s:%s%s', ip, self.port, self.base)
开发者ID:deluge-torrent,项目名称:deluge,代码行数:17,代码来源:server.py
示例20: startTLS
def startTLS(self, certificate, *verifyAuthorities):
if self.hostCertificate is None:
self.hostCertificate = certificate
self._justStartedTLS = True
self.transport.startTLS(certificate.options(*verifyAuthorities))
stlsb = self._startingTLSBuffer
if stlsb is not None:
self._startingTLSBuffer = None
for box in stlsb:
self.sendPacket(box)
else:
raise RuntimeError(
"Previously authenticated connection between %s and %s "
"is trying to re-establish as %s" % (
self.hostCertificate,
Certificate.peerFromTransport(self.transport),
(certificate, verifyAuthorities)))
开发者ID:perkinslr,项目名称:epsilon-py3,代码行数:17,代码来源:juice.py
注:本文中的twisted.internet.ssl.Certificate类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论