From dd1302f641f48d578c97f565f453bbd7a15aa70b Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Sat, 15 Oct 2016 15:41:25 -0700 Subject: [PATCH] Implemented connection to cluster using gossip seeds --- README.md | 5 +- src/client.js | 1 + src/core/clusterDnsEndPointDiscoverer.js | 240 ++++++++++++++++ src/core/eventStoreConnectionLogicHandler.js | 2 +- src/eventStoreConnection.js | 77 ++++- src/eventStoreNodeConnection.js | 4 +- src/gossipSeed.js | 13 + test/client_test.js | 280 ------------------- test/common/base_test.js | 3 +- test/connection_test.js | 29 +- 10 files changed, 351 insertions(+), 303 deletions(-) create mode 100644 src/core/clusterDnsEndPointDiscoverer.js create mode 100644 src/gossipSeed.js delete mode 100644 test/client_test.js diff --git a/README.md b/README.md index c58922c..2aa20f9 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,9 @@ Examples ## Porting .Net Task to Node.js -I used Promise to replace .Net Task, so when executing an async command, i.e. appendToStream you'll have to wait for result/error like this: +.Net Task have been replace with Promise. When executing an async command, i.e. appendToStream you can use then/catch to wait for result/error. + +*Example* connection .appendToStream('myStream', client.expectedVersion.any, events, userCredentials) @@ -51,7 +53,6 @@ I used Promise to replace .Net Task, so when executing an async command, i.e. ap To run the tests you will need - To install the dependencies (`npm install`) -- To install nodeunit (`npm install -g nodeunit`) - Run an instance of EventStore >= 3.3.0 (competing consumers are required for test) on localhost:1113 (Download [here](https://geteventstore.com/downloads/)) To execute the tests suites simply run test with npm diff --git a/src/client.js b/src/client.js index e503577..8bef120 100644 --- a/src/client.js +++ b/src/client.js @@ -44,6 +44,7 @@ module.exports.UserCredentials = require('./systemData/userCredentials'); module.exports.EventData = EventData; module.exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings'); module.exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); +module.exports.GossipSeed = require('./gossipSeed'); // Exporting errors module.exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError'); module.exports.StreamDeletedError = require('./errors/streamDeletedError'); diff --git a/src/core/clusterDnsEndPointDiscoverer.js b/src/core/clusterDnsEndPointDiscoverer.js new file mode 100644 index 0000000..5205fd5 --- /dev/null +++ b/src/core/clusterDnsEndPointDiscoverer.js @@ -0,0 +1,240 @@ +var http = require('http'); +var util = require('util'); +var GossipSeed = require('../gossipSeed'); + +function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) { + if (tcpEndPoint === null && secureTcpEndPoint === null) throw new Error('Both endpoints are null.'); + Object.defineProperties(this, { + tcpEndPoint: { + enumerable: true, + value: tcpEndPoint + }, + secureTcpEndPoint: { + enumerable: true, + value: secureTcpEndPoint + } + }); +} + +function ClusterDnsEndPointDiscoverer(log, clusterDns, maxDiscoverAttempts, managerExternalHttpPort, gossipSeeds, gossipTimeout) { + if (!clusterDns && (!gossipSeeds || gossipSeeds.length === 0)) throw new Error('Both clusterDns and gossipSeeds are null/empty.'); + this._log = log; + this._clusterDns = clusterDns; + this._maxDiscoverAttempts = maxDiscoverAttempts; + this._managerExternalHttpPort = managerExternalHttpPort; + this._gossipSeeds = gossipSeeds; + this._gossipTimeout = gossipTimeout; + this._oldGossip = null; +} + +ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) { + var attempt = 1; + var self = this; + function discover(resolve, reject) { + self._discoverEndPoint(failedTcpEndPoint) + .then(function (endPoints) { + if (!endPoints) + self._log.info(util.format("Discovering attempt %d/%d failed: no candidate found.", attempt, self._maxDiscoverAttempts)); + return endPoints; + }) + .catch(function (exc) { + self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.", attempt, self._maxDiscoverAttempts, exc)); + }) + .then(function (endPoints) { + if (endPoints) + return resolve(endPoints); + if (attempt++ === self._maxDiscoverAttempts) + return reject(new Error('Failed to discover candidate in ' + self._maxDiscoverAttempts + ' attempts.')); + setTimeout(discover, 500, resolve, reject); + }); + } + return new Promise(function (resolve, reject) { + discover(resolve, reject); + }); +}; + +/** + * Discover Cluster endpoints + * @param {Object} failedTcpEndPoint + * @returns {Promise.} + * @private + */ +ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEndPoint) { + try { + var gossipCandidates = this._oldGossip + ? this._getGossipCandidatesFromOldGossip(this._oldGossip, failedTcpEndPoint) + : this._getGossipCandidatesFromDns(); + var self = this; + var promise = Promise.resolve(); + var j = 0; + for (var i = 0; i < gossipCandidates.length; i++) { + promise = promise.then(function (endPoints) { + if (endPoints) return endPoints; + + return self._tryGetGossipFrom(gossipCandidates[j++]) + .then(function (gossip) { + if (gossip === null || gossip.members === null || gossip.members.length === 0) + return; + var bestNode = self._tryDetermineBestNode(gossip.members); + if (bestNode !== null) { + self._oldGossip = gossip.members; + return bestNode; + } + }); + }); + } + return promise; + } catch (e) { + return Promise.reject(e); + } +}; + +ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) { + if (failedTcpEndPoint === null) return oldGossip; + var gossipCandidates = oldGossip.filter(function(x) { + //TODO: failedTcpEndpoint.host might not be an ip + return (x.externalTcpPort !== failedTcpEndPoint.port && x.externalTcpIp !== failedTcpEndPoint.host); + }); + return this._arrangeGossipCandidates(gossipCandidates); +}; + +ClusterDnsEndPointDiscoverer.prototype._arrangeGossipCandidates = function (members) { + var result = new Array(members.length); + var i = -1; + var j = members.length; + for (var k = 0; k < members.length; ++k) + { + if (members[k].state === 'Manager') + result[--j] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); + else + result[++i] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); + } + this._randomShuffle(result, 0, i); // shuffle nodes + this._randomShuffle(result, j, members.length - 1); // shuffle managers + return result; +}; + +ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () { + var endpoints = []; + if(this._gossipSeeds !== null && this._gossipSeeds.length > 0) + { + endpoints = this._gossipSeeds; + } + else + { + //TODO: dns resolve + throw new Error('Not implemented.'); + //endpoints = ResolveDns(_clusterDns).Select(x => new GossipSeed(new IPEndPoint(x, _managerExternalHttpPort))).ToArray(); + } + + this._randomShuffle(endpoints, 0, endpoints.length-1); + return endpoints; +}; + +ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { + var options = { + hostname: endPoint.endPoint.hostname, + port: endPoint.endPoint.port, + path: '/gossip?format=json' + }; + if (endPoint.hostHeader) { + options.headers = {'Host': endPoint.hostHeader}; + } + var self = this; + return new Promise(function (resolve, reject) { + try { + http + .request(options, function (res) { + var result = ''; + if (res.statusCode !== 200) { + self._log.info('Trying to get gossip from', endPoint, 'failed with status code:', res.statusCode); + resolve(); + return; + } + res.on('data', function (chunk) { + result += chunk.toString(); + }); + res.on('end', function () { + try { + result = JSON.parse(result); + } catch (e) { + return resolve(); + } + resolve(result); + }); + }) + .setTimeout(self._gossipTimeout, function () { + self._log.info('Trying to get gossip from', endPoint, 'timed out.'); + resolve(); + }) + .on('error', function (e) { + self._log.info('Trying to get gossip from', endPoint, 'failed with error:', e); + resolve(); + }) + .end(); + } catch(e) { + reject(e); + } + }); +}; + +const VNodeStates = { + 'Initializing': 0, + 'Unknown': 1, + 'PreReplica': 2, + 'CatchingUp': 3, + 'Clone': 4, + 'Slave': 5, + 'PreMaster': 6, + 'Master': 7, + 'Manager': 8, + 'ShuttingDown': 9, + 'Shutdown': 10 +}; + +ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members) { + var notAllowedStates = [ + 'Manager', + 'ShuttingDown', + 'Shutdown' + ]; + var node = members + .filter(function (x) { + return (x.isAlive && notAllowedStates.indexOf(x.state) === -1); + }) + .sort(function (a, b) { + return VNodeStates[b.state] - VNodeStates[a.state]; + })[0]; + if (!node) + { + //_log.Info("Unable to locate suitable node. Gossip info:\n{0}.", string.Join("\n", members.Select(x => x.ToString()))); + return null; + } + + var normTcp = {host: node.externalTcpIp, port: node.externalTcpPort}; + var secTcp = node.externalSecureTcpPort > 0 + ? {host: externalTcpIp, port: node.externalSecureTcpPort} + : null; + this._log.info(util.format("Discovering: found best choice [%j,%j] (%s).", normTcp, secTcp == null ? "n/a" : secTcp, node.state)); + return new NodeEndPoints(normTcp, secTcp); +}; + +function rndNext(min, max) { + min = Math.ceil(min); + max = Math.floor(max); + return Math.floor(Math.random() * (max - min)) + min; +} + +ClusterDnsEndPointDiscoverer.prototype._randomShuffle = function (arr, i, j) { + if (i >= j) + return; + for (var k = i; k <= j; ++k) + { + var index = rndNext(k, j + 1); + var tmp = arr[index]; + arr[index] = arr[k]; + arr[k] = tmp; + } +}; + +module.exports = ClusterDnsEndPointDiscoverer; \ No newline at end of file diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index d9c7b8f..c0f6c7e 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -624,7 +624,7 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() { }; EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { - if (this._connection == null) throw new Error(); + if (this._connection == null) return; var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout; if (Date.now() - this._heartbeatInfo.timeStamp < timeout) diff --git a/src/eventStoreConnection.js b/src/eventStoreConnection.js index 0ed9a65..a2db720 100644 --- a/src/eventStoreConnection.js +++ b/src/eventStoreConnection.js @@ -1,6 +1,8 @@ var EventStoreNodeConnection = require('./eventStoreNodeConnection'); var StaticEndpointDiscoverer = require('./core/staticEndpointDiscoverer'); +var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer'); var NoopLogger = require('./common/log/noopLogger'); +var ensure = require('./common/utils/ensure'); var defaultConnectionSettings = { log: new NoopLogger(), @@ -25,7 +27,13 @@ var defaultConnectionSettings = { failOnNoServerResponse: false, heartbeatInterval: 750, heartbeatTimeout: 1500, - clientConnectionTimeout: 1000 + clientConnectionTimeout: 1000, + + // Cluster Settings + clusterDns: '', + maxDiscoverAttemps: 10, + externalGossipPort: 0, + gossipTimeout: 1000 }; @@ -40,24 +48,63 @@ function merge(a,b) { return c; } +function createFromTcpEndpoint(settings, tcpEndpoint, connectionName) { + if (!tcpEndpoint.port || !tcpEndpoint.hostname) throw new TypeError('endPoint object must have hostname and port properties.'); + var mergedSettings = merge(defaultConnectionSettings, settings || {}); + var endpointDiscoverer = new StaticEndpointDiscoverer(tcpEndpoint, settings.useSslConnection); + return new EventStoreNodeConnection(mergedSettings, null, endpointDiscoverer, connectionName || null); +} + +function createFromStringEndpoint(settings, endPoint, connectionName) { + var m = endPoint.match(/^(tcp|discover):\/\/([^:]):?(\d+)?$/); + if (!m) throw new Error('endPoint string must be tcp://hostname[:port] or discover://dns[:port]'); + var scheme = m[1]; + var hostname = m[2]; + var port = m[3] ? parseInt(m[3]) : 1113; + if (scheme === 'tcp') { + var tcpEndpoint = { + hostname: hostname, + port: port + }; + return createFromTcpEndpoint(settings, tcpEndpoint, connectionName); + } + if (scheme === 'discover') { + throw new Error('Not implemented.'); + } + throw new Error('Invalid scheme for endPoint: ' + scheme); +} + +function createFromGossipSeeds(connectionSettings, gossipSeeds, connectionName) { + ensure.notNull(connectionSettings, "connectionSettings"); + ensure.notNull(gossipSeeds, "gossipSeeds"); + var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {}); + var clusterSettings = { + clusterDns: '', + gossipSeeds: gossipSeeds, + externalGossipPort: 0, + maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, + gossipTimeout: mergedSettings.gossipTimeout + }; + var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(connectionSettings.log, + clusterSettings.clusterDns, + clusterSettings.maxDiscoverAttempts, + clusterSettings.externalGossipPort, + clusterSettings.gossipSeeds, + clusterSettings.gossipTimeout + ); + return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName); +} + /** * Create an EventStore connection * @param {object} settings - * @param {string|object} endPoint + * @param {string|object|array} endPointOrGossipSeeds * @param {string} [connectionName] * @returns {EventStoreNodeConnection} */ -module.exports.create = function(settings, endPoint, connectionName) { - if (typeof endPoint === 'object') { - var mergedSettings = merge(defaultConnectionSettings, settings || {}); - var endpointDiscoverer = new StaticEndpointDiscoverer(endPoint, settings.useSslConnection); - return new EventStoreNodeConnection(mergedSettings, endpointDiscoverer, connectionName || null); - } - if (typeof endPoint === 'string') { - //TODO: tcpEndpoint represented as tcp://hostname:port - //TODO: cluster discovery via dns represented as discover://dns:?port - throw new Error('Not implemented.'); - } - //TODO: cluster discovery via gossip seeds in settings - throw new Error('Not implemented.'); +module.exports.create = function(settings, endPointOrGossipSeeds, connectionName) { + if (Array.isArray(endPointOrGossipSeeds)) return createFromGossipSeeds(settings, endPointOrGossipSeeds, connectionName); + if (typeof endPointOrGossipSeeds === 'object') return createFromTcpEndpoint(settings, endPointOrGossipSeeds, connectionName); + if (typeof endPointOrGossipSeeds === 'string') return createFromStringEndpoint(settings, endPointOrGossipSeeds, connectionName); + throw new TypeError('endPointOrGossipSeeds must be an object, a string or an array.'); }; \ No newline at end of file diff --git a/src/eventStoreNodeConnection.js b/src/eventStoreNodeConnection.js index b243c6d..3df5b20 100644 --- a/src/eventStoreNodeConnection.js +++ b/src/eventStoreNodeConnection.js @@ -34,14 +34,16 @@ const MaxReadSize = 4096; /** * @param settings + * @param clusterSettings * @param endpointDiscoverer * @param connectionName * @constructor * @property {string} connectionName */ -function EventStoreNodeConnection(settings, endpointDiscoverer, connectionName) { +function EventStoreNodeConnection(settings, clusterSettings, endpointDiscoverer, connectionName) { this._connectionName = connectionName || ['ES-', uuid.v4()].join(''); this._settings = settings; + this._clusterSettings = clusterSettings; this._endpointDiscoverer = endpointDiscoverer; this._handler = new EventStoreConnectionLogicHandler(this, settings); diff --git a/src/gossipSeed.js b/src/gossipSeed.js new file mode 100644 index 0000000..14bc015 --- /dev/null +++ b/src/gossipSeed.js @@ -0,0 +1,13 @@ +module.exports = function GossipSeed(endPoint, hostName) { + if (typeof endPoint !== 'object' || !endPoint.hostname || !endPoint.port) throw new TypeError('endPoint must be have hostname and port properties.'); + Object.defineProperties(this, { + endPoint: { + enumerable: true, + value: endPoint + }, + hostName: { + enumerable: true, + value: hostName + } + }); +}; diff --git a/test/client_test.js b/test/client_test.js deleted file mode 100644 index 2ca5a37..0000000 --- a/test/client_test.js +++ /dev/null @@ -1,280 +0,0 @@ -var util = require('util'); -var uuid = require('uuid'); -var client = require('../src/client'); -var NoopLogger = require('../src/common/log/noopLogger'); - -var consoleLogger = { - debug: function() { - var msg = util.format.apply(util, Array.prototype.slice.call(arguments)); - util.log(msg); - }, - info: function() {}, - error: function() {} -}; - -function createRandomEvent() { - return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); -} - -var testStreamName = 'test-' + uuid.v4(); -var userCredentialsForAll = new client.UserCredentials("admin", "changeit"); - -function testEvent(test, event, expectedVersion) { - if (!event) return; - test.ok(event.event, "Event has no 'event'."); - if (!event.event) return; - test.ok(event.event.eventNumber === expectedVersion, util.format("Wrong expected version. Expected: %d Got: %d", event.event.eventNumber, expectedVersion)); -} - -module.exports = { - setUp: function(cb) { - var tcpEndPoint = {host: 'localhost', port: 1113}; - var settings = {verboseLogging: false, log: new NoopLogger()}; - //var settings = {verboseLogging: true, log: consoleLogger}; - this.conn = client.EventStoreConnection.create(settings, tcpEndPoint); - this.connError = null; - var self = this; - this.conn.connect() - .catch(function(e) { - self.connError = e; - cb(e); - }); - this.conn.on('connected', function() { - cb(); - }); - }, - tearDown: function(cb) { - this.conn.close(); - this.conn.on('closed', function() { - cb(); - }); - this.conn = null; - }, -/* - 'Test Connection': function(test) { - test.ok(this.connError === null, "Connection error: " + this.connError); - test.done(); - }, - 'Test Append To Stream': function(test) { - var events = [ - createRandomEvent() - ]; - this.conn.appendToStream(testStreamName, client.expectedVersion.any, events) - .then(function(result) { - test.ok(result, "No result."); - test.done(); - }) - .catch(function (err) { - test.done(err); - }); - }, - 'Test Commit Two Events Using Transaction': function(test) { - this.conn.startTransaction(testStreamName, client.expectedVersion.any) - .then(function(trx) { - test.ok(trx, "No transaction."); - return Promise.all([trx, trx.write([createRandomEvent()])]); - }) - .then(function(args) { - var trx = args[0]; - return Promise.all([trx, trx.write([createRandomEvent()])]); - }) - .then(function(args) { - var trx = args[0]; - return trx.commit(); - }) - .then(function(result) { - test.ok(result, "No result."); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Read One Event': function(test) { - this.conn.readEvent(testStreamName, 0) - .then(function(result) { - test.ok(result, "No result."); - if (result) - test.ok(result.event, "No event. " + result.status); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Read Stream Forward': function(test) { - this.conn.readStreamEventsForward(testStreamName, 0, 100) - .then(function(result) { - test.ok(result, "No result."); - if (result) - test.ok(result.events.length === 3, "Expecting 3 events, got " + result.events.length); - for(var i = 0; i < 3; i++) - testEvent(test, result.events[i], i); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Read Stream Backward': function(test) { - this.conn.readStreamEventsBackward(testStreamName, 2, 100) - .then(function(result) { - test.ok(result, "No result."); - if (result) - test.ok(result.events.length === 3, "Expecting 3 events, got " + result.events.length); - for(var i = 0; i < 3; i++) - testEvent(test, result.events[i], 2-i); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Read All Forward': function(test) { - this.conn.readAllEventsForward(client.positions.start, 100, false, userCredentialsForAll) - .then(function(result) { - test.ok(result, "No result."); - if (result) - test.ok(result.events.length >= 3, "Expecting at least 3 events, got " + result.events.length); - for(var i = 1; i < result.events.length; i++) - test.ok(result.events[i].originalPosition.compareTo(result.events[i-1].originalPosition) > 0, - util.format("event[%d] position is not > event[%d] position.", - result.events[i].originalPosition, - result.events[i-1].originalPosition)); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Read All Backward': function(test) { - this.conn.readAllEventsBackward(client.positions.end, 100, false, userCredentialsForAll) - .then(function(result) { - test.ok(result, "No result."); - if (result) - test.ok(result.events.length >= 3, "Expecting at least 3 events, got " + result.events.length); - for(var i = 1; i < result.events.length; i++) - test.ok(result.events[i].originalPosition.compareTo(result.events[i-1].originalPosition) < 0, - util.format("event[%d] position is not < event[%d] position.", - result.events[i].originalPosition, - result.events[i-1].originalPosition)); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }, - 'Test Subscribe to Stream': function(test) { - var done = false; - function eventAppeared() { - if (!done) { - done = true; - test.done(); - } - } - function subscriptionDropped() { - if (!done) { - done = true; - test.done(); - } - } - var conn = this.conn; - this.conn.subscribeToStream(testStreamName, false, eventAppeared, subscriptionDropped) - .then(function(subscription) { - var events = [createRandomEvent()]; - return conn.appendToStream(testStreamName, client.expectedVersion.any, events); - }) - .catch(function(err) { - done = true; - test.done(err); - }) - } - 'Test Subscribe to All': function(test) { - var done = false; - function eventAppeared() { - if (!done) { - done = true; - test.done(); - } - } - function subscriptionDropped() { - if (!done) { - done = true; - test.done(); - } - } - var conn = this.conn; - this.conn.subscribeToAll(false, eventAppeared, subscriptionDropped, userCredentialsForAll) - .then(function(subscription) { - var events = [createRandomEvent()]; - return conn.appendToStream(testStreamName, client.expectedVersion.any, events); - }) - .catch(function(err) { - done = true; - test.done(err); - }); - }, - 'Test Subscribe to Stream From': function(test) { - var self = this; - var liveProcessing = false; - var catchUpEvents = []; - var liveEvents = []; - function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } - } - function liveProcessingStarted() { - liveProcessing = true; - var events = [createRandomEvent()]; - self.conn.appendToStream(testStreamName, client.expectedVersion.any, events); - } - function subscriptionDropped(connection, reason, error) { - console.log(reason); - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); - test.ok(catchUpEvents.length >= 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length); - test.done(error); - } - this.conn.appendToStream(testStreamName, client.expectedVersion.noStream, ) - var subscription = this.conn.subscribeToStreamFrom(testStreamName, null, false, eventAppeared, liveProcessingStarted, subscriptionDropped); - }, - 'Test Subscribe to All From': function(test) { - var self = this; - var liveProcessing = false; - var catchUpEvents = []; - var liveEvents = []; - function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } - } - function liveProcessingStarted() { - liveProcessing = true; - var events = [createRandomEvent()]; - self.conn.appendToStream(testStreamName, client.expectedVersion.any, events); - } - function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); - test.ok(catchUpEvents.length > 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length); - test.done(error); - } - var subscription = this.conn.subscribeToAllFrom(null, false, eventAppeared, liveProcessingStarted, subscriptionDropped, userCredentialsForAll); - },*/ - /*, - 'Test Delete Stream': function(test) { - this.conn.deleteStream(testStreamName, client.expectedVersion.any) - .then(function(result) { - test.ok(result, "No result."); - test.done(); - }) - .catch(function(err) { - test.done(err); - }); - }*/ -}; \ No newline at end of file diff --git a/test/common/base_test.js b/test/common/base_test.js index 5f5f276..319ef37 100644 --- a/test/common/base_test.js +++ b/test/common/base_test.js @@ -12,12 +12,11 @@ if (process.env.TESTS_VERBOSE_LOGGING === '1') { settings.log = new FileLogger('test-verbose.log'); } -var tcpEndPoint = {host: 'localhost', port: 1113}; +var tcpEndPoint = {hostname: 'localhost', port: 1112}; function setUp(cb) { this.log = settings.log; this.testStreamName = 'test-' + uuid.v4(); - this.log.info('A', this.testStreamName); var connected = false; this.conn = client.EventStoreConnection.create(settings, tcpEndPoint); this.conn.connect() diff --git a/test/connection_test.js b/test/connection_test.js index bc4bba7..b237aea 100644 --- a/test/connection_test.js +++ b/test/connection_test.js @@ -1,10 +1,11 @@ var client = require('../src/client.js'); +var GossipSeed = require('../src/gossipSeed'); var testBase = require('./common/base_test'); module.exports = { 'Connect To Endpoint Happy Path': function(test) { - var tcpEndpoint = {hostname: 'localhost', port: 1113}; + var tcpEndpoint = {hostname: 'localhost', port: 1112}; var conn = client.EventStoreConnection.create(testBase.settings(), tcpEndpoint); conn.connect() .catch(function(err) { @@ -23,7 +24,7 @@ module.exports = { } }, 'Connect To Endpoint That Doesn\'t Exist': function(test) { - var tcpEndpoint = {hostname: 'localhost', port: 1114}; + var tcpEndpoint = {hostname: 'localhost', port: 11112}; var conn = client.EventStoreConnection.create(testBase.settings({maxReconnections:1}), tcpEndpoint); conn.connect() .catch(function (err) { @@ -40,6 +41,30 @@ module.exports = { test.ok(reason.indexOf("Reconnection limit reached") === 0, "Wrong expected reason."); test.done(); }); + }, + 'Connect to Cluster using gossip seeds': function (test) { + test.expect(1); + var gossipSeeds = [ + new GossipSeed({hostname: 'localhost', port: 1113}), + new GossipSeed({hostname: 'localhost', port: 2113}), + new GossipSeed({hostname: 'localhost', port: 3113}) + ]; + var conn = client.EventStoreConnection.create(testBase.settings(), gossipSeeds); + conn.connect() + .catch(function(err) { + test.done(err); + }); + conn.on('connected', function(endPoint){ + test.ok(endPoint, "no endpoint"); + done(); + }); + conn.on('error', done); + + function done(err) { + conn.close(); + if (err) return test.done(err); + test.done(); + } } };