tmm1/amqp

stdout error msgs, unrecoverable state of client/consumer, apparent corruption of RabbitMQ database

dyoung opened this issue · 2 comments

In lib/amqp/client.rb I found this:
when Protocol::Connection::Close
# raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
STDERR.puts "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"

That's actually handy when, for example, the AMQP broker (I use RabbitMQ 1.7.2) shuts down or becomes unavailable. Unfortunately, in some circumstances I also saw error messages passed through from RabbitMQ that indicated a queue's durability was inconsistent with the exchange. This usually happened after a reset or force_reset of RabbitMQ, but could also occur on the client (consumer) side if a client attempted to subscribe to an exchange that did not exist. Whether or not I chose durable on the consumer side, I got the same error.

Once I got this error, it simply kept coming no matter what. The only solution was to stop and restart the client AFTER the publisher had sent a message--but there was no error thrown, so it was hard to know when to do it. Worse yet, somehow RabbitMQ itself would sometimes get corrupted after a client tried to subscribe to an exchange that didn't exist, and I had to do a "sudo rabbitmqctl force_reset" to clear all data in order to make the clients work at all.

I dealt with the most common situation by causing my clients to publish a message (content: 'ignore_me') prior to subscribing to an exchange.

This actually didn't work--the client would hang after publishing, and would not receive messages--until I moved the publish code (client-side, now) to a subprocess. That worked.

But I still occasionally ran into the issue with the error message about queue/exchange compatibility. So I un-commented the error-raising line in the code snippet above, which let me exit from the client & restart.

Unfortunately this meant my clients also died upon a broker disconnect. So I needed a good way to handle the error (a retry simply didn't work...I don't know why, but the app just hangs at that point).

Simultaneously with the above, I'd noticed that I got an annoying error when a client (actually this happened for either publisher or subscriber) couldn't initially connect to the broker. Attempting to handle this via a rescue/retry gave me a hanging app that never connected after a failure. This sort of thing could probably be handled by god or monit or something that monitors and restarts processes (whereas not raising any errors meant not handling the situation at all), but I wanted something cleaner.

So I made it all work by putting all AMQP code into a subprocess, such as the following publisher-side example:

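def send_message(message, host, exchange, key)
  @exchange, @host, @message, @key = exchange, host, message, key
  # Encrypt before forking so the child only has to publish.
  cryptor = Cryptor.new
  @crypt_message = cryptor.encrypt_text @message, @key
  cryptor = nil
  begin
    # Run all AMQP code in a child process so a hung or failed
    # connection cannot wedge the parent.
    pid = fork do
      begin
        AMQP.start(:host => @host) do
          fanout = MQ.fanout(@exchange, :durable => true)
          fanout.publish(@crypt_message, :persistent => true)
          AMQP.stop { EM.stop }
        end
      rescue
        exit 99 # signal failure to the parent via the exit status
      end
    end
    # Wait for this specific child, then check how it exited.
    Process.wait(pid)
    raise "error sending message" unless $?.success?
  rescue
    # Retries indefinitely, pausing 5 seconds between attempts.
    sleep 5
    retry
  end
end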
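
One possible variation on the publisher above, as a sketch only: the same fork, wait, and retry pattern pulled into a helper that gives up after a fixed number of attempts instead of retrying forever. The helper name run_in_child and its max_attempts and delay parameters are hypothetical, and the block passed in stands in for the AMQP.start code. It assumes a platform where fork is available.

```ruby
# Hypothetical helper: run a block in a forked child, retrying a bounded
# number of times. Not part of the amqp gem.
def run_in_child(max_attempts: 3, delay: 5)
  attempts = 0
  begin
    attempts += 1
    pid = fork do
      begin
        yield                 # stand-in for the AMQP.start { ... } block
      rescue StandardError
        exit! 99              # report failure through the exit status
      end
      exit! 0
    end
    _, status = Process.wait2(pid)
    raise "child exited with status #{status.exitstatus}" unless status.success?
    true
  rescue RuntimeError
    # Give up once the attempt budget is spent; otherwise pause and retry.
    raise if attempts >= max_attempts
    sleep delay
    retry
  end
end
```

Called as run_in_child(max_attempts: 5) { ... }, this would raise in the parent once the last attempt fails, so the caller gets a real exception to handle instead of an endless retry loop.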

It works pretty much the same way on the subscriber-side. I really like the convenience of this gem, but...am I missing something, or should "do it all in subprocesses!" be part of the standard documentation? And isn't it better to raise errors than to write to STDOUT?

Even with all this, if I ever have to reset the RabbitMQ broker, I have to shut down and restart all consumer clients (because they're nearly immortal as written) or...and this is just weird...the broker will not accept any new messages. No errors get thrown on the publisher side, but queue length stays at zero.

I suppose I could make a client shut down via an AMQP message by spawning yet another subprocess that subscribes to a "shutdown" queue. I could "exit 99" from that process upon receiving such a message, then trap "CLD" from the parent process, and exit from that as well (assuming I was clever enough to verify it was the correct "child death," anyway).
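
A rough sketch of that idea, with the AMQP part left out: the block passed in stands in for "subscribe to a shutdown queue and return when a message arrives". The names SHUTDOWN_STATUS and shutdown_signalled? are hypothetical, and the signal is trapped as "CHLD", which is the same signal as "CLD" on Linux. The handler records every reaped child by pid, so the parent can verify that the child that died was the watcher and that it exited with 99.

```ruby
SHUTDOWN_STATUS = 99 # hypothetical convention, mirroring "exit 99"

# Forks a watcher child that runs the block and then exits with
# SHUTDOWN_STATUS. Returns true only if that specific child was reaped
# with that status within `timeout` seconds.
def shutdown_signalled?(timeout: 5)
  reaped = {}
  # Install the handler before forking so an early child death is not missed.
  previous = trap("CHLD") do
    begin
      loop do
        pid, status = Process.wait2(-1, Process::WNOHANG)
        break unless pid          # no more finished children right now
        reaped[pid] = status
      end
    rescue Errno::ECHILD
      # no children left to reap
    end
  end

  watcher_pid = fork do
    yield                         # stand-in for the "shutdown" queue subscriber
    exit! SHUTDOWN_STATUS
  end

  deadline = Time.now + timeout
  sleep 0.05 until reaped.key?(watcher_pid) || Time.now > deadline
  status = reaped[watcher_pid]
  !status.nil? && status.exitstatus == SHUTDOWN_STATUS
ensure
  # Put back whatever handler was installed before.
  trap("CHLD", previous || "DEFAULT")
end
```

A real client would call exit from the parent when this returns true. Note that the sketch does not kill the watcher if the timeout expires first.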

Is all this really necessary? Though I suppose the way I'm handling this would probably work in any Rails environment without worrying about Phusion vs Mongrel vs Nginx vs Thin or whatever...I actually haven't tried that, but probably will.

Boils down to this: why no actual raised errors when we need them? Can that change?

Plus a bonus: what's corrupting RabbitMQ, anyway? Is it the gem, or a problem on their side?

Um...STDERR, I meant, but in a terminal window it amounts to the same thing by default...okay, dumb of me.

I'm closing this ticket because there are a lot of different issues mixed together and it just isn't possible to follow. If you want my help, create one ticket per issue. Regarding the RabbitMQ issue, the RabbitMQ guys will be happy to take a look if you reproduce the error on a more recent version of RabbitMQ (preferably the latest one). STDERR.puts sounds like a jolly bad idea; I'll take a look at this one as part of the work on ruby-amqp#5.