def test_deletions(self):
    """Test rolling back 'd' operations"""
    self.opman.start()

    # Write two documents and wait until both reach the secondary.
    collection = self.main_conn["test"]["mc"]
    for value in (0, 1):
        collection.insert_one({"i": value})
    self.assertEqual(self.primary_conn["test"]["mc"].find().count(), 2)
    assert_soon(
        lambda: self.secondary_conn["test"]["mc"].count() == 2,
        "first write didn't replicate to secondary",
    )

    # Take the primary down; the secondary should be elected.
    self.repl_set.primary.stop(destroy=False)
    assert_soon(
        lambda: self.secondary_conn["admin"].command("isMaster")["ismaster"]
    )

    # Delete the first document on the new primary.  The old primary is
    # down, so this delete can never replicate and will be rolled back.
    retry_until_ok(collection.delete_one, {"i": 0})
    self.assertEqual(self.secondary_conn["test"]["mc"].count(), 1)
    # The delete should also propagate to the doc manager.
    assert_soon(
        lambda: len(self.opman.doc_managers[0]._search()) == 1,
        "delete was not replicated to doc manager",
    )

    # Take down the new primary, then bring both members back up.
    self.repl_set.secondary.stop(destroy=False)
    self.repl_set.primary.start()
    primary_admin = self.primary_conn["admin"]
    assert_soon(
        lambda: primary_admin.command("isMaster")["ismaster"],
        "restarted primary never resumed primary status",
    )
    self.repl_set.secondary.start()
    assert_soon(
        lambda: retry_until_ok(
            self.secondary_conn.admin.command, "replSetGetStatus"
        )["myState"] == 2,
        "restarted secondary never resumed secondary status",
    )

    # The un-replicated delete is rolled back: both documents must be
    # present again in MongoDB and in the doc manager.
    assert_soon(lambda: retry_until_ok(collection.count) == 2)
    dm = self.opman.doc_managers[0]
    assert_soon(
        lambda: len(list(dm._search())) == 2,
        ("Expected two documents, but got: %r" % list(dm._search())),
    )
    self.opman.join()
def test_stressed_rollback(self):
    """Test stressed rollback with number of documents equal to specified
    in global variable. Rollback is performed like before, but with more
    documents.
    """
    # Wait for the target system to drain before starting.
    while len(self.synchronizer._search()) != 0:
        time.sleep(1)

    # Insert the initial 'Paul' batch; these documents replicate and
    # therefore survive the rollback.
    for i in range(0, NUMBER_OF_DOC_DIRS):
        self.conn['test']['test'].insert(
            {'name': 'Paul ' + str(i)}, safe=True)
    while len(self.synchronizer._search()) != NUMBER_OF_DOC_DIRS:
        time.sleep(1)

    # Kill the primary and wait for the secondary to be promoted.
    primary_conn = Connection(HOSTNAME, int(PORTS_ONE['PRIMARY']))
    kill_mongo_proc(HOSTNAME, PORTS_ONE['PRIMARY'])
    new_primary_conn = Connection(HOSTNAME, int(PORTS_ONE['SECONDARY']))
    admin_db = new_primary_conn['admin']
    while not admin_db.command("isMaster")['ismaster']:
        time.sleep(1)
    time.sleep(5)

    # Insert the 'Pauline' batch against the new primary.  The old
    # primary is down, so these writes never replicate and are rolled
    # back later.
    # BUGFIX: advance `count` only after a successful insert.  The old
    # code incremented inside the `try` before inserting, so any insert
    # that raised skipped that document number entirely and the batch
    # came up short.
    count = 0
    while count < NUMBER_OF_DOC_DIRS:
        try:
            self.conn['test']['test'].insert(
                {'name': 'Pauline ' + str(count)}, safe=True)
            count += 1
        except (OperationFailure, AutoReconnect):
            time.sleep(1)
    while (len(self.synchronizer._search())
            != self.conn['test']['test'].find().count()):
        time.sleep(1)

    # Every 'Pauline' document in the target must match its MongoDB
    # counterpart by _id.
    result_set_1 = self.synchronizer._search()
    for item in result_set_1:
        if 'Pauline' in item['name']:
            result_set_2 = self.conn['test']['test'].find_one(
                {'name': item['name']})
            self.assertEqual(item['_id'], result_set_2['_id'])

    # Kill the new primary, restart both members, and wait for the
    # rollback: only the 'Paul' batch should remain.
    kill_mongo_proc(HOSTNAME, PORTS_ONE['SECONDARY'])
    start_mongo_proc(PORTS_ONE['PRIMARY'], "demo-repl", "/replset1a",
                     "/replset1a.log", None)
    while not primary_conn['admin'].command("isMaster")['ismaster']:
        time.sleep(1)
    start_mongo_proc(PORTS_ONE['SECONDARY'], "demo-repl", "/replset1b",
                     "/replset1b.log", None)
    while len(self.synchronizer._search()) != NUMBER_OF_DOC_DIRS:
        time.sleep(5)

    result_set_1 = self.synchronizer._search()
    self.assertEqual(len(result_set_1), NUMBER_OF_DOC_DIRS)
    for item in result_set_1:
        self.assertTrue('Paul' in item['name'])
    find_cursor = retry_until_ok(self.conn['test']['test'].find)
    self.assertEqual(retry_until_ok(find_cursor.count), NUMBER_OF_DOC_DIRS)
def init_cursor(self):
    """Position the cursor appropriately.

    The cursor is set to either the beginning of the oplog, or
    wherever it was last left off.

    Returns the cursor and the number of documents left in the cursor.
    Returns (None, 0) when there is nothing usable to read: either the
    collection dump produced no timestamp, or the first oplog entry is
    newer than our checkpoint (we have fallen behind the oplog).
    """
    timestamp = self.checkpoint
    if self.checkpoint is None:
        # No saved progress: this is a fresh start.
        if self.collection_dump:
            # dump collection and update checkpoint
            timestamp = self.dump_collection()
            if timestamp is None:
                # Dump aborted or failed; no position to resume from.
                return None, 0
        else:
            # Collection dump disabled:
            # return cursor to beginning of oplog.
            cursor = self.get_oplog_cursor()
            self.checkpoint = self.get_last_oplog_timestamp()
            return cursor, retry_until_ok(cursor.count)

    # Resume from `timestamp`.  Retry up to 60 times because the cursor
    # can become invalid between counting it and reading from it.
    for i in range(60):
        cursor = self.get_oplog_cursor(timestamp)
        cursor_len = retry_until_ok(cursor.count)
        if cursor_len == 0:
            # Our checkpoint is not in the oplog at all:
            # rollback, update checkpoint, and retry
            logging.debug("OplogThread: Initiating rollback from "
                          "get_oplog_cursor")
            self.checkpoint = self.rollback()
            return self.init_cursor()
        # try to get the first oplog entry
        try:
            first_oplog_entry = next(cursor)
        except StopIteration:
            # It's possible for the cursor to become invalid
            # between the cursor.count() call and now
            time.sleep(1)
            continue
        # first entry should be last oplog entry processed
        cursor_ts_long = util.bson_ts_to_long(
            first_oplog_entry.get("ts"))
        given_ts_long = util.bson_ts_to_long(timestamp)
        if cursor_ts_long > given_ts_long:
            # first entry in oplog is beyond timestamp
            # we've fallen behind
            return None, 0
        # first entry has been consumed
        return cursor, cursor_len - 1
    else:
        # 60 consecutive StopIterations: give up rather than loop forever.
        raise errors.MongoConnectorError(
            "Could not initialize oplog cursor.")
def test_single_target(self):
    """Test with a single replication target"""
    self.opman.start()

    # The first insert happens while the original primary is still up.
    mc_coll = self.main_conn["test"]["mc"]
    mc_coll.insert({"i": 0})
    self.assertEqual(self.primary_conn["test"]["mc"].find().count(), 1)
    secondary = self.secondary_conn
    assert_soon(
        lambda: secondary["test"]["mc"].count() == 1,
        "first write didn't replicate to secondary",
    )

    # Fail over: kill the primary and wait for the secondary's election.
    self.repl_set.primary.stop(destroy=False)
    assert_soon(lambda: secondary["admin"].command("isMaster")["ismaster"])

    # This insert reaches only the new primary, so the eventual
    # rollback will undo it.
    retry_until_ok(mc_coll.insert, {"i": 1})
    self.assertEqual(secondary["test"]["mc"].count(), 2)
    assert_soon(
        lambda: len(self.opman.doc_managers[0]._search()) == 2,
        "not all writes were replicated to doc manager",
    )

    # Kill the new primary, then restart both members.
    self.repl_set.secondary.stop(destroy=False)
    self.repl_set.primary.start()
    primary_admin = self.primary_conn["admin"]
    assert_soon(
        lambda: primary_admin.command("isMaster")["ismaster"],
        "restarted primary never resumed primary status",
    )
    self.repl_set.secondary.start()
    assert_soon(
        lambda: retry_until_ok(
            secondary.admin.command, "replSetGetStatus"
        )["myState"] == 2,
        "restarted secondary never resumed secondary status",
    )
    assert_soon(
        lambda: retry_until_ok(self.main_conn.test.mc.find().count) > 0,
        "documents not found after primary/secondary restarted",
    )

    # Only the first document survives the rollback, in MongoDB ...
    self.assertEqual(mc_coll.count(), 1)
    self.assertEqual(mc_coll.find_one()["i"], 0)
    # ... and likewise in the doc manager.
    doc_manager = self.opman.doc_managers[0]
    assert_soon(
        lambda: len(doc_manager._search()) == 1,
        "documents never rolled back in doc manager.",
    )
    self.assertEqual(doc_manager._search()[0]["i"], 0)

    # cleanup
    self.opman.join()
def test_rollback(self):
    """Test behavior during a MongoDB rollback.

    We force a rollback by adding a doc, killing the primary,
    adding another doc, killing the new primary, and then
    restarting both.
    """
    primary_conn = self.repl_set.primary.client()

    # This doc can be picked up in the collection dump
    self.conn['test']['test'].insert_one({'name': 'paul'})
    condition1 = lambda: self.conn['test']['test'].find(
        {'name': 'paul'}).count() == 1
    condition2 = lambda: self._count() == 1
    assert_soon(condition1)
    assert_soon(condition2)

    # This doc is definitely not picked up by collection dump
    self.conn['test']['test'].insert_one({'name': 'pauly'})

    # Kill the primary; wait for the secondary to take over.
    self.repl_set.primary.stop(destroy=False)
    new_primary_conn = self.repl_set.secondary.client()
    admin = new_primary_conn['admin']
    assert_soon(lambda: admin.command("isMaster")['ismaster'])
    time.sleep(5)

    # This write reaches only the new primary and will be rolled back.
    retry_until_ok(self.conn.test.test.insert_one,
                   {'name': 'pauline'})
    assert_soon(lambda: self._count() == 3)
    result_set_1 = list(self._search())
    result_set_2 = self.conn['test']['test'].find_one({'name': 'pauline'})
    self.assertEqual(len(result_set_1), 3)
    # make sure pauline is there
    for item in result_set_1:
        if item['name'] == 'pauline':
            self.assertEqual(item['_id'], str(result_set_2['_id']))

    # Kill the new primary and restart both members.
    self.repl_set.secondary.stop(destroy=False)
    self.repl_set.primary.start()
    # BUGFIX: while the primary is restarting, a direct command() call
    # can raise AutoReconnect and crash the test; poll through
    # retry_until_ok/assert_soon like the rest of this suite.
    assert_soon(lambda: retry_until_ok(
        primary_conn['admin'].command, "isMaster")['ismaster'])
    self.repl_set.secondary.start()

    # ROBUSTNESS: wait for the rollback to reach the doc manager instead
    # of relying on a fixed two-second sleep (a flakiness source).
    assert_soon(lambda: len(list(self._search())) == 2)

    # 'pauline' was rolled back; only 'paul' and 'pauly' remain,
    # in either order.
    result_set_1 = list(self._search())
    self.assertEqual(len(result_set_1), 2)
    if result_set_1[0]['name'] == 'paul':
        self.assertEqual(result_set_1[1]['name'], 'pauly')
    elif result_set_1[0]['name'] == 'pauly':
        self.assertEqual(result_set_1[1]['name'], 'paul')
    else:
        self.assertTrue(0, 'Unknown document retrieved')
    find_cursor = retry_until_ok(self.conn['test']['test'].find)
    self.assertEqual(retry_until_ok(find_cursor.count), 2)
def test_single_target(self):
    """Test with a single replication target"""
    self.opman.start()

    # Insert the first document while the original primary is up.
    self.main_conn["test"]["mc"].insert({"i": 0})
    self.assertEqual(self.primary_conn["test"]["mc"].find().count(), 1)
    secondary = self.secondary_conn
    self.assertTrue(
        wait_for(lambda: secondary["test"]["mc"].count() == 1),
        "first write didn't replicate to secondary")

    # Fail over: kill the primary, wait for the secondary's promotion.
    kill_mongo_proc(self.primary_p, destroy=False)
    while not secondary["admin"].command("isMaster")["ismaster"]:
        time.sleep(1)

    # This insert reaches only the new primary; the rollback undoes it.
    retry_until_ok(self.main_conn["test"]["mc"].insert, {"i": 1})
    self.assertEqual(secondary["test"]["mc"].count(), 2)
    replicated = lambda: len(self.opman.doc_managers[0]._search()) == 2
    self.assertTrue(
        wait_for(replicated),
        "not all writes were replicated to doc manager")

    # Kill the new primary, then bring both servers back up.
    kill_mongo_proc(self.secondary_p, destroy=False)
    restart_mongo_proc(self.primary_p)
    primary_admin = self.primary_conn["admin"]
    while not primary_admin.command("isMaster")["ismaster"]:
        time.sleep(1)
    restart_mongo_proc(self.secondary_p)
    while secondary["admin"].command("replSetGetStatus")["myState"] != 2:
        time.sleep(1)
    while retry_until_ok(self.main_conn["test"]["mc"].find().count) == 0:
        time.sleep(1)

    # Only the first document survives the rollback, in MongoDB ...
    self.assertEqual(self.main_conn["test"]["mc"].count(), 1)
    self.assertEqual(self.main_conn["test"]["mc"].find_one()["i"], 0)
    # ... and likewise in the doc manager.
    doc_manager = self.opman.doc_managers[0]
    self.assertEqual(len(doc_manager._search()), 1)
    self.assertEqual(doc_manager._search()[0]["i"], 0)

    # cleanup
    self.opman.join()
def dump_collection(self):
    """Dumps collection into the target system.

    This method is called when we're initializing the cursor and have no
    configs i.e. when we're starting for the first time.

    Returns the oplog timestamp the dump corresponds to, or None when
    the dump is aborted (thread stopped, no oplog timestamp available)
    or the source connection fails mid-dump.
    """
    # BUGFIX: copy the configured namespaces instead of aliasing them.
    # The old code did `dump_set = self.namespace_set` and then appended
    # discovered namespaces to dump_set, silently mutating
    # self.namespace_set whenever no namespaces were configured (and
    # crashing outright if it was None).
    dump_set = list(self.namespace_set or [])
    # no namespaces specified: dump every user collection
    if not self.namespace_set:
        db_list = self.main_connection.database_names()
        for database in db_list:
            # skip MongoDB's internal databases
            if database == "config" or database == "local":
                continue
            coll_list = self.main_connection[database].collection_names()
            for coll in coll_list:
                if coll.startswith("system"):
                    continue
                namespace = str(database) + "." + str(coll)
                dump_set.append(namespace)
    timestamp = util.retry_until_ok(self.get_last_oplog_timestamp)
    if timestamp is None:
        return None
    # Loop-invariant: every dumped doc is stamped with the same ts.
    long_ts = util.bson_ts_to_long(timestamp)
    for namespace in dump_set:
        database, coll = namespace.split('.', 1)
        target_coll = self.main_connection[database][coll]
        cursor = util.retry_until_ok(target_coll.find)
        try:
            for doc in cursor:
                # Could spend a long time in this loop
                if not self.running:
                    # Return None so we don't save our progress
                    return None
                doc['ns'] = namespace
                doc['_ts'] = long_ts
                try:
                    self.doc_manager.upsert(doc)
                except errors.OperationFailed:
                    # Skip the bad doc but keep dumping the rest.
                    logging.error("Unable to insert %s" % (doc))
        except (pymongo.errors.AutoReconnect,
                pymongo.errors.OperationFailure):
            # Connection-level failure mid-dump: we cannot know how much
            # was written, so stop the thread entirely.
            err_msg = "OplogManager: Failed during dump collection"
            effect = "cannot recover!"
            logging.error('%s %s %s' % (err_msg, effect, self.oplog))
            self.running = False
            return
    return timestamp
# (removed stray scraped-page text, "请发表评论" / "please post a comment")