python-websockets/websockets

Asking for Guidance in Writing a WebsocketHandler for the Python logging Module

jbdyn opened this issue · 3 comments

Hi! 👋

In my project, I heavily rely on websockets for exchanging synchronization data, and it works like a charm. ❤️
Specifically, there is a local websocket server running as a background service, to which several user-facing apps connect. The reason for this is to have only one outgoing websocket connection, from this background service to a remote server that distributes sync data to other clients.

Now I need centralized logging, so that logs of individual apps can end up in one file.
Since I already have a websocket server running in the background, it would be nice to have a WebsocketHandler similar to the SocketHandler that already ships with the logging module.
I know that a websocket connection is a bit over the top for this, since it is only used one-way (logs from the user-facing apps to the background websocket server), but otherwise I would need to manage a separate TCP server.

For testing, I used a slightly modified version of the example script for an echo server from the docs:

🐍 Server Script
#!/usr/bin/env python

import asyncio

from websockets.server import serve


async def print_message(websocket):
    async for message in websocket:
        print(message)


async def main():
    async with serve(print_message, "localhost", 8000):
        await asyncio.Future()  # run forever


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("closing")

My criteria for a good WebsocketHandler are:

  • graceful shutdown on program exit
  • graceful handling of connection interruptions (e.g. the server being killed), resuming sending logs upon reconnect
  • handling of graceful server shutdown

I tried to implement such a handler in two ways, but I am not really happy with either:

Threading Client API

I basically re-used the implementation of SocketHandler

🐍 WebsocketThreadingHandler Class
import logging
import pickle
import struct
import time

from websockets.sync.client import connect

class WebsocketThreadingHandler(logging.Handler):
    def __init__(self, uri: str):
        self.uri = uri
        self.sock = None
        self.retryTime = None
        #
        # Exponential backoff parameters.
        #
        self.retryStart = 1.0
        self.retryMax = 30.0
        self.retryFactor = 2.0

        super().__init__()

    def makePickle(self, record):
        """
        Pickles the record in binary format with a length prefix, and
        returns it ready for transmission across the socket.
        """
        ei = record.exc_info
        if ei:
            # just to get traceback text into record.exc_text ...
            dummy = self.format(record)
        # See issue #14436: If msg or args are objects, they may not be
        # available on the receiving end. So we convert the msg % args
        # to a string, save it as msg and zap the args.
        d = dict(record.__dict__)
        d["msg"] = record.getMessage()
        d["args"] = None
        d["exc_info"] = None
        # Issue #25685: delete 'message' if present: redundant with 'msg'
        d.pop("message", None)
        s = pickle.dumps(d, 1)
        slen = struct.pack(">L", len(s))
        return slen + s

    def createSocket(self):
        """
        Try to create a socket, using an exponential backoff with
        a max retry time. Thanks to Robert Olson for the original patch
        (SF #815911) which has been slightly refactored.
        """
        now = time.time()
        # Either retryTime is None, in which case this
        # is the first time back after a disconnect, or
        # we've waited long enough.
        if self.retryTime is None:
            attempt = True
        else:
            attempt = now >= self.retryTime
        if attempt:
            try:
                print("try create socket")
                self.sock = connect(self.uri)  # <-- here, the recv_events_thread starts
                print("create socket")
                self.retryTime = None  # next time, no delay before trying
            except OSError:
                # Creation failed, so set the retry time and return.
                if self.retryTime is None:
                    self.retryPeriod = self.retryStart
                else:
                    self.retryPeriod = self.retryPeriod * self.retryFactor
                    if self.retryPeriod > self.retryMax:
                        self.retryPeriod = self.retryMax
                self.retryTime = now + self.retryPeriod

    def send(self, s):
        if self.sock is None:
            self.createSocket()

        if self.sock:
            try:
                self.sock.send(s)
                print("emit")
            except (Exception, KeyboardInterrupt):
                self.closeSocket()

    def emit(self, record):
        """This gets called on every new log record."""
        try:
            s = self.makePickle(record)
            self.send(s)
        except Exception:
            self.handleError(record)

    def closeSocket(self):
        print("close socket")
        self.sock.close()
        self.sock = None

    def handleError(self, record):
        print("HANDLE ERROR")
        with self.lock:
            if self.sock:
                self.closeSocket()
            else:
                super().handleError(record)

    def close(self):
        with self.lock:
            if self.sock:
                self.closeSocket()

        super().close()
        print("close handler")

Problem

I need to press Ctrl+C twice since the ClientConnection.recv_events_thread is not exiting on the first one, so it is failing the first of my criteria. In order to overcome this, I have to explicitly call ClientConnection.close() after catching the KeyboardInterrupt exception:

🐍 Client Script
import logging
from time import sleep

log = logging.getLogger(__name__)
websocket_handler = WebsocketThreadingHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)

try:
    while True:
        log.debug("DEBUG")
        sleep(1)
        log.info("INFO")
        sleep(1)
        log.warning("WARNING")
        sleep(1)
        log.error("ERROR")
        sleep(1)
        log.critical("CRITICAL")
        sleep(1)
except KeyboardInterrupt:
    if websocket_handler.sock is not None:  # sock is None while disconnected
        websocket_handler.sock.close()  # <-- necessary for only one `Ctrl+C`, but not elegant
    exit()

Sans-I/O API

Again, I basically wrote it like a SocketHandler, but this time there is no listening thread, so there is no need to press Ctrl+C twice.
I followed the docs as well as I could to implement all the rules for a working connection:

🐍 WebsocketSansIOHandler Class
import logging
from logging.handlers import SocketHandler
import socket
from threading import Thread
from time import sleep

from websockets.client import ClientProtocol
from websockets.uri import parse_uri

class WebsocketSansIOHandler(SocketHandler):
    def __init__(self, uri: str):
        self.uri = parse_uri(uri)
        self.events = list()
        super().__init__(self.uri.host, self.uri.port)

    def makeSocket(self):
        print("make new socket")
        # open TCP connection
        self.sock = super().makeSocket()
        sock = self.sock

        # TODO: perform TLS handshake
        # if self.uri.secure:
        #    ...

        # init protocol
        # TODO: Here or in __init__?
        #       it seems that otherwise messages don't get sent
        #       on reconnect after a BrokenPipeError
        self.protocol = ClientProtocol(self.uri)
        protocol = self.protocol

        # send handshake request
        print("handshake")
        request = protocol.connect()
        protocol.send_request(request)
        self.send_data()

        # receive data
        self.receive_data()

        # raise reason if handshake failed
        if protocol.handshake_exc is not None:
            self.reset_socket()
            raise protocol.handshake_exc

        return sock

    def send_data(self):
        try:
            for data in self.protocol.data_to_send():
                if data:
                    print("send data")
                    self.sock.sendall(data)
                else:
                    # half-close TCP connection, i.e. close the write side
                    print("close write side")
                    self.sock.shutdown(socket.SHUT_WR)
        except OSError:
            self.reset_socket()

    def receive_data(self):
        try:
            data = self.sock.recv(65536)
        except OSError:  # socket closed
            data = b""
        if data:
            print("receive data")
            self.protocol.receive_data(data)
            self.process_events_received()
            self.check_close_expected()
            # necessary because `websockets` responds to ping frames,
            # close frames, and incorrect inputs automatically
            self.send_data()
        else:
            print("receive EOF")
            self.protocol.receive_eof()
            self.check_close_expected()
            self.close_socket()

    def handleError(self, record):
        print("HANDLE ERROR")
        if self.closeOnError and self.sock:
            self.close_socket()
        else:
            logging.Handler.handleError(self, record)

    def check_close_expected(self):
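        # TODO: run in separate thread
        # Note: Thread.run() executes the target in the *calling* thread, so
        # this currently blocks for `delay` seconds. Thread.start() would spawn
        # a new thread, but then self.sock/self.protocol would need locking.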
        if self.protocol.close_expected():
            print("close expected")
            t = Thread(target=self.close_socket, kwargs=dict(delay=10))
            t.run()

    def process_events_received(self):
        # do something with the events,
        # first event is handshake response
        print("process events received")
        events = self.protocol.events_received()
        if events:
            print("adding new events")
        self.events.extend(events)

    def close_socket(self, delay=None):
        print("close socket")
        if delay is not None:
            print("add delay", delay)
            sleep(delay)
        self.protocol.send_close()
        self.send_data()
        self.reset_socket()

    def reset_socket(self):
        if self.sock is not None:
            print("reset socket")
            self.sock.close()
            self.sock = None
            self.protocol = None

    def send(self, s):
        if self.sock is None:
            self.createSocket()

        if self.sock:
            try:
                self.protocol.send_binary(s)
                self.send_data()
            except Exception as exc:
                print(exc)
                self.close_socket()

    def close(self):
        with self.lock:
            if self.sock:
                self.close_socket()
        print("close handler")
        logging.Handler.close(self)

Problem

It fulfills all my criteria. However, while puzzling all the protocol pieces together, I got really confused and now I don't know whether my implementation is good enough or needs to be improved.

🐍 Client Script
import logging
from time import sleep

log = logging.getLogger(__name__)
websocket_handler = WebsocketSansIOHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)

try:
    while True:
        log.debug("DEBUG")
        sleep(1)
        log.info("INFO")
        sleep(1)
        log.warning("WARNING")
        sleep(1)
        log.error("ERROR")
        sleep(1)
        log.critical("CRITICAL")
        sleep(1)
except KeyboardInterrupt:
    # no need to explicitly close the websocket connection
    exit()

(I am sorry for the wall of text ...)

What is your opinion on that?

> I basically re-used the implementation of SocketHandler

The WebSocket protocol includes a framing mechanism: WebSocket frames know their length, and each message is delivered whole. Prefixing the length is redundant and can be removed from your code.
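A minimal sketch of the receiving end once the prefix is gone, assuming the sender's makePickle() simply ends with `return pickle.dumps(d, 1)`. Each message the server receives is then exactly one pickled dict, which the standard logging.makeLogRecord() turns back into a LogRecord. The names record_from_message and receive_logs are hypothetical; receive_logs is meant as a stand-in for print_message() in the server script above. It only routes records to the server's loggers, so the server still needs its own handler (for example a logging.FileHandler) to write them to one file. Unpickling is only safe when every client is trusted, as local apps presumably are here.

```python
import logging
import pickle


def record_from_message(message: bytes) -> logging.LogRecord:
    # WebSocket frames carry their own length, so one message is exactly one
    # pickled dict: no struct.unpack(">L", ...) on a length prefix is needed.
    # Only unpickle data coming from trusted clients.
    return logging.makeLogRecord(pickle.loads(message))


async def receive_logs(websocket):
    # Hypothetical replacement for print_message() in the server script.
    async for message in websocket:
        record = record_from_message(message)
        # Hand the record to the server's own logger hierarchy; whatever
        # handlers are attached there decide where it ends up.
        logging.getLogger(record.name).handle(record)
```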

The threading implementation doesn't support automatic reconnection. As a consequence, you have to roll your own. That's annoying, sorry. A future version of websockets could provide this feature. Then, you could remove a fair amount of code.

Your implementation drops a message if the connection drops. A design that avoids this would be:

  1. emit pushes the log record to a queue;
  2. a background thread keeps the connection open, pops messages from the queue, and pushes them to the websocket connection.
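The two steps above could be sketched as follows, assuming websockets' threading client (websockets.sync.client.connect, as in the handler above). QueueingWebsocketHandler and its connect parameter are hypothetical names; the parameter only exists so the connection can be swapped for a fake in tests. emit() never touches the network, while the background thread reconnects with exponential backoff and resends the message it was holding when the connection dropped.

```python
import logging
import pickle
import queue
import threading


def _default_connect(uri):
    # Imported lazily so the sketch can be exercised with a fake connector.
    from websockets.sync.client import connect
    return connect(uri)


class QueueingWebsocketHandler(logging.Handler):
    """emit() only enqueues; a background thread owns the connection."""

    def __init__(self, uri, connect=_default_connect, retry_max=30.0):
        super().__init__()
        self.uri = uri
        self._connect = connect
        self._retry_max = retry_max
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def emit(self, record):
        try:
            if record.exc_info:
                self.format(record)  # fills record.exc_text
            d = dict(record.__dict__)
            d["msg"] = record.getMessage()
            d["args"] = None
            d["exc_info"] = None
            d.pop("message", None)
            self._queue.put(pickle.dumps(d))  # no length prefix needed
        except Exception:
            self.handleError(record)

    def _run(self):
        delay = 1.0
        pending = None  # popped from the queue but not yet delivered
        while True:
            try:
                ws = self._connect(self.uri)
            except Exception:
                if self._stop.is_set():
                    return  # shutting down and the server is unreachable
                self._stop.wait(delay)  # exponential backoff
                delay = min(delay * 2, self._retry_max)
                continue
            delay = 1.0
            try:
                while True:
                    if pending is None:
                        try:
                            pending = self._queue.get(timeout=0.1)
                        except queue.Empty:
                            if self._stop.is_set():
                                return  # queue drained: shut down
                            continue
                    ws.send(pending)
                    pending = None
            except Exception:
                pass  # connection lost: reconnect, then resend `pending`
            finally:
                ws.close()

    def close(self):
        self._stop.set()
        self._thread.join(timeout=5)
        super().close()
```

This is best-effort: messages still queued when the process exits while the server is unreachable are lost, and close() waits at most 5 seconds for the thread to drain the queue.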

I see that handleError may be taking care of that case. I'm not sure exactly what it does; maybe it's enough.

> I need to press Ctrl+C twice since the ClientConnection.recv_events_thread is not exiting on the first one, so it is failing the first of my criteria. In order to overcome this, I have to explicitly call ClientConnection.close() after catching the KeyboardInterrupt exception:

There's a fair chance that this is a duplicate of #1455, which was fixed in 13.0. Would you be able to test with the latest release?

> However, while puzzling all the protocol pieces together, I got really confused and now I don't know whether my implementation is good enough or needs to be improved.

Indeed, adding an I/O layer on top of the Sans-I/O protocol implementation is a substantial endeavor. I would recommend your initial approach, using the threading implementation.
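As one example of the subtle parts: the websockets Sans-I/O guide says that when close_expected() returns True, the TCP connection should be closed only after a short timeout if the other side hasn't already closed it, since the server is expected to close first. In the handler above, check_close_expected() sleeps and then calls close_socket(), which tries to send another close frame although the closing handshake is already under way. A minimal sketch of the waiting step, assuming a blocking socket.socket and a websockets ClientProtocol; finish_closing is a hypothetical helper name, and the timeout applies per recv() call rather than in total:

```python
import socket


def finish_closing(sock, protocol, timeout=10.0):
    """Call once protocol.close_expected() is True: give the server a bounded
    time to close the TCP connection, feeding anything received into the
    protocol, then close our side."""
    sock.settimeout(timeout)
    try:
        while True:
            data = sock.recv(65536)
            if not data:  # the server closed the TCP connection
                protocol.receive_eof()
                break
            protocol.receive_data(data)
            for out in protocol.data_to_send():
                if out:
                    sock.sendall(out)
                else:
                    sock.shutdown(socket.SHUT_WR)  # half-close
    except OSError:
        pass  # timeout (socket.timeout is an OSError) or connection reset
    finally:
        sock.close()
```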

I've been thinking about including this in websockets, either as code or as a recipe in the logging docs (similar to the logging to JSON recipe).

At this point, I'm not convinced that this is sufficiently useful in general to cross my threshold, i.e. "I'm willing to put my time into testing and maintaining it".

Once the threading implementation supports automatic reconnection, if the code fits on one page, maybe I could reconsider.

If you want to share an updated version or you have further questions, feel free to ask.

Given that you called this issue "Asking for Guidance" rather than "Request for Inclusion", I'm going to close this for now.

I will reopen it:

  • if the "need to press Ctrl+C twice" problem isn't fixed in 13.0
  • if further discussion turns up something actionable for me