faust-streaming/faust

Still fetching changelog topics for recovery

pythoniste opened this issue · 4 comments

Steps to reproduce

We have a test platform that works fine. We have some topics (all have 3 partitions) linked to some records and some RocksDB tables storing some of the records.

When we do a test release, we encounter an message that says, over and over :

[2023-01-16 14:18:33,333] [1] [INFO] [^---Recovery]: Still fetching changelog topics for recovery, estimated time remaining **none** (total remaining=29):
┌Remaining for active recovery──────────────────
│ topic                                            │ partition │ need offset │ have offset │ remaining │
──────────────────
│ orchestrator-steps-changelog │ 0         │ 1247        │ 1224        │ 23        │
│ 〃                                           │ 1         │ 1111        │ 1106        │ 5         │
│ 〃                                           │ 2         │ 3963        │ 3962        │ 1         │

Actual behavior

The recovery process seems to do nothing. The error message seems to be caused by _publish_stats routine.
The none part of the message seems to appear because we do not have sufficient number of message (num_samples_required_for_estimate versus _processing_times), if my understanding is correct. I do not know if this is a relevant information.

The only way to resolve this issue is to delete rocksdb database and empty all topics, and we do not want to be doing this.

Questions

  • Is the issue is really due to the fact that the record has changed ?
  • Should we put in place a mecanism to deal with "record migration" ?
  • Is there a way to deal with this issue other than empy the topics ?

Versions

  • Python: 3.11
  • faust-streaming: ^0.8.6
  • faust-streaming-rocksdb: ^0.8.0
  • python-rocksdb: ^0.7.0
  • confluent-kafka: 1.9.0
  • Operating system: bulleyes (docker image built from python 3.11.4-bullseye)
  • Kafka version: 3.2
  • RocksDB version (if applicable): 6.29.5

Could you test with a different version of Python? I'm wondering if this is related to ongoing issues with 3.11 support.

Could you test with a different version of Python? I'm wondering if this is related to ongoing issues with 3.11 support.

Sorry, I was wrong. The issue is with Python 3.10.4. I upgrade every container to Python 3.11 a few months ago, except the ones using Kafka because we havan't been able to install RocksDB with Python 3.11.

After doing nothing but restarting the container, we have a new set of messages:

[^---Recovery]: Restoring state from changelog topics...
[^---Recovery]: Resuming flow...
[^---Fetcher]: Starting...
[INFO] Fetch offset 21 is out of range for partition TopicPartition(topic=orchestrator-steps-changelog', partition=2), resetting offset
Error in recovery invalid literal for int() with base 10: '00:00:59'

So, my first hunch was the correct one: the problems happens because the record has been modified. I should have told you that use coerce=True and validation=True.

Now I have pointed the exact problem : I had a field named duration: str which contained a string formated duration such as 00:12:34" meaning 12 minutes and 34 seconds. Now, duration is an integer and I store the number of seconds. The formatting happens on the display only.

The good news is that there is no bug, except the fact that the loop message that i have first reported floods the log file and appears useless.

Now, the question remains the same : when i do something like that (we are in a first draft phase), what can I do to recover old data ? (Some sort of migration process) ?

Also, this happens only on the test platform, not with my local settings. I am thinking of a configuration issue. If you can think of anything to help me reproduce this in my local settings, please let me know.

Sorry, I was wrong. The issue is with Python 3.10.4. I upgrade every container to Python 3.11 a few months ago, except the ones using Kafka because we havan't been able to install RocksDB with Python 3.11.

Sorry to hear that, I have support in our Python-RocksDB fork for Python 3.11 under https://pypi.org/project/faust-streaming-rocksdb/#files. Since it affects Python 3.10, that narrows our issues down. We've been having some issues supporting Python 3.11 since there were changes to the asyncio library.

So, my first hunch was the correct one: the problems happens because the record has been modified. I should have told you that use coerce=True and validation=True.

That helps to know. 😄

Now, the question remains the same : when i do something like that (we are in a first draft phase), what can I do to recover old data ? (Some sort of migration process) ?

Not sure what you mean about old data. You mean you want to replay the changelog for a table? You could configure a Faust Agent to consume messages from the topic, starting from offset 0 (I don't recommend it).

I know that a lot of the data is stored in the RocksDB stores, since they're essentially used to replay a changelog topic without needing to consume messages from a broker.

A better solution is that you could directly parse entries within those files before initializing your Faust application. I've used https://github.com/Congyuwang/RocksDict and https://github.com/adammarples/rocksdbdict for this before and it works fine. I've noticed that rocksdict parses roughly 11x faster so you may want to use that. The only downside is that the boilerplate settings code is painful, you need to essentially copy every setting from the rocksdb.py file in Faust. I'll see if I can release my implementation, for your convenience.

Also, this happens only on the test platform, not with my local settings. I am thinking of a configuration issue. If you can think of anything to help me reproduce this in my local settings, please let me know.

I suspect there is a configuration issue here as well. However I cannot immediately think of how to reproduce this.

Error in recovery invalid literal for int() with base 10: '00:00:59'

This error is reminiscent of #389. I never got a chance to formally address this.