diff --git a/package.json b/package.json index 850d666..5f6c5e4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-eventstore-client", - "version": "0.1.3", + "version": "0.1.4", "description": "A port of the EventStore .Net ClientAPI to Node.js", "main": "index.js", "types": "index.d.ts", diff --git a/src/eventStoreNodeConnection.js b/src/eventStoreNodeConnection.js index 8395ba7..d2fedc6 100644 --- a/src/eventStoreNodeConnection.js +++ b/src/eventStoreNodeConnection.js @@ -441,6 +441,7 @@ EventStoreNodeConnection.prototype.subscribeToStreamFrom = function( userCredentials, readBatchSize ) { if (typeof stream !== 'string' || stream === '') throw new TypeError("stream must be a non-empty string."); + if (lastCheckpoint !== null && typeof lastCheckpoint !== 'number') throw new TypeError("lastCheckpoint must be a number or null."); if (typeof eventAppeared !== 'function') throw new TypeError("eventAppeared must be a function."); var catchUpSubscription = diff --git a/src/eventStoreStreamCatchUpSubscription.js b/src/eventStoreStreamCatchUpSubscription.js index 2a6a916..2f0b180 100644 --- a/src/eventStoreStreamCatchUpSubscription.js +++ b/src/eventStoreStreamCatchUpSubscription.js @@ -14,8 +14,8 @@ function EventStoreStreamCatchUpSubscription( //Ensure.NotNullOrEmpty(streamId, "streamId"); - this._lastProcessedEventNumber = fromEventNumberExclusive || -1; - this._nextReadEventNumber = fromEventNumberExclusive || 0; + this._lastProcessedEventNumber = fromEventNumberExclusive === null ? -1 : fromEventNumberExclusive; + this._nextReadEventNumber = fromEventNumberExclusive === null ? 0 : fromEventNumberExclusive + 1; } util.inherits(EventStoreStreamCatchUpSubscription, EventStoreCatchUpSubscription); diff --git a/test/subscribeToStreamFrom_test.js b/test/subscribeToStreamFrom_test.js index ee71eb2..a0b6e87 100644 --- a/test/subscribeToStreamFrom_test.js +++ b/test/subscribeToStreamFrom_test.js @@ -7,8 +7,8 @@ function createRandomEvent() { } module.exports = { - 'Test Subscribe to Stream From Happy Path': function(test) { - test.expect(20); + 'Test Subscribe to Stream From Beginning (null)': function(test) { + test.expect(22); var self = this; var liveProcessing = false; var catchUpEvents = []; @@ -41,8 +41,10 @@ module.exports = { function subscriptionDropped(connection, reason, error) { test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); test.testLiveEvent('liveEvents[0]', liveEvents[0]); + test.ok(liveEvents[0].originalEventNumber, 1); test.ok(catchUpEvents.length === 1, "Expecting 1 catchUp event, got " + catchUpEvents.length); test.testReadEvent('catchUpEvents[0]', catchUpEvents[0]); + test.ok(liveEvents[0].originalEventNumber, 0); done(error); } @@ -57,6 +59,59 @@ module.exports = { test.areEqual("subscription.maxPushQueueSize", subscription.maxPushQueueSize, 10000); }) .catch(test.done); + }, + 'Test Subscribe to Stream From 0': function(test) { + test.expect(22); + var self = this; + var liveProcessing = false; + var catchUpEvents = []; + var liveEvents = []; + var _doneCount = 0; + + function done(err) { + test.ok(!err, err ? err.stack : ''); + if (++_doneCount < 2) return; + test.done(); + } + + function eventAppeared(s, e) { + if (liveProcessing) { + liveEvents.push(e); + s.stop(); + } else { + catchUpEvents.push(e); + } + } + function liveProcessingStarted() { + liveProcessing = true; + var events = [createRandomEvent()]; + self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events) + .then(function () { + done(); + }) + .catch(done); + } + function subscriptionDropped(connection, reason, error) { + test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length); + test.testLiveEvent('liveEvents[0]', liveEvents[0]); + test.ok(liveEvents[0].originalEventNumber, 2); + test.ok(catchUpEvents.length === 1, "Expecting 1 catchUp event, got " + catchUpEvents.length); + test.testReadEvent('catchUpEvents[0]', catchUpEvents[0]); + test.ok(liveEvents[0].originalEventNumber, 1); + done(error); + } + + var events = [createRandomEvent(), createRandomEvent()]; + this.conn.appendToStream(self.testStreamName, client.expectedVersion.noStream, events) + .then(function() { + var subscription = self.conn.subscribeToStreamFrom(self.testStreamName, 0, false, eventAppeared, liveProcessingStarted, subscriptionDropped); + + test.areEqual("subscription.streamId", subscription.streamId, self.testStreamName); + test.areEqual("subscription.isSubscribedToAll", subscription.isSubscribedToAll, false); + test.areEqual("subscription.readBatchSize", subscription.readBatchSize, 500); + test.areEqual("subscription.maxPushQueueSize", subscription.maxPushQueueSize, 10000); + }) + .catch(test.done); } };