#35 implement heartbeatInfo event on connection

This commit is contained in:
Nicolas Dextraze 2017-05-15 12:55:14 -07:00
parent 917b89cf3d
commit 121e248dd8
5 changed files with 42 additions and 5 deletions

13
index.d.ts vendored
View File

@ -203,6 +203,15 @@ export interface TcpEndPoint {
host: string; host: string;
} }
export interface HeartbeatInfo {
connectionId: string;
remoteEndPoint: TcpEndPoint;
requestSentAt: number;
requestPkgNumber: number;
responseReceivedAt: number;
responsePkgNumber: number;
}
export interface EventData { export interface EventData {
readonly eventId: string; readonly eventId: string;
readonly type: string; readonly type: string;
@ -234,8 +243,8 @@ export interface EventStoreNodeConnection {
setStreamMetadataRaw(stream: string, expectedMetastreamVersion: number, metadata: any, userCredentials?: UserCredentials): Promise<WriteResult>; setStreamMetadataRaw(stream: string, expectedMetastreamVersion: number, metadata: any, userCredentials?: UserCredentials): Promise<WriteResult>;
getStreamMetadataRaw(stream: string, userCredentials?: UserCredentials): Promise<RawStreamMetadataResult>; getStreamMetadataRaw(stream: string, userCredentials?: UserCredentials): Promise<RawStreamMetadataResult>;
on(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error", listener: (arg: Error | string | TcpEndPoint) => void): this; on(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error" | "heartbeatInfo", listener: (arg: Error | string | TcpEndPoint | HeartbeatInfo) => void): this;
once(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error", listener: (arg: Error | string | TcpEndPoint) => void): this; once(event: "connected" | "disconnected" | "reconnecting" | "closed" | "error" | "heartbeatInfo", listener: (arg: Error | string | TcpEndPoint | HeartbeatInfo) => void): this;
} }
// Expose helper functions // Expose helper functions

View File

@ -1,6 +1,6 @@
{ {
"name": "eventstore-node", "name": "eventstore-node",
"version": "0.1.0", "version": "0.1.2",
"description": "A port of the EventStore .Net ClientAPI to Node.js", "description": "A port of the EventStore .Net ClientAPI to Node.js",
"main": "index.js", "main": "index.js",
"types": "index.d.ts", "types": "index.d.ts",

View File

@ -29,6 +29,11 @@ const connection = client.createConnection(settings, endpoint)
connection.connect().catch(err => console.log(err)) connection.connect().catch(err => console.log(err))
connection.on('heartbeatInfo', heartbeatInfo => {
console.log('Connected to endpoint', heartbeatInfo.remoteEndPoint)
console.log('Heartbeat latency', heartbeatInfo.responseReceivedAt - heartbeatInfo.requestSentAt)
})
connection.once("connected", tcpEndPoint => { connection.once("connected", tcpEndPoint => {
console.log(`Connected to eventstore at ${tcpEndPoint.host}:${tcpEndPoint.port}`) console.log(`Connected to eventstore at ${tcpEndPoint.host}:${tcpEndPoint.port}`)
connection.subscribeToAll( connection.subscribeToAll(

View File

@ -460,7 +460,26 @@ EventStoreConnectionLogicHandler.prototype._handleTcpPackage = function(connecti
this._packageNumber += 1; this._packageNumber += 1;
if (pkg.command === TcpCommand.HeartbeatResponseCommand) if (pkg.command === TcpCommand.HeartbeatResponseCommand)
{
if (pkg.correlationId === this._heartbeatInfo.correlationId)
{
var now = Date.now();
var heartbeatEvent = {
connectionId: this._connection.connectionId,
remoteEndPoint: this._connection.remoteEndPoint,
requestSentAt: this._heartbeatInfo.timeStamp,
requestPkgNumber: this._heartbeatInfo.lastPackageNumber,
responseReceivedAt: now,
responsePkgNumber: this._packageNumber
};
try {
this.emit('heartbeatInfo', heartbeatEvent);
} catch(e) {
this._logDebug("IGNORED: emit heartbeat event failed.\n%s", e.stack);
}
}
return; return;
}
if (pkg.command === TcpCommand.HeartbeatRequestCommand) if (pkg.command === TcpCommand.HeartbeatRequestCommand)
{ {
this._connection.enqueueSend(new TcpPackage( this._connection.enqueueSend(new TcpPackage(
@ -640,12 +659,13 @@ EventStoreConnectionLogicHandler.prototype._manageHeartbeats = function() {
if (this._heartbeatInfo.isIntervalStage) if (this._heartbeatInfo.isIntervalStage)
{ {
var correlationId = uuid.v4();
// TcpMessage.Heartbeat analog // TcpMessage.Heartbeat analog
this._connection.enqueueSend(new TcpPackage( this._connection.enqueueSend(new TcpPackage(
TcpCommand.HeartbeatRequestCommand, TcpCommand.HeartbeatRequestCommand,
TcpFlags.None, TcpFlags.None,
uuid.v4())); correlationId));
this._heartbeatInfo = {lastPackageNumber: this._heartbeatInfo.lastPackageNumber, isIntervalStage: false, timeStamp: Date.now()}; this._heartbeatInfo = {correlationId: correlationId, lastPackageNumber: this._heartbeatInfo.lastPackageNumber, isIntervalStage: false, timeStamp: Date.now()};
} }
else else
{ {

View File

@ -59,6 +59,9 @@ function EventStoreNodeConnection(settings, clusterSettings, endpointDiscoverer,
this._handler.on('error', function(e) { this._handler.on('error', function(e) {
self.emit('error', e); self.emit('error', e);
}); });
this._handler.on('heartbeatInfo', function(e) {
self.emit('heartbeatInfo', e);
});
} }
util.inherits(EventStoreNodeConnection, EventEmitter); util.inherits(EventStoreNodeConnection, EventEmitter);