/*
* _hash_freeovflpage() -
*
* Remove this overflow page from its bucket's chain, and mark the page as
* free. On entry, ovflbuf is write-locked; it is released before exiting.
*
* Since this function is invoked in VACUUM, we provide an access strategy
* parameter that controls fetches of the bucket pages.
*
* Returns the block number of the page that followed the given page
* in the bucket, or InvalidBlockNumber if no following page.
*
* NB: caller must not hold lock on metapage, nor on either page that's
* adjacent in the bucket chain. The caller had better hold exclusive lock
* on the bucket, too.
*/
BlockNumber
_hash_freeovflpage(Relation rel, Buffer ovflbuf,
BufferAccessStrategy bstrategy)
{
HashMetaPage metap;
Buffer metabuf;
Buffer mapbuf;
BlockNumber ovflblkno;
BlockNumber prevblkno;
BlockNumber blkno;
BlockNumber nextblkno;
HashPageOpaque ovflopaque;
Page ovflpage;
Page mappage;
uint32 *freep;
uint32 ovflbitno;
int32 bitmappage,
bitmapbit;
Bucket bucket PG_USED_FOR_ASSERTS_ONLY;
/* Get information from the doomed page */
_hash_checkpage(rel, ovflbuf, LH_OVERFLOW_PAGE);
ovflblkno = BufferGetBlockNumber(ovflbuf);
ovflpage = BufferGetPage(ovflbuf);
ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
nextblkno = ovflopaque->hasho_nextblkno;
prevblkno = ovflopaque->hasho_prevblkno;
bucket = ovflopaque->hasho_bucket;
/*
* Zero the page for debugging's sake; then write and release it. (Note:
* if we failed to zero the page here, we'd have problems with the Assert
* in _hash_pageinit() when the page is reused.)
*/
MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
_hash_wrtbuf(rel, ovflbuf);
/*
* Fix up the bucket chain. This is a doubly-linked list, so we must fix
* up the bucket chain members behind and ahead of the overflow page being
* deleted. No concurrency issues since we hold exclusive lock on the
* entire bucket.
*/
if (BlockNumberIsValid(prevblkno))
{
Buffer prevbuf = _hash_getbuf_with_strategy(rel,
prevblkno,
HASH_WRITE,
LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
bstrategy);
Page prevpage = BufferGetPage(prevbuf);
HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
Assert(prevopaque->hasho_bucket == bucket);
prevopaque->hasho_nextblkno = nextblkno;
_hash_wrtbuf(rel, prevbuf);
}
if (BlockNumberIsValid(nextblkno))
{
Buffer nextbuf = _hash_getbuf_with_strategy(rel,
nextblkno,
HASH_WRITE,
LH_OVERFLOW_PAGE,
bstrategy);
Page nextpage = BufferGetPage(nextbuf);
HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
Assert(nextopaque->hasho_bucket == bucket);
nextopaque->hasho_prevblkno = prevblkno;
_hash_wrtbuf(rel, nextbuf);
}
/* Note: bstrategy is intentionally not used for metapage and bitmap */
/* Read the metapage so we can determine which bitmap page to use */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ, LH_META_PAGE);
metap = HashPageGetMeta(BufferGetPage(metabuf));
/* Identify which bit to set */
ovflbitno = blkno_to_bitno(metap, ovflblkno);
bitmappage = ovflbitno >> BMPG_SHIFT(metap);
bitmapbit = ovflbitno & BMPG_MASK(metap);
//.........(some code omitted here).........
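/*
 * Illustrative sketch (ours, not the elided code above): once bitmappage and
 * bitmapbit are known, freeing the overflow page amounts to clearing that bit
 * in the corresponding bitmap page; updating the metapage's first-free hint
 * is omitted here.  The helper name is hypothetical; HashPageGetBitmap(),
 * CLRBIT() and ISSET() are the hash AM's own macros, and the caller is
 * assumed to hold the locks described in the header comment.
 */
static void
sketch_clear_ovfl_bit(Relation rel, HashMetaPage metap,
					  int32 bitmappage, int32 bitmapbit,
					  BufferAccessStrategy bstrategy)
{
	BlockNumber mapblkno = metap->hashm_mapp[bitmappage];
	Buffer		mapbuf;
	Page		mappage;
	uint32	   *freep;

	/* fetch and write-lock the bitmap page that covers this overflow page */
	mapbuf = _hash_getbuf_with_strategy(rel, mapblkno, HASH_WRITE,
										LH_BITMAP_PAGE, bstrategy);
	mappage = BufferGetPage(mapbuf);
	freep = HashPageGetBitmap(mappage);

	/* the page being freed had better be marked as in use */
	Assert(ISSET(freep, bitmapbit));
	CLRBIT(freep, bitmapbit);

	/* write the bitmap page back and release it */
	_hash_wrtbuf(rel, mapbuf);
}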
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
*
* This function also deletes tuples that were moved to another bucket by a
* split.
*
* Result: a palloc'd struct containing statistical info for VACUUM displays.
*/
IndexBulkDeleteResult *
hashbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
IndexBulkDeleteCallback callback, void *callback_state)
{
Relation rel = info->index;
double tuples_removed;
double num_index_tuples;
double orig_ntuples;
Bucket orig_maxbucket;
Bucket cur_maxbucket;
Bucket cur_bucket;
Buffer metabuf = InvalidBuffer;
HashMetaPage metap;
HashMetaPage cachedmetap;
tuples_removed = 0;
num_index_tuples = 0;
/*
* We need a copy of the metapage so that we can use its hashm_spares[]
* values to compute bucket page addresses, but a cached copy should be
* good enough. (If not, we'll detect that further down and refresh the
* cache as necessary.)
*/
cachedmetap = _hash_getcachedmetap(rel, &metabuf, false);
Assert(cachedmetap != NULL);
orig_maxbucket = cachedmetap->hashm_maxbucket;
orig_ntuples = cachedmetap->hashm_ntuples;
/* Scan the buckets that we know exist */
cur_bucket = 0;
cur_maxbucket = orig_maxbucket;
loop_top:
while (cur_bucket <= cur_maxbucket)
{
BlockNumber bucket_blkno;
BlockNumber blkno;
Buffer bucket_buf;
Buffer buf;
HashPageOpaque bucket_opaque;
Page page;
bool split_cleanup = false;
/* Get address of bucket's start page */
bucket_blkno = BUCKET_TO_BLKNO(cachedmetap, cur_bucket);
blkno = bucket_blkno;
/*
* We need to acquire a cleanup lock on the primary bucket page to wait out
* concurrent scans before deleting the dead tuples.
*/
buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
LockBufferForCleanup(buf);
_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
page = BufferGetPage(buf);
bucket_opaque = (HashPageOpaque) PageGetSpecialPointer(page);
/*
* If the bucket contains tuples that were moved by a split, we need to
* delete them. We can't do that while the split of this bucket is still in
* progress, because in-flight scans still need those tuples.
*/
if (!H_BUCKET_BEING_SPLIT(bucket_opaque) &&
H_NEEDS_SPLIT_CLEANUP(bucket_opaque))
{
split_cleanup = true;
/*
* This bucket might have been split since we last held a lock on
* the metapage. If so, hashm_maxbucket, hashm_highmask and
* hashm_lowmask might be old enough to cause us to fail to remove
* tuples left behind by the most recent split. To prevent that,
* now that the primary page of the target bucket has been locked
* (and thus can't be further split), check whether we need to
* update our cached metapage data.
*/
Assert(bucket_opaque->hasho_prevblkno != InvalidBlockNumber);
if (bucket_opaque->hasho_prevblkno > cachedmetap->hashm_maxbucket)
{
cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
Assert(cachedmetap != NULL);
}
}
bucket_buf = buf;
//.........(some code omitted here).........
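/*
 * Sketch (ours, not from the original file): hashbulkdelete's per-page work
 * boils down to asking the caller-supplied IndexBulkDeleteCallback, for each
 * index tuple's heap TID, whether that heap tuple is being removed.  The loop
 * below shows the shape of that test for one page; sketch_collect_deletable
 * is a hypothetical helper, and 'deletable' must be large enough for every
 * offset on the page.
 */
static int
sketch_collect_deletable(Page page, IndexBulkDeleteCallback callback,
						 void *callback_state, OffsetNumber *deletable)
{
	OffsetNumber offno;
	OffsetNumber maxoffno = PageGetMaxOffsetNumber(page);
	int			ndeletable = 0;

	for (offno = FirstOffsetNumber; offno <= maxoffno;
		 offno = OffsetNumberNext(offno))
	{
		IndexTuple	itup = (IndexTuple) PageGetItem(page,
													PageGetItemId(page, offno));

		/* the callback says this heap tuple is going away: remember it */
		if (callback(&itup->t_tid, callback_state))
			deletable[ndeletable++] = offno;
	}

	return ndeletable;
}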
static void
btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
Buffer lbuf;
Buffer rbuf;
Page rpage;
BTPageOpaque ropaque;
char *datapos;
Size datalen;
Item left_hikey = NULL;
Size left_hikeysz = 0;
BlockNumber leftsib;
BlockNumber rightsib;
BlockNumber rnext;
XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib);
XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib);
if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext))
rnext = P_NONE;
/*
* Clear the incomplete split flag on the left sibling of the child page
* this is a downlink for. (Like in btree_xlog_insert, this can be done
* before locking the other pages)
*/
if (!isleaf)
_bt_clear_incomplete_split(record, 3);
/* Reconstruct right (new) sibling page from scratch */
rbuf = XLogInitBufferForRedo(record, 1);
datapos = XLogRecGetBlockData(record, 1, &datalen);
rpage = (Page) BufferGetPage(rbuf);
_bt_pageinit(rpage, BufferGetPageSize(rbuf));
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
ropaque->btpo_prev = leftsib;
ropaque->btpo_next = rnext;
ropaque->btpo.level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
_bt_restore_page(rpage, datapos, datalen);
/*
* On leaf level, the high key of the left page is equal to the first key
* on the right page.
*/
if (isleaf)
{
ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
left_hikey = PageGetItem(rpage, hiItemId);
left_hikeysz = ItemIdGetLength(hiItemId);
}
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuf);
/* don't release the buffer yet; we touch right page's first item below */
/* Now reconstruct left (original) sibling page */
if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO)
{
/*
* To retain the tuples' original physical order, we initialize a
* temporary empty page for the left page and add all the items to it in
* item number order. This mirrors how _bt_split() works. Retaining the
* same physical order isn't strictly required, as long as the items are
* in the correct item number order, but it helps debugging. See also
* _bt_restore_page(), which does the same for the right page.
*/
Page lpage = (Page) BufferGetPage(lbuf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
OffsetNumber off;
Item newitem = NULL;
Size newitemsz = 0;
Page newlpage;
OffsetNumber leftoff;
datapos = XLogRecGetBlockData(record, 0, &datalen);
if (onleft)
{
newitem = (Item) datapos;
newitemsz = MAXALIGN(IndexTupleSize(newitem));
datapos += newitemsz;
datalen -= newitemsz;
}
/* Extract left hikey and its size (assuming 16-bit alignment) */
if (!isleaf)
{
left_hikey = (Item) datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
//.........(some code omitted here).........
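/*
 * Sketch (ours, not the elided remainder of btree_xlog_split): the comment in
 * the left-page branch above describes rebuilding the page via a scratch copy
 * so the physical item order matches what _bt_split() produces.  The helper
 * below shows that pattern in isolation; its name and parameters are
 * hypothetical.
 */
static void
sketch_rebuild_page_in_order(Page page, Item *items, Size *itemsz, int nitems)
{
	/* scratch page that shares 'page's special-space layout */
	Page		newpage = PageGetTempPageCopySpecial(page);
	OffsetNumber off = FirstOffsetNumber;
	int			i;

	for (i = 0; i < nitems; i++)
	{
		/* add items strictly in item number order, as _bt_split() would */
		if (PageAddItem(newpage, items[i], itemsz[i], off,
						false, false) == InvalidOffsetNumber)
			elog(PANIC, "failed to add item while rebuilding page");
		off = OffsetNumberNext(off);
	}

	/* replace the original page contents with the rebuilt copy */
	PageRestoreTempPage(newpage, page);
}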
Datum
bt_page_items(PG_FUNCTION_ARGS)
{
text *relname = PG_GETARG_TEXT_P(0);
uint32 blkno = PG_GETARG_UINT32(1);
Datum result;
char *values[6];
HeapTuple tuple;
FuncCallContext *fctx;
MemoryContext mctx;
struct user_args *uargs;
if (!superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to use pageinspect functions"))));
if (SRF_IS_FIRSTCALL())
{
RangeVar *relrv;
Relation rel;
Buffer buffer;
BTPageOpaque opaque;
TupleDesc tupleDesc;
fctx = SRF_FIRSTCALL_INIT();
relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
rel = relation_openrv(relrv, AccessShareLock);
if (!IS_INDEX(rel) || !IS_BTREE(rel))
elog(ERROR, "relation \"%s\" is not a btree index",
RelationGetRelationName(rel));
/*
* Reject attempts to read non-local temporary relations; we would be
* likely to get wrong data since we have no visibility into the
* owning session's local buffers.
*/
if (RELATION_IS_OTHER_TEMP(rel))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot access temporary tables of other sessions")));
if (blkno == 0)
elog(ERROR, "block 0 is a meta page");
CHECK_RELATION_BLOCK_RANGE(rel, blkno);
buffer = ReadBuffer(rel, blkno);
LockBuffer(buffer, BUFFER_LOCK_SHARE);
/*
* We copy the page into local storage to avoid holding pin on the
* buffer longer than we must, and possibly failing to release it at
* all if the calling query doesn't fetch all rows.
*/
mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
uargs = palloc(sizeof(struct user_args));
uargs->page = palloc(BLCKSZ);
memcpy(uargs->page, BufferGetPage(buffer), BLCKSZ);
UnlockReleaseBuffer(buffer);
relation_close(rel, AccessShareLock);
uargs->offset = FirstOffsetNumber;
opaque = (BTPageOpaque) PageGetSpecialPointer(uargs->page);
if (P_ISDELETED(opaque))
elog(NOTICE, "page is deleted");
fctx->max_calls = PageGetMaxOffsetNumber(uargs->page);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
fctx->attinmeta = TupleDescGetAttInMetadata(tupleDesc);
fctx->user_fctx = uargs;
MemoryContextSwitchTo(mctx);
}
fctx = SRF_PERCALL_SETUP();
uargs = fctx->user_fctx;
if (fctx->call_cntr < fctx->max_calls)
{
ItemId id;
IndexTuple itup;
int j;
int off;
int dlen;
char *dump;
char *ptr;
//.........(some code omitted here).........
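/*
 * Minimal sketch of the funcapi set-returning protocol that bt_page_items is
 * built on (generic shape, not the original pageinspect code): the first call
 * sets up per-query state, each later call returns one row with
 * SRF_RETURN_NEXT, and SRF_RETURN_DONE ends the result set.  The function
 * name is hypothetical; a real version also needs PG_FUNCTION_INFO_V1() and a
 * matching "RETURNS SETOF integer" declaration in SQL.
 */
Datum
sketch_three_rows(PG_FUNCTION_ARGS)
{
	FuncCallContext *fctx;

	if (SRF_IS_FIRSTCALL())
	{
		fctx = SRF_FIRSTCALL_INIT();
		fctx->max_calls = 3;	/* emit three rows, then stop */
	}

	fctx = SRF_PERCALL_SETUP();

	if (fctx->call_cntr < fctx->max_calls)
		SRF_RETURN_NEXT(fctx, Int32GetDatum((int32) fctx->call_cntr));

	SRF_RETURN_DONE(fctx);
}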
/*
* _hash_metapinit() -- Initialize the metadata page of a hash index,
* the two buckets that we begin with and the initial
* bitmap page.
*
* We are fairly cavalier about locking here, since we know that no one else
* could be accessing this index. In particular the rule about not holding
* multiple buffer locks is ignored.
*/
void
_hash_metapinit(Relation rel)
{
HashMetaPage metap;
HashPageOpaque pageopaque;
Buffer metabuf;
Buffer buf;
Page pg;
int32 data_width;
int32 item_width;
int32 ffactor;
uint16 i;
/* safety check */
if (RelationGetNumberOfBlocks(rel) != 0)
elog(ERROR, "cannot initialize non-empty hash index \"%s\"",
RelationGetRelationName(rel));
/*
* Determine the target fill factor (tuples per bucket) for this index.
* The idea is to make the fill factor correspond to pages about 3/4ths
* full. We can compute it exactly if the index datatype is fixed-width,
* but for var-width there's some guessing involved.
*/
data_width = get_typavgwidth(RelationGetDescr(rel)->attrs[0]->atttypid,
RelationGetDescr(rel)->attrs[0]->atttypmod);
item_width = MAXALIGN(sizeof(HashItemData)) + MAXALIGN(data_width) +
sizeof(ItemIdData); /* include the line pointer */
ffactor = (BLCKSZ * 3 / 4) / item_width;
/* keep to a sane range */
if (ffactor < 10)
ffactor = 10;
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
pg = BufferGetPage(metabuf);
_hash_pageinit(pg, BufferGetPageSize(metabuf));
pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
pageopaque->hasho_prevblkno = InvalidBlockNumber;
pageopaque->hasho_nextblkno = InvalidBlockNumber;
pageopaque->hasho_bucket = -1;
pageopaque->hasho_flag = LH_META_PAGE;
pageopaque->hasho_filler = HASHO_FILL;
metap = (HashMetaPage) pg;
metap->hashm_magic = HASH_MAGIC;
metap->hashm_version = HASH_VERSION;
metap->hashm_ntuples = 0;
metap->hashm_nmaps = 0;
metap->hashm_ffactor = ffactor;
metap->hashm_bsize = BufferGetPageSize(metabuf);
/* find largest bitmap array size that will fit in page size */
for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
{
if ((1 << i) <= (metap->hashm_bsize -
(MAXALIGN(sizeof(PageHeaderData)) +
MAXALIGN(sizeof(HashPageOpaqueData)))))
break;
}
Assert(i > 0);
metap->hashm_bmsize = 1 << i;
metap->hashm_bmshift = i + BYTE_TO_BIT;
Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);
/*
* We initialize the index with two buckets, 0 and 1, occupying physical
* blocks 1 and 2. The first freespace bitmap page is in block 3.
*/
metap->hashm_maxbucket = metap->hashm_lowmask = 1; /* nbuckets - 1 */
metap->hashm_highmask = 3; /* (nbuckets << 1) - 1 */
MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));
metap->hashm_spares[1] = 1; /* the first bitmap page is only spare */
metap->hashm_ovflpoint = 1;
metap->hashm_firstfree = 0;
/*
* Initialize the first two buckets
*/
for (i = 0; i <= 1; i++)
{
buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(metap, i), HASH_WRITE);
pg = BufferGetPage(buf);
_hash_pageinit(pg, BufferGetPageSize(buf));
pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
pageopaque->hasho_prevblkno = InvalidBlockNumber;
//.........(some code omitted here).........
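/*
 * Worked example for the bitmap-size loop above (our arithmetic, assuming the
 * default 8 kB BLCKSZ and typical page/opaque header sizes): the usable space
 * is 8192 bytes minus roughly 40 bytes of headers, so the largest power of
 * two that fits is 4096.  That yields hashm_bmsize = 4096 and, with
 * BYTE_TO_BIT = 3, hashm_bmshift = 12 + 3 = 15, so one bitmap page tracks
 * 2^15 = 32768 overflow pages.
 */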
/*
* _hash_splitbucket -- split 'obucket' into 'obucket' and 'nbucket'
*
* We are splitting a bucket that consists of a base bucket page and zero
* or more overflow (bucket chain) pages. We must relocate tuples that
* belong in the new bucket, and compress out any free space in the old
* bucket.
*
* The caller must hold exclusive locks on both buckets to ensure that
* no one else is trying to access them (see README).
*
* The caller must hold a pin, but no lock, on the metapage buffer.
* The buffer is returned in the same state. (The metapage is only
* touched if it becomes necessary to add or remove overflow pages.)
*/
static void
_hash_splitbucket(Relation rel,
Buffer metabuf,
Bucket obucket,
Bucket nbucket,
BlockNumber start_oblkno,
BlockNumber start_nblkno,
uint32 maxbucket,
uint32 highmask,
uint32 lowmask)
{
Bucket bucket;
Buffer obuf;
Buffer nbuf;
BlockNumber oblkno;
BlockNumber nblkno;
bool null;
Datum datum;
HashItem hitem;
HashPageOpaque oopaque;
HashPageOpaque nopaque;
IndexTuple itup;
Size itemsz;
OffsetNumber ooffnum;
OffsetNumber noffnum;
OffsetNumber omaxoffnum;
Page opage;
Page npage;
TupleDesc itupdesc = RelationGetDescr(rel);
/*
* It should be okay to simultaneously write-lock pages from each
* bucket, since no one else can be trying to acquire buffer lock
* on pages of either bucket.
*/
oblkno = start_oblkno;
nblkno = start_nblkno;
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
npage = BufferGetPage(nbuf);
_hash_checkpage(rel, opage, LH_BUCKET_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
/* initialize the new bucket's primary page */
_hash_pageinit(npage, BufferGetPageSize(nbuf));
nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
nopaque->hasho_prevblkno = InvalidBlockNumber;
nopaque->hasho_nextblkno = InvalidBlockNumber;
nopaque->hasho_bucket = nbucket;
nopaque->hasho_flag = LH_BUCKET_PAGE;
nopaque->hasho_filler = HASHO_FILL;
/*
* Partition the tuples in the old bucket between the old bucket and the
* new bucket, advancing along the old bucket's overflow bucket chain
* and adding overflow pages to the new bucket as needed.
*/
ooffnum = FirstOffsetNumber;
omaxoffnum = PageGetMaxOffsetNumber(opage);
for (;;)
{
/*
* at each iteration through this loop, each of these variables
* should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
*/
/* check if we're at the end of the page */
if (ooffnum > omaxoffnum)
{
/* at end of page, but check for an(other) overflow page */
oblkno = oopaque->hasho_nextblkno;
if (!BlockNumberIsValid(oblkno))
break;
/*
* we ran out of tuples on this particular page, but we
* have more overflow pages; advance to next page.
*/
_hash_wrtbuf(rel, obuf);
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
//.........(some code omitted here).........
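/*
 * Sketch of the routing decision used while partitioning tuples between the
 * old and new buckets (this mirrors the hash AM's _hash_hashkey2bucket(); the
 * sketch_ name is ours): mask the tuple's hash key with the post-split
 * highmask, and if that points past the last bucket that actually exists,
 * fall back to the pre-split lowmask.
 */
static Bucket
sketch_choose_bucket(uint32 hashkey, uint32 maxbucket,
					 uint32 highmask, uint32 lowmask)
{
	Bucket		bucket = hashkey & highmask;

	/* masked value beyond the last real bucket: its image under lowmask wins */
	if (bucket > maxbucket)
		bucket = bucket & lowmask;

	return bucket;
}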
/*
* rtdosplit -- split a page in the tree.
*
* rtpicksplit does the interesting work of choosing the split.
* This routine just does the bit-pushing.
*/
static void
rtdosplit(Relation r,
Buffer buffer,
RTSTACK *stack,
IndexTuple itup,
RTSTATE *rtstate)
{
Page p;
Buffer leftbuf,
rightbuf;
Page left,
right;
ItemId itemid;
IndexTuple item;
IndexTuple ltup,
rtup;
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber leftoff,
rightoff;
BlockNumber lbknum,
rbknum;
BlockNumber bufblock;
RTreePageOpaque opaque;
bool *isnull;
SPLITVEC v;
OffsetNumber *spl_left,
*spl_right;
TupleDesc tupDesc;
int n;
OffsetNumber newitemoff;
p = (Page) BufferGetPage(buffer);
opaque = (RTreePageOpaque) PageGetSpecialPointer(p);
rtpicksplit(r, p, &v, itup, rtstate);
/*
* The root of the tree is the first block in the relation. If we're
* about to split the root, we need to do some hocus-pocus to enforce this
* guarantee.
*/
if (BufferGetBlockNumber(buffer) == P_ROOT)
{
leftbuf = ReadBuffer(r, P_NEW);
RTInitBuffer(leftbuf, opaque->flags);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
}
else
{
leftbuf = buffer;
IncrBufferRefCount(buffer);
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData));
}
rightbuf = ReadBuffer(r, P_NEW);
RTInitBuffer(rightbuf, opaque->flags);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
spl_left = v.spl_left;
spl_right = v.spl_right;
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
newitemoff = OffsetNumberNext(maxoff);
/*
* spl_left contains a list of the offset numbers of the tuples that will
* go to the left page. For each offset number, get the tuple item, then
* add the item to the left page. Similarly for the right side.
*/
/* fill left node */
for (n = 0; n < v.spl_nleft; n++)
{
i = *spl_left;
if (i == newitemoff)
item = itup;
else
{
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
}
if (PageAddItem(left, (Item) item, IndexTupleSize(item),
leftoff, LP_USED) == InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(r));
leftoff = OffsetNumberNext(leftoff);
spl_left++; /* advance in left split vector */
//.........(some code omitted here).........
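/*
 * Illustrative reconstruction (ours, not the elided source): the right page
 * is presumably filled by the symmetric loop below, walking v.spl_right for
 * v.spl_nright entries; spl_nright is assumed to mirror the spl_nleft field
 * used above.
 */
	/* fill right node */
	for (n = 0; n < v.spl_nright; n++)
	{
		i = *spl_right;
		if (i == newitemoff)
			item = itup;
		else
		{
			itemid = PageGetItemId(p, i);
			item = (IndexTuple) PageGetItem(p, itemid);
		}
		if (PageAddItem(right, (Item) item, IndexTupleSize(item),
						rightoff, LP_USED) == InvalidOffsetNumber)
			elog(ERROR, "failed to add index item to \"%s\"",
				 RelationGetRelationName(r));
		rightoff = OffsetNumberNext(rightoff);
		spl_right++;			/* advance in right split vector */
	}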
/*
* replay delete operation of hash index
*/
static void
hash_xlog_delete(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
Buffer bucketbuf = InvalidBuffer;
Buffer deletebuf;
Page page;
XLogRedoAction action;
/*
* Ensure we have a cleanup lock on primary bucket page before we start
* with the actual replay operation. This ensures that no scan can start,
* and that none is already in progress, while this operation is replayed.
* If scans were allowed during replay, they could miss some records or see
* the same record multiple times.
*/
if (xldata->is_primary_bucket_page)
action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &deletebuf);
else
{
/*
* We don't care about the return value; the only purpose of reading
* bucketbuf is to acquire a cleanup lock on the primary bucket page.
*/
(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
action = XLogReadBufferForRedo(record, 1, &deletebuf);
}
/* replay the record for deleting entries in bucket page */
if (action == BLK_NEEDS_REDO)
{
char *ptr;
Size len;
ptr = XLogRecGetBlockData(record, 1, &len);
page = (Page) BufferGetPage(deletebuf);
if (len > 0)
{
OffsetNumber *unused;
OffsetNumber *unend;
unused = (OffsetNumber *) ptr;
unend = (OffsetNumber *) ((char *) ptr + len);
if ((unend - unused) > 0)
PageIndexMultiDelete(page, unused, unend - unused);
}
/*
* Mark the page as not containing any LP_DEAD items only if
* clear_dead_marking flag is set to true. See comments in
* hashbucketcleanup() for details.
*/
if (xldata->clear_dead_marking)
{
HashPageOpaque pageopaque;
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
}
PageSetLSN(page, lsn);
MarkBufferDirty(deletebuf);
}
if (BufferIsValid(deletebuf))
UnlockReleaseBuffer(deletebuf);
if (BufferIsValid(bucketbuf))
UnlockReleaseBuffer(bucketbuf);
}
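/*
 * For reference, the record replayed above has this shape (inferred from the
 * code, not quoted from hash_xlog.h): the main data is the xl_hash_delete
 * struct whose flags are consulted above; registered block 0 is the primary
 * bucket page, read only to obtain a cleanup lock when the target page is an
 * overflow page; registered block 1 is the page being modified, and its
 * attached block data is a packed OffsetNumber array naming the line pointers
 * to delete.
 */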
/*
* replay squeeze page operation of hash index
*/
static void
hash_xlog_squeeze_page(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
Buffer bucketbuf = InvalidBuffer;
Buffer writebuf;
Buffer ovflbuf;
Buffer prevbuf = InvalidBuffer;
Buffer mapbuf;
XLogRedoAction action;
/*
* Ensure we have a cleanup lock on primary bucket page before we start
* with the actual replay operation. This ensures that no scan can start,
* and that none is already in progress, while this operation is replayed.
* If scans were allowed during replay, they could miss some records or see
* the same record multiple times.
*/
if (xldata->is_prim_bucket_same_wrt)
action = XLogReadBufferForRedoExtended(record, 1, RBM_NORMAL, true, &writebuf);
else
{
/*
* We don't care about the return value; the only purpose of reading
* bucketbuf is to acquire a cleanup lock on the primary bucket page.
*/
(void) XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &bucketbuf);
action = XLogReadBufferForRedo(record, 1, &writebuf);
}
/* replay the record for adding entries in overflow buffer */
if (action == BLK_NEEDS_REDO)
{
Page writepage;
char *begin;
char *data;
Size datalen;
uint16 ninserted = 0;
data = begin = XLogRecGetBlockData(record, 1, &datalen);
writepage = (Page) BufferGetPage(writebuf);
if (xldata->ntups > 0)
{
OffsetNumber *towrite = (OffsetNumber *) data;
data += sizeof(OffsetNumber) * xldata->ntups;
while (data - begin < datalen)
{
IndexTuple itup = (IndexTuple) data;
Size itemsz;
OffsetNumber l;
itemsz = IndexTupleDSize(*itup);
itemsz = MAXALIGN(itemsz);
data += itemsz;
l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
if (l == InvalidOffsetNumber)
elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
(int) itemsz);
ninserted++;
}
}
/*
* The number of tuples inserted must match what the REDO record requested.
*/
Assert(ninserted == xldata->ntups);
/*
* If the page to which we are adding tuples is the page that preceded the
* freed overflow page, update its nextblkno.
*/
if (xldata->is_prev_bucket_same_wrt)
{
HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
writeopaque->hasho_nextblkno = xldata->nextblkno;
}
PageSetLSN(writepage, lsn);
MarkBufferDirty(writebuf);
}
/* replay the record for initializing overflow buffer */
if (XLogReadBufferForRedo(record, 2, &ovflbuf) == BLK_NEEDS_REDO)
{
Page ovflpage;
ovflpage = BufferGetPage(ovflbuf);
//.........(some code omitted here).........
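/*
 * Sketch (ours): the block-1 payload replayed above is an array of
 * xldata->ntups target OffsetNumbers followed by the same number of
 * MAXALIGN'd index tuples.  A stand-alone reader for that layout looks like
 * this; the names and the 'visit' callback are hypothetical.
 */
typedef void (*sketch_tuple_visitor) (OffsetNumber offnum, IndexTuple itup,
									  Size itemsz, void *arg);

static void
sketch_walk_squeeze_payload(char *data, Size datalen, uint16 ntups,
							sketch_tuple_visitor visit, void *arg)
{
	OffsetNumber *offsets = (OffsetNumber *) data;
	char	   *ptr = data + sizeof(OffsetNumber) * ntups;
	char	   *end = data + datalen;
	uint16		i = 0;

	while (ptr < end)
	{
		IndexTuple	itup = (IndexTuple) ptr;
		Size		itemsz = MAXALIGN(IndexTupleDSize(*itup));

		/* hand each (target offset, tuple) pair to the caller */
		visit(offsets[i], itup, itemsz, arg);
		ptr += itemsz;
		i++;
	}
	Assert(i == ntups);
}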
/*
* replay allocation of page for split operation
*/
static void
hash_xlog_split_allocate_page(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
Buffer oldbuf;
Buffer newbuf;
Buffer metabuf;
Size datalen PG_USED_FOR_ASSERTS_ONLY;
char *data;
XLogRedoAction action;
/*
* To be consistent with normal operation, here we take cleanup locks on
* both the old and new buckets even though there can't be any concurrent
* inserts.
*/
/* replay the record for old bucket */
action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &oldbuf);
/*
* Note that we still update the page even if it was restored from a full
* page image, because the special space is not included in the image.
*/
if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
{
Page oldpage;
HashPageOpaque oldopaque;
oldpage = BufferGetPage(oldbuf);
oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
oldopaque->hasho_flag = xlrec->old_bucket_flag;
oldopaque->hasho_prevblkno = xlrec->new_bucket;
PageSetLSN(oldpage, lsn);
MarkBufferDirty(oldbuf);
}
/* replay the record for new bucket */
newbuf = XLogInitBufferForRedo(record, 1);
_hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
xlrec->new_bucket_flag, true);
if (!IsBufferCleanupOK(newbuf))
elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
MarkBufferDirty(newbuf);
PageSetLSN(BufferGetPage(newbuf), lsn);
/*
* We could release the lock on the old bucket earlier as well, but we do it
* here to stay consistent with normal operation.
*/
if (BufferIsValid(oldbuf))
UnlockReleaseBuffer(oldbuf);
if (BufferIsValid(newbuf))
UnlockReleaseBuffer(newbuf);
/*
* Note: in normal operation, we'd update the meta page while still
* holding lock on the old and new bucket pages. But during replay it's
* not necessary to hold those locks, since no other bucket splits can be
* happening concurrently.
*/
/* replay the record for metapage changes */
if (XLogReadBufferForRedo(record, 2, &metabuf) == BLK_NEEDS_REDO)
{
Page page;
HashMetaPage metap;
page = BufferGetPage(metabuf);
metap = HashPageGetMeta(page);
metap->hashm_maxbucket = xlrec->new_bucket;
data = XLogRecGetBlockData(record, 2, &datalen);
if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
{
uint32 lowmask;
uint32 *highmask;
/* extract low and high masks. */
memcpy(&lowmask, data, sizeof(uint32));
highmask = (uint32 *) ((char *) data + sizeof(uint32));
/* update metapage */
metap->hashm_lowmask = lowmask;
metap->hashm_highmask = *highmask;
data += sizeof(uint32) * 2;
}
if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
{
uint32 ovflpoint;
uint32 *ovflpages;
//.........(some code omitted here).........
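/*
 * Worked example for the mask update replayed above (our reconstruction of
 * the normal-operation rule, not quoted from the source): with buckets 0..3
 * in existence, hashm_maxbucket = 3, hashm_highmask = 3 and hashm_lowmask = 1.
 * Allocating bucket 4 starts a new doubling, so XLH_SPLIT_META_UPDATE_MASKS
 * is set and the record carries lowmask = 3 (the old highmask) and
 * highmask = 7 (new_bucket | lowmask); splits up to bucket 7 then reuse
 * those masks.
 */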
/*
* _hash_step() -- step to the next valid item in a scan in the bucket.
*
* If no valid record exists in the requested direction, return
* false. Else, return true and set the hashso_curpos for the
* scan to the right thing.
*
* 'bufP' points to the current buffer, which is pinned and read-locked.
* On success exit, we have pin and read-lock on whichever page
* contains the right item; on failure, we have released all buffers.
*/
bool
_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
{
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
ItemPointer current;
Buffer buf;
Page page;
HashPageOpaque opaque;
OffsetNumber maxoff;
OffsetNumber offnum;
BlockNumber blkno;
IndexTuple itup;
current = &(so->hashso_curpos);
buf = *bufP;
_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
page = BufferGetPage(buf);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
/*
* If _hash_step is called from _hash_first, current will not be valid, so
* we can't dereference it. However, in that case, we presumably want to
* start at the beginning/end of the page...
*/
maxoff = PageGetMaxOffsetNumber(page);
if (ItemPointerIsValid(current))
offnum = ItemPointerGetOffsetNumber(current);
else
offnum = InvalidOffsetNumber;
/*
* 'offnum' now points to the last tuple we examined (if any).
*
* continue to step through tuples until: 1) we get to the end of the
* bucket chain or 2) we find a valid tuple.
*/
do
{
switch (dir)
{
case ForwardScanDirection:
if (offnum != InvalidOffsetNumber)
offnum = OffsetNumberNext(offnum); /* move forward */
else
{
/* new page, locate starting position by binary search */
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
for (;;)
{
/*
* check if we're still in the range of items with the
* target hash key
*/
if (offnum <= maxoff)
{
Assert(offnum >= FirstOffsetNumber);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
break; /* yes, so exit for-loop */
}
/*
* ran off the end of this page, try the next
*/
_hash_readnext(rel, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
else
{
/* end of bucket */
itup = NULL;
break; /* exit for-loop */
}
}
break;
case BackwardScanDirection:
if (offnum != InvalidOffsetNumber)
offnum = OffsetNumberPrev(offnum); /* move back */
else
{
/* new page, locate starting position by binary search */
//.........(some code omitted here).........
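/*
 * Sketch of what _hash_binsearch(), called above, does (reconstructed from
 * its contract, not copied; the sketch_ name is ours): index tuples on a hash
 * page are kept in hash-key order, and the search returns the first offset
 * whose hash key is >= the one being scanned for, or maxoff + 1 if there is
 * none.
 */
static OffsetNumber
sketch_binsearch(Page page, uint32 hash_value)
{
	OffsetNumber lower = FirstOffsetNumber;
	OffsetNumber upper = PageGetMaxOffsetNumber(page) + 1;

	while (lower < upper)
	{
		OffsetNumber mid = (lower + upper) / 2;
		IndexTuple	itup = (IndexTuple) PageGetItem(page,
													PageGetItemId(page, mid));

		if (_hash_get_indextuple_hashkey(itup) < hash_value)
			lower = mid + 1;	/* the answer lies strictly above mid */
		else
			upper = mid;		/* mid itself could be the answer */
	}

	return lower;
}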
/*
* btvacuumpage --- VACUUM one page
*
* This processes a single page for btvacuumscan(). In some cases we
* must go back and re-examine previously-scanned pages; this routine
* recurses when necessary to handle that case.
*
* blkno is the page to process. orig_blkno is the highest block number
* reached by the outer btvacuumscan loop (the same as blkno, unless we
* are recursing to re-examine a previous page).
*/
static void
btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno)
{
IndexVacuumInfo *info = vstate->info;
IndexBulkDeleteResult *stats = vstate->stats;
IndexBulkDeleteCallback callback = vstate->callback;
void *callback_state = vstate->callback_state;
Relation rel = info->index;
bool delete_now;
BlockNumber recurse_to;
Buffer buf;
Page page;
BTPageOpaque opaque;
restart:
delete_now = false;
recurse_to = P_NONE;
/* call vacuum_delay_point while not holding any buffer lock */
vacuum_delay_point();
/*
* We can't use _bt_getbuf() here because it always applies
* _bt_checkpage(), which will barf on an all-zero page. We want to
* recycle all-zero pages, not fail. Also, we want to use a nondefault
* buffer access strategy.
*/
buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
info->strategy);
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!PageIsNew(page))
_bt_checkpage(rel, buf);
/*
* If we are recursing, the only case we want to do anything with is a
* live leaf page having the current vacuum cycle ID. Any other state
* implies we already saw the page (eg, deleted it as being empty).
*/
if (blkno != orig_blkno)
{
if (_bt_page_recyclable(page) ||
P_IGNORE(opaque) ||
!P_ISLEAF(opaque) ||
opaque->btpo_cycleid != vstate->cycleid)
{
_bt_relbuf(rel, buf);
return;
}
}
/* If the page is in use, update lastUsedPage */
if (!_bt_page_recyclable(page) && vstate->lastUsedPage < blkno)
vstate->lastUsedPage = blkno;
/* Page is valid, see what to do with it */
if (_bt_page_recyclable(page))
{
/* Okay to recycle this page */
RecordFreeIndexPage(rel, blkno);
vstate->totFreePages++;
stats->pages_deleted++;
}
else if (P_ISDELETED(opaque))
{
/* Already deleted, but can't recycle yet */
stats->pages_deleted++;
}
else if (P_ISHALFDEAD(opaque))
{
/* Half-dead, try to delete */
delete_now = true;
}
else if (P_ISLEAF(opaque))
{
OffsetNumber deletable[MaxOffsetNumber];
int ndeletable;
OffsetNumber offnum,
minoff,
maxoff;
/*
* Trade in the initial read lock for a super-exclusive write lock on
* this page. We must get such a lock on every leaf page over the
* course of the vacuum scan, whether or not it actually contains any
* deletable tuples --- see nbtree/README.
*/
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
//.........(some code omitted here).........
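	/*
	 * Illustrative reconstruction of how the routine ends (ours, not the
	 * elided source): after the per-page work, a page flagged half-dead is
	 * deleted, and a page remembered in recurse_to is revisited by looping
	 * back to the 'restart' label above -- tail recursion written as a loop
	 * so each level doesn't burn stack on another deletable[] array.
	 */
	if (recurse_to != P_NONE)
	{
		blkno = recurse_to;
		goto restart;
	}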