timboudreau/acteur

Any support planned for 'push' ?

Ryan-ZA opened this issue · 6 comments

One of the best parts of the Netty server is first-class support for things like websockets and comet. Acteur looks great, but it seems very difficult to add push support to it. Am I missing something and it can already be done, or is it a planned feature?

Thanks! In principle push is dead simple: don't close the connection, keep a reference to the channel, and send data down the pipe whenever you need to. The connection is only closed if you close it.

Here's a quick example of a little SSE server. Anything that wants to send events simply asks for a Publisher instance and passes the objects that should be published to all clients to its publish() method. All the demo does is publish the IP address of any new caller to all the current callers:

public class PushTest extends Page {
    @Inject
    public PushTest(ActeurFactory af) {
        add(af.matchPath("^events$"));
        add(af.matchMethods(Method.GET));
        add(PushActeur.class);
        super.getReponseHeaders().setContentType(
                MediaType.parse("text/event-stream").withCharset(CharsetUtil.UTF_8));
    }

    @Singleton
    public static final class Publisher {

        private final Set<Channel> channels = new CopyOnWriteArraySet<>();
        private final ObjectMapper mapper;
        @Inject
        public Publisher(ObjectMapper mapper) {
            this.mapper = mapper;
        }

        public void add(final Channel channel) {
            channel.closeFuture().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    channels.remove(channel);
                }
            });
            channels.add(channel);
        }

        public synchronized void publish(Object object) throws JsonProcessingException {
            StringBuilder sb = new StringBuilder("id: ")
                    .append(DateTimeUtils.currentTimeMillis()).append('\n')
                    .append("data: ").append(mapper.writeValueAsString(object))
                    .append("\n\n");
            ByteBuf buf = Unpooled.copiedBuffer(sb, CharsetUtil.UTF_8);
            for (Channel channel : channels) {
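                // On Netty 4.0 final and later, write() only queues the message;
                // writeAndFlush() also pushes it to the socket
                channel.writeAndFlush(buf.copy());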
            }
        }
    }

    private static class PushActeur extends Acteur implements ChannelFutureListener {
        private final Publisher publisher;
        @Inject
        PushActeur(Publisher publisher, Event evt) throws JsonProcessingException {
            this.publisher = publisher;
            setState(new RespondWith(OK));

            //publish that this connection showed up, so the demo does something
            publisher.publish(Collections.singletonMap("arrived",
                    evt.getChannel().remoteAddress().toString()));
            setResponseBodyWriter(this);
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // Add here - otherwise we can be sent events before
            // the HTTP headers have been written to the socket
            publisher.add(future.channel());
        }
    }
}
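For reference, the frame that publish() builds above is just a few lines of text. Here is a minimal, JDK-only sketch of that formatting, separate from Acteur and Netty; the class and method names (SseEvents, format, encode) are made up for illustration, and it assumes the id contains no newline and the payload uses "\n" line endings:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Acteur: formats one Server-Sent Event.
final class SseEvents {
    private SseEvents() {}

    // One "id:" line, one "data:" line per payload line, then a blank line
    // that tells the client the event is complete.
    static String format(String id, String data) {
        StringBuilder sb = new StringBuilder();
        sb.append("id: ").append(id).append('\n');
        // A payload containing newlines must be split across several data: lines
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    // The event stream is always UTF-8 on the wire
    static byte[] encode(String id, String data) {
        return format(id, data).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(format("1", "{\"arrived\":\"/127.0.0.1:54321\"}"));
    }
}
```

Since Jackson's default output is a single line, the one-line data: field in the example above should be fine as it is; the split only matters if you publish pretty-printed or multi-line payloads.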

If this isn't what you're looking for, I'd love to hear what you're after.

Largely what I was looking for - thanks. However, just delaying writes on HTTP is only part of the issue. The other side is having reliable code for consuming the push messages - and SSE doesn't seem to be supported by IE, while Websockets are. Websockets are just some headers and a handshake, I believe, so how difficult would it be to add native Websocket support to your SSE example?
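To make the "headers and handshake" part concrete, here is a rough, JDK-only sketch of the one computed value in the server's handshake response, the Sec-WebSocket-Accept header from RFC 6455. The class and method names (WebSocketAccept, acceptKey) are made up for illustration. It covers only the handshake; after the upgrade every message still has to be encoded and decoded as websocket frames, which is the part a server library would need to handle.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper: computes the Sec-WebSocket-Accept response header value.
final class WebSocketAccept {
    // Fixed GUID defined by RFC 6455
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    private WebSocketAccept() {}

    // accept = base64(sha1(clientKey + GUID))
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey.trim() + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is one of the algorithms every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

The server sends that value back with a 101 Switching Protocols status and the Upgrade: websocket and Connection: Upgrade headers.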

Doing the TCP stuff would require some changes in the plumbing, probably in PipelineFactoryImpl and UpstreamHandlerImpl, and some sort of API-level support. Since the pipeline config determines what objects are legal to send downstream, outputting stuff is probably just a matter of channel.write(websocketMessage) using a dedicated type for that. The upstream bit probably would be a bit more work - maybe the websockets sample app for Netty (I seem to remember there being one) would point the direction.

It does seem as if it should be easy to add judging by the websocket example on Netty:
https://github.com/netty/netty/blob/master/example/src/main/java/io/netty/example/http/websocketx/html5/WebSocketServer.java
The only difference appears to be adding Netty's built-in WebSocketServerProtocolHandler. I'm guessing it will be a bit more complicated in Acteur, as you'd need to route only certain URLs through that handler.

public void initChannel(final SocketChannel ch) throws Exception {
    ch.pipeline().addLast(
        new HttpRequestDecoder(),
        new HttpObjectAggregator(65536),
        new HttpResponseEncoder(),
        new WebSocketServerProtocolHandler("/websocket"),
        new CustomTextFrameHandler());
}

From the Javadoc for that handler:

public class WebSocketServerProtocolHandler
extends ChannelInboundMessageHandlerAdapter<WebSocketFrame>

This handler does all the heavy lifting for you to run a websocket server. It takes care of websocket handshaking as well as processing of control frames (Close, Ping, Pong). Text and Binary data frames are passed to the next handler in the pipeline (implemented by you) for processing. See io.netty.example.http.websocketx.html5.WebSocketServer for usage. The implementation of this handler assumes that you just want to run a websocket server and not process other types of HTTP requests (like GET and POST). If you wish to support both HTTP requests and websockets in the one server, refer to the io.netty.example.http.websocketx.server.WebSocketServer example.

Hmm, I bet the API could be made as simple as @websocket on the Page subclass; under the hood, there would need to be some more complicated stuff to get things routed correctly, but it could probably be made transparent to the user.
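As a rough illustration of that idea only: a runtime-retained marker annotation plus a reflective check is about all the routing layer would need in order to decide which pages get the websocket treatment. Everything below is hypothetical (the WebSocket annotation, the page classes, wantsWebSocket) and is not Acteur's API; the page classes are plain stand-ins rather than real Page subclasses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical sketch: marking page types that should be upgraded to websockets.
final class WebSocketRouting {
    private WebSocketRouting() {}

    // Must be retained at runtime so the server can see it via reflection
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WebSocket {}

    // Stand-ins for Page subclasses
    static class PlainPage {}

    @WebSocket
    static class ChatPage {}

    // The routing code would call this before choosing how to handle a request
    static boolean wantsWebSocket(Class<?> pageType) {
        return pageType.isAnnotationPresent(WebSocket.class);
    }

    public static void main(String[] args) {
        System.out.println(wantsWebSocket(ChatPage.class));
        System.out.println(wantsWebSocket(PlainPage.class));
    }
}
```

The harder part, as noted, is what happens after that check: performing the upgrade and swapping the right handlers into the pipeline for just those connections.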

Long since fixed: websocket support, plus processing the request as soon as the headers arrive in HTTP mode, plus SSE in a separate library, plus a websocket testing framework, blather.