MongoEngine/mongoengine

(limited) query returns 0 docs after N iterations

greenomy-matteo opened this issue · 1 comment

Hello,

I'm trying to update some documents. The update consists of adding an embedded document that is, in practice, a vector of floats. Because this vector is produced by an ML model, and because I have tens of thousands of documents, I'm working in batches.
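
Roughly, the documents look like this (a simplified sketch, not my exact schema; the Vector field names are just placeholders):

    # Sketch of the assumed MongoEngine models (illustrative, not the real schema).
    from mongoengine import (Document, EmbeddedDocument,
                             EmbeddedDocumentListField, FloatField,
                             ListField, StringField)

    class Vector(EmbeddedDocument):
        values = ListField(FloatField())   # the embedding produced by the model
        model_hash = StringField()         # identifies which model produced it

    class Sentence(Document):
        sentence = StringField()                        # raw text to embed
        embeddings = EmbeddedDocumentListField(Vector)  # one entry per model

The batching code looks like this: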

        total_docs = Sentence.objects(query).no_cache().count()
        logging.info(f' - Query generated and executed. Found {total_docs} documents.')

        logging.info('Generating embeddings...')
        for i in range(0, total_docs, chunk_size):
            batch = list(Sentence.objects(query)[i:i+chunk_size])
            sentences = list(map(lambda obj: obj.sentence, batch))
            logging.info(f' - Fetched {len(sentences)} documents.')
            logging.info(f' - Embedding chunk {i} to {i + chunk_size}')
            start = timer()

            embeddings = self._embed_chunk(sentences)
            end = timer()
            logging.info(f' - Embedding performed in {end - start:.2f} seconds. Updating database...')
            start = timer()
            for j, sentence_object in enumerate(batch):
                sentence_object.update(push__embeddings=Vector(embeddings[j], self.last_model_hash),
                                       write_concern={'w':1, 'fsync': True})
            end = timer()
            logging.info(f' - Database updated of {j+1}  documents in {end - start:.2f} seconds.')

Unfortunately, when I do so, after around 2500 documents the number of fetched documents begins to decrease until it reaches zero, as you can see from my logs:

INFO:root: - Fetched 300 documents.
INFO:root: - Embedding chunk 1800 to 2100
INFO:root: - Embedding performed in 24.96 seconds. Updating database...
INFO:root: - Database updated of 300  documents in 2.20 seconds.
INFO:root: - Fetched 300 documents.
INFO:root: - Embedding chunk 2100 to 2400
INFO:root: - Embedding performed in 1.11 seconds. Updating database...
INFO:root: - Database updated of 300  documents in 2.21 seconds.
INFO:root: - Fetched 136 documents.
INFO:root: - Embedding chunk 2400 to 2700
INFO:root: - Embedding performed in 1.73 seconds. Updating database...
INFO:root: - Database updated of 136  documents in 0.92 seconds.
INFO:root: - Fetched 0 documents.
INFO:root: - Embedding chunk 2700 to 3000

I've tried with different batch sizes (I usually use a batch size of 300 and the error happens around document 2700; with a batch size of 1000 I get it around 3000; with 100, around 22000). The error is not caused by self._embed_chunk(sentences). MongoDB is running on my local machine.

These are the logs from MongoDB:

    '{"t":{"$date":"2023-04-11T11:46:01.657+02:00"},"s":"I",  "c":"NETWORK",  "id":22943,   "ctx":"listener","msg":"Connection accepted","attr":{"remote":"127.0.0.1:58625","uuid":"b2396341-5a53-4069-9ce3-9e5ea17e19b2","connectionId":32,"connectionCount":11}}\n',
    '{"t":{"$date":"2023-04-11T11:46:01.658+02:00"},"s":"I",  "c":"NETWORK",  "id":51800,   "ctx":"conn32","msg":"client metadata","attr":{"remote":"127.0.0.1:58625","client":"conn32","doc":{"driver":{"name":"PyMongo","version":"4.3.3"},"os":{"type":"Darwin","name":"Darwin","architecture":"arm64","version":"13.3"},"platform":"CPython 3.10.9.final.0"}}}\n',
    '{"t":{"$date":"2023-04-11T11:46:01.659+02:00"},"s":"I",  "c":"NETWORK",  "id":22943,   "ctx":"listener","msg":"Connection accepted","attr":{"remote":"127.0.0.1:58626","uuid":"7f28e5f9-7882-402e-a1d5-3db6e5be3c8a","connectionId":33,"connectionCount":12}}\n',
    '{"t":{"$date":"2023-04-11T11:46:01.659+02:00"},"s":"I",  "c":"NETWORK",  "id":22943,   "ctx":"listener","msg":"Connection accepted","attr":{"remote":"127.0.0.1:58627","uuid":"89e41391-93ef-4214-bac1-4a4f56e93558","connectionId":34,"connectionCount":13}}\n',
    '{"t":{"$date":"2023-04-11T11:46:01.660+02:00"},"s":"I",  "c":"NETWORK",  "id":51800,   "ctx":"conn33","msg":"client metadata","attr":{"remote":"127.0.0.1:58626","client":"conn33","doc":{"driver":{"name":"PyMongo","version":"4.3.3"},"os":{"type":"Darwin","name":"Darwin","architecture":"arm64","version":"13.3"},"platform":"CPython 3.10.9.final.0"}}}\n',
    '{"t":{"$date":"2023-04-11T11:46:01.660+02:00"},"s":"I",  "c":"NETWORK",  "id":51800,   "ctx":"conn34","msg":"client metadata","attr":{"remote":"127.0.0.1:58627","client":"conn34","doc":{"driver":{"name":"PyMongo","version":"4.3.3"},"os":{"type":"Darwin","name":"Darwin","architecture":"arm64","version":"13.3"},"platform":"CPython 3.10.9.final.0"}}}\n',
    '{"t":{"$date":"2023-04-11T11:47:33.662+02:00"},"s":"I",  "c":"STORAGE",  "id":20320,   "ctx":"conn34","msg":"createCollection","attr":{"namespace":"local_mongodb.sentence","uuidDisposition":"generated","uuid":{"uuid":{"$uuid":"adfeb51c-9aa7-471b-ae96-665245cd09af"}},"options":{}}}\n',
    '{"t":{"$date":"2023-04-11T11:47:33.684+02:00"},"s":"I",  "c":"INDEX",    "id":20345,   "ctx":"conn34","msg":"Index build: done building","attr":{"buildUUID":null,"collectionUUID":{"uuid":{"$uuid":"adfeb51c-9aa7-471b-ae96-665245cd09af"}},"namespace":"local_mongodb.sentence","index":"_id_","ident":"index-13-4525368203612087005","collectionIdent":"collection-12-4525368203612087005","commitTimestamp":null}}\n',
    '{"t":{"$date":"2023-04-11T11:48:31.497+02:00"},"s":"I",  "c":"COMMAND",  "id":51803,   "ctx":"conn31","msg":"Slow query","attr":{"type":"command","ns":"admin.$cmd","command":{"ismaster":1,"$db":"admin"},"numYields":0,"reslen":304,"locks":{},"remote":"127.0.0.1:58591","protocol":"op_msg","durationMillis":125}}\n',
    '{"t":{"$date":"2023-04-11T11:49:28.278+02:00"},"s":"I",  "c":"COMMAND",  "id":20499,   "ctx":"ftdc","msg":"serverStatus was very slow","attr":{"timeStats":{"after basic":0,"after activeIndexBuilds":0,"after asserts":0,"after batchedDeletes":0,"after bucketCatalog":0,"after catalogStats":0,"after connections":0,"after electionMetrics":0,"after extra_info":0,"after flowControl":0,"after freeMonitoring":0,"after globalLock":0,"after indexBulkBuilder":0,"after indexStats":0,"after locks":0,"after logicalSessionRecordCache":0,"after mirroredReads":0,"after network":0,"after opLatencies":0,"after opcounters":0,"after opcountersRepl":0,"after oplog":0,"after oplogTruncation":0,"after readConcernCounters":0,"after repl":0,"after scramCache":0,"after security":0,"after storageEngine":0,"after tenantMigrations":0,"after trafficRecording":0,"after transactions":0,"after transportSecurity":0,"after twoPhaseCommitCoordinator":0,"after wiredTiger":1214,"at end":1256}}}\n',
    '{"t":{"$date":"2023-04-11T11:49:44.765+02:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn33","msg":"Connection ended","attr":{"remote":"127.0.0.1:58626","uuid":"7f28e5f9-7882-402e-a1d5-3db6e5be3c8a","connectionId":33,"connectionCount":11}}\n',
    '{"t":{"$date":"2023-04-11T11:49:44.764+02:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn34","msg":"Connection ended","attr":{"remote":"127.0.0.1:58627","uuid":"89e41391-93ef-4214-bac1-4a4f56e93558","connectionId":34,"connectionCount":12}}\n',
    '{"t":{"$date":"2023-04-11T11:49:44.780+02:00"},"s":"W",  "c":"NETWORK",  "id":4615610, "ctx":"conn32","msg":"Failed to check socket connectivity","attr":{"error":{"code":6,"codeName":"HostUnreachable","errmsg":"Connection closed by peer"}}}\n',
    '{"t":{"$date":"2023-04-11T11:49:44.780+02:00"},"s":"I",  "c":"-",        "id":20883,   "ctx":"conn32","msg":"Interrupted operation as its client disconnected","attr":{"opId":258876}}\n',
    '{"t":{"$date":"2023-04-11T11:49:44.780+02:00"},"s":"I",  "c":"NETWORK",  "id":22944,   "ctx":"conn32","msg":"Connection ended","attr":{"remote":"127.0.0.1:58625","uuid":"b2396341-5a53-4069-9ce3-9e5ea17e19b2","connectionId":32,"connectionCount":10}}\n'

MongoDB version is 6.0.5 on macOS 13.3.

Found the problem.

I had to change this:

    batch = list(Sentence.objects(query)[i:i+chunk_size])

into this:

    batch = list(Sentence.objects(query).limit(chunk_size))

I was slicing with offsets that belonged to the "old" result set. Each iteration re-runs the query, which returns fewer and fewer records (the documents I had already updated no longer match it), but I kept cutting it with the initial result set in mind, so the offset i eventually skipped past everything that was left.
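
For anyone hitting the same thing, the fixed loop ends up looking roughly like this (a sketch, assuming as in my case that the query only matches sentences that still need an embedding, so the result set shrinks as documents are updated):

    # Always take the *first* chunk_size documents that still match the query,
    # instead of slicing at an ever-growing offset into a shrinking result set.
    while True:
        batch = list(Sentence.objects(query).limit(chunk_size))
        if not batch:
            break  # nothing left to embed
        sentences = [obj.sentence for obj in batch]
        embeddings = self._embed_chunk(sentences)
        for sentence_object, embedding in zip(batch, embeddings):
            sentence_object.update(
                push__embeddings=Vector(embedding, self.last_model_hash),
                write_concern={'w': 1, 'fsync': True})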