GoogleCloudPlatform/DataflowPythonSDK

Accessing the content of a PCollection of PCollections

finiterank opened this issue · 3 comments

Hello, I'm not sure if this is really something you can help me with here, but I'm going to give it a try:

I'm trying to dynamically generate a bunch of queries from a PCollection (together they involve more than a thousand tables, so I can't really use one long query instead) and then get a PCollection that merges their BigQuery results together. Is that possible?

I was trying to use the 'side tables' approach but I'm stuck. This is what I'm trying (roughly following the description here):

import google.cloud.dataflow as df

project = 'YOUR_PROJECT'
input_query = 'SELECT month ' \
    'FROM [clouddataflow-readonly:samples.weather_stations] ' \
    'WHERE tornado=true GROUP BY month'

data = [input_query]

p = df.Pipeline(argv=['--project', project])

# Attempt: turn each query string into a BigQuery read inside the pipeline.
database = lambda x: p | df.io.Read(df.io.BigQuerySource(query=x))

(p
 | df.Create(data) 
 | df.Map('map', lambda x: database(x))
 | df.Write(df.io.TextFileSink('./results')))

p.run()

But then, as a result, instead of getting the contents of database(x) I get:

<PCollection transform=<Read(PTransform) label=[91d2002c-9a11-4467-9db2-a33a019ece9f]>>

Is there a way of accessing the contents of that 'collection of collections' after the Map transform?

Assuming I understand correctly, you cannot express what you want in Dataflow, because essentially you want a dynamic processing graph. Just to check my understanding: you have a PCollection of queries, generated by some initial read (or create), and you would like to run them all and combine the results. As long as you obtain the queries somehow beforehand, you can generate a graph that contains all of them with code like this:

pcolls = []
for i, q in enumerate(queries):
    # Each transform instance needs its own unique label.
    pcolls.append(p | df.io.Read('read_%d' % i, df.io.BigQuerySource(query=q)))
tuple(pcolls) | df.Flatten() | ...
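
For example, here is a fuller version of that sketch. It is only a minimal illustration, assuming the queries are known before pipeline construction; the queries list and the 'read_%d'/'merge' labels are just illustrative:

import google.cloud.dataflow as df

project = 'YOUR_PROJECT'

# Illustrative: in practice you would generate these queries beforehand.
queries = [
    'SELECT month FROM [clouddataflow-readonly:samples.weather_stations] '
    'WHERE tornado=true GROUP BY month',
]

p = df.Pipeline(argv=['--project', project])

# One Read per query, each with a unique label.
pcolls = [p | df.io.Read('read_%d' % i, df.io.BigQuerySource(query=q))
          for i, q in enumerate(queries)]

# Flatten merges the per-query PCollections into a single PCollection.
merged = tuple(pcolls) | df.Flatten('merge')
merged | df.Write(df.io.TextFileSink('./results'))

p.run()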

Alternatively, you can execute the query yourself from a DoFn. This is similar to what you tried above, except that you cannot use PTransforms inside a DoFn. It boils down to the same argument: the processing graph cannot be dynamic after you have built it and submitted the job for execution.

def database(x):
    # Execute the BigQuery query 'x' directly using the BigQuery API
    # and return the result rows.
    ...

(p
 | df.Create(data)
 # FlatMap so that each returned row becomes its own output element.
 | df.FlatMap('map', database)
 | df.Write(df.io.TextFileSink('./results')))

p.run()

To make it easy to write the BigQuery code, check out the gcloud-python package:
https://googlecloudplatform.github.io/gcloud-python/stable/bigquery-usage.html
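
As a sketch of what database could look like with that package (this assumes the synchronous query helper, run_sync_query, described in the linked docs; exact names can vary between versions):

from gcloud import bigquery

def database(x):
    # Run the query synchronously and return its rows. run_sync_query
    # is the helper described in the linked gcloud-python docs; adjust
    # to whatever your installed version provides.
    client = bigquery.Client()
    query = client.run_sync_query(x)
    query.run()
    return query.rows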

Hope this helps.

Totally helps, @silviulica, thanks a lot! I'm familiar with the BigQuery API. I'll try both solutions and see which one best fits our needs (I suspect the second one).

Closing this. @finiterank please re-open if you are still experiencing issues.