From f951a625f464b8ad52fbb909df37ef32e99468c2 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Mon, 17 Oct 2016 21:58:28 -0700 Subject: [PATCH] Performance improvement by using strict equality, fixed heartbeat issue in connection stage --- package.json | 13 ++-- ...onnectToPersistentSubscriptionOperation.js | 20 +++--- .../createPersistentSubscriptionOperation.js | 2 +- src/clientOperations/operationBase.js | 6 +- src/clientOperations/subscriptionOperation.js | 4 +- .../updatePersistentSubscriptionOperation.js | 2 +- .../volatileSubscriptionOperation.js | 10 +-- src/core/clusterDnsEndPointDiscoverer.js | 12 ++-- src/core/eventStoreConnectionLogicHandler.js | 69 +++++++++---------- src/core/operationsManager.js | 2 +- src/core/simpleQueuedHandler.js | 16 +++-- src/core/subscriptionsManager.js | 6 +- src/eventStoreConnection.js | 4 +- src/eventStorePersistentSubscriptionBase.js | 3 +- src/gossipSeed.js | 2 +- src/results.js | 2 +- src/systemData/tcpPackage.js | 4 +- .../tcp/lengthPrefixMessageFramer.js | 6 +- src/transport/tcp/tcpConnection.js | 5 +- src/transport/tcp/tcpPackageConnection.js | 8 +-- test/connection_test.js | 6 +- test/persistentSubscription_test.js | 3 +- 22 files changed, 107 insertions(+), 98 deletions(-) diff --git a/package.json b/package.json index c328bf9..d93cf4a 100644 --- a/package.json +++ b/package.json @@ -1,18 +1,19 @@ { "name": "eventstore-node", - "version": "0.0.2", + "version": "0.0.6", "description": "A port of the EventStore .Net ClientAPI to Node.js", "main": "index.js", - "directories": { - "lib": "lib", - "test": "test" - }, "scripts": { "clean": "rm lib/dist.js", "build": "webpack", "pretest": "npm run build", - "test": "nodeunit" + "test": "nodeunit", + "prepublish": "npm run build" }, + "files": [ + "lib", + "src" + ], "repository": { "type": "git", "url": "git+https://github.com/nicdex/eventstore-node.git" diff --git a/src/clientOperations/connectToPersistentSubscriptionOperation.js b/src/clientOperations/connectToPersistentSubscriptionOperation.js index 294efdf..9cc2752 100644 --- a/src/clientOperations/connectToPersistentSubscriptionOperation.js +++ b/src/clientOperations/connectToPersistentSubscriptionOperation.js @@ -37,38 +37,38 @@ ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = }; ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) { - if (pkg.command == TcpCommand.PersistentSubscriptionConfirmation) + if (pkg.command === TcpCommand.PersistentSubscriptionConfirmation) { var dto = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer()); this._confirmSubscription(dto.last_commit_position, dto.last_event_number); this._subscriptionId = dto.subscription_id; return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation"); } - if (pkg.command == TcpCommand.PersistentSubscriptionStreamEventAppeared) + if (pkg.command === TcpCommand.PersistentSubscriptionStreamEventAppeared) { var dto = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer()); this._onEventAppeared(new results.ResolvedEvent(dto.event)); return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared"); } - if (pkg.command == TcpCommand.SubscriptionDropped) + if (pkg.command === TcpCommand.SubscriptionDropped) { var dto = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer()); - if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied) + if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied) { this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream.")); return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); } - if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound) + if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound) { this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found")); return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); } - if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted) + if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted) { this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted.")); return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); } - if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached) + if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached) { this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached.")); return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); @@ -111,10 +111,10 @@ ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function action); var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionNakEvents, - this._userCredentials != null ? TcpFlags.Authenticated : TcpFlags.None, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, this._correlationId, - this._userCredentials != null ? this._userCredentials.username : null, - this._userCredentials != null ? this._userCredentials.password : null, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, createBufferSegment(dto.toBuffer())); this._enqueueSend(pkg); }; diff --git a/src/clientOperations/createPersistentSubscriptionOperation.js b/src/clientOperations/createPersistentSubscriptionOperation.js index 3944120..252c343 100644 --- a/src/clientOperations/createPersistentSubscriptionOperation.js +++ b/src/clientOperations/createPersistentSubscriptionOperation.js @@ -39,7 +39,7 @@ CreatePersistentSubscriptionOperation.prototype._createRequestDto = function() { return new ClientMessage.CreatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, this._readBatchSize, this._bufferSize, this._maxRetryCount, - this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._namedConsumerStrategy === SystemConsumerStrategies.RoundRobin, this._checkPointAfter, this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); }; diff --git a/src/clientOperations/operationBase.js b/src/clientOperations/operationBase.js index 1132463..3454b2f 100644 --- a/src/clientOperations/operationBase.js +++ b/src/clientOperations/operationBase.js @@ -42,7 +42,7 @@ OperationBase.prototype._succeed = function() { if (!this._completed) { this._completed = true; - if (this._response != null) + if (this._response) this._cb(null, this._transformResponse(this._response)); else this._cb(new Error("No result.")) @@ -118,7 +118,7 @@ OperationBase.prototype._inspectNotHandled = function(pkg) case ClientMessage.NotHandled.NotHandledReason.NotMaster: var masterInfo = ClientMessage.NotHandled.MasterInfo.decode(message.additional_info); - return new new InspectionResult(InspectionDecision.Reconnect, "NotHandled - NotMaster", + return new InspectionResult(InspectionDecision.Reconnect, "NotHandled - NotMaster", {host: masterInfo.external_tcp_address, port: masterInfo.external_tcp_port}, {host: masterInfo.external_secure_tcp_address, port: masterInfo.external_secure_tcp_port}); @@ -130,7 +130,7 @@ OperationBase.prototype._inspectNotHandled = function(pkg) OperationBase.prototype._inspectUnexpectedCommand = function(pkg, expectedCommand) { - if (pkg.command == expectedCommand) + if (pkg.command === expectedCommand) throw new Error("Command shouldn't be " + TcpCommand.getName(pkg.command)); this.log.error("Unexpected TcpCommand received.\n" diff --git a/src/clientOperations/subscriptionOperation.js b/src/clientOperations/subscriptionOperation.js index 36bfd49..9bc47b3 100644 --- a/src/clientOperations/subscriptionOperation.js +++ b/src/clientOperations/subscriptionOperation.js @@ -46,7 +46,7 @@ SubscriptionOperation.prototype._enqueueSend = function(pkg) { SubscriptionOperation.prototype.subscribe = function(correlationId, connection) { if (connection === null) throw new TypeError("connection is null."); - if (this._subscription != null || this._unsubscribed != 0) + if (this._subscription !== null || this._unsubscribed) return false; this._correlationId = correlationId; @@ -128,7 +128,7 @@ SubscriptionOperation.prototype.inspectPackage = function(pkg) { case TcpCommand.NotHandled: { - if (this._subscription != null) + if (this._subscription !== null) throw new Error("NotHandled command appeared while we already subscribed."); var message = ClientMessage.NotHandled.decode(pkg.data.toBuffer()); diff --git a/src/clientOperations/updatePersistentSubscriptionOperation.js b/src/clientOperations/updatePersistentSubscriptionOperation.js index 5b83571..e8f0bd7 100644 --- a/src/clientOperations/updatePersistentSubscriptionOperation.js +++ b/src/clientOperations/updatePersistentSubscriptionOperation.js @@ -39,7 +39,7 @@ UpdatePersistentSubscriptionOperation.prototype._createRequestDto = function() { return new ClientMessage.UpdatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, this._readBatchSize, this._bufferSize, this._maxRetryCount, - this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._namedConsumerStrategy === SystemConsumerStrategies.RoundRobin, this._checkPointAfter, this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); }; diff --git a/src/clientOperations/volatileSubscriptionOperation.js b/src/clientOperations/volatileSubscriptionOperation.js index 15b3bfe..ea6d10c 100644 --- a/src/clientOperations/volatileSubscriptionOperation.js +++ b/src/clientOperations/volatileSubscriptionOperation.js @@ -22,21 +22,21 @@ util.inherits(VolatileSubscriptionOperation, SubscriptionOperation); VolatileSubscriptionOperation.prototype._createSubscriptionPackage = function() { var dto = new ClientMessage.SubscribeToStream(this._streamId, this._resolveLinkTos); return new TcpPackage(TcpCommand.SubscribeToStream, - this._userCredentials != null ? TcpFlags.Authenticated : TcpFlags.None, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, this._correlationId, - this._userCredentials != null ? this._userCredentials.username : null, - this._userCredentials != null ? this._userCredentials.password : null, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, new BufferSegment(dto.toBuffer())); }; VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) { try { - if (pkg.command == TcpCommand.SubscriptionConfirmation) { + if (pkg.command === TcpCommand.SubscriptionConfirmation) { var dto = ClientMessage.SubscriptionConfirmation.decode(pkg.data.toBuffer()); this._confirmSubscription(dto.last_commit_position, dto.last_event_number); return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation"); } - if (pkg.command == TcpCommand.StreamEventAppeared) { + if (pkg.command === TcpCommand.StreamEventAppeared) { var dto = ClientMessage.StreamEventAppeared.decode(pkg.data.toBuffer()); this._onEventAppeared(new results.ResolvedEvent(dto.event)); return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared"); diff --git a/src/core/clusterDnsEndPointDiscoverer.js b/src/core/clusterDnsEndPointDiscoverer.js index 5205fd5..8f7ba27 100644 --- a/src/core/clusterDnsEndPointDiscoverer.js +++ b/src/core/clusterDnsEndPointDiscoverer.js @@ -38,7 +38,7 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) { return endPoints; }) .catch(function (exc) { - self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.", attempt, self._maxDiscoverAttempts, exc)); + self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.\n%s", attempt, self._maxDiscoverAttempts, exc, exc.stack)); }) .then(function (endPoints) { if (endPoints) @@ -67,6 +67,7 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn var self = this; var promise = Promise.resolve(); var j = 0; + self._log.debug('Gossip candidates', gossipCandidates); for (var i = 0; i < gossipCandidates.length; i++) { promise = promise.then(function (endPoints) { if (endPoints) return endPoints; @@ -90,10 +91,9 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn }; ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) { - if (failedTcpEndPoint === null) return oldGossip; + if (failedTcpEndPoint === null) return this._arrangeGossipCandidates(oldGossip); var gossipCandidates = oldGossip.filter(function(x) { - //TODO: failedTcpEndpoint.host might not be an ip - return (x.externalTcpPort !== failedTcpEndPoint.port && x.externalTcpIp !== failedTcpEndPoint.host); + return !(x.externalTcpPort === failedTcpEndPoint.port && x.externalTcpIp === failedTcpEndPoint.host); }); return this._arrangeGossipCandidates(gossipCandidates); }; @@ -133,6 +133,7 @@ ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { var options = { + host: endPoint.endPoint.host, hostname: endPoint.endPoint.hostname, port: endPoint.endPoint.port, path: '/gossip?format=json' @@ -140,6 +141,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { if (endPoint.hostHeader) { options.headers = {'Host': endPoint.hostHeader}; } + this._log.info('Try get gossip from', endPoint); var self = this; return new Promise(function (resolve, reject) { try { @@ -215,7 +217,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members var secTcp = node.externalSecureTcpPort > 0 ? {host: externalTcpIp, port: node.externalSecureTcpPort} : null; - this._log.info(util.format("Discovering: found best choice [%j,%j] (%s).", normTcp, secTcp == null ? "n/a" : secTcp, node.state)); + this._log.info(util.format("Discovering: found best choice [%j,%j] (%s).", normTcp, secTcp === null ? "n/a" : secTcp, node.state)); return new NodeEndPoints(normTcp, secTcp); }; diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index c0f6c7e..7b56b60 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -165,7 +165,7 @@ EventStoreConnectionLogicHandler.prototype._startConnection = function(cb, endpo * @private */ EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, error) { - if (this._state == ConnectionState.Closed) { + if (this._state === ConnectionState.Closed) { this._logDebug("CloseConnection IGNORED because is ESConnection is CLOSED, reason %s, error %s.", reason, error ? error.stack : ''); return; } @@ -189,7 +189,7 @@ EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, e EventStoreConnectionLogicHandler.prototype._closeTcpConnection = function(reason) { if (!this._connection) { - this._logDebug("CloseTcpConnection IGNORED because _connection == null"); + this._logDebug("CloseTcpConnection IGNORED because _connection === null"); return; } @@ -320,7 +320,7 @@ EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = functi EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(endPoints) { var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint; - if (endPoint == null) + if (endPoint === null) { this._closeConnection("No end point to node specified."); return; @@ -328,8 +328,8 @@ EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(en this._logDebug("EstablishTcpConnection to [%j]", endPoint); - if (this._state != ConnectionState.Connecting) return; - if (this._connectingPhase != ConnectingPhase.EndPointDiscovery) return; + if (this._state !== ConnectionState.Connecting) return; + if (this._connectingPhase !== ConnectingPhase.EndPointDiscovery) return; var self = this; this._connectingPhase = ConnectingPhase.ConnectionEstablishing; @@ -358,10 +358,10 @@ EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(en }; EventStoreConnectionLogicHandler.prototype._tcpConnectionEstablished = function(connection) { - if (this._state != ConnectionState.Connecting || !this._connection.equals(connection) || connection.isClosed) + if (this._state !== ConnectionState.Connecting || !this._connection.equals(connection) || connection.isClosed) { this._logDebug("IGNORED (_state %s, _conn.Id %s, conn.Id %s, conn.closed %s): TCP connection to [%j, L%j] established.", - this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId, + this._state, this._connection === null ? EmptyGuid : this._connection.connectionId, connection.connectionId, connection.isClosed, connection.remoteEndPoint, connection.localEndPoint); return; } @@ -373,7 +373,7 @@ EventStoreConnectionLogicHandler.prototype._tcpConnectionEstablished = function( timeStamp: Date.now() }; - if (this._settings.defaultUserCredentials != null) + if (this._settings.defaultUserCredentials !== null) { this._connectingPhase = ConnectingPhase.Authentication; @@ -402,7 +402,7 @@ EventStoreConnectionLogicHandler.prototype._goToConnectedState = function() { this.emit('connected', this._connection.remoteEndPoint); - if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod) + if ((Date.now() - this._lastTimeoutsTimeStamp) >= this._settings.operationTimeoutCheckPeriod) { this._operations.checkTimeoutsAndRetry(this._connection); this._subscriptions.checkTimeoutsAndRetry(this._connection); @@ -411,19 +411,19 @@ EventStoreConnectionLogicHandler.prototype._goToConnectedState = function() { }; EventStoreConnectionLogicHandler.prototype._tcpConnectionError = function(connection, error) { - if (this._connection != connection) return; - if (this._state == ConnectionState.Closed) return; + if (!this._connection.equals(connection)) return; + if (this._state === ConnectionState.Closed) return; this._logDebug("TcpConnectionError connId %s, exc %s.", connection.connectionId, error); this._closeConnection("TCP connection error occurred.", error); }; EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(connection, error) { - if (this._state == ConnectionState.Init) throw new Error(); - if (this._state == ConnectionState.Closed || !this._connection.equals(connection)) + if (this._state === ConnectionState.Init) throw new Error(); + if (this._state === ConnectionState.Closed || !this._connection.equals(connection)) { this._logDebug("IGNORED (_state: %s, _conn.ID: %s, conn.ID: %s): TCP connection to [%j, L%j] closed.", - this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId, + this._state, this._connection === null ? EmptyGuid : this._connection.connectionId, connection.connectionId, connection.remoteEndPoint, connection.localEndPoint); return; } @@ -447,7 +447,7 @@ EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(conne }; EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connection, pkg) { - if (!connection.equals(this._connection) || this._state == ConnectionState.Closed || this._state == ConnectionState.Init) + if (!connection.equals(this._connection) || this._state === ConnectionState.Closed || this._state === ConnectionState.Init) { this._logDebug("IGNORED: HandleTcpPackage connId %s, package %s, %s.", connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId); @@ -458,9 +458,9 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti this._connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId); this._packageNumber += 1; - if (pkg.command == TcpCommand.HeartbeatResponseCommand) + if (pkg.command === TcpCommand.HeartbeatResponseCommand) return; - if (pkg.command == TcpCommand.HeartbeatRequestCommand) + if (pkg.command === TcpCommand.HeartbeatRequestCommand) { this._connection.enqueueSend(new TcpPackage( TcpCommand.HeartbeatResponseCommand, @@ -469,13 +469,13 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti return; } - if (pkg.command == TcpCommand.Authenticated || pkg.command == TcpCommand.NotAuthenticated) + if (pkg.command === TcpCommand.Authenticated || pkg.command === TcpCommand.NotAuthenticated) { - if (this._state == ConnectionState.Connecting - && this._connectingPhase == ConnectingPhase.Authentication - && this._authInfo.correlationId == pkg.correlationId) + if (this._state === ConnectionState.Connecting + && this._connectingPhase === ConnectingPhase.Authentication + && this._authInfo.correlationId === pkg.correlationId) { - if (pkg.command == TcpCommand.NotAuthenticated) + if (pkg.command === TcpCommand.NotAuthenticated) this.emit('authenticationFailed', "Not authenticated"); this._goToConnectedState(); @@ -483,7 +483,7 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti } } - if (pkg.command == TcpCommand.BadRequest && pkg.correlationId == EmptyGuid) + if (pkg.command === TcpCommand.BadRequest && pkg.correlationId === EmptyGuid) { var message = ""; try { @@ -515,7 +515,7 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti default: throw new Error("Unknown InspectionDecision: " + result.decision); } - if (this._state == ConnectionState.Connected) + if (this._state === ConnectionState.Connected) this._operations.scheduleWaitingOperations(connection); return; @@ -557,13 +557,13 @@ EventStoreConnectionLogicHandler.prototype._reconnectTo = function(endPoints) { var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint; - if (endPoint == null) + if (endPoint === null) { this._closeConnection("No end point is specified while trying to reconnect."); return; } - if (this._state != ConnectionState.Connected || this._connection.remoteEndPoint == endPoint) + if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint) return; var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].", @@ -581,7 +581,7 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() { { case ConnectionState.Init: break; case ConnectionState.Connecting: - if (this._connectingPhase == ConnectingPhase.Reconnecting && Date.now() - this._reconnInfo.timeStamp >= this._settings.reconnectionDelay) + if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay) { this._logDebug("TimerTick checking reconnection..."); @@ -594,17 +594,17 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() { this._discoverEndpoint(null); } } - else if (this._connectingPhase == ConnectingPhase.Authentication && Date.now() - this._authInfo.timeStamp >= this._settings.operationTimeout) + else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout) { this.emit('authenticationFailed', "Authentication timed out."); this._goToConnectedState(); } - else if (this._connectingPhase > ConnectingPhase.ConnectionEstablishing) + else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected) this._manageHeartbeats(); break; case ConnectionState.Connected: // operations timeouts are checked only if connection is established and check period time passed - if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod) + if ((Date.now() - this._lastTimeoutsTimeStamp) >= this._settings.operationTimeoutCheckPeriod) { // On mono even impossible connection first says that it is established // so clearing of reconnection count on ConnectionEstablished event causes infinite reconnections. @@ -624,14 +624,14 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() { }; EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { - if (this._connection == null) return; + if (this._connection === null) return; var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout; - if (Date.now() - this._heartbeatInfo.timeStamp < timeout) + if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout) return; var packageNumber = this._packageNumber; - if (this._heartbeatInfo.lastPackageNumber != packageNumber) + if (this._heartbeatInfo.lastPackageNumber !== packageNumber) { this._heartbeatInfo = {lastPackageNumber: packageNumber, isIntervalStage: true, timeStamp: Date.now()}; return; @@ -649,10 +649,9 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { else { // TcpMessage.HeartbeatTimeout analog - var msg = util.format("EventStoreConnection '%s': closing TCP connection [%j, L%j, %s] due to HEARTBEAT TIMEOUT at pkgNum %d.", + this._logInfo("EventStoreConnection '%s': closing TCP connection [%j, L%j, %s] due to HEARTBEAT TIMEOUT at pkgNum %d.", this._esConnection.connectionName, this._connection.remoteEndPoint, this._connection.localEndPoint, this._connection.connectionId, packageNumber); - this._settings.log.info(msg); this._closeTcpConnection(msg); } }; diff --git a/src/core/operationsManager.js b/src/core/operationsManager.js index 819c15b..eb2a5c1 100644 --- a/src/core/operationsManager.js +++ b/src/core/operationsManager.js @@ -55,7 +55,7 @@ OperationsManager.prototype.checkTimeoutsAndRetry = function(connection) { var removeOperations = []; var self = this; this._activeOperations.forEach(function(correlationId, operation) { - if (operation.connectionId != connection.connectionId) + if (operation.connectionId !== connection.connectionId) { retryOperations.push(operation); } diff --git a/src/core/simpleQueuedHandler.js b/src/core/simpleQueuedHandler.js index 3d70279..d6dbe8d 100644 --- a/src/core/simpleQueuedHandler.js +++ b/src/core/simpleQueuedHandler.js @@ -13,8 +13,14 @@ function SimpleQueuedHandler() { } SimpleQueuedHandler.prototype.registerHandler = function(type, handler) { - type = typeName(type); - this._handlers[type] = handler; + var typeId = typeName(type); + this._handlers[typeId] = function (msg) { + try { + handler(msg); + } catch(e) { + console.log('ERROR: ', e); + } + }; }; SimpleQueuedHandler.prototype.enqueueMessage = function(msg) { @@ -28,10 +34,10 @@ SimpleQueuedHandler.prototype.enqueueMessage = function(msg) { SimpleQueuedHandler.prototype._processQueue = function() { var message = this._messages.shift(); while(message) { - var type = typeName(message); - var handler = this._handlers[type]; + var typeId = typeName(message); + var handler = this._handlers[typeId]; if (!handler) - throw new Error("No handler registered for message " + type); + throw new Error("No handler registered for message " + typeId); setImmediate(handler, message); message = this._messages.shift(); } diff --git a/src/core/subscriptionsManager.js b/src/core/subscriptionsManager.js index b7d7948..10a6150 100644 --- a/src/core/subscriptionsManager.js +++ b/src/core/subscriptionsManager.js @@ -42,7 +42,7 @@ SubscriptionsManager.prototype.purgeSubscribedAndDroppedSubscriptions = function var self = this; var subscriptionsToRemove = []; this._activeSubscriptions.forEach(function(_, subscription) { - if (subscription.isSubscribed && subscription.connectionId == connectionId) { + if (subscription.isSubscribed && subscription.connectionId === connectionId) { subscription.operation.connectionClosed(); subscriptionsToRemove.push(subscription); } @@ -60,7 +60,7 @@ SubscriptionsManager.prototype.checkTimeoutsAndRetry = function(connection) { var removeSubscriptions = []; this._activeSubscriptions.forEach(function(_, subscription) { if (subscription.isSubscribed) return; - if (subscription.connectionId != connection.connectionId) + if (subscription.connectionId !== connection.connectionId) { retrySubscriptions.push(subscription); } @@ -165,7 +165,7 @@ SubscriptionsManager.prototype._logDebug = function(message) { if (!this._settings.verboseLogging) return; var parameters = Array.prototype.slice.call(arguments, 1); - this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, parameters.length == 0 ? message : util.format(message, parameters)); + this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, parameters.length === 0 ? message : util.format(message, parameters)); }; module.exports = SubscriptionsManager; \ No newline at end of file diff --git a/src/eventStoreConnection.js b/src/eventStoreConnection.js index a2db720..98b7245 100644 --- a/src/eventStoreConnection.js +++ b/src/eventStoreConnection.js @@ -31,7 +31,7 @@ var defaultConnectionSettings = { // Cluster Settings clusterDns: '', - maxDiscoverAttemps: 10, + maxDiscoverAttempts: 10, externalGossipPort: 0, gossipTimeout: 1000 }; @@ -85,7 +85,7 @@ function createFromGossipSeeds(connectionSettings, gossipSeeds, connectionName) maxDiscoverAttempts: mergedSettings.maxDiscoverAttempts, gossipTimeout: mergedSettings.gossipTimeout }; - var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(connectionSettings.log, + var endPointDiscoverer = new ClusterDnsEndPointDiscoverer(mergedSettings.log, clusterSettings.clusterDns, clusterSettings.maxDiscoverAttempts, clusterSettings.externalGossipPort, diff --git a/src/eventStorePersistentSubscriptionBase.js b/src/eventStorePersistentSubscriptionBase.js index a55cb29..afb9ffa 100644 --- a/src/eventStorePersistentSubscriptionBase.js +++ b/src/eventStorePersistentSubscriptionBase.js @@ -38,6 +38,7 @@ EventStorePersistentSubscriptionBase.prototype.start = function() { this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials, this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings) .then(function(subscription) { + console.log('Subscription started.'); self._subscription = subscription; }); }; @@ -145,7 +146,7 @@ EventStorePersistentSubscriptionBase.prototype._processQueue = function() { e = this._queue.shift(); } this._isProcessing = false; - //} while (_queue.Count > 0 && Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0); + //} while (_queue.Count > 0 && Interlocked.CompareExchange(ref _isProcessing, 1, 0) === 0); }; EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reason, error) { diff --git a/src/gossipSeed.js b/src/gossipSeed.js index 14bc015..c0b88ba 100644 --- a/src/gossipSeed.js +++ b/src/gossipSeed.js @@ -1,5 +1,5 @@ module.exports = function GossipSeed(endPoint, hostName) { - if (typeof endPoint !== 'object' || !endPoint.hostname || !endPoint.port) throw new TypeError('endPoint must be have hostname and port properties.'); + //if (typeof endPoint !== 'object' || !endPoint.hostname || !endPoint.port) throw new TypeError('endPoint must be have hostname and port properties.'); Object.defineProperties(this, { endPoint: { enumerable: true, diff --git a/src/results.js b/src/results.js index 205de5c..ddaab5c 100644 --- a/src/results.js +++ b/src/results.js @@ -69,7 +69,7 @@ function RecordedEvent(ev) { createdEpoch: {enumerable: true, value: ev.created_epoch ? ev.created_epoch.toInt() : 0}, data: {enumerable: true, value: ev.data ? ev.data.toBuffer() : new Buffer(0)}, metadata: {enumerable: true, value: ev.metadata ? ev.metadata.toBuffer() : new Buffer(0)}, - isJson: {enumerable: true, value: ev.data_content_type == 1} + isJson: {enumerable: true, value: ev.data_content_type === 1} }); } diff --git a/src/systemData/tcpPackage.js b/src/systemData/tcpPackage.js index d0f76d1..c168450 100644 --- a/src/systemData/tcpPackage.js +++ b/src/systemData/tcpPackage.js @@ -29,7 +29,7 @@ TcpPackage.fromBufferSegment = function(data) { var headerSize = MandatorySize; var login = null, pass = null; - if ((flags & TcpFlags.Authenticated) != 0) + if ((flags & TcpFlags.Authenticated) !== 0) { var loginLen = data.buffer[data.offset + AuthOffset]; if (AuthOffset + 1 + loginLen + 1 >= data.count) @@ -48,7 +48,7 @@ TcpPackage.fromBufferSegment = function(data) { }; TcpPackage.prototype.asBuffer = function() { - if ((this.flags & TcpFlags.Authenticated) != 0) { + if ((this.flags & TcpFlags.Authenticated) !== 0) { var loginBytes = new Buffer(this.login); if (loginBytes.length > 255) throw new Error("Login serialized length should be less than 256 bytes."); var passwordBytes = new Buffer(this.password); diff --git a/src/transport/tcp/lengthPrefixMessageFramer.js b/src/transport/tcp/lengthPrefixMessageFramer.js index 625ea6a..85fd8a7 100644 --- a/src/transport/tcp/lengthPrefixMessageFramer.js +++ b/src/transport/tcp/lengthPrefixMessageFramer.js @@ -29,7 +29,7 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) { { this._packageLength |= (buffer[i] << (this._headerBytes * 8)); // little-endian order ++this._headerBytes; - if (this._headerBytes == HeaderLength) + if (this._headerBytes === HeaderLength) { if (this._packageLength <= 0 || this._packageLength > this._maxPackageSize) throw new Error(["Package size is out of bounds: ", this._packageLength, "(max: ", this._maxPackageSize, "."].join('')); @@ -44,9 +44,9 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) { this._bufferIndex += copyCnt; i += copyCnt - 1; - if (this._bufferIndex == this._packageLength) + if (this._bufferIndex === this._packageLength) { - if (this._receivedHandler != null) + if (this._receivedHandler !== null) this._receivedHandler(createBufferSegment(this._messageBuffer, 0, this._bufferIndex)); this.reset(); } diff --git a/src/transport/tcp/tcpConnection.js b/src/transport/tcp/tcpConnection.js index d984484..3d5d6b6 100644 --- a/src/transport/tcp/tcpConnection.js +++ b/src/transport/tcp/tcpConnection.js @@ -32,6 +32,7 @@ function TcpConnection(log, connectionId, remoteEndPoint, onConnectionClosed) { TcpConnection.prototype._initSocket = function(socket) { this._socket = socket; this._localEndPoint = {host: socket.localAddress, port: socket.localPort}; + this._remoteEndPoint.host = socket.remoteAddress; this._socket.on('error', this._processError.bind(this)); this._socket.on('data', this._processReceive.bind(this)); @@ -120,13 +121,13 @@ TcpConnection.prototype._closeInternal = function(err, reason) { if (this._closed) return; this._closed = true; - if (this._socket != null) { + if (this._socket !== null) { this._socket.end(); this._socket.unref(); this._socket = null; } - if (this._onConnectionClosed != null) + if (this._onConnectionClosed !== null) this._onConnectionClosed(this, err); }; diff --git a/src/transport/tcp/tcpPackageConnection.js b/src/transport/tcp/tcpPackageConnection.js index 94a9f60..eb5b1b2 100644 --- a/src/transport/tcp/tcpPackageConnection.js +++ b/src/transport/tcp/tcpPackageConnection.js @@ -120,26 +120,26 @@ TcpPackageConnection.prototype._incomingMessageArrived = function(data) { var message = util.format("TcpPackageConnection: [%j, L%j, %s] ERROR for %s. Connection will be closed.", this.remoteEndPoint, this.localEndPoint, this._connectionId, valid ? TcpCommand.getName(pkg.command) : ""); - if (this._onError != null) + if (this._onError !== null) this._onError(this, e); this._log.debug(e, message); } }; TcpPackageConnection.prototype.startReceiving = function() { - if (this._connection == null) + if (this._connection === null) throw new Error("Failed connection."); this._connection.receive(this._onRawDataReceived.bind(this)); }; TcpPackageConnection.prototype.enqueueSend = function(pkg) { - if (this._connection == null) + if (this._connection === null) throw new Error("Failed connection."); this._connection.enqueueSend(this._framer.frameData(pkg.asBufferSegment())); }; TcpPackageConnection.prototype.close = function(reason) { - if (this._connection == null) + if (this._connection === null) throw new Error("Failed connection."); this._connection.close(reason); }; diff --git a/test/connection_test.js b/test/connection_test.js index b237aea..6673f65 100644 --- a/test/connection_test.js +++ b/test/connection_test.js @@ -31,7 +31,7 @@ module.exports = { test.done(err); }); conn.on('connected', function () { - test.fail("Should not be able to connect."); + test.ok(false, "Should not be able to connect."); test.done(); }); conn.on('error', function (err) { @@ -41,7 +41,7 @@ module.exports = { test.ok(reason.indexOf("Reconnection limit reached") === 0, "Wrong expected reason."); test.done(); }); - }, + }/*, 'Connect to Cluster using gossip seeds': function (test) { test.expect(1); var gossipSeeds = [ @@ -65,7 +65,7 @@ module.exports = { if (err) return test.done(err); test.done(); } - } + }*/ }; testBase.init(module.exports, false); \ No newline at end of file diff --git a/test/persistentSubscription_test.js b/test/persistentSubscription_test.js index 3d76d07..e1bd5cc 100644 --- a/test/persistentSubscription_test.js +++ b/test/persistentSubscription_test.js @@ -7,7 +7,7 @@ function createRandomEvent() { return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); } -var testStreamName = 'test' + uuid.v4(); +var testStreamName = 'test-' + uuid.v4(); module.exports = { 'Test Create Persistent Subscription': function(test) { @@ -29,7 +29,6 @@ module.exports = { test.done(error); } var subscription = this.conn.connectToPersistentSubscription(testStreamName, 'consumer-1', eventAppeared, subscriptionDropped); - this.log.info('ABC', subscription); this.conn.appendToStream(testStreamName, client.expectedVersion.any, [createRandomEvent()]); }, 'Test Delete Persistent Subscription': function(test) {