Response body stream callback invoked multiple times on write error
gwynne opened this issue · 2 comments
Credit to @omrd for reporting this issue.
Given the following route:
app.get { request in
    request.fileio.streamFile(at: "/any/valid/path", onCompleted: { result in
        print("File stream finished: \(result)")
    })
}
If the client cancels the request (i.e. closes the connection) before the file has been completely streamed, the output will be:
File stream finished: failure(read(descriptor:pointer:size:): Connection reset by peer (errno: 54))
File stream finished: failure(read(descriptor:pointer:size:): Connection reset by peer (errno: 54))
This occurs because the response body stream's callback is invoked multiple times by the HTTP server logic. The stream callback used by streamFile(at:...) (slightly edited to reduce code length) is:
response.body = .init(stream: { stream in
    self.read(path: path, fromOffset: offset, byteCount: byteCount, chunkSize: chunkSize) {
        stream.write(.buffer($0))
    }.whenComplete { result in
        switch result {
        case .failure(let error): stream.write(.error(error), promise: nil)
        case .success: stream.write(.end, promise: nil)
        }
        onCompleted(result)
    }
}, count: byteCount, byteBufferAllocator: request.byteBufferAllocator)
The sequence of events is:
- HTTPServerHandler.serialize(_:for:context:), in the case of a non-HTTP/2 client, writes the response to the channel and adds a whenComplete(_:) callback to the associated future. (We'll come back to this.)
- The next handler, HTTPServerResponseEncoder, sees that the response has a streaming body and invokes the stream callback with a ChannelResponseBodyStream that is created with the promise on which the HTTPServerHandler is "waiting":

      case .stream(let stream):
          let channelStream = ChannelResponseBodyStream(
              context: context,
              handler: self,
              promise: promise,
              count: stream.count == -1 ? nil : stream.count
          )
          stream.callback(channelStream)

- The stream callback starts writing the file contents to the ChannelResponseBodyStream, which in turn writes buffers it receives directly to the channel context. The client connection is closed, triggering an ECONNRESET, causing the write to the stream to fail.
- The failed write propagates through the NIO file I/O operation and triggers the whenComplete(_:) callback found inside the stream callback (within streamFile()), which performs case .failure(let error): stream.write(.error(error), promise: nil), then invokes the onCompleted callback.
- The body stream error causes the ChannelResponseBodyStream's promise to fail, which fails the done future from step 1 (I said we'd be coming back to that!). It then hits this logic:

      case .failure(let error):
          if case .stream(let stream) = response.body.storage {
              stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error))
          }
          self.errorCaught(context: context, error: error)

- The response body is still set to the same stream callback, which is now invoked a second time (this is the underlying issue), this time with an ErrorBodyStreamWriter, whose only function is to respond with the original error to everything the stream callback does.
- The stream callback, knowing no better (as it isn't required to be reentrant or idempotent and has no reason to guard against it), attempts the file read again. It fails immediately on the exact same code path, including invoking the onCompleted callback a second time.
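The sequence above can be reproduced in miniature without Vapor or NIO. The following is only a rough sketch: ToyBodyStreamWriter, ToyClosedChannelWriter, ToyErrorWriter and simulateDoubleInvocation are hypothetical stand-ins invented for illustration, not Vapor types, and the model ignores futures and event loops entirely.

```swift
// Toy model only: these types are stand-ins, not Vapor's real ones.
protocol ToyBodyStreamWriter {
    /// Returns false when the write fails.
    func write(_ chunk: String) -> Bool
}

/// Stands in for ChannelResponseBodyStream on a connection the client closed.
struct ToyClosedChannelWriter: ToyBodyStreamWriter {
    func write(_ chunk: String) -> Bool { false }
}

/// Stands in for ErrorBodyStreamWriter: every write fails.
struct ToyErrorWriter: ToyBodyStreamWriter {
    func write(_ chunk: String) -> Bool { false }
}

/// Runs the toy sequence and returns how often the completion handler fired.
func simulateDoubleInvocation() -> Int {
    var completions = 0

    // Plays the role of the stream callback built by streamFile(at:...).
    let streamCallback: (ToyBodyStreamWriter) -> Void = { writer in
        let succeeded = writer.write("file chunk")
        completions += 1 // the onCompleted equivalent
        print("File stream finished: \(succeeded ? "success" : "failure")")
    }

    // The encoder invokes the callback with the channel-backed writer.
    streamCallback(ToyClosedChannelWriter())
    // The failure path finds the same callback still stored on the
    // response and invokes it again, this time with the error writer.
    streamCallback(ToyErrorWriter())

    return completions
}
```

Calling simulateDoubleInvocation() runs the completion logic twice, mirroring the duplicated output shown earlier in this report.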
As far as I can see, the fact that Request.FileIO.streamFile(at:chunkSize:mediaType:onCompleted:) doesn't guard against a second invocation of the response's body stream callback (whether or not it is passed the same writer) is correct behavior. A response body stream callback expects to be invoked exactly once - for that matter, many of the scenarios which would benefit most from response streaming do so exactly because they aren't (necessarily) repeatable. This invariant is not documented, but is nonetheless implicitly part of the API contract.
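To make the "not repeatable" point concrete, here is a small sketch of a stream callback backed by a one-shot source. ChunkSink, makeOneShotCallback and drainTwice are hypothetical names made up for this example; the iterator merely stands in for something like a socket or pipe that can only be consumed once.

```swift
typealias ChunkSink = (String) -> Void

/// Builds a callback that drains a one-shot source into the given sink.
/// The iterator is captured by the closure, so its position persists
/// across invocations.
func makeOneShotCallback(chunks: [String]) -> (ChunkSink) -> Void {
    var iterator = chunks.makeIterator()
    return { sink in
        while let chunk = iterator.next() {
            sink(chunk)
        }
    }
}

/// Invokes the same callback twice and reports what each invocation saw.
func drainTwice() -> (first: [String], second: [String]) {
    let callback = makeOneShotCallback(chunks: ["a", "b", "c"])
    var first: [String] = []
    var second: [String] = []
    callback({ first.append($0) })
    callback({ second.append($0) }) // the source is already exhausted
    return (first, second)
}
```

The second invocation receives nothing, which is why such a callback can reasonably assume it will only ever be invoked once.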
The proximate cause of the bug is the change made in #2905, ironically - and all too appropriately - intended to ensure that a stream callback is always called at least once.
I'd tend to expect invoking a body stream callback to require performing the spiritual equivalent of a compare and swap (except we're on an event loop and don't actually need a lock, semaphore, whatever) - i.e.:
case .stream(let stream):
    response.body.storage = .none
    let channelStream = ChannelResponseBodyStream(
        context: context, handler: self, promise: promise,
        count: stream.count == -1 ? nil : stream.count
    )
    stream.callback(channelStream)
and:
if case .stream(let stream) = response.body.storage {
    response.body.storage = .none
    stream.callback(ErrorBodyStreamWriter(eventLoop: request.eventLoop, error: error))
}
However, I'm not clear on whether this would be 1) actually thread safe, 2) semantically correct, or, for that matter, 3) the best option available. @0xTim, can you weigh in?
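For discussion purposes, the take-and-clear idea can be modeled outside Vapor roughly as follows. ToyBodyStorage, ToyResponse, takeStreamCallback and simulateTakeAndClear are all hypothetical names invented for this sketch, and it assumes every access happens on a single thread (as it would on one event loop), so it does not settle the thread-safety or semantic questions raised above.

```swift
// Toy model only: none of these types are Vapor's real ones.
enum ToyBodyStorage {
    case none
    case stream((String) -> Void) // the callback receives a writer description
}

final class ToyResponse {
    var storage: ToyBodyStorage

    init(storage: ToyBodyStorage) {
        self.storage = storage
    }

    /// Hands out the stream callback at most once, clearing the storage
    /// before returning so a later caller finds nothing to invoke.
    func takeStreamCallback() -> ((String) -> Void)? {
        guard case .stream(let callback) = storage else { return nil }
        storage = .none
        return callback
    }
}

/// Runs the normal path followed by the failure path and returns the
/// writers the callback was actually invoked with.
func simulateTakeAndClear() -> [String] {
    var invocations: [String] = []
    let response = ToyResponse(storage: .stream({ writerName in
        invocations.append(writerName)
    }))

    // Normal path: the encoder takes the callback and invokes it.
    if let callback = response.takeStreamCallback() {
        callback("channel writer")
    }
    // Failure path: the storage is already cleared, so nothing is invoked.
    if let callback = response.takeStreamCallback() {
        callback("error writer")
    }
    return invocations
}
```

In this model the callback runs only with the channel writer. If the normal path had never taken the callback, the failure path would still invoke it once, which seems to line up with the "at least once" intent of #2905, though whether that carries over to the real handler chain is an open question.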