Co-authored-by: Claire <claire.github-309c@sitedethib.com> Co-authored-by: David Roetzel <david@roetzel.de>
		
			
				
	
	
		
			206 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
			
		
		
	
	
			206 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
# frozen_string_literal: true
 | 
						|
 | 
						|
require 'websocket/driver'
 | 
						|
 | 
						|
# Small WebSocket client used to exercise the streaming API
# (`/api/v1/streaming`). It wraps a Connection and exposes blocking helpers
# for connecting, subscribing and waiting for events.
class StreamingClient
  # Ways an access token can be handed to the server (see #authenticate).
  module AUTHENTICATION
    SUBPROTOCOL = 1
    AUTHORIZATION_HEADER = 2
    QUERY_PARAMETER = 3
  end

  # One WebSocket connection. It owns the TCP socket, the websocket-driver
  # instance and a background thread that feeds received bytes into the
  # driver. Driver events are recorded on an internal queue so that callers
  # can block on them with #wait_for_event.
  #
  # The instance itself is passed to the driver as its socket object, which is
  # why it exposes #url and #write.
  class Connection
    attr_reader :url, :messages, :last_error
    attr_accessor :logger, :protocols

    # @param url [String] WebSocket URL; an existing query string is kept and
    #   can be extended with #set_query_param before the driver is built
    def initialize(url)
      @uri = URI.parse(url)
      # decode_www_form('') is [], so a missing or empty query yields {}.
      @query_params = URI.decode_www_form(@uri.query.to_s).to_h
      @protocols = nil
      @headers = {}

      @dead = false

      @events_queue = Thread::Queue.new
      @messages = []
      @last_error = nil
    end

    # Registers a request header to be set on the driver when it is built.
    def set_header(key, value)
      @headers[key] = value
    end

    # Registers a query parameter to be appended to the URL when the driver
    # is built.
    def set_query_param(key, value)
      @query_params[key] = value
    end

    # Lazily opens the TCP socket, builds the WebSocket driver, wires up the
    # event handlers and starts the reader thread. Later calls return the same
    # driver; headers and query parameters set afterwards have no effect.
    #
    # @return [Object] the client driver returned by WebSocket::Driver.client
    def driver
      return @driver if defined?(@driver)

      @uri.query = URI.encode_www_form(@query_params)
      @url = @uri.to_s
      @tcp = TCPSocket.new(@uri.host, @uri.port)

      @driver = WebSocket::Driver.client(self, { protocols: @protocols })
      @headers.each_pair { |key, value| @driver.set_header(key, value) }

      # Ask the driver to close the connection when the process exits.
      at_exit { @driver.close }

      register_event_handlers
      start_reader_thread

      @driver
    end

    # Blocks until an event named +expected_event+ is dequeued and returns its
    # payload. Events of other kinds dequeued in the meantime are discarded.
    #
    # @param expected_event [Symbol, nil] e.g. :opened, :message, :closed
    # @param timeout [Numeric] seconds to wait
    # @return [Object, nil] the event payload, or nil once the queue is
    #   closed and drained without a match
    # @raise [Timeout::Error] when no matching event arrives in time
    def wait_for_event(expected_event, timeout: 10)
      Timeout.timeout(timeout) do
        loop do
          event = dequeue_event

          # Queue#pop only returns nil once the queue is closed and empty.
          return nil if event.nil? && @events_queue.closed?
          return event[:payload] if event && event[:event] == expected_event
        end
      end
    end

    # Writes raw bytes to the socket on behalf of the driver. Failures caused
    # by the peer or the socket already being gone are logged, not raised.
    def write(data)
      @tcp.write(data)
    rescue Errno::EPIPE => e
      logger&.debug("EPIPE: #{e}")
    rescue IOError => e
      # The reader thread closes the socket once the connection is finalized.
      logger&.debug("IOError: #{e}")
    end

    # Marks the connection as dead, publishes a final :closed event, closes
    # the event queue and stops the reader thread. Only the first call has an
    # effect, so it is safe to reach from several shutdown paths.
    #
    # @param event [Object] payload for the :closed event
    def finalize(event)
      return if @dead

      @dead = true
      @events_queue.enq({ event: :closed, payload: event })
      @events_queue.close
      # May be the current thread; nil if the reader was never started.
      @thread&.kill
    end

    # Pops the next event off the queue (blocking), logging it at debug level.
    #
    # @return [Hash, nil] the event, or nil when the queue is closed and empty
    def dequeue_event
      event = @events_queue.pop
      logger&.debug(event) unless event.nil?
      event
    end

    private

    # Enqueues +event+ unless the queue has already been closed by #finalize.
    def record_event(event)
      @events_queue.enq(event) unless @events_queue.closed?
    end

    # Translates driver callbacks into queued events.
    def register_event_handlers
      @driver.on(:open) { record_event({ event: :opened }) }

      @driver.on(:message) do |event|
        record_event({ event: :message, payload: event.data })
        @messages << event.data
      end

      @driver.on(:error) do |event|
        logger&.debug(event.message)
        record_event({ event: :error, payload: event })
        @last_error = event
      end

      @driver.on(:close) do |event|
        record_event({ event: :closing, payload: event })
        finalize(event)
      end
    end

    # Starts the thread that feeds bytes from the socket into the driver until
    # the connection is finalized, reset, or reaches end of stream.
    def start_reader_thread
      @thread = Thread.new do
        until @dead || @tcp.closed?
          chunk = @tcp.read(1)

          # IO#read(length) returns nil at end of stream: the peer closed the
          # socket without a close frame, so there is nothing left to read.
          if chunk.nil?
            finalize(synthetic_close_event('Connection closed'))
            break
          end

          @driver.parse(chunk)
        end
      rescue Errno::ECONNRESET
        finalize(synthetic_close_event('Connection reset'))
      ensure
        # Also runs when the thread is killed by #finalize.
        @tcp.close unless @tcp.closed?
      end
    end

    # Builds a close event for shutdowns that did not come from a close frame.
    def synthetic_close_event(reason)
      WebSocket::Driver::CloseEvent.new(
        WebSocket::Driver::Hybi::ERRORS[:unexpected_condition],
        reason
      )
    end
  end

  # Builds a client targeting ws://STREAMING_HOST:STREAMING_PORT/api/v1/streaming
  # with an info-level logger writing to stdout.
  def initialize
    @logger = Logger.new($stdout)
    @logger.level = 'info'

    @connection = Connection.new("ws://#{STREAMING_HOST}:#{STREAMING_PORT}/api/v1/streaming")
    @connection.logger = @logger
  end

  # Switches the logger to debug level.
  def debug!
    @logger.debug!
  end

  # Configures how the access token is sent. Must be called before #connect,
  # because headers, protocols and query parameters are only read when the
  # driver is first built.
  #
  # @param access_token [String]
  # @param authentication_method [Integer] one of the AUTHENTICATION constants
  # @raise [RuntimeError] for a non-String token or an unknown method
  def authenticate(access_token, authentication_method = StreamingClient::AUTHENTICATION::SUBPROTOCOL)
    raise 'Invalid access_token passed to StreamingClient, expected a string' unless access_token.is_a?(String)

    case authentication_method
    when AUTHENTICATION::QUERY_PARAMETER
      @connection.set_query_param('access_token', access_token)
    when AUTHENTICATION::SUBPROTOCOL
      @connection.protocols = access_token
    when AUTHENTICATION::AUTHORIZATION_HEADER
      @connection.set_header('Authorization', "Bearer #{access_token}")
    else
      raise 'Invalid authentication method'
    end
  end

  # Starts the driver and blocks until the :opened event arrives.
  def connect
    @connection.driver.start
    @connection.wait_for_event(:opened)
  end

  # Sends a subscribe request for +channel+; extra keyword arguments are
  # merged into the JSON message.
  def subscribe(channel, **params)
    send(Oj.dump({ type: 'subscribe', stream: channel }.merge(params)))
  end

  # Blocks until +event+ is seen and returns its payload (see
  # Connection#wait_for_event).
  def wait_for(event = nil)
    @connection.wait_for_event(event)
  end

  # Blocks until the next message arrives and returns it as a hash with
  # symbolized keys. A 'payload' entry, when present, is itself parsed as JSON.
  def wait_for_message
    message = @connection.wait_for_event(:message)
    event = Oj.load(message)
    event['payload'] = Oj.load(event['payload']) if event['payload']

    event.deep_symbolize_keys
  end

  # @return [Object] the driver's +status+
  def status
    @connection.driver.status
  end

  # @return [Symbol] the driver's +state+, compared against :open, :closing
  #   and :closed by the predicates below
  def state
    @connection.driver.state
  end

  # @return [Array] every message payload received so far
  def messages
    @connection.messages
  end

  def open?
    state == :open
  end

  def closing?
    state == :closing
  end

  def closed?
    state == :closed
  end

  # Sends a text frame when the connection is open; otherwise does nothing.
  # NOTE: this shadows Object#send on StreamingClient instances; use
  # __send__ for dynamic dispatch.
  def send(message)
    @connection.driver.text(message) if open?
  end

  # Closes the connection (unless already closing) and blocks until the
  # :closed event arrives. Does nothing if the connection is already closed.
  def close
    return if closed?

    @connection.driver.close unless closing?
    @connection.wait_for_event(:closed)
  end
end
 | 
						|
 | 
						|
# Helper mixin giving access to a lazily created StreamingClient.
module StreamingClientHelper
  # Builds a StreamingClient on first use and memoizes it on the receiver.
  #
  # @return [StreamingClient] the same instance on every call for this object
  def streaming_client
    @streaming_client ||= StreamingClient.new
  end
end
 | 
						|
 | 
						|
# Mix StreamingClientHelper into example groups tagged with `:streaming`.
RSpec.configure do |config|
  config.include StreamingClientHelper, :streaming
end
 |