From 121e248dd8efa296df11875dcdfaf4043bbee9ed Mon Sep 17 00:00:00 2001 From: Nicolas Dextraze Date: Mon, 15 May 2017 12:55:14 -0700 Subject: [PATCH] #35 implement heartbeatInfo event on connection --- index.d.ts | 13 +++++++++-- package.json | 2 +- samples/subscribe-all-events.js | 5 ++++ src/core/eventStoreConnectionLogicHandler.js | 24 ++++++++++++++++++-- src/eventStoreNodeConnection.js | 3 +++ 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/index.d.ts b/index.d.ts index a4c8f0a..8bc4a6d 100644 --- a/index.d.ts +++ b/index.d.ts @@ -203,6 +203,15 @@ export interface TcpEndPoint { host: string; } +export interface HeartbeatInfo { + connectionId: string; + remoteEndPoint: TcpEndPoint; + requestSentAt: number; + requestPkgNumber: number; + responseReceivedAt: number; + responsePkgNumber: number; +} + export interface EventData { readonly eventId: string; readonly type: string; @@ -234,8 +243,8 @@ export interface EventStoreNodeConnection { setStreamMetadataRaw(stream: string, expectedMetastreamVersion: number, metadata: any, userCredentials?: UserCredentials): Promise; getStreamMetadataRaw(stream: string, userCredentials?: UserCredentials): Promise; - on(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error", listener: (arg: Error | string | TcpEndPoint) => void): this; - once(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error", listener: (arg: Error | string | TcpEndPoint) => void): this; + on(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error" | "heartbeatInfo", listener: (arg: Error | string | TcpEndPoint | HeartbeatInfo) => void): this; + once(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error" | "heartbeatInfo", listener: (arg: Error | string | TcpEndPoint | HeartbeatInfo) => void): this; } // Expose helper functions diff --git a/package.json b/package.json index 2e6a395..88b3928 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "eventstore-node", - "version": "0.1.0", + "version": "0.1.2", "description": "A port of the EventStore .Net ClientAPI to Node.js", "main": "index.js", "types": "index.d.ts", diff --git a/samples/subscribe-all-events.js b/samples/subscribe-all-events.js index 92c7fad..f8e9205 100644 --- a/samples/subscribe-all-events.js +++ b/samples/subscribe-all-events.js @@ -29,6 +29,11 @@ const connection = client.createConnection(settings, endpoint) connection.connect().catch(err => console.log(err)) +connection.on('heartbeatInfo', heartbeatInfo => { + console.log('Connected to endpoint', heartbeatInfo.remoteEndPoint) + console.log('Heartbeat latency', heartbeatInfo.responseReceivedAt - heartbeatInfo.requestSentAt) +}) + connection.once("connected", tcpEndPoint => { console.log(`Connected to eventstore at ${tcpEndPoint.host}:${tcpEndPoint.port}`) connection.subscribeToAll( diff --git a/src/core/eventStoreConnectionLogicHandler.js b/src/core/eventStoreConnectionLogicHandler.js index 2ad6281..cda53ba 100644 --- a/src/core/eventStoreConnectionLogicHandler.js +++ b/src/core/eventStoreConnectionLogicHandler.js @@ -460,7 +460,26 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti this._packageNumber += 1; if (pkg.command === TcpCommand.HeartbeatResponseCommand) + { + if (pkg.correlationId === this._heartbeatInfo.correlationId) + { + var now = Date.now(); + var heartbeatEvent = { + connectionId: this._connection.connectionId, + remoteEndPoint: this._connection.remoteEndPoint, + requestSentAt: this._heartbeatInfo.timeStamp, + requestPkgNumber: this._heartbeatInfo.lastPackageNumber, + responseReceivedAt: now, + responsePkgNumber: this._packageNumber + }; + try { + this.emit('heartbeatInfo', heartbeatEvent); + } catch(e) { + this._logDebug("IGNORED: emit heartbeat event failed.\n%s", e.stack); + } + } return; + } if (pkg.command === TcpCommand.HeartbeatRequestCommand) { this._connection.enqueueSend(new TcpPackage( @@ -640,12 +659,13 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() { if (this._heartbeatInfo.isIntervalStage) { + var correlationId = uuid.v4(); // TcpMessage.Heartbeat analog this._connection.enqueueSend(new TcpPackage( TcpCommand.HeartbeatRequestCommand, TcpFlags.None, - uuid.v4())); - this._heartbeatInfo = {lastPackageNumber: this._heartbeatInfo.lastPackageNumber, isIntervalStage: false, timeStamp: Date.now()}; + correlationId)); + this._heartbeatInfo = {correlationId: correlationId, lastPackageNumber: this._heartbeatInfo.lastPackageNumber, isIntervalStage: false, timeStamp: Date.now()}; } else { diff --git a/src/eventStoreNodeConnection.js b/src/eventStoreNodeConnection.js index cc047cb..8395ba7 100644 --- a/src/eventStoreNodeConnection.js +++ b/src/eventStoreNodeConnection.js @@ -59,6 +59,9 @@ function EventStoreNodeConnection(settings, clusterSettings, endpointDiscoverer, this._handler.on('error', function(e) { self.emit('error', e); }); + this._handler.on('heartbeatInfo', function(e) { + self.emit('heartbeatInfo', e); + }); } util.inherits(EventStoreNodeConnection, EventEmitter);