how to catch and handle `Errno::EPIPE: Broken pipe`?
ksimmi opened this issue · 2 comments
Hello
I'm trying to make a client with endless negotiation loop and I want to make the client to be reconnectable. I throwing an Exception on the middle of a server process and I expect that my client going to reconnect after handling of exception. But I see a following backtrace in my terminal:
2m warn: Async::Task [oid=0x15cc] [ec=0x15e0] [pid=1] [2024-06-28 15:11:34 +0000]
| Task may have ended with unhandled exception.
| Errno::EPIPE: Broken pipe
| → /usr/local/lib/ruby/3.2.0/socket.rb:460 in `__write_nonblock'
| /usr/local/lib/ruby/3.2.0/socket.rb:460 in `write_nonblock'
| /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/buffered.rb:82 in `syswrite'
| /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:141 in `block in flush'
| /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:136 in `synchronize'
| /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:136 in `flush'
| /usr/local/bundle/ruby/3.2.0/gems/protocol-websocket-0.14.0/lib/protocol/websocket/framer.rb:44 in `flush'
| /usr/local/bundle/ruby/3.2.0/gems/protocol-websocket-0.14.0/lib/protocol/websocket/connection.rb:61 in `flush'
| client.rb:42 in `block (2 levels) in run'
| /usr/local/bundle/ruby/3.2.0/gems/async-2.12.1/lib/async/task.rb:164 in `block in run'
| /usr/local/bundle/ruby/3.2.0/gems/async-2.12.1/lib/async/task.rb:377 in `block in schedule'
I tried to catch any Exception in the client and put breakpoint into rescue section but no exceptions were throwing when Errno::EPIPE: Broken pipe happened.
server code:
class Server
def initialize(app)
@app = app
end
def call(env)
run(env) or @app.call(env)
end
def run(env)
Async::WebSocket::Adapters::Rack.open(env) do |connection|
while message = connection.read
raise "Random Exception" if rand(10) % 2 == 0
Console.logger.info(message)
puts COMMANDS_MAP[message.parse[:CMD]]
end
ensure
Console.logger.error(self, $!) if $!
connection.close
end
end
end
use Server
run lambda {|env| [200, {}, []]}
client code:
$reconnect_attempts = 0
def run(task)
connect_with_reconnection(task) do |connection|
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SET_SETTINGS))
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SUBSCRIBE))
while true
task.async do
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_ECHO))
connection.flush
end
sleep 2
end
end
rescue => e
binding.pry # breackpoint
end
def connect_with_reconnection(task)
$reconnect_attempts += 1
Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(URL)) do |connection|
yield connection
end
rescue => e
Console.logger.warn(self, "reconnection attempts: #{$reconnect_attempts}")
sleep(1)
retry
# or
# run(task)
binding.pry # breackpoint
end
Async do |task|
run(task)
end
I have no idea and I really need help with this issue. In my expectation Errno::EPIPE: Broken pipe must throw some exception to be possible to handle it.
Thank you.
Replace this code:
def run(task)
connect_with_reconnection(task) do |connection|
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SET_SETTINGS))
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SUBSCRIBE))
while true
# task.async do
connection.write(Protocol::WebSocket::TextMessage.generate(CMD_ECHO))
connection.flush
# end
sleep 2
end
end
rescue => e
binding.pry # breackpoint
endThe exception will propagate out of your code. Otherwise, the exception can be swallowed by that task and never re-raised.
Thank you. It works.