From 7c94e26055093931256f2cd94c2221612ec312e8 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Sun, 16 Jul 2017 17:11:54 -0700 Subject: [PATCH] handle eventAppeared callback returning Promise --- index.d.ts | 2 +- package.json | 8 +- src/clientOperations/subscriptionOperation.js | 16 ++- src/eventStoreAllCatchUpSubscription.js | 15 +-- src/eventStoreCatchUpSubscription.js | 14 ++- src/eventStorePersistentSubscriptionBase.js | 55 +++++---- src/eventStoreStreamCatchUpSubscription.js | 20 ++-- test/persistentSubscription_test.js | 28 ++++- test/subscribeToAllFrom_test.js | 105 +++++++++++++----- test/subscribeToAll_test.js | 20 +++- test/subscribeToStreamFrom_test.js | 68 ++++++++---- test/subscribeToStream_test.js | 20 +++- 12 files changed, 269 insertions(+), 102 deletions(-) diff --git a/index.d.ts b/index.d.ts index b341d16..b965bd2 100644 --- a/index.d.ts +++ b/index.d.ts @@ -212,7 +212,7 @@ export interface PersistentSubscriptionDeleteResult { // Callbacks export interface EventAppearedCallback { - (subscription: TSubscription, event: ResolvedEvent): void; + (subscription: TSubscription, event: ResolvedEvent): void | Promise; } export interface LiveProcessingStartedCallback { diff --git a/package.json b/package.json index 7e5d020..2f71980 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-eventstore-client", - "version": "0.1.5", + "version": "0.1.7", "description": "A port of the EventStore .Net ClientAPI to Node.js", "main": "index.js", "types": "index.d.ts", @@ -49,8 +49,8 @@ "uuid-parse": "^1.0.0" }, "devDependencies": { - "jsdoc": "^3.4.2", - "nodeunit": "^0.11.0", - "webpack": "^2.4.1" + "jsdoc": "^3.5.3", + "nodeunit": "^0.11.1", + "webpack": "^3.3.0" } } diff --git a/src/clientOperations/subscriptionOperation.js b/src/clientOperations/subscriptionOperation.js index 2a4fed4..dd6a048 100644 --- a/src/clientOperations/subscriptionOperation.js +++ b/src/clientOperations/subscriptionOperation.js @@ -232,7 +232,7 @@ SubscriptionOperation.prototype._onEventAppeared = function(e) { e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); var self = this; - this._executeAction(function() { self._eventAppeared(self._subscription, e); }); + this._executeAction(function() { return self._eventAppeared(self._subscription, e); }); }; SubscriptionOperation.prototype._executeAction = function(action) { @@ -249,15 +249,25 @@ SubscriptionOperation.prototype._executeActions = function() { this._actionExecuting = false; return; } + var promise; try { - action(); + promise = action(); } catch (err) { this._log.error(err, "Exception during executing user callback: %s.", err.message); } - setImmediate(this._executeActions.bind(this)); + if (promise && promise.then) { + var self = this; + promise + .catch(function (err) { + self._log.error(err, "Exception during executing user callback: %s.", err.message); + }) + .then(this._executeActions.bind(this)); + } else { + setImmediate(this._executeActions.bind(this)); + } }; SubscriptionOperation.prototype.toString = function() { diff --git a/src/eventStoreAllCatchUpSubscription.js b/src/eventStoreAllCatchUpSubscription.js index d855c45..8353ca2 100644 --- a/src/eventStoreAllCatchUpSubscription.js +++ b/src/eventStoreAllCatchUpSubscription.js @@ -24,14 +24,10 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function( var self = this; function processEvents(events, index) { - index = index || 0; if (index >= events.length) return Promise.resolve(); if (events[index].originalPosition === null) throw new Error("Subscription event came up with no OriginalPosition."); - return new Promise(function(resolve, reject) { - self._tryProcess(events[index]); - resolve(); - }) + return self._tryProcess(events[index]) .then(function() { return processEvents(events, index + 1); }); @@ -40,13 +36,12 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function( function readNext() { return connection.readAllEventsForward(self._nextReadPosition, self.readBatchSize, resolveLinkTos, userCredentials) .then(function(slice) { - return processEvents(slice.events) + return processEvents(slice.events, 0) .then(function() { self._nextReadPosition = slice.nextPosition; - var done = lastCommitPosition === null + return (lastCommitPosition === null) ? slice.isEndOfStream : slice.nextPosition.compareTo(new results.Position(lastCommitPosition, lastCommitPosition)) >= 0; - return Promise.resolve(done); }); }) .then(function(done) { @@ -67,9 +62,10 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function( EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) { var processed = false; + var promise; if (e.originalPosition.compareTo(this._lastProcessedPosition) > 0) { - this._eventAppeared(this, e); + promise = this._eventAppeared(this, e); this._lastProcessedPosition = e.originalPosition; processed = true; } @@ -77,6 +73,7 @@ EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) { this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %s).", this.streamId || '', processed ? "processed" : "skipping", e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition); + return (promise && promise.then) ? promise : Promise.resolve(); }; module.exports = EventStoreAllCatchUpSubscription; diff --git a/src/eventStoreCatchUpSubscription.js b/src/eventStoreCatchUpSubscription.js index c04e572..ddb7099 100644 --- a/src/eventStoreCatchUpSubscription.js +++ b/src/eventStoreCatchUpSubscription.js @@ -219,15 +219,25 @@ EventStoreCatchUpSubscription.prototype._processLiveQueue = function() { this._isProcessing = false; return; } + var promise; try { - this._tryProcess(ev); + promise = this._tryProcess(ev); } catch(err) { this._dropSubscription(SubscriptionDropReason.EventHandlerException, err); this._isProcessing = false; return; } - setImmediate(this._processLiveQueue.bind(this)); + if (promise && promise.then) { + var self = this; + promise + .then(this._processLiveQueue.bind(this), function(err) { + self._dropSubscription(SubscriptionDropReason.EventHandlerException, err); + self._isProcessing = false; + }); + } else { + setImmediate(this._processLiveQueue.bind(this)); + } }; EventStoreCatchUpSubscription.prototype._dropSubscription = function(reason, error) { diff --git a/src/eventStorePersistentSubscriptionBase.js b/src/eventStorePersistentSubscriptionBase.js index 999f877..43f1b07 100644 --- a/src/eventStorePersistentSubscriptionBase.js +++ b/src/eventStorePersistentSubscriptionBase.js @@ -112,6 +112,14 @@ EventStorePersistentSubscriptionBase.prototype._enqueue = function(resolvedEvent } }; +function runAsync(fn) { + try { + return Promise.resolve(fn()); + } catch(e) { + return Promise.reject(e); + } +} + EventStorePersistentSubscriptionBase.prototype._processQueue = function() { var ev = this._queue.shift(); if (!ev) { @@ -132,24 +140,28 @@ EventStorePersistentSubscriptionBase.prototype._processQueue = function() { this._isProcessing = false; return; } - try - { - this._eventAppeared(this, ev); - if(this._autoAck) - this._subscription.notifyEventsProcessed([ev.originalEvent.eventId]); - if (this._verbose) - this._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", - this._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType, + var self = this; + runAsync(function() { + return self._eventAppeared(self, ev); + }) + .then(function() { + if(self._autoAck) + self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]); + if (self._verbose) + self._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", + self._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType, ev.originalEventNumber); - } - catch (err) - { - //TODO GFY should we autonak here? - this._dropSubscription(SubscriptionDropReason.EventHandlerException, err); - this._isProcessing = false; - return; - } - setImmediate(this._processQueue.bind(this)); + return false; + }, function(err) { + //TODO GFY should we autonak here? + self._dropSubscription(SubscriptionDropReason.EventHandlerException, err); + self._isProcessing = false; + return true; + }) + .then(function (faulted) { + if (faulted) return; + self._processQueue(); + }); }; EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reason, error) { @@ -162,8 +174,13 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas if (this._subscription !== null) this._subscription.unsubscribe(); - if (this._subscriptionDropped !== null) - this._subscriptionDropped(this, reason, error); + if (this._subscriptionDropped !== null) { + try { + this._subscriptionDropped(this, reason, error); + } catch (e) { + this._log.error(e, "Persistent Subscription to %s: subscriptionDropped callback failed.", this._streamId); + } + } this._stopped = true; } }; diff --git a/src/eventStoreStreamCatchUpSubscription.js b/src/eventStoreStreamCatchUpSubscription.js index 2f0b180..e11fb7f 100644 --- a/src/eventStoreStreamCatchUpSubscription.js +++ b/src/eventStoreStreamCatchUpSubscription.js @@ -19,19 +19,21 @@ function EventStoreStreamCatchUpSubscription( } util.inherits(EventStoreStreamCatchUpSubscription, EventStoreCatchUpSubscription); +function delay(ms, result) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms, result); + }) +} + EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function( connection, resolveLinkTos, userCredentials, lastCommitPosition, lastEventNumber ) { var self = this; function processEvents(events, index) { - index = index || 0; if (index >= events.length) return Promise.resolve(); - return new Promise(function(resolve, reject) { - self._tryProcess(events[index]); - resolve(); - }) + return self._tryProcess(events[index]) .then(function() { return processEvents(events, index + 1); }); @@ -42,12 +44,12 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function( .then(function(slice) { switch(slice.status) { case SliceReadStatus.Success: - return processEvents(slice.events) + return processEvents(slice.events, 0) .then(function() { self._nextReadEventNumber = slice.nextEventNumber; var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber > lastEventNumber); if (!done && slice.isEndOfStream) - return done.delay(10); + return delay(100, false); return done; }); break; @@ -77,8 +79,9 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function( EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) { var processed = false; + var promise; if (e.originalEventNumber > this._lastProcessedEventNumber) { - this._eventAppeared(this, e); + promise = this._eventAppeared(this, e); this._lastProcessedEventNumber = e.originalEventNumber; processed = true; } @@ -86,6 +89,7 @@ EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) { this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).", this.isSubscribedToAll ? '' : this.streamId, processed ? "processed" : "skipping", e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber) + return (promise && promise.then) ? promise : Promise.resolve(); }; diff --git a/test/persistentSubscription_test.js b/test/persistentSubscription_test.js index 85c219e..028759b 100644 --- a/test/persistentSubscription_test.js +++ b/test/persistentSubscription_test.js @@ -9,6 +9,20 @@ function createRandomEvent() { var testStreamName = 'test-' + uuid.v4(); +function delay(ms) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms); + }) +} + +function delayOnlyFirst(count, action) { + if (count === 0) return action(); + return delay(200) + .then(function () { + action(); + }) +} + module.exports = { 'Test Create Persistent Subscription': function(test) { var settings = client.PersistentSubscriptionSettings.create(); @@ -22,7 +36,8 @@ module.exports = { }, //TODO: Update Persistent Subscription 'Test ConnectTo Persistent Subscription': function(test) { - test.expect(3); + test.expect(4); + var receivedEvents = []; var _doneCount = 0; function done(err) { test.ok(!err, err ? err.stack : ''); @@ -31,16 +46,21 @@ module.exports = { test.done(); } function eventAppeared(s, e) { - s.stop(); + return delayOnlyFirst(receivedEvents.length, function () { + receivedEvents.push(e); + if (receivedEvents.length === 2) s.stop(); + }); } function subscriptionDropped(connection, reason, error) { - done(error); + if (error) return done(error); + test.ok(receivedEvents[1].originalEventNumber > receivedEvents[0].originalEventNumber, "Received events are out of order."); + done(); } var self = this; this.conn.connectToPersistentSubscription(testStreamName, 'consumer-1', eventAppeared, subscriptionDropped) .then(function(subscription) { test.ok(subscription, "Subscription is null."); - return self.conn.appendToStream(testStreamName, client.expectedVersion.any, [createRandomEvent()]); + return self.conn.appendToStream(testStreamName, client.expectedVersion.any, [createRandomEvent(), createRandomEvent()]); }) .then(function () { done(); diff --git a/test/subscribeToAllFrom_test.js b/test/subscribeToAllFrom_test.js index 6335c87..f819ef5 100644 --- a/test/subscribeToAllFrom_test.js +++ b/test/subscribeToAllFrom_test.js @@ -7,9 +7,23 @@ function createRandomEvent() { return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); } +function delay(ms) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms); + }) +} + +function delayOnlyFirst(count, action) { + if (count === 0) return action(); + return delay(200) + .then(function () { + action(); + }) +} + module.exports = { 'Test Subscribe to All From (Start)': function(test) { - test.expect(4); + test.expect(6); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -19,19 +33,33 @@ module.exports = { test.ok(!err, err ? err.stack : ''); _doneCount++; if (_doneCount < 2) return; + + var catchUpInOrder = true; + for(var i = 1; i < catchUpEvents.length; i++) + catchUpInOrder = catchUpInOrder && (catchUpEvents[i].originalPosition.compareTo(catchUpEvents[i-1].originalPosition) > 0); + test.ok(catchUpInOrder, "Catch-up events are out of order."); + + var liveInOrder = true; + for(var j = 1; j < liveEvents.length; j++) + liveInOrder = liveInOrder && (liveEvents[j].originalPosition.compareTo(liveEvents[j-1].originalPosition) > 0); + test.ok(liveInOrder, "Live events are out of order."); + test.done(); } function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } + var isLive = liveProcessing; + delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() { + if (isLive) { + liveEvents.push(e); + } else { + catchUpEvents.push(e); + } + if (isLive && liveEvents.length === 2) s.stop(); + }); } function liveProcessingStarted() { liveProcessing = true; - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) .then(function () { done(); @@ -39,14 +67,14 @@ module.exports = { .catch(done); } function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length); test.ok(catchUpEvents.length > 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length); done(error); } var subscription = this.conn.subscribeToAllFrom(null, false, eventAppeared, liveProcessingStarted, subscriptionDropped, allCredentials); }, 'Test Subscribe to All From (Position)': function(test) { - test.expect(5); + test.expect(7); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -56,19 +84,33 @@ module.exports = { test.ok(!err, err ? err.stack : ''); _doneCount++; if (_doneCount < 2) return; + + var catchUpInOrder = true; + for(var i = 1; i < catchUpEvents.length; i++) + catchUpInOrder = catchUpInOrder && (catchUpEvents[i].originalPosition.compareTo(catchUpEvents[i-1].originalPosition) > 0); + test.ok(catchUpInOrder, "Catch-up events are out of order."); + + var liveInOrder = true; + for(var j = 1; j < liveEvents.length; j++) + liveInOrder = liveInOrder && (liveEvents[j].originalPosition.compareTo(liveEvents[j-1].originalPosition) > 0); + test.ok(liveInOrder, "Live events are out of order."); + test.done(); } function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } + var isLive = liveProcessing; + delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() { + if (isLive) { + liveEvents.push(e); + } else { + catchUpEvents.push(e); + } + if (isLive && liveEvents.length === 2) s.stop(); + }); } function liveProcessingStarted() { liveProcessing = true; - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) .then(function () { done(); @@ -76,7 +118,7 @@ module.exports = { .catch(done); } function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length); test.ok(catchUpEvents.length > 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length); done(error); } @@ -87,7 +129,7 @@ module.exports = { }); }, 'Test Subscribe to All From (End)': function(test) { - test.expect(5); + test.expect(6); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -97,19 +139,28 @@ module.exports = { test.ok(!err, err ? err.stack : ''); _doneCount++; if (_doneCount < 2) return; + + var liveInOrder = true; + for(var j = 1; j < liveEvents.length; j++) + liveInOrder = liveInOrder && (liveEvents[j].originalPosition.compareTo(liveEvents[j-1].originalPosition) > 0); + test.ok(liveInOrder, "Live events are out of order."); + test.done(); } function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } + var isLive = liveProcessing; + delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() { + if (isLive) { + liveEvents.push(e); + } else { + catchUpEvents.push(e); + } + if (isLive && liveEvents.length === 2) s.stop(); + }); } function liveProcessingStarted() { liveProcessing = true; - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) .then(function () { done(); @@ -117,7 +168,7 @@ module.exports = { .catch(done); } function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length); test.ok(catchUpEvents.length === 0, "Expecting 0 catchUp event, got " + catchUpEvents.length); done(error); } diff --git a/test/subscribeToAll_test.js b/test/subscribeToAll_test.js index f753123..230bf81 100644 --- a/test/subscribeToAll_test.js +++ b/test/subscribeToAll_test.js @@ -2,6 +2,20 @@ const uuid = require('uuid'); const client = require('../src/client'); const allCredentials = new client.UserCredentials("admin", "changeit"); +function delay(ms) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms); + }) +} + +function delayOnlyFirst(count, action) { + if (count === 0) return action(); + return delay(200) + .then(function () { + action(); + }) +} + module.exports = { 'Test Subscribe To All Happy Path': function(test) { const resolveLinkTos = false; @@ -30,8 +44,10 @@ module.exports = { var receivedEvents = []; function eventAppeared(subscription, event) { - receivedEvents.push(event); - if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); + delayOnlyFirst(receivedEvents.length, function() { + receivedEvents.push(event); + if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); + }); } function subscriptionDropped(subscription, reason, error) { if (error) return done(error); diff --git a/test/subscribeToStreamFrom_test.js b/test/subscribeToStreamFrom_test.js index a0b6e87..8e1b78f 100644 --- a/test/subscribeToStreamFrom_test.js +++ b/test/subscribeToStreamFrom_test.js @@ -6,9 +6,23 @@ function createRandomEvent() { return client.createJsonEventData(uuid.v4(), {a: uuid.v4(), b: Math.random()}, {createdAt: Date.now()}, 'testEvent'); } +function delay(ms) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms); + }) +} + +function delayOnlyFirst(count, action) { + if (count === 0) return action(); + return delay(200) + .then(function () { + action(); + }) +} + module.exports = { 'Test Subscribe to Stream From Beginning (null)': function(test) { - test.expect(22); + test.expect(36); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -22,16 +36,19 @@ module.exports = { } function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } + var isLive = liveProcessing; + delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() { + if (isLive) { + liveEvents.push(e); + } else { + catchUpEvents.push(e); + } + if (isLive && liveEvents.length === 2) s.stop(); + }); } function liveProcessingStarted() { liveProcessing = true; - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) .then(function () { done(); @@ -39,16 +56,20 @@ module.exports = { .catch(done); } function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length); test.testLiveEvent('liveEvents[0]', liveEvents[0]); - test.ok(liveEvents[0].originalEventNumber, 1); - test.ok(catchUpEvents.length === 1, "Expecting 1 catchUp event, got " + catchUpEvents.length); + test.testLiveEvent('liveEvents[1]', liveEvents[1]); + test.ok(liveEvents[0].originalEventNumber, 2); + test.ok(liveEvents[1].originalEventNumber, 3); + test.ok(catchUpEvents.length === 2, "Expecting 2 catchUp event, got " + catchUpEvents.length); test.testReadEvent('catchUpEvents[0]', catchUpEvents[0]); + test.testReadEvent('catchUpEvents[1]', catchUpEvents[1]); test.ok(liveEvents[0].originalEventNumber, 0); + test.ok(liveEvents[1].originalEventNumber, 1); done(error); } - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; this.conn.appendToStream(self.testStreamName, client.expectedVersion.noStream, events) .then(function() { var subscription = self.conn.subscribeToStreamFrom(self.testStreamName, null, false, eventAppeared, liveProcessingStarted, subscriptionDropped); @@ -61,7 +82,7 @@ module.exports = { .catch(test.done); }, 'Test Subscribe to Stream From 0': function(test) { - test.expect(22); + test.expect(29); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -75,16 +96,19 @@ module.exports = { } function eventAppeared(s, e) { - if (liveProcessing) { - liveEvents.push(e); - s.stop(); - } else { - catchUpEvents.push(e); - } + var isLive = liveProcessing; + delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() { + if (isLive) { + liveEvents.push(e); + } else { + catchUpEvents.push(e); + } + if (isLive && liveEvents.length === 2) s.stop(); + }); } function liveProcessingStarted() { liveProcessing = true; - var events = [createRandomEvent()]; + var events = [createRandomEvent(), createRandomEvent()]; self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) .then(function () { done(); @@ -92,9 +116,11 @@ module.exports = { .catch(done); } function subscriptionDropped(connection, reason, error) { - test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length); test.testLiveEvent('liveEvents[0]', liveEvents[0]); + test.testLiveEvent('liveEvents[1]', liveEvents[1]); test.ok(liveEvents[0].originalEventNumber, 2); + test.ok(liveEvents[1].originalEventNumber, 3); test.ok(catchUpEvents.length === 1, "Expecting 1 catchUp event, got " + catchUpEvents.length); test.testReadEvent('catchUpEvents[0]', catchUpEvents[0]); test.ok(liveEvents[0].originalEventNumber, 1); diff --git a/test/subscribeToStream_test.js b/test/subscribeToStream_test.js index 48d917d..4a93255 100644 --- a/test/subscribeToStream_test.js +++ b/test/subscribeToStream_test.js @@ -1,6 +1,20 @@ const uuid = require('uuid'); const client = require('../src/client'); +function delay(ms) { + return new Promise(function (resolve, reject) { + setTimeout(resolve, ms); + }) +} + +function delayOnlyFirst(count, action) { + if (count === 0) return action(); + return delay(200) + .then(function () { + action(); + }) +} + module.exports = { 'Test Subscribe To Stream Happy Path': function(test) { const resolveLinkTos = false; @@ -28,8 +42,10 @@ module.exports = { var receivedEvents = []; function eventAppeared(subscription, event) { - receivedEvents.push(event); - if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); + delayOnlyFirst(receivedEvents.length, function () { + receivedEvents.push(event); + if (receivedEvents.length === numberOfPublishedEvents) subscription.close(); + }); } function subscriptionDropped(subscription, reason, error) { if (error) return done(error);