Adding persistent subscription support

This commit is contained in:
Nicolas Dextraze
2016-03-10 22:57:39 -08:00
parent 29f6bf210b
commit f1c3c42d46
25 changed files with 1939 additions and 117 deletions

View File

@ -0,0 +1,122 @@
var util = require('util');
var uuid = require('uuid');
var SubscriptionOperation = require('./subscriptionOperation');
var ClientMessage = require('../messages/clientMessage');
var TcpCommand = require('../systemData/tcpCommand');
var TcpFlags = require('../systemData/tcpFlags');
var TcpPackage = require('../systemData/tcpPackage');
var createBufferSegment = require('../common/bufferSegment');
var InspectionResult = require('./../systemData/inspectionResult');
var InspectionDecision = require('../systemData/inspectionDecision');
var results = require('../results');
var SubscriptionDropReason = require('../subscriptionDropReason');
var PersistentEventStoreSubscription = require('../persistentEventStoreSubscription');
var ensure = require('../common/utils/ensure');
function ConnectToPersistentSubscriptionOperation(
log, cb, groupName, bufferSize, streamId, userCredentials, eventAppeared, subscriptionDropped,
verboseLogging, getConnection
) {
SubscriptionOperation.call(this, log, cb, streamId, false, userCredentials, eventAppeared, subscriptionDropped, verboseLogging, getConnection);
this._groupName = groupName;
this._bufferSize = bufferSize;
this._subscriptionId = null;
}
util.inherits(ConnectToPersistentSubscriptionOperation, SubscriptionOperation);
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = function() {
var dto = new ClientMessage.ConnectToPersistentSubscription(this._groupName, this._streamId, this._bufferSize);
return new TcpPackage(TcpCommand.ConnectToPersistentSubscription,
this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials !== null ? this._userCredentials.username : null,
this._userCredentials !== null ? this._userCredentials.password : null,
createBufferSegment(dto.toBuffer()));
};
ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) {
if (pkg.command == TcpCommand.PersistentSubscriptionConfirmation)
{
var dto = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer());
this._confirmSubscription(dto.last_commit_position, dto.last_event_number);
this._subscriptionId = dto.subscription_id;
return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation");
}
if (pkg.command == TcpCommand.PersistentSubscriptionStreamEventAppeared)
{
var dto = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer());
this._onEventAppeared(new results.ResolvedEvent(dto.event));
return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared");
}
if (pkg.command == TcpCommand.SubscriptionDropped)
{
var dto = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer());
if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied)
{
this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound)
{
this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found"));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted)
{
this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached)
{
this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
this.dropSubscription(SubscriptionDropReason.UserInitiated, null, this._getConnection());
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
return null;
};
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitPosition, lastEventNumber) {
return new PersistentEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber);
};
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsProcessed = function(processedEvents) {
ensure.notNull(processedEvents, "processedEvents");
var dto = new ClientMessage.PersistentSubscriptionAckEvents({
subscription_id: this._subscriptionId,
processed_event_ids: processedEvents.map(function (x) {
return new Buffer(uuid.parse(x));
})
});
var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionAckEvents,
this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials !== null ? this._userCredentials.username : null,
this._userCredentials !== null ? this._userCredentials.password : null,
createBufferSegment(dto.encode().toBuffer()));
this._enqueueSend(pkg);
};
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function(processedEvents, action, reason) {
ensure.notNull(processedEvents, "processedEvents");
ensure.notNull(reason, "reason");
var dto = new ClientMessage.PersistentSubscriptionNakEvents(
this._subscriptionId,
processedEvents.map(function(x) { return new Buffer(uuid.parse(x)); }),
reason,
action);
var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionNakEvents,
this._userCredentials != null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials != null ? this._userCredentials.username : null,
this._userCredentials != null ? this._userCredentials.password : null,
createBufferSegment(dto.toBuffer()));
this._enqueueSend(pkg);
};
module.exports = ConnectToPersistentSubscriptionOperation;

View File

@ -0,0 +1,74 @@
var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase');
var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage');
var SystemConsumerStrategies = require('../systemConsumerStrategies');
var InspectionDecision = require('../systemData/inspectionDecision');
var InspectionResult = require('./../systemData/inspectionResult');
var results = require('../results');
function CreatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) {
OperationBase.call(this, log, cb, TcpCommand.CreatePersistentSubscription, TcpCommand.CreatePersistentSubscriptionCompleted, userCredentials);
ensure.notNull(settings, "settings");
this._resolveLinkTos = settings.resolveLinkTos;
this._stream = stream;
this._groupName = groupName;
this._startFromBeginning = settings.startFrom;
this._maxRetryCount = settings.maxRetryCount;
this._liveBufferSize = settings.liveBufferSize;
this._readBatchSize = settings.readBatchSize;
this._bufferSize = settings.historyBufferSize;
this._recordStatistics = settings.extraStatistics;
this._messageTimeoutMilliseconds = settings.messageTimeout;
this._checkPointAfter = settings.checkPointAfter;
this._minCheckPointCount = settings.minCheckPointCount;
this._maxCheckPointCount = settings.maxCheckPointCount;
this._maxSubscriberCount = settings.maxSubscriberCount;
this._namedConsumerStrategy = settings.namedConsumerStrategy;
this._responseType = ClientMessage.CreatePersistentSubscriptionCompleted;
}
util.inherits(CreatePersistentSubscriptionOperation, OperationBase);
CreatePersistentSubscriptionOperation.prototype._createRequestDto = function() {
return new ClientMessage.CreatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos,
this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize,
this._readBatchSize, this._bufferSize, this._maxRetryCount,
this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter,
this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy);
};
CreatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) {
switch (response.result)
{
case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success:
this._succeed();
return new InspectionResult(InspectionDecision.EndOperation, "Success");
case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Fail:
this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason)));
return new InspectionResult(InspectionDecision.EndOperation, "Fail");
case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AccessDenied:
this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AlreadyExists:
this.fail(new Error(util.format("Subscription group %s on stream %s already exists", this._groupName, this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "AlreadyExists");
default:
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
}
};
CreatePersistentSubscriptionOperation.prototype._transformResponse = function(response) {
return new results.PersistentSubscriptionCreateResult(results.PersistentSubscriptionCreateStatus.Success);
};
CreatePersistentSubscriptionOperation.prototype.toString = function() {
return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName);
};
module.exports = CreatePersistentSubscriptionOperation;

View File

@ -0,0 +1,55 @@
var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase');
var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage');
var InspectionDecision = require('../systemData/inspectionDecision');
var InspectionResult = require('./../systemData/inspectionResult');
var results = require('../results');
function DeletePersistentSubscriptionOperation(log, cb, stream, groupName, userCredentials) {
OperationBase.call(this, log, cb, TcpCommand.DeletePersistentSubscription, TcpCommand.DeletePersistentSubscriptionCompleted, userCredentials);
this._stream = stream;
this._groupName = groupName;
this._responseType = ClientMessage.DeletePersistentSubscriptionCompleted;
}
util.inherits(DeletePersistentSubscriptionOperation, OperationBase);
DeletePersistentSubscriptionOperation.prototype._createRequestDto = function() {
return new ClientMessage.DeletePersistentSubscription(this._groupName, this._stream);
};
DeletePersistentSubscriptionOperation.prototype._inspectResponse = function(response) {
switch (response.result)
{
case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Success:
this._succeed();
return new InspectionResult(InspectionDecision.EndOperation, "Success");
case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Fail:
this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason)));
return new InspectionResult(InspectionDecision.EndOperation, "Fail");
case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.AccessDenied:
this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.DoesNotExist:
this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist");
default:
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
}
};
DeletePersistentSubscriptionOperation.prototype._transformResponse = function(response) {
return new results.PersistentSubscriptionDeleteResult(results.PersistentSubscriptionDeleteStatus.Success);
};
DeletePersistentSubscriptionOperation.prototype.toString = function() {
return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName);
};
module.exports = DeletePersistentSubscriptionOperation;

View File

@ -51,7 +51,7 @@ OperationBase.prototype._succeed = function() {
OperationBase.prototype.createNetworkPackage = function(correlationId) {
var dto = this._createRequestDto();
var buf = dto.encode().toBuffer();
var buf = dto.toBuffer();
return new TcpPackage(
this._requestCommand,
this.userCredentials ? TcpFlags.Authenticated : TcpFlags.None,

View File

@ -39,6 +39,10 @@ function SubscriptionOperation(
this._actionQueue = [];
}
SubscriptionOperation.prototype._enqueueSend = function(pkg) {
this._getConnection().enqueueSend(pkg);
};
SubscriptionOperation.prototype.subscribe = function(correlationId, connection) {
if (connection === null) throw new TypeError("connection is null.");
@ -60,7 +64,7 @@ SubscriptionOperation.prototype.unsubscribe = function() {
SubscriptionOperation.prototype._createUnsubscriptionPackage = function() {
var msg = new ClientMessage.UnsubscribeFromStream();
var data = new BufferSegment(msg.encode().toBuffer());
var data = new BufferSegment(msg.toBuffer());
return new TcpPackage(TcpCommand.UnsubscribeFromStream, TcpFlags.None, this._correlationId, null, null, data);
};
@ -182,11 +186,12 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect
this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...",
this._correlationId, this._streamId || "<all>", reason, err);
if (reason !== SubscriptionDropReason.UserInitiated)
if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null)
{
if (err === null) throw new Error(util.format("No exception provided for subscription drop reason '%s", reason));
//TODO: this should be last thing to execute
this._cb(err);
return;
}
if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null)
@ -223,7 +228,7 @@ SubscriptionOperation.prototype._onEventAppeared = function(e) {
if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!");
if (this._verboseLogging)
this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %j).",
this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).",
this._correlationId, this._streamId || "<all>",
e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition);
@ -250,7 +255,7 @@ SubscriptionOperation.prototype._executeActions = function() {
}
catch (err)
{
this._log.error(err, "Exception during executing user callback: %s.", err.Message);
this._log.error(err, "Exception during executing user callback: %s.", err.message);
}
action = this._actionQueue.shift();
}

View File

@ -0,0 +1,74 @@
var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase');
var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage');
var SystemConsumerStrategies = require('../systemConsumerStrategies');
var InspectionDecision = require('../systemData/inspectionDecision');
var InspectionResult = require('./../systemData/inspectionResult');
var results = require('../results');
function UpdatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) {
OperationBase.call(this, log, cb, TcpCommand.UpdatePersistentSubscription, TcpCommand.UpdatePersistentSubscriptionCompleted, userCredentials);
ensure.notNull(settings, "settings");
this._resolveLinkTos = settings.resolveLinkTos;
this._stream = stream;
this._groupName = groupName;
this._startFromBeginning = settings.startFrom;
this._maxRetryCount = settings.maxRetryCount;
this._liveBufferSize = settings.liveBufferSize;
this._readBatchSize = settings.readBatchSize;
this._bufferSize = settings.historyBufferSize;
this._recordStatistics = settings.extraStatistics;
this._messageTimeoutMilliseconds = settings.messageTimeout;
this._checkPointAfter = settings.checkPointAfter;
this._minCheckPointCount = settings.minCheckPointCount;
this._maxCheckPointCount = settings.maxCheckPointCount;
this._maxSubscriberCount = settings.maxSubscriberCount;
this._namedConsumerStrategy = settings.namedConsumerStrategy;
this._responseType = ClientMessage.UpdatePersistentSubscriptionCompleted;
}
util.inherits(UpdatePersistentSubscriptionOperation, OperationBase);
UpdatePersistentSubscriptionOperation.prototype._createRequestDto = function() {
return new ClientMessage.UpdatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos,
this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize,
this._readBatchSize, this._bufferSize, this._maxRetryCount,
this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter,
this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy);
};
UpdatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) {
switch (response.result)
{
case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Success:
this._succeed();
return new InspectionResult(InspectionDecision.EndOperation, "Success");
case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Fail:
this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason)));
return new InspectionResult(InspectionDecision.EndOperation, "Fail");
case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.AccessDenied:
this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.DoesNotExist:
this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream)));
return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist");
default:
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
}
};
UpdatePersistentSubscriptionOperation.prototype._transformResponse = function(response) {
return new results.PersistentSubscriptionUpdateResult(results.PersistentSubscriptionUpdateStatus.Success);
};
UpdatePersistentSubscriptionOperation.prototype.toString = function() {
return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName);
};
module.exports = UpdatePersistentSubscriptionOperation;

View File

@ -26,7 +26,7 @@ VolatileSubscriptionOperation.prototype._createSubscriptionPackage = function()
this._correlationId,
this._userCredentials != null ? this._userCredentials.username : null,
this._userCredentials != null ? this._userCredentials.password : null,
new BufferSegment(dto.encode().toBuffer()));
new BufferSegment(dto.toBuffer()));
};
VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) {