本文整理汇总了Python中neutron.openstack.common.timeutils.utcnow函数的典型用法代码示例。如果您正苦于以下问题:Python utcnow函数的具体用法?Python utcnow怎么用?Python utcnow使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了utcnow函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run_periodic_tasks
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
full_task_name = '.'.join([self.__class__.__name__, task_name])
now = timeutils.utcnow()
spacing = self._periodic_spacing[task_name]
last_run = self._periodic_last_run[task_name]
# If a periodic task is _nearly_ due, then we'll run it early
if spacing is not None and last_run is not None:
due = last_run + datetime.timedelta(seconds=spacing)
if not timeutils.is_soon(due, 0.2):
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
continue
if spacing is not None:
idle_for = min(idle_for, spacing)
LOG.debug(_("Running periodic task %(full_task_name)s"),
{"full_task_name": full_task_name})
self._periodic_last_run[task_name] = timeutils.utcnow()
try:
task(self, context)
except Exception as e:
if raise_on_error:
raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
{"full_task_name": full_task_name, "e": e})
time.sleep(0)
return idle_for
开发者ID:50infivedays,项目名称:neutron,代码行数:34,代码来源:periodic_task.py
示例2: _inner
def _inner():
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running:
break
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone as e:
self.stop()
done.send(e.retvalue)
except Exception:
LOG.exception(_('in fixed duration looping call'))
done.send_exception(*sys.exc_info())
return
else:
done.send(True)
开发者ID:50infivedays,项目名称:neutron,代码行数:25,代码来源:loopingcall.py
示例3: reschedule_routers_from_down_agents
def reschedule_routers_from_down_agents(self):
"""Reschedule routers from down l3 agents if admin state is up."""
# give agents extra time to handle transient failures
agent_dead_limit = cfg.CONF.agent_down_time * 2
# check for an abrupt clock change since last check. if a change is
# detected, sleep for a while to let the agents check in.
tdelta = timeutils.utcnow() - getattr(self, '_clock_jump_canary',
timeutils.utcnow())
if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
LOG.warn(_LW("Time since last L3 agent reschedule check has "
"exceeded the interval between checks. Waiting "
"before check to allow agents to send a heartbeat "
"in case there was a clock adjustment."))
time.sleep(agent_dead_limit)
self._clock_jump_canary = timeutils.utcnow()
context = n_ctx.get_admin_context()
cutoff = timeutils.utcnow() - datetime.timedelta(
seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding).
join(agents_db.Agent).
filter(agents_db.Agent.heartbeat_timestamp < cutoff,
agents_db.Agent.admin_state_up))
for binding in down_bindings:
LOG.warn(_LW("Rescheduling router %(router)s from agent %(agent)s "
"because the agent did not report to the server in "
"the last %(dead_time)s seconds."),
{'router': binding.router_id,
'agent': binding.l3_agent_id,
'dead_time': agent_dead_limit})
self.reschedule_router(context, binding.router_id)
开发者ID:COSHPC,项目名称:neutron,代码行数:34,代码来源:l3_agentschedulers_db.py
示例4: reschedule_routers_from_down_agents
def reschedule_routers_from_down_agents(self):
"""Reschedule routers from down l3 agents if admin state is up."""
# give agents extra time to handle transient failures
agent_dead_limit = cfg.CONF.agent_down_time * 2
# check for an abrupt clock change since last check. if a change is
# detected, sleep for a while to let the agents check in.
tdelta = timeutils.utcnow() - getattr(self, "_clock_jump_canary", timeutils.utcnow())
if timeutils.total_seconds(tdelta) > cfg.CONF.agent_down_time:
LOG.warn(
_LW(
"Time since last L3 agent reschedule check has "
"exceeded the interval between checks. Waiting "
"before check to allow agents to send a heartbeat "
"in case there was a clock adjustment."
)
)
time.sleep(agent_dead_limit)
self._clock_jump_canary = timeutils.utcnow()
context = n_ctx.get_admin_context()
cutoff = timeutils.utcnow() - datetime.timedelta(seconds=agent_dead_limit)
down_bindings = (
context.session.query(RouterL3AgentBinding)
.join(agents_db.Agent)
.filter(agents_db.Agent.heartbeat_timestamp < cutoff, agents_db.Agent.admin_state_up)
.outerjoin(
l3_attrs_db.RouterExtraAttributes,
l3_attrs_db.RouterExtraAttributes.router_id == RouterL3AgentBinding.router_id,
)
.filter(
sa.or_(
l3_attrs_db.RouterExtraAttributes.ha == sql.false(),
l3_attrs_db.RouterExtraAttributes.ha == sql.null(),
)
)
)
try:
for binding in down_bindings:
LOG.warn(
_LW(
"Rescheduling router %(router)s from agent %(agent)s "
"because the agent did not report to the server in "
"the last %(dead_time)s seconds."
),
{"router": binding.router_id, "agent": binding.l3_agent_id, "dead_time": agent_dead_limit},
)
try:
self.reschedule_router(context, binding.router_id)
except (l3agentscheduler.RouterReschedulingFailed, n_rpc.RemoteError):
# Catch individual router rescheduling errors here
# so one broken one doesn't stop the iteration.
LOG.exception(_LE("Failed to reschedule router %s"), binding.router_id)
except db_exc.DBError:
# Catch DB errors here so a transient DB connectivity issue
# doesn't stop the loopingcall.
LOG.exception(_LE("Exception encountered during router " "rescheduling."))
开发者ID:nash-x,项目名称:hws,代码行数:58,代码来源:l3_agentschedulers_db.py
示例5: upgrade
def upgrade():
ip_policy = table('quark_ip_policy',
column('id', sa.String(length=36)),
column('tenant_id', sa.String(length=255)),
column('created_at', sa.DateTime()))
ip_policy_cidrs = table('quark_ip_policy_cidrs',
column('id', sa.String(length=36)),
column('created_at', sa.DateTime()),
column('ip_policy_id', sa.String(length=36)),
column('cidr', sa.String(length=64)))
subnets = table('quark_subnets',
column('id', sa.String(length=36)),
column('_cidr', sa.String(length=64)),
column('tenant_id', sa.String(length=255)),
column('ip_policy_id', sa.String(length=36)))
connection = op.get_bind()
# 1. Find all subnets without ip_policy.
data = connection.execute(select([
subnets.c.id, subnets.c._cidr, subnets.c.tenant_id]).where(
subnets.c.ip_policy_id == None)).fetchall() # noqa
if not data:
return
LOG.info("Subnet IDs without IP policies: %s", [d[0] for d in data])
# 2. Insert ip_policy rows with id.
vals = [dict(id=uuidutils.generate_uuid(),
created_at=timeutils.utcnow(),
tenant_id=tenant_id)
for id, cidr, tenant_id in data]
LOG.info("IP Policy IDs to insert: %s", [v["id"] for v in vals])
connection.execute(ip_policy.insert(), *vals)
# 3. Insert default ip_policy_cidrs for those ip_policy's.
vals2 = []
for ((id, cidr, tenant_id), ip_policy) in zip(data, vals):
cidrs = []
ip_policies.ensure_default_policy(cidrs, [dict(cidr=cidr)])
for cidr in cidrs:
vals2.append(dict(id=uuidutils.generate_uuid(),
created_at=timeutils.utcnow(),
ip_policy_id=ip_policy["id"],
cidr=str(cidr)))
LOG.info("IP Policy CIDR IDs to insert: %s", [v["id"] for v in vals2])
connection.execute(ip_policy_cidrs.insert(), *vals2)
# 4. Set ip_policy_id rows in quark_subnets.
for ((id, cidr, tenant_id), ip_policy) in zip(data, vals):
connection.execute(subnets.update().values(
ip_policy_id=ip_policy["id"]).where(
subnets.c.id == id))
开发者ID:anilkumarkodi,项目名称:quark,代码行数:55,代码来源:552b213c2b8c_default_policy_without_policy.py
示例6: _get_agents
def _get_agents(self, hosts):
return [
agents_db.Agent(
binary='neutron-dhcp-agent',
host=host,
topic=topics.DHCP_AGENT,
configurations="",
agent_type=constants.AGENT_TYPE_DHCP,
created_at=timeutils.utcnow(),
started_at=timeutils.utcnow(),
heartbeat_timestamp=timeutils.utcnow())
for host in hosts
]
开发者ID:AsherBond,项目名称:quantum,代码行数:13,代码来源:test_dhcp_scheduler.py
示例7: test_schedule_router_distributed
def test_schedule_router_distributed(self):
scheduler = l3_agent_scheduler.ChanceScheduler()
agent = agents_db.Agent()
agent.admin_state_up = True
agent.heartbeat_timestamp = timeutils.utcnow()
sync_router = {
'id': 'foo_router_id',
'distributed': True
}
plugin = mock.Mock()
plugin.get_router.return_value = sync_router
plugin.get_l3_agents_hosting_routers.return_value = []
plugin.get_l3_agents.return_value = [agent]
plugin.get_l3_agent_candidates.return_value = [agent]
with mock.patch.object(scheduler, 'bind_router'):
scheduler._schedule_router(
plugin, self.adminContext,
'foo_router_id', None, {'gw_exists': True})
expected_calls = [
mock.call.get_router(mock.ANY, 'foo_router_id'),
mock.call.schedule_snat_router(
mock.ANY, 'foo_router_id', sync_router, True),
mock.call.get_l3_agents_hosting_routers(
mock.ANY, ['foo_router_id'], admin_state_up=True),
mock.call.get_l3_agents(mock.ANY, active=True),
mock.call.get_l3_agent_candidates(
mock.ANY, sync_router, [agent], None),
]
plugin.assert_has_calls(expected_calls)
开发者ID:hichihara,项目名称:neutron,代码行数:29,代码来源:test_l3_schedulers.py
示例8: create_openvpnconnection
def create_openvpnconnection(self, context, openvpnconnection):
openvpnconnection = openvpnconnection['openvpnconnection']
tenant_id = self._get_tenant_id_for_create(context,
openvpnconnection)
l3_plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
if not l3_plugin:
raise openvpn.RouterExtNotFound()
openvpn_conns = self.get_openvpnconnections(
context, filters={'router_id': [openvpnconnection['router_id']]})
if openvpn_conns:
raise openvpn.OpenvpnInExists(router_id=openvpnconnection['router_id'])
external = self.get_external(context, openvpnconnection['router_id'],
openvpnconnection['peer_cidr'])
openvpn_id = uuidutils.generate_uuid()
openvpnconnection.update(external)
openvpnconnection.update({'id':openvpn_id})
ta_key_info = ca.OpenVPNDBDrv().generate_client_ca(openvpn_id)
openvpn_file = ca.OpenVPNFile(openvpnconnection)
zip_contents = openvpn_file.generate_zip_file()
#l3_plugin.get_router(context, openvpnconnection['router_id'])
with context.session.begin(subtransactions=True):
self._validate_peer_vpn_cidr(context,
openvpnconnection['router_id'],
openvpnconnection['peer_cidr'])
openvpnconn_db = OpenVPNConnection(
id=openvpn_id,
tenant_id=tenant_id,
name=openvpnconnection['name'],
description=openvpnconnection['description'],
peer_cidr=openvpnconnection['peer_cidr'],
port=openvpnconnection['port'],
protocol=openvpnconnection['protocol'],
router_id=openvpnconnection['router_id'],
admin_state_up=openvpnconnection['admin_state_up'],
status=constants.DOWN,
created_at=timeutils.utcnow(),
ta_key=ta_key_info['ta_key'],
zip_file=zip_contents,
)
utils.make_default_name(openvpnconn_db, uos_constants.UOS_PRE_OPENVPN)
context.session.add(openvpnconn_db)
openvpn_cons = self._make_openvpn_ca_dict(openvpnconn_db)
openvpn_cons.update(external)
LOG.debug(_('openvpn service info %s in db '),
openvpn_cons)
#remove all file of client
openvpn_file.remove_all_file()
if self.openvpn_driver:
self.openvpn_driver.create_vpnservice(context, openvpn_cons)
return self._make_openvpnconnection_dict(openvpnconn_db)
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:60,代码来源:openvpn_db.py
示例9: _create_or_update_agent
def _create_or_update_agent(self, context, agent):
with context.session.begin(subtransactions=True):
res_keys = ["agent_type", "binary", "host", "topic"]
res = dict((k, agent[k]) for k in res_keys)
configurations_dict = agent.get("configurations", {})
res["configurations"] = jsonutils.dumps(configurations_dict)
current_time = timeutils.utcnow()
try:
agent_db = self._get_agent_by_type_and_host(context, agent["agent_type"], agent["host"])
res["heartbeat_timestamp"] = current_time
if agent.get("start_flag"):
res["started_at"] = current_time
greenthread.sleep(0)
agent_db.update(res)
except ext_agent.AgentNotFoundByTypeHost:
greenthread.sleep(0)
res["created_at"] = current_time
res["started_at"] = current_time
res["heartbeat_timestamp"] = current_time
res["admin_state_up"] = True
agent_db = Agent(**res)
greenthread.sleep(0)
context.session.add(agent_db)
greenthread.sleep(0)
开发者ID:asadoughi,项目名称:neutron,代码行数:25,代码来源:agents_db.py
示例10: attempt_to_reallocate_ip
def attempt_to_reallocate_ip(self, context, net_id, port_id, reuse_after,
version=None, ip_address=None):
version = version or [4, 6]
elevated = context.elevated()
# We never want to take the chance of an infinite loop here. Instead,
# we'll clean up multiple bad IPs if we find them (assuming something
# is really wrong)
for times in xrange(3):
with context.session.begin(subtransactions=True):
address = db_api.ip_address_find(
elevated, network_id=net_id, reuse_after=reuse_after,
deallocated=True, scope=db_api.ONE, ip_address=ip_address,
lock_mode=True, version=version, order_by="address")
if address:
#NOTE(mdietz): We should always be in the CIDR but we've
# also said that before :-/
if address.get("subnet"):
cidr = netaddr.IPNetwork(address["subnet"]["cidr"])
addr = netaddr.IPAddress(int(address["address"]),
version=int(cidr.version))
if addr in cidr:
updated_address = db_api.ip_address_update(
elevated, address, deallocated=False,
deallocated_at=None,
allocated_at=timeutils.utcnow())
return [updated_address]
else:
# Make sure we never find it again
context.session.delete(address)
continue
break
return []
开发者ID:blamarvt,项目名称:quark,代码行数:34,代码来源:ipam.py
示例11: _process_router_update
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
LOG.debug("Starting router update for %s", update.id)
router = update.router
if update.action != DELETE_ROUTER and not router:
try:
update.timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(self.context,
[update.id])
except Exception:
msg = _("Failed to fetch router information for '%s'")
LOG.exception(msg, update.id)
self.fullsync = True
continue
if routers:
router = routers[0]
if not router:
self._router_removed(update.id)
continue
self._process_routers([router])
LOG.debug("Finished a router update for %s", update.id)
rp.fetched_and_processed(update.timestamp)
开发者ID:HybridCloud-dew,项目名称:hws,代码行数:25,代码来源:l3_highperformance_agent.py
示例12: _create_or_update_agent
def _create_or_update_agent(self, context, agent):
with context.session.begin(subtransactions=True):
res_keys = ['agent_type', 'binary', 'host', 'topic']
res = dict((k, agent[k]) for k in res_keys)
configurations_dict = agent.get('configurations', {})
res['configurations'] = jsonutils.dumps(configurations_dict)
current_time = timeutils.utcnow()
try:
agent_db = self._get_agent_by_type_and_host(
context, agent['agent_type'], agent['host'])
res['heartbeat_timestamp'] = current_time
if agent.get('start_flag'):
res['started_at'] = current_time
greenthread.sleep(0)
agent_db.update(res)
except ext_agent.AgentNotFoundByTypeHost:
greenthread.sleep(0)
res['created_at'] = current_time
res['started_at'] = current_time
res['heartbeat_timestamp'] = current_time
res['admin_state_up'] = True
agent_db = Agent(**res)
greenthread.sleep(0)
context.session.add(agent_db)
greenthread.sleep(0)
开发者ID:nkapotoxin,项目名称:fs_spc111t_plus_hc,代码行数:26,代码来源:agents_db.py
示例13: create_pptpconnection
def create_pptpconnection(self, context, pptpconnection):
pptpconnection = pptpconnection['pptpconnection']
tenant_id = self._get_tenant_id_for_create(context,
pptpconnection)
l3_plugin = manager.NeutronManager.get_service_plugins().get(
constants.L3_ROUTER_NAT)
if not l3_plugin:
raise pptpvpnaas.RouterExtNotFound()
l3_plugin.get_router(context, pptpconnection['router_id'])
with context.session.begin(subtransactions=True):
self._validate_vpn_cidr(context,
pptpconnection['router_id'],
pptpconnection['vpn_cidr'])
pptpconn_db = PPTPConnection(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=pptpconnection['name'],
description=pptpconnection['description'],
vpn_cidr=pptpconnection['vpn_cidr'],
router_id=pptpconnection['router_id'],
admin_state_up=pptpconnection['admin_state_up'],
status=constants.DOWN,
created_at=timeutils.utcnow(),
)
utils.make_default_name(pptpconn_db, uos_constants.UOS_PRE_PPTP)
context.session.add(pptpconn_db)
result = self._make_pptpconnection_dict(pptpconn_db)
if self.pptp_driver:
self.pptp_driver.create_vpnservice(context, result)
return result
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:30,代码来源:pptp_vpn_db.py
示例14: is_device_reachable
def is_device_reachable(self, device):
"""Check the device which hosts this resource is reachable.
If the resource is not reachable, it is added to the backlog.
:param device : dict of the device
:return True if device is reachable, else None
"""
hd = device
hd_id = device['id']
mgmt_url = device.get('mgmt_url', None)
if mgmt_url:
hd_mgmt_ip = mgmt_url.get('ip_address', None)
device['created_at'] = datetime.datetime.strptime(
device['created_at'], '%Y-%m-%dT%H:%M:%S.000000')
if hd_id not in self.backlog_devices.keys():
if _is_pingable(hd_mgmt_ip):
LOG.debug("Hosting device: %(hd_id)[email protected]%(ip)s is reachable.",
{'hd_id': hd_id, 'ip': hd_mgmt_ip})
return True
LOG.warn("Hosting device: %(hd_id)[email protected]%(ip)s is NOT reachable.",
{'hd_id': hd_id, 'ip': hd_mgmt_ip})
#hxn add
hd['backlog_insertion_ts'] = max(
timeutils.utcnow(),
hd['created_at'] +
datetime.timedelta(seconds=BOOT_TIME_INTERVAL))
self.backlog_devices[hd_id] = hd
LOG.debug("Hosting device: %(hd_id)s @ %(ip)s is now added "
"to backlog", {'hd_id': hd_id, 'ip': hd_mgmt_ip})
else:
LOG.debug("Hosting device: %(hd_id)s can not added "
"to backlog, because of no mgmt_ip", {'hd_id': hd_id})
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:34,代码来源:device_status.py
示例15: make_active_agent
def make_active_agent(fake_id, fake_agent_type, config=None):
agent_dict = dict(id=fake_id,
agent_type=fake_agent_type,
host='localhost_' + str(fake_id),
heartbeat_timestamp=timeutils.utcnow(),
configurations=config)
return agent_dict
开发者ID:hp-networking,项目名称:ovsvapp,代码行数:7,代码来源:test_agent_monitor.py
示例16: create_pool
def create_pool(self, context, pool):
network = self._core_plugin.get_network(context, pool['network_id'])
subnet_id = pool.get('subnet_id', None)
if subnet_id:
subnet = self._core_plugin.get_subnet(context, subnet_id)
if subnet['network_id']!=pool['network_id']:
raise loadbalancerv2.NetworkSubnetIDMismatch(
subnet_id = subnet_id,
network_id = pool['network_id'])
with context.session.begin(subtransactions=True):
self._load_id_and_tenant_id(context, pool)
pool['status'] = constants.PENDING_CREATE
session_info = pool.pop('session_persistence')
healthmonitor_info = pool.pop('healthmonitor')
pool['created_at'] = timeutils.utcnow()
pool_db = models.PoolV2(**pool)
if session_info:
LOG.debug('_create_pool session_info %s',session_info)
s_p = self._create_session_persistence_db(session_info,
pool_db.id)
pool_db.session_persistence = s_p
if healthmonitor_info:
health_monitor = self._create_healthmonitor_db(healthmonitor_info,
pool_db.id)
pool_db.healthmonitor = health_monitor
LOG.debug('_create_pool pool_db %s', pool_db)
context.session.add(pool_db)
context.session.flush()
return data_models.Pool.from_sqlalchemy_model(pool_db)
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:34,代码来源:loadbalancer_dbv2.py
示例17: is_hosting_device_reachable
def is_hosting_device_reachable(self, hosting_device):
"""Check the hosting device which hosts this resource is reachable.
If the resource is not reachable, it is added to the backlog.
:param hosting_device : dict of the hosting device
:return True if device is reachable, else None
"""
hd = hosting_device
hd_id = hosting_device['id']
hd_mgmt_ip = hosting_device['management_ip_address']
# Modifying the 'created_at' to a date time object
hosting_device['created_at'] = datetime.datetime.strptime(
hosting_device['created_at'], '%Y-%m-%d %H:%M:%S')
if hd_id not in self.backlog_hosting_devices:
if _is_pingable(hd_mgmt_ip):
LOG.debug("Hosting device: %(hd_id)[email protected]%(ip)s is reachable.",
{'hd_id': hd_id, 'ip': hd_mgmt_ip})
return True
LOG.debug("Hosting device: %(hd_id)[email protected]%(ip)s is NOT reachable.",
{'hd_id': hd_id, 'ip': hd_mgmt_ip})
hd['backlog_insertion_ts'] = max(
timeutils.utcnow(),
hd['created_at'] +
datetime.timedelta(seconds=hd['booting_time']))
self.backlog_hosting_devices[hd_id] = {'hd': hd}
LOG.debug("Hosting device: %(hd_id)s @ %(ip)s is now added "
"to backlog", {'hd_id': hd_id, 'ip': hd_mgmt_ip})
开发者ID:HybridCloud-dew,项目名称:hws,代码行数:29,代码来源:device_status.py
示例18: create_loadbalancer
def create_loadbalancer(self, context, loadbalancer):
with context.session.begin(subtransactions=True):
self._load_id_and_tenant_id(context, loadbalancer)
vip_address = loadbalancer.pop('vip_address')
securitygroup_id = loadbalancer.get('securitygroup_id')
loadbalancer['status'] = constants.PENDING_CREATE
loadbalancer['created_at'] = timeutils.utcnow()
lb_db = models.LoadBalancer(**loadbalancer)
context.session.add(lb_db)
context.session.flush()
lb_db.stats = self._create_loadbalancer_stats(
context, lb_db.id)
context.session.add(lb_db)
# create port outside of lb create transaction since it can sometimes
# cause lock wait timeouts
try:
self._create_port_for_load_balancer(context, lb_db,
vip_address, securitygroup_id)
except ext_sg.SecurityGroupNotFound:
LOG.error('_create_port_for_load_balancer %s securitygroup',lb_db.id)
with excutils.save_and_reraise_exception():
context.session.delete(lb_db)
context.session.flush()
raise loadbalancerv2.SecurityGroupNotFound(id=lb_db.id)
except Exception:
LOG.error('_create_port_for_load_balancer %s',lb_db.id)
with excutils.save_and_reraise_exception():
context.session.delete(lb_db)
context.session.flush()
return data_models.LoadBalancer.from_sqlalchemy_model(lb_db)
开发者ID:CingHu,项目名称:neutron-ustack,代码行数:31,代码来源:loadbalancer_dbv2.py
示例19: deallocate_mac_address
def deallocate_mac_address(self, context, address):
mac = db_api.mac_address_find(context, address=address,
scope=db_api.ONE)
if not mac:
raise exceptions.NotFound(
message="No MAC address %s found" % netaddr.EUI(address))
db_api.mac_address_update(context, mac, deallocated=True,
deallocated_at=timeutils.utcnow())
开发者ID:kilogram,项目名称:quark,代码行数:8,代码来源:ipam.py
示例20: test__schedule_network_no_existing_agents
def test__schedule_network_no_existing_agents(self):
agent = agents_db.Agent()
agent.admin_state_up = True
agent.heartbeat_timestamp = timeutils.utcnow()
network = {'id': 'foo_net_id'}
self._test__schedule_network(network,
new_agents=None, existing_agents=[agent],
expected_casts=0, expected_warnings=0)
开发者ID:ArifovicH,项目名称:neutron,代码行数:8,代码来源:test_dhcp_rpc_agent_api.py
注:本文中的neutron.openstack.common.timeutils.utcnow函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论