diff --git a/packages/vertica-nodejs/lib/client.js b/packages/vertica-nodejs/lib/client.js index 63d6819f..de4410c5 100644 --- a/packages/vertica-nodejs/lib/client.js +++ b/packages/vertica-nodejs/lib/client.js @@ -29,7 +29,8 @@ var Connection = require('./connection') class Client extends EventEmitter { constructor(config) { super() - + console.log(' Client Class') + console.log(config) this.connectionParameters = new ConnectionParameters(config) this.user = this.connectionParameters.user this.database = this.connectionParameters.database @@ -52,7 +53,7 @@ class Client extends EventEmitter { value: this.connectionParameters.oauth_access_token, }) - this.protocol_version = this.connectionParameters.protocol_version; + this.protocol_version = this.connectionParameters.protocol_version var c = config || {} @@ -81,7 +82,7 @@ class Client extends EventEmitter { this.processID = null this.secretKey = null this.tls_config = this.connectionParameters.tls_config - this.tls_mode = this.connectionParameters.tls_mode || 'prefer' + this.tls_mode = this.connectionParameters.tls_mode || 'disable' this.tls_trusted_certs = this.connectionParameters.tls_trusted_certs this._connectionTimeoutMillis = c.connectionTimeoutMillis || 0 this.workload = this.connectionParameters.workload @@ -89,7 +90,6 @@ class Client extends EventEmitter { delete this.connectionParameters.tls_config delete this.connectionParameters.tls_mode delete this.connectionParameters.tls_trusted_certs - } _errorAllQueries(err) { @@ -131,7 +131,11 @@ class Client extends EventEmitter { .map((addr) => addr.address) this._shuffleAddresses(addresses) - resolve(resolvedAddresses.map((addr) => { return { host: addr, port: node.port } })) + resolve( + resolvedAddresses.map((addr) => { + return { host: addr, port: node.port } + }) + ) }) }) } @@ -139,6 +143,8 @@ class Client extends EventEmitter { // Round robin connections iterate through each node in host + backup_server_nodes // For each node, resolve the host to a list of addresses, shuffle the addresses, then try each address async _roundRobinConnect(nodes, addresses, error) { + // Update remaining nodes for failover tracking + this._remainingNodes = nodes if (addresses.length > 0) { // There are resolved addresses we haven't tried yet, so try the next address await this._connectToNextAddress(nodes, addresses) @@ -146,20 +152,24 @@ class Client extends EventEmitter { // There are no more resolved addresses for the current node, so resolve the host for the next node var node = nodes.shift() await this._resolveHost(node) - .then((async (shuffled_addresses) => { - if (shuffled_addresses.length > 0) { - await this._connectToNextAddress(nodes, shuffled_addresses) - } else { - var err = new Error("Could not resolve host " + node.host) + .then( + (async (shuffled_addresses) => { + if (shuffled_addresses.length > 0) { + await this._connectToNextAddress(nodes, shuffled_addresses) + } else { + var err = new Error('Could not resolve host ' + node.host) + await this._roundRobinConnect(nodes, addresses, err) + } + }).bind(this) + ) + .catch( + (async (err) => { await this._roundRobinConnect(nodes, addresses, err) - } - }).bind(this)) - .catch((async (err) => { - await this._roundRobinConnect(nodes, addresses, err) - }).bind(this)) + }).bind(this) + ) } else { if (!error) { - error = new Error("Fatal error: Node list was empty") + error = new Error('Fatal error: Node list was empty') } // No more nodes to try, so handle connection error @@ -179,6 +189,8 @@ class Client extends EventEmitter { var self = this var con = this.connection this.connectionTimeoutHandle + // Reset connection error flag before attempting next connection + this._connectionError = false if (this._connectionTimeoutMillis > 0) { this.connectionTimeoutHandle = setTimeout(() => { con._ending = true @@ -191,11 +203,18 @@ class Client extends EventEmitter { if (address.host && address.host.indexOf('/') === 0) { con.connect(address.host + '/.s.PGSQL.' + address.port) } else { + //console.log(this) + console.log('------------------- 196') + //console.log(con) + con.tls_host = address.host + this.host = address.host + // this.connectionParameters.host = address.host; + //new ConnectionParameters({host:address.host }) con.connect(address.port, address.host) } // once connection is established send startup message - con.on('connect', function () { + con.once('connect', function () { // SSLRequest Message if (self.tls_mode !== 'disable' || self.tls_config !== undefined) { con.requestSsl() @@ -204,10 +223,15 @@ class Client extends EventEmitter { } }) - con.on('sslconnect', function () { + con.once('sslconnect', function () { con.startup(self.getStartupConf()) }) + // Remove old listeners to prevent stacking when retrying connections + con.removeAllListeners('connect') + con.removeAllListeners('sslconnect') + con.removeAllListeners('end') + this._attachListeners(con) con.once('end', async () => { @@ -221,6 +245,24 @@ class Client extends EventEmitter { // on this client then we have an unexpected disconnection // treat this as an error unless we've already emitted an error // during connection. + + // console.log("---------------------- 229") + // console.log(nodes, addresses) + // this.connectionParameters = new ConnectionParameters({host: addresses.host}); + // var c = {} + // this.connection =c.connection || + // new Connection({ + // stream: c.stream, + // tls_config: this.connectionParameters.tls_config, + // tls_mode: this.connectionParameters.tls_mode, + // tls_trusted_certs: this.connectionParameters.tls_trusted_certs, + // tls_host: this.connectionParameters.host, + // keepAlive: c.keepAlive || false, + // keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0, + // encoding: this.connectionParameters.client_encoding || 'utf8', + // client_label: this.connectionParameters.client_label, + // }) + await this._roundRobinConnect(nodes, addresses, error) } @@ -244,6 +286,8 @@ class Client extends EventEmitter { var nodes = this.backup_server_node // Add host and port to start of queue of nodes to try connecting to nodes.unshift({ host: this.host, port: this.port }) + // Store nodes for failover logic + this._remainingNodes = nodes await this._roundRobinConnect(nodes, [], undefined) } @@ -327,24 +371,25 @@ class Client extends EventEmitter { } _handleParameterStatus(msg) { - const min_supported_version = (3 << 16 | 5) // 3.5 + const min_supported_version = (3 << 16) | 5 // 3.5 const max_supported_version = this.connectionParameters.protocol_version // requested protocol version - switch(msg.parameterName) { + switch (msg.parameterName) { // right now we only care about the protocol_version // if we want to have the parameterStatus message update any other connection properties, add them here case 'protocol_version': // until we allow past 3.0 this won't matter because we are only supporting one protocol version // with this client right now - if (parseInt(msg.parameterValue) < min_supported_version - || parseInt(msg.parameterValue) > max_supported_version) { - + if ( + parseInt(msg.parameterValue) < min_supported_version || + parseInt(msg.parameterValue) > max_supported_version + ) { // error - throw new Error("Unsupported Protocol Version returned by Server. Connection Disallowed."); + throw new Error('Unsupported Protocol Version returned by Server. Connection Disallowed.') } this.protocol_version = parseInt(msg.parameterValue) // effective protocol version - break; + break default: - // do nothing + // do nothing } } @@ -394,7 +439,6 @@ class Client extends EventEmitter { // remove callback for proper error handling // after the connect event this._connectionCallback = null - } this.emit('connect') } @@ -416,10 +460,20 @@ class Client extends EventEmitter { } this._connectionError = true clearTimeout(this.connectionTimeoutHandle) - if (this._connectionCallback) { + + // Check if we have remaining backup nodes to try for failover + const hasMoreNodesToTry = this._remainingNodes && this._remainingNodes.length > 0 + + // Only call the callback if there are no more nodes to try + // Otherwise, let the 'end' event handler trigger the failover + if (!hasMoreNodesToTry && this._connectionCallback) { return this._connectionCallback(err) } - this.emit('error', err) + + // If we have more nodes to try, don't call callback or emit error yet + if (!hasMoreNodesToTry) { + this.emit('error', err) + } } // if we're connected and we receive an error event from the connection @@ -529,7 +583,7 @@ class Client extends EventEmitter { client_os_user_name: params.client_os_user_name, client_os_hostname: params.client_os_hostname, client_pid: params.client_pid, - binary_data_protocol: '0', // Defaults to text format '0' + binary_data_protocol: '0', // Defaults to text format '0' protocol_compat: 'VER', } @@ -651,14 +705,15 @@ class Client extends EventEmitter { if (config === null || config === undefined) { throw new TypeError('Client was passed a null or undefined query') - } + } if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config if (typeof values === 'function') { query.callback = query.callback || values } - } else { // config is a string + } else { + // config is a string readTimeout = this.connectionParameters.query_timeout query = new Query(config, values, callback) if (!query.callback) { diff --git a/packages/vertica-nodejs/lib/type-overrides.js b/packages/vertica-nodejs/lib/type-overrides.js index b9152812..ad022bf0 100644 --- a/packages/vertica-nodejs/lib/type-overrides.js +++ b/packages/vertica-nodejs/lib/type-overrides.js @@ -17,7 +17,7 @@ const { VerticaType } = require('v-protocol') var types = require('pg-types') -// this is a 'temporary' solution allowing us to continue to use pg-types as long as we can to avoid another large +// this is a 'temporary' solution allowing us to continue to use pg-types as long as we can to avoid another large // package implementation while we get close to the 1.0 release types.setTypeParser(VerticaType.Boolean, types.getTypeParser(16, 'text')) types.setTypeParser(VerticaType.Integer, types.getTypeParser(21, 'text'))