本文整理汇总了C++中ON_BLOCK_EXIT函数的典型用法代码示例。如果您正苦于以下问题:C++ ON_BLOCK_EXIT函数的具体用法?C++ ON_BLOCK_EXIT怎么用?C++ ON_BLOCK_EXIT使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了ON_BLOCK_EXIT函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的C++代码示例。
示例1: BIO_new
bool SSLManager::_setSubjectName(const std::string& keyFile, std::string& subjectName) {
// Read the certificate subject name and store it
BIO *in = BIO_new(BIO_s_file_internal());
if (NULL == in){
error() << "failed to allocate BIO object: " <<
getSSLErrorMessage(ERR_get_error()) << endl;
return false;
}
ON_BLOCK_EXIT(BIO_free, in);
if (BIO_read_filename(in, keyFile.c_str()) <= 0){
error() << "cannot read key file when setting subject name: " << keyFile << ' ' <<
getSSLErrorMessage(ERR_get_error()) << endl;
return false;
}
X509* x509 = PEM_read_bio_X509(in, NULL, &SSLManager::password_cb, this);
if (NULL == x509) {
error() << "cannot retreive certificate from keyfile: " << keyFile << ' ' <<
getSSLErrorMessage(ERR_get_error()) << endl;
return false;
}
ON_BLOCK_EXIT(X509_free, x509);
subjectName = getCertificateSubjectName(x509);
return true;
}
开发者ID:ChowZenki,项目名称:mongo,代码行数:27,代码来源:ssl_manager.cpp
示例2: ImpersonateCurrentUser
bool ImpersonateCurrentUser() {
SetLastError(0);
HANDLE process=0;
HANDLE Token=NULL;
GetCurrentUserToken(process, Token);
// Auto close process and token when leaving scope
ON_BLOCK_EXIT(CloseHandle, process);
ON_BLOCK_EXIT(CloseHandle, Token);
bool test=(FALSE != ImpersonateLoggedOnUser(Token));
return test;
}
开发者ID:00farts,项目名称:italc-1,代码行数:15,代码来源:security.cpp
示例3: verify
void DBClientCursor::requestMore() {
verify(cursorId && batch.pos == batch.nReturned);
if (haveLimit) {
nToReturn -= batch.nReturned;
verify(nToReturn > 0);
}
BufBuilder b;
b.appendNum(opts);
b.appendStr(ns);
b.appendNum(nextBatchSize());
b.appendNum(cursorId);
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
Message response;
if (_client) {
_client->call(toSend, response);
this->batch.m = std::move(response);
dataReceived();
} else {
verify(_scopedHost.size());
ScopedDbConnection conn(_scopedHost);
conn->call(toSend, response);
_client = conn.get();
ON_BLOCK_EXIT([this] { _client = nullptr; });
this->batch.m = std::move(response);
dataReceived();
conn.done();
}
}
开发者ID:mihail812,项目名称:mongo,代码行数:32,代码来源:dbclientcursor.cpp
示例4: ON_BLOCK_EXIT
void BackgroundSync::producerThread() {
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
_threadPoolTaskExecutor.startup();
ON_BLOCK_EXIT([this]() {
_threadPoolTaskExecutor.shutdown();
_threadPoolTaskExecutor.join();
});
while (!inShutdown()) {
try {
_producerThread();
} catch (const DBException& e) {
std::string msg(str::stream() << "sync producer problem: " << e.toString());
error() << msg;
_replCoord->setMyHeartbeatMessage(msg);
sleepmillis(100); // sleep a bit to keep from hammering this thread with temp. errors.
} catch (const std::exception& e2) {
severe() << "sync producer exception: " << e2.what();
fassertFailed(28546);
}
}
stop();
}
开发者ID:AnkyrinRepeat,项目名称:mongo,代码行数:25,代码来源:bgsync.cpp
示例5: ON_BLOCK_EXIT
Shard::HostWithResponse ShardLocal::_runCommand(OperationContext* txn,
const ReadPreferenceSetting& unused,
const std::string& dbName,
Milliseconds maxTimeMSOverrideUnused,
const BSONObj& cmdObj) {
repl::OpTime currentOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
ON_BLOCK_EXIT([this, &txn, ¤tOpTimeFromClient] {
_updateLastOpTimeFromClient(txn, currentOpTimeFromClient);
});
try {
DBDirectClient client(txn);
rpc::UniqueReply commandResponse = client.runCommandWithMetadata(
dbName, cmdObj.firstElementFieldName(), rpc::makeEmptyMetadata(), cmdObj);
BSONObj responseReply = commandResponse->getCommandReply().getOwned();
BSONObj responseMetadata = commandResponse->getMetadata().getOwned();
Status commandStatus = getStatusFromCommandResult(responseReply);
Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseReply);
return Shard::HostWithResponse(boost::none,
Shard::CommandResponse{std::move(responseReply),
std::move(responseMetadata),
std::move(commandStatus),
std::move(writeConcernStatus)});
} catch (const DBException& ex) {
return Shard::HostWithResponse(boost::none, ex.toStatus());
}
}
开发者ID:Machyne,项目名称:mongo,代码行数:30,代码来源:shard_local.cpp
示例6: invariant
void WiredTigerOperationStats::fetchStats(WT_SESSION* session,
const std::string& uri,
const std::string& config) {
invariant(session);
WT_CURSOR* c = nullptr;
const char* cursorConfig = config.empty() ? nullptr : config.c_str();
int ret = session->open_cursor(session, uri.c_str(), nullptr, cursorConfig, &c);
uassert(ErrorCodes::CursorNotFound, "Unable to open statistics cursor", ret == 0);
invariant(c);
ON_BLOCK_EXIT([&] { c->close(c); });
const char* desc;
uint64_t value;
uint64_t key;
while (c->next(c) == 0 && c->get_key(c, &key) == 0) {
fassert(51035, c->get_value(c, &desc, nullptr, &value) == 0);
#if defined(__s390x__)
_stats[key >> 32] = WiredTigerUtil::castStatisticsValue<long long>(value);
#else
_stats[key] = WiredTigerUtil::castStatisticsValue<long long>(value);
#endif // __s390x__
}
// Reset the statistics so that the next fetch gives the recent values.
invariantWTOK(c->reset(c));
}
开发者ID:ajdavis,项目名称:mongo,代码行数:28,代码来源:wiredtiger_recovery_unit.cpp
示例7: SSL_get_peer_certificate
std::string SSLManager::validatePeerCertificate(const SSLConnection* conn) {
if (!_validateCertificates) return "";
X509* peerCert = SSL_get_peer_certificate(conn->ssl);
if (NULL == peerCert) { // no certificate presented by peer
if (_weakValidation) {
warning() << "no SSL certificate provided by peer" << endl;
}
else {
error() << "no SSL certificate provided by peer; connection rejected" << endl;
throw SocketException(SocketException::CONNECT_ERROR, "");
}
return "";
}
ON_BLOCK_EXIT(X509_free, peerCert);
long result = SSL_get_verify_result(conn->ssl);
if (result != X509_V_OK) {
error() << "SSL peer certificate validation failed:" <<
X509_verify_cert_error_string(result) << endl;
throw SocketException(SocketException::CONNECT_ERROR, "");
}
// TODO: check optional cipher restriction, using cert.
return getCertificateSubjectName(peerCert);
}
开发者ID:ChowZenki,项目名称:mongo,代码行数:28,代码来源:ssl_manager.cpp
示例8: threadPoolTaskExecutor
void BackgroundSync::producerThread(executor::TaskExecutor* taskExecutor) {
Client::initThread("rsBackgroundSync");
AuthorizationSession::get(cc())->grantInternalAuthorization();
// Disregard task executor passed into this function and use a local thread pool task executor
// instead. This ensures that potential blocking operations inside the fetcher callback will
// not affect other coordinators (such as the replication coordinator) that might be dependent
// on the shared task executor.
ThreadPool::Options threadPoolOptions;
threadPoolOptions.poolName = "rsBackgroundSync";
executor::ThreadPoolTaskExecutor threadPoolTaskExecutor(
stdx::make_unique<ThreadPool>(threadPoolOptions), executor::makeNetworkInterface());
taskExecutor = &threadPoolTaskExecutor;
taskExecutor->startup();
ON_BLOCK_EXIT([taskExecutor]() {
taskExecutor->shutdown();
taskExecutor->join();
});
while (!inShutdown()) {
try {
_producerThread(taskExecutor);
} catch (const DBException& e) {
std::string msg(str::stream() << "sync producer problem: " << e.toString());
error() << msg;
_replCoord->setMyHeartbeatMessage(msg);
} catch (const std::exception& e2) {
severe() << "sync producer exception: " << e2.what();
fassertFailed(28546);
}
}
}
开发者ID:alabid,项目名称:mongo,代码行数:32,代码来源:bgsync.cpp
示例9: eventProcessingThread
void eventProcessingThread() {
std::string eventName = getShutdownSignalName(ProcessId::getCurrent().asUInt32());
HANDLE event = CreateEventA(NULL, TRUE, FALSE, eventName.c_str());
if (event == NULL) {
warning() << "eventProcessingThread CreateEvent failed: "
<< errnoWithDescription();
return;
}
ON_BLOCK_EXIT(CloseHandle, event);
int returnCode = WaitForSingleObject(event, INFINITE);
if (returnCode != WAIT_OBJECT_0) {
if (returnCode == WAIT_FAILED) {
warning() << "eventProcessingThread WaitForSingleObject failed: "
<< errnoWithDescription();
return;
}
else {
warning() << "eventProcessingThread WaitForSingleObject failed: "
<< errnoWithDescription(returnCode);
return;
}
}
Client::initThread("eventTerminate");
log() << "shutdown event signaled, will terminate after current cmd ends";
exitCleanly(EXIT_CLEAN);
}
开发者ID:mohamed-ahmed,项目名称:mongo,代码行数:30,代码来源:server.cpp
示例10: syncDoInitialSync
void syncDoInitialSync(ReplicationCoordinatorExternalState* replicationCoordinatorExternalState) {
stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock);
if (!lk.try_lock()) {
uasserted(34474, "Initial Sync Already Active.");
}
std::unique_ptr<BackgroundSync> bgsync;
{
log() << "Starting replication fetcher thread for initial sync";
auto txn = cc().makeOperationContext();
bgsync = stdx::make_unique<BackgroundSync>(
replicationCoordinatorExternalState,
replicationCoordinatorExternalState->makeInitialSyncOplogBuffer(txn.get()));
bgsync->startup(txn.get());
createOplog(txn.get());
}
ON_BLOCK_EXIT([&bgsync]() {
log() << "Stopping replication fetcher thread for initial sync";
auto txn = cc().makeOperationContext();
bgsync->shutdown(txn.get());
bgsync->join(txn.get());
});
int failedAttempts = 0;
while (failedAttempts < kMaxFailedAttempts) {
try {
// leave loop when successful
Status status = _initialSync(bgsync.get());
if (status.isOK()) {
break;
} else {
error() << status;
}
} catch (const DBException& e) {
error() << e;
// Return if in shutdown
if (inShutdown()) {
return;
}
}
if (inShutdown()) {
return;
}
error() << "initial sync attempt failed, " << (kMaxFailedAttempts - ++failedAttempts)
<< " attempts remaining";
sleepmillis(durationCount<Milliseconds>(kInitialSyncRetrySleepDuration));
}
// No need to print a stack
if (failedAttempts >= kMaxFailedAttempts) {
severe() << "The maximum number of retries have been exhausted for initial sync.";
fassertFailedNoTrace(16233);
}
}
开发者ID:mihail812,项目名称:mongo,代码行数:56,代码来源:rs_initialsync.cpp
示例11: errnoWithDescription
std::string errnoWithDescription(int errNumber) {
#if defined(_WIN32)
if (errNumber == -1)
errNumber = GetLastError();
#else
if (errNumber < 0)
errNumber = errno;
#endif
char buf[kBuflen];
char* msg{nullptr};
#if defined(__GNUC__) && defined(_GNU_SOURCE) && !(__ANDROID_API__ <= 22) && !defined(EMSCRIPTEN)
msg = strerror_r(errNumber, buf, kBuflen);
#elif defined(_WIN32)
LPWSTR errorText = nullptr;
FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_IGNORE_INSERTS,
nullptr,
errNumber,
0,
reinterpret_cast<LPWSTR>(&errorText), // output
0, // minimum size for output buffer
nullptr);
if (errorText) {
ON_BLOCK_EXIT([&errorText] { LocalFree(errorText); });
std::string utf8ErrorText = toUtf8String(errorText);
auto size = utf8ErrorText.find_first_of("\r\n");
if (size == std::string::npos) { // not found
size = utf8ErrorText.length();
}
if (size >= kBuflen) {
size = kBuflen - 1;
}
memcpy(buf, utf8ErrorText.c_str(), size);
buf[size] = '\0';
msg = buf;
} else if (strerror_s(buf, kBuflen, errNumber) != 0) {
msg = buf;
}
#else /* XSI strerror_r */
if (strerror_r(errNumber, buf, kBuflen) == 0) {
msg = buf;
}
#endif
if (!msg) {
return str::stream() << kUnknownMsg << errNumber;
}
return {msg};
}
开发者ID:ShaneHarvey,项目名称:mongo,代码行数:56,代码来源:errno_util.cpp
示例12: ON_BLOCK_EXIT
void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
const int shuttingDown = _shuttingDown.fetchAndAdd(1);
ON_BLOCK_EXIT([this] { _shuttingDown.fetchAndSubtract(1); });
uassert(ErrorCodes::ShutdownInProgress,
"Cannot wait for durability because a shutdown is in progress",
!(shuttingDown & kShuttingDownMask));
// When forcing a checkpoint with journaling enabled, don't synchronize with other
// waiters, as a log flush is much cheaper than a full checkpoint.
if (forceCheckpoint && _engine->isDurable()) {
UniqueWiredTigerSession session = getSession();
WT_SESSION* s = session->getSession();
{
stdx::unique_lock<stdx::mutex> lk(_journalListenerMutex);
JournalListener::Token token = _journalListener->getToken();
invariantWTOK(s->checkpoint(s, NULL));
_journalListener->onDurable(token);
}
LOG(4) << "created checkpoint (forced)";
return;
}
uint32_t start = _lastSyncTime.load();
// Do the remainder in a critical section that ensures only a single thread at a time
// will attempt to synchronize.
stdx::unique_lock<stdx::mutex> lk(_lastSyncMutex);
uint32_t current = _lastSyncTime.loadRelaxed(); // synchronized with writes through mutex
if (current != start) {
// Someone else synced already since we read lastSyncTime, so we're done!
return;
}
_lastSyncTime.store(current + 1);
// Nobody has synched yet, so we have to sync ourselves.
auto session = getSession();
WT_SESSION* s = session->getSession();
// This gets the token (OpTime) from the last write, before flushing (either the journal, or a
// checkpoint), and then reports that token (OpTime) as a durable write.
stdx::unique_lock<stdx::mutex> jlk(_journalListenerMutex);
JournalListener::Token token = _journalListener->getToken();
// Use the journal when available, or a checkpoint otherwise.
if (_engine->isDurable()) {
invariantWTOK(s->log_flush(s, "sync=on"));
LOG(4) << "flushed journal";
} else {
invariantWTOK(s->checkpoint(s, NULL));
LOG(4) << "created checkpoint";
}
_journalListener->onDurable(token);
}
开发者ID:AlexOreshkevich,项目名称:mongo,代码行数:53,代码来源:wiredtiger_session_cache.cpp
示例13: main
int BOOST_TEST_CALL_DECL
main( int argc, char* argv[] )
{
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
#ifdef DEBUG
google::LogToStderr();
#endif
ON_BLOCK_EXIT(&google::ShutdownGoogleLogging);
return ::boost::unit_test::unit_test_main( &init_unit_test, argc, argv );
}
开发者ID:Cyberax,项目名称:SofaDb,代码行数:12,代码来源:test_main.cpp
示例14: run
void run() {
// Create a new collection.
Database* db = _ctx.db();
Collection* coll;
{
WriteUnitOfWork wunit(&_opCtx);
ASSERT_OK(db->dropCollection(&_opCtx, _nss));
coll = db->createCollection(&_opCtx, _nss);
OpDebug* const nullOpDebug = nullptr;
ASSERT_OK(coll->insertDocument(&_opCtx,
InsertStatement(BSON("_id" << 1 << "a"
<< "dup")),
nullOpDebug,
true));
ASSERT_OK(coll->insertDocument(&_opCtx,
InsertStatement(BSON("_id" << 2 << "a"
<< "dup")),
nullOpDebug,
true));
wunit.commit();
}
MultiIndexBlock indexer;
const BSONObj spec = BSON("name"
<< "a"
<< "ns"
<< coll->ns().ns()
<< "key"
<< BSON("a" << 1)
<< "v"
<< static_cast<int>(kIndexVersion)
<< "unique"
<< true
<< "background"
<< background);
ON_BLOCK_EXIT([&] { indexer.cleanUpAfterBuild(&_opCtx, coll); });
ASSERT_OK(indexer.init(&_opCtx, coll, spec, MultiIndexBlock::kNoopOnInitFn).getStatus());
auto desc =
coll->getIndexCatalog()->findIndexByName(&_opCtx, "a", true /* includeUnfinished */);
ASSERT(desc);
// Hybrid index builds check duplicates explicitly.
ASSERT_OK(indexer.insertAllDocumentsInCollection(&_opCtx, coll));
auto status = indexer.checkConstraints(&_opCtx);
ASSERT_EQUALS(status.code(), ErrorCodes::DuplicateKey);
}
开发者ID:guoyr,项目名称:mongo,代码行数:52,代码来源:indexupdatetests.cpp
示例15: processStillActive
bool processStillActive(DWORD pid)
{
HANDLE proc = OpenProcess(SYNCHRONIZE, FALSE, pid);
ON_BLOCK_EXIT([proc]() {
::CloseHandle(proc);
});
DWORD exitCode;
if (!GetExitCodeProcess(proc, &exitCode)) {
spdlog::get("usvfs")->warn("failed to query exit code on process {}", pid);
return false;
} else {
return exitCode == STILL_ACTIVE;
}
}
开发者ID:killbug2004,项目名称:usvfs,代码行数:15,代码来源:usvfs.cpp
示例16: invariant
void WiredTigerSessionCache::releaseSession(WiredTigerSession* session) {
invariant(session);
invariant(session->cursorsOut() == 0);
const int shuttingDown = _shuttingDown.fetchAndAdd(1);
ON_BLOCK_EXIT([this] { _shuttingDown.fetchAndSubtract(1); });
if (shuttingDown & kShuttingDownMask) {
// Leak the session in order to avoid race condition with clean shutdown, where the
// storage engine is ripped from underneath transactions, which are not "active"
// (i.e., do not have any locks), but are just about to delete the recovery unit.
// See SERVER-16031 for more information.
return;
}
// This checks that we are only caching idle sessions and not something which might hold
// locks or otherwise prevent truncation.
{
WT_SESSION* ss = session->getSession();
uint64_t range;
invariantWTOK(ss->transaction_pinned_range(ss, &range));
invariant(range == 0);
}
// If the cursor epoch has moved on, close all cursors in the session.
uint64_t cursorEpoch = _cursorEpoch.load();
if (session->_getCursorEpoch() != cursorEpoch)
session->closeAllCursors();
bool returnedToCache = false;
uint64_t currentEpoch = _epoch.load();
if (session->_getEpoch() == currentEpoch) { // check outside of lock to reduce contention
stdx::lock_guard<stdx::mutex> lock(_cacheLock);
if (session->_getEpoch() == _epoch.load()) { // recheck inside the lock for correctness
returnedToCache = true;
_sessions.push_back(session);
}
} else
invariant(session->_getEpoch() < currentEpoch);
if (!returnedToCache)
delete session;
if (_engine && _engine->haveDropsQueued())
_engine->dropSomeQueuedIdents();
}
开发者ID:carlyrobison,项目名称:mongo,代码行数:47,代码来源:wiredtiger_session_cache.cpp
示例17: run
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
uassert(ErrorCodes::IllegalOperation,
"_configsvrDropCollection can only be run on config servers",
serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
// Set the operation context read concern level to local for reads into the config database.
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
const NamespaceString nss(parseNs(dbname, cmdObj));
uassert(ErrorCodes::InvalidOptions,
str::stream() << "dropCollection must be called with majority writeConcern, got "
<< cmdObj,
opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
Seconds waitFor(DistLockManager::kDefaultLockTimeout);
MONGO_FAIL_POINT_BLOCK(setDropCollDistLockWait, customWait) {
const BSONObj& data = customWait.getData();
waitFor = Seconds(data["waitForSecs"].numberInt());
}
auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto scopedDbLock =
ShardingCatalogManager::get(opCtx)->serializeCreateOrDropDatabase(opCtx, nss.db());
auto scopedCollLock =
ShardingCatalogManager::get(opCtx)->serializeCreateOrDropCollection(opCtx, nss);
auto dbDistLock = uassertStatusOK(
catalogClient->getDistLockManager()->lock(opCtx, nss.db(), "dropCollection", waitFor));
auto collDistLock = uassertStatusOK(
catalogClient->getDistLockManager()->lock(opCtx, nss.ns(), "dropCollection", waitFor));
ON_BLOCK_EXIT(
[opCtx, nss] { Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(nss); });
staleExceptionRetry(
opCtx, "_configsvrDropCollection", [&] { _dropCollection(opCtx, nss); });
return true;
}
开发者ID:ShaneHarvey,项目名称:mongo,代码行数:45,代码来源:configsvr_drop_collection_command.cpp
示例18: sha1
/*
* Computes a SHA-1 hash of 'input'.
*/
bool sha1(const unsigned char* input,
const size_t inputLen,
unsigned char* output) {
EVP_MD_CTX digestCtx;
EVP_MD_CTX_init(&digestCtx);
ON_BLOCK_EXIT(EVP_MD_CTX_cleanup, &digestCtx);
if (1 != EVP_DigestInit_ex(&digestCtx, EVP_sha1(), NULL)) {
return false;
}
if (1 != EVP_DigestUpdate(&digestCtx, input, inputLen)) {
return false;
}
return (1 == EVP_DigestFinal_ex(&digestCtx, output, NULL));
}
开发者ID:psigen,项目名称:mongo-cxx-driver-legacy-release,代码行数:21,代码来源:crypto_openssl.cpp
示例19: invariant
void OperationContext::markKilled(ErrorCodes::Error killCode) {
invariant(killCode != ErrorCodes::OK);
stdx::unique_lock<stdx::mutex> lkWaitMutex;
if (_waitMutex) {
invariant(++_numKillers > 0);
getClient()->unlock();
ON_BLOCK_EXIT([this]() noexcept {
getClient()->lock();
invariant(--_numKillers >= 0);
});
lkWaitMutex = stdx::unique_lock<stdx::mutex>{*_waitMutex};
}
_killCode.compareAndSwap(ErrorCodes::OK, killCode);
if (lkWaitMutex && _numKillers == 0) {
invariant(_waitCV);
_waitCV->notify_all();
}
}
开发者ID:EvgeniyPatlan,项目名称:percona-server-mongodb,代码行数:18,代码来源:operation_context.cpp
示例20: invariant
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
invariant(_applyFunc);
if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
// Use a ThreadPool to prefetch all the operations in a batch.
prefetchOps(ops.getDeque(), &_prefetcherPool);
}
std::vector<std::vector<BSONObj>> writerVectors(replWriterThreadCount);
fillWriterVectors(txn, ops.getDeque(), &writerVectors);
LOG(2) << "replication batch size is " << ops.getDeque().size() << endl;
// We must grab this because we're going to grab write locks later.
// We hold this mutex the entire time we're writing; it doesn't matter
// because all readers are blocked anyway.
stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
// stop all readers until we're done
Lock::ParallelBatchWriterMode pbwm(txn->lockState());
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
severe() << "attempting to replicate ops while primary";
fassertFailed(28527);
}
applyOps(writerVectors, &_writerPool, _applyFunc, this);
OpTime lastOpTime;
{
ON_BLOCK_EXIT([&] { _writerPool.join(); });
std::vector<BSONObj> raws;
raws.reserve(ops.getDeque().size());
for (auto&& op : ops.getDeque()) {
raws.emplace_back(op.raw);
}
lastOpTime = writeOpsToOplog(txn, raws);
if (inShutdown()) {
return OpTime();
}
}
// We have now written all database writes and updated the oplog to match.
return lastOpTime;
}
开发者ID:VonRosenchild,项目名称:percona-server-mongodb,代码行数:46,代码来源:sync_tail.cpp
注:本文中的ON_BLOCK_EXIT函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论