gnieh/sohva

changes feed dies

Run this:

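import akka.actor.ActorSystem
import akka.util.Timeout
// Package assumed from the async/ChangeStream.scala path linked in this thread.
import gnieh.sohva.async.CouchClient
import scala.concurrent.duration._

object ChangeFeed {
  def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(3.minutes)
    implicit val system = ActorSystem("MySystem")
    val couch = new CouchClient()

    try {
      val db = couch.database("btce-history")
      // Print every change received on the feed.
      val sub = db.changes().stream.subscribe { psf => println(psf) }
      // Block the main thread indefinitely so the feed keeps running.
      sub.synchronized {
        sub.wait()
      }
    } finally {
      couch.shutdown()
      system.shutdown()
    }
  }
}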

It successfully prints out changes for a while, but then stops printing. If you launch many instances, staggered by 30 seconds or so, you can see that they stop receiving updates one after the other at different points in time. The time to failure doesn't seem to follow any obvious pattern.

It looks like couchdb may be dropping the connection periodically and sohva is not recovering from it. Investigating...

With akka debugging enabled, I get this:

[DEBUG] [05/15/2014 22:40:09.434] [MySystem-akka.actor.default-dispatcher-4] [akka://MySystem/user/IO-HTTP/group-1/0] Closing connection due to idle timeout...
[DEBUG] [05/15/2014 22:40:09.434] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/IO-HTTP/group-1/0] Connection was Aborted, awaiting TcpConnection termination...
[DEBUG] [05/15/2014 22:40:09.434] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/IO-HTTP/group-1/0] TcpConnection terminated, stopping

I added some log.debug() calls within ChangeActor, and now we're getting some evidence.

[DEBUG] [05/15/2014 23:07:13.390] [MySystem-akka.actor.default-dispatcher-8] [akka://MySystem/user/$a] Connected to changes stream at http://localhost:5984/trades/_changes
[DEBUG] [05/15/2014 23:07:13.457] [MySystem-akka.actor.default-dispatcher-5] [akka://MySystem/user/IO-HTTP/host-connector-0/0] Delivering 200 OK response for GET request to /
[DEBUG] [05/15/2014 23:08:13.439] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Change feed has reached the end of its chunks
[DEBUG] [05/15/2014 23:09:13.816] [MySystem-akka.actor.default-dispatcher-8] [akka://MySystem/user/IO-HTTP/group-0/0] Closing connection due to idle timeout...
[DEBUG] [05/15/2014 23:09:13.825] [MySystem-akka.actor.default-dispatcher-4] [akka://MySystem/user/IO-HTTP/group-0/0] Connection was Aborted, awaiting TcpConnection termination...
[DEBUG] [05/15/2014 23:09:13.826] [MySystem-akka.actor.default-dispatcher-4] [akka://MySystem/user/IO-HTTP/group-0/0] TcpConnection terminated, stopping
[DEBUG] [05/15/2014 23:09:43.846] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/IO-HTTP/host-connector-0/0] Initiating idle shutdown

So, something on the HTTP side is interpreting a lack of traffic on the changes HTTP connection as a ChunkedMessageEnd.

Fixing the ChunkedMessageEnd logic fixes this bug.

The good way to go would be to add the heartbeat parameter here: https://github.com/gnieh/sohva/blob/master/sohva-client/src/main/scala/gnieh/sohva/async/ChangeStream.scala#L104

Always reconnecting on the end chunk does not allow for differentiating between a real end of stream and a prematurely closed stream.

I've had my fix running overnight and it has kept working. Have you tested your heartbeat solution? Things start to time out after one minute, so the heartbeat needs to fire more frequently than that, say every 30 seconds.

I'd say this timeout is the one that appears in the documentation linked above under the timeout parameter. I don't really agree with your solution because you always try to reconnect when you receive the end chunk, and that's not what is meant. Sometimes it is a timeout, other times it could just be that the server shut down, or that the database was deleted, or any other reason where you don't want to (or cannot) reconnect.

I'm not saying your solution doesn't work here, because you reconnect as soon as you lose the connection. I'm just saying it's not the semantics we want in general here, or rather, we cannot decide in the general case. If this is what you want, you can do it with a monitor observer that reconnects when it receives the end-of-stream message.
Moreover, you may lose some changes if they occurred when you were reconnecting.
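
To illustrate the point about lost changes, here is a rough, library-independent sketch of a reconnecting consumer that remembers the last sequence number it saw and passes it back when it reconnects (CouchDB's _changes feed accepts a since parameter for this). Everything in it is hypothetical and not part of the sohva API: the Change type, the connect function and the maxReconnects limit are stand-ins, and sequence numbers are assumed to be plain integers.

```scala
object ResumableFeed {
  // Hypothetical change record; only the sequence number matters here.
  final case class Change(seq: Long, id: String)

  /** Consumes a feed that may end at any time, reconnecting up to
    * `maxReconnects` times and resuming after the last sequence seen.
    *
    * `connect` is a hypothetical function that opens the feed with
    * since=<given value>; the iterator running dry stands for the
    * end-of-stream message.
    */
  def consume(connect: Long => Iterator[Change], maxReconnects: Int)(
      handle: Change => Unit): Long = {
    var lastSeq = 0L
    var attempt = 0
    while (attempt <= maxReconnects) {
      connect(lastSeq).foreach { change =>
        lastSeq = change.seq // remember where we are before handling
        handle(change)
      }
      attempt += 1
    }
    lastSeq
  }
}
```

Because each new connection asks only for changes after lastSeq, changes that arrive while the client is disconnected are delivered on the next connection instead of being skipped. It still cannot tell a timeout from a deleted database, which is the objection above.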

The best way to go is to avoid closing the connection at all, because that's what you want, and that is what the heartbeat parameter does. I thought I had added the heartbeat parameter when I implemented this, but I didn't, which is an error.
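
For reference, a minimal sketch of what the request would look like with a heartbeat, assuming the feed is opened in continuous mode. CouchDB takes the heartbeat value in milliseconds, so the 30 seconds suggested earlier is 30000. The ChangesUri object and its changesUri helper are made up for illustration; this is not how ChangeStream.scala builds its request.

```scala
import java.net.URLEncoder

object ChangesUri {
  /** Builds the URI of a continuous _changes request with a heartbeat.
    * `heartbeatMillis` is the interval at which CouchDB sends an empty
    * line to keep an otherwise idle connection alive.
    */
  def changesUri(base: String, db: String, heartbeatMillis: Long,
      since: Option[String] = None): String = {
    require(heartbeatMillis > 0, "heartbeat must be a positive number of milliseconds")
    val params = Seq(
      "feed" -> "continuous",
      "heartbeat" -> heartbeatMillis.toString
    ) ++ since.toSeq.map(s => "since" -> s)
    val query = params
      .map { case (k, v) => s"$k=${URLEncoder.encode(v, "UTF-8")}" }
      .mkString("&")
    s"$base/$db/_changes?$query"
  }
}
```

With a 30 second heartbeat, the server writes to the connection well before the one-minute timeout seen in the logs, so the client should not see the feed end while the database is merely quiet.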