GoogleCloudPlatform/DataflowPythonSDK

Datastore?

Jonovono opened this issue · 10 comments

Is there any way to connect to datastore? I would like to use datastore as the source.

@Jonovono Development has moved to a new repository, and experimental Datastore support was added after the move. You can find an example here:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
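
At a high level, that example boils down to something like the sketch below. The module path, the ReadFromDatastore arguments, and the protobuf query construction are based on the experimental connector of that SDK generation, so treat the linked datastore_wordcount.py as the authoritative reference; the project id and kind here are placeholders.

```python
import apache_beam as beam
# Experimental connector; module path and signature may differ by SDK version.
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

project = 'my-gcp-project'         # placeholder project id
query = query_pb2.Query()
query.kind.add().name = 'MyKind'   # placeholder Datastore kind

with beam.Pipeline() as p:
    entities = p | 'read from datastore' >> ReadFromDatastore(project, query)
    # Downstream transforms receive Datastore Entity protobufs.
    entities | 'count entities' >> beam.combiners.Count.Globally()
```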

@aaltay awesome, thanks. I'll give that a try!

I'm trying this with my project now... what is the intended behavior when running locally against a remote Datastore? It seems to block indefinitely, fetching more and more items from the datastore.

ReadFromDatastore(...) calls ParDo(ReadFromDatastore.ReadFn(...)), and ReadFn returns a QueryIterator(...). I can see that the iterator is correctly pulling items off the generator (batch by batch), but it appears to not pass them down the pipeline. My receiver just after ReadFromDatastore(...) is never called with any of the items.
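
(For concreteness, the "receiver" is just a trivial DoFn applied right after the read, roughly like the hypothetical probe below; its process method is never invoked with any element.)

```python
import logging

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore


class LogEntityFn(beam.DoFn):  # hypothetical probe, not real project code
    def process(self, entity):
        # Never logs anything, even while the read keeps fetching batches.
        logging.info('received entity: %s', entity)
        yield entity


# p, project, and query are built as in the example linked above.
entities = p | 'read' >> ReadFromDatastore(project, query)
entities | 'probe' >> beam.ParDo(LogEntityFn())
```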

From what I can tell behaviorally (I can't pinpoint the exact code, though), it is trying to exhaust the entire iterator before passing anything on to the next stage. Is this the "expected" behavior when running locally?

I'm trying to test things locally to develop and verify that they work before spinning up a bunch of workers/CPU/$$$ on a production Dataflow run. But as it stands, I'm not able to get anything working locally with my dataset (100K items).
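
(The local versus production setups I'm switching between look roughly like this; the runner flags are standard Beam pipeline options, the project and bucket values are placeholders, and in older SDKs the options module may live under apache_beam.utils instead.)

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Local development run: this is where the read appears to block.
local_options = PipelineOptions(['--runner=DirectRunner'])

# Intended production run, once things work locally (placeholder values).
dataflow_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--temp_location=gs://my-bucket/tmp',
])

p = beam.Pipeline(options=local_options)
```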

@mikelambert Have you tried running this example? https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py Does it work for you?

(Also @vikkyrk may have additional information.)

I did, and it works. But I'm not sure that means anything?

The "grab all the data and then run the pipeline locally" approach works fine when there's only a small amount of data that can be loaded in a reasonable amount of time. But it does not work quickly for my case of 100K records.

@mikelambert The local runner is not optimized for large pipelines; it is possible that 100K records is beyond its limits. That said, Datastore IO has query-splitting functionality: it tries to split your query and fetch everything in each split, so with a good split the read should happen split by split rather than all 100K records at once.

  1. Could you try explicitly setting the num_splits argument of ReadFromDatastore to a high number (see the sketch after this list)?
  2. Could you try running on something smaller, like 1K records, and see whether the fetches happen over all records at once or split by split?
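
For 1, a rough sketch (num_splits is the keyword from the experimental connector, the value 32 is arbitrary, and project/query are as in the linked example):

```python
# Explicitly request many splits so each bundle stays small.
entities = p | 'read from datastore' >> ReadFromDatastore(
    project, query, num_splits=32)  # 32 is an arbitrary example value
```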

(cc: @vikkyrk in case there are additional ideas.)

+1 for increasing the number of splits (the num_splits argument mentioned above). That should create more splits, and hence more but smaller batches (bundles) that can fit in memory.

Okay, just had time to investigate this again (I have reached my patience limit with the appengine-mapreduce code :P ).

With both my code and the wordcount demo, regardless of the number of splits, it appears to work like this:

  1. split the query into a bunch of split queries
  2. run each split query, collecting the results
  3. then pass the results to the ParDo

However, with the DirectRunner, step 2 runs to completion before step 3 starts. So the splits may help reduce the size of each split's result set, but they do nothing for parallelization. :(

I will continue development by passing a smaller query when running locally (only), but this took way too long for me to debug, and I'm sure it will trip up new developers. Let me know if I should open a separate issue to track this, or if you'd prefer to keep using this issue?
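
In case it helps anyone else hitting this, the local-only workaround looks roughly like the sketch below; the limit field follows the Datastore v1 query protobuf, and running_locally is a hypothetical flag derived from my pipeline options.

```python
from google.cloud.proto.datastore.v1 import query_pb2

query = query_pb2.Query()
query.kind.add().name = 'MyKind'    # placeholder kind

if running_locally:                 # hypothetical flag from pipeline options
    query.limit.value = 1000        # read only a small sample during local runs
```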

Ahh, sorry, I see that this GitHub repo is no longer the right place for issues, and I should file them at https://issues.apache.org/jira/browse/BEAM instead.

I have filed this issue at https://issues.apache.org/jira/browse/BEAM-1787

Closing this issue, since it has moved to the Beam JIRA. The discussion can continue there.

@mikelambert thank you for moving it.