MaxHalford/maxhalford.github.io

blog/flask-sse-no-deps/

utterances-bot opened this issue Β· 24 comments

Server-sent events in Flask without extra dependencies - Max Halford

Server-sent events (SSE) is a mechanism for sending updates from a server to a client. The fundamental difference with WebSockets is that the communication only goes in one direction. In other words, the client cannot send information to the server. For many usecases this is all you might need. Indeed, if you just want to receive notifications/updates/messages, then using a WebSocket is overkill. Once you’ve implemented the SSE functionality on your server, then all you need on a client that uses JavaScript is an EventSource.

https://maxhalford.github.io/blog/flask-sse-no-deps/

Opsi commented

Thank you very much for this well-written blog-post. I had the same issue that I can't use flask-sse because i can't add Redis as project dependency. I will try to integrate your solution instead.
πŸ‘

@Opsi glad to hear that! Tell me if you encounter any issue.

Max, thank you very much for this post. I was going to use some of your code snippets, and I wonder if you would care to clarify the licensing on them.

It's my pleasure @markgalassi. I've put the code in this repository under an MIT license, so feel free to proceed in any way you wish.

Hi Max. I am testing your implementation but it seems that the "msg = messages.get()" call in the '/listen' route does not return when messages are added to the queue, e.g. by the '/ping' route.
Maybe this is a threading issue (asynchrounous workers)? I think my web-host is running "passenger" on an apache server. It is not clear to me if the application is blocked by the messages.get() call, or if the two clients (listen and ping) are in different processes and not using the same instance of the MessageAnnouncer(), or if it something else that is blocking the communication throught the queue.

Hey @dmulvang. Just to be clear, I haven't tried this out with an Apache server, so it's hard for me to reproduce your issue.

Usually it's quite easy to figure out if a route is hanging or not. If you send a request to the /ping route and that it doesn't return immediately, then it's hanging. In that case, it's probably because your server is using a single web worker to handle all requests. You need to set your server up so that you have multiple web workers. If possible, I would recommend switching to a Nginx/Gunicorn stack. The latter duo is very commonly for Flask apps and is well documented. I'm sorry I can't help you out more. I hope you figure it out.

Thanks for this article. Well written and clear. FWIW I encountered this solution elsewhere for nginx/proxy issues with SSE:

    # the below header tells any proxy not to compress server-sent events (SSEs) - useful for Webpack DevServer
    sse_message.headers['Cache-Control'] = "no-transform"
    # the below header prevents nginx from swallowing SSEs.
    sse_message.headers['X-Accel-Buffering'] = "no"
c650 commented

this is a blessing tysm

c650 commented

Wow thanks that was a good read. Let’s see how it goes and worst case I switch to long polling I guess

Max, thank you for posting this. I have an internal web app to pull reports out of our LMS for staff and this lets me show progress on some long-running tasks.

Just a heads up that if you're listening with a JavasScript EventSource instance, you have to send some information back on the initial connect request or else the connection will never complete. Here's a simple example:

#... rest of the announcer
def listen(self):
        q = queue.Queue(maxsize=5)
        self.listeners.append(q)
        # Send an initial message to connect
        self.listeners[0].put_nowait(format_sse(data="You have successfully connected."))
        return q
#...
// I put mine in a global scope to connect immediately.
const source = new EventSource('/listen')
source.onopen = (e) => console.log('Connection requested')
source.onmessage = (msg) => console.log(msg.data)

Thanks for the interesting read, it helped me a lot to get started. However, this approach as it is now does have some problems.

You create a global instance of MessageAnnouncer to be used by all (request) threads. While the python queue.Queue is indeed thread safe, your MessageAnnouncer is not. Just imagine one thread deleting a listener while the other thread is still iterating over the list.

While that issue could probably be fixed easily, your solution is not process-safe either. In many environments, flask apps are run in multi process mode, e.g. behind a uwsgi. Then each process has its own announcer and they don't talk to each other. And that's exactly the point where other people turn to Redis, to share only one global instance.

The good news is you can solve both problems without Redis by using python's built-in multiprocessing.Manager. Here is a post that gives the basic idea about how to use it.

Here is part of my sse.py that has to be started before the flask app (my SSEQueue is basically your MessageAnnouncer).

class SSEManager(multiprocessing.managers.BaseManager):
	pass

def start_sse():
	lock = multiprocessing.Lock()
	sse = SSEQueue()
	
	def sse_listen():
		with lock:
			return sse.listen()
	
	def sse_put(item):
		with lock:
			sse.put(item)

	SSEManager.register("sse_listen", sse_listen)
	SSEManager.register("sse_put", sse_put)


	manager = SSEManager(address=("127.0.0.1", 2437), authkey=b'sse')
	server = manager.get_server()
	server.serve_forever()


def import_sse():
	SSEManager.register("sse_listen")
	SSEManager.register("sse_put")


if __name__ == "__main__":
	start_sse()
else:
	import_sse()

Then in app.py:

import sse
#[...]
	sse_manager = sse.SSEManager(address=("127.0.0.1", 2437), authkey=b'sse')
	sse_manager.connect()
	sse_manager.sse_put('data: {"some": "data"}')

and similarly for reading.

Hope that's useful to anyone.

Hey, you have a bug:

for i in reversed(range(len(self.listeners))):
try:
self.listeners[i].put_nowait(msg)
except queue.Full:
del self.listeners[i]

Instead do this:

hang_up = []
for i in reversed(range(len(self.listeners))):
try:
self.listeners[i].put_nowait(msg)
except queue.Full:
hang_up.append(i)
hang_up.sort(reverse=True)
for i in hang_up:
del self.listeners[i]

This way the list size doesn't shrink when you del a listener who's not listening.

@nickburnette-source are you sure? The whole point of looping over in reverse order is that deleting an element doesn't affect the indexes.

My bad, I didn't see the looping in reverse order part πŸ˜…

Thanks, I am strugginig with flask-sse and sanic-sse until reading this post

Hey, I love your bloigpost!
How would I forward the msg from the listener to the html, aka how would I go about displaying the sse from the listener to the user?

Hey, I love your bloigpost!

Thanks @yjaenike <3

How would I forward the msg from the listener to the html, aka how would I go about displaying the sse from the listener to the user?

For that you would need to use a client library on the frontend side. This MDN article seems quite clear to me.

I've been using this code for a while - worked great - however when it was moved to a "proper" multithreaded gunicorn/gevent appserver we immediately ran into an issue...

Whilst queues are threadsafe - the array you're holding them in isn't!!

Sooner or later you'll see "index out of range" errors thrown either when reading or removing elements from that array (e.g. you can get this error in the queue_full handler as well as the code itself)

It's probably OK just to catch those exceptions and ignore them - another thread did the work anyway - but if you DON'T add an exception handler for that, your SSE server will stop responding when this happens!

Just FYI

neojaw commented

@shrewdlogarithm I adressed this in my previous comment. If you like, you can give my solution a try.

@neojaw Ah - that's looks interesting!

I'll have to figure-out how to integrate that into my app as I'm using Flask much as per the "/listen" example in the original article

Thanks for that tho - I'll have a tinker!

this blog saved the day for me, really helpful!
hacky way to implement private messaging and not broadcast message to everyone using client's ip address (not throughly tested)

import queue
from flask import Response, request
import time

class MessageAnnouncer:
    def __init__(self):
        self.listeners = {}

    def listen(self, client_addr):
        client_id = client_addr  # Get client IP address as the unique identifier
        if client_id not in self.listeners:
            self.listeners[client_id] = []  # Initialize an empty list
        q = queue.Queue(maxsize=5)
        self.listeners[client_id].append(q)
        return q

    def announce(self, msg, client_addr):
        client_id = client_addr
        if client_id in self.listeners:
            messages = self.listeners[client_id]
            print(self.listeners)
            print(messages)
            for i in reversed(range(len(messages))):
                try:
                    messages[i].put_nowait(msg)
                except queue.Full:
                    del messages[i]

def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event is not None:
        msg = f'event: {event}\n{msg}'
    return msg

@app.route('/listen', methods=['GET'])
def listen():
    def stream(client_addr):
        messages = announcer.listen(client_addr)  # returns a queue.Queue
        print("streaming queue address ")
        print(messages)
        while True:
            msg = messages.get()  # blocks until a new message arrives
            yield msg

    return Response(stream(request.remote_addr), mimetype='text/event-stream')

@app.route("/ping")
def ping():
    print(request.remote_addr)
    announcer.announce(msg=format_sse(data="pong"), client_addr=request.remote_addr)
    time.sleep(5)
    announcer.announce(msg=format_sse(data="ping"), client_addr=request.remote_addr)
    return {}, 200

Thanks for sharing @navdhakar. I reformatted your code thanks to ChatGPT, I hope you don't mind :)

FYI there are several standalone python redis libraries which don't require a separate redis server, e.g. https://github.com/yahoo/redislite.