Retry middleware does not handle other side closing unexpectedly
mitom opened this issue · 4 comments
I'm not entirely sure if it's a bug or if I'm doing something wrong, but hope you can help me out.
To reproduce I used a small server that will generate errors in a predictable way:
import Koa from 'koa'
import { PassThrough, Duplex, pipeline } from 'stream';
import { randomBytes, createHash } from 'crypto'

// Duplex stream that delays each chunk to throttle the transfer rate.
class Throttle extends Duplex {
  constructor(time) {
    super();
    this.delay = time;
  }
  _read() {}
  _write(chunk, encoding, callback) {
    this.push(chunk);
    setTimeout(callback, this.delay);
  }
  _final(callback) {
    this.push(null);
    callback();
  }
}

const app = new Koa();

let lastResult = true
const buf = []
for (let i = 0; i < 1024; i++) {
  buf.push(randomBytes(1024))
}

app.use(async ctx => {
  if (lastResult) {
    console.log("sending error")
    ctx.status = 502
  } else {
    console.log("sending file")
    ctx.attachment("test.file")
    // 1000 = ~1MB/s
    const throttle = new Throttle(10);
    const tunnel = new PassThrough()
    let sent = 0
    const start = new Date().getTime()
    tunnel.on("data", (chunk) => {
      sent += chunk.length
      const timePassed = (new Date().getTime() - start) / 1000
      const speed = sent / timePassed / 1024
      console.log(`Sent bytes: ${sent} at ${Math.round(speed)} KB/s`)
    })
    tunnel.on("end", () => {
      console.log("Pipe finished")
    })
    const blob = new Blob(buf)
    ctx.set("content-length", blob.size)
    ctx.set("x-md5", createHash('md5').update(await blob.text()).digest("hex"))
    ctx.body = tunnel
    const stream = blob.stream()
    pipeline(stream, throttle, tunnel, (err) => { console.log("err", err) })
    setTimeout(() => {
      ctx.res.destroy()
      console.log("forcefully aborted")
    }, 3000)
  }
  lastResult = !lastResult
});

app.listen(3002, () => console.log("started"));
The only dependency is koa (`npm i koa`). It does two things:
- alternate between sending a random 1MiB file at around 90KiB/s (the bandwidth throttle only ensures the stream doesn't end immediately, it has no other purpose) and returning a 502 error
- if it starts sending the file, it destroys the response stream after 3 seconds without sending a close packet or similar, i.e. replicating what would happen if the network dropped between the server and the client
To fetch the file, I use the simple client below.
import wretch from 'wretch'
import { retry } from "wretch/middlewares"
import ProgressAddon from "wretch/addons/progress"

async function main() {
  return wretch("http://localhost:3002")
    .addon(ProgressAddon())
    .middlewares([retry({ retryOnNetworkError: true })])
    .fetch()
    .progress((loaded, total) => {
      console.log(`progress ${(loaded / total * 100).toFixed(0)}%`)
    })
    .blob()
}

await main()
So what I expect to happen here is:
1. start download
2. server gives 502
3. client retries
4. server streams file
5. download progresses for 3s
6. server abruptly closes connection
7. client retries
8. server sends 502
9. client retries
10. server streams file again
11. repeat steps 5-10 indefinitely
What actually happens is:
1. start download
2. server gives 502
3. client retries
4. server streams file
5. download progresses for 3s
6. server abruptly closes connection
7. client gives up with this error:
node:internal/deps/undici/undici:11014
fetchParams.controller.controller.error(new TypeError("terminated", {
^
TypeError: terminated
at Fetch.onAborted (node:internal/deps/undici/undici:11014:53)
at Fetch.emit (node:events:519:28)
at Fetch.terminate (node:internal/deps/undici/undici:10200:14)
at Object.onError (node:internal/deps/undici/undici:11132:38)
at _Request.onError (node:internal/deps/undici/undici:7303:31)
at errorRequest (node:internal/deps/undici/undici:9863:17)
at Socket.onSocketClose (node:internal/deps/undici/undici:9018:9)
at Socket.emit (node:events:519:28)
at TCP.<anonymous> (node:net:337:12) {
[cause]: SocketError: other side closed
at Socket.onSocketEnd (node:internal/deps/undici/undici:8994:26)
at Socket.emit (node:events:531:35)
at endReadableNT (node:internal/streams/readable:1696:12)
at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
code: 'UND_ERR_SOCKET',
socket: {
localAddress: '::1',
localPort: 52736,
remoteAddress: '::1',
remotePort: 3002,
remoteFamily: 'IPv6',
timeout: undefined,
bytesWritten: 338,
bytesRead: 280000
}
}
}
Any advice? Thanks!
Hi @mitom,
> I'm not entirely sure if it's a bug or if I'm doing something wrong, but hope you can help me out.
I don't think you are doing anything wrong. It is caused by a limitation of the retry middleware, which only checks the initial response from the server (here a 200) to determine whether it should retry the request or not.
In this particular case the error is thrown by the `.blob()` method, which happens later on, while downloading the body.
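To make it concrete, the same behaviour can be reproduced with plain fetch against your server above (just an illustration, nothing wretch-specific): the promise resolves as soon as the headers arrive, so any status-based check has already passed by the time the truncated body throws.

// Illustration only: fetch resolves once the headers arrive, so a status check
// succeeds even though reading the body will fail later.
const response = await fetch("http://localhost:3002")
console.log(response.ok) // true – roughly all a status-based retry check gets to see
try {
  await response.blob() // connection destroyed mid-stream → throws here instead
} catch (err) {
  console.log("body read failed:", err) // TypeError: terminated / other side closed
}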
Thank you @elbywan, that makes sense. Do you have any suggestions on how to approach this?
What I can think of is to catch the error outside of wretch and trigger a new request through a new wretch instance until it completes (or some other condition is met). The downside is that this makes the retry middleware almost counterproductive for me, since the effective number of retries becomes the middleware's configured count multiplied by my external error handling, and it's difficult to tune the two together.
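Roughly, the outer loop I have in mind would look something like this (the helper name and the attempt cap are just for illustration), re-running the main() function from my client above on every failure:

// Sketch of the outer retry loop described above – not a recommendation,
// just what "catch the error outside of wretch" would look like.
async function downloadWithOuterRetry(maxAttempts = 5) {
  let lastError
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // every attempt rebuilds the whole wretch chain, retry middleware included
      return await main()
    } catch (err) {
      lastError = err
      console.log(`attempt ${attempt} failed:`, err.message)
    }
  }
  throw lastError
}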
I've looked into the addons, but from what I understand they don't really cater to this, and I'm not sure it's possible to handle an error in them when it's in the body itself. (Although my confidence in this statement is low, so feel free to correct me.)
I think a middleware may be able to do this, but I'm not sure it's intended for them (the docs say "Middlewares are functions that can intercept requests before being processed by Fetch", which reads to me like they're not meant to deal with this scenario).
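(For reference, from my reading of the docs a middleware just wraps the underlying fetch call, something like the sketch below with a made-up name, so it sees the Response as soon as the headers arrive but not the later body read.)

// Hypothetical middleware shape: it can observe the Response returned by fetch,
// but the body is only consumed later by .blob(), outside of this wrapper.
const logStatusMiddleware = () => next => async (url, opts) => {
  const response = await next(url, opts)
  // at this point only the headers have arrived
  console.log("middleware saw status", response.status)
  return response
}
// usage: .middlewares([logStatusMiddleware()])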
Is the best path to wrap the error handling around wretch, drop the retry addon and just create new wretch instances or is there something in wretch that I can use to handle these nicer?
> Is the best path to wrap the error handling around wretch, drop the retry addon and just create new wretch instances or is there something in wretch that I can use to handle these nicer?
I had another look at the problem this morning and came up with a fairly concise way of doing this by tweaking the retry middleware configuration a bit (adding an `until` argument):
import wretch from 'wretch'
import { retry } from "wretch/middlewares"
import ProgressAddon from "wretch/addons/progress"

async function main() {
  return wretch("http://localhost:3002")
    .addon(ProgressAddon())
    .middlewares([retry({
      retryOnNetworkError: true,
      until: async (response, error) => {
        if (!response?.ok || error) {
          // Response status !== 2xx or network error = retry
          return false
        }
        try {
          // Clone the body and buffer it ahead of time…
          const reader = response.clone().body.getReader();
          await (async function process({ done, value }) {
            if (done) return;
            return process(await reader.read());
          })(await reader.read());
          // No issue while reading the body = don't retry
          return true
        } catch (readError) {
          // Error while reading the body = retry
          return false
        }
      }
    })])
    .get()
    .progress((loaded, total) => {
      console.log(`progress ${(loaded / total * 100).toFixed(0)}%`)
    })
    .blob()
    .then(blob => console.log("blob size", blob.size))
    .catch(error => console.log("error", error))
}

await main()
I tested with node@20 (the 21.x implementation seems bugged and panics) and I could see the requests being retried when the connection gets closed. Let me know if this works for you 🤞 .
Thank you so much! That seems to work fine for me as well!