• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python engines.testing_engine函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.lib.engines.testing_engine函数的典型用法代码示例。如果您正苦于以下问题:Python testing_engine函数的具体用法?Python testing_engine怎么用?Python testing_engine使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了testing_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_engine_param_stays

    def test_engine_param_stays(self):

        eng = testing_engine()
        isolation_level = eng.dialect.get_isolation_level(eng.connect().connection)
        level = self._non_default_isolation_level()

        ne_(isolation_level, level)

        eng = testing_engine(options=dict(isolation_level=level))
        eq_(
            eng.dialect.get_isolation_level(eng.connect().connection),
            level
        )

        # check that it stays
        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            level
        )
        conn.close()

        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            level
        )
        conn.close()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:28,代码来源:test_transaction.py


示例2: test_bind_arguments

    def test_bind_arguments(self):
        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User,
        )

        mapper(User, users)
        mapper(Address, addresses)

        e1 = engines.testing_engine()
        e2 = engines.testing_engine()
        e3 = engines.testing_engine()

        sess = Session(e3)
        sess.bind_mapper(User, e1)
        sess.bind_mapper(Address, e2)

        assert sess.connection().engine is e3
        assert sess.connection(bind=e1).engine is e1
        assert sess.connection(mapper=Address, bind=e1).engine is e1
        assert sess.connection(mapper=Address).engine is e2
        assert sess.connection(clause=addresses.select()).engine is e2
        assert sess.connection(mapper=User, clause=addresses.select()).engine is e1
        assert sess.connection(mapper=User, clause=addresses.select(), bind=e2).engine is e2

        sess.close()
开发者ID:vishvananda,项目名称:sqlalchemy,代码行数:28,代码来源:test_session.py


示例3: test_native_odbc_execute

    def test_native_odbc_execute(self):
        t1 = Table('t1', MetaData(), Column('c1', Integer))
        dbapi = MockDBAPI()
        engine = engines.testing_engine('mssql+mxodbc://localhost',
                options={'module': dbapi, '_initialize': False})
        conn = engine.connect()

        # crud: uses execute

        conn.execute(t1.insert().values(c1='foo'))
        conn.execute(t1.delete().where(t1.c.c1 == 'foo'))
        conn.execute(t1.update().where(t1.c.c1 == 'foo').values(c1='bar'
                     ))

        # select: uses executedirect

        conn.execute(t1.select())

        # manual flagging

        conn.execution_options(native_odbc_execute=True).\
                execute(t1.select())
        conn.execution_options(native_odbc_execute=False).\
                execute(t1.insert().values(c1='foo'
                ))
        eq_(dbapi.log, [
            'execute',
            'execute',
            'execute',
            'executedirect',
            'execute',
            'executedirect',
            ])
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:33,代码来源:test_mxodbc.py


示例4: test_per_connection

    def test_per_connection(self):
        from sqlalchemy.pool import QueuePool
        eng = testing_engine(options=dict(
                                poolclass=QueuePool,
                                pool_size=2, max_overflow=0))

        c1 = eng.connect()
        c1 = c1.execution_options(
                    isolation_level=self._non_default_isolation_level()
                )
        c2 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c1.connection),
            self._non_default_isolation_level()
        )
        eq_(
            eng.dialect.get_isolation_level(c2.connection),
            self._default_isolation_level()
        )
        c1.close()
        c2.close()
        c3 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c3.connection),
            self._default_isolation_level()
        )
        c4 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c4.connection),
            self._default_isolation_level()
        )

        c3.close()
        c4.close()
开发者ID:NoNo1234,项目名称:the_walking_project,代码行数:34,代码来源:test_transaction.py


示例5: go

        def go():
            engine = engines.testing_engine(
                                options={'logging_name':'FOO',
                                        'pool_logging_name':'BAR',
                                        'use_reaper':False}
                                    )
            sess = create_session(bind=engine)

            a1 = A(col2="a1")
            a2 = A(col2="a2")
            a3 = A(col2="a3")
            a1.bs.append(B(col2="b1"))
            a1.bs.append(B(col2="b2"))
            a3.bs.append(B(col2="b3"))
            for x in [a1,a2,a3]:
                sess.add(x)
            sess.flush()
            sess.expunge_all()

            alist = sess.query(A).all()
            eq_(
                [
                    A(col2="a1", bs=[B(col2="b1"), B(col2="b2")]),
                    A(col2="a2", bs=[]),
                    A(col2="a3", bs=[B(col2="b3")])
                ],
                alist)

            for a in alist:
                sess.delete(a)
            sess.flush()
            sess.close()
            engine.dispose()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:33,代码来源:test_memusage.py


示例6: test_retval_flag

    def test_retval_flag(self):
        canary = []
        def tracker(name):
            def go(conn, *args, **kw):
                canary.append(name)
            return go

        def execute(conn, clauseelement, multiparams, params):
            canary.append('execute')
            return clauseelement, multiparams, params

        def cursor_execute(conn, cursor, statement, 
                        parameters, context, executemany):
            canary.append('cursor_execute')
            return statement, parameters

        engine = engines.testing_engine()

        assert_raises(
            tsa.exc.ArgumentError,
            event.listen, engine, "begin", tracker("begin"), retval=True
        )

        event.listen(engine, "before_execute", execute, retval=True)
        event.listen(engine, "before_cursor_execute", cursor_execute, retval=True)
        engine.execute(select([1]))
        eq_(
            canary, ['execute', 'cursor_execute']
        )
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:29,代码来源:test_execute.py


示例7: test_transactional

    def test_transactional(self):
        canary = []
        def tracker(name):
            def go(conn, *args, **kw):
                canary.append(name)
            return go

        engine = engines.testing_engine()
        event.listen(engine, 'before_execute', tracker('execute'))
        event.listen(engine, 'before_cursor_execute', tracker('cursor_execute'))
        event.listen(engine, 'begin', tracker('begin'))
        event.listen(engine, 'commit', tracker('commit'))
        event.listen(engine, 'rollback', tracker('rollback'))

        conn = engine.connect()
        trans = conn.begin()
        conn.execute(select([1]))
        trans.rollback()
        trans = conn.begin()
        conn.execute(select([1]))
        trans.commit()

        eq_(canary, [
            'begin', 'execute', 'cursor_execute', 'rollback',
            'begin', 'execute', 'cursor_execute', 'commit',
            ])
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:26,代码来源:test_execute.py


示例8: test_explode_in_initializer

    def test_explode_in_initializer(self):
        engine = engines.testing_engine()

        def broken_initialize(connection):
            connection.execute("select fake_stuff from _fake_table")

        engine.dialect.initialize = broken_initialize

        # raises a DBAPIError, not an AttributeError
        assert_raises(exc.DBAPIError, engine.connect)

        # dispose connections so we get a new one on
        # next go
        engine.dispose()

        p1 = engine.pool

        def is_disconnect(e, conn, cursor):
            return True

        engine.dialect.is_disconnect = is_disconnect

        # invalidate() also doesn't screw up
        assert_raises(exc.DBAPIError, engine.connect)

        # pool was recreated
        assert engine.pool is not p1
开发者ID:onetera,项目名称:scandatatransfer,代码行数:27,代码来源:test_reconnect.py


示例9: test_twophase

    def test_twophase(self):
        users, Address, addresses, User = (self.tables.users,
                                self.classes.Address,
                                self.tables.addresses,
                                self.classes.User)

        # TODO: mock up a failure condition here
        # to ensure a rollback succeeds
        mapper(User, users)
        mapper(Address, addresses)

        engine2 = engines.testing_engine()
        sess = create_session(autocommit=True, autoflush=False,
                            twophase=True)
        sess.bind_mapper(User, testing.db)
        sess.bind_mapper(Address, engine2)
        sess.begin()
        u1 = User(name='u1')
        a1 = Address(email_address='[email protected]')
        sess.add_all((u1, a1))
        sess.commit()
        sess.close()
        engine2.dispose()
        assert users.count().scalar() == 1
        assert addresses.count().scalar() == 1
开发者ID:ioram7,项目名称:keystone-federado-pgid2013,代码行数:25,代码来源:test_transaction.py


示例10: test_levels

    def test_levels(self):
        e1 = engines.testing_engine()

        eq_(e1._should_log_info(), False)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), False)
        eq_(e1.logger.getEffectiveLevel(), logging.WARN)

        e1.echo = True
        eq_(e1._should_log_info(), True)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), True)
        eq_(e1.logger.getEffectiveLevel(), logging.INFO)

        e1.echo = 'debug'
        eq_(e1._should_log_info(), True)
        eq_(e1._should_log_debug(), True)
        eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
        eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)

        e1.echo = False
        eq_(e1._should_log_info(), False)
        eq_(e1._should_log_debug(), False)
        eq_(e1.logger.isEnabledFor(logging.INFO), False)
        eq_(e1.logger.getEffectiveLevel(), logging.WARN)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:25,代码来源:test_execute.py


示例11: test_transactional_advanced

    def test_transactional_advanced(self):
        canary = []
        def tracker(name):
            def go(*args, **kw):
                canary.append(name)
            return go

        engine = engines.testing_engine()
        for name in ['begin', 'savepoint', 
                    'rollback_savepoint', 'release_savepoint',
                    'rollback', 'begin_twophase', 
                       'prepare_twophase', 'commit_twophase']:
            event.listen(engine, '%s' % name, tracker(name))

        conn = engine.connect()

        trans = conn.begin()
        trans2 = conn.begin_nested()
        conn.execute(select([1]))
        trans2.rollback()
        trans2 = conn.begin_nested()
        conn.execute(select([1]))
        trans2.commit()
        trans.rollback()

        trans = conn.begin_twophase()
        conn.execute(select([1]))
        trans.prepare()
        trans.commit()

        eq_(canary, ['begin', 'savepoint', 
                    'rollback_savepoint', 'savepoint', 'release_savepoint',
                    'rollback', 'begin_twophase', 
                       'prepare_twophase', 'commit_twophase']
        )
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:35,代码来源:test_execute.py


示例12: test_ad_hoc_types

    def test_ad_hoc_types(self):
        """test storage of bind processors, result processors
        in dialect-wide registry."""

        from sqlalchemy.dialects import mysql, postgresql, sqlite
        from sqlalchemy import types

        eng = engines.testing_engine()
        for args in (
            (types.Integer, ),
            (types.String, ),
            (types.PickleType, ),
            (types.Enum, 'a', 'b', 'c'),
            (sqlite.DATETIME, ),
            (postgresql.ENUM, 'a', 'b', 'c'),
            (types.Interval, ),
            (postgresql.INTERVAL, ),
            (mysql.VARCHAR, ),
        ):
            @profile_memory
            def go():
                type_ = args[0](*args[1:])
                bp = type_._cached_bind_processor(eng.dialect)
                rp = type_._cached_result_processor(eng.dialect, 0)
            go()

        assert not eng.dialect._type_memos
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:27,代码来源:test_memusage.py


示例13: _named_engine

 def _named_engine(self, **kw):
     options = {
         'logging_name':'myenginename',
         'pool_logging_name':'mypoolname'
     }
     options.update(kw)
     return engines.testing_engine(options=options)
开发者ID:onetera,项目名称:scandatatransfer,代码行数:7,代码来源:test_execute.py


示例14: setUp

    def setUp(self):
        global db1, db2, db3, db4, weather_locations, weather_reports

        try:
            db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True))
        except ImportError:
            raise SkipTest('Requires sqlite')
        db2 = testing_engine('sqlite:///shard2.db')
        db3 = testing_engine('sqlite:///shard3.db')
        db4 = testing_engine('sqlite:///shard4.db')

        meta = MetaData()
        ids = Table('ids', meta,
            Column('nextid', Integer, nullable=False))

        def id_generator(ctx):
            # in reality, might want to use a separate transaction for this.

            c = db1.contextual_connect()
            nextid = c.execute(ids.select(for_update=True)).scalar()
            c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1}))
            return nextid

        weather_locations = Table("weather_locations", meta,
                Column('id', Integer, primary_key=True, default=id_generator),
                Column('continent', String(30), nullable=False),
                Column('city', String(50), nullable=False)
            )

        weather_reports = Table(
            'weather_reports',
            meta,
            Column('id', Integer, primary_key=True),
            Column('location_id', Integer,
                   ForeignKey('weather_locations.id')),
            Column('temperature', Float),
            Column('report_time', DateTime,
                   default=datetime.datetime.now),
            )

        for db in (db1, db2, db3, db4):
            meta.create_all(db)

        db1.execute(ids.insert(), nextid=1)

        self.setup_session()
        self.setup_mappers()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:47,代码来源:test_horizontal_shard.py


示例15: test_invalid_level

 def test_invalid_level(self):
     eng = testing_engine(options=dict(isolation_level='FOO'))
     assert_raises_message(
         exc.ArgumentError, 
             "Invalid value '%s' for isolation_level. "
             "Valid isolation levels for %s are %s" % 
             ("FOO", eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
         eng.connect)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:8,代码来源:test_transaction.py


示例16: test_per_engine_independence

    def test_per_engine_independence(self):
        e1 = testing_engine(config.db_url)
        e2 = testing_engine(config.db_url)

        canary = []
        def before_exec(conn, stmt, *arg):
            canary.append(stmt)
        event.listen(e1, "before_execute", before_exec)
        s1 = select([1])
        s2 = select([2])
        e1.execute(s1)
        e2.execute(s2)
        eq_(canary, [s1])
        event.listen(e2, "before_execute", before_exec)
        e1.execute(s1)
        e2.execute(s2)
        eq_(canary, [s1, s1, s2])
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:17,代码来源:test_execute.py


示例17: setup_class

 def setup_class(cls):
     global users, metadata, tlengine
     tlengine = testing_engine(options=dict(strategy='threadlocal'))
     metadata = MetaData()
     users = Table('query_users', metadata, Column('user_id', INT,
                   Sequence('query_users_id_seq', optional=True),
                   primary_key=True), Column('user_name',
                   VARCHAR(20)), test_needs_acid=True)
     metadata.create_all(tlengine)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_transaction.py


示例18: testing_engine

    def testing_engine(self):
        e = engines.testing_engine()

        # do an initial execute to clear out 'first connect'
        # messages
        e.execute(select([10])).close()
        self.buf.flush()

        return e
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_execute.py


示例19: setup

 def setup(self):
     self.eng = engines.testing_engine(options={'echo':True})
     self.eng.execute("create table foo (data string)")
     self.buf = logging.handlers.BufferingHandler(100)
     for log in [
         logging.getLogger('sqlalchemy.engine'),
         logging.getLogger('sqlalchemy.pool')
     ]:
         log.addHandler(self.buf)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:9,代码来源:test_execute.py


示例20: test_prepare_no_trans

    def test_prepare_no_trans(self):
        tlengine = testing_engine(options=dict(strategy="threadlocal"))

        # shouldn't fail
        tlengine.prepare()

        tlengine.begin()
        tlengine.rollback()

        # shouldn't fail
        tlengine.prepare()
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:11,代码来源:test_transaction.py



注:本文中的test.lib.engines.testing_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python schema.Table类代码示例发布时间:2022-05-27
下一篇:
Python hquery_test_util.query_html_doc函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap