本文整理汇总了Python中test_framework.comptool.TestManager类的典型用法代码示例。如果您正苦于以下问题：Python TestManager类的具体用法？Python TestManager怎么用？Python TestManager使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了TestManager类的17个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run_test
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    manager.add_all_connections(self.nodes)
    # Clear tip and block_time before the manager runs.
    self.tip = self.block_time = None
    network_thread_start()
    manager.run()
开发者ID:yavwa,项目名称:Shilling,代码行数:7,代码来源:p2p_invalid_tx.py
示例2: run_test
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    manager.add_all_connections(self.nodes)
    # Clear tip and block_time before the manager runs.
    self.tip = self.block_time = None
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
开发者ID:thelonecrouton,项目名称:uniqredit,代码行数:7,代码来源:invalidblockrequest.py
示例3: run_test
def run_test(self):
    """Run six NULLDUMMY scenarios against node 0, printing a banner for each.

    Creates the addresses used by the test, connects a TestManager and starts
    the network thread, mines 429 blocks, then submits transactions and blocks
    through self.tx_submit / self.block_submit.
    """
    self.address = self.nodes[0].getnewaddress()
    self.ms_address = self.nodes[0].addmultisigaddress(1,[self.address])
    self.wit_address = self.nodes[0].addwitnessaddress(self.address)
    self.wit_ms_address = self.nodes[0].addwitnessaddress(self.ms_address)
    test = TestManager(self, self.options.tmpdir)
    test.add_all_connections(self.nodes)
    NetworkThread().start() # Start up network handling in another thread
    self.coinbase_blocks = self.nodes[0].generate(2) # Block 2
    # Collect the first transaction id of each of the two generated blocks.
    coinbase_txid = []
    for i in self.coinbase_blocks:
        coinbase_txid.append(self.nodes[0].getblock(i)['tx'][0])
    self.nodes[0].generate(427) # Block 429
    self.lastblockhash = self.nodes[0].getbestblockhash()
    # Best block hash parsed as an integer.
    self.tip = int("0x" + self.lastblockhash, 0)
    self.lastblockheight = 429
    self.lastblocktime = int(time.time()) + 429
    print ("Test 1: NULLDUMMY compliant base transactions should be accepted to mempool and mined before activation [430]")
    test1txs = [self.create_transaction(self.nodes[0], coinbase_txid[0], self.ms_address, 49)]
    txid1 = self.tx_submit(self.nodes[0], test1txs[0])
    test1txs.append(self.create_transaction(self.nodes[0], txid1, self.ms_address, 48))
    txid2 = self.tx_submit(self.nodes[0], test1txs[1])
    test1txs.append(self.create_transaction(self.nodes[0], coinbase_txid[1], self.wit_ms_address, 49))
    txid3 = self.tx_submit(self.nodes[0], test1txs[2])
    self.block_submit(self.nodes[0], test1txs, False, True)
    print ("Test 2: Non-NULLDUMMY base multisig transaction should not be accepted to mempool before activation")
    test2tx = self.create_transaction(self.nodes[0], txid2, self.ms_address, 48)
    trueDummy(test2tx)
    txid4 = self.tx_submit(self.nodes[0], test2tx, NULLDUMMY_ERROR)
    print ("Test 3: Non-NULLDUMMY base transactions should be accepted in a block before activation [431]")
    self.block_submit(self.nodes[0], [test2tx], False, True)
    print ("Test 4: Non-NULLDUMMY base multisig transaction is invalid after activation")
    test4tx = self.create_transaction(self.nodes[0], txid4, self.address, 47)
    # Taken before trueDummy() is applied to test4tx and reused in Test 6;
    # presumably CTransaction(tx) makes a copy — TODO confirm.
    test6txs=[CTransaction(test4tx)]
    trueDummy(test4tx)
    self.tx_submit(self.nodes[0], test4tx, NULLDUMMY_ERROR)
    self.block_submit(self.nodes[0], [test4tx])
    print ("Test 5: Non-NULLDUMMY P2WSH multisig transaction invalid after activation")
    test5tx = self.create_transaction(self.nodes[0], txid3, self.wit_address, 48)
    # Taken before the witness stack of test5tx is modified below; reused in Test 6.
    test6txs.append(CTransaction(test5tx))
    # Overwrite the first witness stack item of input 0 with the byte 0x01.
    test5tx.wit.vtxinwit[0].scriptWitness.stack[0] = b'\x01'
    self.tx_submit(self.nodes[0], test5tx, NULLDUMMY_ERROR)
    self.block_submit(self.nodes[0], [test5tx], True)
    print ("Test 6: NULLDUMMY compliant base/witness transactions should be accepted to mempool and in block after activation [432]")
    for i in test6txs:
        self.tx_submit(self.nodes[0], i)
    self.block_submit(self.nodes[0], test6txs, True, True)
开发者ID:MarcoFalke,项目名称:bitcoin,代码行数:54,代码来源:nulldummy.py
示例4: run_test
def run_test(self):
    """Set up the comparison-tool TestManager, load the script files, and run."""
    manager = TestManager(self, self.options.tmpdir)
    manager.add_all_connections(self.nodes)
    # Load the valid and invalid script test files.
    script_files = [script_valid_file, script_invalid_file]
    self.scripts = ScriptTestFile(script_files)
    self.scripts.load_files()
    # State shared between test instances when building blocks.
    self.tip = self.block_time = None
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
开发者ID:Whiteblock,项目名称:zcash,代码行数:15,代码来源:script_test.py
示例5: run_test
def run_test(self):
    """Connect a TestManager, set node 0's mock time, and run the tests."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    first_node = self.nodes[0]
    first_node.setmocktime(REPLAY_PROTECTION_START_TIME)
    manager.run()
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:7,代码来源:abc-replay-protection.py
示例6: run_test
def run_test(self):
    """Connect a TestManager, apply the excessive block size, and run the tests."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    # Initial condition: node 0 uses self.excessive_block_size.
    first_node = self.nodes[0]
    first_node.setexcessiveblock(self.excessive_block_size)
    manager.run()
开发者ID:CommerciumBlockchain,项目名称:Commercium_Deprecated,代码行数:8,代码来源:abc-p2p-fullblocktest.py
示例7: FullBlockTest
class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def __init__(self):
    """Initialise a one-node test: coinbase key, block bookkeeping, node arguments."""
    super().__init__()
    self.num_nodes = 1
    # Block bookkeeping, empty until blocks are created.
    self.tip = None
    self.blocks = {}
    self.block_heights = {}
    # Key derived from a fixed secret.
    key = CECKey()
    key.set_secretbytes(b"fatstacks")
    self.coinbase_key = key
    self.coinbase_pubkey = key.get_pubkey()
    self.excessive_block_size = 16 * ONE_MEGABYTE
    node_args = [
        '-norelaypriority',
        '-whitelist=127.0.0.1',
        '-limitancestorcount=9999',
        '-limitancestorsize=9999',
        '-limitdescendantcount=9999',
        '-limitdescendantsize=9999',
        '-maxmempool=999',
        "-excessiveblocksize=%d" % self.excessive_block_size,
    ]
    self.extra_args = [node_args]
def add_options(self, parser):
    """Add the base-class options, then --runbarelyexpensive (default True)."""
    super().add_options(parser)
    option_settings = {"dest": "runbarelyexpensive", "default": True}
    parser.add_option("--runbarelyexpensive", **option_settings)
def run_test(self):
    """Connect a TestManager, apply the excessive block size, and run the tests."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    # Initial condition: node 0 uses self.excessive_block_size.
    first_node = self.nodes[0]
    first_node.setexcessiveblock(self.excessive_block_size)
    manager.run()
def add_transactions_to_block(self, block, tx_list):
    """Rehash every transaction in tx_list, then append them all to block.vtx.

    Args:
        block: object whose ``vtx`` list receives the transactions.
        tx_list: transactions to add; each must provide ``rehash()``.
    """
    # rehash() is called only for its side effect, so use a plain loop
    # rather than a comprehension that builds a throwaway list.
    for tx in tx_list:
        tx.rehash()
    block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Return create_transaction(spend_tx, n, b"", value, script).

    Convenience wrapper that always passes b"" as the third argument.
    """
    return create_transaction(spend_tx, n, b"", value, script)
# sign a transaction, using the key we know about
# this signs input 0 in tx, which is assumed to be spending output n in
# spend_tx
def sign_tx(self, tx, spend_tx, n):
    """Fill in tx.vin[0].scriptSig for spending output n of spend_tx.

    If the spent scriptPubKey starts with OP_TRUE the scriptSig is left
    empty; otherwise it holds a signature made with self.coinbase_key.
    """
    prev_out = spend_tx.vout[n]
    if bytearray(prev_out.scriptPubKey)[0] == OP_TRUE:
        # Anyone-can-spend output: nothing to sign.
        tx.vin[0].scriptSig = CScript()
        return
    hash_type = SIGHASH_ALL | SIGHASH_FORKID
    sighash = SignatureHashForkId(
        prev_out.scriptPubKey, tx, 0, hash_type, prev_out.nValue)
    # Signature followed by the one-byte hash type.
    signature = self.coinbase_key.sign(sighash) + bytes(bytearray([hash_type]))
    tx.vin[0].scriptSig = CScript([signature])
def create_and_sign_transaction(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Create a transaction with create_tx, sign it with sign_tx, rehash and return it."""
    signed = self.create_tx(spend_tx, n, value, script)
    self.sign_tx(signed, spend_tx, n)
    # Recompute the hash after the scriptSig has been set.
    signed.rehash()
    return signed
def next_block(self, number, spend=None, additional_coinbase_value=0, script=None, extra_sigops=0, block_size=0, solve=True):
"""
Create a block on top of self.tip, and advance self.tip to point to the new block
if spend is specified, then 1 satoshi will be spent from that to an anyone-can-spend
output, and rest will go to fees.
"""
if self.tip == None:
base_block_hash = self.genesis_hash
block_time = int(time.time()) + 1
else:
base_block_hash = self.tip.sha256
block_time = self.tip.nTime + 1
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height, self.coinbase_pubkey)
coinbase.vout[0].nValue += additional_coinbase_value
if (spend != None):
coinbase.vout[0].nValue += spend.tx.vout[
spend.n].nValue - 1 # all but one satoshi to fees
coinbase.rehash()
block = create_block(base_block_hash, coinbase, block_time)
spendable_output = None
if (spend != None):
tx = CTransaction()
# no signature yet
tx.vin.append(
CTxIn(COutPoint(spend.tx.sha256, spend.n), b"", 0xffffffff))
# We put some random data into the first transaction of the chain
# to randomize ids
tx.vout.append(
CTxOut(0, CScript([random.randint(0, 255), OP_DROP, OP_TRUE])))
if script == None:
#.........这里部分代码省略.........
开发者ID:CommerciumBlockchain,项目名称:Commercium_Deprecated,代码行数:101,代码来源:abc-p2p-fullblocktest.py
示例8: run_test
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
开发者ID:KrzysiekJ,项目名称:bitcoinxt,代码行数:5,代码来源:bip9-softforks.py
示例9: BIP9SoftForksTest
class BIP9SoftForksTest(ComparisonTestFramework):
def __init__(self):
    """Configure the test to use a single node."""
    # NOTE(review): the base-class __init__ is not called here — confirm that is intended.
    self.num_nodes = 1
def setup_network(self):
    """Start one node with -debug and -whitelist=127.0.0.1, using the test binary."""
    node_args = ['-debug', '-whitelist=127.0.0.1']
    self.nodes = start_nodes(
        1,
        self.options.tmpdir,
        extra_args=[node_args],
        binary=[self.options.testbinary])
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
def create_transaction(self, node, coinbase, to_address, amount):
    """Build a version-2 transaction paying `amount` to `to_address`.

    The single input is output 0 of the first transaction in block `coinbase`.
    The raw transaction is created through the node's RPC and deserialized
    into a CTransaction.
    """
    coinbase_txid = node.getblock(coinbase)['tx'][0]
    raw_hex = node.createrawtransaction(
        [{"txid": coinbase_txid, "vout": 0}],
        {to_address: amount})
    tx = CTransaction()
    tx.deserialize(cStringIO.StringIO(unhexlify(raw_hex)))
    tx.nVersion = 2
    return tx
def sign_transaction(self, node, tx):
    """Sign `tx` through the node's signrawtransaction RPC and return a new CTransaction."""
    signed_hex = node.signrawtransaction(hexlify(tx.serialize()))['hex']
    signed_tx = CTransaction()
    signed_tx.deserialize(cStringIO.StringIO(unhexlify(signed_hex)))
    return signed_tx
def generate_blocks(self, number, version, test_blocks=[]):
    """Append `number` solved blocks of the given version to test_blocks and return it.

    Each block builds on self.tip at height self.height, one second after
    self.last_block_time; all three attributes advance after every block.
    Entries are appended as [block, True].
    """
    # NOTE(review): the default list is created once and shared by every call
    # that omits test_blocks, so such calls keep extending the same list.
    # Confirm callers expect that before changing the default.
    for _ in xrange(number):
        coinbase = create_coinbase(absoluteHeight=self.height)
        new_block = create_block(self.tip, coinbase, self.last_block_time + 1)
        new_block.nVersion = version
        new_block.rehash()
        new_block.solve()
        test_blocks.append([new_block, True])
        # Advance the chain-building state to the new block.
        self.last_block_time += 1
        self.tip = new_block.sha256
        self.height += 1
    return test_blocks
def get_bip9_status(self, key):
    """Return the first 'bip9_softforks' row from node 0 whose 'id' equals key.

    Raises:
        IndexError: if no row has that id.
    """
    softforks = self.nodes[0].getblockchaininfo()['bip9_softforks']
    # Lazy search: stops at the first matching row.
    found = next((row for row in softforks if row['id'] == key), None)
    if found is None:
        raise IndexError ('key:"%s" not found' % key)
    return found
def test_BIP(self, bipName, activated_version, invalidate, invalidatePostSignature):
# generate some coins for later
self.coinbase_blocks = self.nodes[0].generate(2)
self.height = 3 # height of the next block to build
self.tip = int ("0x" + self.nodes[0].getbestblockhash() + "L", 0)
self.nodeaddress = self.nodes[0].getnewaddress()
self.last_block_time = int(time.time())
assert_equal(self.get_bip9_status(bipName)['status'], 'defined')
# Test 1
# Advance from DEFINED to STARTED
test_blocks = self.generate_blocks(141, 4)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['status'], 'started')
# Test 2
# Fail to achieve LOCKED_IN 100 out of 144 signal bit 1
# using a variety of bits to simulate multiple parallel softforks
test_blocks = self.generate_blocks(50, activated_version) # 0x20000001 (signalling ready)
test_blocks = self.generate_blocks(20, 4, test_blocks) # 0x00000004 (signalling not)
test_blocks = self.generate_blocks(50, activated_version, test_blocks) # 0x20000101 (signalling ready)
test_blocks = self.generate_blocks(24, 4, test_blocks) # 0x20010000 (signalling not)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['status'], 'started')
# Test 3
# 108 out of 144 signal bit 1 to achieve LOCKED_IN
# using a variety of bits to simulate multiple parallel softforks
test_blocks = self.generate_blocks(58, activated_version) # 0x20000001 (signalling ready)
test_blocks = self.generate_blocks(26, 4, test_blocks) # 0x00000004 (signalling not)
test_blocks = self.generate_blocks(50, activated_version, test_blocks) # 0x20000101 (signalling ready)
test_blocks = self.generate_blocks(10, 4, test_blocks) # 0x20010000 (signalling not)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['status'], 'locked_in')
# Test 4
# 143 more version 536870913 blocks (waiting period-1)
test_blocks = self.generate_blocks(143, 4)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['status'], 'locked_in')
#.........这里部分代码省略.........
开发者ID:KrzysiekJ,项目名称:bitcoinxt,代码行数:101,代码来源:bip9-softforks.py
示例10: FullBlockTest
class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def set_test_params(self):
    """Configure a one-node clean-chain test with a coinbase key and empty block bookkeeping."""
    self.num_nodes = 1
    self.setup_clean_chain = True
    # Block bookkeeping, empty until blocks are created.
    self.tip = None
    self.blocks = {}
    self.block_heights = {}
    # Key derived from a fixed secret.
    key = CECKey()
    key.set_secretbytes(b"horsebattery")
    self.coinbase_key = key
    self.coinbase_pubkey = key.get_pubkey()
def setup_network(self):
    """Set extra_args to [['-norelaypriority']], add the nodes, and start them."""
    node_args = [['-norelaypriority']]
    self.extra_args = node_args
    self.add_nodes(self.num_nodes, node_args)
    self.start_nodes()
def add_options(self, parser):
    """Add the base-class options, then --runbarelyexpensive (default True)."""
    super().add_options(parser)
    option_settings = {"dest": "runbarelyexpensive", "default": True}
    parser.add_option("--runbarelyexpensive", **option_settings)
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
def add_transactions_to_block(self, block, tx_list):
    """Rehash every transaction in tx_list, then append them all to block.vtx.

    Args:
        block: object whose ``vtx`` list receives the transactions.
        tx_list: transactions to add; each must provide ``rehash()``.
    """
    # rehash() is called only for its side effect, so use a plain loop
    # rather than a comprehension that builds a throwaway list.
    for tx in tx_list:
        tx.rehash()
    block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Return create_transaction(spend_tx, n, b"", value, script).

    Convenience wrapper that always passes b"" as the third argument.
    """
    return create_transaction(spend_tx, n, b"", value, script)
# sign a transaction, using the key we know about
# this signs input 0 in tx, which is assumed to be spending output n in
# spend_tx
def sign_tx(self, tx, spend_tx, n):
    """Fill in tx.vin[0].scriptSig for spending output n of spend_tx.

    If the spent scriptPubKey starts with OP_TRUE the scriptSig is left
    empty; otherwise it holds a signature made with self.coinbase_key.
    """
    prev_out = spend_tx.vout[n]
    if bytearray(prev_out.scriptPubKey)[0] == OP_TRUE:
        # Anyone-can-spend output: nothing to sign.
        tx.vin[0].scriptSig = CScript()
        return
    hash_type = SIGHASH_ALL | SIGHASH_FORKID
    sighash = SignatureHashForkId(
        prev_out.scriptPubKey, tx, 0, hash_type, prev_out.nValue)
    # Signature followed by the one-byte hash type.
    signature = self.coinbase_key.sign(sighash) + bytes(bytearray([hash_type]))
    tx.vin[0].scriptSig = CScript([signature])
def create_and_sign_transaction(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Create a transaction with create_tx, sign it with sign_tx, rehash and return it."""
    signed = self.create_tx(spend_tx, n, value, script)
    self.sign_tx(signed, spend_tx, n)
    # Recompute the hash after the scriptSig has been set.
    signed.rehash()
    return signed
def next_block(self, number, spend=None, additional_coinbase_value=0, script=CScript([OP_TRUE])):
    """Create a solved block on top of self.tip and make it the new tip.

    Args:
        number: key under which the block is stored in self.blocks; must be unused.
        spend: optional object with ``tx`` and ``n`` naming an output to spend.
            When given, a transaction paying 1 satoshi to ``script`` is added
            and the rest of the spent value is added to the coinbase output.
        additional_coinbase_value: extra amount added to the coinbase output.
        script: output script of the spending transaction.

    Returns:
        The new block.
    """
    # Compare against None by identity, not equality.
    if self.tip is None:
        base_block_hash = self.genesis_hash
        block_time = int(time.time()) + 1
    else:
        base_block_hash = self.tip.sha256
        block_time = self.tip.nTime + 1
    # First create the coinbase
    height = self.block_heights[base_block_hash] + 1
    coinbase = create_coinbase(height, self.coinbase_pubkey)
    coinbase.vout[0].nValue += additional_coinbase_value
    coinbase.rehash()
    if spend is not None:
        # all but one satoshi to fees
        coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1
        coinbase.rehash()
    # The block is created the same way whether or not something is spent.
    block = create_block(base_block_hash, coinbase, block_time)
    if spend is not None:
        # spend 1 satoshi
        tx = create_transaction(spend.tx, spend.n, b"", 1, script)
        self.sign_tx(tx, spend.tx, spend.n)
        self.add_transactions_to_block(block, [tx])
        block.hashMerkleRoot = block.calc_merkle_root()
    # Do PoW, which is very inexpensive on regnet
    block.solve()
    self.tip = block
    self.block_heights[block.sha256] = height
    assert number not in self.blocks
    self.blocks[number] = block
    return block
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
#.........这里部分代码省略.........
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:101,代码来源:abc-mempool-accept-txn.py
示例11: BIP9SoftForksTest
class BIP9SoftForksTest(ComparisonTestFramework):
def set_test_params(self):
    """Configure a one-node clean-chain test whose node whitelists 127.0.0.1."""
    node_args = ['-whitelist=127.0.0.1']
    self.num_nodes = 1
    self.extra_args = [node_args]
    self.setup_clean_chain = True
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    network_thread_start()
    manager.run()
def create_transaction(self, node, coinbase, to_address, amount):
    """Build a version-2 transaction paying `amount` to `to_address`.

    The single input is output 0 of the first transaction in block `coinbase`.
    The raw transaction is created through the node's RPC and deserialized
    into a CTransaction.
    """
    coinbase_txid = node.getblock(coinbase)['tx'][0]
    raw_hex = node.createrawtransaction(
        [{"txid": coinbase_txid, "vout": 0}],
        {to_address: amount},
    )
    tx = CTransaction()
    tx.deserialize(BytesIO(hex_str_to_bytes(raw_hex)))
    tx.nVersion = 2
    return tx
def sign_transaction(self, node, tx):
    """Sign `tx` through the node's signrawtransaction RPC and return a new CTransaction."""
    signed_hex = node.signrawtransaction(bytes_to_hex_str(tx.serialize()))['hex']
    signed_tx = CTransaction()
    signed_tx.deserialize(BytesIO(hex_str_to_bytes(signed_hex)))
    return signed_tx
def generate_blocks(self, number, version, test_blocks=[]):
    """Append `number` solved blocks of the given version to test_blocks and return it.

    Each block builds on self.tip at height self.height, one second after
    self.last_block_time; all three attributes advance after every block.
    Entries are appended as [block, True].
    """
    # NOTE(review): the default list is created once and shared by every call
    # that omits test_blocks. test_BIP calls generate_blocks(36, 4, test_blocks)
    # and then generate_blocks(10, activated_version) without the list, yet
    # checks for 46 elapsed blocks, which appears to depend on this sharing.
    # Do not switch to a None default without updating those callers.
    for _ in range(number):
        coinbase = create_coinbase(self.height)
        new_block = create_block(self.tip, coinbase, self.last_block_time + 1)
        new_block.nVersion = version
        new_block.rehash()
        new_block.solve()
        test_blocks.append([new_block, True])
        # Advance the chain-building state to the new block.
        self.last_block_time += 1
        self.tip = new_block.sha256
        self.height += 1
    return test_blocks
def get_bip9_status(self, key):
    """Return the 'bip9_softforks' entry for `key` from node 0's getblockchaininfo."""
    return self.nodes[0].getblockchaininfo()['bip9_softforks'][key]
def test_BIP(self, bipName, activated_version, invalidate, invalidatePostSignature, bitno):
assert_equal(self.get_bip9_status(bipName)['status'], 'defined')
assert_equal(self.get_bip9_status(bipName)['since'], 0)
# generate some coins for later
self.coinbase_blocks = self.nodes[0].generate(2)
self.height = 3 # height of the next block to build
self.tip = int("0x" + self.nodes[0].getbestblockhash(), 0)
self.nodeaddress = self.nodes[0].getnewaddress()
self.last_block_time = int(time.time())
assert_equal(self.get_bip9_status(bipName)['status'], 'defined')
assert_equal(self.get_bip9_status(bipName)['since'], 0)
tmpl = self.nodes[0].getblocktemplate({})
assert(bipName not in tmpl['rules'])
assert(bipName not in tmpl['vbavailable'])
assert_equal(tmpl['vbrequired'], 0)
assert_equal(tmpl['version'], 0x20000000)
# Test 1
# Advance from DEFINED to STARTED
test_blocks = self.generate_blocks(141, 4)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['status'], 'started')
assert_equal(self.get_bip9_status(bipName)['since'], 144)
assert_equal(self.get_bip9_status(bipName)['statistics']['elapsed'], 0)
assert_equal(self.get_bip9_status(bipName)['statistics']['count'], 0)
tmpl = self.nodes[0].getblocktemplate({})
assert(bipName not in tmpl['rules'])
assert_equal(tmpl['vbavailable'][bipName], bitno)
assert_equal(tmpl['vbrequired'], 0)
assert(tmpl['version'] & activated_version)
# Test 1-A
# check stats after max number of "signalling not" blocks such that LOCKED_IN still possible this period
test_blocks = self.generate_blocks(36, 4, test_blocks) # 0x00000004 (signalling not)
test_blocks = self.generate_blocks(10, activated_version) # 0x20000001 (signalling ready)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['statistics']['elapsed'], 46)
assert_equal(self.get_bip9_status(bipName)['statistics']['count'], 10)
assert_equal(self.get_bip9_status(bipName)['statistics']['possible'], True)
# Test 1-B
# check stats after one additional "signalling not" block -- LOCKED_IN no longer possible this period
test_blocks = self.generate_blocks(1, 4, test_blocks) # 0x00000004 (signalling not)
yield TestInstance(test_blocks, sync_every_block=False)
assert_equal(self.get_bip9_status(bipName)['statistics']['elapsed'], 47)
assert_equal(self.get_bip9_status(bipName)['statistics']['count'], 10)
assert_equal(self.get_bip9_status(bipName)['statistics']['possible'], False)
# Test 1-C
#.........这里部分代码省略.........
开发者ID:doriancoins,项目名称:doriancoin,代码行数:101,代码来源:feature_bip9_softforks.py
示例12: run_test
def run_test(self):
    """Create a TestManager, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    # add_all_connections is deliberately not called: there is only one node.
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
开发者ID:emreozkartal,项目名称:zcash,代码行数:5,代码来源:bipdersig-p2p.py
示例13: FullBlockTest
class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def set_test_params(self):
    """Configure a one-node clean-chain test with a 100 * ONE_MEGABYTE excessive block size."""
    self.num_nodes = 1
    self.setup_clean_chain = True
    # Block bookkeeping, empty until blocks are created.
    self.tip = None
    self.blocks = {}
    self.block_heights = {}
    self.excessive_block_size = 100 * ONE_MEGABYTE
    node_args = [
        '-whitelist=127.0.0.1',
        "-monolithactivationtime=%d" % MONOLITH_START_TIME,
        "-replayprotectionactivationtime=%d" % (2 * MONOLITH_START_TIME),
        "-excessiveblocksize=%d" % self.excessive_block_size,
    ]
    self.extra_args = [node_args]
def add_options(self, parser):
    """Add the base-class options, then --runbarelyexpensive (default True)."""
    super().add_options(parser)
    option_settings = {"dest": "runbarelyexpensive", "default": True}
    parser.add_option("--runbarelyexpensive", **option_settings)
def run_test(self):
    """Connect a TestManager, set node 0's block size and mock time, and run."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    first_node = self.nodes[0]
    # Initial conditions: configured excessive block size, then mock time.
    first_node.setexcessiveblock(self.excessive_block_size)
    first_node.setmocktime(MONOLITH_START_TIME)
    manager.run()
def add_transactions_to_block(self, block, tx_list):
    """Rehash every transaction in tx_list, then append them all to block.vtx.

    Args:
        block: object whose ``vtx`` list receives the transactions.
        tx_list: transactions to add; each must provide ``rehash()``.
    """
    # rehash() is called only for its side effect, so use a plain loop
    # rather than a comprehension that builds a throwaway list.
    for tx in tx_list:
        tx.rehash()
    block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend, value, script=CScript([OP_TRUE])):
    """Return create_transaction(spend.tx, spend.n, b"", value, script).

    Convenience wrapper that unpacks `spend` and always passes b"" as the
    third argument.
    """
    return create_transaction(spend.tx, spend.n, b"", value, script)
def next_block(self, number, spend=None, script=CScript([OP_TRUE]), block_size=0, extra_sigops=0):
if self.tip == None:
base_block_hash = self.genesis_hash
block_time = int(time.time()) + 1
else:
base_block_hash = self.tip.sha256
block_time = self.tip.nTime + 1
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height)
coinbase.rehash()
if spend == None:
# We need to have something to spend to fill the block.
assert_equal(block_size, 0)
block = create_block(base_block_hash, coinbase, block_time)
else:
# all but one satoshi to fees
coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1
coinbase.rehash()
block = create_block(base_block_hash, coinbase, block_time)
# Make sure we have plenty engough to spend going forward.
spendable_outputs = deque([spend])
def get_base_transaction():
# Create the new transaction
tx = CTransaction()
# Spend from one of the spendable outputs
spend = spendable_outputs.popleft()
tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n)))
# Add spendable outputs
for i in range(4):
tx.vout.append(CTxOut(0, CScript([OP_TRUE])))
spendable_outputs.append(PreviousSpendableOutput(tx, i))
return tx
tx = get_base_transaction()
# Make it the same format as transaction added for padding and save the size.
# It's missing the padding output, so we add a constant to account for it.
tx.rehash()
base_tx_size = len(tx.serialize()) + 18
# If a specific script is required, add it.
if script != None:
tx.vout.append(CTxOut(1, script))
# Put some random data into the first transaction of the chain to randomize ids.
tx.vout.append(
CTxOut(0, CScript([random.randint(0, 256), OP_RETURN])))
# Add the transaction to the block
self.add_transactions_to_block(block, [tx])
# If we have a block size requirement, just fill
# the block until we get there
#.........这里部分代码省略.........
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:101,代码来源:abc-p2p-fullblocktest.py
示例14: ReplayProtectionTest
class ReplayProtectionTest(ComparisonTestFramework):
def set_test_params(self):
    """Configure a one-node clean-chain test with a replay-protection activation time."""
    self.num_nodes = 1
    self.setup_clean_chain = True
    # Block bookkeeping, empty until blocks are created.
    self.tip = None
    self.blocks = {}
    self.block_heights = {}
    node_args = [
        '-whitelist=127.0.0.1',
        "-replayprotectionactivationtime=%d" % REPLAY_PROTECTION_START_TIME,
    ]
    self.extra_args = [node_args]
def run_test(self):
    """Connect a TestManager, set node 0's mock time, and run the tests."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    first_node = self.nodes[0]
    first_node.setmocktime(REPLAY_PROTECTION_START_TIME)
    manager.run()
def next_block(self, number):
    """Create a solved block on top of self.tip and make it the new tip.

    When there is no tip yet, the block builds on self.genesis_hash and is
    timestamped one second after the current time; otherwise one second
    after the tip.

    Args:
        number: key under which the block is stored in self.blocks; must be unused.

    Returns:
        The new block.
    """
    # Compare against None by identity, not equality.
    if self.tip is None:
        base_block_hash = self.genesis_hash
        block_time = int(time.time()) + 1
    else:
        base_block_hash = self.tip.sha256
        block_time = self.tip.nTime + 1
    # First create the coinbase
    height = self.block_heights[base_block_hash] + 1
    coinbase = create_coinbase(height)
    coinbase.rehash()
    block = create_block(base_block_hash, coinbase, block_time)
    # Do PoW, which is cheap on regnet
    block.solve()
    self.tip = block
    self.block_heights[block.sha256] = height
    assert number not in self.blocks
    self.blocks[number] = block
    return block
def get_tests(self):
self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
self.block_heights[self.genesis_hash] = 0
spendable_outputs = []
# save the current tip so it can be spent by a later block
def save_spendable_output():
spendable_outputs.append(self.tip)
# get an output that we previously marked as spendable
def get_spendable_output():
return PreviousSpendableOutput(spendable_outputs.pop(0).vtx[0], 0)
# returns a test case that asserts that the current tip was accepted
def accepted():
return TestInstance([[self.tip, True]])
# returns a test case that asserts that the current tip was rejected
def rejected(reject=None):
if reject is None:
return TestInstance([[self.tip, False]])
else:
return TestInstance([[self.tip, reject]])
# move the tip back to a previous block
def tip(number):
self.tip = self.blocks[number]
# adds transactions to the block and updates state
def update_block(block_number, new_transactions):
[tx.rehash() for tx in new_transactions]
block = self.blocks[block_number]
block.vtx.extend(new_transactions)
old_sha256 = block.sha256
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
# Update the internal state just like in next_block
self.tip = block
if block.sha256 != old_sha256:
self.block_heights[
block.sha256] = self.block_heights[old_sha256]
del self.block_heights[old_sha256]
self.blocks[block_number] = block
return block
# shorthand for functions
block = self.next_block
node = self.nodes[0]
# Create a new block
block(0)
save_spendable_output()
yield accepted()
# Now we need that block to mature so we can spend the coinbase.
test = TestInstance(sync_every_block=False)
for i in range(99):
block(5000 + i)
test.blocks_and_transactions.append([self.tip, True])
save_spendable_output()
#.........这里部分代码省略.........
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:101,代码来源:abc-replay-protection.py
示例15: FullBlockTest
class FullBlockTest(ComparisonTestFramework):
# Can either run this test as 1 node with expected answers, or two and compare them.
# Change the "outcome" variable from each TestInstance object to only do
# the comparison.
def set_test_params(self):
    """Configure a one-node clean-chain test with a 16 * ONE_MEGABYTE excessive block size."""
    self.num_nodes = 1
    self.setup_clean_chain = True
    # Block bookkeeping, empty until blocks are created.
    self.tip = None
    self.blocks = {}
    self.block_heights = {}
    self.excessive_block_size = 16 * ONE_MEGABYTE
    node_args = [
        '-norelaypriority',
        '-whitelist=127.0.0.1',
        '-limitancestorcount=999999',
        '-limitancestorsize=999999',
        '-limitdescendantcount=999999',
        '-limitdescendantsize=999999',
        '-maxmempool=99999',
        "-monolithactivationtime=%d" % MONOLITH_START_TIME,
        "-excessiveblocksize=%d" % self.excessive_block_size,
    ]
    self.extra_args = [node_args]
def add_options(self, parser):
    """Add the base-class options, then --runbarelyexpensive (default True)."""
    super().add_options(parser)
    option_settings = {"dest": "runbarelyexpensive", "default": True}
    parser.add_option("--runbarelyexpensive", **option_settings)
def run_test(self):
    """Connect a TestManager, set node 0's block size and mock time, and run."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    first_node = self.nodes[0]
    # Initial conditions: configured excessive block size, then mock time.
    first_node.setexcessiveblock(self.excessive_block_size)
    first_node.setmocktime(MONOLITH_START_TIME)
    manager.run()
def add_transactions_to_block(self, block, tx_list):
    """Rehash every transaction in tx_list, then append them all to block.vtx.

    Args:
        block: object whose ``vtx`` list receives the transactions.
        tx_list: transactions to add; each must provide ``rehash()``.
    """
    # rehash() is called only for its side effect, so use a plain loop
    # rather than a comprehension that builds a throwaway list.
    for tx in tx_list:
        tx.rehash()
    block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Return create_transaction(spend_tx, n, b"", value, script).

    Convenience wrapper that always passes b"" as the third argument.
    """
    return create_transaction(spend_tx, n, b"", value, script)
def next_block(self, number, spend=None, script=CScript([OP_TRUE]), block_size=0, extra_txns=0):
if self.tip == None:
base_block_hash = self.genesis_hash
block_time = int(time.time()) + 1
else:
base_block_hash = self.tip.sha256
block_time = self.tip.nTime + 1
# First create the coinbase
height = self.block_heights[base_block_hash] + 1
coinbase = create_coinbase(height)
coinbase.rehash()
if spend == None:
# We need to have something to spend to fill the block.
assert_equal(block_size, 0)
block = create_block(base_block_hash, coinbase, block_time)
else:
# all but one satoshi to fees
coinbase.vout[0].nValue += spend.tx.vout[spend.n].nValue - 1
coinbase.rehash()
block = create_block(base_block_hash, coinbase, block_time)
# Make sure we have plenty engough to spend going forward.
spendable_outputs = deque([spend])
def get_base_transaction():
# Create the new transaction
tx = CTransaction()
# Spend from one of the spendable outputs
spend = spendable_outputs.popleft()
tx.vin.append(CTxIn(COutPoint(spend.tx.sha256, spend.n)))
# Add spendable outputs
for i in range(4):
tx.vout.append(CTxOut(0, CScript([OP_TRUE])))
spendable_outputs.append(PreviousSpendableOutput(tx, i))
return tx
tx = get_base_transaction()
# Make it the same format as transaction added for padding and save the size.
# It's missing the padding output, so we add a constant to account for it.
tx.rehash()
base_tx_size = len(tx.serialize()) + 18
# If a specific script is required, add it.
if script != None:
tx.vout.append(CTxOut(1, script))
# Put some random data into the first transaction of the chain to randomize ids.
tx.vout.append(
CTxOut(0, CScript([random.randint(0, 256), OP_RETURN])))
# Add the transaction to the block
#.........这里部分代码省略.........
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:101,代码来源:abc-p2p-compactblocks.py
示例16: run_test
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    manager.add_all_connections(self.nodes)
    network_thread_start()
    manager.run()
开发者ID:admxjx,项目名称:bitcoin,代码行数:5,代码来源:bip68-112-113-p2p.py
示例17: MonolithActivationTest
class MonolithActivationTest(ComparisonTestFramework):
def set_test_params(self):
    """Configure a one-node clean-chain test with monolith and replay-protection activation times."""
    self.num_nodes = 1
    self.setup_clean_chain = True
    node_args = [
        '-whitelist=127.0.0.1',
        "-monolithactivationtime=%d" % MONOLITH_START_TIME,
        "-replayprotectionactivationtime=%d" % (2 * MONOLITH_START_TIME),
    ]
    self.extra_args = [node_args]
def create_and_tx(self, count):
    """Return the signed hex of a transaction splitting node 0's first UTXO into `count` outputs.

    Every output uses the script [OP_1, OP_1, OP_AND] and carries an equal
    integer share of the UTXO amount minus self.relayfee.
    """
    node = self.nodes[0]
    unspent = node.listunspent()
    assert(len(unspent) > 0)
    source = unspent[0]
    tx = CTransaction()
    # Amount after the relay fee, split evenly across the outputs.
    spendable = int(satoshi_round(source["amount"] - self.relayfee) * COIN)
    per_output = spendable // count
    tx.vin = [CTxIn(COutPoint(int(source["txid"], 16), source["vout"]))]
    tx.vout = [CTxOut(per_output, CScript([OP_1, OP_1, OP_AND]))
               for _ in range(count)]
    return node.signrawtransaction(ToHex(tx))["hex"]
def run_test(self):
    """Connect a TestManager to every node, start networking, and run it."""
    manager = TestManager(self, self.options.tmpdir)
    self.test = manager
    manager.add_all_connections(self.nodes)
    # Network handling runs on its own thread.
    NetworkThread().start()
    manager.run()
def get_tests(self):
node = self.nodes[0]
self.relayfee = self.nodes[0].getnetworkinfo()["relayfee"]
# First, we generate some coins to spend.
node.generate(125)
# Create various outputs using the OP_AND to check for activation.
tx_hex = self.create_and_tx(25)
txid = node.sendrawtransaction(tx_hex)
assert(txid in set(node.getrawmempool()))
node.generate(1)
assert(txid not in set(node.getrawmempool()))
# register the spendable outputs.
tx = FromHex(CTransaction(), tx_hex)
tx.rehash()
spendable_ands = [PreviousSpendableOutput(
tx, i) for i in range(len(tx.vout))]
def spend_and():
outpoint = spendable_ands.pop()
out = outpoint.tx.vout[outpoint.n]
value = int(out.nValue - (self.relayfee * COIN))
tx = CTransaction()
tx.vin = [CTxIn(COutPoint(outpoint.tx.sha256, outpoint.n))]
tx.vout = [CTxOut(value, CScript([]))]
tx.rehash()
return tx
# Check that large opreturn are not accepted yet.
self.log.info("Try to use the monolith opcodes before activation")
tx0 = spend_and()
tx0_hex = ToHex(tx0)
assert_raises_rpc_error(-26, RPC_DISABLED_OPCODE_ERROR,
node.sendrawtransaction, tx0_hex)
# Push MTP forward just before activation.
self.log.info("Pushing MTP just before the activation and check again")
node.setmocktime(MONOLITH_START_TIME)
# returns a test case that asserts that the current tip was accepted
def accepted(tip):
return TestInstance([[tip, True]])
# returns a test case that asserts that the current tip was rejected
def rejected(tip, reject=None):
if reject is None:
return TestInstance([[tip, False]])
else:
return TestInstance([[tip, reject]])
def next_block(block_time):
# get block height
blockchaininfo = node.getblockchaininfo()
height = int(blockchaininfo['blocks'])
# create the block
coinbase = create_coinbase(height)
coinbase.rehash()
block = create_block(
int(node.getbestblockhash(), 16), coinbase, block_time)
# Do PoW, which is cheap on regnet
block.solve()
return block
#.........这里部分代码省略.........
开发者ID:a7853z,项目名称:bitcoin-abc,代码行数:101,代码来源:abc-monolith-activation.py
注：本文中的test_framework.comptool.TestManager类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论