本文整理汇总了Python中tracim.model.new_revision函数的典型用法代码示例。如果您正苦于以下问题:Python new_revision函数的具体用法?Python new_revision怎么用?Python new_revision使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了new_revision函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_update
def test_update(self):
    """Edit a content twice through ``new_revision`` and check its revisions.

    The assertions verify the number of ``ContentRevisionRO`` rows after
    each edit, that a single ``Content`` row remains, that the ``updated``
    dates are strictly increasing and that the ``created`` dates are equal.
    """
    created = self.test_create()
    content = DBSession.query(Content) \
        .filter(Content.id == created.id) \
        .one()

    def revisions_labelled(label):
        # Number of revision rows currently carrying ``label``.
        return DBSession.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.label == label) \
            .count()

    def contents_with_created_id():
        # Number of Content rows sharing the id of the created content.
        return DBSession.query(Content) \
            .filter(Content.id == created.id) \
            .count()

    def revision_described(description):
        # The single revision row carrying ``description``.
        return DBSession.query(ContentRevisionRO) \
            .filter(ContentRevisionRO.description == description) \
            .one()

    eq_(1, revisions_labelled('TEST_CONTENT_1'))

    # First edit: description only.
    with new_revision(content):
        # Presumably keeps the ``updated`` timestamps distinct -- confirm.
        time.sleep(0.00001)
        content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
    DBSession.flush()

    eq_(2, revisions_labelled('TEST_CONTENT_1'))
    eq_(1, contents_with_created_id())

    # Second edit: description and label.
    with new_revision(content):
        time.sleep(0.00001)
        content.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED_2'
        content.label = 'TEST_CONTENT_1_UPDATED_2'
    DBSession.flush()

    eq_(1, revisions_labelled('TEST_CONTENT_1_UPDATED_2'))
    eq_(1, contents_with_created_id())

    first = revision_described('TEST_CONTENT_DESCRIPTION_1')
    second = revision_described('TEST_CONTENT_DESCRIPTION_1_UPDATED')
    third = revision_described('TEST_CONTENT_DESCRIPTION_1_UPDATED_2')

    # Updated dates must be different
    ok_(first.updated < second.updated < third.updated)
    # Created dates must be equal
    ok_(first.created == second.created == third.created)
开发者ID:Nonolost,项目名称:tracim,代码行数:33,代码来源:test_content.py
示例2: test_search_in_label_or_description
def test_search_in_label_or_description(self):
    """Search with two keywords and expect both created pages in the result.

    A folder and two pages are created, each page receives a description
    inside its own ``new_revision`` block, then ``api.search`` is queried
    with ``['dummy', 'jon']``.
    """
    # HACK - D.A. - 2015-03-09
    # This test is based on a bug which does NOT return results found
    # at root of a workspace (eg a folder)
    uapi = UserApi(None)
    groups = [
        GroupApi(None).get_one(group_id)
        for group_id in (Group.TIM_USER, Group.TIM_MANAGER, Group.TIM_ADMIN)
    ]
    user = uapi.create_user(
        email='[email protected]',
        groups=groups,
        save_now=True,
    )
    workspace = WorkspaceApi(user).create_workspace(
        'test workspace',
        save_now=True,
    )

    api = ContentApi(user)
    folder = api.create(
        ContentType.Folder, workspace, None,
        'this is randomized folder', True,
    )
    p1 = api.create(
        ContentType.Page, workspace, folder,
        'this is dummy label content', True,
    )
    p2 = api.create(ContentType.Page, workspace, folder, 'Hey ! Jon !', True)

    with new_revision(p1):
        p1.description = 'This is some amazing test'

    with new_revision(p2):
        p2.description = 'What\'s up ?'

    api.save(p1)
    api.save(p2)

    id1 = p1.content_id
    id2 = p2.content_id

    def revisions_where(criterion):
        # Number of revision rows matching the given filter criterion.
        return DBSession.query(ContentRevisionRO).filter(criterion).count()

    eq_(1, DBSession.query(Workspace).filter(Workspace.label == 'test workspace').count())
    eq_(1, revisions_where(ContentRevisionRO.label == 'this is randomized folder'))
    eq_(2, revisions_where(ContentRevisionRO.label == 'this is dummy label content'))
    eq_(1, revisions_where(ContentRevisionRO.description == 'This is some amazing test'))
    eq_(2, revisions_where(ContentRevisionRO.label == 'Hey ! Jon !'))
    eq_(1, revisions_where(ContentRevisionRO.description == 'What\'s up ?'))

    res = api.search(['dummy', 'jon'])
    eq_(2, len(res.all()))

    eq_(True, id1 in [found.content_id for found in res.all()])
    eq_(True, id2 in [found.content_id for found in res.all()])
开发者ID:buxx,项目名称:tracim,代码行数:49,代码来源:test_content_api.py
示例3: put
def put(self, item_id, label='', content=''):
    """Update the label/content of an item and redirect with a flash message.

    :param item_id: identifier of the item, converted with ``int()``
    :param label: new label handed to ``ContentApi.update_content``
    :param content: new content handed to ``ContentApi.update_content``
    :return: the invalid-path rendering when path validation fails;
        the other paths end with ``tg.redirect``
    """
    # TODO - SECURE THIS
    workspace = tmpl_context.workspace

    try:
        api = ContentApi(tmpl_context.current_user)
        item = api.get_one(int(item_id), self._item_type, workspace)

        with new_revision(item):
            api.update_content(item, label, content)

            path_is_valid = self._path_validation.validate_new_content(item)
            if not path_is_valid:
                return render_invalid_integrity_chosen_path(
                    item.get_label(),
                )

            api.save(item, ActionDescription.REVISION)

        success_msg = _('{} updated').format(self._item_type_label)
        tg.flash(success_msg, CST.STATUS_OK)
        success_url = self._std_url.format(
            tmpl_context.workspace_id,
            tmpl_context.folder_id,
            item.content_id,
        )
        tg.redirect(success_url)

    except SameValueError:
        # Listed before ValueError so this handler is tried first.
        warning_msg = _('{} not updated: the content did not change').format(self._item_type_label)
        tg.flash(warning_msg, CST.STATUS_WARNING)
        error_url = self._err_url.format(
            tmpl_context.workspace_id,
            tmpl_context.folder_id,
            item_id,
        )
        tg.redirect(error_url)

    except ValueError as e:
        error_msg = _('{} not updated - error: {}').format(self._item_type_label, str(e))
        tg.flash(error_msg, CST.STATUS_ERROR)
        error_url = self._err_url.format(
            tmpl_context.workspace_id,
            tmpl_context.folder_id,
            item_id,
        )
        tg.redirect(error_url)
开发者ID:buxx,项目名称:tracim,代码行数:30,代码来源:__init__.py
示例4: test_search_in_description
def test_search_in_description(self):
    """Search for ``'dummy'`` and expect exactly the created page back."""
    # HACK - D.A. - 2015-03-09
    # This test is based on a bug which does NOT return results found
    # at root of a workspace (eg a folder)
    uapi = UserApi(None)
    groups = [
        GroupApi(None).get_one(group_id)
        for group_id in (Group.TIM_USER, Group.TIM_MANAGER, Group.TIM_ADMIN)
    ]
    user = uapi.create_user(
        email='[email protected]',
        groups=groups,
        save_now=True,
    )
    workspace = WorkspaceApi(user).create_workspace(
        'test workspace',
        save_now=True,
    )

    api = ContentApi(user)
    folder = api.create(
        ContentType.Folder, workspace, None,
        'this is randomized folder', True,
    )
    page = api.create(
        ContentType.Page, workspace, folder,
        'this is dummy label content', True,
    )

    with new_revision(page):
        page.description = 'This is some amazing test'

    api.save(page)
    original_id = page.content_id

    res = api.search(['dummy'])
    eq_(1, len(res.all()))

    first_hit = res.all()[0]
    eq_(original_id, first_hit.content_id)
开发者ID:buxx,项目名称:tracim,代码行数:32,代码来源:test_content_api.py
示例5: move_file
def move_file(self, destpath):
    """Rename or move ``self.content`` according to ``destpath``.

    When the base name of ``destpath`` differs from the current display
    name, the content label is updated (base name passed through
    ``transform_to_bdd`` with its trailing ``.ext`` removed) and saved.
    Otherwise the content is moved to the workspace and parent resolved
    from ``destpath``. The transaction is committed in both cases.

    :param destpath: destination path of the resource
    """
    normalized_dest = normpath(destpath)
    dest_name = basename(destpath)

    workspace = self.provider.get_workspace_from_path(
        normalized_dest,
        WorkspaceApi(self.user)
    )

    parent = self.provider.get_parent_from_path(
        normalized_dest,
        self.content_api,
        workspace
    )

    with new_revision(self.content):
        if dest_name != self.getDisplayName():
            # Raw string: same pattern value as before, without relying on
            # invalid escape sequences in a regular string literal.
            new_label = re.sub(
                r'\.[^\.]+$',
                '',
                self.provider.transform_to_bdd(dest_name)
            )
            self.content_api.update_content(self.content, new_label)
            self.content_api.save(self.content)
        else:
            self.content_api.move(
                item=self.content,
                new_parent=parent,
                must_stay_in_same_workspace=False,
                new_workspace=workspace
            )

    transaction.commit()
开发者ID:Nonolost,项目名称:tracim,代码行数:27,代码来源:sql_resources.py
示例6: move_recursively
def move_recursively(
        self,
        item: Content,
        new_parent: Content,
        new_workspace: Workspace
):
    """Move ``item`` and then, recursively, each of its children.

    :param item: content to move
    :param new_parent: parent handed to ``self.move`` for ``item``
    :param new_workspace: workspace handed to ``self.move``
    """
    self.move(item, new_parent, False, new_workspace)
    self.save(item, do_notify=False)

    for sub_item in item.children:
        # Each child is processed in its own revision, with ``item`` as parent.
        with new_revision(sub_item):
            self.move_recursively(sub_item, item, new_workspace)
开发者ID:buxx,项目名称:tracim,代码行数:9,代码来源:content.py
示例7: test_unit__search_exclude_content_under_deleted_or_archived_parents__ok
def test_unit__search_exclude_content_under_deleted_or_archived_parents__ok(self):
    """Search pages whose parent folder gets deleted or archived.

    Asserts that ``ContentApi.search`` keeps returning such pages, and that
    the result lists are empty after ``ContentApi.exclude_unavailable``.
    """
    admin = DBSession.query(User).filter(User.email == '[email protected]').one()
    workspace = self._create_workspace_and_test('workspace_1', admin)
    folder_1 = self._create_content_and_test(
        'folder_1', workspace=workspace, type=ContentType.Folder,
    )
    folder_2 = self._create_content_and_test(
        'folder_2', workspace=workspace, type=ContentType.Folder,
    )
    page_1 = self._create_content_and_test(
        'foo', workspace=workspace, type=ContentType.Page, parent=folder_1,
    )
    page_2 = self._create_content_and_test(
        'bar', workspace=workspace, type=ContentType.Page, parent=folder_2,
    )

    api = ContentApi(admin)

    def assert_only_hit(keyword, expected_page):
        # The search for ``keyword`` must give one result containing the page.
        hits = api.search([keyword]).all()
        eq_(1, len(hits))
        ok_(expected_page in hits)

    assert_only_hit('foo', page_1)
    assert_only_hit('bar', page_2)

    with new_revision(folder_1):
        api.delete(folder_1)
    with new_revision(folder_2):
        api.archive(folder_2)

    # Actually ContentApi.search don't filter it
    assert_only_hit('foo', page_1)
    assert_only_hit('bar', page_2)

    # ContentApi offer exclude_unavailable method to do it
    for keyword in ('foo', 'bar'):
        remaining = api.search([keyword]).all()
        api.exclude_unavailable(remaining)
        eq_(0, len(remaining))
开发者ID:buxx,项目名称:tracim,代码行数:40,代码来源:test_content_api.py
示例8: action
def action(self):
    """Rename the content and run the configured action if the name is free.

    :raises DAVError: with ``HTTP_FORBIDDEN`` when a content labelled
        ``self._new_name`` is found under the same parent
    """
    try:
        # When undeleting/unarchiving we expect that no content with the new
        # name exists; a successful lookup therefore stops the action.
        self.content_api.get_one_by_label_and_parent(
            self._new_name,
            self.content.parent,
        )
    except NoResultFound:
        target = self.content
        with new_revision(target):
            self.content_api.update_content(target, self._new_name)
            handler = self._actions[self._type]
            handler(target)
            self.content_api.save(target, self._type)

        transaction.commit()
    else:
        raise DAVError(HTTP_FORBIDDEN)
开发者ID:lebouquetin,项目名称:tracim,代码行数:13,代码来源:sql_resources.py
示例9: test_query
def test_query(self):
    """Query ``Content`` joined on the most recent revision of each content."""
    content1 = self.test_create()
    with new_revision(content1):
        content1.description = 'TEST_CONTENT_DESCRIPTION_1_UPDATED'
    DBSession.flush()

    content2 = self.test_create(key='2')
    with new_revision(content2):
        content2.description = 'TEST_CONTENT_DESCRIPTION_2_UPDATED'
    DBSession.flush()

    def workspace_labelled(label):
        # The single workspace row carrying ``label``.
        return DBSession.query(Workspace).filter(Workspace.label == label).one()

    workspace1 = workspace_labelled('TEST_WORKSPACE_1')
    workspace2 = workspace_labelled('TEST_WORKSPACE_2')

    # To get Content in database we have to join Content and
    # ContentRevisionRO with a particular condition: the join has to be on
    # the most recent revision.
    latest_revision_id = (
        DBSession.query(ContentRevisionRO.revision_id)
        .filter(ContentRevisionRO.content_id == Content.id)
        .order_by(ContentRevisionRO.revision_id.desc())
        .limit(1)
        .correlate(Content)
    )

    join_condition = and_(
        Content.id == ContentRevisionRO.content_id,
        ContentRevisionRO.revision_id == latest_revision_id,
    )
    base_query = DBSession.query(Content).join(ContentRevisionRO, join_condition)

    in_workspace1 = Content.workspace == workspace1
    in_workspace2 = Content.workspace == workspace2

    eq_(2, base_query.count())
    eq_(1, base_query.filter(in_workspace1).count())
    eq_(1, base_query.filter(in_workspace2).count())

    content1_from_query = base_query.filter(in_workspace1).one()
    eq_(content1.id, content1_from_query.id)
    eq_('TEST_CONTENT_DESCRIPTION_1_UPDATED', content1_from_query.description)

    # The two lookups below are not asserted on in the visible snippet.
    user_admin = DBSession.query(User).filter(User.email == '[email protected]').one()

    api = ContentApi(None)
    content1_from_api = api.get_one(content1.id, ContentType.Page, workspace1)
开发者ID:Nonolost,项目名称:tracim,代码行数:39,代码来源:test_content.py
示例10: test_archive
def test_archive(self):
    """Archive one of two folders and check what ``get_all`` returns.

    Asserts two items before archiving, one item afterwards, and two items
    again through a ``ContentApi`` built with ``show_archived=True``.
    """
    uapi = UserApi(None)
    groups = [
        GroupApi(None).get_one(group_id)
        for group_id in (Group.TIM_USER, Group.TIM_MANAGER, Group.TIM_ADMIN)
    ]
    user = uapi.create_user(
        email='[email protected]',
        groups=groups,
        save_now=True,
    )
    workspace = WorkspaceApi(user).create_workspace(
        'test workspace',
        save_now=True,
    )

    api = ContentApi(user)
    api.create(ContentType.Folder, workspace, None, 'not_archived', True)
    api.create(ContentType.Folder, workspace, None, 'to_archive', True)

    uid = user.user_id
    wid = workspace.workspace_id

    def commit_and_refresh():
        # Commit, then refresh the instances.
        transaction.commit()
        fresh_user = uapi.get_one(uid)
        fresh_workspace = WorkspaceApi(fresh_user).get_one(wid)
        return fresh_user, fresh_workspace, ContentApi(fresh_user)

    user, workspace, api = commit_and_refresh()

    items = api.get_all(None, ContentType.Any, workspace)
    eq_(2, len(items))

    items = api.get_all(None, ContentType.Any, workspace)
    to_archive = items[0]
    with new_revision(to_archive):
        api.archive(to_archive)

    user, workspace, api = commit_and_refresh()

    items = api.get_all(None, ContentType.Any, workspace)
    eq_(1, len(items))

    user, workspace, api = commit_and_refresh()

    # Test that the item is still available if "show archived" is activated
    api = ContentApi(None, show_archived=True)
    items = api.get_all(None, ContentType.Any, workspace)
    eq_(2, len(items))
开发者ID:buxx,项目名称:tracim,代码行数:51,代码来源:test_content_api.py
示例11: update_file
def update_file(self):
    """
    Called when an existing content is updated: opens a new revision,
    replaces the file data and saves it as a ``REVISION`` action.
    """
    content = self._content
    with new_revision(content):
        # Same evaluation order as the call arguments: mimetype, then data.
        mimetype = util.guessMimeType(content.file_name)
        file_data = self._file_stream.read()
        self._api.update_file_data(
            content,
            self._file_name,
            mimetype,
            file_data
        )
        self._api.save(content, ActionDescription.REVISION)
开发者ID:lebouquetin,项目名称:tracim,代码行数:14,代码来源:__init__.py
示例12: test_set_status_unknown_status
def test_set_status_unknown_status(self):
    """Call ``set_status`` with ``'unknown-status'`` on a new folder.

    NOTE(review): no assertion is visible here; the expected failure is
    presumably declared by a decorator outside this snippet -- confirm.
    """
    uapi = UserApi(None)
    groups = [
        GroupApi(None).get_one(group_id)
        for group_id in (Group.TIM_USER, Group.TIM_MANAGER, Group.TIM_ADMIN)
    ]
    user = uapi.create_user(
        email='[email protected]',
        groups=groups,
        save_now=True,
    )
    workspace = WorkspaceApi(user).create_workspace(
        'test workspace',
        save_now=True,
    )

    api = ContentApi(user)
    folder = api.create(ContentType.Folder, workspace, None, 'parent', True)

    with new_revision(folder):
        api.set_status(folder, 'unknown-status')
开发者ID:buxx,项目名称:tracim,代码行数:15,代码来源:test_content_api.py
示例13: put_status
def put_status(self, item_id, status):
    """Set the status of an item, flash the outcome and redirect.

    :param item_id: identifier of the item, converted with ``int()``
    :param status: status handed to ``ContentApi.set_status``
    """
    item_id = int(item_id)
    content_api = ContentApi(tmpl_context.current_user)
    item = content_api.get_one(
        item_id,
        self._item_type,
        tmpl_context.workspace,
    )

    try:
        with new_revision(item):
            content_api.set_status(item, status)
            content_api.save(item, ActionDescription.STATUS_UPDATE)

        success_msg = _('{} status updated').format(self._item_type_label)
        tg.flash(success_msg, CST.STATUS_OK)
        success_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        tg.redirect(success_url)
    except ValueError as e:
        error_msg = _('{} status not updated: {}').format(self._item_type_label, str(e))
        tg.flash(error_msg, CST.STATUS_ERROR)
        error_url = self._err_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        tg.redirect(error_url)
开发者ID:buxx,项目名称:tracim,代码行数:15,代码来源:__init__.py
示例14: update_event
def update_event(
        self,
        calendar: Calendar,
        event: iCalendarEvent,
        event_name: str,
        current_user: User,
) -> Content:
    """
    Update Content Event
    :param calendar: Event calendar owner
    :param event: ICS event
    :param event_name: Event name (ID) like
    20160602T083511Z-18100-1001-1-71_Bastien-20160602T083516Z.ics
    :param current_user: Current modification asking user
    :return: Updated Content
    :raises UnknownCalendarType: when ``calendar`` is neither a
    ``WorkspaceCalendar`` nor a ``UserCalendar``
    """
    if not isinstance(calendar, (WorkspaceCalendar, UserCalendar)):
        raise UnknownCalendarType('Type "{0}" is not implemented'
                                  .format(type(calendar)))

    # Only workspace calendars provide a workspace for the lookup below.
    if isinstance(calendar, WorkspaceCalendar):
        workspace = calendar.related_object
    else:
        workspace = None

    content_api = ContentApi(
        current_user,
        force_show_all_types=True,
        disable_user_workspaces_filter=True
    )
    event_content = content_api.find_one_by_unique_property(
        property_name='name',
        property_value=event_name,
        workspace=workspace
    )

    with new_revision(event_content):
        self.populate_content_with_event(event_content, event, event_name)
        event_content.revision_type = ActionDescription.EDITION

    DBSession.flush()
    transaction.commit()

    return event_content
开发者ID:buxx,项目名称:tracim,代码行数:48,代码来源:calendar.py
示例15: test_set_status_ok
def test_set_status_ok(self):
    """Set several statuses on a folder and check status and revision type."""
    uapi = UserApi(None)
    groups = [
        GroupApi(None).get_one(group_id)
        for group_id in (Group.TIM_USER, Group.TIM_MANAGER, Group.TIM_ADMIN)
    ]
    user = uapi.create_user(
        email='[email protected]',
        groups=groups,
        save_now=True,
    )
    workspace = WorkspaceApi(user).create_workspace(
        'test workspace',
        save_now=True,
    )

    api = ContentApi(user)
    folder = api.create(ContentType.Folder, workspace, None, 'parent', True)

    statuses = [
        'open',
        'closed-validated',
        'closed-unvalidated',
        'closed-deprecated',
    ]
    with new_revision(folder):
        for new_status in statuses:
            api.set_status(folder, new_status)

            eq_(new_status, folder.status)
            eq_(ActionDescription.STATUS_UPDATE, folder.revision_type)
开发者ID:buxx,项目名称:tracim,代码行数:20,代码来源:test_content_api.py
示例16: put_archive
def put_archive(self, item_id):
    """Archive an item, flash a message with an undo link and redirect.

    :param item_id: identifier of the item, converted with ``int()``
    """
    # TODO - CHECK RIGHTS
    item_id = int(item_id)
    content_api = ContentApi(tmpl_context.current_user)
    item = content_api.get_one(
        item_id,
        self._item_type,
        tmpl_context.workspace,
    )

    try:
        next_url = self._parent_url.format(item.workspace_id, item.parent_id)
        item_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        undo_url = item_url+'/put_archive_undo'
        msg = _('{} archived. <a class="alert-link" href="{}">Cancel action</a>').format(self._item_type_label, undo_url)

        with new_revision(item):
            content_api.archive(item)
            content_api.save(item, ActionDescription.ARCHIVING)

        # TODO allow to come back
        tg.flash(msg, CST.STATUS_OK, no_escape=True)
        tg.redirect(next_url)
    except ValueError as e:
        next_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        msg = _('{} not archived: {}').format(self._item_type_label, str(e))
        tg.flash(msg, CST.STATUS_ERROR)
        tg.redirect(next_url)
开发者ID:buxx,项目名称:tracim,代码行数:21,代码来源:__init__.py
示例17: put_archive_undo
def put_archive_undo(self, item_id):
    """Unarchive an item, flash the outcome and redirect to the item page.

    :param item_id: identifier of the item, converted with ``int()``
    """
    # TODO - CHECK RIGHTS
    item_id = int(item_id)
    # Here we do not filter archived items
    content_api = ContentApi(tmpl_context.current_user, True)
    item = content_api.get_one(
        item_id,
        self._item_type,
        tmpl_context.workspace,
    )

    try:
        next_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        msg = _('{} unarchived.').format(self._item_type_label)

        with new_revision(item):
            content_api.unarchive(item)
            content_api.save(item, ActionDescription.UNARCHIVING)

        tg.flash(msg, CST.STATUS_OK)
        tg.redirect(next_url)
    except ValueError as e:
        msg = _('{} not un-archived: {}').format(self._item_type_label, str(e))
        # We still use std url because the item has not been archived
        next_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        tg.flash(msg, CST.STATUS_ERROR)
        tg.redirect(next_url)
开发者ID:buxx,项目名称:tracim,代码行数:21,代码来源:__init__.py
示例18: move_file
def move_file(self, destpath):
    """Rename or move ``self.content`` according to ``destpath``.

    When the base name of ``destpath`` differs from the current display
    name, the name is passed through ``transform_to_bdd`` and split with
    ``os.path.splitext``; the stem is used as the label and the extension
    is stored in ``file_extension``. Otherwise the content is moved to the
    workspace and parent resolved from ``destpath``. The transaction is
    committed in both cases.

    :param destpath: destination path of the resource
    """
    dest_name = basename(destpath)

    with new_revision(self.content):
        if dest_name != self.getDisplayName():
            new_given_file_name = transform_to_bdd(dest_name)
            new_file_name, new_file_extension = \
                os.path.splitext(new_given_file_name)

            self.content_api.update_content(
                self.content,
                new_file_name,
            )
            self.content.file_extension = new_file_extension
            self.content_api.save(self.content)
        else:
            # The destination is resolved with fresh API objects, as before.
            workspace_api = WorkspaceApi(self.user)
            content_api = ContentApi(self.user)

            destination_workspace = self.provider.get_workspace_from_path(
                destpath,
                workspace_api,
            )
            destination_parent = self.provider.get_parent_from_path(
                destpath,
                content_api,
                destination_workspace,
            )

            self.content_api.move(
                item=self.content,
                new_parent=destination_parent,
                must_stay_in_same_workspace=False,
                new_workspace=destination_workspace
            )

    transaction.commit()
开发者ID:lebouquetin,项目名称:tracim,代码行数:40,代码来源:sql_resources.py
示例19: put_status
def put_status(self, item_id, status):
    """Set the status of an item, flash the outcome and redirect.

    When the item lookup raises ``NoResultFound``, the item is fetched
    again with archived and deleted contents shown, and an error is
    flashed instead.

    :param item_id: identifier of the item, converted with ``int()``
    :param status: status handed to ``ContentApi.set_status``
    """
    item_id = int(item_id)
    content_api = ContentApi(tmpl_context.current_user)

    try:
        item = content_api.get_one(
            item_id,
            self._item_type,
            tmpl_context.workspace,
        )
        with new_revision(item):
            content_api.set_status(item, status)
            content_api.save(item, ActionDescription.STATUS_UPDATE)

        success_msg = _('{} status updated').format(self._item_type_label)
        tg.flash(success_msg, CST.STATUS_OK)
        success_url = self._std_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        tg.redirect(success_url)

    except ValueError as e:
        error_msg = _('{} status not updated: {}').format(self._item_type_label, str(e))
        tg.flash(error_msg, CST.STATUS_ERROR)
        error_url = self._err_url.format(
            item.workspace_id, item.parent_id, item.content_id,
        )
        tg.redirect(error_url)

    except NoResultFound:
        # probably the content is deleted or archived => forbidden to update status
        permissive_api = ContentApi(
            tmpl_context.current_user,
            show_archived=True,
            show_deleted=True
        )
        hidden_item = permissive_api.get_one(
            item_id,
            self._item_type,
            tmpl_context.workspace
        )
        next_url = self._std_url.format(
            hidden_item.workspace_id,
            hidden_item.parent_id,
            hidden_item.content_id
        )
        forbidden_msg = _('{} status not updated: the operation '
                          'is not allowed on deleted/archived content').format(
            self._item_type_label
        )
        tg.flash(forbidden_msg, CST.STATUS_ERROR)
        tg.redirect(next_url)
开发者ID:lebouquetin,项目名称:tracim,代码行数:39,代码来源:__init__.py
示例20: put_delete_undo
def put_delete_undo(self, item_id):
    """Undelete an item, flash the outcome and redirect.

    On success the redirect goes to the item page; when ``ValueError`` is
    raised, the error is logged and flashed and the redirect goes to the
    parent page.

    :param item_id: identifier of the item, converted with ``int()``
    """
    # TODO - CHECK RIGHTS

    item_id = int(item_id)
    # Here we do not filter deleted items
    content_api = ContentApi(tmpl_context.current_user, True, True)
    item = content_api.get_one(item_id, self._item_type, tmpl_context.workspace)
    try:
        next_url = self._std_url.format(item.workspace_id, item.parent_id, item.content_id)
        msg = _('{} undeleted.').format(self._item_type_label)
        with new_revision(item):
            content_api.undelete(item)
            content_api.save(item, ActionDescription.UNDELETION)

        tg.flash(msg, CST.STATUS_OK)
        tg.redirect(next_url)

    except ValueError as e:
        # Log the exception message; the bare ``e.__str__`` attribute would
        # be rendered as a bound-method repr instead of the message.
        logger.debug(self, 'Exception: {}'.format(str(e)))
        back_url = self._parent_url.format(item.workspace_id, item.parent_id)
        msg = _('{} not un-deleted: {}').format(self._item_type_label, str(e))
        tg.flash(msg, CST.STATUS_ERROR)
        tg.redirect(back_url)
开发者ID:buxx,项目名称:tracim,代码行数:22,代码来源:__init__.py
注:本文中的tracim.model.new_revision函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论