本文整理汇总了Python中test.lib.engines.testing_engine函数的典型用法代码示例。如果您正苦于以下问题：Python testing_engine函数的具体用法是什么？Python testing_engine怎么用？有哪些Python testing_engine的使用例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了testing_engine函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_engine_param_stays
def test_engine_param_stays(self):
    """Check that an engine-level ``isolation_level`` option holds across connections.

    A default engine must not already report the non-default level; an
    engine created with that level must report it on every new connection.
    """
    default_engine = testing_engine()
    starting_level = default_engine.dialect.get_isolation_level(
        default_engine.connect().connection
    )
    target_level = self._non_default_isolation_level()
    ne_(starting_level, target_level)

    configured = testing_engine(options={"isolation_level": target_level})
    eq_(
        configured.dialect.get_isolation_level(
            configured.connect().connection
        ),
        target_level,
    )

    # check that it stays: open, verify and close two further connections
    for _ in range(2):
        conn = configured.connect()
        eq_(
            configured.dialect.get_isolation_level(conn.connection),
            target_level,
        )
        conn.close()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:28,代码来源:test_transaction.py
示例2: test_bind_arguments
def test_bind_arguments(self):
    """Check which engine ``Session.connection()`` picks for bind/mapper/clause arguments."""
    users = self.tables.users
    Address = self.classes.Address
    addresses = self.tables.addresses
    User = self.classes.User

    mapper(User, users)
    mapper(Address, addresses)

    user_engine = engines.testing_engine()
    address_engine = engines.testing_engine()
    session_engine = engines.testing_engine()

    sess = Session(session_engine)
    sess.bind_mapper(User, user_engine)
    sess.bind_mapper(Address, address_engine)

    # no arguments: the session's own bind
    assert sess.connection().engine is session_engine
    # an explicit bind wins, with or without a mapper
    assert sess.connection(bind=user_engine).engine is user_engine
    assert (
        sess.connection(mapper=Address, bind=user_engine).engine
        is user_engine
    )
    # mapper or clause alone resolves through the mapper bindings
    assert sess.connection(mapper=Address).engine is address_engine
    assert (
        sess.connection(clause=addresses.select()).engine is address_engine
    )
    # mapper takes precedence over clause; bind takes precedence over both
    assert (
        sess.connection(mapper=User, clause=addresses.select()).engine
        is user_engine
    )
    assert (
        sess.connection(
            mapper=User, clause=addresses.select(), bind=address_engine
        ).engine
        is address_engine
    )

    sess.close()
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:28,代码来源:test_session.py
示例3: test_native_odbc_execute
def test_native_odbc_execute(self):
    """Check which DBAPI cursor method the mxodbc dialect logs per statement."""
    table = Table('t1', MetaData(), Column('c1', Integer))
    mock_dbapi = MockDBAPI()
    engine = engines.testing_engine(
        'mssql+mxodbc://localhost',
        options={'module': mock_dbapi, '_initialize': False},
    )
    conn = engine.connect()

    # crud: uses execute
    conn.execute(table.insert().values(c1='foo'))
    conn.execute(table.delete().where(table.c.c1 == 'foo'))
    conn.execute(table.update().where(table.c.c1 == 'foo').values(c1='bar'))

    # select: uses executedirect
    conn.execute(table.select())

    # manual flagging
    native_conn = conn.execution_options(native_odbc_execute=True)
    native_conn.execute(table.select())
    non_native_conn = conn.execution_options(native_odbc_execute=False)
    non_native_conn.execute(table.insert().values(c1='foo'))

    expected_log = [
        'execute',
        'execute',
        'execute',
        'executedirect',
        'execute',
        'executedirect',
    ]
    eq_(mock_dbapi.log, expected_log)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:33,代码来源:test_mxodbc.py
示例4: test_per_connection
def test_per_connection(self):
    """Check that a per-connection isolation level does not stick to the pool.

    One connection gets a non-default level via execution options; a second
    connection, and connections opened after both are closed, must report
    the default level.
    """
    from sqlalchemy.pool import QueuePool

    eng = testing_engine(
        options={"poolclass": QueuePool, "pool_size": 2, "max_overflow": 0}
    )

    def level_of(connection):
        # isolation level as reported by the dialect for this connection
        return eng.dialect.get_isolation_level(connection.connection)

    c1 = eng.connect().execution_options(
        isolation_level=self._non_default_isolation_level()
    )
    c2 = eng.connect()
    eq_(level_of(c1), self._non_default_isolation_level())
    eq_(level_of(c2), self._default_isolation_level())

    c1.close()
    c2.close()

    # connect twice more and verify each before closing either
    reopened = []
    for _ in range(2):
        conn = eng.connect()
        eq_(level_of(conn), self._default_isolation_level())
        reopened.append(conn)

    for conn in reopened:
        conn.close()
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:34,代码来源:test_transaction.py
示例5: go
def go():
    """Round-trip a small A/B object graph through a fresh engine, then dispose it."""
    engine = engines.testing_engine(
        options={
            'logging_name': 'FOO',
            'pool_logging_name': 'BAR',
            'use_reaper': False,
        }
    )
    sess = create_session(bind=engine)

    a1 = A(col2="a1")
    a2 = A(col2="a2")
    a3 = A(col2="a3")
    a1.bs.append(B(col2="b1"))
    a1.bs.append(B(col2="b2"))
    a3.bs.append(B(col2="b3"))

    # persist, then clear the session so the query below reloads everything
    sess.add_all([a1, a2, a3])
    sess.flush()
    sess.expunge_all()

    loaded = sess.query(A).all()
    expected = [
        A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
        A(col2="a2", bs=[]),
        A(col2="a3", bs=[B(col2="b3")]),
    ]
    eq_(expected, loaded)

    for obj in loaded:
        sess.delete(obj)
    sess.flush()
    sess.close()
    engine.dispose()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:33,代码来源:test_memusage.py
示例6: test_retval_flag
def test_retval_flag(self):
    """Check ``retval=True`` listeners: rejected for "begin", accepted for execute hooks."""
    calls = []

    def tracker(name):
        def record(conn, *args, **kw):
            calls.append(name)

        return record

    def on_before_execute(conn, clauseelement, multiparams, params):
        calls.append('execute')
        return clauseelement, multiparams, params

    def on_before_cursor_execute(
        conn, cursor, statement, parameters, context, executemany
    ):
        calls.append('cursor_execute')
        return statement, parameters

    engine = engines.testing_engine()

    # "begin" listened with retval=True must raise ArgumentError
    assert_raises(
        tsa.exc.ArgumentError,
        event.listen,
        engine,
        "begin",
        tracker("begin"),
        retval=True,
    )

    event.listen(engine, "before_execute", on_before_execute, retval=True)
    event.listen(
        engine,
        "before_cursor_execute",
        on_before_cursor_execute,
        retval=True,
    )

    engine.execute(select([1]))
    eq_(calls, ['execute', 'cursor_execute'])
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:29,代码来源:test_execute.py
示例7: test_transactional
def test_transactional(self):
    """Check the order of engine events across a rolled-back and a committed transaction."""
    seen = []

    def tracker(name):
        def record(conn, *args, **kw):
            seen.append(name)

        return record

    engine = engines.testing_engine()

    # (event name, label recorded when the event fires)
    listeners = (
        ('before_execute', 'execute'),
        ('before_cursor_execute', 'cursor_execute'),
        ('begin', 'begin'),
        ('commit', 'commit'),
        ('rollback', 'rollback'),
    )
    for event_name, label in listeners:
        event.listen(engine, event_name, tracker(label))

    conn = engine.connect()
    # first transaction is rolled back, second is committed
    for finisher in ('rollback', 'commit'):
        trans = conn.begin()
        conn.execute(select([1]))
        getattr(trans, finisher)()

    expected = [
        'begin', 'execute', 'cursor_execute', 'rollback',
        'begin', 'execute', 'cursor_execute', 'commit',
    ]
    eq_(seen, expected)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:26,代码来源:test_execute.py
示例8: test_explode_in_initializer
def test_explode_in_initializer(self):
    """Check that a failing dialect initializer surfaces as ``DBAPIError`` on connect."""
    engine = engines.testing_engine()

    def broken_initialize(connection):
        connection.execute("select fake_stuff from _fake_table")

    engine.dialect.initialize = broken_initialize

    # raises a DBAPIError, not an AttributeError
    assert_raises(exc.DBAPIError, engine.connect)

    # dispose connections so we get a new one on
    # next go
    engine.dispose()

    pool_before = engine.pool

    # treat every error as a disconnect
    engine.dialect.is_disconnect = lambda e, conn, cursor: True

    # invalidate() also doesn't screw up
    assert_raises(exc.DBAPIError, engine.connect)

    # pool was recreated
    assert engine.pool is not pool_before
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_reconnect.py
示例9: test_twophase
def test_twophase(self):
    """Commit a two-phase session whose mappers are bound to two different engines."""
    users = self.tables.users
    Address = self.classes.Address
    addresses = self.tables.addresses
    User = self.classes.User

    # TODO: mock up a failure condition here
    # to ensure a rollback succeeds
    mapper(User, users)
    mapper(Address, addresses)

    second_engine = engines.testing_engine()
    sess = create_session(autocommit=True, autoflush=False, twophase=True)
    sess.bind_mapper(User, testing.db)
    sess.bind_mapper(Address, second_engine)

    sess.begin()
    user = User(name='u1')
    # NOTE(review): this address literal looks like an e-mail-obfuscation
    # placeholder; it is kept verbatim -- confirm the intended value.
    address = Address(email_address='[email protected]')
    sess.add_all((user, address))
    sess.commit()
    sess.close()
    second_engine.dispose()

    # one row must exist in each table after the commit
    assert users.count().scalar() == 1
    assert addresses.count().scalar() == 1
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:25,代码来源:test_transaction.py
示例10: test_levels
def test_levels(self):
    """Check that the engine's ``echo`` flag drives its logger level and log checks."""
    e1 = engines.testing_engine()

    def expect(log_info, log_debug, probe_level, probe_enabled, effective):
        # assert the four logging observations for the current echo setting
        eq_(e1._should_log_info(), log_info)
        eq_(e1._should_log_debug(), log_debug)
        eq_(e1.logger.isEnabledFor(probe_level), probe_enabled)
        eq_(e1.logger.getEffectiveLevel(), effective)

    # state of a freshly created engine
    expect(False, False, logging.INFO, False, logging.WARN)

    e1.echo = True
    expect(True, False, logging.INFO, True, logging.INFO)

    e1.echo = 'debug'
    expect(True, True, logging.DEBUG, True, logging.DEBUG)

    e1.echo = False
    expect(False, False, logging.INFO, False, logging.WARN)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:25,代码来源:test_execute.py
示例11: test_transactional_advanced
def test_transactional_advanced(self):
    """Check event order across nested (savepoint) and two-phase transactions."""
    seen = []

    def tracker(name):
        def record(*args, **kw):
            seen.append(name)

        return record

    engine = engines.testing_engine()

    tracked_events = [
        'begin', 'savepoint',
        'rollback_savepoint', 'release_savepoint',
        'rollback', 'begin_twophase',
        'prepare_twophase', 'commit_twophase',
    ]
    for name in tracked_events:
        event.listen(engine, '%s' % name, tracker(name))

    conn = engine.connect()

    # outer transaction with one rolled-back and one committed savepoint
    outer = conn.begin()
    nested = conn.begin_nested()
    conn.execute(select([1]))
    nested.rollback()
    nested = conn.begin_nested()
    conn.execute(select([1]))
    nested.commit()
    outer.rollback()

    # two-phase transaction: prepare, then commit
    twophase = conn.begin_twophase()
    conn.execute(select([1]))
    twophase.prepare()
    twophase.commit()

    expected = [
        'begin', 'savepoint',
        'rollback_savepoint', 'savepoint', 'release_savepoint',
        'rollback', 'begin_twophase',
        'prepare_twophase', 'commit_twophase',
    ]
    eq_(seen, expected)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:35,代码来源:test_execute.py
示例12: test_ad_hoc_types
def test_ad_hoc_types(self):
    """Test storage of bind processors and result processors
    in the dialect-wide registry."""
    from sqlalchemy.dialects import mysql, postgresql, sqlite
    from sqlalchemy import types

    eng = engines.testing_engine()

    # each entry is (type class, *constructor arguments)
    type_specs = (
        (types.Integer, ),
        (types.String, ),
        (types.PickleType, ),
        (types.Enum, 'a', 'b', 'c'),
        (sqlite.DATETIME, ),
        (postgresql.ENUM, 'a', 'b', 'c'),
        (types.Interval, ),
        (postgresql.INTERVAL, ),
        (mysql.VARCHAR, ),
    )
    for spec in type_specs:

        @profile_memory
        def go():
            type_cls, type_args = spec[0], spec[1:]
            type_ = type_cls(*type_args)
            bp = type_._cached_bind_processor(eng.dialect)
            rp = type_._cached_result_processor(eng.dialect, 0)

        # run immediately, so the closure sees the current ``spec``
        go()

    assert not eng.dialect._type_memos
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:27,代码来源:test_memusage.py
示例13: _named_engine
def _named_engine(self, **kw):
    """Return a testing engine with fixed logging names; ``kw`` adds or overrides options."""
    defaults = {
        'logging_name': 'myenginename',
        'pool_logging_name': 'mypoolname',
    }
    # caller-supplied keywords take precedence over the defaults
    return engines.testing_engine(options=dict(defaults, **kw))
开发者ID:onetera,项目名称:scandatatransfer,代码行数:7,代码来源:test_execute.py
示例14: setUp
def setUp(self):
    """Create four SQLite shard engines and the shared schema, then seed the id table.

    Engines and the two weather tables are published as module globals.
    Raises ``SkipTest`` if creating the first engine raises ``ImportError``.
    """
    global db1, db2, db3, db4, weather_locations, weather_reports

    try:
        db1 = testing_engine(
            'sqlite:///shard1.db', options={'pool_threadlocal': True}
        )
    except ImportError:
        raise SkipTest('Requires sqlite')
    db2 = testing_engine('sqlite:///shard2.db')
    db3 = testing_engine('sqlite:///shard3.db')
    db4 = testing_engine('sqlite:///shard4.db')

    metadata = MetaData()
    ids = Table('ids', metadata, Column('nextid', Integer, nullable=False))

    def id_generator(ctx):
        # in reality, might want to use a separate transaction for this.
        conn = db1.contextual_connect()
        nextid = conn.execute(ids.select(for_update=True)).scalar()
        conn.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1}))
        return nextid

    weather_locations = Table(
        "weather_locations",
        metadata,
        Column('id', Integer, primary_key=True, default=id_generator),
        Column('continent', String(30), nullable=False),
        Column('city', String(50), nullable=False),
    )

    weather_reports = Table(
        'weather_reports',
        metadata,
        Column('id', Integer, primary_key=True),
        Column('location_id', Integer, ForeignKey('weather_locations.id')),
        Column('temperature', Float),
        Column('report_time', DateTime, default=datetime.datetime.now),
    )

    # every shard gets the full schema
    for shard in (db1, db2, db3, db4):
        metadata.create_all(shard)

    # the id sequence table is seeded on the first shard only
    db1.execute(ids.insert(), nextid=1)

    self.setup_session()
    self.setup_mappers()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:47,代码来源:test_horizontal_shard.py
示例15: test_invalid_level
def test_invalid_level(self):
    """Check that connecting with an unknown isolation level raises ``ArgumentError``."""
    eng = testing_engine(options={"isolation_level": 'FOO'})
    expected_message = (
        "Invalid value '%s' for isolation_level. "
        "Valid isolation levels for %s are %s"
        % (
            "FOO",
            eng.dialect.name,
            ", ".join(eng.dialect._isolation_lookup),
        )
    )
    assert_raises_message(exc.ArgumentError, expected_message, eng.connect)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:8,代码来源:test_transaction.py
示例16: test_per_engine_independence
def test_per_engine_independence(self):
    """Check that a listener on one engine does not fire for another engine."""
    e1 = testing_engine(config.db_url)
    e2 = testing_engine(config.db_url)

    observed = []

    def before_exec(conn, stmt, *arg):
        observed.append(stmt)

    event.listen(e1, "before_execute", before_exec)

    s1 = select([1])
    s2 = select([2])

    # only e1 is listened to at this point
    e1.execute(s1)
    e2.execute(s2)
    eq_(observed, [s1])

    # once e2 is listened to as well, both executions are recorded
    event.listen(e2, "before_execute", before_exec)
    e1.execute(s1)
    e2.execute(s2)
    eq_(observed, [s1, s1, s2])
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:17,代码来源:test_execute.py
示例17: setup_class
def setup_class(cls):
    """Create the threadlocal engine and the ``query_users`` table as module globals."""
    global users, metadata, tlengine

    tlengine = testing_engine(options={'strategy': 'threadlocal'})
    metadata = MetaData()
    users = Table(
        'query_users',
        metadata,
        Column(
            'user_id',
            INT,
            Sequence('query_users_id_seq', optional=True),
            primary_key=True,
        ),
        Column('user_name', VARCHAR(20)),
        test_needs_acid=True,
    )
    metadata.create_all(tlengine)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_transaction.py
示例18: testing_engine
def testing_engine(self):
    """Return a testing engine after one warm-up execute and a flush of ``self.buf``."""
    engine = engines.testing_engine()

    # do an initial execute to clear out 'first connect'
    # messages
    warmup_result = engine.execute(select([10]))
    warmup_result.close()
    self.buf.flush()

    return engine
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_execute.py
示例19: setup
def setup(self):
    """Create an echoing engine with table ``foo`` and attach a buffering log handler."""
    self.eng = engines.testing_engine(options={'echo': True})
    self.eng.execute("create table foo (data string)")
    self.buf = logging.handlers.BufferingHandler(100)

    # capture records from both the engine and the pool loggers
    for logger_name in ('sqlalchemy.engine', 'sqlalchemy.pool'):
        logging.getLogger(logger_name).addHandler(self.buf)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_execute.py
示例20: test_prepare_no_trans
def test_prepare_no_trans(self):
    """Check that ``prepare()`` on a threadlocal engine works outside a transaction."""
    engine = testing_engine(options={"strategy": "threadlocal"})

    # shouldn't fail
    engine.prepare()

    engine.begin()
    engine.rollback()

    # shouldn't fail
    engine.prepare()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_transaction.py
注：本文中的test.lib.engines.testing_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论