googleapis/nodejs-pubsub

Messages not emitted in time for processing, resulting in failed ack deadline extensions and redeliveries

vikmovcan opened this issue · 6 comments

Description

Hello, I've been experiencing message redeliveries when using a subscription with exactly-once delivery enabled and publishing messages in batches. It is not clear whether this is a client or a product issue, so I first wanted to make sure there is no misunderstanding or misconfiguration on my side.

Environment details

  • OS: macOS 13
  • Node.js version: 20.8.1
  • npm version: 8.1.0
  • @google-cloud/pubsub version: v4.0.5

Steps to reproduce

  1. Create a pull subscription with exactly-once delivery enabled and an ack deadline of 60 seconds, e.g.
   import { PubSub, Duration, Message, PublishOptions } from "@google-cloud/pubsub";

   const projectId = "some-project-id";
   const topicNameOrId = "test-topic";
   const pubsub = new PubSub({ projectId });
   const topic = pubsub.topic(topicNameOrId);
   const subscriptionName = "eod-with-batching";

   await topic.createSubscription(subscriptionName, {
     enableExactlyOnceDelivery: true,
     ackDeadlineSeconds: 60,
   });
  2. Set up a subscription with the following configuration (I intend to extend the lease manually):
  const maxMessages = 300; // a couple of hundred should suffice
  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: maxMessages,
      maxExtensionMinutes: 0, // disable automatic lease extension; deadlines are extended manually in the handler
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // issue more prominent with more than one stream
    },
  });
  3. Add a message handler that, for each received message, sets an interval to extend the ack deadline by 120 seconds every 60 seconds and then awaits the completion of a long-running async action.
  const waitFor = (ms: number) =>
    new Promise((resolve) => {
      setTimeout(() => {
        resolve(0);
      }, ms);
    });

  const messageMap: Record<string, Message> = {};

  subscription.on("message", async (message: Message) => {
    if (messageMap[message.id]) {
      console.log(`message ${message.id} unexpectedly redelivered. delivery attempt: ${message.deliveryAttempt}`);
    }

    messageMap[message.id] = message;

    // extend the ack deadline by 120 seconds every 60 seconds while the message is being processed
    const interval = setInterval(async () => {
      try {
        await message.modAckWithResponse(120);
      } catch (e) {
        console.log("error", e);
      }
    }, 1000 * 60);

    // simulate a long-running async action so the message stays outstanding
    await waitFor(1000 * 60 * 1200);
    clearInterval(interval);
  });
  4. Create and publish messages (note: redeliveries occur with or without batching)
  const messagesToPublish: string[] = [];

  // ensure some messages are in the backlog
  const numberOfBatchedMessages = maxMessages * 3;

  for (let i = 0; i < numberOfBatchedMessages; i++) {
    messagesToPublish.push(
      `Test message ${i + 1} at ${new Date().toISOString()}`
    );
  }

  const publishOptions: PublishOptions = {
    batching: {
      maxMessages: 5,
      maxMilliseconds: 100,
    },
  };

  const batchPublisher = pubsub.topic(topicNameOrId, publishOptions);

  const promises = messagesToPublish.map((m) => {
    return batchPublisher.publishMessage({
      data: Buffer.from(m),
    });
  });

  const publishedMessages = await Promise.all(promises);
  console.log(`published ${publishedMessages.length} messages`);
  5. Wait for redeliveries, which start to happen within minutes of running the example above. Note that the issue is intermittent.

As I understand the exactly-once delivery documentation (https://cloud.google.com/pubsub/docs/exactly-once-delivery), as long as

  1. the message is being processed,
  2. the message's ack deadline keeps being extended, and
  3. the message is not negatively acked,

the message should not get redelivered (but it is).

Is there something I am missing or doing incorrectly, or is this expected behaviour?
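
For completeness, this is roughly how I expect failed deadline extensions to surface under exactly-once delivery. The sketch below assumes the AckError type and its errorCode field exported by @google-cloud/pubsub; the extendDeadline helper name is mine:

  import { Message, AckError } from "@google-cloud/pubsub";

  // Sketch: a single manual deadline extension with the ack response surfaced.
  async function extendDeadline(message: Message): Promise<void> {
    try {
      // With exactly-once delivery this resolves only once the service confirms the modAck.
      await message.modAckWithResponse(120);
    } catch (e) {
      if (e instanceof AckError) {
        // e.g. an "INVALID" error code once the ack ID has expired and the message is queued for redelivery
        console.log(`modAck for ${message.id} failed: ${e.errorCode}`);
      } else {
        throw e;
      }
    }
  }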

After investigating this further, I narrowed the issue down (I will update the title to match).
Some messages that the library has already received internally are not emitted for processing immediately (i.e. the message event callback is not triggered). I identified this by comparing each message's received timestamp against the time it actually entered the message event callback.
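
For reference, a minimal sketch of the check I used; message.received is the client-side receipt timestamp in milliseconds, and the 5-second threshold is an arbitrary value I picked:

  subscription.on("message", (message: Message) => {
    // message.received is set when the client library takes the message off the stream,
    // so a large gap here means the message was held inside the library before being emitted
    const emitDelayMs = Date.now() - message.received;
    if (emitDelayMs > 5000) {
      console.log(`message ${message.id} was held internally for ${emitDelayMs} ms before being emitted`);
    }
    // ...normal processing continues here
  });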

The issue can be reproduced given the following:

a) a topic with a few hundred messages
b) a subscription using streaming pull e.g.

  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: tripleDigitMaxMessages, // e.g. 300, as in the repro above
      maxExtensionMinutes: 0,
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // any value greater than 1
    },
  });

c) message processing can take minutes

While this behaviour makes sense if you consider the maxMessages flow-control value, it is not clear why
a) the maxMessages value gets exceeded (is this value applied per stream?): not immediately, but over time the number of messages being processed reaches maxMessages multiplied by the number of streams
b) the "excess" messages are only emitted after a (sometimes minutes-long) delay
c) the library internally obtains and holds onto messages that may exceed their ack deadline soon after finally being emitted

If I want to process, say, at most 500 messages at a time with the above subscription config, and manually extend each message's ack deadline upon receipt, without running into the problem described above, what options are there? The only one I currently see is restricting the number of streams to 1 (sketched below), which eliminates the possibility of "excess" messages not being emitted or their ack deadlines not being extended.
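
A minimal sketch of that single-stream workaround, assuming the same topic, subscriptionName and Duration import as above (the maxMessages value of 500 is just the example figure from my question):

  const singleStreamSubscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: 500,
      maxExtensionMinutes: 0, // leases are still extended manually in the message handler
    },
    maxAckDeadline: Duration.from({ seconds: 90 }),
    minAckDeadline: Duration.from({ seconds: 90 }),
    streamingOptions: {
      maxStreams: 1, // a single stream, so no "excess" messages are buffered behind other streams
    },
  });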