本文整理汇总了Python中trac.db.api._parse_db_str函数的典型用法代码示例。如果您正苦于以下问题：Python _parse_db_str函数的具体用法？Python _parse_db_str怎么用？Python _parse_db_str使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了_parse_db_str函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_sqlite_absolute
def test_sqlite_absolute(self):
    """Both spellings of an absolute SQLite path parse to the same result."""
    expected = ('sqlite', {'path': '/var/db/trac.db'})
    # Standard syntax
    self.assertEqual(expected, _parse_db_str('sqlite:///var/db/trac.db'))
    # Legacy syntax
    self.assertEqual(expected, _parse_db_str('sqlite:/var/db/trac.db'))
开发者ID:Stackato-Apps,项目名称:bloodhound,代码行数:7,代码来源:api.py
示例2: backup
def backup(self, dest_file):
# Assemble a pg_dump command line from the [trac] database connection string.
# NOTE(review): this listing has lost its indentation and appears to stop
# before pg_dump is actually invoked -- dest_file and db_name are never read
# in the visible lines. Restore the nesting and confirm the remainder
# against the original source before relying on it.
from subprocess import Popen, PIPE
db_url = self.env.config.get('trac', 'database')
scheme, db_prop = _parse_db_str(db_url)
db_params = db_prop.setdefault('params', {})
db_name = os.path.basename(db_prop['path'])
args = [self.pg_dump_path, '-C', '--inserts', '-x', '-Z', '8']
if 'user' in db_prop:
args.extend(['-U', db_prop['user']])
# A 'host' query parameter takes precedence over the host part of the URI.
if 'host' in db_params:
host = db_params['host']
else:
host = db_prop.get('host')
if host:
args.extend(['-h', host])
# presumably a host containing '/' is a Unix-socket directory, for which no
# port is passed -- TODO confirm
if '/' not in host:
args.extend(['-p', str(db_prop.get('port', '5432'))])
if 'schema' in db_params:
# Ask pg_dump for its version so the schema name can be quoted correctly.
try:
p = Popen([self.pg_dump_path, '--version'], stdout=PIPE,
close_fds=close_fds)
except OSError, e:
raise TracError(_("Unable to run %(path)s: %(msg)s",
path=self.pg_dump_path,
msg=exception_to_unicode(e)))
# Need quote for -n (--schema) option in PostgreSQL 8.2+
version = p.communicate()[0]
# The pattern matches " 8.0." / " 8.1." in the version output; only then is
# the schema name passed unquoted.
if re.search(r' 8\.[01]\.', version):
args.extend(['-n', db_params['schema']])
else:
args.extend(['-n', '"%s"' % db_params['schema']])
开发者ID:exocad,项目名称:exotrac,代码行数:33,代码来源:postgres_backend.py
示例3: backup
def backup(self, dest_file):
# Assemble and launch a mysqldump command for the [trac] database setting.
# NOTE(review): this listing has lost its indentation and appears to stop
# right after the Popen error handling -- the process object `p` is never
# waited on or checked in the visible lines. Confirm against the original.
from subprocess import Popen, PIPE
db_url = self.env.config.get("trac", "database")
scheme, db_prop = _parse_db_str(db_url)
db_params = db_prop.setdefault("params", {})
db_name = os.path.basename(db_prop["path"])
args = [self.mysqldump_path]
if "host" in db_prop:
args.extend(["-h", db_prop["host"]])
if "port" in db_prop:
args.extend(["-P", str(db_prop["port"])])
if "user" in db_prop:
args.extend(["-u", db_prop["user"]])
# Translate the recognized connection-string parameters into mysqldump
# options; flag-like parameters are only applied when as_int() yields a
# non-zero value.
for name, value in db_params.iteritems():
if name == "compress" and as_int(value, 0):
args.append("--compress")
elif name == "named_pipe" and as_int(value, 0):
args.append("--protocol=pipe")
elif name == "read_default_file": # Must be first
args.insert(1, "--defaults-file=" + value)
elif name == "unix_socket":
args.extend(["--protocol=socket", "--socket=" + value])
# init_command and read_default_group are accepted silently; any other
# parameter name is logged as invalid.
elif name not in ("init_command", "read_default_group"):
self.log.warning("Invalid connection string parameter '%s'", name)
args.extend(["-r", dest_file, db_name])
# The password is handed to the child through the MYSQL_PWD environment
# variable instead of being placed on the command line.
environ = os.environ.copy()
if "password" in db_prop:
environ["MYSQL_PWD"] = str(db_prop["password"])
try:
p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
except OSError, e:
raise TracError(_("Unable to run %(path)s: %(msg)s", path=self.mysqldump_path, msg=exception_to_unicode(e)))
开发者ID:dinhkhanh,项目名称:trac,代码行数:35,代码来源:mysql_backend.py
示例4: reset_db
def reset_db(self, default_data=None):
"""Remove all data from Trac tables, keeping the tables themselves.
:param default_data: after clean-up, initialize with default data
:return: True upon success
"""
# NOTE(review): this listing has lost its indentation and appears to be cut
# off after the exception handler -- default_data, tables and
# remove_sqlite_db are not consumed in the visible lines, and no return
# statement is visible despite the ":return:" line above.
from trac import db_default
scheme, db_prop = _parse_db_str(self.dburi)
tables = []
remove_sqlite_db = False
try:
with self.db_transaction as db:
db.rollback() # make sure there's no transaction in progress
# check the database version
database_version = db(
"SELECT value FROM system WHERE name='database_version'")
if database_version:
database_version = int(database_version[0][0])
if database_version == db_default.db_version:
# same version, simply clear the tables (faster)
# Dispatch to a module-level function named reset_<scheme>_db, if this
# module defines one.
m = sys.modules[__name__]
reset_fn = 'reset_%s_db' % scheme
if hasattr(m, reset_fn):
tables = getattr(m, reset_fn)(self, db_prop)
else:
# different version or version unknown, drop the tables
remove_sqlite_db = True
self.destroy_db(scheme, db_prop)
# NOTE(review): the bound name `e` is never used, and every Exception is
# silently ignored here.
except Exception, e:
# "Database not found ...",
# "OperationalError: no such table: system" or the like
pass
开发者ID:thimalk,项目名称:bloodhound,代码行数:31,代码来源:test.py
示例5: backup
def backup(self, dest_file):
    """Dump the configured MySQL database to *dest_file* with mysqldump.

    The connection details are read from the ``[trac] database`` option.
    The password, when the connection string has one, is passed to the
    child process through the ``MYSQL_PWD`` environment variable.

    :param dest_file: path of the dump file to create (mysqldump ``-r``)
    :return: *dest_file*
    :raises TracError: if the ``subprocess`` module is unavailable, if
        mysqldump exits with a non-zero status, or if *dest_file* does
        not exist afterwards
    """
    try:
        from subprocess import Popen, PIPE
    except ImportError:
        raise TracError('Python >= 2.4 or the subprocess module '
                        'is required for pre-upgrade backup support')
    db_url = self.env.config.get('trac', 'database')
    scheme, db_prop = _parse_db_str(db_url)
    db_name = os.path.basename(db_prop['path'])

    args = [self.mysqldump_path]
    if 'host' in db_prop:
        args.extend(['-h', db_prop['host']])
    if 'port' in db_prop:
        args.extend(['-P', str(db_prop['port'])])
    if 'user' in db_prop:
        args.extend(['-u', db_prop['user']])
    args.extend(['-r', dest_file, db_name])

    environ = os.environ.copy()
    # The password is optional in a connection string; indexing it
    # unconditionally raised KeyError for password-less setups.
    if 'password' in db_prop:
        environ['MYSQL_PWD'] = str(db_prop['password'])

    p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
    errmsg = p.communicate()[1]
    if p.returncode != 0:
        raise TracError("Backup attempt failed (%s)" % to_unicode(errmsg))
    if not os.path.exists(dest_file):
        raise TracError("Backup attempt failed")
    return dest_file
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:28,代码来源:mysql_backend.py
示例6: _do_migrate_inplace
def _do_migrate_inplace(self, dburi):
    """Migrate this environment's database to *dburi* in place.

    The tables are copied into a scratch environment created in the parent
    directory of this environment.  For a SQLite destination the resulting
    database file is then copied into this environment.  The scratch
    directory is always removed.  Finally ``trac.ini`` is backed up and
    its ``[trac] database`` option is set to *dburi*.

    :param dburi: connection string of the destination database
    :return: 1 if *dburi* equals the currently configured database,
        otherwise None
    """
    src_dburi = self.config.get('trac', 'database')
    if src_dburi == dburi:
        self._printerr('Source database and destination database are '
                       'same: %s', dburi)
        return 1

    env_path = mkdtemp(prefix='migrate-',
                       dir=os.path.dirname(self.env.path))
    try:
        dst_env = self._create_env(env_path, dburi)
        src_db = get_connection(self.env)
        dst_db = get_connection(dst_env)
        self._copy_tables(src_db, dst_db, src_dburi, dburi, inplace=True)
        # Drop our references to both connections before the scratch
        # environment is shut down.
        del src_db
        del dst_db
        dst_env.shutdown()
        dst_env = None
        if dburi.startswith('sqlite:'):
            # Only the parsed properties are needed, not the scheme.
            params = _parse_db_str(dburi)[1]
            dbpath = os.path.join(self.env.path, params['path'])
            dbdir = os.path.dirname(dbpath)
            if not os.path.isdir(dbdir):
                os.makedirs(dbdir)
            shutil.copy(os.path.join(env_path, params['path']), dbpath)
    finally:
        shutil.rmtree(env_path)

    self._backup_tracini(self.env)
    self.config.set('trac', 'database', dburi)
    self.config.save()
开发者ID:jun66j5,项目名称:tracmigrateplugin,代码行数:31,代码来源:admin.py
示例7: _get_mysql_process
def _get_mysql_process(self, env):
    """
    Start the ``mysql`` command-line client for the database of *env*.

    :param env: Environment instance against which the mysql client process should be created.
    :return: the ``Popen`` object, with stdin and stderr connected to pipes
    """
    connection_string = env.config.get('trac', 'database')
    scheme, db_prop = api._parse_db_str(connection_string)
    db_name = os.path.basename(db_prop['path'])

    # Construct the mysql client command string
    command = [self.mysql_path]
    if 'host' in db_prop:
        command += ['-h', db_prop['host']]
    if 'port' in db_prop:
        command += ['-P', str(db_prop['port'])]
    if 'user' in db_prop:
        command += ['-u', db_prop['user']]
    command.append(db_name)

    # Set mysql password into environment variable
    child_env = os.environ.copy()
    if 'password' in db_prop:
        child_env['MYSQL_PWD'] = str(db_prop['password'])

    return Popen(command, env=child_env, stderr=PIPE, stdin=PIPE, close_fds=close_fds)
开发者ID:alvabai,项目名称:trac-multiproject,代码行数:27,代码来源:backup.py
示例8: write_simple_jndi_properties
def write_simple_jndi_properties(env, targetdir, connection_uri=None, ip=None):
    """Write ``simple-jndi/default.properties`` describing the Trac database.

    :param env: Trac environment; supplies the default connection URI, the
        base directory for relative SQLite paths, and the logger
    :param targetdir: directory in which the ``simple-jndi`` sub-directory
        is created if it does not exist yet
    :param connection_uri: database connection string; defaults to the
        ``connection_uri`` of the environment's ``DatabaseManager``
    :param ip: if given, replaces the host in the PostgreSQL JDBC URL
    :raises KeyError: for a scheme other than ``sqlite`` or ``postgres``

    NOTE(review): the database password is written to the properties file
    in clear text.
    """
    if not connection_uri:
        connection_uri = DatabaseManager(env).connection_uri

    jndi_dir = os.path.join(targetdir, "simple-jndi")
    if not os.path.exists(jndi_dir):
        os.mkdir(jndi_dir)

    scheme, args = _parse_db_str(connection_uri)
    if scheme == 'sqlite':
        # A path not starting with '/' is resolved against the environment
        # directory.
        if not args['path'].startswith('/'):
            args['path'] = os.path.join(env.path, args['path'])
        jdbcDriver = "org.sqlite.JDBC"
        jdbcConnection = "jdbc:sqlite:%s" % args['path']
        jdbcUser = ""
        jdbcPassword = ""
    elif scheme == 'postgres':
        jdbcDriver = "org.postgresql.Driver"
        args['path'] = args['path'].strip("/")
        if ip:
            args['host'] = ip
        jdbcConnection = "jdbc:postgresql://%(host)s/%(path)s" % args
        jdbcUser = args['user']
        jdbcPassword = args['password']
    else:
        raise KeyError("Unknown database scheme %s" % scheme)

    jndi_filename = os.path.join(jndi_dir, "default.properties")
    properties = open(jndi_filename, 'w')
    # try/finally so the handle is closed even when a write fails.
    try:
        properties.write("projectdata/type=javax.sql.DataSource\n")
        properties.write("projectdata/driver=%s\n" % jdbcDriver)
        properties.write("projectdata/url=%s\n" % jdbcConnection)
        properties.write("projectdata/user=%s\n" % jdbcUser)
        properties.write("projectdata/password=%s\n" % jdbcPassword)
    finally:
        properties.close()
    env.log.info("Written JNDI details to %s", jndi_filename)
开发者ID:CGI-define-and-primeportal,项目名称:trac-plugin-kettle,代码行数:34,代码来源:util.py
示例9: backup
def backup(self, dest_file):
# Assemble and launch a pg_dump command for the [trac] database setting.
# NOTE(review): this listing has lost its indentation and appears to stop
# right after the Popen error handling -- the process object `p` is never
# waited on or checked in the visible lines. Confirm against the original.
from subprocess import Popen, PIPE
db_url = self.env.config.get('trac', 'database')
scheme, db_prop = _parse_db_str(db_url)
db_params = db_prop.setdefault('params', {})
db_name = os.path.basename(db_prop['path'])
args = [self.pg_dump_path, '-C', '--inserts', '-x', '-Z', '8']
if 'user' in db_prop:
args.extend(['-U', db_prop['user']])
# A 'host' query parameter takes precedence over the host part of the URI.
if 'host' in db_params:
host = db_params['host']
else:
host = db_prop.get('host')
if host:
args.extend(['-h', host])
# presumably a host containing '/' is a Unix-socket directory, for which no
# port is passed -- TODO confirm
if '/' not in host:
args.extend(['-p', str(db_prop.get('port', '5432'))])
if 'schema' in db_params:
# The schema name is always wrapped in double quotes here.
args.extend(['-n', '"%s"' % db_params['schema']])
# The output file name gets a ".gz" suffix (the command line above passes
# '-Z', '8').
dest_file += ".gz"
args.extend(['-f', dest_file, db_name])
# The password is handed to the child through the PGPASSWORD environment
# variable instead of being placed on the command line.
environ = os.environ.copy()
if 'password' in db_prop:
environ['PGPASSWORD'] = str(db_prop['password'])
try:
p = Popen(args, env=environ, stderr=PIPE, close_fds=close_fds)
except OSError, e:
raise TracError(_("Unable to run %(path)s: %(msg)s",
path=self.pg_dump_path,
msg=exception_to_unicode(e)))
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:34,代码来源:postgres_backend.py
示例10: reset_db
def reset_db(self, default_data=None):
# Reset the test database. For a non-SQLite scheme without default_data the
# work is delegated entirely to the parent class; otherwise the database is
# rebuilt here and some post-reset environment tasks are attempted.
# NOTE(review): this listing has lost its indentation, so the exact nesting
# (e.g. where each nested `implementation` function ends) must be confirmed
# against the original source.
from agilo.test.functional.api import EnvironmentBuilder
env = EnvironmentBuilder.get_testenv(self.env_key)
from trac.db.api import _parse_db_str
scheme, db_prop = _parse_db_str(env.get_db_url())
if scheme != 'sqlite' and not default_data:
return super(BetterEnvironmentStub, self).reset_db(default_data)
env_for_transaction = env.get_trac_environment()
if AgiloTicketSystem.is_trac_1_0():
env_for_transaction = env
tables = []
if scheme != 'sqlite':
db = self.get_db_cnx()
# presumably with_transaction runs the decorated function immediately inside
# a transaction -- verify against the Trac version in use
@with_transaction(env_for_transaction, db)
def implementation(db):
cursor = db.cursor()
# Overwrites the stored database_version with '9999'; presumably this makes
# the parent reset_db treat the schema as outdated -- TODO confirm
cursor.execute("update system set value='9999' WHERE name='database_version'")
db.commit()
tables = super(BetterEnvironmentStub, self).reset_db(default_data)
else:
from trac import db_default
from trac.db_default import schema
from trac.db.sqlite_backend import _to_sql
# our 'destroy_db'
db = self.get_db_cnx()
@with_transaction(env_for_transaction, db)
def implementation(db):
cursor = db.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
# NOTE(review): this assignment uses the cursor created inside the nested
# function, so `tables` here looks like a local of `implementation` and
# would not update the `tables` that is returned at the end -- confirm.
tables = cursor.fetchall()
for table in tables:
cursor.execute("DROP TABLE %s" % table)
# part of sqlite_backend's init_db
for table in schema:
for stmt in _to_sql(table):
cursor.execute(stmt)
# part of reset_db
for table, cols, vals in db_default.get_data(db):
cursor.executemany("INSERT INTO %s (%s) VALUES (%s)"
% (table, ','.join(cols),
','.join(['%s' for c in cols])),
vals)
db.commit()
if env.tester.testcase.testtype != 'unittest':
# NOTE(review): the bare except below also catches KeyboardInterrupt and
# SystemExit, and the failure is only reported with a print statement.
try:
env._upgrade_environment()
env._setup_users_and_permissions()
except:
# it's possible that this has already happened
print "Warning: Exception on post-reset_db tasks"
return tables
开发者ID:djangsters,项目名称:agilo,代码行数:60,代码来源:test_env_helper.py
示例11: test_sqlite_windows_path
def test_sqlite_windows_path(self):
    """``sqlite:C|/...`` parses to a ``C:/...`` path when os.name is 'nt'."""
    # Windows drive-letter path: os.name is forced to 'nt' for the
    # assertion and restored afterwards.
    saved_os_name = os.name
    os.name = 'nt'
    try:
        expected = ('sqlite', {'path': 'C:/project/db/trac.db'})
        self.assertEqual(expected,
                         _parse_db_str('sqlite:C|/project/db/trac.db'))
    finally:
        os.name = saved_os_name
开发者ID:Stackato-Apps,项目名称:bloodhound,代码行数:9,代码来源:api.py
示例12: test_sqlite_windows_path
def test_sqlite_windows_path(self):
    """``sqlite:C|/...`` parses to a ``C:/...`` path when os.name is "nt"."""
    # Windows drive-letter path: os.name is forced to "nt" for the
    # assertion and restored afterwards.
    original_name = os.name
    try:
        os.name = "nt"
        expected = ("sqlite", {"path": "C:/project/db/trac.db"})
        actual = _parse_db_str("sqlite:C|/project/db/trac.db")
        self.assertEqual(expected, actual)
    finally:
        os.name = original_name
开发者ID:dafrito,项目名称:trac-mirror,代码行数:10,代码来源:api.py
示例13: reset_db
def reset_db(self, default_data=None):
"""Remove all data from Trac tables, keeping the tables themselves.
:param default_data: after clean-up, initialize with default data
:return: True upon success
"""
# NOTE(review): this listing has lost its indentation; the nesting under
# `if EnvironmentStub.dbenv:` must be confirmed against the original source.
# No return statement is visible despite the ":return:" line above.
from trac import db_default
if EnvironmentStub.dbenv:
db = self.get_db_cnx()
scheme, db_prop = _parse_db_str(self.dburi)
tables = []
db.rollback() # make sure there's no transaction in progress
try:
# check the database version
cursor = db.cursor()
cursor.execute("SELECT value FROM system "
"WHERE name='database_version'")
database_version = cursor.fetchone()
if database_version:
database_version = int(database_version[0])
if database_version == db_default.db_version:
# same version, simply clear the tables (faster)
# Dispatch to a module-level function named reset_<scheme>_db, if this
# module defines one.
m = sys.modules[__name__]
reset_fn = 'reset_%s_db' % scheme
if hasattr(m, reset_fn):
tables = getattr(m, reset_fn)(db, db_prop)
else:
# different version or version unknown, drop the tables
self.destroy_db(scheme, db_prop)
# NOTE(review): bare except -- also catches KeyboardInterrupt/SystemExit.
except:
db.rollback()
# tables are likely missing
# An empty `tables` leads to (re)initialising the schema through a
# DatabaseManager.
if not tables:
del db
dm = DatabaseManager(EnvironmentStub.dbenv)
dm.init_db()
# we need to make sure the next get_db_cnx() will re-create
# a new connection aware of the new data model - see #8518.
dm.shutdown()
db = self.get_db_cnx()
cursor = db.cursor()
if default_data:
for table, cols, vals in db_default.get_data(db):
cursor.executemany("INSERT INTO %s (%s) VALUES (%s)"
% (table, ','.join(cols),
','.join(['%s' for c in cols])),
vals)
# Without default data, only the database_version row is written.
elif EnvironmentStub.dbenv:
cursor.execute("INSERT INTO system (name, value) "
"VALUES (%s, %s)",
('database_version', str(db_default.db_version)))
db.commit()
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:54,代码来源:test.py
示例14: reset_db
def reset_db(self, default_data=None):
"""Remove all data from Trac tables, keeping the tables themselves.
:param default_data: after clean-up, initialize with default data
:return: True upon success
"""
# NOTE(review): this listing has lost its indentation; nesting must be
# confirmed against the original source. No return statement is visible
# despite the ":return:" line above.
from trac import db_default
scheme, db_prop = _parse_db_str(self.dburi)
tables = []
remove_sqlite_db = False
try:
with self.db_transaction as db:
db.rollback() # make sure there's no transaction in progress
# check the database version
database_version = self.get_version()
except Exception:
# "Database not found ...",
# "OperationalError: no such table: system" or the like
pass
# The else branch runs only when reading the version raised no exception.
else:
if database_version == db_default.db_version:
# same version, simply clear the tables (faster)
# Dispatch to a module-level function named reset_<scheme>_db, if this
# module defines one.
m = sys.modules[__name__]
reset_fn = 'reset_%s_db' % scheme
if hasattr(m, reset_fn):
tables = getattr(m, reset_fn)(self, db_prop)
else:
# different version or version unknown, drop the tables
remove_sqlite_db = True
self.destroy_db(scheme, db_prop)
# For a file-based SQLite database the file itself is deleted; a relative
# path is resolved against self.path.
if scheme == 'sqlite' and remove_sqlite_db:
path = db_prop['path']
if path != ':memory:':
if not os.path.isabs(path):
path = os.path.join(self.path, path)
self.global_databasemanager.shutdown()
os.remove(path)
if not tables:
self.global_databasemanager.init_db()
# we need to make sure the next get_db_cnx() will re-create
# a new connection aware of the new data model - see #8518.
if self.dburi != 'sqlite::memory:':
self.global_databasemanager.shutdown()
with self.db_transaction as db:
if default_data:
for table, cols, vals in db_default.get_data(db):
db.executemany("INSERT INTO %s (%s) VALUES (%s)"
% (table, ','.join(cols),
','.join(['%s'] * len(cols))), vals)
# Without default data, only the database_version row is written.
else:
db("INSERT INTO system (name, value) VALUES (%s, %s)",
('database_version', str(db_default.db_version)))
开发者ID:exocad,项目名称:exotrac,代码行数:54,代码来源:test.py
示例15: get_dburi
def get_dburi():
    """Return the database URI to use for tests.

    The URI is taken from the ``TRAC_TEST_DB_URI`` environment variable.
    When that variable is unset or empty, an in-memory SQLite database is
    used.  A PostgreSQL URI without a ``schema`` parameter gets
    ``schema=tractest`` appended.

    :return: the connection string
    """
    dburi = os.environ.get("TRAC_TEST_DB_URI")
    if not dburi:
        return "sqlite::memory:"

    scheme, db_prop = _parse_db_str(dburi)
    # Assume the schema 'tractest' for Postgres
    if scheme == "postgres" and not db_prop.get("params", {}).get("schema"):
        # Extend an existing query string, or start one.
        if "?" in dburi:
            dburi += "&schema=tractest"
        else:
            dburi += "?schema=tractest"
    return dburi
开发者ID:dafrito,项目名称:trac-mirror,代码行数:12,代码来源:test.py
示例16: get_dburi
def get_dburi():
    """Return the database URI to use for tests.

    The URI is taken from the ``TRAC_TEST_DB_URI`` environment variable.
    When that variable is unset or empty, an in-memory SQLite database is
    used.  A PostgreSQL URI without a ``schema`` parameter gets
    ``schema=tractest`` appended.

    :return: the connection string
    """
    dburi = os.environ.get('TRAC_TEST_DB_URI')
    if not dburi:
        return 'sqlite::memory:'

    scheme, db_prop = _parse_db_str(dburi)
    has_schema = db_prop.get('params', {}).get('schema')
    # Assume the schema 'tractest' for Postgres
    if scheme == 'postgres' and not has_schema:
        # Extend an existing query string, or start one.
        if '?' in dburi:
            dburi += "&schema=tractest"
        else:
            dburi += "?schema=tractest"
    return dburi
开发者ID:thimalk,项目名称:bloodhound,代码行数:13,代码来源:test.py
示例17: get_dburi
def get_dburi():
    """Return the database URI to use for tests.

    The URI is taken from the ``TRAC_TEST_DB_URI`` environment variable.
    When that variable is unset or empty, an in-memory SQLite database is
    used.  Otherwise the URI may get one parameter appended:

    * PostgreSQL without a ``schema`` parameter: ``schema=tractest``
    * file-based SQLite without a ``synchronous`` parameter:
      ``synchronous=off``

    :return: the connection string
    """
    dburi = os.environ.get('TRAC_TEST_DB_URI')
    if not dburi:
        return 'sqlite::memory:'

    scheme, db_prop = _parse_db_str(dburi)
    params = db_prop.get('params', {})
    # Extend an existing query string, or start one.
    separator = '&' if '?' in dburi else '?'
    if scheme == 'postgres':
        # Assume the schema 'tractest' for PostgreSQL
        if not params.get('schema'):
            dburi += separator + 'schema=tractest'
    elif scheme == 'sqlite' and db_prop['path'] != ':memory:' and \
            not params.get('synchronous'):
        # Speed-up tests with SQLite database
        dburi += separator + 'synchronous=off'
    return dburi
开发者ID:exocad,项目名称:exotrac,代码行数:14,代码来源:test.py
示例18: destroy_db
def destroy_db(self, scheme=None, db_prop=None):
    """Drop the test schema (PostgreSQL) or all tables (MySQL).

    Any exception raised while dropping is deliberately ignored, since the
    database or schema may not exist yet.

    :param scheme: database scheme; parsed from ``self.dburi`` together
        with *db_prop* unless both are given
    :param db_prop: parsed connection properties
    :return: always False
    """
    if not (scheme and db_prop):
        scheme, db_prop = _parse_db_str(self.dburi)
    try:
        with self.db_transaction as db:
            if scheme == 'postgres' and db.schema:
                db('DROP SCHEMA %s CASCADE' % db.quote(db.schema))
            elif scheme == 'mysql':
                for table in db.get_table_names():
                    db("DROP TABLE IF EXISTS `%s`" % table)
    except Exception:
        # "TracError: Database not found...",
        # psycopg2.ProgrammingError: schema "tractest" does not exist
        pass
    return False
开发者ID:exocad,项目名称:exotrac,代码行数:15,代码来源:test.py
示例19: destroy_db
def destroy_db(self, scheme=None, db_prop=None):
    """Drop the test schema (PostgreSQL) or all tables (MySQL).

    For MySQL the table names are looked up in
    ``information_schema.tables`` for the database named by the last
    component of ``db_prop['path']``.  Any exception raised while dropping
    is deliberately ignored, since the database or schema may not exist.

    :param scheme: database scheme; parsed from ``self.dburi`` together
        with *db_prop* unless both are given
    :param db_prop: parsed connection properties
    :return: always False
    """
    if not (scheme and db_prop):
        scheme, db_prop = _parse_db_str(self.dburi)
    try:
        with self.db_transaction as db:
            if scheme == 'postgres' and db.schema:
                db('DROP SCHEMA "%s" CASCADE' % db.schema)
            elif scheme == 'mysql':
                dbname = os.path.basename(db_prop['path'])
                rows = db("""
                    SELECT table_name FROM information_schema.tables
                    WHERE table_schema=%s""", (dbname,))
                for row in rows:
                    # Take the single column explicitly instead of relying
                    # on '%' unpacking a 1-tuple row.
                    db("DROP TABLE IF EXISTS `%s`" % row[0])
    except Exception:
        # "TracError: Database not found...",
        # psycopg2.ProgrammingError: schema "tractest" does not exist
        pass
    return False
开发者ID:thimalk,项目名称:bloodhound,代码行数:18,代码来源:test.py
示例20: destroy_db
def destroy_db(self, scheme=None, db_prop=None):
    """Drop the test schema (PostgreSQL) or all tables (MySQL).

    The work is done on the connection returned by
    ``self.get_db_cnx(destroying=True)``.  It is committed on success and
    rolled back if any exception is raised; the exception itself is
    deliberately not propagated.

    :param scheme: database scheme; parsed from ``self.dburi`` together
        with *db_prop* unless both are given
    :param db_prop: parsed connection properties
    """
    if not (scheme and db_prop):
        scheme, db_prop = _parse_db_str(self.dburi)

    db = self.get_db_cnx(destroying=True)
    cursor = db.cursor()
    try:
        if scheme == 'postgres' and db.schema:
            cursor.execute('DROP SCHEMA "%s" CASCADE' % db.schema)
        elif scheme == 'mysql':
            dbname = os.path.basename(db_prop['path'])
            cursor.execute('SELECT table_name FROM '
                           ' information_schema.tables '
                           'WHERE table_schema=%s', (dbname,))
            # Fetch all names first: the same cursor is reused for the
            # DROP statements below.
            for row in cursor.fetchall():
                # Take the single column explicitly instead of relying on
                # '%' unpacking a 1-tuple row.
                cursor.execute('DROP TABLE IF EXISTS `%s`' % row[0])
        db.commit()
    except Exception:
        db.rollback()
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:21,代码来源:test.py
注：本文中的trac.db.api._parse_db_str函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论