• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python api._parse_db_str函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中trac.db.api._parse_db_str函数的典型用法代码示例。如果您正苦于以下问题:Python _parse_db_str函数的具体用法?Python _parse_db_str怎么用?Python _parse_db_str使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了_parse_db_str函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_sqlite_absolute

 def test_sqlite_absolute(self):
     """Absolute SQLite paths are expected to parse the same in both syntaxes."""
     expected = ('sqlite', {'path': '/var/db/trac.db'})
     # Standard syntax first, then the legacy syntax.
     for uri in ('sqlite:///var/db/trac.db', 'sqlite:/var/db/trac.db'):
         self.assertEqual(expected, _parse_db_str(uri))
开发者ID:Stackato-Apps,项目名称:bloodhound,代码行数:7,代码来源:api.py


示例2: backup

    def backup(self, dest_file):
        """Build the ``pg_dump`` command line for the configured database.

        NOTE(review): this excerpt appears to be cut short -- the visible
        code only assembles ``args``; it never runs the dump, never uses
        `dest_file` or `db_name`, and returns nothing.

        :param dest_file: unused in the visible part of the method
        :raises TracError: when ``pg_dump --version`` cannot be launched
                           (only attempted if a ``schema`` parameter is set)
        """
        from subprocess import Popen, PIPE
        db_url = self.env.config.get('trac', 'database')
        scheme, db_prop = _parse_db_str(db_url)
        # Make sure a 'params' dict exists so the lookups below cannot fail.
        db_params = db_prop.setdefault('params', {})
        db_name = os.path.basename(db_prop['path'])

        args = [self.pg_dump_path, '-C', '--inserts', '-x', '-Z', '8']
        if 'user' in db_prop:
            args.extend(['-U', db_prop['user']])
        # A 'host' connection parameter takes precedence over the URL host.
        if 'host' in db_params:
            host = db_params['host']
        else:
            host = db_prop.get('host')
        if host:
            args.extend(['-h', host])
            # The port is only passed when the host contains no '/'
            # (presumably a '/' marks a Unix socket directory -- confirm).
            if '/' not in host:
                args.extend(['-p', str(db_prop.get('port', '5432'))])

        if 'schema' in db_params:
            # Ask pg_dump for its version to decide how to pass the schema.
            try:
                p = Popen([self.pg_dump_path, '--version'], stdout=PIPE,
                          close_fds=close_fds)
            except OSError, e:
                raise TracError(_("Unable to run %(path)s: %(msg)s",
                                  path=self.pg_dump_path,
                                  msg=exception_to_unicode(e)))
            # Need quote for -n (--schema) option in PostgreSQL 8.2+
            version = p.communicate()[0]
            # Versions reported as 8.0.x / 8.1.x get the bare schema name.
            if re.search(r' 8\.[01]\.', version):
                args.extend(['-n', db_params['schema']])
            else:
                args.extend(['-n', '"%s"' % db_params['schema']])
开发者ID:exocad,项目名称:exotrac,代码行数:33,代码来源:postgres_backend.py


示例3: backup

    def backup(self, dest_file):
        """Start ``mysqldump`` to write the configured database to `dest_file`.

        NOTE(review): this excerpt appears to be cut short -- after the
        process is started nothing waits for it, checks its exit status or
        returns a value in the visible code.

        :param dest_file: path handed to ``mysqldump -r``
        :raises TracError: when the ``mysqldump`` executable cannot be started
        """
        from subprocess import Popen, PIPE

        db_url = self.env.config.get("trac", "database")
        scheme, db_prop = _parse_db_str(db_url)
        # Make sure a 'params' dict exists so it can be iterated below.
        db_params = db_prop.setdefault("params", {})
        db_name = os.path.basename(db_prop["path"])

        args = [self.mysqldump_path]
        if "host" in db_prop:
            args.extend(["-h", db_prop["host"]])
        if "port" in db_prop:
            args.extend(["-P", str(db_prop["port"])])
        if "user" in db_prop:
            args.extend(["-u", db_prop["user"]])
        # Translate connection-string parameters into mysqldump options.
        for name, value in db_params.iteritems():
            if name == "compress" and as_int(value, 0):
                args.append("--compress")
            elif name == "named_pipe" and as_int(value, 0):
                args.append("--protocol=pipe")
            elif name == "read_default_file":  # Must be first
                # Inserted right after the executable, ahead of other options.
                args.insert(1, "--defaults-file=" + value)
            elif name == "unix_socket":
                args.extend(["--protocol=socket", "--socket=" + value])
            elif name not in ("init_command", "read_default_group"):
                # Anything else is reported but otherwise ignored.
                self.log.warning("Invalid connection string parameter '%s'", name)
        args.extend(["-r", dest_file, db_name])

        # The password travels through the child's environment, not argv.
        environ = os.environ.copy()
        if "password" in db_prop:
            environ["MYSQL_PWD"] = str(db_prop["password"])
        try:
            p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
        except OSError, e:
            raise TracError(_("Unable to run %(path)s: %(msg)s", path=self.mysqldump_path, msg=exception_to_unicode(e)))
开发者ID:dinhkhanh,项目名称:trac,代码行数:35,代码来源:mysql_backend.py


示例4: reset_db

 def reset_db(self, default_data=None):
     """Clear or drop the Trac tables depending on the stored schema version.

     NOTE(review): the visible code never uses `default_data`, and it sets
     `tables` and `remove_sqlite_db` without reading them afterwards; there
     is also no ``return`` statement. The method is presumably truncated in
     this excerpt.

     :param default_data: after clean-up, initialize with default data
                          (not acted upon in the visible part)
     :return: documented as True upon success, but nothing is returned by
              the visible code
     """
     from trac import db_default
     scheme, db_prop = _parse_db_str(self.dburi)
     tables = []
     remove_sqlite_db = False
     try:
         with self.db_transaction as db:
             db.rollback() # make sure there's no transaction in progress
             # check the database version
             database_version = db(
                 "SELECT value FROM system WHERE name='database_version'")
             if database_version:
                 database_version = int(database_version[0][0])
             if database_version == db_default.db_version:
                 # same version, simply clear the tables (faster)
                 # The helper is looked up by name in this module, e.g.
                 # 'reset_sqlite_db' for the 'sqlite' scheme.
                 m = sys.modules[__name__]
                 reset_fn = 'reset_%s_db' % scheme
                 if hasattr(m, reset_fn):
                     tables = getattr(m, reset_fn)(self, db_prop)
             else:
                 # different version or version unknown, drop the tables
                 remove_sqlite_db = True
                 self.destroy_db(scheme, db_prop)
     except Exception, e:
         # "Database not found ...",
         # "OperationalError: no such table: system" or the like
         pass
开发者ID:thimalk,项目名称:bloodhound,代码行数:31,代码来源:test.py


示例5: backup

    def backup(self, dest_file):
        """Dump the configured MySQL database to `dest_file` with mysqldump.

        The connection settings are taken from the ``[trac] database``
        option. Host, port, user and password are each passed on only when
        the connection string provides them.

        :param dest_file: path handed to ``mysqldump -r``
        :return: `dest_file`
        :raises TracError: if the subprocess module is unavailable, if
                           mysqldump exits with a non-zero status, or if
                           `dest_file` does not exist afterwards
        """
        try:
            from subprocess import Popen, PIPE
        except ImportError:
            raise TracError('Python >= 2.4 or the subprocess module '
                            'is required for pre-upgrade backup support')
        db_url = self.env.config.get('trac', 'database')
        scheme, db_prop = _parse_db_str(db_url)
        db_name = os.path.basename(db_prop['path'])

        args = [self.mysqldump_path]
        if 'host' in db_prop:
            args.extend(['-h', db_prop['host']])
        if 'port' in db_prop:
            args.extend(['-P', str(db_prop['port'])])
        if 'user' in db_prop:
            args.extend(['-u', db_prop['user']])
        args.extend(['-r', dest_file, db_name])

        # The password travels through the child's environment, not argv.
        # It is optional like the other connection parts: a connection
        # string without a password must not fail with KeyError here.
        environ = os.environ.copy()
        if 'password' in db_prop:
            environ['MYSQL_PWD'] = str(db_prop['password'])
        p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
        errmsg = p.communicate()[1]
        if p.returncode != 0:
            raise TracError("Backup attempt failed (%s)" % to_unicode(errmsg))
        if not os.path.exists(dest_file):
            raise TracError("Backup attempt failed")
        return dest_file
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:28,代码来源:mysql_backend.py


示例6: _do_migrate_inplace

    def _do_migrate_inplace(self, dburi):
        src_dburi = self.config.get('trac', 'database')
        if src_dburi == dburi:
            self._printerr('Source database and destination database are '
                           'same: %s', dburi)
            return 1

        env_path = mkdtemp(prefix='migrate-',
                           dir=os.path.dirname(self.env.path))
        try:
            dst_env = self._create_env(env_path, dburi)
            src_db = get_connection(self.env)
            dst_db = get_connection(dst_env)
            self._copy_tables(src_db, dst_db, src_dburi, dburi, inplace=True)
            del src_db
            del dst_db
            dst_env.shutdown()
            dst_env = None
            if dburi.startswith('sqlite:'):
                schema, params = _parse_db_str(dburi)
                dbpath = os.path.join(self.env.path, params['path'])
                dbdir = os.path.dirname(dbpath)
                if not os.path.isdir(dbdir):
                    os.makedirs(dbdir)
                shutil.copy(os.path.join(env_path, params['path']), dbpath)
        finally:
            shutil.rmtree(env_path)

        self._backup_tracini(self.env)
        self.config.set('trac', 'database', dburi)
        self.config.save()
开发者ID:jun66j5,项目名称:tracmigrateplugin,代码行数:31,代码来源:admin.py


示例7: _get_mysql_process

    def _get_mysql_process(self, env):
        """
        Start the mysql command line client for an environment's database.

        The connection details come from the ``[trac] database`` option of
        `env`. The client is started with stdin and stderr connected to
        pipes.

        :param env: Environment instance whose database the client should open.
        :returns: the ``Popen`` object of the started client
        """
        scheme, db_prop = api._parse_db_str(env.config.get('trac', 'database'))
        db_name = os.path.basename(db_prop['path'])

        # Command line: optional connection flags, then the database name
        command = [self.mysql_path]
        if 'host' in db_prop:
            command += ['-h', db_prop['host']]
        if 'port' in db_prop:
            command += ['-P', str(db_prop['port'])]
        if 'user' in db_prop:
            command += ['-u', db_prop['user']]
        command.append(db_name)

        # The password is handed over through the child's environment
        child_environ = os.environ.copy()
        if 'password' in db_prop:
            child_environ['MYSQL_PWD'] = str(db_prop['password'])

        return Popen(command, env=child_environ, stderr=PIPE, stdin=PIPE,
                     close_fds=close_fds)
开发者ID:alvabai,项目名称:trac-multiproject,代码行数:27,代码来源:backup.py


示例8: write_simple_jndi_properties

def write_simple_jndi_properties(env, targetdir, connection_uri=None, ip=None):
    """Write a simple-jndi ``default.properties`` describing the database.

    The file is created at ``<targetdir>/simple-jndi/default.properties``
    and holds the JDBC driver, URL, user and password of the
    ``projectdata`` data source.

    :param env: environment object; its ``path`` anchors relative SQLite
                paths and its ``log`` receives the final info message
    :param targetdir: directory in which the ``simple-jndi`` folder is
                      created if it does not exist yet
    :param connection_uri: database connection string; defaults to
                           ``DatabaseManager(env).connection_uri``
    :param ip: optional replacement for the host of a ``postgres`` URI
    :raises KeyError: if the scheme is neither ``sqlite`` nor ``postgres``,
                      or if a postgres URI lacks host, user or password
    """
    if not connection_uri:
        connection_uri = DatabaseManager(env).connection_uri
    jndi_dir = os.path.join(targetdir, "simple-jndi")
    if not os.path.exists(jndi_dir):
        os.mkdir(jndi_dir)
    scheme, args = _parse_db_str(connection_uri)
    if scheme == 'sqlite':
        # Relative database paths are resolved against the environment.
        if not args['path'].startswith('/'):
            args['path'] = os.path.join(env.path, args['path'].lstrip('/'))
        jdbcDriver = "org.sqlite.JDBC"
        jdbcConnection = "jdbc:sqlite:%s" % args['path']
        jdbcUser = ""
        jdbcPassword = ""
    elif scheme == 'postgres':
        jdbcDriver = "org.postgresql.Driver"
        args['path'] = args['path'].strip("/")
        if ip:
            args['host'] = ip
        jdbcConnection = "jdbc:postgresql://%(host)s/%(path)s" % args
        jdbcUser = args['user']
        jdbcPassword = args['password']
    else:
        raise KeyError("Unknown database scheme %s" % scheme)

    jndi_filename = os.path.join(jndi_dir, "default.properties")
    properties = open(jndi_filename, 'w')
    # Close the handle even when one of the writes fails.
    try:
        properties.write("projectdata/type=javax.sql.DataSource\n")
        properties.write("projectdata/driver=%s\n" % jdbcDriver)
        properties.write("projectdata/url=%s\n" % jdbcConnection)
        properties.write("projectdata/user=%s\n" % jdbcUser)
        properties.write("projectdata/password=%s\n" % jdbcPassword)
    finally:
        properties.close()

    env.log.info("Written JNDI details to %s", jndi_filename)
开发者ID:CGI-define-and-primeportal,项目名称:trac-plugin-kettle,代码行数:34,代码来源:util.py


示例9: backup

    def backup(self, dest_file):
        """Start ``pg_dump`` to write a compressed dump of the database.

        NOTE(review): this excerpt appears to be cut short -- after the
        process is started nothing waits for it, checks its exit status or
        returns a value in the visible code.

        :param dest_file: base name of the dump; ``.gz`` is appended before
                          it is handed to ``pg_dump -f``
        :raises TracError: when the ``pg_dump`` executable cannot be started
        """
        from subprocess import Popen, PIPE
        db_url = self.env.config.get('trac', 'database')
        scheme, db_prop = _parse_db_str(db_url)
        # Make sure a 'params' dict exists so the lookups below cannot fail.
        db_params = db_prop.setdefault('params', {})
        db_name = os.path.basename(db_prop['path'])

        args = [self.pg_dump_path, '-C', '--inserts', '-x', '-Z', '8']
        if 'user' in db_prop:
            args.extend(['-U', db_prop['user']])
        # A 'host' connection parameter takes precedence over the URL host.
        if 'host' in db_params:
            host = db_params['host']
        else:
            host = db_prop.get('host')
        if host:
            args.extend(['-h', host])
            # The port is only passed when the host contains no '/'
            # (presumably a '/' marks a Unix socket directory -- confirm).
            if '/' not in host:
                args.extend(['-p', str(db_prop.get('port', '5432'))])

        if 'schema' in db_params:
            # The schema name is always passed wrapped in double quotes.
            args.extend(['-n', '"%s"' % db_params['schema']])

        dest_file += ".gz"
        args.extend(['-f', dest_file, db_name])

        # The password travels through the child's environment, not argv.
        environ = os.environ.copy()
        if 'password' in db_prop:
            environ['PGPASSWORD'] = str(db_prop['password'])
        try:
            p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
        except OSError, e:
            raise TracError(_("Unable to run %(path)s: %(msg)s",
                              path=self.pg_dump_path,
                              msg=exception_to_unicode(e)))
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:34,代码来源:postgres_backend.py


示例10: reset_db

    def reset_db(self, default_data=None):
        """Reset the test database, with a dedicated code path for SQLite.

        * Non-SQLite scheme without `default_data`: delegates straight to
          the parent class and returns its result.
        * Non-SQLite scheme with `default_data`: first overwrites the stored
          ``database_version`` with '9999', then delegates to the parent
          class (presumably to make the parent see a version mismatch --
          confirm against the parent implementation).
        * SQLite: drops every table listed in ``sqlite_master``, re-creates
          the tables from ``trac.db_default.schema`` and inserts the default
          data.

        Unless the current test case is a 'unittest', the environment is
        upgraded and users/permissions are set up afterwards; failures there
        only produce a printed warning.

        :param default_data: passed on to the parent implementation; also
                             decides whether the parent is used directly
        :return: the parent's result for non-SQLite schemes. For SQLite the
                 list created here is returned unchanged, i.e. ``[]``,
                 because the ``tables`` assigned inside the nested function
                 is a separate local name.
        """
        from agilo.test.functional.api import EnvironmentBuilder
        env = EnvironmentBuilder.get_testenv(self.env_key)
        from trac.db.api import _parse_db_str
        scheme, db_prop = _parse_db_str(env.get_db_url())

        if scheme != 'sqlite' and not default_data:
            return super(BetterEnvironmentStub, self).reset_db(default_data)

        env_for_transaction = env.get_trac_environment()
        if AgiloTicketSystem.is_trac_1_0():
            env_for_transaction = env

        tables = []
        if scheme != 'sqlite':
            db = self.get_db_cnx()
            # NOTE(review): `with_transaction` presumably runs the decorated
            # function immediately -- confirm against its definition.
            @with_transaction(env_for_transaction, db)
            def implementation(db):
                cursor = db.cursor()
                cursor.execute("update system set value='9999' WHERE name='database_version'")
                db.commit()

            tables = super(BetterEnvironmentStub, self).reset_db(default_data)
        else:
            from trac import db_default
            from trac.db_default import schema
            from trac.db.sqlite_backend import _to_sql

            # our 'destroy_db'
            db = self.get_db_cnx()
            @with_transaction(env_for_transaction, db)
            def implementation(db):
                cursor = db.cursor()
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
                # This `tables` is local to `implementation`; the outer
                # variable of the same name is not modified.
                tables = cursor.fetchall()
                for table in tables:
                    # `table` is a fetched row (presumably a 1-tuple), which
                    # the % operator unpacks into the table name.
                    cursor.execute("DROP TABLE %s" % table)

                # part of sqlite_backend's init_db
                for table in schema:
                    for stmt in _to_sql(table):
                        cursor.execute(stmt)

                # part of reset_db
                for table, cols, vals in db_default.get_data(db):
                    cursor.executemany("INSERT INTO %s (%s) VALUES (%s)"
                                       % (table, ','.join(cols),
                                          ','.join(['%s' for c in cols])),
                        vals)
                db.commit()

        if env.tester.testcase.testtype != 'unittest':
            try:
                env._upgrade_environment()
                env._setup_users_and_permissions()
            except:
                # it's possible that this has already happened
                print "Warning: Exception on post-reset_db tasks"

        return tables
开发者ID:djangsters,项目名称:agilo,代码行数:60,代码来源:test_env_helper.py


示例11: test_sqlite_windows_path

 def test_sqlite_windows_path(self):
     """A ``C|/...`` path is expected to parse to ``C:/...`` on Windows.

     ``os.name`` is switched to 'nt' for the check and restored afterwards.
     """
     original_name = os.name
     os.name = 'nt'
     try:
         parsed = _parse_db_str('sqlite:C|/project/db/trac.db')
         self.assertEqual(('sqlite', {'path': 'C:/project/db/trac.db'}),
                          parsed)
     finally:
         os.name = original_name
开发者ID:Stackato-Apps,项目名称:bloodhound,代码行数:9,代码来源:api.py


示例12: test_sqlite_windows_path

 def test_sqlite_windows_path(self):
     """Check Windows drive handling: ``C|`` in the URI should give ``C:``.

     ``os.name`` is forced to "nt" for the duration of the check and put
     back afterwards, whatever the outcome.
     """
     expected = ("sqlite", {"path": "C:/project/db/trac.db"})
     saved_os_name = os.name
     try:
         os.name = "nt"
         self.assertEqual(expected,
                          _parse_db_str("sqlite:C|/project/db/trac.db"))
     finally:
         os.name = saved_os_name
开发者ID:dafrito,项目名称:trac-mirror,代码行数:10,代码来源:api.py


示例13: reset_db

    def reset_db(self, default_data=None):
        """Bring the test database back to a clean state.

        With ``EnvironmentStub.dbenv`` set, the stored ``database_version``
        decides whether a scheme-specific ``reset_<scheme>_db`` helper of
        this module is called or the database is destroyed; when no tables
        are reported, the schema is re-created through
        ``DatabaseManager.init_db()``. Afterwards either the default data or
        (only when ``EnvironmentStub.dbenv`` is set) a ``database_version``
        row is inserted, and the change is committed.

        :param default_data: after clean-up, initialize with default data
        :return: NOTE(review): previously documented as "True upon success",
                 but the method has no ``return`` statement
        """
        from trac import db_default
        if EnvironmentStub.dbenv:
            db = self.get_db_cnx()
            scheme, db_prop = _parse_db_str(self.dburi)

            tables = []
            db.rollback() # make sure there's no transaction in progress
            try:
                # check the database version
                cursor = db.cursor()
                cursor.execute("SELECT value FROM system "
                               "WHERE name='database_version'")
                database_version = cursor.fetchone()
                if database_version:
                    database_version = int(database_version[0])
                if database_version == db_default.db_version:
                    # same version, simply clear the tables (faster)
                    # The helper is looked up by name in this module.
                    m = sys.modules[__name__]
                    reset_fn = 'reset_%s_db' % scheme
                    if hasattr(m, reset_fn):
                        tables = getattr(m, reset_fn)(db, db_prop)
                else:
                    # different version or version unknown, drop the tables
                    self.destroy_db(scheme, db_prop)
            except:
                db.rollback()
                # tables are likely missing

            if not tables:
                # Nothing was cleared in place: build the schema afresh.
                del db
                dm = DatabaseManager(EnvironmentStub.dbenv)
                dm.init_db()
                # we need to make sure the next get_db_cnx() will re-create
                # a new connection aware of the new data model - see #8518.
                dm.shutdown()

        db = self.get_db_cnx()
        cursor = db.cursor()
        if default_data:
            for table, cols, vals in db_default.get_data(db):
                # One '%s' placeholder per column.
                cursor.executemany("INSERT INTO %s (%s) VALUES (%s)"
                                   % (table, ','.join(cols),
                                      ','.join(['%s' for c in cols])),
                                   vals)
        elif EnvironmentStub.dbenv:
            cursor.execute("INSERT INTO system (name, value) "
                           "VALUES (%s, %s)",
                           ('database_version', str(db_default.db_version)))
        db.commit()
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:54,代码来源:test.py


示例14: reset_db

    def reset_db(self, default_data=None):
        """Bring the test database back to a clean state.

        If the stored version equals ``db_default.db_version``, a
        scheme-specific ``reset_<scheme>_db`` helper of this module clears
        the tables; otherwise the database is destroyed (and a file-based
        SQLite database is deleted from disk). If reading the version fails,
        neither happens. When no tables were reported, the schema is
        re-created with ``init_db()``. Finally either the default data or a
        ``database_version`` row is inserted.

        :param default_data: after clean-up, initialize with default data
        :return: NOTE(review): previously documented as "True upon success",
                 but the method has no ``return`` statement
        """
        from trac import db_default
        scheme, db_prop = _parse_db_str(self.dburi)
        tables = []
        remove_sqlite_db = False
        try:
            with self.db_transaction as db:
                db.rollback()  # make sure there's no transaction in progress
                # check the database version
                database_version = self.get_version()
        except Exception:
            # "Database not found ...",
            # "OperationalError: no such table: system" or the like
            pass
        else:
            # Only reached when the version could be read without error.
            if database_version == db_default.db_version:
                # same version, simply clear the tables (faster)
                m = sys.modules[__name__]
                reset_fn = 'reset_%s_db' % scheme
                if hasattr(m, reset_fn):
                    tables = getattr(m, reset_fn)(self, db_prop)
            else:
                # different version or version unknown, drop the tables
                remove_sqlite_db = True
                self.destroy_db(scheme, db_prop)

        if scheme == 'sqlite' and remove_sqlite_db:
            path = db_prop['path']
            if path != ':memory:':
                # Relative database paths are resolved against self.path.
                if not os.path.isabs(path):
                    path = os.path.join(self.path, path)
                # The database manager is shut down before the file is
                # deleted.
                self.global_databasemanager.shutdown()
                os.remove(path)

        if not tables:
            self.global_databasemanager.init_db()
            # we need to make sure the next get_db_cnx() will re-create
            # a new connection aware of the new data model - see #8518.
            # (skipped for the in-memory database URI)
            if self.dburi != 'sqlite::memory:':
                self.global_databasemanager.shutdown()

        with self.db_transaction as db:
            if default_data:
                for table, cols, vals in db_default.get_data(db):
                    # One '%s' placeholder per column.
                    db.executemany("INSERT INTO %s (%s) VALUES (%s)"
                                   % (table, ','.join(cols),
                                      ','.join(['%s'] * len(cols))), vals)
            else:
                db("INSERT INTO system (name, value) VALUES (%s, %s)",
                   ('database_version', str(db_default.db_version)))
开发者ID:exocad,项目名称:exotrac,代码行数:54,代码来源:test.py


示例15: get_dburi

def get_dburi():
    """Return the database URI the tests should use.

    The value of the ``TRAC_TEST_DB_URI`` environment variable is used when
    it is set and non-empty; a ``schema=tractest`` parameter is appended to
    Postgres URIs that do not name a schema. Without the variable the
    in-memory SQLite URI is returned.
    """
    dburi = os.environ.get("TRAC_TEST_DB_URI")
    if not dburi:
        return "sqlite::memory:"

    scheme, db_prop = _parse_db_str(dburi)
    # Assume the schema 'tractest' for Postgres
    if scheme == "postgres" and not db_prop.get("params", {}).get("schema"):
        dburi += "&schema=tractest" if "?" in dburi else "?schema=tractest"
    return dburi
开发者ID:dafrito,项目名称:trac-mirror,代码行数:12,代码来源:test.py


示例16: get_dburi

def get_dburi():
    """Pick the database URI for the test run.

    ``TRAC_TEST_DB_URI`` wins when it holds a non-empty value; for the
    'postgres' scheme a missing schema parameter is filled in with
    'tractest'. Otherwise the in-memory SQLite URI is used.
    """
    uri = os.environ.get('TRAC_TEST_DB_URI')
    if not uri:
        return 'sqlite::memory:'

    scheme, db_prop = _parse_db_str(uri)
    # Assume the schema 'tractest' for Postgres
    needs_schema = scheme == 'postgres' and \
        not db_prop.get('params', {}).get('schema')
    if needs_schema:
        separator = '&' if '?' in uri else '?'
        uri += separator + "schema=tractest"
    return uri
开发者ID:thimalk,项目名称:bloodhound,代码行数:13,代码来源:test.py


示例17: get_dburi

def get_dburi():
    """Return the database URI for the test run, with test defaults added.

    A non-empty ``TRAC_TEST_DB_URI`` is used as the base. PostgreSQL URIs
    without a schema get ``schema=tractest``; SQLite URIs that are not
    in-memory and set no ``synchronous`` parameter get ``synchronous=off``.
    Without the variable the in-memory SQLite URI is returned.
    """
    dburi = os.environ.get('TRAC_TEST_DB_URI')
    if not dburi:
        return 'sqlite::memory:'

    scheme, db_prop = _parse_db_str(dburi)
    extra_param = None
    if scheme == 'postgres' and \
            not db_prop.get('params', {}).get('schema'):
        # Assume the schema 'tractest' for PostgreSQL
        extra_param = 'schema=tractest'
    elif scheme == 'sqlite' and db_prop['path'] != ':memory:' and \
            not db_prop.get('params', {}).get('synchronous'):
        # Speed-up tests with SQLite database
        extra_param = 'synchronous=off'

    if extra_param:
        separator = '&' if '?' in dburi else '?'
        dburi += separator + extra_param
    return dburi
开发者ID:exocad,项目名称:exotrac,代码行数:14,代码来源:test.py


示例18: destroy_db

 def destroy_db(self, scheme=None, db_prop=None):
     """Drop the test schema (postgres) or all tables (mysql), best effort.

     Errors raised while dropping are deliberately ignored.

     :param scheme: database scheme; parsed from ``self.dburi`` together
                    with `db_prop` when either one is missing
     :param db_prop: parsed connection properties
     :return: always False
     """
     if not scheme or not db_prop:
         scheme, db_prop = _parse_db_str(self.dburi)
     try:
         with self.db_transaction as db:
             if scheme == 'mysql':
                 for table_name in db.get_table_names():
                     db("DROP TABLE IF EXISTS `%s`" % table_name)
             elif scheme == 'postgres' and db.schema:
                 quoted_schema = db.quote(db.schema)
                 db('DROP SCHEMA %s CASCADE' % quoted_schema)
     except Exception:
         # "TracError: Database not found...",
         # psycopg2.ProgrammingError: schema "tractest" does not exist
         pass
     return False
开发者ID:exocad,项目名称:exotrac,代码行数:15,代码来源:test.py


示例19: destroy_db

 def destroy_db(self, scheme=None, db_prop=None):
     """Drop the test schema (postgres) or all tables (mysql), best effort.

     For mysql the tables are listed from ``information_schema.tables``
     using the base name of the configured path as the schema name.
     Errors raised while dropping are deliberately ignored.

     :param scheme: database scheme; parsed from ``self.dburi`` together
                    with `db_prop` when either one is missing
     :param db_prop: parsed connection properties
     :return: always False
     """
     if not scheme or not db_prop:
         scheme, db_prop = _parse_db_str(self.dburi)
     try:
         with self.db_transaction as db:
             if scheme == 'mysql':
                 schema_name = os.path.basename(db_prop['path'])
                 rows = db("""
                       SELECT table_name FROM information_schema.tables
                       WHERE table_schema=%s""", (schema_name,))
                 for row in rows:
                     db("DROP TABLE IF EXISTS `%s`" % row)
             elif scheme == 'postgres' and db.schema:
                 db('DROP SCHEMA "%s" CASCADE' % db.schema)
     except Exception:
         # "TracError: Database not found...",
         # psycopg2.ProgrammingError: schema "tractest" does not exist
         pass
     return False
开发者ID:thimalk,项目名称:bloodhound,代码行数:18,代码来源:test.py


示例20: destroy_db

    def destroy_db(self, scheme=None, db_prop=None):
        if not (scheme and db_prop):
            scheme, db_prop = _parse_db_str(self.dburi)

        db = self.get_db_cnx(destroying=True)
        cursor = db.cursor()
        try:
            if scheme == 'postgres' and db.schema:
                cursor.execute('DROP SCHEMA "%s" CASCADE' % db.schema)
            elif scheme == 'mysql':
                dbname = os.path.basename(db_prop['path'])
                cursor = db.cursor()
                cursor.execute('SELECT table_name FROM '
                               '  information_schema.tables '
                               'WHERE table_schema=%s', (dbname,))
                tables = cursor.fetchall()
                for t in tables:
                    cursor.execute('DROP TABLE IF EXISTS `%s`' % t)
            db.commit()
        except Exception:
            db.rollback()
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:21,代码来源:test.py



注:本文中的trac.db.api._parse_db_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python api.DatabaseManager类代码示例发布时间:2022-05-27
下一篇:
Python db.DatabaseManager类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap