/**
 * Sums the data size of every index on `coll`, including indexes that are still being built.
 *
 * @param opCtx   operation context used to look up the Collection backing each index namespace.
 * @param coll    collection whose indexes are measured; a null pointer yields 0.
 * @param details optional builder; when non-null it receives one number per index, keyed by the
 *                index name, holding that index's size divided by `scale`.
 * @param scale   divisor applied only to the per-index figures written to `details`. It must be
 *                non-zero whenever `details` is supplied, because it is used as a divisor.
 * @return the unscaled sum of dataSize() over all indexes that could be resolved.
 */
long long Database::getIndexSizeForCollection(OperationContext* opCtx,
                                              Collection* coll,
                                              BSONObjBuilder* details,
                                              int scale ) {
    if ( !coll )
        return 0;

    IndexCatalog::IndexIterator ii =
        coll->getIndexCatalog()->getIndexIterator( true /*includeUnfinishedIndexes*/ );

    long long totalSize = 0;

    while ( ii.more() ) {
        IndexDescriptor* d = ii.next();
        string indNS = d->indexNamespace();

        // XXX creating a Collection for an index which isn't a Collection
        Collection* indColl = getCollection( opCtx, indNS );
        if ( ! indColl ) {
            // A descriptor without a backing entry is logged and skipped; it does not
            // contribute to the total.
            log() << "error: have index descriptor [" << indNS
                  << "] but no entry in the index collection." << endl;
            continue;
        }

        // Read the size once so the running total and the per-index detail are derived
        // from the same value.
        const long long indexDataSize = indColl->dataSize();
        totalSize += indexDataSize;

        if ( details ) {
            long long const indexSize = indexDataSize / scale;
            details->appendNumber( d->indexName() , indexSize );
        }
    }

    return totalSize;
}
/**
 * Empties the collection while keeping its index definitions.
 *
 * The work is done in this order:
 *   1) remember the spec of every index returned by the catalog iterator
 *   2) drop all indexes
 *   3) truncate the record store
 *   4) re-create each remembered index on the now-empty collection
 *
 * The first non-OK status from steps 2-4 is returned immediately; otherwise Status::OK().
 */
Status Collection::truncate(OperationContext* txn) {
    dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));
    BackgroundOperation::assertNoBgOpInProgForNs(ns());
    invariant(_indexCatalog.numIndexesInProgress(txn) == 0);

    // 1) store index specs (owned copies, so they survive the drop below)
    vector<BSONObj> savedSpecs;
    for (IndexCatalog::IndexIterator specIter = _indexCatalog.getIndexIterator(txn, false);
         specIter.more();) {
        const IndexDescriptor* descriptor = specIter.next();
        savedSpecs.push_back(descriptor->infoObj().getOwned());
    }

    // 2) drop indexes
    const Status dropStatus = _indexCatalog.dropAllIndexes(txn, true);
    if (!dropStatus.isOK()) {
        return dropStatus;
    }
    _cursorManager.invalidateAll(false, "collection truncated");

    // 3) truncate record store
    const Status truncateStatus = _recordStore->truncate(txn);
    if (!truncateStatus.isOK()) {
        return truncateStatus;
    }

    // 4) re-create indexes
    for (vector<BSONObj>::iterator spec = savedSpecs.begin(); spec != savedSpecs.end(); ++spec) {
        const Status createStatus = _indexCatalog.createIndexOnEmptyCollection(txn, *spec);
        if (!createStatus.isOK()) {
            return createStatus;
        }
    }

    return Status::OK();
}
/**
 * Rebuilds the list of IndexEntry objects describing this collection's indexes and hands it
 * to the plan cache.
 */
void CollectionInfoCache::updatePlanCacheIndexEntries(OperationContext* opCtx) {
    // TODO We shouldn't need to include unfinished indexes, but we must here because the index
    // catalog may be in an inconsistent state.  SERVER-18346.
    const bool withUnfinishedIndexes = true;

    std::vector<IndexEntry> planCacheEntries;

    for (IndexCatalog::IndexIterator cursor =
             _collection->getIndexCatalog()->getIndexIterator(opCtx, withUnfinishedIndexes);
         cursor.more();) {
        const IndexDescriptor* descriptor = cursor.next();
        const IndexCatalogEntry* catalogEntry = cursor.catalogEntry(descriptor);

        // One entry per index, built in place from the descriptor and its catalog entry.
        planCacheEntries.emplace_back(descriptor->keyPattern(),
                                      descriptor->getAccessMethodName(),
                                      descriptor->isMultikey(opCtx),
                                      catalogEntry->getMultikeyPaths(opCtx),
                                      descriptor->isSparse(),
                                      descriptor->unique(),
                                      descriptor->indexName(),
                                      catalogEntry->getFilterExpression(),
                                      descriptor->infoObj(),
                                      catalogEntry->getCollator());
    }

    _planCache->notifyOfIndexEntries(planCacheEntries);
}
// Page in the index pages needed for all index lookups on a given object.
//
// collection     - collection whose indexes are touched.
// prefetchConfig - selects which indexes to touch: none, the _id index only, or all indexes.
//                  Any other value triggers fassertFailed(16427).
// obj            - the document passed to IndexAccessMethod::touch() for each selected index.
//
// DBExceptions raised while touching an index are logged at level 2 and otherwise ignored.
void prefetchIndexPages(Collection* collection,
                        const repl::ReplSetImpl::IndexPrefetchConfig& prefetchConfig,
                        const BSONObj& obj) {
    // do we want prefetchConfig to be (1) as-is, (2) for update ops only, or (3) configured per op type?
    // One might want PREFETCH_NONE for updates, but it's more rare that it is a bad idea for inserts.
    // #3 (per op), a big issue would be "too many knobs".
    switch (prefetchConfig) {
    case repl::ReplSetImpl::PREFETCH_NONE:
        return;
    case repl::ReplSetImpl::PREFETCH_ID_ONLY:
    {
        TimerHolder timer( &prefetchIndexStats);
        // on the update op case, the call to prefetchRecordPages will touch the _id index.
        // thus perhaps this option isn't very useful?
        try {
            IndexDescriptor* desc = collection->getIndexCatalog()->findIdIndex();
            if ( !desc )
                return;
            IndexAccessMethod* iam = collection->getIndexCatalog()->getIndex( desc );
            verify( iam );
            iam->touch(obj);
        }
        catch (const DBException& e) {
            LOG(2) << "ignoring exception in prefetchIndexPages(): " << e.what() << endl;
        }
        break;
    }
    case repl::ReplSetImpl::PREFETCH_ALL:
    {
        // The iterator covers all indexes, including ones in the process of being built.
        IndexCatalog::IndexIterator ii = collection->getIndexCatalog()->getIndexIterator( true );
        while ( ii.more() ) {
            // A fresh timer per index, so each touch is recorded separately.
            TimerHolder timer( &prefetchIndexStats);
            // This will page in all index pages for the given object.
            try {
                IndexDescriptor* desc = ii.next();
                IndexAccessMethod* iam = collection->getIndexCatalog()->getIndex( desc );
                verify( iam );
                iam->touch(obj);
            }
            catch (const DBException& e) {
                LOG(2) << "ignoring exception in prefetchIndexPages(): " << e.what() << endl;
            }
        }
        break;
    }
    default:
        fassertFailed(16427);
    }
}
void CollectionInfoCache::computeIndexKeys() {
DEV Lock::assertWriteLocked( _collection->ns().ns() );
_indexedPaths.clear();
IndexCatalog::IndexIterator i = _collection->getIndexCatalog()->getIndexIterator(true);
while (i.more()) {
IndexDescriptor* descriptor = i.next();
if (descriptor->getAccessMethodName() != IndexNames::TEXT) {
BSONObj key = descriptor->keyPattern();
BSONObjIterator j(key);
while (j.more()) {
BSONElement e = j.next();
_indexedPaths.addPath(e.fieldName());
}
}
else {
fts::FTSSpec ftsSpec(descriptor->infoObj());
if (ftsSpec.wildcard()) {
_indexedPaths.allPathsIndexed();
}
else {
for (size_t i = 0; i < ftsSpec.numExtraBefore(); ++i) {
_indexedPaths.addPath(ftsSpec.extraBefore(i));
}
for (fts::Weights::const_iterator it = ftsSpec.weights().begin();
it != ftsSpec.weights().end();
++it) {
_indexedPaths.addPath(it->first);
}
for (size_t i = 0; i < ftsSpec.numExtraAfter(); ++i) {
_indexedPaths.addPath(ftsSpec.extraAfter(i));
}
// Any update to a path containing "language" as a component could change the
// language of a subdocument. Add the override field as a path component.
_indexedPaths.addPathComponent(ftsSpec.languageOverrideField());
}
}
}
_keysComputed = true;
}
/**
 * Replaces the document stored at `oldLocation` with `newDoc`.
 *
 * Visible behavior (the tail of this definition is omitted from this listing):
 *  - `newDoc` is run through checkValidation(). A failure is returned when the validation level
 *    is STRICT_V, or when the old document passes validation; a failing old document lets a
 *    failing new document through.
 *  - If the old document has an `_id` and it differs from `newDoc`'s `_id`, an InternalError
 *    (code 13596) is returned.
 *  - On a capped record store, any change in document size is rejected with
 *    CannotGrowDocumentInCappedNamespace.
 *  - When `indexesAffected` is true, validateUpdate() is run for every index from the catalog
 *    iterator and the first non-OK status is returned.
 *  - The record store is then updated; a non-OK result is returned as-is.
 *
 * `enforceQuota` is forwarded through _enforceQuota() to the record store. `debug`, when
 * non-null, has its `nmoved` counter adjusted if the record moved. `args` is not referenced in
 * the visible portion of the function.
 */
StatusWith<RecordId> Collection::updateDocument(OperationContext* txn,
const RecordId& oldLocation,
const Snapshotted<BSONObj>& oldDoc,
const BSONObj& newDoc,
bool enforceQuota,
bool indexesAffected,
OpDebug* debug,
oplogUpdateEntryArgs& args) {
{
auto status = checkValidation(txn, newDoc);
if (!status.isOK()) {
if (_validationLevel == STRICT_V) {
return status;
}
// moderate means we have to check the old doc
auto oldDocStatus = checkValidation(txn, oldDoc.value());
if (oldDocStatus.isOK()) {
// transitioning from good -> bad is not ok
return status;
}
// bad -> bad is ok in moderate mode
}
}
dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
// The caller's snapshot of the old document must belong to the current recovery-unit snapshot.
invariant(oldDoc.snapshotId() == txn->recoveryUnit()->getSnapshotId());
if (_needCappedLock) {
// X-lock the metadata resource for this capped collection until the end of the WUOW. This
// prevents the primary from executing with more concurrency than secondaries.
// See SERVER-21646.
// NOTE(review): the ResourceLock below is an unnamed temporary that is destroyed at the end
// of this statement; holding it "until the end of the WUOW" presumably relies on the locker
// deferring the release -- confirm.
Lock::ResourceLock{txn->lockState(), ResourceId(RESOURCE_METADATA, _ns.ns()), MODE_X};
}
SnapshotId sid = txn->recoveryUnit()->getSnapshotId();
// An update may not change the _id of a document that already has one.
BSONElement oldId = oldDoc.value()["_id"];
if (!oldId.eoo() && (oldId != newDoc["_id"]))
return StatusWith<RecordId>(
ErrorCodes::InternalError, "in Collection::updateDocument _id mismatch", 13596);
// The MMAPv1 storage engine implements capped collections in a way that does not allow records
// to grow beyond their original size. If MMAPv1 is part of a replica set with storage engines
// that do not have this limitation, replication could result in errors, so it is necessary to
// set a uniform rule here. Similarly, it is not sufficient to disallow growing records, because
// this happens when secondaries roll back an update that shrunk a record. Exactly replicating
// legacy MMAPv1 behavior would require padding shrunk documents on all storage engines. Instead
// forbid all size changes.
const auto oldSize = oldDoc.value().objsize();
if (_recordStore->isCapped() && oldSize != newDoc.objsize())
return {ErrorCodes::CannotGrowDocumentInCappedNamespace,
str::stream() << "Cannot change the size of a document in a capped collection: "
<< oldSize << " != " << newDoc.objsize()};
// At the end of this step, we will have a map of UpdateTickets, one per index, which
// represent the index updates needed to be done, based on the changes between oldDoc and
// newDoc.
OwnedPointerMap<IndexDescriptor*, UpdateTicket> updateTickets;
if (indexesAffected) {
IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(txn, true);
while (ii.more()) {
IndexDescriptor* descriptor = ii.next();
IndexCatalogEntry* entry = ii.catalogEntry(descriptor);
IndexAccessMethod* iam = ii.accessMethod(descriptor);
InsertDeleteOptions options;
options.logIfError = false;
// Duplicates are allowed unless the index is the _id index or is unique; the replication
// coordinator can override that and allow duplicates for a given index.
options.dupsAllowed =
!(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique()) ||
repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor);
// The ticket is stored in the map right after allocation, before validateUpdate() is
// called, so it is already in the map if validation fails and the function returns early.
UpdateTicket* updateTicket = new UpdateTicket();
updateTickets.mutableMap()[descriptor] = updateTicket;
Status ret = iam->validateUpdate(txn,
oldDoc.value(),
newDoc,
oldLocation,
options,
updateTicket,
entry->getFilterExpression());
if (!ret.isOK()) {
return StatusWith<RecordId>(ret);
}
}
}
// This can call back into Collection::recordStoreGoingToMove. If that happens, the old
// object is removed from all indexes.
StatusWith<RecordId> newLocation = _recordStore->updateRecord(
txn, oldLocation, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota), this);
if (!newLocation.isOK()) {
return newLocation;
}
// At this point, the old object may or may not still be indexed, depending on if it was
// moved. If the object did move, we need to add the new location to all indexes.
if (newLocation.getValue() != oldLocation) {
if (debug) {
if (debug->nmoved == -1) // default of -1 rather than 0
debug->nmoved = 1;
//......... part of the code is omitted here .........
/**
 * Replaces the document stored at `oldLocation` with `objNew` and brings every index up to date.
 *
 * @param txn          operation context passed through to the record store and index calls.
 * @param oldLocation  location of the document being replaced.
 * @param objNew       the replacement document. If the stored document has an `_id`, `objNew`
 *                     must carry the same `_id`.
 * @param enforceQuota forwarded through _enforceQuota() to the record store.
 * @param debug        optional; when non-null, `nmoved` is bumped if the record moved, otherwise
 *                     `keyUpdates` is set to the number of index keys updated.
 * @return the (possibly new) location of the document, or the first error encountered:
 *         InternalError (13596) on an `_id` mismatch, a user-document validation error for
 *         "system.users", or any non-OK status from index validation, the record store update,
 *         or the index updates.
 */
StatusWith<DiskLoc> Collection::updateDocument( OperationContext* txn,
                                                const DiskLoc& oldLocation,
                                                const BSONObj& objNew,
                                                bool enforceQuota,
                                                OpDebug* debug ) {

    BSONObj objOld = _recordStore->dataFor( txn, oldLocation ).toBson();

    // An update may not change the _id of a document that already has one. A single lookup
    // suffices: a missing field comes back as an EOO element.
    BSONElement oldId = objOld["_id"];
    if ( !oldId.eoo() && ( oldId != objNew["_id"] ) )
        return StatusWith<DiskLoc>( ErrorCodes::InternalError,
                                    "in Collection::updateDocument _id mismatch",
                                    13596 );

    if ( ns().coll() == "system.users" ) {
        // XXX - andy and spencer think this should go away now
        V2UserDocumentParser parser;
        Status s = parser.checkValidUserDocument(objNew);
        if ( !s.isOK() )
            return StatusWith<DiskLoc>( s );
    }

    /* duplicate key check. we descend the btree twice - once for this check, and once for the actual inserts, further
       below.  that is suboptimal, but it's pretty complicated to do it the other way without rollbacks...
    */

    // One UpdateTicket per index; the map owns the tickets and frees them on every return path.
    OwnedPointerMap<IndexDescriptor*,UpdateTicket> updateTickets;
    IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, true );
    while ( ii.more() ) {
        IndexDescriptor* descriptor = ii.next();
        IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor );

        InsertDeleteOptions options;
        options.logIfError = false;
        // Duplicates are allowed unless the index is the _id index or is unique; the
        // replication coordinator can override that and allow duplicates for a given index.
        options.dupsAllowed =
            !(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique())
            || repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor);

        // Hand the ticket to the owning map before validateUpdate() is called, so it is
        // already in the map if validation fails and the function returns early.
        UpdateTicket* updateTicket = new UpdateTicket();
        updateTickets.mutableMap()[descriptor] = updateTicket;

        Status ret = iam->validateUpdate(txn, objOld, objNew, oldLocation, options, updateTicket );
        if ( !ret.isOK() ) {
            return StatusWith<DiskLoc>( ret );
        }
    }

    // this can callback into Collection::recordStoreGoingToMove
    StatusWith<DiskLoc> newLocation = _recordStore->updateRecord( txn,
                                                                  oldLocation,
                                                                  objNew.objdata(),
                                                                  objNew.objsize(),
                                                                  _enforceQuota( enforceQuota ),
                                                                  this );

    if ( !newLocation.isOK() ) {
        return newLocation;
    }

    _infoCache.notifyOfWriteOp();

    if ( newLocation.getValue() != oldLocation ) {
        // The record moved: count the move and index the document at its new location
        // instead of applying the per-index tickets.
        if ( debug ) {
            if (debug->nmoved == -1) // default of -1 rather than 0
                debug->nmoved = 1;
            else
                debug->nmoved += 1;
        }

        _indexCatalog.indexRecord(txn, objNew, newLocation.getValue());

        return newLocation;
    }

    // The record stayed in place: apply the ticket prepared for each index.
    if ( debug )
        debug->keyUpdates = 0;

    ii = _indexCatalog.getIndexIterator( txn, true );
    while ( ii.more() ) {
        IndexDescriptor* descriptor = ii.next();
        IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor );

        // Start from zero so that an update() implementation that leaves the out-parameter
        // untouched cannot feed an indeterminate value into debug->keyUpdates.
        int64_t updatedKeys = 0;
        Status ret = iam->update(txn, *updateTickets.mutableMap()[descriptor], &updatedKeys);
        if ( !ret.isOK() )
            return StatusWith<DiskLoc>( ret );
        if ( debug )
            debug->keyUpdates += updatedKeys;
    }

    // Broadcast the mutation so that query results stay correct.
    _cursorCache.invalidateDocument(oldLocation, INVALIDATION_MUTATION);

    return newLocation;
}
// Please leave a comment.