本文整理汇总了Python中melange.common.utils.utcnow函数的典型用法代码示例。如果您正苦于以下问题:Python utcnow函数的具体用法?Python utcnow怎么用?Python utcnow使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了utcnow函数的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _allocate_allocatable_address
def _allocate_allocatable_address(ip_block, interface,
requested_address=None):
"""Slowly migrate off the AllocatableIps Table."""
model = ipam.models.IpAddress
db_session = session.get_session()
db_session.begin()
allocatable_qry = _query_by(ipam.models.AllocatableIp,
db_session=db_session,
ip_block_id=ip_block.id)
if requested_address is not None:
filter_kwargs = {'address': requested_address}
allocatable_qry = allocatable_qry.filter(**filter_kwargs)
allocatable_address = allocatable_qry.first()
if not allocatable_address:
db_session.commit()
return
ip = allocatable_address.address
address = model(id=utils.generate_uuid(),
created_at=utils.utcnow(),
updated_at=utils.utcnow(),
address=str(ip),
ip_block_id=ip_block.id,
interface_id=interface.id,
used_by_tenant_id=interface.tenant_id,
allocated=True)
db_session.merge(address)
try:
db_session.flush()
except sqlalchemy.exc.IntegrityError:
db_session.rollback()
db_session.begin()
db_session.delete(allocatable_address)
db_session.commit()
LOG.debug("Allocatable ip %s in block %s was a dupe. Deleted" %
(ip, ip_block.id))
return _allocate_allocatable_address(ip_block, interface,
requested_address)
db_session.delete(allocatable_address)
db_session.commit()
return address
db_session.delete(allocatable_qry.address)
开发者ID:blamarvt,项目名称:melange,代码行数:49,代码来源:api.py
示例2: save
def save(self):
if not self.is_valid():
raise InvalidModelError(self.errors)
self._convert_columns_to_proper_type()
self._before_save()
self['updated_at'] = utils.utcnow()
return db.db_api.save(self)
开发者ID:rcbops,项目名称:melange-buildpackage,代码行数:7,代码来源:models.py
示例3: delete_interface
def delete_interface(interface):
db_session = session.get_session()
with db_session.begin():
mac_qry = _query_by(ipam.models.MacAddress, db_session=db_session,
interface_id=interface.id)
mac = mac_qry.with_lockmode('update').first()
if mac:
db_session.delete(mac)
ips_qry = _query_by(ipam.models.IpAddress, db_session=db_session,
interface_id=interface.id)
ips = ips_qry.with_lockmode('update').all()
for ip in ips:
LOG.debug("Marking IP address for deallocation: %r" % ip)
ip.allocated = False
ip.marked_for_deallocation = True
ip.deallocated_at = utils.utcnow()
ip.interface_id = None
db_session.merge(ip)
db_session.delete(interface)
return mac, ips
# NOTE(jkoelker) Failsafe return:
return None, []
开发者ID:blamarvt,项目名称:melange,代码行数:29,代码来源:api.py
示例4: create
def create(cls, **values):
if 'id' not in values or values['id'] is None:
values['id'] = utils.generate_uuid()
values['created_at'] = utils.utcnow()
instance = cls(**values).save()
instance._notify_fields("create")
return instance
开发者ID:klmitch,项目名称:melange,代码行数:7,代码来源:models.py
示例5: save
def save(self):
if not self.is_valid():
raise InvalidModelError(self.errors)
self._convert_columns_to_proper_type()
self._before_save()
self['updated_at'] = utils.utcnow()
LOG.debug("Saving %s: %s" % (self.__class__.__name__, self.__dict__))
return db.db_api.save(self)
开发者ID:roaet,项目名称:melange,代码行数:8,代码来源:models.py
示例6: deallocated_by_date
def deallocated_by_date():
days = config.Config.get('keep_deallocated_ips_for_days')
if days is None:
seconds = config.Config.get('keep_deallocated_ips_for_seconds', 172800)
else:
seconds = int(days) * 86400
LOG.debug("Delete delay = ", seconds)
return utils.utcnow() - datetime.timedelta(seconds=int(seconds))
开发者ID:roaet,项目名称:melange,代码行数:8,代码来源:models.py
示例7: _generate_message
def _generate_message(self, event_type, priority, payload):
return {
"message_id": str(utils.generate_uuid()),
"publisher_id": socket.gethostname(),
"event_type": event_type,
"priority": priority,
"payload": payload,
"timestamp": str(utils.utcnow()),
}
开发者ID:OpenStack-Kha,项目名称:melange,代码行数:9,代码来源:notifier.py
示例8: _setup_expected_message
def _setup_expected_message(self, priority, event,
message):
self.setup_uuid_with("test_uuid")
return {'event_type': event,
'timestamp': str(utils.utcnow()),
'priority': priority,
'message_id': "test_uuid",
'payload': message,
'publisher_id': socket.gethostname(),
}
开发者ID:blamarvt,项目名称:melange,代码行数:10,代码来源:test_notifier.py
示例9: deallocate
def deallocate(self):
LOG.debug("Marking IP address for deallocation: %r" % self)
self.update(marked_for_deallocation=True,
deallocated_at=utils.utcnow(),
interface_id=None)
开发者ID:roaet,项目名称:melange,代码行数:5,代码来源:models.py
示例10: deallocated_by_date
def deallocated_by_date():
days = config.Config.get('keep_deallocated_ips_for_days', 2)
return utils.utcnow() - datetime.timedelta(days=int(days))
开发者ID:AsherBond,项目名称:melange,代码行数:3,代码来源:models.py
示例11: deallocate
def deallocate(self):
self.update(marked_for_deallocation=True,
deallocated_at=utils.utcnow(),
interface_id=None)
开发者ID:rcbops,项目名称:melange-buildpackage,代码行数:4,代码来源:models.py
示例12: allocate_ipv4_address
def allocate_ipv4_address(ip_block, interface, requested_address=None):
model = ipam.models.IpAddress
address = _allocate_allocatable_address(ip_block, interface,
requested_address)
if address:
return address
db_session = session.get_session()
with db_session.begin():
address_qry = _query_by(model, db_session=db_session,
allocated=False,
marked_for_deallocation=False,
ip_block_id=ip_block.id)
if requested_address is not None:
address_qry = address_qry.filter(address=requested_address)
address = address_qry.with_lockmode('update').first()
if address:
address.allocated = True
address.interface_id = interface.id
address.used_by_tenant_id = interface.tenant_id
address.updated_at = utils.utcnow()
db_session.merge(address)
return address
else:
ips = netaddr.IPNetwork(ip_block.cidr)
counter = (ip_block.allocatable_ip_counter or ips[0].value)
if counter >= ips[-1].value:
ip_block.is_full = True
# NOTE(jkoelker) explicit save() to flush the session prior
# to raising
save(ip_block, db_session=db_session)
raise exception.NoMoreAddressesError(_("IpBlock is full"))
ip = netaddr.IPAddress(counter)
# NOTE(jkoelker) HRM, this may need to be rethought order wise
counter = counter + 1
if counter >= ips[-1].value:
ip_block.is_full = True
ip_block.allocatable_ip_counter = counter
db_session.merge(ip_block)
# NOTE(jkoelker) SQLAlchemy models, how do you work? ;)
address = model(id=utils.generate_uuid(),
created_at=utils.utcnow(),
updated_at=utils.utcnow(),
address=str(ip),
ip_block_id=ip_block.id,
interface_id=interface.id,
used_by_tenant_id=interface.tenant_id,
allocated=True)
db_session.merge(address)
return address
开发者ID:blamarvt,项目名称:melange,代码行数:62,代码来源:api.py
注:本文中的melange.common.utils.utcnow函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论