本文整理汇总了Python中mock.call.add_chain函数的典型用法代码示例。如果您正苦于以下问题:Python add_chain函数的具体用法?Python add_chain怎么用?Python add_chain使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了add_chain函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_remove_metering_label
def test_remove_metering_label(self):
    """Add a metering label for one router, remove it, and check the calls.

    The expected sequence on ``self.v4filter_inst`` is: creation of the
    label and rules chains, the FORWARD jump, the empty label rule, the
    ingress rule for 10.0.0.0/24, and finally removal of both chains.
    """
    label_id = 'c5df2fe5-c600-4a2a-b2f4-c0fb6df73c83'
    ingress_rule = {
        'direction': 'ingress',
        'excluded': False,
        'id': '7f1a261f-2489-4ed1-870c-a62754501379',
        'metering_label_id': label_id,
        'remote_ip_prefix': '10.0.0.0/24',
    }
    router = {
        '_metering_labels': [{'id': label_id, 'rules': [ingress_rule]}],
        'admin_state_up': True,
        'gw_port_id': '7d411f48-ecc7-45e0-9ece-3b5bdb54fcee',
        'id': '473ec392-1711-44e3-b008-3251ccfc5099',
        'name': 'router1',
        'status': 'ACTIVE',
        'tenant_id': '6c5f5d2a1fa2441e88e35422926f48e8',
    }
    routers = [router]

    self.metering.add_metering_label(None, routers)
    self.metering.remove_metering_label(None, routers)

    # Chain names as they appear in the expected calls.
    label_chain = 'neutron-meter-l-c5df2fe5-c60'
    rules_chain = 'neutron-meter-r-c5df2fe5-c60'
    expected = [
        call.add_chain(label_chain, wrap=False),
        call.add_chain(rules_chain, wrap=False),
        call.add_rule('neutron-meter-FORWARD', '-j ' + rules_chain,
                      wrap=False),
        call.add_rule(label_chain, '', wrap=False),
        call.add_rule(rules_chain,
                      '-i qg-7d411f48-ec -d 10.0.0.0/24 -j ' + label_chain,
                      wrap=False, top=False),
        call.remove_chain(label_chain, wrap=False),
        call.remove_chain(rules_chain, wrap=False),
    ]
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:50infivedays,项目名称:neutron,代码行数:34,代码来源:test_iptables_driver.py
示例2: test_prepare_port_filter_with_no_sg
def test_prepare_port_filter_with_no_sg(self):
    """Prepare a filter for the fake port and verify the v4 filter calls.

    The port comes from ``self._fake_port()`` unchanged; the expected
    sequence covers the fallback chain, the ingress chain (``ifake_dev``),
    the egress chain (``ofake_dev``) and the final ACCEPT on ``sg-chain``.
    """
    self.firewall.prepare_port_filter(self._fake_port())

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = "-m physdev --physdev-is-bridged --physdev-out tapfake_dev "
    in_match = "-m physdev --physdev-is-bridged --physdev-in tapfake_dev "
    drop_invalid = "-m state --state INVALID -j DROP"
    return_known = "-m state --state ESTABLISHED,RELATED -j RETURN"

    setup = [
        call.add_chain("sg-fallback"),
        call.add_rule("sg-fallback", "-j DROP"),
        call.ensure_remove_chain("sg-chain"),
        call.add_chain("sg-chain"),
    ]
    ingress = [
        call.add_chain("ifake_dev"),
        call.add_rule("FORWARD", out_match + "-j $sg-chain"),
        call.add_rule("sg-chain", out_match + "-j $ifake_dev"),
        call.add_rule("ifake_dev", drop_invalid),
        call.add_rule("ifake_dev", return_known),
        call.add_rule("ifake_dev", "-j $sg-fallback"),
    ]
    egress = [
        call.add_chain("ofake_dev"),
        call.add_rule("FORWARD", in_match + "-j $sg-chain"),
        call.add_rule("sg-chain", in_match + "-j $ofake_dev"),
        call.add_rule("INPUT", in_match + "-j $ofake_dev"),
        call.add_rule("ofake_dev", "-m mac ! --mac-source ff:ff:ff:ff -j DROP"),
        call.add_rule("ofake_dev", "-p udp --sport 68 --dport 67 -j RETURN"),
        call.add_rule("ofake_dev", "! -s 10.0.0.1 -j DROP"),
        call.add_rule("ofake_dev", "-p udp --sport 67 --dport 68 -j DROP"),
        call.add_rule("ofake_dev", drop_invalid),
        call.add_rule("ofake_dev", return_known),
        call.add_rule("ofake_dev", "-j $sg-fallback"),
    ]
    expected = setup + ingress + egress
    expected.append(call.add_rule("sg-chain", "-j ACCEPT"))
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:vglafirov,项目名称:quantum,代码行数:29,代码来源:test_iptables_firewall.py
示例3: test_create_firewall_no_rules
def test_create_firewall_no_rules(self):
    """Create a firewall without rules and verify IPv4 and IPv6 filter calls.

    For each IP version the expected sequence is: removal of the old
    per-firewall and default-policy chains, re-creation of the
    default-policy chain with a DROP rule, the ingress and egress chains
    with their INVALID / ESTABLISHED,RELATED rules, and two FORWARD jumps.
    The sequence is asserted on the 'filter' table of the matching IP
    version of the first entry in the apply list.
    """
    apply_list = self._fake_apply_list()
    firewall = self._fake_firewall_no_rule()
    self.firewall.create_firewall(apply_list, firewall)

    invalid_rule = '-m state --state INVALID -j DROP'
    est_rule = '-m state --state ESTABLISHED,RELATED -j ACCEPT'
    bname = fwaas.iptables_manager.binary_name

    for ip_version in (4, 6):
        expected = [
            call.ensure_remove_chain('iv%sfake-fw-uuid' % ip_version),
            call.ensure_remove_chain('ov%sfake-fw-uuid' % ip_version),
            call.ensure_remove_chain('fwaas-default-policy'),
            call.add_chain('fwaas-default-policy'),
            call.add_rule('fwaas-default-policy', '-j DROP'),
        ]
        # Ingress chain first, then egress; both get the same two rules.
        direction_chains = (
            'iv%s%s' % (ip_version, firewall['id']),
            'ov%s%s' % (ip_version, firewall['id']),
        )
        for chain in direction_chains:
            expected.extend([
                call.add_chain(chain),
                call.add_rule(chain, invalid_rule),
                call.add_rule(chain, est_rule),
            ])
        expected.append(
            call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname))
        expected.append(
            call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname))

        manager = apply_list[0].iptables_manager
        tables = manager.ipv4 if ip_version == 4 else manager.ipv6
        tables['filter'].assert_has_calls(expected)
开发者ID:50infivedays,项目名称:neutron,代码行数:32,代码来源:test_iptables_fwaas.py
示例4: test_ip_spoofing_filter_with_multiple_ips
def test_ip_spoofing_filter_with_multiple_ips(self):
    """Prepare a filter for a port with several fixed IPs and check v4 calls.

    The port carries two IPv4 addresses and one IPv6 address; the expected
    v4 sequence contains one ``sfake_dev`` RETURN rule per IPv4 address
    (10.0.0.1 and 10.0.0.2) followed by a DROP.
    """
    port = {
        'device': 'tapfake_dev',
        'mac_address': 'ff:ff:ff:ff',
        'fixed_ips': ['10.0.0.1', 'fe80::1', '10.0.0.2'],
    }
    self.firewall.prepare_port_filter(port)

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = '-m physdev --physdev-out tapfake_dev --physdev-is-bridged '
    in_match = '-m physdev --physdev-in tapfake_dev --physdev-is-bridged '
    drop_invalid = '-m state --state INVALID -j DROP'
    return_known = '-m state --state RELATED,ESTABLISHED -j RETURN'

    expected = [
        call.add_chain('sg-fallback'),
        call.add_rule('sg-fallback', '-j DROP'),
        call.ensure_remove_chain('sg-chain'),
        call.add_chain('sg-chain'),
        # Ingress chain.
        call.add_chain('ifake_dev'),
        call.add_rule('FORWARD', out_match + '-j $sg-chain'),
        call.add_rule('sg-chain', out_match + '-j $ifake_dev'),
        call.add_rule('ifake_dev', drop_invalid),
        call.add_rule('ifake_dev', return_known),
        call.add_rule('ifake_dev', '-j $sg-fallback'),
        # Egress chain wiring.
        call.add_chain('ofake_dev'),
        call.add_rule('FORWARD', in_match + '-j $sg-chain'),
        call.add_rule('sg-chain', in_match + '-j $ofake_dev'),
        call.add_rule('INPUT', in_match + '-j $ofake_dev'),
        call.add_chain('sfake_dev'),
    ]
    # One MAC/IP pair rule per IPv4 address, in the order listed on the port.
    expected.extend(
        call.add_rule(
            'sfake_dev',
            '-m mac --mac-source ff:ff:ff:ff -s %s -j RETURN' % address)
        for address in ('10.0.0.1', '10.0.0.2'))
    expected.extend([
        call.add_rule('sfake_dev', '-j DROP'),
        call.add_rule('ofake_dev',
                      '-p udp -m udp --sport 68 --dport 67 -j RETURN'),
        call.add_rule('ofake_dev', '-j $sfake_dev'),
        call.add_rule('ofake_dev',
                      '-p udp -m udp --sport 67 --dport 68 -j DROP'),
        call.add_rule('ofake_dev', drop_invalid),
        call.add_rule('ofake_dev', return_known),
        call.add_rule('ofake_dev', '-j $sg-fallback'),
        call.add_rule('sg-chain', '-j ACCEPT'),
    ])
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:CampHarmony,项目名称:neutron,代码行数:60,代码来源:test_iptables_firewall.py
示例5: _setup_firewall_with_rules
def _setup_firewall_with_rules(self, func):
    """Run ``func`` on a fake firewall with v4 rules and check filter calls.

    ``func`` is called as ``func(apply_list, firewall)``. Afterwards the v4
    filter is expected to show the chain cleanup, the default-policy
    chain, the ingress/egress chains with their state rules, the two
    firewall rules applied to both chains, and four FORWARD jumps.
    """
    apply_list = self._fake_apply_list()
    firewall = self._fake_firewall(self._fake_rules_v4(FAKE_FW_ID))
    func(apply_list, firewall)

    invalid_rule = '-m state --state INVALID -j DROP'
    est_rule = '-m state --state ESTABLISHED,RELATED -j ACCEPT'
    fw_rules = (
        '-p tcp --dport 80 -s 10.24.4.2 -j ACCEPT',
        '-p tcp --dport 22 -j DROP',
    )
    ingress_chain = 'iv4%s' % firewall['id']
    egress_chain = 'ov4%s' % firewall['id']
    bname = fwaas.iptables_manager.binary_name
    # The FORWARD jump targets use only the first 11 characters of the
    # chain name, prefixed with the binary name.
    ipt_mgr_ichain = '%s-%s' % (bname, ingress_chain[:11])
    ipt_mgr_echain = '%s-%s' % (bname, egress_chain[:11])

    expected = [
        call.ensure_remove_chain('iv4fake-fw-uuid'),
        call.ensure_remove_chain('ov4fake-fw-uuid'),
        call.ensure_remove_chain('fwaas-default-policy'),
        call.add_chain('fwaas-default-policy'),
        call.add_rule('fwaas-default-policy', '-j DROP'),
    ]
    for chain in (ingress_chain, egress_chain):
        expected += [
            call.add_chain(chain),
            call.add_rule(chain, invalid_rule),
            call.add_rule(chain, est_rule),
        ]
    # Each firewall rule is expected on the ingress chain, then the egress.
    for fw_rule in fw_rules:
        expected += [
            call.add_rule(ingress_chain, fw_rule),
            call.add_rule(egress_chain, fw_rule),
        ]
    expected += [
        call.add_rule('FORWARD', '-o qr-+ -j %s' % ipt_mgr_ichain),
        call.add_rule('FORWARD', '-i qr-+ -j %s' % ipt_mgr_echain),
        call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
        call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname),
    ]
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:CampHarmony,项目名称:neutron,代码行数:34,代码来源:test_iptables_fwaas.py
示例6: test_ip_spoofing_no_fixed_ips
def test_ip_spoofing_no_fixed_ips(self):
    """Prepare a filter for a port without fixed IPs and check the v4 calls.

    With an empty ``fixed_ips`` list the expected ``sfake_dev`` chain
    holds a single MAC-only RETURN rule followed by a DROP.
    """
    port = dict(device="tapfake_dev", mac_address="ff:ff:ff:ff", fixed_ips=[])
    self.firewall.prepare_port_filter(port)

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = "-m physdev --physdev-out tapfake_dev --physdev-is-bridged "
    in_match = "-m physdev --physdev-in tapfake_dev --physdev-is-bridged "
    drop_invalid = "-m state --state INVALID -j DROP"
    return_known = "-m state --state RELATED,ESTABLISHED -j RETURN"

    setup = [
        call.add_chain("sg-fallback"),
        call.add_rule("sg-fallback", "-j DROP"),
        call.ensure_remove_chain("sg-chain"),
        call.add_chain("sg-chain"),
    ]
    ingress = [
        call.add_chain("ifake_dev"),
        call.add_rule("FORWARD", out_match + "-j $sg-chain"),
        call.add_rule("sg-chain", out_match + "-j $ifake_dev"),
        call.add_rule("ifake_dev", drop_invalid),
        call.add_rule("ifake_dev", return_known),
        call.add_rule("ifake_dev", "-j $sg-fallback"),
    ]
    egress = [
        call.add_chain("ofake_dev"),
        call.add_rule("FORWARD", in_match + "-j $sg-chain"),
        call.add_rule("sg-chain", in_match + "-j $ofake_dev"),
        call.add_rule("INPUT", in_match + "-j $ofake_dev"),
        call.add_chain("sfake_dev"),
        call.add_rule("sfake_dev", "-m mac --mac-source ff:ff:ff:ff -j RETURN"),
        call.add_rule("sfake_dev", "-j DROP"),
        call.add_rule("ofake_dev", "-p udp -m udp --sport 68 --dport 67 -j RETURN"),
        call.add_rule("ofake_dev", "-j $sfake_dev"),
        call.add_rule("ofake_dev", "-p udp -m udp --sport 67 --dport 68 -j DROP"),
        call.add_rule("ofake_dev", drop_invalid),
        call.add_rule("ofake_dev", return_known),
        call.add_rule("ofake_dev", "-j $sg-fallback"),
    ]
    expected = setup + ingress + egress
    expected.append(call.add_rule("sg-chain", "-j ACCEPT"))
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:nabilmaad,项目名称:neutron,代码行数:30,代码来源:test_iptables_firewall.py
示例7: _test_prepare_port_filter
def _test_prepare_port_filter(self, rule, ingress_expected_call=None, egress_expected_call=None):
    """Prepare a port filter with one security-group rule and check calls.

    ``rule`` must carry an ``"ethertype"`` key; it selects the v4 or v6
    filter instance, the source prefix from ``FAKE_IP`` and the expected
    DHCP/ICMPv6 allowance. ``ingress_expected_call`` and
    ``egress_expected_call``, when truthy, are inserted just before the
    fallback jump of the ingress and egress chain respectively.
    """
    port = self._fake_port()
    ethertype = rule["ethertype"]
    prefix = FAKE_IP[ethertype]
    if ethertype == "IPv6":
        filter_inst = self.v6filter_inst
        dhcp_rule = call.add_rule("ofake_dev", "-p icmpv6 -j RETURN")
    else:
        filter_inst = self.v4filter_inst
        dhcp_rule = call.add_rule("ofake_dev", "-p udp -m udp --sport 68 --dport 67 -j RETURN")
    port["security_group_rules"] = [rule]
    self.firewall.prepare_port_filter(port)

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = "-m physdev --physdev-out tapfake_dev --physdev-is-bridged "
    in_match = "-m physdev --physdev-in tapfake_dev --physdev-is-bridged "
    drop_invalid = "-m state --state INVALID -j DROP"
    return_known = "-m state --state RELATED,ESTABLISHED -j RETURN"

    expected = [
        call.add_chain("sg-fallback"),
        call.add_rule("sg-fallback", "-j DROP"),
        call.ensure_remove_chain("sg-chain"),
        call.add_chain("sg-chain"),
        call.add_chain("ifake_dev"),
        call.add_rule("FORWARD", out_match + "-j $sg-chain"),
        call.add_rule("sg-chain", out_match + "-j $ifake_dev"),
    ]
    if ethertype == "IPv6":
        # One ingress allowance per permitted ICMPv6 type.
        expected.extend(
            call.add_rule("ifake_dev", "-p icmpv6 --icmpv6-type %s -j RETURN" % icmp6_type)
            for icmp6_type in constants.ICMPV6_ALLOWED_TYPES
        )
    expected.append(call.add_rule("ifake_dev", drop_invalid))
    expected.append(call.add_rule("ifake_dev", return_known))
    if ingress_expected_call:
        expected.append(ingress_expected_call)
    expected.extend([
        call.add_rule("ifake_dev", "-j $sg-fallback"),
        call.add_chain("ofake_dev"),
        call.add_rule("FORWARD", in_match + "-j $sg-chain"),
        call.add_rule("sg-chain", in_match + "-j $ofake_dev"),
        call.add_rule("INPUT", in_match + "-j $ofake_dev"),
        call.add_chain("sfake_dev"),
        call.add_rule("sfake_dev", "-m mac --mac-source ff:ff:ff:ff -s %s -j RETURN" % prefix),
        call.add_rule("sfake_dev", "-j DROP"),
        dhcp_rule,
        call.add_rule("ofake_dev", "-j $sfake_dev"),
    ])
    if ethertype == "IPv4":
        expected.append(call.add_rule("ofake_dev", "-p udp -m udp --sport 67 --dport 68 -j DROP"))
    expected.append(call.add_rule("ofake_dev", drop_invalid))
    expected.append(call.add_rule("ofake_dev", return_known))
    if egress_expected_call:
        expected.append(egress_expected_call)
    expected.append(call.add_rule("ofake_dev", "-j $sg-fallback"))
    expected.append(call.add_rule("sg-chain", "-j ACCEPT"))
    filter_inst.assert_has_calls(expected)
开发者ID:nabilmaad,项目名称:neutron,代码行数:60,代码来源:test_iptables_firewall.py
示例8: test_prepare_port_filter_with_no_sg
def test_prepare_port_filter_with_no_sg(self):
    """Prepare a filter for the fake port and verify the v4 filter calls.

    The port from ``self._fake_port()`` is used unchanged. The expected
    sequence includes a ``sfake_dev`` chain with one MAC/IP RETURN rule
    for 10.0.0.1 followed by a DROP.
    """
    self.firewall.prepare_port_filter(self._fake_port())

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = '-m physdev --physdev-out tapfake_dev --physdev-is-bridged '
    in_match = '-m physdev --physdev-in tapfake_dev --physdev-is-bridged '
    drop_invalid = '-m state --state INVALID -j DROP'
    return_known = '-m state --state RELATED,ESTABLISHED -j RETURN'

    setup = [
        call.add_chain('sg-fallback'),
        call.add_rule('sg-fallback', '-j DROP'),
        call.ensure_remove_chain('sg-chain'),
        call.add_chain('sg-chain'),
    ]
    ingress = [
        call.add_chain('ifake_dev'),
        call.add_rule('FORWARD', out_match + '-j $sg-chain'),
        call.add_rule('sg-chain', out_match + '-j $ifake_dev'),
        call.add_rule('ifake_dev', drop_invalid),
        call.add_rule('ifake_dev', return_known),
        call.add_rule('ifake_dev', '-j $sg-fallback'),
    ]
    egress = [
        call.add_chain('ofake_dev'),
        call.add_rule('FORWARD', in_match + '-j $sg-chain'),
        call.add_rule('sg-chain', in_match + '-j $ofake_dev'),
        call.add_rule('INPUT', in_match + '-j $ofake_dev'),
        call.add_chain('sfake_dev'),
        call.add_rule(
            'sfake_dev',
            '-m mac --mac-source ff:ff:ff:ff -s 10.0.0.1 -j RETURN'),
        call.add_rule('sfake_dev', '-j DROP'),
        call.add_rule('ofake_dev',
                      '-p udp -m udp --sport 68 --dport 67 -j RETURN'),
        call.add_rule('ofake_dev', '-j $sfake_dev'),
        call.add_rule('ofake_dev',
                      '-p udp -m udp --sport 67 --dport 68 -j DROP'),
        call.add_rule('ofake_dev', drop_invalid),
        call.add_rule('ofake_dev', return_known),
        call.add_rule('ofake_dev', '-j $sg-fallback'),
    ]
    expected = setup + ingress + egress
    expected.append(call.add_rule('sg-chain', '-j ACCEPT'))
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:CampHarmony,项目名称:neutron,代码行数:56,代码来源:test_iptables_firewall.py
示例9: test_create_firewall_with_admin_down
def test_create_firewall_with_admin_down(self):
    """Create an admin-down firewall and check only cleanup calls appear.

    The expected calls on the IPv4 'filter' table of the first apply-list
    entry are the chain removals plus the default-policy chain with its
    DROP rule.
    """
    apply_list = self._fake_apply_list()
    fw_rules = self._fake_rules_v4(FAKE_FW_ID, apply_list)
    firewall = self._fake_firewall_with_admin_down(fw_rules)
    self.firewall.create_firewall(apply_list, firewall)

    stale_chains = ('iv4fake-fw-uuid', 'ov4fake-fw-uuid',
                    'fwaas-default-policy')
    expected = [call.ensure_remove_chain(name) for name in stale_chains]
    expected.append(call.add_chain('fwaas-default-policy'))
    expected.append(call.add_rule('fwaas-default-policy', '-j DROP'))

    v4_filter = apply_list[0].iptables_manager.ipv4['filter']
    v4_filter.assert_has_calls(expected)
开发者ID:50infivedays,项目名称:neutron,代码行数:11,代码来源:test_iptables_fwaas.py
示例10: test_create_firewall_with_admin_down
def test_create_firewall_with_admin_down(self):
    """Create an admin-down firewall and check only cleanup calls appear.

    The expected calls on ``self.v4filter_inst`` are the chain removals
    plus the default-policy chain with its DROP rule.
    """
    fw_rules = self._fake_rules_v4(FAKE_FW_ID)
    apply_list = self._fake_apply_list()
    firewall = self._fake_firewall_with_admin_down(fw_rules)
    self.firewall.create_firewall(apply_list, firewall)

    stale_chains = ('iv4fake-fw-uuid', 'ov4fake-fw-uuid',
                    'fwaas-default-policy')
    expected = [call.ensure_remove_chain(name) for name in stale_chains]
    expected.append(call.add_chain('fwaas-default-policy'))
    expected.append(call.add_rule('fwaas-default-policy', '-j DROP'))
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:CampHarmony,项目名称:neutron,代码行数:11,代码来源:test_iptables_fwaas.py
示例11: test_add_metering_label
def test_add_metering_label(self):
    """Add a rule-less metering label for one router and check the calls.

    The expected sequence on ``self.v4filter_inst`` is: creation of the
    label and rules chains, the FORWARD jump into the rules chain, and an
    empty rule on the label chain.
    """
    router = {
        '_metering_labels': [
            {'id': 'c5df2fe5-c600-4a2a-b2f4-c0fb6df73c83', 'rules': []},
        ],
        'admin_state_up': True,
        'gw_port_id': '7d411f48-ecc7-45e0-9ece-3b5bdb54fcee',
        'id': '473ec392-1711-44e3-b008-3251ccfc5099',
        'name': 'router1',
        'status': 'ACTIVE',
        'tenant_id': '6c5f5d2a1fa2441e88e35422926f48e8',
    }
    self.metering.add_metering_label(None, [router])

    # Chain names as they appear in the expected calls.
    label_chain = 'neutron-meter-l-c5df2fe5-c60'
    rules_chain = 'neutron-meter-r-c5df2fe5-c60'
    expected = [
        call.add_chain(label_chain, wrap=False),
        call.add_chain(rules_chain, wrap=False),
        call.add_rule('neutron-meter-FORWARD', '-j ' + rules_chain,
                      wrap=False),
        call.add_rule(label_chain, '', wrap=False),
    ]
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:50infivedays,项目名称:neutron,代码行数:21,代码来源:test_iptables_driver.py
示例12: test_create_firewall_no_rules
def test_create_firewall_no_rules(self):
    """Create a firewall without rules and verify the v4 filter calls.

    Expected on ``self.v4filter_inst``: chain cleanup, the default-policy
    chain with a DROP rule, the ingress and egress chains with their
    INVALID / ESTABLISHED,RELATED rules, and the two FORWARD jumps.
    """
    apply_list = self._fake_apply_list()
    firewall = self._fake_firewall_no_rule()
    self.firewall.create_firewall(apply_list, firewall)

    invalid_rule = '-m state --state INVALID -j DROP'
    est_rule = '-m state --state ESTABLISHED,RELATED -j ACCEPT'
    ingress_chain = 'iv4%s' % firewall['id']
    egress_chain = 'ov4%s' % firewall['id']
    bname = fwaas.iptables_manager.binary_name

    expected = [
        call.ensure_remove_chain('iv4fake-fw-uuid'),
        call.ensure_remove_chain('ov4fake-fw-uuid'),
        call.ensure_remove_chain('fwaas-default-policy'),
        call.add_chain('fwaas-default-policy'),
        call.add_rule('fwaas-default-policy', '-j DROP'),
    ]
    # Ingress chain first, then egress; both get the same two rules.
    for chain in (ingress_chain, egress_chain):
        expected += [
            call.add_chain(chain),
            call.add_rule(chain, invalid_rule),
            call.add_rule(chain, est_rule),
        ]
    expected += [
        call.add_rule('FORWARD', '-o qr-+ -j %s-fwaas-defau' % bname),
        call.add_rule('FORWARD', '-i qr-+ -j %s-fwaas-defau' % bname),
    ]
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:CampHarmony,项目名称:neutron,代码行数:23,代码来源:test_iptables_fwaas.py
示例13: test_update_delete_port_filter
def test_update_delete_port_filter(self):
    """Prepare, update and remove a port filter, then check the v4 calls.

    The port is first prepared with an ingress rule, then updated with an
    egress rule, then removed; update and remove are also invoked for a
    device name that was never prepared. The expected call log has three
    parts: the initial build (extra '-j RETURN' on ``ifake_dev``), the
    rebuild after the update (extra '-j RETURN' on ``ofake_dev``), and
    the teardown after removal.
    """
    port = self._fake_port()
    port["security_group_rules"] = [{"ethertype": "IPv4", "direction": "ingress"}]
    self.firewall.prepare_port_filter(port)
    port["security_group_rules"] = [{"ethertype": "IPv4", "direction": "egress"}]
    self.firewall.update_port_filter(port)
    self.firewall.update_port_filter({"device": "no-exist-device"})
    self.firewall.remove_port_filter(port)
    self.firewall.remove_port_filter({"device": "no-exist-device"})

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = "-m physdev --physdev-is-bridged --physdev-out tapfake_dev "
    in_match = "-m physdev --physdev-is-bridged --physdev-in tapfake_dev "
    drop_invalid = "-m state --state INVALID -j DROP"
    return_known = "-m state --state ESTABLISHED,RELATED -j RETURN"

    def ingress_calls(extra):
        # Ingress chain; `extra` goes right before the fallback jump.
        built = [
            call.add_chain("ifake_dev"),
            call.add_rule("FORWARD", out_match + "-j $sg-chain"),
            call.add_rule("sg-chain", out_match + "-j $ifake_dev"),
            call.add_rule("ifake_dev", drop_invalid),
            call.add_rule("ifake_dev", return_known),
        ]
        built.extend(extra)
        built.append(call.add_rule("ifake_dev", "-j $sg-fallback"))
        return built

    def egress_calls(extra):
        # Egress chain; `extra` goes right before the fallback jump.
        built = [
            call.add_chain("ofake_dev"),
            call.add_rule("FORWARD", in_match + "-j $sg-chain"),
            call.add_rule("sg-chain", in_match + "-j $ofake_dev"),
            call.add_rule("INPUT", in_match + "-j $ofake_dev"),
            call.add_rule("ofake_dev", "-m mac ! --mac-source ff:ff:ff:ff -j DROP"),
            call.add_rule("ofake_dev", "-p udp --sport 68 --dport 67 -j RETURN"),
            call.add_rule("ofake_dev", "! -s 10.0.0.1 -j DROP"),
            call.add_rule("ofake_dev", "-p udp --sport 67 --dport 68 -j DROP"),
            call.add_rule("ofake_dev", drop_invalid),
            call.add_rule("ofake_dev", return_known),
        ]
        built.extend(extra)
        built.append(call.add_rule("ofake_dev", "-j $sg-fallback"))
        return built

    def reset_calls():
        # Per-port chains are dropped and sg-chain is recreated.
        return [
            call.ensure_remove_chain("ifake_dev"),
            call.ensure_remove_chain("ofake_dev"),
            call.ensure_remove_chain("sg-chain"),
            call.add_chain("sg-chain"),
        ]

    accept_all = call.add_rule("sg-chain", "-j ACCEPT")

    # Part 1: initial prepare with the ingress rule.
    expected = [
        call.add_chain("sg-fallback"),
        call.add_rule("sg-fallback", "-j DROP"),
        call.ensure_remove_chain("sg-chain"),
        call.add_chain("sg-chain"),
    ]
    expected += ingress_calls([call.add_rule("ifake_dev", "-j RETURN")])
    expected += egress_calls([])
    expected.append(accept_all)

    # Part 2: rebuild after the update with the egress rule.
    expected += reset_calls()
    expected += ingress_calls([])
    expected += egress_calls([call.add_rule("ofake_dev", "-j RETURN")])
    expected.append(accept_all)

    # Part 3: teardown after removing the port filter.
    expected += reset_calls()

    self.v4filter_inst.assert_has_calls(expected)
开发者ID:vglafirov,项目名称:quantum,代码行数:63,代码来源:test_iptables_firewall.py
示例14: test_update_delete_port_filter
def test_update_delete_port_filter(self):
port = self._fake_port()
port['security_group_rules'] = [{'ethertype': 'IPv4',
'direction': 'ingress'}]
self.firewall.prepare_port_filter(port)
port['security_group_rules'] = [{'ethertype': 'IPv4',
'direction': 'egress'}]
self.firewall.update_port_filter(port)
self.firewall.update_port_filter({'device': 'no-exist-device'})
self.firewall.remove_port_filter(port)
self.firewall.remove_port_filter({'device': 'no-exist-device'})
calls = [call.add_chain('sg-fallback'),
call.add_rule('sg-fallback', '-j DROP'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $ifake_dev'),
call.add_rule(
'ifake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ifake_dev', '-j RETURN'),
call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'ofake_dev',
'-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 68 --dport 67 -j RETURN'),
call.add_rule(
'ofake_dev',
'! -s 10.0.0.1 -j DROP'),
call.add_rule(
'ofake_dev',
'-p udp --sport 67 --dport 68 -j DROP'),
call.add_rule(
'ofake_dev', '-m state --state INVALID -j DROP'),
call.add_rule(
'ofake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ofake_dev', '-j $sg-fallback'),
call.add_rule('sg-chain', '-j ACCEPT'),
call.ensure_remove_chain('ifake_dev'),
call.ensure_remove_chain('ofake_dev'),
call.ensure_remove_chain('sg-chain'),
call.add_chain('sg-chain'),
call.add_chain('ifake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-out tapfake_dev -j $ifake_dev'),
call.add_rule(
'ifake_dev',
'-m state --state INVALID -j DROP'),
call.add_rule(
'ifake_dev',
'-m state --state ESTABLISHED,RELATED -j RETURN'),
call.add_rule('ifake_dev', '-j $sg-fallback'),
call.add_chain('ofake_dev'),
call.add_rule(
'FORWARD',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $sg-chain'),
call.add_rule(
'sg-chain',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'INPUT',
'-m physdev --physdev-is-bridged '
'--physdev-in tapfake_dev -j $ofake_dev'),
call.add_rule(
'ofake_dev',
'-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
call.add_rule(
#.........这里部分代码省略.........
开发者ID:abhiraut,项目名称:quantum,代码行数:101,代码来源:test_iptables_firewall.py
示例15: _test_prepare_port_filter
def _test_prepare_port_filter(self,
                              rule,
                              ingress_expected_call=None,
                              egress_expected_call=None):
    """Prepare a port filter with one security-group rule and check calls.

    ``rule`` must carry an ``'ethertype'`` key; it selects the v4 or v6
    filter instance, the source prefix from ``FAKE_IP`` and the expected
    DHCP/ICMPv6 allowance. ``ingress_expected_call`` and
    ``egress_expected_call``, when truthy, are inserted just before the
    fallback jump of the ingress and egress chain respectively.
    """
    port = self._fake_port()
    ethertype = rule['ethertype']
    prefix = FAKE_IP[ethertype]
    if ethertype == 'IPv6':
        filter_inst = self.v6filter_inst
        dhcp_rule = call.add_rule('ofake_dev', '-p icmpv6 -j RETURN')
    else:
        filter_inst = self.v4filter_inst
        dhcp_rule = call.add_rule(
            'ofake_dev', '-p udp --sport 68 --dport 67 -j RETURN')
    port['security_group_rules'] = [rule]
    self.firewall.prepare_port_filter(port)

    # Shared rule fragments (note the trailing space on the match prefixes).
    out_match = '-m physdev --physdev-is-bridged --physdev-out tapfake_dev '
    in_match = '-m physdev --physdev-is-bridged --physdev-in tapfake_dev '
    drop_invalid = '-m state --state INVALID -j DROP'
    return_known = '-m state --state ESTABLISHED,RELATED -j RETURN'

    expected = [
        call.add_chain('sg-fallback'),
        call.add_rule('sg-fallback', '-j DROP'),
        call.ensure_remove_chain('sg-chain'),
        call.add_chain('sg-chain'),
        call.add_chain('ifake_dev'),
        call.add_rule('FORWARD', out_match + '-j $sg-chain'),
        call.add_rule('sg-chain', out_match + '-j $ifake_dev'),
        call.add_rule('ifake_dev', drop_invalid),
        call.add_rule('ifake_dev', return_known),
    ]
    if ingress_expected_call:
        expected.append(ingress_expected_call)
    expected.extend([
        call.add_rule('ifake_dev', '-j $sg-fallback'),
        call.add_chain('ofake_dev'),
        call.add_rule('FORWARD', in_match + '-j $sg-chain'),
        call.add_rule('sg-chain', in_match + '-j $ofake_dev'),
        call.add_rule('INPUT', in_match + '-j $ofake_dev'),
        call.add_rule('ofake_dev',
                      '-m mac ! --mac-source ff:ff:ff:ff -j DROP'),
        dhcp_rule,
        call.add_rule('ofake_dev', '! -s %s -j DROP' % prefix),
    ])
    if ethertype == 'IPv4':
        expected.append(call.add_rule(
            'ofake_dev', '-p udp --sport 67 --dport 68 -j DROP'))
    expected.append(call.add_rule('ofake_dev', drop_invalid))
    expected.append(call.add_rule('ofake_dev', return_known))
    if egress_expected_call:
        expected.append(egress_expected_call)
    expected.append(call.add_rule('ofake_dev', '-j $sg-fallback'))
    expected.append(call.add_rule('sg-chain', '-j ACCEPT'))
    filter_inst.assert_has_calls(expected)
开发者ID:abhiraut,项目名称:quantum,代码行数:79,代码来源:test_iptables_firewall.py
示例16: test_update_routers
def test_update_routers(self):
    """Add labels for two routers, change one gateway port, check calls.

    After the labels are added, a deep copy of the router list gets a new
    ``gw_port_id`` on its first router and is passed to
    ``update_routers``. The expected call log shows both labels being
    set up, then the first label's chains removed and rebuilt with the
    rule bound to the new gateway interface name.
    """
    first_label = 'c5df2fe5-c600-4a2a-b2f4-c0fb6df73c83'
    second_label = 'eeef45da-c600-4a2a-b2f4-c0fb6df73c83'
    router_one = {
        '_metering_labels': [{
            'id': first_label,
            'rules': [{
                'direction': 'ingress',
                'excluded': False,
                'id': '7f1a261f-2489-4ed1-870c-a62754501379',
                'metering_label_id': first_label,
                'remote_ip_prefix': '10.0.0.0/24',
            }],
        }],
        'admin_state_up': True,
        'gw_port_id': '6d411f48-ecc7-45e0-9ece-3b5bdb54fcee',
        'id': '473ec392-1711-44e3-b008-3251ccfc5099',
        'name': 'router1',
        'status': 'ACTIVE',
        'tenant_id': '6c5f5d2a1fa2441e88e35422926f48e8',
    }
    router_two = {
        '_metering_labels': [{
            'id': second_label,
            'rules': [{
                'direction': 'ingress',
                'excluded': True,
                'id': 'fa2441e8-2489-4ed1-870c-a62754501379',
                'metering_label_id': second_label,
                'remote_ip_prefix': '20.0.0.0/24',
            }],
        }],
        'admin_state_up': True,
        'gw_port_id': '7d411f48-ecc7-45e0-9ece-3b5bdb54fcee',
        'id': '373ec392-1711-44e3-b008-3251ccfc5099',
        'name': 'router2',
        'status': 'ACTIVE',
        'tenant_id': '6c5f5d2a1fa2441e88e35422926f48e8',
    }
    routers = [router_one, router_two]
    self.metering.add_metering_label(None, routers)

    # Work on a copy so the originally added routers stay untouched.
    updates = copy.deepcopy(routers)
    updates[0]['gw_port_id'] = '587b63c1-22a3-40b3-9834-486d1fb215a5'
    self.metering.update_routers(None, updates)

    def label_setup(label_chain, rules_chain):
        # Calls common to every label: two chains, FORWARD jump, empty rule.
        return [
            call.add_chain(label_chain, wrap=False),
            call.add_chain(rules_chain, wrap=False),
            call.add_rule('neutron-meter-FORWARD', '-j ' + rules_chain,
                          wrap=False),
            call.add_rule(label_chain, '', wrap=False),
        ]

    l_first = 'neutron-meter-l-c5df2fe5-c60'
    r_first = 'neutron-meter-r-c5df2fe5-c60'
    l_second = 'neutron-meter-l-eeef45da-c60'
    r_second = 'neutron-meter-r-eeef45da-c60'

    expected = label_setup(l_first, r_first)
    expected.append(
        call.add_rule(r_first,
                      '-i qg-6d411f48-ec -d 10.0.0.0/24 -j ' + l_first,
                      wrap=False, top=False))
    expected += label_setup(l_second, r_second)
    expected.append(
        call.add_rule(r_second,
                      '-i qg-7d411f48-ec -d 20.0.0.0/24 -j RETURN',
                      wrap=False, top=True))
    # The first router's label is torn down and rebuilt after the update.
    expected.append(call.remove_chain(l_first, wrap=False))
    expected.append(call.remove_chain(r_first, wrap=False))
    expected += label_setup(l_first, r_first)
    expected.append(
        call.add_rule(r_first,
                      '-i qg-587b63c1-22 -d 10.0.0.0/24 -j ' + l_first,
                      wrap=False, top=False))
    self.v4filter_inst.assert_has_calls(expected)
开发者ID:50infivedays,项目名称:neutron,代码行数:72,代码来源:test_iptables_driver.py
注:本文中的mock.call.add_chain函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论