[Consuming] Enable a management command that can process arbitrary signals
rgraber opened this issue · 4 comments
Blocked on #2
A/C
- We are able to consume an arbitrary signal, as specified in arguments to a management command, and re-emit the event in the consuming service using the signal
- Review `TODO (EventBus)` comments and remove them or ensure they are tasked as appropriate
Implementation Proposal:
- Create a `get_and_fire_event_signal` method (consider a better method name) in event-bus-kafka that executes the poll-emit loop.
- Create a management command in course_discovery that calls `MY_EVENT_BUS_IMPL.get_and_fire_event_signal()`.
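A minimal sketch of the poll-emit loop described in the first bullet, assuming a confluent-kafka-style consumer whose `poll(timeout=...)` returns `None` or a message with `error()` and `value()`, and assuming the deserializer has already turned each value into a dict of keyword arguments for `OpenEdxPublicSignal.send_event()`. The `max_polls` and `timeout` parameters are hypothetical additions that make the loop testable; the consumer and signal are duck-typed so the sketch runs without Kafka.

```python
from typing import Any, Optional


def get_and_fire_event_signal(consumer: Any, signal: Any,
                              max_polls: Optional[int] = None,
                              timeout: float = 1.0) -> int:
    """Poll `consumer` and re-emit each message value through `signal`.

    Loops forever when `max_polls` is None; returns the number of events fired.
    """
    polls = fired = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            continue  # nothing arrived within `timeout` seconds
        if msg.error():
            # TODO (EventBus): real error handling; for now, skip the message.
            continue
        # Assumption: the deserializer produced a dict of signal kwargs.
        signal.send_event(**msg.value())
        fired += 1
    return fired
```

With the loop living in event-bus-kafka, the course_discovery management command would only need to build the consumer and look up the signal, then call this from its `handle()` method, and other consuming services could reuse the same loop.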
Notes
- Look here for the current implementation: https://github.com/edx/edx-arch-experiments/blob/main/edx_arch_experiments/kafka_consumer/management/commands/consume_events.py
- The consume_events processing code is currently hardcoded to work only with the SubscriptionLicenseModified signal. It should be updated to work with arbitrary instances of OpenEdxPublicSignal.
- We send a signal name/identifier in the `ce_type` header of every message. This identifier should be the same for every message in a topic. Ideally, we could guarantee that the signal used to create the `AvroSignalDeserializer` in the management command always matches the `ce_type`. However, this is tricky, because in order to get the message (and its headers), we need to already have a deserializer ready to go.
  - For a first step, we could simply update the management command to take the name of a signal as well as the topic and group id. It could then use the passed signal name to fetch the signal using `OpenEdxPublicSignal.get_signal_by_type()`, use it to create the deserializer, and fail informatively if the signal name doesn't match what we get in the message headers.
  - This solution would theoretically have to consider versioning, since the version of a signal is part of its `event_type` identifier. However, we don't expect versions to change often, so this can be punted.
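The "first step" proposed in the notes could look roughly like this. The argument names (`--topic`, `--group_id`, `--signal`), the example event type string, and the `check_ce_type` helper are all hypothetical. In a Django management command the arguments would be declared in `add_arguments()`, and the `--signal` value would be passed to `OpenEdxPublicSignal.get_signal_by_type()` to build the `AvroSignalDeserializer`. Headers are assumed to be a list of `(str, bytes)` tuples, the shape confluent-kafka's `Message.headers()` returns.

```python
import argparse
from typing import List, Optional, Tuple


def build_parser() -> argparse.ArgumentParser:
    # In a Django management command these would go in add_arguments(parser).
    parser = argparse.ArgumentParser(description="Consume events for one signal")
    parser.add_argument("--topic", required=True)
    parser.add_argument("--group_id", required=True)
    parser.add_argument("--signal", required=True,
                        help="event_type of the signal to consume (includes the version)")
    return parser


def check_ce_type(headers: Optional[List[Tuple[str, Optional[bytes]]]],
                  expected: str) -> None:
    """Raise ValueError unless the message's ce_type header equals `expected`."""
    values = [v for k, v in (headers or []) if k == "ce_type"]
    if not values:
        raise ValueError("Message has no ce_type header")
    actual = (values[0] or b"").decode("utf-8")
    if actual != expected:
        # Fail informatively: the deserializer was built for a different signal.
        raise ValueError(
            f"Signal mismatch: command was started with {expected!r} "
            f"but message ce_type is {actual!r}"
        )
```

Because the expected value is the full `event_type`, a version bump would also surface as a mismatch here, which fits punting on versioning for now.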
Additional thoughts:
- What type of polling loop do we plan to implement? Maybe more than one in the future? Should we document this decision? See examples here: https://docs.confluent.io/kafka-clients/python/current/overview.html#python-code-examples
- Are we going to run into trouble abstracting before adding error handling?
- I vote we use the current polling loop with a TODO about iterating on it if we decide we need the fancier loops. I think it will be hard to tell which is best until we actually see it in action.
- I don't think we'll get into any trouble at this level of abstraction.
I’m not sure what the current loop is, or what would count as “fancier”. From the docs, I like the “Synchronous Commits” option: https://docs.confluent.io/kafka-clients/python/current/overview.html#synchronous-commits. I like its comment, “The simplest and most reliable way to manually commit offsets…”, and that it provides an “at least once” guarantee, which seems like a reasonable default. Whichever loop we choose, hopefully we can add comments clarifying the properties of our final choice.
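A sketch of the “Synchronous Commits” pattern from the linked Confluent docs, assuming auto-commit is turned off through the real `enable.auto.commit` consumer setting (a real config would also need `bootstrap.servers` and `group.id`). The function name and the `handle` callback are hypothetical, and the consumer is duck-typed; confluent-kafka's `Consumer.commit()` does accept `message=` and `asynchronous=` keyword arguments. Offsets are committed only after a message has been processed, so a crash mid-handling leads to redelivery rather than loss, which is the at-least-once behavior.

```python
from typing import Any, Callable

# Config fragment: disable auto-commit so offsets are committed manually below.
CONSUMER_CONFIG = {"enable.auto.commit": False}


def consume_with_sync_commits(consumer: Any, handle: Callable[[Any], None],
                              max_polls: int, timeout: float = 1.0) -> int:
    """Process each message, then commit its offset synchronously."""
    processed = 0
    for _ in range(max_polls):
        msg = consumer.poll(timeout=timeout)
        if msg is None or msg.error():
            continue  # nothing to process (error handling still TBD)
        handle(msg)
        # Commit only after handle() succeeded; if it raises, nothing is
        # committed and the message will be redelivered (at least once).
        consumer.commit(message=msg, asynchronous=False)
        processed += 1
    return processed
```

The Confluent example commits every `MIN_COMMIT_COUNT` messages rather than after each one. Committing per message is simpler to reason about but adds a blocking broker round trip per event, so batching may be worth revisiting if throughput becomes a concern.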