openedx/event-bus-kafka

[Consumer] Adding a replay timestamp causes the consumer to replay from the same timestamp multiple times

rgraber opened this issue · 9 comments

AC:
- [ ] Confirm that events were replayed multiple times during a single replay because partitions were being reassigned. (Splunk and/or New Relic should help. If we confirm that this is not the case, let's discuss next steps.)
- [ ] Implement the proposal: when a timestamp is provided, reset the offsets based on that timestamp, then enter a sleep loop that logs and does not process any further messages.
- [ ] Update the runbook as needed.

When replaying events in prod and stage, adding a timestamp to the management command seems to make the consumer repeatedly restart from that timestamp.
To see an example of the same pod resetting its offsets several times in a row, run this Splunk search for 1/9/2023: index="stage-edx" host="ip-10-3-180-69.ec2.internal" found offset
It is unclear whether this is the pod restarting multiple times, the command restarting within the pod, or Kafka unexpectedly calling the on_assignment callback that resets the offsets.
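The third possibility would produce exactly this symptom: if the offset reset lives in an assignment callback, it runs on every rebalance, not only at startup. The sketch below models that in plain Python. FakePartition and lookup_offset are hypothetical stand-ins for confluent_kafka's TopicPartition and Consumer.offsets_for_times, the real on_assign callback (registered via Consumer.subscribe) also receives the consumer as its first argument, and none of this is the event-bus-kafka implementation.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakePartition:
    """Hypothetical stand-in for confluent_kafka.TopicPartition."""
    topic: str
    partition: int
    offset: int  # position the consumer will read from next


def make_on_assign(offset_time_ms: Optional[int],
                   lookup_offset: Callable[[FakePartition, int], int],
                   reset_log: List[int]) -> Callable[[List[FakePartition]], None]:
    """Build an assignment callback that rewinds partitions to a timestamp.

    When offset_time_ms is set, EVERY invocation rewinds each assigned partition;
    the callback cannot tell an initial assignment from a later rebalance.
    """
    def on_assign(partitions: List[FakePartition]) -> None:
        if offset_time_ms is None:
            return  # normal mode: resume from committed offsets
        for p in partitions:
            p.offset = lookup_offset(p, offset_time_ms)
            reset_log.append(p.partition)
    return on_assign


# Pretend the replay timestamp maps to offset 100 on every partition.
resets: List[int] = []
callback = make_on_assign(1_673_222_400_000, lambda p, ts: 100, resets)
parts = [FakePartition("events", 0, 500), FakePartition("events", 1, 500)]
callback(parts)          # initial assignment: rewind to 100
for p in parts:
    p.offset = 400       # the replay makes progress
callback(parts)          # a rebalance hands back the SAME partitions
print([p.offset for p in parts], len(resets))  # [100, 100] 4
```

Even though neither partition changed owner, the second callback undid the replay's progress.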

  • I assume this is meant to be a discovery ticket, with an AC to create a follow-up ticket for the fix?
  • I think we should resolve this sooner rather than later, but with a timeboxed effort. I want people to feel confident when using the event bus, and this won't inspire confidence in the midst of a replay event, which may already be scary.
  • We think we have enough info in New Relic and the logs to determine whether this is just a partition reassignment problem, but we have not yet confirmed it.
  • If this is the issue, can we send different config in response to a reassignment than on the initial assignment?

[idea] What if we never poll for messages when we are resetting offsets? We could either exit, or loop with an info log saying that message processing is being skipped until the container is restarted without the offset change.

It may be safer to find a way to not exit, but to not commit any new offsets.
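A minimal sketch of the idle-instead-of-exit variant, assuming a hypothetical consume_loop whose poll_and_process argument stands in for the real poll/handle/commit step (none of these names come from event-bus-kafka). One caveat worth verifying against the confluent_kafka documentation: rebalance callbacks such as on_assign are served from inside Consumer.poll(), so a loop that never polls may never receive an assignment at all. The offset reset itself would then have to happen, and be committed, before the loop goes idle.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def consume_loop(offset_time_ms: Optional[int],
                 poll_and_process: Callable[[], None],
                 sleep_seconds: float = 30.0,
                 max_iterations: Optional[int] = None,
                 sleep: Callable[[float], None] = time.sleep) -> int:
    """Hypothetical consume loop; returns how many iterations ran.

    In reset mode (offset_time_ms is set) the loop never calls poll_and_process,
    so it consumes nothing and commits nothing; it idles instead of exiting.
    max_iterations and sleep are injectable only to make the sketch testable.
    """
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        if offset_time_ms is not None:
            logger.info(
                "Offsets were reset to timestamp %s; skipping message processing "
                "until the consumer is restarted without an offset timestamp.",
                offset_time_ms,
            )
            sleep(sleep_seconds)
            continue
        poll_and_process()  # normal mode: poll, handle, commit
    return iterations


processed = []
naps = []
print(consume_loop(1_673_222_400_000, lambda: processed.append("msg"),
                   sleep_seconds=5.0, max_iterations=3, sleep=naps.append))  # 3
print(processed, naps)  # [] [5.0, 5.0, 5.0]
```

Staying alive rather than exiting avoids a crash-restart loop in k8s, which would otherwise re-run the command with the same timestamp.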

New observations:

  • During the cluster of repeated replays we saw on Dec 22, the pod count went as high as 6. We normally run 3.
    • Our cluster is configured for autoscaling between [EDIT] 3-6 pods with a target of 100% CPU utilization.
  • Some pods persisted through the hour-long incident and others did not; new pods appeared, and some of them (or others) later disappeared.
  • Each of the three inadvertent replay events begins with partition reassignments (seen in the logs as a "Found offsets for timestamp" message listing one or more partitions), followed by new workers starting (seen as "Running consumer for messages"). Some reassignments seem pointless, giving a partition to the worker that already had it, but others show doubling-up.

Log extract: replay107logs.txt

So, new theory:

  1. During a replay, CPU load increases. K8s scales the cluster out to 6 pods, and partitions are reassigned from 2 per pod to 1 per pod (as we have 6 partitions on that topic).
  2. Each reassignment causes the replay to start over, but because this happens immediately after deployment, things settle down quickly.
  3. A few minutes after the replay finishes, CPU utilization has dropped, so k8s scales the cluster back in to 3 pods or so.
  4. The rescaling means that some partitions are no longer assigned to any consumer, so the broker reassigns them.
  5. Each reassignment again resets the offsets. CPU load increases; rinse and repeat.
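A toy model of steps 1 to 5 makes the arithmetic concrete. Assumptions: round-robin assignment stands in for Kafka's real assignor, pods are treated as stable numbered slots (real pods get new identities), and the offset reset runs on every assignment. Under those assumptions, one scale-out/scale-in cycle (3, then 6, then 3 pods) means three rebalances and 18 partition rewinds, of which only the first 6 were intended; 6 of the rewinds hit a partition that stayed on the pod that already owned it, which would look like the "pointless" reassignments seen in the logs.

```python
from typing import Dict, List, Optional, Tuple


def assign_round_robin(num_partitions: int, num_pods: int) -> Dict[int, List[int]]:
    """Spread partitions over pods; a simplification of Kafka's real assignors."""
    assignment: Dict[int, List[int]] = {pod: [] for pod in range(num_pods)}
    for partition in range(num_partitions):
        assignment[partition % num_pods].append(partition)
    return assignment


def simulate(num_partitions: int, pod_counts: List[int]) -> Tuple[int, int]:
    """Return (total rewinds, rewinds of partitions that stayed on the same pod).

    Each entry in pod_counts is one rebalance. Because the offset reset runs on
    every assignment, every assigned partition is rewound each time.
    """
    total = 0
    same_pod = 0
    previous: Optional[Dict[int, int]] = None
    for pods in pod_counts:
        owner = {
            partition: pod
            for pod, partitions in assign_round_robin(num_partitions, pods).items()
            for partition in partitions
        }
        total += len(owner)
        if previous is not None:
            same_pod += sum(1 for p, pod in owner.items() if previous[p] == pod)
        previous = owner
    return total, same_pod


print(assign_round_robin(6, 3))  # {0: [0, 3], 1: [1, 4], 2: [2, 5]}
# Deploy with 3 pods, scale out to 6 under replay load, scale back in to 3:
print(simulate(6, [3, 6, 3]))    # (18, 6)
```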

I don't think this invalidates the proposed fix, but it does explain the behavior much better.

Based on Tim's findings and my review of the logs, I think we can say that replays are not being triggered by reassignments (replays happen even when the partitions are still assigned to the same worker). I am going to add code that changes the existing replay behavior so that it resets the offsets and then sleeps, and we can discuss during parking lot whether we want to continue down this route.

I’m not clear on your conclusion or planned implementation.

It seemed to me from Tim’s findings that replays are triggered by reassignment, even when containers get the same assignments, and that reassignment is in turn triggered by scaling the number of consumers up and down.

Are you planning on sleeping in a new loop with a log message? That seemed like a simple plan; if you are proposing an alternative, can you clarify its benefits? Thanks.

Yeah, I think we're getting confused about what the AC meant by 'reassignment.' My own assumption was that the AC meant reassignment was the only thing that triggered the changes we saw, whereas it appears to be the result of some external juggling of workers, etc.

I'm going to go ahead and modify the consume loop so that, when we're in the mode where we reset the offsets, it just sleeps instead of trying to consume anything off the queue.

@dianakhuang: Thanks.

  1. A reminder to add logging.
  2. You'll need to decide whether you want a different sleep interval, since we'll be logging, but it shouldn't be a big deal either way, because we really don't want to leave the consumer in this mode for long.
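On point 2, one option is to decouple the two intervals: pick the sleep interval independently and throttle the log line so it is emitted at most once per chosen period. A minimal sketch, assuming a hypothetical ThrottledLogger helper that is not part of event-bus-kafka; the fake clock in the usage lines stands in for real sleeping so the example runs instantly.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)


class ThrottledLogger:
    """Emit an info log line at most once every `interval` seconds."""

    def __init__(self, interval: float, clock: Callable[[], float] = time.monotonic):
        self.interval = interval
        self.clock = clock
        self._last: Optional[float] = None  # time of the last emitted line

    def maybe_log(self, message: str, *args) -> bool:
        """Log if the interval has elapsed; return whether a line was emitted."""
        now = self.clock()
        if self._last is None or now - self._last >= self.interval:
            logger.info(message, *args)
            self._last = now
            return True
        return False


fake_now = [0.0]
throttle = ThrottledLogger(interval=60.0, clock=lambda: fake_now[0])
emitted = 0
for _ in range(13):      # 13 iterations of a 10-second sleep: t = 0, 10, ..., 120
    if throttle.maybe_log("Offset reset mode: still skipping message processing"):
        emitted += 1
    fake_now[0] += 10.0  # stand-in for time.sleep(10.0)
print(emitted)  # 3
```

With this split, the sleep interval can stay short without flooding Splunk, and the log cadence can be tuned separately.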