socketry/async-websocket

stack level too deep (SystemStackError)

Opened this issue · 6 comments

ksimmi commented

Hi.

I’m not sure how to describe my issue. A few months ago, I implemented the MVP with async-websocket, and everything worked fine. However, last night, after about six hours of the program running, I encountered a new error. This is something I haven’t seen before, and I’m unsure how to address it.

/usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:36:in `<<': stack level too deep (SystemStackError)
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:345:in `do_write'
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:471:in `flush'
 from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:411:in `write_nonblock'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/buffered.rb:93:in `syswrite'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:184:in `drain'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:196:in `block in flush'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `synchronize'
 from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `flush'
  ... 954 levels...
 from client.rb:78:in `keep_alive'
 from client.rb:68:in `block in run'
 from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:197:in `block in run'
 from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:435:in `block in schedule'

Thank you.

Thanks for the report. Let me see if I can figure out what's going on.

Are you able to share the code in client.rb?

ksimmi commented

client.rb

# frozen_string_literal: true

require 'bundler/setup'
require 'active_support'
require 'active_support/core_ext'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'karafka'

require_relative 'settings'

require_relative 'callable'
require_relative 'api_interactor'
require_relative 'api_interactor/http'
require_relative 'api_interactor/token_manager'

require 'pry'

MessagePack::DefaultFactory.register_type(
  MessagePack::Timestamp::TYPE, # or just -1
  Time,
  packer: MessagePack::Time::Packer,
  unpacker: MessagePack::Time::Unpacker
)


# class KafkaApp < Karafka::App
#   setup do |config|
#     config.client_id = 'my_application'
#     # librdkafka configuration options need to be set as symbol values
#     config.kafka = {
#       'bootstrap.servers': Settings.kafka_url
#     }
#   end
# end

PRODUCER = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = {
    'bootstrap.servers': Settings.kafka_url,
    'enable.idempotence': true
  }
end

class MyClient

  def initialize
    @reconnection_attempts = 0
  end

  def run
    # get_sports

    WS.connect

    Async do
      WS.listen
    end

    Async do
      clear_expired
    end

    Async do
      WS.set_settings
      WS.subscribe
      keep_alive
    end

  rescue WS::ConnectionError => e
    @reconnection_attempts += 1
    Console.logger.warn(self, "reconnection attempts cause `Invalid Token`: #{@reconnection_attempts}")
    retry
  end

  def keep_alive
    loop do
      WS.echo
      @hits ||= 0
      @hits += 1
      PRODUCER.produce_async(topic: 'pong', payload: { hits: @hits }.to_json)

      sleep(1)
    end
  end

  def clear_expired
    loop do
      WS.clear_expired
      sleep(10)
    end
  end
end


Async do |task|
  MyClient.new.run
end

Are you able to tell me what is WS? And what is the implementation of WS.echo?

ksimmi commented
class WS
  ConnectionError = Class.new(StandardError)

  class << self

    def token_manager
      @token_manager ||= ApiInteractor::TokenManager.new(Settings.ws_identity_srv_url)
    end

    def connect
      @queue = TTLHash.new

      @connection = Async::WebSocket::Client.connect(
        Async::HTTP::Endpoint.parse(
          Settings.ws_api_url_url,
          alpn_protocols: Async::HTTP::Protocol::HTTP11.names
        ),
        headers: { Authorization: token_manager.fetch }
      )
      rescue Async::WebSocket::ConnectionError => e
        Console.logger.error(e)
        Console.logger.error(e.response) if e.response

        raise ConnectionError
    end

    def set_settings
      set_settings_response = call_command(SetSettings)
      get_settings_response = call_command(GetSettings)
    end

    def subscribe
      subscribe_response = call_command(Subscribe)
      get_subscriptions_response = call_command(GetSubscriptions)
      # unsubscribe_response = call_command(Unsubscribe)
    end

    def echo
      call_command(Echo)
    end

    def clear_expired
      @queue.clear_expired
    end

    def call_command(cmd)
      msg = cmd.to_message

      @connection.write(msg.body)
      @connection.flush

      @queue.set(msg.message_id, ttl: cmd.await_time_secs)

      debug_info = {
        cmd_name: cmd.usecase_name,
        bytes: msg.body,
        payload: msg.payload,
        hex: msg.debug_as_hex
      }

      # binding.pry if cmd == GetSettings

      while @queue.queued?(msg.message_id)
        Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message awaiting #{msg.message_id}")

        if @queue.expired?(msg.message_id)
          Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message expired  #{msg.message_id}")

          @queue.dequeue(msg.message_id)
          call_command(cmd)
        end

        if response = @queue.get(msg.message_id)
          if response&.message
            debug_info.merge!(response: response.message)
          end

          Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message handled  #{msg.message_id}")
          return @queue.dequeue(msg.message_id)
        end

        sleep(cmd.await_time_secs)
      end

    rescue IOError => e
      Console.logger.error("[WS] #{e.class} #{e.message}")
      raise e

    rescue => e
      binding.pry
    end

    def listen
      @results = {}
      @file_counts = 0

      while message = @connection.read
        msg = InboxHandler.new(Message.unwrap(message.buffer))

        case
        when msg.command_response?
          @queue.set(msg.message_id, msg)

        when msg.broadcast?
          handle_broadcast(msg)

        else
          Console.logger.info("UNHANDLED MESSAGE TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
          # raise "UNHANDLED MESSAGE #{msg} #{msg&.data}"
        end
      end
    rescue => e
      # binding.pry
    end

    def handle_broadcast(msg)
      case
      when msg.matches_broadcast?
        data = WS::Broadcast::Match.handle(msg.data)

        @match_id ||= data[:id]

        Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data[:id]

      when msg.stakes_broadcast?
        data = WS::Broadcast::Stake.handle(msg.data)

        @match_id = nil
        @match_id ||= data.first[:event_id]

        Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data.first[:event_id]

      when msg.sports_broadcast?
        WS::Broadcast::Sport.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      when msg.championship_broadcast?
        WS::Broadcast::Championship.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      when msg.tournaments_broadcast?
        WS::Broadcast::Tournament.handle(msg.data)
        Console.logger.info("#{msg.type} RT=#{msg.rt_code}")

      else
        Console.logger.info("UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
        # raise "UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg} #{msg&.data}"
      end

      rescue => e
        # binding.pry
      end
  end
end

The problem occurs intermittently.