Performance improvement by using strict equality, fixed heartbeat issue in connection stage

This commit is contained in:
Nicolas Dextraze
2016-10-17 21:58:28 -07:00
parent dd1302f641
commit f951a625f4
22 changed files with 107 additions and 98 deletions

View File

@ -38,7 +38,7 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) {
return endPoints;
})
.catch(function (exc) {
self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.", attempt, self._maxDiscoverAttempts, exc));
self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.\n%s", attempt, self._maxDiscoverAttempts, exc, exc.stack));
})
.then(function (endPoints) {
if (endPoints)
@ -67,6 +67,7 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn
var self = this;
var promise = Promise.resolve();
var j = 0;
self._log.debug('Gossip candidates', gossipCandidates);
for (var i = 0; i < gossipCandidates.length; i++) {
promise = promise.then(function (endPoints) {
if (endPoints) return endPoints;
@ -90,10 +91,9 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn
};
ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromOldGossip = function (oldGossip, failedTcpEndPoint) {
if (failedTcpEndPoint === null) return oldGossip;
if (failedTcpEndPoint === null) return this._arrangeGossipCandidates(oldGossip);
var gossipCandidates = oldGossip.filter(function(x) {
//TODO: failedTcpEndpoint.host might not be an ip
return (x.externalTcpPort !== failedTcpEndPoint.port && x.externalTcpIp !== failedTcpEndPoint.host);
return !(x.externalTcpPort === failedTcpEndPoint.port && x.externalTcpIp === failedTcpEndPoint.host);
});
return this._arrangeGossipCandidates(gossipCandidates);
};
@ -133,6 +133,7 @@ ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function ()
ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
var options = {
host: endPoint.endPoint.host,
hostname: endPoint.endPoint.hostname,
port: endPoint.endPoint.port,
path: '/gossip?format=json'
@ -140,6 +141,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
if (endPoint.hostHeader) {
options.headers = {'Host': endPoint.hostHeader};
}
this._log.info('Try get gossip from', endPoint);
var self = this;
return new Promise(function (resolve, reject) {
try {
@ -215,7 +217,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members
var secTcp = node.externalSecureTcpPort > 0
? {host: externalTcpIp, port: node.externalSecureTcpPort}
: null;
this._log.info(util.format("Discovering: found best choice [%j,%j] (%s).", normTcp, secTcp == null ? "n/a" : secTcp, node.state));
this._log.info(util.format("Discovering: found best choice [%j,%j] (%s).", normTcp, secTcp === null ? "n/a" : secTcp, node.state));
return new NodeEndPoints(normTcp, secTcp);
};

View File

@ -165,7 +165,7 @@ EventStoreConnectionLogicHandler.prototype._startConnection = function(cb, endpo
* @private
*/
EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, error) {
if (this._state == ConnectionState.Closed) {
if (this._state === ConnectionState.Closed) {
this._logDebug("CloseConnection IGNORED because is ESConnection is CLOSED, reason %s, error %s.", reason, error ? error.stack : '');
return;
}
@ -189,7 +189,7 @@ EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, e
EventStoreConnectionLogicHandler.prototype._closeTcpConnection = function(reason) {
if (!this._connection) {
this._logDebug("CloseTcpConnection IGNORED because _connection == null");
this._logDebug("CloseTcpConnection IGNORED because _connection === null");
return;
}
@ -320,7 +320,7 @@ EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = functi
EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(endPoints) {
var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint;
if (endPoint == null)
if (endPoint === null)
{
this._closeConnection("No end point to node specified.");
return;
@ -328,8 +328,8 @@ EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(en
this._logDebug("EstablishTcpConnection to [%j]", endPoint);
if (this._state != ConnectionState.Connecting) return;
if (this._connectingPhase != ConnectingPhase.EndPointDiscovery) return;
if (this._state !== ConnectionState.Connecting) return;
if (this._connectingPhase !== ConnectingPhase.EndPointDiscovery) return;
var self = this;
this._connectingPhase = ConnectingPhase.ConnectionEstablishing;
@ -358,10 +358,10 @@ EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(en
};
EventStoreConnectionLogicHandler.prototype._tcpConnectionEstablished = function(connection) {
if (this._state != ConnectionState.Connecting || !this._connection.equals(connection) || connection.isClosed)
if (this._state !== ConnectionState.Connecting || !this._connection.equals(connection) || connection.isClosed)
{
this._logDebug("IGNORED (_state %s, _conn.Id %s, conn.Id %s, conn.closed %s): TCP connection to [%j, L%j] established.",
this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
this._state, this._connection === null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
connection.isClosed, connection.remoteEndPoint, connection.localEndPoint);
return;
}
@ -373,7 +373,7 @@ EventStoreConnectionLogicHandler.prototype._tcpConnectionEstablished = function(
timeStamp: Date.now()
};
if (this._settings.defaultUserCredentials != null)
if (this._settings.defaultUserCredentials !== null)
{
this._connectingPhase = ConnectingPhase.Authentication;
@ -402,7 +402,7 @@ EventStoreConnectionLogicHandler.prototype._goToConnectedState = function() {
this.emit('connected', this._connection.remoteEndPoint);
if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod)
if ((Date.now() - this._lastTimeoutsTimeStamp) >= this._settings.operationTimeoutCheckPeriod)
{
this._operations.checkTimeoutsAndRetry(this._connection);
this._subscriptions.checkTimeoutsAndRetry(this._connection);
@ -411,19 +411,19 @@ EventStoreConnectionLogicHandler.prototype._goToConnectedState = function() {
};
EventStoreConnectionLogicHandler.prototype._tcpConnectionError = function(connection, error) {
if (this._connection != connection) return;
if (this._state == ConnectionState.Closed) return;
if (!this._connection.equals(connection)) return;
if (this._state === ConnectionState.Closed) return;
this._logDebug("TcpConnectionError connId %s, exc %s.", connection.connectionId, error);
this._closeConnection("TCP connection error occurred.", error);
};
EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(connection, error) {
if (this._state == ConnectionState.Init) throw new Error();
if (this._state == ConnectionState.Closed || !this._connection.equals(connection))
if (this._state === ConnectionState.Init) throw new Error();
if (this._state === ConnectionState.Closed || !this._connection.equals(connection))
{
this._logDebug("IGNORED (_state: %s, _conn.ID: %s, conn.ID: %s): TCP connection to [%j, L%j] closed.",
this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
this._state, this._connection === null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
connection.remoteEndPoint, connection.localEndPoint);
return;
}
@ -447,7 +447,7 @@ EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(conne
};
EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connection, pkg) {
if (!connection.equals(this._connection) || this._state == ConnectionState.Closed || this._state == ConnectionState.Init)
if (!connection.equals(this._connection) || this._state === ConnectionState.Closed || this._state === ConnectionState.Init)
{
this._logDebug("IGNORED: HandleTcpPackage connId %s, package %s, %s.",
connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId);
@ -458,9 +458,9 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
this._connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId);
this._packageNumber += 1;
if (pkg.command == TcpCommand.HeartbeatResponseCommand)
if (pkg.command === TcpCommand.HeartbeatResponseCommand)
return;
if (pkg.command == TcpCommand.HeartbeatRequestCommand)
if (pkg.command === TcpCommand.HeartbeatRequestCommand)
{
this._connection.enqueueSend(new TcpPackage(
TcpCommand.HeartbeatResponseCommand,
@ -469,13 +469,13 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
return;
}
if (pkg.command == TcpCommand.Authenticated || pkg.command == TcpCommand.NotAuthenticated)
if (pkg.command === TcpCommand.Authenticated || pkg.command === TcpCommand.NotAuthenticated)
{
if (this._state == ConnectionState.Connecting
&& this._connectingPhase == ConnectingPhase.Authentication
&& this._authInfo.correlationId == pkg.correlationId)
if (this._state === ConnectionState.Connecting
&& this._connectingPhase === ConnectingPhase.Authentication
&& this._authInfo.correlationId === pkg.correlationId)
{
if (pkg.command == TcpCommand.NotAuthenticated)
if (pkg.command === TcpCommand.NotAuthenticated)
this.emit('authenticationFailed', "Not authenticated");
this._goToConnectedState();
@ -483,7 +483,7 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
}
}
if (pkg.command == TcpCommand.BadRequest && pkg.correlationId == EmptyGuid)
if (pkg.command === TcpCommand.BadRequest && pkg.correlationId === EmptyGuid)
{
var message = "<no message>";
try {
@ -515,7 +515,7 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
default:
throw new Error("Unknown InspectionDecision: " + result.decision);
}
if (this._state == ConnectionState.Connected)
if (this._state === ConnectionState.Connected)
this._operations.scheduleWaitingOperations(connection);
return;
@ -557,13 +557,13 @@ EventStoreConnectionLogicHandler.prototype._reconnectTo = function(endPoints) {
var endPoint = this._settings.useSslConnection
? endPoints.secureTcpEndPoint
: endPoints.tcpEndPoint;
if (endPoint == null)
if (endPoint === null)
{
this._closeConnection("No end point is specified while trying to reconnect.");
return;
}
if (this._state != ConnectionState.Connected || this._connection.remoteEndPoint == endPoint)
if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint)
return;
var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].",
@ -581,7 +581,7 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() {
{
case ConnectionState.Init: break;
case ConnectionState.Connecting:
if (this._connectingPhase == ConnectingPhase.Reconnecting && Date.now() - this._reconnInfo.timeStamp >= this._settings.reconnectionDelay)
if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay)
{
this._logDebug("TimerTick checking reconnection...");
@ -594,17 +594,17 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() {
this._discoverEndpoint(null);
}
}
else if (this._connectingPhase == ConnectingPhase.Authentication && Date.now() - this._authInfo.timeStamp >= this._settings.operationTimeout)
else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout)
{
this.emit('authenticationFailed', "Authentication timed out.");
this._goToConnectedState();
}
else if (this._connectingPhase > ConnectingPhase.ConnectionEstablishing)
else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected)
this._manageHeartbeats();
break;
case ConnectionState.Connected:
// operations timeouts are checked only if connection is established and check period time passed
if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod)
if ((Date.now() - this._lastTimeoutsTimeStamp) >= this._settings.operationTimeoutCheckPeriod)
{
// On mono even impossible connection first says that it is established
// so clearing of reconnection count on ConnectionEstablished event causes infinite reconnections.
@ -624,14 +624,14 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() {
};
EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
if (this._connection == null) return;
if (this._connection === null) return;
var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout;
if (Date.now() - this._heartbeatInfo.timeStamp < timeout)
if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout)
return;
var packageNumber = this._packageNumber;
if (this._heartbeatInfo.lastPackageNumber != packageNumber)
if (this._heartbeatInfo.lastPackageNumber !== packageNumber)
{
this._heartbeatInfo = {lastPackageNumber: packageNumber, isIntervalStage: true, timeStamp: Date.now()};
return;
@ -649,10 +649,9 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
else
{
// TcpMessage.HeartbeatTimeout analog
var msg = util.format("EventStoreConnection '%s': closing TCP connection [%j, L%j, %s] due to HEARTBEAT TIMEOUT at pkgNum %d.",
this._logInfo("EventStoreConnection '%s': closing TCP connection [%j, L%j, %s] due to HEARTBEAT TIMEOUT at pkgNum %d.",
this._esConnection.connectionName, this._connection.remoteEndPoint, this._connection.localEndPoint,
this._connection.connectionId, packageNumber);
this._settings.log.info(msg);
this._closeTcpConnection(msg);
}
};

View File

@ -55,7 +55,7 @@ OperationsManager.prototype.checkTimeoutsAndRetry = function(connection) {
var removeOperations = [];
var self = this;
this._activeOperations.forEach(function(correlationId, operation) {
if (operation.connectionId != connection.connectionId)
if (operation.connectionId !== connection.connectionId)
{
retryOperations.push(operation);
}

View File

@ -13,8 +13,14 @@ function SimpleQueuedHandler() {
}
SimpleQueuedHandler.prototype.registerHandler = function(type, handler) {
type = typeName(type);
this._handlers[type] = handler;
var typeId = typeName(type);
this._handlers[typeId] = function (msg) {
try {
handler(msg);
} catch(e) {
console.log('ERROR: ', e);
}
};
};
SimpleQueuedHandler.prototype.enqueueMessage = function(msg) {
@ -28,10 +34,10 @@ SimpleQueuedHandler.prototype.enqueueMessage = function(msg) {
SimpleQueuedHandler.prototype._processQueue = function() {
var message = this._messages.shift();
while(message) {
var type = typeName(message);
var handler = this._handlers[type];
var typeId = typeName(message);
var handler = this._handlers[typeId];
if (!handler)
throw new Error("No handler registered for message " + type);
throw new Error("No handler registered for message " + typeId);
setImmediate(handler, message);
message = this._messages.shift();
}

View File

@ -42,7 +42,7 @@ SubscriptionsManager.prototype.purgeSubscribedAndDroppedSubscriptions = function
var self = this;
var subscriptionsToRemove = [];
this._activeSubscriptions.forEach(function(_, subscription) {
if (subscription.isSubscribed && subscription.connectionId == connectionId) {
if (subscription.isSubscribed && subscription.connectionId === connectionId) {
subscription.operation.connectionClosed();
subscriptionsToRemove.push(subscription);
}
@ -60,7 +60,7 @@ SubscriptionsManager.prototype.checkTimeoutsAndRetry = function(connection) {
var removeSubscriptions = [];
this._activeSubscriptions.forEach(function(_, subscription) {
if (subscription.isSubscribed) return;
if (subscription.connectionId != connection.connectionId)
if (subscription.connectionId !== connection.connectionId)
{
retrySubscriptions.push(subscription);
}
@ -165,7 +165,7 @@ SubscriptionsManager.prototype._logDebug = function(message) {
if (!this._settings.verboseLogging) return;
var parameters = Array.prototype.slice.call(arguments, 1);
this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, parameters.length == 0 ? message : util.format(message, parameters));
this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, parameters.length === 0 ? message : util.format(message, parameters));
};
module.exports = SubscriptionsManager;