cloudflare/pingora

Tokio Runtime Blocking When Handling Large Data Requests

Closed this issue · 3 comments

Describe the bug

When multiple concurrent requests are sent to the load balancer, the following behavior occurs:

  1. The first request is sent, and a peer is selected after acquiring a semaphore permit.
  2. A second request arrives and awaits the same semaphore in the upstream_peer method.
  3. With small payloads (a few bytes), the first request is processed in the background and completes. Its RequestContext is dropped, the semaphore is released, and the second request proceeds as expected (see the permit-lifecycle sketch after this list).
  4. With large payloads (approx. 64KB), the first request's I/O task appears to stall while the second request is blocked waiting for the semaphore. This prevents the background task from reading and proxying the data from the first request.
  5. Configuring the Tokio runtime with multiple worker threads does not resolve the issue.
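
To make step 3 concrete, here is a minimal, self-contained tokio sketch (no pingora involved; the names mirror the repro code below) showing that dropping the context that owns the permit is what releases it:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[derive(Default)]
struct RequestContext {
    semaphore: Option<OwnedSemaphorePermit>,
}

#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(1));

    // Acquire the single permit and stash it in the per-request context.
    let ctx = RequestContext {
        semaphore: Some(sem.clone().acquire_owned().await.unwrap()),
    };
    assert_eq!(sem.available_permits(), 0); // permit is held by ctx

    // When the request finishes, its context is dropped and the permit is released.
    drop(ctx);
    assert_eq!(sem.available_permits(), 1);
}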

Pingora info

Pingora version: 0.5.0
Rust version: cargo 1.88.0 (873a06493 2025-05-10)
Operating system version: Ubuntu 24.04.2

Steps to reproduce

  1. Use the load balancer example from the quick start guide, modified to use a tokio::sync::Semaphore to simulate blocking peer selection (a sketch of the server wiring follows these steps):
use std::net::ToSocketAddrs;
use std::sync::Arc;

use async_trait::async_trait;
// Imports assumed from the quick start: the prelude provides ProxyHttp, Session,
// HttpPeer, LoadBalancer, RoundRobin, and Result.
use pingora::prelude::*;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

pub struct LB(Arc<LoadBalancer<RoundRobin>>, Arc<Semaphore>);

impl LB {
    pub fn new(lb: Arc<LoadBalancer<RoundRobin>>) -> Self {
        // A single permit, so only one request at a time can select a peer.
        Self(lb, Arc::new(Semaphore::new(1)))
    }
}

#[derive(Default)]
pub struct RequestContext {
    // Holding the permit in the per-request context ties its lifetime to the
    // request: when the context is dropped, the permit is released.
    pub semaphore: Option<OwnedSemaphorePermit>,
}

#[async_trait]
impl ProxyHttp for LB {
    type CTX = RequestContext;

    fn new_ctx(&self) -> Self::CTX {
        RequestContext::default()
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        // Simulate blocking peer selection: park here until the permit is free.
        ctx.semaphore = Some(self.1.clone().acquire_owned().await.unwrap());
        log::info!("Semaphore acquired for backend selection");

        let backend = self.0.select(b"", 256).unwrap();
        let peer = http2_peer(&backend.addr);

        Ok(peer)
    }
}

pub fn http2_peer<A: ToSocketAddrs>(address: A) -> Box<HttpPeer> {
    let mut peer = Box::new(HttpPeer::new(address, false, String::new()));
    // Force HTTP/2 for the upstream connection (max and min version both 2).
    peer.options.set_http_version(2, 2);

    peer
}
  2. Send one request, followed immediately by a second request.
  3. Observe that with a small payload, the first request completes, releasing the semaphore and allowing the second request to be processed.
  4. Replace the payload with a large (e.g., 64KB) request body. Observe that the first request stalls while the second request waits to acquire the semaphore.
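
For completeness, a minimal sketch of how the LB above might be wired into a server, following the quick start guide's pattern; the listen and upstream addresses are taken from the logs below, and the exact Server::new signature may differ slightly between versions:

use std::sync::Arc;

use pingora::prelude::*;

fn main() {
    let mut server = Server::new(None).unwrap();
    server.bootstrap();

    // A single upstream is enough to reproduce; 0.0.0.0:8002 matches the logs below.
    let upstreams = LoadBalancer::try_from_iter(["0.0.0.0:8002"]).unwrap();
    let lb = LB::new(Arc::new(upstreams));

    let mut proxy = http_proxy_service(&server.configuration, lb);
    proxy.add_tcp("0.0.0.0:8000"); // the address the gRPC client connects to
    server.add_service(proxy);
    server.run_forever();
}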

Expected results

The Tokio runtime should concurrently process the first request's I/O in the background, regardless of payload size. This would allow the request to complete, release its semaphore permit, and promptly unblock the second request.

Observed results

With a large payload, the first request is not proxied in the background while the second request waits for the semaphore, causing a deadlock.

Additional context

I am proxying gRPC requests over HTTP/2.
Notably, in rare cases, two large requests both finish immediately.
Without the semaphore, large requests always work as expected.

Attached below are the debug logs for sending two parallel requests with both large and small payloads.

Large payload logs (stuck)

INFO  pingora_core::server] Bootstrap starting
DEBUG pingora_core::server] Some(
        Opt {
            upgrade: false,
            daemon: false,
            nocapture: false,
            test: false,
            conf: None,
        },
    )
INFO  pingora_core::server] Bootstrap done
INFO  pingora_core::server] Server starting
INFO  pingora_core::server] Server starting
DEBUG pingora_core::services::listening] new event!
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x0) }
DEBUG h2::proto::connection] Connection; peer=Server
DEBUG pingora_core::protocols::http::v2::server] H2 handshake done.
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 0, initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 16384 }
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x1: ACK) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 4128769 }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x1: ACK) }
DEBUG h2::proto::settings] received settings ACK; applying Settings { flags: (0x0) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(1), size_increment: 5 }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(3), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(3), size_increment: 5 }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 10 }
DEBUG pingora_proxy] Successfully get a new request
INFO  my_proxy::downstream] New request: POST /inference.GRPCInferenceService/ModelInfer, Host: 0.0.0.0:8000
INFO  my_proxy::downstream] Semaphore acquired for backend selection
DEBUG pingora_proxy] Successfully get a new request
INFO  my_proxy::downstream] New request: POST /inference.GRPCInferenceService/ModelInfer, Host: 0.0.0.0:8000
DEBUG pingora_core::connectors::l4] connected to new server: 0.0.0.0:8002
DEBUG h2::client] binding client connection
DEBUG h2::client] client connection bound
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 1, initial_window_size: 8388608, max_frame_size: 65536 }
DEBUG h2::proto::connection] Connection; peer=Client
DEBUG pingora_core::connectors::http::v2] H2 handshake to server done.
DEBUG pingora_proxy::proxy_h2] Request to h2: Parts { method: POST, uri: http://0.0.0.0:8000/inference.GRPCInferenceService/ModelInfer, version: HTTP/2.0, headers: {"content-type": "application/grpc", "te": "trailers", "grpc-accept-encoding": "identity, deflate, gzip", "user-agent": "grpc-python/1.57.0 grpc-c/34.0.0 (linux; chttp2)"} }
DEBUG pingora_proxy::proxy_h2] send END_STREAM on HEADERS: true
DEBUG h2::codec::framed_write] send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 8323073 }
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x0), initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 16384 }
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x1: ACK) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 4128769 }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x1: ACK) }
DEBUG h2::proto::settings] received settings ACK; applying Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 1, initial_window_size: 8388608, max_frame_size: 65536 }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(1), size_increment: 5 }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 5 }

Small payload logs (works as expected)

INFO  pingora_core::server] Bootstrap starting
DEBUG pingora_core::server] Some(
        Opt {
            upgrade: false,
            daemon: false,
            nocapture: false,
            test: false,
            conf: None,
        },
    )
INFO  pingora_core::server] Bootstrap done
INFO  pingora_core::server] Server starting
INFO  pingora_core::server] Server starting
DEBUG pingora_core::services::listening] new event!
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x0) }
DEBUG h2::proto::connection] Connection; peer=Server
DEBUG pingora_core::protocols::http::v2::server] H2 handshake done.
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 0, initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 16384 }
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x1: ACK) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 4128769 }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x1: ACK) }
DEBUG h2::proto::settings] received settings ACK; applying Settings { flags: (0x0) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(1), size_increment: 5 }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1), flags: (0x1: END_STREAM) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(3), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(3), size_increment: 5 }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(3), flags: (0x1: END_STREAM) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 10 }
DEBUG pingora_proxy] Successfully get a new request
INFO  my_proxy::downstream] New request: POST /inference.GRPCInferenceService/ModelInfer, Host: 0.0.0.0:8000
INFO  my_proxy::downstream] Semaphore acquired for backend selection
DEBUG pingora_proxy] Successfully get a new request
INFO  my_proxy::downstream] New request: POST /inference.GRPCInferenceService/ModelInfer, Host: 0.0.0.0:8000
DEBUG pingora_core::connectors::l4] connected to new server: 0.0.0.0:8002
DEBUG h2::client] binding client connection
DEBUG h2::client] client connection bound
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 1, initial_window_size: 8388608, max_frame_size: 65536 }
DEBUG h2::proto::connection] Connection; peer=Client
DEBUG pingora_core::connectors::http::v2] H2 handshake to server done.
DEBUG pingora_proxy::proxy_h2] Request to h2: Parts { method: POST, uri: http://0.0.0.0:8000/inference.GRPCInferenceService/ModelInfer, version: HTTP/2.0, headers: {"content-type": "application/grpc", "te": "trailers", "grpc-accept-encoding": "identity, deflate, gzip", "user-agent": "grpc-python/1.57.0 grpc-c/34.0.0 (linux; chttp2)"} }
DEBUG pingora_proxy::proxy_h2] send END_STREAM on HEADERS: true
DEBUG pingora_proxy::proxy_h2] downstream event
DEBUG pingora_proxy::proxy_h2] Write 3119 bytes body to h2 upstream
DEBUG h2::codec::framed_write] send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 8323073 }
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(1), flags: (0x1: END_STREAM) }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x0), initial_window_size: 4194304, max_frame_size: 4194304, max_header_list_size: 16384 }
DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x1: ACK) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 4128769 }
DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x1: ACK) }
DEBUG h2::proto::settings] received settings ACK; applying Settings { flags: (0x0), enable_push: 0, max_concurrent_streams: 1, initial_window_size: 8388608, max_frame_size: 65536 }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 3119 }
DEBUG h2::codec::framed_read] received frame=Ping { ack: false, payload: [0, 0, 0, 0, 0, 0, 0, 0] }
DEBUG h2::codec::framed_write] send frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 0, 0] }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }
DEBUG pingora_proxy::proxy_h2] upstream event: Header(ResponseHeader { base: Parts { status: 200, version: HTTP/2.0, headers: {"content-type": "application/grpc", "grpc-accept-encoding": "identity, deflate, gzip"} }, header_name_map: None, reason_phrase: None }, false)
DEBUG pingora_proxy::proxy_h2] Parsing response trailers..
DEBUG pingora_proxy::proxy_h2] finished sending body to downstream
INFO  my_proxy::downstream] Semaphore acquired for backend selection
DEBUG pingora_proxy::proxy_h2] Request to h2: Parts { method: POST, uri: http://0.0.0.0:8000/inference.GRPCInferenceService/ModelInfer, version: HTTP/2.0, headers: {"content-type": "application/grpc", "te": "trailers", "grpc-accept-encoding": "identity, deflate, gzip", "user-agent": "grpc-python/1.57.0 grpc-c/34.0.0 (linux; chttp2)"} }
DEBUG pingora_proxy::proxy_h2] send END_STREAM on HEADERS: true
DEBUG pingora_proxy::proxy_h2] downstream event
DEBUG pingora_proxy::proxy_h2] Write 3119 bytes body to h2 upstream
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(3), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(3) }
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(3), flags: (0x5: END_HEADERS | END_STREAM) }
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(3), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(3), flags: (0x1: END_STREAM) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(3), size_increment: 3108 }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 3108 }
DEBUG h2::codec::framed_read] received frame=Ping { ack: false, payload: [0, 0, 0, 0, 0, 0, 0, 0] }
DEBUG h2::codec::framed_write] send frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 0, 0] }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(3), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(3) }
DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(3), flags: (0x5: END_HEADERS | END_STREAM) }
DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 3119 }
DEBUG pingora_proxy::proxy_h2] upstream event: Header(ResponseHeader { base: Parts { status: 200, version: HTTP/2.0, headers: {"content-type": "application/grpc", "grpc-accept-encoding": "identity, deflate, gzip"} }, header_name_map: None, reason_phrase: None }, false)
DEBUG pingora_proxy::proxy_h2] Parsing response trailers..
DEBUG pingora_proxy::proxy_h2] finished sending body to downstream
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(1) }
DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(1), flags: (0x5: END_HEADERS | END_STREAM) }
DEBUG pingora_core::protocols::l4::stream] Dropping socket Some(BufStream { inner: BufReader { reader: BufWriter { writer: RawStreamWrapper { stream: Tcp(PollEvented { io: Some(TcpStream { addr: 127.0.0.1:8000, fd: 14 }) }), rx_ts: None, enable_rx_ts: false, reusable_cmsg_space: [] }, buffer: 0/1460, written: 0 }, buffer: 0/65536 } })
DEBUG h2::codec::framed_read] received frame=Ping { ack: false, payload: [0, 0, 0, 0, 0, 0, 0, 1] }
DEBUG h2::codec::framed_write] send frame=Ping { ack: true, payload: [0, 0, 0, 0, 0, 0, 0, 1] }

ChatGPT-5 identified a fix on the 3rd attempt: increase the HTTP/2 flow-control windows on the downstream server side in pingora-proxy so one large request body doesn’t stall others on the same connection. This avoids semaphore-induced stalling around ~64KB payloads by keeping background H2 tasks progressing.

The change is localized to pingora-proxy:

  • In pingora-proxy/src/lib.rs, import H2Options and set H2_WINDOW_SIZE = 8 MiB.
  • Override HttpServerApp::h2_options() for HttpProxy to set:
    • initial_connection_window_size(H2_WINDOW_SIZE)
    • initial_window_size(H2_WINDOW_SIZE)

This raises both the connection-level and stream-level H2 windows. The ~64KB threshold is telling: HTTP/2's default initial flow-control window is 65,535 bytes, and the connection-level window is shared by every stream on the connection, so a large body left unread while its request waits on the semaphore can exhaust it and block the other stream's data. Raising both windows prevents this flow-control blocking when one large request is being proxied while another awaits the semaphore.
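
A minimal sketch of the override, assuming H2Options is pingora_core's re-export of h2::server::Builder and that the method is added to pingora-proxy's HttpServerApp implementation for HttpProxy (the exact import path may differ by version):

// Added inside `impl<SV> HttpServerApp for HttpProxy<SV>` in pingora-proxy/src/lib.rs.
use pingora_core::protocols::http::v2::server::H2Options; // re-export of h2::server::Builder

const H2_WINDOW_SIZE: u32 = 8 * 1024 * 1024; // 8 MiB, far above h2's 65,535-byte default

fn h2_options(&self) -> Option<H2Options> {
    let mut opts = H2Options::new();
    opts.initial_window_size(H2_WINDOW_SIZE) // per-stream window
        .initial_connection_window_size(H2_WINDOW_SIZE); // window shared by all streams
    Some(opts)
}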

I can confirm that it works in my case.

This issue has been stale for a week. It will be closed in one more day if not updated.

This issue has been closed because it has been stalled with no activity.