Merge pull request #101 from PrestaShopCorp/feature/cluster

Rework on cluster discovering
This commit is contained in:
Nicolas Dextraze 2020-09-28 17:15:19 -07:00 committed by GitHub
commit 853ae875b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 6494 additions and 266 deletions

View File

@ -112,12 +112,27 @@ To generate a test event, open a separate console and run:
## Running the tests ## Running the tests
### Local testing
To run the tests it is recommended that you use an in-memory instance of the eventstore so you don't pollute your dev instance. To run the tests it is recommended that you use an in-memory instance of the eventstore so you don't pollute your dev instance.
EventStore.ClusterNode.exe --run-projections=all --memdb certificate-file=yourcert.pfx EventStore.ClusterNode.exe --run-projections=all --memdb certificate-file=yourcert.pfx
or or
./run-node.sh --run-projections=all --memdb certificate-file=yourcert.p12 ./run-node.sh --run-projections=all --memdb certificate-file=yourcert.p12
You can also use docker-compose :
```bash
# start the single node cluster
npm run compose:single:start
# if you want to wait for the cluster to be available
npm run compose:wait
# run the tests
npm run test
# to cleanup (stop containres, delete volumes)
npm run compose:single:stop
```
For SSL setup see: For SSL setup see:
https://eventstore.org/docs/server/setting_up_ssl/ https://eventstore.org/docs/server/setting_up_ssl/
@ -128,6 +143,29 @@ To execute the tests suites simply run
npm test npm test
### Isolated environment
To be able to run the tests for different connection types (tcp, gossip, cluster) docker-compose files are available to setup the environment and run the tests.
#### Prerequisites
* docker
* docker-compose
#### Run
To execute the tests suites for single node cluster (tcp connection) simply run
npm run test:single
To execute the tests suites for multiple nodes cluster (gossip connection) simply run
npm run test:gossip
To execute the tests suites for multiple nodes cluster (dns discovery connection) simply run
npm run test:cluster
## Porting .Net Task to Node.js ## Porting .Net Task to Node.js
Any async commands returns a [Promise](https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise) object in replacement of .Net Task. Any async commands returns a [Promise](https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise) object in replacement of .Net Task.

147
docker-compose-cluster.yaml Normal file
View File

@ -0,0 +1,147 @@
version: '3.4'
services:
eventstore.db:
image: eventstore/eventstore:release-5.0.8
environment:
- EVENTSTORE_WORKER_THREADS=5
- EVENTSTORE_CLUSTER_SIZE=3
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
- EVENTSTORE_CLUSTER_GOSSIP_PORT=2112
- EVENTSTORE_INT_TCP_PORT=1112
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_INT_HTTP_PORT=2112
- EVENTSTORE_EXT_HTTP_PORT=2113
- EVENTSTORE_DISCOVER_VIA_DNS=true
- EVENTSTORE_CLUSTER_DNS=eventstore.local
- EVENTSTORE_INT_IP=192.168.33.10
- EVENTSTORE_EXT_IP=192.168.33.10
ports:
- "1112:1112"
- "1113:1113"
- "2112:2112"
- "2113:2113"
networks:
app_net:
aliases:
- eventstore.local
ipv4_address: 192.168.33.10
volumes:
- type: volume
source: eventstore-volume-data
target: /var/lib/eventstore-data
- type: volume
source: eventstore-volume-index
target: /var/lib/eventstore-index
- type: volume
source: eventstore-volume-logs
target: /var/log/eventstore
eventstore.db2:
image: eventstore/eventstore:release-5.0.8
environment:
- EVENTSTORE_WORKER_THREADS=5
- EVENTSTORE_CLUSTER_SIZE=3
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
- EVENTSTORE_CLUSTER_GOSSIP_PORT=2112
- EVENTSTORE_INT_TCP_PORT=1112
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_INT_HTTP_PORT=2112
- EVENTSTORE_EXT_HTTP_PORT=2113
- EVENTSTORE_DISCOVER_VIA_DNS=true
- EVENTSTORE_CLUSTER_DNS=eventstore.local
- EVENTSTORE_INT_IP=192.168.33.11
- EVENTSTORE_EXT_IP=192.168.33.11
expose:
- "1113"
- "1112"
- "2112"
- "2113"
networks:
app_net:
aliases:
- eventstore.local
ipv4_address: 192.168.33.11
volumes:
- type: volume
source: eventstore-volume-data2
target: /var/lib/eventstore-data
- type: volume
source: eventstore-volume-index2
target: /var/lib/eventstore-index
- type: volume
source: eventstore-volume-logs2
target: /var/log/eventstore
eventstore.db3:
image: eventstore/eventstore:release-5.0.8
environment:
- EVENTSTORE_WORKER_THREADS=5
- EVENTSTORE_CLUSTER_SIZE=3
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
- EVENTSTORE_CLUSTER_GOSSIP_PORT=2112
- EVENTSTORE_INT_TCP_PORT=1112
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_INT_HTTP_PORT=2112
- EVENTSTORE_EXT_HTTP_PORT=2113
- EVENTSTORE_DISCOVER_VIA_DNS=true
- EVENTSTORE_CLUSTER_DNS=eventstore.local
- EVENTSTORE_INT_IP=192.168.33.12
- EVENTSTORE_EXT_IP=192.168.33.12
expose:
- "1113"
- "1112"
- "2112"
- "2113"
networks:
app_net:
aliases:
- eventstore.local
ipv4_address: 192.168.33.12
volumes:
- type: volume
source: eventstore-volume-data3
target: /var/lib/eventstore-data
- type: volume
source: eventstore-volume-index3
target: /var/lib/eventstore-index
- type: volume
source: eventstore-volume-logs3
target: /var/log/eventstore
nodejs:
image: node:14
working_dir: /var/code
volumes:
- .:/var/code
- /var/code/node_modules
command: bash -c "tail -f /dev/null"
networks:
- app_net
volumes:
eventstore-volume-data:
eventstore-volume-index:
eventstore-volume-logs:
eventstore-volume-data2:
eventstore-volume-index2:
eventstore-volume-logs2:
eventstore-volume-data3:
eventstore-volume-index3:
eventstore-volume-logs3:
networks:
app_net:
ipam:
driver: default
config:
- subnet: "192.168.33.0/24"

View File

@ -0,0 +1,54 @@
version: '3.4'
services:
eventstore.db:
image: eventstore/eventstore:release-5.0.8
environment:
- EVENTSTORE_CLUSTER_SIZE=1
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=True
- EVENTSTORE_DB=/var/lib/eventstore-data
- EVENTSTORE_INDEX=/var/lib/eventstore-index
- EVENTSTORE_LOG=/var/log/eventstore
- EVENTSTORE_EXT_TCP_PORT=1113
- EVENTSTORE_EXT_HTTP_PORT=2113
- EVENTSTORE_INT_IP=192.168.33.10
- EVENTSTORE_EXT_IP=192.168.33.10
ports:
- "1113:1113"
- "2113:2113"
volumes:
- type: volume
source: eventstore-volume-data
target: /var/lib/eventstore-data
- type: volume
source: eventstore-volume-index
target: /var/lib/eventstore-index
- type: volume
source: eventstore-volume-logs
target: /var/log/eventstore
networks:
app_net:
ipv4_address: 192.168.33.10
nodejs:
image: node:14
working_dir: /var/code
volumes:
- .:/var/code
- /var/code/node_modules
command: bash -c "tail -f /dev/null"
networks:
- app_net
volumes:
eventstore-volume-data:
eventstore-volume-index:
eventstore-volume-logs:
networks:
app_net:
ipam:
driver: default
config:
- subnet: "192.168.33.0/24"

5
index.d.ts vendored
View File

@ -1,8 +1,8 @@
/// <reference types="node" /> /// <reference types="node" />
/// <reference types="Long" /> /// <reference types="Long" />
import { EventEmitter } from "events"; import { EventEmitter } from 'events';
import { StrictEventEmitter } from "strict-event-emitter-types"; import { StrictEventEmitter } from 'strict-event-emitter-types';
// Expose classes // Expose classes
export class Position { export class Position {
@ -380,6 +380,7 @@ export interface ConnectionSettings {
// Cluster Settings // Cluster Settings
clusterDns?: string, clusterDns?: string,
maxDiscoverAttempts?: number, maxDiscoverAttempts?: number,
discoverDelay?: number,
externalGossipPort?: number, externalGossipPort?: number,
gossipTimeout?: number gossipTimeout?: number
} }

4743
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,18 @@
"pretest": "npm run build", "pretest": "npm run build",
"test": "nodeunit", "test": "nodeunit",
"test-debug": "TESTS_VERBOSE_LOGGING=1 nodeunit", "test-debug": "TESTS_VERBOSE_LOGGING=1 nodeunit",
"test:jest:watch": "jest --watch --coverage",
"test:single": "npm run compose:single:start && npm run compose:wait && npm run compose:single:test ; npm run compose:single:stop",
"test:gossip": "npm run compose:cluster:start && npm run compose:wait && npm run compose:gossip:test ; npm run compose:cluster:stop",
"test:cluster": "npm run compose:cluster:start && npm run compose:wait && npm run compose:cluster:test ; npm run compose:cluster:stop",
"compose:single:start": "docker-compose -f docker-compose-single.yaml up --build -d",
"compose:cluster:start": "docker-compose -f docker-compose-cluster.yaml up --build -d",
"compose:single:stop": "docker-compose -f docker-compose-single.yaml down -v --remove-orphans",
"compose:cluster:stop": "docker-compose -f docker-compose-cluster.yaml down -v --remove-orphans",
"compose:wait": "while [[ \"$(curl -s -o /dev/null -w ''%{http_code}'' localhost:2113/ping)\" != \"200\" ]]; do sleep 5; done",
"compose:single:test": "docker-compose -f docker-compose-single.yaml exec nodejs bash -c \"npm i && npm run build && EVENTSTORE_CONNECTION_TYPE=tcp EVENTSTORE_HOST=192.168.33.10 npm run test-debug\"",
"compose:cluster:test": "docker-compose -f docker-compose-cluster.yaml exec nodejs bash -c \"npm i && npm run build && EVENTSTORE_CONNECTION_TYPE=gossip EVENTSTORE_HOST=192.168.33.10 EVENTSTORE_HOST_1=192.168.33.10 EVENTSTORE_HOST_2=192.168.33.11 EVENTSTORE_HOST_3=192.168.33.12 npm run test-debug\"",
"compose:gossip:test": "docker-compose -f docker-compose-cluster.yaml exec nodejs bash -c \"npm i && npm run build && EVENTSTORE_CONNECTION_TYPE=dns EVENTSTORE_HOST=192.168.33.10 npm run test-debug\"",
"prepublishOnly": "npm run build && npm run gendocs", "prepublishOnly": "npm run build && npm run gendocs",
"gendocs": "rm -rf docs && jsdoc src -r -d docs" "gendocs": "rm -rf docs && jsdoc src -r -d docs"
}, },
@ -54,6 +66,7 @@
"uuid": "^3.0.1" "uuid": "^3.0.1"
}, },
"devDependencies": { "devDependencies": {
"jest": "^26.4.2",
"jsdoc": "^3.6.3", "jsdoc": "^3.6.3",
"nodeunit": "^0.11.3", "nodeunit": "^0.11.3",
"webpack": "^4.41.2", "webpack": "^4.41.2",

View File

@ -0,0 +1,25 @@
function rndNext(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min)) + min;
}
function shuffle (arr, from, to) {
if (!to) {
to = arr.length - 1;
}
if (!from) {
from = 0;
}
const newArr = [...arr];
if (from >= to) return;
for (var current = from; current <= to; ++current) {
var index = rndNext(current, to + 1);
var tmp = newArr[index];
newArr[index] = newArr[current];
newArr[current] = tmp;
}
return newArr;
};
module.exports = shuffle;

View File

@ -0,0 +1,163 @@
const ClusterInfo = require('./clusterInfo');
const GossipSeed = require('../../gossipSeed');
const NodeEndPoints = require('./nodeEndpoints');
const shuffle = require('../../common/utils/shuffle');
function wait(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* ClusterDiscoverer
* @constructor
* @class
* @param {Logger} log - Logger instance
* @param {Object} settings - Settings object
* @param {Object} dnsService - DNS service to perform DNS lookup
* @param {Object} httpService - HTTP service to perform http requests
*/
function ClusterDiscoverer(log, settings, dnsService, httpService) {
if (!settings.clusterDns && (!settings.seeds || settings.seeds.length === 0))
throw new Error('Both clusterDns and seeds are null/empty.');
this._log = log;
this._settings = settings;
this._dnsService = dnsService;
this._httpService = httpService;
}
/**
* Discover Cluster endpoints
* @param {Object} failedTcpEndPoint - The failed TCP endpoint which were used by the handler
* @returns {Promise.<NodeEndPoints>}
*/
ClusterDiscoverer.prototype.discover = async function (failedTcpEndPoint) {
let attempts = 0;
while (attempts++ < this._settings.maxDiscoverAttempts) {
try {
const candidates = await this._getGossipCandidates(this._settings.managerExternalHttpPort);
const gossipSeeds = candidates.filter(
(candidate) =>
!failedTcpEndPoint ||
!(candidate.endPoint.host === failedTcpEndPoint.host && candidate.endPoint.port === failedTcpEndPoint.port)
);
let gossipSeedsIndex = 0;
let clusterInfo;
do {
try {
clusterInfo = await this._clusterInfo(gossipSeeds[gossipSeedsIndex], this._settings.gossipTimeout);
if (!clusterInfo.bestNode) {
this._log.info(
`Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed: no candidate found.`
);
continue;
}
} catch (err) {}
} while (++gossipSeedsIndex < gossipSeeds.length);
if (clusterInfo) {
return NodeEndPoints.createFromGossipMember(clusterInfo.bestNode);
}
} catch (err) {
this._log.info(
`Discovering attempt ${attempts}/${this._settings.maxDiscoverAttempts} failed with error: ${err}.\n${err.stack}`
);
}
await wait(this._settings.discoverDelay);
}
throw new Error(`Failed to discover candidate in ${this._settings.maxDiscoverAttempts} attempts.`);
};
/**
* Get gossip candidates either from DNS or from gossipSeeds settings
* @private
* @param {Number} managerExternalHttpPort - Http port of the manager (or the http port of the node for OSS clusters)
* @returns {Promise.<GossipSeed[]>}
*/
ClusterDiscoverer.prototype._getGossipCandidates = async function (managerExternalHttpPort) {
const gossipSeeds =
this._settings.seeds && this._settings.seeds.length > 0
? this._settings.seeds
: (await this._resolveDns(this._settings.clusterDns)).map(
(address) => new GossipSeed({ host: address, port: managerExternalHttpPort }, undefined)
);
return shuffle(gossipSeeds);
};
/**
* Resolve the cluster DNS discovery address to retrieve belonging ip addresses
* @private
* @param {String} clusterDns - Cluster DNS discovery address
* @returns {Promise.<String[]>}
*/
ClusterDiscoverer.prototype._resolveDns = async function (clusterDns) {
const dnsOptions = {
family: 4,
hints: this._dnsService.ADDRCONFIG | this._dnsService.V4MAPPED,
all: true,
};
const result = await this._dnsService.lookup(clusterDns, dnsOptions);
if (!result || result.length === 0) {
throw new Error(`No result from dns lookup for ${clusterDns}`);
}
return result.map((address) => address.address);
};
/**
* Get cluster informations (gossip members)
* @param {GossipSeed} candidate - candidate to get informations from
* @param {Number} timeout - timeout for the http request
* @returns {Promise.<ClusterInfo>}
*/
ClusterDiscoverer.prototype._clusterInfo = async function (candidate, timeout) {
return new Promise((resolve, reject) => {
const options = {
host: candidate.endPoint.host,
port: candidate.endPoint.port,
path: '/gossip?format=json',
timeout: timeout,
};
if (candidate.hostHeader) {
options.headers = {
Host: candidate.hostHeader,
};
}
const request = this._httpService.request(options, (res) => {
if (res.statusCode !== 200) {
this._log.info('Trying to get gossip from', candidate, 'failed with status code:', res.statusCode);
reject(new Error(`Gossip candidate returns a ${res.statusCode} error`));
return;
}
let result = '';
res.on('data', (chunk) => {
result += chunk.toString();
});
res.on('end', function () {
try {
result = JSON.parse(result);
} catch (e) {
reject(new Error('Unable to parse gossip response'));
}
resolve(new ClusterInfo(result.members));
});
});
request.setTimeout(timeout);
request.on('timeout', () => {
this._log.info('Trying to get gossip from', candidate, 'timed out.');
request.destroy();
reject(new Error('Connection to gossip timed out'));
});
request.on('error', (error) => {
this._log.info('Trying to get gossip from', candidate, 'errored', error);
request.destroy();
reject(new Error('Connection to gossip errored'));
});
request.end();
});
};
module.exports = ClusterDiscoverer;

View File

@ -0,0 +1,36 @@
const MemberInfo = require('./memberInfo.js');
const VNodeStates = Object.freeze({
Initializing: 0,
Unknown: 1,
PreReplica: 2,
CatchingUp: 3,
Clone: 4,
Slave: 5,
PreMaster: 6,
Master: 7,
Manager: 8,
ShuttingDown: 9,
Shutdown: 10
});
function ClusterInfo(members) {
this._members = members.map(member => new MemberInfo(member));
Object.defineProperty(this, 'bestNode', {
enumerable: true,
get: function () {
return this._getBestNode();
}
});
}
ClusterInfo.prototype._getBestNode = function () {
return this._members
.filter(member => member.isAlive && member.isAllowedToConnect)
.sort(function (a, b) {
return VNodeStates[b.state] - VNodeStates[a.state];
})[0];
}
module.exports = ClusterInfo;

View File

@ -0,0 +1,73 @@
const NOT_ALLOWED_STATES = [
'Manager',
'ShuttingDown',
'Shutdown'
];
function MemberInfo(informations) {
this._instanceId = informations.instanceId;
this._timeStamp = informations.timeStamp;
this._state = informations.state;
this._isAlive = informations.isAlive;
this._internalTcpIp = informations.internalTcpIp;
this._internalTcpPort = informations.internalTcpPort;
this._internalSecureTcpPort = informations.internalSecureTcpPort;
this._externalTcpIp = informations.externalTcpIp;
this._externalTcpPort = informations.externalTcpPort;
this._externalSecureTcpPort = informations.externalSecureTcpPort;
this._internalHttpIp = informations.internalHttpIp;
this._internalHttpPort = informations.internalHttpPort;
this._externalHttpIp = informations.externalHttpIp;
this._externalHttpPort = informations.externalHttpPort;
this._lastCommitPosition = informations.lastCommitPosition;
this._writerCheckpoint = informations.writerCheckpoint;
this._chaserCheckpoint = informations.chaserCheckpoint;
this._epochPosition = informations.epochPosition;
this._epochNumber = informations.epochNumber;
this._epochId = informations.epochId;
this._nodePriority = informations.nodePriority;
Object.defineProperty(this, 'state', {
enumerable: true,
get: function () {
return this._state;
}
});
Object.defineProperty(this, 'isAllowedToConnect', {
enumerable: true,
get: function () {
return !NOT_ALLOWED_STATES.includes(this._state);
}
});
Object.defineProperty(this, 'isAlive', {
enumerable: true,
get: function () {
return this._isAlive;
}
});
Object.defineProperty(this, 'externalTcpIp', {
enumerable: true,
get: function () {
return this._externalTcpIp;
}
});
Object.defineProperty(this, 'externalTcpPort', {
enumerable: true,
get: function () {
return this._externalTcpPort;
}
});
Object.defineProperty(this, 'externalSecureTcpPort', {
enumerable: true,
get: function () {
return this._externalSecureTcpPort;
}
});
}
module.exports = MemberInfo;

View File

@ -0,0 +1,23 @@
function NodeEndPoints(tcpEndPoint, secureTcpEndPoint) {
if (tcpEndPoint === null && secureTcpEndPoint === null) throw new Error('Both endpoints are null.');
Object.defineProperties(this, {
tcpEndPoint: {
enumerable: true,
value: tcpEndPoint
},
secureTcpEndPoint: {
enumerable: true,
value: secureTcpEndPoint
}
});
}
NodeEndPoints.createFromGossipMember = function (member) {
const normTcp = { host: member.externalTcpIp, port: member.externalTcpPort };
const secTcp = member.externalSecureTcpPort > 0
? { host: member.externalTcpIp, port: member.externalSecureTcpPort }
: null;
return new NodeEndPoints(normTcp, secTcp);
}
module.exports = NodeEndPoints

View File

@ -1,9 +1,19 @@
var EventStoreNodeConnection = require('./eventStoreNodeConnection'); var EventStoreNodeConnection = require('./eventStoreNodeConnection');
var StaticEndpointDiscoverer = require('./core/staticEndpointDiscoverer'); var StaticEndpointDiscoverer = require('./core/staticEndpointDiscoverer');
var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer'); var ClusterDiscoverer = require('./core/cluster/clusterDiscoverer');
var NoopLogger = require('./common/log/noopLogger'); var NoopLogger = require('./common/log/noopLogger');
var ensure = require('./common/utils/ensure'); var ensure = require('./common/utils/ensure');
const util = require('util');
const http = require('http');
const dns = require('dns');
const dnsService = {
lookup : util.promisify(dns.lookup),
ADDRCONFIG: dns.ADDRCONFIG,
V4MAPPED: dns.V4MAPPED
};
var defaultConnectionSettings = Object.freeze({ var defaultConnectionSettings = Object.freeze({
log: new NoopLogger(), log: new NoopLogger(),
verboseLogging: false, verboseLogging: false,
@ -32,6 +42,7 @@ var defaultConnectionSettings = Object.freeze({
// Cluster Settings // Cluster Settings
clusterDns: '', clusterDns: '',
maxDiscoverAttempts: 10, maxDiscoverAttempts: 10,
discoverDelay: 500,
externalGossipPort: 0, externalGossipPort: 0,
gossipTimeout: 1000 gossipTimeout: 1000
}); });
@ -80,17 +91,17 @@ function createFromClusterDns(connectionSettings, clusterDns, externalGossipPort
var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {}); var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {});
var clusterSettings = { var clusterSettings = {
clusterDns: clusterDns, clusterDns: clusterDns,
gossipSeeds: null, seeds: null,
externalGossipPort: externalGossipPort, managerExternalHttpPort: externalGossipPort,
maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts,
discoverDelay: mergedSettings.discoverDelay,
gossipTimeout: mergedSettings.gossipTimeout gossipTimeout: mergedSettings.gossipTimeout
}; };
var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log, var endPointDiscoverer = new ClusterDiscoverer(
clusterSettings.clusterDns, mergedSettings.log,
clusterSettings.maxDiscoverAttempts, clusterSettings,
clusterSettings.externalGossipPort, dnsService,
clusterSettings.gossipSeeds, http
clusterSettings.gossipTimeout
); );
return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName); return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName);
} }
@ -101,17 +112,17 @@ function createFromGossipSeeds(connectionSettings, gossipSeeds, connectionName)
var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {}); var mergedSettings = merge(defaultConnectionSettings, connectionSettings || {});
var clusterSettings = { var clusterSettings = {
clusterDns: '', clusterDns: '',
gossipSeeds: gossipSeeds, seeds: gossipSeeds,
externalGossipPort: 0, externalGossipPort: 0,
maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts,
discoverDelay: mergedSettings.discoverDelay,
gossipTimeout: mergedSettings.gossipTimeout gossipTimeout: mergedSettings.gossipTimeout
}; };
var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log, var endPointDiscoverer = new ClusterDiscoverer(
clusterSettings.clusterDns, mergedSettings.log,
clusterSettings.maxDiscoverAttempts, clusterSettings,
clusterSettings.externalGossipPort, dnsService,
clusterSettings.gossipSeeds, http
clusterSettings.gossipTimeout
); );
return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName); return new EventStoreNodeConnection(mergedSettings, clusterSettings, endPointDiscoverer, connectionName);
} }

View File

@ -1,7 +1,8 @@
function GossipSeed(endPoint, hostName) { function GossipSeed(endPoint, hostName, hostHeader) {
if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.'); if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.');
this.endPoint = endPoint; this.endPoint = endPoint;
this.hostName = hostName; this.hostName = hostName;
this.hostHeader = hostHeader;
Object.freeze(this); Object.freeze(this);
} }

View File

@ -153,10 +153,21 @@ TcpConnection.createConnectingConnection = function(
connection._initSocket(socket); connection._initSocket(socket);
if (onConnectionEstablished) onConnectionEstablished(connection); if (onConnectionEstablished) onConnectionEstablished(connection);
}); });
var timer = setTimeout(function(){
log.error('TcpConnection: timeout when connecting to %j in %d ms', remoteEndPoint, connectionTimeout);
connection.close();
if (onConnectionFailed) onConnectionFailed(connection, new Error('Connection failed'));
}, connectionTimeout)
socket.once('error', onError); socket.once('error', onError);
function onError(err) { function onError(err) {
clearTimeout(timer);
if (onConnectionFailed) onConnectionFailed(connection, err); if (onConnectionFailed) onConnectionFailed(connection, err);
} }
socket.once('connect', onConnect);
function onConnect() {
log.info('TcpConnection: successfully connected to %j', remoteEndPoint);
clearTimeout(timer);
}
return connection; return connection;
}; };

View File

@ -11,38 +11,102 @@ protobufJS.util.Long = undefined;
protobufJS.configure(); protobufJS.configure();
var settings = { var settings = {
log: new NoopLogger() log: new NoopLogger(),
}; };
if (process.env.TESTS_VERBOSE_LOGGING === '1') { if (process.env.TESTS_VERBOSE_LOGGING === '1') {
settings.verboseLogging = true; settings.verboseLogging = true;
settings.log = new FileLogger('test-verbose.log'); settings.log = new FileLogger('test-verbose.log');
} }
var tcpEndPoint = {host: 'localhost', port: 1113}; function setUpWithGossipSeeds(cb) {
var gossipSeeds = [
function setUp(cb) { new client.GossipSeed({ host: process.env.EVENTSTORE_HOST_1 || '192.168.33.10', port: 2113 }),
new client.GossipSeed({ host: process.env.EVENTSTORE_HOST_2 || '192.168.33.11', port: 2113 }),
new client.GossipSeed({ host: process.env.EVENTSTORE_HOST_3 || '192.168.33.12', port: 2113 }),
];
this.log = settings.log; this.log = settings.log;
this.testStreamName = 'test-' + uuid.v4(); this.testStreamName = 'test-' + uuid.v4();
var connected = false; var connected = false;
this.conn = client.EventStoreConnection.create(settings, tcpEndPoint); this.conn = client.createConnection(settings, gossipSeeds);
this.conn.connect() this.conn
.connect()
.then(function () { .then(function () {
//Doesn't mean anything, connection is just initiated //Doesn't mean anything, connection is just initiated
settings.log.debug("Connection to %j initialized.", tcpEndPoint); settings.log.debug('Connection to %j initialized.', gossipSeeds);
}) })
.catch(function (err) { .catch(function (err) {
settings.log.error(err, "Initializing connection to %j failed.", tcpEndPoint); settings.log.error(err, 'Initializing connection to %j failed.', gossipSeeds);
cb(err); cb(err);
}); });
this.conn.on('closed', function (reason) { this.conn.on('closed', function (reason) {
if (connected) return; if (connected) return;
var error = new Error("Connection failed: " + reason); var error = new Error('Connection failed: ' + reason);
settings.log.error(error, "Connection to %j failed.", tcpEndPoint); settings.log.error(error, 'Connection to %j failed.', gossipSeeds);
cb(error); cb(error);
}); });
this.conn.on('connected', function (tcpEndPoint) { this.conn.on('connected', function (tcpEndPoint) {
if (connected) return; if (connected) return;
settings.log.debug("Connected to %j.", tcpEndPoint); settings.log.debug('Connected to %j.', tcpEndPoint);
connected = true;
cb();
});
}
function setUpWithDns(cb) {
var clusterDns = 'discover://eventstore.local:2113';
this.log = settings.log;
this.testStreamName = 'test-' + uuid.v4();
var connected = false;
this.conn = client.createConnection(settings, clusterDns);
this.conn
.connect()
.then(function () {
//Doesn't mean anything, connection is just initiated
settings.log.debug('Connection to %j initialized.', clusterDns);
})
.catch(function (err) {
settings.log.error(err, 'Initializing connection to %j failed.', clusterDns);
cb(err);
});
this.conn.on('closed', function (reason) {
if (connected) return;
var error = new Error('Connection failed: ' + reason);
settings.log.error(error, 'Connection to %j failed.', clusterDns);
cb(error);
});
this.conn.on('connected', function (tcpEndPoint) {
if (connected) return;
settings.log.debug('Connected to %j.', tcpEndPoint);
connected = true;
cb();
});
}
function setUpWithTcpEndpoint(cb) {
var tcpEndPoint = { host: process.env.EVENTSTORE_HOST || 'localhost', port: 1113 };
this.log = settings.log;
this.testStreamName = 'test-' + uuid.v4();
var connected = false;
this.conn = client.EventStoreConnection.create(settings, tcpEndPoint);
this.conn
.connect()
.then(function () {
//Doesn't mean anything, connection is just initiated
settings.log.debug('Connection to %j initialized.', tcpEndPoint);
})
.catch(function (err) {
settings.log.error(err, 'Initializing connection to %j failed.', tcpEndPoint);
cb(err);
});
this.conn.on('closed', function (reason) {
if (connected) return;
var error = new Error('Connection failed: ' + reason);
settings.log.error(error, 'Connection to %j failed.', tcpEndPoint);
cb(error);
});
this.conn.on('connected', function (tcpEndPoint) {
if (connected) return;
settings.log.debug('Connected to %j.', tcpEndPoint);
connected = true; connected = true;
cb(); cb();
}); });
@ -58,9 +122,8 @@ function tearDown(cb) {
function areEqual(name, actual, expected) { function areEqual(name, actual, expected) {
if (typeof expected !== 'object' || expected === null) if (typeof expected !== 'object' || expected === null)
this.strictEqual(actual, expected, util.format("Failed %s === %s, got %s.", name, expected, actual)); this.strictEqual(actual, expected, util.format('Failed %s === %s, got %s.', name, expected, actual));
else else this.deepEqual(actual, expected, util.format('Failed %s deepEqual %j, got %j.', name, expected, actual));
this.deepEqual(actual, expected, util.format("Failed %s deepEqual %j, got %j.", name, expected, actual));
} }
function fail(reason) { function fail(reason) {
@ -69,51 +132,71 @@ function fail(reason) {
function eventEqualEventData(name, resolvedEvent, eventData) { function eventEqualEventData(name, resolvedEvent, eventData) {
var ev = resolvedEvent.originalEvent; var ev = resolvedEvent.originalEvent;
this.ok(ev !== null, util.format("Failed %s !== null.", name + ".originalEvent")); this.ok(ev !== null, util.format('Failed %s !== null.', name + '.originalEvent'));
if (ev === null) return; if (ev === null) return;
this.areEqual(name + ".originalEvent.eventId", ev.eventId, eventData.eventId); this.areEqual(name + '.originalEvent.eventId', ev.eventId, eventData.eventId);
this.areEqual(name + ".originalEvent.eventType", ev.eventType, eventData.type); this.areEqual(name + '.originalEvent.eventType', ev.eventType, eventData.type);
this.ok(Buffer.compare(ev.data, eventData.data) === 0, name + ".originalEvent.data is not equal to original data."); this.ok(Buffer.compare(ev.data, eventData.data) === 0, name + '.originalEvent.data is not equal to original data.');
this.ok(Buffer.compare(ev.metadata, eventData.metadata) === 0, name + ".originalEvent.metadata is not equal to original metadata."); this.ok(
Buffer.compare(ev.metadata, eventData.metadata) === 0,
name + '.originalEvent.metadata is not equal to original metadata.'
);
} }
function testRecordedEvent(name, event) { function testRecordedEvent(name, event) {
this.ok(Long.isLong(event.eventNumber), name + ".eventNumber is not a Long"); this.ok(Long.isLong(event.eventNumber), name + '.eventNumber is not a Long');
this.ok(event.created instanceof Date, name + ".created is not a Date"); this.ok(event.created instanceof Date, name + '.created is not a Date');
this.ok(typeof event.createdEpoch === 'number', name + ".createdEpoch is not a number"); this.ok(typeof event.createdEpoch === 'number', name + '.createdEpoch is not a number');
} }
function testLiveEvent(name, event, evNumber) { function testLiveEvent(name, event, evNumber) {
this.ok(event.event, name + ".event not defined (or null)"); this.ok(event.event, name + '.event not defined (or null)');
this.ok(event.originalEvent, name + ".originalEvent not defined (or null)"); this.ok(event.originalEvent, name + '.originalEvent not defined (or null)');
this.ok(event.isResolved === false, name + ".isResolved should be true"); this.ok(event.isResolved === false, name + '.isResolved should be true');
this.ok(event.originalPosition instanceof client.Position, name + ".originalPosition is not an instance of Position"); this.ok(event.originalPosition instanceof client.Position, name + '.originalPosition is not an instance of Position');
this.ok(event.originalStreamId, name + ".originalStreamId not defined (or null)"); this.ok(event.originalStreamId, name + '.originalStreamId not defined (or null)');
this.ok(Long.isLong(event.originalEventNumber), name + ".originalEventNumber is not a Long"); this.ok(Long.isLong(event.originalEventNumber), name + '.originalEventNumber is not a Long');
if (typeof evNumber === 'number') { if (typeof evNumber === 'number') {
this.ok(event.originalEventNumber.toNumber() === evNumber, name + '.originalEventNumber expected ' + evNumber + ' got ' + event.originalEventNumber); this.ok(
event.originalEventNumber.toNumber() === evNumber,
name + '.originalEventNumber expected ' + evNumber + ' got ' + event.originalEventNumber
);
} }
testRecordedEvent.call(this, name + '.event', event.event); testRecordedEvent.call(this, name + '.event', event.event);
} }
function testReadEvent(name, event, evNumber) { function testReadEvent(name, event, evNumber) {
this.ok(event.event, name + ".event not defined (or null)"); this.ok(event.event, name + '.event not defined (or null)');
this.ok(event.originalEvent, name + ".originalEvent not defined (or null)"); this.ok(event.originalEvent, name + '.originalEvent not defined (or null)');
this.ok(event.isResolved === false, name + ".isResolved should be true"); this.ok(event.isResolved === false, name + '.isResolved should be true');
this.ok(event.originalPosition === null, name + ".originalPosition is not null"); this.ok(event.originalPosition === null, name + '.originalPosition is not null');
this.ok(event.originalStreamId, name + ".originalStreamId not defined (or null)"); this.ok(event.originalStreamId, name + '.originalStreamId not defined (or null)');
this.ok(Long.isLong(event.originalEventNumber), name + ".originalEventNumber is not a Long"); this.ok(Long.isLong(event.originalEventNumber), name + '.originalEventNumber is not a Long');
if (typeof evNumber === 'number') { if (typeof evNumber === 'number') {
this.ok(event.originalEventNumber.toNumber() === evNumber, name + '.originalEventNumber expected ' + evNumber + ' got ' + event.originalEventNumber); this.ok(
event.originalEventNumber.toNumber() === evNumber,
name + '.originalEventNumber expected ' + evNumber + ' got ' + event.originalEventNumber
);
} }
testRecordedEvent.call(this, name + '.event', event.event); testRecordedEvent.call(this, name + '.event', event.event);
} }
var _ = { var _ = {
'setUp': setUp, tearDown: tearDown,
'tearDown': tearDown
}; };
switch (process.env.EVENTSTORE_CONNECTION_TYPE) {
case 'gossip':
_.setUp = setUpWithGossipSeeds;
break;
case 'dns':
_.setUp = setUpWithDns;
break;
case 'tcp':
default:
_.setUp = setUpWithTcpEndpoint;
}
function wrap(name, testFunc) { function wrap(name, testFunc) {
var base = _[name]; var base = _[name];
if (base === undefined) { if (base === undefined) {
@ -125,7 +208,7 @@ function wrap(name, testFunc) {
test.testLiveEvent = testLiveEvent.bind(test); test.testLiveEvent = testLiveEvent.bind(test);
test.testReadEvent = testReadEvent.bind(test); test.testReadEvent = testReadEvent.bind(test);
return testFunc.call(this, test); return testFunc.call(this, test);
} };
} }
return function (cb) { return function (cb) {
var self = this; var self = this;
@ -133,7 +216,7 @@ function wrap(name, testFunc) {
if (err) return cb(err); if (err) return cb(err);
return testFunc.call(self, cb); return testFunc.call(self, cb);
}); });
} };
} }
module.exports.init = function (testSuite, addSetUpTearDownIfNotPresent) { module.exports.init = function (testSuite, addSetUpTearDownIfNotPresent) {
@ -145,7 +228,19 @@ module.exports.init = function(testSuite, addSetUpTearDownIfNotPresent) {
} }
} }
if (!addSetUpTearDownIfNotPresent) return; if (!addSetUpTearDownIfNotPresent) return;
if (!testSuite.hasOwnProperty('setUp')) testSuite['setUp'] = setUp.bind(thisObj); if (!testSuite.hasOwnProperty('setUp')) {
switch (process.env.EVENTSTORE_CONNECTION_TYPE) {
case 'gossip':
testSuite['setUp'] = setUpWithGossipSeeds.bind(thisObj);
break;
case 'dns':
testSuite['setUp'] = setUpWithDns.bind(thisObj);
break;
case 'tcp':
default:
testSuite['setUp'] = setUpWithTcpEndpoint.bind(thisObj);
}
}
if (!testSuite.hasOwnProperty('tearDown')) testSuite['tearDown'] = tearDown.bind(thisObj); if (!testSuite.hasOwnProperty('tearDown')) testSuite['tearDown'] = tearDown.bind(thisObj);
}; };
module.exports.settings = function (settingsOverride) { module.exports.settings = function (settingsOverride) {

View File

@ -4,59 +4,18 @@ var GossipSeed = require('../src/gossipSeed');
var testBase = require('./common/base_test'); var testBase = require('./common/base_test');
var withSsl = !!process.env.NODE_ESC_WITH_SSL; var withSsl = !!process.env.NODE_ESC_WITH_SSL;
const evenstStoreType = process.env.EVENTSTORE_CONNECTION_TYPE;
module.exports = { module.exports = {}
'Connect To Endpoint Happy Path': function (test) {
test.expect(1);
var tcpEndpoint = {host: 'localhost', port: 1113};
var conn = client.EventStoreConnection.create(testBase.settings(), tcpEndpoint);
conn.connect()
.catch(function (err) {
test.done(err);
});
conn.on('connected', function (endPoint) {
test.areEqual("connected endPoint", endPoint, tcpEndpoint);
done();
});
conn.on('error', done);
function done(err) { switch(evenstStoreType){
conn.close(); case 'gossip':
if (err) return test.done(err); module.exports['Connect to Cluster using gossip seeds'] = function (test) {
test.done();
}
},
'Connect To Endpoint That Doesn\'t Exist': function (test) {
test.expect(1);
var tcpEndpoint = {host: 'localhost', port: 11112};
var conn = client.EventStoreConnection.create(testBase.settings({maxReconnections: 1}), tcpEndpoint);
conn.connect()
.catch(function (err) {
test.done(err);
});
conn.on('connected', function () {
test.ok(false, "Should not be able to connect.");
test.done();
});
conn.on('error', function (err) {
test.done(err);
});
conn.on('closed', function (reason) {
test.ok(reason.indexOf("Reconnection limit reached") === 0, "Wrong expected reason.");
test.done();
});
},
'Create a connection with tcp://host:port string': function (test) {
var conn = client.createConnection({}, 'tcp://localhost:1113');
conn.close();
test.done();
}/*,
'Connect to Cluster using gossip seeds': function (test) {
test.expect(1); test.expect(1);
var gossipSeeds = [ var gossipSeeds = [
new GossipSeed({host: 'localhost', port: 1113}), new GossipSeed({host: process.env.EVENTSTORE_HOST_1 || '192.168.33.10', port: 2113}),
new GossipSeed({host: 'localhost', port: 2113}), new GossipSeed({host: process.env.EVENTSTORE_HOST_2 || '192.168.33.11', port: 2113}),
new GossipSeed({host: 'localhost', port: 3113}) new GossipSeed({host: process.env.EVENTSTORE_HOST_3 || '192.168.33.12', port: 2113})
]; ];
var conn = client.EventStoreConnection.create(testBase.settings(), gossipSeeds); var conn = client.EventStoreConnection.create(testBase.settings(), gossipSeeds);
conn.connect() conn.connect()
@ -64,7 +23,7 @@ module.exports = {
test.done(err); test.done(err);
}); });
conn.on('connected', function(endPoint){ conn.on('connected', function(endPoint){
test.ok(endPoint, "no endpoint"); test.ok(endPoint, 'no endpoint');
done(); done();
}); });
conn.on('error', done); conn.on('error', done);
@ -74,16 +33,132 @@ module.exports = {
if (err) return test.done(err); if (err) return test.done(err);
test.done(); test.done();
} }
}*/
}; };
module.exports['Connect To Cluster with bad gossip seeds'] = function (test) {
test.expect(3);
var gossipSeeds = [
new GossipSeed({host: '1.2.3.4', port: 1113}),
new GossipSeed({host: '2.3.4.5', port: 2113}),
new GossipSeed({host: '3.4.5.6', port: 3113})
];
var conn = client.EventStoreConnection.create(testBase.settings({maxDiscoverAttempts: 1}), gossipSeeds);
conn.connect()
.catch(function (err) {
test.ok(err.message.indexOf('Couldn\'t resolve target end point') === 0, 'Wrong expected reason.');
});
conn.on('connected', function () {
test.ok(false, 'Should not be able to connect.');
});
conn.on('error', function (err) {
test.ok(err.message.indexOf('Failed to discover candidate in 1 attempts') === 0, 'Wrong expected reason.');
});
conn.on('closed', function (reason) {
test.ok(reason.indexOf('Failed to resolve TCP end point to which to connect') === 0, 'Wrong expected reason.');
test.done();
});
};
break;
case 'dns':
module.exports['Connect to Cluster using dns discover'] = function (test) {
test.expect(1);
var clusterDns = 'discover://eventstore.local:2113';
var conn = client.EventStoreConnection.create(testBase.settings(), clusterDns);
conn.connect()
.catch(function(err) {
test.done(err);
});
conn.on('connected', function(endPoint){
test.ok(endPoint, 'no endpoint');
done();
});
conn.on('error', done);
function done(err) {
conn.close();
if (err) return test.done(err);
test.done();
}
};
module.exports['Connect To Cluster with bad dns discover'] = function (test) {
test.expect(3);
var clusterDns = 'discover://eventstore-bad.local:2113';
var conn = client.EventStoreConnection.create(testBase.settings({maxDiscoverAttempts: 1}), clusterDns);
conn.connect()
.catch(function (err) {
test.ok(err.message.indexOf('Couldn\'t resolve target end point') === 0, 'Wrong expected reason.');
});
conn.on('connected', function () {
test.ok(false, 'Should not be able to connect.');
});
conn.on('error', function (err) {
test.ok(err.message.indexOf('Failed to discover candidate in 1 attempts') === 0, 'Wrong expected reason.');
});
conn.on('closed', function (reason) {
test.ok(reason.indexOf('Failed to resolve TCP end point to which to connect') === 0, 'Wrong expected reason.');
test.done();
});
};
break;
case 'tcp':
default:
module.exports['Connect To Endpoint Happy Path'] = function (test) {
test.expect(1);
var tcpEndpoint = {host: process.env.EVENTSTORE_HOST || 'localhost', port: 1113};
var conn = client.EventStoreConnection.create(testBase.settings(), tcpEndpoint);
conn.connect()
.catch(function (err) {
test.done(err);
});
conn.on('connected', function (endPoint) {
test.areEqual('connected endPoint', endPoint, tcpEndpoint);
done();
});
conn.on('error', done);
function done(err) {
conn.close();
if (err) return test.done(err);
test.done();
}
};
module.exports['Connect To Endpoint That Doesn\'t Exist'] = function (test) {
test.expect(1);
var tcpEndpoint = {host: process.env.EVENTSTORE_HOST || 'localhost', port: 11112};
var conn = client.EventStoreConnection.create(testBase.settings({maxReconnections: 1}), tcpEndpoint);
conn.connect()
.catch(function (err) {
test.done(err);
});
conn.on('connected', function () {
test.ok(false, 'Should not be able to connect.');
test.done();
});
conn.on('error', function (err) {
test.done(err);
});
conn.on('closed', function (reason) {
test.ok(reason.indexOf('Reconnection limit reached') === 0, 'Wrong expected reason.');
test.done();
});
};
module.exports['Create a connection with tcp://host:port string'] = function (test) {
var conn = client.createConnection({}, `tcp://${process.env.EVENTSTORE_HOST || 'localhost'}:1113`);
conn.close();
test.done();
};
}
if (withSsl) { if (withSsl) {
module.exports['Connect to secure tcp endpoint'] = function(test) { module.exports['Connect to secure tcp endpoint'] = function(test) {
var conn = client.createConnection({ var conn = client.createConnection({
useSslConnection: true, useSslConnection: true,
targetHost: 'localhost', targetHost: process.env.EVENTSTORE_HOST || 'localhost',
validateServer: false validateServer: false
}, 'tcp://localhost:1115'); }, `tcp://${process.env.EVENTSTORE_HOST || 'localhost'}:1115`);
conn.on('error', function (err) { conn.on('error', function (err) {
test.done(err); test.done(err);
}); });

98
test/fixtures/gossip.json vendored Normal file
View File

@ -0,0 +1,98 @@
{
"members": [
{
"instanceId": "bb16857d-373d-4233-a175-89c917a72329",
"timeStamp": "2020-09-02T13:53:24.234898Z",
"state": "Slave",
"isAlive": false,
"internalTcpIp": "10.0.0.1",
"internalTcpPort": 1112,
"internalSecureTcpPort": 0,
"externalTcpIp": "10.0.0.1",
"externalTcpPort": 1113,
"externalSecureTcpPort": 0,
"internalHttpIp": "10.0.0.1",
"internalHttpPort": 2112,
"externalHttpIp": "10.0.0.1",
"externalHttpPort": 2113,
"lastCommitPosition": 648923382,
"writerCheckpoint": 648936339,
"chaserCheckpoint": 648936339,
"epochPosition": 551088596,
"epochNumber": 201,
"epochId": "d8f95f4b-167a-4487-9031-4d31a507e6d9",
"nodePriority": 0
},
{
"instanceId": "b3c18dcd-6476-467a-b7b8-d6672b74e9c2",
"timeStamp": "2020-09-02T13:56:06.189428Z",
"state": "CatchingUp",
"isAlive": true,
"internalTcpIp": "10.0.0.2",
"internalTcpPort": 1112,
"internalSecureTcpPort": 0,
"externalTcpIp": "10.0.0.2",
"externalTcpPort": 1113,
"externalSecureTcpPort": 0,
"internalHttpIp": "10.0.0.2",
"internalHttpPort": 2112,
"externalHttpIp": "10.0.0.2",
"externalHttpPort": 2113,
"lastCommitPosition": -1,
"writerCheckpoint": 0,
"chaserCheckpoint": 0,
"epochPosition": -1,
"epochNumber": -1,
"epochId": "00000000-0000-0000-0000-000000000000",
"nodePriority": 0
},
{
"instanceId": "e802a2b5-826c-4bd5-84d0-c9d1387fbf79",
"timeStamp": "2020-09-02T13:56:07.391534Z",
"state": "Master",
"isAlive": true,
"internalTcpIp": "10.0.0.3",
"internalTcpPort": 1112,
"internalSecureTcpPort": 0,
"externalTcpIp": "10.0.0.3",
"externalTcpPort": 1113,
"externalSecureTcpPort": 0,
"internalHttpIp": "10.0.0.3",
"internalHttpPort": 2112,
"externalHttpIp": "10.0.0.3",
"externalHttpPort": 2113,
"lastCommitPosition": 649007631,
"writerCheckpoint": 649024685,
"chaserCheckpoint": 649024685,
"epochPosition": 649023795,
"epochNumber": 202,
"epochId": "1f17695d-6558-4d8b-ba60-2ae273b11e09",
"nodePriority": 0
},
{
"instanceId": "24bb9031-5f21-436c-a7b5-c5f03a95e938",
"timeStamp": "2020-09-02T13:54:39.023053Z",
"state": "Slave",
"isAlive": false,
"internalTcpIp": "10.0.0.4",
"internalTcpPort": 1112,
"internalSecureTcpPort": 0,
"externalTcpIp": "10.0.0.4",
"externalTcpPort": 1113,
"externalSecureTcpPort": 0,
"internalHttpIp": "10.0.0.4",
"internalHttpPort": 2112,
"externalHttpIp": "10.0.0.4",
"externalHttpPort": 2113,
"lastCommitPosition": 649007631,
"writerCheckpoint": 649023795,
"chaserCheckpoint": 649023795,
"epochPosition": 551088596,
"epochNumber": 201,
"epochId": "d8f95f4b-167a-4487-9031-4d31a507e6d9",
"nodePriority": 0
}
],
"serverIp": "10.0.0.3",
"serverPort": 2112
}

View File

@ -9,20 +9,6 @@ function createRandomEvent() {
var testStreamName = 'test-' + uuid.v4(); var testStreamName = 'test-' + uuid.v4();
function delay(ms) {
return new Promise(function (resolve, reject) {
setTimeout(resolve, ms);
})
}
function delayOnlyFirst(count, action) {
if (count === 0) return action();
return delay(200)
.then(function () {
action();
})
}
module.exports = { module.exports = {
'Test Create Persistent Subscription': function(test) { 'Test Create Persistent Subscription': function(test) {
var settings = client.PersistentSubscriptionSettings.create(); var settings = client.PersistentSubscriptionSettings.create();
@ -46,10 +32,8 @@ module.exports = {
test.done(); test.done();
} }
function eventAppeared(s, e) { function eventAppeared(s, e) {
return delayOnlyFirst(receivedEvents.length, function () {
receivedEvents.push(e); receivedEvents.push(e);
if (receivedEvents.length === 2) s.stop(); if (receivedEvents.length === 2) s.stop();
});
} }
function subscriptionDropped(connection, reason, error) { function subscriptionDropped(connection, reason, error) {
if (error) return done(error); if (error) return done(error);

View File

@ -2,7 +2,7 @@ const client = require('../lib/dist');
const userCredentials = new client.UserCredentials('admin', 'changeit'); const userCredentials = new client.UserCredentials('admin', 'changeit');
const log = new client.NoopLogger(); const log = new client.NoopLogger();
const httpEndpoint = 'http://127.0.0.1:2113'; const httpEndpoint = `http://${process.env.EVENTSTORE_HOST || "localhost"}:2113`;
const operationTimeout = 5000; const operationTimeout = 5000;
const simpleProjection = "\ const simpleProjection = "\

View File

@ -7,20 +7,6 @@ function createRandomEvent() {
return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent');
} }
function delay(ms) {
return new Promise(function (resolve, reject) {
setTimeout(resolve, ms);
})
}
function delayOnlyFirst(count, action) {
if (count === 0) return action();
return delay(200)
.then(function () {
action();
})
}
module.exports = { module.exports = {
'Test Subscribe to All From (Start)': function(test) { 'Test Subscribe to All From (Start)': function(test) {
test.expect(6); test.expect(6);
@ -48,14 +34,12 @@ module.exports = {
} }
function eventAppeared(s, e) { function eventAppeared(s, e) {
var isLive = liveProcessing; var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) { if (isLive) {
liveEvents.push(e); liveEvents.push(e);
} else { } else {
catchUpEvents.push(e); catchUpEvents.push(e);
} }
if (isLive && liveEvents.length === 2) s.stop(); if (isLive && liveEvents.length === 2) s.stop();
});
} }
function liveProcessingStarted() { function liveProcessingStarted() {
liveProcessing = true; liveProcessing = true;
@ -99,14 +83,12 @@ module.exports = {
} }
function eventAppeared(s, e) { function eventAppeared(s, e) {
var isLive = liveProcessing; var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) { if (isLive) {
liveEvents.push(e); liveEvents.push(e);
} else { } else {
catchUpEvents.push(e); catchUpEvents.push(e);
} }
if (isLive && liveEvents.length === 2) s.stop(); if (isLive && liveEvents.length === 2) s.stop();
});
} }
function liveProcessingStarted() { function liveProcessingStarted() {
liveProcessing = true; liveProcessing = true;
@ -149,14 +131,12 @@ module.exports = {
} }
function eventAppeared(s, e) { function eventAppeared(s, e) {
var isLive = liveProcessing; var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) { if (isLive) {
liveEvents.push(e); liveEvents.push(e);
} else { } else {
catchUpEvents.push(e); catchUpEvents.push(e);
} }
if (isLive && liveEvents.length === 2) s.stop(); if (isLive && liveEvents.length === 2) s.stop();
});
} }
function liveProcessingStarted() { function liveProcessingStarted() {
liveProcessing = true; liveProcessing = true;

View File

@ -2,20 +2,6 @@ const uuid = require('uuid');
const client = require('../lib/dist'); const client = require('../lib/dist');
const allCredentials = new client.UserCredentials("admin", "changeit"); const allCredentials = new client.UserCredentials("admin", "changeit");
function delay(ms) {
return new Promise(function (resolve, reject) {
setTimeout(resolve, ms);
})
}
function delayOnlyFirst(count, action) {
if (count === 0) return action();
return delay(200)
.then(function () {
action();
})
}
module.exports = { module.exports = {
'Test Subscribe To All Happy Path': function(test) { 'Test Subscribe To All Happy Path': function(test) {
const resolveLinkTos = false; const resolveLinkTos = false;
@ -44,10 +30,15 @@ module.exports = {
var receivedEvents = []; var receivedEvents = [];
function eventAppeared(subscription, event) { function eventAppeared(subscription, event) {
delayOnlyFirst(receivedEvents.length, function() { // Filter non-compliant events (only the one we've published)
let eventData;
try {
eventData = JSON.parse(event.event.data.toString());
} catch(e){}
if (eventData && eventData.a && eventData.b){
receivedEvents.push(event); receivedEvents.push(event);
}
if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); if (receivedEvents.length === numberOfPublishedEvents) subscription.close();
});
} }
function subscriptionDropped(subscription, reason, error) { function subscriptionDropped(subscription, reason, error) {
if (error) return done(error); if (error) return done(error);

View File

@ -6,20 +6,6 @@ function createRandomEvent() {
return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent');
} }
function delay(ms) {
return new Promise(function (resolve, reject) {
setTimeout(resolve, ms);
})
}
function delayOnlyFirst(count, action) {
if (count === 0) return action();
return delay(200)
.then(function () {
action();
})
}
module.exports = { module.exports = {
'Test Subscribe to Stream From Beginning (null)': function(test) { 'Test Subscribe to Stream From Beginning (null)': function(test) {
test.expect(48); test.expect(48);
@ -37,14 +23,12 @@ module.exports = {
function eventAppeared(s, e) { function eventAppeared(s, e) {
var isLive = liveProcessing; var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) { if (isLive) {
liveEvents.push(e); liveEvents.push(e);
} else { } else {
catchUpEvents.push(e); catchUpEvents.push(e);
} }
if (isLive && liveEvents.length === 2) s.stop(); if (isLive && liveEvents.length === 2) s.stop();
});
} }
function liveProcessingStarted() { function liveProcessingStarted() {
liveProcessing = true; liveProcessing = true;
@ -93,14 +77,12 @@ module.exports = {
function eventAppeared(s, e) { function eventAppeared(s, e) {
var isLive = liveProcessing; var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) { if (isLive) {
liveEvents.push(e); liveEvents.push(e);
} else { } else {
catchUpEvents.push(e); catchUpEvents.push(e);
} }
if (isLive && liveEvents.length === 2) s.stop(); if (isLive && liveEvents.length === 2) s.stop();
});
} }
function liveProcessingStarted() { function liveProcessingStarted() {
liveProcessing = true; liveProcessing = true;

View File

@ -2,20 +2,6 @@ const uuid = require('uuid');
const client = require('../lib/dist'); const client = require('../lib/dist');
const Long = require('long'); const Long = require('long');
function delay(ms) {
return new Promise(function (resolve, reject) {
setTimeout(resolve, ms);
})
}
function delayOnlyFirst(count, action) {
if (count === 0) return action();
return delay(200)
.then(function () {
action();
})
}
module.exports = { module.exports = {
'Test Subscribe To Stream Happy Path': function(test) { 'Test Subscribe To Stream Happy Path': function(test) {
const resolveLinkTos = false; const resolveLinkTos = false;
@ -43,10 +29,15 @@ module.exports = {
var receivedEvents = []; var receivedEvents = [];
function eventAppeared(subscription, event) { function eventAppeared(subscription, event) {
delayOnlyFirst(receivedEvents.length, function () { // Filter non-compliant events (only the one we've published)
let eventData;
try {
eventData = JSON.parse(event.event.data.toString());
} catch(e){}
if (eventData && eventData.a && eventData.b){
receivedEvents.push(event); receivedEvents.push(event);
}
if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); if (receivedEvents.length === numberOfPublishedEvents) subscription.close();
});
} }
function subscriptionDropped(subscription, reason, error) { function subscriptionDropped(subscription, reason, error) {
if (error) return done(error); if (error) return done(error);

View File

@ -0,0 +1,693 @@
const fs = require('fs');
const path = require('path');
const dns = require('dns');
const ClusterDiscoverer = require('../../../src/core/cluster/clusterDiscoverer');
const ClusterInfo = require('../../../src/core/cluster/clusterInfo');
const GossipSeed = require('../../../src/gossipSeed');
const NodeEndPoints = require('../../../src/core/cluster/nodeEndpoints');
const logger = { info: () => {} };
describe('ClusterDiscoverer', () => {
const mockDns = {
ADDRCONFIG: dns.ADDRCONFIG,
V4MAPPED: dns.V4MAPPED,
};
const mockHttp = {};
const settings = {
clusterDns: 'my-discover.com:2113',
maxDiscoverAttempts: 10,
discoverDelay: 10,
managerExternalHttpPort: 2113,
seeds: null,
gossipTimeout: 1000,
};
const tClusterInfo = new ClusterInfo([
{
instanceId: 'bb16857d-373d-4233-a175-89c917a72329',
timeStamp: '2020-09-02T13:53:24.234898Z',
state: 'Slave',
isAlive: false,
internalTcpIp: '10.0.0.1',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.1',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.1',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.1',
externalHttpPort: 2113,
lastCommitPosition: 648923382,
writerCheckpoint: 648936339,
chaserCheckpoint: 648936339,
epochPosition: 551088596,
epochNumber: 201,
epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9',
nodePriority: 0,
},
{
instanceId: 'b3c18dcd-6476-467a-b7b8-d6672b74e9c2',
timeStamp: '2020-09-02T13:56:06.189428Z',
state: 'CatchingUp',
isAlive: true,
internalTcpIp: '10.0.0.2',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.2',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.2',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.2',
externalHttpPort: 2113,
lastCommitPosition: -1,
writerCheckpoint: 0,
chaserCheckpoint: 0,
epochPosition: -1,
epochNumber: -1,
epochId: '00000000-0000-0000-0000-000000000000',
nodePriority: 0,
},
{
instanceId: 'e802a2b5-826c-4bd5-84d0-c9d1387fbf79',
timeStamp: '2020-09-02T13:56:07.391534Z',
state: 'Master',
isAlive: true,
internalTcpIp: '10.0.0.3',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.3',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.3',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.3',
externalHttpPort: 2113,
lastCommitPosition: 649007631,
writerCheckpoint: 649024685,
chaserCheckpoint: 649024685,
epochPosition: 649023795,
epochNumber: 202,
epochId: '1f17695d-6558-4d8b-ba60-2ae273b11e09',
nodePriority: 0,
},
{
instanceId: '24bb9031-5f21-436c-a7b5-c5f03a95e938',
timeStamp: '2020-09-02T13:54:39.023053Z',
state: 'Slave',
isAlive: false,
internalTcpIp: '10.0.0.4',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.4',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.4',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.4',
externalHttpPort: 2113,
lastCommitPosition: 649007631,
writerCheckpoint: 649023795,
chaserCheckpoint: 649023795,
epochPosition: 551088596,
epochNumber: 201,
epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9',
nodePriority: 0,
},
]);
const tClusterInfoNoBestNode = new ClusterInfo([
{
instanceId: 'bb16857d-373d-4233-a175-89c917a72329',
timeStamp: '2020-09-02T13:53:24.234898Z',
state: 'Manager',
isAlive: true,
internalTcpIp: '10.0.0.1',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.1',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.1',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.1',
externalHttpPort: 2113,
lastCommitPosition: 648923382,
writerCheckpoint: 648936339,
chaserCheckpoint: 648936339,
epochPosition: 551088596,
epochNumber: 201,
epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9',
nodePriority: 0,
},
{
instanceId: 'b3c18dcd-6476-467a-b7b8-d6672b74e9c2',
timeStamp: '2020-09-02T13:56:06.189428Z',
state: 'CatchingUp',
isAlive: false,
internalTcpIp: '10.0.0.2',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.2',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.2',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.2',
externalHttpPort: 2113,
lastCommitPosition: -1,
writerCheckpoint: 0,
chaserCheckpoint: 0,
epochPosition: -1,
epochNumber: -1,
epochId: '00000000-0000-0000-0000-000000000000',
nodePriority: 0,
},
{
instanceId: 'e802a2b5-826c-4bd5-84d0-c9d1387fbf79',
timeStamp: '2020-09-02T13:56:07.391534Z',
state: 'Master',
isAlive: false,
internalTcpIp: '10.0.0.3',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.3',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.3',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.3',
externalHttpPort: 2113,
lastCommitPosition: 649007631,
writerCheckpoint: 649024685,
chaserCheckpoint: 649024685,
epochPosition: 649023795,
epochNumber: 202,
epochId: '1f17695d-6558-4d8b-ba60-2ae273b11e09',
nodePriority: 0,
},
{
instanceId: '24bb9031-5f21-436c-a7b5-c5f03a95e938',
timeStamp: '2020-09-02T13:54:39.023053Z',
state: 'Slave',
isAlive: false,
internalTcpIp: '10.0.0.4',
internalTcpPort: 1112,
internalSecureTcpPort: 0,
externalTcpIp: '10.0.0.4',
externalTcpPort: 1113,
externalSecureTcpPort: 0,
internalHttpIp: '10.0.0.4',
internalHttpPort: 2112,
externalHttpIp: '10.0.0.4',
externalHttpPort: 2113,
lastCommitPosition: 649007631,
writerCheckpoint: 649023795,
chaserCheckpoint: 649023795,
epochPosition: 551088596,
epochNumber: 201,
epochId: 'd8f95f4b-167a-4487-9031-4d31a507e6d9',
nodePriority: 0,
},
]);
const discoverer = new ClusterDiscoverer(logger, settings, mockDns, mockHttp);
const discovererWithGossipSeeds = new ClusterDiscoverer(
logger,
{
...settings,
...{
seeds: [
new GossipSeed({
host: '10.0.0.1',
port: 2113,
}),
new GossipSeed({
host: '10.0.0.1',
port: 2113,
}),
new GossipSeed({
host: '10.0.0.1',
port: 2113,
}),
],
},
},
mockDns,
mockHttp
);
afterEach(() => {
jest.clearAllMocks();
});
describe('constructor', () => {
test('Should be defined', () => {
expect(discoverer).toBeDefined();
});
test('Should throw an error', () => {
expect(
() =>
new ClusterDiscoverer(
logger,
{
clusterDns: null,
maxDiscoverAttempts: 10,
managerExternalHttpPort: 2113,
seeds: null,
gossipTimeout: 1000,
},
mockDns,
mockHttp
)
).toThrow();
});
expect(
() =>
new ClusterDiscoverer(
logger,
{
clusterDns: null,
maxDiscoverAttempts: 10,
managerExternalHttpPort: 2113,
seeds: [],
gossipTimeout: 1000,
},
mockDns,
mockHttp
)
).toThrow();
});
describe('#_resolveDns', () => {
test('Should call lookup', async () => {
mockDns.lookup = jest.fn().mockResolvedValue([
{
address: '10.0.0.1',
family: 4,
},
]);
await discoverer._resolveDns('my-discover.com:2113');
expect(mockDns.lookup).toHaveBeenCalledWith('my-discover.com:2113', {
family: 4,
hints: dns.ADDRCONFIG | dns.V4MAPPED,
all: true,
});
});
test('Should reject if dnsService fails', async () => {
mockDns.lookup = jest.fn().mockRejectedValue(new Error('Unexpected DNS error'));
await expect(discoverer._resolveDns('my-discover.com:2113')).rejects.toBeDefined();
});
test('Should reject if no addresses are returned', async () => {
mockDns.lookup = jest.fn().mockResolvedValue([]);
await expect(discoverer._resolveDns('my-discover.com:2113')).rejects.toEqual(
new Error('No result from dns lookup for my-discover.com:2113')
);
});
test('Should return a list of candidate addresses', async () => {
mockDns.lookup = jest.fn().mockResolvedValue([
{
address: '10.0.0.1',
family: 4,
},
{
address: '10.0.0.2',
family: 4,
},
{
address: '10.0.0.3',
family: 4,
},
]);
const candidates = await discoverer._resolveDns('my-discover.com:2113');
expect(candidates).toEqual(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
});
});
describe('#_clusterInfo', () => {
test('Should call httpService.request to get cluster informations', async () => {
const tCandidate = new GossipSeed(
{
host: '10.0.0.1',
port: '2113',
},
undefined
);
const tTimeout = 1000;
const requestEvents = {};
let responseCallback;
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {},
destroy: () => {},
};
});
discoverer._clusterInfo(tCandidate, tTimeout);
expect(mockHttp.request).toHaveBeenCalledWith(
{
host: tCandidate.endPoint.host,
port: tCandidate.endPoint.port,
path: '/gossip?format=json',
timeout: tTimeout,
},
expect.anything()
);
});
test('Should call httpService.request to get cluster informations with host header', async () => {
const tCandidate = new GossipSeed(
{
host: '10.0.0.1',
port: '2113',
},
undefined,
'MyHost'
);
const tTimeout = 1000;
const requestEvents = {};
let responseCallback;
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {},
destroy: () => {},
};
});
discoverer._clusterInfo(tCandidate, tTimeout);
expect(mockHttp.request).toHaveBeenCalledWith(
{
host: tCandidate.endPoint.host,
port: tCandidate.endPoint.port,
path: '/gossip?format=json',
timeout: tTimeout,
headers: {
Host: tCandidate.hostHeader,
},
},
expect.anything()
);
});
test('Should return a timeout error if the sockets fails to be connected in the specified timeout', async () => {
const tCandidate = new GossipSeed({
host: '10.0.0.1',
port: '2113',
});
const tTimeout = 1000;
const requestEvents = {};
let responseCallback;
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {
requestEvents['timeout']();
},
destroy: () => {},
};
});
await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow(
new Error('Connection to gossip timed out')
);
});
test('Should return an error if the http request emits an error', async () => {
const tCandidate = new GossipSeed({
host: '10.0.0.1',
port: '2113',
});
const tTimeout = 1000;
const requestEvents = {};
let responseCallback;
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {
requestEvents['error'](new Error('Request error'));
},
destroy: () => {},
};
});
await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow(
new Error('Connection to gossip errored')
);
});
test("Should return an error if the candidate doesn't returns a 200 code", async () => {
const tCandidate = new GossipSeed({
host: '10.0.0.1',
port: '2113',
});
const tTimeout = 1000;
const requestEvents = {};
let responseCallback;
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {
callback({
statusCode: 503,
on: (type, callback) => {
responseEvents[type] = callback;
},
});
},
destroy: () => {},
};
});
await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow(
new Error('Gossip candidate returns a 503 error')
);
});
test('Should return an error if the response is not a valid JSON', async () => {
const tCandidate = new GossipSeed({
host: '10.0.0.1',
port: '2113',
});
const tTimeout = 1000;
let responseCallback;
const requestEvents = {};
const responseEvents = {};
mockHttp.request = jest.fn((options, callback) => {
responseCallback = callback;
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {
callback({
statusCode: 200,
on: (type, callback) => {
responseEvents[type] = callback;
},
});
responseEvents['data']('Not a JSON response');
responseEvents['end']();
},
destroy: () => {},
};
});
await expect(discoverer._clusterInfo(tCandidate, tTimeout)).rejects.toThrow(
new Error('Unable to parse gossip response')
);
});
test('Should return the member informations for the cluster', async () => {
const tCandidate = new GossipSeed({
host: '10.0.0.1',
port: '2113',
});
const tTimeout = 1000;
const requestEvents = {};
const responseEvents = {};
mockHttp.request = jest.fn((options, callback) => {
return {
setTimeout: jest.fn(() => ({})),
on: (type, callback) => {
requestEvents[type] = callback;
},
end: () => {
callback({
statusCode: 200,
on: (type, callback) => {
responseEvents[type] = callback;
},
});
responseEvents['data'](fs.readFileSync(path.resolve(__dirname, '../../fixtures/gossip.json')));
responseEvents['end']();
},
destroy: () => {},
};
});
const infos = await discoverer._clusterInfo(tCandidate, tTimeout);
expect(infos).toEqual(tClusterInfo);
});
});
describe('#_getGossipCandidates', () => {
test('Should get from dns if gossipSeeds are empty', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
const candidates = await discoverer._getGossipCandidates(settings.managerExternalHttpPort);
expect(discoverer._resolveDns).toHaveBeenCalled();
expect(candidates).toHaveLength(3);
for (let i in candidates) {
expect(candidates[i]).toBeInstanceOf(GossipSeed);
}
});
test('Should get gossipSeeds if present', async () => {
discovererWithGossipSeeds._resolveDns = jest.fn();
const candidates = await discovererWithGossipSeeds._getGossipCandidates(settings.managerExternalHttpPort);
expect(discovererWithGossipSeeds._resolveDns).not.toHaveBeenCalled();
expect(candidates).toHaveLength(3);
for (let i in candidates) {
expect(candidates[i]).toBeInstanceOf(GossipSeed);
}
});
});
describe('#discover', () => {
test('Should get resolve dns discover url to get IP addresses of the eventstore node', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo);
await discoverer.discover();
expect(discoverer._resolveDns).toHaveBeenCalledWith(settings.clusterDns);
});
test('Should call _clusterInfo with candidate', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo);
await discoverer.discover();
expect(discoverer._clusterInfo).toHaveBeenCalledWith(
new GossipSeed({ host: '10.0.0.1', port: settings.managerExternalHttpPort }),
settings.gossipTimeout
);
});
test('Should call _clusterInfo with candidate from gossipSeed if provided', async () => {
discovererWithGossipSeeds._resolveDns = jest.fn().mockResolvedValue();
discovererWithGossipSeeds._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo);
await discovererWithGossipSeeds.discover();
expect(discovererWithGossipSeeds._resolveDns).not.toHaveBeenCalled();
});
test('Should return the bestNode', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfo);
const node = await discoverer.discover();
expect(node).toEqual(
new NodeEndPoints(
{
host: '10.0.0.3',
port: 1113,
},
null
)
);
});
test('Should try to call each candidates until it get clusterInfo with bestNode', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockImplementation(async (candidate) => {
if (candidate.endPoint.host === '10.0.0.3') {
return tClusterInfo;
}
throw new Error('Gossip candidate returns a 503 error');
});
const node = await discoverer.discover();
expect(node).toEqual(
new NodeEndPoints(
{
host: '10.0.0.3',
port: 1113,
},
null
)
);
expect(discoverer._clusterInfo).toHaveBeenCalledTimes(3);
});
test('Should fail if the we reach the maxDiscoverAttempts limits (no bestNode is found)', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockResolvedValue(tClusterInfoNoBestNode);
await expect(discoverer.discover()).rejects.toEqual(
new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`)
);
expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts);
expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts);
});
test('Should fail if the we reach the maxDiscoverAttempts limits (all resolveDns attempts fails)', async () => {
discoverer._resolveDns = jest.fn().mockRejectedValue(new Error('Connection to gossip timed out'));
await expect(discoverer.discover()).rejects.toEqual(
new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`)
);
expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts);
});
test('Should fail if the we reach the maxDiscoverAttempts limits (all clusterInfo attempts fails)', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockRejectedValue(new Error('Gossip candidate returns a 503 error'));
await expect(discoverer.discover()).rejects.toEqual(
new Error(`Failed to discover candidate in ${settings.maxDiscoverAttempts} attempts.`)
);
expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts);
expect(discoverer._resolveDns).toHaveBeenCalledTimes(settings.maxDiscoverAttempts);
});
test('Should try to call each candidates expect failed one until it get clusterInfo with bestNode', async () => {
discoverer._resolveDns = jest.fn().mockResolvedValue(['10.0.0.1', '10.0.0.2', '10.0.0.3']);
discoverer._clusterInfo = jest.fn().mockImplementation(async (candidate) => {
if (candidate.endPoint.host === '10.0.0.3') {
return tClusterInfo;
}
throw new Error('Gossip candidate returns a 503 error');
});
const node = await discoverer.discover({ host: '10.0.0.2', port: 2113 });
expect(node).toEqual(
new NodeEndPoints(
{
host: '10.0.0.3',
port: 1113,
},
null
)
);
expect(discoverer._clusterInfo).toHaveBeenCalledTimes(2);
expect(discoverer._clusterInfo).toHaveBeenCalledWith(
new GossipSeed({ host: '10.0.0.1', port: 2113 }),
settings.gossipTimeout
);
expect(discoverer._clusterInfo).toHaveBeenCalledWith(
new GossipSeed({ host: '10.0.0.3', port: 2113 }),
settings.gossipTimeout
);
});
});
});