From b7f80acc89aaa354e0664580d88abfea8fb4ba2f Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 12:40:15 -0500 Subject: [PATCH 1/3] Add diagnostics_channel TracingChannel support to pg and pg-pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enables instrumentation libraries (OpenTelemetry, etc.) to subscribe to structured events without monkey-patching. Uses TracingChannel for async context propagation and plain channels for simple events. Channels: - pg:query (TracingChannel) — query lifecycle with result enrichment - pg:connection (TracingChannel) — client connect lifecycle - pg:pool:connect (TracingChannel) — pool checkout lifecycle - pg:pool:release (plain) — client released back to pool - pg:pool:remove (plain) — client removed from pool All instrumentation is guarded by hasSubscribers for zero overhead when unused. Gracefully degrades to no-ops on Node < 19.9 or non-Node environments. Closes #3619 Co-Authored-By: Claude Opus 4.6 --- packages/pg-pool/diagnostics.js | 27 +++ packages/pg-pool/index.js | 34 +++ packages/pg-pool/test/diagnostics.js | 192 +++++++++++++++++ packages/pg/lib/client.js | 72 +++++-- packages/pg/lib/diagnostics.js | 23 ++ .../pg/test/unit/client/diagnostics-tests.js | 197 ++++++++++++++++++ 6 files changed, 532 insertions(+), 13 deletions(-) create mode 100644 packages/pg-pool/diagnostics.js create mode 100644 packages/pg-pool/test/diagnostics.js create mode 100644 packages/pg/lib/diagnostics.js create mode 100644 packages/pg/test/unit/client/diagnostics-tests.js diff --git a/packages/pg-pool/diagnostics.js b/packages/pg-pool/diagnostics.js new file mode 100644 index 000000000..99c4ce4d9 --- /dev/null +++ b/packages/pg-pool/diagnostics.js @@ -0,0 +1,27 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let poolConnectChannel = noopChannel +let poolReleaseChannel = noopChannel +let poolRemoveChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + poolConnectChannel = dc.tracingChannel('pg:pool:connect') + } + if (typeof dc.channel === 'function') { + poolReleaseChannel = dc.channel('pg:pool:release') + poolRemoveChannel = dc.channel('pg:pool:remove') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index 2fbdb78d5..fee6f2b42 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -1,5 +1,6 @@ 'use strict' const EventEmitter = require('events').EventEmitter +const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } = require('./diagnostics') const NOOP = function () {} @@ -178,6 +179,10 @@ class Pool extends EventEmitter { this._clients = this._clients.filter((c) => c !== client) const context = this + if (poolRemoveChannel.hasSubscribers) { + poolRemoveChannel.publish({ client: { processID: client.processID } }) + } + client.end(() => { context.emit('remove', client) @@ -196,6 +201,31 @@ class Pool extends EventEmitter { const response = promisify(this.Promise, cb) const result = response.result + if (poolConnectChannel.hasSubscribers) { + const context = { + pool: { + totalCount: this.totalCount, + idleCount: this.idleCount, + waitingCount: this.waitingCount, + maxSize: this.options.max, + }, + } + const origCb = response.callback + const enrichedCb = (err, client, done) => { + if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount } + return origCb(err, client, done) + } + poolConnectChannel.traceCallback( + (tracedCb) => { + response.callback = tracedCb + }, + 0, + context, + null, + enrichedCb + ) + } + // if we don't have to connect a new client, don't do so if (this._isFull() || this._idle.length) { // if we have idle clients schedule a pulse immediately @@ -388,6 +418,10 @@ class Pool extends EventEmitter { this.emit('release', err, client) + if (poolReleaseChannel.hasSubscribers) { + poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined }) + } + // TODO(bmc): expose a proper, public interface _queryable and _ending if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) { if (client._poolUseCount >= this.options.maxUses) { diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js new file mode 100644 index 000000000..ab0b344ac --- /dev/null +++ b/packages/pg-pool/test/diagnostics.js @@ -0,0 +1,192 @@ +'use strict' + +const expect = require('expect.js') +const EventEmitter = require('events').EventEmitter +const describe = require('mocha').describe +const it = require('mocha').it +const dc = require('diagnostics_channel') +const Pool = require('../') + +function mockClient(methods) { + return function () { + const client = new EventEmitter() + client.end = function (cb) { + if (cb) process.nextTick(cb) + } + client._queryable = true + client._ending = false + client.processID = 12345 + Object.assign(client, methods) + return client + } +} + +describe('diagnostics channels', function () { + describe('pg:pool:connect', function () { + it('publishes start event when connect is called', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let capturedContext + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(capturedContext).to.be.ok() + expect(capturedContext.pool).to.be.ok() + expect(capturedContext.pool.maxSize).to.be(10) + expect(capturedContext.pool.totalCount).to.be.a('number') + + channel.unsubscribe(subs) + done() + }) + }) + }) + + it('enriches context with client info on asyncEnd', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + const channel = dc.tracingChannel('pg:pool:connect') + const subs = { + start: () => {}, + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + expect(ctx.client).to.be.ok() + expect(ctx.client.processID).to.be(12345) + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end() + }) + }) + }) + + describe('pg:pool:release', function () { + it('publishes when a client is released', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + release() + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.client).to.be.ok() + expect(releaseMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + + it('includes error when released with error', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let releaseMessage + const channel = dc.channel('pg:pool:release') + const onMessage = (msg) => { + releaseMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + const releaseError = new Error('test error') + release(releaseError) + pool.end(() => { + expect(releaseMessage).to.be.ok() + expect(releaseMessage.error).to.be(releaseError) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) + + describe('pg:pool:remove', function () { + it('publishes when a client is removed', function (done) { + const pool = new Pool({ + Client: mockClient({ + connect: function (cb) { + process.nextTick(() => cb(null)) + }, + }), + }) + + let removeMessage + const channel = dc.channel('pg:pool:remove') + const onMessage = (msg) => { + removeMessage = msg + } + channel.subscribe(onMessage) + + pool.connect(function (err, client, release) { + if (err) return done(err) + // release with error to trigger removal + release(new Error('force remove')) + pool.end(() => { + expect(removeMessage).to.be.ok() + expect(removeMessage.client).to.be.ok() + expect(removeMessage.client.processID).to.be(12345) + + channel.unsubscribe(onMessage) + done() + }) + }) + }) + }) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 9200dded6..1b5f9afb8 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -9,6 +9,7 @@ const Query = require('./query') const defaults = require('./defaults') const Connection = require('./connection') const crypto = require('./crypto/utils') +const { queryChannel, connectionChannel } = require('./diagnostics') const activeQueryDeprecationNotice = nodeUtils.deprecate( () => {}, @@ -207,19 +208,33 @@ class Client extends EventEmitter { connect(callback) { if (callback) { - this._connect(callback) + if (connectionChannel.hasSubscribers) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback) + } else { + this._connect(callback) + } return } - return new this._Promise((resolve, reject) => { - this._connect((error) => { - if (error) { - reject(error) - } else { - resolve(this) - } + const connectPromise = () => + new this._Promise((resolve, reject) => { + this._connect((error) => { + if (error) reject(error) + else resolve(this) + }) }) - }) + + if (connectionChannel.hasSubscribers) { + const context = { + connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl }, + } + return connectionChannel.tracePromise(connectPromise, context) + } + + return connectPromise() } _attachListeners(con) { @@ -687,11 +702,42 @@ class Client extends EventEmitter { return result } - if (this._queryQueue.length > 0) { - queryQueueLengthDeprecationNotice() + const enqueue = () => { + if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice() + this._queryQueue.push(query) + this._pulseQueryQueue() + } + + if (queryChannel.hasSubscribers) { + const context = { + query: { text: query.text, name: query.name, rowMode: query._rowMode }, + client: { + database: this.database, + host: this.host, + port: this.port, + user: this.user, + processID: this.processID, + ssl: !!this.ssl, + }, + } + const origCb = query.callback + const enrichedCb = (err, res) => { + if (res) context.result = { rowCount: res.rowCount, command: res.command } + return origCb(err, res) + } + queryChannel.traceCallback( + (tracedCb) => { + query.callback = tracedCb + enqueue() + }, + 0, + context, + null, + enrichedCb + ) + } else { + enqueue() } - this._queryQueue.push(query) - this._pulseQueryQueue() return result } diff --git a/packages/pg/lib/diagnostics.js b/packages/pg/lib/diagnostics.js new file mode 100644 index 000000000..ca991b535 --- /dev/null +++ b/packages/pg/lib/diagnostics.js @@ -0,0 +1,23 @@ +'use strict' + +const noopChannel = { hasSubscribers: false } + +let queryChannel = noopChannel +let connectionChannel = noopChannel + +try { + let dc + if (typeof process.getBuiltInModule === 'function') { + dc = process.getBuiltInModule('diagnostics_channel') + } else { + dc = require('diagnostics_channel') + } + if (typeof dc.tracingChannel === 'function') { + queryChannel = dc.tracingChannel('pg:query') + connectionChannel = dc.tracingChannel('pg:connection') + } +} catch (e) { + // diagnostics_channel not available (non-Node environment) +} + +module.exports = { queryChannel, connectionChannel } diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js new file mode 100644 index 000000000..41bd264fd --- /dev/null +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -0,0 +1,197 @@ +'use strict' +const helper = require('./test-helper') +const assert = require('assert') +const dc = require('diagnostics_channel') + +const suite = new helper.Suite() +const test = suite.test.bind(suite) + +test('query diagnostics channel', function () { + test('publishes start and asyncEnd on successful query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + // asyncEnd fires after the callback, so check everything here + assert.equal(events.length, 2) + assert.equal(events[0].type, 'start') + assert.equal(events[0].context.query.text, 'SELECT 1') + assert.equal(events[0].context.client.database, client.database) + + assert.equal(events[1].type, 'asyncEnd') + assert.equal(events[1].context.result.command, 'SELECT') + assert.equal(events[1].context.result.rowCount, 1) + + channel.unsubscribe(subs) + done() + }, + error: (ctx) => events.push({ type: 'error', context: ctx }), + } + + channel.subscribe(subs) + + client.query('SELECT 1', (err, res) => { + assert.ifError(err) + }) + + // simulate query execution + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + test('publishes error on failed query', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => {}, + error: (ctx) => { + events.push({ type: 'error', context: ctx }) + + const startEvent = events.find((e) => e.type === 'start') + assert.ok(startEvent) + assert.equal(startEvent.context.query.text, 'BAD QUERY') + + channel.unsubscribe(subs) + done() + }, + } + + channel.subscribe(subs) + + client.query('BAD QUERY', (err) => { + assert.ok(err) + }) + + // simulate error + client.connection.emit('errorMessage', { + severity: 'ERROR', + message: 'syntax error', + }) + }) + + test('query context includes client info', function (done) { + const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) + client.connection.emit('readyForQuery') + + let capturedContext + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.equal(capturedContext.client.host, 'localhost') + assert.equal(capturedContext.client.user, 'testuser') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1', () => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) + + test('promise query publishes diagnostics', function (done) { + const client = helper.client() + client.connection.emit('readyForQuery') + + const events = [] + const channel = dc.tracingChannel('pg:query') + + const subs = { + start: (ctx) => events.push({ type: 'start', context: ctx }), + end: () => {}, + asyncStart: () => {}, + asyncEnd: (ctx) => { + events.push({ type: 'asyncEnd', context: ctx }) + + assert.ok(events.find((e) => e.type === 'start')) + assert.equal(events[0].context.query.text, 'SELECT 1') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + client.query('SELECT 1').then(() => {}) + + client.connection.emit('rowDescription', { fields: [{ name: 'col' }] }) + client.connection.emit('dataRow', { fields: ['value'] }) + client.connection.emit('commandComplete', { text: 'SELECT 1' }) + client.connection.emit('readyForQuery') + }) +}) + +test('connection diagnostics channel', function () { + test('publishes start on connect with callback', function (done) { + const Connection = require('../../../lib/connection') + const { Client } = helper + + let capturedContext + const channel = dc.tracingChannel('pg:connection') + + const subs = { + start: (ctx) => { + capturedContext = ctx + }, + end: () => {}, + asyncStart: () => {}, + asyncEnd: () => { + assert.ok(capturedContext) + assert.equal(capturedContext.connection.database, 'testdb') + assert.equal(capturedContext.connection.host, 'myhost') + + channel.unsubscribe(subs) + done() + }, + error: () => {}, + } + + channel.subscribe(subs) + + const connection = new Connection({ stream: 'no' }) + connection.startup = function () {} + connection.connect = function () {} + const client = new Client({ connection: connection, database: 'testdb', host: 'myhost', port: 5432 }) + + client.connect((err) => { + assert.ifError(err) + }) + + // simulate successful connection + connection.emit('connect') + connection.emit('readyForQuery') + }) +}) From 91ace72b01d1194d2f816e66e0c796c7c58b8e30 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 13:13:29 -0500 Subject: [PATCH 2/3] Skip TracingChannel tests on Node < 19.9 TracingChannel is not available on Node 18 LTS. Skip the tracing-dependent tests gracefully instead of failing. Co-Authored-By: Claude Opus 4.6 --- packages/pg-pool/test/diagnostics.js | 6 ++++-- .../pg/test/unit/client/diagnostics-tests.js | 18 +++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index ab0b344ac..1ce6cb6c0 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -7,6 +7,8 @@ const it = require('mocha').it const dc = require('diagnostics_channel') const Pool = require('../') +const hasTracingChannel = typeof dc.tracingChannel === 'function' + function mockClient(methods) { return function () { const client = new EventEmitter() @@ -23,7 +25,7 @@ function mockClient(methods) { describe('diagnostics channels', function () { describe('pg:pool:connect', function () { - it('publishes start event when connect is called', function (done) { + ;(hasTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { @@ -61,7 +63,7 @@ describe('diagnostics channels', function () { }) }) - it('enriches context with client info on asyncEnd', function (done) { + ;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({ connect: function (cb) { diff --git a/packages/pg/test/unit/client/diagnostics-tests.js b/packages/pg/test/unit/client/diagnostics-tests.js index 41bd264fd..46aed29f9 100644 --- a/packages/pg/test/unit/client/diagnostics-tests.js +++ b/packages/pg/test/unit/client/diagnostics-tests.js @@ -3,11 +3,15 @@ const helper = require('./test-helper') const assert = require('assert') const dc = require('diagnostics_channel') +const hasTracingChannel = typeof dc.tracingChannel === 'function' + const suite = new helper.Suite() const test = suite.test.bind(suite) +// pass undefined as callback to skip when TracingChannel is unavailable +const testTracing = (name, cb) => test(name, hasTracingChannel ? cb : undefined) -test('query diagnostics channel', function () { - test('publishes start and asyncEnd on successful query', function (done) { +testTracing('query diagnostics channel', function () { + testTracing('publishes start and asyncEnd on successful query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -50,7 +54,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('publishes error on failed query', function (done) { + testTracing('publishes error on failed query', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -87,7 +91,7 @@ test('query diagnostics channel', function () { }) }) - test('query context includes client info', function (done) { + testTracing('query context includes client info', function (done) { const client = helper.client({ database: 'testdb', host: 'localhost', port: 5432, user: 'testuser' }) client.connection.emit('readyForQuery') @@ -120,7 +124,7 @@ test('query diagnostics channel', function () { client.connection.emit('readyForQuery') }) - test('promise query publishes diagnostics', function (done) { + testTracing('promise query publishes diagnostics', function (done) { const client = helper.client() client.connection.emit('readyForQuery') @@ -154,8 +158,8 @@ test('query diagnostics channel', function () { }) }) -test('connection diagnostics channel', function () { - test('publishes start on connect with callback', function (done) { +testTracing('connection diagnostics channel', function () { + testTracing('publishes start on connect with callback', function (done) { const Connection = require('../../../lib/connection') const { Client } = helper From 5b50561d177235048c540f4c9325e93a2a4f37f4 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Thu, 5 Mar 2026 14:04:40 -0500 Subject: [PATCH 3/3] fix: format --- packages/pg-pool/test/diagnostics.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/pg-pool/test/diagnostics.js b/packages/pg-pool/test/diagnostics.js index 1ce6cb6c0..357bb7b69 100644 --- a/packages/pg-pool/test/diagnostics.js +++ b/packages/pg-pool/test/diagnostics.js @@ -62,7 +62,6 @@ describe('diagnostics channels', function () { }) }) }) - ;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) { const pool = new Pool({ Client: mockClient({