• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python util.gc_collect函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.lib.util.gc_collect函数的典型用法代码示例。如果您正苦于以下问题:Python gc_collect函数的具体用法?Python gc_collect怎么用?Python gc_collect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了gc_collect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_gced_delete_on_rollback

    def test_gced_delete_on_rollback(self):
        User, users = self.classes.User, self.tables.users

        s = self.session()
        u1 = User(name='ed')
        s.add(u1)
        s.commit()

        s.delete(u1)
        u1_state = attributes.instance_state(u1)
        assert u1_state in s.identity_map.all_states()
        assert u1_state in s._deleted
        s.flush()
        assert u1_state not in s.identity_map.all_states()
        assert u1_state not in s._deleted
        del u1
        gc_collect()
        assert u1_state.obj() is None

        s.rollback()
        assert u1_state in s.identity_map.all_states()
        u1 = s.query(User).filter_by(name='ed').one()
        assert u1_state not in s.identity_map.all_states()
        assert s.scalar(users.count()) == 1
        s.delete(u1)
        s.flush()
        assert s.scalar(users.count()) == 0
        s.commit()
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:28,代码来源:test_transaction.py


示例2: test_auto_detach_on_gc_session

    def test_auto_detach_on_gc_session(self):
        users, User = self.tables.users, self.classes.User

        mapper(User, users)

        sess = Session()

        u1 = User(name='u1')
        sess.add(u1)
        sess.commit()

        # can't add u1 to Session,
        # already belongs to u2
        s2 = Session()
        assert_raises_message(
            sa.exc.InvalidRequestError,
            r".*is already attached to session",
            s2.add, u1
        )

        # garbage collect sess
        del sess
        gc_collect()

        # s2 lets it in now despite u1 having
        # session_key
        s2.add(u1)
        assert u1 in s2
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:28,代码来源:test_session.py


示例3: _test_overflow

    def _test_overflow(self, thread_count, max_overflow):
        gc_collect()

        dbapi = MockDBAPI()
        def creator():
            time.sleep(.05)
            return dbapi.connect()

        p = pool.QueuePool(creator=creator,
                           pool_size=3, timeout=2,
                           max_overflow=max_overflow)
        peaks = []
        def whammy():
            for i in range(10):
                try:
                    con = p.connect()
                    time.sleep(.005)
                    peaks.append(p.overflow())
                    con.close()
                    del con
                except tsa.exc.TimeoutError:
                    pass
        threads = []
        for i in xrange(thread_count):
            th = threading.Thread(target=whammy)
            th.start()
            threads.append(th)
        for th in threads:
            th.join()

        self.assert_(max(peaks) <= max_overflow)

        lazy_gc()
        assert not pool._refs
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:34,代码来源:test_pool.py


示例4: all

def all():
    setup()
    try:
        t, t2 = 0, 0
        def usage(label):
            now = resource.getrusage(resource.RUSAGE_SELF)
            print "%s: %0.3fs real, %0.3fs user, %0.3fs sys" % (
                label, t2 - t,
                now.ru_utime - usage.last.ru_utime,
                now.ru_stime - usage.last.ru_stime)
            usage.snap(now)
        usage.snap = lambda stats=None: setattr(
            usage, 'last', stats or resource.getrusage(resource.RUSAGE_SELF))

        session = create_session()

        gc_collect()
        usage.snap()
        t = time.clock()
        people = orm_select(session)
        t2 = time.clock()
        usage('load objects')

        gc_collect()
        usage.snap()
        t = time.clock()
        update_and_flush(session, people)
        t2 = time.clock()
        usage('update and flush')
    finally:
        metadata.drop_all()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:31,代码来源:objupdatespeed.py


示例5: test_autoflush_expressions

    def test_autoflush_expressions(self):
        """test that an expression which is dependent on object state is
        evaluated after the session autoflushes.   This is the lambda
        inside of strategies.py lazy_clause.

        """

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(User, users, properties={"addresses": relationship(Address, backref="user")})
        mapper(Address, addresses)

        sess = create_session(autoflush=True, autocommit=False)
        u = User(name="ed", addresses=[Address(email_address="foo")])
        sess.add(u)
        eq_(sess.query(Address).filter(Address.user == u).one(), Address(email_address="foo"))

        # still works after "u" is garbage collected
        sess.commit()
        sess.close()
        u = sess.query(User).get(u.id)
        q = sess.query(Address).filter(Address.user == u)
        del u
        gc_collect()
        eq_(q.one(), Address(email_address="foo"))
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:30,代码来源:test_session.py


示例6: test_weak_ref_pickled

    def test_weak_ref_pickled(self):
        users, User = self.tables.users, pickleable.User

        s = create_session()
        mapper(User, users)

        s.add(User(name="ed"))
        s.flush()
        assert not s.dirty

        user = s.query(User).one()
        user.name = "fred"
        s.expunge(user)

        u2 = pickle.loads(pickle.dumps(user))

        del user
        s.add(u2)

        del u2
        gc_collect()

        assert len(s.identity_map) == 1
        assert len(s.dirty) == 1
        assert None not in s.dirty
        s.flush()
        gc_collect()
        assert not s.dirty

        assert not s.identity_map
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:30,代码来源:test_session.py


示例7: test_expire_all

    def test_expire_all(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(
            User,
            users,
            properties={"addresses": relationship(Address, backref="user", lazy="joined", order_by=addresses.c.id)},
        )
        mapper(Address, addresses)

        sess = create_session()
        userlist = sess.query(User).order_by(User.id).all()
        assert self.static.user_address_result == userlist
        assert len(list(sess)) == 9
        sess.expire_all()
        gc_collect()
        assert len(list(sess)) == 4  # since addresses were gc'ed

        userlist = sess.query(User).order_by(User.id).all()
        u = userlist[1]
        eq_(self.static.user_address_result, userlist)
        assert len(list(sess)) == 9
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_expire.py


示例8: test_weakref_with_cycles_o2o

    def test_weakref_with_cycles_o2o(self):
        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User,
        )

        s = sessionmaker()()
        mapper(User, users, properties={"address": relationship(Address, backref="user", uselist=False)})
        mapper(Address, addresses)
        s.add(User(name="ed", address=Address(email_address="ed1")))
        s.commit()

        user = s.query(User).options(joinedload(User.address)).one()
        user.address.user
        eq_(user, User(name="ed", address=Address(email_address="ed1")))

        del user
        gc_collect()
        assert len(s.identity_map) == 0

        user = s.query(User).options(joinedload(User.address)).one()
        user.address.email_address = "ed2"
        user.address.user  # lazyload

        del user
        gc_collect()
        assert len(s.identity_map) == 2

        s.commit()
        user = s.query(User).options(joinedload(User.address)).one()
        eq_(user, User(name="ed", address=Address(email_address="ed2")))
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:33,代码来源:test_session.py


示例9: profile

    def profile(*args):
        gc_collect()
        samples = [0 for x in range(0, 50)]
        for x in range(0, 50):
            func(*args)
            gc_collect()
            samples[x] = len(gc.get_objects())

        print "sample gc sizes:", samples

        assert len(_sessions) == 0

        for x in samples[-4:]:
            if x != samples[-5]:
                flatline = False
                break
        else:
            flatline = True

        # object count is bigger than when it started
        if not flatline and samples[-1] > samples[0]:
            for x in samples[1:-2]:
                # see if a spike bigger than the endpoint exists
                if x > samples[-1]:
                    break
            else:
                assert False, repr(samples) + " " + repr(flatline)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:27,代码来源:test_memusage.py


示例10: go

        def go():
            obj = [
                Foo({'a':1}),
                Foo({'b':1}),
                Foo({'c':1}),
                Foo({'d':1}),
                Foo({'e':1}),
                Foo({'f':1}),
                Foo({'g':1}),
                Foo({'h':1}),
                Foo({'i':1}),
                Foo({'j':1}),
                Foo({'k':1}),
                Foo({'l':1}),
            ]

            session.add_all(obj)
            session.commit()

            testing.eq_(len(session.identity_map._mutable_attrs), 12)
            testing.eq_(len(session.identity_map), 12)
            obj = None
            gc_collect()
            testing.eq_(len(session.identity_map._mutable_attrs), 0)
            testing.eq_(len(session.identity_map), 0)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:25,代码来源:test_memusage.py


示例11: test_invalidate_trans

    def test_invalidate_trans(self):
        conn = db.connect()
        trans = conn.begin()
        dbapi.shutdown()
        try:
            conn.execute(select([1]))
            assert False
        except tsa.exc.DBAPIError:
            pass

        # assert was invalidated

        gc_collect()
        assert len(dbapi.connections) == 0
        assert not conn.closed
        assert conn.invalidated
        assert trans.is_active
        assert_raises_message(
            tsa.exc.StatementError,
            "Can't reconnect until invalid transaction is rolled back",
            conn.execute,
            select([1]),
        )
        assert trans.is_active
        try:
            trans.commit()
            assert False
        except tsa.exc.InvalidRequestError, e:
            assert str(e) == "Can't reconnect until invalid transaction is " "rolled back"
开发者ID:onetera,项目名称:scandatatransfer,代码行数:29,代码来源:test_reconnect.py


示例12: test_conn_reusable

    def test_conn_reusable(self):
        conn = db.connect()

        conn.execute(select([1]))

        assert len(dbapi.connections) == 1

        dbapi.shutdown()

        # raises error
        try:
            conn.execute(select([1]))
            assert False
        except tsa.exc.DBAPIError:
            pass

        assert not conn.closed
        assert conn.invalidated

        # ensure all connections closed (pool was recycled)
        gc_collect()
        assert len(dbapi.connections) == 0

        # test reconnects
        conn.execute(select([1]))
        assert not conn.invalidated
        assert len(dbapi.connections) == 1
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_reconnect.py


示例13: test_copy

 def test_copy(self):
     mapper(Parent, self.parents,
            properties=dict(children=relationship(Child)))
     mapper(Child, self.children)
     p = Parent('p1')
     p.kids.extend(['c1', 'c2'])
     p_copy = copy.copy(p)
     del p
     gc_collect()
     assert set(p_copy.kids) == set(['c1', 'c2']), p.kids
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:10,代码来源:test_associationproxy.py


示例14: test_subclass

    def test_subclass(self):
        class SubTarget(self.Target):
            pass

        st = SubTarget()
        st.dispatch.some_event(1, 2)
        del st
        del SubTarget
        gc_collect()
        eq_(self.Target.__subclasses__(), [])
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:10,代码来源:test_events.py


示例15: test_weak_single

    def test_weak_single(self):
        data, wim = self._fixture()

        assert len(data) == len(wim) == len(wim.by_id)

        oid = id(data[0])
        del data[0]
        gc_collect()

        assert len(data) == len(wim) == len(wim.by_id)
        assert oid not in wim.by_id
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_utils.py


示例16: test_weak_clear

    def test_weak_clear(self):
        data, wim = self._fixture()

        assert len(data) == len(wim) == len(wim.by_id)

        del data[:]
        gc_collect()

        eq_(wim, {})
        eq_(wim.by_id, {})
        eq_(wim._weakrefs, {})
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_utils.py


示例17: _profile_cProfile

def _profile_cProfile(filename, fn, *args, **kw):
    import cProfile, pstats, time

    load_stats = lambda: pstats.Stats(filename)
    gc_collect()

    began = time.time()
    cProfile.runctx('result = fn(*args, **kw)', globals(), locals(),
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, locals()['result']
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:12,代码来源:profiling.py


示例18: test_stale_state_positive_gc

    def test_stale_state_positive_gc(self):
        User = self.classes.User
        s, u1, a1 = self._fixture()

        s.expunge(u1)
        del u1
        gc_collect()

        u1 = s.query(User).first()
        u1.addresses.remove(a1)

        self._assert_not_hasparent(a1)
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:12,代码来源:test_hasparent.py


示例19: test_reconnect

    def test_reconnect(self):
        """test that an 'is_disconnect' condition will invalidate the
        connection, and additionally dispose the previous connection
        pool and recreate."""

        pid = id(db.pool)

        # make a connection

        conn = db.connect()

        # connection works

        conn.execute(select([1]))

        # create a second connection within the pool, which we'll ensure
        # also goes away

        conn2 = db.connect()
        conn2.close()

        # two connections opened total now

        assert len(dbapi.connections) == 2

        # set it to fail

        dbapi.shutdown()
        try:
            conn.execute(select([1]))
            assert False
        except tsa.exc.DBAPIError:
            pass

        # assert was invalidated

        assert not conn.closed
        assert conn.invalidated

        # close shouldnt break

        conn.close()
        assert id(db.pool) != pid

        # ensure all connections closed (pool was recycled)

        gc_collect()
        assert len(dbapi.connections) == 0
        conn = db.connect()
        conn.execute(select([1]))
        conn.close()
        assert len(dbapi.connections) == 1
开发者ID:onetera,项目名称:scandatatransfer,代码行数:52,代码来源:test_reconnect.py


示例20: test_weakref_kaboom

 def test_weakref_kaboom(self):
     p = self._queuepool_fixture(pool_size=3,
                        max_overflow=-1, use_threadlocal=True)
     c1 = p.connect()
     c2 = p.connect()
     c1.close()
     c2 = None
     del c1
     del c2
     gc_collect()
     assert p.checkedout() == 0
     c3 = p.connect()
     assert c3 is not None
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:13,代码来源:test_pool.py



注:本文中的test.lib.util.gc_collect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python libregrtest._parse_args函数代码示例发布时间:2022-05-27
下一篇:
Python testing.eq_函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap