Use Long in Stream catchup subscription

Use Long in tests
Bump version to 2.0
This commit is contained in:
2017-11-08 12:45:22 -08:00
parent 4584517ede
commit e79ad8f9c4
12 changed files with 94 additions and 78 deletions

View File

@ -247,7 +247,7 @@ EventStoreNodeConnection.prototype.readEvent = function(stream, eventNumber, res
ensure.notNullOrEmpty(stream, "stream");
ensure.isLongOrInteger(eventNumber, "eventNumber");
eventNumber = Long.fromValue(eventNumber);
resolveLinkTos = !!resolveLinkTos;
resolveLinkTos = Boolean(resolveLinkTos);
userCredentials = userCredentials || null;
if (typeof stream !== 'string' || stream === '') throw new TypeError("stream must be an non-empty string.");
@ -286,7 +286,7 @@ EventStoreNodeConnection.prototype.readStreamEventsForward = function(
ensure.isInteger(count, "count");
ensure.positive(count, "count");
if (count > MaxReadSize) throw new Error(util.format("Count should be less than %d. For larger reads you should page.", MaxReadSize));
resolveLinkTos = !!resolveLinkTos;
resolveLinkTos = Boolean(resolveLinkTos);
userCredentials = userCredentials || null;
var self = this;
@ -320,7 +320,7 @@ EventStoreNodeConnection.prototype.readStreamEventsBackward = function(
ensure.isInteger(count, "count");
ensure.positive(count, "count");
if (count > MaxReadSize) throw new Error(util.format("Count should be less than %d. For larger reads you should page.", MaxReadSize));
resolveLinkTos = !!resolveLinkTos;
resolveLinkTos = Boolean(resolveLinkTos);
userCredentials = userCredentials || null;
var self = this;
@ -351,7 +351,7 @@ EventStoreNodeConnection.prototype.readAllEventsForward = function(
ensure.isInteger(maxCount, "maxCount");
ensure.positive(maxCount, "maxCount");
if (maxCount > MaxReadSize) throw new Error(util.format("Count should be less than %d. For larger reads you should page.", MaxReadSize));
resolveLinkTos = !!resolveLinkTos;
resolveLinkTos = Boolean(resolveLinkTos);
userCredentials = userCredentials || null;
var self = this;
@ -382,7 +382,7 @@ EventStoreNodeConnection.prototype.readAllEventsBackward = function(
ensure.isInteger(maxCount, "maxCount");
ensure.positive(maxCount, "maxCount");
if (maxCount > MaxReadSize) throw new Error(util.format("Count should be less than %d. For larger reads you should page.", MaxReadSize));
resolveLinkTos = !!resolveLinkTos;
resolveLinkTos = Boolean(resolveLinkTos);
userCredentials = userCredentials || null;
var self = this;
@ -432,7 +432,7 @@ EventStoreNodeConnection.prototype.subscribeToStream = function(
* Subscribe to a stream from position
* @public
* @param {!string} stream
* @param {?number} lastCheckpoint
* @param {?number|Position} lastCheckpoint
* @param {!boolean} resolveLinkTos
* @param {!function} eventAppeared
* @param {function} [liveProcessingStarted]
@ -446,7 +446,10 @@ EventStoreNodeConnection.prototype.subscribeToStreamFrom = function(
userCredentials, readBatchSize
) {
if (typeof stream !== 'string' || stream === '') throw new TypeError("stream must be a non-empty string.");
if (lastCheckpoint !== null && typeof lastCheckpoint !== 'number') throw new TypeError("lastCheckpoint must be a number or null.");
if (lastCheckpoint !== null) {
ensure.isLongOrInteger(lastCheckpoint);
lastCheckpoint = Long.fromValue(lastCheckpoint);
}
if (typeof eventAppeared !== 'function') throw new TypeError("eventAppeared must be a function.");
var catchUpSubscription =
@ -536,7 +539,7 @@ EventStoreNodeConnection.prototype.connectToPersistentSubscription = function(
subscriptionDropped = subscriptionDropped || null;
userCredentials = userCredentials || null;
bufferSize = bufferSize === undefined ? 10 : bufferSize;
autoAck = autoAck === undefined ? true : !!autoAck;
autoAck = autoAck === undefined ? true : Boolean(autoAck);
var subscription = new EventStorePersistentSubscription(
groupName, stream, eventAppeared, subscriptionDropped, userCredentials, this._settings.log,
@ -686,9 +689,9 @@ EventStoreNodeConnection.prototype.getStreamMetadataRaw = function(stream, userC
return new results.RawStreamMetadataResult(stream, false, Long.fromValue(version), data);
case results.EventReadStatus.NotFound:
case results.EventReadStatus.NoStream:
return new results.RawStreamMetadataResult(stream, false, -1, null);
return new results.RawStreamMetadataResult(stream, false, Long.fromValue(-1), null);
case results.EventReadStatus.StreamDeleted:
return new results.RawStreamMetadataResult(stream, true, 0x7fffffff, null);
return new results.RawStreamMetadataResult(stream, true, Long.fromValue(0x7fffffff), null);
default:
throw new Error(util.format("Unexpected ReadEventResult: %s.", res.status));
}

View File

@ -1,4 +1,5 @@
var util = require('util');
var Long = require('long');
var EventStoreCatchUpSubscription = require('./eventStoreCatchUpSubscription');
var SliceReadStatus = require('./sliceReadStatus');
@ -14,8 +15,8 @@ function EventStoreStreamCatchUpSubscription(
//Ensure.NotNullOrEmpty(streamId, "streamId");
this._lastProcessedEventNumber = fromEventNumberExclusive === null ? -1 : fromEventNumberExclusive;
this._nextReadEventNumber = fromEventNumberExclusive === null ? 0 : fromEventNumberExclusive + 1;
this._lastProcessedEventNumber = fromEventNumberExclusive === null ? Long.fromNumber(-1) : fromEventNumberExclusive;
this._nextReadEventNumber = fromEventNumberExclusive === null ? Long.fromNumber(0) : fromEventNumberExclusive.add(1);
}
util.inherits(EventStoreStreamCatchUpSubscription, EventStoreCatchUpSubscription);
@ -47,14 +48,14 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
return processEvents(slice.events, 0)
.then(function() {
self._nextReadEventNumber = slice.nextEventNumber;
var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber > lastEventNumber);
var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber.compare(lastEventNumber) > 0);
if (!done && slice.isEndOfStream)
return delay(100, false);
return done;
});
break;
case SliceReadStatus.StreamNotFound:
if (lastEventNumber && lastEventNumber !== -1)
if (lastEventNumber && lastEventNumber.compare(-1) !== 0)
throw new Error(util.format("Impossible: stream %s disappeared in the middle of catching up subscription.", self.streamId));
return true;
case SliceReadStatus.StreamDeleted:
@ -80,7 +81,7 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) {
var processed = false;
var promise;
if (e.originalEventNumber > this._lastProcessedEventNumber) {
if (e.originalEventNumber.compare(this._lastProcessedEventNumber) > 0) {
promise = this._eventAppeared(this, e);
this._lastProcessedEventNumber = e.originalEventNumber;
processed = true;
@ -88,7 +89,7 @@ EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) {
if (this._verbose)
this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).",
this.isSubscribedToAll ? '<all>' : this.streamId, processed ? "processed" : "skipping",
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber)
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber);
return (promise && promise.then) ? promise : Promise.resolve();
};