diff --git a/.gitignore b/.gitignore index 8e242c10d..a9285b01e 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ dist /.eslintcache .vscode/ manually-test-on-heroku.js +.history diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 9200dded6..d7ad094c7 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -81,6 +81,7 @@ class Client extends EventEmitter { keepAlive: c.keepAlive || false, keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, encoding: this.connectionParameters.client_encoding || 'utf8', + targetSessionAttrs: c.targetSessionAttrs || null, }) this._queryQueue = [] this.binary = c.binary || defaults.binary @@ -155,7 +156,7 @@ class Client extends EventEmitter { } } - if (this.host && this.host.indexOf('/') === 0) { + if (!Array.isArray(this.host) && this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) } else { con.connect(this.port, this.host) @@ -542,7 +543,7 @@ class Client extends EventEmitter { if (client.activeQuery === query) { const con = this.connection - if (this.host && this.host.indexOf('/') === 0) { + if (!Array.isArray(this.host) && this.host && this.host.indexOf('/') === 0) { con.connect(this.host + '/.s.PGSQL.' + this.port) } else { con.connect(this.port, this.host) diff --git a/packages/pg/lib/connection-parameters.js b/packages/pg/lib/connection-parameters.js index c153932bb..233233fa6 100644 --- a/packages/pg/lib/connection-parameters.js +++ b/packages/pg/lib/connection-parameters.js @@ -67,7 +67,8 @@ class ConnectionParameters { this.database = this.user } - this.port = parseInt(val('port', config), 10) + const rawPort = val('port', config) + this.port = Array.isArray(rawPort) ? rawPort.map((p) => parseInt(p, 10)) : parseInt(rawPort, 10) this.host = val('host', config) // "hiding" the password so it doesn't show up in stack traces diff --git a/packages/pg/lib/connection.js b/packages/pg/lib/connection.js index 027f93935..74331ba5e 100644 --- a/packages/pg/lib/connection.js +++ b/packages/pg/lib/connection.js @@ -15,9 +15,14 @@ class Connection extends EventEmitter { super() config = config || {} - this.stream = config.stream || getStream(config.ssl) - if (typeof this.stream === 'function') { - this.stream = this.stream(config) + if (typeof config.stream === 'function') { + this._streamFactory = config.stream + this._config = config + this.stream = config.stream(config) + } else { + this._streamFactory = null + this._config = null + this.stream = config.stream || getStream(config.ssl) } this._keepAlive = config.keepAlive @@ -26,6 +31,7 @@ class Connection extends EventEmitter { this.ssl = config.ssl || false this._ending = false this._emitMessage = false + this._targetSessionAttrs = config.targetSessionAttrs || null const self = this this.on('newListener', function (eventName) { if (eventName === 'message') { @@ -34,76 +40,200 @@ class Connection extends EventEmitter { }) } + _newStream() { + return this._streamFactory ? this._streamFactory(this._config) : getStream(this.ssl) + } + connect(port, host) { const self = this + const hosts = Array.isArray(host) ? host : [host] + const ports = Array.isArray(port) ? port : [port] + let hostIndex = 0 + this._connecting = true - this.stream.setNoDelay(true) - this.stream.connect(port, host) - this.stream.once('connect', function () { - if (self._keepAlive) { - self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) + const targetAttrs = this._targetSessionAttrs + + if (targetAttrs && targetAttrs !== 'any') { + let backendParams = {} + let fetchingState = false + let fetchStateRows = [] + let fetchStateError = false + + const origEmit = EventEmitter.prototype.emit.bind(self) + + const tryNextOrFail = () => { + backendParams = {} + fetchingState = false + fetchStateRows = [] + fetchStateError = false + if (hostIndex + 1 < hosts.length) { + hostIndex++ + + self.stream.removeAllListeners() + self.stream.destroy() + self.stream = self._newStream() + attemptConnect() + } else { + self.emit = origEmit + origEmit('error', new Error(`None of the hosts satisfy the target_session_attrs requirement: ${targetAttrs}`)) + } } - self.emit('connect') - }) - const reportStreamError = function (error) { - // errors about disconnections should be ignored during disconnect - if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { - return + self.emit = function (eventName, ...args) { + if (eventName === 'parameterStatus') { + const msg = args[0] + if (msg) backendParams[msg.parameterName] = msg.parameterValue + return origEmit(eventName, ...args) + } + + if (fetchingState) { + if (eventName === 'dataRow') { + fetchStateRows.push(args[0]) + return + } + if (eventName === 'rowDescription' || eventName === 'commandComplete') { + return + } + if (eventName === 'errorMessage') { + fetchStateError = true + return + } + if (eventName === 'readyForQuery') { + fetchingState = false + if (!fetchStateError && fetchStateRows.length >= 2) { + const txReadOnly = fetchStateRows[0].fields[0]?.toString('utf8') ?? null + const isRecovery = fetchStateRows[1].fields[0]?.toString('utf8') ?? null + if (txReadOnly !== null) backendParams.default_transaction_read_only = txReadOnly + if (isRecovery !== null) backendParams.in_hot_standby = isRecovery === 't' ? 'on' : 'off' + } + fetchStateRows = [] + fetchStateError = false + if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) { + tryNextOrFail() + } else { + self.emit = origEmit + origEmit('readyForQuery', args[0]) + } + return + } + } + + if (eventName === 'readyForQuery') { + if (!backendParams.in_hot_standby || !backendParams.default_transaction_read_only) { + fetchingState = true + fetchStateRows = [] + self.query('SHOW transaction_read_only; SELECT pg_catalog.pg_is_in_recovery()') + return + } + if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) { + tryNextOrFail() + return + } + self.emit = origEmit + return origEmit('readyForQuery', args[0]) + } + + return origEmit(eventName, ...args) } - self.emit('error', error) } - this.stream.on('error', reportStreamError) - this.stream.on('close', function () { - self.emit('end') - }) + const attemptConnect = () => { + const currentHost = hosts[hostIndex] + const currentPort = ports[Math.min(hostIndex, ports.length - 1)] + let connected = false - if (!this.ssl) { - return this.attachListeners(this.stream) - } + self.stream.setNoDelay(true) + self.stream.connect(currentPort, currentHost) - this.stream.once('data', function (buffer) { - const responseCode = buffer.toString('utf8') - switch (responseCode) { - case 'S': // Server supports SSL connections, continue with a secure connection - break - case 'N': // Server does not support SSL connections - self.stream.end() - return self.emit('error', new Error('The server does not support SSL connections')) - default: - // Any other response byte, including 'E' (ErrorResponse) indicating a server error - self.stream.end() - return self.emit('error', new Error('There was an error establishing an SSL connection')) - } - const options = { - socket: self.stream, - } + self.stream.once('connect', function () { + connected = true + if (self._keepAlive) { + self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis) + } + self.emit('connect') + }) - if (self.ssl !== true) { - Object.assign(options, self.ssl) + const reportStreamError = function (error) { + // errors about disconnections should be ignored during disconnect + if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) { + return + } - if ('key' in self.ssl) { - options.key = self.ssl.key + if (!connected && hostIndex + 1 < hosts.length) { + hostIndex++ + self.stream.removeAllListeners() + self.stream.destroy() + self.stream = self._newStream() + attemptConnect() + return } + self.emit('error', error) } - const net = require('net') - if (net.isIP && net.isIP(host) === 0) { - options.servername = host + self.stream.on('error', reportStreamError) + + const onClose = function () { + self.emit('end') } - try { - self.stream = getSecureStream(options) - } catch (err) { - return self.emit('error', err) + self.stream.on('close', onClose) + + if (!self.ssl) { + return self.attachListeners(self.stream) } - self.attachListeners(self.stream) - self.stream.on('error', reportStreamError) - self.emit('sslconnect') - }) + self.stream.once('data', function (buffer) { + const responseCode = buffer.toString('utf8') + switch (responseCode) { + case 'S': // Server supports SSL connections, continue with a secure connection + break + case 'N': // Server does not support SSL connections + self.stream.end() + return self.emit('error', new Error('The server does not support SSL connections')) + default: + // Any other response byte, including 'E' (ErrorResponse) indicating a server error + self.stream.end() + return self.emit('error', new Error('There was an error establishing an SSL connection')) + } + const options = { + socket: self.stream, + } + + if (self.ssl !== true) { + Object.assign(options, self.ssl) + + if ('key' in self.ssl) { + options.key = self.ssl.key + } + } + + const net = require('net') + if (net.isIP && net.isIP(currentHost) === 0) { + options.servername = currentHost + } + + // Remove the close listener from the TCP socket before upgrading to TLS. + // Without this, destroying the TLS stream (during host failover) closes the + // underlying TCP socket, which fires 'close' → 'end' even though we are + // still mid-connection to the next host. + const tcpStream = self.stream + tcpStream.removeListener('close', onClose) + tcpStream.removeListener('error', reportStreamError) + try { + self.stream = getSecureStream(options) + } catch (err) { + return self.emit('error', err) + } + self.attachListeners(self.stream) + self.stream.on('error', reportStreamError) + self.stream.on('close', onClose) + + self.emit('sslconnect') + }) + } + + attemptConnect() } attachListeners(stream) { @@ -218,4 +348,21 @@ class Connection extends EventEmitter { } } +function notHostMatchTargetSessionAttrs(targetAttrs, params, hostIndex, hosts) { + switch (targetAttrs) { + case 'read-write': + return params.in_hot_standby === 'on' || params.default_transaction_read_only === 'on' + case 'read-only': + return params.in_hot_standby !== 'on' && params.default_transaction_read_only !== 'on' + case 'primary': + return params.in_hot_standby === 'on' + case 'standby': + return params.in_hot_standby === 'off' + case 'prefer-standby': + return params.in_hot_standby === 'off' && hostIndex + 1 < hosts.length + default: + return false + } +} + module.exports = Connection diff --git a/packages/pg/test/unit/client/multihost-tests.js b/packages/pg/test/unit/client/multihost-tests.js new file mode 100644 index 000000000..409879e33 --- /dev/null +++ b/packages/pg/test/unit/client/multihost-tests.js @@ -0,0 +1,87 @@ +'use strict' +const assert = require('assert') +const EventEmitter = require('events') +const helper = require('./test-helper') +const { Client } = helper + +const suite = new helper.Suite() + +// Minimal fake Connection that records connect() calls and exposes emit +function makeFakeConnection() { + const con = new EventEmitter() + con.connectCalls = [] + con.connect = function (port, host) { + con.connectCalls.push({ port, host }) + } + con.on = con.addListener.bind(con) + con.once = EventEmitter.prototype.once.bind(con) + con.removeAllListeners = EventEmitter.prototype.removeAllListeners.bind(con) + con._ending = false + con.requestSsl = function () {} + con.startup = function () {} + con.end = function () {} + return con +} + +// --- port array is threaded through to con.connect() --- + +suite.test('passes port array to connection.connect', function () { + const con = makeFakeConnection() + const client = new Client({ connection: con, host: 'localhost', port: [5432, 5433] }) + client._connect(function () {}) + assert.deepStrictEqual(client.port, [5432, 5433]) + assert.deepStrictEqual(con.connectCalls[0].port, [5432, 5433]) +}) + +// --- host array is threaded through to con.connect() --- + +suite.test('passes host array to connection.connect', function () { + const con = makeFakeConnection() + const client = new Client({ connection: con, host: ['h1', 'h2'], port: 5432 }) + client._connect(function () {}) + assert.deepStrictEqual(client.host, ['h1', 'h2']) + assert.deepStrictEqual(con.connectCalls[0].host, ['h1', 'h2']) +}) + +// --- both arrays together --- + +suite.test('passes host and port arrays together to connection.connect', function () { + const con = makeFakeConnection() + const client = new Client({ connection: con, host: ['h1', 'h2'], port: [5432, 5433] }) + client._connect(function () {}) + assert.deepStrictEqual(con.connectCalls[0], { port: [5432, 5433], host: ['h1', 'h2'] }) +}) + +// --- domain socket path is not broken by the array guard --- + +suite.test('domain socket path still works with single string host', function () { + const con = makeFakeConnection() + con.connect = function (path) { + con.connectCalls.push({ path }) + } + const client = new Client({ connection: con, host: '/tmp/', port: 5432 }) + client._connect(function () {}) + assert.ok(con.connectCalls[0].path.startsWith('/tmp/'), 'should use domain socket path') +}) + +// --- array host does NOT trigger domain socket path --- + +suite.test('array host with leading-slash element does not trigger domain socket', function () { + const con = makeFakeConnection() + const client = new Client({ connection: con, host: ['/tmp/', 'localhost'], port: 5432 }) + client._connect(function () {}) + // connect() must receive (port, host) signature, not a single socket path string + const call = con.connectCalls[0] + assert.ok('port' in call, 'should call connect(port, host) not connect(socketPath)') + assert.ok('host' in call, 'should call connect(port, host) not connect(socketPath)') +}) + +// --- single host / single port unchanged --- + +suite.test('single host and port are still passed as scalars', function () { + const con = makeFakeConnection() + const client = new Client({ connection: con, host: 'localhost', port: 5432 }) + client._connect(function () {}) + assert.strictEqual(con.connectCalls[0].port, 5432) + assert.strictEqual(con.connectCalls[0].host, 'localhost') +}) diff --git a/packages/pg/test/unit/connection-parameters/multihost-tests.js b/packages/pg/test/unit/connection-parameters/multihost-tests.js new file mode 100644 index 000000000..ab634b1c1 --- /dev/null +++ b/packages/pg/test/unit/connection-parameters/multihost-tests.js @@ -0,0 +1,70 @@ +'use strict' +const assert = require('assert') +const helper = require('../test-helper') +const ConnectionParameters = require('../../../lib/connection-parameters') + +// clear process.env so defaults don't interfere +for (const key in process.env) { + delete process.env[key] +} + +const suite = new helper.Suite() + +// --- port handling --- + +suite.test('single port as number is parsed to integer', function () { + const subject = new ConnectionParameters({ port: 5432 }) + assert.strictEqual(subject.port, 5432) +}) + +suite.test('single port as string is parsed to integer', function () { + const subject = new ConnectionParameters({ port: '5433' }) + assert.strictEqual(subject.port, 5433) +}) + +suite.test('port array of numbers is preserved as integer array', function () { + const subject = new ConnectionParameters({ port: [5432, 5433] }) + assert.deepStrictEqual(subject.port, [5432, 5433]) +}) + +suite.test('port array of strings is mapped to integers', function () { + const subject = new ConnectionParameters({ port: ['5432', '5433', '5434'] }) + assert.deepStrictEqual(subject.port, [5432, 5433, 5434]) +}) + +suite.test('port array with single element is preserved as array', function () { + const subject = new ConnectionParameters({ port: [5432] }) + assert.deepStrictEqual(subject.port, [5432]) +}) + +// --- host handling --- + +suite.test('single host string is preserved', function () { + const subject = new ConnectionParameters({ host: 'localhost' }) + assert.strictEqual(subject.host, 'localhost') +}) + +suite.test('host array is passed through unchanged', function () { + const subject = new ConnectionParameters({ host: ['host1', 'host2', 'host3'] }) + assert.deepStrictEqual(subject.host, ['host1', 'host2', 'host3']) +}) + +suite.test('host array with single element is preserved as array', function () { + const subject = new ConnectionParameters({ host: ['localhost'] }) + assert.deepStrictEqual(subject.host, ['localhost']) +}) + +// --- multihost + multiport together --- + +suite.test('host and port arrays are both passed through', function () { + const subject = new ConnectionParameters({ host: ['h1', 'h2'], port: [5432, 5433] }) + assert.deepStrictEqual(subject.host, ['h1', 'h2']) + assert.deepStrictEqual(subject.port, [5432, 5433]) +}) + +// --- isDomainSocket must stay false for array hosts --- + +suite.test('isDomainSocket is false when host is an array', function () { + const subject = new ConnectionParameters({ host: ['/tmp/', 'localhost'] }) + assert.strictEqual(subject.isDomainSocket, false) +}) diff --git a/packages/pg/test/unit/connection/multihost-tests.js b/packages/pg/test/unit/connection/multihost-tests.js new file mode 100644 index 000000000..b21071f00 --- /dev/null +++ b/packages/pg/test/unit/connection/multihost-tests.js @@ -0,0 +1,425 @@ +'use strict' +const helper = require('./test-helper') +const Connection = require('../../../lib/connection') +const assert = require('assert') + +const suite = new helper.Suite() +const { MemoryStream } = helper + +function makeStream() { + const stream = new MemoryStream() + stream.destroy = function () {} + return stream +} + +function simulateReadyForQuery(con, params) { + for (const [key, value] of Object.entries(params)) { + con.emit('parameterStatus', { parameterName: key, parameterValue: value }) + } + con.emit('readyForQuery', {}) +} + +// --- Basic multihost connectivity --- + +suite.test('connects to single host', function (done) { + const stream = makeStream() + let connectPort, connectHost + stream.connect = function (port, host) { + connectPort = port + connectHost = host + } + const con = new Connection({ stream: stream }) + con.once('connect', function () { + assert.equal(connectPort, 5432) + assert.equal(connectHost, 'localhost') + done() + }) + con.connect(5432, 'localhost') + stream.emit('connect') +}) + +suite.test('connects to first host when multiple are given', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const connectCalls = [] + streams.forEach((s) => { + s.connect = function (port, host) { + connectCalls.push({ port, host }) + } + }) + const con = new Connection({ stream: () => streams[streamIndex++] }) + con.once('connect', function () { + assert.equal(connectCalls.length, 1) + assert.equal(connectCalls[0].host, 'host1') + assert.equal(connectCalls[0].port, 5432) + done() + }) + con.connect([5432, 5433], ['host1', 'host2']) + streams[0].emit('connect') +}) + +suite.test('stream factory receives same config on failover streams', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const factoryArgs = [] + const config = { + ssl: false, + stream: function (opts) { + factoryArgs.push(opts) + return streams[streamIndex++] + }, + } + const con = new Connection(config) + con.once('connect', function () { + assert.equal(factoryArgs.length, 2) + assert.strictEqual(factoryArgs[0], config) + assert.strictEqual(factoryArgs[1], config) + done() + }) + con.connect([5432, 5433], ['host1', 'host2']) + const err = new Error('Connection refused') + err.code = 'ECONNREFUSED' + streams[0].emit('error', err) + streams[1].emit('connect') +}) + +suite.test('falls back to second host on connection error', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const connectCalls = [] + streams.forEach((s) => { + s.connect = function (port, host) { + connectCalls.push({ port, host }) + } + }) + const con = new Connection({ stream: () => streams[streamIndex++] }) + con.once('connect', function () { + assert.equal(connectCalls.length, 2) + assert.equal(connectCalls[0].host, 'host1') + assert.equal(connectCalls[1].host, 'host2') + done() + }) + con.connect([5432, 5433], ['host1', 'host2']) + const err = new Error('Connection refused') + err.code = 'ECONNREFUSED' + streams[0].emit('error', err) + streams[1].emit('connect') +}) + +suite.test('uses matching port for each host by index', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const connectCalls = [] + streams.forEach((s) => { + s.connect = function (port, host) { + connectCalls.push({ port, host }) + } + }) + const con = new Connection({ stream: () => streams[streamIndex++] }) + con.once('connect', function () { + assert.equal(connectCalls[0].port, 5432) + assert.equal(connectCalls[1].port, 5433) + done() + }) + con.connect([5432, 5433], ['host1', 'host2']) + const err = new Error('Connection refused') + err.code = 'ECONNREFUSED' + streams[0].emit('error', err) + streams[1].emit('connect') +}) + +suite.test('reuses single port for all hosts when port is not an array', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const connectPorts = [] + streams.forEach((s) => { + s.connect = function (port) { + connectPorts.push(port) + } + }) + const con = new Connection({ stream: () => streams[streamIndex++] }) + con.once('connect', function () { + assert.equal(connectPorts[0], 5432) + assert.equal(connectPorts[1], 5432) + done() + }) + con.connect(5432, ['host1', 'host2']) + const err = new Error('Connection refused') + err.code = 'ECONNREFUSED' + streams[0].emit('error', err) + streams[1].emit('connect') +}) + +suite.test('emits error after all hosts fail', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ stream: () => streams[streamIndex++] }) + assert.emits(con, 'error', function () { + done() + }) + con.connect([5432, 5433], ['host1', 'host2']) + const err1 = new Error('Connection refused') + err1.code = 'ECONNREFUSED' + streams[0].emit('error', err1) + const err2 = new Error('Connection refused') + err2.code = 'ECONNREFUSED' + streams[1].emit('error', err2) +}) + +suite.test('does not fall back after successful connect', function (done) { + const stream = makeStream() + const con = new Connection({ stream: stream }) + con.once('connect', function () { + assert.emits(con, 'error', function (err) { + assert.equal(err.code, 'ECONNRESET') + done() + }) + const err = new Error('Connection reset') + err.code = 'ECONNRESET' + stream.emit('error', err) + }) + con.connect([5432, 5433], ['host1', 'host2']) + stream.emit('connect') +}) + +// --- targetSessionAttrs --- + +suite.test('targetSessionAttrs=any does not intercept readyForQuery', function (done) { + const stream = makeStream() + const con = new Connection({ targetSessionAttrs: 'any', stream: stream }) + con.once('readyForQuery', function () { + done() + }) + con.connect(5432, 'localhost') + stream.emit('connect') + con.emit('readyForQuery', {}) +}) + +suite.test('targetSessionAttrs=read-write skips hot standby and uses primary', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + assert.equal(streamIndex, 2) + done() + }) + con.connect([5432, 5433], ['standby', 'primary']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('targetSessionAttrs=read-write skips read-only and uses writable', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['readonly', 'writable']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'on' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('targetSessionAttrs=read-only skips primary and uses standby', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'read-only', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['primary', 'standby']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'on' }) +}) + +suite.test('targetSessionAttrs=primary skips standby and uses primary', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'primary', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['standby', 'primary']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'on' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('targetSessionAttrs=standby skips primary and uses hot standby', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'standby', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['primary', 'standby']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'on' }) +}) + +suite.test('targetSessionAttrs=prefer-standby uses standby when available', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'prefer-standby', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + assert.equal(streamIndex, 2) + done() + }) + con.connect([5432, 5433], ['primary', 'standby']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'on' }) +}) + +suite.test('targetSessionAttrs=prefer-standby falls back to primary when no standby available', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'prefer-standby', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['primary1', 'primary2']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('emits error when no host satisfies targetSessionAttrs', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: () => streams[streamIndex++], + }) + assert.emits(con, 'error', function (err) { + assert.ok(err.message.includes('read-write')) + done() + }) + con.connect([5432, 5433], ['standby1', 'standby2']) + streams[0].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'off' }) + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'off' }) +}) + +suite.test('resets backend params between hosts when checking targetSessionAttrs', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'primary', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + done() + }) + con.connect([5432, 5433], ['standby', 'primary']) + streams[0].emit('connect') + // standby sends in_hot_standby=on → skip + simulateReadyForQuery(con, { in_hot_standby: 'on', default_transaction_read_only: 'on' }) + streams[1].emit('connect') + // primary must send its OWN params (not leftover from standby) + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('fetches session state via SHOW query when not provided in ParameterStatus', function (done) { + const stream = makeStream() + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: stream, + }) + con.once('readyForQuery', function () { + done() + }) + con.connect(5432, 'localhost') + stream.emit('connect') + // Emit readyForQuery without prior parameterStatus – triggers fetchingState + con.emit('readyForQuery', {}) + // Simulate results of SHOW transaction_read_only; SELECT pg_is_in_recovery() + con.emit('dataRow', { fields: [Buffer.from('off')] }) // transaction_read_only + con.emit('dataRow', { fields: [Buffer.from('f')] }) // pg_is_in_recovery + // Second readyForQuery (after the SHOW query) triggers the decision + con.emit('readyForQuery', {}) +}) + +suite.test('tries next host when SHOW query returns standby state', function (done) { + let streamIndex = 0 + const streams = [makeStream(), makeStream()] + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: () => streams[streamIndex++], + }) + con.once('readyForQuery', function () { + assert.equal(streamIndex, 2) + done() + }) + con.connect([5432, 5433], ['standby', 'primary']) + streams[0].emit('connect') + // No parameterStatus → triggers fetch + con.emit('readyForQuery', {}) + // SHOW results indicate standby (transaction_read_only=on) + con.emit('dataRow', { fields: [Buffer.from('on')] }) + con.emit('dataRow', { fields: [Buffer.from('t')] }) + con.emit('readyForQuery', {}) + // Now on primary + streams[1].emit('connect') + simulateReadyForQuery(con, { in_hot_standby: 'off', default_transaction_read_only: 'off' }) +}) + +suite.test('swallows rowDescription and commandComplete during SHOW fetch', function (done) { + const stream = makeStream() + const con = new Connection({ + targetSessionAttrs: 'read-write', + stream: stream, + }) + const unexpectedEvents = [] + for (const evt of ['rowDescription', 'commandComplete']) { + con.on(evt, function () { + unexpectedEvents.push(evt) + }) + } + con.once('readyForQuery', function () { + assert.equal(unexpectedEvents.length, 0) + done() + }) + con.connect(5432, 'localhost') + stream.emit('connect') + con.emit('readyForQuery', {}) + // Protocol events during fetch are suppressed + con.emit('rowDescription', {}) + con.emit('commandComplete', {}) + con.emit('dataRow', { fields: [Buffer.from('off')] }) + con.emit('dataRow', { fields: [Buffer.from('f')] }) + con.emit('readyForQuery', {}) +})