var util = require('util');
var uuidParse = require('uuid-parse');

var SubscriptionOperation = require('./subscriptionOperation');
var ClientMessage = require('../messages/clientMessage');
var TcpCommand = require('../systemData/tcpCommand');
var TcpFlags = require('../systemData/tcpFlags');
var TcpPackage = require('../systemData/tcpPackage');
var createBufferSegment = require('../common/bufferSegment');
var InspectionResult = require('./../systemData/inspectionResult');
var InspectionDecision = require('../systemData/inspectionDecision');
var results = require('../results');
var SubscriptionDropReason = require('../subscriptionDropReason');
var PersistentEventStoreSubscription = require('../persistentEventStoreSubscription');
var ensure = require('../common/utils/ensure');

/**
 * Builds a TcpPackage for `operation`, using the operation's correlation id
 * and marking the package as authenticated (with username/password) only when
 * the operation's credentials are not `null`.
 *
 * @param {ConnectToPersistentSubscriptionOperation} operation - operation supplying `_userCredentials` and `_correlationId`.
 * @param {*} command - TcpCommand value for the package.
 * @param {*} encodedDto - already-encoded message bytes (result of `encode(dto).finish()`).
 * @returns {TcpPackage}
 */
function createPackage(operation, command, encodedDto) {
  var credentials = operation._userCredentials;
  var isAuthenticated = credentials !== null;
  return new TcpPackage(
    command,
    isAuthenticated ? TcpFlags.Authenticated : TcpFlags.None,
    operation._correlationId,
    isAuthenticated ? credentials.username : null,
    isAuthenticated ? credentials.password : null,
    createBufferSegment(encodedDto)
  );
}

/**
 * Converts a textual event id into the Buffer form used in ack/nak messages.
 * Uses `Buffer.from` rather than the deprecated `new Buffer(...)` constructor.
 *
 * @param {string} eventId - event id in a form `uuidParse.parse` accepts.
 * @returns {Buffer}
 */
function eventIdToBuffer(eventId) {
  return Buffer.from(uuidParse.parse(eventId));
}

/**
 * Subscription operation that connects to a persistent subscription group on
 * a stream. All arguments except `groupName` and `bufferSize` are forwarded
 * unchanged to SubscriptionOperation.
 *
 * @param log - forwarded to SubscriptionOperation.
 * @param cb - forwarded to SubscriptionOperation.
 * @param groupName - sent as `subscriptionId` in the connect message.
 * @param bufferSize - sent as `allowedInFlightMessages` in the connect message.
 * @param streamId - stream of the persistent subscription.
 * @param userCredentials - credentials object (`username`/`password`) or `null`.
 * @param eventAppeared - forwarded to SubscriptionOperation.
 * @param subscriptionDropped - forwarded to SubscriptionOperation.
 * @param verboseLogging - forwarded to SubscriptionOperation.
 * @param getConnection - forwarded to SubscriptionOperation.
 * @constructor
 */
function ConnectToPersistentSubscriptionOperation(
    log, cb, groupName, bufferSize, streamId, userCredentials, eventAppeared, subscriptionDropped,
    verboseLogging, getConnection
) {
  // NOTE(review): the literal `false` is the base class' 4th argument
  // (presumably resolveLinkTos) - confirm against SubscriptionOperation.
  SubscriptionOperation.call(this, log, cb, streamId, false, userCredentials, eventAppeared, subscriptionDropped, verboseLogging, getConnection);

  this._groupName = groupName;
  this._bufferSize = bufferSize;
  // Filled in from the server's PersistentSubscriptionConfirmation message.
  this._subscriptionId = null;
}
util.inherits(ConnectToPersistentSubscriptionOperation, SubscriptionOperation);

/**
 * Creates the ConnectToPersistentSubscription package for this operation.
 *
 * @returns {TcpPackage}
 */
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = function() {
  var dto = new ClientMessage.ConnectToPersistentSubscription({
    subscriptionId: this._groupName,
    eventStreamId: this._streamId,
    allowedInFlightMessages: this._bufferSize
  });
  return createPackage(
    this,
    TcpCommand.ConnectToPersistentSubscription,
    ClientMessage.ConnectToPersistentSubscription.encode(dto).finish()
  );
};

/**
 * Handles the package commands specific to persistent subscriptions:
 * confirmation, event appeared and subscription dropped.
 *
 * @param pkg - incoming package; `pkg.command` selects the handler and `pkg.data.toBuffer()` is decoded.
 * @returns {InspectionResult|null} the inspection result, or `null` when the command is not handled here.
 */
ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) {
  if (pkg.command === TcpCommand.PersistentSubscriptionConfirmation) {
    var confirmation = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer());
    this._confirmSubscription(confirmation.lastCommitPosition, confirmation.lastEventNumber);
    // The server-assigned id is what ack/nak messages must reference.
    this._subscriptionId = confirmation.subscriptionId;
    return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation");
  }

  if (pkg.command === TcpCommand.PersistentSubscriptionStreamEventAppeared) {
    var appeared = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer());
    this._onEventAppeared(new results.ResolvedEvent(appeared.event));
    return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared");
  }

  if (pkg.command === TcpCommand.SubscriptionDropped) {
    var dropped = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer());
    var WireReason = ClientMessage.SubscriptionDropped.SubscriptionDropReason;

    if (dropped.reason === WireReason.AccessDenied) {
      this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream."));
    } else if (dropped.reason === WireReason.NotFound) {
      this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found"));
    } else if (dropped.reason === WireReason.PersistentSubscriptionDeleted) {
      this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted."));
    } else if (dropped.reason === WireReason.SubscriberMaxCountReached) {
      this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached."));
    } else {
      // Any other reason is reported as user initiated, without an error.
      this.dropSubscription(SubscriptionDropReason.UserInitiated, null, this._getConnection());
    }
    return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped");
  }

  return null;
};

/**
 * Creates the subscription object for this operation.
 *
 * @param lastCommitPosition - passed through to PersistentEventStoreSubscription.
 * @param lastEventNumber - passed through to PersistentEventStoreSubscription.
 * @returns {PersistentEventStoreSubscription}
 */
ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitPosition, lastEventNumber) {
  return new PersistentEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber);
};

/**
 * Enqueues a PersistentSubscriptionAckEvents package for the given event ids.
 *
 * @param {string[]} processedEvents - ids of the processed events; checked with `ensure.notNull`.
 */
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsProcessed = function(processedEvents) {
  ensure.notNull(processedEvents, "processedEvents");

  var dto = new ClientMessage.PersistentSubscriptionAckEvents({
    subscriptionId: this._subscriptionId,
    processedEventIds: processedEvents.map(eventIdToBuffer)
  });

  this._enqueueSend(createPackage(
    this,
    TcpCommand.PersistentSubscriptionAckEvents,
    ClientMessage.PersistentSubscriptionAckEvents.encode(dto).finish()
  ));
};

/**
 * Enqueues a PersistentSubscriptionNakEvents package for the given event ids.
 *
 * @param {string[]} processedEvents - ids of the failed events; checked with `ensure.notNull`.
 * @param action - sent as the message's `action` field (not validated here).
 * @param reason - sent as the message's `message` field; checked with `ensure.notNull`.
 */
ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function(processedEvents, action, reason) {
  ensure.notNull(processedEvents, "processedEvents");
  ensure.notNull(reason, "reason");

  var dto = new ClientMessage.PersistentSubscriptionNakEvents({
    subscriptionId: this._subscriptionId,
    processedEventIds: processedEvents.map(eventIdToBuffer),
    message: reason,
    action: action
  });

  this._enqueueSend(createPackage(
    this,
    TcpCommand.PersistentSubscriptionNakEvents,
    ClientMessage.PersistentSubscriptionNakEvents.encode(dto).finish()
  ));
};

module.exports = ConnectToPersistentSubscriptionOperation;