本文整理汇总了Python中turbogears.database.session.begin函数的典型用法代码示例。如果您正苦于以下问题:Python begin函数的具体用法?Python begin怎么用?Python begin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了begin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: update_db
def update_db(self):
self.logger.info('Updating local Beaker database..')
for task_rpm in self.tasks_added:
self.logger.debug('Adding %s'% task_rpm)
with open(os.path.join(self.task_dir,task_rpm)) as f:
try:
session.begin()
task = self.tasks.process_taskinfo(self.tasks.read_taskinfo(f))
old_rpm = task.rpm
task.rpm = task_rpm
session.commit()
except Exception:
session.rollback()
session.close()
self.logger.critical('Error adding task %s' % task_rpm)
unlink_ignore(task_rpm)
else:
session.close()
self.logger.debug('Successfully added %s' % task.rpm)
if old_rpm:
unlink_ignore(os.path.join(self.task_dir, old_rpm))
# Update task repo
self.logger.info('Creating repodata..')
Task.update_repo()
return
开发者ID:sibiaoluo,项目名称:beaker,代码行数:33,代码来源:sync_tasks.py
示例2: test_recipe_running_then_watchdog_expired
def test_recipe_running_then_watchdog_expired(self):
""" This tests the case where the recipe is running, has a valid
reservation request, but the watchdog expires before it's
completed.
"""
with session.begin():
recipe = data_setup.create_recipe(
task_list=[Task.by_name(u'/distribution/install')],
reservesys=True)
job = data_setup.create_job_for_recipes([recipe])
job_id = job.id
data_setup.mark_recipe_tasks_finished(recipe,
task_status=TaskStatus.aborted)
job.recipesets[0].recipes[0].abort()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.reserved)
job.recipesets[0].recipes[0].return_reservation()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.aborted)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:25,代码来源:test_update_status.py
示例3: test_task_aborted_return_reservation
def test_task_aborted_return_reservation(self):
""" This tests the case where the task was aborted, then
the recipe goes to Reserved state and then finally the reservation
is returned
"""
with session.begin():
recipe = data_setup.create_recipe(
task_list=[Task.by_name(u'/distribution/install')],
reservesys=True)
job = data_setup.create_job_for_recipes([recipe])
job_id = job.id
data_setup.mark_recipe_tasks_finished(recipe, result=TaskResult.warn,
task_status=TaskStatus.aborted)
job._mark_dirty()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.reserved)
job.recipesets[0].recipes[0].return_reservation()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.aborted)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:25,代码来源:test_update_status.py
示例4: test_delete_install_options
def test_delete_install_options(self):
with session.begin():
self.system.provisions[self.distro_tree.arch] = Provision(
arch=self.distro_tree.arch, ks_meta=u'some_ks_meta_var=1',
kernel_options=u'some_kernel_option=1',
kernel_options_post=u'some_kernel_option=2')
orig_date_modified = self.system.date_modified
b = self.browser
login(b)
self.go_to_system_view(tab='Install Options')
delete_and_confirm(b, '//tr[th/text()="Architecture"]')
b.find_element_by_xpath('//h1[text()="%s"]' % self.system.fqdn)
with session.begin():
session.refresh(self.system)
self.assert_(self.system.date_modified > orig_date_modified)
self.assert_(self.distro_tree.arch not in self.system.provisions)
self.assertEquals(self.system.activity[0].action, u'Removed')
self.assertEquals(self.system.activity[0].field_name,
u'InstallOption:kernel_options_post:i386')
self.assertEquals(self.system.activity[1].action, u'Removed')
self.assertEquals(self.system.activity[1].field_name,
u'InstallOption:kernel_options:i386')
self.assertEquals(self.system.activity[2].action, u'Removed')
self.assertEquals(self.system.activity[2].field_name,
u'InstallOption:ks_meta:i386')
开发者ID:sujithshankar,项目名称:beaker,代码行数:25,代码来源:test_system_view.py
示例5: test_add_cc
def test_add_cc(self):
with session.begin():
self.system.cc = []
b = self.browser
login(b)
self.go_to_system_view(tab='Owner')
tab = b.find_element_by_id('owner')
tab.find_element_by_name('cc').send_keys('[email protected]')
tab.find_element_by_class_name('cc-add').submit()
tab.find_element_by_xpath('.//li[contains(text(), "[email protected]")]')
tab.find_element_by_name('cc').send_keys('[email protected]')
tab.find_element_by_class_name('cc-add').submit()
tab.find_element_by_xpath('.//li[contains(text(), "[email protected]")]')
tab.find_element_by_xpath('.//li[contains(text(), "[email protected]")]')
with session.begin():
session.refresh(self.system)
self.assertEquals(set(self.system.cc),
set([u'[email protected]', u'[email protected]']))
self.assertEquals(self.system.activity[0].field_name, u'Cc')
self.assertEquals(self.system.activity[0].service, u'HTTP')
self.assertEquals(self.system.activity[0].action, u'Added')
self.assertEquals(self.system.activity[0].new_value, u'[email protected]')
self.assertEquals(self.system.activity[1].field_name, u'Cc')
self.assertEquals(self.system.activity[1].service, u'HTTP')
self.assertEquals(self.system.activity[1].action, u'Added')
self.assertEquals(self.system.activity[1].new_value, u'[email protected]')
开发者ID:sujithshankar,项目名称:beaker,代码行数:26,代码来源:test_system_view.py
示例6: test_ackability
def test_ackability(self):
# XXX If this test gets any more complicated, we should break
# it up
b = self.browser
login(b, user=self.user_1.user_name, password=self.password)
b.get(get_server_base() + 'jobs/%d' % self.job.id)
#This tests that the ack is there for owner
b.find_element_by_name("response_box_%d" % self.job.recipesets[0].id)
logout(b)
# Not there for non owner
login(b, user=self.user_2.user_name, password=self.password)
b.get(get_server_base() + 'jobs/%d' % self.job.id)
b.find_element_by_xpath("//td[normalize-space(text())='RS:%s' and "
"not(./input[@name='response_box_%s'])]" % (
self.job.recipesets[0].id, self.job.recipesets[0].id))
# Is there for job owner's group co-member.
with session.begin():
data_setup.add_user_to_group(self.user_1, self.group)
data_setup.add_user_to_group(self.user_3, self.group)
logout(b)
login(b, user=self.user_3.user_name, password=self.password)
b.get(get_server_base() + 'jobs/%d' % self.job.id)
b.find_element_by_xpath("//input[@name='response_box_%s']" %
self.job.recipesets[0].id)
# There for job's group member
with session.begin():
self.job.group = self.group
self.user_2.groups.append(self.group)
logout(b)
login(b, user=self.user_2.user_name, password=self.password)
b.get(get_server_base() + 'jobs/%s' % self.job.id)
b.find_element_by_name("response_box_%s" % self.job.recipesets[0].id)
开发者ID:sibiaoluo,项目名称:beaker,代码行数:33,代码来源:test_job_ack.py
示例7: test_new_power_settings
def test_new_power_settings(self):
with session.begin():
lc = data_setup.create_labcontroller()
system = data_setup.create_system(lab_controller=lc,
with_power=False)
b = self.browser
login(b)
self.go_to_system_view(system=system, tab='Power Settings')
tab = b.find_element_by_id('power-settings')
BootstrapSelect(tab.find_element_by_name('power_type'))\
.select_by_visible_text('virsh')
tab.find_element_by_name('power_address').send_keys \
('qemu+ssh:10.10.10.10')
tab.find_element_by_name('power_user').send_keys('root')
tab.find_element_by_name('power_id').send_keys(system.fqdn)
tab.find_element_by_tag_name('form').submit()
# check activity records
power_fields_changed = {'power_type': 'virsh',
'power_address': 'qemu+ssh:10.10.10.1',
'power_user': 'root',
'power_id': system.fqdn,
'power_quiescent_period': 5}
with session.begin():
session.refresh(system)
for activity in system.activity:
self.assertEquals(activity.new_value,
power_fields_changed[activity])
开发者ID:sujithshankar,项目名称:beaker,代码行数:29,代码来源:test_system_view.py
示例8: test_filters_out_excluded_families
def test_filters_out_excluded_families(self):
with session.begin():
rhel3_i386 = data_setup.create_distro_tree(
osmajor=u"RedHatEnterpriseLinux3", arch=u"i386", distro_tags=[u"STABLE"]
)
rhel3_x86_64 = data_setup.create_distro_tree(
osmajor=u"RedHatEnterpriseLinux3", arch=u"x86_64", distro_tags=[u"STABLE"]
)
rhel4_i386 = data_setup.create_distro_tree(
osmajor=u"RedHatEnterpriseLinux4", arch=u"i386", distro_tags=[u"STABLE"]
)
rhel4_x86_64 = data_setup.create_distro_tree(
osmajor=u"RedHatEnterpriseLinux4", arch=u"x86_64", distro_tags=[u"STABLE"]
)
# system with RHEL4 i386 and RHEL3 x86_64 excluded
system = data_setup.create_system(arch=u"i386")
system.arch.append(Arch.by_name(u"x86_64"))
system.excluded_osmajor.extend(
[
ExcludeOSMajor(arch=Arch.by_name(u"i386"), osmajor=OSMajor.by_name(u"RedHatEnterpriseLinux4")),
ExcludeOSMajor(arch=Arch.by_name(u"x86_64"), osmajor=OSMajor.by_name(u"RedHatEnterpriseLinux3")),
]
)
out = run_client(["bkr", "machine-test", "--machine", system.fqdn])
self.assert_(out.startswith("Submitted:"), out)
with session.begin():
new_job = Job.query.order_by(Job.id.desc()).first()
distro_trees = [recipe.distro_tree for recipe in new_job.all_recipes]
self.assert_(rhel3_i386 in distro_trees, distro_trees)
self.assert_(rhel3_x86_64 not in distro_trees, distro_trees)
self.assert_(rhel4_i386 not in distro_trees, distro_trees)
self.assert_(rhel4_x86_64 in distro_trees, distro_trees)
开发者ID:beaker-project,项目名称:beaker,代码行数:32,代码来源:test_machine_test.py
示例9: init
def init():
if(len(Work.query().all()) > 0):
return
log.info("initializing data")
works = [
dict(title="Mountain Stream",
file_path="mountain_stream",
description="Taken in Colorado during the summer of 1999.",
purchases=0,
id=0),
dict(title="Graffiti",
file_path="graffiti",
description="SpongeBob and Patrick in St. Joseph, Missouri.",
purchases=0,
id=1),
dict(title="Lenexa Conference Center",
file_path="lenexa_conference_center",
description="Taken to scout out a wedding reception location.",
purchases=0,
id=2),
dict(title="Glass",
file_path="glass",
description="Somewhere in California",
purchases=0,
id=3),
]
session.begin()
for w in works:
work = Work(dict=w)
session.commit()
开发者ID:davidfooks,项目名称:fpys2,代码行数:34,代码来源:model.py
示例10: test_can_return_manual_reservation_when_automated
def test_can_return_manual_reservation_when_automated(self):
with session.begin():
user = data_setup.create_user(password='foobar')
system = data_setup.create_system(owner=user, status=SystemStatus.manual)
b = self.browser
login(b, user=user.user_name, password="foobar")
# Take
b.get(get_server_base() + 'view/%s' % system.fqdn)
b.find_element_by_link_text('Take').click()
b.find_element_by_xpath('//div[contains(@class, "system-quick-usage")]'
'//span[@class="label" and text()="Reserved"]')
# toggle status to Automated
with session.begin():
system.lab_controller = data_setup.create_labcontroller()
system.status = SystemStatus.automated
# Attempt to return
b.get(get_server_base() + 'view/%s' % system.fqdn)
b.find_element_by_link_text('Return').click()
b.find_element_by_xpath('//div[contains(@class, "system-quick-usage")]'
'//span[@class="label" and text()="Idle"]')
开发者ID:beaker-project,项目名称:beaker,代码行数:25,代码来源:test_system_return.py
示例11: test_remove_user_job_cancel
def test_remove_user_job_cancel(self):
with session.begin():
user = data_setup.create_user(user_name =
data_setup.unique_name('aaaaa%s'))
job = data_setup.create_job(owner=user)
data_setup.mark_job_running(job)
b = self.browser
login(b)
b.get(get_server_base() + 'users')
b.find_element_by_xpath('//a[@href="remove?id=%d"]' %user.user_id).click()
# XXX: not necessary, but doing it here to buy time, since sometimes the
# job cancellation seems to take a while
logout(b)
# reflect the change in recipe task status when
# update_dirty_jobs() is called
session.expunge_all()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job.id)
self.assertEquals(job.status, TaskStatus.cancelled)
self.assertIn('User %s removed' % user.user_name,
job.recipesets[0].recipes[0].tasks[0].results[0].log)
开发者ID:ustbgaofan,项目名称:beaker,代码行数:25,代码来源:test_remove_user.py
示例12: test_duplicate_notify_cc_addresses_are_merged
def test_duplicate_notify_cc_addresses_are_merged(self):
with session.begin():
user = data_setup.create_user(password=u'hornet')
b = self.browser
login(b, user.user_name, u'hornet')
b.get(get_server_base())
click_menu_item(b, 'Scheduler', 'New Job')
xml_file = tempfile.NamedTemporaryFile()
xml_file.write('''
<job>
<whiteboard>job with duplicate notify cc addresses</whiteboard>
<notify>
<cc>[email protected]</cc>
<cc>[email protected]</cc>
</notify>
<recipeSet>
<recipe>
<distroRequires>
<distro_name op="=" value="BlueShoeLinux5-5" />
</distroRequires>
<hostRequires/>
<task name="/distribution/install" role="STANDALONE"/>
</recipe>
</recipeSet>
</job>
''')
xml_file.flush()
b.find_element_by_id('jobs_filexml').send_keys(xml_file.name)
b.find_element_by_xpath('//button[text()="Submit Data"]').click()
b.find_element_by_xpath('//button[text()="Queue"]').click()
flash_message = b.find_element_by_class_name('flash').text
self.assert_(flash_message.startswith('Success!'), flash_message)
with session.begin():
job = Job.query.filter(Job.owner == user).order_by(Job.id.desc()).first()
self.assertEqual(job.cc, ['[email protected]'])
开发者ID:omps,项目名称:beaker,代码行数:35,代码来源:test_jobs.py
示例13: test_add_group
def test_add_group(self):
with session.begin():
group = data_setup.create_group()
user_password = 'password'
user = data_setup.create_user(password=user_password)
data_setup.add_user_to_group(user, group)
orig_date_modified = self.system.date_modified
# as admin, assign the system to our test group
b = self.browser
login(b)
self.go_to_system_view(tab='Groups')
b.find_element_by_name('group.text').send_keys(group.group_name)
b.find_element_by_name('groups').submit()
b.find_element_by_xpath(
'//div[@id="groups"]'
'//td[normalize-space(text())="%s"]' % group.group_name)
with session.begin():
session.refresh(self.system)
self.assert_(self.system.date_modified > orig_date_modified)
# as a user in the group, can we see it?
logout(b)
login(b, user.user_name, user_password)
click_menu_item(b, 'Systems', 'Available')
b.find_element_by_name('simplesearch').send_keys(self.system.fqdn)
b.find_element_by_name('systemsearch_simple').submit()
check_system_search_results(b, present=[self.system])
开发者ID:sujithshankar,项目名称:beaker,代码行数:28,代码来源:test_system_view.py
示例14: test_set_active_policy_to_custom_policy
def test_set_active_policy_to_custom_policy(self):
with session.begin():
user1 = data_setup.create_user()
user2 = data_setup.create_user()
self.system.custom_access_policy.add_rule(
permission=SystemPermission.edit_system, user=user1)
pool = data_setup.create_system_pool()
pool.access_policy.add_rule(
permission=SystemPermission.edit_system, user=user2)
self.system.active_access_policy = pool.access_policy
self.assertFalse(self.system.active_access_policy.grants
(user1, SystemPermission.edit_system))
self.assertTrue(self.system.active_access_policy.grants
(user2, SystemPermission.edit_system))
s = requests.Session()
s.post(get_server_base() + 'login', data={'user_name': self.owner.user_name,
'password': 'theowner'}).raise_for_status()
response = patch_json(get_server_base() +
'systems/%s/' % self.system.fqdn, session=s,
data={'active_access_policy': {'custom': True}},
)
response.raise_for_status()
with session.begin():
session.expire_all()
self.assertTrue(self.system.active_access_policy.grants \
(user1, SystemPermission.edit_system))
开发者ID:ShaolongHu,项目名称:beaker,代码行数:28,代码来源:test_systems.py
示例15: test_activity_is_not_logged_when_leaving_power_settings_empty
def test_activity_is_not_logged_when_leaving_power_settings_empty(self):
# The bug was that we were recording a change to power_user or
# power_passwd because it changed from NULL to ''.
with session.begin():
self.system.power.power_type = PowerType.lazy_create(name=u'ilo')
self.system.power.power_user = None
self.system.power.power_passwd = None
self.system.power.power_id = None
PowerType.lazy_create(name=u'drac')
self.assertEquals(len(self.system.activity), 0)
b = self.browser
login(b)
self.go_to_system_view(tab='Power Settings')
tab = b.find_element_by_id('power-settings')
# change power type but leave the other fields empty
BootstrapSelect(tab.find_element_by_name('power_type'))\
.select_by_visible_text('drac')
tab.find_element_by_tag_name('form').submit()
tab.find_element_by_xpath('.//span[@class="sync-status" and not(text())]')
with session.begin():
session.refresh(self.system)
self.assertEquals(len(self.system.activity), 1,
'Expecting only one activity row for power_type but found: %r'
% self.system.activity)
self.assertEquals(self.system.activity[0].field_name, u'power_type')
开发者ID:sujithshankar,项目名称:beaker,代码行数:25,代码来源:test_system_view.py
示例16: test_update_labinfo
def test_update_labinfo(self):
with session.begin():
# Due to bz987313 system must have existing lab info
self.system.labinfo = LabInfo(weight=100)
orig_date_modified = self.system.date_modified
b = self.browser
login(b)
self.go_to_system_view(tab='Lab Info')
changes = {
'orig_cost': '1,000.00',
'curr_cost': '500.00',
'dimensions': '1x1x1',
'weight': '50',
'wattage': '500',
'cooling': '1',
}
for k, v in changes.iteritems():
b.find_element_by_name(k).clear()
b.find_element_by_name(k).send_keys(v)
b.find_element_by_xpath('//button[text()="Save Lab Info Changes"]').click()
self.assertEquals(b.find_element_by_class_name('flash').text,
'Saved Lab Info')
for k, v in changes.iteritems():
self.assertEquals(b.find_element_by_name(k).get_attribute('value'), v)
with session.begin():
session.refresh(self.system)
self.assert_(self.system.date_modified > orig_date_modified)
开发者ID:sujithshankar,项目名称:beaker,代码行数:27,代码来源:test_system_view.py
示例17: test_system_pools_import
def test_system_pools_import(self):
with session.begin():
system = data_setup.create_system()
pool1 = data_setup.create_system_pool()
pool2 = data_setup.create_system_pool()
login(self.browser)
self.import_csv((u'csv_type,fqdn,pool,deleted\n'
u'system_pool,%s,%s,False\n'
u'system_pool,%s,%s,False'%(system.fqdn, pool1.name,
system.fqdn, pool2.name)) \
.encode('utf8'))
self.failUnless(is_text_present(self.browser, 'No Errors'))
with session.begin():
session.refresh(system)
self.assertEquals([pool1.name, pool2.name],
[pool.name for pool in system.pools])
# test deletion
self.import_csv((u'csv_type,fqdn,pool,deleted\n'
u'system_pool,%s,%s,True' % (system.fqdn, pool2.name)) \
.encode('utf8'))
self.failUnless(is_text_present(self.browser, 'No Errors'))
with session.begin():
session.refresh(system)
self.assertNotIn(pool2.name, [pool.name for pool in system.pools])
# Attempting to add a system to a Non existent pool should throw an error
self.import_csv((u'csv_type,fqdn,pool,deleted\n'
u'system_pool,%s,poolpool,True' % system.fqdn) \
.encode('utf8'))
self.assertTrue(is_text_present(self.browser, 'poolpool: pool does not exist'))
开发者ID:ShaolongHu,项目名称:beaker,代码行数:31,代码来源:test_csv_import.py
示例18: test_recipe_running_then_cancelled
def test_recipe_running_then_cancelled(self):
""" This tests the case where the recipe is running, has a valid
reservation request, but is cancelled before it's completed.
"""
with session.begin():
recipe = data_setup.create_recipe(
task_list=[Task.by_name(u'/distribution/install')] * 2,
reservesys=True)
job = data_setup.create_job_for_recipes([recipe])
job_id = job.id
data_setup.mark_recipe_running(recipe)
data_setup.mark_recipe_installation_finished(recipe)
# we want at least one task to be Completed here
# https://bugzilla.redhat.com/show_bug.cgi?id=1195558
job.recipesets[0].recipes[0].tasks[0].stop()
job.recipesets[0].recipes[0].tasks[1].start()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.running)
job.recipesets[0].cancel()
beakerd.update_dirty_jobs()
with session.begin():
job = Job.by_id(job_id)
self.assertEqual(job.recipesets[0].recipes[0].status,
TaskStatus.cancelled)
开发者ID:ShaolongHu,项目名称:beaker,代码行数:27,代码来源:test_update_status.py
示例19: setUp
def setUp(self):
session.begin()
from bkr.server.jobs import Jobs
self.controller = Jobs()
self.user = data_setup.create_user()
data_setup.create_distro_tree(distro_name=u'BlueShoeLinux5-5')
session.flush()
开发者ID:ShaolongHu,项目名称:beaker,代码行数:7,代码来源:test_update_status.py
示例20: run
def run(self):
session.begin()
recipe = Recipe.by_id(self.recipe_id)
self.ready_evt.set()
self.continue_evt.wait()
recipe.tasks[-1].stop()
session.commit()
开发者ID:ShaolongHu,项目名称:beaker,代码行数:7,代码来源:test_update_status.py
注:本文中的turbogears.database.session.begin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论