Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dist
/.eslintcache
.vscode/
manually-test-on-heroku.js
.history
5 changes: 3 additions & 2 deletions packages/pg/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Client extends EventEmitter {
keepAlive: c.keepAlive || false,
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
encoding: this.connectionParameters.client_encoding || 'utf8',
targetSessionAttrs: c.targetSessionAttrs || null,
})
this._queryQueue = []
this.binary = c.binary || defaults.binary
Expand Down Expand Up @@ -155,7 +156,7 @@ class Client extends EventEmitter {
}
}

if (this.host && this.host.indexOf('/') === 0) {
if (!Array.isArray(this.host) && this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port)
} else {
con.connect(this.port, this.host)
Expand Down Expand Up @@ -542,7 +543,7 @@ class Client extends EventEmitter {
if (client.activeQuery === query) {
const con = this.connection

if (this.host && this.host.indexOf('/') === 0) {
if (!Array.isArray(this.host) && this.host && this.host.indexOf('/') === 0) {
con.connect(this.host + '/.s.PGSQL.' + this.port)
} else {
con.connect(this.port, this.host)
Expand Down
3 changes: 2 additions & 1 deletion packages/pg/lib/connection-parameters.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ class ConnectionParameters {
this.database = this.user
}

this.port = parseInt(val('port', config), 10)
const rawPort = val('port', config)
this.port = Array.isArray(rawPort) ? rawPort.map((p) => parseInt(p, 10)) : parseInt(rawPort, 10)
this.host = val('host', config)

// "hiding" the password so it doesn't show up in stack traces
Expand Down
253 changes: 200 additions & 53 deletions packages/pg/lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ class Connection extends EventEmitter {
super()
config = config || {}

this.stream = config.stream || getStream(config.ssl)
if (typeof this.stream === 'function') {
this.stream = this.stream(config)
if (typeof config.stream === 'function') {
this._streamFactory = config.stream
this._config = config
this.stream = config.stream(config)
} else {
this._streamFactory = null
this._config = null
this.stream = config.stream || getStream(config.ssl)
}

this._keepAlive = config.keepAlive
Expand All @@ -26,6 +31,7 @@ class Connection extends EventEmitter {
this.ssl = config.ssl || false
this._ending = false
this._emitMessage = false
this._targetSessionAttrs = config.targetSessionAttrs || null
const self = this
this.on('newListener', function (eventName) {
if (eventName === 'message') {
Expand All @@ -34,76 +40,200 @@ class Connection extends EventEmitter {
})
}

_newStream() {
return this._streamFactory ? this._streamFactory(this._config) : getStream(this.ssl)
}

connect(port, host) {
const self = this

const hosts = Array.isArray(host) ? host : [host]
const ports = Array.isArray(port) ? port : [port]
let hostIndex = 0

this._connecting = true
this.stream.setNoDelay(true)
this.stream.connect(port, host)

this.stream.once('connect', function () {
if (self._keepAlive) {
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
const targetAttrs = this._targetSessionAttrs

if (targetAttrs && targetAttrs !== 'any') {
let backendParams = {}
let fetchingState = false
let fetchStateRows = []
let fetchStateError = false

const origEmit = EventEmitter.prototype.emit.bind(self)

const tryNextOrFail = () => {
backendParams = {}
fetchingState = false
fetchStateRows = []
fetchStateError = false
if (hostIndex + 1 < hosts.length) {
hostIndex++

self.stream.removeAllListeners()
self.stream.destroy()
self.stream = self._newStream()
attemptConnect()
} else {
self.emit = origEmit
origEmit('error', new Error(`None of the hosts satisfy the target_session_attrs requirement: ${targetAttrs}`))
}
}
self.emit('connect')
})

const reportStreamError = function (error) {
// errors about disconnections should be ignored during disconnect
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
return
self.emit = function (eventName, ...args) {
if (eventName === 'parameterStatus') {
const msg = args[0]
if (msg) backendParams[msg.parameterName] = msg.parameterValue
return origEmit(eventName, ...args)
}

if (fetchingState) {
if (eventName === 'dataRow') {
fetchStateRows.push(args[0])
return
}
if (eventName === 'rowDescription' || eventName === 'commandComplete') {
return
}
if (eventName === 'errorMessage') {
fetchStateError = true
return
}
if (eventName === 'readyForQuery') {
fetchingState = false
if (!fetchStateError && fetchStateRows.length >= 2) {
const txReadOnly = fetchStateRows[0].fields[0]?.toString('utf8') ?? null
const isRecovery = fetchStateRows[1].fields[0]?.toString('utf8') ?? null
if (txReadOnly !== null) backendParams.default_transaction_read_only = txReadOnly
if (isRecovery !== null) backendParams.in_hot_standby = isRecovery === 't' ? 'on' : 'off'
}
fetchStateRows = []
fetchStateError = false
if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
tryNextOrFail()
} else {
self.emit = origEmit
origEmit('readyForQuery', args[0])
}
return
}
}

if (eventName === 'readyForQuery') {
if (!backendParams.in_hot_standby || !backendParams.default_transaction_read_only) {
fetchingState = true
fetchStateRows = []
self.query('SHOW transaction_read_only; SELECT pg_catalog.pg_is_in_recovery()')
return
}
if (notHostMatchTargetSessionAttrs(targetAttrs, backendParams, hostIndex, hosts)) {
tryNextOrFail()
return
}
self.emit = origEmit
return origEmit('readyForQuery', args[0])
}

return origEmit(eventName, ...args)
}
self.emit('error', error)
}
this.stream.on('error', reportStreamError)

this.stream.on('close', function () {
self.emit('end')
})
const attemptConnect = () => {
const currentHost = hosts[hostIndex]
const currentPort = ports[Math.min(hostIndex, ports.length - 1)]
let connected = false

if (!this.ssl) {
return this.attachListeners(this.stream)
}
self.stream.setNoDelay(true)
self.stream.connect(currentPort, currentHost)

this.stream.once('data', function (buffer) {
const responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'S': // Server supports SSL connections, continue with a secure connection
break
case 'N': // Server does not support SSL connections
self.stream.end()
return self.emit('error', new Error('The server does not support SSL connections'))
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
self.stream.end()
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
const options = {
socket: self.stream,
}
self.stream.once('connect', function () {
connected = true
if (self._keepAlive) {
self.stream.setKeepAlive(true, self._keepAliveInitialDelayMillis)
}
self.emit('connect')
})

if (self.ssl !== true) {
Object.assign(options, self.ssl)
const reportStreamError = function (error) {
// errors about disconnections should be ignored during disconnect
if (self._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
return
}

if ('key' in self.ssl) {
options.key = self.ssl.key
if (!connected && hostIndex + 1 < hosts.length) {
hostIndex++
self.stream.removeAllListeners()
self.stream.destroy()
self.stream = self._newStream()
attemptConnect()
return
}
self.emit('error', error)
}

const net = require('net')
if (net.isIP && net.isIP(host) === 0) {
options.servername = host
self.stream.on('error', reportStreamError)

const onClose = function () {
self.emit('end')
}
try {
self.stream = getSecureStream(options)
} catch (err) {
return self.emit('error', err)
self.stream.on('close', onClose)

if (!self.ssl) {
return self.attachListeners(self.stream)
}
self.attachListeners(self.stream)
self.stream.on('error', reportStreamError)

self.emit('sslconnect')
})
self.stream.once('data', function (buffer) {
const responseCode = buffer.toString('utf8')
switch (responseCode) {
case 'S': // Server supports SSL connections, continue with a secure connection
break
case 'N': // Server does not support SSL connections
self.stream.end()
return self.emit('error', new Error('The server does not support SSL connections'))
default:
// Any other response byte, including 'E' (ErrorResponse) indicating a server error
self.stream.end()
return self.emit('error', new Error('There was an error establishing an SSL connection'))
}
const options = {
socket: self.stream,
}

if (self.ssl !== true) {
Object.assign(options, self.ssl)

if ('key' in self.ssl) {
options.key = self.ssl.key
}
}

const net = require('net')
if (net.isIP && net.isIP(currentHost) === 0) {
options.servername = currentHost
}

// Remove the close listener from the TCP socket before upgrading to TLS.
// Without this, destroying the TLS stream (during host failover) closes the
// underlying TCP socket, which fires 'close' → 'end' even though we are
// still mid-connection to the next host.
const tcpStream = self.stream
tcpStream.removeListener('close', onClose)
tcpStream.removeListener('error', reportStreamError)
try {
self.stream = getSecureStream(options)
} catch (err) {
return self.emit('error', err)
}
self.attachListeners(self.stream)
self.stream.on('error', reportStreamError)
self.stream.on('close', onClose)

self.emit('sslconnect')
})
}

attemptConnect()
}

attachListeners(stream) {
Expand Down Expand Up @@ -218,4 +348,21 @@ class Connection extends EventEmitter {
}
}

function notHostMatchTargetSessionAttrs(targetAttrs, params, hostIndex, hosts) {
switch (targetAttrs) {
case 'read-write':
return params.in_hot_standby === 'on' || params.default_transaction_read_only === 'on'
case 'read-only':
return params.in_hot_standby !== 'on' && params.default_transaction_read_only !== 'on'
case 'primary':
return params.in_hot_standby === 'on'
case 'standby':
return params.in_hot_standby === 'off'
case 'prefer-standby':
return params.in_hot_standby === 'off' && hostIndex + 1 < hosts.length
default:
return false
}
}

module.exports = Connection
Loading