scalaz/scalaz-nio

isOpen on AsynchronousSocketChannel fails

Closed this issue · 8 comments

Really excited to see this library exists!

I've been playing around with the server. When I accept a connection from a netcat client and the client then terminates the connection, isOpen remains true, and there doesn't seem to be any way for the server to tell that the connection has disappeared. If I have a loop running to continue reading data from the socket, it will spin forever. An example application is here:
https://gist.github.com/jasonmartens/91819c538608497a4dbfa547df334b98

After running, I can send data using nc -v 127.0.0.1 1337, then type in a few characters followed by ctrl-c or ctrl-d. Then the server will loop endlessly (limited here to 100) reading 0 byte buffers.

I would expect either isOpen to change to false, or no bytes to be available to read.
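
For reference, the underlying JDK channel signals a peer disconnect through the result of read rather than through isOpen. The following is a minimal sketch using only java.nio (not the zio-nio API); the object name EofDemo and the method probe are made up for illustration. It connects a client over loopback, sends two bytes, closes the client, and then reads twice on the server side.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}

// Hypothetical demo object; it uses only the JDK, not zio-nio.
object EofDemo {

  /** Returns (bytes from first read, result of second read, isOpen after both reads). */
  def probe(): (Int, Int, Boolean) = {
    // Bind to an ephemeral loopback port so the demo does not clash with port 1337.
    val server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))
    try {
      // Client side: connect, send two bytes, then close the connection.
      val client = AsynchronousSocketChannel.open()
      client.connect(server.getLocalAddress).get()
      val accepted = server.accept().get()
      try {
        client.write(ByteBuffer.wrap("hi".getBytes("UTF-8"))).get()
        client.close()

        val buffer = ByteBuffer.allocate(16)
        val first  = accepted.read(buffer).get().intValue // bytes received from the client
        buffer.clear()
        val second = accepted.read(buffer).get().intValue // -1 means end-of-stream

        // isOpen only reports whether this end has been closed locally.
        (first, second, accepted.isOpen)
      } finally accepted.close()
    } finally server.close()
  }

  def main(args: Array[String]): Unit = {
    val (first, second, open) = probe()
    println(s"first read: $first, second read: $second, isOpen: $open")
  }
}
```

Under these assumptions the second read should report -1 while isOpen is still true, so a read loop would need to stop on the end-of-stream result instead of polling isOpen.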

I will try to fix that.

@jasonmartens would you mind testing #115?

Here is the snippet I was using:

import zio.clock.Clock
import zio.console._
import zio.nio._
import zio.nio.channels.{AsynchronousServerSocketChannel, AsynchronousSocketChannel}
import zio.{App, ZIO}


object Main extends App {
  override def run(args: List[String]): ZIO[Environment, Nothing, Int] = {
    theSocket
      .flatMap(s => awaitConnection(s))
      .foldM(
        err => putStrLn(s"Execution Failed with: $err") *> ZIO.succeed(1),
        _ => ZIO.succeed(0)
      )
  }

  val theSocket =
    for {
      address <- SocketAddress.inetSocketAddress("127.0.0.1", 1337)
      socket <- AsynchronousServerSocketChannel()
      _ <- socket.bind(address)
      _ <- putStrLn("bind")
    } yield socket

  def awaitConnection(socket: AsynchronousServerSocketChannel) =
    socket.accept.flatMap(ch => putStrLn("accept") *> doWork(ch).fork).forever


  def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwable, Unit] = {
    for {
      chunk <- channel.read(16)
      str = chunk.toArray.map(_.toChar).map(c => if (c.isControl) "[c]" else s"$c").mkString
      _ <- putStrLn(s"received: [$str] [${chunk.length}]")
    } yield ()
  }
    .whenM(channel.isOpen)
    .forever
    .ensuring(putStrLn("closing channel") *> channel.close.orDie)
}

Hi @pshemass, I've tested the changes in jasonmartens/zio-nio-server#1, and it appears to work differently, but now I get an exception in my fiber rather than a clean close.

accept
content for con 1: channel 1A
closing channel
Fiber failed.
A checked error was not handled.
java.io.IOException: Connection reset by peer
	at zio.nio.channels.AsynchronousByteChannel.$anonfun$read$3(AsynchronousChannel.scala:34)
	at zio.internal.FiberContext.evaluateNow(FiberContext.scala:705)
	at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:599)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Maybe I need to wrap the read(16) with something to catch the exception?

@jasonmartens read will fail with an IOException when the connection is closed by the peer.

You created a new fiber with .fork in https://github.com/jasonmartens/zio-nio-server/blob/510f785af1fee0449c7cdf7464151449b0fb4d77/src/main/scala/Main.scala#L34, so any unhandled exception thrown there will make that fiber fail. You need to handle those errors yourself, using the combinators described at https://zio.dev/docs/overview/overview_handling_errors.

That might look like this:
_ <- socket.accept.flatMap(s => socketChannelWorker(i)(s).catchAll(_ => putStrLn("connection failed")).fork)
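
To show the idea in isolation, here is a minimal sketch that assumes ZIO 1.x (for Runtime.default) and does not use zio-nio at all. The worker below is a hypothetical stand-in for a per-connection effect whose read fails, not the socketChannelWorker from the linked repository. The point is that catchAll is applied before .fork, so the forked fiber finishes normally instead of failing with an unhandled checked error.

```scala
import java.io.IOException
import zio.{IO, Runtime, UIO, ZIO}

// Hypothetical demo object, assuming ZIO 1.x on the classpath.
object ForkErrorDemo {

  // Stand-in for a per-connection worker whose read fails when the peer resets.
  def worker(id: Int): IO[IOException, Unit] =
    ZIO.fail(new IOException(s"Connection reset by peer (connection $id)"))

  // Recover inside the effect that will be forked, so the fiber cannot fail.
  def handled(id: Int): UIO[String] =
    worker(id)
      .map(_ => "clean close")
      .catchAll(e => UIO.succeed(s"connection failed: ${e.getMessage}"))

  // Fork the already-handled effect and wait for its result.
  val program: UIO[String] =
    for {
      fiber  <- handled(1).fork
      result <- fiber.join
    } yield result

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(program))
}
```

In a real server the recovery branch would more likely log the failure and close the channel than return a string; the string just makes the outcome easy to observe.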

Ahhh, yup that's exactly what I needed. This is much better, thanks for looking into it!

Just FYI, this has been moved to zio-nio and released: https://mvnrepository.com/artifact/dev.zio/zio-nio

Works great. Thanks.