本文整理汇总了 Python 中 test.lib.util.gc_collect 函数的典型用法代码示例。如果您正苦于以下问题：Python gc_collect 函数的具体用法？Python gc_collect 怎么用？Python gc_collect 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 gc_collect 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_gced_delete_on_rollback
def test_gced_delete_on_rollback(self):
    """A deleted, flushed and garbage-collected object has its state
    put back into the identity map when the session is rolled back."""
    User = self.classes.User
    users = self.tables.users

    sess = self.session()
    user = User(name='ed')
    sess.add(user)
    sess.commit()

    sess.delete(user)
    state = attributes.instance_state(user)
    # marked for deletion but not flushed: tracked in both collections
    assert state in sess.identity_map.all_states()
    assert state in sess._deleted

    sess.flush()
    # once flushed, the state has left both collections
    assert state not in sess.identity_map.all_states()
    assert state not in sess._deleted

    # drop the local reference so gc_collect() can reclaim the instance
    del user
    gc_collect()
    assert state.obj() is None

    # rolling back restores the state even though the object is gone
    sess.rollback()
    assert state in sess.identity_map.all_states()

    # loading the row again replaces the stale state
    user = sess.query(User).filter_by(name='ed').one()
    assert state not in sess.identity_map.all_states()
    assert sess.scalar(users.count()) == 1

    sess.delete(user)
    sess.flush()
    assert sess.scalar(users.count()) == 0
    sess.commit()
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:28,代码来源:test_transaction.py
示例2: test_auto_detach_on_gc_session
def test_auto_detach_on_gc_session(self):
    """An object may be added to a second Session after the Session it
    was attached to has been garbage collected."""
    users = self.tables.users
    User = self.classes.User
    mapper(User, users)

    first = Session()
    user = User(name='u1')
    first.add(user)
    first.commit()

    # while the first session is still alive, a second session
    # refuses the object
    second = Session()
    assert_raises_message(
        sa.exc.InvalidRequestError,
        r".*is already attached to session",
        second.add, user
    )

    # garbage collect the first session
    del first
    gc_collect()

    # the second session accepts the object now, although the object
    # still has a session key
    second.add(user)
    assert user in second
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:28,代码来源:test_session.py
示例3: _test_overflow
def _test_overflow(self, thread_count, max_overflow):
    """Hit a QueuePool from ``thread_count`` threads and assert that the
    overflow observed by any thread never exceeds ``max_overflow``."""
    gc_collect()

    dbapi = MockDBAPI()

    def creator():
        # delay every new connection by 50ms
        time.sleep(.05)
        return dbapi.connect()

    queue_pool = pool.QueuePool(
        creator=creator,
        pool_size=3,
        timeout=2,
        max_overflow=max_overflow)

    # overflow values recorded by the worker threads
    peaks = []

    def whammy():
        for _ in range(10):
            try:
                con = queue_pool.connect()
                time.sleep(.005)
                peaks.append(queue_pool.overflow())
                con.close()
                del con
            except tsa.exc.TimeoutError:
                # a checkout that times out is simply skipped
                pass

    workers = []
    for _ in xrange(thread_count):
        worker = threading.Thread(target=whammy)
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    self.assert_(max(peaks) <= max_overflow)

    lazy_gc()
    # module-level pool._refs must be empty once everything is released
    assert not pool._refs
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:34,代码来源:test_pool.py
示例4: all
def all():
setup()
try:
t, t2 = 0, 0
def usage(label):
now = resource.getrusage(resource.RUSAGE_SELF)
print "%s: %0.3fs real, %0.3fs user, %0.3fs sys" % (
label, t2 - t,
now.ru_utime - usage.last.ru_utime,
now.ru_stime - usage.last.ru_stime)
usage.snap(now)
usage.snap = lambda stats=None: setattr(
usage, 'last', stats or resource.getrusage(resource.RUSAGE_SELF))
session = create_session()
gc_collect()
usage.snap()
t = time.clock()
people = orm_select(session)
t2 = time.clock()
usage('load objects')
gc_collect()
usage.snap()
t = time.clock()
update_and_flush(session, people)
t2 = time.clock()
usage('update and flush')
finally:
metadata.drop_all()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:31,代码来源:objupdatespeed.py
示例5: test_autoflush_expressions
def test_autoflush_expressions(self):
    """A filter expression that depends on object state is evaluated
    after the session autoflushes, and the resulting query still works
    once the object it was built from has been garbage collected.

    This exercises the lambda inside of strategies.py lazy_clause.
    """
    users = self.tables.users
    Address = self.classes.Address
    addresses = self.tables.addresses
    User = self.classes.User

    mapper(
        User,
        users,
        properties={
            "addresses": relationship(Address, backref="user"),
        },
    )
    mapper(Address, addresses)

    sess = create_session(autoflush=True, autocommit=False)
    u = User(name="ed", addresses=[Address(email_address="foo")])
    sess.add(u)
    eq_(
        sess.query(Address).filter(Address.user == u).one(),
        Address(email_address="foo"),
    )

    # still works after "u" is garbage collected
    sess.commit()
    sess.close()
    u = sess.query(User).get(u.id)
    query = sess.query(Address).filter(Address.user == u)
    del u
    gc_collect()
    eq_(query.one(), Address(email_address="foo"))
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:30,代码来源:test_session.py
示例6: test_weak_ref_pickled
def test_weak_ref_pickled(self):
    """A modified object that went through a pickle round trip stays in
    the session while dirty, and is released after the flush."""
    users = self.tables.users
    User = pickleable.User

    sess = create_session()
    mapper(User, users)

    sess.add(User(name="ed"))
    sess.flush()
    assert not sess.dirty

    loaded = sess.query(User).one()
    loaded.name = "fred"
    sess.expunge(loaded)

    # NOTE(review): pickle.loads is only acceptable here because the
    # payload was produced by this test one call earlier
    restored = pickle.loads(pickle.dumps(loaded))

    del loaded
    sess.add(restored)

    # no local reference to the re-added object remains
    del restored
    gc_collect()

    assert len(sess.identity_map) == 1
    assert len(sess.dirty) == 1
    assert None not in sess.dirty

    sess.flush()
    gc_collect()
    assert not sess.dirty
    assert not sess.identity_map
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:30,代码来源:test_session.py
示例7: test_expire_all
def test_expire_all(self):
    """After expire_all() the unreferenced Address objects can be garbage
    collected, and re-running the query loads them again."""
    users, Address, addresses, User = (
        self.tables.users,
        self.classes.Address,
        self.tables.addresses,
        self.classes.User,
    )
    mapper(
        User,
        users,
        properties={
            "addresses": relationship(
                Address,
                backref="user",
                lazy="joined",
                order_by=addresses.c.id,
            )
        },
    )
    mapper(Address, addresses)

    sess = create_session()
    userlist = sess.query(User).order_by(User.id).all()
    assert self.static.user_address_result == userlist
    assert len(list(sess)) == 9

    sess.expire_all()
    gc_collect()
    assert len(list(sess)) == 4  # since addresses were gc'ed

    # querying again brings the collected objects back; the former
    # ``u = userlist[1]`` assignment was never read and has been dropped
    userlist = sess.query(User).order_by(User.id).all()
    eq_(self.static.user_address_result, userlist)
    assert len(list(sess)) == 9
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_expire.py
示例8: test_weakref_with_cycles_o2o
def test_weakref_with_cycles_o2o(self):
    """Objects joined by a one-to-one reference cycle leave the identity
    map when unreferenced and clean, but remain in it after one of them
    has been modified, until the change is committed."""
    Address = self.classes.Address
    addresses = self.tables.addresses
    users = self.tables.users
    User = self.classes.User

    sess = sessionmaker()()
    mapper(
        User,
        users,
        properties={
            "address": relationship(Address, backref="user", uselist=False),
        },
    )
    mapper(Address, addresses)

    sess.add(User(name="ed", address=Address(email_address="ed1")))
    sess.commit()

    user = sess.query(User).options(joinedload(User.address)).one()
    # bare attribute access: touches the backref side of the cycle
    user.address.user
    eq_(user, User(name="ed", address=Address(email_address="ed1")))

    del user
    gc_collect()
    assert len(sess.identity_map) == 0

    user = sess.query(User).options(joinedload(User.address)).one()
    user.address.email_address = "ed2"
    user.address.user  # lazyload

    del user
    gc_collect()
    # both objects are still present after the modification above
    assert len(sess.identity_map) == 2

    sess.commit()
    user = sess.query(User).options(joinedload(User.address)).one()
    eq_(user, User(name="ed", address=Address(email_address="ed2")))
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:33,代码来源:test_session.py
示例9: profile
def profile(*args):
gc_collect()
samples = [0 for x in range(0, 50)]
for x in range(0, 50):
func(*args)
gc_collect()
samples[x] = len(gc.get_objects())
print "sample gc sizes:", samples
assert len(_sessions) == 0
for x in samples[-4:]:
if x != samples[-5]:
flatline = False
break
else:
flatline = True
# object count is bigger than when it started
if not flatline and samples[-1] > samples[0]:
for x in samples[1:-2]:
# see if a spike bigger than the endpoint exists
if x > samples[-1]:
break
else:
assert False, repr(samples) + " " + repr(flatline)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:27,代码来源:test_memusage.py
示例10: go
def go():
    """Commit twelve Foo objects, drop the references, and check that
    the session's identity map and its mutable-attribute tracking are
    both empty after garbage collection."""
    # one Foo per letter 'a'..'l', each wrapping a single-key dict
    obj = [Foo({letter: 1}) for letter in 'abcdefghijkl']

    session.add_all(obj)
    session.commit()

    testing.eq_(len(session.identity_map._mutable_attrs), 12)
    testing.eq_(len(session.identity_map), 12)

    # release the list holding the twelve objects
    obj = None
    gc_collect()

    testing.eq_(len(session.identity_map._mutable_attrs), 0)
    testing.eq_(len(session.identity_map), 0)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:25,代码来源:test_memusage.py
示例11: test_invalidate_trans
def test_invalidate_trans(self):
conn = db.connect()
trans = conn.begin()
dbapi.shutdown()
try:
conn.execute(select([1]))
assert False
except tsa.exc.DBAPIError:
pass
# assert was invalidated
gc_collect()
assert len(dbapi.connections) == 0
assert not conn.closed
assert conn.invalidated
assert trans.is_active
assert_raises_message(
tsa.exc.StatementError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert trans.is_active
try:
trans.commit()
assert False
except tsa.exc.InvalidRequestError, e:
assert str(e) == "Can't reconnect until invalid transaction is " "rolled back"
开发者ID:onetera,项目名称:scandatatransfer,代码行数:29,代码来源:test_reconnect.py
示例12: test_conn_reusable
def test_conn_reusable(self):
    """An invalidated connection stays open and reconnects on its next
    use."""
    conn = db.connect()
    conn.execute(select([1]))
    assert len(dbapi.connections) == 1

    dbapi.shutdown()

    # executing against the shut-down DBAPI raises
    try:
        conn.execute(select([1]))
    except tsa.exc.DBAPIError:
        pass
    else:
        assert False

    assert not conn.closed
    assert conn.invalidated

    # ensure all connections closed (pool was recycled)
    gc_collect()
    assert len(dbapi.connections) == 0

    # the same Connection object reconnects when used again
    conn.execute(select([1]))
    assert not conn.invalidated
    assert len(dbapi.connections) == 1
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_reconnect.py
示例13: test_copy
def test_copy(self):
    """A copy.copy() of a Parent keeps its ``kids`` collection intact
    after the original Parent has been garbage collected."""
    mapper(Parent, self.parents,
           properties=dict(children=relationship(Child)))
    mapper(Child, self.children)

    p = Parent('p1')
    p.kids.extend(['c1', 'c2'])
    p_copy = copy.copy(p)

    # discard the original; only the copy is left
    del p
    gc_collect()

    # The failure message must read from the copy: ``p`` was deleted
    # above, so referencing it here would raise NameError and hide the
    # real assertion failure.
    assert set(p_copy.kids) == set(['c1', 'c2']), p_copy.kids
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:10,代码来源:test_associationproxy.py
示例14: test_subclass
def test_subclass(self):
    """A subclass of Target that dispatched an event no longer appears in
    ``Target.__subclasses__()`` once it is deleted and collected."""
    class SubTarget(self.Target):
        pass

    instance = SubTarget()
    instance.dispatch.some_event(1, 2)

    # remove the instance first, then the class itself
    del instance
    del SubTarget
    gc_collect()

    eq_(self.Target.__subclasses__(), [])
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:10,代码来源:test_events.py
示例15: test_weak_single
def test_weak_single(self):
    """Deleting one item from the fixture data removes that item from
    the weak map and its ``by_id`` index after garbage collection."""
    data, wim = self._fixture()
    assert len(data) == len(wim) == len(wim.by_id)

    # remember the identity of the item about to be removed
    removed_id = id(data[0])
    del data[0]
    gc_collect()

    assert len(data) == len(wim) == len(wim.by_id)
    assert removed_id not in wim.by_id
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_utils.py
示例16: test_weak_clear
def test_weak_clear(self):
    """Emptying the fixture data empties the weak map, its ``by_id``
    index and its ``_weakrefs`` after garbage collection."""
    data, wim = self._fixture()
    assert len(data) == len(wim) == len(wim.by_id)

    # empty the list in place so every item loses its reference
    del data[:]
    gc_collect()

    eq_(wim, {})
    eq_(wim.by_id, {})
    eq_(wim._weakrefs, {})
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_utils.py
示例17: _profile_cProfile
def _profile_cProfile(filename, fn, *args, **kw):
    """Run ``fn(*args, **kw)`` under cProfile, writing stats to ``filename``.

    Returns a 3-tuple ``(elapsed, load_stats, result)``: ``elapsed`` is
    the wall-clock duration in seconds measured with ``time.time()``,
    ``load_stats`` is a zero-argument callable returning
    ``pstats.Stats(filename)``, and ``result`` is what ``fn`` returned.
    """
    import cProfile
    import pstats
    import time

    load_stats = lambda: pstats.Stats(filename)

    # Run the profiled statement in an explicit namespace. Reading
    # ``result`` back through a second ``locals()`` call depends on
    # ``locals()`` returning the same dict each time, which no longer
    # holds on Python 3.13+ (PEP 667) and would raise KeyError there.
    namespace = {'fn': fn, 'args': args, 'kw': kw}

    gc_collect()
    began = time.time()
    cProfile.runctx('result = fn(*args, **kw)', globals(), namespace,
                    filename=filename)
    ended = time.time()

    return ended - began, load_stats, namespace['result']
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:12,代码来源:profiling.py
示例18: test_stale_state_positive_gc
def test_stale_state_positive_gc(self):
    """After the fixture parent is expunged and garbage collected,
    removing the child from a freshly loaded parent passes
    ``_assert_not_hasparent``."""
    User = self.classes.User
    sess, parent, child = self._fixture()

    # detach the original parent and let it be collected
    sess.expunge(parent)
    del parent
    gc_collect()

    # load a parent again and remove the child from its collection
    parent = sess.query(User).first()
    parent.addresses.remove(child)

    self._assert_not_hasparent(child)
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:12,代码来源:test_hasparent.py
示例19: test_reconnect
def test_reconnect(self):
    """An 'is_disconnect' condition invalidates the connection and
    replaces the engine's connection pool with a new one."""
    original_pool_id = id(db.pool)

    # make a connection and confirm that it works
    conn = db.connect()
    conn.execute(select([1]))

    # create a second connection within the pool, which we'll ensure
    # also goes away
    second = db.connect()
    second.close()

    # two connections opened total now
    assert len(dbapi.connections) == 2

    # set it to fail
    dbapi.shutdown()
    try:
        conn.execute(select([1]))
    except tsa.exc.DBAPIError:
        pass
    else:
        assert False

    # assert was invalidated
    assert not conn.closed
    assert conn.invalidated

    # close shouldn't break
    conn.close()

    # the engine now refers to a different pool object
    assert id(db.pool) != original_pool_id

    # ensure all connections closed (pool was recycled)
    gc_collect()
    assert len(dbapi.connections) == 0

    # a new connection can be opened and used
    conn = db.connect()
    conn.execute(select([1]))
    conn.close()
    assert len(dbapi.connections) == 1
开发者ID:onetera,项目名称:scandatatransfer,代码行数:52,代码来源:test_reconnect.py
示例20: test_weakref_kaboom
def test_weakref_kaboom(self):
    """Dropping two checked-out connections (one closed explicitly, one
    only dereferenced) leaves nothing checked out, and the pool still
    hands out a connection afterwards."""
    p = self._queuepool_fixture(
        pool_size=3,
        max_overflow=-1,
        use_threadlocal=True)

    c1 = p.connect()
    c2 = p.connect()

    # c1 is closed explicitly; c2 is only dereferenced
    c1.close()
    c2 = None

    del c1
    del c2
    gc_collect()

    assert p.checkedout() == 0

    c3 = p.connect()
    assert c3 is not None
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:13,代码来源:test_pool.py
注：本文中的 test.lib.util.gc_collect 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论