swift-server/async-http-client

Provide the Channel with the Response

cloewen8 opened this issue · 7 comments

Use Case

In addition to executing an HTTP request, I also need to send additional data to the server after the initial request and periodically after that.

I don't believe this is currently possible but would be required for my use case.

Potential Solution

One solution could be to provide the Connection as a property of the Response.

This is similar to a solution I used for OkHttp (an HTTP client for Android):

// Inside an Interceptor
Runnable nmeaProvider = new NMEAProvider(ntripReq, Objects.requireNonNull(chain.connection()).socket());

Willing to Contribute

This is my top priority. If my solution is approved, I would be happy to provide a pull request implementing it right away along with any required tests.

With the new async/await API you can provide an AsyncSequence to be used as the body of the request, using this constructor:

public static func stream<SequenceOfBytes: AsyncSequence & Sendable>(

This should allow you to publish data incrementally as needed. Note that it will still be HTTP-framed, so it does not allow arbitrary data sending.

Unfortunately AsyncSequence is unavailable (need to support iOS 12 minimum). If you know of any other way, please let me know. Glad to know it will be possible, thank you for your help.

Is the new data you have to send part of the HTTP body of your request?

Is the new data you have to send part of the HTTP body of your request?

Yes. The data needs to be sent after the HTTP head (as soon as possible), then roughly every 10~ seconds. Here is an example:

$GPGGA,111123.0,1220.70000000,N,06753.40600000,E,4,10,1.0,0.000,M,0.0,M,,*6B<CR><LF>

Interesting. So you can use the existing StreamWriter abstraction for this. If you set the Request body to .stream, that callback will be invoked with a StreamWriter. You can stream body parts into this StreamWriter (and thus into the request) by calling StreamWriter.write, which will return you a Future that completes when the write is flushed. From the callback that gives you the StreamWriter, you return a Future that signals that you're finished sending the body (you need to complete the associated promise when you're done).

Importantly, you can safely escape the StreamWriter and save it off somewhere. This will let you achieve your goal, I think.

You can see an example of using this here:

let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let client = HTTPClient(eventLoopGroupProvider: .shared(group))
defer {
XCTAssertNoThrow(try client.syncShutdown())
}
let headPromise = group.next().makePromise(of: HTTPRequestHead.self)
let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) }
let endPromise = group.next().makePromise(of: Void.self)
let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self)
let streamWriterPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter.self)
func makeServer() -> Channel? {
return try? ServerBootstrap(group: group)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(HTTPServer(headPromise: headPromise,
bodyPromises: bodyPromises,
endPromise: endPromise))
}
}
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
.bind(host: "127.0.0.1", port: 0)
.wait()
}
func makeRequest(server: Channel) -> Request? {
guard let localAddress = server.localAddress else {
return nil
}
return try? HTTPClient.Request(url: "http://\(localAddress.ipAddress!):\(localAddress.port!)",
method: .POST,
headers: ["transfer-encoding": "chunked"],
body: .stream { streamWriter in
streamWriterPromise.succeed(streamWriter)
return sentOffAllBodyPartsPromise.futureResult
})
}
guard let server = makeServer(), let request = makeRequest(server: server) else {
XCTFail("couldn't make a server Channel and a matching Request...")
return
}
defer {
XCTAssertNoThrow(try server.close().wait())
}
var buffer = ByteBufferAllocator().buffer(capacity: 1)
let runningRequest = client.execute(request: request)
guard let streamWriter = try? streamWriterPromise.futureResult.wait() else {
XCTFail("didn't get StreamWriter")
return
}
XCTAssertEqual(.POST, try headPromise.futureResult.wait().method)
for bodyChunkNumber in 0..<16 {
buffer.clear()
buffer.writeString(String(bodyChunkNumber, radix: 16))
XCTAssertEqual(1, buffer.readableBytes)
XCTAssertNoThrow(try streamWriter.write(.byteBuffer(buffer)).wait())
XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait())
}
sentOffAllBodyPartsPromise.succeed(())
XCTAssertNoThrow(try endPromise.futureResult.wait())
XCTAssertNoThrow(try runningRequest.wait())

Completely missed the stream option. Appears to be working