本文整理汇总了Python中turbogears.database.session.expire_all函数的典型用法代码示例。如果您正苦于以下问题：Python expire_all函数的具体用法？Python expire_all怎么用？Python expire_all使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了expire_all函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_power_commands_are_not_run_twice
def test_power_commands_are_not_run_twice(self):
    # Queues three 'off' power commands for one system, waits for them to
    # finish, and checks that each one was recorded as completed exactly once.

    # We will make the dummy power script sleep for this long:
    power_sleep = 4
    # To reproduce this bug, we need to queue up three commands for the
    # same system (so they are run in sequence by beaker-provision), where
    # the commands take enough time that the second one will still be
    # running on the next iteration of the polling loop. The third command
    # will be run twice.
    sleep_time = get_conf().get('SLEEP_TIME')
    assert power_sleep < sleep_time
    assert 2 * power_sleep > sleep_time
    with session.begin():
        system = data_setup.create_system(lab_controller=self.get_lc())
        system.power.power_type = PowerType.lazy_create(name=u'dummy')
        system.power.power_id = power_sleep  # make power script sleep
        for _ in range(3):
            system.action_power(action=u'off', service=u'testdata')
    wait_for_commands_to_finish(system, timeout=5 * power_sleep)
    with session.begin():
        # Drop cached attribute state so the values asserted below are
        # reloaded instead of being served from this session.
        session.expire_all()
        for index in range(3):
            self.assertEqual(system.command_queue[index].status,
                             CommandStatus.completed)
        # The bug manifests as two "Completed" records for the power
        # command which ran twice
        completed_records = system.dyn_activity.filter_by(
            field_name=u'Power', new_value=u'Completed').count()
        self.assertEqual(completed_records, 3)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:28,代码来源:test_provision.py
示例2: test_recovers_running_job_with_completed_recipes
def test_recovers_running_job_with_completed_recipes(self):
    # Builds a job whose recipes are Completed while the job itself is still
    # Running, then checks that job.update_status() completes the job and
    # clears the systems' open reservations.

    # job with two recipes, both Completed, but job is Running
    # and systems are still assigned
    job = data_setup.create_job(num_recipes=2)
    data_setup.mark_job_running(job)
    systems = [r.resource.system for r in job.all_recipes]
    recipeset = job.recipesets[0]
    for recipe in (recipeset.recipes[0], recipeset.recipes[1]):
        recipe.tasks[-1].stop()
        recipe._update_status()
    session.flush()

    # Precondition: recipes are done, but recipe set and job are not.
    self.assertEqual(job.recipesets[0].recipes[0].status,
                     TaskStatus.completed)
    self.assertEqual(job.recipesets[0].recipes[1].status,
                     TaskStatus.completed)
    self.assertEqual(job.recipesets[0].status, TaskStatus.running)
    self.assertEqual(job.status, TaskStatus.running)
    self.assertIsNotNone(systems[0].open_reservation)
    self.assertIsNotNone(systems[1].open_reservation)

    job.update_status()
    session.flush()
    # Drop cached attribute state so the checks below reload it.
    session.expire_all()

    self.assertIsNone(systems[0].open_reservation)
    self.assertIsNone(systems[1].open_reservation)
    self.assertEqual(job.recipesets[0].status, TaskStatus.completed)
    self.assertEqual(job.status, TaskStatus.completed)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:27,代码来源:test_update_status.py
示例3: test_purge_stale_running_commands
def test_purge_stale_running_commands(self):
    # Creates one recent and one backdated running command, calls
    # labcontrollers.clear_running_commands() as the lab controller's user,
    # and checks the resulting command and task statuses.
    with session.begin():
        distro_tree = data_setup.create_distro_tree(osmajor=u'Fedora')

        # Helper to build the commands
        def _make_command(lc=None, creation_date=None):
            # Returns (first task of a new waiting recipe, running command
            # appended to the command queue of a newly created system).
            job = data_setup.create_job(distro_tree=distro_tree)
            recipe = job.recipesets[0].recipes[0]
            system = data_setup.create_system(lab_controller=lc)
            data_setup.mark_recipe_waiting(recipe, system=system)
            command = CommandActivity(
                user=None, service=u'testdata', action=u'on',
                status=CommandStatus.running,
                callback=u'bkr.server.model.auto_cmd_handler')
            if creation_date is not None:
                command.created = command.updated = creation_date
            system.command_queue.append(command)
            return recipe.tasks[0], command

        # Normal command for the current LC
        recent_task, recent_command = _make_command(lc=self.lc)
        # Old command for a different LC
        backdated = datetime.datetime.utcnow()
        backdated -= datetime.timedelta(days=1, minutes=1)
        old_task, old_command = _make_command(creation_date=backdated)

    self.server.auth.login_password(self.lc.user.user_name, u'logmein')
    self.server.labcontrollers.clear_running_commands(u'Staleness')

    with session.begin():
        # Drop cached attribute state so the statuses below are reloaded.
        session.expire_all()
        # Recent commands have their callback invoked
        self.assertEqual(recent_command.status, CommandStatus.aborted)
        self.assertEqual(recent_task.status, TaskStatus.aborted)
        # Stale commands just get dropped on the floor
        self.assertEqual(old_command.status, CommandStatus.aborted)
        self.assertEqual(old_task.status, TaskStatus.waiting)
开发者ID:sibiaoluo,项目名称:beaker,代码行数:34,代码来源:test_labcontrollers.py
示例4: test_activity_created_with_expire
def test_activity_created_with_expire(self):
    # Expires the distro through the server proxy with a custom service name
    # and checks that name on the distro tree's first activity record.
    self.server.auth.login_password(self.user.user_name, u'password')
    self.server.distros.expire(self.distro.name, 'CUSTOMSERVICE')
    # Drop cached attribute state so the activity list is reloaded.
    session.expire_all()
    with session.begin():
        first_activity = self.distro_tree.activity[0]
        self.assertEqual(first_activity.service, u'CUSTOMSERVICE')
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_distros.py
示例5: test_concurrent_recipe_completion
def test_concurrent_recipe_completion(self):
    # This test simulates two recipes finishing at the same time. So we
    # have two concurrent transactions both updating the respective task states.
    # Previously there was no separate job.update_status() step, so the two
    # transactions would update the job status using out-of-date values in
    # both transactions, leaving the job running.
    with session.begin():
        recipe1 = data_setup.create_recipe()
        recipe2 = data_setup.create_recipe()
        job = data_setup.create_job_for_recipes([recipe1, recipe2])
        assert len(recipe1.tasks) == 1
        assert len(recipe2.tasks) == 1
        data_setup.mark_recipe_running(recipe1)
        data_setup.mark_recipe_running(recipe2)
        recipe1.tasks[-1].pass_(u"/", 0, u"Pass")
        recipe2.tasks[-1].pass_(u"/", 0, u"Pass")

    # Complete the recipes "concurrently" in two separate transactions
    class RecipeCompletionThread(Thread):
        # Opens a transaction, loads the recipe, signals ready_evt, then
        # blocks on continue_evt before stopping the recipe's last task and
        # committing.

        def __init__(self, recipe_id=None, **kwargs):
            super(RecipeCompletionThread, self).__init__(**kwargs)
            self.recipe_id = recipe_id
            self.ready_evt = Event()
            self.continue_evt = Event()

        def run(self):
            # NOTE(review): nothing here rolls back on error, and an
            # exception raised before ready_evt.set() would leave the main
            # thread blocked in ready_evt.wait() below (no timeout) --
            # confirm that this is acceptable for the test.
            session.begin()
            recipe = Recipe.by_id(self.recipe_id)
            self.ready_evt.set()
            self.continue_evt.wait()
            recipe.tasks[-1].stop()
            session.commit()

    thread1 = RecipeCompletionThread(name="recipe1", recipe_id=recipe1.id)
    thread2 = RecipeCompletionThread(name="recipe2", recipe_id=recipe2.id)
    thread1.start()
    thread2.start()
    # Wait for both threads to start their transactions
    thread1.ready_evt.wait()
    thread2.ready_evt.wait()

    # Allow recipe 1 to complete
    thread1.continue_evt.set()
    thread1.join()
    with session.begin():
        # Drop cached attribute state so the values below are reloaded.
        session.expire_all()
        job.update_status()
        self.assertEqual(recipe1.status, TaskStatus.completed)
        self.assertEqual(recipe1.ptasks, 1)
        self.assertEqual(job.status, TaskStatus.running)
        self.assertEqual(job.ptasks, 1)

    # Now recipe 2 completes
    thread2.continue_evt.set()
    thread2.join()
    with session.begin():
        session.expire_all()
        job.update_status()
        self.assertEqual(recipe2.status, TaskStatus.completed)
        self.assertEqual(recipe2.ptasks, 1)
        self.assertEqual(job.status, TaskStatus.completed)
        self.assertEqual(job.ptasks, 2)
开发者ID:beaker-project,项目名称:beaker,代码行数:60,代码来源:test_update_status.py
示例6: test_set_job_whiteboard
def test_set_job_whiteboard(self):
    # Runs `bkr job-modify --whiteboard` for the job and checks both the
    # client output and the job's whiteboard attribute.
    new_whiteboard = 'Gregor Samsa awoke'
    out = run_client(['bkr', 'job-modify', self.job.t_id,
                      '--whiteboard', new_whiteboard])
    self.assertIn('Successfully modified jobs %s' % self.job.t_id, out)
    with session.begin():
        # Drop cached attribute state so the whiteboard is reloaded.
        session.expire_all()
        self.assertEqual(self.job.whiteboard, u'Gregor Samsa awoke')
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_job_modify.py
示例7: test_set_active_policy_to_custom_policy
def test_set_active_policy_to_custom_policy(self):
    # Points the system's active access policy at a pool policy, then PATCHes
    # the system with {'active_access_policy': {'custom': True}} and checks
    # that the active policy grants edit_system to user1 afterwards.
    edit = SystemPermission.edit_system
    with session.begin():
        user1 = data_setup.create_user()
        user2 = data_setup.create_user()
        self.system.custom_access_policy.add_rule(permission=edit, user=user1)
        pool = data_setup.create_system_pool()
        pool.access_policy.add_rule(permission=edit, user=user2)
        self.system.active_access_policy = pool.access_policy

    # While the pool policy is active, only user2's rule applies.
    self.assertFalse(self.system.active_access_policy.grants(user1, edit))
    self.assertTrue(self.system.active_access_policy.grants(user2, edit))

    s = requests.Session()
    login_response = s.post(
        get_server_base() + 'login',
        data={'user_name': self.owner.user_name, 'password': 'theowner'})
    login_response.raise_for_status()

    system_url = get_server_base() + 'systems/%s/' % self.system.fqdn
    response = patch_json(system_url, session=s,
                          data={'active_access_policy': {'custom': True}})
    response.raise_for_status()

    with session.begin():
        # Drop cached attribute state so the active policy is reloaded.
        session.expire_all()
        self.assertTrue(self.system.active_access_policy.grants(user1, edit))
开发者ID:ShaolongHu,项目名称:beaker,代码行数:28,代码来源:test_systems.py
示例8: test_nak_job
def test_nak_job(self):
    # Runs `bkr job-modify --response nak` for the job and checks that every
    # recipe set of the job is reported as waived.
    out = run_client(['bkr', 'job-modify', self.job.t_id, '--response', 'nak'])
    # assertEqual (rather than asserting on `==`) reports both strings when
    # the output does not match.
    self.assertEqual(out, 'Successfully modified jobs %s\n' % self.job.t_id)
    with session.begin():
        # Drop cached attribute state so the recipe sets are reloaded.
        session.expire_all()
        for rs in self.job.recipesets:
            self.assertEqual(rs.waived, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_job_modify.py
示例9: test_inverted_group_modify_grant_owner
def test_inverted_group_modify_grant_owner(self):
    # Grants group ownership to three users through `bkr group-modify` on a
    # group with inverted membership, then checks ownership and the most
    # recent group activity record.
    with session.begin():
        group = data_setup.create_group(
            owner=self.user,
            membership_type=GroupMembershipType.inverted)
        user1 = data_setup.create_user()
        group.add_member(user1)
        user2 = data_setup.create_user()
        group.add_member(user2)
        # user3 is not associated but can also be set as the group owner.
        user3 = data_setup.create_user()

    run_client(['bkr', 'group-modify',
                '--grant-owner', user1.user_name,
                '--grant-owner', user2.user_name,
                '--grant-owner', user3.user_name,
                group.group_name],
               config=self.client_config)

    with session.begin():
        # Drop cached attribute state so ownership and activity are reloaded.
        session.expire_all()
        for new_owner in (user1, user2, user3):
            self.assertTrue(group.has_owner(new_owner))
        latest = group.activity[-1]
        self.assertEqual(latest.action, u'Added')
        self.assertEqual(latest.field_name, u'Owner')
        self.assertEqual(latest.new_value, user3.user_name)
        self.assertEqual(latest.service, u'HTTP')
开发者ID:beaker-project,项目名称:beaker,代码行数:27,代码来源:test_group_modify.py
示例10: test_set_recipe_whiteboard
def test_set_recipe_whiteboard(self):
    # Runs `bkr job-modify --whiteboard` for the job's first recipe and
    # checks both the client output and the recipe's whiteboard attribute.
    recipe = self.job.recipesets[0].recipes[0]
    out = run_client(['bkr', 'job-modify', recipe.t_id,
                      '--whiteboard', 'found himself transformed'])
    self.assertIn('Successfully modified jobs %s' % recipe.t_id, out)
    with session.begin():
        # Drop cached attribute state so the whiteboard is reloaded.
        session.expire_all()
        self.assertEqual(recipe.whiteboard, u'found himself transformed')
开发者ID:beaker-project,项目名称:beaker,代码行数:8,代码来源:test_job_modify.py
示例11: test_successful_add_tag_for_all_distros
def test_successful_add_tag_for_all_distros(self):
    # Tags every distro matching the name pattern '%' with 'addAll' through
    # `bkr distros-tag` and checks the tag on every distro in Distro.query.
    with session.begin():
        self.distro = data_setup.create_distro()
    # The client output is not inspected by this test.
    run_client(['bkr', 'distros-tag', '--name=%', 'addAll'])
    with session.begin():
        # Drop cached attribute state so the tags are reloaded.
        session.expire_all()
        for distro in Distro.query:
            self.assertIn(u'addAll', distro.tags)
开发者ID:beaker-project,项目名称:beaker,代码行数:8,代码来源:test_distros_tag.py
示例12: test_activity_created_with_expire
def test_activity_created_with_expire(self):
    # Expires the distro through the server proxy (logged in with the admin
    # credentials from data_setup) using a custom service name, and checks
    # that name on the distro tree's first activity record.
    self.server.auth.login_password(data_setup.ADMIN_USER,
                                    data_setup.ADMIN_PASSWORD)
    self.server.distros.expire(self.distro.name, 'CUSTOMSERVICE')
    # Drop cached attribute state so the activity list is reloaded.
    session.expire_all()
    with session.begin():
        first_activity = self.distro_tree.activity[0]
        self.assertEqual(first_activity.service, u'CUSTOMSERVICE')
开发者ID:ShaolongHu,项目名称:beaker,代码行数:8,代码来源:test_distros.py
示例13: test_change_password
def test_change_password(self):
    # Changes a lab controller's password through
    # `bkr labcontroller-modify --password` and checks it with
    # check_password() on the lab controller's user.
    with session.begin():
        lc = data_setup.create_labcontroller()
    client_args = ['bkr', 'labcontroller-modify',
                   '--password', u'newpassword',
                   lc.fqdn]
    run_client(client_args)
    with session.begin():
        # Drop cached attribute state so the user is reloaded.
        session.expire_all()
        password_matches = lc.user.check_password(u'newpassword')
        self.assertTrue(password_matches)
开发者ID:beaker-project,项目名称:beaker,代码行数:9,代码来源:test_labcontroller_modify.py
示例14: test_remove_account
def test_remove_account(self):
    # Removes a user through `bkr remove-account` and checks the state of
    # that user's running job, reservation, loan, access policy rule, owned
    # system and group afterwards.
    with session.begin():
        user = data_setup.create_user()
        job = data_setup.create_job(owner=user)
        data_setup.mark_job_running(job)
        owned_system = data_setup.create_system(owner=user)
        loaned_system = data_setup.create_system()
        loaned_system.loaned = user
        reserved_system = data_setup.create_system(status=u"Manual")
        reserved_system.reserve_manually(service=u"testdata", user=user)
        reserved_system.custom_access_policy.add_rule(
            SystemPermission.reserve, user=user)
        group = data_setup.create_group(owner=user)

    run_client(["bkr", "remove-account", user.user_name])

    def check_activity(activity, field_name, action, old_value, new_value):
        # Every activity record checked by this test is expected to be
        # attributed to the admin user.
        self.assertEqual(activity.user.user_name, data_setup.ADMIN_USER)
        self.assertEqual(activity.field_name, field_name)
        self.assertEqual(activity.action, action)
        self.assertEqual(activity.old_value, old_value)
        self.assertEqual(activity.new_value, new_value)

    with session.begin():
        # Drop cached attribute state so everything below is reloaded.
        session.expire_all()
        self.assertIsNotNone(user.removed)

        # running jobs should be cancelled
        job.update_status()
        self.assertEqual(job.status, TaskStatus.cancelled)
        self.assertIn(
            "User %s removed" % user.user_name,
            job.recipesets[0].recipes[0].tasks[0].results[0].log)

        # reservations should be released
        self.assertIsNone(reserved_system.user)
        check_activity(reserved_system.activity[1],
                       field_name=u"User", action=u"Returned",
                       old_value=user.user_name, new_value=u"")

        # loans should be returned
        self.assertIsNone(loaned_system.loaned)
        check_activity(loaned_system.activity[0],
                       field_name=u"Loaned To", action=u"Changed",
                       old_value=user.user_name, new_value=None)

        # access policy rules should be removed
        remaining_rules = [
            rule for rule in reserved_system.custom_access_policy.rules
            if rule.user == user]
        self.assertEqual([], remaining_rules)
        check_activity(reserved_system.activity[0],
                       field_name=u"Access Policy Rule", action=u"Removed",
                       old_value=u"<grant reserve to %s>" % user.user_name,
                       new_value=None)

        # systems owned by the user should be transferred to the caller
        self.assertEqual(owned_system.owner.user_name, data_setup.ADMIN_USER)
        check_activity(owned_system.activity[0],
                       field_name=u"Owner", action=u"Changed",
                       old_value=user.user_name,
                       new_value=data_setup.ADMIN_USER)

        # group membership/ownership should be removed
        self.assertNotIn(group, user.groups)
        self.assertNotIn(user, group.users)
        self.assertFalse(group.has_owner(user))
        check_activity(group.activity[-1],
                       field_name=u"User", action=u"Removed",
                       old_value=user.user_name, new_value=None)
开发者ID:beaker-project,项目名称:beaker,代码行数:57,代码来源:test_remove_account.py
示例15: test_multiple_response_recipeset
def test_multiple_response_recipeset(self):
    # Runs `bkr job-modify --response nak` for two recipe sets in one call
    # and checks the client output and that both are reported as waived.
    first_rs_id = self.job.recipesets[0].t_id
    second_rs_id = self.job_for_rs.recipesets[0].t_id
    out = run_client(['bkr', 'job-modify', first_rs_id, second_rs_id,
                      '--response', 'nak'])
    # Separate assertIn calls show which fragment is missing from the output;
    # a single compound boolean assertion would not.
    self.assertIn('Successfully modified jobs', out)
    self.assertIn(second_rs_id, out)
    self.assertIn(first_rs_id, out)
    with session.begin():
        # Drop cached attribute state so the recipe sets are reloaded.
        session.expire_all()
        self.assertEqual(self.job.recipesets[0].waived, True)
        self.assertEqual(self.job_for_rs.recipesets[0].waived, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_modify.py
示例16: test_disable_the_lab_controller
def test_disable_the_lab_controller(self):
    # Disables a lab controller through `bkr labcontroller-modify --disable`
    # and checks its `disabled` attribute.
    with session.begin():
        lc = data_setup.create_labcontroller()
        # NOTE(review): `user` is never read below; it is kept because
        # create_user() is still called for its side effect -- confirm
        # whether the extra user is needed at all.
        user = data_setup.create_user()
    client_args = ['bkr', 'labcontroller-modify',
                   '--disable',
                   lc.fqdn]
    run_client(client_args)
    with session.begin():
        # Drop cached attribute state so the lab controller is reloaded.
        session.expire_all()
        self.assertTrue(lc.disabled)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_labcontroller_modify.py
示例17: test_task_update_disable_normal_user_fail
def test_task_update_disable_normal_user_fail(self):
    # Logs in as the normal user, PATCHes the task with {'disabled': True},
    # and checks for a 403 response with the task's `valid` flag still True.
    http = requests.Session()
    requests_login(http, self.normal_user.user_name, 'secret')
    self.assertEqual(self.my_task.valid, True)
    task_url = get_server_base() + 'tasks/%s' % self.my_task.id
    response = patch_json(task_url, session=http, data={'disabled': True})
    self.assertEqual(response.status_code, 403)
    with session.begin():
        # Drop cached attribute state so the task is reloaded.
        session.expire_all()
        self.assertEqual(self.my_task.valid, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_tasks.py
示例18: test_increase_priority
def test_increase_priority(self):
    # Sets the job's priority to High through `bkr job-modify --priority`
    # and checks every recipe set's priority plus the first activity record
    # of the first recipe set.
    out = run_client(['bkr', 'job-modify', self.job.t_id, '--priority', 'High'])
    self.assertIn('Successfully modified jobs %s' % self.job.t_id, out)
    with session.begin():
        # Drop cached attribute state so the recipe sets are reloaded.
        session.expire_all()
        for rs in self.job.recipesets:
            self.assertEqual(rs.priority.value, 'High')
        first_activity = self.job.recipesets[0].activity[0]
        self.assertEqual(first_activity.action, u'Changed')
        self.assertEqual(first_activity.field_name, 'Priority')
        self.assertEqual(first_activity.new_value, 'High')
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_modify.py
示例19: test_post_comment_to_recipeset
def test_post_comment_to_recipeset(self):
    # Posts a comment on the job's first recipe set through `bkr job-comment`
    # and checks the text of that recipe set's first comment.
    with session.begin():
        recipeset = self.job.recipesets[0]
        comment_text = u'Never gonna give you up'
    # The client output is not inspected by this test.
    run_client(['bkr', 'job-comment', recipeset.t_id,
                '--message', comment_text])
    with session.begin():
        # Drop cached attribute state so the comments are reloaded.
        session.expire_all()
        self.assertEqual(recipeset.comments[0].comment, comment_text)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_comment.py
示例20: test_change_user_name
def test_change_user_name(self):
    # Assigns a different user to a lab controller through
    # `bkr labcontroller-modify --user` and checks the lab controller's
    # user name.
    with session.begin():
        lc = data_setup.create_labcontroller()
        user = data_setup.create_user()
    client_args = ['bkr', 'labcontroller-modify',
                   '--user', user.user_name,
                   lc.fqdn]
    run_client(client_args)
    with session.begin():
        # Drop cached attribute state so both objects are reloaded.
        session.expire_all()
        self.assertEqual(lc.user.user_name, user.user_name)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_labcontroller_modify.py
注：本文中的turbogears.database.session.expire_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论