class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def __init__(self):
super().__init__()
self.num_nodes = 1
self.block_heights = {}
self.coinbase_key = CECKey()
self.coinbase_key.set_secretbytes(b"fatstacks")
self.coinbase_pubkey = self.coinbase_key.get_pubkey()
self.tip = None
self.blocks = {}
self.excessive_block_size = 16 * ONE_MEGABYTE
self.extra_args = [['-norelaypriority',
'-whitelist=127.0.0.1',
'-limitancestorcount=9999',
'-limitancestorsize=9999',
'-limitdescendantcount=9999',
'-limitdescendantsize=9999',
'-maxmempool=999',
"-excessiveblocksize=%d"
% self.excessive_block_size]]
def add_options(self, parser):
super().add_options(parser)
parser.add_option(
"--runbarelyexpensive", dest="runbarelyexpensive", default=True)
def run_test(self):
self.test = TestManager(self, self.options.tmpdir)
self.test.add_all_connections(self.nodes)
# Start up network handling in another thread
NetworkThread().start()
# Set the blocksize to 2MB as initial condition
self.nodes[0].setexcessiveblock(self.excessive_block_size)
self.test.run()
def add_transactions_to_block(self, block, tx_list):
[tx.rehash() for tx in tx_list]
block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
tx = create_transaction(spend_tx, n, b"", value, script)
return tx
# sign a transaction, using the key we know about
# this signs input 0 in tx, which is assumed to be spending output n in
# spend_tx
def sign_tx(self, tx, spend_tx, n):
scriptPubKey = bytearray(spend_tx.vout[n].scriptPubKey)
if (scriptPubKey[0] == OP_TRUE): # an anyone-can-spend
tx.vin[0].scriptSig = CScript()
return
sighash = SignatureHashForkId(
spend_tx.vout[n].scriptPubKey, tx, 0, SIGHASH_ALL | SIGHASH_FORKID, spend_tx.vout[n].nValue)
tx.vin[0].scriptSig = CScript(
[self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))])
def create_and_sign_transaction(self, spend_tx, n, value, script=CScript([OP_TRUE])):
tx = self.create_tx(spend_tx, n, value, script)
self.sign_tx(tx, spend_tx, n)
tx.rehash()
return tx
def next_block(self, number, spend=None, additional_coinbase_value=0, script=None, extra_sigops=0, block_size=0, solve=True):
"""
Create a block on top of self.tip, and advance self.tip to point to the new block
if spend is specified, then 1 satoshi will be spent from that to an anyone-can-spend
output, and rest will go to fees.
"""
if self.tip == None:
base_block_hash = self.genesis_hash
block_time = int(time.time()) + 1
else:
base_block_hash = self.tip.sha256
block_time = self.tip.nTime + 1
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height, self.coinbase_pubkey)
coinbase.vout[0].nValue += additional_coinbase_value
if (spend != None):
coinbase.vout[0].nValue += spend.tx.vout[
spend.n].nValue - 1 # all but one satoshi to fees
coinbase.rehash()
block = create_block(base_block_hash, coinbase, block_time)
spendable_output = None
if (spend != None):
tx = CTransaction()
# no signature yet
tx.vin.append(
CTxIn(COutPoint(spend.tx.sha256, spend.n), b"", 0xffffffff))
# We put some random data into the first transaction of the chain
# to randomize ids
tx.vout.append(
CTxOut(0, CScript([random.randint(0, 255), OP_DROP, OP_TRUE])))
if script == None:
#.........这里部分代码省略.........
def run_test(self):
p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
# Build the blockchain
self.tip = int(self.nodes[0].getbestblockhash(), 16)
self.block_time = self.nodes[0].getblock(self.nodes[0].getbestblockhash())['time'] + 1
self.blocks = []
# Get a pubkey for the coinbase TXO
coinbase_key = CECKey()
coinbase_key.set_secretbytes(b"horsebattery")
coinbase_pubkey = coinbase_key.get_pubkey()
# Create the first block with a coinbase output to our key
height = 1
block = create_block(self.tip, create_coinbase(height, coinbase_pubkey), self.block_time)
self.blocks.append(block)
self.block_time += 1
block.solve()
# Save the coinbase for later
self.block1 = block
self.tip = block.sha256
height += 1
# Bury the block 100 deep so the coinbase output is spendable
for i in range(100):
block = create_block(self.tip, create_coinbase(height), self.block_time)
block.solve()
self.blocks.append(block)
self.tip = block.sha256
self.block_time += 1
height += 1
# Create a transaction spending the coinbase output with an invalid (null) signature
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(self.block1.vtx[0].sha256, 0), scriptSig=b""))
tx.vout.append(CTxOut(49 * 100000000, CScript([OP_TRUE])))
tx.calc_sha256()
block102 = create_block(self.tip, create_coinbase(height), self.block_time)
self.block_time += 1
block102.vtx.extend([tx])
block102.hashMerkleRoot = block102.calc_merkle_root()
block102.rehash()
block102.solve()
self.blocks.append(block102)
self.tip = block102.sha256
self.block_time += 1
height += 1
# Bury the assumed valid block 2100 deep
for i in range(2100):
block = create_block(self.tip, create_coinbase(height), self.block_time)
block.nVersion = 4
block.solve()
self.blocks.append(block)
self.tip = block.sha256
self.block_time += 1
height += 1
self.nodes[0].disconnect_p2ps()
# Start node1 and node2 with assumevalid so they accept a block with a bad signature.
self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)])
self.start_node(2, extra_args=["-assumevalid=" + hex(block102.sha256)])
p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
p2p2 = self.nodes[2].add_p2p_connection(BaseNode())
# send header lists to all three nodes
p2p0.send_header_for_blocks(self.blocks[0:2000])
p2p0.send_header_for_blocks(self.blocks[2000:])
p2p1.send_header_for_blocks(self.blocks[0:2000])
p2p1.send_header_for_blocks(self.blocks[2000:])
p2p2.send_header_for_blocks(self.blocks[0:200])
# Send blocks to node0. Block 102 will be rejected.
self.send_blocks_until_disconnected(p2p0)
self.assert_blockchain_height(self.nodes[0], 101)
# Send all blocks to node1. All blocks will be accepted.
for i in range(2202):
p2p1.send_message(msg_block(self.blocks[i]))
# Syncing 2200 blocks can take a while on slow systems. Give it plenty of time to sync.
p2p1.sync_with_ping(120)
assert_equal(self.nodes[1].getblock(self.nodes[1].getbestblockhash())['height'], 2202)
# Send blocks to node2. Block 102 will be rejected.
self.send_blocks_until_disconnected(p2p2)
self.assert_blockchain_height(self.nodes[2], 101)
class FullBlockTest(ComparisonTestFramework):
''' Can either run this test as 1 node with expected answers, or two and compare them.
Change the "outcome" variable from each TestInstance object to only do the comparison. '''
def __init__(self):
self.num_nodes = 1
self.block_heights = {}
self.coinbase_key = CECKey()
self.coinbase_key.set_secretbytes(b"horsebattery")
self.coinbase_pubkey = self.coinbase_key.get_pubkey()
self.block_time = int(time.time())+1
self.tip = None
self.blocks = {}
def run_test(self):
test = TestManager(self, self.options.tmpdir)
test.add_all_connections(self.nodes)
NetworkThread().start() # Start up network handling in another thread
test.run()
def add_transactions_to_block(self, block, tx_list):
[ tx.rehash() for tx in tx_list ]
block.vtx.extend(tx_list)
block.hashMerkleRoot = block.calc_merkle_root()
block.rehash()
return block
# Create a block on top of self.tip, and advance self.tip to point to the new block
# if spend is specified, then 1 satoshi will be spent from that to an anyone-can-spend output,
# and rest will go to fees.
def next_block(self, number, spend=None, additional_coinbase_value=0, script=None):
if self.tip == None:
base_block_hash = self.genesis_hash
else:
base_block_hash = self.tip.sha256
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height, self.coinbase_pubkey)
coinbase.vout[0].nValue += additional_coinbase_value
if (spend != None):
coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1 # all but one satoshi to fees
coinbase.rehash()
block = create_block(base_block_hash, coinbase, self.block_time)
if (spend != None):
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n), b"", 0xffffffff)) # no signature yet
# This copies the java comparison tool testing behavior: the first
# txout has a garbage scriptPubKey, "to make sure we're not
# pre-verifying too much" (?)
tx.vout.append(CTxOut(0, CScript([random.randint(0,255), height & 255])))
if script == None:
tx.vout.append(CTxOut(1, CScript([OP_TRUE])))
else:
tx.vout.append(CTxOut(1, script))
# Now sign it if necessary
scriptSig = b""
scriptPubKey = bytearray(spend.tx.vout[spend.n].scriptPubKey)
if (scriptPubKey[0] == OP_TRUE): # looks like an anyone-can-spend
scriptSig = CScript([OP_TRUE])
else:
# We have to actually sign it
(sighash, err) = SignatureHash(spend.tx.vout[spend.n].scriptPubKey, tx, 0, SIGHASH_ALL)
scriptSig = CScript([self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))])
tx.vin[0].scriptSig = scriptSig
# Now add the transaction to the block
block = self.add_transactions_to_block(block, [tx])
block.solve()
self.tip = block
self.block_heights[block.sha256] = height
self.block_time += 1
assert number not in self.blocks
self.blocks[number] = block
return block
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
def save_spendable_output():
spendable_outputs.append(self.tip)
# get an output that we previous marked as spendable
def get_spendable_output():
return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0)
# returns a test case that asserts that the current tip was accepted
def accepted():
return TestInstance([[self.tip, True]])
# returns a test case that asserts that the current tip was rejected
def rejected(reject = None):
if reject is None:
return TestInstance([[self.tip, False]])
else:
return TestInstance([[self.tip, reject]])
# move the tip back to a previous block
def tip(number):
#.........这里部分代码省略.........
class FullBlockTest(ComparisonTestFramework):
''' Can either run this test as 1 node with expected answers, or two and compare them.
Change the "outcome" variable from each TestInstance object to only do the comparison. '''
def __init__(self):
self.num_nodes = 1
self.block_heights = {}
self.coinbase_key = CECKey()
self.coinbase_key.set_secretbytes(bytes("horsebattery"))
self.coinbase_pubkey = self.coinbase_key.get_pubkey()
self.block_time = int(time.time())+1
self.tip = None
self.blocks = {}
def run_test(self):
test = TestManager(self, self.options.tmpdir)
test.add_all_connections(self.nodes)
NetworkThread().start() # Start up network handling in another thread
test.run()
def add_transactions_to_block(self, block, tx_list):
[ tx.rehash() for tx in tx_list ]
block.vtx.extend(tx_list)
block.hashMerkleRoot = block.calc_merkle_root()
block.rehash()
return block
# Create a block on top of self.tip, and advance self.tip to point to the new block
# if spend is specified, then 1 satoshi will be spent from that to an anyone-can-spend output,
# and rest will go to fees.
def next_block(self, number, spend=None, additional_coinbase_value=0, script=None):
if self.tip == None:
base_block_hash = self.genesis_hash
else:
base_block_hash = self.tip.sha256
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height, self.coinbase_pubkey)
coinbase.vout[0].nValue += additional_coinbase_value
if (spend != None):
coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1 # all but one satoshi to fees
coinbase.rehash()
block = create_block(base_block_hash, coinbase, self.block_time)
if (spend != None):
tx = CTransaction()
tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n), "", 0xffffffff)) # no signature yet
# This copies the java comparison tool testing behavior: the first
# txout has a garbage scriptPubKey, "to make sure we're not
# pre-verifying too much" (?)
tx.vout.append(CTxOut(0, CScript([random.randint(0,255), height & 255])))
if script == None:
tx.vout.append(CTxOut(1, CScript([OP_TRUE])))
else:
tx.vout.append(CTxOut(1, script))
# Now sign it if necessary
scriptSig = ""
scriptPubKey = bytearray(spend.tx.vout[spend.n].scriptPubKey)
if (scriptPubKey[0] == OP_TRUE): # looks like an anyone-can-spend
scriptSig = CScript([OP_TRUE])
else:
# We have to actually sign it
(sighash, err) = SignatureHash(spend.tx.vout[spend.n].scriptPubKey, tx, 0, SIGHASH_ALL)
scriptSig = CScript([self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL]))])
tx.vin[0].scriptSig = scriptSig
# Now add the transaction to the block
block = self.add_transactions_to_block(block, [tx])
block.solve()
self.tip = block
self.block_heights[block.sha256] = height
self.block_time += 1
assert number not in self.blocks
self.blocks[number] = block
return block
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
def save_spendable_output():
spendable_outputs.append(self.tip)
# get an output that we previous marked as spendable
def get_spendable_output():
return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0)
# returns a test case that asserts that the current tip was accepted
def accepted():
return TestInstance([[self.tip, True]])
# returns a test case that asserts that the current tip was rejected
def rejected():
return TestInstance([[self.tip, False]])
# move the tip back to a previous block
def tip(number):
self.tip = self.blocks[number]
# creates a new block and advances the tip to that block
#.........这里部分代码省略.........
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
def save_spendable_output():
spendable_outputs.append(self.tip)
# get an output that we previously marked as spendable
def get_spendable_output():
return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0)
# returns a test case that asserts that the current tip was accepted
def accepted():
return TestInstance([[self.tip, True]])
# returns a test case that asserts that the current tip was rejected
def rejected(reject=None):
if reject is None:
return TestInstance([[self.tip, False]])
else:
return TestInstance([[self.tip, reject]])
# move the tip back to a previous block
def tip(number):
self.tip = self.blocks[number]
# adds transactions to the block and updates state
def update_block(block_number, new_transactions):
[tx.rehash() for tx in new_transactions]
block = self.blocks[block_number]
block.vtx.extend(new_transactions)
old_sha256 = block.sha256
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
# Update the internal state just like in next_block
self.tip = block
if block.sha256 != old_sha256:
self.block_heights[
block.sha256] = self.block_heights[old_sha256]
del self.block_heights[old_sha256]
self.blocks[block_number] = block
return block
# shorthand for functions
block = self.next_block
node = self.nodes[0]
# Create a new block
block(0)
save_spendable_output()
yield accepted()
# Now we need that block to mature so we can spend the coinbase.
test = TestInstance(sync_every_block=False)
for i in range(99):
block(5000 + i)
test.blocks_and_transactions.append([self.tip, True])
save_spendable_output()
yield test
# collect spendable outputs now to avoid cluttering the code later on
out = []
for i in range(100):
out.append(get_spendable_output())
# Generate a key pair to test P2SH sigops count
private_key = CECKey()
private_key.set_secretbytes(b"replayprotection")
public_key = private_key.get_pubkey()
# This is a little handier to use than the version in blocktools.py
def create_fund_and_spend_tx(spend, forkvalue=0):
# Fund transaction
script = CScript([public_key, OP_CHECKSIG])
txfund = create_transaction(
spend.tx, spend.n, b'', 50 * COIN, script)
txfund.rehash()
# Spend transaction
txspend = CTransaction()
txspend.vout.append(CTxOut(50 * COIN - 1000, CScript([OP_TRUE])))
txspend.vin.append(CTxIn(COutPoint(txfund.sha256, 0), b''))
# Sign the transaction
sighashtype = (forkvalue << 8) | SIGHASH_ALL | SIGHASH_FORKID
sighash = SignatureHashForkId(
script, txspend, 0, sighashtype, 50 * COIN)
sig = private_key.sign(sighash) + \
bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))
txspend.vin[0].scriptSig = CScript([sig])
txspend.rehash()
return [txfund, txspend]
def send_transaction_to_mempool(tx):
tx_id = node.sendrawtransaction(ToHex(tx))
assert(tx_id in set(node.getrawmempool()))
return tx_id
#.........这里部分代码省略.........
class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.block_heights = {}
self.coinbase_key = CECKey()
self.coinbase_key.set_secretbytes(b"horsebattery")
self.coinbase_pubkey = self.coinbase_key.get_pubkey()
self.tip = None
self.blocks = {}
def setup_network(self):
self.extra_args = [['-norelaypriority']]
self.add_nodes(self.num_nodes, self.extra_args)
self.start_nodes()
def add_options(self, parser):
super().add_options(parser)
parser.add_option(
"--runbarelyexpensive", dest="runbarelyexpensive", default=True)
def run_test(self):
self.test = TestManager(self, self.options.tmpdir)
self.test.add_all_connections(self.nodes)
# Start up network handling in another thread
NetworkThread().start()
self.test.run()
def add_transactions_to_block(self, block, tx_list):
[tx.rehash() for tx in tx_list]
block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
tx = create_transaction(spend_tx, n, b"", value, script)
return tx
# sign a transaction, using the key we know about
# this signs input 0 in tx, which is assumed to be spending output n in
# spend_tx
def sign_tx(self, tx, spend_tx, n):
scriptPubKey = bytearray(spend_tx.vout[n].scriptPubKey)
if (scriptPubKey[0] == OP_TRUE): # an anyone-can-spend
tx.vin[0].scriptSig = CScript()
return
sighash = SignatureHashForkId(
spend_tx.vout[n].scriptPubKey, tx, 0, SIGHASH_ALL | SIGHASH_FORKID, spend_tx.vout[n].nValue)
tx.vin[0].scriptSig = CScript(
[self.coinbase_key.sign(sighash) + bytes(bytearray([SIGHASH_ALL | SIGHASH_FORKID]))])
def create_and_sign_transaction(self, spend_tx, n, value, script=CScript([OP_TRUE])):
tx = self.create_tx(spend_tx, n, value, script)
self.sign_tx(tx, spend_tx, n)
tx.rehash()
return tx
def next_block(self, number, spend=None, additional_coinbase_value=0, script=CScript([OP_TRUE])):
if self.tip == None:
base_block_hash = self.genesis_hash
block_time = int(time.time()) + 1
else:
base_block_hash = self.tip.sha256
block_time = self.tip.nTime + 1
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height, self.coinbase_pubkey)
coinbase.vout[0].nValue += additional_coinbase_value
coinbase.rehash()
if spend == None:
block = create_block(base_block_hash, coinbase, block_time)
else:
# all but one satoshi to fees
coinbase.vout[0].nValue += spend.tx.vout[
spend.n].nValue - 1
coinbase.rehash()
block = create_block(base_block_hash, coinbase, block_time)
# spend 1 satoshi
tx = create_transaction(spend.tx, spend.n, b"", 1, script)
self.sign_tx(tx, spend.tx, spend.n)
self.add_transactions_to_block(block, [tx])
block.hashMerkleRoot = block.calc_merkle_root()
# Do PoW, which is very inexpensive on regnet
block.solve()
self.tip = block
self.block_heights[block.sha256] = height
assert number not in self.blocks
self.blocks[number] = block
return block
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
#.........这里部分代码省略.........
请发表评论