igrigorik/em-synchrony

redis.pipelined blocks intermittently. what am I doing wrong?

akonsu opened this issue · 16 comments

The code below stops responding on the second request to the server. Execution seems to block on the call to redis.pipelined. If I remove this call and issue the commands without pipelining, or if I stop using the connection pool, then it works.
Is it me or is there a bug?

require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'evma_httpserver'
require 'redis/connection/synchrony'
require 'redis'

class Server < EM::Connection
  include EM::HttpServer

  # Shared fiber-aware pool of 10 Redis connections.
  def self.redis
    @@redis ||= EM::Synchrony::ConnectionPool.new(size: 10) do
      Redis.new url: 'redis://localhost:6379'
    end
  end

  def process_http_request
    Fiber.new{
      response = EM::DelegatedHttpResponse.new self
      response.status = 200

      # stops responding here on the second request
      self.class.redis.pipelined do
        response.content = self.class.redis.smembers 'test'
      end

      response.send_response
      close_connection_after_writing
    }.resume
  end
end

EM.synchrony do
  EM.start_server '0.0.0.0', 9999, Server
end

Hard to say.. where, or on what, does it hang?

It hangs on the redis.pipelined call, I think.

Well.. keep digging. Is it the same connection, what happens if you remove the connection pool, and so on.

Without the connection pool it works. If I set the pool size to 1, it blocks on the first request.

I am not very familiar with the code, but it looks like this: if I call redis.pipelined with another redis call inside the block, and I use a connection pool, then more than one connection gets used. The first connection goes to the pipelined call, and each redis call inside the block checks out another one. That does not sound right. So if my pool has just one connection, it blocks right away on the first call inside the pipelined block.

Thank you, I saw that. Here is my test code that exhibits the problem. It blocks forever on the second call to redis (the lpush), whereas if I change the size of the pool to something more than 1, it works. Same with redis.multi.

require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'redis'
require 'redis/connection/synchrony'

EM.synchrony do
  r = EM::Synchrony::ConnectionPool.new(size: 1) { Redis.new }

  r.pipelined do
    # blocks forever: the pool's only connection is already checked out by #pipelined
    r.lpush "foo", "s1"
  end

  EM.stop
end

I discovered this by stress testing my server. Setting the pool size to something bigger than 1 does not help because this code blocks too:

EM.synchrony do
  r = EM::Synchrony::ConnectionPool.new(size: 5) { Redis.new }

  10.times do
    r.pipelined do
      r.lpush "foo", "s1"
    end
  end

  EM.stop
end

Ok, think I see what's happening, and it's related to this:
redis/redis-rb#204

Handling this correctly would require some additional logic in the synchrony driver. I don't think there is any clean, driver-agnostic way to handle it within the abstracted ConnectionPool.

Solution (b): size your pool to be larger than the number of pipelined commands.. although that in itself may break pipelining.
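
Something like this illustrates (b) against the earlier repro (untested): with size: 2 it no longer deadlocks, because the inner lpush simply checks out the pool's second connection. But that also means the lpush goes out on that second connection right away, outside the pipeline the first connection is buffering, so nothing actually gets pipelined.

require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'redis'
require 'redis/connection/synchrony'

EM.synchrony do
  r = EM::Synchrony::ConnectionPool.new(size: 2) { Redis.new }

  r.pipelined do
    # does not block, but is sent on connection #2 as a plain command,
    # not as part of the pipeline held by connection #1
    r.lpush "foo", "s1"
  end

  EM.stop
end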

/cc @pietern (just FYI.. :)

How is this related to #150? Or rather, how do I make #150 work?

@igrigorik Maybe the connection pool can use the same connection if the current fiber already checked one out? It would be pretty gross though. Using the block argument for this is definitely the right way to solve this.
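
Rough sketch of what I mean (nothing like this exists in em-synchrony today, and the class below is made up): the pool remembers which connection the current fiber already holds and hands that same connection back on re-entrant calls.

require 'em-synchrony'
require 'em-synchrony/connection_pool'

# Illustrative only: a wrapper that reuses the connection the current
# fiber has already checked out instead of blocking on a second one.
class FiberAwarePool
  def initialize(size:, &blk)
    @pool   = EM::Synchrony::ConnectionPool.new(size: size, &blk)
    @owners = {} # Fiber => connection currently held by that fiber
  end

  def execute(async = false)
    fiber = Fiber.current
    owned = @owners[fiber]
    return yield(owned) if owned # re-entrant call: reuse the same connection

    @pool.execute(async) do |conn|
      @owners[fiber] = conn
      begin
        yield conn
      ensure
        @owners.delete(fiber)
      end
    end
  end

  def method_missing(method, *args, &blk)
    execute { |conn| conn.send(method, *args, &blk) }
  end
end

With that, the size: 1 repro above would route the inner lpush back onto the connection that #pipelined checked out. But yes, pretty gross; the block argument is the right fix.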

@pietern @akonsu actually, wait a second... Why wouldn't this work as is:

  r.pipelined do |c|
    c.lpush "foo", "s1"
    c.lpush "bar", "s2"
  end

@akonsu can you give that a try?

Yes, with the parameter in the block it works, thanks. But how do I make it work inside EM::Synchrony::Multi (#150)? I am trying to run several queries concurrently.

You're pipelining requests and you want to run multiple pipelines concurrently? Jeez.. :-)

Haven't tested this, but you could try something like:

work = Proc.new do |c|
  c.lpush "foo", "s1"
  c.lpush "bar", "s2"
end

multi = EM::Synchrony::Multi.new
multi.add :a, r.apipeline(&work)
multi.add :b, r.apipeline(&work)
res = multi.perform

At least in theory, this should check out two connections from the pool, and pipeline two commands on each.
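
Spelled out end to end (again, untested; assumes a local Redis and a pool of at least two connections so the two pipelines can run side by side):

require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'redis'
require 'redis/connection/synchrony'

EM.synchrony do
  r = EM::Synchrony::ConnectionPool.new(size: 2) { Redis.new }

  work = Proc.new do |c|
    c.lpush "foo", "s1"
    c.lpush "bar", "s2"
  end

  # each apipeline call should grab its own connection from the pool;
  # Multi waits until both have completed
  multi = EM::Synchrony::Multi.new
  multi.add :a, r.apipeline(&work)
  multi.add :b, r.apipeline(&work)
  p multi.perform

  EM.stop
end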

I am not pipelining requests. From #150:

EM.synchrony do
  multi = EM::Synchrony::Multi.new
  multi.add :a, redis.alpush('foo', 1)
  multi.add :b, redis.alpush('foo', 2)
  res = multi.perform

  p res

  EM.stop
end

This gives me an error.

You can't execute async requests against a Fiber-backed pool.. for that you'd need to write your own connection pool that does extra tracking of checked-out connections. Use pipelining.
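
For example, with the block form from earlier, both commands go out on a single pooled connection in one batch, which is effectively what the Multi version was after (untested sketch):

require 'em-synchrony'
require 'em-synchrony/connection_pool'
require 'redis'
require 'redis/connection/synchrony'

EM.synchrony do
  redis = EM::Synchrony::ConnectionPool.new(size: 1) { Redis.new }

  # both commands are buffered on the one checked-out connection and
  # flushed to Redis together when the block returns
  res = redis.pipelined do |c|
    c.lpush 'foo', 1
    c.lpush 'foo', 2
  end

  p res

  EM.stop
end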