19: Implement cluster discovery using dns

This commit is contained in:
Nicolas Dextraze 2017-01-29 10:57:59 -08:00
parent 0463d85cfe
commit 1558918692
3 changed files with 91 additions and 47 deletions

View File

@ -1,5 +1,6 @@
var http = require('http'); var http = require('http');
var util = require('util'); var util = require('util');
var dns = require('dns');
var GossipSeed = require('../gossipSeed'); var GossipSeed = require('../gossipSeed');
function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) { function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) {
@ -61,37 +62,37 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) {
*/ */
ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEndPoint) { ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEndPoint) {
try { try {
var gossipCandidates = this._oldGossip var mainPromise = this._oldGossip
? this._getGossipCandidatesFromOldGossip(this._oldGossip, failedTcpEndPoint) ? Promise.resolve(this._getGossipCandidatesFromOldGossip(this._oldGossip, failedTcpEndPoint))
: this._getGossipCandidatesFromDns(); : this._getGossipCandidatesFromDns();
var self = this; var self = this;
var promise = Promise.resolve();
var j = 0; var j = 0;
self._log.debug('Gossip candidates', gossipCandidates); return mainPromise.then(function (gossipCandidates) {
for (var i = 0; i < gossipCandidates.length; i++) { var loopPromise = Promise.resolve();
promise = promise.then(function (endPoints) { for (var i = 0; i < gossipCandidates.length; i++) {
if (endPoints) return endPoints; loopPromise = loopPromise.then(function (endPoints) {
if (endPoints) return endPoints;
return self._tryGetGossipFrom(gossipCandidates[j++]) return self._tryGetGossipFrom(gossipCandidates[j++])
.then(function (gossip) { .then(function (gossip) {
if (gossip === null || gossip.members === null || gossip.members.length === 0) if (!gossip || !gossip.members || gossip.members.length === 0)
return; return;
var bestNode = self._tryDetermineBestNode(gossip.members); var bestNode = self._tryDetermineBestNode(gossip.members);
if (bestNode !== null) { if (bestNode) {
self._oldGossip = gossip.members; self._oldGossip = gossip.members;
return bestNode; return bestNode;
} }
}); });
}); });
} }
return promise; return loopPromise;
});
} catch (e) { } catch (e) {
return Promise.reject(e); return Promise.reject(e);
} }
}; };
ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) { ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) {
if (failedTcpEndPoint === null) return this._arrangeGossipCandidates(oldGossip); if (!failedTcpEndPoint) return this._arrangeGossipCandidates(oldGossip);
var gossipCandidates = oldGossip.filter(function(x) { var gossipCandidates = oldGossip.filter(function(x) {
return !(x.externalTcpPort === failedTcpEndPoint.port && x.externalTcpIp === failedTcpEndPoint.host); return !(x.externalTcpPort === failedTcpEndPoint.port && x.externalTcpIp === failedTcpEndPoint.host);
}); });
@ -115,20 +116,33 @@ ClusterDnsEndPointDiscoverer.prototype._arrangeGossipCandidates = function (memb
}; };
ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () { ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () {
var endpoints = []; var self = this;
if(this._gossipSeeds !== null && this._gossipSeeds.length > 0) return new Promise(function (resolve, reject) {
{ if (self._gossipSeeds && self._gossipSeeds.length > 0) {
endpoints = this._gossipSeeds; var endpoints = self._gossipSeeds;
} self._randomShuffle(endpoints, 0, endpoints.length - 1);
else resolve(endpoints);
{ }
//TODO: dns resolve else {
throw new Error('Not implemented.'); const dnsOptions = {
//endpoints = ResolveDns(_clusterDns).Select(x => new GossipSeed(new IPEndPoint(x, _managerExternalHttpPort))).ToArray(); family: 4,
} hints: dns.ADDRCONFIG | dns.V4MAPPED,
all: true
this._randomShuffle(endpoints, 0, endpoints.length-1); };
return endpoints; dns.lookup(self._clusterDns, dnsOptions, function (err, addresses) {
if (err) {
return reject(err);
}
if (!addresses || addresses.length === 0) {
return reject(new Error('No result from dns lookup for ' + self._clusterDns));
}
var endpoints = addresses.map(function (x) {
return new GossipSeed({host: x.address, port: self._managerExternalHttpPort});
});
resolve(endpoints);
});
}
});
}; };
ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
@ -143,9 +157,9 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
this._log.info('Try get gossip from', endPoint); this._log.info('Try get gossip from', endPoint);
var self = this; var self = this;
return new Promise(function (resolve, reject) { return new Promise(function (resolve, reject) {
try { var timedout = false;
http http.request(options, function (res) {
.request(options, function (res) { if (timedout) return;
var result = ''; var result = '';
if (res.statusCode !== 200) { if (res.statusCode !== 200) {
self._log.info('Trying to get gossip from', endPoint, 'failed with status code:', res.statusCode); self._log.info('Trying to get gossip from', endPoint, 'failed with status code:', res.statusCode);
@ -166,16 +180,15 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
}) })
.setTimeout(self._gossipTimeout, function () { .setTimeout(self._gossipTimeout, function () {
self._log.info('Trying to get gossip from', endPoint, 'timed out.'); self._log.info('Trying to get gossip from', endPoint, 'timed out.');
timedout = true;
resolve(); resolve();
}) })
.on('error', function (e) { .on('error', function (e) {
if (timedout) return;
self._log.info('Trying to get gossip from', endPoint, 'failed with error:', e); self._log.info('Trying to get gossip from', endPoint, 'failed with error:', e);
resolve(); resolve();
}) })
.end(); .end();
} catch(e) {
reject(e);
}
}); });
}; };

32
test/cluster/dns_test.js Normal file
View File

@ -0,0 +1,32 @@
var client = require('../../src/client');
var uuid = require('uuid');
var settings = {
log: {
info: console.log,
error: console.log,
debug: console.log
}
};
var conn = client.createConnection(settings, "discover://es.nicdex.com:2113");
console.log('Connecting...');
conn.on('connected', function (tcpEndPoint) {
console.log('Connected to', tcpEndPoint);
setTimeout(function () {
conn.appendToStream('test-' + uuid.v4(), client.expectedVersion.noStream, [
client.createJsonEventData(uuid.v4(), {abc: 123}, {}, 'myEvent')
]);
}, 5000);
});
conn.on('error', function (err) {
console.log(err.stack);
});
conn.on('closed', function (reason) {
console.log('Connection closed:', reason);
process.exit(-1);
});
conn.connect()
.catch(function (err) {
console.log(err.stack);
process.exit(-1);
});

View File

@ -9,15 +9,14 @@ var settings = {
} }
}; };
var gossipSeeds = [ var gossipSeeds = [
new client.GossipSeed({host: '192.168.33.10', port: 1113}), new client.GossipSeed({host: '192.168.33.10', port: 2113}),
new client.GossipSeed({host: '192.168.33.11', port: 1113}), new client.GossipSeed({host: '192.168.33.11', port: 2113}),
new client.GossipSeed({host: '192.168.33.12', port: 1113}) new client.GossipSeed({host: '192.168.33.12', port: 2113})
]; ];
var conn = client.createConnection(settings, gossipSeeds); var conn = client.createConnection(settings, gossipSeeds);
console.log('Connecting...'); console.log('Connecting...');
conn.on('connected', function (tcpEndPoint) { conn.on('connected', function (tcpEndPoint) {
console.log('Connect to', tcpEndPoint); console.log('Connected to', tcpEndPoint);
setTimeout(function () { setTimeout(function () {
conn.appendToStream('test-' + uuid.v4(), client.expectedVersion.noStream, [ conn.appendToStream('test-' + uuid.v4(), client.expectedVersion.noStream, [
client.createJsonEventData(uuid.v4(), {abc: 123}, {}, 'myEvent') client.createJsonEventData(uuid.v4(), {abc: 123}, {}, 'myEvent')