本文整理汇总了Python中trac.versioncontrol.cache.CachedRepository类的典型用法代码示例。如果您正苦于以下问题：Python CachedRepository类的具体用法？Python CachedRepository怎么用？Python CachedRepository使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CachedRepository类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_initial_sync
def test_initial_sync(self):
    """Check that syncing an empty cache stores both mocked changesets.

    After ``cache.sync()`` the ``revision`` table must contain revisions
    0 and 1, and ``node_change`` must contain the two additions that make
    up revision 1.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=1)
    changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
               ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
    changesets = [Mock(Changeset, repos, 0, '', '', t1,
                       get_changes=lambda: []),
                  Mock(Changeset, repos, 1, 'Import', 'joe', t2,
                       get_changes=lambda: iter(changes))]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    cursor = self.db.cursor()
    # ORDER BY keeps the row-by-row assertions independent of the order in
    # which the database backend happens to return rows.
    cursor.execute("SELECT rev,time,author,message FROM revision "
                   "ORDER BY rev")
    self.assertEqual(('0', to_utimestamp(t1), '', ''),
                     cursor.fetchone())
    self.assertEqual(('1', to_utimestamp(t2), 'joe', 'Import'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
    cursor.execute("""
        SELECT rev,path,node_type,change_type,base_path,base_rev
        FROM node_change ORDER BY rev, path
        """)
    self.assertEqual(('1', 'trunk', 'D', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(('1', 'trunk/README', 'F', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:30,代码来源:cache.py
示例2: sync
def sync(self):
    """Pre-cache Perforce file history, then run the standard cache sync.

    When the youngest revision stored in the cache differs from the
    repository's youngest revision, the submitted changes newer than the
    cached one are listed and handed to
    ``precacheFileHistoryForChanges`` before delegating to
    ``CachedRepository.sync``.

    Raises ``PerforceError`` when the ``changes`` command reports errors.
    """
    cached_rev = self.repos.get_youngest_rev_in_cache(self.db)
    if cached_rev != str(self.repos.youngest_rev):
        # The cache is behind: everything submitted since the last sync
        # operation has to be cached. Nothing cached yet means starting
        # from change '0'.
        if cached_rev is None:
            cached_rev = '0'

        # Ask Perforce for the submitted changes after the cached one.
        from p4trac.repos import _P4ChangesOutputConsumer
        consumer = _P4ChangesOutputConsumer(self.repos._repos)
        self.repos._connection.run('changes', '-l', '-s', 'submitted',
                                   '@>%s' % cached_rev,
                                   output=consumer)
        if consumer.errors:
            from p4trac.repos import PerforceError
            raise PerforceError(consumer.errors)

        # Reverse the reported list in place before pre-caching the file
        # history of the files touched by these changes.
        pending = consumer.changes
        pending.reverse()
        self.repos._repos.precacheFileHistoryForChanges(pending)

    # Hand over to the default implementation now that enough information
    # has been cached to make it run a bit faster.
    # NOTE(review): the listing lost its indentation; this call is assumed
    # to run unconditionally (outside the ``if``) -- confirm upstream.
    CachedRepository.sync(self)
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:30,代码来源:api.py
示例3: test_get_changes
def test_get_changes(self):
    """Check that a changeset and its changes are served from the cache.

    The cache tables are filled by hand and the repository mock returns
    ``None`` for every changeset, so the asserted values can only come
    from the database rows inserted here.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)

    # Seed the cache: revisions 0 and 1, two node changes for revision 1.
    seed = self.db.cursor()
    seed.execute(
        "INSERT INTO revision (rev,time,author,message) "
        "VALUES (0,%s,'','')",
        (to_timestamp(t1),))
    seed.execute(
        "INSERT INTO revision (rev,time,author,message) "
        "VALUES (1,%s,'joe','Import')",
        (to_timestamp(t2),))
    node_rows = [('trunk', 'D', 'A', None, None),
                 ('trunk/README', 'F', 'A', None, None)]
    seed.executemany(
        "INSERT INTO node_change (rev,path,node_type,"
        "change_type,base_path,base_rev) "
        "VALUES ('1',%s,%s,%s,%s,%s)",
        node_rows)
    seed.execute("UPDATE system SET value='1' WHERE name='youngest_rev'")

    backend = Mock(Repository, 'test-repos', None, self.log,
                   get_changeset=lambda x: None,
                   get_youngest_rev=lambda: 1,
                   get_oldest_rev=lambda: 0,
                   next_rev=lambda x: None,
                   normalize_rev=lambda rev: rev)
    cached = CachedRepository(self.db, backend, None, self.log)
    self.assertEqual('1', cached.youngest_rev)

    cset = cached.get_changeset(1)
    self.assertEqual('joe', cset.author)
    self.assertEqual('Import', cset.message)
    self.assertEqual(t2, cset.date)

    change_iter = cset.get_changes()
    expected_changes = (
        ('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
        ('trunk/README', Node.FILE, Changeset.ADD, None, None),
    )
    for expected in expected_changes:
        self.assertEqual(expected, change_iter.next())
    # The iterator must be exhausted after the two expected changes.
    self.assertRaises(StopIteration, change_iter.next)
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:33,代码来源:cache.py
示例4: test_initial_sync
def test_initial_sync(self):
    """Check that syncing an empty cache stores both mocked changesets.

    After ``cache.sync()`` the ``revision`` table must contain revisions
    0 and 1, and ``node_change`` must contain the two additions that make
    up revision 1.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=1)
    changes = [
        ("trunk", Node.DIRECTORY, Changeset.ADD, None, None),
        ("trunk/README", Node.FILE, Changeset.ADD, None, None),
    ]
    changesets = [
        Mock(Changeset, repos, 0, "", "", t1, get_changes=lambda: []),
        Mock(Changeset, repos, 1, "Import", "joe", t2,
             get_changes=lambda: iter(changes)),
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    with self.env.db_query as db:
        # ORDER BY keeps the indexed assertions independent of the order
        # in which the database backend happens to return rows.
        rows = db("SELECT rev, time, author, message FROM revision "
                  "ORDER BY rev")
        self.assertEqual(len(rows), 2)
        self.assertEqual(("0", to_utimestamp(t1), "", ""), rows[0])
        self.assertEqual(("1", to_utimestamp(t2), "joe", "Import"),
                         rows[1])
        rows = db("""
            SELECT rev, path, node_type, change_type, base_path, base_rev
            FROM node_change ORDER BY rev, path""")
        self.assertEqual(len(rows), 2)
        self.assertEqual(("1", "trunk", "D", "A", None, None), rows[0])
        self.assertEqual(("1", "trunk/README", "F", "A", None, None),
                         rows[1])
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:28,代码来源:cache.py
示例5: test_initial_sync
def test_initial_sync(self):
    """Check that syncing an empty cache stores both mocked changesets.

    The repository mock exposes revisions 0 and 1. After ``cache.sync()``
    both must appear in ``revision`` and the two additions of revision 1
    must appear in ``node_change``.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
               ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
    changesets = [Mock(Changeset, 0, '', '', t1,
                       get_changes=lambda: []),
                  Mock(Changeset, 1, 'Import', 'joe', t2,
                       get_changes=lambda: iter(changes))]

    def next_rev(rev):
        # Revision 0 is followed by revision 1; nothing follows after that.
        # Spelled out instead of ``cond and 1 or None``, which silently
        # breaks if the "true" value is ever falsy.
        if int(rev) == 0:
            return 1
        return None

    repos = Mock(Repository, 'test-repos', None, self.log,
                 get_changeset=lambda x: changesets[int(x)],
                 get_oldest_rev=lambda: 0,
                 get_youngest_rev=lambda: 1,
                 normalize_rev=lambda x: x,
                 next_rev=next_rev)
    cache = CachedRepository(self.db, repos, None, self.log)
    cache.sync()

    cursor = self.db.cursor()
    # ORDER BY keeps the row-by-row assertions independent of the order in
    # which the database backend happens to return rows.
    cursor.execute("SELECT rev,time,author,message FROM revision "
                   "ORDER BY rev")
    self.assertEqual(('0', to_timestamp(t1), '', ''), cursor.fetchone())
    self.assertEqual(('1', to_timestamp(t2), 'joe', 'Import'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
    cursor.execute("SELECT rev,path,node_type,change_type,base_path,"
                   "base_rev FROM node_change ORDER BY rev,path")
    self.assertEqual(('1', 'trunk', 'D', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(('1', 'trunk/README', 'F', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:30,代码来源:cache.py
示例6: test_update_sync
def test_update_sync(self):
    """Check that syncing a partly filled cache adds the new revision.

    The cache is preset with revisions 0 and 1 while the repository mock
    reports revision 2 as youngest. After ``cache.sync()`` revision 2 and
    its single edit must be present in the cache tables.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    t3 = datetime(2003, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (("0", to_utimestamp(t1), "", ""), []),
        (
            ("1", to_utimestamp(t2), "joe", "Import"),
            [("trunk", "D", "A", None, None),
             ("trunk/README", "F", "A", None, None)],
        ),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=2)
    changes = [("trunk/README", Node.FILE, Changeset.EDIT,
                "trunk/README", 1)]
    changesets = [
        None,
        Mock(Changeset, repos, 1, "", "", t2, get_changes=lambda: []),
        Mock(Changeset, repos, 2, "Update", "joe", t3,
             get_changes=lambda: iter(changes)),
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    with self.env.db_query as db:
        self.assertEqual(
            [(to_utimestamp(t3), "joe", "Update")],
            db("SELECT time, author, message FROM revision WHERE rev='2'"),
        )
        self.assertEqual(
            [("trunk/README", "F", "E", "trunk/README", "1")],
            db("""SELECT path, node_type, change_type, base_path,
                         base_rev
                  FROM node_change WHERE rev='2'"""),
        )
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:33,代码来源:cache.py
示例7: test_sync_changeset
def test_sync_changeset(self):
    """Check that ``sync_changeset(0)`` refreshes only revision 0.

    The preset cache holds outdated metadata for revisions 0 and 1. After
    re-syncing revision 0 its row must carry the mocked author and
    message, while revision 1 must keep its preset message ``'Import'``.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (('0', to_utimestamp(t1), '', ''), []),
        (('1', to_utimestamp(t2), 'joe', 'Import'),
         [('trunk', 'D', 'A', None, None),
          ('trunk/README', 'F', 'A', None, None)]),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=1)
    changes1 = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
                ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
    changesets = [
        Mock(Changeset, repos, 0, '**empty**', 'joe', t1,
             get_changes=lambda: []),
        Mock(Changeset, repos, 1, 'Initial Import', 'joe', t2,
             get_changes=lambda: iter(changes1)),
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync_changeset(0)

    cursor = self.db.cursor()
    cursor.execute("SELECT time,author,message FROM revision ORDER BY rev")
    self.assertEqual((to_utimestamp(t1), 'joe', '**empty**'),
                     cursor.fetchone())
    self.assertEqual((to_utimestamp(t2), 'joe', 'Import'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:29,代码来源:cache.py
示例8: test_sync_changeset
def test_sync_changeset(self):
    """Check that ``sync_changeset(0)`` refreshes only revision 0.

    The preset cache holds outdated metadata for revisions 0 and 1. After
    re-syncing revision 0 its row must carry the mocked author and
    message, while revision 1 must keep its preset message ``"Import"``.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (("0", to_utimestamp(t1), "", ""), []),
        (
            ("1", to_utimestamp(t2), "joe", "Import"),
            [("trunk", "D", "A", None, None),
             ("trunk/README", "F", "A", None, None)],
        ),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=1)
    changes1 = [
        ("trunk", Node.DIRECTORY, Changeset.ADD, None, None),
        ("trunk/README", Node.FILE, Changeset.ADD, None, None),
    ]
    changesets = [
        Mock(Changeset, repos, 0, "**empty**", "joe", t1,
             get_changes=lambda: []),
        Mock(Changeset, repos, 1, "Initial Import", "joe", t2,
             get_changes=lambda: iter(changes1)),
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync_changeset(0)

    rows = self.env.db_query(
        "SELECT time, author, message FROM revision ORDER BY rev")
    self.assertEqual(2, len(rows))
    self.assertEqual((to_utimestamp(t1), "joe", "**empty**"), rows[0])
    self.assertEqual((to_utimestamp(t2), "joe", "Import"), rows[1])
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:26,代码来源:cache.py
示例9: test_update_sync
def test_update_sync(self):
    """Check that syncing a partly filled cache adds the new revision.

    The cache is preset with revisions 0 and 1 while the repository mock
    reports revision 2 as youngest. After ``cache.sync()`` revision 2 and
    its single edit must be present in the cache tables.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    t3 = datetime(2003, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (('0', to_utimestamp(t1), '', ''), []),
        (('1', to_utimestamp(t2), 'joe', 'Import'),
         [('trunk', 'D', 'A', None, None),
          ('trunk/README', 'F', 'A', None, None)]),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=2)
    changes = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README',
                1)]
    changesets = [
        None,
        Mock(Changeset, repos, 1, '', '', t2, get_changes=lambda: []),
        Mock(Changeset, repos, 2, 'Update', 'joe', t3,
             get_changes=lambda: iter(changes))
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    with self.env.db_query as db:
        self.assertEqual(
            [(to_utimestamp(t3), 'joe', 'Update')],
            db("SELECT time, author, message FROM revision WHERE rev='2'"))
        self.assertEqual(
            [('trunk/README', 'F', 'E', 'trunk/README', '1')],
            db("""SELECT path, node_type, change_type, base_path,
                         base_rev
                  FROM node_change WHERE rev='2'"""))
开发者ID:thimalk,项目名称:bloodhound,代码行数:31,代码来源:cache.py
示例10: test_update_sync
def test_update_sync(self):
    """Check that syncing a partly filled cache adds the new revision.

    Revisions 0 and 1 are inserted by hand and the repository mock reports
    revision 2 as youngest. After ``cache.sync()`` revision 2 and its
    single edit must be present in the cache tables.
    """
    cursor = self.db.cursor()
    cursor.execute("INSERT INTO revision (rev,time,author,message) "
                   "VALUES (0,41000,'','')")
    cursor.execute("INSERT INTO revision (rev,time,author,message) "
                   "VALUES (1,42000,'joe','Import')")
    cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
                       "change_type,base_path,base_rev) "
                       "VALUES ('1',%s,%s,%s,%s,%s)",
                       [('trunk', 'D', 'A', None, None),
                        ('trunk/README', 'F', 'A', None, None)])
    changes = [('trunk/README', Node.FILE, Changeset.EDIT,
                'trunk/README', 1)]
    changeset = Mock(Changeset, 2, 'Update', 'joe', 42042,
                     get_changes=lambda: iter(changes))

    def next_rev(rev):
        # Revision 1 is followed by revision 2; nothing follows after that.
        # Spelled out instead of ``cond and 2 or None``, which silently
        # breaks if the "true" value is ever falsy.
        if int(rev) == 1:
            return 2
        return None

    repos = Mock(Repository, 'test-repos', None, self.log,
                 get_changeset=lambda x: changeset,
                 get_youngest_rev=lambda: 2,
                 next_rev=next_rev)
    cache = CachedRepository(self.db, repos, None, self.log)
    cache.sync()

    cursor = self.db.cursor()
    cursor.execute("SELECT time,author,message FROM revision WHERE rev='2'")
    self.assertEqual((42042, 'joe', 'Update'), cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
    cursor.execute("SELECT path,node_type,change_type,base_path,base_rev "
                   "FROM node_change WHERE rev='2'")
    self.assertEqual(('trunk/README', 'F', 'E', 'trunk/README', '1'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:31,代码来源:cache.py
示例11: test_get_changes
def test_get_changes(self):
    """Check that a changeset and its changes are served from the cache.

    The cache tables are filled by hand and the repository mock returns
    ``None`` for every changeset, so the asserted values can only come
    from the database rows inserted here.
    """
    # Seed the cache: revisions 0 and 1, two node changes for revision 1.
    seed = self.db.cursor()
    seed.execute(
        "INSERT INTO revision (rev,time,author,message) "
        "VALUES (0,41000,'','')")
    seed.execute(
        "INSERT INTO revision (rev,time,author,message) "
        "VALUES (1,42000,'joe','Import')")
    node_rows = [('trunk', 'D', 'A', None, None),
                 ('trunk/README', 'F', 'A', None, None)]
    seed.executemany(
        "INSERT INTO node_change (rev,path,node_type,"
        "change_type,base_path,base_rev) "
        "VALUES ('1',%s,%s,%s,%s,%s)",
        node_rows)

    backend = Mock(Repository, 'test-repos', None, self.log,
                   get_changeset=lambda x: None,
                   get_youngest_rev=lambda: 1,
                   next_rev=lambda x: None,
                   normalize_rev=lambda rev: rev)
    cached = CachedRepository(self.db, backend, None, self.log)
    self.assertEqual('1', cached.youngest_rev)

    cset = cached.get_changeset(1)
    self.assertEqual('joe', cset.author)
    self.assertEqual('Import', cset.message)
    self.assertEqual(42000, cset.date)

    change_iter = cset.get_changes()
    expected_changes = (
        ('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
        ('trunk/README', Node.FILE, Changeset.ADD, None, None),
    )
    for expected in expected_changes:
        self.assertEqual(expected, change_iter.next())
    # The iterator must be exhausted after the two expected changes.
    self.assertRaises(StopIteration, change_iter.next)
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:28,代码来源:cache.py
示例12: test_initial_sync_with_empty_repos
def test_initial_sync_with_empty_repos(self):
    """Check that syncing a default mock repository leaves the cache empty.

    After ``cache.sync()`` there must be no ``revision`` rows and no
    ``node_change`` rows.
    """
    repos = self.get_repos()
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    with self.env.db_query as db:
        self.assertEqual(
            [], db("SELECT rev, time, author, message FROM revision"))
        self.assertEqual(
            0, db("SELECT COUNT(*) FROM node_change")[0][0])
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:8,代码来源:cache.py
示例13: test_sync_changeset_if_not_exists
def test_sync_changeset_if_not_exists(self):
    """Check that ``sync_changeset`` can add a revision missing in the cache.

    The preset cache holds revisions 0, 1 and 3 but not 2. A plain
    ``sync()`` must not make revision 2 available, whereas
    ``sync_changeset(2)`` must insert it without disturbing the rows that
    were already cached.
    """
    t = [
        datetime(2001, 1, 1, 1, 1, 1, 0, utc),  # r0
        datetime(2002, 1, 1, 1, 1, 1, 0, utc),  # r1
        datetime(2003, 1, 1, 1, 1, 1, 0, utc),  # r2
        datetime(2004, 1, 1, 1, 1, 1, 0, utc),  # r3
    ]
    self.preset_cache(
        (("0", to_utimestamp(t[0]), "joe", "**empty**"), []),
        (
            ("1", to_utimestamp(t[1]), "joe", "Import"),
            [("trunk", "D", "A", None, None),
             ("trunk/README", "F", "A", None, None)],
        ),
        # r2 is deliberately absent from the preset cache
        (
            ("3", to_utimestamp(t[3]), "joe", "Add COPYING"),
            [("trunk/COPYING", "F", "A", None, None)],
        ),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=3)
    changes = [
        None,  # r0
        [
            ("trunk", Node.DIRECTORY, Changeset.ADD, None, None),  # r1
            ("trunk/README", Node.FILE, Changeset.ADD, None, None),
        ],
        [
            ("branches", Node.DIRECTORY, Changeset.ADD, None, None),  # r2
            ("tags", Node.DIRECTORY, Changeset.ADD, None, None),
        ],
        [("trunk/COPYING", Node.FILE, Changeset.ADD, None, None)],  # r3
    ]
    changesets = [
        Mock(Changeset, repos, 0, "**empty**", "joe", t[0],
             get_changes=lambda: []),
        Mock(Changeset, repos, 1, "Initial Import", "joe", t[1],
             get_changes=lambda: iter(changes[1])),
        Mock(Changeset, repos, 2, "Created directories", "john", t[2],
             get_changes=lambda: iter(changes[2])),
        Mock(Changeset, repos, 3, "Add COPYING", "joe", t[3],
             get_changes=lambda: iter(changes[3])),
    ]
    cache = CachedRepository(self.env, repos, self.log)

    # Revision 2 is unknown to the cache before and after a regular sync.
    self.assertRaises(NoSuchChangeset, cache.get_changeset, 2)
    cache.sync()
    self.assertRaises(NoSuchChangeset, cache.get_changeset, 2)

    # Syncing that single changeset explicitly makes it available.
    self.assertEqual(None, cache.sync_changeset(2))
    cset = cache.get_changeset(2)
    self.assertEqual("john", cset.author)
    self.assertEqual("Created directories", cset.message)
    self.assertEqual(t[2], cset.date)
    cset_changes = cset.get_changes()
    self.assertEqual(
        ("branches", Node.DIRECTORY, Changeset.ADD, None, None),
        cset_changes.next())
    self.assertEqual(
        ("tags", Node.DIRECTORY, Changeset.ADD, None, None),
        cset_changes.next())
    self.assertRaises(StopIteration, cset_changes.next)

    # Revision 1 keeps its preset message "Import": only r2 was synced.
    rows = self.env.db_query(
        "SELECT time,author,message FROM revision ORDER BY rev")
    self.assertEqual(4, len(rows))
    self.assertEqual((to_utimestamp(t[0]), "joe", "**empty**"), rows[0])
    self.assertEqual((to_utimestamp(t[1]), "joe", "Import"), rows[1])
    self.assertEqual(
        (to_utimestamp(t[2]), "john", "Created directories"), rows[2])
    self.assertEqual((to_utimestamp(t[3]), "joe", "Add COPYING"), rows[3])
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:56,代码来源:cache.py
示例14: test_initial_sync_with_empty_repos
def test_initial_sync_with_empty_repos(self):
    """Check that syncing a default mock repository leaves the cache empty.

    After ``cache.sync()`` there must be no ``revision`` rows and no
    ``node_change`` rows.
    """
    repos = self.get_repos()
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync()

    cursor = self.db.cursor()
    cursor.execute("SELECT rev,time,author,message FROM revision")
    self.assertEqual(None, cursor.fetchone())
    cursor.execute("SELECT COUNT(*) FROM node_change")
    self.assertEqual(0, cursor.fetchone()[0])
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:10,代码来源:cache.py
示例15: test_clean_sync
def test_clean_sync(self):
    """Check that ``sync(clean=True)`` rebuilds the cache from the repository.

    The preset cache holds outdated metadata for revisions 0 and 1. After
    a clean sync every row must reflect the mocked changesets, including
    the new revision 2.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    t3 = datetime(2003, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (('0', to_utimestamp(t1), '', ''), []),
        (('1', to_utimestamp(t2), 'joe', 'Import'),
         [('trunk', 'D', 'A', None, None),
          ('trunk/README', 'F', 'A', None, None)]),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=2)
    changes1 = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
                ('trunk/README', Node.FILE, Changeset.ADD, None, None)]
    changes2 = [('trunk/README', Node.FILE, Changeset.EDIT, 'trunk/README',
                 1)]
    changesets = [
        Mock(Changeset, repos, 0, '**empty**', 'joe', t1,
             get_changes=lambda: []),
        Mock(Changeset, repos, 1, 'Initial Import', 'joe', t2,
             get_changes=lambda: iter(changes1)),
        Mock(Changeset, repos, 2, 'Update', 'joe', t3,
             get_changes=lambda: iter(changes2))
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync(clean=True)

    cursor = self.db.cursor()
    # ORDER BY keeps the row-by-row assertions independent of the order in
    # which the database backend happens to return rows.
    cursor.execute("SELECT time,author,message FROM revision ORDER BY rev")
    self.assertEqual((to_utimestamp(t1), 'joe', '**empty**'),
                     cursor.fetchone())
    self.assertEqual((to_utimestamp(t2), 'joe', 'Initial Import'),
                     cursor.fetchone())
    self.assertEqual((to_utimestamp(t3), 'joe', 'Update'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
    # Revision 1 has two rows, so ``path`` is needed as a tie-breaker.
    cursor.execute("""
        SELECT rev,path,node_type,change_type,base_path,base_rev
        FROM node_change ORDER BY rev, path
        """)
    self.assertEqual(('1', 'trunk', 'D', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(('1', 'trunk/README', 'F', 'A', None, None),
                     cursor.fetchone())
    self.assertEqual(('2', 'trunk/README', 'F', 'E',
                      'trunk/README', '1'),
                     cursor.fetchone())
    self.assertEqual(None, cursor.fetchone())
开发者ID:wiraqutra,项目名称:photrackjp,代码行数:48,代码来源:cache.py
示例16: test_initial_sync_with_empty_repos
def test_initial_sync_with_empty_repos(self):
    """Check syncing a repository whose only revision is an empty r0.

    After ``cache.sync()`` the ``revision`` table must start with the
    mocked revision 0 and ``node_change`` must stay empty.
    """
    changeset = Mock(Changeset, 0, '', '', 42000,
                     get_changes=lambda: [])
    repos = Mock(Repository, 'test-repos', None, self.log,
                 get_changeset=lambda x: changeset,
                 get_oldest_rev=lambda: 0,
                 get_youngest_rev=lambda: 0,
                 normalize_rev=lambda x: x,
                 next_rev=lambda x: None)
    cache = CachedRepository(self.db, repos, None, self.log)
    cache.sync()

    cursor = self.db.cursor()
    cursor.execute("SELECT rev,time,author,message FROM revision")
    self.assertEqual(('0', 42000, '', ''), cursor.fetchone())
    cursor.execute("SELECT COUNT(*) FROM node_change")
    self.assertEqual(0, cursor.fetchone()[0])
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:17,代码来源:cache.py
示例17: test_initial_sync_with_empty_repos
def test_initial_sync_with_empty_repos(self):
    """Check syncing a repository that has no changesets at all.

    Every changeset lookup on the mock raises ``NoSuchChangeset``. After
    ``cache.sync()`` both cache tables must still be empty.
    """
    def no_changeset(rev):
        raise NoSuchChangeset(rev)

    repos = Mock(Repository, 'test-repos', None, self.log,
                 get_changeset=no_changeset,
                 get_oldest_rev=lambda: 1,
                 get_youngest_rev=lambda: 0,
                 normalize_rev=no_changeset,
                 next_rev=lambda x: None)
    cache = CachedRepository(self.db, repos, None, self.log)
    cache.sync()

    cursor = self.db.cursor()
    cursor.execute("SELECT rev,time,author,message FROM revision")
    self.assertEqual(None, cursor.fetchone())
    cursor.execute("SELECT COUNT(*) FROM node_change")
    self.assertEqual(0, cursor.fetchone()[0])
开发者ID:cyphactor,项目名称:lifecyclemanager,代码行数:18,代码来源:cache.py
示例18: test_clean_sync
def test_clean_sync(self):
    """Check that ``sync(clean=True)`` rebuilds the cache from the repository.

    The preset cache holds outdated metadata for revisions 0 and 1. After
    a clean sync every row must reflect the mocked changesets, including
    the new revision 2.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    t3 = datetime(2003, 1, 1, 1, 1, 1, 0, utc)
    self.preset_cache(
        (("0", to_utimestamp(t1), "", ""), []),
        (
            ("1", to_utimestamp(t2), "joe", "Import"),
            [("trunk", "D", "A", None, None),
             ("trunk/README", "F", "A", None, None)],
        ),
    )
    # The lambda resolves ``changesets`` only when called, so the list can
    # be defined after the repository mock that the changesets refer to.
    repos = self.get_repos(get_changeset=lambda x: changesets[int(x)],
                           youngest_rev=2)
    changes1 = [
        ("trunk", Node.DIRECTORY, Changeset.ADD, None, None),
        ("trunk/README", Node.FILE, Changeset.ADD, None, None),
    ]
    changes2 = [("trunk/README", Node.FILE, Changeset.EDIT,
                 "trunk/README", 1)]
    changesets = [
        Mock(Changeset, repos, 0, "**empty**", "joe", t1,
             get_changes=lambda: []),
        Mock(Changeset, repos, 1, "Initial Import", "joe", t2,
             get_changes=lambda: iter(changes1)),
        Mock(Changeset, repos, 2, "Update", "joe", t3,
             get_changes=lambda: iter(changes2)),
    ]
    cache = CachedRepository(self.env, repos, self.log)
    cache.sync(clean=True)

    rows = self.env.db_query(
        """
        SELECT time, author, message FROM revision ORDER BY rev
        """
    )
    self.assertEqual(3, len(rows))
    self.assertEqual((to_utimestamp(t1), "joe", "**empty**"), rows[0])
    self.assertEqual((to_utimestamp(t2), "joe", "Initial Import"), rows[1])
    self.assertEqual((to_utimestamp(t3), "joe", "Update"), rows[2])

    rows = self.env.db_query(
        """
        SELECT rev, path, node_type, change_type, base_path, base_rev
        FROM node_change ORDER BY rev, path"""
    )
    self.assertEqual(3, len(rows))
    self.assertEqual(("1", "trunk", "D", "A", None, None), rows[0])
    self.assertEqual(("1", "trunk/README", "F", "A", None, None), rows[1])
    self.assertEqual(
        ("2", "trunk/README", "F", "E", "trunk/README", "1"), rows[2])
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:44,代码来源:cache.py
示例19: get_repository
def get_repository(self, type, dir, authname):
    """Return a `SubversionRepository`.

    The repository is wrapped in a `CachedRepository`, unless `type` is
    'direct-svnfs'. When `authname` is given, a `SubversionAuthorizer` is
    attached to both the returned repository and the underlying
    `SubversionRepository`.
    """
    # Look up the Subversion version once and record it in the
    # environment's system information.
    if not self._version:
        self._version = self._get_version()
        self.env.systeminfo.append(("Subversion", self._version))

    layout = {"tags": self.tags, "branches": self.branches}
    fs_repos = SubversionRepository(dir, None, self.log, layout)

    if type != "direct-svnfs":
        repos = CachedRepository(self.env.get_db_cnx, fs_repos, None,
                                 self.log)
        # NOTE(review): the listing lost its indentation; this flag is
        # assumed to be set only on the cached wrapper -- confirm upstream.
        repos.has_linear_changesets = True
    else:
        repos = fs_repos

    if authname:
        # A weak proxy keeps the authorizer from holding a strong
        # reference back to the repository it is attached to.
        authz = SubversionAuthorizer(self.env, weakref.proxy(repos),
                                     authname)
        repos.authz = fs_repos.authz = authz
    return repos
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:19,代码来源:svn_fs.py
示例20: test_get_changes
def test_get_changes(self):
    """Check that a changeset and its changes are served from the cache.

    The cache is preset with revisions 0 and 1. The values asserted for
    revision 1 must match the preset rows.
    """
    t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
    t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
    rev0 = (("0", to_utimestamp(t1), "", ""), [])
    rev1 = (
        ("1", to_utimestamp(t2), "joe", "Import"),
        [("trunk", "D", "A", None, None),
         ("trunk/RDME", "F", "A", None, None)],
    )
    self.preset_cache(rev0, rev1)

    backend = self.get_repos()
    cached = CachedRepository(self.env, backend, self.log)
    self.assertEqual("1", cached.youngest_rev)

    cset = cached.get_changeset(1)
    self.assertEqual("joe", cset.author)
    self.assertEqual("Import", cset.message)
    self.assertEqual(t2, cset.date)

    change_iter = cset.get_changes()
    expected_changes = (
        ("trunk", Node.DIRECTORY, Changeset.ADD, None, None),
        ("trunk/RDME", Node.FILE, Changeset.ADD, None, None),
    )
    for expected in expected_changes:
        self.assertEqual(expected, change_iter.next())
    # The iterator must be exhausted after the two expected changes.
    self.assertRaises(StopIteration, change_iter.next)
开发者ID:rvelezc,项目名称:rafaelvelez.us-backup,代码行数:21,代码来源:cache.py
注：本文中的trac.versioncontrol.cache.CachedRepository类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论