本文整理汇总了Python中test_framework.util.assert_equal函数的典型用法代码示例。如果您正苦于以下问题:Python assert_equal函数的具体用法?Python assert_equal怎么用?Python assert_equal使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了assert_equal函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run_test
def run_test(self):
# Have every node except last import its block signing private key.
for i in range(self.num_keys):
self.nodes[i].importprivkey(self.wifs[i])
self.check_height(0)
# mine a block with no transactions
print("Mining and signing 101 blocks to unlock funds")
self.mine_blocks(101, False)
# mine blocks with transactions
print("Mining and signing non-empty blocks")
self.mine_blocks(10, True)
# Height check also makes sure non-signing, p2p connected node gets block
self.check_height(111)
# signblock rpc field stuff
tip = self.nodes[0].getblockhash(self.nodes[0].getblockcount())
header = self.nodes[0].getblockheader(tip)
block = self.nodes[0].getblock(tip)
info = self.nodes[0].getblockchaininfo()
assert('signblock_witness_asm' in header)
assert('signblock_witness_hex' in header)
assert('signblock_witness_asm' in block)
assert('signblock_witness_hex' in block)
signblockscript = make_signblockscript(self.num_keys, self.required_signers, self.keys)
assert_equal(info['signblock_asm'], self.nodes[0].decodescript(signblockscript)['asm'])
assert_equal(info['signblock_hex'], signblockscript)
开发者ID:ElementsProject,项目名称:elements,代码行数:32,代码来源:feature_blocksign.py
示例2: test_auxpow
def test_auxpow(nodes):
"""
Test behaviour of getauxpow. Calling getauxpow should reserve
a key from the pool, but it should be released again if the
created block is not actually used. On the other hand, if the
auxpow is submitted and turned into a block, the keypool should
be drained.
"""
nodes[0].walletpassphrase('test', 12000)
nodes[0].keypoolrefill(1)
nodes[0].walletlock()
assert_equal (nodes[0].getwalletinfo()['keypoolsize'], 1)
nodes[0].getauxblock()
assert_equal (nodes[0].getwalletinfo()['keypoolsize'], 1)
nodes[0].generate(1)
assert_equal (nodes[0].getwalletinfo()['keypoolsize'], 1)
auxblock = nodes[0].getauxblock()
assert_equal (nodes[0].getwalletinfo()['keypoolsize'], 1)
target = reverseHex(auxblock['_target'])
solved = computeAuxpow(auxblock['hash'], target, True)
res = nodes[0].getauxblock(auxblock['hash'], solved)
assert res
assert_equal(nodes[0].getwalletinfo()['keypoolsize'], 0)
assert_raises_rpc_error(-12, 'Keypool ran out', nodes[0].getauxblock)
开发者ID:domob1812,项目名称:namecore,代码行数:28,代码来源:wallet_keypool.py
示例3: test_getblocktxn_response
def test_getblocktxn_response(compact_block, peer, expected_result):
msg = msg_cmpctblock(compact_block.to_p2p())
peer.send_and_ping(msg)
with mininode_lock:
assert("getblocktxn" in peer.last_message)
absolute_indexes = peer.last_message["getblocktxn"].block_txn_request.to_absolute()
assert_equal(absolute_indexes, expected_result)
开发者ID:Flowdalic,项目名称:bitcoin,代码行数:7,代码来源:p2p_compactblocks.py
示例4: test_rest_request
def test_rest_request(self, uri, http_method='GET', req_type=ReqType.JSON, body='', status=200, ret_type=RetType.JSON):
rest_uri = '/rest' + uri
if req_type == ReqType.JSON:
rest_uri += '.json'
elif req_type == ReqType.BIN:
rest_uri += '.bin'
elif req_type == ReqType.HEX:
rest_uri += '.hex'
conn = http.client.HTTPConnection(self.url.hostname, self.url.port)
self.log.debug('%s %s %s', http_method, rest_uri, body)
if http_method == 'GET':
conn.request('GET', rest_uri)
elif http_method == 'POST':
conn.request('POST', rest_uri, body)
resp = conn.getresponse()
assert_equal(resp.status, status)
if ret_type == RetType.OBJ:
return resp
elif ret_type == RetType.BYTES:
return resp.read()
elif ret_type == RetType.JSON:
return json.loads(resp.read().decode('utf-8'), parse_float=Decimal)
开发者ID:kanzure,项目名称:bitcoin,代码行数:25,代码来源:interface_rest.py
示例5: create_tx
def create_tx(self, from_txid, to_address, amount):
inputs = [{ "txid" : from_txid, "vout" : 0}]
outputs = { to_address : amount }
rawtx = self.nodes[0].createrawtransaction(inputs, outputs)
signresult = self.nodes[0].signrawtransaction(rawtx)
assert_equal(signresult["complete"], True)
return signresult["hex"]
开发者ID:bitcartel,项目名称:zcash,代码行数:7,代码来源:mempool_resurrect_test.py
示例6: _test_getnettotals
def _test_getnettotals(self):
# getnettotals totalbytesrecv and totalbytessent should be
# consistent with getpeerinfo. Since the RPC calls are not atomic,
# and messages might have been recvd or sent between RPC calls, call
# getnettotals before and after and verify that the returned values
# from getpeerinfo are bounded by those values.
net_totals_before = self.nodes[0].getnettotals()
peer_info = self.nodes[0].getpeerinfo()
net_totals_after = self.nodes[0].getnettotals()
assert_equal(len(peer_info), 2)
peers_recv = sum([peer['bytesrecv'] for peer in peer_info])
peers_sent = sum([peer['bytessent'] for peer in peer_info])
assert_greater_than_or_equal(peers_recv, net_totals_before['totalbytesrecv'])
assert_greater_than_or_equal(net_totals_after['totalbytesrecv'], peers_recv)
assert_greater_than_or_equal(peers_sent, net_totals_before['totalbytessent'])
assert_greater_than_or_equal(net_totals_after['totalbytessent'], peers_sent)
# test getnettotals and getpeerinfo by doing a ping
# the bytes sent/received should change
# note ping and pong are 32 bytes each
self.nodes[0].ping()
wait_until(lambda: (self.nodes[0].getnettotals()['totalbytessent'] >= net_totals_after['totalbytessent'] + 32 * 2), timeout=1)
wait_until(lambda: (self.nodes[0].getnettotals()['totalbytesrecv'] >= net_totals_after['totalbytesrecv'] + 32 * 2), timeout=1)
peer_info_after_ping = self.nodes[0].getpeerinfo()
for before, after in zip(peer_info, peer_info_after_ping):
assert_greater_than_or_equal(after['bytesrecv_per_msg']['pong'], before['bytesrecv_per_msg']['pong'] + 32)
assert_greater_than_or_equal(after['bytessent_per_msg']['ping'], before['bytessent_per_msg']['ping'] + 32)
开发者ID:GlobalBoost,项目名称:GlobalBoost-Y,代码行数:29,代码来源:rpc_net.py
示例7: _test_getblockheader
def _test_getblockheader(self):
node = self.nodes[0]
assert_raises_rpc_error(-5, "Block not found",
node.getblockheader, "nonsense")
besthash = node.getbestblockhash()
secondbesthash = node.getblockhash(199)
header = node.getblockheader(besthash)
assert_equal(header['hash'], besthash)
assert_equal(header['height'], 200)
assert_equal(header['confirmations'], 1)
assert_equal(header['previousblockhash'], secondbesthash)
assert_is_hex_string(header['chainwork'])
assert_equal(header['nTx'], 1)
assert_is_hash_string(header['hash'])
assert_is_hash_string(header['previousblockhash'])
assert_is_hash_string(header['merkleroot'])
assert_is_hash_string(header['bits'], length=None)
assert isinstance(header['time'], int)
assert isinstance(header['mediantime'], int)
assert isinstance(header['nonce'], int)
assert isinstance(header['version'], int)
assert isinstance(int(header['versionHex'], 16), int)
assert isinstance(header['difficulty'], Decimal)
开发者ID:viacoin,项目名称:viacoin,代码行数:26,代码来源:rpc_blockchain.py
示例8: test_create_submit_auxblock
def test_create_submit_auxblock (self):
"""
Test the createauxblock / submitauxblock method pair.
"""
# Check for errors with wrong parameters.
assert_raises_rpc_error (-1, None, self.nodes[0].createauxblock)
assert_raises_rpc_error (-5, "Invalid coinbase payout address",
self.nodes[0].createauxblock,
"this_an_invalid_address")
# Fix a coinbase address and construct methods for it.
coinbaseAddr = self.nodes[0].getnewaddress ()
def create ():
return self.nodes[0].createauxblock (coinbaseAddr)
submit = self.nodes[0].submitauxblock
# Run common tests.
self.test_common (create, submit)
# Ensure that the payout address is the one which we specify
hash1 = mineAuxpowBlockWithMethods (create, submit)
hash2 = mineAuxpowBlockWithMethods (create, submit)
self.sync_all ()
addr1 = getCoinbaseAddr (self.nodes[1], hash1)
addr2 = getCoinbaseAddr (self.nodes[1], hash2)
assert_equal (addr1, coinbaseAddr)
assert_equal (addr2, coinbaseAddr)
开发者ID:domob1812,项目名称:namecore,代码行数:28,代码来源:auxpow_mining.py
示例9: wait_and_assert_operationid_status
def wait_and_assert_operationid_status(self, node, myopid, in_status='success', in_errormsg=None):
print('waiting for async operation {}'.format(myopid))
opids = []
opids.append(myopid)
timeout = 300
status = None
errormsg = None
txid = None
for x in xrange(1, timeout):
results = node.z_getoperationresult(opids)
if len(results)==0:
time.sleep(1)
else:
print("Results", results[0])
status = results[0]["status"]
if status == "failed":
errormsg = results[0]['error']['message']
elif status == "success":
txid = results[0]['result']['txid']
break
print('...returned status: {}'.format(status))
assert_equal(in_status, status)
if errormsg is not None:
assert(in_errormsg is not None)
assert_equal(in_errormsg in errormsg, True)
print('...returned error: {}'.format(errormsg))
return txid
开发者ID:diantaowang,项目名称:zcash,代码行数:27,代码来源:zkey_import_export.py
示例10: checkNameValueAddr
def checkNameValueAddr (self, name, value, addr):
"""
Verifies that the given name has the given value and address.
"""
data = self.checkName (0, name, value, None, False)
assert_equal (data['address'], addr)
开发者ID:domob1812,项目名称:namecore,代码行数:7,代码来源:name_segwit.py
示例11: run_test
def run_test(self):
gen_node = self.nodes[0] # The block and tx generating node
gen_node.generate(1)
inbound_peer = self.nodes[0].add_p2p_connection(P2PNode()) # An "attacking" inbound peer
MAX_REPEATS = 100
self.log.info("Running test up to {} times.".format(MAX_REPEATS))
for i in range(MAX_REPEATS):
self.log.info('Run repeat {}'.format(i + 1))
txid = gen_node.sendtoaddress(gen_node.getnewaddress(), 0.01)
want_tx = msg_getdata()
want_tx.inv.append(CInv(t=1, h=int(txid, 16)))
inbound_peer.last_message.pop('notfound', None)
inbound_peer.send_message(want_tx)
inbound_peer.sync_with_ping()
if inbound_peer.last_message.get('notfound'):
self.log.debug('tx {} was not yet announced to us.'.format(txid))
self.log.debug("node has responded with a notfound message. End test.")
assert_equal(inbound_peer.last_message['notfound'].vec[0].hash, int(txid, 16))
inbound_peer.last_message.pop('notfound')
break
else:
self.log.debug('tx {} was already announced to us. Try test again.'.format(txid))
assert int(txid, 16) in [inv.hash for inv in inbound_peer.last_message['inv'].inv]
开发者ID:CubanCorona,项目名称:bitcoin,代码行数:27,代码来源:p2p_leak_tx.py
示例12: run_test
def run_test (self):
print "Mining blocks..."
self.nodes[0].generate(1)
do_not_shield_taddr = self.nodes[0].getnewaddress()
self.nodes[0].generate(4)
walletinfo = self.nodes[0].getwalletinfo()
assert_equal(walletinfo['immature_balance'], 50)
assert_equal(walletinfo['balance'], 0)
self.sync_all()
self.nodes[2].generate(1)
self.nodes[2].getnewaddress()
self.nodes[2].generate(1)
self.nodes[2].getnewaddress()
self.nodes[2].generate(1)
self.sync_all()
self.nodes[1].generate(101)
self.sync_all()
assert_equal(self.nodes[0].getbalance(), 50)
assert_equal(self.nodes[1].getbalance(), 10)
assert_equal(self.nodes[2].getbalance(), 30)
# Prepare to send taddr->zaddr
mytaddr = self.nodes[0].getnewaddress()
myzaddr = self.nodes[0].z_getnewaddress()
# Shielding will fail when trying to spend from watch-only address
self.nodes[2].importaddress(mytaddr)
try:
self.nodes[2].z_shieldcoinbase(mytaddr, myzaddr)
except JSONRPCException,e:
errorString = e.error['message']
开发者ID:Whiteblock,项目名称:zcash,代码行数:33,代码来源:wallet_shieldcoinbase.py
示例13: run_test
def run_test(self):
node = self.nodes[0]
self.log.info("test getmemoryinfo")
memory = node.getmemoryinfo()['locked']
assert_greater_than(memory['used'], 0)
assert_greater_than(memory['free'], 0)
assert_greater_than(memory['total'], 0)
# assert_greater_than_or_equal() for locked in case locking pages failed at some point
assert_greater_than_or_equal(memory['locked'], 0)
assert_greater_than(memory['chunks_used'], 0)
assert_greater_than(memory['chunks_free'], 0)
assert_equal(memory['used'] + memory['free'], memory['total'])
self.log.info("test mallocinfo")
try:
mallocinfo = node.getmemoryinfo(mode="mallocinfo")
self.log.info('getmemoryinfo(mode="mallocinfo") call succeeded')
tree = ET.fromstring(mallocinfo)
assert_equal(tree.tag, 'malloc')
except JSONRPCException:
self.log.info('getmemoryinfo(mode="mallocinfo") not available')
assert_raises_rpc_error(-8, 'mallocinfo is only available when compiled with glibc 2.10+', node.getmemoryinfo, mode="mallocinfo")
assert_raises_rpc_error(-8, "unknown mode foobar", node.getmemoryinfo, mode="foobar")
开发者ID:JeremyRubin,项目名称:bitcoin,代码行数:25,代码来源:rpc_misc.py
示例14: mine_block
def mine_block(self, make_transactions):
# mine block in round robin sense: depending on the block number, a node
# is selected to create the block, others sign it and the selected node
# broadcasts it
mineridx = self.nodes[0].getblockcount() % self.num_nodes # assuming in sync
mineridx_next = (self.nodes[0].getblockcount() + 1) % self.num_nodes
miner = self.nodes[mineridx]
miner_next = self.nodes[mineridx_next]
blockcount = miner.getblockcount()
# Make a few transactions to make non-empty blocks for compact transmission
if make_transactions:
print(mineridx)
for i in range(5):
miner.sendtoaddress(miner_next.getnewaddress(), int(miner.getbalance()['bitcoin']/10), "", "", True)
# miner makes a block
block = miner.getnewblockhex()
# other signing nodes get fed compact blocks
for i in range(self.num_keys):
if i == mineridx:
continue
sketch = miner.getcompactsketch(block)
compact_response = self.nodes[i].consumecompactsketch(sketch)
if make_transactions:
block_txn = self.nodes[i].consumegetblocktxn(block, compact_response["block_tx_req"])
final_block = self.nodes[i].finalizecompactblock(sketch, block_txn, compact_response["found_transactions"])
else:
# If there's only coinbase, it should succeed immediately
final_block = compact_response["blockhex"]
# Block should be complete, sans signatures
self.nodes[i].testproposedblock(final_block)
# non-signing node can not sign
assert_raises_rpc_error(-25, "Could not sign the block.", self.nodes[-1].signblock, block)
# collect num_keys signatures from signers, reduce to required_signers sigs during combine
sigs = []
for i in range(self.num_keys):
result = miner.combineblocksigs(block, sigs)
sigs = sigs + self.nodes[i].signblock(block)
assert_equal(result["complete"], i >= self.required_signers)
# submitting should have no effect pre-threshhold
if i < self.required_signers:
miner.submitblock(result["hex"])
self.check_height(blockcount)
result = miner.combineblocksigs(block, sigs)
assert_equal(result["complete"], True)
# All signing nodes must submit... we're not connected!
self.nodes[0].submitblock(result["hex"])
early_proposal = self.nodes[0].getnewblockhex() # testproposedblock should reject
# Submit blocks to all other signing nodes next, as well as too-early block proposal
for i in range(1, self.num_keys):
assert_raises_rpc_error(-25, "proposal was not based on our best chain", self.nodes[i].testproposedblock, early_proposal)
self.nodes[i].submitblock(result["hex"])
# All nodes should be synced in blocks and transactions(mempool should be empty)
self.sync_all()
开发者ID:ElementsProject,项目名称:elements,代码行数:60,代码来源:feature_blocksign.py
示例15: test_small_output_with_feerate_succeeds
def test_small_output_with_feerate_succeeds(rbf_node, dest_address):
# Make sure additional inputs exist
rbf_node.generatetoaddress(101, rbf_node.getnewaddress())
rbfid = spend_one_input(rbf_node, dest_address)
original_input_list = rbf_node.getrawtransaction(rbfid, 1)["vin"]
assert_equal(len(original_input_list), 1)
original_txin = original_input_list[0]
# Keep bumping until we out-spend change output
tx_fee = 0
while tx_fee < Decimal("0.0005"):
new_input_list = rbf_node.getrawtransaction(rbfid, 1)["vin"]
new_item = list(new_input_list)[0]
assert_equal(len(original_input_list), 1)
assert_equal(original_txin["txid"], new_item["txid"])
assert_equal(original_txin["vout"], new_item["vout"])
rbfid_new_details = rbf_node.bumpfee(rbfid)
rbfid_new = rbfid_new_details["txid"]
raw_pool = rbf_node.getrawmempool()
assert rbfid not in raw_pool
assert rbfid_new in raw_pool
rbfid = rbfid_new
tx_fee = rbfid_new_details["origfee"]
# input(s) have been added
final_input_list = rbf_node.getrawtransaction(rbfid, 1)["vin"]
assert_greater_than(len(final_input_list), 1)
# Original input is in final set
assert [txin for txin in final_input_list
if txin["txid"] == original_txin["txid"]
and txin["vout"] == original_txin["vout"]]
rbf_node.generatetoaddress(1, rbf_node.getnewaddress())
assert_equal(rbf_node.gettransaction(rbfid)["confirmations"], 1)
开发者ID:dgenr8,项目名称:bitcoin,代码行数:34,代码来源:wallet_bumpfee.py
示例16: run_test
def run_test(self):
self.nodes[0].generate(3)
stop_node(self.nodes[0], 0)
wait_bitcoinds()
self.nodes[0]=start_node(0, self.options.tmpdir, ["-debug", "-reindex", "-checkblockindex=1"])
assert_equal(self.nodes[0].getblockcount(), 3)
print "Success"
开发者ID:bitcartel,项目名称:zcash,代码行数:7,代码来源:reindex.py
示例17: _test_getblockheader
def _test_getblockheader(self):
node = self.nodes[0]
assert_raises_rpc_error(-8, "hash must be of length 64 (not 8, for 'nonsense')", node.getblockheader, "nonsense")
assert_raises_rpc_error(-8, "hash must be hexadecimal string (not 'ZZZ7bb8b1697ea987f3b223ba7819250cae33efacb068d23dc24859824a77844')", node.getblockheader, "ZZZ7bb8b1697ea987f3b223ba7819250cae33efacb068d23dc24859824a77844")
assert_raises_rpc_error(-5, "Block not found", node.getblockheader, "0cf7bb8b1697ea987f3b223ba7819250cae33efacb068d23dc24859824a77844")
besthash = node.getbestblockhash()
secondbesthash = node.getblockhash(199)
header = node.getblockheader(blockhash=besthash)
assert_equal(header['hash'], besthash)
assert_equal(header['height'], 200)
assert_equal(header['confirmations'], 1)
assert_equal(header['previousblockhash'], secondbesthash)
assert_is_hex_string(header['chainwork'])
assert_equal(header['nTx'], 1)
assert_is_hash_string(header['hash'])
assert_is_hash_string(header['previousblockhash'])
assert_is_hash_string(header['merkleroot'])
assert_is_hash_string(header['bits'], length=None)
assert isinstance(header['time'], int)
assert isinstance(header['mediantime'], int)
assert isinstance(header['nonce'], int)
assert isinstance(header['version'], int)
assert isinstance(int(header['versionHex'], 16), int)
assert isinstance(header['difficulty'], Decimal)
开发者ID:domob1812,项目名称:namecore,代码行数:27,代码来源:rpc_blockchain.py
示例18: assert_pool_balance
def assert_pool_balance(self, node, name, balance):
pools = node.getblockchaininfo()['valuePools']
for pool in pools:
if pool['id'] == name:
assert_equal(pool['chainValue'], balance, message="for pool named %r" % (name,))
return
assert False, "pool named %r not found" % (name,)
开发者ID:zcash,项目名称:zcash,代码行数:7,代码来源:turnstile.py
示例19: successful_signing_test
def successful_signing_test(self):
"""Create and sign a valid raw transaction with one input.
Expected results:
1) The transaction has a complete set of signatures
2) No script verification error occurred"""
privKeys = ['cUeKHd5orzT3mz8P9pxyREHfsWtVfgsfDjiZZBcjUBAaGk1BTj7N', 'cVKpPfVKSJxKqVpE9awvXNWuLHCa5j5tiE7K6zbUSptFpTEtiFrA']
inputs = [
# Valid pay-to-pubkey scripts
{'txid': '9b907ef1e3c26fc71fe4a4b3580bc75264112f95050014157059c736f0202e71', 'vout': 0,
'scriptPubKey': '76a91460baa0f494b38ce3c940dea67f3804dc52d1fb9488ac'},
{'txid': '83a4f6a6b73660e13ee6cb3c6063fa3759c50c9b7521d0536022961898f4fb02', 'vout': 0,
'scriptPubKey': '76a914669b857c03a5ed269d5d85a1ffac9ed5d663072788ac'},
]
outputs = {'mpLQjfK79b7CCV4VMJWEWAj5Mpx8Up5zxB': 0.1}
rawTx = self.nodes[0].createrawtransaction(inputs, outputs)
rawTxSigned = self.nodes[0].signrawtransactionwithkey(rawTx, privKeys, inputs)
# 1) The transaction has a complete set of signatures
assert rawTxSigned['complete']
# 2) No script verification error occurred
assert 'errors' not in rawTxSigned
# Perform the same test on signrawtransaction
rawTxSigned2 = self.nodes[0].signrawtransaction(rawTx, inputs, privKeys)
assert_equal(rawTxSigned, rawTxSigned2)
开发者ID:machinecoin-project,项目名称:machinecoin-core,代码行数:31,代码来源:rpc_signrawtransaction.py
示例20: create_transactions
def create_transactions(node, address, amt, fees):
# Create and sign raw transactions from node to address for amt.
# Creates a transaction for each fee and returns an array
# of the raw transactions.
utxos = [u for u in node.listunspent(0) if u['spendable']]
# Create transactions
inputs = []
ins_total = 0
for utxo in utxos:
inputs.append({"txid": utxo["txid"], "vout": utxo["vout"]})
ins_total += utxo['amount']
if ins_total >= amt + max(fees):
break
# make sure there was enough utxos
assert ins_total >= amt + max(fees)
txs = []
for fee in fees:
outputs = {address: amt}
# prevent 0 change output
if ins_total > amt + fee:
outputs[node.getrawchangeaddress()] = ins_total - amt - fee
raw_tx = node.createrawtransaction(inputs, outputs, 0, True)
raw_tx = node.signrawtransactionwithwallet(raw_tx)
assert_equal(raw_tx['complete'], True)
txs.append(raw_tx)
return txs
开发者ID:sudosurootdev,项目名称:bitcoin,代码行数:29,代码来源:wallet_balance.py
注:本文中的test_framework.util.assert_equal函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论