GoogleCloudPlatform/DataflowPythonSDK

PicklingError on nested dataflow_v1b3_messages classes

adamdodev opened this issue · 3 comments

Apologies if this is no longer the correct place to report issues; it's not immediately clear whether these should go here or in the Apache Beam Jira system.

Command:

python main.py --project my-project --runner BlockingDataflowPipelineRunner --staging_location gs://my-bucket/staging --temp_location gs://my-bucket/temp --job_name my-job-001 --save_main_session True --requirements_file requirements.txt

Result:

No handlers could be found for logger "oauth2client.contrib.multistore_file"
WARNING:root:Using fallback coder for typehint: Union[].
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting google-cloud-dataflow (from -r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/google-cloud-dataflow-0.4.1.zip
Collecting google-api-python-client (from -r requirements.txt (line 2))
  File was already downloaded /tmp/dataflow-requirements-cache/google-api-python-client-1.5.3.zip
Collecting avro>=1.7.7 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/avro-1.8.1.tar.gz
Collecting dill>=0.2.5 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/dill-0.2.5.tgz
Collecting google-apitools>=0.5.2 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/google-apitools-0.5.4.zip
Collecting httplib2>=0.8 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/httplib2-0.9.2.zip
Collecting mock>=1.0.1 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/mock-2.0.0.tar.gz
Collecting oauth2client>=2.0.1 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/oauth2client-3.0.0.tar.gz
Collecting protorpc>=0.9.1 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/protorpc-0.11.1.zip
Collecting python-gflags>=2.0 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/python-gflags-3.0.6.tar.gz
Collecting pyyaml>=3.10 (from google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/PyYAML-3.12.tar.gz
Collecting six<2,>=1.6.1 (from google-api-python-client->-r requirements.txt (line 2))
  File was already downloaded /tmp/dataflow-requirements-cache/six-1.10.0.tar.gz
Collecting uritemplate<1,>=0.6 (from google-api-python-client->-r requirements.txt (line 2))
  File was already downloaded /tmp/dataflow-requirements-cache/uritemplate-0.6.tar.gz
Collecting setuptools>=18.5 (from google-apitools>=0.5.2->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/setuptools-26.1.1.tar.gz
Collecting funcsigs>=1 (from mock>=1.0.1->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/funcsigs-1.0.2.tar.gz
Collecting pbr>=0.11 (from mock>=1.0.1->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/pbr-1.10.0.tar.gz
Collecting pyasn1>=0.1.7 (from oauth2client>=2.0.1->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/pyasn1-0.1.9.tar.gz
Collecting pyasn1-modules>=0.0.5 (from oauth2client>=2.0.1->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/pyasn1-modules-0.0.8.tar.gz
Collecting rsa>=3.1.4 (from oauth2client>=2.0.1->google-cloud-dataflow->-r requirements.txt (line 1))
  File was already downloaded /tmp/dataflow-requirements-cache/rsa-3.4.2.tar.gz
Collecting simplejson>=2.5.0 (from uritemplate<1,>=0.6->google-api-python-client->-r requirements.txt (line 2))
  File was already downloaded /tmp/dataflow-requirements-cache/simplejson-3.8.2.tar.gz
Successfully downloaded google-cloud-dataflow google-api-python-client avro dill google-apitools httplib2 mock oauth2client protorpc python-gflags pyyaml six uritemplate setuptools funcsigs pbr pyasn1 pyasn1-modules rsa simplejson
Traceback (most recent call last):
  File "main.py", line 67, in <module>
    p.run()
  File "/usr/local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 172, in run
    self.dataflow_client.create_job(self.job))
  File "/usr/local/lib/python2.7/site-packages/apache_beam/utils/retry.py", line 160, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/apiclient.py", line 371, in create_job
    job.options, file_copy=self._gcs_file_copy)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/utils/dependency.py", line 306, in stage_job_resources
    pickler.dump_session(pickled_session_file)
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 204, in dump_session
    return dill.dump_session(file_path)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 333, in dump_session
    pickler.dump(main)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 123, in save_module
    return old_save_module(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 1168, in save_module
    state=_main_dict)
  File "/usr/local/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python2.7/pickle.py", line 425, in save_reduce
    save(state)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/apache_beam/internal/pickler.py", line 159, in new_save_module_dict
    return old_save_module_dict(pickler, obj)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 835, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/local/lib/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/local/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/local/lib/python2.7/pickle.py", line 400, in save_reduce
    save(func)
  File "/usr/local/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 1231, in save_type
    StockPickler.save_global(pickler, obj)
  File "/usr/local/lib/python2.7/pickle.py", line 754, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum
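For anyone reading the traceback: the failure is in pickle's lookup-by-name step. Under Python 2.7 a class is pickled as "module.ClassName", and a nested class cannot be found that way because it only exists as an attribute of its enclosing class. Below is a minimal sketch of that mechanism using only the standard library. `JobMessage` and `can_pickle` are hypothetical names for illustration, not part of the SDK, and the `__qualname__` reset is only there so the same failure also shows up on Python 3, which otherwise resolves nested classes correctly.

```python
import pickle


class JobMessage(object):
    """Hypothetical stand-in for a generated message class."""

    class TypeValueValuesEnum(object):
        """Nested class, reachable only as JobMessage.TypeValueValuesEnum."""


# Python 3 records the dotted path in __qualname__, so nested classes pickle
# fine there. Resetting it to the bare name mimics Python 2.7, where pickle
# only knows the class as "<module>.TypeValueValuesEnum".
JobMessage.TypeValueValuesEnum.__qualname__ = "TypeValueValuesEnum"


def can_pickle(obj):
    """Return True if obj pickles, False if pickle cannot resolve it by name."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The exact exception type depends on the Python version.
        return False
```

Calling `can_pickle(JobMessage.TypeValueValuesEnum)` returns `False`, because there is no module-level attribute named `TypeValueValuesEnum` to look up.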

main.py imports:

from apache_beam.utils.options import PipelineOptions
from googleapiclient import discovery
from googleapiclient import http
from oauth2client.client import GoogleCredentials

import apache_beam as beam

import io, json, pprint

requirements.txt:

google-cloud-dataflow
google-api-python-client

Environment:

Python 2.7.12 (default, Aug  9 2016, 20:18:05)
[GCC 4.9.2] on linux2

Distributor ID: Debian
Description:    Debian GNU/Linux 8.5 (jessie)
Release:        8.5
Codename:      jessie

For reference, the workaround was to remove --save_main_session True and move all import statements into the functions that use them.
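A rough sketch of what that restructuring looks like, assuming the pipeline steps are plain functions. The function names are hypothetical and the standard-library modules stand in for the real imports (`googleapiclient`, `oauth2client`, and so on); the point is only that nothing imported ends up as a global of the main module.

```python
def parse_record(line):
    # Imported here rather than at module level, so `json` is never a global
    # of the main module and is not part of any main-session pickle.
    import json
    return json.loads(line)


def describe_record(record):
    # Same pattern: the import lives with the function that needs it.
    import pprint
    return pprint.pformat(record)
```

With the imports local to each function, the functions no longer depend on the main module's globals being restored, which is what --save_main_session was needed for in the first place.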

This should be fixed in the google-dataflow 0.4.4 SDK release. Please test it out and let me know if you run into any more issues.