Errors on concurrent pipeline invocation
yahiro07 opened this issue · 3 comments
Hi. Thank you for the nice library. It helped me a lot.
In my project, pipeline operations are important for performance.
However, I'm seeing errors when pipelines are invoked in parallel.
Here is a minimal reproduction.
for (let i = 0; i < 10; i++) {
  const key = `list_${i}`;
  await redis.del(key);
  for (let j = 0; j < 10; j++) {
    await redis.lpush(key, Math.random().toString());
  }
}
async function task() {
  const pl = redis.pipeline();
  for (let i = 0; i < 10; i++) {
    pl.lrange(`list_${i}`, 0, -1);
  }
  return await pl.flush();
}
await Promise.all([task(), task()]);
error: Uncaught (in promise) Error: Invalid state: unknown code: '0' (48)
throw new InvalidStateError(
^
at readReply (https://deno.land/x/redis@v0.32.1/protocol/deno_streams/reply.ts:37:13)
at ...
I debugged this and found that the replies read from the TCP stream get mixed up when pipelines run concurrently.
I'm not sure this is the proper approach, but I made a fork that fixes it and I'm using it in my application.
In the fork, pipeline invocations are queued in the same way as regular commands.
I'm not sure whether this is helpful, since it relates to #412 and #416.
Is it worth adopting? I may be able to help with the implementation.
Please disregard this if you have better ideas for implementing auto-pipelining related features.
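As a rough illustration of the queueing idea (a minimal sketch, not the code in the fork or in the library): if every exchange with the connection, whether a single command or a whole pipeline flush, goes through one serial queue, a second flush cannot start reading replies until the first has finished. `SerialQueue`, `fakeFlush`, and `runDemo` are hypothetical names, and `fakeFlush` only simulates a write-then-read round trip with a timer.

```typescript
// Hypothetical helper, not part of the redis library:
// runs async jobs strictly one at a time, in submission order.
class SerialQueue {
  // Settles when the most recently enqueued job has settled; never rejects.
  private tail: Promise<unknown> = Promise.resolve();

  enqueue<T>(job: () => Promise<T>): Promise<T> {
    // Start this job only after all previously enqueued jobs are done.
    const result = this.tail.then(() => job());
    // Swallow rejections on the internal chain so one failed job
    // does not block the jobs queued behind it.
    this.tail = result.catch(() => undefined);
    return result;
  }
}

// Simulated demo: two "flushes" submitted concurrently never overlap.
async function runDemo(): Promise<{ results: number[]; maxActive: number }> {
  const queue = new SerialQueue();
  let active = 0;
  let maxActive = 0;

  // Stand-in for "write all pipeline commands, then read all replies".
  async function fakeFlush(id: number): Promise<number> {
    active++;
    maxActive = Math.max(maxActive, active);
    await new Promise<void>((resolve) => setTimeout(resolve, 5));
    active--;
    return id;
  }

  const results = await Promise.all([
    queue.enqueue(() => fakeFlush(1)),
    queue.enqueue(() => fakeFlush(2)),
  ]);
  return { results, maxActive };
}
```

In this sketch, `maxActive` stays at 1 because the second job starts only after the first settles. The trade-off is that concurrent callers wait on each other, which is needed anyway when they share one TCP connection.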
Thanks,
yahiro
@yahiro07 Oh, thanks for taking a look at this problem! Your changes look good. If you don't mind, could you open a PR for this issue? 🙏
Thank you for looking into it.
I'd be happy to open a pull request.
By the way, I noticed that my modification works well with Redis 7, but does not work with Redis 6.
I'd like to take a little more time to investigate this.
I've resolved the problem and submitted a PR.
Could you please review it?