Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 87 additions & 32 deletions packages/vertica-nodejs/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ var Connection = require('./connection')
class Client extends EventEmitter {
constructor(config) {
super()

console.log(' Client Class')
console.log(config)
this.connectionParameters = new ConnectionParameters(config)
this.user = this.connectionParameters.user
this.database = this.connectionParameters.database
Expand All @@ -52,7 +53,7 @@ class Client extends EventEmitter {
value: this.connectionParameters.oauth_access_token,
})

this.protocol_version = this.connectionParameters.protocol_version;
this.protocol_version = this.connectionParameters.protocol_version

var c = config || {}

Expand Down Expand Up @@ -81,15 +82,14 @@ class Client extends EventEmitter {
this.processID = null
this.secretKey = null
this.tls_config = this.connectionParameters.tls_config
this.tls_mode = this.connectionParameters.tls_mode || 'prefer'
this.tls_mode = this.connectionParameters.tls_mode || 'disable'
this.tls_trusted_certs = this.connectionParameters.tls_trusted_certs
this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0
this.workload = this.connectionParameters.workload

delete this.connectionParameters.tls_config
delete this.connectionParameters.tls_mode
delete this.connectionParameters.tls_trusted_certs

}

_errorAllQueries(err) {
Expand Down Expand Up @@ -131,35 +131,45 @@ class Client extends EventEmitter {
.map((addr) => addr.address)

this._shuffleAddresses(addresses)
resolve(resolvedAddresses.map((addr) => { return { host: addr, port: node.port } }))
resolve(
resolvedAddresses.map((addr) => {
return { host: addr, port: node.port }
})
)
})
})
}

// Round robin connections iterate through each node in host + backup_server_nodes
// For each node, resolve the host to a list of addresses, shuffle the addresses, then try each address
async _roundRobinConnect(nodes, addresses, error) {
// Update remaining nodes for failover tracking
this._remainingNodes = nodes
if (addresses.length > 0) {
// There are resolved addresses we haven't tried yet, so try the next address
await this._connectToNextAddress(nodes, addresses)
} else if (nodes.length > 0) {
// There are no more resolved addresses for the current node, so resolve the host for the next node
var node = nodes.shift()
await this._resolveHost(node)
.then((async (shuffled_addresses) => {
if (shuffled_addresses.length > 0) {
await this._connectToNextAddress(nodes, shuffled_addresses)
} else {
var err = new Error("Could not resolve host " + node.host)
.then(
(async (shuffled_addresses) => {
if (shuffled_addresses.length > 0) {
await this._connectToNextAddress(nodes, shuffled_addresses)
} else {
var err = new Error('Could not resolve host ' + node.host)
await this._roundRobinConnect(nodes, addresses, err)
}
}).bind(this)
)
.catch(
(async (err) => {
await this._roundRobinConnect(nodes, addresses, err)
}
}).bind(this))
.catch((async (err) => {
await this._roundRobinConnect(nodes, addresses, err)
}).bind(this))
}).bind(this)
)
} else {
if (!error) {
error = new Error("Fatal error: Node list was empty")
error = new Error('Fatal error: Node list was empty')
}

// No more nodes to try, so handle connection error
Expand All @@ -179,6 +189,8 @@ class Client extends EventEmitter {
var self = this
var con = this.connection
this.connectionTimeoutHandle
// Reset connection error flag before attempting next connection
this._connectionError = false
if (this._connectionTimeoutMillis > 0) {
this.connectionTimeoutHandle = setTimeout(() => {
con._ending = true
Expand All @@ -191,11 +203,18 @@ class Client extends EventEmitter {
if (address.host && address.host.indexOf('/') === 0) {
con.connect(address.host + '/.s.PGSQL.' + address.port)
} else {
//console.log(this)
console.log('------------------- 196')
//console.log(con)
con.tls_host = address.host
this.host = address.host
// this.connectionParameters.host = address.host;
//new ConnectionParameters({host:address.host })
con.connect(address.port, address.host)
}

// once connection is established send startup message
con.on('connect', function () {
con.once('connect', function () {
// SSLRequest Message
if (self.tls_mode !== 'disable' || self.tls_config !== undefined) {
con.requestSsl()
Expand All @@ -204,10 +223,15 @@ class Client extends EventEmitter {
}
})

con.on('sslconnect', function () {
con.once('sslconnect', function () {
con.startup(self.getStartupConf())
})

// Remove old listeners to prevent stacking when retrying connections
con.removeAllListeners('connect')
con.removeAllListeners('sslconnect')
con.removeAllListeners('end')

this._attachListeners(con)

con.once('end', async () => {
Expand All @@ -221,6 +245,24 @@ class Client extends EventEmitter {
// on this client then we have an unexpected disconnection
// treat this as an error unless we've already emitted an error
// during connection.

// console.log("---------------------- 229")
// console.log(nodes, addresses)
// this.connectionParameters = new ConnectionParameters({host: addresses.host});
// var c = {}
// this.connection =c.connection ||
// new Connection({
// stream: c.stream,
// tls_config: this.connectionParameters.tls_config,
// tls_mode: this.connectionParameters.tls_mode,
// tls_trusted_certs: this.connectionParameters.tls_trusted_certs,
// tls_host: this.connectionParameters.host,
// keepAlive: c.keepAlive || false,
// keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
// encoding: this.connectionParameters.client_encoding || 'utf8',
// client_label: this.connectionParameters.client_label,
// })

await this._roundRobinConnect(nodes, addresses, error)
}

Expand All @@ -244,6 +286,8 @@ class Client extends EventEmitter {
var nodes = this.backup_server_node
// Add host and port to start of queue of nodes to try connecting to
nodes.unshift({ host: this.host, port: this.port })
// Store nodes for failover logic
this._remainingNodes = nodes
await this._roundRobinConnect(nodes, [], undefined)
}

Expand Down Expand Up @@ -327,24 +371,25 @@ class Client extends EventEmitter {
}

_handleParameterStatus(msg) {
const min_supported_version = (3 << 16 | 5) // 3.5
const min_supported_version = (3 << 16) | 5 // 3.5
const max_supported_version = this.connectionParameters.protocol_version // requested protocol version
switch(msg.parameterName) {
switch (msg.parameterName) {
// right now we only care about the protocol_version
// if we want to have the parameterStatus message update any other connection properties, add them here
case 'protocol_version':
// until we allow past 3.0 this won't matter because we are only supporting one protocol version
// with this client right now
if (parseInt(msg.parameterValue) < min_supported_version
|| parseInt(msg.parameterValue) > max_supported_version) {

if (
parseInt(msg.parameterValue) < min_supported_version ||
parseInt(msg.parameterValue) > max_supported_version
) {
// error
throw new Error("Unsupported Protocol Version returned by Server. Connection Disallowed.");
throw new Error('Unsupported Protocol Version returned by Server. Connection Disallowed.')
}
this.protocol_version = parseInt(msg.parameterValue) // effective protocol version
break;
break
default:
// do nothing
// do nothing
}
}

Expand Down Expand Up @@ -394,7 +439,6 @@ class Client extends EventEmitter {
// remove callback for proper error handling
// after the connect event
this._connectionCallback = null

}
this.emit('connect')
}
Expand All @@ -416,10 +460,20 @@ class Client extends EventEmitter {
}
this._connectionError = true
clearTimeout(this.connectionTimeoutHandle)
if (this._connectionCallback) {

// Check if we have remaining backup nodes to try for failover
const hasMoreNodesToTry = this._remainingNodes && this._remainingNodes.length > 0

// Only call the callback if there are no more nodes to try
// Otherwise, let the 'end' event handler trigger the failover
if (!hasMoreNodesToTry && this._connectionCallback) {
return this._connectionCallback(err)
}
this.emit('error', err)

// If we have more nodes to try, don't call callback or emit error yet
if (!hasMoreNodesToTry) {
this.emit('error', err)
}
}

// if we're connected and we receive an error event from the connection
Expand Down Expand Up @@ -529,7 +583,7 @@ class Client extends EventEmitter {
client_os_user_name: params.client_os_user_name,
client_os_hostname: params.client_os_hostname,
client_pid: params.client_pid,
binary_data_protocol: '0', // Defaults to text format '0'
binary_data_protocol: '0', // Defaults to text format '0'
protocol_compat: 'VER',
}

Expand Down Expand Up @@ -651,14 +705,15 @@ class Client extends EventEmitter {

if (config === null || config === undefined) {
throw new TypeError('Client was passed a null or undefined query')
}
}
if (typeof config.submit === 'function') {
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
result = query = config
if (typeof values === 'function') {
query.callback = query.callback || values
}
} else { // config is a string
} else {
// config is a string
readTimeout = this.connectionParameters.query_timeout
query = new Query(config, values, callback)
if (!query.callback) {
Expand Down
2 changes: 1 addition & 1 deletion packages/vertica-nodejs/lib/type-overrides.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
const { VerticaType } = require('v-protocol')
var types = require('pg-types')

// this is a 'temporary' solution allowing us to continue to use pg-types as long as we can to avoid another large
// this is a 'temporary' solution allowing us to continue to use pg-types as long as we can to avoid another large
// package implementation while we get close to the 1.0 release
types.setTypeParser(VerticaType.Boolean, types.getTypeParser(16, 'text'))
types.setTypeParser(VerticaType.Integer, types.getTypeParser(21, 'text'))
Expand Down
Loading