• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python transaction.savepoint函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中transaction.savepoint函数的典型用法代码示例。如果您正苦于以下问题:Python savepoint函数的具体用法?Python savepoint怎么用?Python savepoint使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了savepoint函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __iter__

 def __iter__(self):
     count = 0
     for item in self.previous:
         count = (count + 1) % self.every
         if count == 0:
             transaction.savepoint(optimistic=True)
         yield item
开发者ID:ChamaraPhilipsuom,项目名称:collective.transmogrifier,代码行数:7,代码来源:savepoint.py


示例2: _initFolders

    def _initFolders( self ):
        from Products.CMFCore.PortalFolder import PortalFolder

        self.connection = makeConnection()
        try:
            r = self.connection.root()
            a = Application()
            r['Application'] = a
            self.root = a
            responseOut = self.responseOut = cStringIO.StringIO()
            self.app = makerequest( self.root, stdout=responseOut )
            self.app._setObject( 'folder1', PortalFolder( 'folder1' ) )
            self.app._setObject( 'folder2', PortalFolder( 'folder2' ) )
            folder1 = getattr( self.app, 'folder1' )
            folder2 = getattr( self.app, 'folder2' )

            manage_addFile( folder1, 'file'
                          , file='', content_type='text/plain')

            # Hack, we need a _p_mtime for the file, so we make sure that it
            # has one. We use a subtransaction, which means we can rollback
            # later and pretend we didn't touch the ZODB.
            transaction.savepoint(optimistic=True)
        except:
            self.connection.close()
            raise

        return self.app._getOb( 'folder1' ), self.app._getOb( 'folder2' )
开发者ID:goschtl,项目名称:zope,代码行数:28,代码来源:test_PortalFolder.py


示例3: testRenameLastObject

 def testRenameLastObject(self):
     # Renaming should not change position
     transaction.savepoint(optimistic=True)  # make rename work
     self.folder.manage_renameObjects(['baz'], ['bedrock'])
     self.assertEqual(self.folder.getObjectPosition('foo'), 0)
     self.assertEqual(self.folder.getObjectPosition('bar'), 1)
     self.assertEqual(self.folder.getObjectPosition('bedrock'), 2)
开发者ID:AlexStevens,项目名称:Products.CMFPlone,代码行数:7,代码来源:testOrderSupport.py


示例4: test_move_cant_delete_source

    def test_move_cant_delete_source( self ):

        #
        #   This test fails on Zope's earlier than 2.7.3 because of the
        #   changes required to 'OFS.CopytSupport.manage_pasteObjects'
        #   which must pass 'validate_src' of 2 to '_verifyObjectPaste'
        #   to indicate that the object is being moved, rather than
        #   simply copied.
        #
        #   If you are running with such a Zope, this test will fail,
        #   because the move (which should raise Unauthorized) will be
        #   allowed.
        #
        from AccessControl.Permissions import delete_objects as DeleteObjects
        from Products.CMFCore.PortalFolder import PortalFolder

        folder1, folder2 = self._initFolders()
        folder1.manage_permission( DeleteObjects, roles=(), acquire=0 )

        folder1._setObject( 'sub', PortalFolder( 'sub' ) )
        transaction.savepoint(optimistic=True) # get a _p_jar for 'sub'

        self.app.portal_types = DummyTypesTool()

        def _no_delete_objects(permission, object, context):
            return permission != DeleteObjects

        self._initPolicyAndUser( c_lambda=_no_delete_objects )

        cookie = folder1.manage_cutObjects( ids=( 'sub', ) )
        self._assertCopyErrorUnauth( folder2.manage_pasteObjects
                                   , cookie
                                   , ce_regex='Insufficient Privileges'
                                             + '.*%s' % DeleteObjects
                                   )
开发者ID:goschtl,项目名称:zope,代码行数:35,代码来源:test_PortalFolder.py


示例5: testOnItemRenamed

    def testOnItemRenamed(self):
        """Test notification when an item is renamed."""
        ## Since there is currently no special rule for this event, we
        ## should not send anything.
        portal = self.portal
        ntool = getToolByName(portal, NTOOL_ID)
        changeProperty = lambda key, value: \
            ntool.manage_changeProperties(**{key: value})
        self.login('manager')

        ## Enable some rules just to make sure that none of them
        ## match.
        changeProperty('item_creation_notification_enabled', True)
        changeProperty('on_item_creation_users', ['* :: *'])
        changeProperty('on_item_creation_mail_template',
                       ['* :: string:creation_mail_notification'])
        changeProperty('item_modification_notification_enabled', True)
        changeProperty('on_item_modification_users', ['* :: *'])
        changeProperty('on_item_modification_mail_template',
                       ['* :: string:modification_mail_notification'])
        changeProperty('item_wf_transition_notification_enabled', True)
        changeProperty('on_wf_transition_modification_users', ['* :: *'])
        changeProperty('on_wf_transition_modification_mail_template',
                       ['* :: string:workflow_mail_notification'])

        transaction.savepoint() ## We need this to cut/paste objects.
        portal.folder.manage_renameObjects(('document1', ),
                                           ('renamed-document', ))
        self.failUnlessSent(0)
开发者ID:collective,项目名称:Products.CMFNotification,代码行数:29,代码来源:testNotification.py


示例6: test18_retrieveObjectWhichHasBeenReplaced

    def test18_retrieveObjectWhichHasBeenReplaced(self):
        portal_repo = self.portal.portal_repository
        fol = self.portal.fol
        doc1 = fol.doc1
        doc2 = fol.doc2

        # save change no 1
        fol.setTitle('v1 of fol')
        doc1.setTitle("v1 of doc1")
        doc2.setTitle("v1 of doc2")

        portal_repo.applyVersionControl(doc1, comment='first save')
        portal_repo.applyVersionControl(doc2, comment='first save')

        transaction.savepoint(optimistic=True)
        fol.manage_renameObjects(['doc1','doc2'],['doc1_renamed', 'doc1'])

        doc1 = fol.doc1_renamed
        doc2 = fol.doc1

        doc1.setTitle('v2 of doc1_renamed')
        doc2.setTitle('v2 of doc1 (was doc2)')

        portal_repo.save(doc1, comment='second save')
        portal_repo.save(doc2, comment='second save')

        retrieved_data = portal_repo.retrieve(doc1, 0)
        ret_doc = retrieved_data.object
        self.assertEqual(ret_doc.getId(), 'doc1')
        self.assertEqual(ret_doc.Title(), 'v1 of doc1')

        portal_repo.revert(doc1, 0)
        rev_doc = fol.doc1_renamed
        self.assertEqual(rev_doc.getId(), 'doc1_renamed')
        self.assertEqual(rev_doc.Title(), 'v1 of doc1')
开发者ID:nacho22martin,项目名称:tesis,代码行数:35,代码来源:test_IntegrationTests.py


示例7: test_index_html_with_304_and_caching

    def test_index_html_with_304_and_caching( self ):

        # See collector #355
        self._setupCachingPolicyManager(DummyCachingManager())
        original_len = len(self.RESPONSE.headers)
        path, ref = self._extractFile()

        from webdav.common import rfc1123_date

        self.root.image = self._makeOne( 'test_image', 'test_image.gif' )
        image = self.root.image
        transaction.savepoint(optimistic=True)

        mod_time = image.modified()

        self.REQUEST.environ[ 'IF_MODIFIED_SINCE'
                            ] = '%s;' % rfc1123_date( mod_time+1 )

        data = image.index_html( self.REQUEST, self.RESPONSE )

        self.assertEqual( data, '' )
        self.assertEqual( self.RESPONSE.getStatus(), 304 )

        headers = self.RESPONSE.headers
        self.failUnless(len(headers) >= original_len + 3)
        self.failUnless('foo' in headers.keys())
        self.failUnless('bar' in headers.keys())
        self.assertEqual(headers['test_path'], '/test_image')
开发者ID:nacho22martin,项目名称:tesis,代码行数:28,代码来源:test_Image.py


示例8: test23_RegistryBasesNotVersionedOrRestored

    def test23_RegistryBasesNotVersionedOrRestored(self):
        portal_repo = self.portal.portal_repository
        fol = self.portal.fol

        fol.setTitle("v1")
        # Make it a component registry with bases
        base = aq_base(self.portal.getSiteManager())
        components = PersistentComponents()
        components.__bases__ = (base,)
        fol.setSiteManager(components)
        portal_repo.applyVersionControl(fol)

        broken_iface = broken.find_global(
            'never_gonna_be_real', 'IMissing',
            Broken=ZODB.interfaces.IBroken, type=InterfaceClass)
        sys.modules[broken_iface.__module__] = module = imp.new_module(
            broken_iface.__module__)
        module.IMissing = broken_iface

        # add a broken registrsation but do a savepoint before
        # breaking the interfaces to simulate a broken registrion from
        # a previous commit
        base.registerUtility(component=None, provided=broken_iface)
        transaction.savepoint(optimistic=True)
        del sys.modules[broken_iface.__module__]

        fol.setTitle("v2")

        # If an attempt was made to pickle the parent registry's
        # broken registration we would see an error here
        portal_repo.save(fol)

        self.assertEqual(self.portal.fol.Title(), "v2")
        self.assertTrue(
            self.portal.fol.getSiteManager().__bases__[0] is base)
开发者ID:ivanteoh,项目名称:Products.CMFEditions,代码行数:35,代码来源:test_IntegrationTests.py


示例9: handle_rename

    def handle_rename(self, action):
        data, errors = self.extractData()
        if errors:
            return

        parent = aq_parent(aq_inner(self.context))
        sm = getSecurityManager()
        if not sm.checkPermission('Copy or Move', parent):
            raise Unauthorized(_(u'Permission denied to rename ${title}.',
                                 mapping={u'title': self.context.title}))

        oldid = self.context.getId()
        newid = data['new_id']
        newid = INameChooser(parent).chooseName(newid, self.context)

        context_state = getMultiAdapter(
            (self.context, self.request), name='plone_context_state')
        if context_state.is_default_page():
            parent.setDefaultPage(newid)
        # Requires cmf.ModifyPortalContent permission
        self.context.title = data['new_title']
        # Requires zope2.CopyOrMove permission
        parent.manage_renameObjects([oldid, ], [str(newid), ])

        transaction.savepoint(optimistic=True)
        notify(ObjectModifiedEvent(self.context))

        IStatusMessage(self.request).add(
            _(u"Renamed '${oldid}' to '${newid}'.", mapping={
                u'oldid': oldid, u'newid': newid}))

        self.request.response.redirect(self.context.absolute_url())
开发者ID:urska19,项目名称:Plone-test,代码行数:32,代码来源:actions.py


示例10: updateKupu

def updateKupu(context):

    # Ordinarily, GenericSetup handlers check for the existence of XML files.
    # Here, we are not parsing an XML file, but we use this text file as a
    # flag to check that we actually meant for this import step to be run.
    # The file is found in profiles/default.

    if context.readDataFile('collective.imagetags_kupu.txt') is None:
        return

    # Add additional setup code here
    out = StringIO()
    portal = getSite()

    # Get kupu tool and update its paragraph_styles property
    kt = getToolByName(portal, 'kupu_library_tool', None)
    if kt:
        new_style = 'Show tags|img|imagetags-show'
        styles = kt.getParagraphStyles()
        if not new_style in styles:
            styles.append(new_style)
            kt.configure_kupu(parastyles=styles)
            transaction.savepoint()
            print >> out, "Updated paragraph_styles in kupu: %s" % new_style
        else:
            print >> out, "kupu already has %s in paragraph_styles" % new_style

    context.getLogger("collective.imagetags").info(out.getvalue())
    
    return out.getvalue()
开发者ID:collective,项目名称:collective.imagetags,代码行数:30,代码来源:setuphandlers.py


示例11: _install_zope

    def _install_zope(self, db):
        """Install a fresh Zope inside the new test DB. Eventually
        install an application afterwards.
        """
        # Create the "application"
        newSecurityManager(None, AccessControl.User.system)
        connection = db.open()
        root = connection.root()
        root['Application'] = OFS.Application.Application()
        app = root['Application']
        # Do a savepoint to get a _p_jar on the application
        transaction.savepoint()

        # Initialize the "application"
        try:
            TestAppInitializer(
                app, self.products, self.packages, self.users).initialize()
            self._install_application(makerequest(
                    app, environ={'SERVER_NAME': 'localhost'}))
        except Exception as error:
            # There was an error during the application 'setUp'. Abort
            # the transaction and continue, otherwise test in other
            # layers might fail because of this failure.
            transaction.abort()
            raise error
        else:
            # Close
            transaction.commit()
        finally:
            # In any case, close the connection and continue
            connection.close()
            noSecurityManager()
开发者ID:infrae,项目名称:infrae.testing,代码行数:32,代码来源:zope2.py


示例12: removeOldUIDs

def removeOldUIDs(portal, out):
    # remove temporary needed index
    uc = getToolByName(portal, UID_CATALOG)
    print >>out, 'Removing old uids\n'
    if olduididx in uc.indexes():
        uc.delIndex(olduididx)
        if olduididx in uc.schema():
            uc.delColumn(olduididx)
    count = 0
    allbrains = uc()
    for brain in allbrains:
        # Get a uid for each thingie
        obj = brain.getObject()
        objUID = getattr(aq_base(obj), olduididx, None)
        if objUID is None:
            continue  # not an old style AT
        delattr(obj, olduididx)
        obj._updateCatalog(portal)
        count += 1
        if not count % 10:
            print >>out, '.',
        # avoid eating up all RAM
        if not count % 250:
            print >>out, '*',
            transaction.savepoint(optimistic=True)

    if USE_FULL_TRANSACTIONS:
        transaction.commit()
    else:
        transaction.savepoint(optimistic=True)

    print >>out, "\n%s old UID attributes removed." % count
    print >>out, 'Done\n'
开发者ID:seanupton,项目名称:Products.Archetypes,代码行数:33,代码来源:migrations.py


示例13: renameAfterCreation

def renameAfterCreation(obj):
    # Can't rename without a subtransaction commit when using portal_factory
    transaction.savepoint(optimistic=True)
    # The id returned should be normalized already
    new_id = generateUniqueId(obj)
    obj.aq_inner.aq_parent.manage_renameObject(obj.id, new_id)
    return new_id
开发者ID:Ammy2,项目名称:Bika-LIMS,代码行数:7,代码来源:idserver.py


示例14: test_move_comments_when_content_object_is_moved

    def test_move_comments_when_content_object_is_moved(self):
        # Create two folders and a content object with a comment
        self.portal.invokeFactory(id="folder1", title="Folder 1", type_name="Folder")
        self.portal.invokeFactory(id="folder2", title="Folder 2", type_name="Folder")
        self.portal.folder1.invokeFactory(id="moveme", title="Move Me", type_name="Document")
        conversation = IConversation(self.portal.folder1.moveme)
        comment = createObject("plone.Comment")
        comment_id = conversation.addComment(comment)
        # We need to commit here so that _p_jar isn't None and move will work
        transaction.savepoint(optimistic=True)

        # Move moveme from folder1 to folder2
        cp = self.portal.folder1.manage_cutObjects(ids=("moveme",))
        self.portal.folder2.manage_pasteObjects(cp)

        # Make sure no old comment brains are
        brains = self.catalog.searchResults(
            dict(portal_type="Discussion Item", path={"query": "/".join(self.portal.folder1.getPhysicalPath())})
        )
        self.assertEquals(len(brains), 0)

        brains = self.catalog.searchResults(
            dict(portal_type="Discussion Item", path={"query": "/".join(self.portal.folder2.getPhysicalPath())})
        )
        self.assertEquals(len(brains), 1)
        self.assertEquals(brains[0].getPath(), "/plone/folder2/moveme/++conversation++default/" + str(comment_id))
开发者ID:retsu,项目名称:plone.app.discussion,代码行数:26,代码来源:test_catalog.py


示例15: testFileRenameKeepsMimeType

 def testFileRenameKeepsMimeType(self):
     self.assertEqual(self.folder.file.Format(), "application/pdf")
     self.assertEqual(self.folder.file.getFile().content_type, "application/pdf")
     transaction.savepoint(optimistic=True)  # make rename work
     self.folder.file.file_edit(id="foo")
     self.assertEqual(self.folder.foo.Format(), "application/pdf")
     self.assertEqual(self.folder.foo.getFile().content_type, "application/pdf")
开发者ID:seantis,项目名称:Products.CMFPlone,代码行数:7,代码来源:testContentTypeScripts.py


示例16: testImageRenameKeepsMimeType

 def testImageRenameKeepsMimeType(self):
     self.assertEqual(self.folder.image.Format(), 'image/gif')
     self.assertEqual(self.folder.image.content_type, 'image/gif')
     transaction.savepoint(optimistic=True)  # make rename work
     self.folder.image.image_edit(id='foo')
     self.assertEqual(self.folder.foo.Format(), 'image/gif')
     self.assertEqual(self.folder.foo.content_type, 'image/gif')
开发者ID:CGTIC,项目名称:Plone_SP,代码行数:7,代码来源:testContentTypeScripts.py


示例17: removeBrokenCacheFu

def removeBrokenCacheFu(context):
    portal = getToolByName(context, 'portal_url').getPortalObject()
    cpm = getattr(portal, 'caching_policy_manager', None)
    if cpm and cpm.getId() == 'broken':
        # If we detect a broken CacheFu install, remove it
        CACHEFU_IDS = [
            'CacheSetup_OFSCache',
            'CacheSetup_PageCache',
            'caching_policy_manager',
            'HTTPCache',
            'portal_cache_settings',
            'portal_squid',
        ]
        cpm = aq_base(cpm)
        for i in CACHEFU_IDS:
            portal._delOb(i)
        objects = portal._objects
        portal._objects = tuple(
            [i for i in objects if getattr(portal, i['id'], None)])
        sm = getSiteManager(context=portal)
        sm.unregisterUtility(component=cpm, provided=ICachingPolicyManager)
        del cpm
        transaction.savepoint(optimistic=True)
        manage_addCachingPolicyManager(portal)
        addCacheHandlers(portal)
        addCacheForResourceRegistry(portal)
        logger.info('Removed CacheSetup tools.')
开发者ID:CGTIC,项目名称:Plone_SP,代码行数:27,代码来源:alphas.py


示例18: test21_RevertObjectWithChangedIdMaintainsConsistentCatalog

    def test21_RevertObjectWithChangedIdMaintainsConsistentCatalog(self):
        portal_repo = self.portal.portal_repository
        catalog = self.portal.portal_catalog
        fol = self.portal.fol
        doc1 = fol.doc1

        # save change no 1
        doc1.setTitle("v1 of doc1")

        portal_repo.applyVersionControl(doc1, comment='first save')

        self.assertEqual(len(catalog(getId='doc1')), 1)

        doc1.setTitle("v2 of doc1")
        transaction.savepoint()
        fol.manage_renameObject('doc1', 'doc1_changed')
        doc1 = fol.doc1_changed
        doc1.reindexObject()

        self.assertEqual(len(catalog(getId='doc1')), 0)
        self.assertEqual(len(catalog(getId='doc1_changed')), 1)

        portal_repo.save(doc1, comment='second save')

        portal_repo.revert(doc1, 0)
        rev_doc = fol.doc1_changed
        self.assertEqual(rev_doc.Title(), "v1 of doc1")
        self.assertEqual(len(catalog(getId='doc1')), 0)
        self.assertEqual(len(catalog(getId='doc1_changed')), 1)
        self.assertEqual(len(catalog(Title='v1 of doc1')), 1)
开发者ID:nacho22martin,项目名称:tesis,代码行数:30,代码来源:test_IntegrationTests.py


示例19: cleanUpSkinsTool

def cleanUpSkinsTool(context):
    skins = getToolByName(context, 'portal_skins')
    # Remove directory views for directories missing on the filesystem
    for name in skins.keys():
        directory_view = skins.get(name)
        reg_key = getattr(directory_view, '_dirpath', None)
        if not reg_key:
            # not a directory view, but a persistent folder
            continue
        try:
            reg_key = _dirreg.getCurrentKeyFormat(reg_key)
            _dirreg.getDirectoryInfo(reg_key)
        except ValueError:
            skins._delObject(name)

    transaction.savepoint(optimistic=True)
    existing = skins.keys()
    # Remove no longer existing entries from skin selections
    for layer, paths in skins.selections.items():
        new_paths = []
        for name in paths.split(','):
            if name == 'plone_styles':
                # plone_styles has been moved and renamed
                new_paths.append('classic_styles')
            elif name in existing:
                new_paths.append(name)
        skins.selections[layer] = ','.join(new_paths)
开发者ID:CGTIC,项目名称:Plone_SP,代码行数:27,代码来源:alphas.py


示例20: testRenameObject

 def testRenameObject(self):
     # Renaming should not change position
     transaction.savepoint(optimistic=True)  # make rename work
     self.portal.manage_renameObjects(['bar'], ['barney'])
     self.assertEqual(self.portal.getObjectPosition('foo'), 0)
     self.assertEqual(self.portal.getObjectPosition('barney'), 1)
     self.assertEqual(self.portal.getObjectPosition('baz'), 2)
开发者ID:AlexStevens,项目名称:Products.CMFPlone,代码行数:7,代码来源:testOrderSupport.py



注:本文中的transaction.savepoint函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python transaction.Transaction类代码示例发布时间:2022-05-27
下一篇:
Python transaction.get函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap