diff --git a/index.d.ts b/index.d.ts index b418f13..a23c1f1 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,8 +1,8 @@ /// /// -import { EventEmitter } from "events"; -import { StrictEventEmitter } from "strict-event-emitter-types"; +import { EventEmitter } from 'events'; +import { StrictEventEmitter } from 'strict-event-emitter-types'; // Expose classes export class Position { @@ -380,6 +380,7 @@ export interface ConnectionSettings { // Cluster Settings clusterDns?: string, maxDiscoverAttempts?: number, + discoverDelay?: number, externalGossipPort?: number, gossipTimeout?: number } diff --git a/package.json b/package.json index 0d586f4..be0b597 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "pretest": "npm run build", "test": "nodeunit", "test-debug": "TESTS_VERBOSE_LOGGING=1 nodeunit", + "test:jest:watch": "jest --watch --coverage", "prepublishOnly": "npm run build && npm run gendocs", "gendocs": "rm -rf docs && jsdoc src -r -d docs" }, @@ -54,6 +55,7 @@ "uuid": "^3.0.1" }, "devDependencies": { + "jest": "^26.4.2", "jsdoc": "^3.6.3", "nodeunit": "^0.11.3", "webpack": "^4.41.2", diff --git a/src/common/utils/shuffle.js b/src/common/utils/shuffle.js new file mode 100644 index 0000000..5814094 --- /dev/null +++ b/src/common/utils/shuffle.js @@ -0,0 +1,25 @@ +function rndNext(min, max) { + min = Math.ceil(min); + max = Math.floor(max); + return Math.floor(Math.random() * (max - min)) + min; +} + +function shuffle (arr, from, to) { + if (!to) { + to = arr.length - 1; + } + if (!from) { + from = 0; + } + const newArr = [...arr]; + if (from >= to) return; + for (var current = from; current <= to; ++current) { + var index = rndNext(current, to + 1); + var tmp = newArr[index]; + newArr[index] = newArr[current]; + newArr[current] = tmp; + } + return newArr; +}; + +module.exports = shuffle; \ No newline at end of file diff --git a/src/core/cluster/clusterDiscoverer.js b/src/core/cluster/clusterDiscoverer.js new file mode 100644 index 0000000..281f0ae --- /dev/null +++ b/src/core/cluster/clusterDiscoverer.js @@ -0,0 +1,163 @@ +const ClusterInfo = require('./clusterInfo'); +const GossipSeed = require('../../gossipSeed'); +const NodeEndPoints = require('./nodeEndpoints'); +const shuffle = require('../../common/utils/shuffle'); + +function wait(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +/** + * ClusterDiscoverer + * @constructor + * @class + * @param {Logger} log - Logger instance + * @param {Object} settings - Settings object + * @param {Object} dnsService - DNS service to perform DNS lookup + * @param {Object} httpService - HTTP service to perform http requests + */ +function ClusterDiscoverer(log, settings, dnsService, httpService) { + if (!settings.clusterDns && (!settings.seeds || settings.seeds.length === 0)) + throw new Error('Both clusterDns and seeds are null/empty.'); + this._log = log; + + this._settings = settings; + this._dnsService = dnsService; + this._httpService = httpService; +} + +/** + * Discover Cluster endpoints + * @param {Object} failedTcpEndPoint - The failed TCP endpoint which were used by the handler + * @returns {Promise.} + */ +ClusterDiscoverer.prototype.discover = async function (failedTcpEndPoint) { + let attempts = 0; + while (attempts++ < this._settings.maxDiscoverAttempts) { + try { + const candidates = await this._getGossipCandidates(this._settings.managerExternalHttpPort); + const gossipSeeds = candidates.filter( + (candidate) => + !failedTcpEndPoint || + !(candidate.endPoint.host === failedTcpEndPoint.host && candidate.endPoint.port === failedTcpEndPoint.port) + ); + let gossipSeedsIndex = 0; + let clusterInfo; + do { + try { + clusterInfo = await this._clusterInfo(gossipSeeds[gossipSeedsIndex], this._settings.gossipTimeout); + if (!clusterInfo.bestNode) { + this._log.info( + `Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed: no candidate found.` + ); + continue; + } + } catch (err) {} + } while (++gossipSeedsIndex < gossipSeeds.length); + if (clusterInfo) { + return NodeEndPoints.createFromGossipMember(clusterInfo.bestNode); + } + } catch (err) { + this._log.info( + `Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed with error: ${err}.\n${err.stack}` + ); + } + await wait(this._settings.discoverDelay); + } + throw new Error(`Failed to discover candidate in ${this._settings.maxDiscoverAttempts} attempts.`); +}; + +/** + * Get gossip candidates either from DNS or from gossipSeeds settings + * @private + * @param {Number} managerExternalHttpPort - Http port of the manager (or the http port of the node for OSS clusters) + * @returns {Promise.} + */ +ClusterDiscoverer.prototype._getGossipCandidates = async function (managerExternalHttpPort) { + const gossipSeeds = + this._settings.seeds && this._settings.seeds.length > 0 + ? this._settings.seeds + : (await this._resolveDns(this._settings.clusterDns)).map( + (address) => new GossipSeed({ host: address, port: managerExternalHttpPort }, undefined) + ); + return shuffle(gossipSeeds); +}; + +/** + * Resolve the cluster DNS discovery address to retrieve belonging ip addresses + * @private + * @param {String} clusterDns - Cluster DNS discovery address + * @returns {Promise.} + */ +ClusterDiscoverer.prototype._resolveDns = async function (clusterDns) { + const dnsOptions = { + family: 4, + hints: this._dnsService.ADDRCONFIG | this._dnsService.V4MAPPED, + all: true, + }; + const result = await this._dnsService.lookup(clusterDns, dnsOptions); + if (!result || result.length === 0) { + throw new Error(`No result from dns lookup for ${clusterDns}`); + } + return result.map((address) => address.address); +}; + +/** + * Get cluster informations (gossip members) + * @param {GossipSeed} candidate - candidate to get informations from + * @param {Number} timeout - timeout for the http request + * @returns {Promise.} + */ +ClusterDiscoverer.prototype._clusterInfo = async function (candidate, timeout) { + return new Promise((resolve, reject) => { + const options = { + host: candidate.endPoint.host, + port: candidate.endPoint.port, + path: '/gossip?format=json', + timeout: timeout, + }; + if (candidate.hostHeader) { + options.headers = { + Host: candidate.hostHeader, + }; + } + + const request = this._httpService.request(options, (res) => { + if (res.statusCode !== 200) { + this._log.info('Trying to get gossip from', candidate, 'failed with status code:', res.statusCode); + reject(new Error(`Gossip candidate returns a ${res.statusCode} error`)); + return; + } + let result = ''; + res.on('data', (chunk) => { + result += chunk.toString(); + }); + res.on('end', function () { + try { + result = JSON.parse(result); + } catch (e) { + reject(new Error('Unable to parse gossip response')); + } + resolve(new ClusterInfo(result.members)); + }); + }); + + request.setTimeout(timeout); + + request.on('timeout', () => { + this._log.info('Trying to get gossip from', candidate, 'timed out.'); + request.destroy(); + reject(new Error('Connection to gossip timed out')); + }); + + request.on('error', (error) => { + this._log.info('Trying to get gossip from', candidate, 'errored', error); + request.destroy(); + reject(new Error('Connection to gossip errored')); + }); + + request.end(); + }); +}; + +module.exports = ClusterDiscoverer; diff --git a/src/core/cluster/clusterInfo.js b/src/core/cluster/clusterInfo.js new file mode 100644 index 0000000..38c8db5 --- /dev/null +++ b/src/core/cluster/clusterInfo.js @@ -0,0 +1,36 @@ +const MemberInfo = require('./memberInfo.js'); + +const VNodeStates = Object.freeze({ + Initializing: 0, + Unknown: 1, + PreReplica: 2, + CatchingUp: 3, + Clone: 4, + Slave: 5, + PreMaster: 6, + Master: 7, + Manager: 8, + ShuttingDown: 9, + Shutdown: 10 +}); + +function ClusterInfo(members) { + this._members = members.map(member => new MemberInfo(member)); + + Object.defineProperty(this, 'bestNode', { + enumerable: true, + get: function () { + return this._getBestNode(); + } + }); +} + +ClusterInfo.prototype._getBestNode = function () { + return this._members + .filter(member => member.isAlive && member.isAllowedToConnect) + .sort(function (a, b) { + return VNodeStates[b.state] - VNodeStates[a.state]; + })[0]; +} + +module.exports = ClusterInfo; \ No newline at end of file diff --git a/src/core/cluster/memberInfo.js b/src/core/cluster/memberInfo.js new file mode 100644 index 0000000..d376f76 --- /dev/null +++ b/src/core/cluster/memberInfo.js @@ -0,0 +1,73 @@ +const NOT_ALLOWED_STATES = [ + 'Manager', + 'ShuttingDown', + 'Shutdown' +]; + +function MemberInfo(informations) { + this._instanceId = informations.instanceId; + this._timeStamp = informations.timeStamp; + this._state = informations.state; + this._isAlive = informations.isAlive; + this._internalTcpIp = informations.internalTcpIp; + this._internalTcpPort = informations.internalTcpPort; + this._internalSecureTcpPort = informations.internalSecureTcpPort; + this._externalTcpIp = informations.externalTcpIp; + this._externalTcpPort = informations.externalTcpPort; + this._externalSecureTcpPort = informations.externalSecureTcpPort; + this._internalHttpIp = informations.internalHttpIp; + this._internalHttpPort = informations.internalHttpPort; + this._externalHttpIp = informations.externalHttpIp; + this._externalHttpPort = informations.externalHttpPort; + this._lastCommitPosition = informations.lastCommitPosition; + this._writerCheckpoint = informations.writerCheckpoint; + this._chaserCheckpoint = informations.chaserCheckpoint; + this._epochPosition = informations.epochPosition; + this._epochNumber = informations.epochNumber; + this._epochId = informations.epochId; + this._nodePriority = informations.nodePriority; + + Object.defineProperty(this, 'state', { + enumerable: true, + get: function () { + return this._state; + } + }); + + Object.defineProperty(this, 'isAllowedToConnect', { + enumerable: true, + get: function () { + return !NOT_ALLOWED_STATES.includes(this._state); + } + }); + + Object.defineProperty(this, 'isAlive', { + enumerable: true, + get: function () { + return this._isAlive; + } + }); + + Object.defineProperty(this, 'externalTcpIp', { + enumerable: true, + get: function () { + return this._externalTcpIp; + } + }); + + Object.defineProperty(this, 'externalTcpPort', { + enumerable: true, + get: function () { + return this._externalTcpPort; + } + }); + + Object.defineProperty(this, 'externalSecureTcpPort', { + enumerable: true, + get: function () { + return this._externalSecureTcpPort; + } + }); +} + +module.exports = MemberInfo; \ No newline at end of file diff --git a/src/core/cluster/nodeEndpoints.js b/src/core/cluster/nodeEndpoints.js new file mode 100644 index 0000000..c3e95da --- /dev/null +++ b/src/core/cluster/nodeEndpoints.js @@ -0,0 +1,23 @@ +function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) { + if (tcpEndPoint === null && secureTcpEndPoint === null) throw new Error('Both endpoints are null.'); + Object.defineProperties(this, { + tcpEndPoint: { + enumerable: true, + value: tcpEndPoint + }, + secureTcpEndPoint: { + enumerable: true, + value: secureTcpEndPoint + } + }); +} + +NodeEndPoints.createFromGossipMember = function (member) { + const normTcp = { host: member.externalTcpIp, port: member.externalTcpPort }; + const secTcp = member.externalSecureTcpPort > 0 + ? { host: member.externalTcpIp, port: member.externalSecureTcpPort } + : null; + return new NodeEndPoints(normTcp, secTcp); +} + +module.exports = NodeEndPoints \ No newline at end of file diff --git a/src/eventStoreConnection.js b/src/eventStoreConnection.js index e3f93c0..7b2abc6 100644 --- a/src/eventStoreConnection.js +++ b/src/eventStoreConnection.js @@ -1,9 +1,19 @@ var EventStoreNodeConnection = require('./eventStoreNodeConnection'); var StaticEndpointDiscoverer = require('./core/staticEndpointDiscoverer'); -var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer'); +var ClusterDiscoverer = require('./core/cluster/clusterDiscoverer'); var NoopLogger = require('./common/log/noopLogger'); var ensure = require('./common/utils/ensure'); +const util = require('util'); +const http = require('http'); +const dns = require('dns'); + +const dnsService = { + lookup : util.promisify(dns.lookup), + ADDRCONFIG: dns.ADDRCONFIG, + V4MAPPED: dns.V4MAPPED +}; + var defaultConnectionSettings = Object.freeze({ log: new NoopLogger(), verboseLogging: false, @@ -32,6 +42,7 @@ var defaultConnectionSettings = Object.freeze({ // Cluster Settings clusterDns: '', maxDiscoverAttempts: 10, + discoverDelay: 500, externalGossipPort: 0, gossipTimeout: 1000 }); @@ -80,17 +91,17 @@ function createFromClusterDns(connectionSettings, clusterDns, externalGossipPort var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {}); var clusterSettings = { clusterDns: clusterDns, - gossipSeeds: null, - externalGossipPort: externalGossipPort, + seeds: null, + managerExternalHttpPort: externalGossipPort, maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, + discoverDelay: mergedSettings.discoverDelay, gossipTimeout: mergedSettings.gossipTimeout }; - var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log, - clusterSettings.clusterDns, - clusterSettings.maxDiscoverAttempts, - clusterSettings.externalGossipPort, - clusterSettings.gossipSeeds, - clusterSettings.gossipTimeout + var endPointDiscoverer = new ClusterDiscoverer( + mergedSettings.log, + clusterSettings, + dnsService, + http ); return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName); } @@ -101,17 +112,17 @@ function createFromGossipSeeds(connectionSettings, gossipSeeds, connectionName) var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {}); var clusterSettings = { clusterDns: '', - gossipSeeds: gossipSeeds, + seeds: gossipSeeds, externalGossipPort: 0, maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, + discoverDelay: mergedSettings.discoverDelay, gossipTimeout: mergedSettings.gossipTimeout }; - var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log, - clusterSettings.clusterDns, - clusterSettings.maxDiscoverAttempts, - clusterSettings.externalGossipPort, - clusterSettings.gossipSeeds, - clusterSettings.gossipTimeout + var endPointDiscoverer = new ClusterDiscoverer( + mergedSettings.log, + clusterSettings, + dnsService, + http ); return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName); } diff --git a/src/gossipSeed.js b/src/gossipSeed.js index 804c8e5..bd8140e 100644 --- a/src/gossipSeed.js +++ b/src/gossipSeed.js @@ -1,7 +1,8 @@ -function GossipSeed(endPoint, hostName) { +function GossipSeed(endPoint, hostName, hostHeader) { if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.'); this.endPoint = endPoint; this.hostName = hostName; + this.hostHeader = hostHeader; Object.freeze(this); } diff --git a/src/transport/tcp/tcpConnection.js b/src/transport/tcp/tcpConnection.js index 7c1891c..1d2ab55 100644 --- a/src/transport/tcp/tcpConnection.js +++ b/src/transport/tcp/tcpConnection.js @@ -160,4 +160,4 @@ TcpConnection.createConnectingConnection = function( return connection; }; -module.exports = TcpConnection; +module.exports = TcpConnection; \ No newline at end of file diff --git a/test/fixtures/gossip.json b/test/fixtures/gossip.json new file mode 100644 index 0000000..478ba62 --- /dev/null +++ b/test/fixtures/gossip.json @@ -0,0 +1,98 @@ +{ + "members": [ + { + "instanceId": "bb16857d-373d-4233-a175-89c917a72329", + "timeStamp": "2020-09-02T13:53:24.234898Z", + "state": "Slave", + "isAlive": false, + "internalTcpIp": "10.0.0.1", + "internalTcpPort": 1112, + "internalSecureTcpPort": 0, + "externalTcpIp": "10.0.0.1", + "externalTcpPort": 1113, + "externalSecureTcpPort": 0, + "internalHttpIp": "10.0.0.1", + "internalHttpPort": 2112, + "externalHttpIp": "10.0.0.1", + "externalHttpPort": 2113, + "lastCommitPosition": 648923382, + "writerCheckpoint": 648936339, + "chaserCheckpoint": 648936339, + "epochPosition": 551088596, + "epochNumber": 201, + "epochId": "d8f95f4b-167a-4487-9031-4d31a507e6d9", + "nodePriority": 0 + }, + { + "instanceId": "b3c18dcd-6476-467a-b7b8-d6672b74e9c2", + "timeStamp": "2020-09-02T13:56:06.189428Z", + "state": "CatchingUp", + "isAlive": true, + "internalTcpIp": "10.0.0.2", + "internalTcpPort": 1112, + "internalSecureTcpPort": 0, + "externalTcpIp": "10.0.0.2", + "externalTcpPort": 1113, + "externalSecureTcpPort": 0, + "internalHttpIp": "10.0.0.2", + "internalHttpPort": 2112, + "externalHttpIp": "10.0.0.2", + "externalHttpPort": 2113, + "lastCommitPosition": -1, + "writerCheckpoint": 0, + "chaserCheckpoint": 0, + "epochPosition": -1, + "epochNumber": -1, + "epochId": "00000000-0000-0000-0000-000000000000", + "nodePriority": 0 + }, + { + "instanceId": "e802a2b5-826c-4bd5-84d0-c9d1387fbf79", + "timeStamp": "2020-09-02T13:56:07.391534Z", + "state": "Master", + "isAlive": true, + "internalTcpIp": "10.0.0.3", + "internalTcpPort": 1112, + "internalSecureTcpPort": 0, + "externalTcpIp": "10.0.0.3", + "externalTcpPort": 1113, + "externalSecureTcpPort": 0, + "internalHttpIp": "10.0.0.3", + "internalHttpPort": 2112, + "externalHttpIp": "10.0.0.3", + "externalHttpPort": 2113, + "lastCommitPosition": 649007631, + "writerCheckpoint": 649024685, + "chaserCheckpoint": 649024685, + "epochPosition": 649023795, + "epochNumber": 202, + "epochId": "1f17695d-6558-4d8b-ba60-2ae273b11e09", + "nodePriority": 0 + }, + { + "instanceId": "24bb9031-5f21-436c-a7b5-c5f03a95e938", + "timeStamp": "2020-09-02T13:54:39.023053Z", + "state": "Slave", + "isAlive": false, + "internalTcpIp": "10.0.0.4", + "internalTcpPort": 1112, + "internalSecureTcpPort": 0, + "externalTcpIp": "10.0.0.4", + "externalTcpPort": 1113, + "externalSecureTcpPort": 0, + "internalHttpIp": "10.0.0.4", + "internalHttpPort": 2112, + "externalHttpIp": "10.0.0.4", + "externalHttpPort": 2113, + "lastCommitPosition": 649007631, + "writerCheckpoint": 649023795, + "chaserCheckpoint": 649023795, + "epochPosition": 551088596, + "epochNumber": 201, + "epochId": "d8f95f4b-167a-4487-9031-4d31a507e6d9", + "nodePriority": 0 + } + ], + "serverIp": "10.0.0.3", + "serverPort": 2112 +} diff --git a/test/unit/core/clusterDiscoverer.test.js b/test/unit/core/clusterDiscoverer.test.js new file mode 100644 index 0000000..f63a5e4 --- /dev/null +++ b/test/unit/core/clusterDiscoverer.test.js @@ -0,0 +1,693 @@ +const fs = require('fs'); +const path = require('path'); +const dns = require('dns'); + +const ClusterDiscoverer = require('../../../src/core/cluster/clusterDiscoverer'); +const ClusterInfo = require('../../../src/core/cluster/clusterInfo'); +const GossipSeed = require('../../../src/gossipSeed'); +const NodeEndPoints = require('../../../src/core/cluster/nodeEndpoints'); + +const logger = { info: () => {} }; + +describe('ClusterDiscoverer', () => { + const mockDns = { + ADDRCONFIG: dns.ADDRCONFIG, + V4MAPPED: dns.V4MAPPED, + }; + const mockHttp = {}; + const settings = { + clusterDns: 'my-discover.com:2113', + maxDiscoverAttempts: 10, + discoverDelay: 10, + managerExternalHttpPort: 2113, + seeds: null, + gossipTimeout: 1000, + }; + const tClusterInfo = new ClusterInfo([ + { + instanceId: 'bb16857d-373d-4233-a175-89c917a72329', + timeStamp: '2020-09-02T13:53:24.234898Z', + state: 'Slave', + isAlive: false, + internalTcpIp: '10.0.0.1', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.1', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.1', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.1', + externalHttpPort: 2113, + lastCommitPosition: 648923382, + writerCheckpoint: 648936339, + chaserCheckpoint: 648936339, + epochPosition: 551088596, + epochNumber: 201, + epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9', + nodePriority: 0, + }, + { + instanceId: 'b3c18dcd-6476-467a-b7b8-d6672b74e9c2', + timeStamp: '2020-09-02T13:56:06.189428Z', + state: 'CatchingUp', + isAlive: true, + internalTcpIp: '10.0.0.2', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.2', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.2', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.2', + externalHttpPort: 2113, + lastCommitPosition: -1, + writerCheckpoint: 0, + chaserCheckpoint: 0, + epochPosition: -1, + epochNumber: -1, + epochId: '00000000-0000-0000-0000-000000000000', + nodePriority: 0, + }, + { + instanceId: 'e802a2b5-826c-4bd5-84d0-c9d1387fbf79', + timeStamp: '2020-09-02T13:56:07.391534Z', + state: 'Master', + isAlive: true, + internalTcpIp: '10.0.0.3', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.3', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.3', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.3', + externalHttpPort: 2113, + lastCommitPosition: 649007631, + writerCheckpoint: 649024685, + chaserCheckpoint: 649024685, + epochPosition: 649023795, + epochNumber: 202, + epochId: '1f17695d-6558-4d8b-ba60-2ae273b11e09', + nodePriority: 0, + }, + { + instanceId: '24bb9031-5f21-436c-a7b5-c5f03a95e938', + timeStamp: '2020-09-02T13:54:39.023053Z', + state: 'Slave', + isAlive: false, + internalTcpIp: '10.0.0.4', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.4', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.4', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.4', + externalHttpPort: 2113, + lastCommitPosition: 649007631, + writerCheckpoint: 649023795, + chaserCheckpoint: 649023795, + epochPosition: 551088596, + epochNumber: 201, + epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9', + nodePriority: 0, + }, + ]); + const tClusterInfoNoBestNode = new ClusterInfo([ + { + instanceId: 'bb16857d-373d-4233-a175-89c917a72329', + timeStamp: '2020-09-02T13:53:24.234898Z', + state: 'Manager', + isAlive: true, + internalTcpIp: '10.0.0.1', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.1', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.1', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.1', + externalHttpPort: 2113, + lastCommitPosition: 648923382, + writerCheckpoint: 648936339, + chaserCheckpoint: 648936339, + epochPosition: 551088596, + epochNumber: 201, + epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9', + nodePriority: 0, + }, + { + instanceId: 'b3c18dcd-6476-467a-b7b8-d6672b74e9c2', + timeStamp: '2020-09-02T13:56:06.189428Z', + state: 'CatchingUp', + isAlive: false, + internalTcpIp: '10.0.0.2', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.2', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.2', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.2', + externalHttpPort: 2113, + lastCommitPosition: -1, + writerCheckpoint: 0, + chaserCheckpoint: 0, + epochPosition: -1, + epochNumber: -1, + epochId: '00000000-0000-0000-0000-000000000000', + nodePriority: 0, + }, + { + instanceId: 'e802a2b5-826c-4bd5-84d0-c9d1387fbf79', + timeStamp: '2020-09-02T13:56:07.391534Z', + state: 'Master', + isAlive: false, + internalTcpIp: '10.0.0.3', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.3', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.3', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.3', + externalHttpPort: 2113, + lastCommitPosition: 649007631, + writerCheckpoint: 649024685, + chaserCheckpoint: 649024685, + epochPosition: 649023795, + epochNumber: 202, + epochId: '1f17695d-6558-4d8b-ba60-2ae273b11e09', + nodePriority: 0, + }, + { + instanceId: '24bb9031-5f21-436c-a7b5-c5f03a95e938', + timeStamp: '2020-09-02T13:54:39.023053Z', + state: 'Slave', + isAlive: false, + internalTcpIp: '10.0.0.4', + internalTcpPort: 1112, + internalSecureTcpPort: 0, + externalTcpIp: '10.0.0.4', + externalTcpPort: 1113, + externalSecureTcpPort: 0, + internalHttpIp: '10.0.0.4', + internalHttpPort: 2112, + externalHttpIp: '10.0.0.4', + externalHttpPort: 2113, + lastCommitPosition: 649007631, + writerCheckpoint: 649023795, + chaserCheckpoint: 649023795, + epochPosition: 551088596, + epochNumber: 201, + epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9', + nodePriority: 0, + }, + ]); + const discoverer = new ClusterDiscoverer(logger, settings, mockDns, mockHttp); + const discovererWithGossipSeeds = new ClusterDiscoverer( + logger, + { + ...settings, + ...{ + seeds: [ + new GossipSeed({ + host: '10.0.0.1', + port: 2113, + }), + new GossipSeed({ + host: '10.0.0.1', + port: 2113, + }), + new GossipSeed({ + host: '10.0.0.1', + port: 2113, + }), + ], + }, + }, + mockDns, + mockHttp + ); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('constructor', () => { + test('Should be defined', () => { + expect(discoverer).toBeDefined(); + }); + + test('Should throw an error', () => { + expect( + () => + new ClusterDiscoverer( + logger, + { + clusterDns: null, + maxDiscoverAttempts: 10, + managerExternalHttpPort: 2113, + seeds: null, + gossipTimeout: 1000, + }, + mockDns, + mockHttp + ) + ).toThrow(); + }); + expect( + () => + new ClusterDiscoverer( + logger, + { + clusterDns: null, + maxDiscoverAttempts: 10, + managerExternalHttpPort: 2113, + seeds: [], + gossipTimeout: 1000, + }, + mockDns, + mockHttp + ) + ).toThrow(); + }); + + describe('#_resolveDns', () => { + test('Should call lookup', async () => { + mockDns.lookup = jest.fn().mockResolvedValue([ + { + address: '10.0.0.1', + family: 4, + }, + ]); + await discoverer._resolveDns('my-discover.com:2113'); + expect(mockDns.lookup).toHaveBeenCalledWith('my-discover.com:2113', { + family: 4, + hints: dns.ADDRCONFIG | dns.V4MAPPED, + all: true, + }); + }); + + test('Should reject if dnsService fails', async () => { + mockDns.lookup = jest.fn().mockRejectedValue(new Error('Unexpected DNS error')); + await expect(discoverer._resolveDns('my-discover.com:2113')).rejects.toBeDefined(); + }); + + test('Should reject if no addresses are returned', async () => { + mockDns.lookup = jest.fn().mockResolvedValue([]); + await expect(discoverer._resolveDns('my-discover.com:2113')).rejects.toEqual( + new Error('No result from dns lookup for my-discover.com:2113') + ); + }); + + test('Should return a list of candidate addresses', async () => { + mockDns.lookup = jest.fn().mockResolvedValue([ + { + address: '10.0.0.1', + family: 4, + }, + { + address: '10.0.0.2', + family: 4, + }, + { + address: '10.0.0.3', + family: 4, + }, + ]); + const candidates = await discoverer._resolveDns('my-discover.com:2113'); + expect(candidates).toEqual(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + }); + }); + + describe('#_clusterInfo', () => { + test('Should call httpService.request to get cluster informations', async () => { + const tCandidate = new GossipSeed( + { + host: '10.0.0.1', + port: '2113', + }, + undefined + ); + const tTimeout = 1000; + const requestEvents = {}; + let responseCallback; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => {}, + destroy: () => {}, + }; + }); + discoverer._clusterInfo(tCandidate, tTimeout); + expect(mockHttp.request).toHaveBeenCalledWith( + { + host: tCandidate.endPoint.host, + port: tCandidate.endPoint.port, + path: '/gossip?format=json', + timeout: tTimeout, + }, + expect.anything() + ); + }); + + test('Should call httpService.request to get cluster informations with host header', async () => { + const tCandidate = new GossipSeed( + { + host: '10.0.0.1', + port: '2113', + }, + undefined, + 'MyHost' + ); + const tTimeout = 1000; + const requestEvents = {}; + let responseCallback; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => {}, + destroy: () => {}, + }; + }); + discoverer._clusterInfo(tCandidate, tTimeout); + expect(mockHttp.request).toHaveBeenCalledWith( + { + host: tCandidate.endPoint.host, + port: tCandidate.endPoint.port, + path: '/gossip?format=json', + timeout: tTimeout, + headers: { + Host: tCandidate.hostHeader, + }, + }, + expect.anything() + ); + }); + + test('Should return a timeout error if the sockets fails to be connected in the specified timeout', async () => { + const tCandidate = new GossipSeed({ + host: '10.0.0.1', + port: '2113', + }); + const tTimeout = 1000; + const requestEvents = {}; + let responseCallback; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => { + requestEvents['timeout'](); + }, + destroy: () => {}, + }; + }); + await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow( + new Error('Connection to gossip timed out') + ); + }); + + test('Should return an error if the http request emits an error', async () => { + const tCandidate = new GossipSeed({ + host: '10.0.0.1', + port: '2113', + }); + const tTimeout = 1000; + const requestEvents = {}; + let responseCallback; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => { + requestEvents['error'](new Error('Request error')); + }, + destroy: () => {}, + }; + }); + await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow( + new Error('Connection to gossip errored') + ); + }); + + test("Should return an error if the candidate doesn't returns a 200 code", async () => { + const tCandidate = new GossipSeed({ + host: '10.0.0.1', + port: '2113', + }); + const tTimeout = 1000; + const requestEvents = {}; + let responseCallback; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => { + callback({ + statusCode: 503, + on: (type, callback) => { + responseEvents[type] = callback; + }, + }); + }, + destroy: () => {}, + }; + }); + await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow( + new Error('Gossip candidate returns a 503 error') + ); + }); + + test('Should return an error if the response is not a valid JSON', async () => { + const tCandidate = new GossipSeed({ + host: '10.0.0.1', + port: '2113', + }); + const tTimeout = 1000; + let responseCallback; + const requestEvents = {}; + const responseEvents = {}; + mockHttp.request = jest.fn((options, callback) => { + responseCallback = callback; + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => { + callback({ + statusCode: 200, + on: (type, callback) => { + responseEvents[type] = callback; + }, + }); + responseEvents['data']('Not a JSON response'); + responseEvents['end'](); + }, + destroy: () => {}, + }; + }); + await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow( + new Error('Unable to parse gossip response') + ); + }); + + test('Should return the member informations for the cluster', async () => { + const tCandidate = new GossipSeed({ + host: '10.0.0.1', + port: '2113', + }); + const tTimeout = 1000; + const requestEvents = {}; + const responseEvents = {}; + mockHttp.request = jest.fn((options, callback) => { + return { + setTimeout: jest.fn(() => ({})), + on: (type, callback) => { + requestEvents[type] = callback; + }, + end: () => { + callback({ + statusCode: 200, + on: (type, callback) => { + responseEvents[type] = callback; + }, + }); + responseEvents['data'](fs.readFileSync(path.resolve(__dirname, '../../fixtures/gossip.json'))); + responseEvents['end'](); + }, + destroy: () => {}, + }; + }); + const infos = await discoverer._clusterInfo(tCandidate, tTimeout); + expect(infos).toEqual(tClusterInfo); + }); + }); + + describe('#_getGossipCandidates', () => { + test('Should get from dns if gossipSeeds are empty', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + const candidates = await discoverer._getGossipCandidates(settings.managerExternalHttpPort); + expect(discoverer._resolveDns).toHaveBeenCalled(); + expect(candidates).toHaveLength(3); + for (let i in candidates) { + expect(candidates[i]).toBeInstanceOf(GossipSeed); + } + }); + + test('Should get gossipSeeds if present', async () => { + discovererWithGossipSeeds._resolveDns = jest.fn(); + const candidates = await discovererWithGossipSeeds._getGossipCandidates(settings.managerExternalHttpPort); + expect(discovererWithGossipSeeds._resolveDns).not.toHaveBeenCalled(); + expect(candidates).toHaveLength(3); + for (let i in candidates) { + expect(candidates[i]).toBeInstanceOf(GossipSeed); + } + }); + }); + + describe('#discover', () => { + test('Should get resolve dns discover url to get IP addresses of the eventstore node', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo); + await discoverer.discover(); + expect(discoverer._resolveDns).toHaveBeenCalledWith(settings.clusterDns); + }); + + test('Should call _clusterInfo with candidate', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo); + await discoverer.discover(); + expect(discoverer._clusterInfo).toHaveBeenCalledWith( + new GossipSeed({ host: '10.0.0.1', port: settings.managerExternalHttpPort }), + settings.gossipTimeout + ); + }); + + test('Should call _clusterInfo with candidate from gossipSeed if provided', async () => { + discovererWithGossipSeeds._resolveDns = jest.fn().mockResolvedValue(); + discovererWithGossipSeeds._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo); + await discovererWithGossipSeeds.discover(); + expect(discovererWithGossipSeeds._resolveDns).not.toHaveBeenCalled(); + }); + + test('Should return the bestNode', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo); + const node = await discoverer.discover(); + expect(node).toEqual( + new NodeEndPoints( + { + host: '10.0.0.3', + port: 1113, + }, + null + ) + ); + }); + + test('Should try to call each candidates until it get clusterInfo with bestNode', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockImplementation(async (candidate) => { + if (candidate.endPoint.host === '10.0.0.3') { + return tClusterInfo; + } + throw new Error('Gossip candidate returns a 503 error'); + }); + const node = await discoverer.discover(); + expect(node).toEqual( + new NodeEndPoints( + { + host: '10.0.0.3', + port: 1113, + }, + null + ) + ); + expect(discoverer._clusterInfo).toHaveBeenCalledTimes(3); + }); + + test('Should fail if the we reach the maxDiscoverAttempts limits (no bestNode is found)', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfoNoBestNode); + await expect(discoverer.discover()).rejects.toEqual( + new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`) + ); + expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts); + expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts); + }); + + test('Should fail if the we reach the maxDiscoverAttempts limits (all resolveDns attempts fails)', async () => { + discoverer._resolveDns = jest.fn().mockRejectedValue(new Error('Connection to gossip timed out')); + await expect(discoverer.discover()).rejects.toEqual( + new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`) + ); + expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts); + }); + + test('Should fail if the we reach the maxDiscoverAttempts limits (all clusterInfo attempts fails)', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockRejectedValue(new Error('Gossip candidate returns a 503 error')); + await expect(discoverer.discover()).rejects.toEqual( + new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`) + ); + expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts); + expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts); + }); + + test('Should try to call each candidates expect failed one until it get clusterInfo with bestNode', async () => { + discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']); + discoverer._clusterInfo = jest.fn().mockImplementation(async (candidate) => { + if (candidate.endPoint.host === '10.0.0.3') { + return tClusterInfo; + } + throw new Error('Gossip candidate returns a 503 error'); + }); + const node = await discoverer.discover({ host: '10.0.0.2', port: 2113 }); + expect(node).toEqual( + new NodeEndPoints( + { + host: '10.0.0.3', + port: 1113, + }, + null + ) + ); + expect(discoverer._clusterInfo).toHaveBeenCalledTimes(2); + expect(discoverer._clusterInfo).toHaveBeenCalledWith( + new GossipSeed({ host: '10.0.0.1', port: 2113 }), + settings.gossipTimeout + ); + expect(discoverer._clusterInfo).toHaveBeenCalledWith( + new GossipSeed({ host: '10.0.0.3', port: 2113 }), + settings.gossipTimeout + ); + }); + }); +});