Add Object.freeze on private enums

Improve code readability
This commit is contained in:
Nicolas Dextraze 2018-07-09 10:27:12 -07:00
parent c3a63ff8b7
commit 215708014c
55 changed files with 288 additions and 327 deletions

View File

@ -53,29 +53,29 @@ function createEventData(eventId, type, isJson, data, metadata) {
} }
// Expose classes // Expose classes
module.exports.Position = results.Position; exports.Position = results.Position;
module.exports.UserCredentials = require('./systemData/userCredentials'); exports.UserCredentials = require('./systemData/userCredentials');
module.exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings'); exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings');
module.exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); exports.SystemConsumerStrategies = require('./systemConsumerStrategies');
module.exports.GossipSeed = require('./gossipSeed'); exports.GossipSeed = require('./gossipSeed');
module.exports.EventStoreConnection = require('./eventStoreConnection'); exports.EventStoreConnection = require('./eventStoreConnection');
module.exports.ProjectionsManager = require('./projections/projectionsManager'); exports.ProjectionsManager = require('./projections/projectionsManager');
// Expose errors // Expose errors
module.exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError'); exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError');
module.exports.StreamDeletedError = require('./errors/streamDeletedError'); exports.StreamDeletedError = require('./errors/streamDeletedError');
module.exports.AccessDeniedError = require('./errors/accessDeniedError'); exports.AccessDeniedError = require('./errors/accessDeniedError');
module.exports.ProjectionCommandFailedError = require('./errors/projectionCommandFailedError'); exports.ProjectionCommandFailedError = require('./errors/projectionCommandFailedError');
// Expose enums/constants // Expose enums/constants
module.exports.expectedVersion = expectedVersion; exports.expectedVersion = expectedVersion;
module.exports.positions = positions; exports.positions = positions;
module.exports.streamPosition = streamPosition; exports.streamPosition = streamPosition;
module.exports.systemMetadata = require('./common/systemMetadata'); exports.systemMetadata = require('./common/systemMetadata');
module.exports.eventReadStatus = results.EventReadStatus; exports.eventReadStatus = results.EventReadStatus;
module.exports.sliceReadStatus = require('./sliceReadStatus'); exports.sliceReadStatus = require('./sliceReadStatus');
// Expose loggers // Expose loggers
module.exports.NoopLogger = require('./common/log/noopLogger'); exports.NoopLogger = require('./common/log/noopLogger');
module.exports.FileLogger = require('./common/log/fileLogger'); exports.FileLogger = require('./common/log/fileLogger');
// Expose Helper functions // Expose Helper functions
module.exports.createConnection = require('./eventStoreConnection').create; exports.createConnection = require('./eventStoreConnection').create;
module.exports.createJsonEventData = createJsonEventData; exports.createJsonEventData = createJsonEventData;
module.exports.createEventData = createEventData; exports.createEventData = createEventData;

View File

@ -44,8 +44,7 @@ AppendToStreamOperation.prototype._inspectResponse = function(response) {
switch (response.result) switch (response.result)
{ {
case ClientMessage.OperationResult.Success: case ClientMessage.OperationResult.Success:
if (this._wasCommitTimeout) if (this._wasCommitTimeout) this.log.debug("IDEMPOTENT WRITE SUCCEEDED FOR %s.", this);
this.log.debug("IDEMPOTENT WRITE SUCCEEDED FOR %s.", this);
this._succeed(); this._succeed();
return new InspectionResult(InspectionDecision.EndOperation, "Success"); return new InspectionResult(InspectionDecision.EndOperation, "Success");
case ClientMessage.OperationResult.PrepareTimeout: case ClientMessage.OperationResult.PrepareTimeout:

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var InspectionDecision = require('../systemData/inspectionDecision'); var InspectionDecision = require('../systemData/inspectionDecision');

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure'); var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase'); var OperationBase = require('../clientOperations/operationBase');

View File

@ -1,7 +1,5 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase'); var OperationBase = require('../clientOperations/operationBase');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var InspectionDecision = require('../systemData/inspectionDecision'); var InspectionDecision = require('../systemData/inspectionDecision');

View File

@ -42,10 +42,11 @@ OperationBase.prototype._succeed = function() {
if (!this._completed) { if (!this._completed) {
this._completed = true; this._completed = true;
if (this._response) if (this._response) {
this._cb(null, this._transformResponse(this._response)); this._cb(null, this._transformResponse(this._response));
else } else {
this._cb(new Error("No result.")) this._cb(new Error("No result."))
}
} }
}; };
@ -130,8 +131,7 @@ OperationBase.prototype._inspectNotHandled = function(pkg)
OperationBase.prototype._inspectUnexpectedCommand = function(pkg, expectedCommand) OperationBase.prototype._inspectUnexpectedCommand = function(pkg, expectedCommand)
{ {
if (pkg.command === expectedCommand) if (pkg.command === expectedCommand) throw new Error("Command shouldn't be " + TcpCommand.getName(pkg.command));
throw new Error("Command shouldn't be " + TcpCommand.getName(pkg.command));
this.log.error("Unexpected TcpCommand received.\n" this.log.error("Unexpected TcpCommand received.\n"
+ "Expected: %s, Actual: %s, Flags: %s, CorrelationId: %s\n" + "Expected: %s, Actual: %s, Flags: %s, CorrelationId: %s\n"

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');

View File

@ -59,7 +59,6 @@ ReadEventOperation.prototype._transformResponse = function(response) {
return new results.EventReadResult(convert(response.result), this._stream, this._eventNumber, response.event); return new results.EventReadResult(convert(response.result), this._stream, this._eventNumber, response.event);
}; };
function convert(result) function convert(result)
{ {
switch (result) switch (result)

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');

View File

@ -1,12 +1,10 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var InspectionDecision = require('../systemData/inspectionDecision'); var InspectionDecision = require('../systemData/inspectionDecision');
var InspectionResult = require('./../systemData/inspectionResult'); var InspectionResult = require('./../systemData/inspectionResult');
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');
var EventStoreTransaction = require('../eventStoreTransaction'); var EventStoreTransaction = require('../eventStoreTransaction');
var results = require('../results');
var AccessDeniedError = require('../errors/accessDeniedError'); var AccessDeniedError = require('../errors/accessDeniedError');
var WrongExpectedVersionError = require('../errors/wrongExpectedVersionError'); var WrongExpectedVersionError = require('../errors/wrongExpectedVersionError');
var StreamDeletedError = require('../errors/streamDeletedError'); var StreamDeletedError = require('../errors/streamDeletedError');

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var TcpCommand = require('../systemData/tcpCommand'); var TcpCommand = require('../systemData/tcpCommand');
var TcpFlags = require('../systemData/tcpFlags'); var TcpFlags = require('../systemData/tcpFlags');
@ -46,8 +45,7 @@ SubscriptionOperation.prototype._enqueueSend = function(pkg) {
SubscriptionOperation.prototype.subscribe = function(correlationId, connection) { SubscriptionOperation.prototype.subscribe = function(correlationId, connection) {
if (connection === null) throw new TypeError("connection is null."); if (connection === null) throw new TypeError("connection is null.");
if (this._subscription !== null || this._unsubscribed) if (this._subscription !== null || this._unsubscribed) return false;
return false;
this._correlationId = correlationId; this._correlationId = correlationId;
connection.enqueueSend(this._createSubscriptionPackage()); connection.enqueueSend(this._createSubscriptionPackage());
@ -76,9 +74,7 @@ SubscriptionOperation.prototype.inspectPackage = function(pkg) {
try try
{ {
var result = this._inspectPackage(pkg); var result = this._inspectPackage(pkg);
if (result !== null) { if (result !== null) return result;
return result;
}
switch (pkg.command) switch (pkg.command)
{ {
@ -128,8 +124,9 @@ SubscriptionOperation.prototype.inspectPackage = function(pkg) {
case TcpCommand.NotHandled: case TcpCommand.NotHandled:
{ {
if (this._subscription !== null) if (this._subscription !== null) {
throw new Error("NotHandled command appeared while we already subscribed."); throw new Error("NotHandled command appeared while we already subscribed.");
}
var message = ClientMessage.NotHandled.decode(pkg.data.toBuffer()); var message = ClientMessage.NotHandled.decode(pkg.data.toBuffer());
switch (message.reason) switch (message.reason)
@ -172,8 +169,7 @@ SubscriptionOperation.prototype.connectionClosed = function() {
}; };
SubscriptionOperation.prototype.timeOutSubscription = function() { SubscriptionOperation.prototype.timeOutSubscription = function() {
if (this._subscription !== null) if (this._subscription !== null) return false;
return false;
this.dropSubscription(SubscriptionDropReason.SubscribingError, null); this.dropSubscription(SubscriptionDropReason.SubscribingError, null);
return true; return true;
}; };
@ -182,9 +178,10 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect
if (!this._unsubscribed) if (!this._unsubscribed)
{ {
this._unsubscribed = true; this._unsubscribed = true;
if (this._verboseLogging) if (this._verboseLogging) {
this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...", this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...",
this._correlationId, this._streamId || "<all>", reason, err); this._correlationId, this._streamId || "<all>", reason, err);
}
if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null) if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null)
{ {
@ -193,24 +190,31 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect
return; return;
} }
if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) {
connection.enqueueSend(this._createUnsubscriptionPackage()); connection.enqueueSend(this._createUnsubscriptionPackage());
}
var self = this; var self = this;
if (this._subscription !== null) if (this._subscription !== null) {
this._executeAction(function() { self._subscriptionDropped(self._subscription, reason, err); }); this._executeAction(function () {
self._subscriptionDropped(self._subscription, reason, err);
});
}
} }
}; };
SubscriptionOperation.prototype._confirmSubscription = function(lastCommitPosition, lastEventNumber) { SubscriptionOperation.prototype._confirmSubscription = function(lastCommitPosition, lastEventNumber) {
if (lastCommitPosition < -1) if (lastCommitPosition < -1) {
throw new Error(util.format("Invalid lastCommitPosition %s on subscription confirmation.", lastCommitPosition)); throw new Error(util.format("Invalid lastCommitPosition %s on subscription confirmation.", lastCommitPosition));
if (this._subscription !== null) }
if (this._subscription !== null) {
throw new Error("Double confirmation of subscription."); throw new Error("Double confirmation of subscription.");
}
if (this._verboseLogging) if (this._verboseLogging) {
this._log.debug("Subscription %s to %s: subscribed at CommitPosition: %d, EventNumber: %d.", this._log.debug("Subscription %s to %s: subscribed at CommitPosition: %d, EventNumber: %d.",
this._correlationId, this._streamId || "<all>", lastCommitPosition, lastEventNumber); this._correlationId, this._streamId || "<all>", lastCommitPosition, lastEventNumber);
}
this._subscription = this._createSubscriptionObject(lastCommitPosition, lastEventNumber); this._subscription = this._createSubscriptionObject(lastCommitPosition, lastEventNumber);
this._cb(null, this._subscription); this._cb(null, this._subscription);
@ -221,15 +225,15 @@ SubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitP
}; };
SubscriptionOperation.prototype._onEventAppeared = function(e) { SubscriptionOperation.prototype._onEventAppeared = function(e) {
if (this._unsubscribed) if (this._unsubscribed) return;
return;
if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!"); if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!");
if (this._verboseLogging) if (this._verboseLogging) {
this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).", this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).",
this._correlationId, this._streamId || "<all>", this._correlationId, this._streamId || "<all>",
e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition);
}
var self = this; var self = this;
this._executeAction(function() { return self._eventAppeared(self._subscription, e); }); this._executeAction(function() { return self._eventAppeared(self._subscription, e); });

View File

@ -65,4 +65,3 @@ TransactionalWriteOperation.prototype.toString = function() {
}; };
module.exports = TransactionalWriteOperation; module.exports = TransactionalWriteOperation;

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var ensure = require('../common/utils/ensure'); var ensure = require('../common/utils/ensure');
var OperationBase = require('../clientOperations/operationBase'); var OperationBase = require('../clientOperations/operationBase');

View File

@ -19,8 +19,7 @@ BufferSegment.prototype.toString = function() {
}; };
BufferSegment.prototype.toBuffer = function() { BufferSegment.prototype.toBuffer = function() {
if (this.offset === 0 && this.count === this.buffer.length) if (this.offset === 0 && this.count === this.buffer.length) return this.buffer;
return this.buffer;
return this.buffer.slice(this.offset, this.offset + this.count); return this.buffer.slice(this.offset, this.offset + this.count);
}; };

View File

@ -1,5 +1,5 @@
const SystemEventTypes = { const SystemEventTypes = Object.freeze({
StreamMetadata: '$metadata' StreamMetadata: '$metadata'
}; });
module.exports = SystemEventTypes; module.exports = SystemEventTypes;

View File

@ -1,4 +1,4 @@
const SystemMetadata = { const SystemMetadata = Object.freeze({
maxAge: '$maxAge', maxAge: '$maxAge',
maxCount: '$maxCount', maxCount: '$maxCount',
truncateBefore: '$tb', truncateBefore: '$tb',
@ -11,7 +11,6 @@ const SystemMetadata = {
aclMetaWrite: '$mw', aclMetaWrite: '$mw',
userStreamAcl: '$userStreamAcl', userStreamAcl: '$userStreamAcl',
systemStreamAcl: '$systemStreamAcl' systemStreamAcl: '$systemStreamAcl'
}; });
Object.freeze(SystemMetadata);
module.exports = SystemMetadata; module.exports = SystemMetadata;

View File

@ -1,6 +1,6 @@
module.exports.metastreamOf = function(stream) { exports.metastreamOf = function(stream) {
return '$$' + stream; return '$$' + stream;
}; };
module.exports.isMetastream = function(stream) { exports.isMetastream = function(stream) {
return stream.indexOf('$$') === 0; return stream.indexOf('$$') === 0;
}; };

View File

@ -1,20 +1,16 @@
var Long = require('long'); var Long = require('long');
module.exports.notNullOrEmpty = function(value, name) { module.exports.notNullOrEmpty = function(value, name) {
if (value === null) if (value === null) throw new TypeError(name + " should not be null.");
throw new TypeError(name + " should not be null."); if (value === '') throw new Error(name + " should not be empty.");
if (value === '')
throw new Error(name + " should not be empty.");
}; };
module.exports.notNull = function(value, name) { module.exports.notNull = function(value, name) {
if (value === null) if (value === null) throw new TypeError(name + " should not be null.");
throw new TypeError(name + " should not be null.");
}; };
function isInteger(value, name) { function isInteger(value, name) {
if (typeof value !== 'number' || value % 1 !== 0) if (typeof value !== 'number' || value % 1 !== 0) throw new TypeError(name + " should be an integer.");
throw new TypeError(name + " should be an integer.");
} }
module.exports.isInteger = isInteger; module.exports.isInteger = isInteger;
@ -27,24 +23,23 @@ module.exports.isLongOrInteger = function(value, name) {
}; };
module.exports.isArrayOf = function(expectedType, value, name) { module.exports.isArrayOf = function(expectedType, value, name) {
if (!Array.isArray(value)) if (!Array.isArray(value)) throw new TypeError(name + " should be an array.");
throw new TypeError(name + " should be an array."); if (!value.every(function(x) { return x instanceof expectedType; })) {
if (!value.every(function(x) { return x instanceof expectedType; }))
throw new TypeError([name, " should be an array of ", expectedType.name, "."].join("")); throw new TypeError([name, " should be an array of ", expectedType.name, "."].join(""));
}
}; };
module.exports.isTypeOf = function(expectedType, value, name, nullAllowed) { module.exports.isTypeOf = function(expectedType, value, name, nullAllowed) {
if (nullAllowed && value === null) return; if (nullAllowed && value === null) return;
if (!(value instanceof expectedType)) if (!(value instanceof expectedType)) {
throw new TypeError([name, " should be of type '", expectedType.name, "'", nullAllowed ? " or null": "", "."].join("")); throw new TypeError([name, " should be of type '", expectedType.name, "'", nullAllowed ? " or null" : "", "."].join(""));
}
}; };
module.exports.positive = function(value, name) { module.exports.positive = function(value, name) {
if (value <= 0) if (value <= 0) throw new Error(name + " should be positive.");
throw new Error(name + " should be positive.");
}; };
module.exports.nonNegative = function(value, name) { module.exports.nonNegative = function(value, name) {
if (value < 0) if (value < 0) throw new Error(name + " should be non-negative.");
throw new Error(name + " should be non-negative.");
}; };

View File

@ -34,18 +34,19 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) {
function discover(resolve, reject) { function discover(resolve, reject) {
self._discoverEndPoint(failedTcpEndPoint) self._discoverEndPoint(failedTcpEndPoint)
.then(function (endPoints) { .then(function (endPoints) {
if (!endPoints) if (!endPoints) {
self._log.info(util.format("Discovering attempt %d/%d failed: no candidate found.", attempt, self._maxDiscoverAttempts)); self._log.info(util.format("Discovering attempt %d/%d failed: no candidate found.", attempt, self._maxDiscoverAttempts));
}
return endPoints; return endPoints;
}) })
.catch(function (exc) { .catch(function (exc) {
self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.\n%s", attempt, self._maxDiscoverAttempts, exc, exc.stack)); self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.\n%s", attempt, self._maxDiscoverAttempts, exc, exc.stack));
}) })
.then(function (endPoints) { .then(function (endPoints) {
if (endPoints) if (endPoints) return resolve(endPoints);
return resolve(endPoints); if (attempt++ === self._maxDiscoverAttempts) {
if (attempt++ === self._maxDiscoverAttempts)
return reject(new Error('Failed to discover candidate in ' + self._maxDiscoverAttempts + ' attempts.')); return reject(new Error('Failed to discover candidate in ' + self._maxDiscoverAttempts + ' attempts.'));
}
setTimeout(discover, 500, resolve, reject); setTimeout(discover, 500, resolve, reject);
}); });
} }
@ -74,8 +75,7 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn
if (endPoints) return endPoints; if (endPoints) return endPoints;
return self._tryGetGossipFrom(gossipCandidates[j++]) return self._tryGetGossipFrom(gossipCandidates[j++])
.then(function (gossip) { .then(function (gossip) {
if (!gossip || !gossip.members || gossip.members.length === 0) if (!gossip || !gossip.members || gossip.members.length === 0) return;
return;
var bestNode = self._tryDetermineBestNode(gossip.members); var bestNode = self._tryDetermineBestNode(gossip.members);
if (bestNode) { if (bestNode) {
self._oldGossip = gossip.members; self._oldGossip = gossip.members;
@ -105,10 +105,11 @@ ClusterDnsEndPointDiscoverer.prototype._arrangeGossipCandidates = function (memb
var j = members.length; var j = members.length;
for (var k = 0; k < members.length; ++k) for (var k = 0; k < members.length; ++k)
{ {
if (members[k].state === 'Manager') if (members[k].state === 'Manager') {
result[--j] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); result[--j] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort});
else } else {
result[++i] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); result[++i] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort});
}
} }
this._randomShuffle(result, 0, i); // shuffle nodes this._randomShuffle(result, 0, i); // shuffle nodes
this._randomShuffle(result, j, members.length - 1); // shuffle managers this._randomShuffle(result, j, members.length - 1); // shuffle managers
@ -122,8 +123,7 @@ ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function ()
var endpoints = self._gossipSeeds; var endpoints = self._gossipSeeds;
self._randomShuffle(endpoints, 0, endpoints.length - 1); self._randomShuffle(endpoints, 0, endpoints.length - 1);
resolve(endpoints); resolve(endpoints);
} } else {
else {
const dnsOptions = { const dnsOptions = {
family: 4, family: 4,
hints: dns.ADDRCONFIG | dns.V4MAPPED, hints: dns.ADDRCONFIG | dns.V4MAPPED,
@ -192,7 +192,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) {
}); });
}; };
const VNodeStates = { const VNodeStates = Object.freeze({
'Initializing': 0, 'Initializing': 0,
'Unknown': 1, 'Unknown': 1,
'PreReplica': 2, 'PreReplica': 2,
@ -204,7 +204,7 @@ const VNodeStates = {
'Manager': 8, 'Manager': 8,
'ShuttingDown': 9, 'ShuttingDown': 9,
'Shutdown': 10 'Shutdown': 10
}; });
ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members) { ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members) {
var notAllowedStates = [ var notAllowedStates = [
@ -240,8 +240,7 @@ function rndNext(min, max) {
} }
ClusterDnsEndPointDiscoverer.prototype._randomShuffle = function (arr, i, j) { ClusterDnsEndPointDiscoverer.prototype._randomShuffle = function (arr, i, j) {
if (i >= j) if (i >= j) return;
return;
for (var k = i; k <= j; ++k) for (var k = i; k <= j; ++k)
{ {
var index = rndNext(k, j + 1); var index = rndNext(k, j + 1);

View File

@ -17,14 +17,14 @@ var TcpCommand = require('../systemData/tcpCommand');
var TcpFlags = require('../systemData/tcpFlags'); var TcpFlags = require('../systemData/tcpFlags');
var InspectionDecision = require('../systemData/inspectionDecision'); var InspectionDecision = require('../systemData/inspectionDecision');
const ConnectionState = { const ConnectionState = Object.freeze({
Init: 'init', Init: 'init',
Connecting: 'connecting', Connecting: 'connecting',
Connected: 'connected', Connected: 'connected',
Closed: 'closed' Closed: 'closed'
}; });
const ConnectingPhase = { const ConnectingPhase = Object.freeze({
Invalid: 'invalid', Invalid: 'invalid',
Reconnecting: 'reconnecting', Reconnecting: 'reconnecting',
EndPointDiscovery: 'endpointDiscovery', EndPointDiscovery: 'endpointDiscovery',
@ -32,7 +32,7 @@ const ConnectingPhase = {
Authentication: 'authentication', Authentication: 'authentication',
Identification: 'identification', Identification: 'identification',
Connected: 'connected' Connected: 'connected'
}; });
const TimerPeriod = 200; const TimerPeriod = 200;
const TimerTickMessage = new messages.TimerTickMessage(); const TimerTickMessage = new messages.TimerTickMessage();
@ -187,8 +187,7 @@ EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, e
this._logInfo("Closed. Reason: %s", reason); this._logInfo("Closed. Reason: %s", reason);
if (error) if (error) this.emit('error', error);
this.emit('error', error);
this.emit('closed', reason); this.emit('closed', reason);
}; };
@ -205,7 +204,7 @@ EventStoreConnectionLogicHandler.prototype._closeTcpConnection = function(reason
this._connection = null; this._connection = null;
}; };
var _nextSeqNo = -1; var _nextSeqNo = 0;
function createOperationItem(operation, maxRetries, timeout) { function createOperationItem(operation, maxRetries, timeout) {
var operationItem = { var operationItem = {
seqNo: _nextSeqNo++, seqNo: _nextSeqNo++,
@ -283,10 +282,11 @@ EventStoreConnectionLogicHandler.prototype._startSubscription = function(msg) {
this._state === ConnectionState.Connected ? "fire" : "enqueue", this._state === ConnectionState.Connected ? "fire" : "enqueue",
operation.constructor.name, operation, msg.maxRetries, msg.timeout); operation.constructor.name, operation, msg.maxRetries, msg.timeout);
var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout);
if (this._state === ConnectionState.Connecting) if (this._state === ConnectionState.Connecting) {
this._subscriptions.enqueueSubscription(subscription); this._subscriptions.enqueueSubscription(subscription);
else } else {
this._subscriptions.startSubscription(subscription, this._connection); this._subscriptions.startSubscription(subscription, this._connection);
}
break; break;
case ConnectionState.Closed: case ConnectionState.Closed:
msg.cb(new Error("Connection closed. Connection: " + this._esConnection.connectionName)); msg.cb(new Error("Connection closed. Connection: " + this._esConnection.connectionName));
@ -312,10 +312,11 @@ EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = functi
this._state === ConnectionState.Connected ? "fire" : "enqueue", this._state === ConnectionState.Connected ? "fire" : "enqueue",
operation.constructor.name, operation, msg.maxRetries, msg.timeout); operation.constructor.name, operation, msg.maxRetries, msg.timeout);
var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout);
if (this._state === ConnectionState.Connecting) if (this._state === ConnectionState.Connecting) {
this._subscriptions.enqueueSubscription(subscription); this._subscriptions.enqueueSubscription(subscription);
else } else {
this._subscriptions.startSubscription(subscription, this._connection); this._subscriptions.startSubscription(subscription, this._connection);
}
break; break;
case ConnectionState.Closed: case ConnectionState.Closed:
msg.cb(new Error("Connection closed. " + this._esConnection.connectionName)); msg.cb(new Error("Connection closed. " + this._esConnection.connectionName));
@ -561,8 +562,9 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
default: default:
throw new Error("Unknown InspectionDecision: " + result.decision); throw new Error("Unknown InspectionDecision: " + result.decision);
} }
if (this._state === ConnectionState.Connected) if (this._state === ConnectionState.Connected) {
this._operations.scheduleWaitingOperations(connection); this._operations.scheduleWaitingOperations(connection);
}
return; return;
} }
@ -609,8 +611,7 @@ EventStoreConnectionLogicHandler.prototype._reconnectTo = function(endPoints) {
return; return;
} }
if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint) if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint) return;
return;
var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].", var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].",
this._esConnection.connectionName, endPoint, this._connection.remoteEndPoint, this._connection.localEndPoint); this._esConnection.connectionName, endPoint, this._connection.remoteEndPoint, this._connection.localEndPoint);
@ -627,30 +628,26 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() {
{ {
case ConnectionState.Init: break; case ConnectionState.Init: break;
case ConnectionState.Connecting: case ConnectionState.Connecting:
if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay) if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay) {
{
this._logDebug("TimerTick checking reconnection..."); this._logDebug("TimerTick checking reconnection...");
this._reconnInfo = {reconnectionAttempt: this._reconnInfo.reconnectionAttempt + 1, timeStamp: Date.now()}; this._reconnInfo = {reconnectionAttempt: this._reconnInfo.reconnectionAttempt + 1, timeStamp: Date.now()};
if (this._settings.maxReconnections >= 0 && this._reconnInfo.reconnectionAttempt > this._settings.maxReconnections) if (this._settings.maxReconnections >= 0 && this._reconnInfo.reconnectionAttempt > this._settings.maxReconnections) {
this._closeConnection("Reconnection limit reached."); this._closeConnection("Reconnection limit reached.");
else } else {
{
this.emit('reconnecting', {}); this.emit('reconnecting', {});
this._discoverEndpoint(null); this._discoverEndpoint(null);
} }
} } else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout) {
else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout)
{
this.emit('authenticationFailed', "Authentication timed out."); this.emit('authenticationFailed', "Authentication timed out.");
if (this._clientVersion === 1) { if (this._clientVersion === 1) {
this._goToIdentifiedState(); this._goToIdentifiedState();
} else { } else {
this._goToConnectedState(); this._goToConnectedState();
} }
} } else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected) {
else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected)
this._manageHeartbeats(); this._manageHeartbeats();
}
break; break;
case ConnectionState.Connected: case ConnectionState.Connected:
// operations timeouts are checked only if connection is established and check period time passed // operations timeouts are checked only if connection is established and check period time passed
@ -677,8 +674,7 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
if (this._connection === null) return; if (this._connection === null) return;
var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout; var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout;
if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout) if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout) return;
return;
var packageNumber = this._packageNumber; var packageNumber = this._packageNumber;
if (this._heartbeatInfo.lastPackageNumber !== packageNumber) if (this._heartbeatInfo.lastPackageNumber !== packageNumber)
@ -711,15 +707,17 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
EventStoreConnectionLogicHandler.prototype._logDebug = function(message) { EventStoreConnectionLogicHandler.prototype._logDebug = function(message) {
if (!this._settings.verboseLogging) return; if (!this._settings.verboseLogging) return;
if (arguments.length > 1) if (arguments.length > 1) {
message = util.format.apply(util, Array.prototype.slice.call(arguments)); message = util.format.apply(util, Array.prototype.slice.call(arguments));
}
this._settings.log.debug("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); this._settings.log.debug("EventStoreConnection '%s': %s", this._esConnection.connectionName, message);
}; };
EventStoreConnectionLogicHandler.prototype._logInfo = function(message){ EventStoreConnectionLogicHandler.prototype._logInfo = function(message){
if (arguments.length > 1) if (arguments.length > 1) {
message = util.format.apply(util, Array.prototype.slice.call(arguments)); message = util.format.apply(util, Array.prototype.slice.call(arguments));
}
this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message);
}; };

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var ensure = require('../common/utils/ensure');
function Message() { function Message() {
} }

View File

@ -110,8 +110,7 @@ OperationsManager.prototype.checkTimeoutsAndRetry = function(connection) {
}; };
OperationsManager.prototype.scheduleOperationRetry = function(operation) { OperationsManager.prototype.scheduleOperationRetry = function(operation) {
if (!this.removeOperation(operation)) if (!this.removeOperation(operation)) return;
return;
this._logDebug("ScheduleOperationRetry for %s.", operation); this._logDebug("ScheduleOperationRetry for %s.", operation);
if (operation.maxRetries >= 0 && operation.retryCount >= operation.maxRetries) if (operation.maxRetries >= 0 && operation.retryCount >= operation.maxRetries)
@ -166,8 +165,9 @@ OperationsManager.prototype.scheduleOperation = function(operation, connection)
OperationsManager.prototype._logDebug = function(message) { OperationsManager.prototype._logDebug = function(message) {
if (!this._settings.verboseLogging) return; if (!this._settings.verboseLogging) return;
if (arguments.length > 1) if (arguments.length > 1) {
message = util.format.apply(util, Array.prototype.slice.call(arguments)); message = util.format.apply(util, Array.prototype.slice.call(arguments));
}
this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, message); this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, message);
}; };

View File

@ -1,8 +1,6 @@
function typeName(t) { function typeName(t) {
if (typeof t === 'function') if (typeof t === 'function') return t.name;
return t.name; if (typeof t === 'object') return t.constructor.name;
if (typeof t === 'object')
return t.constructor.name;
throw new TypeError('type must be a function or object, not ' + typeof t); throw new TypeError('type must be a function or object, not ' + typeof t);
} }
@ -36,8 +34,7 @@ SimpleQueuedHandler.prototype._processQueue = function() {
while(message) { while(message) {
var typeId = typeName(message); var typeId = typeName(message);
var handler = this._handlers[typeId]; var handler = this._handlers[typeId];
if (!handler) if (!handler) throw new Error("No handler registered for message " + typeId);
throw new Error("No handler registered for message " + typeId);
setImmediate(handler, message); setImmediate(handler, message);
message = this._messages.shift(); message = this._messages.shift();
} }

View File

@ -1,5 +1,3 @@
var uuid = require('uuid');
const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
function isValidId(id) { function isValidId(id) {
if (typeof id !== 'string') return false; if (typeof id !== 'string') return false;

View File

@ -45,8 +45,7 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function(
}); });
}) })
.then(function(done) { .then(function(done) {
if (done || self._shouldStop) if (done || self._shouldStop) return;
return;
return readNext(); return readNext();
}); });
} }
@ -69,10 +68,11 @@ EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) {
this._lastProcessedPosition = e.originalPosition; this._lastProcessedPosition = e.originalPosition;
processed = true; processed = true;
} }
if (this._verbose) if (this._verbose) {
this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %s).", this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %s).",
this.streamId || '<all>', processed ? "processed" : "skipping", this.streamId || '<all>', processed ? "processed" : "skipping",
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition); e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition);
}
return (promise && promise.then) ? promise : Promise.resolve(); return (promise && promise.then) ? promise : Promise.resolve();
}; };

View File

@ -1,7 +1,6 @@
var util = require('util'); var util = require('util');
var SubscriptionDropReason = require('./subscriptionDropReason'); var SubscriptionDropReason = require('./subscriptionDropReason');
var results = require('./results');
const DefaultReadBatchSize = 500; const DefaultReadBatchSize = 500;
const DefaultMaxPushQueueSize = 10000; const DefaultMaxPushQueueSize = 10000;
@ -68,9 +67,9 @@ function EventStoreCatchUpSubscription(
var self = this; var self = this;
this._onReconnect = function() { this._onReconnect = function() {
if (self._verbose) self._log.debug("Catch-up Subscription to %s: recovering after reconnection.", self._streamId || '<all>');
if (self._verbose) self._log.debug("Catch-up Subscription to %s: unhooking from connection.Connected.", self._streamId || '<all>'); if (self._verbose) self._log.debug("Catch-up Subscription to %s: unhooking from connection.Connected.", self._streamId || '<all>');
self._connection.removeListener('connected', self._onReconnect); self._connection.removeListener('connected', self._onReconnect);
if (self._verbose) self._log.debug("Catch-up Subscription to %s: recovering after reconnection.", self._streamId || '<all>');
self._runSubscription(); self._runSubscription();
} }
} }
@ -133,10 +132,11 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() {
.then(function() { .then(function() {
if (self._shouldStop) return; if (self._shouldStop) return;
if (self._verbose) self._log.debug("Catch-up Subscription to %s: subscribing...", logStreamName); if (self._verbose) self._log.debug("Catch-up Subscription to %s: subscribing...", logStreamName);
if (self._streamId === '') if (self._streamId === '') {
return self._connection.subscribeToAll(self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials); return self._connection.subscribeToAll(self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials);
else } else {
return self._connection.subscribeToStream(self._streamId, self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials); return self._connection.subscribeToStream(self._streamId, self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials);
}
}) })
.then(function(subscription) { .then(function(subscription) {
if (subscription === undefined) return; if (subscription === undefined) return;
@ -155,12 +155,13 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() {
return; return;
} }
if (self._verbose) self._log.debug("Catch-up Subscription to %s: processing live events...", logStreamName); if (self._verbose) self._log.debug("Catch-up Subscription to %s: processing live events...", logStreamName);
if (self._liveProcessingStarted) if (self._liveProcessingStarted) {
try { try {
self._liveProcessingStarted(self); self._liveProcessingStarted(self);
} catch(e) { } catch (e) {
self._log.error(e, "Catch-up Subscription to %s: liveProcessingStarted callback failed.", logStreamName); self._log.error(e, "Catch-up Subscription to %s: liveProcessingStarted callback failed.", logStreamName);
} }
}
if (self._verbose) self._log.debug("Catch-up Subscription to %s: hooking to connection.Connected", logStreamName); if (self._verbose) self._log.debug("Catch-up Subscription to %s: hooking to connection.Connected", logStreamName);
self._connection.on('connected', self._onReconnect); self._connection.on('connected', self._onReconnect);
self._allowProcessing = true; self._allowProcessing = true;
@ -169,10 +170,11 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() {
}; };
EventStoreCatchUpSubscription.prototype._enqueuePushedEvent = function(subscription, e) { EventStoreCatchUpSubscription.prototype._enqueuePushedEvent = function(subscription, e) {
if (this._verbose) if (this._verbose) {
this._log.debug("Catch-up Subscription to %s: event appeared (%s, %d, %s @ %s).", this._log.debug("Catch-up Subscription to %s: event appeared (%s, %d, %s @ %s).",
this._streamId || '<all>', this._streamId || '<all>',
e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition);
}
if (this._liveQueue.length >= this.maxPushQueueSize) if (this._liveQueue.length >= this.maxPushQueueSize)
{ {
@ -183,8 +185,7 @@ EventStoreCatchUpSubscription.prototype._enqueuePushedEvent = function(subscript
this._liveQueue.push(e); this._liveQueue.push(e);
if (this._allowProcessing) if (this._allowProcessing) this._ensureProcessingPushQueue();
this._ensureProcessingPushQueue();
}; };
EventStoreCatchUpSubscription.prototype._serverSubscriptionDropped = function(subscription, reason, err) { EventStoreCatchUpSubscription.prototype._serverSubscriptionDropped = function(subscription, reason, err) {
@ -196,8 +197,7 @@ EventStoreCatchUpSubscription.prototype._enqueueSubscriptionDropNotification = f
if (this._dropData) return; if (this._dropData) return;
this._dropData = {reason: reason, error: error}; this._dropData = {reason: reason, error: error};
this._liveQueue.push(new DropSubscriptionEvent()); this._liveQueue.push(new DropSubscriptionEvent());
if (this._allowProcessing) if (this._allowProcessing) this._ensureProcessingPushQueue();
this._ensureProcessingPushQueue();
}; };
EventStoreCatchUpSubscription.prototype._ensureProcessingPushQueue = function() { EventStoreCatchUpSubscription.prototype._ensureProcessingPushQueue = function() {
@ -244,18 +244,19 @@ EventStoreCatchUpSubscription.prototype._dropSubscription = function(reason, err
if (this._isDropped) return; if (this._isDropped) return;
this._isDropped = true; this._isDropped = true;
if (this._verbose) if (this._verbose) {
this._log.debug("Catch-up Subscription to %s: dropping subscription, reason: %s %s.", this._log.debug("Catch-up Subscription to %s: dropping subscription, reason: %s %s.",
this._streamId || '<all>', reason, error); this._streamId || '<all>', reason, error);
}
if (this._subscription) if (this._subscription) this._subscription.unsubscribe();
this._subscription.unsubscribe(); if (this._subscriptionDropped) {
if (this._subscriptionDropped)
try { try {
this._subscriptionDropped(this, reason, error); this._subscriptionDropped(this, reason, error);
} catch(e) { } catch (e) {
this._log.error(e, "Catch-up Subscription to %s: subscriptionDropped callback failed.", this._streamId || '<all>'); this._log.error(e, "Catch-up Subscription to %s: subscriptionDropped callback failed.", this._streamId || '<all>');
} }
}
this._stopped = true; this._stopped = true;
}; };

View File

@ -4,7 +4,7 @@ var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer'
var NoopLogger = require('./common/log/noopLogger'); var NoopLogger = require('./common/log/noopLogger');
var ensure = require('./common/utils/ensure'); var ensure = require('./common/utils/ensure');
var defaultConnectionSettings = { var defaultConnectionSettings = Object.freeze({
log: new NoopLogger(), log: new NoopLogger(),
verboseLogging: false, verboseLogging: false,
@ -34,7 +34,7 @@ var defaultConnectionSettings = {
maxDiscoverAttempts: 10, maxDiscoverAttempts: 10,
externalGossipPort: 0, externalGossipPort: 0,
gossipTimeout: 1000 gossipTimeout: 1000
}; });
function merge(a,b) { function merge(a,b) {

View File

@ -140,8 +140,7 @@ EventStoreNodeConnection.prototype.appendToStream = function(stream, expectedVer
ensure.notNullOrEmpty(stream, "stream"); ensure.notNullOrEmpty(stream, "stream");
ensure.isLongOrInteger(expectedVersion, "expectedVersion"); ensure.isLongOrInteger(expectedVersion, "expectedVersion");
expectedVersion = Long.fromValue(expectedVersion); expectedVersion = Long.fromValue(expectedVersion);
if (!Array.isArray(events)) if (!Array.isArray(events)) events = [events];
events = [events];
ensure.isArrayOf(EventData, events, "events"); ensure.isArrayOf(EventData, events, "events");
userCredentials = userCredentials || null; userCredentials = userCredentials || null;
@ -413,8 +412,7 @@ EventStoreNodeConnection.prototype.subscribeToStream = function(
) { ) {
ensure.notNullOrEmpty(stream, "stream"); ensure.notNullOrEmpty(stream, "stream");
ensure.isTypeOf(Function, eventAppeared, "eventAppeared"); ensure.isTypeOf(Function, eventAppeared, "eventAppeared");
if (subscriptionDropped) if (subscriptionDropped) ensure.isTypeOf(Function, subscriptionDropped, "subscriptionDropped");
ensure.isTypeOf(Function, subscriptionDropped, "subscriptionDropped");
var self = this; var self = this;
return new Promise(function(resolve,reject) { return new Promise(function(resolve,reject) {
@ -641,8 +639,9 @@ EventStoreNodeConnection.prototype.setStreamMetadataRaw = function(
stream, expectedMetastreamVersion, metadata, userCredentials stream, expectedMetastreamVersion, metadata, userCredentials
) { ) {
ensure.notNullOrEmpty(stream, "stream"); ensure.notNullOrEmpty(stream, "stream");
if (systemStreams.isMetastream(stream)) if (systemStreams.isMetastream(stream)) {
throw new Error(util.format("Setting metadata for metastream '%s' is not supported.", stream)); throw new Error(util.format("Setting metadata for metastream '%s' is not supported.", stream));
}
ensure.isLongOrInteger(expectedMetastreamVersion, "expectedMetastreamVersion"); ensure.isLongOrInteger(expectedMetastreamVersion, "expectedMetastreamVersion");
expectedMetastreamVersion = Long.fromValue(expectedMetastreamVersion); expectedMetastreamVersion = Long.fromValue(expectedMetastreamVersion);
var self = this; var self = this;
@ -713,7 +712,7 @@ EventStoreNodeConnection.prototype._enqueueOperation = function(operation) {
var message = new messages.StartOperationMessage(operation, self._settings.maxRetries, self._settings.operationTimeout); var message = new messages.StartOperationMessage(operation, self._settings.maxRetries, self._settings.operationTimeout);
function tryEnqueue() { function tryEnqueue() {
if (self._handler.totalOperationCount >= self._settings.maxQueueSize) { if (self._handler.totalOperationCount >= self._settings.maxQueueSize) {
setImmediate(tryEnqueue); setTimeout(tryEnqueue, 0);
return; return;
} }
self._handler.enqueueMessage(message); self._handler.enqueueMessage(message);

View File

@ -56,8 +56,7 @@ EventStorePersistentSubscriptionBase.prototype.acknowledge = function(events) {
ensure.notNull(events, "events"); ensure.notNull(events, "events");
if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet.");
if (!Array.isArray(events)) if (!Array.isArray(events)) events = [events];
events = [events];
var ids = events.map(function(x) { return x.originalEvent.eventId; }); var ids = events.map(function(x) { return x.originalEvent.eventId; });
this._subscription.notifyEventsProcessed(ids); this._subscription.notifyEventsProcessed(ids);
}; };
@ -73,8 +72,7 @@ EventStorePersistentSubscriptionBase.prototype.fail = function(events, action, r
ensure.notNull(reason, "reason"); ensure.notNull(reason, "reason");
if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet.");
if (!Array.isArray(events)) if (!Array.isArray(events)) events = [events];
events = [events];
var ids = events.map(function(x) { return x.originalEvent.eventId; }); var ids = events.map(function(x) { return x.originalEvent.eventId; });
this._subscription.notifyEventsFailed(ids, action, reason); this._subscription.notifyEventsFailed(ids, action, reason);
}; };
@ -145,12 +143,12 @@ EventStorePersistentSubscriptionBase.prototype._processQueue = function() {
return self._eventAppeared(self, ev); return self._eventAppeared(self, ev);
}) })
.then(function() { .then(function() {
if(self._autoAck) if(self._autoAck) self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]);
self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]); if (self._verbose) {
if (self._verbose)
self._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", self._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).",
self._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType, self._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType,
ev.originalEventNumber); ev.originalEventNumber);
}
return false; return false;
}, function(err) { }, function(err) {
//TODO GFY should we autonak here? //TODO GFY should we autonak here?
@ -168,12 +166,12 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas
if (!this._isDropped) if (!this._isDropped)
{ {
this._isDropped = true; this._isDropped = true;
if (this._verbose) if (this._verbose) {
this._log.debug("Persistent Subscription to %s: dropping subscription, reason: %s %s.", this._log.debug("Persistent Subscription to %s: dropping subscription, reason: %s %s.",
this._streamId, reason, error); this._streamId, reason, error);
}
if (this._subscription !== null) if (this._subscription !== null) this._subscription.unsubscribe();
this._subscription.unsubscribe();
if (this._subscriptionDropped !== null) { if (this._subscriptionDropped !== null) {
try { try {
this._subscriptionDropped(this, reason, error); this._subscriptionDropped(this, reason, error);

View File

@ -49,19 +49,18 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
.then(function() { .then(function() {
self._nextReadEventNumber = slice.nextEventNumber; self._nextReadEventNumber = slice.nextEventNumber;
var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber.compare(lastEventNumber) > 0); var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber.compare(lastEventNumber) > 0);
if (!done && slice.isEndOfStream) if (!done && slice.isEndOfStream) return delay(100, false);
return delay(100, false);
return done; return done;
}); });
break;
case SliceReadStatus.StreamNotFound: case SliceReadStatus.StreamNotFound:
if (lastEventNumber && lastEventNumber.compare(-1) !== 0) if (lastEventNumber && lastEventNumber.compare(-1) !== 0) {
throw new Error(util.format("Impossible: stream %s disappeared in the middle of catching up subscription.", self.streamId)); throw new Error(util.format("Impossible: stream %s disappeared in the middle of catching up subscription.", self.streamId));
}
return true; return true;
case SliceReadStatus.StreamDeleted: case SliceReadStatus.StreamDeleted:
throw new Error("Stream deleted: " + self.streamId); throw new Error("Stream deleted: " + self.streamId);
default: default:
throw new Error("Unexpected StreamEventsSlice.Status: %s.", slice.status); throw new Error(util.format("Unexpected StreamEventsSlice.Status: %s.", slice.status));
} }
}) })
.then(function(done) { .then(function(done) {
@ -72,9 +71,10 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function(
} }
return readNext() return readNext()
.then(function() { .then(function() {
if (self._verbose) if (self._verbose) {
self._log.debug("Catch-up Subscription to %s: finished reading events, nextReadEventNumber = %d.", self._log.debug("Catch-up Subscription to %s: finished reading events, nextReadEventNumber = %d.",
self.isSubscribedToAll ? '<all>' : self.streamId, self._nextReadEventNumber); self.isSubscribedToAll ? '<all>' : self.streamId, self._nextReadEventNumber);
}
}); });
}; };
@ -86,10 +86,11 @@ EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) {
this._lastProcessedEventNumber = e.originalEventNumber; this._lastProcessedEventNumber = e.originalEventNumber;
processed = true; processed = true;
} }
if (this._verbose) if (this._verbose) {
this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).", this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).",
this.isSubscribedToAll ? '<all>' : this.streamId, processed ? "processed" : "skipping", this.isSubscribedToAll ? '<all>' : this.streamId, processed ? "processed" : "skipping",
e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber); e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber);
}
return (promise && promise.then) ? promise : Promise.resolve(); return (promise && promise.then) ? promise : Promise.resolve();
}; };

View File

@ -1,6 +1,8 @@
module.exports = function GossipSeed(endPoint, hostName) { function GossipSeed(endPoint, hostName) {
if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.'); if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.');
this.endPoint = endPoint; this.endPoint = endPoint;
this.hostName = hostName; this.hostName = hostName;
Object.freeze(this); Object.freeze(this);
}; }
module.exports = GossipSeed;

View File

@ -1,14 +1,15 @@
const PersistentSubscriptionNakEventAction = { const PersistentSubscriptionNakEventAction = Object.freeze({
Unknown: 0, Unknown: 0,
Park: 1, Park: 1,
Retry: 2, Retry: 2,
Skip: 3, Skip: 3,
Stop: 4 Stop: 4,
}; isValid: function(value) {
for(var k in PersistentSubscriptionNakEventAction) {
if (PersistentSubscriptionNakEventAction[k] === value) return true;
}
return false;
}
});
module.exports = PersistentSubscriptionNakEventAction; module.exports = PersistentSubscriptionNakEventAction;
module.exports.isValid = function(value) {
for(var k in PersistentSubscriptionNakEventAction)
if (PersistentSubscriptionNakEventAction[k] === value) return true;
return false;
};

View File

@ -1,6 +1,6 @@
const ReadDirection = { const ReadDirection = Object.freeze({
Forward: 'forward', Forward: 'forward',
Backward: 'backward' Backward: 'backward'
}; });
module.exports = ReadDirection; module.exports = ReadDirection;

View File

@ -19,10 +19,12 @@ function Position(commitPosition, preparePosition) {
} }
Position.prototype.compareTo = function(other) { Position.prototype.compareTo = function(other) {
if (this.commitPosition.lt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition)&& this.preparePosition.lt(other.preparePosition))) if (this.commitPosition.lt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition)&& this.preparePosition.lt(other.preparePosition))) {
return -1; return -1;
if (this.commitPosition.gt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition) && this.preparePosition.gt(other.preparePosition))) }
if (this.commitPosition.gt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition) && this.preparePosition.gt(other.preparePosition))) {
return 1; return 1;
}
return 0; return 0;
}; };
@ -33,13 +35,12 @@ Position.prototype.toString = function() {
Position.start = new Position(0,0); Position.start = new Position(0,0);
Position.end = new Position(-1,-1); Position.end = new Position(-1,-1);
const EventReadStatus = { const EventReadStatus = Object.freeze({
Success: 'success', Success: 'success',
NotFound: 'notFound', NotFound: 'notFound',
NoStream: 'noStream', NoStream: 'noStream',
StreamDeleted: 'streamDeleted' StreamDeleted: 'streamDeleted'
}; });
Object.freeze(EventReadStatus);
/** /**
* @param {object} ev * @param {object} ev
@ -204,12 +205,11 @@ function RawStreamMetadataResult(stream, isStreamDeleted, metastreamVersion, str
Object.freeze(this); Object.freeze(this);
} }
const PersistentSubscriptionCreateStatus = { const PersistentSubscriptionCreateStatus = Object.freeze({
Success: 'success', Success: 'success',
NotFound: 'notFound', NotFound: 'notFound',
Failure: 'failure' Failure: 'failure'
}; });
Object.freeze(PersistentSubscriptionCreateStatus);
/** /**
* @param {string} status * @param {string} status
@ -221,13 +221,12 @@ function PersistentSubscriptionCreateResult(status) {
Object.freeze(this); Object.freeze(this);
} }
const PersistentSubscriptionUpdateStatus = { const PersistentSubscriptionUpdateStatus = Object.freeze({
Success: 'success', Success: 'success',
NotFound: 'notFound', NotFound: 'notFound',
Failure: 'failure', Failure: 'failure',
AccessDenied: 'accessDenied' AccessDenied: 'accessDenied'
}; });
Object.freeze(PersistentSubscriptionUpdateStatus);
/** /**
* @param {string} status * @param {string} status
@ -239,11 +238,10 @@ function PersistentSubscriptionUpdateResult(status) {
Object.freeze(this); Object.freeze(this);
} }
const PersistentSubscriptionDeleteStatus = { const PersistentSubscriptionDeleteStatus = Object.freeze({
Success: 'success', Success: 'success',
Failure: 'failure' Failure: 'failure'
}; });
Object.freeze(PersistentSubscriptionDeleteStatus);
/** /**
* @param {string} status * @param {string} status
@ -256,18 +254,18 @@ function PersistentSubscriptionDeleteResult(status) {
} }
// Exports Constructors // Exports Constructors
module.exports.Position = Position; exports.Position = Position;
module.exports.ResolvedEvent = ResolvedEvent; exports.ResolvedEvent = ResolvedEvent;
module.exports.EventReadStatus = EventReadStatus; exports.EventReadStatus = EventReadStatus;
module.exports.EventReadResult = EventReadResult; exports.EventReadResult = EventReadResult;
module.exports.WriteResult = WriteResult; exports.WriteResult = WriteResult;
module.exports.StreamEventsSlice = StreamEventsSlice; exports.StreamEventsSlice = StreamEventsSlice;
module.exports.AllEventsSlice = AllEventsSlice; exports.AllEventsSlice = AllEventsSlice;
module.exports.DeleteResult = DeleteResult; exports.DeleteResult = DeleteResult;
module.exports.RawStreamMetadataResult = RawStreamMetadataResult; exports.RawStreamMetadataResult = RawStreamMetadataResult;
module.exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult; exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult;
module.exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus; exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus;
module.exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult; exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult;
module.exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus; exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus;
module.exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult; exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult;
module.exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus; exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus;

View File

@ -1,8 +1,7 @@
const SliceReadStatus = { const SliceReadStatus = Object.freeze({
Success: 'success', Success: 'success',
StreamNotFound: 'streamNotFound', StreamNotFound: 'streamNotFound',
StreamDeleted: 'streamDeleted' StreamDeleted: 'streamDeleted'
}; });
Object.freeze(SliceReadStatus);
module.exports = SliceReadStatus; module.exports = SliceReadStatus;

View File

@ -1,4 +1,4 @@
const SubscriptionDropReason = { const SubscriptionDropReason = Object.freeze({
AccessDenied: 'accessDenied', AccessDenied: 'accessDenied',
CatchUpError: 'catchUpError', CatchUpError: 'catchUpError',
ConnectionClosed: 'connectionClosed', ConnectionClosed: 'connectionClosed',
@ -11,6 +11,6 @@ const SubscriptionDropReason = {
SubscribingError: 'subscribingError', SubscribingError: 'subscribingError',
UserInitiated: 'userInitiated', UserInitiated: 'userInitiated',
Unknown: 'unknown' Unknown: 'unknown'
}; });
module.exports = SubscriptionDropReason; module.exports = SubscriptionDropReason;

View File

@ -1,8 +1,7 @@
const SystemConsumerStrategies = { const SystemConsumerStrategies = Object.freeze({
DispatchToSingle: 'DispatchToSingle', DispatchToSingle: 'DispatchToSingle',
RoundRobin: 'RoundRobin', RoundRobin: 'RoundRobin',
Pinned: 'Pinned' Pinned: 'Pinned'
}; });
Object.freeze(SystemConsumerStrategies);
module.exports = SystemConsumerStrategies; module.exports = SystemConsumerStrategies;

View File

@ -1,9 +1,9 @@
var InspectionDecision = { var InspectionDecision = Object.freeze({
DoNothing: 'doNothing', DoNothing: 'doNothing',
EndOperation: 'endOperation', EndOperation: 'endOperation',
Retry: 'retry', Retry: 'retry',
Reconnect: 'reconnect', Reconnect: 'reconnect',
Subscribed: 'subscribed' Subscribed: 'subscribed'
}; });
module.exports = InspectionDecision; module.exports = InspectionDecision;

View File

@ -1,9 +1,7 @@
var ClientMessage = require('../messages/clientMessage'); var ClientMessage = require('../messages/clientMessage');
var SliceReadStatus = require('../sliceReadStatus'); var SliceReadStatus = require('../sliceReadStatus');
module.exports = {}; exports.convert = function(code) {
module.exports.convert = function(code) {
switch(code) { switch(code) {
case ClientMessage.ReadStreamEventsCompleted.ReadStreamResult.Success: case ClientMessage.ReadStreamEventsCompleted.ReadStreamResult.Success:
return SliceReadStatus.Success; return SliceReadStatus.Success;

View File

@ -1,4 +1,4 @@
const TcpCommand = { const TcpCommand = Object.freeze({
HeartbeatRequestCommand: 0x01, HeartbeatRequestCommand: 0x01,
HeartbeatResponseCommand: 0x02, HeartbeatResponseCommand: 0x02,
@ -73,16 +73,18 @@ const TcpCommand = {
Authenticated: 0xF3, Authenticated: 0xF3,
NotAuthenticated: 0xF4, NotAuthenticated: 0xF4,
IdentifyClient: 0xF5, IdentifyClient: 0xF5,
ClientIdentified: 0xF6 ClientIdentified: 0xF6,
};
getName: function(v) {
return _reverseLookup[v];
}
});
var _reverseLookup = {}; var _reverseLookup = {};
for(var n in TcpCommand) { for(var n in TcpCommand) {
if (n === 'getName') continue;
var v = TcpCommand[n]; var v = TcpCommand[n];
_reverseLookup[v] = n; _reverseLookup[v] = n;
} }
module.exports = TcpCommand; module.exports = TcpCommand;
module.exports.getName = function(v) {
return _reverseLookup[v];
};

View File

@ -1,6 +1,6 @@
const TcpFlags = { const TcpFlags = Object.freeze({
None: 0x0, None: 0x0,
Authenticated: 0x01 Authenticated: 0x01
}; });
module.exports = TcpFlags; module.exports = TcpFlags;

View File

@ -19,8 +19,7 @@ function TcpPackage(command, flags, correlationId, login, password, data) {
} }
TcpPackage.fromBufferSegment = function(data) { TcpPackage.fromBufferSegment = function(data) {
if (data.length < MandatorySize) if (data.length < MandatorySize) throw new Error("ArraySegment too short, length: " + data.length);
throw new Error("ArraySegment too short, length: " + data.length);
var command = data.buffer[data.offset + CommandOffset]; var command = data.buffer[data.offset + CommandOffset];
var flags = data.buffer[data.offset + FlagsOffset]; var flags = data.buffer[data.offset + FlagsOffset];
@ -32,13 +31,15 @@ TcpPackage.fromBufferSegment = function(data) {
if ((flags & TcpFlags.Authenticated) !== 0) if ((flags & TcpFlags.Authenticated) !== 0)
{ {
var loginLen = data.buffer[data.offset + AuthOffset]; var loginLen = data.buffer[data.offset + AuthOffset];
if (AuthOffset + 1 + loginLen + 1 >= data.count) if (AuthOffset + 1 + loginLen + 1 >= data.count) {
throw new Error("Login length is too big, it doesn't fit into TcpPackage."); throw new Error("Login length is too big, it doesn't fit into TcpPackage.");
}
login = data.buffer.toString('utf8', data.offset + AuthOffset + 1, data.offset + AuthOffset + 1 + loginLen); login = data.buffer.toString('utf8', data.offset + AuthOffset + 1, data.offset + AuthOffset + 1 + loginLen);
var passLen = data.buffer[data.offset + AuthOffset + 1 + loginLen]; var passLen = data.buffer[data.offset + AuthOffset + 1 + loginLen];
if (AuthOffset + 1 + loginLen + 1 + passLen > data.count) if (AuthOffset + 1 + loginLen + 1 + passLen > data.count) {
throw new Error("Password length is too big, it doesn't fit into TcpPackage."); throw new Error("Password length is too big, it doesn't fit into TcpPackage.");
}
headerSize += 1 + loginLen + 1 + passLen; headerSize += 1 + loginLen + 1 + passLen;
pass = data.buffer.toString('utf8', data.offset + AuthOffset + 1 + loginLen + 1, data.offset + headerSize); pass = data.buffer.toString('utf8', data.offset + AuthOffset + 1 + loginLen + 1, data.offset + headerSize);
} }
@ -64,8 +65,7 @@ TcpPackage.prototype.asBuffer = function() {
res[AuthOffset + 1 + loginBytes.length] = passwordBytes.length; res[AuthOffset + 1 + loginBytes.length] = passwordBytes.length;
passwordBytes.copy(res, AuthOffset + 2 + loginBytes.length); passwordBytes.copy(res, AuthOffset + 2 + loginBytes.length);
if (this.data) if (this.data) this.data.copyTo(res, res.length - this.data.count);
this.data.copyTo(res, res.length - this.data.count);
return res; return res;
} else { } else {
@ -73,8 +73,7 @@ TcpPackage.prototype.asBuffer = function() {
res[CommandOffset] = this.command; res[CommandOffset] = this.command;
res[FlagsOffset] = this.flags; res[FlagsOffset] = this.flags;
guidParse.parse(this.correlationId, res, CorrelationOffset); guidParse.parse(this.correlationId, res, CorrelationOffset);
if (this.data) if (this.data) this.data.copyTo(res, AuthOffset);
this.data.copyTo(res, AuthOffset);
return res; return res;
} }
}; };

View File

@ -31,8 +31,9 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) {
++this._headerBytes; ++this._headerBytes;
if (this._headerBytes === HeaderLength) if (this._headerBytes === HeaderLength)
{ {
if (this._packageLength <= 0 || this._packageLength > this._maxPackageSize) if (this._packageLength <= 0 || this._packageLength > this._maxPackageSize) {
throw new Error(["Package size is out of bounds: ", this._packageLength, "(max: ", this._maxPackageSize, "."].join('')); throw new Error(["Package size is out of bounds: ", this._packageLength, "(max: ", this._maxPackageSize, "."].join(''));
}
this._messageBuffer = new Buffer(this._packageLength); this._messageBuffer = new Buffer(this._packageLength);
} }
@ -46,8 +47,9 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) {
if (this._bufferIndex === this._packageLength) if (this._bufferIndex === this._packageLength)
{ {
if (this._receivedHandler !== null) if (this._receivedHandler !== null) {
this._receivedHandler(createBufferSegment(this._messageBuffer, 0, this._bufferIndex)); this._receivedHandler(createBufferSegment(this._messageBuffer, 0, this._bufferIndex));
}
this.reset(); this.reset();
} }
} }

View File

@ -62,14 +62,11 @@ TcpConnection.prototype._trySend = function() {
while(sendPiece = this._sendQueue.shift()) { while(sendPiece = this._sendQueue.shift()) {
buffers.push(sendPiece); buffers.push(sendPiece);
bytes += sendPiece.length; bytes += sendPiece.length;
if (bytes > MaxSendPacketSize) if (bytes > MaxSendPacketSize) break;
break;
} }
var joinedBuffers = Buffer.concat(buffers, bytes); var joinedBuffers = Buffer.concat(buffers, bytes);
if (!this._socket.write(joinedBuffers)) { if (!this._socket.write(joinedBuffers)) return;
return;
}
setImmediate(this._trySend.bind(this)); setImmediate(this._trySend.bind(this));
}; };
@ -97,8 +94,7 @@ TcpConnection.prototype.receive = function(cb) {
}; };
TcpConnection.prototype._tryDequeueReceivedData = function() { TcpConnection.prototype._tryDequeueReceivedData = function() {
if (this._receiveCallback === null || this._receiveQueue.length === 0) if (this._receiveCallback === null || this._receiveQueue.length === 0) return;
return;
var res = []; var res = [];
while(this._receiveQueue.length > 0) { while(this._receiveQueue.length > 0) {
@ -112,8 +108,9 @@ TcpConnection.prototype._tryDequeueReceivedData = function() {
callback(this, res); callback(this, res);
var bytes = 0; var bytes = 0;
for(var i=0;i<res.length;i++) for(var i=0;i<res.length;i++) {
bytes += res[i].count; bytes += res[i].count;
}
//this._pendingReceivedBytes -= bytes; //this._pendingReceivedBytes -= bytes;
}; };
@ -132,8 +129,7 @@ TcpConnection.prototype._closeInternal = function(err, reason) {
this._socket = null; this._socket = null;
} }
if (this._onConnectionClosed !== null) if (this._onConnectionClosed !== null) this._onConnectionClosed(this, err);
this._onConnectionClosed(this, err);
}; };
TcpConnection.createConnectingConnection = function( TcpConnection.createConnectingConnection = function(
@ -155,8 +151,7 @@ TcpConnection.createConnectingConnection = function(
socket.on('connect', function() { socket.on('connect', function() {
socket.removeListener('error', onError); socket.removeListener('error', onError);
connection._initSocket(socket); connection._initSocket(socket);
if (onConnectionEstablished) if (onConnectionEstablished) onConnectionEstablished(connection);
onConnectionEstablished(connection);
}); });
return connection; return connection;
}; };

View File

@ -1,5 +1,4 @@
var util = require('util'); var util = require('util');
var uuid = require('uuid');
var LengthPrefixMessageFramer = require('./lengthPrefixMessageFramer'); var LengthPrefixMessageFramer = require('./lengthPrefixMessageFramer');
var TcpConnection = require('./tcpConnection'); var TcpConnection = require('./tcpConnection');
@ -57,8 +56,7 @@ function TcpPackageConnection(
}, },
function (conn, had_error) { function (conn, had_error) {
var error; var error;
if (had_error) if (had_error) error = new Error('transmission error.');
error = new Error('transmission error.');
log.debug("TcpPackageConnection: connection [%j, L%j, %s] was closed %s", conn.remoteEndPoint, conn.localEndPoint, log.debug("TcpPackageConnection: connection [%j, L%j, %s] was closed %s", conn.remoteEndPoint, conn.localEndPoint,
connectionId, had_error ? "with error: " + error + "." : "cleanly."); connectionId, had_error ? "with error: " + error + "." : "cleanly.");
@ -120,27 +118,23 @@ TcpPackageConnection.prototype._incomingMessageArrived = function(data) {
var message = util.format("TcpPackageConnection: [%j, L%j, %s] ERROR for %s. Connection will be closed.", var message = util.format("TcpPackageConnection: [%j, L%j, %s] ERROR for %s. Connection will be closed.",
this.remoteEndPoint, this.localEndPoint, this._connectionId, this.remoteEndPoint, this.localEndPoint, this._connectionId,
valid ? TcpCommand.getName(pkg.command) : "<invalid package>"); valid ? TcpCommand.getName(pkg.command) : "<invalid package>");
if (this._onError !== null) if (this._onError !== null) this._onError(this, e);
this._onError(this, e);
this._log.debug(e, message); this._log.debug(e, message);
} }
}; };
TcpPackageConnection.prototype.startReceiving = function() { TcpPackageConnection.prototype.startReceiving = function() {
if (this._connection === null) if (this._connection === null) throw new Error("Failed connection.");
throw new Error("Failed connection.");
this._connection.receive(this._onRawDataReceived.bind(this)); this._connection.receive(this._onRawDataReceived.bind(this));
}; };
TcpPackageConnection.prototype.enqueueSend = function(pkg) { TcpPackageConnection.prototype.enqueueSend = function(pkg) {
if (this._connection === null) if (this._connection === null) throw new Error("Failed connection.");
throw new Error("Failed connection.");
this._connection.enqueueSend(this._framer.frameData(pkg.asBufferSegment())); this._connection.enqueueSend(this._framer.frameData(pkg.asBufferSegment()));
}; };
TcpPackageConnection.prototype.close = function(reason) { TcpPackageConnection.prototype.close = function(reason) {
if (this._connection === null) if (this._connection === null) throw new Error("Failed connection.");
throw new Error("Failed connection.");
this._connection.close(reason); this._connection.close(reason);
}; };