• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python session.expire_all函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中turbogears.database.session.expire_all函数的典型用法代码示例。如果您正苦于以下问题:Python expire_all函数的具体用法?Python expire_all怎么用?Python expire_all使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了expire_all函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_power_commands_are_not_run_twice

 def test_power_commands_are_not_run_twice(self):
     # We will make the dummy power script sleep for this long:
     power_sleep = 4
     # To reproduce this bug, we need to queue up three commands for the 
     # same system (so they are run in sequence by beaker-provision), where 
     # the commands take enough time that the second one will still be 
     # running on the next iteration of the polling loop. The third command 
     # will be run twice.
     assert power_sleep < get_conf().get('SLEEP_TIME')
     assert 2 * power_sleep > get_conf().get('SLEEP_TIME')
     with session.begin():
         system = data_setup.create_system(lab_controller=self.get_lc())
         system.power.power_type = PowerType.lazy_create(name=u'dummy')
         system.power.power_id = power_sleep # make power script sleep
         system.action_power(action=u'off', service=u'testdata')
         system.action_power(action=u'off', service=u'testdata')
         system.action_power(action=u'off', service=u'testdata')
     wait_for_commands_to_finish(system, timeout=5 * power_sleep)
     with session.begin():
         session.expire_all()
         self.assertEquals(system.command_queue[0].status, CommandStatus.completed)
         self.assertEquals(system.command_queue[1].status, CommandStatus.completed)
         self.assertEquals(system.command_queue[2].status, CommandStatus.completed)
         # The bug manifests as two "Completed" records for the power 
         # command which ran twice
         self.assertEquals(system.dyn_activity
                 .filter_by(field_name=u'Power', new_value=u'Completed')
                 .count(), 3)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:28,代码来源:test_provision.py


示例2: test_recovers_running_job_with_completed_recipes

    def test_recovers_running_job_with_completed_recipes(self):
        # job with two recipes, both Completed, but job is Running
        # and systems are still assigned
        job = data_setup.create_job(num_recipes=2)
        data_setup.mark_job_running(job)
        systems = [r.resource.system for r in job.all_recipes]
        job.recipesets[0].recipes[0].tasks[-1].stop()
        job.recipesets[0].recipes[0]._update_status()
        job.recipesets[0].recipes[1].tasks[-1].stop()
        job.recipesets[0].recipes[1]._update_status()
        session.flush()
        self.assertEquals(job.recipesets[0].recipes[0].status,
                TaskStatus.completed)
        self.assertEquals(job.recipesets[0].recipes[1].status,
                TaskStatus.completed)
        self.assertEquals(job.recipesets[0].status, TaskStatus.running)
        self.assertEquals(job.status, TaskStatus.running)
        self.assert_(systems[0].open_reservation is not None)
        self.assert_(systems[1].open_reservation is not None)

        job.update_status()
        session.flush()
        session.expire_all()
        self.assertEquals(systems[0].open_reservation, None)
        self.assertEquals(systems[1].open_reservation, None)
        self.assertEquals(job.recipesets[0].status, TaskStatus.completed)
        self.assertEquals(job.status, TaskStatus.completed)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:27,代码来源:test_update_status.py


示例3: test_purge_stale_running_commands

    def test_purge_stale_running_commands(self):
        with session.begin():
            distro_tree = data_setup.create_distro_tree(osmajor=u'Fedora')
            # Helper to build the commands
            def _make_command(lc=None, creation_date=None):
                job = data_setup.create_job(distro_tree=distro_tree)
                recipe = job.recipesets[0].recipes[0]
                system = data_setup.create_system(lab_controller=lc)
                data_setup.mark_recipe_waiting(recipe, system=system)
                command = CommandActivity(
                        user=None, service=u'testdata', action=u'on',
                        status=CommandStatus.running,
                        callback=u'bkr.server.model.auto_cmd_handler')
                if creation_date is not None:
                    command.created = command.updated = creation_date
                system.command_queue.append(command)
                return recipe.tasks[0], command
            # Normal command for the current LC
            recent_task, recent_command = _make_command(lc=self.lc)
            # Old command for a different LC
            backdated = datetime.datetime.utcnow()
            backdated -= datetime.timedelta(days=1, minutes=1)
            old_task, old_command = _make_command(creation_date=backdated)

        self.server.auth.login_password(self.lc.user.user_name, u'logmein')
        self.server.labcontrollers.clear_running_commands(u'Staleness')
        with session.begin():
            session.expire_all()
            # Recent commands have their callback invoked
            self.assertEquals(recent_command.status, CommandStatus.aborted)
            self.assertEquals(recent_task.status, TaskStatus.aborted)
            # Stale commands just get dropped on the floor
            self.assertEquals(old_command.status, CommandStatus.aborted)
            self.assertEquals(old_task.status, TaskStatus.waiting)
开发者ID:sibiaoluo,项目名称:beaker,代码行数:34,代码来源:test_labcontrollers.py


示例4: test_activity_created_with_expire

 def test_activity_created_with_expire(self):
     self.server.auth.login_password(self.user.user_name, u'password')
     self.server.distros.expire(self.distro.name, 'CUSTOMSERVICE')
     session.expire_all()
     with session.begin():
         activity = self.distro_tree.activity[0]
         self.assertEquals(activity.service, u'CUSTOMSERVICE')
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_distros.py


示例5: test_concurrent_recipe_completion

    def test_concurrent_recipe_completion(self):
        # This test simulates two recipes finishing at the same time. So we
        # have two concurrent transactions both updating the respective task states.
        # Previously there was no separate job.update_status() step, so the two
        # transactions would update the job status using out-of-date values in
        # both transactions, leaving the job running.
        with session.begin():
            recipe1 = data_setup.create_recipe()
            recipe2 = data_setup.create_recipe()
            job = data_setup.create_job_for_recipes([recipe1, recipe2])
            assert len(recipe1.tasks) == 1
            assert len(recipe2.tasks) == 1
            data_setup.mark_recipe_running(recipe1)
            data_setup.mark_recipe_running(recipe2)
            recipe1.tasks[-1].pass_(u"/", 0, u"Pass")
            recipe2.tasks[-1].pass_(u"/", 0, u"Pass")

        # Complete the recipes "concurrently" in two separate transactions
        class RecipeCompletionThread(Thread):
            def __init__(self, recipe_id=None, **kwargs):
                super(RecipeCompletionThread, self).__init__(**kwargs)
                self.recipe_id = recipe_id
                self.ready_evt = Event()
                self.continue_evt = Event()

            def run(self):
                session.begin()
                recipe = Recipe.by_id(self.recipe_id)
                self.ready_evt.set()
                self.continue_evt.wait()
                recipe.tasks[-1].stop()
                session.commit()

        thread1 = RecipeCompletionThread(name="recipe1", recipe_id=recipe1.id)
        thread2 = RecipeCompletionThread(name="recipe2", recipe_id=recipe2.id)
        thread1.start()
        thread2.start()
        # Wait for both threads to start their transactions
        thread1.ready_evt.wait()
        thread2.ready_evt.wait()
        # Allow recipe 1 to complete
        thread1.continue_evt.set()
        thread1.join()
        with session.begin():
            session.expire_all()
            job.update_status()
            self.assertEquals(recipe1.status, TaskStatus.completed)
            self.assertEquals(recipe1.ptasks, 1)
            self.assertEquals(job.status, TaskStatus.running)
            self.assertEquals(job.ptasks, 1)
        # Now recipe 2 completes
        thread2.continue_evt.set()
        thread2.join()
        with session.begin():
            session.expire_all()
            job.update_status()
            self.assertEquals(recipe2.status, TaskStatus.completed)
            self.assertEquals(recipe2.ptasks, 1)
            self.assertEquals(job.status, TaskStatus.completed)
            self.assertEquals(job.ptasks, 2)
开发者ID:beaker-project,项目名称:beaker,代码行数:60,代码来源:test_update_status.py


示例6: test_set_job_whiteboard

 def test_set_job_whiteboard(self):
     out = run_client(['bkr', 'job-modify', self.job.t_id,
             '--whiteboard', 'Gregor Samsa awoke'])
     self.assertIn('Successfully modified jobs %s' % self.job.t_id, out)
     with session.begin():
         session.expire_all()
         self.assertEquals(self.job.whiteboard, u'Gregor Samsa awoke')
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_job_modify.py


示例7: test_set_active_policy_to_custom_policy

    def test_set_active_policy_to_custom_policy(self):
        with session.begin():
            user1 = data_setup.create_user()
            user2 = data_setup.create_user()
            self.system.custom_access_policy.add_rule(
                permission=SystemPermission.edit_system, user=user1)
            pool = data_setup.create_system_pool()
            pool.access_policy.add_rule(
                permission=SystemPermission.edit_system, user=user2)
            self.system.active_access_policy = pool.access_policy

        self.assertFalse(self.system.active_access_policy.grants 
                        (user1, SystemPermission.edit_system))
        self.assertTrue(self.system.active_access_policy.grants
                         (user2, SystemPermission.edit_system))

        s = requests.Session()
        s.post(get_server_base() + 'login', data={'user_name': self.owner.user_name,
                'password': 'theowner'}).raise_for_status()
        response = patch_json(get_server_base() +
                              'systems/%s/' % self.system.fqdn, session=s,
                              data={'active_access_policy': {'custom': True}},
                         )
        response.raise_for_status()
        with session.begin():
            session.expire_all()
            self.assertTrue(self.system.active_access_policy.grants \
                            (user1, SystemPermission.edit_system))
开发者ID:ShaolongHu,项目名称:beaker,代码行数:28,代码来源:test_systems.py


示例8: test_nak_job

 def test_nak_job(self):
     out = run_client(['bkr', 'job-modify', self.job.t_id,  '--response', 'nak'])
     self.assert_(out == 'Successfully modified jobs %s\n' % self.job.t_id)
     with session.begin():
         session.expire_all()
         for rs in self.job.recipesets:
             self.assertEqual(rs.waived, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:7,代码来源:test_job_modify.py


示例9: test_inverted_group_modify_grant_owner

    def test_inverted_group_modify_grant_owner(self):
        with session.begin():
            group = data_setup.create_group(owner=self.user,
                    membership_type=GroupMembershipType.inverted)
            user1 = data_setup.create_user()
            group.add_member(user1)
            user2 = data_setup.create_user()
            group.add_member(user2)
            # user3 is not associated but can also be set as the group owner.
            user3 = data_setup.create_user()

        out = run_client(['bkr', 'group-modify',
                          '--grant-owner', user1.user_name,
                          '--grant-owner', user2.user_name,
                          '--grant-owner', user3.user_name,
                          group.group_name],
                         config = self.client_config)

        with session.begin():
            session.expire_all()
            self.assertTrue(group.has_owner(user1))
            self.assertTrue(group.has_owner(user2))
            self.assertTrue(group.has_owner(user3))
            self.assertEquals(group.activity[-1].action, u'Added')
            self.assertEquals(group.activity[-1].field_name, u'Owner')
            self.assertEquals(group.activity[-1].new_value, user3.user_name)
            self.assertEquals(group.activity[-1].service, u'HTTP')
开发者ID:beaker-project,项目名称:beaker,代码行数:27,代码来源:test_group_modify.py


示例10: test_set_recipe_whiteboard

 def test_set_recipe_whiteboard(self):
     recipe = self.job.recipesets[0].recipes[0]
     out = run_client(['bkr', 'job-modify', recipe.t_id,
             '--whiteboard', 'found himself transformed'])
     self.assertIn('Successfully modified jobs %s' % recipe.t_id, out)
     with session.begin():
         session.expire_all()
         self.assertEquals(recipe.whiteboard, u'found himself transformed')
开发者ID:beaker-project,项目名称:beaker,代码行数:8,代码来源:test_job_modify.py


示例11: test_successful_add_tag_for_all_distros

 def test_successful_add_tag_for_all_distros(self):
     with session.begin():
         self.distro = data_setup.create_distro()
     out = run_client(['bkr', 'distros-tag', '--name=%', 'addAll'])
     with session.begin():
         session.expire_all()
         for distro in Distro.query:
             self.assertIn(u'addAll', distro.tags)
开发者ID:beaker-project,项目名称:beaker,代码行数:8,代码来源:test_distros_tag.py


示例12: test_activity_created_with_expire

 def test_activity_created_with_expire(self):
     self.server.auth.login_password(data_setup.ADMIN_USER,
         data_setup.ADMIN_PASSWORD)
     self.server.distros.expire(self.distro.name, 'CUSTOMSERVICE')
     session.expire_all()
     with session.begin():
         activity = self.distro_tree.activity[0]
         self.assertEquals(activity.service, u'CUSTOMSERVICE')
开发者ID:ShaolongHu,项目名称:beaker,代码行数:8,代码来源:test_distros.py


示例13: test_change_password

 def test_change_password(self):
     with session.begin():
         lc = data_setup.create_labcontroller()
     run_client(['bkr', 'labcontroller-modify',
                 '--password', u'newpassword',
                 lc.fqdn])
     with session.begin():
         session.expire_all()
         self.assertTrue(lc.user.check_password(u'newpassword'))
开发者ID:beaker-project,项目名称:beaker,代码行数:9,代码来源:test_labcontroller_modify.py


示例14: test_remove_account

 def test_remove_account(self):
     with session.begin():
         user = data_setup.create_user()
         job = data_setup.create_job(owner=user)
         data_setup.mark_job_running(job)
         owned_system = data_setup.create_system(owner=user)
         loaned_system = data_setup.create_system()
         loaned_system.loaned = user
         reserved_system = data_setup.create_system(status=u"Manual")
         reserved_system.reserve_manually(service=u"testdata", user=user)
         reserved_system.custom_access_policy.add_rule(SystemPermission.reserve, user=user)
         group = data_setup.create_group(owner=user)
     run_client(["bkr", "remove-account", user.user_name])
     with session.begin():
         session.expire_all()
         self.assertIsNotNone(user.removed)
         # running jobs should be cancelled
         job.update_status()
         self.assertEquals(job.status, TaskStatus.cancelled)
         self.assertIn("User %s removed" % user.user_name, job.recipesets[0].recipes[0].tasks[0].results[0].log)
         # reservations should be released
         self.assertIsNone(reserved_system.user)
         self.assertEqual(reserved_system.activity[1].user.user_name, data_setup.ADMIN_USER)
         self.assertEqual(reserved_system.activity[1].field_name, u"User")
         self.assertEqual(reserved_system.activity[1].action, u"Returned")
         self.assertEqual(reserved_system.activity[1].old_value, user.user_name)
         self.assertEqual(reserved_system.activity[1].new_value, u"")
         # loans should be returned
         self.assertIsNone(loaned_system.loaned)
         self.assertEqual(loaned_system.activity[0].user.user_name, data_setup.ADMIN_USER)
         self.assertEqual(loaned_system.activity[0].field_name, u"Loaned To")
         self.assertEqual(loaned_system.activity[0].action, u"Changed")
         self.assertEqual(loaned_system.activity[0].old_value, user.user_name)
         self.assertEqual(loaned_system.activity[0].new_value, None)
         # access policy rules should be removed
         self.assertEqual([], [rule for rule in reserved_system.custom_access_policy.rules if rule.user == user])
         self.assertEqual(reserved_system.activity[0].user.user_name, data_setup.ADMIN_USER)
         self.assertEqual(reserved_system.activity[0].field_name, u"Access Policy Rule")
         self.assertEqual(reserved_system.activity[0].action, u"Removed")
         self.assertEqual(reserved_system.activity[0].old_value, u"<grant reserve to %s>" % user.user_name)
         self.assertEqual(reserved_system.activity[0].new_value, None)
         # systems owned by the user should be transferred to the caller
         self.assertEqual(owned_system.owner.user_name, data_setup.ADMIN_USER)
         self.assertEqual(owned_system.activity[0].user.user_name, data_setup.ADMIN_USER)
         self.assertEqual(owned_system.activity[0].field_name, u"Owner")
         self.assertEqual(owned_system.activity[0].action, u"Changed")
         self.assertEqual(owned_system.activity[0].old_value, user.user_name)
         self.assertEqual(owned_system.activity[0].new_value, data_setup.ADMIN_USER)
         # group membership/ownership should be removed
         self.assertNotIn(group, user.groups)
         self.assertNotIn(user, group.users)
         self.assertFalse(group.has_owner(user))
         self.assertEqual(group.activity[-1].user.user_name, data_setup.ADMIN_USER)
         self.assertEqual(group.activity[-1].field_name, u"User")
         self.assertEqual(group.activity[-1].action, u"Removed")
         self.assertEqual(group.activity[-1].old_value, user.user_name)
         self.assertEqual(group.activity[-1].new_value, None)
开发者ID:beaker-project,项目名称:beaker,代码行数:57,代码来源:test_remove_account.py


示例15: test_multiple_response_recipeset

 def test_multiple_response_recipeset(self):
     out = run_client(['bkr', 'job-modify', self.job.recipesets[0].t_id,
                       self.job_for_rs.recipesets[0].t_id,  '--response', 'nak'])
     self.assert_('Successfully modified jobs' in out and \
                  self.job_for_rs.recipesets[0].t_id in out and \
                  self.job.recipesets[0].t_id in out,)
     with session.begin():
         session.expire_all()
         self.assertEqual(self.job.recipesets[0].waived, True)
         self.assertEqual(self.job_for_rs.recipesets[0].waived, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_modify.py


示例16: test_disable_the_lab_controller

 def test_disable_the_lab_controller(self):
     with session.begin():
         lc = data_setup.create_labcontroller()
         user = data_setup.create_user()
     run_client(['bkr', 'labcontroller-modify',
                 '--disable',
                 lc.fqdn])
     with session.begin():
         session.expire_all()
         self.assertTrue(lc.disabled)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_labcontroller_modify.py


示例17: test_task_update_disable_normal_user_fail

 def test_task_update_disable_normal_user_fail(self):
     req_sess = requests.Session()
     requests_login(req_sess, self.normal_user.user_name, 'secret')
     self.assertEqual(self.my_task.valid, True)
     response = patch_json(get_server_base() + 'tasks/%s' % self.my_task.id,
                           session=req_sess, data={'disabled': True})
     self.assertEqual(response.status_code, 403)
     with session.begin():
         session.expire_all()
         self.assertEqual(self.my_task.valid, True)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_tasks.py


示例18: test_increase_priority

 def test_increase_priority(self):
     out = run_client(['bkr', 'job-modify', self.job.t_id,  '--priority', 'High'])
     self.assertIn('Successfully modified jobs %s' % self.job.t_id, out)
     with session.begin():
         session.expire_all()
         for rs in self.job.recipesets:
             self.assertEquals(rs.priority.value, 'High')
         self.assertEquals(self.job.recipesets[0].activity[0].action, u'Changed')
         self.assertEquals(self.job.recipesets[0].activity[0].field_name, 'Priority')
         self.assertEquals(self.job.recipesets[0].activity[0].new_value, 'High')
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_modify.py


示例19: test_post_comment_to_recipeset

    def test_post_comment_to_recipeset(self):
        with session.begin():
            recipe = self.job.recipesets[0]

        comment_text = u'Never gonna give you up'
        out = run_client(['bkr', 'job-comment', recipe.t_id,
                          '--message', comment_text])
        with session.begin():
            session.expire_all()
            self.assertEqual(recipe.comments[0].comment, comment_text)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_job_comment.py


示例20: test_change_user_name

 def test_change_user_name(self):
     with session.begin():
         lc = data_setup.create_labcontroller()
         user = data_setup.create_user()
     run_client(['bkr', 'labcontroller-modify',
                 '--user', user.user_name,
                 lc.fqdn])
     with session.begin():
         session.expire_all()
         self.assertEqual(lc.user.user_name, user.user_name)
开发者ID:beaker-project,项目名称:beaker,代码行数:10,代码来源:test_labcontroller_modify.py



注:本文中的turbogears.database.session.expire_all函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python session.flush函数代码示例发布时间:2022-05-27
下一篇:
Python session.delete函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap