From 215708014c53dd779ac53cf49a45c2d4927f2823 Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Mon, 9 Jul 2018 10:27:12 -0700 Subject: [PATCH] Add Object.freeze on private enums Improve code readability --- src/client.js | 44 +++++++-------- .../appendToStreamOperation.js | 5 +- .../commitTransactionOperation.js | 1 - ...onnectToPersistentSubscriptionOperation.js | 2 +- .../createPersistentSubscriptionOperation.js | 3 +- .../deletePersistentSubscriptionOperation.js | 4 +- src/clientOperations/deleteStreamOperation.js | 1 - src/clientOperations/operationBase.js | 10 ++-- .../readAllEventsBackwardOperation.js | 3 +- .../readAllEventsForwardOperation.js | 3 +- src/clientOperations/readEventOperation.js | 3 +- .../readStreamEventsBackwardOperation.js | 3 +- .../readStreamEventsForwardOperation.js | 3 +- .../startTransactionOperation.js | 4 +- src/clientOperations/subscriptionOperation.js | 50 +++++++++-------- .../transactionalWriteOperation.js | 3 +- .../updatePersistentSubscriptionOperation.js | 3 +- src/common/bufferSegment.js | 3 +- src/common/hash.js | 2 +- src/common/systemEventTypes.js | 6 +- src/common/systemMetadata.js | 5 +- src/common/systemStreams.js | 4 +- src/common/utils/ensure.js | 29 ++++------ src/core/clusterDnsEndPointDiscoverer.js | 27 +++++---- src/core/eventStoreConnectionLogicHandler.js | 56 +++++++++---------- src/core/messages.js | 1 - src/core/operationsManager.js | 8 +-- src/core/simpleQueuedHandler.js | 9 +-- src/errors/projectionCommandFailedError.js | 2 +- src/eventData.js | 2 - src/eventStoreAllCatchUpSubscription.js | 10 ++-- src/eventStoreCatchUpSubscription.js | 39 ++++++------- src/eventStoreConnection.js | 6 +- src/eventStoreNodeConnection.js | 13 ++--- src/eventStorePersistentSubscriptionBase.js | 22 ++++---- src/eventStoreStreamCatchUpSubscription.js | 23 ++++---- src/eventStoreSubscription.js | 2 +- src/gossipSeed.js | 6 +- src/persistentEventStoreSubscription.js | 2 +- src/persistentSubscriptionNakEventAction.js | 19 ++++--- src/readDirection.js | 6 +- src/results.js | 56 +++++++++---------- src/sliceReadStatus.js | 7 +-- src/subscriptionDropReason.js | 4 +- src/systemConsumerStrategies.js | 7 +-- src/systemData/inspectionDecision.js | 4 +- src/systemData/inspectionResult.js | 2 +- src/systemData/statusCode.js | 4 +- src/systemData/tcpCommand.js | 16 +++--- src/systemData/tcpFlags.js | 4 +- src/systemData/tcpPackage.js | 19 +++---- .../tcp/lengthPrefixMessageFramer.js | 8 ++- src/transport/tcp/tcpConnection.js | 19 +++---- src/transport/tcp/tcpPackageConnection.js | 16 ++---- src/volatileEventStoreSubscription.js | 2 +- 55 files changed, 288 insertions(+), 327 deletions(-) diff --git a/src/client.js b/src/client.js index 7ca9d09..d5efb9a 100644 --- a/src/client.js +++ b/src/client.js @@ -53,29 +53,29 @@ function createEventData(eventId, type, isJson, data, metadata) { } // Expose classes -module.exports.Position = results.Position; -module.exports.UserCredentials = require('./systemData/userCredentials'); -module.exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings'); -module.exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); -module.exports.GossipSeed = require('./gossipSeed'); -module.exports.EventStoreConnection = require('./eventStoreConnection'); -module.exports.ProjectionsManager = require('./projections/projectionsManager'); +exports.Position = results.Position; +exports.UserCredentials = require('./systemData/userCredentials'); +exports.PersistentSubscriptionSettings = require('./persistentSubscriptionSettings'); +exports.SystemConsumerStrategies = require('./systemConsumerStrategies'); +exports.GossipSeed = require('./gossipSeed'); +exports.EventStoreConnection = require('./eventStoreConnection'); +exports.ProjectionsManager = require('./projections/projectionsManager'); // Expose errors -module.exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError'); -module.exports.StreamDeletedError = require('./errors/streamDeletedError'); -module.exports.AccessDeniedError = require('./errors/accessDeniedError'); -module.exports.ProjectionCommandFailedError = require('./errors/projectionCommandFailedError'); +exports.WrongExpectedVersionError = require('./errors/wrongExpectedVersionError'); +exports.StreamDeletedError = require('./errors/streamDeletedError'); +exports.AccessDeniedError = require('./errors/accessDeniedError'); +exports.ProjectionCommandFailedError = require('./errors/projectionCommandFailedError'); // Expose enums/constants -module.exports.expectedVersion = expectedVersion; -module.exports.positions = positions; -module.exports.streamPosition = streamPosition; -module.exports.systemMetadata = require('./common/systemMetadata'); -module.exports.eventReadStatus = results.EventReadStatus; -module.exports.sliceReadStatus = require('./sliceReadStatus'); +exports.expectedVersion = expectedVersion; +exports.positions = positions; +exports.streamPosition = streamPosition; +exports.systemMetadata = require('./common/systemMetadata'); +exports.eventReadStatus = results.EventReadStatus; +exports.sliceReadStatus = require('./sliceReadStatus'); // Expose loggers -module.exports.NoopLogger = require('./common/log/noopLogger'); -module.exports.FileLogger = require('./common/log/fileLogger'); +exports.NoopLogger = require('./common/log/noopLogger'); +exports.FileLogger = require('./common/log/fileLogger'); // Expose Helper functions -module.exports.createConnection = require('./eventStoreConnection').create; -module.exports.createJsonEventData = createJsonEventData; -module.exports.createEventData = createEventData; +exports.createConnection = require('./eventStoreConnection').create; +exports.createJsonEventData = createJsonEventData; +exports.createEventData = createEventData; diff --git a/src/clientOperations/appendToStreamOperation.js b/src/clientOperations/appendToStreamOperation.js index 5d748f0..3e556b4 100644 --- a/src/clientOperations/appendToStreamOperation.js +++ b/src/clientOperations/appendToStreamOperation.js @@ -44,8 +44,7 @@ AppendToStreamOperation.prototype._inspectResponse = function(response) { switch (response.result) { case ClientMessage.OperationResult.Success: - if (this._wasCommitTimeout) - this.log.debug("IDEMPOTENT WRITE SUCCEEDED FOR %s.", this); + if (this._wasCommitTimeout) this.log.debug("IDEMPOTENT WRITE SUCCEEDED FOR %s.", this); this._succeed(); return new InspectionResult(InspectionDecision.EndOperation, "Success"); case ClientMessage.OperationResult.PrepareTimeout: @@ -80,4 +79,4 @@ AppendToStreamOperation.prototype.toString = function() { return util.format("Stream: %s, ExpectedVersion: %d", this._stream, this._expectedVersion); }; -module.exports = AppendToStreamOperation; +module.exports = AppendToStreamOperation; \ No newline at end of file diff --git a/src/clientOperations/commitTransactionOperation.js b/src/clientOperations/commitTransactionOperation.js index aeca1ca..56f0d06 100644 --- a/src/clientOperations/commitTransactionOperation.js +++ b/src/clientOperations/commitTransactionOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var InspectionDecision = require('../systemData/inspectionDecision'); diff --git a/src/clientOperations/connectToPersistentSubscriptionOperation.js b/src/clientOperations/connectToPersistentSubscriptionOperation.js index 51431fa..405f21b 100644 --- a/src/clientOperations/connectToPersistentSubscriptionOperation.js +++ b/src/clientOperations/connectToPersistentSubscriptionOperation.js @@ -124,4 +124,4 @@ ConnectToPersistentSubscriptionOperation.prototype.notifyEventsFailed = function this._enqueueSend(pkg); }; -module.exports = ConnectToPersistentSubscriptionOperation; +module.exports = ConnectToPersistentSubscriptionOperation; \ No newline at end of file diff --git a/src/clientOperations/createPersistentSubscriptionOperation.js b/src/clientOperations/createPersistentSubscriptionOperation.js index a087ae9..62286b7 100644 --- a/src/clientOperations/createPersistentSubscriptionOperation.js +++ b/src/clientOperations/createPersistentSubscriptionOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var ensure = require('../common/utils/ensure'); var OperationBase = require('../clientOperations/operationBase'); @@ -84,4 +83,4 @@ CreatePersistentSubscriptionOperation.prototype.toString = function() { return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); }; -module.exports = CreatePersistentSubscriptionOperation; +module.exports = CreatePersistentSubscriptionOperation; \ No newline at end of file diff --git a/src/clientOperations/deletePersistentSubscriptionOperation.js b/src/clientOperations/deletePersistentSubscriptionOperation.js index 0bbfd96..18bd732 100644 --- a/src/clientOperations/deletePersistentSubscriptionOperation.js +++ b/src/clientOperations/deletePersistentSubscriptionOperation.js @@ -1,7 +1,5 @@ var util = require('util'); -var uuid = require('uuid'); -var ensure = require('../common/utils/ensure'); var OperationBase = require('../clientOperations/operationBase'); var TcpCommand = require('../systemData/tcpCommand'); var ClientMessage = require('../messages/clientMessage'); @@ -55,4 +53,4 @@ DeletePersistentSubscriptionOperation.prototype.toString = function() { return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); }; -module.exports = DeletePersistentSubscriptionOperation; +module.exports = DeletePersistentSubscriptionOperation; \ No newline at end of file diff --git a/src/clientOperations/deleteStreamOperation.js b/src/clientOperations/deleteStreamOperation.js index 30b8903..b499f07 100644 --- a/src/clientOperations/deleteStreamOperation.js +++ b/src/clientOperations/deleteStreamOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var InspectionDecision = require('../systemData/inspectionDecision'); diff --git a/src/clientOperations/operationBase.js b/src/clientOperations/operationBase.js index 49f46fd..bdbfccc 100644 --- a/src/clientOperations/operationBase.js +++ b/src/clientOperations/operationBase.js @@ -42,10 +42,11 @@ OperationBase.prototype._succeed = function() { if (!this._completed) { this._completed = true; - if (this._response) + if (this._response) { this._cb(null, this._transformResponse(this._response)); - else + } else { this._cb(new Error("No result.")) + } } }; @@ -130,8 +131,7 @@ OperationBase.prototype._inspectNotHandled = function(pkg) OperationBase.prototype._inspectUnexpectedCommand = function(pkg, expectedCommand) { - if (pkg.command === expectedCommand) - throw new Error("Command shouldn't be " + TcpCommand.getName(pkg.command)); + if (pkg.command === expectedCommand) throw new Error("Command shouldn't be " + TcpCommand.getName(pkg.command)); this.log.error("Unexpected TcpCommand received.\n" + "Expected: %s, Actual: %s, Flags: %s, CorrelationId: %s\n" @@ -145,4 +145,4 @@ OperationBase.prototype._inspectUnexpectedCommand = function(pkg, expectedComman }; -module.exports = OperationBase; +module.exports = OperationBase; \ No newline at end of file diff --git a/src/clientOperations/readAllEventsBackwardOperation.js b/src/clientOperations/readAllEventsBackwardOperation.js index eec1143..6a1620a 100644 --- a/src/clientOperations/readAllEventsBackwardOperation.js +++ b/src/clientOperations/readAllEventsBackwardOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var ClientMessage = require('../messages/clientMessage'); @@ -65,4 +64,4 @@ ReadAllEventsBackwardOperation.prototype.toString = function() { this._position, this._maxCount, this._resolveLinkTos, this._requireMaster); }; -module.exports = ReadAllEventsBackwardOperation; +module.exports = ReadAllEventsBackwardOperation; \ No newline at end of file diff --git a/src/clientOperations/readAllEventsForwardOperation.js b/src/clientOperations/readAllEventsForwardOperation.js index 01fb7e9..f95854a 100644 --- a/src/clientOperations/readAllEventsForwardOperation.js +++ b/src/clientOperations/readAllEventsForwardOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var ClientMessage = require('../messages/clientMessage'); @@ -65,4 +64,4 @@ ReadAllEventsForwardOperation.prototype.toString = function() { this._position, this._maxCount, this._resolveLinkTos, this._requireMaster); }; -module.exports = ReadAllEventsForwardOperation; +module.exports = ReadAllEventsForwardOperation; \ No newline at end of file diff --git a/src/clientOperations/readEventOperation.js b/src/clientOperations/readEventOperation.js index a0f4ffc..f4f2776 100644 --- a/src/clientOperations/readEventOperation.js +++ b/src/clientOperations/readEventOperation.js @@ -59,7 +59,6 @@ ReadEventOperation.prototype._transformResponse = function(response) { return new results.EventReadResult(convert(response.result), this._stream, this._eventNumber, response.event); }; - function convert(result) { switch (result) @@ -82,4 +81,4 @@ ReadEventOperation.prototype.toString = function() { this._stream, this._eventNumber, this._resolveLinkTos, this._requireMaster); }; -module.exports = ReadEventOperation; +module.exports = ReadEventOperation; \ No newline at end of file diff --git a/src/clientOperations/readStreamEventsBackwardOperation.js b/src/clientOperations/readStreamEventsBackwardOperation.js index 13191e8..2d75808 100644 --- a/src/clientOperations/readStreamEventsBackwardOperation.js +++ b/src/clientOperations/readStreamEventsBackwardOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var ClientMessage = require('../messages/clientMessage'); @@ -77,4 +76,4 @@ ReadStreamEventsBackwardOperation.prototype.toString = function() { this._stream, this._fromEventNumber, this._maxCount, this._resolveLinkTos, this._requireMaster); }; -module.exports = ReadStreamEventsBackwardOperation; +module.exports = ReadStreamEventsBackwardOperation; \ No newline at end of file diff --git a/src/clientOperations/readStreamEventsForwardOperation.js b/src/clientOperations/readStreamEventsForwardOperation.js index 4367ea7..364c9a2 100644 --- a/src/clientOperations/readStreamEventsForwardOperation.js +++ b/src/clientOperations/readStreamEventsForwardOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var ClientMessage = require('../messages/clientMessage'); @@ -77,4 +76,4 @@ ReadStreamEventsForwardOperation.prototype.toString = function() { this._stream, this._fromEventNumber, this._maxCount, this._resolveLinkTos, this._requireMaster); }; -module.exports = ReadStreamEventsForwardOperation; +module.exports = ReadStreamEventsForwardOperation; \ No newline at end of file diff --git a/src/clientOperations/startTransactionOperation.js b/src/clientOperations/startTransactionOperation.js index ec0e465..e86e7f7 100644 --- a/src/clientOperations/startTransactionOperation.js +++ b/src/clientOperations/startTransactionOperation.js @@ -1,12 +1,10 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var InspectionDecision = require('../systemData/inspectionDecision'); var InspectionResult = require('./../systemData/inspectionResult'); var ClientMessage = require('../messages/clientMessage'); var EventStoreTransaction = require('../eventStoreTransaction'); -var results = require('../results'); var AccessDeniedError = require('../errors/accessDeniedError'); var WrongExpectedVersionError = require('../errors/wrongExpectedVersionError'); var StreamDeletedError = require('../errors/streamDeletedError'); @@ -68,4 +66,4 @@ StartTransactionOperation.prototype.toString = function() { return util.format("Stream: %s, ExpectedVersion: %d", this._stream, this._expectedVersion); }; -module.exports = StartTransactionOperation; +module.exports = StartTransactionOperation; \ No newline at end of file diff --git a/src/clientOperations/subscriptionOperation.js b/src/clientOperations/subscriptionOperation.js index dd6a048..ebb8023 100644 --- a/src/clientOperations/subscriptionOperation.js +++ b/src/clientOperations/subscriptionOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var TcpCommand = require('../systemData/tcpCommand'); var TcpFlags = require('../systemData/tcpFlags'); @@ -46,8 +45,7 @@ SubscriptionOperation.prototype._enqueueSend = function(pkg) { SubscriptionOperation.prototype.subscribe = function(correlationId, connection) { if (connection === null) throw new TypeError("connection is null."); - if (this._subscription !== null || this._unsubscribed) - return false; + if (this._subscription !== null || this._unsubscribed) return false; this._correlationId = correlationId; connection.enqueueSend(this._createSubscriptionPackage()); @@ -76,9 +74,7 @@ SubscriptionOperation.prototype.inspectPackage = function(pkg) { try { var result = this._inspectPackage(pkg); - if (result !== null) { - return result; - } + if (result !== null) return result; switch (pkg.command) { @@ -128,8 +124,9 @@ SubscriptionOperation.prototype.inspectPackage = function(pkg) { case TcpCommand.NotHandled: { - if (this._subscription !== null) + if (this._subscription !== null) { throw new Error("NotHandled command appeared while we already subscribed."); + } var message = ClientMessage.NotHandled.decode(pkg.data.toBuffer()); switch (message.reason) @@ -172,8 +169,7 @@ SubscriptionOperation.prototype.connectionClosed = function() { }; SubscriptionOperation.prototype.timeOutSubscription = function() { - if (this._subscription !== null) - return false; + if (this._subscription !== null) return false; this.dropSubscription(SubscriptionDropReason.SubscribingError, null); return true; }; @@ -182,9 +178,10 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect if (!this._unsubscribed) { this._unsubscribed = true; - if (this._verboseLogging) + if (this._verboseLogging) { this._log.debug("Subscription %s to %s: closing subscription, reason: %s, exception: %s...", - this._correlationId, this._streamId || "", reason, err); + this._correlationId, this._streamId || "", reason, err); + } if (reason !== SubscriptionDropReason.UserInitiated && this._subscription === null) { @@ -193,24 +190,31 @@ SubscriptionOperation.prototype.dropSubscription = function(reason, err, connect return; } - if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) + if (reason === SubscriptionDropReason.UserInitiated && this._subscription !== null && connection !== null) { connection.enqueueSend(this._createUnsubscriptionPackage()); + } var self = this; - if (this._subscription !== null) - this._executeAction(function() { self._subscriptionDropped(self._subscription, reason, err); }); + if (this._subscription !== null) { + this._executeAction(function () { + self._subscriptionDropped(self._subscription, reason, err); + }); + } } }; SubscriptionOperation.prototype._confirmSubscription = function(lastCommitPosition, lastEventNumber) { - if (lastCommitPosition < -1) + if (lastCommitPosition < -1) { throw new Error(util.format("Invalid lastCommitPosition %s on subscription confirmation.", lastCommitPosition)); - if (this._subscription !== null) + } + if (this._subscription !== null) { throw new Error("Double confirmation of subscription."); + } - if (this._verboseLogging) + if (this._verboseLogging) { this._log.debug("Subscription %s to %s: subscribed at CommitPosition: %d, EventNumber: %d.", - this._correlationId, this._streamId || "", lastCommitPosition, lastEventNumber); + this._correlationId, this._streamId || "", lastCommitPosition, lastEventNumber); + } this._subscription = this._createSubscriptionObject(lastCommitPosition, lastEventNumber); this._cb(null, this._subscription); @@ -221,15 +225,15 @@ SubscriptionOperation.prototype._createSubscriptionObject = function(lastCommitP }; SubscriptionOperation.prototype._onEventAppeared = function(e) { - if (this._unsubscribed) - return; + if (this._unsubscribed) return; if (this._subscription === null) throw new Error("Subscription not confirmed, but event appeared!"); - if (this._verboseLogging) + if (this._verboseLogging) { this._log.debug("Subscription %s to %s: event appeared (%s, %d, %s @ %s).", - this._correlationId, this._streamId || "", - e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); + this._correlationId, this._streamId || "", + e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); + } var self = this; this._executeAction(function() { return self._eventAppeared(self._subscription, e); }); diff --git a/src/clientOperations/transactionalWriteOperation.js b/src/clientOperations/transactionalWriteOperation.js index bab00a8..952e0dc 100644 --- a/src/clientOperations/transactionalWriteOperation.js +++ b/src/clientOperations/transactionalWriteOperation.js @@ -64,5 +64,4 @@ TransactionalWriteOperation.prototype.toString = function() { return util.format("TransactionId: %s", this._transactionId); }; -module.exports = TransactionalWriteOperation; - +module.exports = TransactionalWriteOperation; \ No newline at end of file diff --git a/src/clientOperations/updatePersistentSubscriptionOperation.js b/src/clientOperations/updatePersistentSubscriptionOperation.js index 5a262f3..034ddcd 100644 --- a/src/clientOperations/updatePersistentSubscriptionOperation.js +++ b/src/clientOperations/updatePersistentSubscriptionOperation.js @@ -1,5 +1,4 @@ var util = require('util'); -var uuid = require('uuid'); var ensure = require('../common/utils/ensure'); var OperationBase = require('../clientOperations/operationBase'); @@ -84,4 +83,4 @@ UpdatePersistentSubscriptionOperation.prototype.toString = function() { return util.format("Stream: %s, Group Name: %s", this._stream, this._groupName); }; -module.exports = UpdatePersistentSubscriptionOperation; +module.exports = UpdatePersistentSubscriptionOperation; \ No newline at end of file diff --git a/src/common/bufferSegment.js b/src/common/bufferSegment.js index 627d76e..2a13397 100644 --- a/src/common/bufferSegment.js +++ b/src/common/bufferSegment.js @@ -19,8 +19,7 @@ BufferSegment.prototype.toString = function() { }; BufferSegment.prototype.toBuffer = function() { - if (this.offset === 0 && this.count === this.buffer.length) - return this.buffer; + if (this.offset === 0 && this.count === this.buffer.length) return this.buffer; return this.buffer.slice(this.offset, this.offset + this.count); }; diff --git a/src/common/hash.js b/src/common/hash.js index 0c3aa4f..a85cb38 100644 --- a/src/common/hash.js +++ b/src/common/hash.js @@ -39,4 +39,4 @@ Hash.prototype.remove = function(key) { }; -module.exports = Hash; +module.exports = Hash; \ No newline at end of file diff --git a/src/common/systemEventTypes.js b/src/common/systemEventTypes.js index 56dd72e..1974520 100644 --- a/src/common/systemEventTypes.js +++ b/src/common/systemEventTypes.js @@ -1,5 +1,5 @@ -const SystemEventTypes = { +const SystemEventTypes = Object.freeze({ StreamMetadata: '$metadata' -}; +}); -module.exports = SystemEventTypes; +module.exports = SystemEventTypes; \ No newline at end of file diff --git a/src/common/systemMetadata.js b/src/common/systemMetadata.js index f5d0f44..42aa2cc 100644 --- a/src/common/systemMetadata.js +++ b/src/common/systemMetadata.js @@ -1,4 +1,4 @@ -const SystemMetadata = { +const SystemMetadata = Object.freeze({ maxAge: '$maxAge', maxCount: '$maxCount', truncateBefore: '$tb', @@ -11,7 +11,6 @@ const SystemMetadata = { aclMetaWrite: '$mw', userStreamAcl: '$userStreamAcl', systemStreamAcl: '$systemStreamAcl' -}; -Object.freeze(SystemMetadata); +}); module.exports = SystemMetadata; \ No newline at end of file diff --git a/src/common/systemStreams.js b/src/common/systemStreams.js index 0cc946e..d019126 100644 --- a/src/common/systemStreams.js +++ b/src/common/systemStreams.js @@ -1,6 +1,6 @@ -module.exports.metastreamOf = function(stream) { +exports.metastreamOf = function(stream) { return '$$' + stream; }; -module.exports.isMetastream = function(stream) { +exports.isMetastream = function(stream) { return stream.indexOf('$$') === 0; }; \ No newline at end of file diff --git a/src/common/utils/ensure.js b/src/common/utils/ensure.js index bec90e2..726a586 100644 --- a/src/common/utils/ensure.js +++ b/src/common/utils/ensure.js @@ -1,20 +1,16 @@ var Long = require('long'); module.exports.notNullOrEmpty = function(value, name) { - if (value === null) - throw new TypeError(name + " should not be null."); - if (value === '') - throw new Error(name + " should not be empty."); + if (value === null) throw new TypeError(name + " should not be null."); + if (value === '') throw new Error(name + " should not be empty."); }; module.exports.notNull = function(value, name) { - if (value === null) - throw new TypeError(name + " should not be null."); + if (value === null) throw new TypeError(name + " should not be null."); }; function isInteger(value, name) { - if (typeof value !== 'number' || value % 1 !== 0) - throw new TypeError(name + " should be an integer."); + if (typeof value !== 'number' || value % 1 !== 0) throw new TypeError(name + " should be an integer."); } module.exports.isInteger = isInteger; @@ -27,24 +23,23 @@ module.exports.isLongOrInteger = function(value, name) { }; module.exports.isArrayOf = function(expectedType, value, name) { - if (!Array.isArray(value)) - throw new TypeError(name + " should be an array."); - if (!value.every(function(x) { return x instanceof expectedType; })) + if (!Array.isArray(value)) throw new TypeError(name + " should be an array."); + if (!value.every(function(x) { return x instanceof expectedType; })) { throw new TypeError([name, " should be an array of ", expectedType.name, "."].join("")); + } }; module.exports.isTypeOf = function(expectedType, value, name, nullAllowed) { if (nullAllowed && value === null) return; - if (!(value instanceof expectedType)) - throw new TypeError([name, " should be of type '", expectedType.name, "'", nullAllowed ? " or null": "", "."].join("")); + if (!(value instanceof expectedType)) { + throw new TypeError([name, " should be of type '", expectedType.name, "'", nullAllowed ? " or null" : "", "."].join("")); + } }; module.exports.positive = function(value, name) { - if (value <= 0) - throw new Error(name + " should be positive."); + if (value <= 0) throw new Error(name + " should be positive."); }; module.exports.nonNegative = function(value, name) { - if (value < 0) - throw new Error(name + " should be non-negative."); + if (value < 0) throw new Error(name + " should be non-negative."); }; \ No newline at end of file diff --git a/src/core/clusterDnsEndPointDiscoverer.js b/src/core/clusterDnsEndPointDiscoverer.js index c207667..b0a688a 100644 --- a/src/core/clusterDnsEndPointDiscoverer.js +++ b/src/core/clusterDnsEndPointDiscoverer.js @@ -34,18 +34,19 @@ ClusterDnsEndPointDiscoverer.prototype.discover = function(failedTcpEndPoint) { function discover(resolve, reject) { self._discoverEndPoint(failedTcpEndPoint) .then(function (endPoints) { - if (!endPoints) + if (!endPoints) { self._log.info(util.format("Discovering attempt %d/%d failed: no candidate found.", attempt, self._maxDiscoverAttempts)); + } return endPoints; }) .catch(function (exc) { self._log.info(util.format("Discovering attempt %d/%d failed with error: %s.\n%s", attempt, self._maxDiscoverAttempts, exc, exc.stack)); }) .then(function (endPoints) { - if (endPoints) - return resolve(endPoints); - if (attempt++ === self._maxDiscoverAttempts) + if (endPoints) return resolve(endPoints); + if (attempt++ === self._maxDiscoverAttempts) { return reject(new Error('Failed to discover candidate in ' + self._maxDiscoverAttempts + ' attempts.')); + } setTimeout(discover, 500, resolve, reject); }); } @@ -74,8 +75,7 @@ ClusterDnsEndPointDiscoverer.prototype._discoverEndPoint = function (failedTcpEn if (endPoints) return endPoints; return self._tryGetGossipFrom(gossipCandidates[j++]) .then(function (gossip) { - if (!gossip || !gossip.members || gossip.members.length === 0) - return; + if (!gossip || !gossip.members || gossip.members.length === 0) return; var bestNode = self._tryDetermineBestNode(gossip.members); if (bestNode) { self._oldGossip = gossip.members; @@ -105,10 +105,11 @@ ClusterDnsEndPointDiscoverer.prototype._arrangeGossipCandidates = function (memb var j = members.length; for (var k = 0; k < members.length; ++k) { - if (members[k].state === 'Manager') + if (members[k].state === 'Manager') { result[--j] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); - else + } else { result[++i] = new GossipSeed({host: members[k].externalHttpIp, port: members[k].externalHttpPort}); + } } this._randomShuffle(result, 0, i); // shuffle nodes this._randomShuffle(result, j, members.length - 1); // shuffle managers @@ -122,8 +123,7 @@ ClusterDnsEndPointDiscoverer.prototype._getGossipCandidatesFromDns = function () var endpoints = self._gossipSeeds; self._randomShuffle(endpoints, 0, endpoints.length - 1); resolve(endpoints); - } - else { + } else { const dnsOptions = { family: 4, hints: dns.ADDRCONFIG | dns.V4MAPPED, @@ -192,7 +192,7 @@ ClusterDnsEndPointDiscoverer.prototype._tryGetGossipFrom = function (endPoint) { }); }; -const VNodeStates = { +const VNodeStates = Object.freeze({ 'Initializing': 0, 'Unknown': 1, 'PreReplica': 2, @@ -204,7 +204,7 @@ const VNodeStates = { 'Manager': 8, 'ShuttingDown': 9, 'Shutdown': 10 -}; +}); ClusterDnsEndPointDiscoverer.prototype._tryDetermineBestNode = function (members) { var notAllowedStates = [ @@ -240,8 +240,7 @@ function rndNext(min, max) { } ClusterDnsEndPointDiscoverer.prototype._randomShuffle = function (arr, i, j) { - if (i >= j) - return; + if (i >= j) return; for (var k = i; k <= j; ++k) { var index = rndNext(k, j + 1); diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index 4698397..2f56075 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -17,14 +17,14 @@ var TcpCommand = require('../systemData/tcpCommand'); var TcpFlags = require('../systemData/tcpFlags'); var InspectionDecision = require('../systemData/inspectionDecision'); -const ConnectionState = { +const ConnectionState = Object.freeze({ Init: 'init', Connecting: 'connecting', Connected: 'connected', Closed: 'closed' -}; +}); -const ConnectingPhase = { +const ConnectingPhase = Object.freeze({ Invalid: 'invalid', Reconnecting: 'reconnecting', EndPointDiscovery: 'endpointDiscovery', @@ -32,7 +32,7 @@ const ConnectingPhase = { Authentication: 'authentication', Identification: 'identification', Connected: 'connected' -}; +}); const TimerPeriod = 200; const TimerTickMessage = new messages.TimerTickMessage(); @@ -187,8 +187,7 @@ EventStoreConnectionLogicHandler.prototype._closeConnection = function(reason, e this._logInfo("Closed. Reason: %s", reason); - if (error) - this.emit('error', error); + if (error) this.emit('error', error); this.emit('closed', reason); }; @@ -205,7 +204,7 @@ EventStoreConnectionLogicHandler.prototype._closeTcpConnection = function(reason this._connection = null; }; -var _nextSeqNo = -1; +var _nextSeqNo = 0; function createOperationItem(operation, maxRetries, timeout) { var operationItem = { seqNo: _nextSeqNo++, @@ -283,10 +282,11 @@ EventStoreConnectionLogicHandler.prototype._startSubscription = function(msg) { this._state === ConnectionState.Connected ? "fire" : "enqueue", operation.constructor.name, operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); - if (this._state === ConnectionState.Connecting) + if (this._state === ConnectionState.Connecting) { this._subscriptions.enqueueSubscription(subscription); - else + } else { this._subscriptions.startSubscription(subscription, this._connection); + } break; case ConnectionState.Closed: msg.cb(new Error("Connection closed. Connection: " + this._esConnection.connectionName)); @@ -312,10 +312,11 @@ EventStoreConnectionLogicHandler.prototype._startPersistentSubscription = functi this._state === ConnectionState.Connected ? "fire" : "enqueue", operation.constructor.name, operation, msg.maxRetries, msg.timeout); var subscription = createSubscriptionItem(operation, msg.maxRetries, msg.timeout); - if (this._state === ConnectionState.Connecting) + if (this._state === ConnectionState.Connecting) { this._subscriptions.enqueueSubscription(subscription); - else + } else { this._subscriptions.startSubscription(subscription, this._connection); + } break; case ConnectionState.Closed: msg.cb(new Error("Connection closed. " + this._esConnection.connectionName)); @@ -561,8 +562,9 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti default: throw new Error("Unknown InspectionDecision: " + result.decision); } - if (this._state === ConnectionState.Connected) + if (this._state === ConnectionState.Connected) { this._operations.scheduleWaitingOperations(connection); + } return; } @@ -609,8 +611,7 @@ EventStoreConnectionLogicHandler.prototype._reconnectTo = function(endPoints) { return; } - if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint) - return; + if (this._state !== ConnectionState.Connected || this._connection.remoteEndPoint === endPoint) return; var msg = util.format("EventStoreConnection '%s': going to reconnect to [%j]. Current endpoint: [%j, L%j].", this._esConnection.connectionName, endPoint, this._connection.remoteEndPoint, this._connection.localEndPoint); @@ -627,30 +628,26 @@ EventStoreConnectionLogicHandler.prototype._timerTick = function() { { case ConnectionState.Init: break; case ConnectionState.Connecting: - if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay) - { + if (this._connectingPhase === ConnectingPhase.Reconnecting && (Date.now() - this._reconnInfo.timeStamp) >= this._settings.reconnectionDelay) { this._logDebug("TimerTick checking reconnection..."); this._reconnInfo = {reconnectionAttempt: this._reconnInfo.reconnectionAttempt + 1, timeStamp: Date.now()}; - if (this._settings.maxReconnections >= 0 && this._reconnInfo.reconnectionAttempt > this._settings.maxReconnections) + if (this._settings.maxReconnections >= 0 && this._reconnInfo.reconnectionAttempt > this._settings.maxReconnections) { this._closeConnection("Reconnection limit reached."); - else - { + } else { this.emit('reconnecting', {}); this._discoverEndpoint(null); } - } - else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout) - { + } else if (this._connectingPhase === ConnectingPhase.Authentication && (Date.now() - this._authInfo.timeStamp) >= this._settings.operationTimeout) { this.emit('authenticationFailed', "Authentication timed out."); if (this._clientVersion === 1) { this._goToIdentifiedState(); } else { this._goToConnectedState(); } - } - else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected) + } else if (this._connectingPhase === ConnectingPhase.Authentication || this._connectingPhase === ConnectingPhase.Connected) { this._manageHeartbeats(); + } break; case ConnectionState.Connected: // operations timeouts are checked only if connection is established and check period time passed @@ -677,8 +674,7 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { if (this._connection === null) return; var timeout = this._heartbeatInfo.isIntervalStage ? this._settings.heartbeatInterval : this._settings.heartbeatTimeout; - if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout) - return; + if ((Date.now() - this._heartbeatInfo.timeStamp) < timeout) return; var packageNumber = this._packageNumber; if (this._heartbeatInfo.lastPackageNumber !== packageNumber) @@ -711,15 +707,17 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { EventStoreConnectionLogicHandler.prototype._logDebug = function(message) { if (!this._settings.verboseLogging) return; - if (arguments.length > 1) - message = util.format.apply(util, Array.prototype.slice.call(arguments)); + if (arguments.length > 1) { + message = util.format.apply(util, Array.prototype.slice.call(arguments)); + } this._settings.log.debug("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); }; EventStoreConnectionLogicHandler.prototype._logInfo = function(message){ - if (arguments.length > 1) + if (arguments.length > 1) { message = util.format.apply(util, Array.prototype.slice.call(arguments)); + } this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); }; diff --git a/src/core/messages.js b/src/core/messages.js index 6892b59..108bda0 100644 --- a/src/core/messages.js +++ b/src/core/messages.js @@ -1,5 +1,4 @@ var util = require('util'); -var ensure = require('../common/utils/ensure'); function Message() { } diff --git a/src/core/operationsManager.js b/src/core/operationsManager.js index 675728a..6e1ee6a 100644 --- a/src/core/operationsManager.js +++ b/src/core/operationsManager.js @@ -110,8 +110,7 @@ OperationsManager.prototype.checkTimeoutsAndRetry = function(connection) { }; OperationsManager.prototype.scheduleOperationRetry = function(operation) { - if (!this.removeOperation(operation)) - return; + if (!this.removeOperation(operation)) return; this._logDebug("ScheduleOperationRetry for %s.", operation); if (operation.maxRetries >= 0 && operation.retryCount >= operation.maxRetries) @@ -166,10 +165,11 @@ OperationsManager.prototype.scheduleOperation = function(operation, connection) OperationsManager.prototype._logDebug = function(message) { if (!this._settings.verboseLogging) return; - if (arguments.length > 1) + if (arguments.length > 1) { message = util.format.apply(util, Array.prototype.slice.call(arguments)); + } this._settings.log.debug("EventStoreConnection '%s': %s.", this._connectionName, message); }; -module.exports = OperationsManager; +module.exports = OperationsManager; \ No newline at end of file diff --git a/src/core/simpleQueuedHandler.js b/src/core/simpleQueuedHandler.js index 607cef8..aa7e7b5 100644 --- a/src/core/simpleQueuedHandler.js +++ b/src/core/simpleQueuedHandler.js @@ -1,8 +1,6 @@ function typeName(t) { - if (typeof t === 'function') - return t.name; - if (typeof t === 'object') - return t.constructor.name; + if (typeof t === 'function') return t.name; + if (typeof t === 'object') return t.constructor.name; throw new TypeError('type must be a function or object, not ' + typeof t); } @@ -36,8 +34,7 @@ SimpleQueuedHandler.prototype._processQueue = function() { while(message) { var typeId = typeName(message); var handler = this._handlers[typeId]; - if (!handler) - throw new Error("No handler registered for message " + typeId); + if (!handler) throw new Error("No handler registered for message " + typeId); setImmediate(handler, message); message = this._messages.shift(); } diff --git a/src/errors/projectionCommandFailedError.js b/src/errors/projectionCommandFailedError.js index 9b46310..db06300 100644 --- a/src/errors/projectionCommandFailedError.js +++ b/src/errors/projectionCommandFailedError.js @@ -7,4 +7,4 @@ function ProjectionCommandFailedError(httpStatusCode, message) { } util.inherits(ProjectionCommandFailedError, Error); -module.exports = ProjectionCommandFailedError; +module.exports = ProjectionCommandFailedError; \ No newline at end of file diff --git a/src/eventData.js b/src/eventData.js index 34505a3..54ed411 100644 --- a/src/eventData.js +++ b/src/eventData.js @@ -1,5 +1,3 @@ -var uuid = require('uuid'); - const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; function isValidId(id) { if (typeof id !== 'string') return false; diff --git a/src/eventStoreAllCatchUpSubscription.js b/src/eventStoreAllCatchUpSubscription.js index 8353ca2..cb759bb 100644 --- a/src/eventStoreAllCatchUpSubscription.js +++ b/src/eventStoreAllCatchUpSubscription.js @@ -45,8 +45,7 @@ EventStoreAllCatchUpSubscription.prototype._readEventsTill = function( }); }) .then(function(done) { - if (done || self._shouldStop) - return; + if (done || self._shouldStop) return; return readNext(); }); } @@ -69,10 +68,11 @@ EventStoreAllCatchUpSubscription.prototype._tryProcess = function(e) { this._lastProcessedPosition = e.originalPosition; processed = true; } - if (this._verbose) + if (this._verbose) { this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %s).", - this.streamId || '', processed ? "processed" : "skipping", - e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition); + this.streamId || '', processed ? "processed" : "skipping", + e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalPosition); + } return (promise && promise.then) ? promise : Promise.resolve(); }; diff --git a/src/eventStoreCatchUpSubscription.js b/src/eventStoreCatchUpSubscription.js index ddb7099..4950351 100644 --- a/src/eventStoreCatchUpSubscription.js +++ b/src/eventStoreCatchUpSubscription.js @@ -1,7 +1,6 @@ var util = require('util'); var SubscriptionDropReason = require('./subscriptionDropReason'); -var results = require('./results'); const DefaultReadBatchSize = 500; const DefaultMaxPushQueueSize = 10000; @@ -68,9 +67,9 @@ function EventStoreCatchUpSubscription( var self = this; this._onReconnect = function() { - if (self._verbose) self._log.debug("Catch-up Subscription to %s: recovering after reconnection.", self._streamId || ''); if (self._verbose) self._log.debug("Catch-up Subscription to %s: unhooking from connection.Connected.", self._streamId || ''); self._connection.removeListener('connected', self._onReconnect); + if (self._verbose) self._log.debug("Catch-up Subscription to %s: recovering after reconnection.", self._streamId || ''); self._runSubscription(); } } @@ -133,10 +132,11 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() { .then(function() { if (self._shouldStop) return; if (self._verbose) self._log.debug("Catch-up Subscription to %s: subscribing...", logStreamName); - if (self._streamId === '') + if (self._streamId === '') { return self._connection.subscribeToAll(self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials); - else + } else { return self._connection.subscribeToStream(self._streamId, self._resolveLinkTos, self._enqueuePushedEvent.bind(self), self._serverSubscriptionDropped.bind(self), self._userCredentials); + } }) .then(function(subscription) { if (subscription === undefined) return; @@ -155,12 +155,13 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() { return; } if (self._verbose) self._log.debug("Catch-up Subscription to %s: processing live events...", logStreamName); - if (self._liveProcessingStarted) + if (self._liveProcessingStarted) { try { self._liveProcessingStarted(self); - } catch(e) { + } catch (e) { self._log.error(e, "Catch-up Subscription to %s: liveProcessingStarted callback failed.", logStreamName); } + } if (self._verbose) self._log.debug("Catch-up Subscription to %s: hooking to connection.Connected", logStreamName); self._connection.on('connected', self._onReconnect); self._allowProcessing = true; @@ -169,10 +170,11 @@ EventStoreCatchUpSubscription.prototype._runSubscription = function() { }; EventStoreCatchUpSubscription.prototype._enqueuePushedEvent = function(subscription, e) { - if (this._verbose) + if (this._verbose) { this._log.debug("Catch-up Subscription to %s: event appeared (%s, %d, %s @ %s).", - this._streamId || '', - e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); + this._streamId || '', + e.originalStreamId, e.originalEventNumber, e.originalEvent.eventType, e.originalPosition); + } if (this._liveQueue.length >= this.maxPushQueueSize) { @@ -183,8 +185,7 @@ EventStoreCatchUpSubscription.prototype._enqueuePushedEvent = function(subscript this._liveQueue.push(e); - if (this._allowProcessing) - this._ensureProcessingPushQueue(); + if (this._allowProcessing) this._ensureProcessingPushQueue(); }; EventStoreCatchUpSubscription.prototype._serverSubscriptionDropped = function(subscription, reason, err) { @@ -196,8 +197,7 @@ EventStoreCatchUpSubscription.prototype._enqueueSubscriptionDropNotification = f if (this._dropData) return; this._dropData = {reason: reason, error: error}; this._liveQueue.push(new DropSubscriptionEvent()); - if (this._allowProcessing) - this._ensureProcessingPushQueue(); + if (this._allowProcessing) this._ensureProcessingPushQueue(); }; EventStoreCatchUpSubscription.prototype._ensureProcessingPushQueue = function() { @@ -244,18 +244,19 @@ EventStoreCatchUpSubscription.prototype._dropSubscription = function(reason, err if (this._isDropped) return; this._isDropped = true; - if (this._verbose) + if (this._verbose) { this._log.debug("Catch-up Subscription to %s: dropping subscription, reason: %s %s.", - this._streamId || '', reason, error); + this._streamId || '', reason, error); + } - if (this._subscription) - this._subscription.unsubscribe(); - if (this._subscriptionDropped) + if (this._subscription) this._subscription.unsubscribe(); + if (this._subscriptionDropped) { try { this._subscriptionDropped(this, reason, error); - } catch(e) { + } catch (e) { this._log.error(e, "Catch-up Subscription to %s: subscriptionDropped callback failed.", this._streamId || ''); } + } this._stopped = true; }; diff --git a/src/eventStoreConnection.js b/src/eventStoreConnection.js index 5769457..e3f93c0 100644 --- a/src/eventStoreConnection.js +++ b/src/eventStoreConnection.js @@ -4,7 +4,7 @@ var ClusterDnsEndPointDiscoverer = require('./core/clusterDnsEndPointDiscoverer' var NoopLogger = require('./common/log/noopLogger'); var ensure = require('./common/utils/ensure'); -var defaultConnectionSettings = { +var defaultConnectionSettings = Object.freeze({ log: new NoopLogger(), verboseLogging: false, @@ -34,7 +34,7 @@ var defaultConnectionSettings = { maxDiscoverAttempts: 10, externalGossipPort: 0, gossipTimeout: 1000 -}; +}); function merge(a,b) { @@ -151,4 +151,4 @@ module.exports.create = function(settings, endPointOrGossipSeeds, connectionName if (typeof endPointOrGossipSeeds === 'object') return createFromTcpEndpoint(settings, endPointOrGossipSeeds, connectionName); if (typeof endPointOrGossipSeeds === 'string') return createFromStringEndpoint(settings, endPointOrGossipSeeds, connectionName); throw new TypeError('endPointOrGossipSeeds must be an object, a string or an array.'); -}; +}; \ No newline at end of file diff --git a/src/eventStoreNodeConnection.js b/src/eventStoreNodeConnection.js index c0474b3..aab5e90 100644 --- a/src/eventStoreNodeConnection.js +++ b/src/eventStoreNodeConnection.js @@ -140,8 +140,7 @@ EventStoreNodeConnection.prototype.appendToStream = function(stream, expectedVer ensure.notNullOrEmpty(stream, "stream"); ensure.isLongOrInteger(expectedVersion, "expectedVersion"); expectedVersion = Long.fromValue(expectedVersion); - if (!Array.isArray(events)) - events = [events]; + if (!Array.isArray(events)) events = [events]; ensure.isArrayOf(EventData, events, "events"); userCredentials = userCredentials || null; @@ -413,8 +412,7 @@ EventStoreNodeConnection.prototype.subscribeToStream = function( ) { ensure.notNullOrEmpty(stream, "stream"); ensure.isTypeOf(Function, eventAppeared, "eventAppeared"); - if (subscriptionDropped) - ensure.isTypeOf(Function, subscriptionDropped, "subscriptionDropped"); + if (subscriptionDropped) ensure.isTypeOf(Function, subscriptionDropped, "subscriptionDropped"); var self = this; return new Promise(function(resolve,reject) { @@ -641,8 +639,9 @@ EventStoreNodeConnection.prototype.setStreamMetadataRaw = function( stream, expectedMetastreamVersion, metadata, userCredentials ) { ensure.notNullOrEmpty(stream, "stream"); - if (systemStreams.isMetastream(stream)) + if (systemStreams.isMetastream(stream)) { throw new Error(util.format("Setting metadata for metastream '%s' is not supported.", stream)); + } ensure.isLongOrInteger(expectedMetastreamVersion, "expectedMetastreamVersion"); expectedMetastreamVersion = Long.fromValue(expectedMetastreamVersion); var self = this; @@ -713,7 +712,7 @@ EventStoreNodeConnection.prototype._enqueueOperation = function(operation) { var message = new messages.StartOperationMessage(operation, self._settings.maxRetries, self._settings.operationTimeout); function tryEnqueue() { if (self._handler.totalOperationCount >= self._settings.maxQueueSize) { - setImmediate(tryEnqueue); + setTimeout(tryEnqueue, 0); return; } self._handler.enqueueMessage(message); @@ -721,4 +720,4 @@ EventStoreNodeConnection.prototype._enqueueOperation = function(operation) { setImmediate(tryEnqueue) }; -module.exports = EventStoreNodeConnection; +module.exports = EventStoreNodeConnection; \ No newline at end of file diff --git a/src/eventStorePersistentSubscriptionBase.js b/src/eventStorePersistentSubscriptionBase.js index 43f1b07..5b471a8 100644 --- a/src/eventStorePersistentSubscriptionBase.js +++ b/src/eventStorePersistentSubscriptionBase.js @@ -56,8 +56,7 @@ EventStorePersistentSubscriptionBase.prototype.acknowledge = function(events) { ensure.notNull(events, "events"); if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); - if (!Array.isArray(events)) - events = [events]; + if (!Array.isArray(events)) events = [events]; var ids = events.map(function(x) { return x.originalEvent.eventId; }); this._subscription.notifyEventsProcessed(ids); }; @@ -73,8 +72,7 @@ EventStorePersistentSubscriptionBase.prototype.fail = function(events, action, r ensure.notNull(reason, "reason"); if (this._subscription === null) throw new Error("Invalid operation. Subscription is not active yet."); - if (!Array.isArray(events)) - events = [events]; + if (!Array.isArray(events)) events = [events]; var ids = events.map(function(x) { return x.originalEvent.eventId; }); this._subscription.notifyEventsFailed(ids, action, reason); }; @@ -145,12 +143,12 @@ EventStorePersistentSubscriptionBase.prototype._processQueue = function() { return self._eventAppeared(self, ev); }) .then(function() { - if(self._autoAck) - self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]); - if (self._verbose) + if(self._autoAck) self._subscription.notifyEventsProcessed([ev.originalEvent.eventId]); + if (self._verbose) { self._log.debug("Persistent Subscription to %s: processed event (%s, %d, %s @ %d).", self._streamId, ev.originalEvent.eventStreamId, ev.originalEvent.eventNumber, ev.originalEvent.eventType, ev.originalEventNumber); + } return false; }, function(err) { //TODO GFY should we autonak here? @@ -168,12 +166,12 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas if (!this._isDropped) { this._isDropped = true; - if (this._verbose) + if (this._verbose) { this._log.debug("Persistent Subscription to %s: dropping subscription, reason: %s %s.", - this._streamId, reason, error); + this._streamId, reason, error); + } - if (this._subscription !== null) - this._subscription.unsubscribe(); + if (this._subscription !== null) this._subscription.unsubscribe(); if (this._subscriptionDropped !== null) { try { this._subscriptionDropped(this, reason, error); @@ -185,4 +183,4 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas } }; -module.exports = EventStorePersistentSubscriptionBase; +module.exports = EventStorePersistentSubscriptionBase; \ No newline at end of file diff --git a/src/eventStoreStreamCatchUpSubscription.js b/src/eventStoreStreamCatchUpSubscription.js index 46ae953..8498408 100644 --- a/src/eventStoreStreamCatchUpSubscription.js +++ b/src/eventStoreStreamCatchUpSubscription.js @@ -49,19 +49,18 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function( .then(function() { self._nextReadEventNumber = slice.nextEventNumber; var done = Promise.resolve(lastEventNumber === null ? slice.isEndOfStream : slice.nextEventNumber.compare(lastEventNumber) > 0); - if (!done && slice.isEndOfStream) - return delay(100, false); + if (!done && slice.isEndOfStream) return delay(100, false); return done; }); - break; case SliceReadStatus.StreamNotFound: - if (lastEventNumber && lastEventNumber.compare(-1) !== 0) + if (lastEventNumber && lastEventNumber.compare(-1) !== 0) { throw new Error(util.format("Impossible: stream %s disappeared in the middle of catching up subscription.", self.streamId)); + } return true; case SliceReadStatus.StreamDeleted: throw new Error("Stream deleted: " + self.streamId); default: - throw new Error("Unexpected StreamEventsSlice.Status: %s.", slice.status); + throw new Error(util.format("Unexpected StreamEventsSlice.Status: %s.", slice.status)); } }) .then(function(done) { @@ -72,9 +71,10 @@ EventStoreStreamCatchUpSubscription.prototype._readEventsTill = function( } return readNext() .then(function() { - if (self._verbose) + if (self._verbose) { self._log.debug("Catch-up Subscription to %s: finished reading events, nextReadEventNumber = %d.", - self.isSubscribedToAll ? '' : self.streamId, self._nextReadEventNumber); + self.isSubscribedToAll ? '' : self.streamId, self._nextReadEventNumber); + } }); }; @@ -86,12 +86,13 @@ EventStoreStreamCatchUpSubscription.prototype._tryProcess = function(e) { this._lastProcessedEventNumber = e.originalEventNumber; processed = true; } - if (this._verbose) + if (this._verbose) { this._log.debug("Catch-up Subscription to %s: %s event (%s, %d, %s @ %d).", - this.isSubscribedToAll ? '' : this.streamId, processed ? "processed" : "skipping", - e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber); + this.isSubscribedToAll ? '' : this.streamId, processed ? "processed" : "skipping", + e.originalEvent.eventStreamId, e.originalEvent.eventNumber, e.originalEvent.eventType, e.originalEventNumber); + } return (promise && promise.then) ? promise : Promise.resolve(); }; -module.exports = EventStoreStreamCatchUpSubscription; +module.exports = EventStoreStreamCatchUpSubscription; \ No newline at end of file diff --git a/src/eventStoreSubscription.js b/src/eventStoreSubscription.js index f38ee92..20bc84a 100644 --- a/src/eventStoreSubscription.js +++ b/src/eventStoreSubscription.js @@ -41,4 +41,4 @@ EventStoreSubscription.prototype.unsubscribe = function() { throw new Error("EventStoreSubscription.unsubscribe abstract method called." + this.constructor.name); }; -module.exports = EventStoreSubscription; +module.exports = EventStoreSubscription; \ No newline at end of file diff --git a/src/gossipSeed.js b/src/gossipSeed.js index 2e1b1e1..804c8e5 100644 --- a/src/gossipSeed.js +++ b/src/gossipSeed.js @@ -1,6 +1,8 @@ -module.exports = function GossipSeed(endPoint, hostName) { +function GossipSeed(endPoint, hostName) { if (typeof endPoint !== 'object' || !endPoint.host || !endPoint.port) throw new TypeError('endPoint must be have host and port properties.'); this.endPoint = endPoint; this.hostName = hostName; Object.freeze(this); -}; +} + +module.exports = GossipSeed; \ No newline at end of file diff --git a/src/persistentEventStoreSubscription.js b/src/persistentEventStoreSubscription.js index 6cb71a0..52cf9ad 100644 --- a/src/persistentEventStoreSubscription.js +++ b/src/persistentEventStoreSubscription.js @@ -22,4 +22,4 @@ PersistentEventStoreSubscription.prototype.notifyEventsFailed = function(process this._subscriptionOperation.notifyEventsFailed(processedEvents, action, reason); }; -module.exports = PersistentEventStoreSubscription; +module.exports = PersistentEventStoreSubscription; \ No newline at end of file diff --git a/src/persistentSubscriptionNakEventAction.js b/src/persistentSubscriptionNakEventAction.js index 57fe178..0b75507 100644 --- a/src/persistentSubscriptionNakEventAction.js +++ b/src/persistentSubscriptionNakEventAction.js @@ -1,14 +1,15 @@ -const PersistentSubscriptionNakEventAction = { +const PersistentSubscriptionNakEventAction = Object.freeze({ Unknown: 0, Park: 1, Retry: 2, Skip: 3, - Stop: 4 -}; + Stop: 4, + isValid: function(value) { + for(var k in PersistentSubscriptionNakEventAction) { + if (PersistentSubscriptionNakEventAction[k] === value) return true; + } + return false; + } +}); -module.exports = PersistentSubscriptionNakEventAction; -module.exports.isValid = function(value) { - for(var k in PersistentSubscriptionNakEventAction) - if (PersistentSubscriptionNakEventAction[k] === value) return true; - return false; -}; +module.exports = PersistentSubscriptionNakEventAction; \ No newline at end of file diff --git a/src/readDirection.js b/src/readDirection.js index b7aa413..d418f4c 100644 --- a/src/readDirection.js +++ b/src/readDirection.js @@ -1,6 +1,6 @@ -const ReadDirection = { +const ReadDirection = Object.freeze({ Forward: 'forward', Backward: 'backward' -}; +}); -module.exports = ReadDirection; +module.exports = ReadDirection; \ No newline at end of file diff --git a/src/results.js b/src/results.js index 649eb2e..70c806d 100644 --- a/src/results.js +++ b/src/results.js @@ -19,10 +19,12 @@ function Position(commitPosition, preparePosition) { } Position.prototype.compareTo = function(other) { - if (this.commitPosition.lt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition)&& this.preparePosition.lt(other.preparePosition))) + if (this.commitPosition.lt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition)&& this.preparePosition.lt(other.preparePosition))) { return -1; - if (this.commitPosition.gt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition) && this.preparePosition.gt(other.preparePosition))) + } + if (this.commitPosition.gt(other.commitPosition) || (this.commitPosition.eq(other.commitPosition) && this.preparePosition.gt(other.preparePosition))) { return 1; + } return 0; }; @@ -33,13 +35,12 @@ Position.prototype.toString = function() { Position.start = new Position(0,0); Position.end = new Position(-1,-1); -const EventReadStatus = { +const EventReadStatus = Object.freeze({ Success: 'success', NotFound: 'notFound', NoStream: 'noStream', StreamDeleted: 'streamDeleted' -}; -Object.freeze(EventReadStatus); +}); /** * @param {object} ev @@ -204,12 +205,11 @@ function RawStreamMetadataResult(stream, isStreamDeleted, metastreamVersion, str Object.freeze(this); } -const PersistentSubscriptionCreateStatus = { +const PersistentSubscriptionCreateStatus = Object.freeze({ Success: 'success', NotFound: 'notFound', Failure: 'failure' -}; -Object.freeze(PersistentSubscriptionCreateStatus); +}); /** * @param {string} status @@ -221,13 +221,12 @@ function PersistentSubscriptionCreateResult(status) { Object.freeze(this); } -const PersistentSubscriptionUpdateStatus = { +const PersistentSubscriptionUpdateStatus = Object.freeze({ Success: 'success', NotFound: 'notFound', Failure: 'failure', AccessDenied: 'accessDenied' -}; -Object.freeze(PersistentSubscriptionUpdateStatus); +}); /** * @param {string} status @@ -239,11 +238,10 @@ function PersistentSubscriptionUpdateResult(status) { Object.freeze(this); } -const PersistentSubscriptionDeleteStatus = { +const PersistentSubscriptionDeleteStatus = Object.freeze({ Success: 'success', Failure: 'failure' -}; -Object.freeze(PersistentSubscriptionDeleteStatus); +}); /** * @param {string} status @@ -256,18 +254,18 @@ function PersistentSubscriptionDeleteResult(status) { } // Exports Constructors -module.exports.Position = Position; -module.exports.ResolvedEvent = ResolvedEvent; -module.exports.EventReadStatus = EventReadStatus; -module.exports.EventReadResult = EventReadResult; -module.exports.WriteResult = WriteResult; -module.exports.StreamEventsSlice = StreamEventsSlice; -module.exports.AllEventsSlice = AllEventsSlice; -module.exports.DeleteResult = DeleteResult; -module.exports.RawStreamMetadataResult = RawStreamMetadataResult; -module.exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult; -module.exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus; -module.exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult; -module.exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus; -module.exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult; -module.exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus; +exports.Position = Position; +exports.ResolvedEvent = ResolvedEvent; +exports.EventReadStatus = EventReadStatus; +exports.EventReadResult = EventReadResult; +exports.WriteResult = WriteResult; +exports.StreamEventsSlice = StreamEventsSlice; +exports.AllEventsSlice = AllEventsSlice; +exports.DeleteResult = DeleteResult; +exports.RawStreamMetadataResult = RawStreamMetadataResult; +exports.PersistentSubscriptionCreateResult = PersistentSubscriptionCreateResult; +exports.PersistentSubscriptionCreateStatus = PersistentSubscriptionCreateStatus; +exports.PersistentSubscriptionUpdateResult = PersistentSubscriptionUpdateResult; +exports.PersistentSubscriptionUpdateStatus = PersistentSubscriptionUpdateStatus; +exports.PersistentSubscriptionDeleteResult = PersistentSubscriptionDeleteResult; +exports.PersistentSubscriptionDeleteStatus = PersistentSubscriptionDeleteStatus; \ No newline at end of file diff --git a/src/sliceReadStatus.js b/src/sliceReadStatus.js index 376ae72..126f7fa 100644 --- a/src/sliceReadStatus.js +++ b/src/sliceReadStatus.js @@ -1,8 +1,7 @@ -const SliceReadStatus = { +const SliceReadStatus = Object.freeze({ Success: 'success', StreamNotFound: 'streamNotFound', StreamDeleted: 'streamDeleted' -}; -Object.freeze(SliceReadStatus); +}); -module.exports = SliceReadStatus; +module.exports = SliceReadStatus; \ No newline at end of file diff --git a/src/subscriptionDropReason.js b/src/subscriptionDropReason.js index 2aafc92..e3e7f9b 100644 --- a/src/subscriptionDropReason.js +++ b/src/subscriptionDropReason.js @@ -1,4 +1,4 @@ -const SubscriptionDropReason = { +const SubscriptionDropReason = Object.freeze({ AccessDenied: 'accessDenied', CatchUpError: 'catchUpError', ConnectionClosed: 'connectionClosed', @@ -11,6 +11,6 @@ const SubscriptionDropReason = { SubscribingError: 'subscribingError', UserInitiated: 'userInitiated', Unknown: 'unknown' -}; +}); module.exports = SubscriptionDropReason; \ No newline at end of file diff --git a/src/systemConsumerStrategies.js b/src/systemConsumerStrategies.js index 268f460..602eb23 100644 --- a/src/systemConsumerStrategies.js +++ b/src/systemConsumerStrategies.js @@ -1,8 +1,7 @@ -const SystemConsumerStrategies = { +const SystemConsumerStrategies = Object.freeze({ DispatchToSingle: 'DispatchToSingle', RoundRobin: 'RoundRobin', Pinned: 'Pinned' -}; -Object.freeze(SystemConsumerStrategies); +}); -module.exports = SystemConsumerStrategies; +module.exports = SystemConsumerStrategies; \ No newline at end of file diff --git a/src/systemData/inspectionDecision.js b/src/systemData/inspectionDecision.js index 7a2c3ac..254d635 100644 --- a/src/systemData/inspectionDecision.js +++ b/src/systemData/inspectionDecision.js @@ -1,9 +1,9 @@ -var InspectionDecision = { +var InspectionDecision = Object.freeze({ DoNothing: 'doNothing', EndOperation: 'endOperation', Retry: 'retry', Reconnect: 'reconnect', Subscribed: 'subscribed' -}; +}); module.exports = InspectionDecision; \ No newline at end of file diff --git a/src/systemData/inspectionResult.js b/src/systemData/inspectionResult.js index b0878c2..dc2b198 100644 --- a/src/systemData/inspectionResult.js +++ b/src/systemData/inspectionResult.js @@ -5,4 +5,4 @@ function InspectionResult(decision, description, tcpEndPoint, secureTcpEndPoint) this.secureTcpEndPoint = secureTcpEndPoint || null; } -module.exports = InspectionResult; +module.exports = InspectionResult; \ No newline at end of file diff --git a/src/systemData/statusCode.js b/src/systemData/statusCode.js index 58317b9..50f2f71 100644 --- a/src/systemData/statusCode.js +++ b/src/systemData/statusCode.js @@ -1,9 +1,7 @@ var ClientMessage = require('../messages/clientMessage'); var SliceReadStatus = require('../sliceReadStatus'); -module.exports = {}; - -module.exports.convert = function(code) { +exports.convert = function(code) { switch(code) { case ClientMessage.ReadStreamEventsCompleted.ReadStreamResult.Success: return SliceReadStatus.Success; diff --git a/src/systemData/tcpCommand.js b/src/systemData/tcpCommand.js index e66bd77..b54c927 100644 --- a/src/systemData/tcpCommand.js +++ b/src/systemData/tcpCommand.js @@ -1,4 +1,4 @@ -const TcpCommand = { +const TcpCommand = Object.freeze({ HeartbeatRequestCommand: 0x01, HeartbeatResponseCommand: 0x02, @@ -73,16 +73,18 @@ const TcpCommand = { Authenticated: 0xF3, NotAuthenticated: 0xF4, IdentifyClient: 0xF5, - ClientIdentified: 0xF6 -}; + ClientIdentified: 0xF6, + + getName: function(v) { + return _reverseLookup[v]; + } +}); var _reverseLookup = {}; for(var n in TcpCommand) { + if (n === 'getName') continue; var v = TcpCommand[n]; _reverseLookup[v] = n; } -module.exports = TcpCommand; -module.exports.getName = function(v) { - return _reverseLookup[v]; -}; +module.exports = TcpCommand; \ No newline at end of file diff --git a/src/systemData/tcpFlags.js b/src/systemData/tcpFlags.js index fd15f1f..eabc92a 100644 --- a/src/systemData/tcpFlags.js +++ b/src/systemData/tcpFlags.js @@ -1,6 +1,6 @@ -const TcpFlags = { +const TcpFlags = Object.freeze({ None: 0x0, Authenticated: 0x01 -}; +}); module.exports = TcpFlags; diff --git a/src/systemData/tcpPackage.js b/src/systemData/tcpPackage.js index 0a781e0..d311c3e 100644 --- a/src/systemData/tcpPackage.js +++ b/src/systemData/tcpPackage.js @@ -19,8 +19,7 @@ function TcpPackage(command, flags, correlationId, login, password, data) { } TcpPackage.fromBufferSegment = function(data) { - if (data.length < MandatorySize) - throw new Error("ArraySegment too short, length: " + data.length); + if (data.length < MandatorySize) throw new Error("ArraySegment too short, length: " + data.length); var command = data.buffer[data.offset + CommandOffset]; var flags = data.buffer[data.offset + FlagsOffset]; @@ -32,13 +31,15 @@ TcpPackage.fromBufferSegment = function(data) { if ((flags & TcpFlags.Authenticated) !== 0) { var loginLen = data.buffer[data.offset + AuthOffset]; - if (AuthOffset + 1 + loginLen + 1 >= data.count) - throw new Error("Login length is too big, it doesn't fit into TcpPackage."); + if (AuthOffset + 1 + loginLen + 1 >= data.count) { + throw new Error("Login length is too big, it doesn't fit into TcpPackage."); + } login = data.buffer.toString('utf8', data.offset + AuthOffset + 1, data.offset + AuthOffset + 1 + loginLen); var passLen = data.buffer[data.offset + AuthOffset + 1 + loginLen]; - if (AuthOffset + 1 + loginLen + 1 + passLen > data.count) - throw new Error("Password length is too big, it doesn't fit into TcpPackage."); + if (AuthOffset + 1 + loginLen + 1 + passLen > data.count) { + throw new Error("Password length is too big, it doesn't fit into TcpPackage."); + } headerSize += 1 + loginLen + 1 + passLen; pass = data.buffer.toString('utf8', data.offset + AuthOffset + 1 + loginLen + 1, data.offset + headerSize); } @@ -64,8 +65,7 @@ TcpPackage.prototype.asBuffer = function() { res[AuthOffset + 1 + loginBytes.length] = passwordBytes.length; passwordBytes.copy(res, AuthOffset + 2 + loginBytes.length); - if (this.data) - this.data.copyTo(res, res.length - this.data.count); + if (this.data) this.data.copyTo(res, res.length - this.data.count); return res; } else { @@ -73,8 +73,7 @@ TcpPackage.prototype.asBuffer = function() { res[CommandOffset] = this.command; res[FlagsOffset] = this.flags; guidParse.parse(this.correlationId, res, CorrelationOffset); - if (this.data) - this.data.copyTo(res, AuthOffset); + if (this.data) this.data.copyTo(res, AuthOffset); return res; } }; diff --git a/src/transport/tcp/lengthPrefixMessageFramer.js b/src/transport/tcp/lengthPrefixMessageFramer.js index 85fd8a7..404c66e 100644 --- a/src/transport/tcp/lengthPrefixMessageFramer.js +++ b/src/transport/tcp/lengthPrefixMessageFramer.js @@ -31,8 +31,9 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) { ++this._headerBytes; if (this._headerBytes === HeaderLength) { - if (this._packageLength <= 0 || this._packageLength > this._maxPackageSize) + if (this._packageLength <= 0 || this._packageLength > this._maxPackageSize) { throw new Error(["Package size is out of bounds: ", this._packageLength, "(max: ", this._maxPackageSize, "."].join('')); + } this._messageBuffer = new Buffer(this._packageLength); } @@ -46,8 +47,9 @@ LengthPrefixMessageFramer.prototype._parse = function(bytes) { if (this._bufferIndex === this._packageLength) { - if (this._receivedHandler !== null) + if (this._receivedHandler !== null) { this._receivedHandler(createBufferSegment(this._messageBuffer, 0, this._bufferIndex)); + } this.reset(); } } @@ -69,4 +71,4 @@ LengthPrefixMessageFramer.prototype.registerMessageArrivedCallback = function(ha }; -module.exports = LengthPrefixMessageFramer; +module.exports = LengthPrefixMessageFramer; \ No newline at end of file diff --git a/src/transport/tcp/tcpConnection.js b/src/transport/tcp/tcpConnection.js index d54ef43..ba4a4b8 100644 --- a/src/transport/tcp/tcpConnection.js +++ b/src/transport/tcp/tcpConnection.js @@ -62,14 +62,11 @@ TcpConnection.prototype._trySend = function() { while(sendPiece = this._sendQueue.shift()) { buffers.push(sendPiece); bytes += sendPiece.length; - if (bytes > MaxSendPacketSize) - break; + if (bytes > MaxSendPacketSize) break; } var joinedBuffers = Buffer.concat(buffers, bytes); - if (!this._socket.write(joinedBuffers)) { - return; - } + if (!this._socket.write(joinedBuffers)) return; setImmediate(this._trySend.bind(this)); }; @@ -97,8 +94,7 @@ TcpConnection.prototype.receive = function(cb) { }; TcpConnection.prototype._tryDequeueReceivedData = function() { - if (this._receiveCallback === null || this._receiveQueue.length === 0) - return; + if (this._receiveCallback === null || this._receiveQueue.length === 0) return; var res = []; while(this._receiveQueue.length > 0) { @@ -112,8 +108,9 @@ TcpConnection.prototype._tryDequeueReceivedData = function() { callback(this, res); var bytes = 0; - for(var i=0;i"); - if (this._onError !== null) - this._onError(this, e); + if (this._onError !== null) this._onError(this, e); this._log.debug(e, message); } }; TcpPackageConnection.prototype.startReceiving = function() { - if (this._connection === null) - throw new Error("Failed connection."); + if (this._connection === null) throw new Error("Failed connection."); this._connection.receive(this._onRawDataReceived.bind(this)); }; TcpPackageConnection.prototype.enqueueSend = function(pkg) { - if (this._connection === null) - throw new Error("Failed connection."); + if (this._connection === null) throw new Error("Failed connection."); this._connection.enqueueSend(this._framer.frameData(pkg.asBufferSegment())); }; TcpPackageConnection.prototype.close = function(reason) { - if (this._connection === null) - throw new Error("Failed connection."); + if (this._connection === null) throw new Error("Failed connection."); this._connection.close(reason); }; diff --git a/src/volatileEventStoreSubscription.js b/src/volatileEventStoreSubscription.js index 6735310..59b3bf1 100644 --- a/src/volatileEventStoreSubscription.js +++ b/src/volatileEventStoreSubscription.js @@ -22,4 +22,4 @@ VolatileEventStoreSubscription.prototype.unsubscribe = function() { this._subscriptionOperation.unsubscribe(); }; -module.exports = VolatileEventStoreSubscription; +module.exports = VolatileEventStoreSubscription; \ No newline at end of file