* the timeout parameters in options will trigger a timeout only when the socket is connected (cf. https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback). To handle connection timeout we have to handle it manually
175 lines
4.9 KiB
JavaScript
175 lines
4.9 KiB
JavaScript
var net = require('net');
|
|
var tls = require('tls');
|
|
var createBufferSegment = require('../../common/bufferSegment');
|
|
|
|
const MaxSendPacketSize = 64 * 1024;
|
|
|
|
function TcpConnection(log, connectionId, remoteEndPoint, onConnectionClosed) {
|
|
this._socket = null;
|
|
this._log = log;
|
|
this._connectionId = connectionId;
|
|
this._remoteEndPoint = remoteEndPoint;
|
|
this._localEndPoint = null;
|
|
this._onConnectionClosed = onConnectionClosed;
|
|
this._receiveCallback = null;
|
|
this._closed = false;
|
|
this._sendQueue = [];
|
|
this._receiveQueue = [];
|
|
|
|
Object.defineProperty(this, 'remoteEndPoint', {
|
|
enumerable: true,
|
|
get: function() {
|
|
return this._remoteEndPoint;
|
|
}
|
|
});
|
|
Object.defineProperty(this, 'localEndPoint', {
|
|
enumerable: true,
|
|
get: function() {
|
|
return this._localEndPoint;
|
|
}
|
|
});
|
|
}
|
|
|
|
TcpConnection.prototype._initSocket = function(socket) {
|
|
this._socket = socket;
|
|
this._localEndPoint = {host: socket.localAddress, port: socket.localPort};
|
|
this._remoteEndPoint.host = socket.remoteAddress;
|
|
|
|
this._socket.on('error', this._processError.bind(this));
|
|
this._socket.on('drain', this._trySend.bind(this));
|
|
this._socket.on('data', this._processReceive.bind(this));
|
|
this._socket.on('close', this._processClose.bind(this));
|
|
|
|
this._trySend();
|
|
};
|
|
|
|
TcpConnection.prototype.enqueueSend = function(bufSegmentArray) {
|
|
for(var i = 0; i < bufSegmentArray.length; i++) {
|
|
var bufSegment = bufSegmentArray[i];
|
|
this._sendQueue.push(bufSegment.toBuffer());
|
|
}
|
|
|
|
this._trySend();
|
|
};
|
|
|
|
TcpConnection.prototype._trySend = function() {
|
|
if (this._sendQueue.length === 0 || this._socket === null) return;
|
|
|
|
var buffers = [];
|
|
var bytes = 0;
|
|
var sendPiece;
|
|
while((sendPiece = this._sendQueue.shift())) {
|
|
buffers.push(sendPiece);
|
|
bytes += sendPiece.length;
|
|
if (bytes > MaxSendPacketSize) break;
|
|
}
|
|
|
|
var joinedBuffers = Buffer.concat(buffers, bytes);
|
|
if (!this._socket.write(joinedBuffers)) return;
|
|
|
|
setImmediate(this._trySend.bind(this));
|
|
};
|
|
|
|
TcpConnection.prototype._processError = function(err) {
|
|
this._closeInternal(err, "Socket error");
|
|
};
|
|
|
|
TcpConnection.prototype._processClose = function(had_error) {
|
|
this._closeInternal(had_error, "Socket closed");
|
|
};
|
|
|
|
TcpConnection.prototype._processReceive = function(buf) {
|
|
if (buf.length === 0) {
|
|
//NotifyReceiveCompleted(0);
|
|
this._closeInternal(null, "Socket closed");
|
|
return;
|
|
}
|
|
|
|
//NotifyReceiveCompleted(buf.length)
|
|
this._receiveQueue.push(buf);
|
|
|
|
this._tryDequeueReceivedData();
|
|
};
|
|
|
|
TcpConnection.prototype.receive = function(cb) {
|
|
this._receiveCallback = cb;
|
|
this._tryDequeueReceivedData();
|
|
};
|
|
|
|
TcpConnection.prototype._tryDequeueReceivedData = function() {
|
|
if (this._receiveCallback === null || this._receiveQueue.length === 0) return;
|
|
|
|
var res = [];
|
|
while(this._receiveQueue.length > 0) {
|
|
var buf = this._receiveQueue.shift();
|
|
var bufferSegment = createBufferSegment(buf);
|
|
res.push(bufferSegment);
|
|
}
|
|
var callback = this._receiveCallback;
|
|
this._receiveCallback = null;
|
|
|
|
callback(this, res);
|
|
|
|
var bytes = 0;
|
|
for(var i=0;i<res.length;i++) {
|
|
bytes += res[i].count;
|
|
}
|
|
|
|
//this._pendingReceivedBytes -= bytes;
|
|
};
|
|
|
|
TcpConnection.prototype.close = function(reason) {
|
|
this._closeInternal(null, reason || "Normal socket close.");
|
|
};
|
|
|
|
TcpConnection.prototype._closeInternal = function(err, reason) {
|
|
if (this._closed) return;
|
|
this._closed = true;
|
|
|
|
if (this._socket !== null) {
|
|
this._socket.end();
|
|
this._socket.unref();
|
|
this._socket = null;
|
|
}
|
|
|
|
if (this._onConnectionClosed !== null) this._onConnectionClosed(this, err);
|
|
};
|
|
|
|
TcpConnection.createConnectingConnection = function(
|
|
log, connectionId, remoteEndPoint, ssl, targetHost, validateServer,
|
|
connectionTimeout, onConnectionEstablished, onConnectionFailed, onConnectionClosed
|
|
) {
|
|
var connection = new TcpConnection(log, connectionId, remoteEndPoint, onConnectionClosed);
|
|
var provider = ssl ? tls : net;
|
|
var options = {
|
|
servername: targetHost,
|
|
rejectUnauthorized: validateServer,
|
|
port: remoteEndPoint.port,
|
|
host: remoteEndPoint.host,
|
|
timeout: connectionTimeout
|
|
};
|
|
var socket = provider.connect(options, function() {
|
|
socket.removeListener('error', onError);
|
|
connection._initSocket(socket);
|
|
if (onConnectionEstablished) onConnectionEstablished(connection);
|
|
});
|
|
var timer = setTimeout(function(){
|
|
log.error('TcpConnection: timeout when connecting to %j in %d ms', remoteEndPoint, connectionTimeout);
|
|
connection.close();
|
|
if (onConnectionFailed) onConnectionFailed(connection, new Error('Connection failed'));
|
|
}, connectionTimeout)
|
|
socket.once('error', onError);
|
|
function onError(err) {
|
|
clearTimeout(timer);
|
|
if (onConnectionFailed) onConnectionFailed(connection, err);
|
|
}
|
|
socket.once('connect', onConnect);
|
|
function onConnect() {
|
|
log.info('TcpConnection: successfully connected to %j', remoteEndPoint);
|
|
clearTimeout(timer);
|
|
}
|
|
return connection;
|
|
};
|
|
|
|
module.exports = TcpConnection;
|