denodrivers/redis

Errors on concurrent pipeline invocation

yahiro07 opened this issue · 3 comments

Hi. Thank you for the nice library. It helped me a lot.

In my project, pipeline operations are important for performance.
However, I'm getting errors when pipelines are invoked in parallel.

Here is a minimal reproduction.

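  // Setup added for completeness (assumed, not part of the original report):
  // a Redis server listening on 127.0.0.1:6379.
  import { connect } from "https://deno.land/x/redis@v0.32.1/mod.ts";
  const redis = await connect({ hostname: "127.0.0.1", port: 6379 });

  // Seed ten lists with ten random values each.
  for (let i = 0; i < 10; i++) {
    const key = `list_${i}`;
    await redis.del(key);
    for (let j = 0; j < 10; j++) {
      await redis.lpush(key, Math.random().toString());
    }
  }

  // Queue ten LRANGE commands on one pipeline and send them together.
  async function task() {
    const pl = redis.pipeline();
    for (let i = 0; i < 10; i++) {
      pl.lrange(`list_${i}`, 0, -1);
    }
    return await pl.flush();
  }

  // Two concurrent flushes on the same connection trigger the error below.
  await Promise.all([task(), task()]);

error: Uncaught (in promise) Error: Invalid state: unknown code: '0' (48)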
      throw new InvalidStateError(
            ^
    at readReply (https://deno.land/x/redis@v0.32.1/protocol/deno_streams/reply.ts:37:13)
    at ...
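
For context on the message: in the Redis serialization protocol (RESP2), every reply begins with a one-byte type marker (`+`, `-`, `:`, `$`, or `*`). Character code 48 is the digit `0`, which is not one of these markers, so the reader was most likely positioned in the middle of another reply. The sketch below only illustrates that check; `classifyReplyByte` and `RESP2_TYPES` are hypothetical names, not the library's code.

```typescript
// Hypothetical illustration: map the first byte of a RESP2 reply to its type.
const RESP2_TYPES: Record<string, string> = {
  "+": "simple string",
  "-": "error",
  ":": "integer",
  "$": "bulk string",
  "*": "array",
};

function classifyReplyByte(code: number): string {
  const ch = String.fromCharCode(code);
  const kind = RESP2_TYPES[ch];
  if (kind === undefined) {
    // Message shape copied from the error reported in this issue.
    throw new Error(`Invalid state: unknown code: '${ch}' (${code})`);
  }
  return kind;
}
```

A reader that starts at a valid reply boundary always sees one of the five markers, so a digit here suggests that two readers consumed bytes from the same stream.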

I debugged this and found that replies on the TCP receive stream get mixed up between concurrent pipeline calls.

I'm not sure this is the right approach, but I made a fork that fixes it and I'm using it in my application.
In the fork, pipeline invocations are queued in the same way as ordinary commands.
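
The queueing idea can be sketched roughly as follows. This is not the code from the fork: `SerialQueue`, `FakeConnection`, and `demo` are hypothetical names, and the fake connection only simulates a shared reply stream in memory, on the assumption that each flush writes its commands and then reads the same number of replies.

```typescript
// Hypothetical sketch: run jobs on a shared connection strictly one at a time.
class SerialQueue {
  private tail: Promise<unknown> = Promise.resolve();

  // Starts `job` only after every previously enqueued job has settled.
  enqueue<T>(job: () => Promise<T>): Promise<T> {
    const result = this.tail.then(job);
    // Keep the chain alive even if this job rejects.
    this.tail = result.catch(() => undefined);
    return result;
  }
}

// Hypothetical stand-in for a connection with a single ordered reply stream.
class FakeConnection {
  private pending: string[] = [];

  async flush(tag: string, count: number): Promise<string[]> {
    // "Write" all commands; the fake server queues one reply per command.
    for (let i = 0; i < count; i++) this.pending.push(`${tag}${i}`);
    const replies: string[] = [];
    for (let i = 0; i < count; i++) {
      await Promise.resolve(); // yield between reads, as socket I/O would
      const reply = this.pending.shift();
      if (reply === undefined) throw new Error("no reply available");
      replies.push(reply);
    }
    return replies;
  }
}

async function demo(): Promise<{ unqueued: string[][]; queued: string[][] }> {
  const conn = new FakeConnection();
  // Without a queue, both flushes read from the stream at the same time.
  const unqueued = await Promise.all([conn.flush("a", 2), conn.flush("b", 2)]);
  // With a queue, the second flush starts only after the first has finished.
  const queue = new SerialQueue();
  const queued = await Promise.all([
    queue.enqueue(() => conn.flush("a", 2)),
    queue.enqueue(() => conn.flush("b", 2)),
  ]);
  return { unqueued, queued };
}
```

In the queued case each caller receives only its own replies, at the cost of running pipelines on one connection sequentially rather than concurrently.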

yahiro07@a334f96

I'm not sure whether this is helpful, since it relates to #412 and #416.

Is it worth adopting? I may be able to help with the implementation.
But please disregard this if you have a better plan for implementing auto-pipelining features.

Thanks,
yahiro

@yahiro07 Oh, thanks for taking a look at this problem! Your changes look good. If you don't mind, could you open a PR for this issue? 🙏

Thank you for looking into it.
I'd like to open a pull request.
By the way, I noticed that my modification works with Redis 7 but not with Redis 6.
I'd like to take a little more time to investigate this.

I've resolved the problem and submitted a PR.
Please review it when you have a chance.