Initial commit
This commit is contained in:
646
src/core/eventStoreConnectionLogicHandler.js
Normal file
646
src/core/eventStoreConnectionLogicHandler.js
Normal file
@ -0,0 +1,646 @@
|
||||
var util = require('util');
|
||||
var uuid = require('uuid');
|
||||
var EventEmitter = require('events').EventEmitter;
|
||||
|
||||
var SimpleQueuedHandler = require('./simpleQueuedHandler');
|
||||
var TcpPackageConnection = require('../transport/tcp/tcpPackageConnection');
|
||||
var OperationsManager = require('./operationsManager');
|
||||
var SubscriptionsManager = require('./subscriptionsManager');
|
||||
var VolatileSubscriptionOperation = require('../clientOperations/volatileSubscriptionOperation');
|
||||
var messages = require('./messages');
|
||||
|
||||
var TcpPackage = require('../systemData/tcpPackage');
|
||||
var TcpCommand = require('../systemData/tcpCommand');
|
||||
var TcpFlags = require('../systemData/tcpFlags');
|
||||
var InspectionDecision = require('../systemData/inspectionDecision');
|
||||
|
||||
const ConnectionState = {
|
||||
Init: 'init',
|
||||
Connecting: 'connecting',
|
||||
Connected: 'connected',
|
||||
Closed: 'closed'
|
||||
};
|
||||
|
||||
const ConnectingPhase = {
|
||||
Invalid: 'invalid',
|
||||
Reconnecting: 'reconnecting',
|
||||
EndPointDiscovery: 'endpointDiscovery',
|
||||
ConnectionEstablishing: 'connectionEstablishing',
|
||||
Authentication: 'authentication',
|
||||
Connected: 'connected'
|
||||
};
|
||||
|
||||
const TimerPeriod = 200;
|
||||
const TimerTickMessage = new messages.TimerTickMessage();
|
||||
const EmptyGuid = '00000000-0000-0000-0000-000000000000';
|
||||
|
||||
/**
|
||||
* @param {EventStoreNodeConnection} esConnection
|
||||
* @param {Object} settings
|
||||
* @constructor
|
||||
* @property {Number} totalOperationCount
|
||||
*/
|
||||
function EventStoreConnectionLogicHandler(esConnection, settings) {
|
||||
this._esConnection = esConnection;
|
||||
this._settings = settings;
|
||||
this._queue = new SimpleQueuedHandler();
|
||||
this._state = ConnectionState.Init;
|
||||
this._connectingPhase = ConnectingPhase.Invalid;
|
||||
this._endpointDiscoverer = null;
|
||||
this._connection = null;
|
||||
this._wasConnected = false;
|
||||
this._packageNumber = 0;
|
||||
this._authInfo = null;
|
||||
this._lastTimeoutsTimeStamp = 0;
|
||||
|
||||
this._operations = new OperationsManager(esConnection.connectionName, settings);
|
||||
this._subscriptions = new SubscriptionsManager(esConnection.connectionName, settings);
|
||||
|
||||
var self = this;
|
||||
this._queue.registerHandler(messages.StartConnectionMessage, function(msg) {
|
||||
self._startConnection(msg.cb, msg.endpointDiscoverer);
|
||||
});
|
||||
this._queue.registerHandler(messages.CloseConnectionMessage, function(msg) {
|
||||
self._closeConnection(msg.reason, msg.error);
|
||||
});
|
||||
|
||||
this._queue.registerHandler(messages.StartOperationMessage, function(msg) {
|
||||
self._startOperation(msg.operation, msg.maxRetries, msg.timeout);
|
||||
});
|
||||
this._queue.registerHandler(messages.StartSubscriptionMessage, function(msg) {
|
||||
self._startSubscription(msg);
|
||||
});
|
||||
|
||||
this._queue.registerHandler(messages.EstablishTcpConnectionMessage, function(msg) {
|
||||
self._establishTcpConnection(msg.endPoints);
|
||||
});
|
||||
this._queue.registerHandler(messages.TcpConnectionEstablishedMessage, function(msg) {
|
||||
self._tcpConnectionEstablished(msg.connection);
|
||||
});
|
||||
this._queue.registerHandler(messages.TcpConnectionErrorMessage, function(msg) {
|
||||
self._tcpConnectionError(msg.connection, msg.error);
|
||||
});
|
||||
this._queue.registerHandler(messages.TcpConnectionClosedMessage, function(msg) {
|
||||
self._tcpConnectionClosed(msg.connection, msg.error);
|
||||
});
|
||||
this._queue.registerHandler(messages.HandleTcpPackageMessage, function(msg) {
|
||||
self._handleTcpPackage(msg.connection, msg.package);
|
||||
});
|
||||
|
||||
this._queue.registerHandler(messages.TimerTickMessage, function(msg) {
|
||||
self._timerTick();
|
||||
});
|
||||
|
||||
this._timer = setInterval(function() {
|
||||
self.enqueueMessage(TimerTickMessage);
|
||||
}, TimerPeriod);
|
||||
}
|
||||
util.inherits(EventStoreConnectionLogicHandler, EventEmitter);
|
||||
|
||||
Object.defineProperty(EventStoreConnectionLogicHandler.prototype, 'totalOperationCount', {
|
||||
get: function() {
|
||||
return this._operations.totalOperationCount;
|
||||
}
|
||||
});
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype.enqueueMessage = function(msg) {
|
||||
if (this._settings.verboseLogging && msg !== TimerTickMessage) this._logDebug("enqueuing message %s.", msg);
|
||||
this._queue.enqueueMessage(msg);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._discoverEndpoint = function(cb) {
|
||||
this._logDebug('DiscoverEndpoint');
|
||||
|
||||
if (this._state != ConnectionState.Connecting) return;
|
||||
if (this._connectingPhase != ConnectingPhase.Reconnecting) return;
|
||||
|
||||
this._connectingPhase = ConnectingPhase.EndPointDiscovery;
|
||||
|
||||
cb = cb || function() {};
|
||||
|
||||
var self = this;
|
||||
this._endpointDiscoverer.discover(this._connection != null ? this._connection.remoteEndPoint : null)
|
||||
.then(function(nodeEndpoints){
|
||||
self.enqueueMessage(new messages.EstablishTcpConnectionMessage(nodeEndpoints));
|
||||
cb();
|
||||
})
|
||||
.catch(function(err) {
|
||||
self.enqueueMessage(new messages.CloseConnectionMessage("Failed to resolve TCP end point to which to connect.", err));
|
||||
cb(new Error("Couldn't resolve target end point: " + err.message));
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {Function} cb
|
||||
* @param {StaticEndpointDiscoverer} endpointDiscoverer
|
||||
* @private
|
||||
*/
|
||||
EventStoreConnectionLogicHandler.prototype._startConnection = function(cb, endpointDiscoverer) {
|
||||
this._logDebug('StartConnection');
|
||||
|
||||
switch(this._state) {
|
||||
case ConnectionState.Init:
|
||||
this._endpointDiscoverer = endpointDiscoverer;
|
||||
this._state = ConnectionState.Connecting;
|
||||
this._connectingPhase = ConnectingPhase.Reconnecting;
|
||||
this._discoverEndpoint(cb);
|
||||
break;
|
||||
case ConnectionState.Connecting:
|
||||
case ConnectionState.Connected:
|
||||
return cb(new Error(['EventStoreConnection', this._esConnection.connectionName, 'is already active.'].join(' ')));
|
||||
case ConnectionState.Closed:
|
||||
return cb(new Error(['EventStoreConnection', this._esConnection.connectionName, 'is closed.'].join(' ')));
|
||||
default:
|
||||
return cb(new Error(['Unknown state:', this._state].join(' ')));
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {string} reason
|
||||
* @param {Error} [error]
|
||||
* @private
|
||||
*/
|
||||
EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, error) {
|
||||
if (this._state == ConnectionState.Closed) {
|
||||
this._logDebug("CloseConnection IGNORED because is ESConnection is CLOSED, reason %s, error %s.", reason, error ? error.stack : '');
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("CloseConnection, reason %s, error %s.", reason, error ? error.stack : '');
|
||||
|
||||
this._state = ConnectionState.Closed;
|
||||
|
||||
clearInterval(this._timer);
|
||||
this._operations.cleanUp();
|
||||
this._subscriptions.cleanUp();
|
||||
this._closeTcpConnection(reason);
|
||||
|
||||
this._logInfo("Closed. Reason: %s", reason);
|
||||
|
||||
if (error)
|
||||
this.emit('error', error);
|
||||
|
||||
this.emit('closed', reason);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._closeTcpConnection = function(reason) {
|
||||
if (!this._connection) {
|
||||
this._logDebug("CloseTcpConnection IGNORED because _connection == null");
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("CloseTcpConnection");
|
||||
this._connection.close(reason);
|
||||
this._tcpConnectionClosed(this._connection);
|
||||
this._connection = null;
|
||||
};
|
||||
|
||||
var _nextSeqNo = -1;
|
||||
function createOperationItem(operation, maxRetries, timeout) {
|
||||
var operationItem = {
|
||||
seqNo: _nextSeqNo++,
|
||||
operation: operation,
|
||||
maxRetries: maxRetries,
|
||||
timeout: timeout,
|
||||
createdTime: Date.now(),
|
||||
correlationId: uuid.v4(),
|
||||
retryCount: 0,
|
||||
lastUpdated: Date.now()
|
||||
};
|
||||
operationItem.toString = (function() {
|
||||
return util.format("Operation %s (%s): %s, retry count: %d, created: %s, last updated: %s",
|
||||
this.operation.constructor.name, this.correlationId, this.operation, this.retryCount,
|
||||
new Date(this.createdTime).toISOString().substr(11,12),
|
||||
new Date(this.lastUpdated).toISOString().substr(11,12));
|
||||
}).bind(operationItem);
|
||||
return operationItem;
|
||||
}
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._startOperation = function(operation, maxRetries, timeout) {
|
||||
switch(this._state) {
|
||||
case ConnectionState.Init:
|
||||
operation.fail(new Error("EventStoreConnection '" + this._esConnection.connectionName + "' is not active."));
|
||||
break;
|
||||
case ConnectionState.Connecting:
|
||||
this._logDebug("StartOperation enqueue %s, %s, %d, %d.", operation.constructor.name, operation, maxRetries, timeout);
|
||||
this._operations.enqueueOperation(createOperationItem(operation, maxRetries, timeout));
|
||||
break;
|
||||
case ConnectionState.Connected:
|
||||
this._logDebug("StartOperation schedule %s, %s, %d, %d.", operation.constructor.name, operation, maxRetries, timeout);
|
||||
this._operations.scheduleOperation(createOperationItem(operation, maxRetries, timeout), this._connection);
|
||||
break;
|
||||
case ConnectionState.Closed:
|
||||
operation.fail(new Error("EventStoreConnection '" + this._esConnection.connectionName + "' is closed."));
|
||||
break;
|
||||
default:
|
||||
throw new Error("Unknown state: " + this._state + '.');
|
||||
}
|
||||
};
|
||||
|
||||
function createSubscriptionItem(operation, maxRetries, timeout) {
|
||||
var subscriptionItem = {
|
||||
operation: operation,
|
||||
maxRetries: maxRetries,
|
||||
timeout: timeout,
|
||||
createdTime: Date.now(),
|
||||
correlationId: uuid.v4(),
|
||||
retryCount: 0,
|
||||
lastUpdated: Date.now(),
|
||||
isSubscribed: false
|
||||
};
|
||||
subscriptionItem.toString = (function(){
|
||||
return util.format("Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %d, last updated: %d",
|
||||
this.operation.constructor.name, this.correlationId, this.operation, this.isSubscribed, this.retryCount,
|
||||
new Date(this.createdTime).toISOString().substr(11,12),
|
||||
new Date(this.lastUpdated).toISOString().substr(11,12));
|
||||
}).bind(subscriptionItem);
|
||||
return subscriptionItem;
|
||||
}
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._startSubscription = function(msg) {
|
||||
switch (this._state)
|
||||
{
|
||||
case ConnectionState.Init:
|
||||
msg.cb(new Error(util.format("EventStoreConnection '%s' is not active.", this._esConnection.connectionName)));
|
||||
break;
|
||||
case ConnectionState.Connecting:
|
||||
case ConnectionState.Connected:
|
||||
var self = this;
|
||||
var operation = new VolatileSubscriptionOperation(this._settings.log, msg.cb, msg.streamId, msg.resolveLinkTos,
|
||||
msg.userCredentials, msg.eventAppeared, msg.subscriptionDropped,
|
||||
this._settings.verboseLogging, function() { return self._connection });
|
||||
this._logDebug("StartSubscription %s %s, %s, %d, %d.", operation.constructor.name, operation, msg.maxRetries, msg.timeout, this._state === ConnectionState.Connected ? "fire" : "enqueue");
|
||||
var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout);
|
||||
if (this._state === ConnectionState.Connecting)
|
||||
this._subscriptions.enqueueSubscription(subscription);
|
||||
else
|
||||
this._subscriptions.startSubscription(subscription, this._connection);
|
||||
break;
|
||||
case ConnectionState.Closed:
|
||||
msg.cb(new Error("Connection closed. Connection: " + this._esConnection.connectionName));
|
||||
break;
|
||||
default:
|
||||
throw new Error(util.format("Unknown state: %s.", this._state));
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(endPoints) {
|
||||
var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint;
|
||||
if (endPoint == null)
|
||||
{
|
||||
this._closeConnection("No end point to node specified.");
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("EstablishTcpConnection to [%j]", endPoint);
|
||||
|
||||
if (this._state != ConnectionState.Connecting) return;
|
||||
if (this._connectingPhase != ConnectingPhase.EndPointDiscovery) return;
|
||||
|
||||
var self = this;
|
||||
this._connectingPhase = ConnectingPhase.ConnectionEstablishing;
|
||||
this._connection = new TcpPackageConnection(
|
||||
this._settings.log,
|
||||
endPoint,
|
||||
uuid.v4(),
|
||||
this._settings.useSslConnection,
|
||||
this._settings.targetHost,
|
||||
this._settings.validateServer,
|
||||
this._settings.clientConnectionTimeout,
|
||||
function(connection, pkg) {
|
||||
self.enqueueMessage(new messages.HandleTcpPackageMessage(connection, pkg));
|
||||
},
|
||||
function(connection, error) {
|
||||
self.enqueueMessage(new messages.TcpConnectionErrorMessage(connection, error));
|
||||
},
|
||||
function(connection) {
|
||||
connection.startReceiving();
|
||||
self.enqueueMessage(new messages.TcpConnectionEstablishedMessage(connection));
|
||||
},
|
||||
function(connection, error) {
|
||||
self.enqueueMessage(new messages.TcpConnectionClosedMessage(connection, error));
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._tcpConnectionEstablished = function(connection) {
|
||||
if (this._state != ConnectionState.Connecting || !this._connection.equals(connection) || connection.isClosed)
|
||||
{
|
||||
this._logDebug("IGNORED (_state %s, _conn.Id %s, conn.Id %s, conn.closed %s): TCP connection to [%j, L%j] established.",
|
||||
this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
|
||||
connection.isClosed, connection.remoteEndPoint, connection.localEndPoint);
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("TCP connection to [%j, L%j, %s] established.", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId);
|
||||
this._heartbeatInfo = {
|
||||
lastPackageNumber: this._packageNumber,
|
||||
isIntervalStage: true,
|
||||
timeStamp: Date.now()
|
||||
};
|
||||
|
||||
if (this._settings.defaultUserCredentials != null)
|
||||
{
|
||||
this._connectingPhase = ConnectingPhase.Authentication;
|
||||
|
||||
this._authInfo = {
|
||||
correlationId: uuid.v4(),
|
||||
timeStamp: Date.now()
|
||||
};
|
||||
this._connection.enqueueSend(new TcpPackage(
|
||||
TcpCommand.Authenticate,
|
||||
TcpFlags.Authenticated,
|
||||
this._authInfo.correlationId,
|
||||
this._settings.defaultUserCredentials.username,
|
||||
this._settings.defaultUserCredentials.password));
|
||||
}
|
||||
else
|
||||
{
|
||||
this._goToConnectedState();
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._goToConnectedState = function() {
|
||||
this._state = ConnectionState.Connected;
|
||||
this._connectingPhase = ConnectingPhase.Connected;
|
||||
|
||||
this._wasConnected = true;
|
||||
|
||||
this.emit('connected', this._connection.remoteEndPoint);
|
||||
|
||||
if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod)
|
||||
{
|
||||
this._operations.checkTimeoutsAndRetry(this._connection);
|
||||
this._subscriptions.checkTimeoutsAndRetry(this._connection);
|
||||
this._lastTimeoutsTimeStamp = Date.now();
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._tcpConnectionError = function(connection, error) {
|
||||
if (this._connection != connection) return;
|
||||
if (this._state == ConnectionState.Closed) return;
|
||||
|
||||
this._logDebug("TcpConnectionError connId %s, exc %s.", connection.connectionId, error);
|
||||
this._closeConnection("TCP connection error occurred.", error);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(connection, error) {
|
||||
if (this._state == ConnectionState.Init) throw new Error();
|
||||
if (this._state == ConnectionState.Closed || !this._connection.equals(connection))
|
||||
{
|
||||
this._logDebug("IGNORED (_state: %s, _conn.ID: %s, conn.ID: %s): TCP connection to [%j, L%j] closed.",
|
||||
this._state, this._connection == null ? EmptyGuid : this._connection.connectionId, connection.connectionId,
|
||||
connection.remoteEndPoint, connection.localEndPoint);
|
||||
return;
|
||||
}
|
||||
|
||||
this._state = ConnectionState.Connecting;
|
||||
this._connectingPhase = ConnectingPhase.Reconnecting;
|
||||
|
||||
this._logDebug("TCP connection to [%j, L%j, %s] closed.", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId);
|
||||
|
||||
this._subscriptions.purgeSubscribedAndDroppedSubscriptions(this._connection.connectionId);
|
||||
this._reconnInfo = {
|
||||
reconnectionAttempt: this._reconnInfo ? this._reconnInfo.reconnectionAttempt : 0,
|
||||
timeStamp: Date.now()
|
||||
};
|
||||
|
||||
if (this._wasConnected)
|
||||
{
|
||||
this._wasConnected = false;
|
||||
this.emit('disconnected', connection.remoteEndPoint);
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connection, pkg) {
|
||||
if (!connection.equals(this._connection) || this._state == ConnectionState.Closed || this._state == ConnectionState.Init)
|
||||
{
|
||||
this._logDebug("IGNORED: HandleTcpPackage connId %s, package %s, %s.",
|
||||
connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId);
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("HandleTcpPackage connId %s, package %s, %s.",
|
||||
this._connection.connectionId, TcpCommand.getName(pkg.command), pkg.correlationId);
|
||||
this._packageNumber += 1;
|
||||
|
||||
if (pkg.command == TcpCommand.HeartbeatResponseCommand)
|
||||
return;
|
||||
if (pkg.command == TcpCommand.HeartbeatRequestCommand)
|
||||
{
|
||||
this._connection.enqueueSend(new TcpPackage(
|
||||
TcpCommand.HeartbeatResponseCommand,
|
||||
TcpFlags.None,
|
||||
pkg.correlationId));
|
||||
return;
|
||||
}
|
||||
|
||||
if (pkg.command == TcpCommand.Authenticated || pkg.command == TcpCommand.NotAuthenticated)
|
||||
{
|
||||
if (this._state == ConnectionState.Connecting
|
||||
&& this._connectingPhase == ConnectingPhase.Authentication
|
||||
&& this._authInfo.correlationId == pkg.correlationId)
|
||||
{
|
||||
if (pkg.command == TcpCommand.NotAuthenticated)
|
||||
this.emit('authenticationFailed', "Not authenticated");
|
||||
|
||||
this._goToConnectedState();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (pkg.command == TcpCommand.BadRequest && pkg.correlationId == EmptyGuid)
|
||||
{
|
||||
var message = "<no message>";
|
||||
try {
|
||||
message = pkg.data.toString();
|
||||
} catch(e) {}
|
||||
var err = new Error("Bad request received from server. Error: " + message);
|
||||
this._closeConnection("Connection-wide BadRequest received. Too dangerous to continue.", err);
|
||||
return;
|
||||
}
|
||||
|
||||
var operation = this._operations.getActiveOperation(pkg.correlationId);
|
||||
if (operation)
|
||||
{
|
||||
var result = operation.operation.inspectPackage(pkg);
|
||||
this._logDebug("HandleTcpPackage OPERATION DECISION %s (%s), %s", result.decision, result.description, operation.operation);
|
||||
switch (result.decision)
|
||||
{
|
||||
case InspectionDecision.DoNothing: break;
|
||||
case InspectionDecision.EndOperation:
|
||||
this._operations.removeOperation(operation);
|
||||
break;
|
||||
case InspectionDecision.Retry:
|
||||
this._operations.scheduleOperationRetry(operation);
|
||||
break;
|
||||
case InspectionDecision.Reconnect:
|
||||
this._reconnectTo({tcpEndPoint: result.tcpEndPoint, secureTcpEndPoint: result.secureTcpEndPoint});
|
||||
this._operations.scheduleOperationRetry(operation);
|
||||
break;
|
||||
default:
|
||||
throw new Error("Unknown InspectionDecision: " + result.decision);
|
||||
}
|
||||
if (this._state == ConnectionState.Connected)
|
||||
this._operations.scheduleWaitingOperations(connection);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
var subscription = this._subscriptions.getActiveSubscription(pkg.correlationId);
|
||||
if (subscription)
|
||||
{
|
||||
var result = subscription.operation.inspectPackage(pkg);
|
||||
this._logDebug("HandleTcpPackage SUBSCRIPTION DECISION %s (%s), %s", result.decision, result.description, subscription);
|
||||
switch (result.decision)
|
||||
{
|
||||
case InspectionDecision.DoNothing: break;
|
||||
case InspectionDecision.EndOperation:
|
||||
this._subscriptions.removeSubscription(subscription);
|
||||
break;
|
||||
case InspectionDecision.Retry:
|
||||
this._subscriptions.scheduleSubscriptionRetry(subscription);
|
||||
break;
|
||||
case InspectionDecision.Reconnect:
|
||||
this._reconnectTo({tcpEndPoint: result.tcpEndPoint, secureTcpEndPoint: result.secureTcpEndPoint});
|
||||
this._subscriptions.scheduleSubscriptionRetry(subscription);
|
||||
break;
|
||||
case InspectionDecision.Subscribed:
|
||||
subscription.isSubscribed = true;
|
||||
break;
|
||||
default:
|
||||
throw new Error("Unknown InspectionDecision: " + result.decision);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("HandleTcpPackage UNMAPPED PACKAGE with CorrelationId %s, Command: %s",
|
||||
pkg.correlationId, TcpCommand.getName(pkg.command));
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._reconnectTo = function(endPoints) {
|
||||
var endPoint = this._settings.useSslConnection
|
||||
? endPoints.secureTcpEndPoint
|
||||
: endPoints.tcpEndPoint;
|
||||
if (endPoint == null)
|
||||
{
|
||||
this._closeConnection("No end point is specified while trying to reconnect.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._state != ConnectionState.Connected || this._connection.remoteEndPoint == endPoint)
|
||||
return;
|
||||
|
||||
var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].",
|
||||
this._esConnection.connectionName, endPoint, this._connection.remoteEndPoint, this._connection.localEndPoint);
|
||||
if (this._settings.verboseLogging) this._settings.log.info(msg);
|
||||
this._closeTcpConnection(msg);
|
||||
|
||||
this._state = ConnectionState.Connecting;
|
||||
this._connectingPhase = ConnectingPhase.EndPointDiscovery;
|
||||
this._establishTcpConnection(endPoints);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._timerTick = function() {
|
||||
switch (this._state)
|
||||
{
|
||||
case ConnectionState.Init: break;
|
||||
case ConnectionState.Connecting:
|
||||
{
|
||||
if (this._connectingPhase == ConnectingPhase.Reconnecting && Date.now() - this._reconnInfo.timeStamp >= this._settings.reconnectionDelay)
|
||||
{
|
||||
this._logDebug("TimerTick checking reconnection...");
|
||||
|
||||
this._reconnInfo = {reconnectionAttempt: this._reconnInfo.reconnectionAttempt + 1, timeStamp: Date.now()};
|
||||
if (this._settings.maxReconnections >= 0 && this._reconnInfo.reconnectionAttempt > this._settings.maxReconnections)
|
||||
this._closeConnection("Reconnection limit reached.");
|
||||
else
|
||||
{
|
||||
this.emit('reconnecting', {});
|
||||
this._discoverEndpoint(null);
|
||||
}
|
||||
}
|
||||
if (this._connectingPhase == ConnectingPhase.Authentication && Date.now() - this._authInfo.timeStamp >= this._settings.operationTimeout)
|
||||
{
|
||||
this.emit('authenticationFailed', "Authentication timed out.");
|
||||
this._goToConnectedState();
|
||||
}
|
||||
if (this._connectingPhase > ConnectingPhase.ConnectionEstablishing)
|
||||
this._manageHeartbeats();
|
||||
break;
|
||||
}
|
||||
case ConnectionState.Connected:
|
||||
{
|
||||
// operations timeouts are checked only if connection is established and check period time passed
|
||||
if (Date.now() - this._lastTimeoutsTimeStamp >= this._settings.operationTimeoutCheckPeriod)
|
||||
{
|
||||
// On mono even impossible connection first says that it is established
|
||||
// so clearing of reconnection count on ConnectionEstablished event causes infinite reconnections.
|
||||
// So we reset reconnection count to zero on each timeout check period when connection is established
|
||||
this._reconnInfo = {reconnectionAttempt: 0, timeStamp: Date.now()};
|
||||
this._operations.checkTimeoutsAndRetry(this._connection);
|
||||
this._subscriptions.checkTimeoutsAndRetry(this._connection);
|
||||
this._lastTimeoutsTimeStamp = Date.now();
|
||||
}
|
||||
this._manageHeartbeats();
|
||||
break;
|
||||
}
|
||||
case ConnectionState.Closed:
|
||||
break;
|
||||
default:
|
||||
throw new Error("Unknown state: " + this._state + ".");
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
|
||||
if (this._connection == null) throw new Error();
|
||||
|
||||
var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout;
|
||||
if (Date.now() - this._heartbeatInfo.timeStamp < timeout)
|
||||
return;
|
||||
|
||||
var packageNumber = this._packageNumber;
|
||||
if (this._heartbeatInfo.lastPackageNumber != packageNumber)
|
||||
{
|
||||
this._heartbeatInfo = {lastPackageNumber: packageNumber, isIntervalStage: true, timeStamp: Date.now()};
|
||||
return;
|
||||
}
|
||||
|
||||
if (this._heartbeatInfo.isIntervalStage)
|
||||
{
|
||||
// TcpMessage.Heartbeat analog
|
||||
this._connection.enqueueSend(new TcpPackage(
|
||||
TcpCommand.HeartbeatRequestCommand,
|
||||
TcpFlags.None,
|
||||
uuid.v4()));
|
||||
this._heartbeatInfo = {lastPackageNumber: this._heartbeatInfo.lastPackageNumber, isIntervalStage: false, timeStamp: Date.now()};
|
||||
}
|
||||
else
|
||||
{
|
||||
// TcpMessage.HeartbeatTimeout analog
|
||||
var msg = util.format("EventStoreConnection '%s': closing TCP connection [%j, L%j, %s] due to HEARTBEAT TIMEOUT at pkgNum %d.",
|
||||
this._esConnection.connectionName, this._connection.remoteEndPoint, this._connection.localEndPoint,
|
||||
this._connection.connectionId, packageNumber);
|
||||
this._settings.log.info(msg);
|
||||
this._closeTcpConnection(msg);
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._logDebug = function(message) {
|
||||
if (!this._settings.verboseLogging) return;
|
||||
|
||||
if (arguments.length > 1)
|
||||
message = util.format.apply(util, Array.prototype.slice.call(arguments));
|
||||
|
||||
this._settings.log.debug("EventStoreConnection '%s': %s", this._esConnection.connectionName, message);
|
||||
};
|
||||
|
||||
EventStoreConnectionLogicHandler.prototype._logInfo = function(message){
|
||||
if (arguments.length > 1)
|
||||
message = util.format.apply(util, Array.prototype.slice.call(arguments));
|
||||
|
||||
this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message);
|
||||
};
|
||||
|
||||
module.exports = EventStoreConnectionLogicHandler;
|
90
src/core/messages.js
Normal file
90
src/core/messages.js
Normal file
@ -0,0 +1,90 @@
|
||||
var util = require('util');
|
||||
|
||||
function Message() {
|
||||
}
|
||||
Message.prototype.toString = function() {
|
||||
return this.constructor.name;
|
||||
};
|
||||
|
||||
function StartConnectionMessage(cb, endpointDiscoverer) {
|
||||
this.cb = cb;
|
||||
this.endpointDiscoverer = endpointDiscoverer;
|
||||
}
|
||||
util.inherits(StartConnectionMessage, Message);
|
||||
|
||||
function CloseConnectionMessage(reason, error) {
|
||||
this.reason = reason;
|
||||
this.error = error;
|
||||
}
|
||||
util.inherits(CloseConnectionMessage, Message);
|
||||
|
||||
function StartOperationMessage(operation, maxRetries, timeout) {
|
||||
this.operation = operation;
|
||||
this.maxRetries = maxRetries;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
util.inherits(StartOperationMessage, Message);
|
||||
|
||||
function StartSubscriptionMessage(
|
||||
cb, streamId, resolveLinkTos, userCredentials, eventAppeared, subscriptionDropped, maxRetries, timeout
|
||||
) {
|
||||
this.cb = cb;
|
||||
this.streamId = streamId;
|
||||
this.resolveLinkTos = resolveLinkTos;
|
||||
this.userCredentials = userCredentials;
|
||||
this.eventAppeared = eventAppeared;
|
||||
this.subscriptionDropped = subscriptionDropped;
|
||||
this.maxRetries = maxRetries;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
util.inherits(StartSubscriptionMessage, Message);
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
* @property {object} endPoints
|
||||
* @property {object} endPoints.secureTcpEndPoint
|
||||
* @property {object} endPoints.tcpEndPoint
|
||||
*/
|
||||
function EstablishTcpConnectionMessage(endPoints) {
|
||||
this.endPoints = endPoints;
|
||||
}
|
||||
util.inherits(EstablishTcpConnectionMessage, Message);
|
||||
|
||||
function HandleTcpPackageMessage(connection, pkg) {
|
||||
this.connection = connection;
|
||||
this.package = pkg;
|
||||
}
|
||||
util.inherits(HandleTcpPackageMessage, Message);
|
||||
|
||||
function TcpConnectionErrorMessage(connection, error) {
|
||||
this.connection = connection;
|
||||
this.error = error;
|
||||
}
|
||||
util.inherits(TcpConnectionErrorMessage, Message);
|
||||
|
||||
function TcpConnectionEstablishedMessage(connection) {
|
||||
this.connection = connection;
|
||||
}
|
||||
util.inherits(TcpConnectionEstablishedMessage, Message);
|
||||
|
||||
function TcpConnectionClosedMessage(connection, error) {
|
||||
this.connection = connection;
|
||||
this.error = error;
|
||||
}
|
||||
util.inherits(TcpConnectionClosedMessage, Message);
|
||||
|
||||
function TimerTickMessage() {}
|
||||
util.inherits(TimerTickMessage, Message);
|
||||
|
||||
module.exports = {
|
||||
StartConnectionMessage: StartConnectionMessage,
|
||||
CloseConnectionMessage: CloseConnectionMessage,
|
||||
StartOperationMessage: StartOperationMessage,
|
||||
StartSubscriptionMessage: StartSubscriptionMessage,
|
||||
EstablishTcpConnectionMessage: EstablishTcpConnectionMessage,
|
||||
HandleTcpPackageMessage: HandleTcpPackageMessage,
|
||||
TcpConnectionErrorMessage: TcpConnectionErrorMessage,
|
||||
TcpConnectionEstablishedMessage: TcpConnectionEstablishedMessage,
|
||||
TcpConnectionClosedMessage: TcpConnectionClosedMessage,
|
||||
TimerTickMessage: TimerTickMessage
|
||||
};
|
171
src/core/operationsManager.js
Normal file
171
src/core/operationsManager.js
Normal file
@ -0,0 +1,171 @@
|
||||
var util = require('util');
|
||||
var uuid = require('uuid');
|
||||
|
||||
var Hash = require('../common/hash');
|
||||
var TcpCommand = require('../systemData/tcpCommand');
|
||||
|
||||
/**
|
||||
* @param {string} connectionName
|
||||
* @param {object} settings
|
||||
* @constructor
|
||||
* @property {number} totalOperationCount
|
||||
*/
|
||||
function OperationsManager(connectionName, settings) {
|
||||
this._connectionName = connectionName;
|
||||
this._settings = settings;
|
||||
|
||||
this._totalOperationCount = 0;
|
||||
this._activeOperations = new Hash();
|
||||
this._waitingOperations = [];
|
||||
this._retryPendingOperations = [];
|
||||
}
|
||||
Object.defineProperty(OperationsManager.prototype, 'totalOperationCount', {
|
||||
get: function() {
|
||||
return this._totalOperationCount;
|
||||
}
|
||||
});
|
||||
|
||||
OperationsManager.prototype.getActiveOperation = function(correlationId) {
|
||||
return this._activeOperations.get(correlationId);
|
||||
};
|
||||
|
||||
OperationsManager.prototype.cleanUp = function() {
|
||||
var connectionClosedError = new Error(util.format("Connection '%s' was closed.", this._connectionName));
|
||||
|
||||
this._activeOperations.forEach(function(correlationId, operation){
|
||||
operation.operation.fail(connectionClosedError);
|
||||
});
|
||||
this._waitingOperations.forEach(function(operation) {
|
||||
operation.operation.fail(connectionClosedError);
|
||||
});
|
||||
this._retryPendingOperations.forEach(function(operation) {
|
||||
operation.operation.fail(connectionClosedError);
|
||||
});
|
||||
|
||||
this._activeOperations.clear();
|
||||
this._waitingOperations = [];
|
||||
this._retryPendingOperations = [];
|
||||
this._totalOperationCount = 0;
|
||||
};
|
||||
|
||||
OperationsManager.prototype.checkTimeoutsAndRetry = function(connection) {
|
||||
if (!connection) throw new TypeError("Connection is null.");
|
||||
|
||||
var retryOperations = [];
|
||||
var removeOperations = [];
|
||||
var self = this;
|
||||
this._activeOperations.forEach(function(correlationId, operation) {
|
||||
if (operation.connectionId != connection.connectionId)
|
||||
{
|
||||
retryOperations.push(operation);
|
||||
}
|
||||
else if (operation.timeout > 0 && Date.now() - operation.lastUpdated > self._settings.operationTimeout)
|
||||
{
|
||||
var err = util.format("EventStoreConnection '%s': operation never got response from server.\n"
|
||||
+ "UTC now: %s, operation: %s.",
|
||||
self._connectionName, new Date(), operation);
|
||||
self._settings.log.error(err);
|
||||
|
||||
if (self._settings.failOnNoServerResponse)
|
||||
{
|
||||
operation.operation.fail(new Error(err));
|
||||
removeOperations.push(operation);
|
||||
}
|
||||
else
|
||||
{
|
||||
retryOperations.push(operation);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
retryOperations.forEach(function(operation) {
|
||||
self.scheduleOperationRetry(operation);
|
||||
});
|
||||
removeOperations.forEach(function(operation) {
|
||||
self.removeOperation(operation);
|
||||
});
|
||||
|
||||
if (this._retryPendingOperations.length > 0)
|
||||
{
|
||||
this._retryPendingOperations.sort(function(x,y) {
|
||||
if (x.seqNo < y.seqNo) return -1;
|
||||
if (x.seqNo > y.seqNo) return 1;
|
||||
return 0;
|
||||
});
|
||||
this._retryPendingOperations.forEach(function(operation) {
|
||||
var oldCorrId = operation.correlationId;
|
||||
operation.correlationId = uuid.v4();
|
||||
operation.retryCount += 1;
|
||||
self._logDebug("retrying, old corrId %s, operation %s.", oldCorrId, operation);
|
||||
self.scheduleOperation(operation, connection);
|
||||
});
|
||||
this._retryPendingOperations = [];
|
||||
}
|
||||
|
||||
this.scheduleWaitingOperations(connection);
|
||||
};
|
||||
|
||||
OperationsManager.prototype.scheduleOperationRetry = function(operation) {
|
||||
if (!this.removeOperation(operation))
|
||||
return;
|
||||
|
||||
this._logDebug("ScheduleOperationRetry for %s.", operation);
|
||||
if (operation.maxRetries >= 0 && operation.retryCount >= operation.maxRetries)
|
||||
{
|
||||
var err = util.format("Retry limit reached. Operation: %s, RetryCount: %d", operation, operation.retryCount);
|
||||
operation.operation.fail(new Error(err));
|
||||
return;
|
||||
}
|
||||
this._retryPendingOperations.push(operation);
|
||||
};
|
||||
|
||||
OperationsManager.prototype.removeOperation = function(operation) {
|
||||
this._activeOperations.remove(operation.connectionId);
|
||||
this._logDebug("RemoveOperation SUCCEEDED for %s.", operation);
|
||||
this._totalOperationCount = this._activeOperations.length + this._waitingOperations.length;
|
||||
return true;
|
||||
};
|
||||
|
||||
OperationsManager.prototype.scheduleWaitingOperations = function(connection) {
|
||||
if (!connection) throw new TypeError("connection is null.");
|
||||
while (this._waitingOperations.length > 0 && this._activeOperations.length < this._settings.maxConcurrentItems)
|
||||
{
|
||||
this.scheduleOperation(this._waitingOperations.shift(), connection);
|
||||
}
|
||||
this._totalOperationCount = this._activeOperations.length + this._waitingOperations.length;
|
||||
};
|
||||
|
||||
OperationsManager.prototype.enqueueOperation = function(operation) {
|
||||
this._logDebug("EnqueueOperation WAITING for %s.", operation);
|
||||
this._waitingOperations.push(operation);
|
||||
};
|
||||
|
||||
OperationsManager.prototype.scheduleOperation = function(operation, connection) {
|
||||
if (this._activeOperations.length >= this._settings.maxConcurrentItems)
|
||||
{
|
||||
this._logDebug("ScheduleOperation WAITING for %s.", operation);
|
||||
this._waitingOperations.push(operation);
|
||||
}
|
||||
else
|
||||
{
|
||||
operation.connectionId = connection.connectionId;
|
||||
operation.lastUpdated = Date.now();
|
||||
this._activeOperations.add(operation.correlationId, operation);
|
||||
|
||||
var pkg = operation.operation.createNetworkPackage(operation.correlationId);
|
||||
this._logDebug("ScheduleOperation package %s, %s, %s.", TcpCommand.getName(pkg.command), pkg.correlationId, operation);
|
||||
connection.enqueueSend(pkg);
|
||||
}
|
||||
this._totalOperationCount = this._activeOperations.length + this._waitingOperations.length;
|
||||
};
|
||||
|
||||
OperationsManager.prototype._logDebug = function(message) {
|
||||
if (!this._settings.verboseLogging) return;
|
||||
|
||||
if (arguments.length > 1)
|
||||
message = util.format.apply(util, Array.prototype.slice.call(arguments));
|
||||
|
||||
this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, message);
|
||||
};
|
||||
|
||||
module.exports = OperationsManager;
|
41
src/core/simpleQueuedHandler.js
Normal file
41
src/core/simpleQueuedHandler.js
Normal file
@ -0,0 +1,41 @@
|
||||
function typeName(t) {
|
||||
if (typeof t === 'function')
|
||||
return t.name;
|
||||
if (typeof t === 'object')
|
||||
return t.constructor.name;
|
||||
throw new TypeError('type must be a function or object, not ' + typeof t);
|
||||
}
|
||||
|
||||
function SimpleQueuedHandler() {
|
||||
this._handlers = {};
|
||||
this._messages = [];
|
||||
this._isProcessing = false;
|
||||
}
|
||||
|
||||
SimpleQueuedHandler.prototype.registerHandler = function(type, handler) {
|
||||
type = typeName(type);
|
||||
this._handlers[type] = handler;
|
||||
};
|
||||
|
||||
SimpleQueuedHandler.prototype.enqueueMessage = function(msg) {
|
||||
this._messages.push(msg);
|
||||
if (!this._isProcessing) {
|
||||
this._isProcessing = true;
|
||||
setImmediate(this._processQueue.bind(this));
|
||||
}
|
||||
};
|
||||
|
||||
SimpleQueuedHandler.prototype._processQueue = function() {
|
||||
var message = this._messages.shift();
|
||||
while(message) {
|
||||
var type = typeName(message);
|
||||
var handler = this._handlers[type];
|
||||
if (!handler)
|
||||
throw new Error("No handler registered for message " + type);
|
||||
setImmediate(handler, message);
|
||||
message = this._messages.shift();
|
||||
}
|
||||
this._isProcessing = false;
|
||||
};
|
||||
|
||||
module.exports = SimpleQueuedHandler;
|
14
src/core/staticEndpointDiscoverer.js
Normal file
14
src/core/staticEndpointDiscoverer.js
Normal file
@ -0,0 +1,14 @@
|
||||
var when = require('when');
|
||||
|
||||
function StaticEndpointDiscoverer(tcpEndPoint, useSsl) {
|
||||
this._nodeEndpoints = {
|
||||
tcpEndPoint: useSsl ? null : tcpEndPoint,
|
||||
secureTcpEndPoint: useSsl ? tcpEndPoint : null
|
||||
}
|
||||
}
|
||||
|
||||
StaticEndpointDiscoverer.prototype.discover = function(failedTcpEndpoint) {
|
||||
return when(this._nodeEndpoints);
|
||||
};
|
||||
|
||||
module.exports = StaticEndpointDiscoverer;
|
171
src/core/subscriptionsManager.js
Normal file
171
src/core/subscriptionsManager.js
Normal file
@ -0,0 +1,171 @@
|
||||
var util = require('util');
|
||||
var uuid = require('uuid');
|
||||
var Hash = require('../common/hash');
|
||||
|
||||
var SubscriptionDropReason = require('../subscriptionDropReason');
|
||||
|
||||
function SubscriptionsManager(connectionName, settings) {
|
||||
//Ensure.NotNull(connectionName, "connectionName");
|
||||
//Ensure.NotNull(settings, "settings");
|
||||
this._connectionName = connectionName;
|
||||
this._settings = settings;
|
||||
|
||||
this._activeSubscriptions = new Hash();
|
||||
this._waitingSubscriptions = [];
|
||||
this._retryPendingSubscriptions = [];
|
||||
}
|
||||
|
||||
SubscriptionsManager.prototype.getActiveSubscription = function(correlationId) {
|
||||
return this._activeSubscriptions.get(correlationId);
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.cleanUp = function() {
|
||||
var connectionClosedError = new Error(util.format("Connection '%s' was closed.", this._connectionName));
|
||||
|
||||
var self = this;
|
||||
this._activeSubscriptions.forEach(function(correlationId, subscription){
|
||||
subscription.operation.dropSubscription(SubscriptionDropReason.ConnectionClosed, connectionClosedError);
|
||||
});
|
||||
this._waitingSubscriptions.forEach(function(subscription){
|
||||
subscription.operation.dropSubscription(SubscriptionDropReason.ConnectionClosed, connectionClosedError);
|
||||
});
|
||||
this._retryPendingSubscriptions.forEach(function(subscription){
|
||||
subscription.operation.dropSubscription(SubscriptionDropReason.ConnectionClosed, connectionClosedError);
|
||||
});
|
||||
|
||||
this._activeSubscriptions.clear();
|
||||
this._waitingSubscriptions = [];
|
||||
this._retryPendingSubscriptions = [];
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.purgeSubscribedAndDroppedSubscriptions = function() {
|
||||
var self = this;
|
||||
var subscriptionsToRemove = [];
|
||||
this._activeSubscriptions.forEach(function(_, subscription) {
|
||||
if (subscription.isSubscribed && subscription.connectionId == connectionId) {
|
||||
subscription.operation.connectionClosed();
|
||||
subscriptionsToRemove.push(subscription);
|
||||
}
|
||||
});
|
||||
subscriptionsToRemove.forEach(function(subscription) {
|
||||
self._activeSubscriptions.remove(subscription.correlationId);
|
||||
});
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.checkTimeoutsAndRetry = function(connection) {
|
||||
//Ensure.NotNull(connection, "connection");
|
||||
|
||||
var self = this;
|
||||
var retrySubscriptions = [];
|
||||
var removeSubscriptions = [];
|
||||
this._activeSubscriptions.forEach(function(_, subscription) {
|
||||
if (subscription.isSubscribed) return;
|
||||
if (subscription.connectionId != connection.connectionId)
|
||||
{
|
||||
retrySubscriptions.push(subscription);
|
||||
}
|
||||
else if (subscription.timeout > 0 && Date.now() - subscription.lastUpdated > self._settings.operationTimeout)
|
||||
{
|
||||
var err = util.format("EventStoreConnection '%s': subscription never got confirmation from server.\n" +
|
||||
"UTC now: %s, operation: %s.",
|
||||
self._connectionName, new Date(), subscription);
|
||||
self._settings.log.error(err);
|
||||
|
||||
if (self._settings.failOnNoServerResponse)
|
||||
{
|
||||
subscription.operation.dropSubscription(SubscriptionDropReason.SubscribingError, new Error(err));
|
||||
removeSubscriptions.push(subscription);
|
||||
}
|
||||
else
|
||||
{
|
||||
retrySubscriptions.push(subscription);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
retrySubscriptions.forEach(function(subscription) {
|
||||
self.scheduleSubscriptionRetry(subscription);
|
||||
});
|
||||
removeSubscriptions.forEach(function(subscription) {
|
||||
self.removeSubscription(subscription);
|
||||
});
|
||||
|
||||
if (this._retryPendingSubscriptions.length > 0)
|
||||
{
|
||||
this._retryPendingSubscriptions.forEach(function(subscription) {
|
||||
subscription.retryCount += 1;
|
||||
self.startSubscription(subscription, connection);
|
||||
});
|
||||
this._retryPendingSubscriptions = [];
|
||||
}
|
||||
|
||||
while (this._waitingSubscriptions.length > 0)
|
||||
{
|
||||
this.startSubscription(this._waitingSubscriptions.shift(), connection);
|
||||
}
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.removeSubscription = function(subscription) {
|
||||
this._activeSubscriptions.remove(subscription.correlationId);
|
||||
this._logDebug("RemoveSubscription %s.", subscription);
|
||||
return true;
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.scheduleSubscriptionRetry = function(subscription) {
|
||||
if (!this.removeSubscription(subscription))
|
||||
{
|
||||
this._logDebug("RemoveSubscription failed when trying to retry %s.", subscription);
|
||||
return;
|
||||
}
|
||||
|
||||
if (subscription.maxRetries >= 0 && subscription.retryCount >= subscription.maxRetries)
|
||||
{
|
||||
this._logDebug("RETRIES LIMIT REACHED when trying to retry %s.", subscription);
|
||||
var err = util.format("Retries limit reached. Subscription: %s RetryCount: %d.", subscription, subscription.retryCount);
|
||||
subscription.operation.dropSubscription(SubscriptionDropReason.SubscribingError, new Error(err));
|
||||
return;
|
||||
}
|
||||
|
||||
this._logDebug("retrying subscription %s.", subscription);
|
||||
this._retryPendingSubscriptions.push(subscription);
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.enqueueSubscription = function(subscriptionItem) {
|
||||
this._waitingSubscriptions.push(subscriptionItem);
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype.startSubscription = function(subscription, connection) {
|
||||
//Ensure.NotNull(connection, "connection");
|
||||
|
||||
if (subscription.isSubscribed)
|
||||
{
|
||||
this._logDebug("StartSubscription REMOVING due to already subscribed %s.", subscription);
|
||||
this.removeSubscription(subscription);
|
||||
return;
|
||||
}
|
||||
|
||||
subscription.correlationId = uuid.v4();
|
||||
subscription.connectionId = connection.connectionId;
|
||||
subscription.lastUpdated = Date.now();
|
||||
|
||||
this._activeSubscriptions.add(subscription.correlationId, subscription);
|
||||
|
||||
if (!subscription.operation.subscribe(subscription.correlationId, connection))
|
||||
{
|
||||
this._logDebug("StartSubscription REMOVING AS COULDN'T SUBSCRIBE %s.", subscription);
|
||||
this.removeSubscription(subscription);
|
||||
}
|
||||
else
|
||||
{
|
||||
this._logDebug("StartSubscription SUBSCRIBING %s.", subscription);
|
||||
}
|
||||
};
|
||||
|
||||
SubscriptionsManager.prototype._logDebug = function(message) {
|
||||
if (!this._settings.verboseLogging) return;
|
||||
|
||||
var parameters = Array.prototype.slice.call(arguments, 1);
|
||||
this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, parameters.length == 0 ? message : util.format(message, parameters));
|
||||
};
|
||||
|
||||
module.exports = SubscriptionsManager;
|
Reference in New Issue
Block a user