static void PersistentStore_InsertTuple(
PersistentStoreData *storeData,
PersistentStoreSharedData *storeSharedData,
Datum *values,
bool flushToXLog,
/* When true, the XLOG record for this change will be flushed to disk. */
ItemPointer persistentTid)
/* TID of the stored tuple. */
{
Relation persistentRel;
#ifdef USE_ASSERT_CHECKING
if (storeSharedData == NULL ||
!PersistentStoreSharedData_EyecatcherIsValid(storeSharedData))
elog(ERROR, "Persistent store shared-memory not valid");
#endif
if (Debug_persistent_store_print)
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_InsertTuple: Going to insert new tuple ('%s', shared data %p)",
storeData->tableName,
storeSharedData);
persistentRel = (*storeData->openRel)();
PersistentStore_DoInsertTuple(
storeData,
storeSharedData,
persistentRel,
values,
flushToXLog,
persistentTid);
(*storeData->closeRel)(persistentRel);
if (Debug_persistent_store_print)
{
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_InsertTuple: Inserted new tuple at TID %s ('%s')",
ItemPointerToString(persistentTid),
storeData->tableName);
(*storeData->printTupleCallback)(
PersistentStore_DebugPrintLevel(),
"STORE INSERT TUPLE",
persistentTid,
values);
}
}
void PersistentStore_FreeTuple(
PersistentStoreData *storeData,
PersistentStoreSharedData *storeSharedData,
ItemPointer persistentTid,
/* TID of the stored tuple. */
Datum *freeValues,
bool flushToXLog)
/* When true, the XLOG record for this change will be flushed to disk. */
{
Relation persistentRel;
XLogRecPtr xlogEndLoc;
/* The end location of the UPDATE XLOG record. */
Assert( LWLockHeldByMe(PersistentObjLock) );
#ifdef USE_ASSERT_CHECKING
if (storeSharedData == NULL ||
!PersistentStoreSharedData_EyecatcherIsValid(storeSharedData))
elog(ERROR, "Persistent store shared-memory not valid");
#endif
if (Debug_persistent_store_print)
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_FreeTuple: Going to free tuple at TID %s ('%s', shared data %p)",
ItemPointerToString(persistentTid),
storeData->tableName,
storeSharedData);
Assert(ItemPointerIsValid(persistentTid));
persistentRel = (*storeData->openRel)();
simple_heap_delete_xid(persistentRel, persistentTid, FrozenTransactionId);
/*
* XLOG location of the UPDATE tuple's XLOG record.
*/
xlogEndLoc = XLogLastInsertEndLoc();
(*storeData->closeRel)(persistentRel);
storeSharedData->inUseCount--;
if (flushToXLog)
{
XLogFlush(xlogEndLoc);
XLogRecPtr_Zero(&nowaitXLogEndLoc);
}
else
nowaitXLogEndLoc = xlogEndLoc;
}
//.........这里部分代码省略.........
PersistentTablespace_MarkCreatePending(
Oid filespaceOid,
/* The filespace where the tablespace lives. */
Oid tablespaceOid,
/* The tablespace OID for the create. */
MirroredObjectExistenceState mirrorExistenceState,
ItemPointer persistentTid,
/* TID of the gp_persistent_rel_files tuple for the rel file */
int64 *persistentSerialNum,
bool flushToXLog)
/* When true, the XLOG record for this change will be flushed to disk. */
{
WRITE_PERSISTENT_STATE_ORDERED_LOCK_DECLARE;
PersistentFileSysObjName fsObjName;
TablespaceDirEntry tablespaceDirEntry;
TransactionId topXid;
if (Persistent_BeforePersistenceWork())
{
if (Debug_persistent_print)
elog(Persistent_DebugPrintLevel(),
"Skipping persistent tablespace %u because we are before persistence work",
tablespaceOid);
return;
/*
* The initdb process will load the persistent table once we out of
* bootstrap mode.
*/
}
PersistentTablespace_VerifyInitScan();
PersistentFileSysObjName_SetTablespaceDir(&fsObjName, tablespaceOid);
topXid = GetTopTransactionId();
WRITE_PERSISTENT_STATE_ORDERED_LOCK;
PersistentTablespace_AddTuple(
filespaceOid,
tablespaceOid,
PersistentFileSysState_CreatePending,
/* createMirrorDataLossTrackingSessionNum */ 0,
mirrorExistenceState,
/* reserved */ 0,
/* parentXid */ topXid,
flushToXLog,
persistentTid,
persistentSerialNum);
WRITE_TABLESPACE_HASH_LOCK;
tablespaceDirEntry =
PersistentTablespace_CreateEntryUnderLock(filespaceOid, tablespaceOid);
Assert(tablespaceDirEntry != NULL);
tablespaceDirEntry->state = PersistentFileSysState_CreatePending;
ItemPointerCopy(persistentTid, &tablespaceDirEntry->persistentTid);
tablespaceDirEntry->persistentSerialNum = *persistentSerialNum;
WRITE_TABLESPACE_HASH_UNLOCK;
/*
* This XLOG must be generated under the persistent write-lock.
*/
#ifdef MASTER_MIRROR_SYNC
mmxlog_log_create_tablespace(
filespaceOid,
tablespaceOid);
#endif
SIMPLE_FAULT_INJECTOR(FaultBeforePendingDeleteTablespaceEntry);
/*
* MPP-18228 To make adding 'Create Pending' entry to persistent table and
* adding to the PendingDelete list atomic
*/
PendingDelete_AddCreatePendingEntryWrapper(
&fsObjName,
persistentTid,
*persistentSerialNum);
WRITE_PERSISTENT_STATE_ORDERED_UNLOCK;
if (Debug_persistent_print)
elog(Persistent_DebugPrintLevel(),
"Persistent tablespace directory: Add '%s' in state 'Created', mirror existence state '%s', serial number " INT64_FORMAT " at TID %s",
PersistentFileSysObjName_ObjectName(&fsObjName),
MirroredObjectExistenceState_Name(mirrorExistenceState),
*persistentSerialNum,
ItemPointerToString(persistentTid));
}
void PersistentStore_UpdateTuple(
PersistentStoreData *storeData,
PersistentStoreSharedData *storeSharedData,
ItemPointer persistentTid,
/* TID of the stored tuple. */
Datum *values,
bool flushToXLog)
/* When true, the XLOG record for this change will be flushed to disk. */
{
Relation persistentRel;
bool *nulls;
HeapTuple persistentTuple = NULL;
XLogRecPtr xlogUpdateEndLoc;
#ifdef USE_ASSERT_CHECKING
if (storeSharedData == NULL ||
!PersistentStoreSharedData_EyecatcherIsValid(storeSharedData))
elog(ERROR, "Persistent store shared-memory not valid");
#endif
if (Debug_persistent_store_print)
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_ReplaceTuple: Going to update whole tuple at TID %s ('%s', shared data %p)",
ItemPointerToString(persistentTid),
storeData->tableName,
storeSharedData);
persistentRel = (*storeData->openRel)();
/*
* In order to keep the tuples the exact same size to enable direct reuse of
* free tuples, we do not use NULLs.
*/
nulls = (bool*)palloc0(storeData->numAttributes * sizeof(bool));
/*
* Form the tuple.
*/
persistentTuple = heap_form_tuple(persistentRel->rd_att, values, nulls);
if (!HeapTupleIsValid(persistentTuple))
elog(ERROR, "Failed to build persistent tuple ('%s')",
storeData->tableName);
persistentTuple->t_self = *persistentTid;
frozen_heap_inplace_update(persistentRel, persistentTuple);
/*
* Return the XLOG location of the UPDATE tuple's XLOG record.
*/
xlogUpdateEndLoc = XLogLastInsertEndLoc();
heap_freetuple(persistentTuple);
#ifdef FAULT_INJECTOR
if (FaultInjector_InjectFaultIfSet(SyncPersistentTable,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */)== FaultInjectorTypeSkip)
{
FlushRelationBuffers(persistentRel);
smgrimmedsync(persistentRel->rd_smgr);
}
#endif
(*storeData->closeRel)(persistentRel);
if (Debug_persistent_store_print)
{
elog(PersistentStore_DebugPrintLevel(),
"PersistentStore_UpdateTuple: Updated whole tuple at TID %s ('%s')",
ItemPointerToString(persistentTid),
storeData->tableName);
(*storeData->printTupleCallback)(
PersistentStore_DebugPrintLevel(),
"STORE UPDATED TUPLE",
persistentTid,
values);
}
if (flushToXLog)
{
XLogFlush(xlogUpdateEndLoc);
XLogRecPtr_Zero(&nowaitXLogEndLoc);
}
else
nowaitXLogEndLoc = xlogUpdateEndLoc;
}
void
AppendOnlyMirrorResyncEofs_Merge(RelFileNode *relFileNode,
int32 segmentFileNum,
int nestLevel, /* Transaction nesting level. */
char *relationName,
ItemPointer persistentTid,
int64 persistentSerialNum,
bool mirrorCatchupRequired,
MirrorDataLossTrackingState mirrorDataLossTrackingState,
int64 mirrorDataLossTrackingSessionNum,
int64 mirrorNewEof)
{
int64 previousMirrorNewEof = 0;
AppendOnlyMirrorResyncEofsKey key;
AppendOnlyMirrorResyncEofs *entry;
bool found;
if (AppendOnlyMirrorResyncEofsTable == NULL)
AppendOnlyMirrorResyncEofs_HashTableInit();
AppendOnlyMirrorResyncEofs_InitKey(
&key,
relFileNode,
segmentFileNum,
nestLevel);
entry =
(AppendOnlyMirrorResyncEofs *)
hash_search(AppendOnlyMirrorResyncEofsTable,
(void *) &key,
HASH_ENTER,
&found);
if (!found)
{
entry->relationName = MemoryContextStrdup(TopMemoryContext, relationName);
entry->persistentSerialNum = persistentSerialNum;
entry->persistentTid = *persistentTid;
entry->didIncrementCommitCount = false;
entry->isDistributedTransaction = false;
entry->gid[0] = '\0';
entry->mirrorCatchupRequired = mirrorCatchupRequired;
entry->mirrorDataLossTrackingState = mirrorDataLossTrackingState;
entry->mirrorDataLossTrackingSessionNum = mirrorDataLossTrackingSessionNum;
entry->mirrorNewEof = mirrorNewEof;
}
else
{
previousMirrorNewEof = entry->mirrorNewEof;
/*
* UNDONE: What is the purpose of this IF stmt? Shouldn't we always
* set the new EOF?
*/
if (mirrorNewEof > entry->mirrorNewEof)
entry->mirrorNewEof = mirrorNewEof;
/*
* We adopt the newer FileRep state because we accurately track the
* state of mirror data. For example, the first write session might
* have had loss because the mirror was down. But then the second
* write session discovered we were in sync and copied both the first
* and second write session to the mirror and flushed it.
*/
entry->mirrorCatchupRequired = mirrorCatchupRequired;
entry->mirrorDataLossTrackingState = mirrorDataLossTrackingState;
entry->mirrorDataLossTrackingSessionNum = mirrorDataLossTrackingSessionNum;
}
if (Debug_persistent_print ||
Debug_persistent_appendonly_commit_count_print)
elog(Persistent_DebugPrintLevel(),
"Storage Manager: %s Append-Only mirror resync eofs entry: %u/%u/%u, segment file #%d, relation name '%s' (transaction nest level %d, persistent TID %s, persistent serial number " INT64_FORMAT ", "
"mirror data loss tracking (state '%s', session num " INT64_FORMAT "), "
"previous mirror new EOF " INT64_FORMAT ", input mirror new EOF " INT64_FORMAT ", saved mirror new EOF " INT64_FORMAT ")",
(found ? "Merge" : "New"),
entry->key.relFileNode.spcNode,
entry->key.relFileNode.dbNode,
entry->key.relFileNode.relNode,
entry->key.segmentFileNum,
(entry->relationName == NULL ? "<null>" : entry->relationName),
entry->key.nestLevel,
ItemPointerToString(&entry->persistentTid),
entry->persistentSerialNum,
MirrorDataLossTrackingState_Name(mirrorDataLossTrackingState),
mirrorDataLossTrackingSessionNum,
previousMirrorNewEof,
mirrorNewEof,
entry->mirrorNewEof);
}
请发表评论