diff --git a/src/core/clusterDnsEndPointDiscoverer.js b/src/core/clusterDnsEndPointDiscoverer.js index beeb4fd..c207667 100644 --- a/src/core/clusterDnsEndPointDiscoverer.js +++ b/src/core/clusterDnsEndPointDiscoverer.js @@ -1,5 +1,6 @@ var http = require('http'); var util = require('util'); +var dns = require('dns'); var GossipSeed = require('../gossipSeed'); function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) { @@ -61,37 +62,37 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) { */ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEndPoint) { try { - var gossipCandidates = this._oldGossip - ? this._getGossipCandidatesFromOldGossip(this._oldGossip, failedTcpEndPoint) + var mainPromise = this._oldGossip + ? Promise.resolve(this._getGossipCandidatesFromOldGossip(this._oldGossip, failedTcpEndPoint)) : this._getGossipCandidatesFromDns(); var self = this; - var promise = Promise.resolve(); var j = 0; - self._log.debug('Gossip candidates', gossipCandidates); - for (var i = 0; i < gossipCandidates.length; i++) { - promise = promise.then(function (endPoints) { - if (endPoints) return endPoints; - - return self._tryGetGossipFrom(gossipCandidates[j++]) - .then(function (gossip) { - if (gossip === null || gossip.members === null || gossip.members.length === 0) - return; - var bestNode = self._tryDetermineBestNode(gossip.members); - if (bestNode !== null) { - self._oldGossip = gossip.members; - return bestNode; - } - }); - }); - } - return promise; + return mainPromise.then(function (gossipCandidates) { + var loopPromise = Promise.resolve(); + for (var i = 0; i < gossipCandidates.length; i++) { + loopPromise = loopPromise.then(function (endPoints) { + if (endPoints) return endPoints; + return self._tryGetGossipFrom(gossipCandidates[j++]) + .then(function (gossip) { + if (!gossip || !gossip.members || gossip.members.length === 0) + return; + var bestNode = self._tryDetermineBestNode(gossip.members); + if (bestNode) { + self._oldGossip = gossip.members; + return bestNode; + } + }); + }); + } + return loopPromise; + }); } catch (e) { return Promise.reject(e); } }; ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) { - if (failedTcpEndPoint === null) return this._arrangeGossipCandidates(oldGossip); + if (!failedTcpEndPoint) return this._arrangeGossipCandidates(oldGossip); var gossipCandidates = oldGossip.filter(function(x) { return !(x.externalTcpPort === failedTcpEndPoint.port && x.externalTcpIp === failedTcpEndPoint.host); }); @@ -115,20 +116,33 @@ ClusterDnsEndPointDiscoverer.prototype._arrangeGossipCandidates = function (memb }; ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () { - var endpoints = []; - if(this._gossipSeeds !== null && this._gossipSeeds.length > 0) - { - endpoints = this._gossipSeeds; - } - else - { - //TODO: dns resolve - throw new Error('Not implemented.'); - //endpoints = ResolveDns(_clusterDns).Select(x => new GossipSeed(new IPEndPoint(x, _managerExternalHttpPort))).ToArray(); - } - - this._randomShuffle(endpoints, 0, endpoints.length-1); - return endpoints; + var self = this; + return new Promise(function (resolve, reject) { + if (self._gossipSeeds && self._gossipSeeds.length > 0) { + var endpoints = self._gossipSeeds; + self._randomShuffle(endpoints, 0, endpoints.length - 1); + resolve(endpoints); + } + else { + const dnsOptions = { + family: 4, + hints: dns.ADDRCONFIG | dns.V4MAPPED, + all: true + }; + dns.lookup(self._clusterDns, dnsOptions, function (err, addresses) { + if (err) { + return reject(err); + } + if (!addresses || addresses.length === 0) { + return reject(new Error('No result from dns lookup for ' + self._clusterDns)); + } + var endpoints = addresses.map(function (x) { + return new GossipSeed({host: x.address, port: self._managerExternalHttpPort}); + }); + resolve(endpoints); + }); + } + }); }; ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { @@ -143,9 +157,9 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { this._log.info('Try get gossip from', endPoint); var self = this; return new Promise(function (resolve, reject) { - try { - http - .request(options, function (res) { + var timedout = false; + http.request(options, function (res) { + if (timedout) return; var result = ''; if (res.statusCode !== 200) { self._log.info('Trying to get gossip from', endPoint, 'failed with status code:', res.statusCode); @@ -166,16 +180,15 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { }) .setTimeout(self._gossipTimeout, function () { self._log.info('Trying to get gossip from', endPoint, 'timed out.'); + timedout = true; resolve(); }) .on('error', function (e) { + if (timedout) return; self._log.info('Trying to get gossip from', endPoint, 'failed with error:', e); resolve(); }) .end(); - } catch(e) { - reject(e); - } }); }; diff --git a/test/cluster/dns_test.js b/test/cluster/dns_test.js new file mode 100644 index 0000000..11d9488 --- /dev/null +++ b/test/cluster/dns_test.js @@ -0,0 +1,32 @@ +var client = require('../../src/client'); +var uuid = require('uuid'); + +var settings = { + log: { + info: console.log, + error: console.log, + debug: console.log + } +}; +var conn = client.createConnection(settings, "discover://es.nicdex.com:2113"); +console.log('Connecting...'); +conn.on('connected', function (tcpEndPoint) { + console.log('Connected to', tcpEndPoint); + setTimeout(function () { + conn.appendToStream('test-' + uuid.v4(), client.expectedVersion.noStream, [ + client.createJsonEventData(uuid.v4(), {abc: 123}, {}, 'myEvent') + ]); + }, 5000); +}); +conn.on('error', function (err) { + console.log(err.stack); +}); +conn.on('closed', function (reason) { + console.log('Connection closed:', reason); + process.exit(-1); +}); +conn.connect() + .catch(function (err) { + console.log(err.stack); + process.exit(-1); + }); diff --git a/test/cluster/manual_test.js b/test/cluster/gossipSeeds_test.js similarity index 77% rename from test/cluster/manual_test.js rename to test/cluster/gossipSeeds_test.js index c95ffb0..c3e5fc5 100644 --- a/test/cluster/manual_test.js +++ b/test/cluster/gossipSeeds_test.js @@ -9,15 +9,14 @@ var settings = { } }; var gossipSeeds = [ - new client.GossipSeed({host: '192.168.33.10', port: 1113}), - new client.GossipSeed({host: '192.168.33.11', port: 1113}), - new client.GossipSeed({host: '192.168.33.12', port: 1113}) + new client.GossipSeed({host: '192.168.33.10', port: 2113}), + new client.GossipSeed({host: '192.168.33.11', port: 2113}), + new client.GossipSeed({host: '192.168.33.12', port: 2113}) ]; - var conn = client.createConnection(settings, gossipSeeds); console.log('Connecting...'); conn.on('connected', function (tcpEndPoint) { - console.log('Connect to', tcpEndPoint); + console.log('Connected to', tcpEndPoint); setTimeout(function () { conn.appendToStream('test-' + uuid.v4(), client.expectedVersion.noStream, [ client.createJsonEventData(uuid.v4(), {abc: 123}, {}, 'myEvent')