stack level too deep (SystemStackError)
Opened this issue · 6 comments
ksimmi commented
Hi.
I’m not sure how to describe my issue. A few months ago, I implemented the MVP with async-websocket, and everything worked fine. However, last night, after about six hours of the program running, I encountered a new error. This is something I haven’t seen before, and I’m unsure how to address it.
/usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:36:in `<<': stack level too deep (SystemStackError)
from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:345:in `do_write'
from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:471:in `flush'
from /usr/local/bundle/ruby/3.2.0/gems/openssl-3.2.0/lib/openssl/buffering.rb:411:in `write_nonblock'
from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/buffered.rb:93:in `syswrite'
from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:184:in `drain'
from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:196:in `block in flush'
from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `synchronize'
from /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.6.1/lib/io/stream/generic.rb:195:in `flush'
... 954 levels...
from client.rb:78:in `keep_alive'
from client.rb:68:in `block in run'
from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:197:in `block in run'
from /usr/local/bundle/ruby/3.2.0/gems/async-2.21.1/lib/async/task.rb:435:in `block in schedule'
Thank you.
ioquatix commented
Thanks for the report. Let me see if I can figure out what's going on.
ioquatix commented
Are you able to share the code in client.rb?
ksimmi commented
client.rb
# frozen_string_literal: true
require 'bundler/setup'
require 'active_support'
require 'active_support/core_ext'
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'karafka'
require_relative 'settings'
require_relative 'callable'
require_relative 'api_interactor'
require_relative 'api_interactor/http'
require_relative 'api_interactor/token_manager'
require 'pry'
MessagePack::DefaultFactory.register_type(
MessagePack::Timestamp::TYPE, # or just -1
Time,
packer: MessagePack::Time::Packer,
unpacker: MessagePack::Time::Unpacker
)
# class KafkaApp < Karafka::App
# setup do |config|
# config.client_id = 'my_application'
# # librdkafka configuration options need to be set as symbol values
# config.kafka = {
# 'bootstrap.servers': Settings.kafka_url
# }
# end
# end
PRODUCER = WaterDrop::Producer.new do |config|
config.deliver = true
config.kafka = {
'bootstrap.servers': Settings.kafka_url,
'enable.idempotence': true
}
end
class MyClient
def initialize
@reconnection_attempts = 0
end
def run
# get_sports
WS.connect
Async do
WS.listen
end
Async do
clear_expired
end
Async do
WS.set_settings
WS.subscribe
keep_alive
end
rescue WS::ConnectionError => e
@reconnection_attempts += 1
Console.logger.warn(self, "reconnection attempts cause `Invalid Token`: #{@reconnection_attempts}")
retry
end
def keep_alive
loop do
WS.echo
@hits ||= 0
@hits += 1
PRODUCER.produce_async(topic: 'pong', payload: { hits: @hits }.to_json)
sleep(1)
end
end
def clear_expired
loop do
WS.clear_expired
sleep(10)
end
end
end
Async do |task|
MyClient.new.run
end
ioquatix commented
Are you able to tell me what is WS? And what is the implementation of WS.echo?
ksimmi commented
class WS
ConnectionError = Class.new(StandardError)
class << self
def token_manager
@token_manager ||= ApiInteractor::TokenManager.new(Settings.ws_identity_srv_url)
end
def connect
@queue = TTLHash.new
@connection = Async::WebSocket::Client.connect(
Async::HTTP::Endpoint.parse(
Settings.ws_api_url_url,
alpn_protocols: Async::HTTP::Protocol::HTTP11.names
),
headers: { Authorization: token_manager.fetch }
)
rescue Async::WebSocket::ConnectionError => e
Console.logger.error(e)
Console.logger.error(e.response) if e.response
raise ConnectionError
end
def set_settings
set_settings_response = call_command(SetSettings)
get_settings_response = call_command(GetSettings)
end
def subscribe
subscribe_response = call_command(Subscribe)
get_subscriptions_response = call_command(GetSubscriptions)
# unsubscribe_response = call_command(Unsubscribe)
end
def echo
call_command(Echo)
end
def clear_expired
@queue.clear_expired
end
def call_command(cmd)
msg = cmd.to_message
@connection.write(msg.body)
@connection.flush
@queue.set(msg.message_id, ttl: cmd.await_time_secs)
debug_info = {
cmd_name: cmd.usecase_name,
bytes: msg.body,
payload: msg.payload,
hex: msg.debug_as_hex
}
# binding.pry if cmd == GetSettings
while @queue.queued?(msg.message_id)
Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message awaiting #{msg.message_id}")
if @queue.expired?(msg.message_id)
Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message expired #{msg.message_id}")
@queue.dequeue(msg.message_id)
call_command(cmd)
end
if response = @queue.get(msg.message_id)
if response&.message
debug_info.merge!(response: response.message)
end
Console.logger.info("#{cmd.usecase_name}[#{cmd.dgn_api_code}] message handled #{msg.message_id}")
return @queue.dequeue(msg.message_id)
end
sleep(cmd.await_time_secs)
end
rescue IOError => e
Console.logger.error("[WS] #{e.class} #{e.message}")
raise e
rescue => e
binding.pry
end
def listen
@results = {}
@file_counts = 0
while message = @connection.read
msg = InboxHandler.new(Message.unwrap(message.buffer))
case
when msg.command_response?
@queue.set(msg.message_id, msg)
when msg.broadcast?
handle_broadcast(msg)
else
Console.logger.info("UNHANDLED MESSAGE TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
# raise "UNHANDLED MESSAGE #{msg} #{msg&.data}"
end
end
rescue => e
# binding.pry
end
def handle_broadcast(msg)
case
when msg.matches_broadcast?
data = WS::Broadcast::Match.handle(msg.data)
@match_id ||= data[:id]
Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data[:id]
when msg.stakes_broadcast?
data = WS::Broadcast::Stake.handle(msg.data)
@match_id = nil
@match_id ||= data.first[:event_id]
Console.logger.info("#{msg.type} RT=#{msg.rt_code} MATCH_ID=#{@match_id}") if @match_id == data.first[:event_id]
when msg.sports_broadcast?
WS::Broadcast::Sport.handle(msg.data)
Console.logger.info("#{msg.type} RT=#{msg.rt_code}")
when msg.championship_broadcast?
WS::Broadcast::Championship.handle(msg.data)
Console.logger.info("#{msg.type} RT=#{msg.rt_code}")
when msg.tournaments_broadcast?
WS::Broadcast::Tournament.handle(msg.data)
Console.logger.info("#{msg.type} RT=#{msg.rt_code}")
else
Console.logger.info("UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg.type} #{msg} #{msg&.data}")
# raise "UNHANDLED BROADCAST TYPE RT=#{msg.rt_code} #{msg} #{msg&.data}"
end
rescue => e
# binding.pry
end
end
end
ksimmi commented
The problem occurs intermittently.