Fix leaking Elasticsearch connections in Sidekiq processes (#30450)
This commit is contained in:
		@@ -8,6 +8,7 @@ class Mastodon::SidekiqMiddleware
 | 
			
		||||
  rescue Mastodon::HostValidationError
 | 
			
		||||
    # Do not retry
 | 
			
		||||
  rescue => e
 | 
			
		||||
    clean_up_elasticsearch_connections!
 | 
			
		||||
    limit_backtrace_and_raise(e)
 | 
			
		||||
  ensure
 | 
			
		||||
    clean_up_sockets!
 | 
			
		||||
@@ -25,6 +26,32 @@ class Mastodon::SidekiqMiddleware
 | 
			
		||||
    clean_up_statsd_socket!
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # This is a hack to immediately free up unused Elasticsearch connections.
 | 
			
		||||
  #
 | 
			
		||||
  # Indeed, Chewy creates one `Elasticsearch::Client` instance per thread,
 | 
			
		||||
  # and each such client manages its long-lasting connection to
 | 
			
		||||
  # Elasticsearch.
 | 
			
		||||
  #
 | 
			
		||||
  # As far as I know, neither `chewy`,  `elasticsearch-transport` or even
 | 
			
		||||
  # `faraday` provide a reliable way to immediately close a connection, and
 | 
			
		||||
  # rely on the underlying object to be garbage-collected instead.
 | 
			
		||||
  #
 | 
			
		||||
  # Furthermore, `sidekiq` creates a new thread each time a job throws an
 | 
			
		||||
  # exception, meaning that each failure will create a new connection, and
 | 
			
		||||
  # the old one will only be closed on full garbage collection.
 | 
			
		||||
  def clean_up_elasticsearch_connections!
 | 
			
		||||
    return unless Chewy.enabled? && Chewy.current[:chewy_client].present?
 | 
			
		||||
 | 
			
		||||
    Chewy.client.transport.transport.connections.each do |connection|
 | 
			
		||||
      # NOTE: This bit of code is tailored for the HTTPClient Faraday adapter
 | 
			
		||||
      connection.connection.app.instance_variable_get(:@client)&.reset_all
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    Chewy.current.delete(:chewy_client)
 | 
			
		||||
  rescue
 | 
			
		||||
    nil
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def clean_up_redis_socket!
 | 
			
		||||
    RedisConfiguration.pool.checkin if Thread.current[:redis]
 | 
			
		||||
    Thread.current[:redis] = nil
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user