ruby-amqp/bunny

Forced connection close on Bunny with ConnectionPool can't recover

fuegas opened this issue · 14 comments

Hi,

I'm running into an issue using Bunny in a Rails project. I'm using the gem connection_pool to refrain from setting up a connection every time I want to send a message. The initializer for the pool looks as follows:

BUNNY_POOL = ConnectionPool.new(size: 5) do
  connection = Bunny.new(
    ENV['AMQP_URL'] || ::ExampleApp::Application.credentials.amqp_url,
  ).tap(&:start)

  connection.create_channel
end

I have a health check route which roughly does the following:

def health
  render json: {
    web: :ok,
    redis: redis_health,
    rabbitmq: rabbitmq_health,
  }
end

The rabbitmq_health method creates a reply queue and sends a ping message to it, nothing special here.

However, when I restart RabbitMQ or close the connection from the RabbitMQ management interface I get the following error:

W, [2020-02-02T12:06:09.585404 #52051]  WARN -- #<Bunny::Session:0x7f8f7f902f20 example_dev@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Recovering from connection.close (CONNECTION_FORCED - Closed via management plugin)
#<Thread:0x00007f8f7fb8e0f0@/Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:35 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
  8: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:36:in `run_loop'
  7: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:36:in `loop'
  6: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:39:in `block in run_loop'
  5: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:92:in `run_once'
  4: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:611:in `handle_frame'
  3: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:862:in `clean_up_on_shutdown'
  2: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:1029:in `maybe_shutdown_reader_loop'
  1: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:119:in `join'
/Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:119:in `join': Target thread must not be current thread (ThreadError)
E, [2020-02-02T12:06:13.437710 #52051] ERROR -- #<Bunny::Session:0x7f8f7f902f20 example_dev@localhost:5672, vhost=dev, addresses=[localhost:5672]>: I/O error in the hearbeat sender: closed stream

It tries to recover from a closed connection but then tries to join a thread which it is not allowed to join resulting in an exception and breaking that connection. From that moment that connection will return:

Bunny::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #\u003cAMQ::Protocol::MethodFrame:0x00007f8f89b64638 @payload=\"\\x002\\x00\\n\\x00\\x00\\x00\\x04\\x00\\x00\\x00\\x00\", @channel=1\u003e, method class is AMQ::Protocol::Queue::Declare"

To resolve the broken connection the webservice (Puma) needs to be restarted.

Do I have a flaw in the way I'm using Bunny? Is this caused by the usage of ConnectionPool?

(Gem) Versions:

  • connection_pool: 2.2.2
  • bunny: 2.14.3
  • rabbitmq: 3.8.2 (Erlang 22.2.1)
  • ruby: 2.6.5
  • rails: 6.0.2.1
  • puma: 4.3.1

Thank you for your time.

Team RabbitMQ uses GitHub issues for specific actionable items engineers can work on. GitHub issues are not used for questions, investigations, root cause analysis, discussions of potential issues, etc (as defined by this team).

We get at least a dozen of questions through various venues every single day, often light on details.
At that rate GitHub issues can very quickly turn into a something impossible to navigate and make sense of even for our team. Because GitHub is a tool our team uses heavily nearly every day, the signal/noise ratio of issues is something we care about a lot.

Please post this to rabbitmq-users.

Thank you.

I'd only investigate issues that can be reproduced without 3rd party gems. Connection pooling can make all kinds of assumptions about the resources it pools, and pooling by definition is caching which introduces a significant layer of complexity.

According to the log Bunny did begin to recover, hit an exception and because you have Thread.report_on_exception (or Thread#report_on_exception, although that's less likely) set to true, an exception terminated the VM process.

Target thread must not be current thread (ThreadError)

looks interesting. It happened when Bunny tried to shut down its consumer work pool. I have no idea what might have caused it but since this is during shutdown, we can only try to catch/rescue this exception and ignore it… but the root cause is not this obscure assertion violation, it's the fact that Thread.report_on_exception is used. It makes no sense for applications with data service connections since those can fail at any moment and the only realistic solution is recovery.

I wonder if what we did in 7b8f24e for the heartbeat sender thread should be done ot the I/O thread. No strong opinion here but it's largely a similar problem in a different place. A PR with a similar change to the main loop thread would be considered.

Thanks for your time and elaboration @michaelklishin. After further testing I've been able to reduce the issue to the gem connection_pool. Without it the recovery works as intended. So I'll dive into the inner workings of connection_pool to see what causes this issue (and next time use the mailing list).

@fuegas if you can test with Bunny from a local repo, consider setting Thread#report_on_exception on the main I/O thread used by every Bunny connection. Or share a set of steps to reproduce so I can experiment. I have no idea what sets report_on_exception (Bunny should not do that) to true but we can at least make things less painful in those scenarios as we've done in 7b8f24e.

TIL (from mperham/connection_pool@306fd1b) that Thread.report_on_exception is enabled by default on Ruby 2.5+.

@michaelklishin : I've been able to make a minimal version to reproduce the issue:

Gemfile:

# frozen_string_literal: true

source 'https://rubygems.org'
git_source(:github) { |repo| "https://github.com/#{repo}.git" }

ruby '2.6.5'

gem 'bunny'
gem 'connection_pool'

run.rb:

require 'bunny'
require 'connection_pool'

BUNNY_POOL = ConnectionPool.new(size: 1) do
  connection = Bunny.new(
    'amqp://guest:guest@localhost/dev',
  ).tap(&:start)

  connection.create_channel
end

BUNNY_POOL.with do |bunny|
  puts bunny.status
end

loop do
  sleep 1
end

Steps to reproduce:

  1. Run ruby run.rb to start the connection and show the channel status (should print open on your terminal).
  2. Restart RabbitMQ (for example brew services restart rabbitmq)
  3. Error
W, [2020-02-03T09:18:19.530255 #33528]  WARN -- #<Bunny::Session:0x7fa36c19e2d8 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Recovering from connection.close (CONNECTION_FORCED - broker forced connection closure with reason 'shutdown')
#<Thread:0x00007fa36d803a00@/Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:35 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
  8: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:36:in `run_loop'
  7: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:36:in `loop'
  6: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:39:in `block in run_loop'
  5: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:92:in `run_once'
  4: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:611:in `handle_frame'
  3: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:862:in `clean_up_on_shutdown'
  2: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/session.rb:1029:in `maybe_shutdown_reader_loop'
  1: from /Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:119:in `join'
/Users/fuegas/.rbenv/versions/2.6.5/lib/ruby/gems/2.6.0/gems/bunny-2.14.3/lib/bunny/reader_loop.rb:119:in `join': Target thread must not be current thread (ThreadError)

Versions:

GEM
  remote: https://rubygems.org/
  specs:
    amq-protocol (2.3.0)
    bunny (2.14.3)
      amq-protocol (~> 2.3, >= 2.3.0)
    connection_pool (2.2.2)

I've tried to dive deeper in what is happening. What I see as a difference in flow is that the unpooled version raises a ShutdownSignal which is rescued. The pooled version however arrives in the same method (raise(e) of RenderLoop), but it isn't rescued in clean_up_on_shutdown of Session.

Both pooled and unpooled are threaded and have report_on_exception set to true.

The test file I load into IRB looks as follows:

require 'bunny'
require 'connection_pool'

AMQP_URL = 'amqp://guest:guest@localhost/dev'.freeze

P1 = ConnectionPool.new(size: 1) do
  connection = Bunny.new(
    AMQP_URL,
    # log_level: ::Logger::DEBUG,
  ).tap(&:start)

  connection.create_channel
end.freeze

class BunnyPool
  def with
    yield channel
  end

  def channel
    @channel ||= begin
      connection = Bunny.new(
        AMQP_URL,
        # log_level: ::Logger::DEBUG,
      ).tap(&:start)

      connection.create_channel
    end
  end
end
P2 = BunnyPool.new

def dump_threads
  Thread.list.each {|t| puts t.inspect}; true
end


def pooled; P1.with(&:status); end
def unpooled; P2.with(&:status); end

A run from IRB consists of dumping threads using dump_threads, starting a connection using pooled or unpooled, dumping the threads using dump_threads again and then force a disconnect using rabbitmqctl close_all_connections --vhost=dev 'ByeBye!' in a terminal.

In the logs of the runs below I've replaced the gem path with __GEM_PATH__ to make it a bit easier to read. You'll see debug output added to render_loop.rb and session.rb to see what is happening in the recovery process, these are marked by the file name and approximate line number in that file.

The unpooled version recovers from a forced disconnect:

irb(main):001:0> dump_threads
#<Thread:0x00007fe87e057fa8 run>
=> true

irb(main):002:0> unpooled
[reader_loop.rb:29] Starting @thread: #<Thread:0x00007fe88011df58@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>
[reader_loop.rb:30] Responds to :report_on_exception? yes
[reader_loop.rb:31] report_on_exception? yes
=> :open

irb(main):003:0> dump_threads
#<Thread:0x00007fe87e057fa8 run>
#<Thread:0x00007fe88011e318@__GEM_PATH__/bunny-2.14.3/lib/bunny/heartbeat_sender.rb:46 sleep>
#<Thread:0x00007fe88011df58@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 sleep>
=> true

irb(main):004:0> 
W, [2020-02-04T22:27:13.634373 #60922]  WARN -- #<Bunny::Session:0x7fe8800cda80 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Recovering from connection.close (CONNECTION_FORCED - ByeBye!)
[session.rb:1016] @reader_loop exists
[session.rb:1019] Threaded? yes
[reader_loop.rb:119] Raising Bunny::ShutdownSignal on #<Thread:0x00007fe88011df58@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run> from #<Thread:0x00007fe88011df58@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>
[session.rb:865] Rescued ShutdownSignal: Bunny::ShutdownSignal
W, [2020-02-04T22:27:13.634611 #60922]  WARN -- #<Bunny::Session:0x7fe8800cda80 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Will recover from a network failure (no retry limit)...
W, [2020-02-04T22:27:23.638684 #60922]  WARN -- #<Bunny::Session:0x7fe8800cda80 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Retrying connection on next host in line: localhost:5672
[reader_loop.rb:29] Starting @thread: #<Thread:0x00007fe87f08a100@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>
[reader_loop.rb:30] Responds to :report_on_exception? yes
[reader_loop.rb:31] report_on_exception? yes

The pooled version does not recover from a forced disconnect:

irb(main):001:0> dump_threads
#<Thread:0x00007fb26d85c010 run>
=> true

irb(main):002:0> pooled
[reader_loop.rb:29] Starting @thread: #<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>
[reader_loop.rb:30] Responds to :report_on_exception? yes
[reader_loop.rb:31] report_on_exception? yes
=> :open

irb(main):003:0> dump_threads
#<Thread:0x00007fb26d85c010 run>
#<Thread:0x00007fb26d8fba70@__GEM_PATH__/bunny-2.14.3/lib/bunny/heartbeat_sender.rb:46 sleep>
#<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 sleep>
=> true

irb(main):004:0> 
W, [2020-02-04T22:28:03.347802 #61383]  WARN -- #<Bunny::Session:0x7fb2708713b8 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Recovering from connection.close (CONNECTION_FORCED - ByeBye!)
[session.rb:1016] @reader_loop exists
[session.rb:1019] Threaded? yes
[reader_loop.rb:119] Raising Bunny::ShutdownSignal on #<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run> from #<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>
[session.rb:1029] Post terminate_with
[session.rb:1033] Joining @reader_loop!
[reader_loop.rb:124] Joining @thread: #<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run> from #<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run>

#<Thread:0x00007fb26d8fb610@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:39 run> terminated with exception (report_on_exception is true):
Traceback (most recent call last):
  8: from __GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:40:in `run_loop'
  7: from __GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:40:in `loop'
  6: from __GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:43:in `block in run_loop'
  5: from __GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:96:in `run_once'
  4: from __GEM_PATH__/bunny-2.14.3/lib/bunny/session.rb:611:in `handle_frame'
  3: from __GEM_PATH__/bunny-2.14.3/lib/bunny/session.rb:862:in `clean_up_on_shutdown'
  2: from __GEM_PATH__/bunny-2.14.3/lib/bunny/session.rb:1034:in `maybe_shutdown_reader_loop'
  1: from __GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:125:in `join'
__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:125:in `join': Target thread must not be current thread (ThreadError)

E, [2020-02-04T22:28:24.748168 #61383] ERROR -- #<Bunny::Session:0x7fb2708713b8 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: I/O error in the hearbeat sender: closed stream

If I change join in reader_loop into:

    def join
      puts "[#{::File.basename(__FILE__)}:#{__LINE__}] Joining @thread: #{@thread.inspect} from #{Thread.current.inspect}"
      if @thread == Thread.current
        puts "[#{::File.basename(__FILE__)}:#{__LINE__}] Not joining current Thread"
        return
      end
      @thread.join if @thread
    end

Then the recovery works as intended:

irb(main):001:0> dump_threads
#<Thread:0x00007fbe56060010 run>
=> true

irb(main):002:0> pooled
[reader_loop.rb:28] Starting @thread: #<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run>
[reader_loop.rb:29] Responds to :report_on_exception? yes
[reader_loop.rb:30] report_on_exception? yes
=> :open

irb(main):003:0> dump_threads
#<Thread:0x00007fbe56060010 run>
#<Thread:0x00007fbe57125620@__GEM_PATH__/bunny-2.14.3/lib/bunny/heartbeat_sender.rb:46 sleep>
#<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 sleep>
=> true

irb(main):004:0> 
W, [2020-02-04T23:02:37.258200 #69122]  WARN -- #<Bunny::Session:0x7fbe570d4ea0 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Recovering from connection.close (CONNECTION_FORCED - ByeBye!)
[session.rb:1016] @reader_loop exists
[session.rb:1019] Threaded? yes
[reader_loop.rb:118] Raising Bunny::ShutdownSignal on #<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run> from #<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run>
[session.rb:1029] Post terminate_with
[session.rb:1033] Joining @reader_loop!
[reader_loop.rb:123] Joining @thread: #<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run> from #<Thread:0x00007fbe57125328@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run>
[reader_loop.rb:125] Not joining current Thread
W, [2020-02-04T23:02:37.258443 #69122]  WARN -- #<Bunny::Session:0x7fbe570d4ea0 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Will recover from a network failure (no retry limit)...
W, [2020-02-04T23:02:47.258908 #69122]  WARN -- #<Bunny::Session:0x7fbe570d4ea0 guest@localhost:5672, vhost=dev, addresses=[localhost:5672]>: Retrying connection on next host in line: localhost:5672
[reader_loop.rb:28] Starting @thread: #<Thread:0x00007fbe560f5430@__GEM_PATH__/bunny-2.14.3/lib/bunny/reader_loop.rb:38 run>
[reader_loop.rb:29] Responds to :report_on_exception? yes
[reader_loop.rb:30] report_on_exception? yes

@michaelklishin: What is your opinion of changing

    def join
      @thread.join if @thread
    end

into

    def join
      @thread.join if @thread && @thread != Thread.current
    end

?

@fuegas yes, it matters in Ruby in what thread the exception was originated. In addition, Thread#join forces a re-raise of a thrown unhandled exception in the thread that joins it.

I'd be happy to consider and test the change you are proposing but it would be great to get to the bottom of this. Semantically it makes sense to not join a thread with itself. I wonder if that could make a previously unhandled exception to be re-raised at the moment when the code absolutely does not expect it?

While we don't need it, while researching the state of Ruby concurrency primitives in 2020 I've discovered Thread.handle_interrupt.

After further investigation, I've found (https://github.com/mperham/connection_pool/blob/master/lib/connection_pool.rb#L58):

  def with(options = {})
    Thread.handle_interrupt(Exception => :never) do
      conn = checkout(options)
      begin
        Thread.handle_interrupt(Exception => :immediate) do
          yield conn
        end
      ensure
        checkin
      end
    end
  end

This means that the connection is made while inside Thread.handle_interrupt(Exception => :never), the block is yielded in Thread.handle_interrupt(Exception => :immediate). This means the exception raised in raise(e) of RenderLoop is caught by Thread.handle_interrupt(Exception => :never) which explains why I do not see it being rescued in the log.

Oh, great find. Man that's a bold decision to put something like that in a generic library.

OK, since we have a sensible workaround and enough understanding, please submit a PR and I will test it a bit. Assuming no issues pop up, we can merge it and produce a release 👏 Thank you so much for sticking with the problem!

Thank you as well for giving valuable pointers that helped me get to the core of the issue 👍

#590 has shipped in 2.14.4