本文整理汇总了Python中test_common.lineno函数的典型用法代码示例。如果您正苦于以下问题：Python lineno函数的具体用法？Python lineno怎么用？Python lineno使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了lineno函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: check_bgp_asn
def check_bgp_asn(self, fq_name, asn):
router = self._vnc_lib.bgp_router_read(fq_name)
params = router.get_bgp_router_parameters()
if not params:
print "retrying ... ", test_common.lineno()
raise Exception('bgp params is None for %s' % fq_name)
self.assertEqual(params.get_autonomous_system(), asn)
开发者ID:Pojen-Huang,项目名称:contrail-controller,代码行数:7,代码来源:test_bgp.py
示例2: test_policy_in_policy
def test_policy_in_policy(self):
vn1_name = 'vn1'
vn2_name = 'vn2'
vn3_name = 'vn3'
vn1_obj = VirtualNetwork(vn1_name)
vn2_obj = VirtualNetwork(vn2_name)
np1 = self.create_network_policy(vn1_obj, vn2_obj)
np2 = self.create_network_policy(vn2_obj, vn1_obj)
np1.network_policy_entries.policy_rule[0].dst_addresses[0].virtual_network = None
np1.network_policy_entries.policy_rule[0].dst_addresses[0].network_policy = np2.get_fq_name_str()
np1.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np1)
np2.network_policy_entries.policy_rule[0].src_addresses[0].virtual_network = 'local'
np2.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np2)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.set_network_policy(np1, vnp)
vn2_obj.set_network_policy(np2, vnp)
vn1_uuid = self._vnc_lib.virtual_network_create(vn1_obj)
vn2_uuid = self._vnc_lib.virtual_network_create(vn2_obj)
try:
self.check_ri_state_vn_policy(fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'],
to_fq_name=[u'default-domain', u'default-project', u'vn2', u'vn2'])
except NoIdError, e:
print "failed : routing instance state is not correct... ", test_common.lineno()
self.assertTrue(False)
开发者ID:Semihalf,项目名称:contrail-controller,代码行数:31,代码来源:test_service.py
示例3: check_lr_asn
def check_lr_asn(self, fq_name, rt_target):
router = self._vnc_lib.logical_router_read(fq_name)
rt_refs = router.get_route_target_refs()
if not rt_refs:
print "retrying ... ", test_common.lineno()
raise Exception('ri_refs is None for %s' % fq_name)
self.assertEqual(rt_refs[0]['to'][0], rt_target)
开发者ID:Pojen-Huang,项目名称:contrail-controller,代码行数:7,代码来源:test_bgp.py
示例4: test_basic_policy
def test_basic_policy(self):
vn1_name = 'vn1'
vn2_name = 'vn2'
vn1_obj = VirtualNetwork(vn1_name)
vn2_obj = VirtualNetwork(vn2_name)
np = self.create_network_policy(vn1_obj, vn2_obj)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.set_network_policy(np, vnp)
vn2_obj.set_network_policy(np, vnp)
vn1_uuid = self._vnc_lib.virtual_network_create(vn1_obj)
vn2_uuid = self._vnc_lib.virtual_network_create(vn2_obj)
#import pdb; pdb.set_trace()
for obj in [vn1_obj, vn2_obj]:
ident_name = self.get_obj_imid(obj)
gevent.sleep(2)
ifmap_ident = self.assertThat(FakeIfmapClient._graph, Contains(ident_name))
try:
self.check_ri_state_vn_policy(fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'],
to_fq_name=[u'default-domain', u'default-project', u'vn2', u'vn2'])
except NoIdError, e:
print "failed : routing instance state is not correct... ", test_common.lineno()
self.assertTrue(False)
开发者ID:Semihalf,项目名称:contrail-controller,代码行数:26,代码来源:test_service.py
示例5: check_lr_is_deleted
def check_lr_is_deleted(self, uuid):
try:
self._vnc_lib.logical_router_read(id=uuid)
print "retrying ... ", test_common.lineno()
raise Exception('logical router %s still exists' % uuid)
except NoIdError:
print 'lr deleted'
开发者ID:Pojen-Huang,项目名称:contrail-controller,代码行数:7,代码来源:test_bgp.py
示例6: check_ri_is_deleted
def check_ri_is_deleted(self, fq_name):
try:
self._vnc_lib.routing_instance_read(fq_name)
print "retrying ... ", test_common.lineno()
raise Exception('routing instance %s still exists' % fq_name)
except NoIdError:
print 'ri deleted'
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:7,代码来源:test_service.py
示例7: check_service_chain_prefix_match
def check_service_chain_prefix_match(self, fq_name, prefix):
ri = self._vnc_lib.routing_instance_read(fq_name)
sci = ri.get_service_chain_information()
if sci is None:
print "retrying ... ", test_common.lineno()
raise Exception('Service chain info not found for %s' % fq_name)
self.assertEqual(sci.prefix[0], prefix)
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:7,代码来源:test_service.py
示例8: check_ri_state_vn_policy
def check_ri_state_vn_policy(self, fq_name, to_fq_name):
ri = self._vnc_lib.routing_instance_read(fq_name)
ri_refs = ri.get_routing_instance_refs()
if not ri_refs:
print "retrying ... ", test_common.lineno()
raise Exception('ri_refs is None for %s' % fq_name)
self.assertEqual(ri_refs[0]['to'], to_fq_name)
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:7,代码来源:test_service.py
示例9: test_policy_with_cidr
def test_policy_with_cidr(self):
vn1 = self.create_virtual_network("vn1", "10.1.1.0/24")
vn2 = self.create_virtual_network("vn2", "10.2.1.0/24")
rules = []
rule1 = { "protocol": "icmp",
"direction": "<>",
"src-port": "any",
"src": {"type": "vn", "value": vn1},
"dst": {"type": "cidr", "value": "10.2.1.2/32"},
"dst-port": "any",
"action": "deny"
}
rules.append(rule1)
np = self.create_network_policy_with_multiple_rules(rules)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1.set_network_policy(np, vnp)
self._vnc_lib.virtual_network_update(vn1)
for obj in [vn1]:
ident_name = self.get_obj_imid(obj)
ifmap_ident = self.assertThat(FakeIfmapClient._graph, Contains(ident_name))
try:
self.check_vn_ri_state(fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'])
except NoIdError, e:
print "failed : Routing instance state is not correct... ", test_common.lineno()
self.assertTrue(False)
开发者ID:Semihalf,项目名称:contrail-controller,代码行数:30,代码来源:test_service.py
示例10: check_vn_is_deleted
def check_vn_is_deleted(self, uuid):
try:
self._vnc_lib.virtual_network_read(id=uuid)
print "retrying ... ", test_common.lineno()
raise Exception('virtual network %s still exists' % uuid)
except NoIdError:
print 'vn deleted'
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:7,代码来源:test_service.py
示例11: check_rt_is_deleted
def check_rt_is_deleted(self, name):
try:
rt_obj = self._vnc_lib.route_target_read(fq_name=[name])
print "retrying ... ", test_common.lineno()
raise Exception(
'rt %s still exists: RI backrefs %s LR backrefs %s' % (
name, rt_obj.get_routing_instance_back_refs(),
rt_obj.get_logical_router_back_refs()))
except NoIdError:
print 'rt deleted'
开发者ID:rombie,项目名称:contrail-controller,代码行数:10,代码来源:test_route_target.py
示例12: check_ri_asn
def check_ri_asn(self, fq_name, rt_target):
ri = self._vnc_lib.routing_instance_read(fq_name)
rt_refs = ri.get_route_target_refs()
if not rt_refs:
print "retrying ... ", test_common.lineno()
raise Exception('ri_refs is None for %s' % fq_name)
for rt_ref in rt_refs:
if rt_ref['to'][0] == rt_target:
return
raise Exception('rt_target %s not found in ri %s' % (rt_target, fq_name))
开发者ID:Pojen-Huang,项目名称:contrail-controller,代码行数:10,代码来源:test_bgp.py
示例13: test_vn_delete
def test_vn_delete(self):
vn = self.create_virtual_network("vn", "10.1.1.0/24")
gevent.sleep(2)
for obj in [vn]:
ident_name = self.get_obj_imid(obj)
ifmap_ident = self.assertThat(FakeIfmapClient._graph, Contains(ident_name))
try:
self.check_vn_ri_state(fq_name=[u'default-domain', u'default-project', 'vn', 'vn'])
except NoIdError, e:
print "failed : routing instance state is not created ... ", test_common.lineno()
self.assertTrue(False)
开发者ID:Semihalf,项目名称:contrail-controller,代码行数:13,代码来源:test_service.py
示例14: check_ri_rt_state_vn_policy
def check_ri_rt_state_vn_policy(self, fq_name, to_fq_name, expect_to_find):
ri = self._vnc_lib.routing_instance_read(fq_name)
rt_refs = ri.get_route_target_refs()
if not rt_refs:
print "retrying ... ", test_common.lineno()
raise Exception('ri_refs is None for %s' % fq_name)
found = False
for rt_ref in rt_refs:
rt_obj = self._vnc_lib.route_target_read(id=rt_ref['uuid'])
ri_refs = rt_obj.get_routing_instance_back_refs()
for ri_ref in ri_refs:
if ri_ref['to'] == to_fq_name:
found = True
break
if found == True:
break
self.assertTrue(found == expect_to_find)
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:18,代码来源:test_service.py
示例15: test_multiple_policy
def test_multiple_policy(self):
vn1_name = 'vn1'
vn2_name = 'vn2'
vn1_obj = VirtualNetwork(vn1_name)
vn2_obj = VirtualNetwork(vn2_name)
np1 = self.create_network_policy(vn1_obj, vn2_obj)
np2 = self.create_network_policy(vn2_obj, vn1_obj)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.set_network_policy(np1, vnp)
vn2_obj.set_network_policy(np2, vnp)
vn1_uuid = self._vnc_lib.virtual_network_create(vn1_obj)
vn2_uuid = self._vnc_lib.virtual_network_create(vn2_obj)
try:
self.check_ri_state_vn_policy(fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'],
to_fq_name=[u'default-domain', u'default-project', u'vn2', u'vn2'])
except NoIdError, e:
print "failed : routing instance state is not correct... ", test_common.lineno()
self.assertTrue(False)
开发者ID:Semihalf,项目名称:contrail-controller,代码行数:21,代码来源:test_service.py
示例16: test_multiple_policy
def test_multiple_policy(self):
vn1_name = 'vn1'
vn2_name = 'vn2'
vn1_obj = VirtualNetwork(vn1_name)
vn2_obj = VirtualNetwork(vn2_name)
np1 = self.create_network_policy(vn1_obj, vn2_obj)
np2 = self.create_network_policy(vn2_obj, vn1_obj)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.set_network_policy(np1, vnp)
vn2_obj.set_network_policy(np2, vnp)
vn1_uuid = self._vnc_lib.virtual_network_create(vn1_obj)
vn2_uuid = self._vnc_lib.virtual_network_create(vn2_obj)
while True:
gevent.sleep(2)
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'])
except NoIdError:
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn2', u'vn2'])
break
print "retrying ... ", test_common.lineno()
# end while True
while True:
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project',
'vn2', 'vn2'])
except NoIdError:
gevent.sleep(2)
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn1', u'vn1'])
break
print "retrying ... ", test_common.lineno()
gevent.sleep(2)
# end while True
np1.network_policy_entries.policy_rule[0].action_list.simple_action = 'deny'
np1.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np1)
while True:
gevent.sleep(2)
if ('contrail:connection contrail:routing-instance:default-domain:default-project:vn2:vn2' in
FakeIfmapClient._graph['contrail:routing-instance:default-domain:default-project:vn1:vn1']['links']):
print "retrying ... ", test_common.lineno()
continue
break
np1.network_policy_entries.policy_rule[0].action_list.simple_action = 'pass'
np1.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np1)
np2.network_policy_entries.policy_rule[0].action_list.simple_action = 'deny'
np2.set_network_policy_entries(np2.network_policy_entries)
self._vnc_lib.network_policy_update(np2)
while True:
gevent.sleep(2)
if ('contrail:connection contrail:routing-instance:default-domain:default-project:vn2:vn2' in
FakeIfmapClient._graph['contrail:routing-instance:default-domain:default-project:vn1:vn1']['links']):
print "retrying ... ", test_common.lineno()
continue
break
vn1_obj.del_network_policy(np1)
vn2_obj.del_network_policy(np2)
self._vnc_lib.virtual_network_update(vn1_obj)
self._vnc_lib.virtual_network_update(vn2_obj)
while True:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project', 'vn2', 'vn2'])
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
gevent.sleep(2)
else:
break
print "retrying ... ", test_common.lineno()
# end while True
self.delete_network_policy(np1)
self.delete_network_policy(np2)
self._vnc_lib.virtual_network_delete(fq_name=vn1_obj.get_fq_name())
self._vnc_lib.virtual_network_delete(fq_name=vn2_obj.get_fq_name())
while True:
#.........这里部分代码省略.........
开发者ID:chrigl,项目名称:contrail-controller,代码行数:101,代码来源:test_service.py
示例17: check_ri_refs_are_deleted
def check_ri_refs_are_deleted(self, fq_name):
ri = self._vnc_lib.routing_instance_read(fq_name)
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
print "retrying ... ", test_common.lineno()
raise Exception('ri_refs still exist for %s' % fq_name)
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:6,代码来源:test_service.py
示例18: test_policy_in_policy
def test_policy_in_policy(self):
vn1_name = 'vn1'
vn2_name = 'vn2'
vn3_name = 'vn3'
vn1_obj = VirtualNetwork(vn1_name)
vn2_obj = VirtualNetwork(vn2_name)
np1 = self.create_network_policy(vn1_obj, vn2_obj)
np2 = self.create_network_policy(vn2_obj, vn1_obj)
np1.network_policy_entries.policy_rule[0].dst_addresses[0].virtual_network = None
np1.network_policy_entries.policy_rule[0].dst_addresses[0].network_policy = np2.get_fq_name_str()
np1.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np1)
np2.network_policy_entries.policy_rule[0].src_addresses[0].virtual_network = 'local'
np2.set_network_policy_entries(np1.network_policy_entries)
self._vnc_lib.network_policy_update(np2)
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.set_network_policy(np1, vnp)
vn2_obj.set_network_policy(np2, vnp)
vn1_uuid = self._vnc_lib.virtual_network_create(vn1_obj)
vn2_uuid = self._vnc_lib.virtual_network_create(vn2_obj)
while True:
gevent.sleep(2)
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project', 'vn1', 'vn1'])
except NoIdError:
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn2', u'vn2'])
break
print "retrying ... ", test_common.lineno()
# end while True
while True:
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project',
'vn2', 'vn2'])
except NoIdError:
gevent.sleep(2)
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn1', u'vn1'])
break
print "retrying ... ", test_common.lineno()
gevent.sleep(2)
# end while True
vn3_obj = VirtualNetwork(vn3_name)
vn3_obj.set_network_policy(np2, vnp)
vn3_uuid = self._vnc_lib.virtual_network_create(vn3_obj)
while True:
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project',
'vn3', 'vn3'])
except NoIdError:
gevent.sleep(2)
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn1', u'vn1'])
break
print "retrying ... ", test_common.lineno()
gevent.sleep(2)
# end while True
vn1_obj.del_network_policy(np1)
vn2_obj.del_network_policy(np2)
vn3_obj.del_network_policy(np2)
self._vnc_lib.virtual_network_update(vn1_obj)
self._vnc_lib.virtual_network_update(vn2_obj)
self._vnc_lib.virtual_network_update(vn3_obj)
self.delete_network_policy(np1)
self.delete_network_policy(np2)
self._vnc_lib.virtual_network_delete(fq_name=vn1_obj.get_fq_name())
self._vnc_lib.virtual_network_delete(fq_name=vn2_obj.get_fq_name())
self._vnc_lib.virtual_network_delete(fq_name=vn3_obj.get_fq_name())
while True:
#.........这里部分代码省略.........
开发者ID:chrigl,项目名称:contrail-controller,代码行数:101,代码来源:test_service.py
示例19: test_service_policy
def test_service_policy(self):
# create vn1
vn1_obj = VirtualNetwork('vn1')
ipam_obj = NetworkIpam('ipam1')
self._vnc_lib.network_ipam_create(ipam_obj)
vn1_obj.add_network_ipam(ipam_obj, VnSubnetsType(
[IpamSubnetType(SubnetType("10.0.0.0", 24))]))
self._vnc_lib.virtual_network_create(vn1_obj)
# create vn2
vn2_obj = VirtualNetwork('vn2')
ipam_obj = NetworkIpam('ipam2')
self._vnc_lib.network_ipam_create(ipam_obj)
vn2_obj.add_network_ipam(ipam_obj, VnSubnetsType(
[IpamSubnetType(SubnetType("20.0.0.0", 24))]))
self._vnc_lib.virtual_network_create(vn2_obj)
np = self.create_network_policy(vn1_obj, vn2_obj, ["s1"])
seq = SequenceType(1, 1)
vnp = VirtualNetworkPolicyType(seq)
vn1_obj.clear_pending_updates()
vn2_obj.clear_pending_updates()
vn1_obj.set_network_policy(np, vnp)
vn2_obj.set_network_policy(np, vnp)
self._vnc_lib.virtual_network_update(vn1_obj)
self._vnc_lib.virtual_network_update(vn2_obj)
while True:
gevent.sleep(2)
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project',
'vn1', 'vn1'])
except NoIdError:
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
sc = [x for x in to_bgp.ServiceChain]
sc_ri_name = 'service-'+sc[0]+'-default-domain_default-project_s1'
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn1', sc_ri_name])
break
print "retrying ... ", test_common.lineno()
# end while True
while True:
try:
test_common.FakeApiConfigLog._print()
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project', u'vn2', sc_ri_name])
except NoIdError:
gevent.sleep(2)
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs:
self.assertEqual(
ri_refs[0]['to'],
[u'default-domain', u'default-project', u'vn2', u'vn2'])
sci = ri.get_service_chain_information()
if sci is None:
print "retrying ... ", test_common.lineno()
gevent.sleep(2)
continue
self.assertEqual(sci.prefix[0], '10.0.0.0/24')
break
print "retrying ... ", test_common.lineno()
gevent.sleep(2)
# end while True
vn1_obj.del_network_policy(np)
vn2_obj.del_network_policy(np)
self._vnc_lib.virtual_network_update(vn1_obj)
self._vnc_lib.virtual_network_update(vn2_obj)
while True:
gevent.sleep(2)
try:
ri = self._vnc_lib.routing_instance_read(
fq_name=[u'default-domain', u'default-project',
'vn1', 'vn1'])
except NoIdError:
print "retrying ... ", test_common.lineno()
continue
ri_refs = ri.get_routing_instance_refs()
if ri_refs is None:
break
print "retrying ... ", test_common.lineno()
# end while True
self.delete_network_policy(np)
self._vnc_lib.virtual_network_delete(fq_name=vn1_obj.get_fq_name())
self._vnc_lib.virtual_network_delete(fq_name=vn2_obj.get_fq_name())
while True:
try:
self._vnc_lib.virtual_network_read(id=vn1_obj.uuid)
gevent.sleep(2)
print "retrying ... ", test_common.lineno()
continue
except NoIdError:
#.........这里部分代码省略.........
开发者ID:chrigl,项目名称:contrail-controller,代码行数:101,代码来源:test_service.py
示例20: wait_to_get_sc
def wait_to_get_sc(self):
sc = [x for x in to_bgp.ServiceChain]
if len(sc) == 0:
print "retrying ... ", test_common.lineno()
raise Exception('Service chain not found')
return sc
开发者ID:GIC-de,项目名称:contrail-controller,代码行数:6,代码来源:test_service.py
注：本文中的test_common.lineno函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论