本文整理汇总了Python中neutron.agent.linux.iptables_manager.get_chain_name函数的典型用法代码示例。如果您正苦于以下问题:Python get_chain_name函数的具体用法?Python get_chain_name怎么用?Python get_chain_name使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_chain_name函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _enable_policy_chain
def _enable_policy_chain(self, fwid, ipt_if_prefix):
bname = iptables_manager.binary_name
ipt_mgr = ipt_if_prefix['ipt']
if_prefix = ipt_if_prefix['if_prefix']
for (ver, tbl) in [(IPV4, ipt_mgr.ipv4['filter']),
(IPV6, ipt_mgr.ipv6['filter'])]:
for direction in [INGRESS_DIRECTION, EGRESS_DIRECTION]:
chain_name = self._get_chain_name(fwid, ver, direction)
chain_name = iptables_manager.get_chain_name(chain_name)
if chain_name in tbl.chains:
jump_rule = ['%s %s+ -j %s-%s' % (IPTABLES_DIR[direction],
if_prefix, bname, chain_name)]
self._add_rules_to_chain(ipt_mgr,
ver, 'FORWARD', jump_rule)
#jump to DROP_ALL policy
chain_name = iptables_manager.get_chain_name(FWAAS_DEFAULT_CHAIN)
jump_rule = ['-o %s+ -j %s-%s' % (if_prefix, bname, chain_name)]
self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule)
self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)
#jump to DROP_ALL policy
chain_name = iptables_manager.get_chain_name(FWAAS_DEFAULT_CHAIN)
jump_rule = ['-i %s+ -j %s-%s' % (if_prefix, bname, chain_name)]
self._add_rules_to_chain(ipt_mgr, IPV4, 'FORWARD', jump_rule)
self._add_rules_to_chain(ipt_mgr, IPV6, 'FORWARD', jump_rule)
开发者ID:absolutarin,项目名称:neutron,代码行数:27,代码来源:iptables_fwaas.py
示例2: _process_disassociate_metering_label
def _process_disassociate_metering_label(self, router):
rm = self.routers.get(router['id'])
if not rm:
return
with IptablesManagerTransaction(rm.iptables_manager):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
if label_id not in rm.metering_labels:
continue
label_chain = iptables_manager.get_chain_name(WRAP_NAME +
LABEL + label_id,
wrap=False)
rules_chain = iptables_manager.get_chain_name(WRAP_NAME +
RULE + label_id,
wrap=False)
rm.iptables_manager.ipv4['filter'].remove_chain(label_chain,
wrap=False)
rm.iptables_manager.ipv4['filter'].remove_chain(rules_chain,
wrap=False)
del rm.metering_labels[label_id]
开发者ID:Akanksha08,项目名称:neutron,代码行数:25,代码来源:iptables_driver.py
示例3: _process_associate_metering_label
def _process_associate_metering_label(self, router):
self._update_router(router)
rm = self.routers.get(router['id'])
with IptablesManagerTransaction(rm.iptables_manager):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
label_chain = iptables_manager.get_chain_name(WRAP_NAME +
LABEL + label_id,
wrap=False)
rm.iptables_manager.ipv4['filter'].add_chain(label_chain,
wrap=False)
rules_chain = iptables_manager.get_chain_name(WRAP_NAME +
RULE + label_id,
wrap=False)
rm.iptables_manager.ipv4['filter'].add_chain(rules_chain,
wrap=False)
rm.iptables_manager.ipv4['filter'].add_rule(TOP_CHAIN, '-j ' +
rules_chain,
wrap=False)
rm.iptables_manager.ipv4['filter'].add_rule(label_chain,
'',
wrap=False)
rules = label.get('rules')
if rules:
self._process_metering_label_rules(rm, rules,
label_chain,
rules_chain)
rm.metering_labels[label_id] = label
开发者ID:Akanksha08,项目名称:neutron,代码行数:35,代码来源:iptables_driver.py
示例4: _process_metering_rule_action
def _process_metering_rule_action(self, router, action):
rm = self.routers.get(router['id'])
if not rm:
return
ext_dev = self.get_external_device_name(rm.router['gw_port_id'])
if not ext_dev:
return
with IptablesManagerTransaction(rm.iptables_manager):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
label_chain = iptables_manager.get_chain_name(WRAP_NAME +
LABEL + label_id,
wrap=False)
rules_chain = iptables_manager.get_chain_name(WRAP_NAME +
RULE + label_id,
wrap=False)
rule = label.get('rule')
if rule:
if action == 'create':
self._process_metering_label_rule_add(rm, rule,
ext_dev,
label_chain,
rules_chain)
elif action == 'delete':
self._process_metering_label_rule_delete(rm, rule,
ext_dev,
label_chain,
rules_chain)
开发者ID:Akanksha08,项目名称:neutron,代码行数:30,代码来源:iptables_driver.py
示例5: _process_metering_rule_action_based_on_ns
def _process_metering_rule_action_based_on_ns(
self, router, action, ext_dev, im):
'''Process metering rule actions based specific namespaces.'''
rm = self.routers.get(router['id'])
with IptablesManagerTransaction(im):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
label_chain = iptables_manager.get_chain_name(
WRAP_NAME + LABEL + label_id, wrap=False)
rules_chain = iptables_manager.get_chain_name(
WRAP_NAME + RULE + label_id, wrap=False)
exists = rm.metering_labels.get(label_id)
if action == 'create' and not exists:
self._create_metering_label_chain(rm,
label_chain,
rules_chain)
rm.metering_labels[label_id] = label
rule = label.get('rule')
if rule:
if action == 'create':
self._process_metering_label_rule_add(
rule, ext_dev, label_chain, rules_chain, im)
elif action == 'delete':
self._process_metering_label_rule_delete(
rule, ext_dev, label_chain, rules_chain, im)
开发者ID:openstack,项目名称:neutron,代码行数:29,代码来源:iptables_driver.py
示例6: test_get_chain_name
def test_get_chain_name(self):
name = "0123456789" * 5
# 28 chars is the maximum length of iptables chain name.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=False), name[:28])
# 11 chars is the maximum length of chain name of iptable_manager
# if binary_name is prepended.
self.assertEqual(iptables_manager.get_chain_name(name, wrap=True), name[:11])
开发者ID:nitinnain,项目名称:neutron,代码行数:7,代码来源:test_iptables_manager.py
示例7: _process_ns_specific_metering_label
def _process_ns_specific_metering_label(self, router, ext_dev, im):
'''Process metering label based on the associated namespaces.'''
rm = self.routers.get(router['id'])
with IptablesManagerTransaction(im):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
label_chain = iptables_manager.get_chain_name(
WRAP_NAME + LABEL + label_id, wrap=False)
rules_chain = iptables_manager.get_chain_name(
WRAP_NAME + RULE + label_id, wrap=False)
exists = rm.metering_labels.get(label_id)
if not exists:
self._create_metering_label_chain(rm,
label_chain,
rules_chain)
rm.metering_labels[label_id] = label
rules = label.get('rules')
if rules:
self._process_metering_label_rules(
rules, label_chain, rules_chain, ext_dev, im)
开发者ID:openstack,项目名称:neutron,代码行数:25,代码来源:iptables_driver.py
示例8: test_rule_with_wrap_target
def test_rule_with_wrap_target(self):
name = '0123456789' * 5
wrap = "%s-%s" % (iptables_manager.binary_name,
iptables_manager.get_chain_name(name))
iptables_args = {'bn': iptables_manager.binary_name,
'wrap': wrap}
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
':%(wrap)s - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j '
'%(wrap)s\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + filter_dump_mod,
root_helper=self.root_helper),
None),
(mock.call(['iptables-save', '-c'],
root_helper=self.root_helper),
''),
(mock.call(['iptables-restore', '-c'],
process_input=NAT_DUMP + FILTER_DUMP,
root_helper=self.root_helper),
None),
]
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain(name)
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.ipv4['filter'].remove_chain(name)
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
开发者ID:ArifovicH,项目名称:neutron,代码行数:60,代码来源:test_iptables_manager.py
示例9: get_traffic_counters
def get_traffic_counters(self, context, routers):
accs = {}
routers_to_reconfigure = set()
for router in routers:
rm = self.routers.get(router["id"])
if not rm:
continue
for label_id, label in rm.metering_labels.items():
try:
chain = iptables_manager.get_chain_name(WRAP_NAME + LABEL + label_id, wrap=False)
chain_acc = rm.iptables_manager.get_traffic_counters(chain, wrap=False, zero=True)
except RuntimeError:
LOG.exception(_LE("Failed to get traffic counters, " "router: %s"), router)
routers_to_reconfigure.add(router["id"])
continue
if not chain_acc:
continue
acc = accs.get(label_id, {"pkts": 0, "bytes": 0})
acc["pkts"] += chain_acc["pkts"]
acc["bytes"] += chain_acc["bytes"]
accs[label_id] = acc
for router_id in routers_to_reconfigure:
del self.routers[router_id]
return accs
开发者ID:openstack,项目名称:neutron,代码行数:32,代码来源:iptables_driver.py
示例10: get_traffic_counters
def get_traffic_counters(self, context, routers):
accs = {}
for router in routers:
rm = self.routers.get(router['id'])
if not rm:
continue
for label_id, label in rm.metering_labels.items():
try:
chain = iptables_manager.get_chain_name(WRAP_NAME +
LABEL +
label_id,
wrap=False)
chain_acc = rm.iptables_manager.get_traffic_counters(
chain, wrap=False, zero=True)
except RuntimeError:
LOG.exception(_LE('Failed to get traffic counters, '
'router: %s'), router)
continue
if not chain_acc:
continue
acc = accs.get(label_id, {'pkts': 0, 'bytes': 0})
acc['pkts'] += chain_acc['pkts']
acc['bytes'] += chain_acc['bytes']
accs[label_id] = acc
return accs
开发者ID:Akanksha08,项目名称:neutron,代码行数:32,代码来源:iptables_driver.py
示例11: get_traffic_counters
def get_traffic_counters(self, context, routers):
accs = {}
for router in routers:
rm = self.routers.get(router['id'])
if not rm:
continue
for label_id, label in rm.metering_labels.items():
chain = iptables_manager.get_chain_name(WRAP_NAME + LABEL +
label_id, wrap=False)
chain_acc = rm.iptables_manager.get_traffic_counters(
chain, wrap=False, zero=True)
if not chain_acc:
continue
acc = accs.get(label_id, {'pkts': 0, 'bytes': 0})
acc['pkts'] += chain_acc['pkts']
acc['bytes'] += chain_acc['bytes']
accs[label_id] = acc
return accs
开发者ID:chrisacbr,项目名称:openstack-neutron,代码行数:25,代码来源:iptables_driver.py
示例12: _process_ns_specific_disassociate_metering_label
def _process_ns_specific_disassociate_metering_label(self, router, im):
'''Disassociate metering label based on specific namespaces.'''
rm = self.routers.get(router['id'])
with IptablesManagerTransaction(im):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
if label_id not in rm.metering_labels:
continue
label_chain = iptables_manager.get_chain_name(
WRAP_NAME + LABEL + label_id, wrap=False)
rules_chain = iptables_manager.get_chain_name(
WRAP_NAME + RULE + label_id, wrap=False)
im.ipv4['filter'].remove_chain(label_chain, wrap=False)
im.ipv4['filter'].remove_chain(rules_chain, wrap=False)
开发者ID:openstack,项目名称:neutron,代码行数:16,代码来源:iptables_driver.py
示例13: _update_metering_label_rules_based_on_ns
def _update_metering_label_rules_based_on_ns(self, router, ext_dev, im):
'''Update metering lable rules based on namespace.'''
with IptablesManagerTransaction(im):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label['id']
label_chain = iptables_manager.get_chain_name(
WRAP_NAME + LABEL + label_id, wrap=False)
rules_chain = iptables_manager.get_chain_name(
WRAP_NAME + RULE + label_id, wrap=False)
im.ipv4['filter'].empty_chain(rules_chain, wrap=False)
rules = label.get('rules')
if rules:
self._process_metering_label_rules(
rules, label_chain, rules_chain, ext_dev, im)
开发者ID:openstack,项目名称:neutron,代码行数:17,代码来源:iptables_driver.py
示例14: _update_metering_label_rules
def _update_metering_label_rules(self, router):
rm = self.routers.get(router["id"])
if not rm:
return
with IptablesManagerTransaction(rm.iptables_manager):
labels = router.get(constants.METERING_LABEL_KEY, [])
for label in labels:
label_id = label["id"]
label_chain = iptables_manager.get_chain_name(WRAP_NAME + LABEL + label_id, wrap=False)
rules_chain = iptables_manager.get_chain_name(WRAP_NAME + RULE + label_id, wrap=False)
rm.iptables_manager.ipv4["filter"].empty_chain(rules_chain, wrap=False)
rules = label.get("rules")
if rules:
self._process_metering_label_rules(rm, rules, label_chain, rules_chain)
开发者ID:CiscoSystems,项目名称:neutron,代码行数:17,代码来源:iptables_driver.py
示例15: _test_rule_with_wrap_target_helper
def _test_rule_with_wrap_target_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(root_helper=self.root_helper, use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
name = "0123456789" * 5
wrap = "%s-%s" % (iptables_manager.binary_name, iptables_manager.get_chain_name(name))
iptables_args = {"bn": iptables_manager.binary_name, "wrap": wrap}
filter_dump_mod = (
"# Generated by iptables_manager\n"
"*filter\n"
":neutron-filter-top - [0:0]\n"
":%(wrap)s - [0:0]\n"
":%(bn)s-FORWARD - [0:0]\n"
":%(bn)s-INPUT - [0:0]\n"
":%(bn)s-OUTPUT - [0:0]\n"
":%(bn)s-local - [0:0]\n"
"[0:0] -A FORWARD -j neutron-filter-top\n"
"[0:0] -A OUTPUT -j neutron-filter-top\n"
"[0:0] -A neutron-filter-top -j %(bn)s-local\n"
"[0:0] -A INPUT -j %(bn)s-INPUT\n"
"[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n"
"[0:0] -A FORWARD -j %(bn)s-FORWARD\n"
"[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j "
"%(wrap)s\n"
"COMMIT\n"
"# Completed by iptables_manager\n" % iptables_args
)
expected_calls_and_values = [
(mock.call(["iptables-save", "-c"], root_helper=self.root_helper), ""),
(
mock.call(
["iptables-restore", "-c"],
process_input=RAW_DUMP + NAT_DUMP + filter_dump_mod,
root_helper=self.root_helper,
),
None,
),
(mock.call(["iptables-save", "-c"], root_helper=self.root_helper), ""),
(
mock.call(
["iptables-restore", "-c"],
process_input=RAW_DUMP + NAT_DUMP + FILTER_DUMP,
root_helper=self.root_helper,
),
None,
),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values, FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4["filter"].add_chain(name)
self.iptables.ipv4["filter"].add_rule("INPUT", "-s 0/0 -d 192.168.0.2 -j" " $%s" % name)
self.iptables.apply()
self.iptables.ipv4["filter"].remove_rule("INPUT", "-s 0/0 -d 192.168.0.2 -j" " $%s" % name)
self.iptables.ipv4["filter"].remove_chain(name)
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
开发者ID:hyunsun,项目名称:quantum,代码行数:65,代码来源:test_iptables_manager.py
示例16: _get_action_chain
def _get_action_chain(self, name):
binary_name = iptables_manager.binary_name
chain_name = iptables_manager.get_chain_name(name)
return '%s-%s' % (binary_name, chain_name)
开发者ID:openstack,项目名称:neutron-fwaas,代码行数:4,代码来源:iptables_fwaas_v2.py
示例17: _test_rule_with_wrap_target_helper
def _test_rule_with_wrap_target_helper(self, use_ipv6):
self.iptables = iptables_manager.IptablesManager(
use_ipv6=use_ipv6)
self.execute = mock.patch.object(self.iptables, "execute").start()
name = '0123456789' * 5
wrap = "%s-%s" % (iptables_manager.binary_name,
iptables_manager.get_chain_name(name))
iptables_args = {'bn': iptables_manager.binary_name,
'wrap': wrap}
filter_dump_mod = ('# Generated by iptables_manager\n'
'*filter\n'
':neutron-filter-top - [0:0]\n'
':%(wrap)s - [0:0]\n'
':%(bn)s-FORWARD - [0:0]\n'
':%(bn)s-INPUT - [0:0]\n'
':%(bn)s-OUTPUT - [0:0]\n'
':%(bn)s-local - [0:0]\n'
'[0:0] -A FORWARD -j neutron-filter-top\n'
'[0:0] -A OUTPUT -j neutron-filter-top\n'
'[0:0] -A neutron-filter-top -j %(bn)s-local\n'
'[0:0] -A INPUT -j %(bn)s-INPUT\n'
'[0:0] -A OUTPUT -j %(bn)s-OUTPUT\n'
'[0:0] -A FORWARD -j %(bn)s-FORWARD\n'
'[0:0] -A %(bn)s-INPUT -s 0/0 -d 192.168.0.2 -j '
'%(wrap)s\n'
'COMMIT\n'
'# Completed by iptables_manager\n'
% iptables_args)
expected_calls_and_values = [
(mock.call(['iptables-save', '-c'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-c'],
process_input=(RAW_DUMP + NAT_DUMP + MANGLE_DUMP +
filter_dump_mod),
run_as_root=True),
None),
(mock.call(['iptables-save', '-c'],
run_as_root=True),
''),
(mock.call(['iptables-restore', '-c'],
process_input=(RAW_DUMP + NAT_DUMP + MANGLE_DUMP +
FILTER_DUMP),
run_as_root=True),
None),
]
if use_ipv6:
self._extend_with_ip6tables_filter(expected_calls_and_values,
FILTER_DUMP)
tools.setup_mock_calls(self.execute, expected_calls_and_values)
self.iptables.ipv4['filter'].add_chain(name)
self.iptables.ipv4['filter'].add_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.apply()
self.iptables.ipv4['filter'].remove_rule('INPUT',
'-s 0/0 -d 192.168.0.2 -j'
' $%s' % name)
self.iptables.ipv4['filter'].remove_chain(name)
self.iptables.apply()
tools.verify_mock_calls(self.execute, expected_calls_and_values)
开发者ID:bradleyjones,项目名称:neutron,代码行数:70,代码来源:test_iptables_manager.py
示例18: _port_chain_name
def _port_chain_name(self, port, direction):
return iptables_manager.get_chain_name(
'%s%s' % (CHAIN_NAME_PREFIX[direction], port['device']))
开发者ID:Mierdin,项目名称:neutron,代码行数:3,代码来源:iptables_firewall.py
示例19: _port_chain_name
def _port_chain_name(self, port, direction):
return iptables_manager.get_chain_name("%s%s" % (CHAIN_NAME_PREFIX[direction], port["device"]))
开发者ID:prophaner,项目名称:neutron,代码行数:2,代码来源:iptables_firewall.py
注:本文中的neutron.agent.linux.iptables_manager.get_chain_name函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论