feat(cluster): rewrite cluster discovering
* Discovering process adapted from EventStore scala client * Use DNS for first discover but also for reconnoctions (the aim is to be able to reconnect even if all nodes have new IP addresses eg. after rolling update in a cloud environment) * Being able to perform proper unit testing
This commit is contained in:
25
src/common/utils/shuffle.js
Normal file
25
src/common/utils/shuffle.js
Normal file
@@ -0,0 +1,25 @@
|
||||
function rndNext(min, max) {
|
||||
min = Math.ceil(min);
|
||||
max = Math.floor(max);
|
||||
return Math.floor(Math.random() * (max - min)) + min;
|
||||
}
|
||||
|
||||
function shuffle (arr, from, to) {
|
||||
if (!to) {
|
||||
to = arr.length - 1;
|
||||
}
|
||||
if (!from) {
|
||||
from = 0;
|
||||
}
|
||||
const newArr = [...arr];
|
||||
if (from >= to) return;
|
||||
for (var current = from; current <= to; ++current) {
|
||||
var index = rndNext(current, to + 1);
|
||||
var tmp = newArr[index];
|
||||
newArr[index] = newArr[current];
|
||||
newArr[current] = tmp;
|
||||
}
|
||||
return newArr;
|
||||
};
|
||||
|
||||
module.exports = shuffle;
|
163
src/core/cluster/clusterDiscoverer.js
Normal file
163
src/core/cluster/clusterDiscoverer.js
Normal file
@@ -0,0 +1,163 @@
|
||||
const ClusterInfo = require('./clusterInfo');
|
||||
const GossipSeed = require('../../gossipSeed');
|
||||
const NodeEndPoints = require('./nodeEndpoints');
|
||||
const shuffle = require('../../common/utils/shuffle');
|
||||
|
||||
function wait(ms) {
|
||||
return new Promise(resolve => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* ClusterDiscoverer
|
||||
* @constructor
|
||||
* @class
|
||||
* @param {Logger} log - Logger instance
|
||||
* @param {Object} settings - Settings object
|
||||
* @param {Object} dnsService - DNS service to perform DNS lookup
|
||||
* @param {Object} httpService - HTTP service to perform http requests
|
||||
*/
|
||||
function ClusterDiscoverer(log, settings, dnsService, httpService) {
|
||||
if (!settings.clusterDns && (!settings.seeds || settings.seeds.length === 0))
|
||||
throw new Error('Both clusterDns and seeds are null/empty.');
|
||||
this._log = log;
|
||||
|
||||
this._settings = settings;
|
||||
this._dnsService = dnsService;
|
||||
this._httpService = httpService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover Cluster endpoints
|
||||
* @param {Object} failedTcpEndPoint - The failed TCP endpoint which were used by the handler
|
||||
* @returns {Promise.<NodeEndPoints>}
|
||||
*/
|
||||
ClusterDiscoverer.prototype.discover = async function (failedTcpEndPoint) {
|
||||
let attempts = 0;
|
||||
while (attempts++ < this._settings.maxDiscoverAttempts) {
|
||||
try {
|
||||
const candidates = await this._getGossipCandidates(this._settings.managerExternalHttpPort);
|
||||
const gossipSeeds = candidates.filter(
|
||||
(candidate) =>
|
||||
!failedTcpEndPoint ||
|
||||
!(candidate.endPoint.host === failedTcpEndPoint.host && candidate.endPoint.port === failedTcpEndPoint.port)
|
||||
);
|
||||
let gossipSeedsIndex = 0;
|
||||
let clusterInfo;
|
||||
do {
|
||||
try {
|
||||
clusterInfo = await this._clusterInfo(gossipSeeds[gossipSeedsIndex], this._settings.gossipTimeout);
|
||||
if (!clusterInfo.bestNode) {
|
||||
this._log.info(
|
||||
`Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed: no candidate found.`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
} catch (err) {}
|
||||
} while (++gossipSeedsIndex < gossipSeeds.length);
|
||||
if (clusterInfo) {
|
||||
return NodeEndPoints.createFromGossipMember(clusterInfo.bestNode);
|
||||
}
|
||||
} catch (err) {
|
||||
this._log.info(
|
||||
`Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed with error: ${err}.\n${err.stack}`
|
||||
);
|
||||
}
|
||||
await wait(this._settings.discoverDelay);
|
||||
}
|
||||
throw new Error(`Failed to discover candidate in ${this._settings.maxDiscoverAttempts} attempts.`);
|
||||
};
|
||||
|
||||
/**
|
||||
* Get gossip candidates either from DNS or from gossipSeeds settings
|
||||
* @private
|
||||
* @param {Number} managerExternalHttpPort - Http port of the manager (or the http port of the node for OSS clusters)
|
||||
* @returns {Promise.<GossipSeed[]>}
|
||||
*/
|
||||
ClusterDiscoverer.prototype._getGossipCandidates = async function (managerExternalHttpPort) {
|
||||
const gossipSeeds =
|
||||
this._settings.seeds && this._settings.seeds.length > 0
|
||||
? this._settings.seeds
|
||||
: (await this._resolveDns(this._settings.clusterDns)).map(
|
||||
(address) => new GossipSeed({ host: address, port: managerExternalHttpPort }, undefined)
|
||||
);
|
||||
return shuffle(gossipSeeds);
|
||||
};
|
||||
|
||||
/**
|
||||
* Resolve the cluster DNS discovery address to retrieve belonging ip addresses
|
||||
* @private
|
||||
* @param {String} clusterDns - Cluster DNS discovery address
|
||||
* @returns {Promise.<String[]>}
|
||||
*/
|
||||
ClusterDiscoverer.prototype._resolveDns = async function (clusterDns) {
|
||||
const dnsOptions = {
|
||||
family: 4,
|
||||
hints: this._dnsService.ADDRCONFIG | this._dnsService.V4MAPPED,
|
||||
all: true,
|
||||
};
|
||||
const result = await this._dnsService.lookup(clusterDns, dnsOptions);
|
||||
if (!result || result.length === 0) {
|
||||
throw new Error(`No result from dns lookup for ${clusterDns}`);
|
||||
}
|
||||
return result.map((address) => address.address);
|
||||
};
|
||||
|
||||
/**
|
||||
* Get cluster informations (gossip members)
|
||||
* @param {GossipSeed} candidate - candidate to get informations from
|
||||
* @param {Number} timeout - timeout for the http request
|
||||
* @returns {Promise.<ClusterInfo>}
|
||||
*/
|
||||
ClusterDiscoverer.prototype._clusterInfo = async function (candidate, timeout) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const options = {
|
||||
host: candidate.endPoint.host,
|
||||
port: candidate.endPoint.port,
|
||||
path: '/gossip?format=json',
|
||||
timeout: timeout,
|
||||
};
|
||||
if (candidate.hostHeader) {
|
||||
options.headers = {
|
||||
Host: candidate.hostHeader,
|
||||
};
|
||||
}
|
||||
|
||||
const request = this._httpService.request(options, (res) => {
|
||||
if (res.statusCode !== 200) {
|
||||
this._log.info('Trying to get gossip from', candidate, 'failed with status code:', res.statusCode);
|
||||
reject(new Error(`Gossip candidate returns a ${res.statusCode} error`));
|
||||
return;
|
||||
}
|
||||
let result = '';
|
||||
res.on('data', (chunk) => {
|
||||
result += chunk.toString();
|
||||
});
|
||||
res.on('end', function () {
|
||||
try {
|
||||
result = JSON.parse(result);
|
||||
} catch (e) {
|
||||
reject(new Error('Unable to parse gossip response'));
|
||||
}
|
||||
resolve(new ClusterInfo(result.members));
|
||||
});
|
||||
});
|
||||
|
||||
request.setTimeout(timeout);
|
||||
|
||||
request.on('timeout', () => {
|
||||
this._log.info('Trying to get gossip from', candidate, 'timed out.');
|
||||
request.destroy();
|
||||
reject(new Error('Connection to gossip timed out'));
|
||||
});
|
||||
|
||||
request.on('error', (error) => {
|
||||
this._log.info('Trying to get gossip from', candidate, 'errored', error);
|
||||
request.destroy();
|
||||
reject(new Error('Connection to gossip errored'));
|
||||
});
|
||||
|
||||
request.end();
|
||||
});
|
||||
};
|
||||
|
||||
module.exports = ClusterDiscoverer;
|
36
src/core/cluster/clusterInfo.js
Normal file
36
src/core/cluster/clusterInfo.js
Normal file
@@ -0,0 +1,36 @@
|
||||
const MemberInfo = require('./memberInfo.js');
|
||||
|
||||
const VNodeStates = Object.freeze({
|
||||
Initializing: 0,
|
||||
Unknown: 1,
|
||||
PreReplica: 2,
|
||||
CatchingUp: 3,
|
||||
Clone: 4,
|
||||
Slave: 5,
|
||||
PreMaster: 6,
|
||||
Master: 7,
|
||||
Manager: 8,
|
||||
ShuttingDown: 9,
|
||||
Shutdown: 10
|
||||
});
|
||||
|
||||
function ClusterInfo(members) {
|
||||
this._members = members.map(member => new MemberInfo(member));
|
||||
|
||||
Object.defineProperty(this, 'bestNode', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._getBestNode();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ClusterInfo.prototype._getBestNode = function () {
|
||||
return this._members
|
||||
.filter(member => member.isAlive && member.isAllowedToConnect)
|
||||
.sort(function (a, b) {
|
||||
return VNodeStates[b.state] - VNodeStates[a.state];
|
||||
})[0];
|
||||
}
|
||||
|
||||
module.exports = ClusterInfo;
|
73
src/core/cluster/memberInfo.js
Normal file
73
src/core/cluster/memberInfo.js
Normal file
@@ -0,0 +1,73 @@
|
||||
const NOT_ALLOWED_STATES = [
|
||||
'Manager',
|
||||
'ShuttingDown',
|
||||
'Shutdown'
|
||||
];
|
||||
|
||||
function MemberInfo(informations) {
|
||||
this._instanceId = informations.instanceId;
|
||||
this._timeStamp = informations.timeStamp;
|
||||
this._state = informations.state;
|
||||
this._isAlive = informations.isAlive;
|
||||
this._internalTcpIp = informations.internalTcpIp;
|
||||
this._internalTcpPort = informations.internalTcpPort;
|
||||
this._internalSecureTcpPort = informations.internalSecureTcpPort;
|
||||
this._externalTcpIp = informations.externalTcpIp;
|
||||
this._externalTcpPort = informations.externalTcpPort;
|
||||
this._externalSecureTcpPort = informations.externalSecureTcpPort;
|
||||
this._internalHttpIp = informations.internalHttpIp;
|
||||
this._internalHttpPort = informations.internalHttpPort;
|
||||
this._externalHttpIp = informations.externalHttpIp;
|
||||
this._externalHttpPort = informations.externalHttpPort;
|
||||
this._lastCommitPosition = informations.lastCommitPosition;
|
||||
this._writerCheckpoint = informations.writerCheckpoint;
|
||||
this._chaserCheckpoint = informations.chaserCheckpoint;
|
||||
this._epochPosition = informations.epochPosition;
|
||||
this._epochNumber = informations.epochNumber;
|
||||
this._epochId = informations.epochId;
|
||||
this._nodePriority = informations.nodePriority;
|
||||
|
||||
Object.defineProperty(this, 'state', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._state;
|
||||
}
|
||||
});
|
||||
|
||||
Object.defineProperty(this, 'isAllowedToConnect', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return !NOT_ALLOWED_STATES.includes(this._state);
|
||||
}
|
||||
});
|
||||
|
||||
Object.defineProperty(this, 'isAlive', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._isAlive;
|
||||
}
|
||||
});
|
||||
|
||||
Object.defineProperty(this, 'externalTcpIp', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._externalTcpIp;
|
||||
}
|
||||
});
|
||||
|
||||
Object.defineProperty(this, 'externalTcpPort', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._externalTcpPort;
|
||||
}
|
||||
});
|
||||
|
||||
Object.defineProperty(this, 'externalSecureTcpPort', {
|
||||
enumerable: true,
|
||||
get: function () {
|
||||
return this._externalSecureTcpPort;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = MemberInfo;
|
23
src/core/cluster/nodeEndpoints.js
Normal file
23
src/core/cluster/nodeEndpoints.js
Normal file
@@ -0,0 +1,23 @@
|
||||
function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) {
|
||||
if (tcpEndPoint === null && secureTcpEndPoint === null) throw new Error('Both endpoints are null.');
|
||||
Object.defineProperties(this, {
|
||||
tcpEndPoint: {
|
||||
enumerable: true,
|
||||
value: tcpEndPoint
|
||||
},
|
||||
secureTcpEndPoint: {
|
||||
enumerable: true,
|
||||
value: secureTcpEndPoint
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
NodeEndPoints.createFromGossipMember = function (member) {
|
||||
const normTcp = { host: member.externalTcpIp, port: member.externalTcpPort };
|
||||
const secTcp = member.externalSecureTcpPort > 0
|
||||
? { host: member.externalTcpIp, port: member.externalSecureTcpPort }
|
||||
: null;
|
||||
return new NodeEndPoints(normTcp, secTcp);
|
||||
}
|
||||
|
||||
module.exports = NodeEndPoints
|
@@ -1,9 +1,19 @@
|
||||
var EventStoreNodeConnection = require('./eventStoreNodeConnection');
|
||||
var StaticEndpointDiscoverer = require('./core/staticEndpointDiscoverer');
|
||||
var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer');
|
||||
var ClusterDiscoverer = require('./core/cluster/clusterDiscoverer');
|
||||
var NoopLogger = require('./common/log/noopLogger');
|
||||
var ensure = require('./common/utils/ensure');
|
||||
|
||||
const util = require('util');
|
||||
const http = require('http');
|
||||
const dns = require('dns');
|
||||
|
||||
const dnsService = {
|
||||
lookup : util.promisify(dns.lookup),
|
||||
ADDRCONFIG: dns.ADDRCONFIG,
|
||||
V4MAPPED: dns.V4MAPPED
|
||||
};
|
||||
|
||||
var defaultConnectionSettings = Object.freeze({
|
||||
log: new NoopLogger(),
|
||||
verboseLogging: false,
|
||||
@@ -32,6 +42,7 @@ var defaultConnectionSettings = Object.freeze({
|
||||
// Cluster Settings
|
||||
clusterDns: '',
|
||||
maxDiscoverAttempts: 10,
|
||||
discoverDelay: 500,
|
||||
externalGossipPort: 0,
|
||||
gossipTimeout: 1000
|
||||
});
|
||||
@@ -80,17 +91,17 @@ function createFromClusterDns(connectionSettings, clusterDns, externalGossipPort
|
||||
var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {});
|
||||
var clusterSettings = {
|
||||
clusterDns: clusterDns,
|
||||
gossipSeeds: null,
|
||||
externalGossipPort: externalGossipPort,
|
||||
seeds: null,
|
||||
managerExternalHttpPort: externalGossipPort,
|
||||
maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts,
|
||||
discoverDelay: mergedSettings.discoverDelay,
|
||||
gossipTimeout: mergedSettings.gossipTimeout
|
||||
};
|
||||
var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log,
|
||||
clusterSettings.clusterDns,
|
||||
clusterSettings.maxDiscoverAttempts,
|
||||
clusterSettings.externalGossipPort,
|
||||
clusterSettings.gossipSeeds,
|
||||
clusterSettings.gossipTimeout
|
||||
var endPointDiscoverer = new ClusterDiscoverer(
|
||||
mergedSettings.log,
|
||||
clusterSettings,
|
||||
dnsService,
|
||||
http
|
||||
);
|
||||
return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName);
|
||||
}
|
||||
@@ -101,17 +112,17 @@ function createFromGossipSeeds(connectionSettings, gossipSeeds, connectionName)
|
||||
var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {});
|
||||
var clusterSettings = {
|
||||
clusterDns: '',
|
||||
gossipSeeds: gossipSeeds,
|
||||
seeds: gossipSeeds,
|
||||
externalGossipPort: 0,
|
||||
maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts,
|
||||
discoverDelay: mergedSettings.discoverDelay,
|
||||
gossipTimeout: mergedSettings.gossipTimeout
|
||||
};
|
||||
var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log,
|
||||
clusterSettings.clusterDns,
|
||||
clusterSettings.maxDiscoverAttempts,
|
||||
clusterSettings.externalGossipPort,
|
||||
clusterSettings.gossipSeeds,
|
||||
clusterSettings.gossipTimeout
|
||||
var endPointDiscoverer = new ClusterDiscoverer(
|
||||
mergedSettings.log,
|
||||
clusterSettings,
|
||||
dnsService,
|
||||
http
|
||||
);
|
||||
return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName);
|
||||
}
|
||||
|
@@ -1,7 +1,8 @@
|
||||
function GossipSeed(endPoint, hostName) {
|
||||
function GossipSeed(endPoint, hostName, hostHeader) {
|
||||
if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.');
|
||||
this.endPoint = endPoint;
|
||||
this.hostName = hostName;
|
||||
this.hostHeader = hostHeader;
|
||||
Object.freeze(this);
|
||||
}
|
||||
|
||||
|
@@ -160,4 +160,4 @@ TcpConnection.createConnectingConnection = function(
|
||||
return connection;
|
||||
};
|
||||
|
||||
module.exports = TcpConnection;
|
||||
module.exports = TcpConnection;
|
Reference in New Issue
Block a user