Really Confusing Error
chokosabe opened this issue · 2 comments
chokosabe commented
Created this Strategy based off the examples.
import json
import logging
from typing import Callable, Mapping, TypeVar
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, RunTask
from arroyo.processing.strategies.abstract import (
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition
logger = logging.getLogger(__name__)
T = TypeVar('T')
class DBPersistStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
"""
A factory that builds a processing strategy for persisting messages to a database.
The processing function should handle the logic for transforming and saving the messages.
"""
def __init__(self, processing_function: Callable[[Message[KafkaPayload]], None]):
self.__processing_function = processing_function
def create_with_partitions(
self,
commit: Commit,
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
return RunTask(
function=self.__processing_function,
next_step=CommitOffsets(commit),
)
Trying to run it in tests and getting this error:
except InvalidMessage as e:
self._handle_invalid_message(e)
else:
# Resume if we are currently in a paused state
if self.__is_paused:
self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False
# Clear backpressure timestamp if it is set
self._clear_backpressure()
self.__message = None
else:
if self.__message is not None:
> raise InvalidStateError(
"received message without active processing strategy"
)
E arroyo.processing.processor.InvalidStateError: received message without active processing strategy
/usr/local/lib/python3.10/site-packages/arroyo/processing/processor.py:457: InvalidStateError
My question is - what is an active processing strategy?!
Thanks - really need some sane basic examples. The 2 currently there not enough
untitaker commented
can you post the full example where you use the processing strategy? the reason the error message is confusing is because you're tripping an internal assertion that the user is never supposed to see. I don't think our examples can do that
untitaker commented
no response, closing