openedx/event-bus-kafka

[Consuming] Enable a management command that can process arbitrary signals

rgraber opened this issue

-------Blocked on #2 -----------

Acceptance criteria (A/C)

  • We are able to consume an arbitrary signal, specified via arguments to a management command, and re-emit the event in the consuming service using that signal.
  • Review the TODO (EventBus) comments and either remove them or ensure they are ticketed as appropriate.

Implementation Proposal:

  • Create a get_and_fire_event_signal method (consider a better name) in event-bus-kafka that runs the poll-emit loop.

  • Create a management command in course_discovery that calls MY_EVENT_BUS_IMPL.get_and_fire_event_signal().
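A minimal sketch of the proposed poll-emit loop, for discussion only. It assumes a consumer exposing a confluent-kafka-style `poll(timeout)` that returns a message or `None`, where each message exposes `error()` and `value()` (with a `DeserializingConsumer`, `value()` is already deserialized). The `emit` callable is a hypothetical stand-in for re-sending the event through the OpenEdxPublicSignal in the consuming service; `max_polls`, `FakeMessage` and `FakeConsumer` are hypothetical additions that make the loop testable without a broker.

```python
from typing import Any, Callable, List, Optional


def get_and_fire_event_signal(
    consumer: Any,
    emit: Callable[[Any], None],
    max_polls: Optional[int] = None,
    timeout: float = 1.0,
) -> int:
    """Poll for messages and re-emit each successfully consumed value.

    Hypothetical signature. ``max_polls=None`` loops forever, the normal mode
    for a long-running management command; a number bounds the loop (useful
    in tests). Returns the number of events emitted.
    """
    emitted = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            # Nothing arrived within the timeout; keep polling.
            continue
        if msg.error():
            # A real implementation should log this before moving on.
            continue
        emit(msg.value())
        emitted += 1
    return emitted


class FakeMessage:
    """Hypothetical test double with the Message methods used above."""

    def __init__(self, value: Any, err: Any = None) -> None:
        self._value = value
        self._err = err

    def error(self) -> Any:
        return self._err

    def value(self) -> Any:
        return self._value


class FakeConsumer:
    """Hypothetical test double that hands out queued messages, then None."""

    def __init__(self, messages: List[Optional[FakeMessage]]) -> None:
        self._messages = list(messages)

    def poll(self, timeout: float = 1.0) -> Optional[FakeMessage]:
        return self._messages.pop(0) if self._messages else None


# Usage: the errored message and the empty polls are skipped.
received: List[Any] = []
consumer = FakeConsumer(
    [FakeMessage({"id": 1}), None, FakeMessage(None, err="boom"), FakeMessage({"id": 2})]
)
count = get_and_fire_event_signal(consumer, received.append, max_polls=6)
```

In the course_discovery command, `emit` would presumably wrap the looked-up signal's send call; keeping the loop itself free of signal-specific code is what would let it work for arbitrary signals.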

Notes

  • The current implementation is here: https://github.com/edx/edx-arch-experiments/blob/main/edx_arch_experiments/kafka_consumer/management/commands/consume_events.py

  • The consume_events processing code is currently hardcoded to work only with the SubscriptionLicenseModified signal. It should be updated to work with arbitrary instances of OpenEdxPublicSignal.

  • We send a signal name/identifier in the ce_type header of every message. This identifier should be the same for every message in a topic. In an ideal world, we could guarantee that the signal used to create the AvroSignalDeserializer in the management command would always match the ce_type. However, this is tricky because in order to get the message (and its headers), we need to already have a deserializer ready to go.

    • As a first step, we could simply update the management command to take a signal name in addition to the topic and group id. It could then use the passed signal name to fetch the signal via OpenEdxPublicSignal.get_signal_by_type() and use it to create the deserializer, failing informatively if the signal name doesn't match the ce_type we get in the message headers.
    • This solution would theoretically have to consider versioning, since the version of a signal is part of its event_type identifier. However, we don't expect versions to change often so this can be punted.
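A rough sketch of that first step: the command takes a signal name alongside the topic and group id, looks the signal up, and fails informatively when a message's `ce_type` header disagrees. The option names and the `KNOWN_SIGNALS` dict are hypothetical; the dict stands in for `OpenEdxPublicSignal.get_signal_by_type()`, and the event type string is made up. A Django command declares options in `add_arguments(parser)`, which receives an argparse parser, so plain `argparse` is used here. Headers are assumed to arrive as confluent-kafka's `Message.headers()` returns them: a list of `(key, bytes)` tuples, or `None`.

```python
import argparse
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for OpenEdxPublicSignal.get_signal_by_type(); the
# real command would look the signal up in openedx-events instead.
EXAMPLE_EVENT_TYPE = "org.openedx.example.thing.created.v1"  # made-up event type
KNOWN_SIGNALS: Dict[str, object] = {EXAMPLE_EVENT_TYPE: object()}


def build_parser() -> argparse.ArgumentParser:
    """Options the consume command might accept (names are hypothetical)."""
    parser = argparse.ArgumentParser(description="Consume events and re-emit them as signals.")
    parser.add_argument("-t", "--topic", required=True)
    parser.add_argument("-g", "--group_id", required=True)
    parser.add_argument(
        "-s", "--signal", required=True,
        help="event_type of the signal, including its version suffix",
    )
    return parser


def lookup_signal(event_type: str) -> object:
    """Fail with a readable message instead of a bare KeyError."""
    try:
        return KNOWN_SIGNALS[event_type]
    except KeyError:
        raise ValueError(f"No signal registered with event_type {event_type!r}") from None


def check_ce_type(headers: Optional[List[Tuple[str, Optional[bytes]]]], expected: str) -> None:
    """Raise if the ce_type header doesn't match the signal used for the deserializer."""
    values = [v.decode("utf-8") for k, v in (headers or []) if k == "ce_type" and v is not None]
    if not values:
        raise ValueError("Message has no ce_type header")
    if values[0] != expected:
        raise ValueError(
            f"Message ce_type {values[0]!r} does not match the --signal argument {expected!r}"
        )


# Usage
args = build_parser().parse_args(["-t", "some-topic", "-g", "some-group", "-s", EXAMPLE_EVENT_TYPE])
signal = lookup_signal(args.signal)
check_ce_type([("ce_type", EXAMPLE_EVENT_TYPE.encode("utf-8"))], args.signal)  # matches, no error
```

Because the version suffix is part of the event_type, a producer moving to a new version would trip this exact-match check, so a version bump would surface as an explicit error; that seems acceptable while versions are expected to change rarely.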

Additional thoughts:

  • I vote we use the current polling loop, with a TODO about iterating on it if we decide we need one of the fancier loops. I think it will be hard to tell which is best until we actually see it in action.
  • I don't think we'll get into any trouble at this level of abstraction.

I’m not sure what the current loop is, or what would count as “fancier”. From the docs, I like the “Synchronous Commits” option: https://docs.confluent.io/kafka-clients/python/current/overview.html#synchronous-commits. I like its description as “The simplest and most reliable way to manually commit offsets…”, and that it provides an “at least once” guarantee, which seems like a reasonable default. Either way, hopefully we can add code comments that spell out the guarantees of whatever we choose.
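A minimal sketch of the “Synchronous Commits” pattern from the linked Confluent docs, adapted to a poll-emit loop. It assumes the consumer was created with `"enable.auto.commit": False` and uses `Consumer.commit(asynchronous=False)`, which blocks until the commit completes. Each event is emitted before its offset is committed, so a crash between the two leads to redelivery rather than loss, which is the “at least once” guarantee. The function name, `emit`, `min_commit_count`, `max_polls`, and the recording test double are hypothetical.

```python
from typing import Any, Callable, List, Optional, Tuple

# Assumed consumer settings (values are placeholders):
# {"bootstrap.servers": "...", "group.id": "...", "enable.auto.commit": False}


def consume_with_sync_commits(
    consumer: Any,
    emit: Callable[[Any], None],
    min_commit_count: int = 1,
    max_polls: Optional[int] = None,
    timeout: float = 1.0,
) -> None:
    """Emit each event, then synchronously commit every ``min_commit_count`` events."""
    uncommitted = 0
    polls = 0
    try:
        while max_polls is None or polls < max_polls:
            polls += 1
            msg = consumer.poll(timeout=timeout)
            if msg is None or msg.error():
                continue
            emit(msg.value())  # process first...
            uncommitted += 1
            if uncommitted >= min_commit_count:
                # ...then commit; this call blocks until the commit completes.
                consumer.commit(asynchronous=False)
                uncommitted = 0
    finally:
        # Events emitted after the last commit may be redelivered on restart.
        consumer.close()


class _Msg:
    """Hypothetical message test double that never reports an error."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def error(self) -> Any:
        return None

    def value(self) -> Any:
        return self._value


class RecordingConsumer:
    """Hypothetical test double that records commits into a shared log."""

    def __init__(self, values: List[Any], log: List[Tuple[str, Any]]) -> None:
        self._values = list(values)
        self.log = log
        self.closed = False

    def poll(self, timeout: float = 1.0) -> Optional[_Msg]:
        return _Msg(self._values.pop(0)) if self._values else None

    def commit(self, asynchronous: bool = True) -> None:
        self.log.append(("commit", asynchronous))

    def close(self) -> None:
        self.closed = True


log: List[Tuple[str, Any]] = []
consumer = RecordingConsumer(["a", "b", "c"], log)
consume_with_sync_commits(
    consumer, lambda v: log.append(("emit", v)), min_commit_count=2, max_polls=4
)
```

Here "c" is emitted but never committed, so after a restart it would be consumed and emitted again; receivers of the re-emitted signal may therefore need to tolerate duplicates. A `min_commit_count` above 1 trades more potential redelivery for fewer blocking commit round trips.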

@rgraber: Can you note why this is blocked? If it's blocked on another ticket, maybe we can add a section for that in the description? Thanks.