Check if port/socket is available before forking in Streaming API (#9023)
Previously, the server would attempt taking port/socket in worker process, and if it was taken, fail, which made the master process create a new worker. This led to really high CPU usage if the streaming API was started when the port or socket were not available. Now, before clustering (forking) into worker processes, a test server is created and then removed to check if it can be done.
This commit is contained in:
		@@ -74,6 +74,7 @@ const startMaster = () => {
 | 
			
		||||
  if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
 | 
			
		||||
    log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  log.info(`Starting streaming API server master with ${numWorkers} workers`);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@@ -616,16 +617,9 @@ const startWorker = (workerId) => {
 | 
			
		||||
    });
 | 
			
		||||
  }, 30000);
 | 
			
		||||
 | 
			
		||||
  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
 | 
			
		||||
    server.listen(process.env.SOCKET || process.env.PORT, () => {
 | 
			
		||||
      fs.chmodSync(server.address(), 0o666);
 | 
			
		||||
      log.info(`Worker ${workerId} now listening on ${server.address()}`);
 | 
			
		||||
    });
 | 
			
		||||
  } else {
 | 
			
		||||
    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
 | 
			
		||||
      log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
  attachServerWithConfig(server, address => {
 | 
			
		||||
    log.info(`Worker ${workerId} now listening on ${address}`);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  const onExit = () => {
 | 
			
		||||
    log.info(`Worker ${workerId} exiting, bye bye`);
 | 
			
		||||
@@ -645,9 +639,49 @@ const startWorker = (workerId) => {
 | 
			
		||||
  process.on('uncaughtException', onError);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
throng({
 | 
			
		||||
  workers: numWorkers,
 | 
			
		||||
  lifetime: Infinity,
 | 
			
		||||
  start: startWorker,
 | 
			
		||||
  master: startMaster,
 | 
			
		||||
const attachServerWithConfig = (server, onSuccess) => {
 | 
			
		||||
  if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
 | 
			
		||||
    server.listen(process.env.SOCKET || process.env.PORT, () => {
 | 
			
		||||
      fs.chmodSync(server.address(), 0o666);
 | 
			
		||||
 | 
			
		||||
      if (onSuccess) {
 | 
			
		||||
        onSuccess(server.address());
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  } else {
 | 
			
		||||
    server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => {
 | 
			
		||||
      if (onSuccess) {
 | 
			
		||||
        onSuccess(`${server.address().address}:${server.address().port}`);
 | 
			
		||||
      }
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
const onPortAvailable = onSuccess => {
 | 
			
		||||
  const testServer = http.createServer();
 | 
			
		||||
 | 
			
		||||
  testServer.once('error', err => {
 | 
			
		||||
    onSuccess(err);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  testServer.once('listening', () => {
 | 
			
		||||
    testServer.once('close', () => onSuccess());
 | 
			
		||||
    testServer.close();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  attachServerWithConfig(testServer);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
onPortAvailable(err => {
 | 
			
		||||
  if (err) {
 | 
			
		||||
    log.error('Could not start server, the port or socket is in use');
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  throng({
 | 
			
		||||
    workers: numWorkers,
 | 
			
		||||
    lifetime: Infinity,
 | 
			
		||||
    start: startWorker,
 | 
			
		||||
    master: startMaster,
 | 
			
		||||
  });
 | 
			
		||||
});
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user