Parallelisation problem with ReadFromDatastore
bowd opened this issue · 13 comments
I need to get all entities of a specific kind from datastore into my pipeline.
I've used a variant of the code found in the beam python sdk examples:
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

def build_query():
    # Query for every entity of kind "Entry"
    query = query_pb2.Query()
    query.kind.add().name = "Entry"
    return query

# ... later (PROJECT and pipeline_options are defined elsewhere)
with beam.Pipeline(options=pipeline_options) as p:
    (p | 'Read entries' >> ReadFromDatastore(PROJECT, build_query())
       # | ... more steps
    )
And it works, but I get this warning in Dataflow:
Unable to parallelize the given query: kind {
name: "Entry"
}
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py", line 197, in process
helper.make_partition(self._project, self._datastore_namespace))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/datastore/v1/query_splitter.py", line 84, in get_splits
scatter_keys = _get_scatter_keys(datastore, query, num_splits, partition)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/datastore/v1/query_splitter.py", line 187, in _get_scatter_keys
key_splits.sort(helper.key_comparator)
TypeError: comparison function must return int, not long
When trying to split the query, it looks like there's an internal step where the comparator helper.key_comparator returns a long rather than an int, but that's about as deep as I can dig without getting out of my depth.
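To make the error concrete outside of Beam, here's a tiny Python 2 snippet (just an illustration, not Beam code) that reproduces the same TypeError, since list.sort() with a cmp function only accepts a plain int from the comparator:

    def long_cmp(a, b):
        # Subtracting longs yields a long; Python 2's list.sort(cmp=...)
        # requires the comparison function to return a plain int.
        return long(a) - long(b)

    keys = [3, 1, 2]
    keys.sort(long_cmp)  # TypeError: comparison function must return int, not long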
The problem this causes is that the pipeline is unable to scale. When it starts, it does autoscale to 6 workers, but then scales back down, I suspect because of this IO step:
Autoscaling: Reduced the number of workers to 1 based on the ability to parallelise the work in the currently running step(s).
Any help would be greatly appreciated!
@bogdan-dumitru what version of Dataflow are you using?
cc: @vikkyrk
Extract from my requirements.txt:
apache-beam==2.0.0
gapic-google-cloud-datastore-v1==0.15.3
proto-google-cloud-datastore-v1==0.90.4
google-cloud-datastore==1.0.0
googledatastore==7.0.1
I can provide the full file if any other packages are relevant.
@aaltay Looks like the fix (apache/beam#3155) didn't make it to the 2.0.0
release :(
@vikkyrk ah I see, cool. Is there any workaround? Will I suffer if I reference master in my requirements? 😄
@bogdan-dumitru or you could use https://github.com/apache/beam/releases/tag/v2.0.0 and apply my PR on top.
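If you go the fork route, one way to pull a patched SDK into the job is to point requirements.txt at the fork; the fork URL and tag below are placeholders, and depending on your setup you may also need to stage the SDK explicitly (for example with --sdk_location pointing at a built tarball) rather than relying on requirements.txt alone:

    # requirements.txt (hypothetical fork and tag; the Python SDK lives under sdks/python)
    git+https://github.com/<your-fork>/beam.git@v2.0.0-patched#egg=apache-beam&subdirectory=sdks/python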
I ended up forking 2.0.0 as you suggested and applying your PR, but now I see different/weird behaviour. My pipeline is hanging on the first step. It feels like it's consuming all the entries from Datastore in the intermediate consumers it creates:
Executing operation Read entries/UserQuery/Read+Read entries/SplitQuery+Read entries/GroupByKey/Reify+Read entries/GroupByKey/Write
And not feeding anything into the rest of the pipeline until that part is done. Or at least that's my hypothesis; I let it hang for ~35 minutes and nothing happened. Anyway, I guess I'm just going to live with it like this for now, or set up a streaming consumer and publish messages to Pub/Sub from a different service.
Thanks for the help though!
Is there any update on this? I'm trying to read 200k+ entities from Datastore as the entry step of a Dataflow pipeline, and it's timing out after an hour of inactivity with only a single worker running. I don't have any inequality filters on my initial Datastore read; it's just grabbing every single entity. I can run it on test data (2,000 entities) locally just fine, so it's not a code problem. It seems to be a scaling issue, which is what I thought Dataflow was supposed to take care of by itself.
Beam is in the process of changing its Datastore library. Please file an issue in JIRA and we will see how this can be handled in the newer API. (JIRA -> https://issues.apache.org/jira/projects/BEAM/issues/BEAM-6919?filter=allopenissues)
cc: @udim
@jhallard I'm in advanced stages of rewriting the Datastore implementation for Beam Python.
Meanwhile, I could take a look at your pipeline if you agree. If you have GCP support you can open a ticket here: https://cloud.google.com/support/. You can also reply with the project and job id here and I'll take a look.
Edit: real tracking issue for the implementation: https://issues.apache.org/jira/browse/BEAM-4543
Closing this. Please use Beam JIRA (https://issues.apache.org/jira/browse/BEAM-4543) for additional comments.
Job ID:
2019-04-05_18_43_38-6390779817404791413
Project: camiologger
Thanks! We do have a Google cloud support plan and I’ll open up a ticket if this doesn’t get resolved soon.
Alright, I took a quick look at the logs (LOGS -> Stackdriver -> switch "all logs" to ".../worker-startup"), and it's failing to install ffmpeg, which seems to be a custom command. The worker is failing to start, which is why you don't see any progress.
Wow, that is a bit embarrassing! Sorry you had to look that up for me, I had just assumed since the read step was timing out that that was the step that was failing. Thanks for your help.