handle eventAppeared callback returning Promise
This commit is contained in:
@ -232,7 +232,7 @@ SubscriptionOperation.prototype._onEventAppeared = function(e) {
|
||||
e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition);
|
||||
|
||||
var self = this;
|
||||
this._executeAction(function() { self._eventAppeared(self._subscription, e); });
|
||||
this._executeAction(function() { return self._eventAppeared(self._subscription, e); });
|
||||
};
|
||||
|
||||
SubscriptionOperation.prototype._executeAction = function(action) {
|
||||
@ -249,15 +249,25 @@ SubscriptionOperation.prototype._executeActions = function() {
|
||||
this._actionExecuting = false;
|
||||
return;
|
||||
}
|
||||
var promise;
|
||||
try
|
||||
{
|
||||
action();
|
||||
promise = action();
|
||||
}
|
||||
catch (err)
|
||||
{
|
||||
this._log.error(err, "Exception during executing user callback: %s.", err.message);
|
||||
}
|
||||
setImmediate(this._executeActions.bind(this));
|
||||
if (promise && promise.then) {
|
||||
var self = this;
|
||||
promise
|
||||
.catch(function (err) {
|
||||
self._log.error(err, "Exception during executing user callback: %s.", err.message);
|
||||
})
|
||||
.then(this._executeActions.bind(this));
|
||||
} else {
|
||||
setImmediate(this._executeActions.bind(this));
|
||||
}
|
||||
};
|
||||
|
||||
SubscriptionOperation.prototype.toString = function() {
|
||||
|
@ -24,14 +24,10 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function(
|
||||
var self = this;
|
||||
|
||||
function processEvents(events, index) {
|
||||
index = index || 0;
|
||||
if (index >= events.length) return Promise.resolve();
|
||||
if (events[index].originalPosition === null) throw new Error("Subscription event came up with no OriginalPosition.");
|
||||
|
||||
return new Promise(function(resolve, reject) {
|
||||
self._tryProcess(events[index]);
|
||||
resolve();
|
||||
})
|
||||
return self._tryProcess(events[index])
|
||||
.then(function() {
|
||||
return processEvents(events, index + 1);
|
||||
});
|
||||
@ -40,13 +36,12 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function(
|
||||
function readNext() {
|
||||
return connection.readAllEventsForward(self._nextReadPosition, self.readBatchSize, resolveLinkTos, userCredentials)
|
||||
.then(function(slice) {
|
||||
return processEvents(slice.events)
|
||||
return processEvents(slice.events, 0)
|
||||
.then(function() {
|
||||
self._nextReadPosition = slice.nextPosition;
|
||||
var done = lastCommitPosition === null
|
||||
return (lastCommitPosition === null)
|
||||
? slice.isEndOfStream
|
||||
: slice.nextPosition.compareTo(new results.Position(lastCommitPosition, lastCommitPosition)) >= 0;
|
||||
return Promise.resolve(done);
|
||||
});
|
||||
})
|
||||
.then(function(done) {
|
||||
@ -67,9 +62,10 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function(
|
||||
|
||||
EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) {
|
||||
var processed = false;
|
||||
var promise;
|
||||
if (e.originalPosition.compareTo(this._lastProcessedPosition) > 0)
|
||||
{
|
||||
this._eventAppeared(this, e);
|
||||
promise = this._eventAppeared(this, e);
|
||||
this._lastProcessedPosition = e.originalPosition;
|
||||
processed = true;
|
||||
}
|
||||
@ -77,6 +73,7 @@ EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) {
|
||||
this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %s).",
|
||||
this.streamId || '<all>', processed ? "processed" : "skipping",
|
||||
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition);
|
||||
return (promise && promise.then) ? promise : Promise.resolve();
|
||||
};
|
||||
|
||||
module.exports = EventStoreAllCatchUpSubscription;
|
||||
|
@ -219,15 +219,25 @@ EventStoreCatchUpSubscription.prototype._processLiveQueue = function() {
|
||||
this._isProcessing = false;
|
||||
return;
|
||||
}
|
||||
var promise;
|
||||
try {
|
||||
this._tryProcess(ev);
|
||||
promise = this._tryProcess(ev);
|
||||
}
|
||||
catch(err) {
|
||||
this._dropSubscription(SubscriptionDropReason.EventHandlerException, err);
|
||||
this._isProcessing = false;
|
||||
return;
|
||||
}
|
||||
setImmediate(this._processLiveQueue.bind(this));
|
||||
if (promise && promise.then) {
|
||||
var self = this;
|
||||
promise
|
||||
.then(this._processLiveQueue.bind(this), function(err) {
|
||||
self._dropSubscription(SubscriptionDropReason.EventHandlerException, err);
|
||||
self._isProcessing = false;
|
||||
});
|
||||
} else {
|
||||
setImmediate(this._processLiveQueue.bind(this));
|
||||
}
|
||||
};
|
||||
|
||||
EventStoreCatchUpSubscription.prototype._dropSubscription = function(reason, error) {
|
||||
|
@ -112,6 +112,14 @@ EventStorePersistentSubscriptionBase.prototype._enqueue = function(resolvedEvent
|
||||
}
|
||||
};
|
||||
|
||||
function runAsync(fn) {
|
||||
try {
|
||||
return Promise.resolve(fn());
|
||||
} catch(e) {
|
||||
return Promise.reject(e);
|
||||
}
|
||||
}
|
||||
|
||||
EventStorePersistentSubscriptionBase.prototype._processQueue = function() {
|
||||
var ev = this._queue.shift();
|
||||
if (!ev) {
|
||||
@ -132,24 +140,28 @@ EventStorePersistentSubscriptionBase.prototype._processQueue = function() {
|
||||
this._isProcessing = false;
|
||||
return;
|
||||
}
|
||||
try
|
||||
{
|
||||
this._eventAppeared(this, ev);
|
||||
if(this._autoAck)
|
||||
this._subscription.notifyEventsProcessed([ev.originalEvent.eventId]);
|
||||
if (this._verbose)
|
||||
this._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).",
|
||||
this._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType,
|
||||
var self = this;
|
||||
runAsync(function() {
|
||||
return self._eventAppeared(self, ev);
|
||||
})
|
||||
.then(function() {
|
||||
if(self._autoAck)
|
||||
self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]);
|
||||
if (self._verbose)
|
||||
self._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).",
|
||||
self._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType,
|
||||
ev.originalEventNumber);
|
||||
}
|
||||
catch (err)
|
||||
{
|
||||
//TODO GFY should we autonak here?
|
||||
this._dropSubscription(SubscriptionDropReason.EventHandlerException, err);
|
||||
this._isProcessing = false;
|
||||
return;
|
||||
}
|
||||
setImmediate(this._processQueue.bind(this));
|
||||
return false;
|
||||
}, function(err) {
|
||||
//TODO GFY should we autonak here?
|
||||
self._dropSubscription(SubscriptionDropReason.EventHandlerException, err);
|
||||
self._isProcessing = false;
|
||||
return true;
|
||||
})
|
||||
.then(function (faulted) {
|
||||
if (faulted) return;
|
||||
self._processQueue();
|
||||
});
|
||||
};
|
||||
|
||||
EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reason, error) {
|
||||
@ -162,8 +174,13 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas
|
||||
|
||||
if (this._subscription !== null)
|
||||
this._subscription.unsubscribe();
|
||||
if (this._subscriptionDropped !== null)
|
||||
this._subscriptionDropped(this, reason, error);
|
||||
if (this._subscriptionDropped !== null) {
|
||||
try {
|
||||
this._subscriptionDropped(this, reason, error);
|
||||
} catch (e) {
|
||||
this._log.error(e, "Persistent Subscription to %s: subscriptionDropped callback failed.", this._streamId);
|
||||
}
|
||||
}
|
||||
this._stopped = true;
|
||||
}
|
||||
};
|
||||
|
@ -19,19 +19,21 @@ function EventStoreStreamCatchUpSubscription(
|
||||
}
|
||||
util.inherits(EventStoreStreamCatchUpSubscription, EventStoreCatchUpSubscription);
|
||||
|
||||
function delay(ms, result) {
|
||||
return new Promise(function (resolve, reject) {
|
||||
setTimeout(resolve, ms, result);
|
||||
})
|
||||
}
|
||||
|
||||
EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
|
||||
connection, resolveLinkTos, userCredentials, lastCommitPosition, lastEventNumber
|
||||
) {
|
||||
var self = this;
|
||||
|
||||
function processEvents(events, index) {
|
||||
index = index || 0;
|
||||
if (index >= events.length) return Promise.resolve();
|
||||
|
||||
return new Promise(function(resolve, reject) {
|
||||
self._tryProcess(events[index]);
|
||||
resolve();
|
||||
})
|
||||
return self._tryProcess(events[index])
|
||||
.then(function() {
|
||||
return processEvents(events, index + 1);
|
||||
});
|
||||
@ -42,12 +44,12 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
|
||||
.then(function(slice) {
|
||||
switch(slice.status) {
|
||||
case SliceReadStatus.Success:
|
||||
return processEvents(slice.events)
|
||||
return processEvents(slice.events, 0)
|
||||
.then(function() {
|
||||
self._nextReadEventNumber = slice.nextEventNumber;
|
||||
var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber > lastEventNumber);
|
||||
if (!done && slice.isEndOfStream)
|
||||
return done.delay(10);
|
||||
return delay(100, false);
|
||||
return done;
|
||||
});
|
||||
break;
|
||||
@ -77,8 +79,9 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
|
||||
|
||||
EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) {
|
||||
var processed = false;
|
||||
var promise;
|
||||
if (e.originalEventNumber > this._lastProcessedEventNumber) {
|
||||
this._eventAppeared(this, e);
|
||||
promise = this._eventAppeared(this, e);
|
||||
this._lastProcessedEventNumber = e.originalEventNumber;
|
||||
processed = true;
|
||||
}
|
||||
@ -86,6 +89,7 @@ EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) {
|
||||
this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).",
|
||||
this.isSubscribedToAll ? '<all>' : this.streamId, processed ? "processed" : "skipping",
|
||||
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber)
|
||||
return (promise && promise.then) ? promise : Promise.resolve();
|
||||
};
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user