Asking for Guidance in Writing a WebsocketHandler for the Python logging Module
jbdyn opened this issue · 3 comments
Hi!
In my project, I heavily rely on websockets for exchanging synchronization data, and it works like a charm. ❤️
In particular, there is a local websocket server running as a background service, to which several user-facing apps connect. The reason for this is to have only one outgoing websocket connection, from this background service to a remote server that distributes sync data to other clients.
Now I need centralized logging, so that logs of individual apps can end up in one file.
Since I already have a websocket server running in the background, it might be cool to have a WebsocketHandler similar to the SocketHandler that already ships with the logging module.
I know that a websocket connection is a bit over the top for this, as it is only used one way (logs from the user-facing apps to the background websocket server), but otherwise I would need to manage a separate TCP server.
For testing, I used a slightly modified version of the example script for an echo server from the docs:
Server Script
#!/usr/bin/env python
import asyncio

from websockets.server import serve


async def print_message(websocket):
    async for message in websocket:
        print(message)


async def main():
    async with serve(print_message, "localhost", 8000):
        await asyncio.Future()  # run forever


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("closing")
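To get the logs of all apps into one file, the server needs to turn each message back into a log record instead of printing it. The following is a hypothetical variant of print_message (write_records is a made-up name, not part of websockets). It assumes each message is what SocketHandler.makePickle produces, i.e. a 4-byte big-endian length followed by a pickled dict of LogRecord attributes, which is what both handlers below send.

```python
import asyncio
import logging
import pickle
import struct


async def write_records(websocket):
    """Hypothetical replacement for print_message: log every received record.

    Assumes each message is SocketHandler.makePickle output: a 4-byte
    big-endian length, then a pickled dict of LogRecord attributes.
    """
    async for message in websocket:
        (length,) = struct.unpack(">L", message[:4])
        payload = message[4:]
        if len(payload) != length:
            continue  # malformed message: skip it rather than crash the server
        record = logging.makeLogRecord(pickle.loads(payload))
        # Logger.handle() skips the logger-level check, so every received
        # record reaches the server's handlers (e.g. a single FileHandler).
        logging.getLogger(record.name).handle(record)
```

On the server, this could be combined with something like logging.basicConfig(filename="apps.log", level=logging.DEBUG) and serve(write_records, "localhost", 8000); the file name is only an example. Passing records to Logger.handle() mirrors the socket receiver in the Python logging cookbook.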
My criteria for a good WebsocketHandler are:
- graceful shutdown on program exit
- graceful handling of connection interruptions (e.g. killing the server), resuming sending logs upon reconnect
- handling of graceful server shutdown
I tried to implement such a handler in two ways, but I am not really happy with either:
Threading Client API
I basically re-used the implementation of SocketHandler.
WebsocketThreadingHandler Class
import logging
import pickle
import struct
import time

from websockets.sync.client import connect


class WebsocketThreadingHandler(logging.Handler):
    def __init__(self, uri: str):
        self.uri = uri
        self.sock = None
        self.retryTime = None
        #
        # Exponential backoff parameters.
        #
        self.retryStart = 1.0
        self.retryMax = 30.0
        self.retryFactor = 2.0
        super().__init__()

    def makePickle(self, record):
        """
        Pickles the record in binary format with a length prefix, and
        returns it ready for transmission across the socket.
        """
        ei = record.exc_info
        if ei:
            # just to get traceback text into record.exc_text ...
            dummy = self.format(record)
        # See issue #14436: If msg or args are objects, they may not be
        # available on the receiving end. So we convert the msg % args
        # to a string, save it as msg and zap the args.
        d = dict(record.__dict__)
        d["msg"] = record.getMessage()
        d["args"] = None
        d["exc_info"] = None
        # Issue #25685: delete 'message' if present: redundant with 'msg'
        d.pop("message", None)
        s = pickle.dumps(d, 1)
        slen = struct.pack(">L", len(s))
        return slen + s

    def createSocket(self):
        """
        Try to create a socket, using an exponential backoff with
        a max retry time. Thanks to Robert Olson for the original patch
        (SF #815911) which has been slightly refactored.
        """
        now = time.time()
        # Either retryTime is None, in which case this
        # is the first time back after a disconnect, or
        # we've waited long enough.
        if self.retryTime is None:
            attempt = True
        else:
            attempt = now >= self.retryTime
        if attempt:
            try:
                print("try create socket")
                self.sock = connect(self.uri)  # <-- here, the recv_events_thread starts
                print("create socket")
                self.retryTime = None  # next time, no delay before trying
            except OSError:
                # Creation failed, so set the retry time and return.
                if self.retryTime is None:
                    self.retryPeriod = self.retryStart
                else:
                    self.retryPeriod = self.retryPeriod * self.retryFactor
                    if self.retryPeriod > self.retryMax:
                        self.retryPeriod = self.retryMax
                self.retryTime = now + self.retryPeriod

    def send(self, s):
        if self.sock is None:
            self.createSocket()
        if self.sock:
            try:
                self.sock.send(s)
                print("emit")
            except (Exception, KeyboardInterrupt):
                self.closeSocket()

    def emit(self, record):
        """This gets called on every new log record."""
        try:
            s = self.makePickle(record)
            self.send(s)
        except Exception:
            self.handleError(record)

    def closeSocket(self):
        print("close socket")
        self.sock.close()
        self.sock = None

    def handleError(self, record):
        print("HANDLE ERROR")
        with self.lock:
            if self.sock:
                self.closeSocket()
            else:
                super().handleError(record)

    def close(self):
        with self.lock:
            if self.sock:
                self.closeSocket()
            super().close()
            print("close handler")
Problem
I need to press Ctrl+C twice since the ClientConnection.recv_events_thread is not exiting on the first one, so it is failing the first of my criteria. In order to overcome this, I have to explicitly call ClientConnection.close() after catching the KeyboardInterrupt exception:
Client Script
import logging
from time import sleep

log = logging.getLogger(__name__)
websocket_handler = WebsocketThreadingHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)

try:
    while True:
        log.debug("DEBUG")
        sleep(1)
        log.info("INFO")
        sleep(1)
        log.warning("WARNING")
        sleep(1)
        log.error("ERROR")
        sleep(1)
        log.critical("CRITICAL")
        sleep(1)
except KeyboardInterrupt:
    websocket_handler.sock.close()  # <-- necessary for only one `Ctrl+C`, but not elegant
    exit()
Sans-I/O API
Again I basically wrote it like a SocketHandler, but this time there is no listening thread. Hence, there is no need to press Ctrl+C twice.
I followed the docs as well as I could to implement all the rules for a working connection:
WebsocketSansIOHandler Class
import logging
from logging.handlers import SocketHandler
import socket
from threading import Thread
from time import sleep

from websockets.client import ClientProtocol
from websockets.uri import parse_uri


class WebsocketSansIOHandler(SocketHandler):
    def __init__(self, uri: str):
        self.uri = parse_uri(uri)
        self.events = list()
        super().__init__(self.uri.host, self.uri.port)

    def makeSocket(self):
        print("make new socket")
        # open TCP connection
        self.sock = super().makeSocket()
        sock = self.sock

        # TODO: perform TLS handshake
        # if self.uri.secure:
        #     ...

        # init protocol
        # TODO: Here or in __init__?
        # it seems that otherwise messages don't get sent
        # on reconnect after a BrokenPipeError
        self.protocol = ClientProtocol(self.uri)
        protocol = self.protocol

        # send handshake request
        print("handshake")
        request = protocol.connect()
        protocol.send_request(request)
        self.send_data()

        # receive data
        self.receive_data()

        # raise reason if handshake failed
        if protocol.handshake_exc is not None:
            self.reset_socket()
            raise protocol.handshake_exc

        return sock

    def send_data(self):
        try:
            for data in self.protocol.data_to_send():
                if data:
                    print("send data")
                    self.sock.sendall(data)
                else:
                    # half-close TCP connection, i.e. close the write side
                    print("close write side")
                    self.sock.shutdown(socket.SHUT_WR)
        except OSError:
            self.reset_socket()

    def receive_data(self):
        try:
            data = self.sock.recv(65536)
        except OSError:  # socket closed
            data = b""

        if data:
            print("receive data")
            self.protocol.receive_data(data)
            self.process_events_received()
            self.check_close_expected()
            # necessary because `websockets` responds to ping frames,
            # close frames, and incorrect inputs automatically
            self.send_data()
        else:
            print("receive EOF")
            self.protocol.receive_eof()
            self.check_close_expected()
            self.close_socket()

    def handleError(self, record):
        print("HANDLE ERROR")
        if self.closeOnError and self.sock:
            self.close_socket()
        else:
            logging.Handler.handleError(self, record)

    def check_close_expected(self):
        # TODO: run in separate thread
        if self.protocol.close_expected():
            print("close expected")
            t = Thread(target=self.close_socket, kwargs=dict(delay=10))
            # note: Thread.run() executes close_socket in the current thread;
            # Thread.start() would be needed to run it in the background
            t.run()

    def process_events_received(self):
        # do something with the events,
        # first event is handshake response
        print("process events received")
        events = self.protocol.events_received()
        if events:
            print("adding new events")
            self.events.extend(events)

    def close_socket(self, delay=None):
        print("close socket")
        if delay is not None:
            print("add delay", delay)
            sleep(delay)
        self.protocol.send_close()
        self.send_data()
        self.reset_socket()

    def reset_socket(self):
        if self.sock is not None:
            print("reset socket")
            self.sock.close()
        self.sock = None
        self.protocol = None

    def send(self, s):
        if self.sock is None:
            self.createSocket()
        if self.sock:
            try:
                self.protocol.send_binary(s)
                self.send_data()
            except Exception as exc:
                print(exc)
                self.close_socket()

    def close(self):
        with self.lock:
            if self.sock:
                self.close_socket()
            print("close handler")
            logging.Handler.close(self)
Problem
It fulfills all my criteria. However, while puzzling all the protocol pieces together, I got really confused and now I don't know whether my implementation is good enough or needs to be improved.
Client Script
import logging
from time import sleep

log = logging.getLogger(__name__)
websocket_handler = WebsocketSansIOHandler("ws://localhost:8000")
log.addHandler(websocket_handler)
log.setLevel(logging.DEBUG)

try:
    while True:
        log.debug("DEBUG")
        sleep(1)
        log.info("INFO")
        sleep(1)
        log.warning("WARNING")
        sleep(1)
        log.error("ERROR")
        sleep(1)
        log.critical("CRITICAL")
        sleep(1)
except KeyboardInterrupt:
    # no need to explicitly close the websocket connection
    exit()
(I am sorry for the wall of text ...)
What is your opinion on that?
> I basically re-used the implementation of SocketHandler
The WebSocket protocol includes a framing mechanism: WebSocket frames know their length. Prefixing the length is redundant and can be removed from your code.
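Applied to makePickle, this means the struct import and the length prefix can go. A minimal sketch, written as two standalone functions with hypothetical names (serialize_record, deserialize_record) rather than as a drop-in method: each WebSocket message carries exactly one pickled record, and the receiver unpickles the message as a whole.

```python
import logging
import pickle


def serialize_record(record: logging.LogRecord) -> bytes:
    """Pickle one log record as the payload of one WebSocket message.

    Same steps as SocketHandler.makePickle, minus struct.pack(">L", ...):
    the WebSocket frame already carries the message length.
    """
    if record.exc_info and not record.exc_text:
        # Render the traceback to text; traceback objects cannot be pickled.
        record.exc_text = logging.Formatter().formatException(record.exc_info)
    d = dict(record.__dict__)
    d["msg"] = record.getMessage()  # merge msg % args into a plain string
    d["args"] = None
    d["exc_info"] = None
    d.pop("message", None)  # redundant with "msg"
    return pickle.dumps(d)


def deserialize_record(message: bytes) -> logging.LogRecord:
    # One message == one record, so there is no length header to parse.
    return logging.makeLogRecord(pickle.loads(message))
```

On the client, the handler would then call self.sock.send(serialize_record(record)); on the server, the loop body reduces to logging.getLogger(record.name).handle(deserialize_record(message)).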
The threading implementation doesn't support automatic reconnection. As a consequence, you have to roll your own. That's annoying, sorry. A future version of websockets could provide this feature. Then, you could remove a fair amount of code.
Your implementation drops a message if the connection drops. A design that avoids this would be:
- emit pushes the log record to a queue;
- a background thread keeps the connection open, pops messages from the queue, and pushes them to the websocket connection.
I see that handleError must be taking care of that case. I'm not sure what it does exactly. Maybe it's enough.
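The queue-based design above could look roughly like the following sketch. It is not an API of websockets or a recipe from its docs; QueueingWebsocketHandler and its connect_fn parameter are hypothetical names. To stay independent of a particular websockets version, the handler takes a zero-argument callable that opens a connection, for example lambda: connect("ws://localhost:8000") with connect from websockets.sync.client. Any exception while connecting or sending is treated as a dropped connection, and the unsent message is retried after an exponential backoff.

```python
import logging
import pickle
import queue
import threading


class QueueingWebsocketHandler(logging.Handler):
    """Sketch: emit() only enqueues; one daemon thread owns the connection.

    connect_fn (hypothetical parameter) must return a context manager whose
    value has a send(bytes) method, e.g. a websockets.sync.client connection.
    """

    def __init__(self, connect_fn, retry_start=1.0, retry_max=30.0):
        super().__init__()
        self.connect_fn = connect_fn
        self.retry_start = retry_start
        self.retry_max = retry_max
        self.queue = queue.Queue()
        self._stopping = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def serialize(self, record):
        # Like SocketHandler.makePickle, without the redundant length prefix.
        if record.exc_info and not record.exc_text:
            record.exc_text = logging.Formatter().formatException(record.exc_info)
        d = dict(record.__dict__)
        d["msg"] = record.getMessage()
        d["args"] = None
        d["exc_info"] = None
        d.pop("message", None)
        return pickle.dumps(d)

    def emit(self, record):
        # Runs with the handler lock held: never touch the network here.
        try:
            self.queue.put_nowait(self.serialize(record))
        except Exception:
            self.handleError(record)

    def _run(self):
        delay = self.retry_start
        pending = None  # taken from the queue but not sent yet
        while True:
            try:
                with self.connect_fn() as ws:
                    delay = self.retry_start  # connected: reset the backoff
                    while True:
                        if pending is None:
                            try:
                                pending = self.queue.get(timeout=0.2)
                            except queue.Empty:
                                if self._stopping.is_set():
                                    return  # queue drained and close() was called
                                continue
                        ws.send(pending)
                        pending = None
            except Exception:
                # Connecting or sending failed: keep `pending`, back off, reconnect.
                if self._stopping.is_set():
                    return  # shutting down: give up on unsent records
                self._stopping.wait(delay)
                delay = min(delay * 2, self.retry_max)

    def close(self):
        self._stopping.set()
        self._thread.join(timeout=5.0)  # let the thread drain the queue briefly
        super().close()
```

Because emit() only enqueues, logging calls never block on the network, and a message leaves the retry path only once send() has returned. That is not a delivery guarantee: send() returning means the data was handed to the operating system, not that the server processed it. At exit, logging.shutdown() calls close(), which gives the thread up to about five seconds to drain the queue; whether a single Ctrl+C is enough still depends on how websockets' own background thread behaves, which is the 13.0 question discussed in this thread.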
> I need to press Ctrl+C twice since the ClientConnection.recv_events_thread is not exiting on the first one, so it is failing the first of my criteria. In order to overcome this, I have to explicitly call ClientConnection.close() after catching the KeyboardInterrupt exception:
There's a fair chance that this is a duplicate of #1455, which was fixed in 13.0. Would you be able to test with the latest release?
> However, while puzzling all the protocol pieces together, I got really confused and now I don't know whether my implementation is good enough or needs to be improved.
Indeed, adding an I/O layer on top of the Sans-I/O protocol implementation is a substantial endeavor. I would recommend your initial approach, using the threading implementation.
I've been thinking about including this in websockets, either as code or as a recipe in the logging docs (similar to the logging to JSON recipe).
At this point, I'm not convinced that this is sufficiently useful in general to cross my threshold, i.e. "I'm willing to put my time into testing and maintaining it".
Once the threading implementation supports automatic reconnection, if the code fits on one page, maybe I could reconsider.
If you want to share an updated version or you have further questions, feel free to ask.
Given that you called this issue "Asking for Guidance" rather than "Request for Inclusion", I'm going to close this for now.
I will reopen it:
- if the "need to press Ctrl+C twice" problem isn't fixed in 13.0
- if further discussion turns up something actionable for me