diff --git a/README.md b/README.md index e568f96..35a907d 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ Incomplete/missing features: - Typed errors: currently most errors are direct instance of Error, which is not practical for error handling - Ssl connection: Ssl connetion is not implemented yet -- Persistent subscription: create/update/delete/connec to a persistent subscription are not implemented yet - Set system settings: not implemented yet - Performance: there's still some while loop in the code that could be problematic with node.js - Tests: tests are only covering happy path scenarios for now diff --git a/lib/dist.js b/lib/dist.js index fb96fb3..6542401 100644 --- a/lib/dist.js +++ b/lib/dist.js @@ -86,8 +86,10 @@ module.exports = } module.exports.EventStoreConnection = __webpack_require__(6); - module.exports.UserCredentials = __webpack_require__(53); + module.exports.UserCredentials = __webpack_require__(62); module.exports.EventData = EventData; + module.exports.PersistentSubscriptionSettings = __webpack_require__(63); + module.exports.SystemConsumerStrategies = __webpack_require__(49); module.exports.expectedVersion = expectedVersion; module.exports.positions = positions; @@ -435,6 +437,57 @@ module.exports = }); } + const PersistentSubscriptionCreateStatus = { + Success: 'success', + NotFound: 'notFound', + Failure: 'failure' + }; + + /** + * @param {string} status + * @constructor + * @property {string} status + */ + function PersistentSubscriptionCreateResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); + } + + const PersistentSubscriptionUpdateStatus = { + Success: 'success', + NotFound: 'notFound', + Failure: 'failure', + AccessDenied: 'accessDenied' + }; + + /** + * @param {string} status + * @constructor + * @property {string} status + */ + function PersistentSubscriptionUpdateResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); + } + + const PersistentSubscriptionDeleteStatus = { + Success: 'success', + Failure: 'failure' + }; + + /** + * @param {string} status + * @constructor + * @property {string} status + */ + function PersistentSubscriptionDeleteResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); + } + // Exports Constructors module.exports.Position = Position; module.exports.toNumber = toNumber; @@ -446,6 +499,13 @@ module.exports = module.exports.AllEventsSlice = AllEventsSlice; module.exports.DeleteResult = DeleteResult; module.exports.RawStreamMetadataResult = RawStreamMetadataResult; + module.exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult; + module.exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus; + module.exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult; + module.exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus; + module.exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult; + module.exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus; + /***/ }, /* 4 */ @@ -464,14 +524,18 @@ module.exports = throw new Error(name + " is empty."); }; + module.exports.notNull = function(value, name) { + if (value === null) + throw new Error(name + " is null."); + }; /***/ }, /* 6 */ /***/ function(module, exports, __webpack_require__) { var EventStoreNodeConnection = __webpack_require__(7); - var StaticEndpointDiscoverer = __webpack_require__(51); - var NoopLogger = __webpack_require__(52); + var StaticEndpointDiscoverer = __webpack_require__(60); + var NoopLogger = __webpack_require__(61); var defaultConnectionSettings = { log: new NoopLogger(), @@ -537,24 +601,28 @@ module.exports = var messages = __webpack_require__(9); var EventStoreConnectionLogicHandler = __webpack_require__(10); - var DeleteStreamOperation = __webpack_require__(31); - var AppendToStreamOperation = __webpack_require__(33); - var StartTransactionOperation = __webpack_require__(34); - var TransactionalWriteOperation = __webpack_require__(36); - var CommitTransactionOperation = __webpack_require__(37); - var ReadEventOperation = __webpack_require__(38); - var ReadStreamEventsForwardOperation = __webpack_require__(39); - var ReadStreamEventsBackwardOperation = __webpack_require__(43); - var ReadAllEventsForwardOperation = __webpack_require__(44); - var ReadAllEventsBackwardOperation = __webpack_require__(45); + var DeleteStreamOperation = __webpack_require__(33); + var AppendToStreamOperation = __webpack_require__(35); + var StartTransactionOperation = __webpack_require__(36); + var TransactionalWriteOperation = __webpack_require__(38); + var CommitTransactionOperation = __webpack_require__(39); + var ReadEventOperation = __webpack_require__(40); + var ReadStreamEventsForwardOperation = __webpack_require__(41); + var ReadStreamEventsBackwardOperation = __webpack_require__(45); + var ReadAllEventsForwardOperation = __webpack_require__(46); + var ReadAllEventsBackwardOperation = __webpack_require__(47); + var CreatePersistentSubscriptionOperation = __webpack_require__(48); + var UpdatePersistentSubscriptionOperation = __webpack_require__(50); + var DeletePersistentSubscriptionOperation = __webpack_require__(51); - var EventStoreTransaction = __webpack_require__(35); - var EventStoreStreamCatchUpSubscription = __webpack_require__(46); - var EventStoreAllCatchUpSubscription = __webpack_require__(48); + var EventStoreTransaction = __webpack_require__(37); + var EventStoreStreamCatchUpSubscription = __webpack_require__(52); + var EventStoreAllCatchUpSubscription = __webpack_require__(54); + var EventStorePersistentSubscription = __webpack_require__(55); var results = __webpack_require__(3); - var systemStreams = __webpack_require__(49); - var systemEventTypes = __webpack_require__(50); + var systemStreams = __webpack_require__(58); + var systemEventTypes = __webpack_require__(59); var EventData = __webpack_require__(1); /** @@ -614,7 +682,6 @@ module.exports = this._handler.enqueueMessage(new messages.CloseConnectionMessage("Connection close requested by client.", null)); }; - // --- Writing --- /** * Delete a stream (async) * @param {string} stream @@ -724,7 +791,6 @@ module.exports = }); }; - // --- Reading --- /** * Read a single event (async) * @param {string} stream @@ -859,7 +925,6 @@ module.exports = }); }; - // --- Subscriptions --- /** * Subscribe to a stream (async) * @param {!string} stream @@ -966,24 +1031,99 @@ module.exports = return catchUpSubscription; }; - EventStoreNodeConnection.prototype.connectToPersistentSubscription = function() { - //TODO: connect to persistent subscription - throw new Error("Not implemented."); + /** + * Subscribe to a persistent subscription + * @param {string} stream + * @param {string} groupName + * @param {function} eventAppeared + * @param {function} [subscriptionDropped] + * @param {UserCredentials} [userCredentials] + * @param {number} [bufferSize] + * @param {boolean} [autoAck] + */ + EventStoreNodeConnection.prototype.connectToPersistentSubscription = function( + stream, groupName, eventAppeared, subscriptionDropped, userCredentials, bufferSize, autoAck + ) { + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNull(eventAppeared, "eventAppeared"); + + subscriptionDropped = subscriptionDropped || null; + userCredentials = userCredentials || null; + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : !!autoAck; + + var subscription = new EventStorePersistentSubscription( + groupName, stream, eventAppeared, subscriptionDropped, userCredentials, this._settings.log, + this._settings.verboseLogging, this._settings, this._handler, bufferSize, autoAck); + subscription.start(); + + return subscription; }; - EventStoreNodeConnection.prototype.createPersistentSubscription = function() { - //TODO: create persistent subscription - throw new Error("Not implemented."); + /** + * @param {string} stream + * @param {string} groupName + * @param {PersistentSubscriptionSettings} settings + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ + EventStoreNodeConnection.prototype.createPersistentSubscription = function(stream, groupName, settings, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNull(settings, "settings"); + + var self = this; + return new Promise(function(resolve, reject){ + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new CreatePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, settings, userCredentials || null)); + }); }; - EventStoreNodeConnection.prototype.updatePersistentSubscription = function() { - //TODO: update persistent subscription - throw new Error("Not implemented."); + /** + * @param {string} stream + * @param {string} groupName + * @param {string} settings + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ + EventStoreNodeConnection.prototype.updatePersistentSubscription = function(stream, groupName, settings, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNull(settings, "settings"); + var self = this; + return new Promise(function(resolve, reject) { + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new UpdatePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, settings, userCredentials || null)); + }); }; - EventStoreNodeConnection.prototype.deletePersistentSubscription = function() { - //TODO: delete persistent subscription - throw new Error("Not implemented."); + /** + * @param {string} stream + * @param {string} groupName + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ + EventStoreNodeConnection.prototype.deletePersistentSubscription = function(stream, groupName, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + var self = this; + return new Promise(function(resolve, reject) { + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new DeletePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, userCredentials || null)); + }); }; EventStoreNodeConnection.prototype.setStreamMetadata = function() { @@ -1084,6 +1224,7 @@ module.exports = /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); + var ensure = __webpack_require__(5); function Message() { } @@ -1161,6 +1302,22 @@ module.exports = function TimerTickMessage() {} util.inherits(TimerTickMessage, Message); + function StartPersistentSubscriptionMessage( + cb, subscriptionId, streamId, bufferSize, userCredentials, eventAppeared, subscriptionDropped, + maxRetries, operationTimeout + ) { + this.cb = cb; + this.subscriptionId = subscriptionId; + this.streamId = streamId; + this.bufferSize = bufferSize; + this.userCredentials = userCredentials; + this.eventAppeared = eventAppeared; + this.subscriptionDropped = subscriptionDropped; + this.maxRetries = maxRetries; + this.timeout = operationTimeout; + } + util.inherits(StartPersistentSubscriptionMessage, Message); + module.exports = { StartConnectionMessage: StartConnectionMessage, CloseConnectionMessage: CloseConnectionMessage, @@ -1171,7 +1328,8 @@ module.exports = TcpConnectionErrorMessage: TcpConnectionErrorMessage, TcpConnectionEstablishedMessage: TcpConnectionEstablishedMessage, TcpConnectionClosedMessage: TcpConnectionClosedMessage, - TimerTickMessage: TimerTickMessage + TimerTickMessage: TimerTickMessage, + StartPersistentSubscriptionMessage: StartPersistentSubscriptionMessage }; @@ -1188,6 +1346,7 @@ module.exports = var OperationsManager = __webpack_require__(20); var SubscriptionsManager = __webpack_require__(22); var VolatileSubscriptionOperation = __webpack_require__(24); + var ConnectToPersistentSubscriptionOperation = __webpack_require__(31); var messages = __webpack_require__(9); var TcpPackage = __webpack_require__(17); @@ -1251,6 +1410,9 @@ module.exports = this._queue.registerHandler(messages.StartSubscriptionMessage, function(msg) { self._startSubscription(msg); }); + this._queue.registerHandler(messages.StartPersistentSubscriptionMessage, function(msg) { + self._startPersistentSubscription(msg); + }); this._queue.registerHandler(messages.EstablishTcpConnectionMessage, function(msg) { self._establishTcpConnection(msg.endPoints); @@ -1430,7 +1592,7 @@ module.exports = isSubscribed: false }; subscriptionItem.toString = (function(){ - return util.format("Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %d, last updated: %d", + return util.format("Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %s, last updated: %s", this.operation.constructor.name, this.correlationId, this.operation, this.isSubscribed, this.retryCount, new Date(this.createdTime).toISOString().substr(11,12), new Date(this.lastUpdated).toISOString().substr(11,12)); @@ -1450,7 +1612,9 @@ module.exports = var operation = new VolatileSubscriptionOperation(this._settings.log, msg.cb, msg.streamId, msg.resolveLinkTos, msg.userCredentials, msg.eventAppeared, msg.subscriptionDropped, this._settings.verboseLogging, function() { return self._connection }); - this._logDebug("StartSubscription %s %s, %s, %d, %d.", operation.constructor.name, operation, msg.maxRetries, msg.timeout, this._state === ConnectionState.Connected ? "fire" : "enqueue"); + this._logDebug("StartSubscription %s %s, %s, %d, %d.", + this._state === ConnectionState.Connected ? "fire" : "enqueue", + operation.constructor.name, operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); if (this._state === ConnectionState.Connecting) this._subscriptions.enqueueSubscription(subscription); @@ -1465,6 +1629,34 @@ module.exports = } }; + EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = function(msg) { + var self = this; + switch (this._state) + { + case ConnectionState.Init: + msg.cb(new Error(util.format("EventStoreConnection '%s' is not active.", this._esConnection.connectionName))); + break; + case ConnectionState.Connecting: + case ConnectionState.Connected: + var operation = new ConnectToPersistentSubscriptionOperation(this._settings.log, msg.cb, msg.subscriptionId, + msg.bufferSize, msg.streamId, msg.userCredentials, msg.eventAppeared, msg.subscriptionDropped, + this._settings.verboseLogging, function() { return self._connection }); + this._logDebug("StartSubscription %s %s, %s, %d, %d.", + this._state === ConnectionState.Connected ? "fire" : "enqueue", + operation.constructor.name, operation, msg.maxRetries, msg.timeout); + var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); + if (this._state === ConnectionState.Connecting) + this._subscriptions.enqueueSubscription(subscription); + else + this._subscriptions.startSubscription(subscription, this._connection); + break; + case ConnectionState.Closed: + msg.cb(new Error("Connection closed. " + this._esConnection.connectionName)); + break; + default: throw new Error(util.format("Unknown state: %s.", this._state)); + } + }; + EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(endPoints) { var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint; if (endPoint == null) @@ -1578,7 +1770,7 @@ module.exports = this._state = ConnectionState.Connecting; this._connectingPhase = ConnectingPhase.Reconnecting; - this._logDebug("TCP connection to [%j, L%j, %s] closed.", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId); + this._logDebug("TCP connection to [%j, L%j, %s] closed. %s", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId, error); this._subscriptions.purgeSubscribedAndDroppedSubscriptions(this._connection.connectionId); this._reconnInfo = { @@ -2464,6 +2656,17 @@ module.exports = StreamEventAppeared: 0xC2, UnsubscribeFromStream: 0xC3, SubscriptionDropped: 0xC4, + ConnectToPersistentSubscription: 0xC5, + PersistentSubscriptionConfirmation: 0xC6, + PersistentSubscriptionStreamEventAppeared: 0xC7, + CreatePersistentSubscription: 0xC8, + CreatePersistentSubscriptionCompleted: 0xC9, + DeletePersistentSubscription: 0xCA, + DeletePersistentSubscriptionCompleted: 0xCB, + PersistentSubscriptionAckEvents: 0xCC, + PersistentSubscriptionNakEvents: 0xCD, + UpdatePersistentSubscription: 0xCE, + UpdatePersistentSubscriptionCompleted: 0xCF, ScavengeDatabase: 0xD0, ScavengeDatabaseCompleted: 0xD1, @@ -2896,6 +3099,9 @@ module.exports = CatchUpError: 'catchUpError', ConnectionClosed: 'connectionClosed', EventHandlerException: 'eventHandlerException', + MaxSubscribersReached: 'maxSubscribersReached', + NotFound: 'notFound', + PersistentSubscriptionDeleted: 'persistentSubscriptionDeleted', ProcessingQueueOverflow: 'processingQueueOverflow', ServerError: 'serverError', SubscribingError: 'subscribingError', @@ -2937,7 +3143,7 @@ module.exports = this._correlationId, this._userCredentials != null ? this._userCredentials.username : null, this._userCredentials != null ? this._userCredentials.password : null, - new BufferSegment(dto.encode().toBuffer())); + new BufferSegment(dto.toBuffer())); }; VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) { @@ -3010,6 +3216,10 @@ module.exports = this._actionQueue = []; } + SubscriptionOperation.prototype._enqueueSend = function(pkg) { + this._getConnection().enqueueSend(pkg); + }; + SubscriptionOperation.prototype.subscribe = function(correlationId, connection) { if (connection === null) throw new TypeError("connection is null."); @@ -3031,7 +3241,7 @@ module.exports = SubscriptionOperation.prototype._createUnsubscriptionPackage = function() { var msg = new ClientMessage.UnsubscribeFromStream(); - var data = new BufferSegment(msg.encode().toBuffer()); + var data = new BufferSegment(msg.toBuffer()); return new TcpPackage(TcpCommand.UnsubscribeFromStream, TcpFlags.None, this._correlationId, null, null, data); }; @@ -3153,11 +3363,12 @@ module.exports = this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...", this._correlationId, this._streamId || "", reason, err); - if (reason !== SubscriptionDropReason.UserInitiated) + if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null) { if (err === null) throw new Error(util.format("No exception provided for subscription drop reason '%s", reason)); //TODO: this should be last thing to execute this._cb(err); + return; } if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) @@ -3194,7 +3405,7 @@ module.exports = if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!"); if (this._verboseLogging) - this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %j).", + this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).", this._correlationId, this._streamId || "", e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); @@ -3221,7 +3432,7 @@ module.exports = } catch (err) { - this._log.error(err, "Exception during executing user callback: %s.", err.Message); + this._log.error(err, "Exception during executing user callback: %s.", err.message); } action = this._actionQueue.shift(); } @@ -3356,13 +3567,172 @@ module.exports = var util = __webpack_require__(4); var uuid = __webpack_require__(2); + var SubscriptionOperation = __webpack_require__(25); + var ClientMessage = __webpack_require__(28); + var TcpCommand = __webpack_require__(19); + var TcpFlags = __webpack_require__(18); + var TcpPackage = __webpack_require__(17); + var createBufferSegment = __webpack_require__(14); + var InspectionResult = __webpack_require__(27); + var InspectionDecision = __webpack_require__(26); + var results = __webpack_require__(3); + var SubscriptionDropReason = __webpack_require__(23); + var PersistentEventStoreSubscription = __webpack_require__(32); + var ensure = __webpack_require__(5); + + function ConnectToPersistentSubscriptionOperation( + log, cb, groupName, bufferSize, streamId, userCredentials, eventAppeared, subscriptionDropped, + verboseLogging, getConnection + ) { + SubscriptionOperation.call(this, log, cb, streamId, false, userCredentials, eventAppeared, subscriptionDropped, verboseLogging, getConnection); + + this._groupName = groupName; + this._bufferSize = bufferSize; + this._subscriptionId = null; + } + util.inherits(ConnectToPersistentSubscriptionOperation, SubscriptionOperation); + + ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = function() { + var dto = new ClientMessage.ConnectToPersistentSubscription(this._groupName, this._streamId, this._bufferSize); + return new TcpPackage(TcpCommand.ConnectToPersistentSubscription, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, + createBufferSegment(dto.toBuffer())); + }; + + ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) { + if (pkg.command == TcpCommand.PersistentSubscriptionConfirmation) + { + var dto = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer()); + this._confirmSubscription(dto.last_commit_position, dto.last_event_number); + this._subscriptionId = dto.subscription_id; + return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation"); + } + if (pkg.command == TcpCommand.PersistentSubscriptionStreamEventAppeared) + { + var dto = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer()); + this._onEventAppeared(new results.ResolvedEvent(dto.event)); + return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared"); + } + if (pkg.command == TcpCommand.SubscriptionDropped) + { + var dto = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer()); + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied) + { + this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound) + { + this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted) + { + this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached) + { + this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + this.dropSubscription(SubscriptionDropReason.UserInitiated, null, this._getConnection()); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + return null; + }; + + ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitPosition, lastEventNumber) { + return new PersistentEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber); + }; + + ConnectToPersistentSubscriptionOperation.prototype.notifyEventsProcessed = function(processedEvents) { + ensure.notNull(processedEvents, "processedEvents"); + var dto = new ClientMessage.PersistentSubscriptionAckEvents({ + subscription_id: this._subscriptionId, + processed_event_ids: processedEvents.map(function (x) { + return new Buffer(uuid.parse(x)); + }) + }); + + var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionAckEvents, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, + createBufferSegment(dto.encode().toBuffer())); + this._enqueueSend(pkg); + }; + + ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function(processedEvents, action, reason) { + ensure.notNull(processedEvents, "processedEvents"); + ensure.notNull(reason, "reason"); + var dto = new ClientMessage.PersistentSubscriptionNakEvents( + this._subscriptionId, + processedEvents.map(function(x) { return new Buffer(uuid.parse(x)); }), + reason, + action); + + var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionNakEvents, + this._userCredentials != null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials != null ? this._userCredentials.username : null, + this._userCredentials != null ? this._userCredentials.password : null, + createBufferSegment(dto.toBuffer())); + this._enqueueSend(pkg); + }; + + module.exports = ConnectToPersistentSubscriptionOperation; + + +/***/ }, +/* 32 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + + var EventStoreSubscription = __webpack_require__(30); + + + function PersistentEventStoreSubscription(subscriptionOperation, streamId, lastCommitPosition, lastEventNumber) { + EventStoreSubscription.call(this, streamId, lastCommitPosition, lastEventNumber); + + this._subscriptionOperation = subscriptionOperation; + } + util.inherits(PersistentEventStoreSubscription, EventStoreSubscription); + + PersistentEventStoreSubscription.prototype.unsubscribe = function() { + this._subscriptionOperation.unsubscribe(); + }; + + PersistentEventStoreSubscription.prototype.notifyEventsProcessed = function(processedEvents) { + this._subscriptionOperation.notifyEventsProcessed(processedEvents); + }; + + PersistentEventStoreSubscription.prototype.notifyEventsFailed = function(processedEvents, action, reason) { + this._subscriptionOperation.notifyEventsFailed(processedEvents, action, reason); + }; + + module.exports = PersistentEventStoreSubscription; + + +/***/ }, +/* 33 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + var uuid = __webpack_require__(2); + var TcpCommand = __webpack_require__(19); var InspectionDecision = __webpack_require__(26); var InspectionResult = __webpack_require__(27); var ClientMessage = __webpack_require__(28); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function DeleteStreamOperation(log, cb, requireMaster, stream, expectedVersion, hardDelete, userCredentials) { @@ -3421,7 +3791,7 @@ module.exports = module.exports = DeleteStreamOperation; /***/ }, -/* 32 */ +/* 34 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3477,7 +3847,7 @@ module.exports = OperationBase.prototype.createNetworkPackage = function(correlationId) { var dto = this._createRequestDto(); - var buf = dto.encode().toBuffer(); + var buf = dto.toBuffer(); return new TcpPackage( this._requestCommand, this.userCredentials ? TcpFlags.Authenticated : TcpFlags.None, @@ -3575,7 +3945,7 @@ module.exports = /***/ }, -/* 33 */ +/* 35 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3588,7 +3958,7 @@ module.exports = var WriteResult = __webpack_require__(3).WriteResult; var Position = __webpack_require__(3).Position; - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function AppendToStreamOperation(log, cb, requireMaster, stream, expectedVersion, events, userCredentials) { OperationBase.call(this, log, cb, TcpCommand.WriteEvents, TcpCommand.WriteEventsCompleted, userCredentials); @@ -3661,7 +4031,7 @@ module.exports = /***/ }, -/* 34 */ +/* 36 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3671,10 +4041,10 @@ module.exports = var InspectionDecision = __webpack_require__(26); var InspectionResult = __webpack_require__(27); var ClientMessage = __webpack_require__(28); - var EventStoreTransaction = __webpack_require__(35); + var EventStoreTransaction = __webpack_require__(37); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function StartTransactionOperation(log, cb, requireMaster, stream, expectedVersion, parentConnection, userCredentials) { OperationBase.call(this, log, cb, TcpCommand.TransactionStart, TcpCommand.TransactionStartCompleted, userCredentials); @@ -3733,7 +4103,7 @@ module.exports = /***/ }, -/* 35 */ +/* 37 */ /***/ function(module, exports) { /** @@ -3791,7 +4161,7 @@ module.exports = module.exports = EventStoreTransaction; /***/ }, -/* 36 */ +/* 38 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3802,7 +4172,7 @@ module.exports = var InspectionResult = __webpack_require__(27); var ClientMessage = __webpack_require__(28); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function TransactionalWriteOperation(log, cb, requireMaster, transactionId, events, userCredentials) { @@ -3859,7 +4229,7 @@ module.exports = /***/ }, -/* 37 */ +/* 39 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3871,7 +4241,7 @@ module.exports = var ClientMessage = __webpack_require__(28); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function CommitTransactionOperation(log, cb, requireMaster, transactionId, userCredentials) { @@ -3929,7 +4299,7 @@ module.exports = module.exports = CommitTransactionOperation; /***/ }, -/* 38 */ +/* 40 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -3940,7 +4310,7 @@ module.exports = var InspectionDecision = __webpack_require__(26); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function ReadEventOperation(log, cb, stream, eventNumber, resolveLinkTos, requireMaster, userCredentials) { OperationBase.call(this, log, cb, TcpCommand.ReadEvent, TcpCommand.ReadEventCompleted, userCredentials); @@ -4014,7 +4384,7 @@ module.exports = /***/ }, -/* 39 */ +/* 41 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -4022,13 +4392,13 @@ module.exports = var TcpCommand = __webpack_require__(19); var ClientMessage = __webpack_require__(28); - var ReadDirection = __webpack_require__(40); - var StatusCode = __webpack_require__(41); + var ReadDirection = __webpack_require__(42); + var StatusCode = __webpack_require__(43); var InspectionResult = __webpack_require__(27); var InspectionDecision = __webpack_require__(26); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function ReadStreamEventsForwardOperation( log, cb, stream, fromEventNumber, maxCount, resolveLinkTos, requireMaster, userCredentials @@ -4093,7 +4463,7 @@ module.exports = /***/ }, -/* 40 */ +/* 42 */ /***/ function(module, exports) { const ReadDirection = { @@ -4105,11 +4475,11 @@ module.exports = /***/ }, -/* 41 */ +/* 43 */ /***/ function(module, exports, __webpack_require__) { var ClientMessage = __webpack_require__(28); - var SliceReadStatus = __webpack_require__(42); + var SliceReadStatus = __webpack_require__(44); module.exports = {}; @@ -4127,7 +4497,7 @@ module.exports = }; /***/ }, -/* 42 */ +/* 44 */ /***/ function(module, exports) { const SliceReadStatus = { @@ -4140,7 +4510,7 @@ module.exports = /***/ }, -/* 43 */ +/* 45 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -4148,13 +4518,13 @@ module.exports = var TcpCommand = __webpack_require__(19); var ClientMessage = __webpack_require__(28); - var ReadDirection = __webpack_require__(40); - var StatusCode = __webpack_require__(41); + var ReadDirection = __webpack_require__(42); + var StatusCode = __webpack_require__(43); var InspectionResult = __webpack_require__(27); var InspectionDecision = __webpack_require__(26); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function ReadStreamEventsBackwardOperation( log, cb, stream, fromEventNumber, maxCount, resolveLinkTos, requireMaster, userCredentials @@ -4219,7 +4589,7 @@ module.exports = /***/ }, -/* 44 */ +/* 46 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -4227,12 +4597,12 @@ module.exports = var TcpCommand = __webpack_require__(19); var ClientMessage = __webpack_require__(28); - var ReadDirection = __webpack_require__(40); + var ReadDirection = __webpack_require__(42); var InspectionResult = __webpack_require__(27); var InspectionDecision = __webpack_require__(26); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function ReadAllEventsForwardOperation( log, cb, position, maxCount, resolveLinkTos, requireMaster, userCredentials @@ -4286,7 +4656,7 @@ module.exports = /***/ }, -/* 45 */ +/* 47 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -4294,12 +4664,12 @@ module.exports = var TcpCommand = __webpack_require__(19); var ClientMessage = __webpack_require__(28); - var ReadDirection = __webpack_require__(40); + var ReadDirection = __webpack_require__(42); var InspectionResult = __webpack_require__(27); var InspectionDecision = __webpack_require__(26); var results = __webpack_require__(3); - var OperationBase = __webpack_require__(32); + var OperationBase = __webpack_require__(34); function ReadAllEventsBackwardOperation( log, cb, position, maxCount, resolveLinkTos, requireMaster, userCredentials @@ -4353,13 +4723,247 @@ module.exports = /***/ }, -/* 46 */ +/* 48 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + var uuid = __webpack_require__(2); + + var ensure = __webpack_require__(5); + var OperationBase = __webpack_require__(34); + var TcpCommand = __webpack_require__(19); + var ClientMessage = __webpack_require__(28); + var SystemConsumerStrategies = __webpack_require__(49); + var InspectionDecision = __webpack_require__(26); + var InspectionResult = __webpack_require__(27); + var results = __webpack_require__(3); + + + function CreatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.CreatePersistentSubscription, TcpCommand.CreatePersistentSubscriptionCompleted, userCredentials); + + ensure.notNull(settings, "settings"); + this._resolveLinkTos = settings.resolveLinkTos; + this._stream = stream; + this._groupName = groupName; + this._startFromBeginning = settings.startFrom; + this._maxRetryCount = settings.maxRetryCount; + this._liveBufferSize = settings.liveBufferSize; + this._readBatchSize = settings.readBatchSize; + this._bufferSize = settings.historyBufferSize; + this._recordStatistics = settings.extraStatistics; + this._messageTimeoutMilliseconds = settings.messageTimeout; + this._checkPointAfter = settings.checkPointAfter; + this._minCheckPointCount = settings.minCheckPointCount; + this._maxCheckPointCount = settings.maxCheckPointCount; + this._maxSubscriberCount = settings.maxSubscriberCount; + this._namedConsumerStrategy = settings.namedConsumerStrategy; + + this._responseType = ClientMessage.CreatePersistentSubscriptionCompleted; + } + util.inherits(CreatePersistentSubscriptionOperation, OperationBase); + + CreatePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.CreatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, + this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, + this._readBatchSize, this._bufferSize, this._maxRetryCount, + this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); + }; + + CreatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AlreadyExists: + this.fail(new Error(util.format("Subscription group %s on stream %s already exists", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AlreadyExists"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } + }; + + CreatePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionCreateResult(results.PersistentSubscriptionCreateStatus.Success); + }; + + CreatePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); + }; + + module.exports = CreatePersistentSubscriptionOperation; + + +/***/ }, +/* 49 */ +/***/ function(module, exports) { + + const SystemConsumerStrategies = { + DispatchToSingle: 'DispatchToSingle', + RoundRobin: 'RoundRobin', + Pinned: 'Pinned' + }; + + module.exports = SystemConsumerStrategies; + + +/***/ }, +/* 50 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + var uuid = __webpack_require__(2); + + var ensure = __webpack_require__(5); + var OperationBase = __webpack_require__(34); + var TcpCommand = __webpack_require__(19); + var ClientMessage = __webpack_require__(28); + var SystemConsumerStrategies = __webpack_require__(49); + var InspectionDecision = __webpack_require__(26); + var InspectionResult = __webpack_require__(27); + var results = __webpack_require__(3); + + + function UpdatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.UpdatePersistentSubscription, TcpCommand.UpdatePersistentSubscriptionCompleted, userCredentials); + + ensure.notNull(settings, "settings"); + this._resolveLinkTos = settings.resolveLinkTos; + this._stream = stream; + this._groupName = groupName; + this._startFromBeginning = settings.startFrom; + this._maxRetryCount = settings.maxRetryCount; + this._liveBufferSize = settings.liveBufferSize; + this._readBatchSize = settings.readBatchSize; + this._bufferSize = settings.historyBufferSize; + this._recordStatistics = settings.extraStatistics; + this._messageTimeoutMilliseconds = settings.messageTimeout; + this._checkPointAfter = settings.checkPointAfter; + this._minCheckPointCount = settings.minCheckPointCount; + this._maxCheckPointCount = settings.maxCheckPointCount; + this._maxSubscriberCount = settings.maxSubscriberCount; + this._namedConsumerStrategy = settings.namedConsumerStrategy; + + this._responseType = ClientMessage.UpdatePersistentSubscriptionCompleted; + } + util.inherits(UpdatePersistentSubscriptionOperation, OperationBase); + + UpdatePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.UpdatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, + this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, + this._readBatchSize, this._bufferSize, this._maxRetryCount, + this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); + }; + + UpdatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.DoesNotExist: + this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } + }; + + UpdatePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionUpdateResult(results.PersistentSubscriptionUpdateStatus.Success); + }; + + UpdatePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); + }; + + module.exports = UpdatePersistentSubscriptionOperation; + + +/***/ }, +/* 51 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + var uuid = __webpack_require__(2); + + var ensure = __webpack_require__(5); + var OperationBase = __webpack_require__(34); + var TcpCommand = __webpack_require__(19); + var ClientMessage = __webpack_require__(28); + var InspectionDecision = __webpack_require__(26); + var InspectionResult = __webpack_require__(27); + var results = __webpack_require__(3); + + + function DeletePersistentSubscriptionOperation(log, cb, stream, groupName, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.DeletePersistentSubscription, TcpCommand.DeletePersistentSubscriptionCompleted, userCredentials); + + this._stream = stream; + this._groupName = groupName; + + this._responseType = ClientMessage.DeletePersistentSubscriptionCompleted; + } + util.inherits(DeletePersistentSubscriptionOperation, OperationBase); + + DeletePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.DeletePersistentSubscription(this._groupName, this._stream); + }; + + DeletePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.DoesNotExist: + this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } + }; + + DeletePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionDeleteResult(results.PersistentSubscriptionDeleteStatus.Success); + }; + + DeletePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); + }; + + module.exports = DeletePersistentSubscriptionOperation; + + +/***/ }, +/* 52 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); - var EventStoreCatchUpSubscription = __webpack_require__(47); - var SliceReadStatus = __webpack_require__(42); + var EventStoreCatchUpSubscription = __webpack_require__(53); + var SliceReadStatus = __webpack_require__(44); function EventStoreStreamCatchUpSubscription( connection, log, streamId, fromEventNumberExclusive, resolveLinkTos, userCredentials, @@ -4451,7 +5055,7 @@ module.exports = /***/ }, -/* 47 */ +/* 53 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); @@ -4709,12 +5313,12 @@ module.exports = module.exports = EventStoreCatchUpSubscription; /***/ }, -/* 48 */ +/* 54 */ /***/ function(module, exports, __webpack_require__) { var util = __webpack_require__(4); - var EventStoreCatchUpSubscription = __webpack_require__(47); + var EventStoreCatchUpSubscription = __webpack_require__(53); var results = __webpack_require__(3); @@ -4799,7 +5403,240 @@ module.exports = /***/ }, -/* 49 */ +/* 55 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + + var EventStorePersistentSubscriptionBase = __webpack_require__(56); + var messages = __webpack_require__(9); + + function EventStorePersistentSubscription( + subscriptionId, streamId, eventAppeared, subscriptionDropped, userCredentials, log, verboseLogging, settings, + handler, bufferSize, autoAck + ) { + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : !!autoAck; + + EventStorePersistentSubscriptionBase.call(this, subscriptionId, streamId, eventAppeared, subscriptionDropped, + userCredentials, log, verboseLogging, settings, bufferSize, autoAck); + + this._handler = handler; + } + util.inherits(EventStorePersistentSubscription, EventStorePersistentSubscriptionBase); + + EventStorePersistentSubscription.prototype._startSubscription = function( + subscriptionId, streamId, bufferSize, userCredentials, onEventAppeared, onSubscriptionDropped, settings + ) { + var self = this; + return new Promise(function(resolve, reject){ + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._handler.enqueueMessage(new messages.StartPersistentSubscriptionMessage(cb, subscriptionId, streamId, + bufferSize, userCredentials, onEventAppeared, onSubscriptionDropped, settings.maxRetries, + settings.operationTimeout)); + }); + }; + + module.exports = EventStorePersistentSubscription; + +/***/ }, +/* 56 */ +/***/ function(module, exports, __webpack_require__) { + + var util = __webpack_require__(4); + var ensure = __webpack_require__(5); + var PersistentSubscriptionNakEventAction = __webpack_require__(57); + var SubscriptionDropReason = __webpack_require__(23); + + function DropSubscriptionEvent() {} + + function EventStorePersistentSubscriptionBase( + subscriptionId, streamId, + eventAppeared, subscriptionDropped, + userCredentials, log, verboseLogging, settings, bufferSize, autoAck + ) { + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : autoAck; + + this._subscriptionId = subscriptionId; + this._streamId = streamId; + this._eventAppeared = eventAppeared; + this._subscriptionDropped = subscriptionDropped; + this._userCredentials = userCredentials; + this._log = log; + this._verbose = verboseLogging; + this._settings = settings; + this._bufferSize = bufferSize; + this._autoAck = autoAck; + + this._subscription = null; + this._dropData = null; + this._queue = []; + this._isProcessing = false; + this._isDropped = false; + } + + EventStorePersistentSubscriptionBase.prototype.start = function() { + this._stopped = false; + + var self = this; + this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials, + this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings) + .then(function(subscription) { + self._subscription = subscription; + }); + }; + + EventStorePersistentSubscriptionBase.prototype._startSubscription = function() { + throw new Error("EventStorePersistentSubscriptionBase._startSubscription abstract method called." + + this.constructor.name); + }; + + /** + * @param {ResolvedEvent[]|ResolvedEvent} events + */ + EventStorePersistentSubscriptionBase.prototype.acknowledge = function(events) { + ensure.notNull(events, "events"); + + if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); + if (!Array.isArray(events)) + events = [events]; + var ids = events.map(function(x) { return x.originalEvent.eventId; }); + this._subscription.notifyEventsProcessed(ids); + }; + + /** + * @param {ResolvedEvent[]|ResolvedEvent} events + * @param {number} action One of PersistentSubscriptionNakEventAction + * @param {string} reason + */ + EventStorePersistentSubscriptionBase.prototype.fail = function(events, action, reason) { + ensure.notNull(events, "events"); + PersistentSubscriptionNakEventAction.isValid(action); + ensure.notNull(reason, "reason"); + + if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); + if (!Array.isArray(events)) + events = [events]; + var ids = events.map(function(x) { return x.originalEvent.eventId; }); + this._subscription.notifyEventsFailed(ids, action, reason); + }; + + EventStorePersistentSubscriptionBase.prototype.stop = function() { + if (this._verbose) this._log.debug("Persistent Subscription to %s: requesting stop...", this._streamId); + this._enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null); + //TODO figure out timeout with Promise still running + //if (!_stopped.Wait(timeout)) + //throw new TimeoutException(string.Format("Could not stop {0} in time.", GetType().Name)); + }; + + EventStorePersistentSubscriptionBase.prototype._enqueueSubscriptionDropNotification = function(reason, error) { + // if drop data was already set -- no need to enqueue drop again, somebody did that already + if (!this._dropData) { + this._dropData = {reason: reason, error: error}; + this._enqueue(new DropSubscriptionEvent()); + } + }; + + EventStorePersistentSubscriptionBase.prototype._onSubscriptionDropped = function(subscription, reason, exception) { + this._enqueueSubscriptionDropNotification(reason, exception); + }; + + EventStorePersistentSubscriptionBase.prototype._onEventAppeared = function(subscription, resolvedEvent) { + this._enqueue(resolvedEvent); + }; + + EventStorePersistentSubscriptionBase.prototype._enqueue = function(resolvedEvent) { + this._queue.push(resolvedEvent); + if (!this._isProcessing) { + this._isProcessing = true; + setImmediate(this._processQueue.bind(this)); + } + }; + + EventStorePersistentSubscriptionBase.prototype._processQueue = function() { + //do + //{ + var e = this._queue.shift(); + while (e) + { + if (e instanceof DropSubscriptionEvent) // drop subscription artificial ResolvedEvent + { + if (this._dropData === null) throw new Error("Drop reason not specified."); + this._dropSubscription(this._dropData.reason, this._dropData.error); + return; + } + if (this._dropData !== null) + { + this._dropSubscription(this._dropData.reason, this._dropData.error); + return; + } + try + { + this._eventAppeared(this, e); + if(this._autoAck) + this._subscription.notifyEventsProcessed([e.originalEvent.eventId]); + if (this._verbose) + this._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", + this._streamId, e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, + e.originalEventNumber); + } + catch (err) + { + //TODO GFY should we autonak here? + this._dropSubscription(SubscriptionDropReason.EventHandlerException, err); + return; + } + e = this._queue.shift(); + } + this._isProcessing = false; + //} while (_queue.Count > 0 && Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0); + }; + + EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reason, error) { + if (!this._isDropped) + { + this._isDropped = true; + if (this._verbose) + this._log.debug("Persistent Subscription to %s: dropping subscription, reason: %s %s.", + this._streamId, reason, error); + + if (this._subscription !== null) + this._subscription.unsubscribe(); + if (this._subscriptionDropped !== null) + this._subscriptionDropped(this, reason, error); + this._stopped = true; + } + }; + + module.exports = EventStorePersistentSubscriptionBase; + + +/***/ }, +/* 57 */ +/***/ function(module, exports) { + + const PersistentSubscriptionNakEventAction = { + Unknown: 0, + Park: 1, + Retry: 2, + Skip: 3, + Stop: 4 + }; + + module.exports = PersistentSubscriptionNakEventAction; + module.exports.isValid = function(value) { + for(var k in PersistentSubscriptionNakEventAction) + if (PersistentSubscriptionNakEventAction[k] === value) return true; + return false; + }; + + +/***/ }, +/* 58 */ /***/ function(module, exports) { module.exports.metastreamOf = function(stream) { @@ -4810,7 +5647,7 @@ module.exports = }; /***/ }, -/* 50 */ +/* 59 */ /***/ function(module, exports) { const SystemEventTypes = { @@ -4821,7 +5658,7 @@ module.exports = /***/ }, -/* 51 */ +/* 60 */ /***/ function(module, exports) { function StaticEndpointDiscoverer(tcpEndPoint, useSsl) { @@ -4838,7 +5675,7 @@ module.exports = module.exports = StaticEndpointDiscoverer; /***/ }, -/* 52 */ +/* 61 */ /***/ function(module, exports) { function NoopLogger() { @@ -4850,7 +5687,7 @@ module.exports = module.exports = NoopLogger; /***/ }, -/* 53 */ +/* 62 */ /***/ function(module, exports, __webpack_require__) { var ensure = __webpack_require__(5); @@ -4874,5 +5711,36 @@ module.exports = module.exports = UserCredentials; +/***/ }, +/* 63 */ +/***/ function(module, exports, __webpack_require__) { + + var SystemConsumerStrategies = __webpack_require__(49); + + function PersistentSubscriptionSettings( + resolveLinkTos, startFrom, extraStatistics, messageTimeout, + maxRetryCount, liveBufferSize, readBatchSize, historyBufferSize, + checkPointAfter, minCheckPointCount, maxCheckPointCount, + maxSubscriberCount, namedConsumerStrategy + ) { + this.resolveLinkTos = resolveLinkTos; + this.startFrom = startFrom; + this.extraStatistics = extraStatistics; + this.messageTimeout = messageTimeout; + this.maxRetryCount = maxRetryCount; + this.liveBufferSize = liveBufferSize; + this.readBatchSize = readBatchSize; + this.historyBufferSize = historyBufferSize; + this.checkPointAfter = checkPointAfter; + this.minCheckPointCount = minCheckPointCount; + this.maxCheckPointCount = maxCheckPointCount; + this.maxSubscriberCount = maxSubscriberCount; + this.namedConsumerStrategy = namedConsumerStrategy; + } + + module.exports.create = function() { + return new PersistentSubscriptionSettings(false, -1, false, 30000, 500, 500, 10, 20, 2000, 10, 1000, 0, SystemConsumerStrategies.RoundRobin); + }; + /***/ } /******/ ]); \ No newline at end of file diff --git a/src/client.js b/src/client.js index 0ff0c30..c401155 100644 --- a/src/client.js +++ b/src/client.js @@ -41,6 +41,8 @@ function eventDataFactory(eventId, type, isJson, data, metadata) { module.exports.EventStoreConnection = require('./eventStoreConnection'); module.exports.UserCredentials = require('./systemData/userCredentials'); module.exports.EventData = EventData; +module.exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings'); +module.exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); module.exports.expectedVersion = expectedVersion; module.exports.positions = positions; diff --git a/src/clientOperations/connectToPersistentSubscriptionOperation.js b/src/clientOperations/connectToPersistentSubscriptionOperation.js new file mode 100644 index 0000000..294efdf --- /dev/null +++ b/src/clientOperations/connectToPersistentSubscriptionOperation.js @@ -0,0 +1,122 @@ +var util = require('util'); +var uuid = require('uuid'); + +var SubscriptionOperation = require('./subscriptionOperation'); +var ClientMessage = require('../messages/clientMessage'); +var TcpCommand = require('../systemData/tcpCommand'); +var TcpFlags = require('../systemData/tcpFlags'); +var TcpPackage = require('../systemData/tcpPackage'); +var createBufferSegment = require('../common/bufferSegment'); +var InspectionResult = require('./../systemData/inspectionResult'); +var InspectionDecision = require('../systemData/inspectionDecision'); +var results = require('../results'); +var SubscriptionDropReason = require('../subscriptionDropReason'); +var PersistentEventStoreSubscription = require('../persistentEventStoreSubscription'); +var ensure = require('../common/utils/ensure'); + +function ConnectToPersistentSubscriptionOperation( + log, cb, groupName, bufferSize, streamId, userCredentials, eventAppeared, subscriptionDropped, + verboseLogging, getConnection +) { + SubscriptionOperation.call(this, log, cb, streamId, false, userCredentials, eventAppeared, subscriptionDropped, verboseLogging, getConnection); + + this._groupName = groupName; + this._bufferSize = bufferSize; + this._subscriptionId = null; +} +util.inherits(ConnectToPersistentSubscriptionOperation, SubscriptionOperation); + +ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionPackage = function() { + var dto = new ClientMessage.ConnectToPersistentSubscription(this._groupName, this._streamId, this._bufferSize); + return new TcpPackage(TcpCommand.ConnectToPersistentSubscription, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, + createBufferSegment(dto.toBuffer())); +}; + +ConnectToPersistentSubscriptionOperation.prototype._inspectPackage = function(pkg) { + if (pkg.command == TcpCommand.PersistentSubscriptionConfirmation) + { + var dto = ClientMessage.PersistentSubscriptionConfirmation.decode(pkg.data.toBuffer()); + this._confirmSubscription(dto.last_commit_position, dto.last_event_number); + this._subscriptionId = dto.subscription_id; + return new InspectionResult(InspectionDecision.Subscribed, "SubscriptionConfirmation"); + } + if (pkg.command == TcpCommand.PersistentSubscriptionStreamEventAppeared) + { + var dto = ClientMessage.PersistentSubscriptionStreamEventAppeared.decode(pkg.data.toBuffer()); + this._onEventAppeared(new results.ResolvedEvent(dto.event)); + return new InspectionResult(InspectionDecision.DoNothing, "StreamEventAppeared"); + } + if (pkg.command == TcpCommand.SubscriptionDropped) + { + var dto = ClientMessage.SubscriptionDropped.decode(pkg.data.toBuffer()); + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.AccessDenied) + { + this.dropSubscription(SubscriptionDropReason.AccessDenied, new Error("You do not have access to the stream.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.NotFound) + { + this.dropSubscription(SubscriptionDropReason.NotFound, new Error("Subscription not found")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.PersistentSubscriptionDeleted) + { + this.dropSubscription(SubscriptionDropReason.PersistentSubscriptionDeleted, new Error("Persistent subscription deleted.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + if (dto.reason == ClientMessage.SubscriptionDropped.SubscriptionDropReason.SubscriberMaxCountReached) + { + this.dropSubscription(SubscriptionDropReason.MaxSubscribersReached, new Error("Maximum subscribers reached.")); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + this.dropSubscription(SubscriptionDropReason.UserInitiated, null, this._getConnection()); + return new InspectionResult(InspectionDecision.EndOperation, "SubscriptionDropped"); + } + return null; +}; + +ConnectToPersistentSubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitPosition, lastEventNumber) { + return new PersistentEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber); +}; + +ConnectToPersistentSubscriptionOperation.prototype.notifyEventsProcessed = function(processedEvents) { + ensure.notNull(processedEvents, "processedEvents"); + var dto = new ClientMessage.PersistentSubscriptionAckEvents({ + subscription_id: this._subscriptionId, + processed_event_ids: processedEvents.map(function (x) { + return new Buffer(uuid.parse(x)); + }) + }); + + var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionAckEvents, + this._userCredentials !== null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials !== null ? this._userCredentials.username : null, + this._userCredentials !== null ? this._userCredentials.password : null, + createBufferSegment(dto.encode().toBuffer())); + this._enqueueSend(pkg); +}; + +ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function(processedEvents, action, reason) { + ensure.notNull(processedEvents, "processedEvents"); + ensure.notNull(reason, "reason"); + var dto = new ClientMessage.PersistentSubscriptionNakEvents( + this._subscriptionId, + processedEvents.map(function(x) { return new Buffer(uuid.parse(x)); }), + reason, + action); + + var pkg = new TcpPackage(TcpCommand.PersistentSubscriptionNakEvents, + this._userCredentials != null ? TcpFlags.Authenticated : TcpFlags.None, + this._correlationId, + this._userCredentials != null ? this._userCredentials.username : null, + this._userCredentials != null ? this._userCredentials.password : null, + createBufferSegment(dto.toBuffer())); + this._enqueueSend(pkg); +}; + +module.exports = ConnectToPersistentSubscriptionOperation; diff --git a/src/clientOperations/createPersistentSubscriptionOperation.js b/src/clientOperations/createPersistentSubscriptionOperation.js new file mode 100644 index 0000000..3944120 --- /dev/null +++ b/src/clientOperations/createPersistentSubscriptionOperation.js @@ -0,0 +1,74 @@ +var util = require('util'); +var uuid = require('uuid'); + +var ensure = require('../common/utils/ensure'); +var OperationBase = require('../clientOperations/operationBase'); +var TcpCommand = require('../systemData/tcpCommand'); +var ClientMessage = require('../messages/clientMessage'); +var SystemConsumerStrategies = require('../systemConsumerStrategies'); +var InspectionDecision = require('../systemData/inspectionDecision'); +var InspectionResult = require('./../systemData/inspectionResult'); +var results = require('../results'); + + +function CreatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.CreatePersistentSubscription, TcpCommand.CreatePersistentSubscriptionCompleted, userCredentials); + + ensure.notNull(settings, "settings"); + this._resolveLinkTos = settings.resolveLinkTos; + this._stream = stream; + this._groupName = groupName; + this._startFromBeginning = settings.startFrom; + this._maxRetryCount = settings.maxRetryCount; + this._liveBufferSize = settings.liveBufferSize; + this._readBatchSize = settings.readBatchSize; + this._bufferSize = settings.historyBufferSize; + this._recordStatistics = settings.extraStatistics; + this._messageTimeoutMilliseconds = settings.messageTimeout; + this._checkPointAfter = settings.checkPointAfter; + this._minCheckPointCount = settings.minCheckPointCount; + this._maxCheckPointCount = settings.maxCheckPointCount; + this._maxSubscriberCount = settings.maxSubscriberCount; + this._namedConsumerStrategy = settings.namedConsumerStrategy; + + this._responseType = ClientMessage.CreatePersistentSubscriptionCompleted; +} +util.inherits(CreatePersistentSubscriptionOperation, OperationBase); + +CreatePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.CreatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, + this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, + this._readBatchSize, this._bufferSize, this._maxRetryCount, + this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); +}; + +CreatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.CreatePersistentSubscriptionCompleted.CreatePersistentSubscriptionResult.AlreadyExists: + this.fail(new Error(util.format("Subscription group %s on stream %s already exists", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AlreadyExists"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } +}; + +CreatePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionCreateResult(results.PersistentSubscriptionCreateStatus.Success); +}; + +CreatePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); +}; + +module.exports = CreatePersistentSubscriptionOperation; diff --git a/src/clientOperations/deletePersistentSubscriptionOperation.js b/src/clientOperations/deletePersistentSubscriptionOperation.js new file mode 100644 index 0000000..285a42d --- /dev/null +++ b/src/clientOperations/deletePersistentSubscriptionOperation.js @@ -0,0 +1,55 @@ +var util = require('util'); +var uuid = require('uuid'); + +var ensure = require('../common/utils/ensure'); +var OperationBase = require('../clientOperations/operationBase'); +var TcpCommand = require('../systemData/tcpCommand'); +var ClientMessage = require('../messages/clientMessage'); +var InspectionDecision = require('../systemData/inspectionDecision'); +var InspectionResult = require('./../systemData/inspectionResult'); +var results = require('../results'); + + +function DeletePersistentSubscriptionOperation(log, cb, stream, groupName, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.DeletePersistentSubscription, TcpCommand.DeletePersistentSubscriptionCompleted, userCredentials); + + this._stream = stream; + this._groupName = groupName; + + this._responseType = ClientMessage.DeletePersistentSubscriptionCompleted; +} +util.inherits(DeletePersistentSubscriptionOperation, OperationBase); + +DeletePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.DeletePersistentSubscription(this._groupName, this._stream); +}; + +DeletePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.DeletePersistentSubscriptionCompleted.DeletePersistentSubscriptionResult.DoesNotExist: + this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } +}; + +DeletePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionDeleteResult(results.PersistentSubscriptionDeleteStatus.Success); +}; + +DeletePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); +}; + +module.exports = DeletePersistentSubscriptionOperation; diff --git a/src/clientOperations/operationBase.js b/src/clientOperations/operationBase.js index 2ab0b9f..1dc9197 100644 --- a/src/clientOperations/operationBase.js +++ b/src/clientOperations/operationBase.js @@ -51,7 +51,7 @@ OperationBase.prototype._succeed = function() { OperationBase.prototype.createNetworkPackage = function(correlationId) { var dto = this._createRequestDto(); - var buf = dto.encode().toBuffer(); + var buf = dto.toBuffer(); return new TcpPackage( this._requestCommand, this.userCredentials ? TcpFlags.Authenticated : TcpFlags.None, diff --git a/src/clientOperations/subscriptionOperation.js b/src/clientOperations/subscriptionOperation.js index 7e05be0..4d0468f 100644 --- a/src/clientOperations/subscriptionOperation.js +++ b/src/clientOperations/subscriptionOperation.js @@ -39,6 +39,10 @@ function SubscriptionOperation( this._actionQueue = []; } +SubscriptionOperation.prototype._enqueueSend = function(pkg) { + this._getConnection().enqueueSend(pkg); +}; + SubscriptionOperation.prototype.subscribe = function(correlationId, connection) { if (connection === null) throw new TypeError("connection is null."); @@ -60,7 +64,7 @@ SubscriptionOperation.prototype.unsubscribe = function() { SubscriptionOperation.prototype._createUnsubscriptionPackage = function() { var msg = new ClientMessage.UnsubscribeFromStream(); - var data = new BufferSegment(msg.encode().toBuffer()); + var data = new BufferSegment(msg.toBuffer()); return new TcpPackage(TcpCommand.UnsubscribeFromStream, TcpFlags.None, this._correlationId, null, null, data); }; @@ -182,11 +186,12 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...", this._correlationId, this._streamId || "", reason, err); - if (reason !== SubscriptionDropReason.UserInitiated) + if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null) { if (err === null) throw new Error(util.format("No exception provided for subscription drop reason '%s", reason)); //TODO: this should be last thing to execute this._cb(err); + return; } if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) @@ -223,7 +228,7 @@ SubscriptionOperation.prototype._onEventAppeared = function(e) { if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!"); if (this._verboseLogging) - this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %j).", + this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).", this._correlationId, this._streamId || "", e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); @@ -250,7 +255,7 @@ SubscriptionOperation.prototype._executeActions = function() { } catch (err) { - this._log.error(err, "Exception during executing user callback: %s.", err.Message); + this._log.error(err, "Exception during executing user callback: %s.", err.message); } action = this._actionQueue.shift(); } diff --git a/src/clientOperations/updatePersistentSubscriptionOperation.js b/src/clientOperations/updatePersistentSubscriptionOperation.js new file mode 100644 index 0000000..5b83571 --- /dev/null +++ b/src/clientOperations/updatePersistentSubscriptionOperation.js @@ -0,0 +1,74 @@ +var util = require('util'); +var uuid = require('uuid'); + +var ensure = require('../common/utils/ensure'); +var OperationBase = require('../clientOperations/operationBase'); +var TcpCommand = require('../systemData/tcpCommand'); +var ClientMessage = require('../messages/clientMessage'); +var SystemConsumerStrategies = require('../systemConsumerStrategies'); +var InspectionDecision = require('../systemData/inspectionDecision'); +var InspectionResult = require('./../systemData/inspectionResult'); +var results = require('../results'); + + +function UpdatePersistentSubscriptionOperation(log, cb, stream, groupName, settings, userCredentials) { + OperationBase.call(this, log, cb, TcpCommand.UpdatePersistentSubscription, TcpCommand.UpdatePersistentSubscriptionCompleted, userCredentials); + + ensure.notNull(settings, "settings"); + this._resolveLinkTos = settings.resolveLinkTos; + this._stream = stream; + this._groupName = groupName; + this._startFromBeginning = settings.startFrom; + this._maxRetryCount = settings.maxRetryCount; + this._liveBufferSize = settings.liveBufferSize; + this._readBatchSize = settings.readBatchSize; + this._bufferSize = settings.historyBufferSize; + this._recordStatistics = settings.extraStatistics; + this._messageTimeoutMilliseconds = settings.messageTimeout; + this._checkPointAfter = settings.checkPointAfter; + this._minCheckPointCount = settings.minCheckPointCount; + this._maxCheckPointCount = settings.maxCheckPointCount; + this._maxSubscriberCount = settings.maxSubscriberCount; + this._namedConsumerStrategy = settings.namedConsumerStrategy; + + this._responseType = ClientMessage.UpdatePersistentSubscriptionCompleted; +} +util.inherits(UpdatePersistentSubscriptionOperation, OperationBase); + +UpdatePersistentSubscriptionOperation.prototype._createRequestDto = function() { + return new ClientMessage.UpdatePersistentSubscription(this._groupName, this._stream, this._resolveLinkTos, + this._startFromBeginning, this._messageTimeoutMilliseconds, this._recordStatistics, this._liveBufferSize, + this._readBatchSize, this._bufferSize, this._maxRetryCount, + this._namedConsumerStrategy == SystemConsumerStrategies.RoundRobin, this._checkPointAfter, + this._maxCheckPointCount, this._minCheckPointCount, this._maxSubscriberCount, this._namedConsumerStrategy); +}; + +UpdatePersistentSubscriptionOperation.prototype._inspectResponse = function(response) { + switch (response.result) + { + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Success: + this._succeed(); + return new InspectionResult(InspectionDecision.EndOperation, "Success"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.Fail: + this.fail(new Error(util.format("Subscription group %s on stream %s failed '%s'", this._groupName, this._stream, response.reason))); + return new InspectionResult(InspectionDecision.EndOperation, "Fail"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.AccessDenied: + this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied"); + case ClientMessage.UpdatePersistentSubscriptionCompleted.UpdatePersistentSubscriptionResult.DoesNotExist: + this.fail(new Error(util.format("Subscription group %s on stream %s does not exist", this._groupName, this._stream))); + return new InspectionResult(InspectionDecision.EndOperation, "DoesNotExist"); + default: + throw new Error(util.format("Unexpected OperationResult: %s.", response.result)); + } +}; + +UpdatePersistentSubscriptionOperation.prototype._transformResponse = function(response) { + return new results.PersistentSubscriptionUpdateResult(results.PersistentSubscriptionUpdateStatus.Success); +}; + +UpdatePersistentSubscriptionOperation.prototype.toString = function() { + return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); +}; + +module.exports = UpdatePersistentSubscriptionOperation; diff --git a/src/clientOperations/volatileSubscriptionOperation.js b/src/clientOperations/volatileSubscriptionOperation.js index 99509fe..570ac17 100644 --- a/src/clientOperations/volatileSubscriptionOperation.js +++ b/src/clientOperations/volatileSubscriptionOperation.js @@ -26,7 +26,7 @@ VolatileSubscriptionOperation.prototype._createSubscriptionPackage = function() this._correlationId, this._userCredentials != null ? this._userCredentials.username : null, this._userCredentials != null ? this._userCredentials.password : null, - new BufferSegment(dto.encode().toBuffer())); + new BufferSegment(dto.toBuffer())); }; VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) { diff --git a/src/common/utils/ensure.js b/src/common/utils/ensure.js index dce0834..9876afe 100644 --- a/src/common/utils/ensure.js +++ b/src/common/utils/ensure.js @@ -4,3 +4,8 @@ module.exports.notNullOrEmpty = function(value, name) { if (value === '') throw new Error(name + " is empty."); }; + +module.exports.notNull = function(value, name) { + if (value === null) + throw new Error(name + " is null."); +}; \ No newline at end of file diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index 4a56818..6feffdd 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -7,6 +7,7 @@ var TcpPackageConnection = require('../transport/tcp/tcpPackageConnection'); var OperationsManager = require('./operationsManager'); var SubscriptionsManager = require('./subscriptionsManager'); var VolatileSubscriptionOperation = require('../clientOperations/volatileSubscriptionOperation'); +var ConnectToPersistentSubscriptionOperation = require('../clientOperations/connectToPersistentSubscriptionOperation'); var messages = require('./messages'); var TcpPackage = require('../systemData/tcpPackage'); @@ -70,6 +71,9 @@ function EventStoreConnectionLogicHandler(esConnection, settings) { this._queue.registerHandler(messages.StartSubscriptionMessage, function(msg) { self._startSubscription(msg); }); + this._queue.registerHandler(messages.StartPersistentSubscriptionMessage, function(msg) { + self._startPersistentSubscription(msg); + }); this._queue.registerHandler(messages.EstablishTcpConnectionMessage, function(msg) { self._establishTcpConnection(msg.endPoints); @@ -249,7 +253,7 @@ function createSubscriptionItem(operation, maxRetries, timeout) { isSubscribed: false }; subscriptionItem.toString = (function(){ - return util.format("Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %d, last updated: %d", + return util.format("Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %s, last updated: %s", this.operation.constructor.name, this.correlationId, this.operation, this.isSubscribed, this.retryCount, new Date(this.createdTime).toISOString().substr(11,12), new Date(this.lastUpdated).toISOString().substr(11,12)); @@ -269,7 +273,9 @@ EventStoreConnectionLogicHandler.prototype._startSubscription = function(msg) { var operation = new VolatileSubscriptionOperation(this._settings.log, msg.cb, msg.streamId, msg.resolveLinkTos, msg.userCredentials, msg.eventAppeared, msg.subscriptionDropped, this._settings.verboseLogging, function() { return self._connection }); - this._logDebug("StartSubscription %s %s, %s, %d, %d.", operation.constructor.name, operation, msg.maxRetries, msg.timeout, this._state === ConnectionState.Connected ? "fire" : "enqueue"); + this._logDebug("StartSubscription %s %s, %s, %d, %d.", + this._state === ConnectionState.Connected ? "fire" : "enqueue", + operation.constructor.name, operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); if (this._state === ConnectionState.Connecting) this._subscriptions.enqueueSubscription(subscription); @@ -284,6 +290,34 @@ EventStoreConnectionLogicHandler.prototype._startSubscription = function(msg) { } }; +EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = function(msg) { + var self = this; + switch (this._state) + { + case ConnectionState.Init: + msg.cb(new Error(util.format("EventStoreConnection '%s' is not active.", this._esConnection.connectionName))); + break; + case ConnectionState.Connecting: + case ConnectionState.Connected: + var operation = new ConnectToPersistentSubscriptionOperation(this._settings.log, msg.cb, msg.subscriptionId, + msg.bufferSize, msg.streamId, msg.userCredentials, msg.eventAppeared, msg.subscriptionDropped, + this._settings.verboseLogging, function() { return self._connection }); + this._logDebug("StartSubscription %s %s, %s, %d, %d.", + this._state === ConnectionState.Connected ? "fire" : "enqueue", + operation.constructor.name, operation, msg.maxRetries, msg.timeout); + var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); + if (this._state === ConnectionState.Connecting) + this._subscriptions.enqueueSubscription(subscription); + else + this._subscriptions.startSubscription(subscription, this._connection); + break; + case ConnectionState.Closed: + msg.cb(new Error("Connection closed. " + this._esConnection.connectionName)); + break; + default: throw new Error(util.format("Unknown state: %s.", this._state)); + } +}; + EventStoreConnectionLogicHandler.prototype._establishTcpConnection = function(endPoints) { var endPoint = this._settings.useSslConnection ? endPoints.secureTcpEndPoint : endPoints.tcpEndPoint; if (endPoint == null) @@ -397,7 +431,7 @@ EventStoreConnectionLogicHandler.prototype._tcpConnectionClosed = function(conne this._state = ConnectionState.Connecting; this._connectingPhase = ConnectingPhase.Reconnecting; - this._logDebug("TCP connection to [%j, L%j, %s] closed.", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId); + this._logDebug("TCP connection to [%j, L%j, %s] closed. %s", connection.remoteEndPoint, connection.localEndPoint, connection.connectionId, error); this._subscriptions.purgeSubscribedAndDroppedSubscriptions(this._connection.connectionId); this._reconnInfo = { diff --git a/src/core/messages.js b/src/core/messages.js index ef9e68b..9c5cedd 100644 --- a/src/core/messages.js +++ b/src/core/messages.js @@ -1,4 +1,5 @@ var util = require('util'); +var ensure = require('../common/utils/ensure'); function Message() { } @@ -76,6 +77,22 @@ util.inherits(TcpConnectionClosedMessage, Message); function TimerTickMessage() {} util.inherits(TimerTickMessage, Message); +function StartPersistentSubscriptionMessage( + cb, subscriptionId, streamId, bufferSize, userCredentials, eventAppeared, subscriptionDropped, + maxRetries, operationTimeout +) { + this.cb = cb; + this.subscriptionId = subscriptionId; + this.streamId = streamId; + this.bufferSize = bufferSize; + this.userCredentials = userCredentials; + this.eventAppeared = eventAppeared; + this.subscriptionDropped = subscriptionDropped; + this.maxRetries = maxRetries; + this.timeout = operationTimeout; +} +util.inherits(StartPersistentSubscriptionMessage, Message); + module.exports = { StartConnectionMessage: StartConnectionMessage, CloseConnectionMessage: CloseConnectionMessage, @@ -86,5 +103,6 @@ module.exports = { TcpConnectionErrorMessage: TcpConnectionErrorMessage, TcpConnectionEstablishedMessage: TcpConnectionEstablishedMessage, TcpConnectionClosedMessage: TcpConnectionClosedMessage, - TimerTickMessage: TimerTickMessage + TimerTickMessage: TimerTickMessage, + StartPersistentSubscriptionMessage: StartPersistentSubscriptionMessage }; diff --git a/src/eventStoreNodeConnection.js b/src/eventStoreNodeConnection.js index df9013a..a7a44fc 100644 --- a/src/eventStoreNodeConnection.js +++ b/src/eventStoreNodeConnection.js @@ -16,10 +16,14 @@ var ReadStreamEventsForwardOperation = require('./clientOperations/readStreamEve var ReadStreamEventsBackwardOperation = require('./clientOperations/readStreamEventsBackwardOperation'); var ReadAllEventsForwardOperation = require('./clientOperations/readAllEventsForwardOperation'); var ReadAllEventsBackwardOperation = require('./clientOperations/readAllEventsBackwardOperation'); +var CreatePersistentSubscriptionOperation = require('./clientOperations/createPersistentSubscriptionOperation'); +var UpdatePersistentSubscriptionOperation = require('./clientOperations/updatePersistentSubscriptionOperation'); +var DeletePersistentSubscriptionOperation = require('./clientOperations/deletePersistentSubscriptionOperation'); var EventStoreTransaction = require('./eventStoreTransaction'); var EventStoreStreamCatchUpSubscription = require('./eventStoreStreamCatchUpSubscription'); var EventStoreAllCatchUpSubscription = require('./eventStoreAllCatchUpSubscription'); +var EventStorePersistentSubscription = require('./eventStorePersistentSubscription'); var results = require('./results'); var systemStreams = require('./common/systemStreams'); @@ -83,7 +87,6 @@ EventStoreNodeConnection.prototype.close = function() { this._handler.enqueueMessage(new messages.CloseConnectionMessage("Connection close requested by client.", null)); }; -// --- Writing --- /** * Delete a stream (async) * @param {string} stream @@ -193,7 +196,6 @@ EventStoreNodeConnection.prototype.commitTransaction = function(transaction, use }); }; -// --- Reading --- /** * Read a single event (async) * @param {string} stream @@ -328,7 +330,6 @@ EventStoreNodeConnection.prototype.readAllEventsBackward = function( }); }; -// --- Subscriptions --- /** * Subscribe to a stream (async) * @param {!string} stream @@ -435,24 +436,99 @@ EventStoreNodeConnection.prototype.subscribeToAllFrom = function( return catchUpSubscription; }; -EventStoreNodeConnection.prototype.connectToPersistentSubscription = function() { - //TODO: connect to persistent subscription - throw new Error("Not implemented."); +/** + * Subscribe to a persistent subscription + * @param {string} stream + * @param {string} groupName + * @param {function} eventAppeared + * @param {function} [subscriptionDropped] + * @param {UserCredentials} [userCredentials] + * @param {number} [bufferSize] + * @param {boolean} [autoAck] + */ +EventStoreNodeConnection.prototype.connectToPersistentSubscription = function( + stream, groupName, eventAppeared, subscriptionDropped, userCredentials, bufferSize, autoAck +) { + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNull(eventAppeared, "eventAppeared"); + + subscriptionDropped = subscriptionDropped || null; + userCredentials = userCredentials || null; + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : !!autoAck; + + var subscription = new EventStorePersistentSubscription( + groupName, stream, eventAppeared, subscriptionDropped, userCredentials, this._settings.log, + this._settings.verboseLogging, this._settings, this._handler, bufferSize, autoAck); + subscription.start(); + + return subscription; }; -EventStoreNodeConnection.prototype.createPersistentSubscription = function() { - //TODO: create persistent subscription - throw new Error("Not implemented."); +/** + * @param {string} stream + * @param {string} groupName + * @param {PersistentSubscriptionSettings} settings + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ +EventStoreNodeConnection.prototype.createPersistentSubscription = function(stream, groupName, settings, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNull(settings, "settings"); + + var self = this; + return new Promise(function(resolve, reject){ + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new CreatePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, settings, userCredentials || null)); + }); }; -EventStoreNodeConnection.prototype.updatePersistentSubscription = function() { - //TODO: update persistent subscription - throw new Error("Not implemented."); +/** + * @param {string} stream + * @param {string} groupName + * @param {string} settings + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ +EventStoreNodeConnection.prototype.updatePersistentSubscription = function(stream, groupName, settings, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + ensure.notNull(settings, "settings"); + var self = this; + return new Promise(function(resolve, reject) { + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new UpdatePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, settings, userCredentials || null)); + }); }; -EventStoreNodeConnection.prototype.deletePersistentSubscription = function() { - //TODO: delete persistent subscription - throw new Error("Not implemented."); +/** + * @param {string} stream + * @param {string} groupName + * @param {UserCredentials} [userCredentials] + * @returns {Promise.} + */ +EventStoreNodeConnection.prototype.deletePersistentSubscription = function(stream, groupName, userCredentials) { + ensure.notNullOrEmpty(stream, "stream"); + ensure.notNullOrEmpty(groupName, "groupName"); + var self = this; + return new Promise(function(resolve, reject) { + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._enqueueOperation( + new DeletePersistentSubscriptionOperation(self._settings.log, cb, stream, groupName, userCredentials || null)); + }); }; EventStoreNodeConnection.prototype.setStreamMetadata = function() { diff --git a/src/eventStorePersistentSubscription.js b/src/eventStorePersistentSubscription.js new file mode 100644 index 0000000..278e761 --- /dev/null +++ b/src/eventStorePersistentSubscription.js @@ -0,0 +1,35 @@ +var util = require('util'); + +var EventStorePersistentSubscriptionBase = require('./eventStorePersistentSubscriptionBase'); +var messages = require('./core/messages'); + +function EventStorePersistentSubscription( + subscriptionId, streamId, eventAppeared, subscriptionDropped, userCredentials, log, verboseLogging, settings, + handler, bufferSize, autoAck +) { + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : !!autoAck; + + EventStorePersistentSubscriptionBase.call(this, subscriptionId, streamId, eventAppeared, subscriptionDropped, + userCredentials, log, verboseLogging, settings, bufferSize, autoAck); + + this._handler = handler; +} +util.inherits(EventStorePersistentSubscription, EventStorePersistentSubscriptionBase); + +EventStorePersistentSubscription.prototype._startSubscription = function( + subscriptionId, streamId, bufferSize, userCredentials, onEventAppeared, onSubscriptionDropped, settings +) { + var self = this; + return new Promise(function(resolve, reject){ + function cb(err, result) { + if (err) return reject(err); + resolve(result); + } + self._handler.enqueueMessage(new messages.StartPersistentSubscriptionMessage(cb, subscriptionId, streamId, + bufferSize, userCredentials, onEventAppeared, onSubscriptionDropped, settings.maxRetries, + settings.operationTimeout)); + }); +}; + +module.exports = EventStorePersistentSubscription; \ No newline at end of file diff --git a/src/eventStorePersistentSubscriptionBase.js b/src/eventStorePersistentSubscriptionBase.js new file mode 100644 index 0000000..a55cb29 --- /dev/null +++ b/src/eventStorePersistentSubscriptionBase.js @@ -0,0 +1,167 @@ +var util = require('util'); +var ensure = require('./common/utils/ensure'); +var PersistentSubscriptionNakEventAction = require('./persistentSubscriptionNakEventAction'); +var SubscriptionDropReason = require('./subscriptionDropReason'); + +function DropSubscriptionEvent() {} + +function EventStorePersistentSubscriptionBase( + subscriptionId, streamId, + eventAppeared, subscriptionDropped, + userCredentials, log, verboseLogging, settings, bufferSize, autoAck +) { + bufferSize = bufferSize === undefined ? 10 : bufferSize; + autoAck = autoAck === undefined ? true : autoAck; + + this._subscriptionId = subscriptionId; + this._streamId = streamId; + this._eventAppeared = eventAppeared; + this._subscriptionDropped = subscriptionDropped; + this._userCredentials = userCredentials; + this._log = log; + this._verbose = verboseLogging; + this._settings = settings; + this._bufferSize = bufferSize; + this._autoAck = autoAck; + + this._subscription = null; + this._dropData = null; + this._queue = []; + this._isProcessing = false; + this._isDropped = false; +} + +EventStorePersistentSubscriptionBase.prototype.start = function() { + this._stopped = false; + + var self = this; + this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials, + this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings) + .then(function(subscription) { + self._subscription = subscription; + }); +}; + +EventStorePersistentSubscriptionBase.prototype._startSubscription = function() { + throw new Error("EventStorePersistentSubscriptionBase._startSubscription abstract method called." + + this.constructor.name); +}; + +/** + * @param {ResolvedEvent[]|ResolvedEvent} events + */ +EventStorePersistentSubscriptionBase.prototype.acknowledge = function(events) { + ensure.notNull(events, "events"); + + if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); + if (!Array.isArray(events)) + events = [events]; + var ids = events.map(function(x) { return x.originalEvent.eventId; }); + this._subscription.notifyEventsProcessed(ids); +}; + +/** + * @param {ResolvedEvent[]|ResolvedEvent} events + * @param {number} action One of PersistentSubscriptionNakEventAction + * @param {string} reason + */ +EventStorePersistentSubscriptionBase.prototype.fail = function(events, action, reason) { + ensure.notNull(events, "events"); + PersistentSubscriptionNakEventAction.isValid(action); + ensure.notNull(reason, "reason"); + + if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); + if (!Array.isArray(events)) + events = [events]; + var ids = events.map(function(x) { return x.originalEvent.eventId; }); + this._subscription.notifyEventsFailed(ids, action, reason); +}; + +EventStorePersistentSubscriptionBase.prototype.stop = function() { + if (this._verbose) this._log.debug("Persistent Subscription to %s: requesting stop...", this._streamId); + this._enqueueSubscriptionDropNotification(SubscriptionDropReason.UserInitiated, null); + //TODO figure out timeout with Promise still running + //if (!_stopped.Wait(timeout)) + //throw new TimeoutException(string.Format("Could not stop {0} in time.", GetType().Name)); +}; + +EventStorePersistentSubscriptionBase.prototype._enqueueSubscriptionDropNotification = function(reason, error) { + // if drop data was already set -- no need to enqueue drop again, somebody did that already + if (!this._dropData) { + this._dropData = {reason: reason, error: error}; + this._enqueue(new DropSubscriptionEvent()); + } +}; + +EventStorePersistentSubscriptionBase.prototype._onSubscriptionDropped = function(subscription, reason, exception) { + this._enqueueSubscriptionDropNotification(reason, exception); +}; + +EventStorePersistentSubscriptionBase.prototype._onEventAppeared = function(subscription, resolvedEvent) { + this._enqueue(resolvedEvent); +}; + +EventStorePersistentSubscriptionBase.prototype._enqueue = function(resolvedEvent) { + this._queue.push(resolvedEvent); + if (!this._isProcessing) { + this._isProcessing = true; + setImmediate(this._processQueue.bind(this)); + } +}; + +EventStorePersistentSubscriptionBase.prototype._processQueue = function() { + //do + //{ + var e = this._queue.shift(); + while (e) + { + if (e instanceof DropSubscriptionEvent) // drop subscription artificial ResolvedEvent + { + if (this._dropData === null) throw new Error("Drop reason not specified."); + this._dropSubscription(this._dropData.reason, this._dropData.error); + return; + } + if (this._dropData !== null) + { + this._dropSubscription(this._dropData.reason, this._dropData.error); + return; + } + try + { + this._eventAppeared(this, e); + if(this._autoAck) + this._subscription.notifyEventsProcessed([e.originalEvent.eventId]); + if (this._verbose) + this._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", + this._streamId, e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, + e.originalEventNumber); + } + catch (err) + { + //TODO GFY should we autonak here? + this._dropSubscription(SubscriptionDropReason.EventHandlerException, err); + return; + } + e = this._queue.shift(); + } + this._isProcessing = false; + //} while (_queue.Count > 0 && Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0); +}; + +EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reason, error) { + if (!this._isDropped) + { + this._isDropped = true; + if (this._verbose) + this._log.debug("Persistent Subscription to %s: dropping subscription, reason: %s %s.", + this._streamId, reason, error); + + if (this._subscription !== null) + this._subscription.unsubscribe(); + if (this._subscriptionDropped !== null) + this._subscriptionDropped(this, reason, error); + this._stopped = true; + } +}; + +module.exports = EventStorePersistentSubscriptionBase; diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 1cbc99a..e0248f2 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -35,12 +35,12 @@ message EventRecord { } message ResolvedIndexedEvent { - optional EventRecord event = 1; + required EventRecord event = 1; optional EventRecord link = 2; } message ResolvedEvent { - optional EventRecord event = 1; + required EventRecord event = 1; optional EventRecord link = 2; required int64 commit_position = 3; required int64 prepare_position = 4; @@ -195,6 +195,119 @@ message ReadAllEventsCompleted { optional string error = 7; } +message CreatePersistentSubscription { + required string subscription_group_name = 1; + required string event_stream_id = 2; + required bool resolve_link_tos = 3; + required int32 start_from = 4; + required int32 message_timeout_milliseconds = 5; + required bool record_statistics = 6; + required int32 live_buffer_size = 7; + required int32 read_batch_size = 8; + required int32 buffer_size = 9; + required int32 max_retry_count = 10; + required bool prefer_round_robin = 11; + required int32 checkpoint_after_time = 12; + required int32 checkpoint_max_count = 13; + required int32 checkpoint_min_count = 14; + required int32 subscriber_max_count = 15; + optional string named_consumer_strategy = 16; +} + +message DeletePersistentSubscription { + required string subscription_group_name = 1; + required string event_stream_id = 2; +} + +message UpdatePersistentSubscription { + required string subscription_group_name = 1; + required string event_stream_id = 2; + required bool resolve_link_tos = 3; + required int32 start_from = 4; + required int32 message_timeout_milliseconds = 5; + required bool record_statistics = 6; + required int32 live_buffer_size = 7; + required int32 read_batch_size = 8; + required int32 buffer_size = 9; + required int32 max_retry_count = 10; + required bool prefer_round_robin = 11; + required int32 checkpoint_after_time = 12; + required int32 checkpoint_max_count = 13; + required int32 checkpoint_min_count = 14; + required int32 subscriber_max_count = 15; + optional string named_consumer_strategy = 16; +} + +message UpdatePersistentSubscriptionCompleted { + enum UpdatePersistentSubscriptionResult { + Success = 0; + DoesNotExist = 1; + Fail = 2; + AccessDenied=3; + } + required UpdatePersistentSubscriptionResult result = 1 [default = Success]; + optional string reason = 2; +} + +message CreatePersistentSubscriptionCompleted { + enum CreatePersistentSubscriptionResult { + Success = 0; + AlreadyExists = 1; + Fail = 2; + AccessDenied=3; + } + required CreatePersistentSubscriptionResult result = 1 [default = Success]; + optional string reason = 2; +} + +message DeletePersistentSubscriptionCompleted { + enum DeletePersistentSubscriptionResult { + Success = 0; + DoesNotExist = 1; + Fail = 2; + AccessDenied = 3; + } + required DeletePersistentSubscriptionResult result = 1 [default = Success]; + optional string reason = 2; +} + +message ConnectToPersistentSubscription { + required string subscription_id = 1; + required string event_stream_id = 2; + required int32 allowed_in_flight_messages = 3; + +} + +message PersistentSubscriptionAckEvents { + required string subscription_id = 1; + repeated bytes processed_event_ids = 2; +} + +message PersistentSubscriptionNakEvents { + enum NakAction { + Unknown = 0; + Park = 1; + Retry = 2; + Skip = 3; + Stop = 4; + } + + required string subscription_id = 1; + repeated bytes processed_event_ids = 2; + optional string message = 3; + required NakAction action = 4 [default = Unknown]; +} + +message PersistentSubscriptionConfirmation { + required int64 last_commit_position = 1; + required string subscription_id = 2; + optional int32 last_event_number = 3; +} + +message PersistentSubscriptionStreamEventAppeared { + required ResolvedIndexedEvent event = 1; +} + message SubscribeToStream { required string event_stream_id = 1; required bool resolve_link_tos = 2; @@ -217,6 +330,9 @@ message SubscriptionDropped { enum SubscriptionDropReason { Unsubscribed = 0; AccessDenied = 1; + NotFound=2; + PersistentSubscriptionDeleted=3; + SubscriberMaxCountReached=4; } optional SubscriptionDropReason reason = 1 [default = Unsubscribed]; @@ -258,4 +374,4 @@ message ScavengeDatabaseCompleted { optional string error = 2; required int32 total_time_ms = 3; required int64 total_space_saved = 4; -} \ No newline at end of file +} diff --git a/src/persistentEventStoreSubscription.js b/src/persistentEventStoreSubscription.js new file mode 100644 index 0000000..6cb71a0 --- /dev/null +++ b/src/persistentEventStoreSubscription.js @@ -0,0 +1,25 @@ +var util = require('util'); + +var EventStoreSubscription = require('./eventStoreSubscription'); + + +function PersistentEventStoreSubscription(subscriptionOperation, streamId, lastCommitPosition, lastEventNumber) { + EventStoreSubscription.call(this, streamId, lastCommitPosition, lastEventNumber); + + this._subscriptionOperation = subscriptionOperation; +} +util.inherits(PersistentEventStoreSubscription, EventStoreSubscription); + +PersistentEventStoreSubscription.prototype.unsubscribe = function() { + this._subscriptionOperation.unsubscribe(); +}; + +PersistentEventStoreSubscription.prototype.notifyEventsProcessed = function(processedEvents) { + this._subscriptionOperation.notifyEventsProcessed(processedEvents); +}; + +PersistentEventStoreSubscription.prototype.notifyEventsFailed = function(processedEvents, action, reason) { + this._subscriptionOperation.notifyEventsFailed(processedEvents, action, reason); +}; + +module.exports = PersistentEventStoreSubscription; diff --git a/src/persistentSubscriptionNakEventAction.js b/src/persistentSubscriptionNakEventAction.js new file mode 100644 index 0000000..57fe178 --- /dev/null +++ b/src/persistentSubscriptionNakEventAction.js @@ -0,0 +1,14 @@ +const PersistentSubscriptionNakEventAction = { + Unknown: 0, + Park: 1, + Retry: 2, + Skip: 3, + Stop: 4 +}; + +module.exports = PersistentSubscriptionNakEventAction; +module.exports.isValid = function(value) { + for(var k in PersistentSubscriptionNakEventAction) + if (PersistentSubscriptionNakEventAction[k] === value) return true; + return false; +}; diff --git a/src/persistentSubscriptionSettings.js b/src/persistentSubscriptionSettings.js new file mode 100644 index 0000000..9ab5185 --- /dev/null +++ b/src/persistentSubscriptionSettings.js @@ -0,0 +1,26 @@ +var SystemConsumerStrategies = require('./systemConsumerStrategies'); + +function PersistentSubscriptionSettings( + resolveLinkTos, startFrom, extraStatistics, messageTimeout, + maxRetryCount, liveBufferSize, readBatchSize, historyBufferSize, + checkPointAfter, minCheckPointCount, maxCheckPointCount, + maxSubscriberCount, namedConsumerStrategy +) { + this.resolveLinkTos = resolveLinkTos; + this.startFrom = startFrom; + this.extraStatistics = extraStatistics; + this.messageTimeout = messageTimeout; + this.maxRetryCount = maxRetryCount; + this.liveBufferSize = liveBufferSize; + this.readBatchSize = readBatchSize; + this.historyBufferSize = historyBufferSize; + this.checkPointAfter = checkPointAfter; + this.minCheckPointCount = minCheckPointCount; + this.maxCheckPointCount = maxCheckPointCount; + this.maxSubscriberCount = maxSubscriberCount; + this.namedConsumerStrategy = namedConsumerStrategy; +} + +module.exports.create = function() { + return new PersistentSubscriptionSettings(false, -1, false, 30000, 500, 500, 10, 20, 2000, 10, 1000, 0, SystemConsumerStrategies.RoundRobin); +}; \ No newline at end of file diff --git a/src/results.js b/src/results.js index 8db25a3..66e0778 100644 --- a/src/results.js +++ b/src/results.js @@ -285,6 +285,57 @@ function RawStreamMetadataResult(stream, isStreamDeleted, metastreamVersion, str }); } +const PersistentSubscriptionCreateStatus = { + Success: 'success', + NotFound: 'notFound', + Failure: 'failure' +}; + +/** + * @param {string} status + * @constructor + * @property {string} status + */ +function PersistentSubscriptionCreateResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); +} + +const PersistentSubscriptionUpdateStatus = { + Success: 'success', + NotFound: 'notFound', + Failure: 'failure', + AccessDenied: 'accessDenied' +}; + +/** + * @param {string} status + * @constructor + * @property {string} status + */ +function PersistentSubscriptionUpdateResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); +} + +const PersistentSubscriptionDeleteStatus = { + Success: 'success', + Failure: 'failure' +}; + +/** + * @param {string} status + * @constructor + * @property {string} status + */ +function PersistentSubscriptionDeleteResult(status) { + Object.defineProperties(this, { + status: {enumerable: true, value: status} + }); +} + // Exports Constructors module.exports.Position = Position; module.exports.toNumber = toNumber; @@ -295,4 +346,10 @@ module.exports.WriteResult = WriteResult; module.exports.StreamEventsSlice = StreamEventsSlice; module.exports.AllEventsSlice = AllEventsSlice; module.exports.DeleteResult = DeleteResult; -module.exports.RawStreamMetadataResult = RawStreamMetadataResult; \ No newline at end of file +module.exports.RawStreamMetadataResult = RawStreamMetadataResult; +module.exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult; +module.exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus; +module.exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult; +module.exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus; +module.exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult; +module.exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus; diff --git a/src/subscriptionDropReason.js b/src/subscriptionDropReason.js index 68fc3bb..2aafc92 100644 --- a/src/subscriptionDropReason.js +++ b/src/subscriptionDropReason.js @@ -3,6 +3,9 @@ const SubscriptionDropReason = { CatchUpError: 'catchUpError', ConnectionClosed: 'connectionClosed', EventHandlerException: 'eventHandlerException', + MaxSubscribersReached: 'maxSubscribersReached', + NotFound: 'notFound', + PersistentSubscriptionDeleted: 'persistentSubscriptionDeleted', ProcessingQueueOverflow: 'processingQueueOverflow', ServerError: 'serverError', SubscribingError: 'subscribingError', diff --git a/src/systemConsumerStrategies.js b/src/systemConsumerStrategies.js new file mode 100644 index 0000000..044a5ef --- /dev/null +++ b/src/systemConsumerStrategies.js @@ -0,0 +1,7 @@ +const SystemConsumerStrategies = { + DispatchToSingle: 'DispatchToSingle', + RoundRobin: 'RoundRobin', + Pinned: 'Pinned' +}; + +module.exports = SystemConsumerStrategies; diff --git a/src/systemData/tcpCommand.js b/src/systemData/tcpCommand.js index 23df1f6..7d68b32 100644 --- a/src/systemData/tcpCommand.js +++ b/src/systemData/tcpCommand.js @@ -52,6 +52,17 @@ const TcpCommand = { StreamEventAppeared: 0xC2, UnsubscribeFromStream: 0xC3, SubscriptionDropped: 0xC4, + ConnectToPersistentSubscription: 0xC5, + PersistentSubscriptionConfirmation: 0xC6, + PersistentSubscriptionStreamEventAppeared: 0xC7, + CreatePersistentSubscription: 0xC8, + CreatePersistentSubscriptionCompleted: 0xC9, + DeletePersistentSubscription: 0xCA, + DeletePersistentSubscriptionCompleted: 0xCB, + PersistentSubscriptionAckEvents: 0xCC, + PersistentSubscriptionNakEvents: 0xCD, + UpdatePersistentSubscription: 0xCE, + UpdatePersistentSubscriptionCompleted: 0xCF, ScavengeDatabase: 0xD0, ScavengeDatabaseCompleted: 0xD1, diff --git a/test/client_test.js b/test/client_test.js index a8b8a87..38265df 100644 --- a/test/client_test.js +++ b/test/client_test.js @@ -281,7 +281,36 @@ module.exports = { test.done(err); }); }, - //TODO: Persistent Subscription + 'Test Create Persistent Subscription': function(test) { + var settings = client.PersistentSubscriptionSettings.create(); + this.conn.createPersistentSubscription(testStreamName, 'consumer-1', settings, userCredentialsForAll) + .then(function(result) { + test.done(); + }) + .catch(function(err) { + test.done(err); + }); + }, + //TODO: Update Persistent Subscription + 'Test ConnectTo Persistent Subscription': function(test) { + function eventAppeared(s, e) { + s.stop(); + } + function subscriptionDropped(connection, reason, error) { + test.done(error); + } + var subscription = this.conn.connectToPersistentSubscription(testStreamName, 'consumer-1', eventAppeared, subscriptionDropped); + this.conn.appendToStream(testStreamName, client.expectedVersion.any, [createRandomEvent()]); + }, + 'Test Delete Persistent Subscription': function(test) { + this.conn.deletePersistentSubscription(testStreamName, 'consumer-1', userCredentialsForAll) + .then(function(result) { + test.done(); + }) + .catch(function(err) { + test.done(err); + }); + }, 'Test Delete Stream': function(test) { this.conn.deleteStream(testStreamName, client.expectedVersion.any) .then(function(result) {