Removed some console

Fixed issue 60
This commit is contained in:
Nicolas Dextraze 2019-02-18 09:11:51 -08:00
parent 2c272a19f5
commit 7db060af6e
7 changed files with 93 additions and 26 deletions

62
samples/issue-60.js Normal file
View File

@ -0,0 +1,62 @@
const client = require('../src/client'); // RWM: Import from npm installed package rather than from src
//const client = require("node-eventstore-client");
const resolveLinkTos = true;
function resumeEvent(event) {
return [
event.originalEvent.eventType,
[event.originalEventNumber.toNumber(), event.originalStreamId].join('@'),
event.originalPosition
].join(" ")
}
const eventAppeared = (subscription, event) => console.log("Event received", resumeEvent(event));
const subscriptionDropped = (subscription, reason, error) => console.log("Subscription dropped", reason, error);
const libeProcessingStarted = () => console.log("Live processing started.");
const credentials = new client.UserCredentials("admin", "changeit");
const settings = {
maxReconnections: 10,
reconnectionDelay: 1000, // RWM: slow down the reconnection attempts. 10 seconds to restore connection.
};
if (process.env.ENABLE_LOGGING) settings.log = console;
if (process.env.VERBOSE) settings.verboseLogging = true;
const endpoint = "tcp://localhost:1113";
const connection = client.createConnection(settings, endpoint);
connection.connect().catch(err => console.log("Connection failed", err));
connection.on('heartbeatInfo', heartbeatInfo =>
console.log('Heartbeat latency', heartbeatInfo.responseReceivedAt - heartbeatInfo.requestSentAt, 'ms')
);
connection.once("connected", tcpEndPoint => {
console.log(`Connected to eventstore at ${tcpEndPoint.host}:${tcpEndPoint.port}`);
// RWM: subscribe Stream instead of All
connection.subscribeToStreamFrom(
"test", // RWM: Stream to subscribe to
null,
resolveLinkTos,
eventAppeared,
libeProcessingStarted,
subscriptionDropped,
credentials
);
});
connection.on("error", error =>
console.log(`Error occurred on connection: ${error}`)
)
connection.on("closed", reason =>
console.log(`Connection closed, reason: ${reason}`)
)
// RWM: Handle the reconnecting event, for better awareness of what's happening
connection.on("reconnecting", msg =>
console.log(`Reconnecting, msg: ${JSON.stringify(msg, null, 4)}`)
)

View File

@ -46,7 +46,7 @@ VolatileSubscriptionOperation.prototype._inspectPackage = function(pkg) {
} }
return null; return null;
} catch(e) { } catch(e) {
console.log(e.stack); this._log.warn(e.stack);
return null; return null;
} }
}; };
@ -55,4 +55,4 @@ VolatileSubscriptionOperation.prototype._createSubscriptionObject = function(las
return new VolatileEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber); return new VolatileEventStoreSubscription(this, this._streamId, lastCommitPosition, lastEventNumber);
}; };
module.exports = VolatileSubscriptionOperation; module.exports = VolatileSubscriptionOperation;

View File

@ -50,7 +50,7 @@ function EventStoreConnectionLogicHandler(esConnection, settings) {
EventEmitter.call(this); EventEmitter.call(this);
this._esConnection = esConnection; this._esConnection = esConnection;
this._settings = settings; this._settings = settings;
this._queue = new SimpleQueuedHandler(); this._queue = new SimpleQueuedHandler(this._settings.log);
this._state = ConnectionState.Init; this._state = ConnectionState.Init;
this._connectingPhase = ConnectingPhase.Invalid; this._connectingPhase = ConnectingPhase.Invalid;
this._endpointDiscoverer = null; this._endpointDiscoverer = null;
@ -722,4 +722,4 @@ EventStoreConnectionLogicHandler.prototype._logInfo = function(message){
this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message); this._settings.log.info("EventStoreConnection '%s': %s", this._esConnection.connectionName, message);
}; };
module.exports = EventStoreConnectionLogicHandler; module.exports = EventStoreConnectionLogicHandler;

View File

@ -4,10 +4,11 @@ function typeName(t) {
throw new TypeError('type must be a function or object, not ' + typeof t); throw new TypeError('type must be a function or object, not ' + typeof t);
} }
function SimpleQueuedHandler() { function SimpleQueuedHandler(log) {
this._handlers = {}; this._handlers = {};
this._messages = []; this._messages = [];
this._isProcessing = false; this._isProcessing = false;
this._log = log;
} }
SimpleQueuedHandler.prototype.registerHandler = function(type, handler) { SimpleQueuedHandler.prototype.registerHandler = function(type, handler) {
@ -16,7 +17,7 @@ SimpleQueuedHandler.prototype.registerHandler = function(type, handler) {
try { try {
handler(msg); handler(msg);
} catch(e) { } catch(e) {
console.log('ERROR: ', e.stack); this._log.error('handle for', type, 'failed:', e.stack);
} }
}; };
}; };
@ -41,4 +42,4 @@ SimpleQueuedHandler.prototype._processQueue = function() {
this._isProcessing = false; this._isProcessing = false;
}; };
module.exports = SimpleQueuedHandler; module.exports = SimpleQueuedHandler;

View File

@ -38,6 +38,7 @@ EventStorePersistentSubscriptionBase.prototype.start = function() {
return this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials, return this._startSubscription(this._subscriptionId, this._streamId, this._bufferSize, this._userCredentials,
this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings) this._onEventAppeared.bind(this), this._onSubscriptionDropped.bind(this), this._settings)
.then(function(subscription) { .then(function(subscription) {
this._log.debug('Subscription started.');
self._subscription = subscription; self._subscription = subscription;
return self; return self;
}); });
@ -182,4 +183,4 @@ EventStorePersistentSubscriptionBase.prototype._dropSubscription = function(reas
} }
}; };
module.exports = EventStorePersistentSubscriptionBase; module.exports = EventStorePersistentSubscriptionBase;

View File

@ -35,16 +35,15 @@ TcpConnection.prototype._initSocket = function(socket) {
this._localEndPoint = {host: socket.localAddress, port: socket.localPort}; this._localEndPoint = {host: socket.localAddress, port: socket.localPort};
this._remoteEndPoint.host = socket.remoteAddress; this._remoteEndPoint.host = socket.remoteAddress;
this._socket.on('drain', this._trySend.bind(this));
this._socket.on('error', this._processError.bind(this)); this._socket.on('error', this._processError.bind(this));
this._socket.on('drain', this._trySend.bind(this));
this._socket.on('data', this._processReceive.bind(this)); this._socket.on('data', this._processReceive.bind(this));
this._socket.on('close', this._processClose.bind(this));
this._trySend(); this._trySend();
}; };
TcpConnection.prototype.enqueueSend = function(bufSegmentArray) { TcpConnection.prototype.enqueueSend = function(bufSegmentArray) {
//console.log(bufSegmentArray);
for(var i = 0; i < bufSegmentArray.length; i++) { for(var i = 0; i < bufSegmentArray.length; i++) {
var bufSegment = bufSegmentArray[i]; var bufSegment = bufSegmentArray[i];
this._sendQueue.push(bufSegment.toBuffer()); this._sendQueue.push(bufSegment.toBuffer());
@ -59,7 +58,7 @@ TcpConnection.prototype._trySend = function() {
var buffers = []; var buffers = [];
var bytes = 0; var bytes = 0;
var sendPiece; var sendPiece;
while(sendPiece = this._sendQueue.shift()) { while((sendPiece = this._sendQueue.shift())) {
buffers.push(sendPiece); buffers.push(sendPiece);
bytes += sendPiece.length; bytes += sendPiece.length;
if (bytes > MaxSendPacketSize) break; if (bytes > MaxSendPacketSize) break;
@ -75,6 +74,10 @@ TcpConnection.prototype._processError = function(err) {
this._closeInternal(err, "Socket error"); this._closeInternal(err, "Socket error");
}; };
TcpConnection.prototype._processClose = function(had_error) {
this._closeInternal(had_error, "Socket closed");
};
TcpConnection.prototype._processReceive = function(buf) { TcpConnection.prototype._processReceive = function(buf) {
if (buf.length === 0) { if (buf.length === 0) {
//NotifyReceiveCompleted(0); //NotifyReceiveCompleted(0);
@ -140,20 +143,21 @@ TcpConnection.createConnectingConnection = function(
var provider = ssl ? tls : net; var provider = ssl ? tls : net;
var options = { var options = {
servername: targetHost, servername: targetHost,
rejectUnauthorized: validateServer rejectUnauthorized: validateServer,
port: remoteEndPoint.port,
host: remoteEndPoint.host,
timeout: connectionTimeout
}; };
var socket = provider.connect(remoteEndPoint.port, remoteEndPoint.host, options); var socket = provider.connect(options, function() {
function onError(err) {
if (onConnectionFailed)
onConnectionFailed(connection, err);
}
socket.once('error', onError);
socket.on('connect', function() {
socket.removeListener('error', onError); socket.removeListener('error', onError);
connection._initSocket(socket); connection._initSocket(socket);
if (onConnectionEstablished) onConnectionEstablished(connection); if (onConnectionEstablished) onConnectionEstablished(connection);
}); });
socket.once('error', onError);
function onError(err) {
if (onConnectionFailed) onConnectionFailed(connection, err);
}
return connection; return connection;
}; };
module.exports = TcpConnection; module.exports = TcpConnection;

View File

@ -54,12 +54,11 @@ function TcpPackageConnection(
log.debug("TcpPackageConnection: connection to [%j, L%j, %s] failed. Error: %s.", conn.remoteEndPoint, conn.localEndPoint, connectionId, error); log.debug("TcpPackageConnection: connection to [%j, L%j, %s] failed. Error: %s.", conn.remoteEndPoint, conn.localEndPoint, connectionId, error);
connectionClosed(self, error); connectionClosed(self, error);
}, },
function (conn, had_error) { function (conn, error) {
var error; if (error === true) error = new Error('transmission error.');
if (had_error) error = new Error('transmission error.');
log.debug("TcpPackageConnection: connection [%j, L%j, %s] was closed %s", conn.remoteEndPoint, conn.localEndPoint, log.debug("TcpPackageConnection: connection [%j, L%j, %s] was closed %s", conn.remoteEndPoint, conn.localEndPoint,
connectionId, had_error ? "with error: " + error + "." : "cleanly."); connectionId, error ? "with error: " + error.stack : "cleanly.");
connectionClosed(self, error); connectionClosed(self, error);
}); });
@ -144,4 +143,4 @@ TcpPackageConnection.prototype.equals = function(other) {
}; };
module.exports = TcpPackageConnection; module.exports = TcpPackageConnection;