本文整理汇总了Python中nacl.signing.SigningKey类的典型用法代码示例。如果您正苦于以下问题:Python SigningKey类的具体用法?Python SigningKey怎么用?Python SigningKey使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SigningKey类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: decrypt_message
def decrypt_message(encrypted_message, keying_material, private_key):
encrypted_key = keying_material['encrypted_key']
message_signature = keying_material['message_signature']
temp_public_key = PublicKey(
keying_material['temp_public_key'],
Base64Encoder)
box = PublicBox(private_key, temp_public_key)
message_key = box.decrypt(
encrypted_key,
encoder=Base64Encoder)
# We got the key, so let's get the message.
message_box = nacl.secret.SecretBox(message_key)
message = message_box.decrypt(
encrypted_message,
encoder=Base64Encoder)
# Check the signature.
mac_key = sha256(box._shared_key, RawEncoder)
signing_key = SigningKey(mac_key)
if signing_key.sign(message, Base64Encoder) != message_signature:
raise SignatureError("The signature is not valid")
return message
开发者ID:mozilla-services,项目名称:messaging,代码行数:26,代码来源:crypto.py
示例2: test_message_signing
def test_message_signing(self, seed, message, signature, expected):
signing_key = SigningKey(seed, encoder=HexEncoder)
signed = signing_key.sign(binascii.unhexlify(message), encoder=HexEncoder)
assert signed == expected
assert signed.message == message
assert signed.signature == signature
开发者ID:fun-alex-alex2006hw,项目名称:pynacl,代码行数:7,代码来源:test_signing.py
示例3: add_new_channel
def add_new_channel(self, nA, nB):
a_signkey = SigningKey.generate()
a_chankey = PrivateKey.generate()
a_CIDkey = os.urandom(32)
a_transports = nA.agent.individualize_transports(nA.agent.get_transports())
b_signkey = SigningKey.generate()
b_chankey = PrivateKey.generate()
b_CIDkey = os.urandom(32)
b_transports = nB.agent.individualize_transports(nB.agent.get_transports())
a_rec = { "channel_pubkey": a_chankey.public_key.encode().encode("hex"),
"CID_key": a_CIDkey.encode("hex"),
"transports": a_transports.values(),
}
b_rec = { "channel_pubkey": b_chankey.public_key.encode().encode("hex"),
"CID_key": b_CIDkey.encode("hex"),
"transports": b_transports.values(),
}
q = ("INSERT INTO addressbook"
" (petname, acked, next_outbound_seqnum,"
" my_signkey,"
" their_channel_record_json,"
" my_CID_key, next_CID_token,"
" highest_inbound_seqnum,"
" my_old_channel_privkey, my_new_channel_privkey,"
" they_used_new_channel_key, their_verfkey)"
" VALUES(?,?,?,?,?,?,?,?,?,?,?,?)")
vA=("petname-from-A", 1, 1,
a_signkey.encode().encode("hex"),
json.dumps(b_rec),
a_CIDkey.encode("hex"), None,
0,
a_chankey.encode().encode("hex"),
a_chankey.encode().encode("hex"),
0, b_signkey.verify_key.encode().encode("hex"),
)
vB=("petname-from-A", 1, 1,
b_signkey.encode().encode("hex"),
json.dumps(a_rec),
b_CIDkey.encode("hex"), None,
0,
b_chankey.encode().encode("hex"),
b_chankey.encode().encode("hex"),
0, a_signkey.verify_key.encode().encode("hex"),
)
nA.db.execute(q, vA)
nA.db.commit()
nB.db.execute(q, vB)
nA.db.commit()
entA = nA.db.execute("SELECT * FROM addressbook").fetchone()
entB = nB.db.execute("SELECT * FROM addressbook").fetchone()
return entA, entB
开发者ID:warner,项目名称:petmail,代码行数:59,代码来源:common.py
示例4: test_key_conversion
def test_key_conversion(self):
keypair_seed = (b"421151a459faeade3d247115f94aedae"
b"42318124095afabe4d1451a559faedee")
signing_key = SigningKey(binascii.unhexlify(keypair_seed))
verify_key = signing_key.verify_key
private_key = bytes(signing_key.to_curve25519_private_key())
public_key = bytes(verify_key.to_curve25519_public_key())
assert tohex(private_key) == ("8052030376d47112be7f73ed7a019293"
"dd12ad910b654455798b4667d73de166")
assert tohex(public_key) == ("f1814f0e8ff1043d8a44d25babff3ced"
"cae6c22c3edaa48f857ae70de2baae50")
开发者ID:lmctv,项目名称:pynacl,代码行数:14,代码来源:test_signing.py
示例5: sign
def sign(self, message, private_key):
"""
Sign the message.
This method will take the currently configured values for the message
prefix and suffix and create a signature using the provided Ed25519 private key.
Args:
message (bytes): message to be signed
private_key (bytes) Ed25519 private key
"""
sk = SigningKey(private_key)
self.public_key = sk.verify_key.encode()
self.signature = sk.sign(message).signature
return self.signature
开发者ID:bigchaindb,项目名称:cryptoconditions,代码行数:16,代码来源:ed25519.py
示例6: __enter__
def __enter__(self) -> typing.Tuple[PrivateKey, PrivateKey]:
"""
Provides a pair of private keys.
"""
# Derive the key from the passphrase.
derived = util.derive_passphrase(self.passphrase)
sign_box = SecretBox(derived)
enc_box = SecretBox(derived)
# Decrypt, using the two nonces.
s_d = sign_box.decrypt(self.key._private_signing_seed, self.key._private_signing_nonce)
e_d = enc_box.decrypt(self.key._private_key_raw, self.key._private_nonce)
# Generate a SigningKey out of the seed.
self.sign = SigningKey(s_d)
self.encrypt = PrivateKey(e_d)
# Update the key's public keys.
if self.key._public_key is None:
self.key._public_key = self.encrypt.public_key
if self.key._public_signing_key is None:
self.key._public_signing_key = self.sign.verify_key
return self.encrypt, self.sign
开发者ID:SunDwarf,项目名称:Gluino,代码行数:26,代码来源:private.py
示例7: test_verify_with_prefix
def test_verify_with_prefix(self):
sk = SigningKey.generate()
vk = sk.verify_key
m = "body"
prefix = "prefix:"
sk2 = SigningKey.generate()
sm1 = sk.sign(prefix+m)
sm2 = sk.sign("not the prefix"+m)
sm3 = sk2.sign(prefix+m)
self.failUnlessEqual(util.verify_with_prefix(vk, sm1, prefix), m)
self.failUnlessRaises(errors.BadSignatureError,
util.verify_with_prefix, vk, sm2, prefix)
self.failUnlessRaises(CryptoError,
util.verify_with_prefix, vk, sm3, prefix)
开发者ID:ggozad,项目名称:petmail,代码行数:16,代码来源:test_util.py
示例8: test_wrong_types
def test_wrong_types():
sk = SigningKey.generate()
check_type_error("SigningKey must be created from a 32 byte seed", SigningKey, 12)
check_type_error("SigningKey must be created from a 32 byte seed", SigningKey, sk)
check_type_error("SigningKey must be created from a 32 byte seed", SigningKey, sk.verify_key)
check_type_error("VerifyKey must be created from 32 bytes", VerifyKey, 13)
check_type_error("VerifyKey must be created from 32 bytes", VerifyKey, sk)
check_type_error("VerifyKey must be created from 32 bytes", VerifyKey, sk.verify_key)
开发者ID:fun-alex-alex2006hw,项目名称:pynacl,代码行数:10,代码来源:test_signing.py
示例9: encrypt_message
def encrypt_message(message, identifier):
"""Encrypts a message so that it can be read by all the devices of the
given identifier.
:param message: the message itself, in clear text.
:param identifier: the identifier of the message recipient.
"""
# Cipher the message with a brand new random key.
message_key = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE)
message_box = nacl.secret.SecretBox(message_key)
message_nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
encrypted_message = message_box.encrypt(message, message_nonce,
Base64Encoder)
returned = []
# Encrypt the message key with each recipient's public key.
for recipient_device_pub_key in get_public_keys(identifier):
# Generate a new keypair & cipher the key with it.
temp_private_key, temp_public_key = generate_keypair()
box = PublicBox(temp_private_key, recipient_device_pub_key)
nonce = nacl.utils.random(PublicBox.NONCE_SIZE)
encrypted_key = box.encrypt(message_key, nonce, Base64Encoder)
# Sign the message.
mac_key = sha256(box._shared_key, RawEncoder)
signing_key = SigningKey(mac_key)
message_sig = signing_key.sign(message, Base64Encoder)
# Return the public key used to cipher the message key.
returned.append({
'encrypted_key': encrypted_key,
'temp_public_key': temp_public_key.encode(Base64Encoder),
'message_signature': message_sig
})
return {
'encrypted_message': encrypted_message,
'recipients': returned
}
开发者ID:mozilla-services,项目名称:messaging,代码行数:40,代码来源:crypto.py
示例10: test_invalid_signed_message
def test_invalid_signed_message(self):
skey = SigningKey.generate()
smessage = skey.sign(b"A Test Message!")
signature, message = smessage.signature, b"A Forged Test Message!"
# Small sanity check
assert skey.verify_key.verify(smessage)
with pytest.raises(BadSignatureError):
skey.verify_key.verify(message, signature)
with pytest.raises(BadSignatureError):
forged = SignedMessage(signature + message)
skey.verify_key.verify(forged)
开发者ID:lmctv,项目名称:pynacl,代码行数:14,代码来源:test_signing.py
示例11: test_base64_smessage_with_detached_sig_matches_with_attached_sig
def test_base64_smessage_with_detached_sig_matches_with_attached_sig(self):
sk = SigningKey.generate()
vk = sk.verify_key
smsg = sk.sign(b"Hello World in base64", encoder=Base64Encoder)
msg = smsg.message
b64sig = smsg.signature
sig = Base64Encoder.decode(b64sig)
assert vk.verify(msg, sig, encoder=Base64Encoder) == \
vk.verify(smsg, encoder=Base64Encoder)
assert Base64Encoder.decode(msg) == b"Hello World in base64"
开发者ID:lmctv,项目名称:pynacl,代码行数:15,代码来源:test_signing.py
示例12: test_hex_smessage_with_detached_sig_matches_with_attached_sig
def test_hex_smessage_with_detached_sig_matches_with_attached_sig(self):
sk = SigningKey.generate()
vk = sk.verify_key
smsg = sk.sign(b"Hello World in hex", encoder=HexEncoder)
msg = smsg.message
hexsig = smsg.signature
sig = HexEncoder.decode(hexsig)
assert vk.verify(msg, sig, encoder=HexEncoder) == \
vk.verify(smsg, encoder=HexEncoder)
assert HexEncoder.decode(msg) == b"Hello World in hex"
开发者ID:lmctv,项目名称:pynacl,代码行数:15,代码来源:test_signing.py
示例13: startInvitation
def startInvitation(self, petname, code, transports):
#print "invite", petname, code.encode("hex")
stretched = stretch(code)
inviteKey = SigningKey(stretched)
inviteID = inviteKey.verify_key.encode(Hex)
mySigningKey = SigningKey.generate()
myCIDkey = os.urandom(32)
myTempPrivkey = PrivateKey.generate()
# create my channel record
tids = ",".join([str(tid) for tid in sorted(transports.keys())])
channel_key = PrivateKey.generate()
pub_crec = { "channel_pubkey": channel_key.public_key.encode(Hex),
"CID_key": myCIDkey.encode("hex"),
"transports": transports.values(),
}
priv_data = { "my_signkey": mySigningKey.encode(Hex),
"my_CID_key": myCIDkey.encode("hex"),
"my_old_channel_privkey": channel_key.encode(Hex),
"my_new_channel_privkey": channel_key.encode(Hex),
"transport_ids": tids,
}
db = self.db
c = db.execute("SELECT inviteID FROM invitations")
if inviteID in [str(row[0]) for row in c.fetchall()]:
raise CommandError("invitation code already in use")
iid = db.insert("INSERT INTO `invitations`"
" (code, petname, inviteKey,"
" inviteID,"
" myTempPrivkey, mySigningKey,"
" my_channel_record, my_private_channel_data,"
" myMessages, theirMessages, nextExpectedMessage)"
" VALUES (?,?,?, ?, ?,?, ?,?, ?,?,?)",
(code.encode("hex"), petname, stretched.encode("hex"),
inviteID,
myTempPrivkey.encode(Hex), mySigningKey.encode(Hex),
json.dumps(pub_crec), json.dumps(priv_data),
"", "", 1),
"invitations")
self.subscribe(inviteID)
i = Invitation(iid, self.db, self)
i.sendFirstMessage()
self.db.commit()
开发者ID:ggozad,项目名称:petmail,代码行数:44,代码来源:invitation.py
示例14: __init__
def __init__(self, iid, db, manager):
self.iid = iid
self.db = db
self.manager = manager
c = self.db.execute("SELECT petname, inviteID, inviteKey," # 0,1,2
" theirTempPubkey," # 3
" nextExpectedMessage," # 4
" myMessages," # 5
" theirMessages" # 6
" FROM invitations WHERE id = ?", (iid,))
res = c.fetchone()
if not res:
raise KeyError("no pending Invitation for '%d'" % iid)
self.petname = res[0]
self.inviteID = res[1]
self.inviteKey = SigningKey(res[2].decode("hex"))
self.theirTempPubkey = None
if res[3]:
self.theirTempPubkey = PublicKey(res[3].decode("hex"))
self.nextExpectedMessage = int(res[4])
self.myMessages = splitMessages(res[5])
self.theirMessages = splitMessages(res[6])
开发者ID:ggozad,项目名称:petmail,代码行数:22,代码来源:invitation.py
示例15: tick
class Invitation:
# This has a brief lifetime: one is created in response to the rendezvous
# client discovering new messages for us, used for one reactor tick, then
# dereferenced. It holds onto a few values during that tick (which may
# process multiple messages for a single invitation, e.g. A's second poll
# will receive both B-m1 and B-m2 together). But all persistent state
# beyond that one tick is stored in the database.
def __init__(self, iid, db, manager):
self.iid = iid
self.db = db
self.manager = manager
c = self.db.execute("SELECT petname, inviteID, inviteKey," # 0,1,2
" theirTempPubkey," # 3
" nextExpectedMessage," # 4
" myMessages," # 5
" theirMessages" # 6
" FROM invitations WHERE id = ?", (iid,))
res = c.fetchone()
if not res:
raise KeyError("no pending Invitation for '%d'" % iid)
self.petname = res[0]
self.inviteID = res[1]
self.inviteKey = SigningKey(res[2].decode("hex"))
self.theirTempPubkey = None
if res[3]:
self.theirTempPubkey = PublicKey(res[3].decode("hex"))
self.nextExpectedMessage = int(res[4])
self.myMessages = splitMessages(res[5])
self.theirMessages = splitMessages(res[6])
def getAddressbookID(self):
c = self.db.execute("SELECT addressbook_id FROM invitations"
" WHERE id = ?", (self.iid,))
return c.fetchone()[0]
def getMyTempPrivkey(self):
c = self.db.execute("SELECT myTempPrivkey FROM invitations"
" WHERE id = ?", (self.iid,))
return PrivateKey(c.fetchone()[0].decode("hex"))
def getMySigningKey(self):
c = self.db.execute("SELECT mySigningKey FROM invitations"
" WHERE id = ?", (self.iid,))
return SigningKey(c.fetchone()[0].decode("hex"))
def getMyPublicChannelRecord(self):
c = self.db.execute("SELECT my_channel_record FROM invitations"
" WHERE id = ?", (self.iid,))
return c.fetchone()[0]
def getMyPrivateChannelData(self):
c = self.db.execute("SELECT my_private_channel_data FROM invitations"
" WHERE id = ?", (self.iid,))
return json.loads(c.fetchone()[0])
def sendFirstMessage(self):
pub = self.getMyTempPrivkey().public_key.encode()
self.send("i0:m1:"+pub)
self.db.update("UPDATE invitations SET myMessages=? WHERE id=?",
(",".join(self.myMessages), self.iid),
"invitations", self.iid)
# that will be commited by our caller
def processMessages(self, messages):
# These messages are neither version-checked nor signature-checked.
# Also, we may have already processed some of them.
#print "processMessages", messages
#print " my", self.myMessages
#print " theirs", self.theirMessages
assert isinstance(messages, set), type(messages)
assert None not in messages, messages
assert None not in self.myMessages, self.myMessages
assert None not in self.theirMessages, self.theirMessages
# Send anything that didn't make it to the server. This covers the
# case where we commit our outbound message in send() but crash
# before finishing delivery.
for m in self.myMessages - messages:
#print "resending", m
self.manager.sendToAll(self.inviteID, m)
newMessages = messages - self.myMessages - self.theirMessages
#print " %d new messages" % len(newMessages)
if not newMessages:
print " huh, no new messages, stupid rendezvous client"
# check signatures, extract bodies. invalid messages kill the channel
# and the invitation. MAYBE TODO: lose the one channel, keep using
# the others.
bodies = set()
for m in newMessages:
#print " new inbound message", m
try:
if not m.startswith("r0:"):
print "unrecognized rendezvous message prefix"
if not VALID_MESSAGE.search(m):
raise CorruptChannelError()
m = m[len("r0:"):].decode("hex")
bodies.add(self.inviteKey.verify_key.verify(m))
except (BadSignatureError, CorruptChannelError) as e:
#.........这里部分代码省略.........
开发者ID:ggozad,项目名称:petmail,代码行数:101,代码来源:invitation.py
示例16: init
def init(server, username, keydir, action, message, recipients):
""" SHSM CLI client. """
global serverurl
serverurl = server
if action == "register":
master_signing_key = SigningKey.generate()
device_signing_key = SigningKey.generate()
device_private_key = PrivateKey.generate()
enc_master_verify_key = master_signing_key.verify_key.encode(encoder=HexEncoder)
register(username, enc_master_verify_key)
# TODO: make sure keydir exists
save_key(master_signing_key.encode(encoder=HexEncoder), keydir + "/master_signing_key")
save_key(device_signing_key.encode(encoder=HexEncoder), keydir + "/device_signing_key")
save_key(device_private_key.encode(encoder=HexEncoder), keydir + "/device_private_key")
else:
try:
master_signing_key = SigningKey(load_key(keydir + "/master_signing_key"), encoder=HexEncoder)
device_signing_key = SigningKey(load_key(keydir + "/device_signing_key"), encoder=HexEncoder)
device_private_key = PrivateKey(load_key(keydir + "/device_private_key"), encoder=HexEncoder)
except TypeError:
print "bad key, exiting."
exit()
if action == "add-device":
enc_device_verify_key = device_signing_key.verify_key.encode(encoder=HexEncoder)
enc_signed_device_verify_key = b64encode(master_signing_key.sign(enc_device_verify_key))
enc_device_public_key = device_private_key.public_key.encode(encoder=HexEncoder)
enc_signed_device_public_key = b64encode(master_signing_key.sign(enc_device_public_key))
add_device(username, enc_signed_device_verify_key, enc_signed_device_public_key)
if action == "send-message":
ephemeral_key = PrivateKey.generate()
enc_ephemeral_public_key = b64encode(
device_signing_key.sign(ephemeral_key.public_key.encode(encoder=HexEncoder))
)
# TODO:: should sign binary text, no? b"bob"
destination_usernames = recipients.split(",")
enc_dest_usernames = b64encode(
device_signing_key.sign(json.dumps({"destination_usernames": destination_usernames}))
)
symmetric_key = random(SecretBox.KEY_SIZE)
symmetric_box = SecretBox(symmetric_key)
nonce = random(SecretBox.NONCE_SIZE)
msg_manifest = {}
msg_manifest["recipients"] = {}
msg_manifest["msg"] = b64encode(symmetric_box.encrypt(str(message), nonce))
for dest_user in destination_usernames:
msg_manifest["recipients"][dest_user] = {}
for recipient_key in get_recipient_keys(
device_signing_key.verify_key.encode(encoder=HexEncoder),
b64encode(device_signing_key.sign(str(dest_user))),
):
# TODO:: should sign binary text, no?
crypt_box = Box(ephemeral_key, recipient_key)
nonce = random(Box.NONCE_SIZE)
crypt_key = b64encode(crypt_box.encrypt(symmetric_key, nonce))
dest_key = recipient_key.encode(encoder=HexEncoder)
msg_manifest["recipients"][dest_user][dest_key] = crypt_key
enc_signed_crypt_msg = b64encode(device_signing_key.sign(json.dumps(msg_manifest)))
send_message(
device_signing_key.verify_key.encode(encoder=HexEncoder),
enc_dest_usernames,
enc_signed_crypt_msg,
enc_ephemeral_public_key,
)
if action == "get-messages":
enc_device_verify_key = device_signing_key.verify_key.encode(encoder=HexEncoder)
enc_signed_device_verify_key = b64encode(device_signing_key.sign(enc_device_verify_key))
messages = get_messages(enc_device_verify_key, enc_signed_device_verify_key)
for message_public_key in messages["messages"].keys():
try:
crypto_box = Box(device_private_key, PublicKey(b64decode(message_public_key), encoder=HexEncoder))
except TypeError:
print "not a valid public key"
exit()
packed_msg = json.loads(messages["messages"][message_public_key])
msg_manifest = json.loads(b64decode(packed_msg["message_manifest"]))
dest_pub_key = device_private_key.public_key.encode(encoder=HexEncoder)
symmetric_key = crypto_box.decrypt(b64decode(msg_manifest["recipients"][username][dest_pub_key]))
symmetric_box = SecretBox(symmetric_key)
print ("From: %s\nMessage: %s") % (
#.........这里部分代码省略.........
开发者ID:sinner-,项目名称:shsmc,代码行数:101,代码来源:shsmc.py
示例17: PrivateKeyProvider
class PrivateKeyProvider(object):
"""
Decrypts private keys.
"""
def __init__(self, key, passphrase: bytes):
self.key = key
# Secure!
self.passphrase = passphrase
self.sign = None
self.encrypt = None
def __enter__(self) -> typing.Tuple[PrivateKey, PrivateKey]:
"""
Provides a pair of private keys.
"""
# Derive the key from the passphrase.
derived = util.derive_passphrase(self.passphrase)
sign_box = SecretBox(derived)
enc_box = SecretBox(derived)
# Decrypt, using the two nonces.
s_d = sign_box.decrypt(self.key._private_signing_seed, self.key._private_signing_nonce)
e_d = enc_box.decrypt(self.key._private_key_raw, self.key._private_nonce)
# Generate a SigningKey out of the seed.
self.sign = SigningKey(s_d)
self.encrypt = PrivateKey(e_d)
# Update the key's public keys.
if self.key._public_key is None:
self.key._public_key = self.encrypt.public_key
if self.key._public_signing_key is None:
self.key._public_signing_key = self.sign.verify_key
return self.encrypt, self.sign
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Re-encrypt.
"""
# Derive the key from the passphrase.
derived = util.derive_passphrase(self.passphrase)
# Generate two random nonces.
nonce1 = random(SecretBox.NONCE_SIZE)
nonce2 = random(SecretBox.NONCE_SIZE)
sign_box = SecretBox(derived)
enc_box = SecretBox(derived)
s_p = self.sign.encode()
e_p = self.encrypt.encode()
s_e = sign_box.encrypt(s_p, nonce1)
e_e = enc_box.encrypt(e_p, nonce2)
# Update `self.key`.
self.key._private_key_raw = e_e.ciphertext
self.key._private_signing_key_raw = s_e.ciphertext
# Bit of a mixed up name.
self.key._private_nonce = e_e.nonce
self.key._private_signing_nonce = s_e.nonce
if exc_type is not None:
raise exc_type(exc_val)
开发者ID:SunDwarf,项目名称:Gluino,代码行数:70,代码来源:private.py
示例18: maybe_generate_key
def maybe_generate_key(self, cbdir, privkey_path=u'key.priv', pubkey_path=u'key.pub'):
privkey_path = os.path.join(cbdir, privkey_path)
pubkey_path = os.path.join(cbdir, pubkey_path)
if os.path.exists(privkey_path):
# node private key seems to exist already .. check!
priv_tags = _parse_keyfile(privkey_path, private=True)
for tag in [u'creator', u'created-at', u'machine-id', u'public-key-ed25519', u'private-key-ed25519']:
if tag not in priv_tags:
raise Exception("Corrupt node private key file {} - {} tag not found".format(privkey_path, tag))
privkey_hex = priv_tags[u'private-key-ed25519']
privkey = SigningKey(privkey_hex, encoder=HexEncoder)
pubkey = privkey.verify_key
pubkey_hex = pubkey.encode(encoder=HexEncoder).decode('ascii')
if priv_tags[u'public-key-ed25519'] != pubkey_hex:
raise Exception(
("Inconsistent node private key file {} - public-key-ed25519 doesn't"
" correspond to private-key-ed25519").format(pubkey_path)
)
if os.path.exists(pubkey_path):
pub_tags = _parse_keyfile(pubkey_path, private=False)
for tag in [u'creator', u'created-at', u'machine-id', u'public-key-ed25519']:
if tag not in pub_tags:
raise Exception("Corrupt node public key file {} - {} tag not found".format(pubkey_path, tag))
if pub_tags[u'public-key-ed25519'] != pubkey_hex:
raise Exception(
("Inconsistent node public key file {} - public-key-ed25519 doesn't"
" correspond to private-key-ed25519").format(pubkey_path)
)
else:
self.log.info(
"Node public key file {pub_path} not found - re-creating from node private key file {priv_path}",
pub_path=pubkey_path,
priv_path=privkey_path,
)
pub_tags = OrderedDict([
(u'creator', priv_tags[u'creator']),
(u'created-at', priv_tags[u'created-at']),
(u'machine-id', priv_tags[u'machine-id']),
(u'public-key-ed25519', pubkey_hex),
])
msg = u'Crossbar.io node public key\n\n'
_write_node_key(pubkey_path, pub_tags, msg)
self.log.debug("Node key already exists (public key: {hex})", hex=pubkey_hex)
else:
# node private key does not yet exist: generate one
privkey = SigningKey.generate()
privkey_hex = privkey.encode(encoder=HexEncoder).decode('ascii')
pubkey = privkey.verify_key
pubkey_hex = pubkey.encode(encoder=HexEncoder).decode('ascii')
# first, write the public file
tags = OrderedDict([
(u'creator', _creator()),
(u'created-at', utcnow()),
(u'machine-id', _machine_id()),
(u'public-key-ed25519', pubkey_hex),
])
msg = u'Crossbar.io node public key\n\n'
_write_node_key(pubkey_path, tags, msg)
# now, add the private key and write the private file
tags[u'private-key-ed25519'] = privkey_hex
msg = u'Crossbar.io node private key - KEEP THIS SAFE!\n\n'
_write_node_key(privkey_path, tags, msg)
self.log.info("New node key pair generated!")
# fix file permissions on node public/private key files
# note: we use decimals instead of octals as octal literals have changed between Py2/3
#
if os.stat(pubkey_path).st_mode & 511 != 420: # 420 (decimal) == 0644 (octal)
os.chmod(pubkey_path, 420)
self.log.info("File permissions on node public key fixed!")
if os.stat(privkey_path).st_mode & 511 != 384: # 384 (decimal) == 0600 (octal)
os.chmod(privkey_path, 384)
self.log.info("File permissions on node private key fixed!")
self._node_key = cryptosign.SigningKey(privkey)
return pubkey_hex
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:92,代码来源:node.py
示例19: _prepare_node_keys
def _prepare_node_keys(self):
from nacl.signing import SigningKey
from nacl.encoding import HexEncoder
# make sure CBDIR/.cdc exists
#
cdc_dir = os.path.join(self._cbdir, '.cdc')
if os.path.isdir(cdc_dir):
pass
elif os.path.exists(cdc_dir):
raise Exception(".cdc exists, but isn't a directory")
else:
os.mkdir(cdc_dir)
self.log.info("CDC directory created")
# load node ID, either from .cdc/node.id or from CDC_NODE_ID
#
def split_nid(nid_s):
nid_c = nid_s.strip().split('@')
if len(nid_c) != 2:
raise Exception("illegal node principal '{}' - must follow the form <node id>@<management realm>".format(nid_s))
node_id, realm = nid_c
# FIXME: regex check node_id and realm
return node_id, realm
nid_file = os.path.join(cdc_dir, 'node.id')
node_id, realm = None, None
if os.path.isfile(nid_file):
with open(nid_file, 'r') as f:
node_id, realm = split_nid(f.read())
elif os.path.exists(nid_file):
raise Exception("{} exists, but isn't a file".format(nid_file))
else:
if 'CDC_NODE_ID' in os.environ:
node_id, realm = split_nid(os.environ['CDC_NODE_ID'])
else:
raise Exception("Neither node ID file {} exists nor CDC_NODE_ID environment variable set".format(nid_file))
# Load the node key, either from .cdc/node.key or from CDC_NODE_KEY.
# The node key is a Ed25519 key in either raw format (32 bytes) or in
# hex-encoded form (64 characters).
#
# Actually, what's loaded is not the secret Ed25519 key, but the _seed_
# for that key. Private keys are derived from this 32-byte (256-bit)
# random seed value. It is thus the seed value which is sensitive and
# must be protected.
#
skey_file = os.path.join(cdc_dir, 'node.key')
skey = None
if os.path.isfile(skey_file):
# FIXME: check file permissions are 0600!
# This value is read in here.
skey_len = os.path.getsize(skey_file)
if skey_len in (32, 64):
with open(skey_file, 'r') as f:
skey_seed = f.read()
encoder = None
if skey_len == 64:
encoder = HexEncoder
skey = SigningKey(skey_seed, encoder=encoder)
self.log.info("Existing CDC node key loaded from {skey_file}.", skey_file=skey_file)
else:
raise Exception("invalid node key length {} (key must either be 32 raw bytes or hex encoded 32 bytes, hence 64 byte char length)")
elif os.path.exists(skey_file):
raise Exception("{} exists, but isn't a file".format(skey_file))
else:
skey = SigningKey.generate()
skey_seed = skey.encode(encoder=HexEncoder)
with open(skey_file, 'w') as f:
f.write(skey_seed)
# set file mode to read only for owner
# 384 (decimal) == 0600 (octal) - we use that for Py2/3 reasons
os.chmod(skey_file, 384)
self.log.info("New CDC node key {skey_file} generated.", skey_file=skey_file)
return realm, node_id, skey
开发者ID:FirefighterBlu3,项目名称:crossbar,代码行数:78,代码来源:node.py
示例20: maybe_generate_key
def maybe_generate_key(log, cbdir, privkey_path=u'key.priv', pubkey_path=u'key.pub'):
if not HAS_NACL:
log.warn("Skipping node key generation - NaCl package not installed!")
return
from nacl.signing import SigningKey
from nacl.encoding import HexEncoder
privkey = None
pubkey = None
privkey_path = os.path.join(cbdir, privkey_path)
pubkey_path = os.path.join(cbdir, pubkey_path)
if os.path.exists(privkey_path):
# node private key seems to exist already .. check!
tags = _parse_keyfile(privkey_path, private=True)
if u'private-key-ed25519' not in tags:
raise Exception("Node private key file lacks a 'private-key-ed25519' tag!")
privkey = tags[u'private-key-ed25519']
# recreate a signing key from the base64 encoding
privkey_obj = SigningKey(privkey, encoder=HexEncoder)
pubkey = privkey_obj.verify_key.encode(encoder=HexEncoder).decode('ascii')
# confirm we have the public key in the file, and that it is
# correct
if u'public-key-ed25519' in tags:
if tags[u'public-key-ed25519'] != pubkey:
raise Exception(
("Inconsistent key file '{}': 'public-key-ed25519' doesn't"
" correspond to private-key-ed25519").format(privkey_path)
)
log.debug("Node key already exists (public key: {})".format(pubkey))
if os.path.exists(pubkey_path):
pubtags = _parse_keyfile(pubkey_path, private=False)
if u'public-key-ed25519' not in pubtags:
raise Exception(
("Pubkey file '{}' exists but lacks 'public-key-ed25519'"
" tag").format(pubkey_path)
)
if pubtags[u'public-key-ed25519'] != pubkey:
raise Exception(
("Inconsistent key file '{}': 'public-key-ed25519' doesn't"
" correspond to private-key-ed25519").format(pubkey_path)
)
else:
log.info("'{}' not found; re-creating from '{}'".format(pubkey_path, privkey_path))
tags = OrderedDict([
(u'creator', _creator()),
(u'created-at', utcnow()),
(u'machine-id', _machine_id()),
(u'public-key-ed25519', pubkey),
|
请发表评论