node-eventstore-client/src/clientOperations/connectToPersistentSubscriptionOperation.js

123 lines
6.1 KiB
JavaScript

var util = require('util');
var uuid = require('uuid');
var SubscriptionOperation = require('./subscriptionOperation');
var ClientMessage = require('../messages/clientMessage');
var TcpCommand = require('../systemData/tcpCommand');
var TcpFlags = require('../systemData/tcpFlags');
var TcpPackage = require('../systemData/tcpPackage');
var createBufferSegment = require('../common/bufferSegment');
var InspectionResult = require('./../systemData/inspectionResult');
var InspectionDecision = require('../systemData/inspectionDecision');
var results = require('../results');
var SubscriptionDropReason = require('../subscriptionDropReason');
var PersistentEventStoreSubscription = require('../persistentEventStoreSubscription');
var ensure = require('../common/utils/ensure');
function ConnectToPersistentSubscriptionOperation(
log, cb, groupName, bufferSize, streamId, userCredentials, eventAppeared, subscriptionDropped,
verboseLogging, getConnection
) {
SubscriptionOperation.call(this, log, cb, streamId, false, userCredentials, eventAppeared, subscriptionDropped, verboseLogging, getConnection);
this._groupName = groupName;
this._bufferSize = bufferSize;
this._subscriptionId = null;
}
util.inherits(ConnectToPersistentSubscriptionOperation, SubscriptionOperation);
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = function() {
var dto = new ClientMessage.ConnectToPersistentSubscription(this._groupName, this._streamId, this._bufferSize);
return new TcpPackage(TcpCommand.ConnectToPersistentSubscription,
this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials !== null ? this._userCredentials.username : null,
this._userCredentials !== null ? this._userCredentials.password : null,
createBufferSegment(dto.toBuffer()));
};
ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) {
if (pkg.command === TcpCommand.PersistentSubscriptionConfirmation)
{
var dto = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer());
this._confirmSubscription(dto.last_commit_position, dto.last_event_number);
this._subscriptionId = dto.subscription_id;
return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation");
}
if (pkg.command === TcpCommand.PersistentSubscriptionStreamEventAppeared)
{
var dto = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer());
this._onEventAppeared(new results.ResolvedEvent(dto.event));
return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared");
}
if (pkg.command === TcpCommand.SubscriptionDropped)
{
var dto = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer());
if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied)
{
this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound)
{
this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found"));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted)
{
this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
if (dto.reason === ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached)
{
this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached."));
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
this.dropSubscription(SubscriptionDropReason.UserInitiated, null, this._getConnection());
return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
}
return null;
};
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitPosition, lastEventNumber) {
return new PersistentEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber);
};
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsProcessed = function(processedEvents) {
ensure.notNull(processedEvents, "processedEvents");
var dto = new ClientMessage.PersistentSubscriptionAckEvents({
subscription_id: this._subscriptionId,
processed_event_ids: processedEvents.map(function (x) {
return new Buffer(uuid.parse(x));
})
});
var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionAckEvents,
this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials !== null ? this._userCredentials.username : null,
this._userCredentials !== null ? this._userCredentials.password : null,
createBufferSegment(dto.encode().toBuffer()));
this._enqueueSend(pkg);
};
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function(processedEvents, action, reason) {
ensure.notNull(processedEvents, "processedEvents");
ensure.notNull(reason, "reason");
var dto = new ClientMessage.PersistentSubscriptionNakEvents(
this._subscriptionId,
processedEvents.map(function(x) { return new Buffer(uuid.parse(x)); }),
reason,
action);
var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionNakEvents,
this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None,
this._correlationId,
this._userCredentials !== null ? this._userCredentials.username : null,
this._userCredentials !== null ? this._userCredentials.password : null,
createBufferSegment(dto.toBuffer()));
this._enqueueSend(pkg);
};
module.exports = ConnectToPersistentSubscriptionOperation;