node-eventstore-client/test/subscribeToStreamFrom_test.js

137 lines
4.9 KiB
JavaScript
Raw Permalink Normal View History

2016-10-15 05:53:23 +00:00
var uuid = require('uuid');
var client = require('../src/client');
var Long = require('long');
2016-10-15 05:53:23 +00:00
/**
 * Builds one JSON event of type 'testEvent' with random content.
 *
 * The event id and the `a` field are fresh v4 UUIDs, `b` is a random number,
 * and the metadata records the creation time in milliseconds.
 *
 * @returns {*} whatever `client.createJsonEventData` produces for these inputs
 */
function createRandomEvent() {
  var eventId = uuid.v4();
  var payload = {a: uuid.v4(), b: Math.random()};
  var metadata = {createdAt: Date.now()};
  return client.createJsonEventData(eventId, payload, metadata, 'testEvent');
}
/**
 * Promise-based timer.
 *
 * @param {number} ms - how long to wait, in milliseconds
 * @returns {Promise<undefined>} settles (with no value) once the timer fires; never rejects
 */
function delay(ms) {
  // The executor only needs the resolver: the timer invokes it with no arguments.
  function armTimer(wake) {
    setTimeout(wake, ms);
  }
  return new Promise(armTimer);
}
/**
 * Runs `action` right away when `count` is 0, otherwise 200 ms later.
 *
 * Note the naming: it is the call with `count === 0` that is NOT delayed;
 * every other call waits. Callers in this file pass the number of events
 * they have already collected.
 *
 * @param {number} count - 0 to run synchronously, anything else to defer
 * @param {Function} action - callback to invoke (called with no arguments)
 * @returns {*} `action()`'s return value when `count` is 0; otherwise a
 *   Promise that resolves to `action()`'s return value after the delay
 */
function delayOnlyFirst(count, action) {
  if (count === 0) return action();
  return delay(200).then(function () {
    // Return the result so the deferred path exposes the same value as the
    // immediate path instead of always resolving to undefined.
    return action();
  });
}
2016-10-15 05:53:23 +00:00
module.exports = {
2017-06-24 18:32:04 +00:00
'Test Subscribe to Stream From Beginning (null)': function(test) {
2017-10-17 22:55:07 +00:00
test.expect(32);
2016-10-15 05:53:23 +00:00
var self = this;
var liveProcessing = false;
var catchUpEvents = [];
var liveEvents = [];
var _doneCount = 0;
function done(err) {
test.ok(!err, err ? err.stack : '');
if (++_doneCount < 2) return;
test.done();
}
2016-10-15 05:53:23 +00:00
function eventAppeared(s, e) {
var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) {
liveEvents.push(e);
} else {
catchUpEvents.push(e);
}
if (isLive && liveEvents.length === 2) s.stop();
});
2016-10-15 05:53:23 +00:00
}
function liveProcessingStarted() {
liveProcessing = true;
var events = [createRandomEvent(), createRandomEvent()];
self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events)
.then(function () {
done();
})
.catch(done);
2016-10-15 05:53:23 +00:00
}
function subscriptionDropped(connection, reason, error) {
test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length);
2017-10-17 22:55:07 +00:00
test.testLiveEvent('liveEvents[0]', liveEvents[0], 2);
test.testLiveEvent('liveEvents[1]', liveEvents[1], 3);
test.ok(catchUpEvents.length === 2, "Expecting 2 catchUp event, got " + catchUpEvents.length);
2017-10-17 22:55:07 +00:00
test.testReadEvent('catchUpEvents[0]', catchUpEvents[0], 0);
test.testReadEvent('catchUpEvents[1]', catchUpEvents[1], 1);
done(error);
2016-10-15 05:53:23 +00:00
}
var events = [createRandomEvent(), createRandomEvent()];
2016-10-15 05:53:23 +00:00
this.conn.appendToStream(self.testStreamName, client.expectedVersion.noStream, events)
.then(function() {
var subscription = self.conn.subscribeToStreamFrom(self.testStreamName, null, false, eventAppeared, liveProcessingStarted, subscriptionDropped);
test.areEqual("subscription.streamId", subscription.streamId, self.testStreamName);
test.areEqual("subscription.isSubscribedToAll", subscription.isSubscribedToAll, false);
test.areEqual("subscription.readBatchSize", subscription.readBatchSize, 500);
test.areEqual("subscription.maxPushQueueSize", subscription.maxPushQueueSize, 10000);
})
.catch(test.done);
2017-06-24 18:32:04 +00:00
},
'Test Subscribe to Stream From 0': function(test) {
2017-10-17 22:55:07 +00:00
test.expect(26);
2017-06-24 18:32:04 +00:00
var self = this;
var liveProcessing = false;
var catchUpEvents = [];
var liveEvents = [];
var _doneCount = 0;
function done(err) {
test.ok(!err, err ? err.stack : '');
if (++_doneCount < 2) return;
test.done();
}
function eventAppeared(s, e) {
var isLive = liveProcessing;
delayOnlyFirst(isLive ? liveEvents.length : catchUpEvents.length, function() {
if (isLive) {
liveEvents.push(e);
} else {
catchUpEvents.push(e);
}
if (isLive && liveEvents.length === 2) s.stop();
});
2017-06-24 18:32:04 +00:00
}
function liveProcessingStarted() {
liveProcessing = true;
var events = [createRandomEvent(), createRandomEvent()];
2017-06-24 18:32:04 +00:00
self.conn.appendToStream(self.testStreamName, client.expectedVersion.any, events)
.then(function () {
done();
})
.catch(done);
}
function subscriptionDropped(connection, reason, error) {
test.ok(liveEvents.length === 2, "Expecting 2 live event, got " + liveEvents.length);
2017-10-17 22:55:07 +00:00
test.testLiveEvent('liveEvents[0]', liveEvents[0], 2);
test.testLiveEvent('liveEvents[1]', liveEvents[1], 3);
2017-06-24 18:32:04 +00:00
test.ok(catchUpEvents.length === 1, "Expecting 1 catchUp event, got " + catchUpEvents.length);
2017-10-17 22:55:07 +00:00
test.testReadEvent('catchUpEvents[0]', catchUpEvents[0], 1);
2017-06-24 18:32:04 +00:00
done(error);
}
var events = [createRandomEvent(), createRandomEvent()];
this.conn.appendToStream(self.testStreamName, client.expectedVersion.noStream, events)
.then(function() {
var subscription = self.conn.subscribeToStreamFrom(self.testStreamName, Long.fromNumber(0), false, eventAppeared, liveProcessingStarted, subscriptionDropped);
2017-06-24 18:32:04 +00:00
test.areEqual("subscription.streamId", subscription.streamId, self.testStreamName);
test.areEqual("subscription.isSubscribedToAll", subscription.isSubscribedToAll, false);
test.areEqual("subscription.readBatchSize", subscription.readBatchSize, 500);
test.areEqual("subscription.maxPushQueueSize", subscription.maxPushQueueSize, 10000);
})
.catch(test.done);
2016-10-15 05:53:23 +00:00
}
};
// Hand the exported suite to the shared base test module's init().
// NOTE(review): init() presumably attaches the common setup that provides
// `conn` and `testStreamName` to each test — confirm in ./common/base_test.
var baseTest = require('./common/base_test');
baseTest.init(module.exports);