dcaoyuan/spray-websocket

WebSocketClientWorker silently overloaded/saturated

fommil opened this issue · 13 comments

UPDATE:

I found out what the problem is, and I'm going to be fixing it in smootoo's fork because this project seems to be dead.

The problem is that Pong is being send without Acking and that mean we can potentially fill up the network buffer and the connection will refuse to write anything until we do the ResumeWriting / WritingResumed dance. The workaround is similar to #78 but also forward all Tcp.Command messages to the connection, and force low level error handling into the worker. Ouch.

Using Acking dramatically reduces the probability of error.

The hierarchy of pipeline actors is too confusing for me to be able to propose an elegant/simple solution, sorry.

This is somewhat concerning because it means that this library effectively doesn't do any real error handling at all, so that certainly gives me pause.

Original description below:

Running this client -- against a basic WebSocket server that simply accepts anything:

import akka.actor._
import akka.io._
import spray.can.websocket._
import spray.can._
import spray.can.server.UHttp
import spray.http._
import concurrent.duration._
import akka.util.Timeout
import spray.can.websocket.frame.TextFrame

object Buggy {
  // for running on a remote
  val host = "myremotemachine"
  val port = 9081
}

// websocket/test:run-main testing.BuggyClient
class BuggyClient(
  host: String,
  port: Int
) extends WebSocketClientWorker {
  object Go

  def upgradeRequest = HttpRequest(
    HttpMethods.GET, "/",
    HttpHeaders.Host(host, port) :: List(
      HttpHeaders.Connection("Upgrade"),
      HttpHeaders.RawHeader("Upgrade", "websocket"),
      HttpHeaders.RawHeader("Sec-WebSocket-Version", "13"),
      HttpHeaders.RawHeader("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw=="),
      HttpHeaders.RawHeader("Sec-WebSocket-Extensions", "permessage-deflate")
    )
  )

  override def preStart(): Unit = {
    super.preStart()

    import context.system

    IO(UHttp) ! Http.Connect(host, port)

    implicit val d = context.system.dispatcher
    context.system.scheduler.schedule(0 seconds, 1 seconds) {
      println("GO GO GO")
      self ! Go
    }
  }

  def businessLogic: Receive = {
    case UpgradedToWebSocket =>
    case Go =>
      log.info("sending")
      connection ! TextFrame("foobar" * 1024 * 1024)

    case FrameCommandFailed(frame: TextFrame, _) =>
      log.error("FAIL - SHOULD REALLY RETRY")

    case other =>
      log.error("WTF?" + other.getClass)

  }
}
object BuggyClient extends App {
  import BuggyServer._
  val system = ActorSystem()
  system.actorOf(
    Props(new BuggyClient(Buggy.host, Buggy.port))
  )
}

results in the following output

GO GO GO
sending
acked
GO GO GO
sending
acked
(and various permutations thereoff, then)
GO GO GO
sending
GO GO GO
sending
GO GO GO
(and then)
GO GO GO
GO GO GO
GO GO GO
GO GO GO
GO GO GO
GO GO GO
...

I expect

GO GO GO
sending
acked

repeated forever, with the occasional out-of-order message.

I'm logging akka messages and lifecycle changes at DEBUG and not seeing any actors being stopped.

I'm not even seeing any Http.CommandFailed errors although I have on occassion seen the same log messages as reported in #1060

Any ideas what is going on?

actually, this may be a recurrence of #72 that I seen a while ago... also unsolved.

the larger I make my messages, the likelier it is to trigger the FrameCommandFailed

and if I run this with my client side backpressure PR #78 i.e. lose the scheduled task and do this

  def businessLogic: Receive = {
    case UpgradedToWebSocket =>
      self ! Go
    case Ack =>
      log.info("acked")
      self ! Go
    case Go =>
      log.info("sending")
      //connection ! TextFrame("foobar" * 1024 * 1024)
      connection ! Tcp.Write(FrameRender.render(TextFrame("foobar" * 1024 * 1024 * 10), Array.empty[Byte]), Ack)

    case fail: FrameCommandFailed => // (frame: TextFrame, _) =>
      log.error("FAIL - SHOULD REALLY RETRY")

    case other =>
      log.error("WTF?" + other.getClass)

  }

I can see

acked
sending

repeating for ever, taking about 3 seconds per message. Sometimes I see FrameCommandFailed.

I'm putting this down as a network related problem and error handling is up to me. All the same, I would have expected akka-io to make best endeavours to retry.

There's only one way a CommandFailed(Write) is sent and this is when the previous Write is still (partly) pending. This is not directly related to the network but it is part of the API of akka.io's TCP implementation.

One reason this may still happen even if you use Acking is that you are not the only party sending Writes to the connection. Are you sure the websocket implementation doesn't automatically respond to ping frames, for example?

@jrudolph yes, it will respond to ping frames. If the write is still partially pending, how can I be sure that I don't send the message again if I retry?

ok, I can now reliably reproduce the problem. All that is needed is to send a message without expecting an Ack and another message expecting an Ack (only the "no ack expected" message needs to be large).

extremely concerning is that resending the frame never succeeds. When this happens, the entire connection seems to be borked... but is not marked as closed.

wow, this is really weird... if I send any number of TextFrame messages without asking for an Ack, as long as they are 36040032 characters or less, everything is fine when I send a message asking for an Ack. However, if my non-ack messages are 36040032 or more bytes, then I get into a very bad state where I receive FrameCommandFailed no matter how many times I retry.

Example of the failure here (note, we fork and improves on wandoulabs... this project seems to be dead)

https://github.com/fommil/simple-spray-websockets/blob/master/src/test/scala/testing/Buggy.scala#L78

updated description with my analysis and proposed workaround.

I have a same problem, when i send many message (json) to websocket (about 1000 msg/s)
@fommil did you solved this problem?

@fommil yeah, i see the way you work around via https://github.com/fommil/simple-spray-websockets
Thanks.