acupofjose/elasticstore

Adding retry_on_conflict parameter?

Opened this issue · 12 comments

Hey, thanks for the cool project, got me up and running very quickly.

I'm using the project to allow searching some fields in our document collection, which is constantly being updated with new documents. Occasionally it's possible we modify some of the documents, but that is rare.

I'm running into this issue occasionally:

Error in FS_ADDED handler [doc@1098949404239818800]: [version_conflict_engine_exception] [tweets][1098949404239818800]: version conflict, current version [2] is different than the one provided [1], with { index_uuid="qj6ftGLfSYqNEut4ShXbeg" & shard="2" & index="tweets" }

I haven't been able to confirm, but some testers have reported that not all of the search results that should show up are actually showing up. I believe this error is the cause: those documents simply failed to be added?

From my research, it looks like adding a retry_on_conflict value of one or two should handle these conflicts. It looks like it would go in handleAdded in FirestoreHandler.ts.
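For illustration, here's a minimal sketch of where the parameter could sit. The helper and its names are hypothetical (not the project's actual code); retry_on_conflict itself is a standard parameter of Elasticsearch's update API that re-fetches the document and retries the update when a version conflict occurs.

```typescript
// Hypothetical helper showing where retry_on_conflict would go in the
// update request that handleAdded builds; all names are illustrative.
interface UpdateParams {
  index: string;
  id: string;
  retry_on_conflict: number;
  body: { doc: object; doc_as_upsert: boolean };
}

function buildUpdateParams(index: string, id: string, doc: object): UpdateParams {
  return {
    index,
    id,
    // On a version conflict, re-fetch and retry the update up to 2 times
    // before surfacing the version_conflict_engine_exception.
    retry_on_conflict: 2,
    body: { doc, doc_as_upsert: true },
  };
}
```

The resulting params object would then be passed to the Elasticsearch client's update call.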

Does this look right to you?

@StoryStar glad it's been of help to you!

It would appear that yes, adding the retry_on_conflict parameter should fix this problem. Appreciate that you did some research into it.


I've added a different branch fix/retry-on-conflict for you to pull and test with!

Please let me know if that does, in fact, fix your issue, and I will merge it into master.

Hey, sorry it took me a while to get back to you.

Appreciate the help, that looks like it should work. I'd love to confirm, but unfortunately I've hit a new issue and am unable to get back to that point so I can test.

Not sure if this is worth opening a new issue for, let me know if you would like me to.

My DB is quite large now, and its size seems to be causing me some issues. When the script first runs, it indexes a bunch of the documents, occasionally running into a quota-exceeded error (where it backs off and reconnects just fine), but it eventually gets stuck and stops indexing. It seems to be an issue with reading Firestore, as I get this:

'Error 4: The datastore operation timed out, or the data was temporarily unavailable.'
and
'Received stream error: { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded'

Not exactly sure what's causing the issue, other than slamming Firestore with a lot of requests given the number of documents I'm trying to index. I found possibly related bugs in the node admin SDK, though they are a little old: firebase/firebase-admin-node#119 and firebase/firebase-admin-node#271

After it runs into the error, it appears to completely stop indexing, and Elasticsearch also does not have any index for the documents that did successfully load -- if I make a test search, it returns index not found.

Any ideas?

A detailed log follows, including verbose Firestore logging:

Firestore (0.14.1) 2019-03-05T22:14:45.187Z [Watch.onSnapshot]: Processing change event
Firestore (0.14.1) 2019-03-05T22:14:45.187Z [Watch.onSnapshot]: Received document change
Firestore (0.14.1) 2019-03-05T22:14:45.190Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[1],"targetChangeType":"REMOVE","cause":{"details":[],"code":4,"message":"The datastore operation timed out, or the data was temporarily unavailable."},"resumeToken":[],"readTime":null},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:14:45.190Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:14:45.191Z [Watch.onSnapshot]: Invoking onError:  Error: Error 4: The datastore operation timed out, or the data was temporarily unavailable.
    at DestroyableTransform.stream.on.proto (/home/developer/tweetSearch/elasticstore/node_modules/@google-cloud/firestore/src/watch.js:718:25)
    at DestroyableTransform.emit (events.js:189:13)
    at addChunk (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:291:12)
    at readableAddChunk (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:278:11)
    at DestroyableTransform.Readable.push (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:245:10)
    at DestroyableTransform.Transform.push (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:148:32)
    at DestroyableTransform.afterTransform (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:91:10)
    at DestroyableTransform.noop [as _transform] (/home/developer/tweetSearch/elasticstore/node_modules/through2/through2.js:26:3)
    at DestroyableTransform.Transform._read (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:184:10)
    at DestroyableTransform.Transform._write (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:172:83)
Error: Error 4: The datastore operation timed out, or the data was temporarily unavailable.
    at DestroyableTransform.stream.on.proto (/home/developer/tweetSearch/elasticstore/node_modules/@google-cloud/firestore/src/watch.js:718:25)
    at DestroyableTransform.emit (events.js:189:13)
    at addChunk (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:291:12)
    at readableAddChunk (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:278:11)
    at DestroyableTransform.Readable.push (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_readable.js:245:10)
    at DestroyableTransform.Transform.push (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:148:32)
    at DestroyableTransform.afterTransform (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:91:10)
    at DestroyableTransform.noop [as _transform] (/home/developer/tweetSearch/elasticstore/node_modules/through2/through2.js:26:3)
    at DestroyableTransform.Transform._read (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:184:10)
    at DestroyableTransform.Transform._write (/home/developer/tweetSearch/elasticstore/node_modules/readable-stream/lib/_stream_transform.js:172:83)
Firestore (0.14.1) 2019-03-05T22:14:45.192Z [Watch.onSnapshot]: Processing stream end
Firestore (0.14.1) 2019-03-05T22:14:45.223Z [Firestore._initializeStream]: Received stream end
Firestore (0.14.1) 2019-03-05T22:14:45.224Z [Watch.onSnapshot]: Stream ended, sending error:  { Error: Stream ended unexpectedly
    at BunWrapper.currentStream.on (/home/developer/tweetSearch/elasticstore/node_modules/@google-cloud/firestore/src/watch.js:487:27)
    at BunWrapper.emit (events.js:194:15)
    at /home/developer/tweetSearch/elasticstore/node_modules/bun/node_modules/readable-stream/lib/_stream_readable.js:965:16
    at process._tickCallback (internal/process/next_tick.js:61:11) code: 2 }
Firestore (0.14.1) 2019-03-05T22:15:21.367Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,217,134,172,131,236,224,2]},"readTime":{"seconds":"1551824121","nanos":343137000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:15:21.367Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:15:21.367Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,217,134,172,131,236,224,2]},"readTime":{"seconds":"1551824121","nanos":343137000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:15:21.367Z [Watch.onSnapshot]: Processing target change
Tue Mar 05 2019 22:15:25 GMT+0000 (Coordinated Universal Time): Running Cleanup
Firestore (0.14.1) 2019-03-05T22:15:25.865Z [Firestore.readStream]: Sending request: {"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"orderBy":[{"field":{"fieldPath":"response.timestamp"},"direction":"ASCENDING"}],"endAt":{"values":[{"valueType":"timestampValue","timestampValue":{"seconds":1551824065,"nanos":864000000}}]}}}
Firestore (0.14.1) 2019-03-05T22:15:25.965Z [Firestore.readStream]: Received response: {"document":null,"transaction":[],"readTime":{"seconds":"1551824125","nanos":940137000},"skippedResults":0}
Firestore (0.14.1) 2019-03-05T22:15:25.965Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:15:25.968Z [Firestore._initializeStream]: Received stream end
Firestore (0.14.1) 2019-03-05T22:16:21.369Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,203,233,212,200,131,236,224,2]},"readTime":{"seconds":"1551824181","nanos":343435000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:16:21.369Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:16:21.370Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,203,233,212,200,131,236,224,2]},"readTime":{"seconds":"1551824181","nanos":343435000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:16:21.370Z [Watch.onSnapshot]: Processing target change
Tue Mar 05 2019 22:16:25 GMT+0000 (Coordinated Universal Time): Running Cleanup
Firestore (0.14.1) 2019-03-05T22:16:25.970Z [Firestore.readStream]: Sending request: {"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"orderBy":[{"field":{"fieldPath":"response.timestamp"},"direction":"ASCENDING"}],"endAt":{"values":[{"valueType":"timestampValue","timestampValue":{"seconds":1551824125,"nanos":968000000}}]}}}
Firestore (0.14.1) 2019-03-05T22:16:26.067Z [Firestore.readStream]: Received response: {"document":null,"transaction":[],"readTime":{"seconds":"1551824186","nanos":42807000},"skippedResults":0}
Firestore (0.14.1) 2019-03-05T22:16:26.067Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:16:26.069Z [Firestore._initializeStream]: Received stream end
Firestore (0.14.1) 2019-03-05T22:17:21.378Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]},"readTime":{"seconds":"1551824241","nanos":355681000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:17:21.379Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:17:21.379Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]},"readTime":{"seconds":"1551824241","nanos":355681000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:17:21.379Z [Watch.onSnapshot]: Processing target change
Tue Mar 05 2019 22:17:26 GMT+0000 (Coordinated Universal Time): Running Cleanup
Firestore (0.14.1) 2019-03-05T22:17:26.071Z [Firestore.readStream]: Sending request: {"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"orderBy":[{"field":{"fieldPath":"response.timestamp"},"direction":"ASCENDING"}],"endAt":{"values":[{"valueType":"timestampValue","timestampValue":{"seconds":1551824186,"nanos":69000000}}]}}}
Firestore (0.14.1) 2019-03-05T22:17:26.135Z [Firestore.readStream]: Received response: {"document":null,"transaction":[],"readTime":{"seconds":"1551824246","nanos":111011000},"skippedResults":0}
Firestore (0.14.1) 2019-03-05T22:17:26.135Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:17:26.136Z [Firestore._initializeStream]: Received stream end
Firestore (0.14.1) 2019-03-05T22:18:23.353Z [Firestore._initializeStream]: Received stream error: { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
    at Object.exports.createStatusError (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/common.js:87:15)
    at ClientDuplexStream._emitStatusIfDone (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:235:26)
    at ClientDuplexStream._receiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:213:8)
    at Object.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1290:15)
    at InterceptingListener._callNext (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:614:8)
    at /home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1110:18
  code: 4,
  metadata: Metadata { _internal_repr: {} },
  details: 'Deadline Exceeded' }
Firestore (0.14.1) 2019-03-05T22:18:23.353Z [Watch.onSnapshot]: Stream ended, re-opening after retryable error:  { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
    at Object.exports.createStatusError (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/common.js:87:15)
    at ClientDuplexStream._emitStatusIfDone (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:235:26)
    at ClientDuplexStream._receiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:213:8)
    at Object.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1290:15)
    at InterceptingListener._callNext (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:614:8)
    at /home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1110:18
  code: 4,
  metadata: Metadata { _internal_repr: {} },
  details: 'Deadline Exceeded' }
Firestore (0.14.1) 2019-03-05T22:18:23.354Z [Watch.onSnapshot]: Opening new stream
Firestore (0.14.1) 2019-03-05T22:18:23.357Z [Firestore.readWriteStream]: Opening stream
Firestore (0.14.1) 2019-03-05T22:18:23.357Z [Firestore._initializeStream]: Sending request: {"database":"projects/endorsed-club/databases/(default)","addTarget":{"query":{"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"Tweets"}]}},"targetId":1,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]}}}
Firestore (0.14.1) 2019-03-05T22:18:23.357Z [Firestore.readWriteStream]: Streaming request: {"database":"projects/endorsed-club/databases/(default)","addTarget":{"query":{"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"Tweets"}]}},"targetId":1,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]}}}
Firestore (0.14.1) 2019-03-05T22:18:23.358Z [Firestore._initializeStream]: Marking stream as healthy
Firestore (0.14.1) 2019-03-05T22:18:23.358Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:18:23.358Z [Watch.onSnapshot]: Opened new stream
Firestore (0.14.1) 2019-03-05T22:18:23.382Z [Firestore._initializeStream]: Received stream error: { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
    at Object.exports.createStatusError (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/common.js:87:15)
    at ClientDuplexStream._emitStatusIfDone (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:235:26)
    at ClientDuplexStream._receiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:213:8)
    at Object.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1290:15)
    at InterceptingListener._callNext (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:614:8)
    at /home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1110:18
  code: 4,
  metadata: Metadata { _internal_repr: {} },
  details: 'Deadline Exceeded' }
Firestore (0.14.1) 2019-03-05T22:18:23.382Z [Watch.onSnapshot]: Stream ended, re-opening after retryable error:  { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
    at Object.exports.createStatusError (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/common.js:87:15)
    at ClientDuplexStream._emitStatusIfDone (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:235:26)
    at ClientDuplexStream._receiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client.js:213:8)
    at Object.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1290:15)
    at InterceptingListener._callNext (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (/home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:614:8)
    at /home/developer/tweetSearch/elasticstore/node_modules/grpc/src/client_interceptors.js:1110:18
  code: 4,
  metadata: Metadata { _internal_repr: {} },
  details: 'Deadline Exceeded' }
Firestore (0.14.1) 2019-03-05T22:18:23.382Z [Watch.onSnapshot]: Opening new stream
Firestore (0.14.1) 2019-03-05T22:18:23.384Z [Firestore.readWriteStream]: Opening stream
Firestore (0.14.1) 2019-03-05T22:18:23.384Z [Firestore._initializeStream]: Sending request: {"database":"projects/endorsed-club/databases/(default)","addTarget":{"query":{"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"where":{"unaryFilter":{"field":{"fieldPath":"response"},"op":"IS_NULL"}}}},"targetId":1,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]}}}
Firestore (0.14.1) 2019-03-05T22:18:23.385Z [Firestore.readWriteStream]: Streaming request: {"database":"projects/endorsed-club/databases/(default)","addTarget":{"query":{"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"where":{"unaryFilter":{"field":{"fieldPath":"response"},"op":"IS_NULL"}}}},"targetId":1,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]}}}
Firestore (0.14.1) 2019-03-05T22:18:23.385Z [Firestore._initializeStream]: Marking stream as healthy
Firestore (0.14.1) 2019-03-05T22:18:23.385Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:18:23.385Z [Watch.onSnapshot]: Opened new stream
Firestore (0.14.1) 2019-03-05T22:18:23.576Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[1],"targetChangeType":"ADD","cause":null,"resumeToken":[],"readTime":null},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.577Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.577Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]},"readTime":{"seconds":"1551824241","nanos":355681000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.577Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.605Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[1],"targetChangeType":"ADD","cause":null,"resumeToken":[],"readTime":null},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.605Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.605Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,161,215,163,229,131,236,224,2]},"readTime":{"seconds":"1551824241","nanos":355681000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.606Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Firestore.readWriteStream]: Received response: {"filter":{"targetId":1,"count":1},"responseType":"filter"}
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Watch.onSnapshot]: Processing filter update
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[1],"targetChangeType":"CURRENT","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,183,166,248,130,132,236,224,2]},"readTime":{"seconds":"1551824303","nanos":559479000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,183,166,248,130,132,236,224,2]},"readTime":{"seconds":"1551824303","nanos":559479000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.630Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.651Z [Firestore.readWriteStream]: Received response: {"filter":{"targetId":1,"count":0},"responseType":"filter"}
Firestore (0.14.1) 2019-03-05T22:18:23.651Z [Watch.onSnapshot]: Processing filter update
Firestore (0.14.1) 2019-03-05T22:18:23.651Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[1],"targetChangeType":"CURRENT","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,231,133,250,130,132,236,224,2]},"readTime":{"seconds":"1551824303","nanos":588071000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.652Z [Watch.onSnapshot]: Processing target change
Firestore (0.14.1) 2019-03-05T22:18:23.652Z [Firestore.readWriteStream]: Received response: {"targetChange":{"targetIds":[],"targetChangeType":"NO_CHANGE","cause":null,"resumeToken":{"type":"Buffer","data":[10,9,8,231,133,250,130,132,236,224,2]},"readTime":{"seconds":"1551824303","nanos":588071000}},"responseType":"targetChange"}
Firestore (0.14.1) 2019-03-05T22:18:23.652Z [Watch.onSnapshot]: Processing target change
Tue Mar 05 2019 22:18:26 GMT+0000 (Coordinated Universal Time): Running Cleanup
Firestore (0.14.1) 2019-03-05T22:18:26.139Z [Firestore.readStream]: Sending request: {"parent":"projects/endorsed-club/databases/(default)","structuredQuery":{"from":[{"collectionId":"search"}],"orderBy":[{"field":{"fieldPath":"response.timestamp"},"direction":"ASCENDING"}],"endAt":{"values":[{"valueType":"timestampValue","timestampValue":{"seconds":1551824246,"nanos":138000000}}]}}}
Firestore (0.14.1) 2019-03-05T22:18:26.209Z [Firestore.readStream]: Received response: {"document":null,"transaction":[],"readTime":{"seconds":"1551824306","nanos":184985000},"skippedResults":0}
Firestore (0.14.1) 2019-03-05T22:18:26.209Z [Firestore._initializeStream]: Releasing stream
Firestore (0.14.1) 2019-03-05T22:18:26.211Z [Firestore._initializeStream]: Received stream end

I am able to reproduce the issue with a simple function, so it seems to be unrelated to this project and is just a Firestore issue:

//test binding to entire collection
function testBind() {
    var tweetsRef = db.collection("Tweets")
    .doc("all")
    .collection("All_Tweets");
    tweetsRef.onSnapshot(querySnapshot => {
        querySnapshot.docChanges().forEach(change => {
            console.log(change.type);
            if (change.type === 'added') {
                console.log('New tweet: ', change.doc.id);
            }
            if (change.type === 'modified') {
                console.log('Modified tweet: ', change.doc.id);
            }
            if (change.type === 'removed') {
                console.log('Removed tweet: ', change.doc.id);
            }
        });
    });
}

Unfortunately I seem to be stuck here for now 👎

@StoryStar really appreciate that you were able to find a way to reproduce it! I’ve seen that error before, but only when I was looping through a gigantic collection, which it seems you are also doing.

I’d thought about distributing listeners or limiting listeners to a specific time. I’m not sure.

Alternatively, there may be a way to coordinate a Firestore trigger to tell an external script to update elasticstore? Just thinking out loud here.

If you end up opening an issue on firebase-admin-node, please link it here! I’ll go ahead and merge the pull request.

(reopening as the initial issue was fixed, but a separate one is included in this issue)

Exactly, seems to be an issue with very large collections.

Good ideas, I had a similar idea to paginate through the entire collection and update elasticstore with the data, and pair that with a cloud function that is triggered whenever a document is added to the collection (adding the new data to elasticstore). I'll see about making a proof of concept for that.
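The paginate-then-import idea can be sketched as a generic cursor loop. This is only a sketch of the approach: fetchPage and handle here stand in for the Firestore query and the Elasticsearch indexing, which aren't shown.

```typescript
// Generic cursor pagination: repeatedly fetch a page of items after the
// last-seen cursor and hand each page off (e.g. to a bulk indexer), until
// an empty page signals the end of the collection.
async function paginate<T>(
  fetchPage: (cursor: T | null, pageSize: number) => Promise<T[]>,
  handle: (items: T[]) => Promise<void>,
  pageSize = 500
): Promise<number> {
  let cursor: T | null = null;
  let total = 0;
  for (;;) {
    const page = await fetchPage(cursor, pageSize);
    if (page.length === 0) break;
    await handle(page); // e.g. bulk-index the page into Elasticsearch
    total += page.length;
    cursor = page[page.length - 1]; // resume after the last item seen
  }
  return total;
}
```

With Firestore, fetchPage would typically be an orderBy(...).startAfter(cursor).limit(pageSize) query, so no single request has to stream the whole collection.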

I've submitted a bug report and reached out to Firebase support in a few places, waiting to hear back from them now -- will keep this updated.

I think the issue is with the backend and not the node firebase-admin implementation, as it seems to occur across other languages as well (i.e. firebase/firebase-js-sdk#359), but I may still open an issue on firebase-admin-node and will certainly link it here if I do.

Hey, here's an update on our situation

As expected, Firebase support suggested the same idea of paginating through the entire collection, and said that while this is a bug on their end, it would not be possible or appropriate to listen on an entire collection of this size.

Firebase support:

This may be a slight backend bug but in general they would strongly advise you against applying a listener to a collection that large without limiting the size of the result set or adding filters

So I've written the pagination importer, along with a cloud function triggered on document changes (added, modified, deleted), paired with a simple express.js server that listens for document change requests and sends them to Elasticsearch, and everything seems to be working just fine in this setup. Really appreciate all your help!
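For anyone following along, the change-forwarding part of a setup like this might look roughly like the sketch below. The names and payload shape are placeholders I've made up, not the actual code; the pure payload builder is shown as real code, with the Cloud Function wiring indicated in comments.

```typescript
// Classify a Firestore document change into the payload the import server
// would receive; all names here are illustrative placeholders.
interface ChangePayload {
  id: string;
  type: "added" | "modified" | "removed";
  data: object | null;
}

function buildChangePayload(
  id: string,
  beforeExists: boolean,
  afterData: object | null
): ChangePayload {
  // No data after the write means the document was deleted.
  if (afterData === null) return { id, type: "removed", data: null };
  return { id, type: beforeExists ? "modified" : "added", data: afterData };
}

// In a Cloud Function (firebase-functions), roughly:
// functions.firestore.document("Tweets/all/All_Tweets/{id}").onWrite((change, ctx) =>
//   postToImportServer(buildChangePayload(
//     ctx.params.id,
//     change.before.exists,
//     change.after.exists ? change.after.data() : null)));
```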

I hate to bring up more issues again, but it seems the SearchHandler now runs into an issue after a few days of uptime:

Error: 10 ABORTED: The operation was aborted.
    at Object.exports.createStatusError (node_modules/grpc/src/common.js:87:15)
    at ClientDuplexStream._emitStatusIfDone (node_modules/grpc/src/client.js:235:26)
    at ClientDuplexStream._receiveStatus (node_modules/grpc/src/client.js:213:8)
    at Object.onReceiveStatus (node_modules/grpc/src/client_interceptors.js:1290:15)
    at InterceptingListener._callNext (node_modules/grpc/src/client_interceptors.js:564:42)
    at InterceptingListener.onReceiveStatus (node_modules/grpc/src/client_interceptors.js:614:8)
    at node_modules/grpc/src/client_interceptors.js:1110:18
  code: 10,
  metadata: Metadata { _internal_repr: { 'content-disposition': [Array] } },
  details: 'The operation was aborted.' }
After some time, the onSnapshot in SearchHandler stops receiving updates and then dies with this error. I'm not sure of the cause; however, a solution could be to just restart the listener on this crash.
This seems like a related post on Stack Overflow: https://stackoverflow.com/questions/54629241/firestore-real-time-updates-connection-in-nodejs
Thoughts?

@StoryStar thanks for continuing to update!

If you want to add your implementation to Github, I'd be happy to link to it on the readme of this repo.

As for restarting listeners, I too have seen this issue if left up for too long. I thought about adding a restart into the default implementation, but it means that you'll end up doing a full query of the documents each time the listener is restarted, yes? So feasible to restart it, sure, but also a (potentially) expensive operation. I'm also unsure if the problem lies in listeners of any size or if it's spanning listeners that cause the problem.

@StoryStar In my implementation, I make sure to filter based on an UpdatedAt value in Firestore. Anytime a document is updated, the UpdatedAt flag is set accordingly, and it flows into a listener that looks for anything updated in the last 30 minutes. In my case, this seems to cut down on the number of documents enough to work.
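A rough sketch of that rolling-window filter, with the field and collection names as placeholders; only the cutoff helper is shown as real code, the Firestore wiring is indicated in comments.

```typescript
// Compute the lower bound of the "updated recently" window.
function windowCutoff(nowMs: number, windowMinutes = 30): Date {
  return new Date(nowMs - windowMinutes * 60 * 1000);
}

// Illustrative Firestore wiring (placeholder names, not runnable here):
// db.collection("docs")
//   .where("UpdatedAt", ">=", windowCutoff(Date.now()))
//   .onSnapshot(handleSnapshot, handleSnapshotError);
```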

Are you doing something similar?

@ACupaJoe no problem!

I'll see about adding our implementation; it will need to be cleaned up first at the very least. Will update here when I get to it.

Regarding restarting listeners, I've actually fully separated the document import script from the search handler, so restarting the listener is what I'm doing right now, just manually. I'd like to set it up to automatically restart when the listener dies, though.

As for the cause, I am also unsure. I don't believe it's due to size (the search collection is usually empty), so I suppose it has to do with just the length of time the listener is kept alive. I see a similar issue, though for the .NET library instead, in googleapis/google-cloud-dotnet#2721, and they're having the same 'Aborted' error we have. In that issue they discuss adding the aborted error code to the list of automatically retried status codes, so maybe it is possible to have it added to the node library as well. Opened an issue here: firebase/firebase-admin-node#478

I'm thinking of just adding a function to handle listen errors and restart the listener if it dies due to this 'aborted' error. Something like this:

this.unsubscribe = this.queryRef
  .where(this.resKey, '==', null)
  .onSnapshot(this.handleSnapshot, this.handleSnapshotError)
. . . . .
  private handleSnapshotError = (err: Error) => {
    console.log(colors.red(`onSnapshot error: ${err}`));
    // Store the new unsubscribe function and keep this error handler
    // attached, so a subsequent failure is also caught and restarted.
    this.unsubscribe = this.queryRef
      .where(this.resKey, '==', null)
      .onSnapshot(this.handleSnapshot, this.handleSnapshotError);
  }

Thoughts?

As an aside, this is what my current setup looks like:

  • search handler, runs independently of any importer
  • import server + firestore cloud function - a cloud function triggered on document changes sends a request to the import server with the document, which imports the changes to elasticsearch
  • import script - simple script that runs through all documents and imports them to elasticsearch
    I'm not using an updatedAt flag at the moment - the cloud function makes it less necessary at this point, but I might implement it for the import script