socketry/async-websocket

How to catch and handle `Errno::EPIPE: Broken pipe`?

ksimmi opened this issue · 2 comments

ksimmi commented

Hello

I'm trying to build a client with an endless negotiation loop, and I want the client to reconnect automatically. I raise an exception in the middle of the server's processing and expect my client to reconnect after handling the resulting error. Instead, I see the following backtrace in my terminal:

2m     warn: Async::Task [oid=0x15cc] [ec=0x15e0] [pid=1] [2024-06-28 15:11:34 +0000]
           | Task may have ended with unhandled exception.
           |   Errno::EPIPE: Broken pipe
           |   → /usr/local/lib/ruby/3.2.0/socket.rb:460 in `__write_nonblock'
           |     /usr/local/lib/ruby/3.2.0/socket.rb:460 in `write_nonblock'
           |     /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/buffered.rb:82 in `syswrite'
           |     /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:141 in `block in flush'
           |     /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:136 in `synchronize'
           |     /usr/local/bundle/ruby/3.2.0/gems/io-stream-0.4.0/lib/io/stream/generic.rb:136 in `flush'
           |     /usr/local/bundle/ruby/3.2.0/gems/protocol-websocket-0.14.0/lib/protocol/websocket/framer.rb:44 in `flush'
           |     /usr/local/bundle/ruby/3.2.0/gems/protocol-websocket-0.14.0/lib/protocol/websocket/connection.rb:61 in `flush'
           |     client.rb:42 in `block (2 levels) in run'
           |     /usr/local/bundle/ruby/3.2.0/gems/async-2.12.1/lib/async/task.rb:164 in `block in run'
           |     /usr/local/bundle/ruby/3.2.0/gems/async-2.12.1/lib/async/task.rb:377 in `block in schedule'

I tried to rescue any exception in the client and put a breakpoint in the rescue section, but no exception was raised there when Errno::EPIPE: Broken pipe occurred.

server code:

class Server
  def initialize(app)
    @app = app
  end

  def call(env)
    run(env) or @app.call(env)
  end

  def run(env)

    Async::WebSocket::Adapters::Rack.open(env) do |connection|

      while message = connection.read
        raise "Random Exception" if rand(10) % 2 == 0

        Console.logger.info(message)
        puts COMMANDS_MAP[message.parse[:CMD]]
      end

    ensure
      Console.logger.error(self, $!) if $!
      connection.close
    end
  end
end

use Server

run lambda {|env| [200, {}, []]}

client code:

$reconnect_attempts = 0

def run(task)
  connect_with_reconnection(task) do |connection|
    connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SET_SETTINGS))
    connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SUBSCRIBE))

    while true
      task.async do
        connection.write(Protocol::WebSocket::TextMessage.generate(CMD_ECHO))
        connection.flush
      end

      sleep 2
    end
  end
rescue => e
  binding.pry # breakpoint
end

def connect_with_reconnection(task)
  $reconnect_attempts += 1

  Async::WebSocket::Client.connect(Async::HTTP::Endpoint.parse(URL)) do |connection|
    yield connection
  end

rescue => e
  Console.logger.warn(self, "reconnection attempts: #{$reconnect_attempts}")
  sleep(1)
  retry
  # or
  # run(task)

  binding.pry # breakpoint
end

Async do |task|
  run(task)
end

I'm out of ideas and really need help with this issue. I expected Errno::EPIPE: Broken pipe to be raised as an exception so that I could handle it.

Thank you.

Replace your `run` method with this version, which calls write and flush directly instead of inside `task.async`:

def run(task)
  connect_with_reconnection(task) do |connection|
    connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SET_SETTINGS))
    connection.write(Protocol::WebSocket::TextMessage.generate(CMD_SUBSCRIBE))

    while true
      # task.async do
        connection.write(Protocol::WebSocket::TextMessage.generate(CMD_ECHO))
        connection.flush
      # end

      sleep 2
    end
  end
rescue => e
  binding.pry # breakpoint
end

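With the write and flush running directly in the block, the exception will propagate out of your code and reach your `rescue`. Otherwise, the exception is raised inside the child task created by `task.async`, where it can be swallowed by that task and never re-raised.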
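Once the error propagates out of the block, a reconnect loop can rescue it and retry. The following is only a minimal sketch in plain Ruby, not part of async-websocket: `with_reconnect`, `max_attempts`, and `delay` are hypothetical names, the list of rescued error classes is an assumption, and the block stands in for the `Async::WebSocket::Client.connect` call and its message loop.

```ruby
# Hypothetical helper (not an async-websocket API): re-runs the block
# whenever the connection drops. The error must propagate out of the
# block for the rescue below to see it.
def with_reconnect(max_attempts: 5, delay: 1)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError => error
    # Give up and re-raise the last error once the attempt budget is spent.
    raise if attempts >= max_attempts

    warn "connection lost (#{error.class}), reconnect attempt #{attempts}"
    sleep delay
    retry
  end
end

# Usage: simulate two dropped connections, then a successful one.
result = with_reconnect(delay: 0) do |attempt|
  raise Errno::EPIPE if attempt < 3

  "connected on attempt #{attempt}"
end
```

Capping the attempts keeps a permanently unreachable server from causing an endless retry loop. Whether a cap is appropriate for a long-running client is a design choice.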

ksimmi commented

Thank you. It works.