long2ice/meilisync

Data not syncing

MartijnStraatman opened this issue · 16 comments

Data is not syncing between postgres:16-bookworm ( with wal2json installed) and meilisync.

Logs:

2024-01-13 09:02:33 2024-01-13 08:02:33.273 | INFO     | meilisync.main:_:101 - Start increment sync data from "SourceType.postgres" to MeiliSearch...
2024-01-13 09:02:33 2024-01-13 08:02:33.284 | DEBUG    | meilisync.main:_:104 - progress={'start_lsn': '0/19B5B50'}

Logs PSQL:

2024-01-13 09:02:33 2024-01-13 08:02:33.275 UTC [35] ERROR:  replication slot "meilisync" already exists
2024-01-13 09:02:33 2024-01-13 08:02:33.275 UTC [35] STATEMENT:  CREATE_REPLICATION_SLOT "meilisync" LOGICAL "wal2json"
2024-01-13 09:02:33 2024-01-13 08:02:33.279 UTC [35] LOG:  0/19B5B50 has been already streamed, forwarding to 0/19B5B88
2024-01-13 09:02:33 2024-01-13 08:02:33.279 UTC [35] STATEMENT:  START_REPLICATION SLOT "meilisync" LOGICAL 0/019B5B50 ("include-lsn" 'true')
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] LOG:  starting logical decoding for slot "meilisync"
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] DETAIL:  Streaming transactions committing after 0/19B5B88, reading WAL from 0/19B5B50.
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] STATEMENT:  START_REPLICATION SLOT "meilisync" LOGICAL 0/019B5B50 ("include-lsn" 'true')
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] LOG:  logical decoding found consistent point at 0/19B5B50
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] DETAIL:  There are no running transactions.
2024-01-13 09:02:33 2024-01-13 08:02:33.283 UTC [35] STATEMENT:  START_REPLICATION SLOT "meilisync" LOGICAL 0/019B5B50 ("include-lsn" 'true')

Some help is appreciated!

Make sure you do this

I followed the tutorial and am also having issues with the data not syncing... the service seems to start up and then do nothing. I tried running meilisync check -t <my table> inside Docker and it spit out the following error

2024-01-20 06:50:06.119 | DEBUG    | meilisync.main:_:36 - plugins=[] progress=Progress(type=<ProgressType.file: 'file'>) debug=True source=Source(type=<SourceType.postgres: 'postgres'>, database='tjnickerson', host='host.docker.internal', port=5432, user='tjnickerson') meilisearch=MeiliSearch(api_url='http://host.docker.internal:7700/', api_key='3OF83dZ0dl-3Am85Va9eri9IrQZob22pD91hRXlvbXU', insert_size=1000, insert_interval=10) sync=[Sync(plugins=[], table='entries', pk='id', full=True, index='entries', fields=None)] sentry=None
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /meilisync/meilisync/main.py:208 in check                                                        │
│                                                                                                  │
│   205 │   │   │   │   │   │   f'MeiliSearch count: {meili_count}."'                              │
│   206 │   │   │   │   │   )                                                                      │
│   207 │                                                                                          │
│ ❱ 208 │   asyncio.run(_())                                                                       │
│   209                                                                                            │
│   210                                                                                            │
│   211 if __name__ == "__main__":                                                                 │
│                                                                                                  │
│ ╭─────────────────────── locals ────────────────────────╮                                        │
│ │       _ = <function check.<locals>._ at 0x4008927f60> │                                        │
│ │ context = <click.core.Context object at 0x40085e3da0> │                                        │
│ │   table = ['entries']                                 │                                        │
│ ╰───────────────────────────────────────────────────────╯                                        │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:194 in run                                          │
│                                                                                                  │
│   191 │   │   │   "asyncio.run() cannot be called from a running event loop")                    │
│   192 │                                                                                          │
│   193 │   with Runner(debug=debug, loop_factory=loop_factory) as runner:                         │
│ ❱ 194 │   │   return runner.run(main)                                                            │
│   195                                                                                            │
│   196                                                                                            │
│   197 def _cancel_all_tasks(loop):                                                               │
│                                                                                                  │
│ ╭────────────────────────────── locals ──────────────────────────────╮                           │
│ │        debug = None                                                │                           │
│ │ loop_factory = None                                                │                           │
│ │         main = <coroutine object check.<locals>._ at 0x400886ccc0> │                           │
│ │       runner = <asyncio.runners.Runner object at 0x4005d0f0e0>     │                           │
│ ╰────────────────────────────────────────────────────────────────────╯                           │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/runners.py:118 in run                                          │
│                                                                                                  │
│   115 │   │                                                                                      │
│   116 │   │   self._interrupt_count = 0                                                          │
│   117 │   │   try:                                                                               │
│ ❱ 118 │   │   │   return self._loop.run_until_complete(task)                                     │
│   119 │   │   except exceptions.CancelledError:                                                  │
│   120 │   │   │   if self._interrupt_count > 0:                                                  │
│   121 │   │   │   │   uncancel = getattr(task, "uncancel", None)                                 │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │        context = <_contextvars.Context object at 0x4008923c00>                               │ │
│ │           coro = <coroutine object check.<locals>._ at 0x400886ccc0>                         │ │
│ │           self = <asyncio.runners.Runner object at 0x4005d0f0e0>                             │ │
│ │ sigint_handler = functools.partial(<bound method Runner._on_sigint of                        │ │
│ │                  <asyncio.runners.Runner object at 0x4005d0f0e0>>, main_task=<Task finished  │ │
│ │                  name='Task-4' coro=<check.<locals>._() done, defined at                     │ │
│ │                  /meilisync/meilisync/main.py:188> exception=KeyError(0)>)                   │ │
│ │           task = <Task finished name='Task-4' coro=<check.<locals>._() done, defined at      │ │
│ │                  /meilisync/meilisync/main.py:188> exception=KeyError(0)>                    │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                  │
│ /usr/local/lib/python3.12/asyncio/base_events.py:684 in run_until_complete                       │
│                                                                                                  │
│    681 │   │   if not future.done():                                                             │
│    682 │   │   │   raise RuntimeError('Event loop stopped before Future completed.')             │
│    683 │   │                                                                                     │
│ ❱  684 │   │   return future.result()                                                            │
│    685 │                                                                                         │
│    686 │   def stop(self):                                                                       │
│    687 │   │   """Stop running the event loop.                                                   │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │   future = <Task finished name='Task-4' coro=<check.<locals>._() done, defined at            │ │
│ │            /meilisync/meilisync/main.py:188> exception=KeyError(0)>                          │ │
│ │ new_task = False                                                                             │ │
│ │     self = <_UnixSelectorEventLoop running=False closed=True debug=False>                    │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                  │
│ /meilisync/meilisync/main.py:194 in _                                                            │
│                                                                                                  │
│   191 │   │   meili = context.obj["meili"]                                                       │
│   192 │   │   for sync in settings.sync:                                                         │
│   193 │   │   │   if not table or sync.table in table:                                           │
│ ❱ 194 │   │   │   │   count = await source.get_count(sync)                                       │
│   195 │   │   │   │   meili_count = await meili.get_count(sync.index_name)                       │
│   196 │   │   │   │   if count == meili_count:                                                   │
│   197 │   │   │   │   │   logger.info(                                                           │
│                                                                                                  │
│ ╭───────────────────────────────── locals ──────────────────────────────────╮                    │
│ │  context = <click.core.Context object at 0x40085e3da0>                    │                    │
│ │    meili = <meilisync.meili.Meili object at 0x400862e990>                 │                    │
│ │ settings = Settings(                                                      │                    │
│ │            │   plugins=[],                                                │                    │
│ │            │   progress=Progress(type=<ProgressType.file: 'file'>),       │                    │
│ │            │   debug=True,                                                │                    │
│ │            │   source=Source(                                             │                    │
│ │            │   │   type=<SourceType.postgres: 'postgres'>,                │                    │
│ │            │   │   database='tjnickerson',                                │                    │
│ │            │   │   host='host.docker.internal',                           │                    │
│ │            │   │   port=5432,                                             │                    │
│ │            │   │   user='tjnickerson'                                     │                    │
│ │            │   ),                                                         │                    │
│ │            │   meilisearch=MeiliSearch(                                   │                    │
│ │            │   │   api_url='http://host.docker.internal:7700/',           │                    │
│ │            │   │   api_key='3OF83dZ0dl-3Am85Va9eri9IrQZob22pD91hRXlvbXU', │                    │
│ │            │   │   insert_size=1000,                                      │                    │
│ │            │   │   insert_interval=10                                     │                    │
│ │            │   ),                                                         │                    │
│ │            │   sync=[                                                     │                    │
│ │            │   │   Sync(                                                  │                    │
│ │            │   │   │   plugins=[],                                        │                    │
│ │            │   │   │   table='entries',                                   │                    │
│ │            │   │   │   pk='id',                                           │                    │
│ │            │   │   │   full=True,                                         │                    │
│ │            │   │   │   index='entries',                                   │                    │
│ │            │   │   │   fields=None                                        │                    │
│ │            │   │   )                                                      │                    │
│ │            │   ],                                                         │                    │
│ │            │   sentry=None                                                │                    │
│ │            )                                                              │                    │
│ │   source = <meilisync.source.postgres.Postgres object at 0x40056c5070>    │                    │
│ │     sync = Sync(                                                          │                    │
│ │            │   plugins=[],                                                │                    │
│ │            │   table='entries',                                           │                    │
│ │            │   pk='id',                                                   │                    │
│ │            │   full=True,                                                 │                    │
│ │            │   index='entries',                                           │                    │
│ │            │   fields=None                                                │                    │
│ │            )                                                              │                    │
│ │    table = ['entries']                                                    │                    │
│ ╰───────────────────────────────────────────────────────────────────────────╯                    │
│                                                                                                  │
│ /meilisync/meilisync/source/postgres.py:122 in get_count                                         │
│                                                                                                  │
│   119 │   │   with self.conn_dict.cursor() as cur:                                               │
│   120 │   │   │   cur.execute(f"SELECT COUNT(*) FROM {sync.table}")                              │
│   121 │   │   │   ret = cur.fetchone()                                                           │
│ ❱ 122 │   │   │   return ret[0]                                                                  │
│   123 │                                                                                          │
│   124 │   async def __aiter__(self):                                                             │
│   125 │   │   self.queue = Queue()                                                               │
│                                                                                                  │
│ ╭────────────────────────────────────────── locals ──────────────────────────────────────────╮   │
│ │  cur = <cursor object at 0x4008948c50; closed: -1>                                         │   │
│ │  ret = RealDictRow({'count': 1182940})                                                     │   │
│ │ self = <meilisync.source.postgres.Postgres object at 0x40056c5070>                         │   │
│ │ sync = Sync(plugins=[], table='entries', pk='id', full=True, index='entries', fields=None) │   │
│ ╰────────────────────────────────────────────────────────────────────────────────────────────╯   │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
KeyError: 0

Make sure that the indexes you specified already exist in the database and in meillisearch

Both the table I'm querying does exist, and AFAIK so does the index (confirmed via the Meilisearch REST API)

Can you share the config file with me?

I got it working by using this docker image quay.io/debezium/postgres:16. I was using just the default psql image.
It would expect it to work with the default psql image, or at least document which libraries to load.

I am using the official image but the problem seems to be in your database configuration, check the ports

Which Ports do I need to check?

Try that command
docker pull postgres
docker run --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
Change password, and default username: postgres, that command run postgres on port 5432
try that and tell me if still problem

Do not forget to create the tables that you specified in the configuration file

Hey, I have the same error, I don't use docker. I have a local Postgres 15 instance on Debian 12.
The Meilisearch is another Debian 12 Server, with Meilisearch 1.6.2.
And yes, I have followed all the steps in the installation instructions.

The error pattern looks exactly as described in the issue.

Brief feedback from me, the problem has been solved. I started the sync again today and it worked. Unfortunately, I can't say what caused it, maybe I forgot to restart some service of the database.