diff --git a/src/transport/tcp/tcpConnection.js b/src/transport/tcp/tcpConnection.js index 82b1c95..865d69d 100644 --- a/src/transport/tcp/tcpConnection.js +++ b/src/transport/tcp/tcpConnection.js @@ -1,7 +1,7 @@ var net = require('net'); var createBufferSegment = require('../../common/bufferSegment'); -const MaxSendPacketSize = 64 * 1000; +const MaxSendPacketSize = 64 * 1024; function TcpConnection(log, connectionId, remoteEndPoint, onConnectionClosed) { this._socket = null; @@ -34,8 +34,11 @@ TcpConnection.prototype._initSocket = function(socket) { this._localEndPoint = {host: socket.localAddress, port: socket.localPort}; this._remoteEndPoint.host = socket.remoteAddress; + this._socket.on('drain', this._trySend.bind(this)); this._socket.on('error', this._processError.bind(this)); this._socket.on('data', this._processReceive.bind(this)); + + this._trySend(); }; TcpConnection.prototype.enqueueSend = function(bufSegmentArray) { @@ -54,19 +57,20 @@ TcpConnection.prototype._trySend = function() { var buffers = []; var bytes = 0; - var sendPiece = this._sendQueue.shift(); - while(sendPiece) { - if (bytes + sendPiece.length > MaxSendPacketSize) - break; - + var sendPiece; + while(sendPiece = this._sendQueue.shift()) { buffers.push(sendPiece); bytes += sendPiece.length; - - sendPiece = this._sendQueue.shift(); + if (bytes > MaxSendPacketSize) + break; } var joinedBuffers = Buffer.concat(buffers, bytes); - this._socket.write(joinedBuffers); + if (!this._socket.write(joinedBuffers)) { + return; + } + + setImmediate(this._trySend.bind(this)); }; TcpConnection.prototype._processError = function(err) {