Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions packages/pg-pool/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict'

const noopChannel = { hasSubscribers: false }

let poolConnectChannel = noopChannel
let poolReleaseChannel = noopChannel
let poolRemoveChannel = noopChannel

try {
let dc
if (typeof process.getBuiltInModule === 'function') {
dc = process.getBuiltInModule('diagnostics_channel')
} else {
dc = require('diagnostics_channel')
}
if (typeof dc.tracingChannel === 'function') {
poolConnectChannel = dc.tracingChannel('pg:pool:connect')
}
if (typeof dc.channel === 'function') {
poolReleaseChannel = dc.channel('pg:pool:release')
poolRemoveChannel = dc.channel('pg:pool:remove')
}
} catch (e) {
// diagnostics_channel not available (non-Node environment)
}

Comment on lines +5 to +26
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diagnostics bootstrap logic here is very similar to packages/pg/lib/diagnostics.js (built-in-module fallback, feature detection, noop channels). To reduce duplication and avoid the two implementations drifting over time, consider extracting a small shared helper (even internal) for: loading diagnostics_channel safely, creating tracing channels, and creating plain channels.

Suggested change
let poolConnectChannel = noopChannel
let poolReleaseChannel = noopChannel
let poolRemoveChannel = noopChannel
try {
let dc
if (typeof process.getBuiltInModule === 'function') {
dc = process.getBuiltInModule('diagnostics_channel')
} else {
dc = require('diagnostics_channel')
}
if (typeof dc.tracingChannel === 'function') {
poolConnectChannel = dc.tracingChannel('pg:pool:connect')
}
if (typeof dc.channel === 'function') {
poolReleaseChannel = dc.channel('pg:pool:release')
poolRemoveChannel = dc.channel('pg:pool:remove')
}
} catch (e) {
// diagnostics_channel not available (non-Node environment)
}
function loadDiagnosticsChannel() {
try {
if (typeof process.getBuiltInModule === 'function') {
return process.getBuiltInModule('diagnostics_channel')
}
return require('diagnostics_channel')
} catch (e) {
// diagnostics_channel not available (non-Node environment)
return null
}
}
function createTracingChannel(dc, name) {
if (dc && typeof dc.tracingChannel === 'function') {
return dc.tracingChannel(name)
}
return noopChannel
}
function createChannel(dc, name) {
if (dc && typeof dc.channel === 'function') {
return dc.channel(name)
}
return noopChannel
}
const diagnosticsChannel = loadDiagnosticsChannel()
const poolConnectChannel = createTracingChannel(diagnosticsChannel, 'pg:pool:connect')
const poolReleaseChannel = createChannel(diagnosticsChannel, 'pg:pool:release')
const poolRemoveChannel = createChannel(diagnosticsChannel, 'pg:pool:remove')

Copilot uses AI. Check for mistakes.
module.exports = { poolConnectChannel, poolReleaseChannel, poolRemoveChannel }
34 changes: 34 additions & 0 deletions packages/pg-pool/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const { poolConnectChannel, poolReleaseChannel, poolRemoveChannel } = require('./diagnostics')

const NOOP = function () {}

Expand Down Expand Up @@ -178,6 +179,10 @@ class Pool extends EventEmitter {

this._clients = this._clients.filter((c) => c !== client)
const context = this
if (poolRemoveChannel.hasSubscribers) {
poolRemoveChannel.publish({ client: { processID: client.processID } })
}

client.end(() => {
context.emit('remove', client)

Expand All @@ -196,6 +201,31 @@ class Pool extends EventEmitter {
const response = promisify(this.Promise, cb)
const result = response.result

if (poolConnectChannel.hasSubscribers) {
const context = {
pool: {
totalCount: this.totalCount,
idleCount: this.idleCount,
waitingCount: this.waitingCount,
maxSize: this.options.max,
},
}
const origCb = response.callback
const enrichedCb = (err, client, done) => {
if (client) context.client = { processID: client.processID, reused: !!client._poolUseCount }
return origCb(err, client, done)
}
poolConnectChannel.traceCallback(
(tracedCb) => {
response.callback = tracedCb
},
0,
context,
null,
enrichedCb
)
}

// if we don't have to connect a new client, don't do so
if (this._isFull() || this._idle.length) {
// if we have idle clients schedule a pulse immediately
Expand Down Expand Up @@ -388,6 +418,10 @@ class Pool extends EventEmitter {

this.emit('release', err, client)

if (poolReleaseChannel.hasSubscribers) {
poolReleaseChannel.publish({ client: { processID: client.processID }, error: err || undefined })
}

// TODO(bmc): expose a proper, public interface _queryable and _ending
if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
if (client._poolUseCount >= this.options.maxUses) {
Expand Down
193 changes: 193 additions & 0 deletions packages/pg-pool/test/diagnostics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
'use strict'

const expect = require('expect.js')
const EventEmitter = require('events').EventEmitter
const describe = require('mocha').describe
const it = require('mocha').it
const dc = require('diagnostics_channel')
const Pool = require('../')

const hasTracingChannel = typeof dc.tracingChannel === 'function'

function mockClient(methods) {
return function () {
const client = new EventEmitter()
client.end = function (cb) {
if (cb) process.nextTick(cb)
}
client._queryable = true
client._ending = false
client.processID = 12345
Object.assign(client, methods)
return client
}
}

describe('diagnostics channels', function () {
describe('pg:pool:connect', function () {
;(hasTracingChannel ? it : it.skip)('publishes start event when connect is called', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let capturedContext
const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: (ctx) => {
capturedContext = ctx
},
end: () => {},
asyncStart: () => {},
asyncEnd: () => {},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(capturedContext).to.be.ok()
expect(capturedContext.pool).to.be.ok()
expect(capturedContext.pool.maxSize).to.be(10)
expect(capturedContext.pool.totalCount).to.be.a('number')

channel.unsubscribe(subs)
done()
})
})
})
;(hasTracingChannel ? it : it.skip)('enriches context with client info on asyncEnd', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

const channel = dc.tracingChannel('pg:pool:connect')
const subs = {
start: () => {},
end: () => {},
asyncStart: () => {},
asyncEnd: (ctx) => {
expect(ctx.client).to.be.ok()
expect(ctx.client.processID).to.be(12345)

channel.unsubscribe(subs)
done()
},
error: () => {},
}

channel.subscribe(subs)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end()
})
})
})

describe('pg:pool:release', function () {
it('publishes when a client is released', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
release()
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.client).to.be.ok()
expect(releaseMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})

it('includes error when released with error', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let releaseMessage
const channel = dc.channel('pg:pool:release')
const onMessage = (msg) => {
releaseMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
const releaseError = new Error('test error')
release(releaseError)
pool.end(() => {
expect(releaseMessage).to.be.ok()
expect(releaseMessage.error).to.be(releaseError)

channel.unsubscribe(onMessage)
done()
})
})
})
})

describe('pg:pool:remove', function () {
it('publishes when a client is removed', function (done) {
const pool = new Pool({
Client: mockClient({
connect: function (cb) {
process.nextTick(() => cb(null))
},
}),
})

let removeMessage
const channel = dc.channel('pg:pool:remove')
const onMessage = (msg) => {
removeMessage = msg
}
channel.subscribe(onMessage)

pool.connect(function (err, client, release) {
if (err) return done(err)
// release with error to trigger removal
release(new Error('force remove'))
pool.end(() => {
expect(removeMessage).to.be.ok()
expect(removeMessage.client).to.be.ok()
expect(removeMessage.client.processID).to.be(12345)

channel.unsubscribe(onMessage)
done()
})
})
})
})
})
72 changes: 59 additions & 13 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Query = require('./query')
const defaults = require('./defaults')
const Connection = require('./connection')
const crypto = require('./crypto/utils')
const { queryChannel, connectionChannel } = require('./diagnostics')

const activeQueryDeprecationNotice = nodeUtils.deprecate(
() => {},
Expand Down Expand Up @@ -207,19 +208,33 @@ class Client extends EventEmitter {

connect(callback) {
if (callback) {
this._connect(callback)
if (connectionChannel.hasSubscribers) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
connectionChannel.traceCallback((tracedCb) => this._connect(tracedCb), 0, context, null, callback)
} else {
this._connect(callback)
}
return
}

return new this._Promise((resolve, reject) => {
this._connect((error) => {
if (error) {
reject(error)
} else {
resolve(this)
}
const connectPromise = () =>
new this._Promise((resolve, reject) => {
this._connect((error) => {
if (error) reject(error)
else resolve(this)
})
})
})

if (connectionChannel.hasSubscribers) {
const context = {
connection: { database: this.database, host: this.host, port: this.port, user: this.user, ssl: !!this.ssl },
}
return connectionChannel.tracePromise(connectPromise, context)
}

return connectPromise()
}

_attachListeners(con) {
Expand Down Expand Up @@ -687,11 +702,42 @@ class Client extends EventEmitter {
return result
}

if (this._queryQueue.length > 0) {
queryQueueLengthDeprecationNotice()
const enqueue = () => {
if (this._queryQueue.length > 0) queryQueueLengthDeprecationNotice()
this._queryQueue.push(query)
this._pulseQueryQueue()
}

if (queryChannel.hasSubscribers) {
const context = {
query: { text: query.text, name: query.name, rowMode: query._rowMode },
client: {
database: this.database,
host: this.host,
port: this.port,
user: this.user,
processID: this.processID,
ssl: !!this.ssl,
},
}
const origCb = query.callback
const enrichedCb = (err, res) => {
if (res) context.result = { rowCount: res.rowCount, command: res.command }
return origCb(err, res)
}
queryChannel.traceCallback(
(tracedCb) => {
query.callback = tracedCb
enqueue()
},
0,
context,
null,
enrichedCb
)
} else {
enqueue()
}
this._queryQueue.push(query)
this._pulseQueryQueue()
return result
}

Expand Down
Loading
Loading