diff --git a/samples/issue-60.js b/samples/issue-60.js new file mode 100644 index 0000000..4cbbbd7 --- /dev/null +++ b/samples/issue-60.js @@ -0,0 +1,62 @@ +const client = require('../src/client'); // RWM: Import from npm installed package rather than from src +//const client = require("node-eventstore-client"); + +const resolveLinkTos = true; + +function resumeEvent(event) { + return [ + event.originalEvent.eventType, + [event.originalEventNumber.toNumber(), event.originalStreamId].join('@'), + event.originalPosition + ].join(" ") +} + +const eventAppeared = (subscription, event) => console.log("Event received", resumeEvent(event)); + +const subscriptionDropped = (subscription, reason, error) => console.log("Subscription dropped", reason, error); + +const libeProcessingStarted = () => console.log("Live processing started."); + +const credentials = new client.UserCredentials("admin", "changeit"); + +const settings = { + maxReconnections: 10, + reconnectionDelay: 1000, // RWM: slow down the reconnection attempts. 10 seconds to restore connection. +}; +if (process.env.ENABLE_LOGGING) settings.log = console; +if (process.env.VERBOSE) settings.verboseLogging = true; +const endpoint = "tcp://localhost:1113"; +const connection = client.createConnection(settings, endpoint); + +connection.connect().catch(err => console.log("Connection failed", err)); + +connection.on('heartbeatInfo', heartbeatInfo => + console.log('Heartbeat latency', heartbeatInfo.responseReceivedAt - heartbeatInfo.requestSentAt, 'ms') +); + +connection.once("connected", tcpEndPoint => { + console.log(`Connected to eventstore at ${tcpEndPoint.host}:${tcpEndPoint.port}`); + // RWM: subscribe Stream instead of All + connection.subscribeToStreamFrom( + "test", // RWM: Stream to subscribe to + null, + resolveLinkTos, + eventAppeared, + libeProcessingStarted, + subscriptionDropped, + credentials + ); +}); + +connection.on("error", error => + console.log(`Error occurred on connection: ${error}`) +) + +connection.on("closed", reason => + console.log(`Connection closed, reason: ${reason}`) +) + +// RWM: Handle the reconnecting event, for better awareness of what's happening +connection.on("reconnecting", msg => + console.log(`Reconnecting, msg: ${JSON.stringify(msg, null, 4)}`) +) diff --git a/src/clientOperations/volatileSubscriptionOperation.js b/src/clientOperations/volatileSubscriptionOperation.js index 099add2..957a9b6 100644 --- a/src/clientOperations/volatileSubscriptionOperation.js +++ b/src/clientOperations/volatileSubscriptionOperation.js @@ -46,7 +46,7 @@ VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) { } return null; } catch(e) { - console.log(e.stack); + this._log.warn(e.stack); return null; } }; @@ -55,4 +55,4 @@ VolatileSubscriptionOperation.prototype._createSubscriptionObject = function(las return new VolatileEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber); }; -module.exports = VolatileSubscriptionOperation; \ No newline at end of file +module.exports = VolatileSubscriptionOperation; diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index 2f56075..2cabaf7 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -50,7 +50,7 @@ function EventStoreConnectionLogicHandler(esConnection, settings) { EventEmitter.call(this); this._esConnection = esConnection; this._settings = settings; - this._queue = new SimpleQueuedHandler(); + this._queue = new SimpleQueuedHandler(this._settings.log); this._state = ConnectionState.Init; this._connectingPhase = ConnectingPhase.Invalid; this._endpointDiscoverer = null; @@ -722,4 +722,4 @@ EventStoreConnectionLogicHandler.prototype._logInfo = function(message){ this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); }; -module.exports = EventStoreConnectionLogicHandler; \ No newline at end of file +module.exports = EventStoreConnectionLogicHandler; diff --git a/src/core/simpleQueuedHandler.js b/src/core/simpleQueuedHandler.js index aa7e7b5..26b380c 100644 --- a/src/core/simpleQueuedHandler.js +++ b/src/core/simpleQueuedHandler.js @@ -4,10 +4,11 @@ function typeName(t) { throw new TypeError('type must be a function or object, not ' + typeof t); } -function SimpleQueuedHandler() { +function SimpleQueuedHandler(log) { this._handlers = {}; this._messages = []; this._isProcessing = false; + this._log = log; } SimpleQueuedHandler.prototype.registerHandler = function(type, handler) { @@ -16,7 +17,7 @@ SimpleQueuedHandler.prototype.registerHandler = function(type, handler) { try { handler(msg); } catch(e) { - console.log('ERROR: ', e.stack); + this._log.error('handle for', type, 'failed:', e.stack); } }; }; @@ -41,4 +42,4 @@ SimpleQueuedHandler.prototype._processQueue = function() { this._isProcessing = false; }; -module.exports = SimpleQueuedHandler; \ No newline at end of file +module.exports = SimpleQueuedHandler; diff --git a/src/eventStorePersistentSubscriptionBase.js b/src/eventStorePersistentSubscriptionBase.js index d9ec31a..90eb6b5 100644 --- a/src/eventStorePersistentSubscriptionBase.js +++ b/src/eventStorePersistentSubscriptionBase.js @@ -38,6 +38,7 @@ EventStorePersistentSubscriptionBase.prototype.start = function() { return this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials, this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings) .then(function(subscription) { + this._log.debug('Subscription started.'); self._subscription = subscription; return self; }); @@ -182,4 +183,4 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas } }; -module.exports = EventStorePersistentSubscriptionBase; \ No newline at end of file +module.exports = EventStorePersistentSubscriptionBase; diff --git a/src/transport/tcp/tcpConnection.js b/src/transport/tcp/tcpConnection.js index ba4a4b8..7c1891c 100644 --- a/src/transport/tcp/tcpConnection.js +++ b/src/transport/tcp/tcpConnection.js @@ -35,16 +35,15 @@ TcpConnection.prototype._initSocket = function(socket) { this._localEndPoint = {host: socket.localAddress, port: socket.localPort}; this._remoteEndPoint.host = socket.remoteAddress; - this._socket.on('drain', this._trySend.bind(this)); this._socket.on('error', this._processError.bind(this)); + this._socket.on('drain', this._trySend.bind(this)); this._socket.on('data', this._processReceive.bind(this)); + this._socket.on('close', this._processClose.bind(this)); this._trySend(); }; TcpConnection.prototype.enqueueSend = function(bufSegmentArray) { - //console.log(bufSegmentArray); - for(var i = 0; i < bufSegmentArray.length; i++) { var bufSegment = bufSegmentArray[i]; this._sendQueue.push(bufSegment.toBuffer()); @@ -59,7 +58,7 @@ TcpConnection.prototype._trySend = function() { var buffers = []; var bytes = 0; var sendPiece; - while(sendPiece = this._sendQueue.shift()) { + while((sendPiece = this._sendQueue.shift())) { buffers.push(sendPiece); bytes += sendPiece.length; if (bytes > MaxSendPacketSize) break; @@ -75,6 +74,10 @@ TcpConnection.prototype._processError = function(err) { this._closeInternal(err, "Socket error"); }; +TcpConnection.prototype._processClose = function(had_error) { + this._closeInternal(had_error, "Socket closed"); +}; + TcpConnection.prototype._processReceive = function(buf) { if (buf.length === 0) { //NotifyReceiveCompleted(0); @@ -140,20 +143,21 @@ TcpConnection.createConnectingConnection = function( var provider = ssl ? tls : net; var options = { servername: targetHost, - rejectUnauthorized: validateServer + rejectUnauthorized: validateServer, + port: remoteEndPoint.port, + host: remoteEndPoint.host, + timeout: connectionTimeout }; - var socket = provider.connect(remoteEndPoint.port, remoteEndPoint.host, options); - function onError(err) { - if (onConnectionFailed) - onConnectionFailed(connection, err); - } - socket.once('error', onError); - socket.on('connect', function() { + var socket = provider.connect(options, function() { socket.removeListener('error', onError); connection._initSocket(socket); if (onConnectionEstablished) onConnectionEstablished(connection); }); + socket.once('error', onError); + function onError(err) { + if (onConnectionFailed) onConnectionFailed(connection, err); + } return connection; }; -module.exports = TcpConnection; \ No newline at end of file +module.exports = TcpConnection; diff --git a/src/transport/tcp/tcpPackageConnection.js b/src/transport/tcp/tcpPackageConnection.js index 1ec18ba..c5eb80b 100644 --- a/src/transport/tcp/tcpPackageConnection.js +++ b/src/transport/tcp/tcpPackageConnection.js @@ -54,12 +54,11 @@ function TcpPackageConnection( log.debug("TcpPackageConnection: connection to [%j, L%j, %s] failed. Error: %s.", conn.remoteEndPoint, conn.localEndPoint, connectionId, error); connectionClosed(self, error); }, - function (conn, had_error) { - var error; - if (had_error) error = new Error('transmission error.'); + function (conn, error) { + if (error === true) error = new Error('transmission error.'); log.debug("TcpPackageConnection: connection [%j, L%j, %s] was closed %s", conn.remoteEndPoint, conn.localEndPoint, - connectionId, had_error ? "with error: " + error + "." : "cleanly."); + connectionId, error ? "with error: " + error.stack : "cleanly."); connectionClosed(self, error); }); @@ -144,4 +143,4 @@ TcpPackageConnection.prototype.equals = function(other) { }; -module.exports = TcpPackageConnection; \ No newline at end of file +module.exports = TcpPackageConnection;