本文整理汇总了Python中neutron.agent.linux.utils.replace_file函数的典型用法代码示例。如果您正苦于以下问题:Python replace_file函数的具体用法?Python replace_file怎么用?Python replace_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了replace_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _generate_radvd_conf
def _generate_radvd_conf(self, router_ports):
radvd_conf = utils.get_conf_file_name(cfg.CONF.ra_confs,
self._router_id,
'radvd.conf',
True)
buf = six.StringIO()
for p in router_ports:
subnets = p.get('subnets', [])
v6_subnets = [subnet for subnet in subnets if
netaddr.IPNetwork(subnet['cidr']).version == 6]
if not v6_subnets:
continue
ra_modes = {subnet['ipv6_ra_mode'] for subnet in v6_subnets}
auto_config_prefixes = [subnet['cidr'] for subnet in v6_subnets if
subnet['ipv6_ra_mode'] == constants.IPV6_SLAAC or
subnet['ipv6_ra_mode'] == constants.DHCPV6_STATELESS]
interface_name = self._dev_name_helper(p['id'])
buf.write('%s' % CONFIG_TEMPLATE.render(
ra_modes=list(ra_modes),
interface_name=interface_name,
prefixes=auto_config_prefixes,
constants=constants))
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
开发者ID:Akanksha08,项目名称:neutron,代码行数:25,代码来源:ra.py
示例2: _write_notify_script
def _write_notify_script(self, state, script):
name = self._get_notifier_path(state)
utils.replace_file(name, script)
st = os.stat(name)
os.chmod(name, st.st_mode | stat.S_IEXEC)
return name
开发者ID:bradleyjones,项目名称:neutron,代码行数:7,代码来源:keepalived.py
示例3: _output_hosts_file
def _output_hosts_file(self):
"""Writes a dnsmasq compatible hosts file."""
r = re.compile('[:.]')
buf = six.StringIO()
for port in self.network.ports:
for alloc in port.fixed_ips:
name = 'host-%s.%s' % (r.sub('-', alloc.ip_address),
self.conf.dhcp_domain)
set_tag = ''
# (dzyu) Check if it is legal ipv6 address, if so, need wrap
# it with '[]' to let dnsmasq to distinguish MAC address from
# IPv6 address.
ip_address = alloc.ip_address
if netaddr.valid_ipv6(ip_address):
ip_address = '[%s]' % ip_address
if getattr(port, 'extra_dhcp_opts', False):
if self.version >= self.MINIMUM_VERSION:
set_tag = 'set:'
buf.write('%s,%s,%s,%s%s\n' %
(port.mac_address, name, ip_address,
set_tag, port.id))
else:
buf.write('%s,%s,%s\n' %
(port.mac_address, name, ip_address))
name = self.get_conf_file_name('host')
utils.replace_file(name, buf.getvalue())
return name
开发者ID:vijayendrabvs,项目名称:ssl-neutron,代码行数:30,代码来源:dhcp.py
示例4: _output_hosts_file
def _output_hosts_file(self):
"""Writes a dnsmasq compatible dhcp hosts file.
The generated file is sent to the --dhcp-hostsfile option of dnsmasq,
and lists the hosts on the network which should receive a dhcp lease.
Each line in this file is in the form::
'mac_address,FQDN,ip_address'
IMPORTANT NOTE: a dnsmasq instance does not resolve hosts defined in
this file if it did not give a lease to a host listed in it (e.g.:
multiple dnsmasq instances on the same network if this network is on
multiple network nodes). This file is only defining hosts which
should receive a dhcp lease, the hosts resolution in itself is
defined by the `_output_addn_hosts_file` method.
"""
buf = six.StringIO()
filename = self.get_conf_file_name('host')
LOG.debug('Building host file: %s', filename)
dhcp_enabled_subnet_ids = [s.id for s in self.network.subnets
if s.enable_dhcp]
# NOTE(ihrachyshka): the loop should not log anything inside it, to
# avoid potential performance drop when lots of hosts are dumped
for host_tuple in self._iter_hosts():
port, alloc, hostname, name, no_dhcp, no_opts = host_tuple
if no_dhcp:
if not no_opts and self._get_port_extra_dhcp_opts(port):
buf.write('%s,%s%s\n' %
(port.mac_address, 'set:', port.id))
continue
# don't write ip address which belongs to a dhcp disabled subnet.
if alloc.subnet_id not in dhcp_enabled_subnet_ids:
continue
ip_address = self._format_address_for_dnsmasq(alloc.ip_address)
if self._get_port_extra_dhcp_opts(port):
client_id = self._get_client_id(port)
if client_id and len(port.extra_dhcp_opts) > 1:
buf.write('%s,%s%s,%s,%s,%s%s\n' %
(port.mac_address, self._ID, client_id, name,
ip_address, 'set:', port.id))
elif client_id and len(port.extra_dhcp_opts) == 1:
buf.write('%s,%s%s,%s,%s\n' %
(port.mac_address, self._ID, client_id, name,
ip_address))
else:
buf.write('%s,%s,%s,%s%s\n' %
(port.mac_address, name, ip_address,
'set:', port.id))
else:
buf.write('%s,%s,%s\n' %
(port.mac_address, name, ip_address))
utils.replace_file(filename, buf.getvalue())
LOG.debug('Done building host file %s with contents:\n%s', filename,
buf.getvalue())
return filename
开发者ID:glove747,项目名称:liberty-neutron,代码行数:60,代码来源:dhcp.py
示例5: save_config
def save_config(conf_path, logical_config, socket_path=None):
"""Convert a logical configuration to the HAProxy version."""
data = []
data.extend(_build_global(logical_config, socket_path=socket_path))
data.extend(_build_defaults(logical_config))
data.extend(_build_frontend(logical_config))
data.extend(_build_backend(logical_config))
utils.replace_file(conf_path, '\n'.join(data))
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:8,代码来源:cfg.py
示例6: ensure_config_file
def ensure_config_file(self, kind, template, vpnservice, file_mode=None):
"""Update config file, based on current settings for service."""
config_str = self._gen_config_content(template, vpnservice)
config_file_name = self._get_config_filename(kind)
if file_mode is None:
utils.replace_file(config_file_name, config_str)
else:
utils.replace_file(config_file_name, config_str, file_mode)
开发者ID:armando-migliaccio,项目名称:neutron-vpnaas,代码行数:8,代码来源:ipsec.py
示例7: _output_opts_file
def _output_opts_file(self):
"""Write a dnsmasq compatible options file."""
options, subnet_index_map = self._generate_opts_per_subnet()
options += self._generate_opts_per_port(subnet_index_map)
name = self.get_conf_file_name('opts')
utils.replace_file(name, '\n'.join(options))
return name
开发者ID:SmartInfrastructures,项目名称:neutron,代码行数:8,代码来源:dhcp.py
示例8: _output_opts_file
def _output_opts_file(self):
"""Write a dnsmasq compatible options file."""
if self.conf.enable_isolated_metadata:
subnet_to_interface_ip = self._make_subnet_interface_ip_map()
options = []
for i, subnet in enumerate(self.network.subnets):
if not subnet.enable_dhcp:
continue
if subnet.dns_nameservers:
options.append(
self._format_option(i, 'dns-server',
','.join(subnet.dns_nameservers)))
gateway = subnet.gateway_ip
host_routes = []
for hr in subnet.host_routes:
if hr.destination == "0.0.0.0/0":
gateway = hr.nexthop
else:
host_routes.append("%s,%s" % (hr.destination, hr.nexthop))
# Add host routes for isolated network segments
enable_metadata = (
self.conf.enable_isolated_metadata
and not subnet.gateway_ip
and subnet.ip_version == 4)
if enable_metadata:
subnet_dhcp_ip = subnet_to_interface_ip[subnet.id]
host_routes.append(
'%s/32,%s' % (METADATA_DEFAULT_IP, subnet_dhcp_ip)
)
if host_routes:
options.append(
self._format_option(i, 'classless-static-route',
','.join(host_routes)))
options.append(
self._format_option(i, WIN2k3_STATIC_DNS,
','.join(host_routes)))
if subnet.ip_version == 4:
if gateway:
options.append(self._format_option(i, 'router', gateway))
else:
options.append(self._format_option(i, 'router'))
for port in self.network.ports:
if getattr(port, 'extra_dhcp_opts', False):
options.extend(
self._format_option(port.id, opt.opt_name, opt.opt_value)
for opt in port.extra_dhcp_opts)
name = self.get_conf_file_name('opts')
utils.replace_file(name, '\n'.join(options))
return name
开发者ID:CampHarmony,项目名称:neutron,代码行数:58,代码来源:dhcp.py
示例9: save_config
def save_config(conf_path, logical_config, socket_path=None,
user_group='nogroup'):
"""Convert a logical configuration to the HAProxy version."""
data = []
data.extend(_build_global(logical_config, socket_path=socket_path,
user_group=user_group))
data.extend(_build_defaults(logical_config))
data.extend(_build_frontend(logical_config))
data.extend(_build_backend(logical_config))
utils.replace_file(conf_path, '\n'.join(data)) #替换/var/lib/neutron/lbaas/3195b1f3-a453-4c35-a4a2-aa63b2678765/conf
开发者ID:xiongmeng1108,项目名称:gcloud7_neutron-2014.2.2,代码行数:10,代码来源:cfg.py
示例10: _write_intf_file
def _write_intf_file(self):
global _devices
confs_dir = os.path.abspath(os.path.normpath(self.conf.dhcp_confs))
conf_dir = os.path.join(confs_dir, self.network.id)
if not os.path.isdir(conf_dir):
os.makedirs(conf_dir, 0o755)
intf_filename = os.path.join(conf_dir, 'interface')
if self.network.id not in _devices:
return
ifname = _devices[self.network.id]
utils.replace_file(intf_filename, ifname)
开发者ID:abattye,项目名称:networking-cisco,代码行数:11,代码来源:dhcp_driver.py
示例11: _output_hosts_file
def _output_hosts_file(self):
"""Writes a dnsmasq compatible dhcp hosts file.
The generated file is sent to the --dhcp-hostsfile option of dnsmasq,
and lists the hosts on the network which should receive a dhcp lease.
Each line in this file is in the form::
'mac_address,FQDN,ip_address'
IMPORTANT NOTE: a dnsmasq instance does not resolve hosts defined in
this file if it did not give a lease to a host listed in it (e.g.:
multiple dnsmasq instances on the same network if this network is on
multiple network nodes). This file is only defining hosts which
should receive a dhcp lease, the hosts resolution in itself is
defined by the `_output_addn_hosts_file` method.
"""
buf = six.StringIO()
filename = self.get_conf_file_name('host')
LOG.debug('Building host file: %s', filename)
dhcp_enabled_subnet_ids = [s.id for s in self.network.subnets
if s.enable_dhcp]
# NOTE(ihrachyshka): the loop should not log anything inside it, to
# avoid potential performance drop when lots of hosts are dumped
for (port, alloc, hostname, name) in self._iter_hosts():
if not alloc:
if getattr(port, 'extra_dhcp_opts', False):
buf.write('%s,%s%s\n' %
(port.mac_address, 'set:', port.id))
continue
# don't write ip address which belongs to a dhcp disabled subnet.
if alloc.subnet_id not in dhcp_enabled_subnet_ids:
continue
# (dzyu) Check if it is legal ipv6 address, if so, need wrap
# it with '[]' to let dnsmasq to distinguish MAC address from
# IPv6 address.
ip_address = alloc.ip_address
if netaddr.valid_ipv6(ip_address):
ip_address = '[%s]' % ip_address
if getattr(port, 'extra_dhcp_opts', False):
buf.write('%s,%s,%s,%s%s\n' %
(port.mac_address, name, ip_address,
'set:', port.id))
else:
buf.write('%s,%s,%s\n' %
(port.mac_address, name, ip_address))
utils.replace_file(filename, buf.getvalue())
LOG.debug('Done building host file %s with contents:\n%s', filename,
buf.getvalue())
return filename
开发者ID:SmartInfrastructures,项目名称:neutron,代码行数:54,代码来源:dhcp.py
示例12: _store_listener_crt
def _store_listener_crt(haproxy_base_dir, listener, cert):
"""Store TLS certificate
:param haproxy_base_dir: location of the instances state data
:param listener: the listener object
:param cert: the TLS certificate
:return: location of the stored certificate
"""
cert_path = _retrieve_crt_path(haproxy_base_dir, listener,
cert.primary_cn)
# build a string that represents the pem file to be saved
pem = _build_pem(cert)
utils.replace_file(cert_path, pem)
return cert_path
开发者ID:allen-yin,项目名称:neutron-lbaas,代码行数:14,代码来源:jinja_cfg.py
示例13: _output_hosts_file
def _output_hosts_file(self):
"""Writes a dnsmasq compatible hosts file."""
r = re.compile('[:.]')
buf = StringIO.StringIO()
for port in self.network.ports:
for alloc in port.fixed_ips:
name = 'host-%s.%s' % (r.sub('-', alloc.ip_address),
self.conf.dhcp_domain)
buf.write('%s,%s,%s\n' %
(port.mac_address, name, alloc.ip_address))
name = self.get_conf_file_name('host')
utils.replace_file(name, buf.getvalue())
return name
开发者ID:abhirajbutala,项目名称:neutron,代码行数:15,代码来源:dhcp.py
示例14: save_config
def save_config(conf_path, loadbalancer, socket_path, user_group,
haproxy_base_dir):
"""Convert a logical configuration to the HAProxy version.
:param conf_path: location of Haproxy configuration
:param loadbalancer: the load balancer object
:param socket_path: location of haproxy socket data
:param user_group: user group
:param haproxy_base_dir: location of the instances state data
"""
config_str = render_loadbalancer_obj(loadbalancer,
user_group,
socket_path,
haproxy_base_dir)
utils.replace_file(conf_path, config_str)
开发者ID:allen-yin,项目名称:neutron-lbaas,代码行数:15,代码来源:jinja_cfg.py
示例15: test_replace_file
def test_replace_file(self):
# make file to replace
with mock.patch('tempfile.NamedTemporaryFile') as ntf:
ntf.return_value.name = '/baz'
with mock.patch('os.chmod') as chmod:
with mock.patch('os.rename') as rename:
utils.replace_file('/foo', 'bar')
expected = [mock.call('w+', dir='/', delete=False),
mock.call().write('bar'),
mock.call().close()]
ntf.assert_has_calls(expected)
chmod.assert_called_once_with('/baz', 0o644)
rename.assert_called_once_with('/baz', '/foo')
开发者ID:bradleyjones,项目名称:neutron,代码行数:15,代码来源:test_utils.py
示例16: save_config
def save_config(conf_path, logical_config):
"""Convert a logical configuration to the SEnginx version."""
protocol = logical_config['vip']['protocol']
if not protocol:
return
data = []
data.extend(_build_global(logical_config))
# build protocol specified configs
if PROTOCOL_MAP[protocol] == "http":
data.extend(_build_http(logical_config))
else:
data.extend(_build_tcp(logical_config))
utils.replace_file(conf_path, '\n'.join(data))
开发者ID:112358wise,项目名称:SEnginx-LBaaS-Driver,代码行数:16,代码来源:cfg.py
示例17: _output_hosts_file
def _output_hosts_file(self):
"""Writes a dnsmasq compatible dhcp hosts file.
The generated file is sent to the --dhcp-hostsfile option of dnsmasq,
and lists the hosts on the network which should receive a dhcp lease.
Each line in this file is in the form::
'mac_address,FQDN,ip_address'
IMPORTANT NOTE: a dnsmasq instance does not resolve hosts defined in
this file if it did not give a lease to a host listed in it (e.g.:
multiple dnsmasq instances on the same network if this network is on
multiple network nodes). This file is only defining hosts which
should receive a dhcp lease, the hosts resolution in itself is
defined by the `_output_addn_hosts_file` method.
"""
buf = six.StringIO()
filename = self.get_conf_file_name('host')
LOG.debug(_('Building host file: %s'), filename)
for (port, alloc, hostname, name) in self._iter_hosts():
set_tag = ''
# (dzyu) Check if it is legal ipv6 address, if so, need wrap
# it with '[]' to let dnsmasq to distinguish MAC address from
# IPv6 address.
ip_address = alloc.ip_address
if netaddr.valid_ipv6(ip_address):
ip_address = '[%s]' % ip_address
LOG.debug(_('Adding %(mac)s : %(name)s : %(ip)s'),
{"mac": port.mac_address, "name": name,
"ip": ip_address})
if getattr(port, 'extra_dhcp_opts', False):
if self.version >= self.MINIMUM_VERSION:
set_tag = 'set:'
buf.write('%s,%s,%s,%s%s\n' %
(port.mac_address, name, ip_address,
set_tag, port.id))
else:
buf.write('%s,%s,%s\n' %
(port.mac_address, name, ip_address))
utils.replace_file(filename, buf.getvalue())
LOG.debug(_('Done building host file %s'), filename)
return filename
开发者ID:Mierdin,项目名称:neutron,代码行数:47,代码来源:dhcp.py
示例18: _generate_radvd_conf
def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
radvd_conf = utils.get_conf_file_name(cfg.CONF.ra_confs, router_id, "radvd.conf", True)
buf = six.StringIO()
for p in router_ports:
prefix = p["subnet"]["cidr"]
if netaddr.IPNetwork(prefix).version == 6:
interface_name = dev_name_helper(p["id"])
ra_mode = p["subnet"]["ipv6_ra_mode"]
buf.write(
"%s"
% CONFIG_TEMPLATE.render(
ra_mode=ra_mode, interface_name=interface_name, prefix=prefix, constants=constants
)
)
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
开发者ID:noironetworks,项目名称:neutron2,代码行数:17,代码来源:ra.py
示例19: _output_addn_hosts_file
def _output_addn_hosts_file(self):
"""Writes a dnsmasq compatible additional hosts file.
The generated file is sent to the --addn-hosts option of dnsmasq,
and lists the hosts on the network which should be resolved even if
the dnsmaq instance did not give a lease to the host (see the
`_output_hosts_file` method).
Each line in this file is in the same form as a standard /etc/hosts
file.
"""
buf = six.StringIO()
for (port, alloc, hostname, fqdn) in self._iter_hosts():
# It is compulsory to write the `fqdn` before the `hostname` in
# order to obtain it in PTR responses.
buf.write('%s\t%s %s\n' % (alloc.ip_address, fqdn, hostname))
addn_hosts = self.get_conf_file_name('addn_hosts')
utils.replace_file(addn_hosts, buf.getvalue())
return addn_hosts
开发者ID:Mierdin,项目名称:neutron,代码行数:18,代码来源:dhcp.py
示例20: _generate_radvd_conf
def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
radvd_conf = utils.get_conf_file_name(cfg.CONF.ra_confs,
router_id,
'radvd.conf',
True)
buf = six.StringIO()
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
interface_name = dev_name_helper(p['id'])
if _is_slaac(p['subnet']['ipv6_ra_mode']):
conf_str = prefix_fmt % (interface_name,
p['subnet']['cidr'])
else:
conf_str = default_fmt % interface_name
buf.write('%s' % conf_str)
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
开发者ID:CiscoSystems,项目名称:neutron,代码行数:18,代码来源:ra.py
注:本文中的neutron.agent.linux.utils.replace_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论