encode/httpcore

Missing client stream reset in h2 connection

MarkLux opened this issue · 0 comments

Discussed in #941

Originally posted by MarkLux August 9, 2024
Background

  1. Using an HTTP/2 connection with a high number of concurrent streams (e.g. over 400 stream IDs sharing a single connection).
  2. Each stream is waiting for SSE events (assume each stream has many server events to receive).
  3. Some of the streams (not all) are cancelled by the client.

Phenomenon

  1. The cancelled streams are closed on the client side.
  2. However, the server keeps sending SSE events for those streams, because it never receives any signal that they were closed, and the connection stays alive because other streams are still using it.
  3. The remaining streams on this connection are blocked in this loop:
# http2.py, class AsyncHTTP2Connection
# ...
async def _receive_stream_event(
    self, request: Request, stream_id: int
) -> typing.Union[
    h2.events.ResponseReceived, h2.events.DataReceived, h2.events.StreamEnded
]:
    while not self._events.get(stream_id):
        # THE REMAINING STREAMS ARE BLOCKED HERE.
        # On each iteration, _receive_events returns events that belong to
        # other streams (the ones we closed; the server doesn't know that,
        # so it keeps sending).
        await self._receive_events(request, stream_id)
    event = self._events[stream_id].pop(0)
    if isinstance(event, h2.events.StreamReset):
        raise RemoteProtocolError(event)
    return event
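
The starvation described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not httpcore code: one "read" yields exactly one frame, the server sends 5 frames per cancelled stream ahead of the live stream's frame, and the names `reads_until_event` and `simulate` are hypothetical.

```python
from collections import deque
from typing import Deque, Dict, List, Set, Tuple

Frame = Tuple[int, str]  # (stream_id, payload)


def reads_until_event(
    wire: Deque[Frame], events: Dict[int, List[str]], stream_id: int
) -> int:
    """Mimic the `while not self._events.get(stream_id)` loop; return the read count."""
    reads = 0
    while not events.get(stream_id):
        if not wire:
            raise RuntimeError("connection drained before the stream got an event")
        sid, payload = wire.popleft()  # toy assumption: one read == one frame
        reads += 1
        if sid in events:
            events[sid].append(payload)
        # Frames for streams the client already dropped are simply discarded,
        # but they still cost a read.
    return reads


def simulate(cancelled: Set[int], server_knows: bool) -> int:
    """Return how many reads the live stream (id 1) needs to get its next event."""
    live = 1
    wire: Deque[Frame] = deque()
    if not server_knows:
        # The server was never told about the cancellation, so it keeps
        # producing SSE frames for the cancelled streams.
        for sid in sorted(cancelled):
            wire.extend((sid, "sse") for _ in range(5))
    wire.append((live, "sse"))
    events: Dict[int, List[str]] = {live: []}
    return reads_until_event(wire, events, live)


without_reset = simulate({3, 5, 7}, server_knows=False)
with_reset = simulate({3, 5, 7}, server_knows=True)
print(without_reset, with_reset)  # prints "16 1"
```

In this model the live stream needs 16 reads when the server is unaware of the cancellations and a single read once it has been told. With hundreds of cancelled SSE streams that never stop producing events, the real loop can spin for a correspondingly long time.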

We ran some experiments and observed the following logs:

[Screenshot of the observed logs]

Solution

Reset the stream when the client cancels it. Here is a demo (we have run experiments confirming that it works):

# http2.py, class AsyncHTTP2Connection

async def _response_closed(self, stream_id: int) -> None:
    await self._max_streams_semaphore.release()
    del self._events[stream_id]
    # ADD
    stream = self._h2_state._get_stream_by_id(stream_id=stream_id)
    if stream and not stream.closed:
        # Send RST_STREAM with the CANCEL error code (0x8)
        self._h2_state.reset_stream(stream_id=stream_id, error_code=8)
    # ADD END
    async with self._state_lock:
        if self._connection_terminated and not self._events:
            await self.aclose()
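
For reference, the frame that a reset with the CANCEL code puts on the wire is tiny. The following is a minimal standard-library sketch of the RST_STREAM layout from RFC 7540 (section 6.4); the helper name `encode_rst_stream` is hypothetical and is not part of httpcore or h2, which build this frame internally.

```python
import struct

RST_STREAM_TYPE = 0x3  # frame type, RFC 7540 section 6.4
CANCEL = 0x8           # error code CANCEL, RFC 7540 section 7


def encode_rst_stream(stream_id: int, error_code: int = CANCEL) -> bytes:
    """Serialize an RST_STREAM frame: 9-byte frame header + 4-byte error code."""
    if not 0 < stream_id < 2**31:
        raise ValueError("stream_id must be a non-zero 31-bit integer")
    payload = struct.pack(">I", error_code)
    length = len(payload)  # always 4 for RST_STREAM
    header = struct.pack(
        ">BHBBI",
        length >> 16,      # high byte of the 24-bit length
        length & 0xFFFF,   # low two bytes of the 24-bit length
        RST_STREAM_TYPE,
        0,                 # RST_STREAM defines no flags
        stream_id,         # reserved top bit stays 0
    )
    return header + payload


frame = encode_rst_stream(5)
print(frame.hex())  # prints "00000403000000000500000008"
```

Thirteen bytes per cancelled stream are enough to tell the server to stop producing events for it. Note that h2 only queues frames in its outgoing buffer; the bytes reach the server when the connection next writes out `data_to_send()`.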

We looked at HTTP/2 libraries in other languages and found that both implement the "reset the stream when the client cancels it" behavior.

e.g. 1: Go net http2 library
e.g. 2: Rust h2 library