diff --git a/docs/pages/apis/client.mdx b/docs/pages/apis/client.mdx index f68542672..ad8aadf4d 100644 --- a/docs/pages/apis/client.mdx +++ b/docs/pages/apis/client.mdx @@ -173,6 +173,63 @@ await client.end() console.log('client has disconnected') ``` +## client.getTransactionStatus + +`client.getTransactionStatus() => string | null` + +Returns the current transaction status of the client connection. This can be useful for debugging transaction state issues or implementing custom transaction management logic. + +**Return values:** + +- `'I'` - Idle (not in a transaction) +- `'T'` - Transaction active (BEGIN has been issued) +- `'E'` - Error (transaction aborted, requires ROLLBACK) +- `null` - Initial state or not supported (native client) + +The transaction status is updated after each query completes based on the PostgreSQL backend's `ReadyForQuery` message. + +**Example: Checking transaction state** + +```js +import { Client } from 'pg' +const client = new Client() +await client.connect() + +await client.query('BEGIN') +console.log(client.getTransactionStatus()) // 'T' - in transaction + +await client.query('SELECT * FROM users') +console.log(client.getTransactionStatus()) // 'T' - still in transaction + +await client.query('COMMIT') +console.log(client.getTransactionStatus()) // 'I' - idle + +await client.end() +``` + +**Example: Handling transaction errors** + +```js +import { Client } from 'pg' +const client = new Client() +await client.connect() + +await client.query('BEGIN') +try { + await client.query('INVALID SQL') +} catch (err) { + console.log(client.getTransactionStatus()) // 'E' - error state + + // Must rollback to recover + await client.query('ROLLBACK') + console.log(client.getTransactionStatus()) // 'I' - idle again +} + +await client.end() +``` + +**Note:** This method is not supported in the native client and will always return `null`. + ## events ### error diff --git a/docs/pages/apis/pool.mdx b/docs/pages/apis/pool.mdx index fbe0279e1..370bc708c 100644 --- a/docs/pages/apis/pool.mdx +++ b/docs/pages/apis/pool.mdx @@ -56,6 +56,11 @@ type Config = { // regardless of whether they are idle. It's useful to force rotation of connection pools through // middleware so that you can rotate the underlying servers. The default is disabled (value of zero) maxLifetimeSeconds?: number + + // When true, automatically removes connections with open or failed transactions from the pool + // when they are released. This provides protection against "connection poisoning" where + // uncommitted transactions leak across different requests. Default is false. + evictOnOpenTransaction?: boolean } ``` @@ -70,7 +75,8 @@ const pool = new Pool({ max: 20, idleTimeoutMillis: 30000, connectionTimeoutMillis: 2000, - maxLifetimeSeconds: 60 + maxLifetimeSeconds: 60, + evictOnOpenTransaction: true }) ``` diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 8c83406bb..1c18241db 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -6,6 +6,10 @@ const types = require('pg-types') const buildResult = require('./lib/build-result') const CopyStream = require('./lib/copy-stream') +// https://www.postgresql.org/docs/current/libpq-status.html#LIBPQ-PQTRANSACTIONSTATUS +// 0=IDLE, 1=ACTIVE, 2=INTRANS, 3=INERROR +const statusMap = { 0: 'I', 2: 'T', 3: 'E' } + const Client = (module.exports = function (config) { if (!(this instanceof Client)) { return new Client(config) @@ -145,6 +149,10 @@ Client.prototype.escapeIdentifier = function (value) { return this.pq.escapeIdentifier(value) } +Client.prototype.getTransactionStatus = function () { + return statusMap[this.pq.transactionStatus()] ?? null +} + // export the version number so we can check it in node-postgres module.exports.version = require('./package.json').version diff --git a/packages/pg-native/package.json b/packages/pg-native/package.json index 92bf5cac2..da26c3d98 100644 --- a/packages/pg-native/package.json +++ b/packages/pg-native/package.json @@ -34,7 +34,7 @@ }, "homepage": "https://github.com/brianc/node-postgres/tree/master/packages/pg-native", "dependencies": { - "libpq": "^1.8.15", + "libpq": "^1.10.0", "pg-types": "2.2.0" }, "devDependencies": { diff --git a/packages/pg-pool/index.js b/packages/pg-pool/index.js index f53a85ab1..e33090dfd 100644 --- a/packages/pg-pool/index.js +++ b/packages/pg-pool/index.js @@ -91,6 +91,7 @@ class Pool extends EventEmitter { this.options.maxUses = this.options.maxUses || Infinity this.options.allowExitOnIdle = this.options.allowExitOnIdle || false this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0 + this.options.evictOnOpenTransaction = this.options.evictOnOpenTransaction || false this.log = this.options.log || function () {} this.Client = this.options.Client || Client || require('pg').Client this.Promise = this.options.Promise || global.Promise @@ -116,6 +117,10 @@ class Pool extends EventEmitter { return this._clients.length > this.options.min } + _hasActiveTransaction(client) { + return client && (client.getTransactionStatus() === 'T' || client.getTransactionStatus() === 'E') + } + _pulseQueue() { this.log('pulse queue') if (this.ended) { @@ -363,7 +368,11 @@ class Pool extends EventEmitter { if (client._poolUseCount >= this.options.maxUses) { this.log('remove expended client') } + return this._remove(client, this._pulseQueue.bind(this)) + } + if (this.options.evictOnOpenTransaction && this._hasActiveTransaction(client)) { + this.log('remove client due to open transaction') return this._remove(client, this._pulseQueue.bind(this)) } diff --git a/packages/pg-pool/test/leaked-pool.js b/packages/pg-pool/test/leaked-pool.js new file mode 100644 index 000000000..c10f4e9f1 --- /dev/null +++ b/packages/pg-pool/test/leaked-pool.js @@ -0,0 +1,332 @@ +'use strict' + +const expect = require('expect.js') +const describe = require('mocha').describe +const it = require('mocha').it +const Pool = require('..') + +describe('leaked connection pool', function () { + describe('when evictOnOpenTransaction is true', function () { + it('removes a client with an open transaction on release', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: true, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') + + // pool recovers by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client in a failed transaction state on release', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: true }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error to avoid pool close the connection + } + // The ReadyForQuery message with status 'E' may arrive on a separate I/O event. + // Issue a follow-up query to ensure it has been processed — this will also fail + // (since the transaction is aborted) but guarantees transaction status is updated. + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // pool recovers by creating a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('only removes connections with open transactions, keeps idle ones', async function () { + const pool = new Pool({ max: 3, evictOnOpenTransaction: true }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA.getTransactionStatus()).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB.getTransactionStatus()).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC.getTransactionStatus()).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // A was removed, B and C kept + expect(pool.totalCount).to.be(2) + expect(pool.idleCount).to.be(2) + await pool.end() + }) + + describe('pool.query', function () { + it('removes a client after pool.query leaks transaction via BEGIN', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg), evictOnOpenTransaction: true }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + expect(logMessages).to.contain('remove client due to open transaction') + + // Verify pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + + it('removes a client after pool.query in failed transaction state', async function () { + const pool = new Pool({ max: 1 }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error + } + + // Client with txStatus='E' should be removed + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) + }) + + describe('when evictOnOpenTransaction is false or default', function () { + it('keeps client with open transaction when explicitly false', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + evictOnOpenTransaction: false, + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client with open transaction when option not specified (default)', async function () { + const logMessages = [] + const pool = new Pool({ + max: 1, + log: (msg) => logMessages.push(msg), + }) + const client = await pool.connect() + await client.query('BEGIN') + expect(client.getTransactionStatus()).to.be('T') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool can still execute queries (connection was reused) + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('keeps client in failed transaction state when explicitly false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure transaction status is updated to 'E' + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps client in failed transaction state when option not specified (default)', async function () { + const pool = new Pool({ max: 1 }) + const client = await pool.connect() + await client.query('BEGIN') + try { + await client.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // swallow the error + } + // Issue a follow-up query to ensure transaction status is updated to 'E' + try { + await client.query('SELECT 1') + } catch (e) { + // expected — "current transaction is aborted" + } + expect(client.getTransactionStatus()).to.be('E') + + client.release() + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + + // Get a new client and manually ROLLBACK the failed transaction + const client2 = await pool.connect() + await client2.query('ROLLBACK') + const { rows } = await client2.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + client2.release() + + await pool.end() + }) + + it('keeps all clients with mixed transaction states', async function () { + const logMessages = [] + const pool = new Pool({ + max: 3, + evictOnOpenTransaction: false, + log: (msg) => logMessages.push(msg), + }) + const clientA = await pool.connect() + const clientB = await pool.connect() + const clientC = await pool.connect() + + // Client A: open transaction (leaked) + await clientA.query('BEGIN') + expect(clientA.getTransactionStatus()).to.be('T') + + // Client B: normal query (idle) + await clientB.query('SELECT 1') + expect(clientB.getTransactionStatus()).to.be('I') + + // Client C: committed transaction (idle) + await clientC.query('BEGIN') + await clientC.query('COMMIT') + expect(clientC.getTransactionStatus()).to.be('I') + + clientA.release() + clientB.release() + clientC.release() + + // All clients kept in pool + expect(pool.totalCount).to.be(3) + expect(pool.idleCount).to.be(3) + expect(logMessages).to.not.contain('remove client due to open transaction') + + await pool.end() + }) + + describe('pool.query', function () { + it('keeps client after pool.query leaks transaction via BEGIN (default)', async function () { + const logMessages = [] + const pool = new Pool({ max: 1, log: (msg) => logMessages.push(msg) }) + + await pool.query('BEGIN') + + // Client auto-released with txStatus='T', should be kept + expect(pool.totalCount).to.be(1) // NOT removed + expect(pool.idleCount).to.be(1) + expect(logMessages).to.not.contain('remove client due to open transaction') + + // Verify pool still works + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + + await pool.end() + }) + + it('removes client on pool.query error even when evictOnOpenTransaction is false', async function () { + const pool = new Pool({ max: 1, evictOnOpenTransaction: false }) + + await pool.query('BEGIN') + + try { + await pool.query('SELECT invalid_column FROM nonexistent_table') + } catch (e) { + // Expected error - pool.query calls client.release(err) which removes the client + } + + // Client is removed because pool.query releases with error argument + // This is independent of evictOnOpenTransaction setting + expect(pool.totalCount).to.be(0) + expect(pool.idleCount).to.be(0) + + // Pool recovers with a fresh connection + const { rows } = await pool.query('SELECT 1 as num') + expect(rows[0].num).to.be(1) + expect(pool.totalCount).to.be(1) + expect(pool.idleCount).to.be(1) + + await pool.end() + }) + }) + }) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 459439037..865109edb 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -68,6 +68,7 @@ class Client extends EventEmitter { this._connectionError = false this._queryable = true this._activeQuery = null + this._txStatus = null this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered this.connection = @@ -356,6 +357,7 @@ class Client extends EventEmitter { } const activeQuery = this._getActiveQuery() this._activeQuery = null + this._txStatus = msg?.status ?? null this.readyForQuery = true if (activeQuery) { activeQuery.handleReadyForQuery(this.connection) @@ -693,6 +695,10 @@ class Client extends EventEmitter { this.connection.unref() } + getTransactionStatus() { + return this._txStatus + } + end(cb) { this._ending = true diff --git a/packages/pg/lib/native/client.js b/packages/pg/lib/native/client.js index 44d5b5c64..fbab40b60 100644 --- a/packages/pg/lib/native/client.js +++ b/packages/pg/lib/native/client.js @@ -313,3 +313,7 @@ Client.prototype.getTypeParser = function (oid, format) { Client.prototype.isConnected = function () { return this._connected } + +Client.prototype.getTransactionStatus = function () { + return this.native.getTransactionStatus() +} diff --git a/packages/pg/test/integration/client/txstatus-tests.js b/packages/pg/test/integration/client/txstatus-tests.js new file mode 100644 index 000000000..cb8b740f8 --- /dev/null +++ b/packages/pg/test/integration/client/txstatus-tests.js @@ -0,0 +1,82 @@ +'use strict' +const helper = require('./test-helper') +const suite = new helper.Suite() +const pg = helper.pg +const assert = require('assert') + +suite.test('txStatus tracking', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( + 'SELECT 1', + assert.success(function () { + // Test 1: Initial state after query (should be idle) + assert.equal(client.getTransactionStatus(), 'I', 'should start in idle state') + + // Test 2: BEGIN transaction + client.query( + 'BEGIN', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'T', 'should be in transaction state') + + // Test 3: COMMIT + client.query( + 'COMMIT', + assert.success(function () { + assert.equal(client.getTransactionStatus(), 'I', 'should return to idle after commit') + + client.end(done) + }) + ) + }) + ) + }) + ) + }) + ) +}) + +suite.test('txStatus error state', function (done) { + const client = new pg.Client() + client.connect( + assert.success(function () { + // Run a simple query to initialize txStatus + client.query( + 'SELECT 1', + assert.success(function () { + client.query( + 'BEGIN', + assert.success(function () { + // Execute invalid SQL to trigger error state + client.query('INVALID SQL SYNTAX', function (err) { + assert(err, 'should receive error from invalid query') + + // Issue a sync query to ensure ReadyForQuery has been processed + // This guarantees transaction status has been updated + client.query('SELECT 1', function () { + // This callback fires after ReadyForQuery is processed + assert.equal(client.getTransactionStatus(), 'E', 'should be in error state') + + // Rollback to recover + client.query( + 'ROLLBACK', + assert.success(function () { + assert.equal( + client.getTransactionStatus(), + 'I', + 'should return to idle after rollback from error' + ) + client.end(done) + }) + ) + }) + }) + }) + ) + }) + ) + }) + ) +}) diff --git a/yarn.lock b/yarn.lock index 8628e175d..447beedf3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5968,13 +5968,13 @@ levn@~0.3.0: prelude-ls "~1.1.2" type-check "~0.3.2" -libpq@^1.8.15: - version "1.8.15" - resolved "https://registry.yarnpkg.com/libpq/-/libpq-1.8.15.tgz#bf9cea8e59e1a4a911d06df01d408213a09925ad" - integrity sha512-4lSWmly2Nsj3LaTxxtFmJWuP3Kx+0hYHEd+aNrcXEWT0nKWaPd9/QZPiMkkC680zeALFGHQdQWjBvnilL+vgWA== +libpq@^1.10.0: + version "1.10.0" + resolved "https://registry.yarnpkg.com/libpq/-/libpq-1.10.0.tgz#238d01d416abca8768aab09bc82d81af9c7ffa23" + integrity sha512-PHY+JGD3+9X5b2emXLh+WJEnz1jhczO1xs25ZH0xbMWvQi+Hd9X/mTZOrGA99Rcw/DvNjsBRlegroqigpNfaJA== dependencies: bindings "1.5.0" - nan "~2.22.2" + nan "~2.23.1" lines-and-columns@^1.1.6: version "1.1.6" @@ -6700,10 +6700,10 @@ mz@^2.5.0: object-assign "^4.0.1" thenify-all "^1.0.0" -nan@~2.22.2: - version "2.22.2" - resolved "https://registry.yarnpkg.com/nan/-/nan-2.22.2.tgz#6b504fd029fb8f38c0990e52ad5c26772fdacfbb" - integrity sha512-DANghxFkS1plDdRsX0X9pm0Z6SJNN6gBdtXfanwoZ8hooC5gosGFSBGRYHUVPz1asKA/kMRqDRdHrluZ61SpBQ== +nan@~2.23.1: + version "2.23.1" + resolved "https://registry.yarnpkg.com/nan/-/nan-2.23.1.tgz#6f86a31dd87e3d1eb77512bf4b9e14c8aded3975" + integrity sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw== nanoid@^3.3.11: version "3.3.11"