From d2e35d6fcbbbd96d32aa7062a2768359c1efbd19 Mon Sep 17 00:00:00 2001 From: liushengsong Date: Fri, 6 Mar 2026 09:51:20 +0800 Subject: [PATCH 1/3] feat: add pg_stat_resqueues view for resource queue statistics Adds a new PGSTAT_KIND_RESQUEUE statistics kind that tracks per-queue throughput and latency counters, exposed through the pg_stat_resqueues view and pg_stat_get_resqueue_stats() SQL function. New statistics tracked per resource queue: - queries_submitted / admitted / rejected / completed - elapsed_wait_secs / max_wait_secs - elapsed_exec_secs / max_exec_secs - total_cost / total_memory_kb - stat_reset_timestamp Implementation notes: - All stat updates use pgstat_get_entry_ref_locked() to write directly to shared memory, bypassing the pending/flush mechanism. This makes stats immediately visible without explicit flushes, and is safe when called from PG_CATCH blocks (pgstat_resqueue_rejected is invoked inside PG_CATCH in ResLockPortal/ResLockUtilityPortal). - Per-portal timing is tracked in a backend-local hash table keyed by portalid; entries are removed when the portal is admitted, rejected, or completed. Also fixes a pre-existing bug in resqueue.c: dclist_delete_from() already decrements waitProcs.count internally, but three call sites additionally did a manual count-- causing count to underflow to UINT32_MAX. The next dclist_push_tail() would then overflow count back to 0 and trip the overflow assertion in ResProcSleep, crashing the backend. 
New files: - src/backend/utils/activity/pgstat_resqueue.c - src/test/isolation2/sql/resqueue_stats.sql - src/test/isolation2/expected/resqueue_stats.out Co-Authored-By: Claude Sonnet 4.6 --- src/backend/catalog/system_views.sql | 19 + src/backend/postmaster/autovacuum.c | 7 +- src/backend/utils/activity/Makefile | 1 + src/backend/utils/activity/pgstat.c | 16 + src/backend/utils/activity/pgstat_resqueue.c | 388 ++++++++++++++++++ src/backend/utils/adt/pgstatfuncs.c | 78 ++++ src/backend/utils/resscheduler/resqueue.c | 9 +- src/backend/utils/resscheduler/resscheduler.c | 64 ++- src/include/catalog/pg_proc.dat | 8 + src/include/pgstat.h | 74 +++- src/include/utils/pgstat_internal.h | 31 ++ .../expected/resource_queue_deadlock.out | 8 +- .../expected/resource_queue_multi_portal.out | 2 +- .../isolation2/expected/resqueue_stats.out | 92 +++++ src/test/isolation2/init_file_isolation2 | 4 + .../sql/resource_queue_deadlock.sql | 2 + src/test/isolation2/sql/resqueue_stats.sql | 72 ++++ 17 files changed, 848 insertions(+), 27 deletions(-) create mode 100644 src/backend/utils/activity/pgstat_resqueue.c create mode 100644 src/test/isolation2/expected/resqueue_stats.out create mode 100644 src/test/isolation2/sql/resqueue_stats.sql diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 505c817bc07..d4f0474cdc0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1224,6 +1224,25 @@ CREATE VIEW pg_resqueue_status AS queueholders int4) ON (s.queueid = q.oid); +-- Resource queue cumulative statistics view +CREATE VIEW pg_stat_resqueues AS + SELECT + q.oid AS queueid, + q.rsqname AS queuename, + s.queries_submitted, + s.queries_admitted, + s.queries_rejected, + s.queries_completed, + s.elapsed_wait_secs AS total_wait_time_secs, + s.max_wait_secs, + s.elapsed_exec_secs AS total_exec_time_secs, + s.max_exec_secs, + s.total_cost, + s.total_memory_kb, + s.stat_reset_timestamp + FROM pg_resqueue AS q, + 
pg_stat_get_resqueue_stats(q.oid) AS s; + -- External table views CREATE VIEW pg_max_external_files AS diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 7372a51a373..f517cb4d006 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -676,16 +676,17 @@ AutoVacLauncherMain(int argc, char *argv[]) * the database chosen is connectable, the launcher will never select it and the * worker will continue to signal for a new launcher. */ -#if 0 /* * Even when system is configured to use a different fetch consistency, * for autovac we always want fresh stats. */ SetConfigOption("stats_fetch_consistency", "none", PGC_SUSET, PGC_S_OVERRIDE); +#if 0 /* - * In emergency mode, just start a worker (unless shutdown was requested) - * and go away. + * In GPDB, we only want an autovacuum worker to start once we know + * there is a database to vacuum. Therefore, we never want emergency mode + * to start a worker immediately. */ if (!AutoVacuumingActive()) { diff --git a/src/backend/utils/activity/Makefile b/src/backend/utils/activity/Makefile index 7d7482dde02..d7d7d6c6b0d 100644 --- a/src/backend/utils/activity/Makefile +++ b/src/backend/utils/activity/Makefile @@ -25,6 +25,7 @@ OBJS = \ pgstat_io.o \ pgstat_relation.o \ pgstat_replslot.o \ + pgstat_resqueue.o \ pgstat_shmem.o \ pgstat_slru.o \ pgstat_subscription.o \ diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index 20c3f3bbb40..ab7768154c2 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -337,6 +337,22 @@ static const PgStat_KindInfo pgstat_kind_infos[PGSTAT_NUM_KINDS] = { .reset_timestamp_cb = pgstat_subscription_reset_timestamp_cb, }, + [PGSTAT_KIND_RESQUEUE] = { + .name = "resqueue", + + .fixed_amount = false, + /* resource queues are cluster-wide objects, visible across databases */ + .accessed_across_databases = true, + + .shared_size = 
sizeof(PgStatShared_ResQueue), + .shared_data_off = offsetof(PgStatShared_ResQueue, stats), + .shared_data_len = sizeof(((PgStatShared_ResQueue *) 0)->stats), + .pending_size = sizeof(PgStat_ResQueueCounts), + + .flush_pending_cb = pgstat_resqueue_flush_cb, + .reset_timestamp_cb = pgstat_resqueue_reset_timestamp_cb, + }, + /* stats for fixed-numbered (mostly 1) objects */ diff --git a/src/backend/utils/activity/pgstat_resqueue.c b/src/backend/utils/activity/pgstat_resqueue.c new file mode 100644 index 00000000000..0ca7f3a94f6 --- /dev/null +++ b/src/backend/utils/activity/pgstat_resqueue.c @@ -0,0 +1,388 @@ +/* ------------------------------------------------------------------------- + * + * pgstat_resqueue.c + * Implementation of resource queue statistics. + * + * Each backend maintains a hash table (keyed by portalid) of + * PgStat_ResQueuePortalEntry structs for in-flight portals. When a portal + * finishes (admitted, rejected, or completed), its timing deltas are + * accumulated into per-queue PgStat_ResQueueCounts pending data, which is + * eventually flushed into the shared-memory PgStatShared_ResQueue entry by + * pgstat_report_stat(). + * + * Time is tracked at second granularity (via time()) to keep overhead low. + * + * Portions Copyright (c) 2006-2010, Greenplum inc. + * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/utils/activity/pgstat_resqueue.c + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include + +#include "pgstat.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/pgstat_internal.h" + + +/* ---------- + * Backend-local hash table of in-flight portal entries. + * Keyed by portalid (uint32). 
+ * ---------- + */ +static HTAB *pgStatResQueuePortalHash = NULL; + + +/* ---------- + * pgstat_resqueue_portal_hash_init + * + * Lazily initialise the backend-local portal tracking hash. + * ---------- + */ +static void +pgstat_resqueue_portal_hash_init(void) +{ + HASHCTL ctl; + + if (pgStatResQueuePortalHash != NULL) + return; + + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(uint32); + ctl.entrysize = sizeof(PgStat_ResQueuePortalEntry); + ctl.hcxt = TopMemoryContext; + + pgStatResQueuePortalHash = hash_create("ResQueue portal stats", + 16, + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* ---------- + * pgstat_resqueue_wait_start + * + * Called just before ResLockAcquire() when a portal is about to enter the + * resource queue. Records the wait-start timestamp and resource parameters, + * and immediately writes the submission counter to shared stats. + * + * We write directly to shared stats (bypassing the pending mechanism) so + * that stats are immediately visible to other sessions without requiring an + * explicit flush. This also keeps the code safe regardless of the calling + * context. + * ---------- + */ +void +pgstat_resqueue_wait_start(uint32 portalid, Oid queueid, + Cost query_cost, int64 query_memory_kb) +{ + PgStat_ResQueuePortalEntry *entry; + bool found; + PgStat_EntryRef *entry_ref; + PgStatShared_ResQueue *shqent; + + /* Skip stat update if pgstat shared memory is already detached. */ + if (pgStatLocal.shared_hash == NULL) + return; + + pgstat_resqueue_portal_hash_init(); + + entry = (PgStat_ResQueuePortalEntry *) + hash_search(pgStatResQueuePortalHash, &portalid, HASH_ENTER, &found); + + /* If a stale entry exists (e.g. from a prior run), overwrite it. */ + entry->portalid = portalid; + entry->queueid = queueid; + entry->t_wait_start = time(NULL); + entry->t_exec_start = 0; + entry->query_cost = query_cost; + entry->query_memory_kb = query_memory_kb; + + /* Write submission counters directly to shared stats. 
*/ + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RESQUEUE, + InvalidOid, queueid, false); + if (entry_ref == NULL) + return; + + shqent = (PgStatShared_ResQueue *) entry_ref->shared_stats; + shqent->stats.queries_submitted++; + shqent->stats.total_cost += (PgStat_Counter) query_cost; + shqent->stats.total_memory_kb += query_memory_kb; + pgstat_unlock_entry(entry_ref); +} + +/* ---------- + * pgstat_resqueue_wait_end + * + * Called after ResLockAcquire() returns successfully (portal admitted). + * Records the exec-start timestamp and counts the admission directly in + * shared stats. + * ---------- + */ +void +pgstat_resqueue_wait_end(uint32 portalid) +{ + PgStat_ResQueuePortalEntry *entry; + bool found; + time_t now; + time_t wait_secs; + PgStat_EntryRef *entry_ref; + PgStatShared_ResQueue *shqent; + + if (pgStatResQueuePortalHash == NULL) + return; + + entry = (PgStat_ResQueuePortalEntry *) + hash_search(pgStatResQueuePortalHash, &portalid, HASH_FIND, &found); + + if (!found) + return; + + now = time(NULL); + entry->t_exec_start = now; + + wait_secs = (entry->t_wait_start > 0) ? (now - entry->t_wait_start) : 0; + if (wait_secs < 0) + wait_secs = 0; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RESQUEUE, + InvalidOid, entry->queueid, false); + if (entry_ref == NULL) + return; + + shqent = (PgStatShared_ResQueue *) entry_ref->shared_stats; + shqent->stats.queries_admitted++; + shqent->stats.elapsed_wait_secs += (PgStat_Counter) wait_secs; + if ((PgStat_Counter) wait_secs > shqent->stats.max_wait_secs) + shqent->stats.max_wait_secs = (PgStat_Counter) wait_secs; + pgstat_unlock_entry(entry_ref); +} + +/* ---------- + * pgstat_resqueue_rejected + * + * Called when ResLockAcquire() raises an error (portal cancelled or error + * while waiting), or when the portal is bypassed (cost below threshold). + * Removes the portal entry without counting exec time. + * + * IMPORTANT: This function may be called from inside a PG_CATCH block. 
+ * It must NOT call pgstat_prep_pending_entry(), which modifies the global + * pgStatPending dlist and allocates memory that may be unsafe to use during + * error recovery. Instead, we update shared stats directly via + * pgstat_get_entry_ref_locked(), which is PG_CATCH-safe because it only + * allocates from TopMemoryContext derivatives and uses LWLock operations. + * ---------- + */ +void +pgstat_resqueue_rejected(uint32 portalid) +{ + PgStat_ResQueuePortalEntry *entry; + bool found; + time_t now; + time_t wait_secs; + Oid queueid; + PgStat_EntryRef *entry_ref; + PgStatShared_ResQueue *shqent; + + if (pgStatResQueuePortalHash == NULL) + return; + + entry = (PgStat_ResQueuePortalEntry *) + hash_search(pgStatResQueuePortalHash, &portalid, HASH_FIND, &found); + + if (!found) + return; + + now = time(NULL); + wait_secs = (entry->t_wait_start > 0) ? (now - entry->t_wait_start) : 0; + if (wait_secs < 0) + wait_secs = 0; + + queueid = entry->queueid; + + /* Remove portal entry first — hash_search(HASH_REMOVE) is PG_CATCH-safe. */ + hash_search(pgStatResQueuePortalHash, &portalid, HASH_REMOVE, NULL); + + /* Skip stat update if pgstat shared memory is already detached. */ + if (pgStatLocal.shared_hash == NULL) + return; + + /* + * Update the shared stats entry directly, bypassing the pending + * mechanism. pgstat_get_entry_ref_locked allocates only from + * TopMemoryContext derivatives and takes an LWLock, both of which are + * safe during error recovery. 
+ */ + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RESQUEUE, + InvalidOid, queueid, false); + if (entry_ref == NULL) + return; + + shqent = (PgStatShared_ResQueue *) entry_ref->shared_stats; + shqent->stats.queries_rejected++; + shqent->stats.elapsed_wait_secs += (PgStat_Counter) wait_secs; + if ((PgStat_Counter) wait_secs > shqent->stats.max_wait_secs) + shqent->stats.max_wait_secs = (PgStat_Counter) wait_secs; + + pgstat_unlock_entry(entry_ref); +} + +/* ---------- + * pgstat_resqueue_exec_end + * + * Called from ResUnLockPortal() when a portal finishes execution (normal + * completion, error, or cancel after admission). Writes completion counters + * directly to shared stats. + * ---------- + */ +void +pgstat_resqueue_exec_end(uint32 portalid) +{ + PgStat_ResQueuePortalEntry *entry; + bool found; + time_t now; + time_t exec_secs; + PgStat_EntryRef *entry_ref; + PgStatShared_ResQueue *shqent; + + if (pgStatResQueuePortalHash == NULL) + return; + + /* + * pgstat_shutdown_hook (before_shmem_exit) runs before ProcKill + * (on_shmem_exit). If AtExitCleanup_ResPortals calls us after pgstat + * has detached from shared memory, skip the stat update but still clean + * up the local hash entry to avoid a memory leak. + */ + if (pgStatLocal.shared_hash == NULL) + { + hash_search(pgStatResQueuePortalHash, &portalid, HASH_REMOVE, NULL); + return; + } + + entry = (PgStat_ResQueuePortalEntry *) + hash_search(pgStatResQueuePortalHash, &portalid, HASH_FIND, &found); + + if (!found) + return; + + now = time(NULL); + exec_secs = (entry->t_exec_start > 0) ? 
(now - entry->t_exec_start) : 0; + if (exec_secs < 0) + exec_secs = 0; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RESQUEUE, + InvalidOid, entry->queueid, false); + if (entry_ref != NULL) + { + shqent = (PgStatShared_ResQueue *) entry_ref->shared_stats; + shqent->stats.queries_completed++; + shqent->stats.elapsed_exec_secs += (PgStat_Counter) exec_secs; + if ((PgStat_Counter) exec_secs > shqent->stats.max_exec_secs) + shqent->stats.max_exec_secs = (PgStat_Counter) exec_secs; + pgstat_unlock_entry(entry_ref); + } + + hash_search(pgStatResQueuePortalHash, &portalid, HASH_REMOVE, NULL); +} + +/* ---------- + * pgstat_create_resqueue + * + * Called when a resource queue is created via DDL. Ensures a stats entry + * exists and is initialised. + * ---------- + */ +void +pgstat_create_resqueue(Oid queueid) +{ + pgstat_create_transactional(PGSTAT_KIND_RESQUEUE, InvalidOid, queueid); + pgstat_get_entry_ref(PGSTAT_KIND_RESQUEUE, InvalidOid, queueid, true, NULL); + pgstat_reset_entry(PGSTAT_KIND_RESQUEUE, InvalidOid, queueid, 0); +} + +/* ---------- + * pgstat_drop_resqueue + * + * Called when a resource queue is dropped via DDL. + * ---------- + */ +void +pgstat_drop_resqueue(Oid queueid) +{ + pgstat_drop_transactional(PGSTAT_KIND_RESQUEUE, InvalidOid, queueid); +} + +/* ---------- + * pgstat_fetch_stat_resqueue + * + * Return a palloc'd snapshot of statistics for the given resource queue OID, + * or NULL if no stats entry exists. + * ---------- + */ +PgStat_StatResQueueEntry * +pgstat_fetch_stat_resqueue(Oid queueid) +{ + return (PgStat_StatResQueueEntry *) + pgstat_fetch_entry(PGSTAT_KIND_RESQUEUE, InvalidOid, queueid); +} + +/* ---------- + * pgstat_resqueue_flush_cb + * + * Flush pending per-queue delta counters into shared memory. + * Called by pgstat_report_stat() for each entry with pending data. + * + * max_wait_secs and max_exec_secs are merged with MAX rather than addition. 
+ * ---------- + */ +bool +pgstat_resqueue_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + PgStat_ResQueueCounts *localent; + PgStatShared_ResQueue *shqent; + + localent = (PgStat_ResQueueCounts *) entry_ref->pending; + shqent = (PgStatShared_ResQueue *) entry_ref->shared_stats; + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + +#define RESQUEUE_ACC(fld) shqent->stats.fld += localent->fld + RESQUEUE_ACC(queries_submitted); + RESQUEUE_ACC(queries_admitted); + RESQUEUE_ACC(queries_rejected); + RESQUEUE_ACC(queries_completed); + RESQUEUE_ACC(elapsed_wait_secs); + RESQUEUE_ACC(elapsed_exec_secs); + RESQUEUE_ACC(total_cost); + RESQUEUE_ACC(total_memory_kb); +#undef RESQUEUE_ACC + + /* max fields: merge with MAX */ + if (localent->max_wait_secs > shqent->stats.max_wait_secs) + shqent->stats.max_wait_secs = localent->max_wait_secs; + if (localent->max_exec_secs > shqent->stats.max_exec_secs) + shqent->stats.max_exec_secs = localent->max_exec_secs; + + pgstat_unlock_entry(entry_ref); + + return true; +} + +/* ---------- + * pgstat_resqueue_reset_timestamp_cb + * + * Reset the stat_reset_timestamp in the shared entry. + * ---------- + */ +void +pgstat_resqueue_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((PgStatShared_ResQueue *) header)->stats.stat_reset_timestamp = ts; +} diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index aeffe94a39e..61c69b3ca4e 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2116,6 +2116,84 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } +/* + * pg_stat_get_resqueue_stats + * + * Returns cumulative statistics for one resource queue as a composite row. + * Returns all-zeros if no stats entry exists for the given queue OID. 
+ */ +Datum +pg_stat_get_resqueue_stats(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_RESQUEUE_STATS_COLS 13 + Oid queueid = PG_GETARG_OID(0); + TupleDesc tupdesc; + Datum values[PG_STAT_GET_RESQUEUE_STATS_COLS] = {0}; + bool nulls[PG_STAT_GET_RESQUEUE_STATS_COLS] = {0}; + PgStat_StatResQueueEntry *entry; + PgStat_StatResQueueEntry allzero; + + /* Fetch stats; fall back to all-zero if queue has no stats yet */ + entry = pgstat_fetch_stat_resqueue(queueid); + if (!entry) + { + memset(&allzero, 0, sizeof(PgStat_StatResQueueEntry)); + entry = &allzero; + } + + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_RESQUEUE_STATS_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "queueid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "queries_submitted", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "queries_admitted", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "queries_rejected", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "queries_completed", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "elapsed_wait_secs", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "max_wait_secs", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "elapsed_exec_secs", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "max_exec_secs", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_cost", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "total_memory_kb", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stat_reset_timestamp", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "have_stats", + BOOLOID, -1, 0); + BlessTupleDesc(tupdesc); + + values[0] = ObjectIdGetDatum(queueid); + values[1] = Int64GetDatum(entry->queries_submitted); + values[2] = Int64GetDatum(entry->queries_admitted); + values[3] = Int64GetDatum(entry->queries_rejected); + values[4] = 
Int64GetDatum(entry->queries_completed); + values[5] = Int64GetDatum(entry->elapsed_wait_secs); + values[6] = Int64GetDatum(entry->max_wait_secs); + values[7] = Int64GetDatum(entry->elapsed_exec_secs); + values[8] = Int64GetDatum(entry->max_exec_secs); + values[9] = Int64GetDatum(entry->total_cost); + values[10] = Int64GetDatum(entry->total_memory_kb); + + if (entry->stat_reset_timestamp == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(entry->stat_reset_timestamp); + + /* have_stats: true when an actual stats entry was found */ + values[12] = BoolGetDatum(pgstat_have_entry(PGSTAT_KIND_RESQUEUE, + InvalidOid, queueid)); + + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} + /* * Checks for presence of stats for object with provided kind, database oid, * object oid. diff --git a/src/backend/utils/resscheduler/resqueue.c b/src/backend/utils/resscheduler/resqueue.c index 3af903f089a..5cb7d9bffa1 100644 --- a/src/backend/utils/resscheduler/resqueue.c +++ b/src/backend/utils/resscheduler/resqueue.c @@ -1367,8 +1367,7 @@ ResProcLockRemoveSelfAndWakeup(LOCK *lock) nextproc = (PGPROC *) proc->links.next; - dclist_delete_from(waitQueue, &(proc->links)); - (proc->waitLock->waitProcs.count)--; + dclist_delete_from_thoroughly(waitQueue, &(proc->links)); proc = nextproc; @@ -1472,8 +1471,7 @@ ResProcWakeup(PGPROC *proc, int waitStatus) retProc = (PGPROC *) proc->links.next; /* Remove process from wait queue */ - dclist_delete_from(&proc->waitLock->waitProcs, &(proc->links)); - (proc->waitLock->waitProcs.count)--; + dclist_delete_from_thoroughly(&proc->waitLock->waitProcs, &(proc->links)); /* Clean up process' state and pass it the ok/fail signal */ proc->waitLock = NULL; @@ -1511,8 +1509,7 @@ ResRemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) Assert(waitLock->waitProcs.count > 0); /* Remove proc from lock's wait queue */ - dclist_delete_from(&waitLock->waitProcs, &(proc->links)); - waitLock->waitProcs.count--; + 
dclist_delete_from_thoroughly(&waitLock->waitProcs, &(proc->links)); /* Undo increments of request counts by waiting process */ Assert(waitLock->nRequested > 0); diff --git a/src/backend/utils/resscheduler/resscheduler.c b/src/backend/utils/resscheduler/resscheduler.c index 375982b3015..5a09d5541eb 100644 --- a/src/backend/utils/resscheduler/resscheduler.c +++ b/src/backend/utils/resscheduler/resscheduler.c @@ -677,28 +677,35 @@ ResLockPortal(Portal portal, QueryDesc *qDesc) if (takeLock) { #ifdef RESLOCK_DEBUG - elog(DEBUG1, "acquire resource lock for queue %u (portal %u)", + elog(DEBUG1, "acquire resource lock for queue %u (portal %u)", queueid, portal->portalId); #endif SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid); + /* Record wait-start and submission for pgstat tracking. */ + pgstat_resqueue_wait_start(portal->portalId, queueid, + incData.increments[RES_COST_LIMIT], + (int64) (incData.increments[RES_MEMORY_LIMIT] / 1024)); + PG_TRY(); { lockResult = ResLockAcquire(&tag, &incData); } PG_CATCH(); { - /* - * We might have been waiting for a resource queue lock when we get - * here. Calling ResLockRelease without calling ResLockWaitCancel will + /* + * We might have been waiting for a resource queue lock when we get + * here. Calling ResLockRelease without calling ResLockWaitCancel will * cause the locallock to be cleaned up, but will leave the global - * variable lockAwaited still pointing to the locallock hash + * variable lockAwaited still pointing to the locallock hash * entry. */ ResLockWaitCancel(); - - /* If we had acquired the resource queue lock, release it and clean up */ + /* Count this portal as rejected in pgstat. 
*/ + pgstat_resqueue_rejected(portal->portalId); + + /* If we had acquired the resource queue lock, release it and clean up */ if (!ResLockRelease(&tag, portal->portalId)) { ereport(LOG, @@ -709,7 +716,7 @@ ResLockPortal(Portal portal, QueryDesc *qDesc) tag.locktag_field1, portal->portalId, portal->name, portal->sourceText))); } - + /* GPDB hook for collecting query info */ if (query_info_collect_hook) (*query_info_collect_hook)(METRICS_QUERY_ERROR, qDesc); @@ -721,17 +728,24 @@ ResLockPortal(Portal portal, QueryDesc *qDesc) } PG_END_TRY(); - /* + /* * See if query was too small to bother locking at all, i.e had * cost smaller than the ignore cost threshold for the queue. */ if (lockResult == LOCKACQUIRE_NOT_AVAIL) { #ifdef RESLOCK_DEBUG - elog(DEBUG1, "cancel resource lock for queue %u (portal %u)", + elog(DEBUG1, "cancel resource lock for queue %u (portal %u)", queueid, portal->portalId); #endif - /* + /* + * Query cost was below the ignore threshold; the portal won't + * consume a queue slot. Remove the pgstat portal entry we + * created above without counting it as admitted. + */ + pgstat_resqueue_rejected(portal->portalId); + + /* * Reset portalId and queueid for this portal so the queue * and increment accounting tests continue to work properly. */ @@ -739,6 +753,11 @@ ResLockPortal(Portal portal, QueryDesc *qDesc) portal->portalId = INVALID_PORTALID; shouldReleaseLock = false; } + else + { + /* Portal was admitted into the queue; record exec-start time. */ + pgstat_resqueue_wait_end(portal->portalId); + } /* Count holdable cursors (if we are locking this one) .*/ if (portal->cursorOptions & CURSOR_OPT_HOLD && shouldReleaseLock) @@ -789,6 +808,11 @@ ResLockUtilityPortal(Portal portal, float4 ignoreCostLimit) #endif SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid); + /* Record wait-start for utility statement. 
*/ + pgstat_resqueue_wait_start(portal->portalId, queueid, + (Cost) incData.increments[RES_COST_LIMIT], + 0 /* no memory tracking for utility stmts */); + PG_TRY(); { lockResult = ResLockAcquire(&tag, &incData); @@ -804,6 +828,9 @@ ResLockUtilityPortal(Portal portal, float4 ignoreCostLimit) */ ResLockWaitCancel(); + /* Count this portal as rejected in pgstat. */ + pgstat_resqueue_rejected(portal->portalId); + /* If we had acquired the resource queue lock, release it and clean up */ if (!ResLockRelease(&tag, portal->portalId)) { @@ -826,8 +853,13 @@ ResLockUtilityPortal(Portal portal, float4 ignoreCostLimit) PG_RE_THROW(); } PG_END_TRY(); + + if (lockResult != LOCKACQUIRE_NOT_AVAIL) + pgstat_resqueue_wait_end(portal->portalId); + else + pgstat_resqueue_rejected(portal->portalId); } - + portal->hasResQueueLock = shouldReleaseLock; } @@ -842,15 +874,19 @@ ResUnLockPortal(Portal portal) queueid = portal->queueId; - /* + /* * Check we have a valid queue before going any further. */ if (queueid != InvalidOid) { #ifdef RESLOCK_DEBUG - elog(DEBUG1, "release resource lock for queue %u (portal %u)", + elog(DEBUG1, "release resource lock for queue %u (portal %u)", queueid, portal->portalId); #endif + + /* Record execution completion in pgstat. 
*/ + pgstat_resqueue_exec_end(portal->portalId); + SET_LOCKTAG_RESOURCE_QUEUE(tag, queueid); if (!ResLockRelease(&tag, portal->portalId)) diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 3b1f3a7e327..f788bd349e4 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -13005,3 +13005,11 @@ { oid => 8693, descr => 'deparse SCHEDULE clause for a given dynamic table', proname => 'pg_get_dynamic_table_schedule', provolatile => 's', prorettype => 'text', proargtypes => 'oid', prosrc => 'pg_get_dynamic_table_schedule' }, + +{ oid => '9200', descr => 'statistics: cumulative statistics for a resource queue', + proname => 'pg_stat_get_resqueue_stats', provolatile => 's', + proparallel => 'r', prorettype => 'record', proargtypes => 'oid', + proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,bool}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{queueid,queueid,queries_submitted,queries_admitted,queries_rejected,queries_completed,elapsed_wait_secs,max_wait_secs,elapsed_exec_secs,max_exec_secs,total_cost,total_memory_kb,stat_reset_timestamp,have_stats}', + prosrc => 'pg_stat_get_resqueue_stats' }, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 338f5ae9562..c4f3b88444a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -45,6 +45,7 @@ typedef enum PgStat_Kind PGSTAT_KIND_FUNCTION, /* per-function statistics */ PGSTAT_KIND_REPLSLOT, /* per-slot statistics */ PGSTAT_KIND_SUBSCRIPTION, /* per-subscription statistics */ + PGSTAT_KIND_RESQUEUE, /* per-resource-queue statistics */ /* stats for fixed-numbered objects */ PGSTAT_KIND_ARCHIVER, @@ -237,7 +238,7 @@ typedef struct PgStat_TableXactStatus * ------------------------------------------------------------ */ -#define PGSTAT_FILE_FORMAT_ID 0x01A5BCAC +#define PGSTAT_FILE_FORMAT_ID 0x01A5BCAD typedef struct PgStat_ArchiverStats { @@ -457,6 +458,63 @@ typedef struct 
PgStat_PendingWalStats } PgStat_PendingWalStats; +/* ---------- + * PgStat_StatResQueueEntry + * + * Per-resource-queue cumulative statistics, stored in shared memory and + * persisted to disk. Exposed via the pg_stat_resqueues view. + * + * Time values are in seconds (matching time() granularity used during + * portal tracking). max_wait_secs and max_exec_secs are historical peaks. + * ---------- + */ +typedef struct PgStat_StatResQueueEntry +{ + /* throughput counters */ + PgStat_Counter queries_submitted; /* total queries entered the queue */ + PgStat_Counter queries_admitted; /* total queries admitted from queue */ + PgStat_Counter queries_rejected; /* queries cancelled/errored while waiting */ + PgStat_Counter queries_completed; /* queries finished execution */ + + /* wait time (seconds) */ + PgStat_Counter elapsed_wait_secs; /* cumulative wait seconds */ + PgStat_Counter max_wait_secs; /* peak single-query wait time */ + + /* exec time (seconds) */ + PgStat_Counter elapsed_exec_secs; /* cumulative exec seconds */ + PgStat_Counter max_exec_secs; /* peak single-query exec time */ + + /* resource usage */ + PgStat_Counter total_cost; /* cumulative planner cost estimate */ + PgStat_Counter total_memory_kb; /* cumulative memory granted (KB) */ + + TimestampTz stat_reset_timestamp; +} PgStat_StatResQueueEntry; + +/* ---------- + * PgStat_ResQueueCounts + * + * Pending (not-yet-flushed) per-resource-queue delta counters accumulated by + * a single backend. Flushed into PgStat_StatResQueueEntry in shared memory + * during pgstat_report_stat(). + * + * This struct must contain only delta counters so that memcmp against zeroes + * reliably detects whether there are pending updates. 
+ * ---------- + */ +typedef struct PgStat_ResQueueCounts +{ + PgStat_Counter queries_submitted; + PgStat_Counter queries_admitted; + PgStat_Counter queries_rejected; + PgStat_Counter queries_completed; + PgStat_Counter elapsed_wait_secs; + PgStat_Counter elapsed_exec_secs; + PgStat_Counter max_wait_secs; /* max in this flush batch */ + PgStat_Counter max_exec_secs; /* max in this flush batch */ + PgStat_Counter total_cost; + PgStat_Counter total_memory_kb; +} PgStat_ResQueueCounts; /* @@ -702,6 +760,20 @@ extern void pgstat_drop_subscription(Oid subid); extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid); +/* + * Functions in pgstat_resqueue.c + */ + +extern void pgstat_resqueue_wait_start(uint32 portalid, Oid queueid, + Cost query_cost, int64 query_memory_kb); +extern void pgstat_resqueue_wait_end(uint32 portalid); +extern void pgstat_resqueue_rejected(uint32 portalid); +extern void pgstat_resqueue_exec_end(uint32 portalid); +extern void pgstat_create_resqueue(Oid queueid); +extern void pgstat_drop_resqueue(Oid queueid); +extern PgStat_StatResQueueEntry *pgstat_fetch_stat_resqueue(Oid queueid); + + /* * Functions in pgstat_xact.c */ diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h index f886ab7f4bc..2b3da610a31 100644 --- a/src/include/utils/pgstat_internal.h +++ b/src/include/utils/pgstat_internal.h @@ -413,6 +413,37 @@ typedef struct PgStatShared_ReplSlot PgStat_StatReplSlotEntry stats; } PgStatShared_ReplSlot; +typedef struct PgStatShared_ResQueue +{ + PgStatShared_Common header; + PgStat_StatResQueueEntry stats; +} PgStatShared_ResQueue; + +/* ---------- + * PgStat_ResQueuePortalEntry + * + * Backend-local tracking entry for a single portal subject to resource queue + * scheduling. Records timestamps and resource info for a portal's lifetime, + * then rolls them into per-queue PgStat_ResQueueCounts when the portal exits. + * + * The collector never sees this struct. 
+ * ---------- + */ +typedef struct PgStat_ResQueuePortalEntry +{ + uint32 portalid; /* hash key */ + Oid queueid; /* resource queue this portal belongs to */ + time_t t_wait_start; /* time portal started waiting in queue */ + time_t t_exec_start; /* time portal was admitted & began executing */ + Cost query_cost; /* planner cost estimate */ + int64 query_memory_kb; /* memory granted (KB) */ +} PgStat_ResQueuePortalEntry; + +/* Callbacks for pgstat_kind_infos registration (used in pgstat.c). */ +extern bool pgstat_resqueue_flush_cb(PgStat_EntryRef *entry_ref, bool nowait); +extern void pgstat_resqueue_reset_timestamp_cb(PgStatShared_Common *header, + TimestampTz ts); + /* * Central shared memory entry for the cumulative stats system. diff --git a/src/test/isolation2/expected/resource_queue_deadlock.out b/src/test/isolation2/expected/resource_queue_deadlock.out index 06309f34b6d..3c14d72ba4d 100644 --- a/src/test/isolation2/expected/resource_queue_deadlock.out +++ b/src/test/isolation2/expected/resource_queue_deadlock.out @@ -5,6 +5,8 @@ CREATE 0: CREATE role role_deadlock_test RESOURCE QUEUE rq_deadlock_test; CREATE +0: GRANT CREATE ON SCHEMA public TO role_deadlock_test; +GRANT 0: SET gp_autostats_lock_wait TO ON; SET @@ -50,8 +52,8 @@ ANALYZE (1 row) 2: SELECT * FROM t_deadlock_test; ERROR: deadlock detected -DETAIL: Process 1618 waits for ExclusiveLock on resource queue 16520; blocked by process 1606. -Process 1606 waits for ShareUpdateExclusiveLock on relation 16522 of database 16478; blocked by process 1618. +Process PID waits for ExclusiveLock on resource queue OID; blocked by process PID. +Process PID waits for ShareUpdateExclusiveLock on relation OID of database OID; blocked by process PID. HINT: See server log for query details. 
2: ROLLBACK; ROLLBACK @@ -68,6 +70,8 @@ INSERT 1 -- Clean up the test 0: DROP TABLE t_deadlock_test; DROP +0: REVOKE CREATE ON SCHEMA public FROM role_deadlock_test; +REVOKE 0: DROP ROLE role_deadlock_test; DROP 0: DROP RESOURCE QUEUE rq_deadlock_test; diff --git a/src/test/isolation2/expected/resource_queue_multi_portal.out b/src/test/isolation2/expected/resource_queue_multi_portal.out index 5fe141ccc9c..36c87160376 100644 --- a/src/test/isolation2/expected/resource_queue_multi_portal.out +++ b/src/test/isolation2/expected/resource_queue_multi_portal.out @@ -135,7 +135,7 @@ DECLARE -- its transaction. 1:DECLARE c3 CURSOR FOR SELECT 1; ERROR: deadlock detected, locking against self -DETAIL: resource queue id: 585193, portal id: 3 +DETAIL: resource queue id: OID, portal id: NUM -- There should be 0 active statements following the transaction abort. 0:SELECT rsqcountlimit, rsqcountvalue FROM pg_resqueue_status WHERE rsqname = 'rq_multi_portal'; diff --git a/src/test/isolation2/expected/resqueue_stats.out b/src/test/isolation2/expected/resqueue_stats.out new file mode 100644 index 00000000000..9bf02f77060 --- /dev/null +++ b/src/test/isolation2/expected/resqueue_stats.out @@ -0,0 +1,92 @@ +-- Test pg_stat_resqueues cumulative statistics for resource queues. + +0:CREATE RESOURCE QUEUE rq_stats_test WITH (active_statements = 1); +CREATE +0:CREATE ROLE role_stats_test RESOURCE QUEUE rq_stats_test; +CREATE + +-- Session 1 holds the queue slot so session 2 will block. +1:SET role role_stats_test; +SET +1:BEGIN; +BEGIN +1:DECLARE c1 CURSOR FOR SELECT 1; +DECLARE + +-- Session 2 submits a query that will block. +2:SET role role_stats_test; +SET +2&:SELECT pg_sleep(0); + +-- Verify session 2 is waiting on the resource queue. 
+0:SELECT wait_event_type, wait_event FROM pg_stat_activity WHERE query = 'SELECT pg_sleep(0);'; + wait_event_type | wait_event +-----------------+--------------- + ResourceQueue | ResourceQueue +(1 row) + +-- Cancel the blocked query (increments queries_rejected). +0:SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query = 'SELECT pg_sleep(0);' AND wait_event = 'ResourceQueue'; + pg_cancel_backend +------------------- + t +(1 row) + +2<: <... completed> +ERROR: canceling statement due to user request + +-- Release session 1's slot so later sessions can proceed. +1:CLOSE c1; +CLOSE +1:END; +END + +-- Session 3 runs a query that should be admitted and complete normally. +3:SET role role_stats_test; +SET +3:SELECT 1; + ?column? +---------- + 1 +(1 row) + +-- Session 4 runs another query that completes normally. +4:SET role role_stats_test; +SET +4:SELECT 2; + ?column? +---------- + 2 +(1 row) + +-- All resqueue stats are written directly to shared memory (no flush needed). +-- Check that the view shows the expected minimum counts. +-- queries_submitted >= 3: sessions 2 (rejected), 3, 4 +-- queries_admitted >= 2: sessions 3 and 4 +-- queries_completed >= 2: sessions 3 and 4 +0:SELECT queuename, queries_submitted >= 3 AS submitted_ok, queries_admitted >= 2 AS admitted_ok, queries_completed >= 2 AS completed_ok FROM pg_stat_resqueues WHERE queuename = 'rq_stats_test'; + queuename | submitted_ok | admitted_ok | completed_ok +---------------+--------------+-------------+-------------- + rq_stats_test | t | t | t +(1 row) + +-- All counter columns must be non-negative. 
+0:SELECT queries_submitted >= 0 AS sub_nn, queries_admitted >= 0 AS adm_nn, queries_rejected >= 0 AS rej_nn, queries_completed >= 0 AS cmp_nn, total_wait_time_secs >= 0 AS wait_nn, max_wait_secs >= 0 AS maxw_nn, total_exec_time_secs >= 0 AS exec_nn, max_exec_secs >= 0 AS maxe_nn, total_cost >= 0 AS cost_nn, total_memory_kb >= 0 AS mem_nn FROM pg_stat_resqueues WHERE queuename = 'rq_stats_test'; + sub_nn | adm_nn | rej_nn | cmp_nn | wait_nn | maxw_nn | exec_nn | maxe_nn | cost_nn | mem_nn +--------+--------+--------+--------+---------+---------+---------+---------+---------+-------- + t | t | t | t | t | t | t | t | t | t +(1 row) + +-- Verify pg_stat_get_resqueue_stats() returns data directly. +-- The function has OUT parameters so no column definition list is needed. +0:SELECT queries_submitted >= 0 AS ok FROM pg_stat_get_resqueue_stats( (SELECT oid FROM pg_resqueue WHERE rsqname = 'rq_stats_test') ) AS s; + ok +------ + t +(1 row) + +-- Cleanup. +0:DROP ROLE role_stats_test; +DROP +0:DROP RESOURCE QUEUE rq_stats_test; +DROP diff --git a/src/test/isolation2/init_file_isolation2 b/src/test/isolation2/init_file_isolation2 index 1c01246e203..c359d3b8254 100644 --- a/src/test/isolation2/init_file_isolation2 +++ b/src/test/isolation2/init_file_isolation2 @@ -46,6 +46,10 @@ s/^DETAIL: Process \d+ waits for ShareLock on transaction \d+; blocked by proce m/.*Process \d+ waits for ExclusiveLock on resource queue \d+; blocked by process \d+./ s/.*Process \d+ waits for ExclusiveLock on resource queue \d+; blocked by process \d+./Process PID waits for ExclusiveLock on resource queue OID; blocked by process PID./ +# For resource queue self-deadlock detail +m/resource queue id: \d+, portal id: \d+/ +s/resource queue id: \d+, portal id: \d+/resource queue id: OID, portal id: NUM/ + m/^Process \d+ waits for ShareLock on transaction \d+; blocked by process \d+./ s/^Process \d+ waits for ShareLock on transaction \d+; blocked by process \d+./Process PID waits for ShareLock on 
transaction XID; blocked by process PID./ diff --git a/src/test/isolation2/sql/resource_queue_deadlock.sql b/src/test/isolation2/sql/resource_queue_deadlock.sql index 6591e8c75b6..1b2ae432c06 100644 --- a/src/test/isolation2/sql/resource_queue_deadlock.sql +++ b/src/test/isolation2/sql/resource_queue_deadlock.sql @@ -3,6 +3,7 @@ 0: CREATE RESOURCE QUEUE rq_deadlock_test WITH (active_statements = 1); 0: CREATE role role_deadlock_test RESOURCE QUEUE rq_deadlock_test; +0: GRANT CREATE ON SCHEMA public TO role_deadlock_test; 0: SET gp_autostats_lock_wait TO ON; 0: SELECT gp_inject_fault_infinite('before_auto_stats', 'suspend', dbid) FROM gp_segment_configuration WHERE content = -1 AND role = 'p'; @@ -30,5 +31,6 @@ -- Clean up the test 0: DROP TABLE t_deadlock_test; +0: REVOKE CREATE ON SCHEMA public FROM role_deadlock_test; 0: DROP ROLE role_deadlock_test; 0: DROP RESOURCE QUEUE rq_deadlock_test; diff --git a/src/test/isolation2/sql/resqueue_stats.sql b/src/test/isolation2/sql/resqueue_stats.sql new file mode 100644 index 00000000000..01e6041f839 --- /dev/null +++ b/src/test/isolation2/sql/resqueue_stats.sql @@ -0,0 +1,72 @@ +-- Test pg_stat_resqueues cumulative statistics for resource queues. + +0:CREATE RESOURCE QUEUE rq_stats_test WITH (active_statements = 1); +0:CREATE ROLE role_stats_test RESOURCE QUEUE rq_stats_test; + +-- Session 1 holds the queue slot so session 2 will block. +1:SET role role_stats_test; +1:BEGIN; +1:DECLARE c1 CURSOR FOR SELECT 1; + +-- Session 2 submits a query that will block. +2:SET role role_stats_test; +2&:SELECT pg_sleep(0); + +-- Verify session 2 is waiting on the resource queue. +0:SELECT wait_event_type, wait_event FROM pg_stat_activity WHERE query = 'SELECT pg_sleep(0);'; + +-- Cancel the blocked query (increments queries_rejected). +0:SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query = 'SELECT pg_sleep(0);' AND wait_event = 'ResourceQueue'; + +2<: + +-- Release session 1's slot so later sessions can proceed. 
+1:CLOSE c1; +1:END; + +-- Session 3 runs a query that should be admitted and complete normally. +3:SET role role_stats_test; +3:SELECT 1; + +-- Session 4 runs another query that completes normally. +4:SET role role_stats_test; +4:SELECT 2; + +-- All resqueue stats are written directly to shared memory (no flush needed). +-- Check that the view shows the expected minimum counts. +-- queries_submitted >= 3: sessions 2 (rejected), 3, 4 +-- queries_admitted >= 2: sessions 3 and 4 +-- queries_completed >= 2: sessions 3 and 4 +0:SELECT + queuename, + queries_submitted >= 3 AS submitted_ok, + queries_admitted >= 2 AS admitted_ok, + queries_completed >= 2 AS completed_ok +FROM pg_stat_resqueues +WHERE queuename = 'rq_stats_test'; + +-- All counter columns must be non-negative. +0:SELECT + queries_submitted >= 0 AS sub_nn, + queries_admitted >= 0 AS adm_nn, + queries_rejected >= 0 AS rej_nn, + queries_completed >= 0 AS cmp_nn, + total_wait_time_secs >= 0 AS wait_nn, + max_wait_secs >= 0 AS maxw_nn, + total_exec_time_secs >= 0 AS exec_nn, + max_exec_secs >= 0 AS maxe_nn, + total_cost >= 0 AS cost_nn, + total_memory_kb >= 0 AS mem_nn +FROM pg_stat_resqueues +WHERE queuename = 'rq_stats_test'; + +-- Verify pg_stat_get_resqueue_stats() returns data directly. +-- The function has OUT parameters so no column definition list is needed. +0:SELECT queries_submitted >= 0 AS ok +FROM pg_stat_get_resqueue_stats( + (SELECT oid FROM pg_resqueue WHERE rsqname = 'rq_stats_test') + ) AS s; + +-- Cleanup. +0:DROP ROLE role_stats_test; +0:DROP RESOURCE QUEUE rq_stats_test; From e429cd647fedb4ab7b84cbe0f241a65d018abddc Mon Sep 17 00:00:00 2001 From: liushengsong Date: Sat, 7 Mar 2026 05:23:40 +0800 Subject: [PATCH 2/3] feat: collect QE relation stats on QD to enable auto-ANALYZE for distributed tables In GPDB, DML (INSERT/UPDATE/DELETE) executes on QE segments, but autovacuum lives on the QD coordinator. 
Before this change the QD never received modification counts from QEs, so n_mod_since_analyze remained 0 and auto-ANALYZE never triggered for distributed tables. Add two functions: - pgstat_send_qd_tabstats() (QE side, pgstat.c): reads per-table counts from pgStatXactStack before finish_xact_command() NULLs it, serialises them as PgStatTabRecordFromQE[] and attaches the array to the libpq PGresult via extras/PGExtraTypeTableStats. - pgstat_combine_from_qe() (QD side, pgstat_relation.c): called from mppExecutorFinishup/mppExecutorWait after dispatch; iterates QE results, deserialises PgStatTabRecordFromQE arrays and merges each record into the QD's pending stats via pgstat_prep_relation_pending(), then calls pgstat_force_next_flush() to bypass rate-limiting. Using the pending path (rather than writing directly to shared memory) is critical when auto_stats triggers ANALYZE in the same command: pgstat_report_analyze() resets mod_since_analyze to 0, but pending counts are added back when pgstat_report_stat() flushes after ANALYZE. Also add pgstat_get_current_xact_stack() to read pgStatXactStack without side effects, and update the Makefile to include libpq_srcdir for libpq-int.h (PGExtraType, pg_result internals). The autovacuum-analyze isolation2 test verifies all four auto-ANALYZE scenarios: plain heap table, partition leaf tables, lock-conflict abort, and coexistence with auto_stats (on_no_stats / on_change / below threshold). 
Co-Authored-By: Claude Opus 4.6 --- src/backend/cdb/dispatcher/cdbdisp_query.c | 6 + src/backend/executor/execUtils.c | 12 + src/backend/tcop/postgres.c | 8 + src/backend/utils/activity/Makefile | 3 + src/backend/utils/activity/pgstat.c | 137 +++++++++ src/backend/utils/activity/pgstat_relation.c | 108 +++++++ src/backend/utils/activity/pgstat_xact.c | 10 + src/include/pgstat.h | 21 ++ src/include/utils/pgstat_internal.h | 1 + src/test/isolation2/expected/qe_qd_pgstat.out | 278 ++++++++++++++++++ src/test/isolation2/sql/qe_qd_pgstat.sql | 136 +++++++++ 11 files changed, 720 insertions(+) create mode 100644 src/test/isolation2/expected/qe_qd_pgstat.out create mode 100644 src/test/isolation2/sql/qe_qd_pgstat.sql diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index 004050f018b..c69bf54460d 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -526,6 +526,12 @@ cdbdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms, ThrowErrorData(qeError); } + /* + * GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze + * stays up to date for autovacuum triggering. + */ + pgstat_combine_from_qe(pr); + cdbdisp_returnResults(pr, cdb_pgresults); cdbdisp_destroyDispatcherState(ds); diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index f9d5719fbe3..5f9473c379e 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -2144,6 +2144,12 @@ void mppExecutorFinishup(QueryDesc *queryDesc) if (ProcessDispatchResult_hook) ProcessDispatchResult_hook(ds); + /* + * GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze + * stays up to date for autovacuum triggering. + */ + pgstat_combine_from_qe(pr); + /* get num of rows processed from writer QEs. 
*/ estate->es_processed += cdbdisp_sumCmdTuples(pr, primaryWriterSliceIndex); @@ -2225,6 +2231,12 @@ uint64 mppExecutorWait(QueryDesc *queryDesc) LocallyExecutingSliceIndex(queryDesc->estate), estate->showstatctx); } + /* + * GPDB: Merge relation stats sent by QEs so QD's mod_since_analyze + * stays up to date for autovacuum triggering. + */ + pgstat_combine_from_qe(pr); + /* get num of rows processed from writer QEs. */ es_processed += cdbdisp_sumCmdTuples(pr, primaryWriterSliceIndex); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 20be306646e..92f744e673f 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1498,6 +1498,14 @@ exec_mpp_query(const char *query_string, PortalDrop(portal, false); + /* + * GPDB: Send pending relation stats to QD before closing the + * transaction. The stats are in pgStatXactStack (transaction-level + * counts); finish_xact_command() will call AtEOXact_PgStat() which + * NULLs pgStatXactStack, so we must capture the stats first. + */ + pgstat_send_qd_tabstats(); + /* * Close down transaction statement before reporting command-complete. * This is so that any end-of-transaction errors are reported before diff --git a/src/backend/utils/activity/Makefile b/src/backend/utils/activity/Makefile index d7d7d6c6b0d..3ff4f0e5235 100644 --- a/src/backend/utils/activity/Makefile +++ b/src/backend/utils/activity/Makefile @@ -13,6 +13,9 @@ subdir = src/backend/utils/activity top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global +# GPDB: needed for libpq-int.h (PGExtraType, pg_result struct) +override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) + OBJS = \ backend_progress.o \ backend_status.o \ diff --git a/src/backend/utils/activity/pgstat.c b/src/backend/utils/activity/pgstat.c index ab7768154c2..926800c9c09 100644 --- a/src/backend/utils/activity/pgstat.c +++ b/src/backend/utils/activity/pgstat.c @@ -96,6 +96,8 @@ #include "access/transam.h" #include "access/xact.h" #include "lib/dshash.h" +#include "libpq/pqformat.h" +#include "libpq-int.h" #include "pgstat.h" #include "port/atomics.h" #include "storage/fd.h" @@ -107,6 +109,8 @@ #include "utils/memutils.h" #include "utils/pgstat_internal.h" #include "utils/timestamp.h" +#include "catalog/gp_distribution_policy.h" +#include "cdb/cdbvars.h" /* ---------- @@ -1744,3 +1748,136 @@ assign_stats_fetch_consistency(int newval, void *extra) if (pgstat_fetch_consistency != newval) force_stats_snapshot_clear = true; } + + +/* ----------------------------------------------------------------------- + * GPDB: QE→QD pgstat collection. + * + * After a DML statement completes on QE, send the accumulated in-progress + * relation stats (from pgStatXactStack) to the QD via a 'y' protocol message. + * The QD collects these in pgstat_combine_from_qe() and merges them into + * its own pending stats, so autovacuum can see modification counts. + * ----------------------------------------------------------------------- + */ + +/* + * pgstat_send_qd_tabstats -- QE side: send relation stats to QD. + * + * Must be called only on QE (Gp_role == GP_ROLE_EXECUTE), BEFORE + * finish_xact_command(). At call time the transaction-level per-table + * counts are still in pgStatXactStack. finish_xact_command() calls + * AtEOXact_PgStat() which NULLs pgStatXactStack, so we must read the + * stats before that happens. 
+ */ +void +pgstat_send_qd_tabstats(void) +{ + PgStat_SubXactStatus *xact_state; + StringInfoData buf; + PgStatTabRecordFromQE *records; + int nrecords = 0; + int capacity = 64; + + if (Gp_role != GP_ROLE_EXECUTE || !Gp_is_writer) + return; + + /* + * On QE inside a distributed transaction, stats for the current + * statement are in pgStatXactStack (not yet merged to pgStatPending, + * because the top-level transaction hasn't committed yet). Read the + * current nesting level's per-table insert/update/delete counts. + */ + xact_state = pgstat_get_current_xact_stack(); + + if (xact_state == NULL) + return; + + records = (PgStatTabRecordFromQE *) + palloc(capacity * sizeof(PgStatTabRecordFromQE)); + + /* + * Send only the current nesting level's per-table insert/update/delete + * counts. QD will place these into its own transactional state (trans), + * letting PG's normal AtEOXact_PgStat_Relations / AtEOSubXact_PgStat_Relations + * machinery handle delta_live_tuples, delta_dead_tuples, changed_tuples, + * and subtransaction commit/abort correctly. + */ + { + PgStat_TableXactStatus *trans; + + for (trans = xact_state->first; trans != NULL; trans = trans->next) + { + PgStat_TableStatus *tabstat = trans->parent; + PgStat_Counter ins, upd, del; + + ins = trans->tuples_inserted; + upd = trans->tuples_updated; + del = trans->tuples_deleted; + + if (ins == 0 && upd == 0 && del == 0 && !trans->truncdropped) + continue; + + /* + * Filter by distribution policy: skip catalog tables (QD has + * the same updates) and deduplicate replicated tables (only + * one segment reports, to avoid overcounting). 
+ */ + { + GpPolicy *gppolicy = GpPolicyFetch(tabstat->id); + + switch (gppolicy->ptype) + { + case POLICYTYPE_ENTRY: + pfree(gppolicy); + continue; + case POLICYTYPE_REPLICATED: + if (GpIdentity.segindex != tabstat->id % gppolicy->numsegments) + { + pfree(gppolicy); + continue; + } + break; + case POLICYTYPE_PARTITIONED: + break; + default: + elog(ERROR, "unrecognized policy type %d", + gppolicy->ptype); + } + pfree(gppolicy); + } + + /* New entry — each table appears at most once per nesting level */ + if (nrecords >= capacity) + { + capacity *= 2; + records = (PgStatTabRecordFromQE *) + repalloc(records, capacity * sizeof(PgStatTabRecordFromQE)); + } + + records[nrecords].t_id = tabstat->id; + records[nrecords].t_shared = tabstat->shared; + records[nrecords].truncdropped = trans->truncdropped; + records[nrecords].tuples_inserted = ins; + records[nrecords].tuples_updated = upd; + records[nrecords].tuples_deleted = del; + nrecords++; + } + } + + if (nrecords == 0) + { + pfree(records); + return; + } + + pq_beginmessage(&buf, 'y'); + pq_sendstring(&buf, "PGSTAT"); + pq_sendbyte(&buf, false); /* result not complete yet */ + pq_sendint(&buf, PGExtraTypeTableStats, sizeof(PGExtraType)); + pq_sendint(&buf, nrecords * sizeof(PgStatTabRecordFromQE), sizeof(int)); + pq_sendbytes(&buf, (char *) records, nrecords * sizeof(PgStatTabRecordFromQE)); + pq_endmessage(&buf); + + pfree(records); + +} diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c index f24091c5078..33532b0030e 100644 --- a/src/backend/utils/activity/pgstat_relation.c +++ b/src/backend/utils/activity/pgstat_relation.c @@ -20,12 +20,16 @@ #include "access/twophase_rmgr.h" #include "access/xact.h" #include "catalog/partition.h" +#include "libpq-int.h" #include "postmaster/autovacuum.h" #include "utils/memutils.h" #include "utils/pgstat_internal.h" #include "utils/rel.h" #include "utils/timestamp.h" #include "catalog/catalog.h" +#include 
"cdb/cdbdispatchresult.h" +#include "utils/faultinjector.h" +#include "utils/lsyscache.h" /* Record that's written to 2PC state file when pgstat state is persisted */ @@ -953,3 +957,107 @@ restore_truncdrop_counters(PgStat_TableXactStatus *trans) trans->tuples_deleted = trans->deleted_pre_truncdrop; } } + + +/* ----------------------------------------------------------------------- + * GPDB: QD side — merge relation stats received from QEs. + * + * Called from mppExecutorFinishup() / mppExecutorWait() after QEs have + * completed their work. Each QE sends a 'y' protocol message containing + * an array of PgStatTabRecordFromQE — the current nesting level's + * per-table insert/update/delete counts. + * + * We place these into the QD's own transactional state (PgStat_TableXactStatus) + * at the current nesting level. This lets PG's normal end-of-transaction + * machinery (AtEOXact_PgStat_Relations / AtEOSubXact_PgStat_Relations) handle + * delta_live_tuples, delta_dead_tuples, changed_tuples, and subtransaction + * commit/abort correctly — just as if the DML had happened locally. + * + * Because QE sends cumulative values for the current nesting level (trans + * accumulates within a level across statements), we must zero QD's trans + * before re-accumulating from all segments. On first encounter of each + * relation we zero its trans, then sum across all segments in one pass. 
+ * ----------------------------------------------------------------------- + */ +void +pgstat_combine_from_qe(CdbDispatchResults *primaryResults) +{ + int i, + j; + int nest_level = GetCurrentTransactionNestLevel(); + List *zeroed_rels = NIL; + + if (primaryResults == NULL) + return; + + for (i = 0; i < primaryResults->resultCount; i++) + { + CdbDispatchResult *dispResult = &primaryResults->resultArray[i]; + int nres = cdbdisp_numPGresult(dispResult); + + for (j = 0; j < nres; j++) + { + PGresult *pgresult = cdbdisp_getPGresult(dispResult, j); + PgStatTabRecordFromQE *records; + int nrecords, + k; + + if (pgresult == NULL || + pgresult->extras == NULL || + pgresult->extraType != PGExtraTypeTableStats) + continue; + + records = (PgStatTabRecordFromQE *) pgresult->extras; + nrecords = pgresult->extraslen / sizeof(PgStatTabRecordFromQE); + + for (k = 0; k < nrecords; k++) + { + PgStatTabRecordFromQE *rec = &records[k]; + PgStat_TableStatus *tabstat; + PgStat_TableXactStatus *trans; + +#ifdef FAULT_INJECTOR + if (*numActiveFaults_ptr > 0) + { + char *relname = get_rel_name(rec->t_id); + if (relname) + { + FaultInjector_InjectFaultIfSet( + "gp_pgstat_report_on_master", DDLNotSpecified, + "", relname); + pfree(relname); + } + } +#endif + + tabstat = pgstat_prep_relation_pending(rec->t_id, rec->t_shared); + + /* Ensure a trans exists at current nesting level */ + if (tabstat->trans == NULL || + tabstat->trans->nest_level != nest_level) + add_tabstat_xact_level(tabstat, nest_level); + + trans = tabstat->trans; + + /* Zero on first encounter to undo previous merge */ + if (!list_member_oid(zeroed_rels, rec->t_id)) + { + trans->tuples_inserted = 0; + trans->tuples_updated = 0; + trans->tuples_deleted = 0; + trans->truncdropped = false; + zeroed_rels = lappend_oid(zeroed_rels, rec->t_id); + } + + /* Accumulate QE counts from this segment */ + trans->tuples_inserted += rec->tuples_inserted; + trans->tuples_updated += rec->tuples_updated; + trans->tuples_deleted += 
rec->tuples_deleted; + if (rec->truncdropped) + trans->truncdropped = true; + } + } + } + + list_free(zeroed_rels); +} diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c index 369239d5014..110ddca3c91 100644 --- a/src/backend/utils/activity/pgstat_xact.c +++ b/src/backend/utils/activity/pgstat_xact.c @@ -253,6 +253,16 @@ pgstat_get_xact_stack_level(int nest_level) return xact_state; } +/* + * GPDB: Return the current xact stats stack without allocating new levels. + * Used by pgstat_send_qd_tabstats() to read in-progress transaction stats. + */ +PgStat_SubXactStatus * +pgstat_get_current_xact_stack(void) +{ + return pgStatXactStack; +} + /* * Get stat items that need to be dropped at commit / abort. * diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c4f3b88444a..2a85773c30c 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -719,6 +719,27 @@ extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry_ext(bool shared, Oid reloid); extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id); +/* + * GPDB: Data structure for transmitting per-table stats from QE to QD. + * Used by pgstat_send_qd_tabstats() and pgstat_combine_from_qe(). + */ +typedef struct PgStatTabRecordFromQE +{ + Oid t_id; /* table OID */ + bool t_shared; /* is it a shared catalog? */ + bool truncdropped; /* was it truncated/dropped? 
*/ + PgStat_Counter tuples_inserted; + PgStat_Counter tuples_updated; + PgStat_Counter tuples_deleted; +} PgStatTabRecordFromQE; + +/* GPDB: QE sends pending relation stats to QD via 'y' protocol message */ +extern void pgstat_send_qd_tabstats(void); + +/* GPDB: QD combines relation stats received from QEs after dispatch */ +struct CdbDispatchResults; +extern void pgstat_combine_from_qe(struct CdbDispatchResults *primaryResults); + /* * Functions in pgstat_replslot.c diff --git a/src/include/utils/pgstat_internal.h b/src/include/utils/pgstat_internal.h index 2b3da610a31..861dd7d7184 100644 --- a/src/include/utils/pgstat_internal.h +++ b/src/include/utils/pgstat_internal.h @@ -705,6 +705,7 @@ extern void pgstat_subscription_reset_timestamp_cb(PgStatShared_Common *header, */ extern PgStat_SubXactStatus *pgstat_get_xact_stack_level(int nest_level); +extern PgStat_SubXactStatus *pgstat_get_current_xact_stack(void); extern void pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid); extern void pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid); diff --git a/src/test/isolation2/expected/qe_qd_pgstat.out b/src/test/isolation2/expected/qe_qd_pgstat.out new file mode 100644 index 00000000000..762295c0d75 --- /dev/null +++ b/src/test/isolation2/expected/qe_qd_pgstat.out @@ -0,0 +1,278 @@ +-- Test: QE→QD pgstat collection +-- Verifies that DML stats from QE segments reach the QD coordinator's +-- pg_stat_user_tables, enabling autovacuum to see modification counts. +-- Also verifies that gp_stat_user_tables_summary remains accurate on QEs. + +-- +-- Setup: disable autovacuum and auto_stats to prevent interference. 
+-- +ALTER SYSTEM SET autovacuum = off; +ALTER +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) +SELECT pg_sleep(0.5); + pg_sleep +---------- + +(1 row) +SET gp_autostats_mode = none; +SET + +-- +-- Test 1: Distributed (hash) table — INSERT/UPDATE/DELETE stats reach QD +-- +CREATE TABLE test_pgstat_dist(id int, val int) DISTRIBUTED BY (id); +CREATE + +INSERT INTO test_pgstat_dist SELECT i, 0 FROM generate_series(1, 1000) i; +INSERT 1000 +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +-- QD should see the stats sent from QEs +SELECT n_tup_ins, n_mod_since_analyze FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; + n_tup_ins | n_mod_since_analyze +-----------+--------------------- + 1000 | 1000 +(1 row) + +-- QE summary should also show the same counts +SELECT n_tup_ins, n_mod_since_analyze FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + n_tup_ins | n_mod_since_analyze +-----------+--------------------- + 1000 | 1000 +(1 row) + +-- UPDATE non-distribution-key column so it's a real update, not split update +UPDATE test_pgstat_dist SET val = 1 WHERE id <= 100; +UPDATE 100 +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +SELECT n_tup_upd FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; + n_tup_upd +----------- + 100 +(1 row) +SELECT n_tup_upd FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + n_tup_upd +----------- + 100 +(1 row) + +DELETE FROM test_pgstat_dist WHERE id <= 50; +DELETE 50 +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +SELECT n_tup_del FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; + n_tup_del +----------- + 50 +(1 row) +SELECT n_tup_del FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + n_tup_del +----------- + 50 +(1 row) + +-- +-- Test 2: Replicated table — 
stats not double-counted +-- With 3 segments, each segment has all 500 rows, but only one segment +-- should report stats to QD. +-- +CREATE TABLE test_pgstat_repl(id int) DISTRIBUTED REPLICATED; +CREATE + +INSERT INTO test_pgstat_repl SELECT i FROM generate_series(1, 500) i; +INSERT 500 +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +-- QD should show exactly 500, not 1500 (3 segments * 500) +SELECT n_tup_ins, n_mod_since_analyze FROM pg_stat_user_tables WHERE relname = 'test_pgstat_repl'; + n_tup_ins | n_mod_since_analyze +-----------+--------------------- + 500 | 500 +(1 row) + +-- QE summary divides replicated table stats by numsegments, so also 500 +SELECT n_tup_ins, n_mod_since_analyze FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_repl'; + n_tup_ins | n_mod_since_analyze +-----------+--------------------- + 500 | 500 +(1 row) + +-- +-- Test 3: Transaction — committed DML stats are counted +-- +CREATE TABLE test_pgstat_xact(id int) DISTRIBUTED BY (id); +CREATE + +BEGIN; +BEGIN +INSERT INTO test_pgstat_xact SELECT i FROM generate_series(1, 300) i; +INSERT 300 +DELETE FROM test_pgstat_xact WHERE id <= 100; +DELETE 100 +COMMIT; +COMMIT +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +SELECT n_tup_ins, n_tup_del FROM pg_stat_user_tables WHERE relname = 'test_pgstat_xact'; + n_tup_ins | n_tup_del +-----------+----------- + 300 | 100 +(1 row) +SELECT n_tup_ins, n_tup_del FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_xact'; + n_tup_ins | n_tup_del +-----------+----------- + 300 | 100 +(1 row) +SELECT count(*) FROM test_pgstat_xact; + count +------- + 200 +(1 row) + +-- +-- Test 4: Subtransaction rollback — n_tup_ins counts all attempted inserts +-- (PG counts attempted actions regardless of commit/abort) +-- +CREATE TABLE test_pgstat_subxact(id int) DISTRIBUTED BY (id); +CREATE + +BEGIN; +BEGIN +INSERT INTO 
test_pgstat_subxact SELECT i FROM generate_series(1, 200) i; +INSERT 200 +SAVEPOINT sp1; +SAVEPOINT +INSERT INTO test_pgstat_subxact SELECT i FROM generate_series(201, 700) i; +INSERT 500 +ROLLBACK TO sp1; +ROLLBACK +COMMIT; +COMMIT +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +-- n_tup_ins counts all attempted inserts (200 + 500 = 700) +-- but only 200 rows are actually in the table +SELECT n_tup_ins FROM pg_stat_user_tables WHERE relname = 'test_pgstat_subxact'; + n_tup_ins +----------- + 700 +(1 row) +SELECT n_tup_ins FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_subxact'; + n_tup_ins +----------- + 700 +(1 row) +SELECT count(*) FROM test_pgstat_subxact; + count +------- + 200 +(1 row) + +-- +-- Test 5: Nested subtransactions — RELEASE merges into parent, ROLLBACK TO discards +-- +CREATE TABLE test_pgstat_nested(id int) DISTRIBUTED BY (id); +CREATE + +BEGIN; +BEGIN +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(1, 100) i; +INSERT 100 +SAVEPOINT sp1; +SAVEPOINT +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(101, 200) i; +INSERT 100 +SAVEPOINT sp2; +SAVEPOINT +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(201, 300) i; +INSERT 100 +RELEASE SAVEPOINT sp2; +RELEASE +ROLLBACK TO sp1; +ROLLBACK +COMMIT; +COMMIT +SELECT gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + +-- All 300 attempted inserts counted (100 outer + 100 sp1 + 100 sp2) +-- but only 100 rows remain (sp1 rollback discards sp1 and released sp2) +SELECT n_tup_ins FROM pg_stat_user_tables WHERE relname = 'test_pgstat_nested'; + n_tup_ins +----------- + 300 +(1 row) +SELECT n_tup_ins FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_nested'; + n_tup_ins +----------- + 300 +(1 row) +SELECT count(*) FROM test_pgstat_nested; + count +------- + 100 +(1 row) + +-- +-- Test 6: Catalog (entry) table — QE doesn't crash on catalog DML +-- 
Catalog tables are filtered out (POLICYTYPE_ENTRY), so the stats +-- infrastructure should simply skip them without error. +-- +CREATE FUNCTION test_pgstat_func() RETURNS void AS $$ BEGIN END; $$ LANGUAGE plpgsql; +CREATE +DROP FUNCTION test_pgstat_func(); +DROP + +-- +-- Cleanup +-- +DROP TABLE test_pgstat_dist; +DROP +DROP TABLE test_pgstat_repl; +DROP +DROP TABLE test_pgstat_xact; +DROP +DROP TABLE test_pgstat_subxact; +DROP +DROP TABLE test_pgstat_nested; +DROP + +ALTER SYSTEM RESET autovacuum; +ALTER +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) diff --git a/src/test/isolation2/sql/qe_qd_pgstat.sql b/src/test/isolation2/sql/qe_qd_pgstat.sql new file mode 100644 index 00000000000..1d25ed5a623 --- /dev/null +++ b/src/test/isolation2/sql/qe_qd_pgstat.sql @@ -0,0 +1,136 @@ +-- Test: QE→QD pgstat collection +-- Verifies that DML stats from QE segments reach the QD coordinator's +-- pg_stat_user_tables, enabling autovacuum to see modification counts. +-- Also verifies that gp_stat_user_tables_summary remains accurate on QEs. + +-- +-- Setup: disable autovacuum and auto_stats to prevent interference. 
+-- +ALTER SYSTEM SET autovacuum = off; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.5); +SET gp_autostats_mode = none; + +-- +-- Test 1: Distributed (hash) table — INSERT/UPDATE/DELETE stats reach QD +-- +CREATE TABLE test_pgstat_dist(id int, val int) DISTRIBUTED BY (id); + +INSERT INTO test_pgstat_dist SELECT i, 0 FROM generate_series(1, 1000) i; +SELECT gp_stat_force_next_flush(); + +-- QD should see the stats sent from QEs +SELECT n_tup_ins, n_mod_since_analyze + FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; + +-- QE summary should also show the same counts +SELECT n_tup_ins, n_mod_since_analyze + FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + +-- UPDATE non-distribution-key column so it's a real update, not split update +UPDATE test_pgstat_dist SET val = 1 WHERE id <= 100; +SELECT gp_stat_force_next_flush(); + +SELECT n_tup_upd FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; +SELECT n_tup_upd FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + +DELETE FROM test_pgstat_dist WHERE id <= 50; +SELECT gp_stat_force_next_flush(); + +SELECT n_tup_del FROM pg_stat_user_tables WHERE relname = 'test_pgstat_dist'; +SELECT n_tup_del FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_dist'; + +-- +-- Test 2: Replicated table — stats not double-counted +-- With 3 segments, each segment has all 500 rows, but only one segment +-- should report stats to QD. 
+-- +CREATE TABLE test_pgstat_repl(id int) DISTRIBUTED REPLICATED; + +INSERT INTO test_pgstat_repl SELECT i FROM generate_series(1, 500) i; +SELECT gp_stat_force_next_flush(); + +-- QD should show exactly 500, not 1500 (3 segments * 500) +SELECT n_tup_ins, n_mod_since_analyze + FROM pg_stat_user_tables WHERE relname = 'test_pgstat_repl'; + +-- QE summary divides replicated table stats by numsegments, so also 500 +SELECT n_tup_ins, n_mod_since_analyze + FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_repl'; + +-- +-- Test 3: Transaction — committed DML stats are counted +-- +CREATE TABLE test_pgstat_xact(id int) DISTRIBUTED BY (id); + +BEGIN; +INSERT INTO test_pgstat_xact SELECT i FROM generate_series(1, 300) i; +DELETE FROM test_pgstat_xact WHERE id <= 100; +COMMIT; +SELECT gp_stat_force_next_flush(); + +SELECT n_tup_ins, n_tup_del FROM pg_stat_user_tables WHERE relname = 'test_pgstat_xact'; +SELECT n_tup_ins, n_tup_del FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_xact'; +SELECT count(*) FROM test_pgstat_xact; + +-- +-- Test 4: Subtransaction rollback — n_tup_ins counts all attempted inserts +-- (PG counts attempted actions regardless of commit/abort) +-- +CREATE TABLE test_pgstat_subxact(id int) DISTRIBUTED BY (id); + +BEGIN; +INSERT INTO test_pgstat_subxact SELECT i FROM generate_series(1, 200) i; +SAVEPOINT sp1; +INSERT INTO test_pgstat_subxact SELECT i FROM generate_series(201, 700) i; +ROLLBACK TO sp1; +COMMIT; +SELECT gp_stat_force_next_flush(); + +-- n_tup_ins counts all attempted inserts (200 + 500 = 700) +-- but only 200 rows are actually in the table +SELECT n_tup_ins FROM pg_stat_user_tables WHERE relname = 'test_pgstat_subxact'; +SELECT n_tup_ins FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_subxact'; +SELECT count(*) FROM test_pgstat_subxact; + +-- +-- Test 5: Nested subtransactions — RELEASE merges into parent, ROLLBACK TO discards +-- +CREATE TABLE test_pgstat_nested(id int) DISTRIBUTED BY (id); + 
+BEGIN; +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(1, 100) i; +SAVEPOINT sp1; +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(101, 200) i; +SAVEPOINT sp2; +INSERT INTO test_pgstat_nested SELECT i FROM generate_series(201, 300) i; +RELEASE SAVEPOINT sp2; +ROLLBACK TO sp1; +COMMIT; +SELECT gp_stat_force_next_flush(); + +-- All 300 attempted inserts counted (100 outer + 100 sp1 + 100 sp2) +-- but only 100 rows remain (sp1 rollback discards sp1 and released sp2) +SELECT n_tup_ins FROM pg_stat_user_tables WHERE relname = 'test_pgstat_nested'; +SELECT n_tup_ins FROM gp_stat_user_tables_summary WHERE relname = 'test_pgstat_nested'; +SELECT count(*) FROM test_pgstat_nested; + +-- +-- Test 6: Catalog (entry) table — QE doesn't crash on catalog DML +-- Catalog tables are filtered out (POLICYTYPE_ENTRY), so the stats +-- infrastructure should simply skip them without error. +-- +CREATE FUNCTION test_pgstat_func() RETURNS void AS $$ BEGIN END; $$ LANGUAGE plpgsql; +DROP FUNCTION test_pgstat_func(); + +-- +-- Cleanup +-- +DROP TABLE test_pgstat_dist; +DROP TABLE test_pgstat_repl; +DROP TABLE test_pgstat_xact; +DROP TABLE test_pgstat_subxact; +DROP TABLE test_pgstat_nested; + +ALTER SYSTEM RESET autovacuum; +SELECT pg_reload_conf(); From 79782d96e4ad8c705269cc1d46625d389b080beb Mon Sep 17 00:00:00 2001 From: liushengsong Date: Wed, 11 Mar 2026 04:25:16 +0800 Subject: [PATCH 3/3] fix: support autovacuum for databases without pgstat entries in GPDB MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With QE→QD pgstat collection, newly created databases may already have tables needing auto-analyze before the database-level pgstat entry is flushed to shared memory. This change: 1. Removes the pgstat entry requirement in rebuild_database_list() and do_start_worker() so autovacuum considers all databases. 2. 
Filters stale (dropped) database OIDs from DatabaseList by cross-referencing against the current pg_database catalog, preventing the launcher from endlessly scheduling workers for non-existent DBs. 3. Uses gp_stat_force_next_flush() instead of pg_sleep(0.77) for deterministic stats flushing in the autovacuum-analyze test. Co-Authored-By: Claude Opus 4.6 --- src/backend/postmaster/autovacuum.c | 106 ++++++++++-------- .../input/autovacuum-analyze.source | 14 +-- .../output/autovacuum-analyze.source | 29 +++-- 3 files changed, 91 insertions(+), 58 deletions(-) diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index f517cb4d006..762c4d5ad59 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -1068,60 +1068,75 @@ rebuild_database_list(Oid newdb) if (OidIsValid(newdb)) { avl_dbase *db; - PgStat_StatDBEntry *entry; - /* only consider this database if it has a pgstat entry */ - entry = pgstat_fetch_stat_dbentry(newdb); - if (entry != NULL) - { - /* we assume it isn't found because the hash was just created */ - db = hash_search(dbhash, &newdb, HASH_ENTER, NULL); + /* + * In GPDB, include databases even without pgstat entries since + * newly created databases may already need auto-analyze via + * QE→QD pgstat collection. + */ - /* hash_search already filled in the key */ - db->adl_score = score++; - /* next_worker is filled in later */ - } + /* we assume it isn't found because the hash was just created */ + db = hash_search(dbhash, &newdb, HASH_ENTER, NULL); + + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ } - /* Now insert the databases from the existing list */ - dlist_foreach(iter, &DatabaseList) + /* + * Get the current list of databases first, so we can cross-reference + * existing DatabaseList entries and exclude dropped databases. 
+ * In GPDB, we include databases even without pgstat entries since + * newly created databases may need auto-analyze via QE→QD pgstat. + */ + dblist = get_database_list(); + + /* Build a set of current database OIDs for quick lookup */ { - avl_dbase *avdb = dlist_container(avl_dbase, adl_node, iter.cur); - avl_dbase *db; - bool found; - PgStat_StatDBEntry *entry; + HASHCTL oid_hctl; + HTAB *current_db_oids; + + oid_hctl.keysize = sizeof(Oid); + oid_hctl.entrysize = sizeof(Oid); + oid_hctl.hcxt = tmpcxt; + current_db_oids = hash_create("current db oids", 20, &oid_hctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + foreach(cell, dblist) + { + avw_dbase *avdb = lfirst(cell); + hash_search(current_db_oids, &(avdb->adw_datid), HASH_ENTER, NULL); + } - /* - * skip databases with no stat entries -- in particular, this gets rid - * of dropped databases - */ - entry = pgstat_fetch_stat_dbentry(avdb->adl_datid); - if (entry == NULL) - continue; + /* Now insert the databases from the existing list, skipping dropped ones */ + dlist_foreach(iter, &DatabaseList) + { + avl_dbase *avdb = dlist_container(avl_dbase, adl_node, iter.cur); + avl_dbase *db; + bool found; - db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found); + /* Skip databases that no longer exist in pg_database */ + if (!hash_search(current_db_oids, &(avdb->adl_datid), HASH_FIND, NULL)) + continue; - if (!found) - { - /* hash_search already filled in the key */ - db->adl_score = score++; - /* next_worker is filled in later */ + db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found); + + if (!found) + { + /* hash_search already filled in the key */ + db->adl_score = score++; + /* next_worker is filled in later */ + } } + + hash_destroy(current_db_oids); } /* finally, insert all qualifying databases not previously inserted */ - dblist = get_database_list(); foreach(cell, dblist) { avw_dbase *avdb = lfirst(cell); avl_dbase *db; bool found; - PgStat_StatDBEntry *entry; - - /* only consider databases 
with a pgstat entry */ - entry = pgstat_fetch_stat_dbentry(avdb->adw_datid); - if (entry == NULL) - continue; db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found); /* only update the score if the database was not already on the hash */ @@ -1329,11 +1344,12 @@ do_start_worker(void) tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid); /* - * Skip a database with no pgstat entry; it means it hasn't seen any - * activity. + * Don't skip databases without pgstat entries. In GPDB, a newly + * created database may already have tables needing auto-analyze + * (via QE→QD pgstat) before the database-level pgstat entry is + * flushed to shared memory. Treat them as candidates with the + * oldest last_autovac_time so they get visited promptly. */ - if (!tmp->adw_entry) - continue; /* * Also, skip a database that appears on the database list as having @@ -1368,11 +1384,13 @@ do_start_worker(void) continue; /* - * Remember the db with oldest autovac time. (If we are here, both - * tmp->entry and db->entry must be non-null.) + * Remember the db with oldest autovac time. A database without a + * pgstat entry is treated as having last_autovac_time = 0, giving + * it the highest priority. */ if (avdb == NULL || - tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time) + (tmp->adw_entry ? tmp->adw_entry->last_autovac_time : 0) < + (avdb->adw_entry ? 
avdb->adw_entry->last_autovac_time : 0)) avdb = tmp; } diff --git a/src/test/isolation2/input/autovacuum-analyze.source b/src/test/isolation2/input/autovacuum-analyze.source index 19187b107c2..2445c4178b8 100644 --- a/src/test/isolation2/input/autovacuum-analyze.source +++ b/src/test/isolation2/input/autovacuum-analyze.source @@ -171,13 +171,13 @@ SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', ' -- with auto_stats, the auto-ANALYZE still trigger 2: INSERT INTO autostatstbl select i from generate_series(1, 1000) as i; -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); -- auto_stats executed but auto-ANALYZE not execute yet since we suspend before finish ANALYZE. SELECT count(*) FROM pg_statistic where starelid = 'autostatstbl'::regclass; select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; --- expect analyze_count = 1, autoanalyze_count = 0, and n_mod_since_analyze = 1000 since ANALYZE executed --- in same transaction for the insert statement. +-- expect analyze_count = 1, autoanalyze_count = 0, and n_mod_since_analyze = 1000 since ANALYZE +-- executed in same transaction for the insert statement. select analyze_count, autoanalyze_count, n_mod_since_analyze from pg_stat_all_tables where relname = 'autostatstbl'; -- Wait until autovacuum is triggered @@ -205,7 +205,7 @@ SELECT gp_inject_fault('analyze_finished_one_relation', 'skip', '', '', 'autosta SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', '', 'autostatstbl', 1, -1, 0, 1); 2: INSERT INTO autostatstbl select i from generate_series(1001, 2000) as i; -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); -- auto_stats executed but auto-ANALYZE not execute yet since we suspend before finish ANALYZE. 
select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; @@ -238,12 +238,12 @@ SELECT gp_inject_fault('analyze_finished_one_relation', 'skip', '', '', 'autosta SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', '', 'autostatstbl', 1, -1, 0, 1); 2: INSERT INTO autostatstbl select i from generate_series(2001, 3000) as i; -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); -- auto_stats should not executed and auto-ANALYZE not execute yet since we suspend before finish ANALYZE. select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; --- expect analyze_count = 2, autoanalyze_count = 2, and n_mod_since_analyze = 1000 since ANALYZE executed --- in same transaction for the insert statement. +-- expect analyze_count = 2, autoanalyze_count = 2, and n_mod_since_analyze = 1000 since ANALYZE +-- executed in same transaction for the insert statement. select analyze_count, autoanalyze_count, n_mod_since_analyze from pg_stat_all_tables where relname = 'autostatstbl'; -- Wait until autovacuum is triggered diff --git a/src/test/isolation2/output/autovacuum-analyze.source b/src/test/isolation2/output/autovacuum-analyze.source index b8727a10a51..457ec1b38fb 100644 --- a/src/test/isolation2/output/autovacuum-analyze.source +++ b/src/test/isolation2/output/autovacuum-analyze.source @@ -408,7 +408,12 @@ SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', ' -- with auto_stats, the auto-ANALYZE still trigger 2: INSERT INTO autostatstbl select i from generate_series(1, 1000) as i; INSERT 1000 -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + -- auto_stats executed but auto-ANALYZE not execute yet since we suspend before finish ANALYZE. 
SELECT count(*) FROM pg_statistic where starelid = 'autostatstbl'::regclass; count @@ -420,8 +425,8 @@ select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; ----------+----------- 3 | 1000 (1 row) --- expect analyze_count = 1, autoanalyze_count = 0, and n_mod_since_analyze = 1000 since ANALYZE executed --- in same transaction for the insert statement. +-- expect analyze_count = 1, autoanalyze_count = 0, and n_mod_since_analyze = 1000 since ANALYZE +-- executed in same transaction for the insert statement. select analyze_count, autoanalyze_count, n_mod_since_analyze from pg_stat_all_tables where relname = 'autostatstbl'; analyze_count | autoanalyze_count | n_mod_since_analyze ---------------+-------------------+--------------------- @@ -493,7 +498,12 @@ SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', ' 2: INSERT INTO autostatstbl select i from generate_series(1001, 2000) as i; INSERT 1000 -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + -- auto_stats executed but auto-ANALYZE not execute yet since we suspend before finish ANALYZE. select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; relpages | reltuples @@ -572,15 +582,20 @@ SELECT gp_inject_fault('auto_vac_worker_after_report_activity', 'suspend', '', ' 2: INSERT INTO autostatstbl select i from generate_series(2001, 3000) as i; INSERT 1000 -2: select pg_sleep(0.77); -- Force pgstat_report_stat() to send tabstat. +2: select gp_stat_force_next_flush(); + gp_stat_force_next_flush +-------------------------- + +(1 row) + -- auto_stats should not executed and auto-ANALYZE not execute yet since we suspend before finish ANALYZE. 
select relpages, reltuples from pg_class where oid = 'autostatstbl'::regclass; relpages | reltuples ----------+----------- 3 | 2000 (1 row) --- expect analyze_count = 2, autoanalyze_count = 2, and n_mod_since_analyze = 1000 since ANALYZE executed --- in same transaction for the insert statement. +-- expect analyze_count = 2, autoanalyze_count = 2, and n_mod_since_analyze = 1000 since ANALYZE +-- executed in same transaction for the insert statement. select analyze_count, autoanalyze_count, n_mod_since_analyze from pg_stat_all_tables where relname = 'autostatstbl'; analyze_count | autoanalyze_count | n_mod_since_analyze ---------------+-------------------+---------------------