Adding transaction tests
This commit is contained in:
parent
a64dbc9b8e
commit
b2504749ce
682
lib/dist.js
682
lib/dist.js
File diff suppressed because it is too large
Load Diff
@ -6,6 +6,9 @@ var InspectionDecision = require('../systemData/inspectionDecision');
|
||||
var InspectionResult = require('./../systemData/inspectionResult');
|
||||
var ClientMessage = require('../messages/clientMessage');
|
||||
var results = require('../results');
|
||||
var WrongExpectedVersionError = require('../errors/wrongExpectedVersionError');
|
||||
var StreamDeletedError = require('../errors/streamDeletedError');
|
||||
var AccessDeniedError = require('../errors/accessDeniedError');
|
||||
|
||||
var OperationBase = require('../clientOperations/operationBase');
|
||||
|
||||
@ -36,17 +39,16 @@ CommitTransactionOperation.prototype._inspectResponse = function(response) {
|
||||
case ClientMessage.OperationResult.ForwardTimeout:
|
||||
return new InspectionResult(InspectionDecision.Retry, "ForwardTimeout");
|
||||
case ClientMessage.OperationResult.WrongExpectedVersion:
|
||||
var err = util.format("Commit transaction failed due to WrongExpectedVersion. TransactionID: %d.", this._transactionId);
|
||||
this.fail(new Error(err));
|
||||
this.fail(new WrongExpectedVersionError("Commit", this._transactionId));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "WrongExpectedVersion");
|
||||
case ClientMessage.OperationResult.StreamDeleted:
|
||||
this.fail(new Error("Stream deleted."));
|
||||
this.fail(new StreamDeletedError(this._transactionId));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "StreamDeleted");
|
||||
case ClientMessage.OperationResult.InvalidTransaction:
|
||||
this.fail(new Error("Invalid transaction."));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "InvalidTransaction");
|
||||
case ClientMessage.OperationResult.AccessDenied:
|
||||
this.fail(new Error("Write access denied."));
|
||||
this.fail(new AccessDeniedError("Write", this._transactionId));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
|
||||
default:
|
||||
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
|
||||
|
@ -7,7 +7,9 @@ var InspectionResult = require('./../systemData/inspectionResult');
|
||||
var ClientMessage = require('../messages/clientMessage');
|
||||
var EventStoreTransaction = require('../eventStoreTransaction');
|
||||
var results = require('../results');
|
||||
|
||||
var AccessDeniedError = require('../errors/accessDeniedError');
|
||||
var WrongExpectedVersionError = require('../errors/wrongExpectedVersionError');
|
||||
var StreamDeletedError = require('../errors/streamDeletedError');
|
||||
var OperationBase = require('../clientOperations/operationBase');
|
||||
|
||||
function StartTransactionOperation(log, cb, requireMaster, stream, expectedVersion, parentConnection, userCredentials) {
|
||||
@ -38,17 +40,16 @@ StartTransactionOperation.prototype._inspectResponse = function(response) {
|
||||
case ClientMessage.OperationResult.ForwardTimeout:
|
||||
return new InspectionResult(InspectionDecision.Retry, "ForwardTimeout");
|
||||
case ClientMessage.OperationResult.WrongExpectedVersion:
|
||||
var err = util.format("Start transaction failed due to WrongExpectedVersion. Stream: %s, Expected version: %d.", this._stream, this._expectedVersion);
|
||||
this.fail(new Error(err));
|
||||
this.fail(new WrongExpectedVersionError("Start transaction", this._stream, this._expectedVersion));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "WrongExpectedVersion");
|
||||
case ClientMessage.OperationResult.StreamDeleted:
|
||||
this.fail(new Error("Stream deleted: " + this._stream));
|
||||
this.fail(new StreamDeletedError(this._stream));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "StreamDeleted");
|
||||
case ClientMessage.OperationResult.InvalidTransaction:
|
||||
this.fail(new Error("Invalid transaction."));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "InvalidTransaction");
|
||||
case ClientMessage.OperationResult.AccessDenied:
|
||||
this.fail(new Error(util.format("Write access denied for stream '%s'.", this._stream)));
|
||||
this.fail(new AccessDeniedError("Write", this._stream));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
|
||||
default:
|
||||
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
|
||||
|
@ -5,6 +5,7 @@ var TcpCommand = require('../systemData/tcpCommand');
|
||||
var InspectionDecision = require('../systemData/inspectionDecision');
|
||||
var InspectionResult = require('./../systemData/inspectionResult');
|
||||
var ClientMessage = require('../messages/clientMessage');
|
||||
var AccessDeniedError = require('../errors/accessDeniedError');
|
||||
|
||||
var OperationBase = require('../clientOperations/operationBase');
|
||||
|
||||
@ -43,7 +44,7 @@ TransactionalWriteOperation.prototype._inspectResponse = function(response) {
|
||||
case ClientMessage.OperationResult.ForwardTimeout:
|
||||
return new InspectionResult(InspectionDecision.Retry, "ForwardTimeout");
|
||||
case ClientMessage.OperationResult.AccessDenied:
|
||||
this.fail(new Error("Write access denied."));
|
||||
this.fail(new AccessDeniedError("Write", "trx:" + this._transactionId));
|
||||
return new InspectionResult(InspectionDecision.EndOperation, "AccessDenied");
|
||||
default:
|
||||
throw new Error(util.format("Unexpected OperationResult: %s.", response.result));
|
||||
|
@ -19,12 +19,12 @@ module.exports.isArrayOf = function(expectedType, value, name) {
|
||||
if (!Array.isArray(value))
|
||||
throw new TypeError(name + " should be an array.");
|
||||
if (!value.every(function(x) { return x instanceof expectedType; }))
|
||||
throw new TypeError([name, " should be an array of ", expectedType, "."].join(""));
|
||||
throw new TypeError([name, " should be an array of ", expectedType.name, "."].join(""));
|
||||
};
|
||||
|
||||
module.exports.isTypeOf = function(expectedType, value, name) {
|
||||
if (!(value instanceof expectedType))
|
||||
throw new TypeError([name, " should be of type '", expectedType, "'."].join(""));
|
||||
throw new TypeError([name, " should be of type '", expectedType.name, "'."].join(""));
|
||||
};
|
||||
|
||||
module.exports.positive = function(value, name) {
|
||||
|
@ -1,11 +1,21 @@
|
||||
var util = require('util');
|
||||
var Long = require('long');
|
||||
|
||||
function AccessDeniedError(action, stream) {
|
||||
function AccessDeniedError(action, streamOrTransactionId) {
|
||||
Error.captureStackTrace(this, this.constructor);
|
||||
this.name = this.constructor.name;
|
||||
this.message = util.format("%s access denied for stream '%s'.", action, stream);
|
||||
this.action = action;
|
||||
this.stream = stream;
|
||||
if (typeof streamOrTransactionId === 'string') {
|
||||
this.message = util.format("%s access denied for stream '%s'.", action, streamOrTransactionId);
|
||||
this.stream = streamOrTransactionId;
|
||||
return;
|
||||
}
|
||||
if (Long.isLong(streamOrTransactionId)) {
|
||||
this.message = util.format("%s access denied for transaction %s.", action, streamOrTransactionId);
|
||||
this.transactionId = streamOrTransactionId;
|
||||
return;
|
||||
}
|
||||
throw new TypeError("second argument must be a stream name or transaction Id.");
|
||||
}
|
||||
util.inherits(AccessDeniedError, Error);
|
||||
|
||||
|
@ -1,10 +1,20 @@
|
||||
var util = require('util');
|
||||
var Long = require('long');
|
||||
|
||||
function StreamDeletedError(stream) {
|
||||
function StreamDeletedError(streamOrTransactionId) {
|
||||
Error.captureStackTrace(this, this.constructor);
|
||||
this.name = this.constructor.name;
|
||||
this.message = util.format("Event stream '%s' is deleted.", stream);
|
||||
this.stream = stream;
|
||||
if (typeof streamOrTransactionId === 'string') {
|
||||
this.message = util.format("Event stream '%s' is deleted.", streamOrTransactionId);
|
||||
this.stream = streamOrTransactionId;
|
||||
return;
|
||||
}
|
||||
if (Long.isLong(streamOrTransactionId)) {
|
||||
this.message = util.format("Stream is deleted for transaction %s.", streamOrTransactionId);
|
||||
this.transactionId = streamOrTransactionId;
|
||||
return;
|
||||
}
|
||||
throw new TypeError("second argument must be a stream name or transaction Id.");
|
||||
}
|
||||
util.inherits(StreamDeletedError, Error);
|
||||
|
||||
|
@ -1,12 +1,22 @@
|
||||
var util = require('util');
|
||||
var Long = require('long');
|
||||
|
||||
function WrongExpectedVersionError(action, stream, expectedVersion) {
|
||||
function WrongExpectedVersionError(action, streamOrTransactionId, expectedVersion) {
|
||||
Error.captureStackTrace(this, this.constructor);
|
||||
this.name = this.constructor.name;
|
||||
this.message = util.format("%s failed due to WrongExpectedVersion. Stream: %s Expected version: %d.", action, stream, expectedVersion);
|
||||
this.action = action;
|
||||
this.stream = stream;
|
||||
this.expectedVersion = expectedVersion;
|
||||
if (typeof streamOrTransactionId === 'string') {
|
||||
this.message = util.format("%s failed due to WrongExpectedVersion. Stream: %s Expected version: %d.", action, streamOrTransactionId, expectedVersion);
|
||||
this.stream = streamOrTransactionId;
|
||||
this.expectedVersion = expectedVersion;
|
||||
return;
|
||||
}
|
||||
if (Long.isLong(streamOrTransactionId)) {
|
||||
this.message = util.format("%s transaction failed due to WrongExpectedVersion. Transaction Id: %s.", action, streamOrTransactionId);
|
||||
this.transactionId = streamOrTransactionId;
|
||||
return;
|
||||
}
|
||||
throw new TypeError("second argument must be a stream name or a transaction Id.");
|
||||
}
|
||||
util.inherits(WrongExpectedVersionError, Error);
|
||||
|
||||
|
@ -152,7 +152,10 @@ EventStoreNodeConnection.prototype.appendToStream = function(stream, expectedVer
|
||||
* @returns {Promise.<EventStoreTransaction>}
|
||||
*/
|
||||
EventStoreNodeConnection.prototype.startTransaction = function(stream, expectedVersion, userCredentials) {
|
||||
//TODO validations
|
||||
ensure.notNullOrEmpty(stream, "stream");
|
||||
ensure.isInteger(expectedVersion, "expectedVersion");
|
||||
userCredentials = userCredentials || null;
|
||||
|
||||
var self = this;
|
||||
return new Promise(function(resolve, reject) {
|
||||
function cb(err, result) {
|
||||
@ -160,7 +163,7 @@ EventStoreNodeConnection.prototype.startTransaction = function(stream, expectedV
|
||||
resolve(result);
|
||||
}
|
||||
var operation = new StartTransactionOperation(self._settings.log, cb, self._settings.requireMaster, stream,
|
||||
expectedVersion, self, userCredentials || null);
|
||||
expectedVersion, self, userCredentials);
|
||||
self._enqueueOperation(operation);
|
||||
});
|
||||
};
|
||||
@ -172,11 +175,16 @@ EventStoreNodeConnection.prototype.startTransaction = function(stream, expectedV
|
||||
* @returns {EventStoreTransaction}
|
||||
*/
|
||||
EventStoreNodeConnection.prototype.continueTransaction = function(transactionId, userCredentials) {
|
||||
//TODO validations
|
||||
ensure.nonNegative(transactionId, "transactionId");
|
||||
|
||||
return new EventStoreTransaction(transactionId, userCredentials, this);
|
||||
};
|
||||
|
||||
EventStoreNodeConnection.prototype.transactionalWrite = function(transaction, events, userCredentials) {
|
||||
ensure.isTypeOf(EventStoreTransaction, transaction, "transaction");
|
||||
ensure.isArrayOf(EventData, events, "events");
|
||||
userCredentials = userCredentials || null;
|
||||
|
||||
var self = this;
|
||||
return new Promise(function(resolve, reject) {
|
||||
function cb(err) {
|
||||
@ -190,6 +198,8 @@ EventStoreNodeConnection.prototype.transactionalWrite = function(transaction, ev
|
||||
};
|
||||
|
||||
EventStoreNodeConnection.prototype.commitTransaction = function(transaction, userCredentials) {
|
||||
ensure.isTypeOf(EventStoreTransaction, transaction, "transaction");
|
||||
|
||||
var self = this;
|
||||
return new Promise(function(resolve, reject) {
|
||||
function cb(err, result) {
|
||||
|
@ -33,13 +33,13 @@ EventStoreTransaction.prototype.commit = function() {
|
||||
|
||||
/**
|
||||
* Write events (async)
|
||||
* @param {Array.<EventData>} events
|
||||
* @param {EventData|EventData[]} eventOrEvents
|
||||
* @returns {Promise}
|
||||
*/
|
||||
EventStoreTransaction.prototype.write = function(events) {
|
||||
EventStoreTransaction.prototype.write = function(eventOrEvents) {
|
||||
if (this._isRolledBack) throw new Error("can't write to a rolledback transaction");
|
||||
if (this._isCommitted) throw new Error("Transaction is already committed");
|
||||
if (!Array.isArray(events)) throw new Error("events must be an array.");
|
||||
var events = Array.isArray(eventOrEvents) ? eventOrEvents : [eventOrEvents];
|
||||
return this._connection.transactionalWrite(this, events);
|
||||
};
|
||||
|
||||
|
@ -67,7 +67,7 @@ module.exports = {
|
||||
.catch(function (err) {
|
||||
test.done(err);
|
||||
});
|
||||
},*/
|
||||
},
|
||||
'Test Commit Two Events Using Transaction': function(test) {
|
||||
this.conn.startTransaction(testStreamName, client.expectedVersion.any)
|
||||
.then(function(trx) {
|
||||
@ -90,7 +90,6 @@ module.exports = {
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
/*
|
||||
'Test Read One Event': function(test) {
|
||||
this.conn.readEvent(testStreamName, 0)
|
||||
.then(function(result) {
|
||||
@ -231,14 +230,15 @@ module.exports = {
|
||||
function liveProcessingStarted() {
|
||||
liveProcessing = true;
|
||||
var events = [createRandomEvent()];
|
||||
self.conn.appendToStream('test', client.expectedVersion.any, events);
|
||||
self.conn.appendToStream(testStreamName, client.expectedVersion.any, events);
|
||||
}
|
||||
function subscriptionDropped(connection, reason, error) {
|
||||
test.ok(liveEvents.length === 1, "Expecting 1 live event, got " + liveEvents.length);
|
||||
test.ok(catchUpEvents.length > 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length);
|
||||
test.ok(catchUpEvents.length >= 1, "Expecting at least 1 catchUp event, got " + catchUpEvents.length);
|
||||
test.done(error);
|
||||
}
|
||||
var subscription = this.conn.subscribeToStreamFrom('test', null, false, eventAppeared, liveProcessingStarted, subscriptionDropped);
|
||||
//this.conn.appendToStream()
|
||||
var subscription = this.conn.subscribeToStreamFrom(testStreamName, null, false, eventAppeared, liveProcessingStarted, subscriptionDropped);
|
||||
},
|
||||
'Test Subscribe to All From': function(test) {
|
||||
var self = this;
|
||||
|
185
test/transactions_test.js
Normal file
185
test/transactions_test.js
Normal file
@ -0,0 +1,185 @@
|
||||
var uuid = require('uuid');
|
||||
var Long = require('long');
|
||||
var client = require('../src/client');
|
||||
|
||||
module.exports = {
|
||||
setUp: function(cb) {
|
||||
cb();
|
||||
},
|
||||
'Start A Transaction Happy Path': function(test) {
|
||||
this.conn.startTransaction(this.testStreamName, client.expectedVersion.noStream)
|
||||
.then(function(trx) {
|
||||
test.ok(Long.isLong(trx.transactionId), "trx.transactionId should be a Long.");
|
||||
test.done();
|
||||
})
|
||||
.catch(test.done);
|
||||
},
|
||||
/*
|
||||
'Start A Transaction With Wrong Expected Version': function(test) {
|
||||
this.conn.startTransaction(this.testStreamName, 10)
|
||||
.then(function(trx) {
|
||||
test.fail("Start Transaction with wrong expected version succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isWrongExpectedVersion = err instanceof client.WrongExpectedVersionError;
|
||||
if (isWrongExpectedVersion) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
'Start A Transaction With Deleted Stream': function(test) {
|
||||
var self = this;
|
||||
this.conn.deleteStream(this.testStreamName, client.expectedVersion.emptyStream)
|
||||
.then(function() {
|
||||
return self.conn.startTransaction(self.testStreamName, client.expectedVersion.any);
|
||||
})
|
||||
.then(function(trx) {
|
||||
test.fail("Start Transaction with deleted stream succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isStreamDeleted = err instanceof client.StreamDeletedError;
|
||||
test.ok(isStreamDeleted, "Expected StreamDeletedError got " + err.constructor.name);
|
||||
if (isStreamDeleted) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
*/
|
||||
'Start A Transaction With No Access': function(test) {
|
||||
var self = this;
|
||||
var metadata = {$acl: {$w: "$admins"}};
|
||||
this.conn.setStreamMetadataRaw(this.testStreamName, -1, metadata)
|
||||
.then(function() {
|
||||
return self.conn.startTransaction(self.testStreamName, client.expectedVersion.any);
|
||||
})
|
||||
.then(function(trx) {
|
||||
test.fail("Start Transaction with no access succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isAccessDenied = err instanceof client.AccessDeniedError;
|
||||
test.ok(isAccessDenied, "Expected AccessDeniedError got " + err.constructor.name);
|
||||
if (isAccessDenied) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
'Continue A Transaction Happy Path': function(test) {
|
||||
var self = this;
|
||||
this.conn.startTransaction(this.testStreamName, client.expectedVersion.emptyStream)
|
||||
.then(function(trx) {
|
||||
return trx.write(client.createJsonEventData(uuid.v4(), {a: Math.random()}, null, 'anEvent'))
|
||||
.then(function () {
|
||||
return self.conn.continueTransaction(trx.transactionId);
|
||||
});
|
||||
})
|
||||
.then(function(trx) {
|
||||
return trx.write(client.createJsonEventData(uuid.v4(), {a: Math.random()}, null, 'anEvent'))
|
||||
.then(function() {
|
||||
return trx.commit();
|
||||
})
|
||||
.then(function() {
|
||||
test.done();
|
||||
});
|
||||
})
|
||||
.catch(test.done);
|
||||
},
|
||||
'Write/Commit Transaction Happy Path': function(test) {
|
||||
var self = this;
|
||||
this.conn.startTransaction(this.testStreamName, client.expectedVersion.emptyStream)
|
||||
.then(function(trx) {
|
||||
self.events = [];
|
||||
for(var i = 0; i < 15; i++) {
|
||||
var event = {a: uuid.v4(), b: Math.random()};
|
||||
self.events.push(client.createJsonEventData(uuid.v4(), event, null, 'anEvent'));
|
||||
}
|
||||
return trx.write(self.events)
|
||||
.then(function() {
|
||||
var events = [];
|
||||
for(var j = 0; j < 9; j++) {
|
||||
var event = {a: Math.random(), b: uuid.v4()};
|
||||
events.push(client.createJsonEventData(uuid.v4(), event, null, 'anotherEvent'));
|
||||
}
|
||||
Array.prototype.push.apply(self.events, events);
|
||||
trx.write(events);
|
||||
})
|
||||
.then(function() {
|
||||
return trx.commit();
|
||||
});
|
||||
})
|
||||
.then(function(result) {
|
||||
test.ok(result.logPosition, "Missing result.logPosition");
|
||||
test.areEqual("result.nextExpectedVersion", result.nextExpectedVersion, self.events.length-1);
|
||||
test.done();
|
||||
})
|
||||
.catch(test.done);
|
||||
},
|
||||
'Write/Commit Transaction With Wrong Expected Version': function(test) {
|
||||
this.conn.startTransaction(this.testStreamName, 10)
|
||||
.then(function(trx) {
|
||||
return trx.write(client.createJsonEventData(uuid.v4(), {a: Math.random(), b: uuid.v4()}, null, 'anEvent'))
|
||||
.then(function() {
|
||||
return trx.commit();
|
||||
});
|
||||
})
|
||||
.then(function() {
|
||||
test.fail("Commit on transaction with wrong expected version succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isWrongExpectedVersion = err instanceof client.WrongExpectedVersionError;
|
||||
test.ok(isWrongExpectedVersion, "Expected WrongExpectedVersionError, but got " + err.constructor.name);
|
||||
if (isWrongExpectedVersion) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
'Write/Commit Transaction With Deleted Stream': function(test) {
|
||||
var self = this;
|
||||
this.conn.deleteStream(this.testStreamName, client.expectedVersion.emptyStream, true)
|
||||
.then(function() {
|
||||
return self.conn.startTransaction(self.testStreamName, client.expectedVersion.any);
|
||||
})
|
||||
.then(function(trx) {
|
||||
return trx.write(client.createJsonEventData(uuid.v4(), {a: Math.random(), b: uuid.v4()}, null, 'anEvent'))
|
||||
.then(function() {
|
||||
return trx.commit();
|
||||
});
|
||||
})
|
||||
.then(function() {
|
||||
test.fail("Commit on transaction on deleted stream succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isStreamDeleted = err instanceof client.StreamDeletedError;
|
||||
test.ok(isStreamDeleted, "Expected StreamDeletedError, but got " + err.constructor.name);
|
||||
if (isStreamDeleted) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
},
|
||||
'Write/Commit Transaction With No Write Access': function(test) {
|
||||
var self = this;
|
||||
this.conn.startTransaction(this.testStreamName, client.expectedVersion.any)
|
||||
.then(function(trx) {
|
||||
var metadata = {$acl: {$w: "$admins"}};
|
||||
return self.conn.setStreamMetadataRaw(self.testStreamName, -1, metadata)
|
||||
.then(function () {
|
||||
return trx.write(client.createJsonEventData(uuid.v4(), {a: Math.random(), b: uuid.v4()}, null, 'anEvent'))
|
||||
.then(function () {
|
||||
return trx.commit();
|
||||
});
|
||||
})
|
||||
})
|
||||
.then(function() {
|
||||
test.fail("Commit on transaction on deleted stream succeeded.");
|
||||
test.done();
|
||||
})
|
||||
.catch(function(err) {
|
||||
var isAccessDenied = err instanceof client.AccessDeniedError;
|
||||
test.ok(isAccessDenied, "Expected AccessDeniedError, but got " + err.constructor.name);
|
||||
if (isAccessDenied) return test.done();
|
||||
test.done(err);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
require('./common/base_test').init(module.exports);
|
||||
|
Loading…
Reference in New Issue
Block a user