Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/am/build.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <utils/backend_progress.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/regproc.h>
#include <utils/snapmgr.h>

#include "am.h"
Expand All @@ -38,6 +39,25 @@
/* Progress reporting interval (tuples) */
#define TP_PROGRESS_REPORT_INTERVAL 1000

/*
* Build phase name for progress reporting
*/
char *
tp_buildphasename(int64 phase)
{
switch (phase)
{
case PROGRESS_CREATEIDX_SUBPHASE_INITIALIZE:
return "initializing";
case TP_PHASE_LOADING:
return "loading tuples";
case TP_PHASE_WRITING:
return "writing index";
default:
return NULL;
}
}

/*
* Auto-spill memtable to disk segment when posting count threshold exceeded.
* This is called after each document insert to check if spill is needed.
Expand Down Expand Up @@ -126,6 +146,113 @@ tp_auto_spill_if_needed(TpLocalIndexState *index_state, Relation index_rel)
}
}

/*
* tp_spill_memtable - Force memtable flush to disk segment
*
* This function allows manual triggering of segment writes.
* Returns the block number of the written segment, or NULL if memtable was
* empty.
*/
PG_FUNCTION_INFO_V1(tp_spill_memtable);

Datum
tp_spill_memtable(PG_FUNCTION_ARGS)
{
text *index_name_text = PG_GETARG_TEXT_PP(0);
char *index_name = text_to_cstring(index_name_text);
Oid index_oid;
Relation index_rel;
TpLocalIndexState *index_state;
BlockNumber segment_root;
RangeVar *rv;

/* Parse index name (supports schema.index notation) */
rv = makeRangeVarFromNameList(stringToQualifiedNameList(index_name, NULL));
index_oid = RangeVarGetRelid(rv, AccessShareLock, false);

if (!OidIsValid(index_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" does not exist", index_name)));

/* Open the index */
index_rel = index_open(index_oid, RowExclusiveLock);

/* Get index state */
index_state = tp_get_local_index_state(RelationGetRelid(index_rel));
if (!index_state)
{
index_close(index_rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not get index state for \"%s\"", index_name)));
}

/* Acquire exclusive lock for write operation */
tp_acquire_index_lock(index_state, LW_EXCLUSIVE);

/* Write the segment */
segment_root = tp_write_segment(index_state, index_rel);

/* Clear the memtable after successful spilling */
if (segment_root != InvalidBlockNumber)
{
Buffer metabuf;
Page metapage;
TpIndexMetaPage metap;

tp_clear_memtable(index_state);

/* Link new segment as L0 chain head */
metabuf = ReadBuffer(index_rel, 0);
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
metapage = BufferGetPage(metabuf);
metap = (TpIndexMetaPage)PageGetContents(metapage);

if (metap->level_heads[0] != InvalidBlockNumber)
{
/* Point new segment to old chain head */
Buffer seg_buf;
Page seg_page;
TpSegmentHeader *seg_header;

seg_buf = ReadBuffer(index_rel, segment_root);
LockBuffer(seg_buf, BUFFER_LOCK_EXCLUSIVE);
seg_page = BufferGetPage(seg_buf);
seg_header = (TpSegmentHeader *)((char *)seg_page +
SizeOfPageHeaderData);
seg_header->next_segment = metap->level_heads[0];
MarkBufferDirty(seg_buf);
UnlockReleaseBuffer(seg_buf);
}

metap->level_heads[0] = segment_root;
metap->level_counts[0]++;
MarkBufferDirty(metabuf);

UnlockReleaseBuffer(metabuf);

/* Check if L0 needs compaction */
tp_maybe_compact_level(index_rel, 0);
}

/* Release lock */
tp_release_index_lock(index_state);

/* Close the index */
index_close(index_rel, RowExclusiveLock);

/* Return block number or NULL */
if (segment_root != InvalidBlockNumber)
{
PG_RETURN_INT32(segment_root);
}
else
{
PG_RETURN_NULL();
}
}

/*
* Helper: Extract options from index relation
*/
Expand Down
130 changes: 0 additions & 130 deletions src/am/vacuum.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
#include "state/metapage.h"
#include "state/state.h"

/* Tapir-specific build phases */
#define TP_PHASE_LOADING 2
#define TP_PHASE_WRITING 3

/*
* Bulk delete callback for vacuum
*/
Expand Down Expand Up @@ -119,129 +115,3 @@ tp_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)

return stats;
}

/*
* Build phase name for progress reporting
*/
char *
tp_buildphasename(int64 phase)
{
switch (phase)
{
case PROGRESS_CREATEIDX_SUBPHASE_INITIALIZE:
return "initializing";
case TP_PHASE_LOADING:
return "loading tuples";
case TP_PHASE_WRITING:
return "writing index";
default:
return NULL;
}
}

/*
* tp_spill_memtable - Force memtable flush to disk segment
*
* This function allows manual triggering of segment writes.
* Returns the block number of the written segment, or NULL if memtable was
* empty.
*/
PG_FUNCTION_INFO_V1(tp_spill_memtable);

Datum
tp_spill_memtable(PG_FUNCTION_ARGS)
{
text *index_name_text = PG_GETARG_TEXT_PP(0);
char *index_name = text_to_cstring(index_name_text);
Oid index_oid;
Relation index_rel;
TpLocalIndexState *index_state;
BlockNumber segment_root;
RangeVar *rv;

/* Parse index name (supports schema.index notation) */
rv = makeRangeVarFromNameList(stringToQualifiedNameList(index_name, NULL));
index_oid = RangeVarGetRelid(rv, AccessShareLock, false);

if (!OidIsValid(index_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" does not exist", index_name)));

/* Open the index */
index_rel = index_open(index_oid, RowExclusiveLock);

/* Get index state */
index_state = tp_get_local_index_state(RelationGetRelid(index_rel));
if (!index_state)
{
index_close(index_rel, RowExclusiveLock);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not get index state for \"%s\"", index_name)));
}

/* Acquire exclusive lock for write operation */
tp_acquire_index_lock(index_state, LW_EXCLUSIVE);

/* Write the segment */
segment_root = tp_write_segment(index_state, index_rel);

/* Clear the memtable after successful spilling */
if (segment_root != InvalidBlockNumber)
{
Buffer metabuf;
Page metapage;
TpIndexMetaPage metap;

tp_clear_memtable(index_state);

/* Link new segment as L0 chain head */
metabuf = ReadBuffer(index_rel, 0);
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
metapage = BufferGetPage(metabuf);
metap = (TpIndexMetaPage)PageGetContents(metapage);

if (metap->level_heads[0] != InvalidBlockNumber)
{
/* Point new segment to old chain head */
Buffer seg_buf;
Page seg_page;
TpSegmentHeader *seg_header;

seg_buf = ReadBuffer(index_rel, segment_root);
LockBuffer(seg_buf, BUFFER_LOCK_EXCLUSIVE);
seg_page = BufferGetPage(seg_buf);
seg_header = (TpSegmentHeader *)((char *)seg_page +
SizeOfPageHeaderData);
seg_header->next_segment = metap->level_heads[0];
MarkBufferDirty(seg_buf);
UnlockReleaseBuffer(seg_buf);
}

metap->level_heads[0] = segment_root;
metap->level_counts[0]++;
MarkBufferDirty(metabuf);

UnlockReleaseBuffer(metabuf);

/* Check if L0 needs compaction */
tp_maybe_compact_level(index_rel, 0);
}

/* Release lock */
tp_release_index_lock(index_state);

/* Close the index */
index_close(index_rel, RowExclusiveLock);

/* Return block number or NULL */
if (segment_root != InvalidBlockNumber)
{
PG_RETURN_INT32(segment_root);
}
else
{
PG_RETURN_NULL();
}
}
Loading