WebSocketClientWorker silently overloaded/saturated
fommil opened this issue · 13 comments
UPDATE:
I found out what the problem is, and I'm going to be fixing it in smootoo's fork because this project seems to be dead.
The problem is that Pong is being sent without Acking, which means we can potentially fill up the network buffer, and the connection will then refuse to write anything until we do the ResumeWriting / WritingResumed dance. The workaround is similar to #78, but it must also forward all Tcp.Command messages to the connection and force low-level error handling into the worker. Ouch. Using Acking dramatically reduces the probability of error.
The hierarchy of pipeline actors is too confusing for me to be able to propose an elegant/simple solution, sorry.
This is somewhat concerning because it means that this library effectively doesn't do any real error handling at all, so that certainly gives me pause.
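To make the failure mode concrete, here is a toy model in plain Scala (no Akka; the class and method names are illustrative, not spray API) of a connection with a bounded OS send buffer: once untracked writes fill it, every further write is rejected until the peer drains the buffer, which is the ResumeWriting / WritingResumed situation described above.

```scala
// Toy model of the saturation described above: a connection with a bounded
// send buffer. Not spray/akka API; all names here are illustrative.
final class ToyConnection(bufferCapacity: Int) {
  private var buffered = 0      // bytes sitting in the OS send buffer
  private var suspended = false // analogue of waiting for WritingResumed

  // Attempt a write; returns false (a CommandFailed analogue) if rejected.
  def write(bytes: Int): Boolean =
    if (suspended || buffered + bytes > bufferCapacity) {
      suspended = true // writes now refused until the buffer drains
      false
    } else {
      buffered += bytes
      true
    }

  // The peer draining data; once empty, writing is resumed.
  def drain(bytes: Int): Unit = {
    buffered = math.max(0, buffered - bytes)
    if (buffered == 0) suspended = false
  }
}
```

Unacked Pongs play the role of writes nobody is tracking: they occupy the buffer, and once it is full, every later frame, acked or not, is rejected until a drain happens.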
Original description below:
Running this client against a basic WebSocket server that simply accepts anything:
import akka.actor._
import akka.io._
import spray.can.websocket._
import spray.can._
import spray.can.server.UHttp
import spray.http._
import concurrent.duration._
import akka.util.Timeout
import spray.can.websocket.frame.TextFrame
object Buggy {
  // for running on a remote
  val host = "myremotemachine"
  val port = 9081
}
// websocket/test:run-main testing.BuggyClient
class BuggyClient(
  host: String,
  port: Int
) extends WebSocketClientWorker {
  object Go

  def upgradeRequest = HttpRequest(
    HttpMethods.GET, "/",
    HttpHeaders.Host(host, port) :: List(
      HttpHeaders.Connection("Upgrade"),
      HttpHeaders.RawHeader("Upgrade", "websocket"),
      HttpHeaders.RawHeader("Sec-WebSocket-Version", "13"),
      HttpHeaders.RawHeader("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw=="),
      HttpHeaders.RawHeader("Sec-WebSocket-Extensions", "permessage-deflate")
    )
  )

  override def preStart(): Unit = {
    super.preStart()
    import context.system
    IO(UHttp) ! Http.Connect(host, port)
    implicit val d = context.system.dispatcher
    context.system.scheduler.schedule(0 seconds, 1 seconds) {
      println("GO GO GO")
      self ! Go
    }
  }

  def businessLogic: Receive = {
    case UpgradedToWebSocket =>
    case Go =>
      log.info("sending")
      connection ! TextFrame("foobar" * 1024 * 1024)
    case FrameCommandFailed(frame: TextFrame, _) =>
      log.error("FAIL - SHOULD REALLY RETRY")
    case other =>
      log.error("WTF?" + other.getClass)
  }
}
object BuggyClient extends App {
  val system = ActorSystem()
  system.actorOf(
    Props(new BuggyClient(Buggy.host, Buggy.port))
  )
}
results in the following output:
GO GO GO
sending
acked
GO GO GO
sending
acked
(and various permutations thereof, then)
GO GO GO
sending
GO GO GO
sending
GO GO GO
(and then)
GO GO GO
GO GO GO
GO GO GO
GO GO GO
GO GO GO
GO GO GO
...
I expect:
GO GO GO
sending
acked
repeated forever, with the occasional out-of-order message.
I'm logging akka messages and lifecycle changes at DEBUG and not seeing any actors being stopped. I'm not even seeing any Http.CommandFailed errors, although I have on occasion seen the same log messages as reported in #1060.
Any ideas what is going on?
actually, this may be a recurrence of #72, which I saw a while ago... also unsolved.
the larger I make my messages, the likelier it is to trigger the FrameCommandFailed. And if I run this with my client-side backpressure PR #78, i.e. lose the scheduled task and do this:
def businessLogic: Receive = {
  case UpgradedToWebSocket =>
    self ! Go
  case Ack =>
    log.info("acked")
    self ! Go
  case Go =>
    log.info("sending")
    //connection ! TextFrame("foobar" * 1024 * 1024)
    connection ! Tcp.Write(FrameRender.render(TextFrame("foobar" * 1024 * 1024 * 10), Array.empty[Byte]), Ack)
  case fail: FrameCommandFailed => // (frame: TextFrame, _) =>
    log.error("FAIL - SHOULD REALLY RETRY")
  case other =>
    log.error("WTF?" + other.getClass)
}
I can see
acked
sending
repeating forever, taking about 3 seconds per message. Sometimes I see FrameCommandFailed.
I'm putting this down as a network-related problem, with error handling being up to me. All the same, I would have expected akka-io to make best endeavours to retry.
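For what it's worth, akka.io's contract is that a rejected write is reported with CommandFailed and is never retried by the library; the retry policy is the caller's. A minimal sketch of such a policy, modeled as a plain function rather than actor messages (`trySend` and `maxRetries` are illustrative stand-ins for resending the same Tcp.Write after a CommandFailed):

```scala
// akka.io reports a rejected write with CommandFailed and never retries it
// for you; a failed write was not handed to the network, so resending the
// same payload is safe. This plain-function sketch stands in for the
// actor-message version; `trySend` returns false on a rejected write.
def sendWithRetry(trySend: () => Boolean, maxRetries: Int): Boolean = {
  var ok = trySend()
  var attempts = 0
  while (!ok && attempts < maxRetries) {
    attempts += 1
    ok = trySend() // retry the identical payload
  }
  ok
}
```

In a real worker the retries would be triggered by the CommandFailed message itself (ideally with a delay or backoff) rather than a busy loop, but the invariant is the same: keep the payload until it is either written or you give up.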
There's only one way a CommandFailed(Write) is sent, and that is when the previous Write is still (partly) pending. This is not directly related to the network, but it is part of the API of akka.io's TCP implementation.
One reason this may still happen even if you use Acking is that you are not the only party sending Writes to the connection. Are you sure the websocket implementation doesn't automatically respond to ping frames, for example?
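One way to cope with multiple writers is to funnel every outgoing payload (your frames and any automatic Pongs alike) through a single queue that keeps at most one write in flight, releasing the next payload only when the Ack for the previous one arrives. A minimal sketch in plain Scala, with `send` standing in for `connection ! Tcp.Write(..., Ack)` (the names here are illustrative, not spray API):

```scala
// Serialize all writers through one queue with at most one write in
// flight; `send` stands in for sending an acked Tcp.Write.
final class SerializedWriter(send: String => Unit) {
  private val pending = scala.collection.mutable.Queue.empty[String]
  private var inFlight = false

  // Any party (business frames, automatic Pongs) enqueues here instead of
  // writing to the connection directly.
  def offer(payload: String): Unit = {
    pending.enqueue(payload)
    pump()
  }

  // Call this when the Ack for the in-flight write arrives.
  def acked(): Unit = {
    inFlight = false
    pump()
  }

  private def pump(): Unit =
    if (!inFlight && pending.nonEmpty) {
      inFlight = true
      send(pending.dequeue())
    }
}
```

Because only one write is ever outstanding, a CommandFailed can no longer be caused by a racing writer; it can only refer to the single in-flight payload, which makes retrying unambiguous.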
@jrudolph yes, it will respond to ping frames. If the write is still partially pending, how can I be sure that I don't send the message again if I retry?
ok, I can now reliably reproduce the problem. All that is needed is to send one message without expecting an Ack and another message expecting an Ack (only the "no ack expected" message needs to be large).
extremely concerning is that resending the frame never succeeds. When this happens, the entire connection seems to be borked... but is not marked as closed.
wow, this is really weird... if I send any number of TextFrame messages without asking for an Ack, everything is fine when I then send a message asking for an Ack, as long as those messages are 36040032 bytes or fewer. However, if my non-ack messages are 36040032 bytes or more, then I get into a very bad state where I receive FrameCommandFailed no matter how many times I retry.
Example of the failure here (note, my fork improves on wandoulabs'... this project seems to be dead):
https://github.com/fommil/simple-spray-websockets/blob/master/src/test/scala/testing/Buggy.scala#L78
updated description with my analysis and proposed workaround.
I have the same problem when I send many messages (JSON) to the websocket (about 1000 msg/s).
@fommil did you solve this problem?
@fommil yeah, I see the way you worked around it via https://github.com/fommil/simple-spray-websockets
Thanks.