[Bug]: minimal wordcount golang example is freezing on gcs reading
LeoCBS opened this issue · 7 comments
What happened?
I am trying to run the minimal wordcount Golang example on my local machine, but Apache Beam freezes while reading data from Google Cloud Storage, and the example never finishes.
Example output:
ue:{urn:"beam:env:external:v1" payload:"\n\x11\n\x0flocalhost:33211" capabilities:"beam:protocol:progress_reporting:v1" capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"beam:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"s1" root_transform_ids:"e6" root_transform_ids:"s4" root_transform_ids:"e10" root_transform_ids:"s7" requirements:"beam:requirement:pardo:splittable_dofn:v1"
2024/09/18 09:43:33 Prepared job with id: job-001 and staging token: job-001
2024/09/18 09:43:33 Staged binary artifact with token: job-001
2024/09/18 09:43:33 Submitted job: job-001
2024/09/18 09:43:33 (): starting job-001[go-job-1-1726663413483254464]
2024/09/18 09:43:33 (): running job-001[go-job-1-1726663413483254464]
2024/09/18 09:43:33 Job[job-001] state: RUNNING
2024/09/18 09:43:33 starting worker job-001[go-job-1-1726663413483254464]_go
2024/09/18 09:43:34 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:34.827Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.026Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.129Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.228Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.228Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.331Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.331Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.331Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.438Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.834Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.843Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.872Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:35 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:35.982Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:36 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:36.019Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:36 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:36.107Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
2024/09/18 09:43:36 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T12:43:36.181Z worker.ID=job-001[go-job-1-1726663413483254464]_go worker.endpoint=localhost:45985
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Same issue here when running the example command from https://beam.apache.org/get-started/quickstart-go/
When I download the file from GCS directly, the process finishes successfully in seconds. 🤔
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest --input "gs://apache-beam-samples/shakespeare/kinglear.txt" --output counts
2024/09/18 10:08:08 INFO Serving JobManagement endpoint=localhost:60915
2024/09/18 10:08:08 starting Loopback server at 127.0.0.1:60916
...
capabilities:"beam:protocol:multi_core_bundle_processing:v1" capabilities:"beam:transform:sdf_truncate_sized_restrictions:v1" capabilities:"beam:protocol:worker_status:v1" capabilities:"beam:protocol:monitoring_info_short_ids:v1" capabilities:"beam:version:sdk_base:go:apache/beam_go_sdk:2.59.0" capabilities:"beam:transform:to_string:v1" capabilities:"beam:protocol:data_sampling:v1" capabilities:"beam:protocol:sdk_consuming_received_data:v1" capabilities:"beam:coder:bytes:v1" capabilities:"beam:coder:bool:v1" capabilities:"beam:coder:varint:v1" capabilities:"beam:coder:double:v1" capabilities:"beam:coder:string_utf8:v1" capabilities:"beam:coder:length_prefix:v1" capabilities:"beam:coder:kv:v1" capabilities:"beam:coder:iterable:v1" capabilities:"beam:coder:state_backed_iterable:v1" capabilities:"beam:coder:windowed_value:v1" capabilities:"beam:coder:global_window:v1" capabilities:"beam:coder:interval_window:v1" capabilities:"beam:coder:row:v1" capabilities:"beam:coder:nullable:v1" capabilities:"beam:coder:timer:v1" dependencies:{type_urn:"beam:artifact:type:file:v1" role_urn:"beam:artifact:role:go_worker_binary:v1"}}}} root_transform_ids:"s1" root_transform_ids:"s4" root_transform_ids:"e10" root_transform_ids:"s8" requirements:"beam:requirement:pardo:splittable_dofn:v1"
2024/09/18 10:08:08 Prepared job with id: job-001 and staging token: job-001
2024/09/18 10:08:08 Staged binary artifact with token: job-001
2024/09/18 10:08:08 Submitted job: job-001
2024/09/18 10:08:08 (): starting job-001[go-job-1-1726664888026831000]
2024/09/18 10:08:08 (): running job-001[go-job-1-1726664888026831000]
2024/09/18 10:08:08 Job[job-001] state: RUNNING
2024/09/18 10:08:08 starting worker job-001[go-job-1-1726664888026831000]_go
2024/09/18 10:08:10 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:08:10.038Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
2024/09/18 10:08:10 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:08:10.239Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
2024/09/18 10:08:10 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:08:10.438Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
2024/09/18 10:08:10 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:08:10.541Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
2024/09/18 10:08:10 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:08:10.640Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
...
2024/09/18 10:15:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:15:48.765Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
2024/09/18 10:15:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.59.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-18T13:15:48.768Z worker.ID=job-001[go-job-1-1726664888026831000]_go worker.endpoint=localhost:60919
Does this happen only on Beam 2.59.0?
It happens in other Apache Beam versions too:
time=2024-09-19T12:47:48.529Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.643Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.767Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.777Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.898Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.913Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:48 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:48.930Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:49 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:49.393Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
2024/09/19 09:47:49 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/home/leonardo-borges/projects/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.52.0/go/pkg/beam/io/textio/textio.go:226 time=2024-09-19T12:47:49.584Z worker.ID=job-001[go-job-1-1726750062173590943]_go worker.endpoint=localhost:33015
This is an over-splitting problem with the Prism runner, caused by the higher latency of reading from GCS. I had thought we sorted this out (some earlier issues about it were resolved, but apparently not completely).
This can be confirmed by comparing the behavior of a local file read with a read from GCS.
There's a bit of tension between certain goals of the Prism runner (fast execution in test situations) and practical use (reading from remote stores) that the current split policy doesn't satisfy. That needs to be fixed.
The solution here is to make the split policy more configurable, so that the splitting tests still get the fast behavior they check for, while the default wait time is increased so the example works in higher-latency environments.
OK, definitely works well for me, but I am also on Google's network, in Seattle.
It certainly must be made to work smoothly for folks who aren't in my specific unlikely situation.
Adding a bit more debugging tells me the following:
- (~200ms) Time to list the files from the service. Since this transform doesn't split, it isn't affected by the current policy. Actual file reading/opening are in a different bundle.
- (~200us) Time from Start bundle to get to ProcessElement. Negligible.
- (~100ms) Time to actually open the file for reading.
The current Default Split policy for Prism is to only ask for progress and similar every ~100ms, and if there has been any progress either by the channel counter, or downstream element emissions, then it will not split. This allows it to split when processing is slow (indicated by ~100-200ms where the counts have not moved).
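As a rough illustration of the policy described above (not Prism's actual code), the decision at each progress tick can be sketched as a comparison of two counter snapshots. The `progress` type, its field names, and `shouldSplit` are hypothetical names introduced only for this sketch.

```go
package main

import "fmt"

// progress is a hypothetical snapshot of a bundle's counters at one tick.
// It is not a Prism type; the fields stand in for the "channel counter"
// and "downstream element emissions" mentioned in the comment above.
type progress struct {
	inputIndex int64 // elements consumed from the data channel so far
	emitted    int64 // elements emitted downstream so far
}

// shouldSplit reports whether a split would be requested, given the
// snapshots from the previous and the current progress tick. Any movement
// in either counter counts as progress and suppresses the split.
func shouldSplit(prev, cur progress) bool {
	return cur.inputIndex == prev.inputIndex && cur.emitted == prev.emitted
}

func main() {
	// Simulated ticks roughly 100ms apart. During the first interval the
	// bundle is still waiting on a slow remote file open, so no counter
	// has moved; afterwards elements start to flow.
	ticks := []progress{{0, 0}, {0, 0}, {1, 40}, {1, 95}}
	prev := ticks[0]
	for i, cur := range ticks[1:] {
		fmt.Printf("tick %d: split=%v\n", i+1, shouldSplit(prev, cur))
		prev = cur
	}
}
```

Under this simplified model, a file open that takes about as long as the tick interval looks the same as a stalled bundle, which is consistent with the over-splitting seen when reading from GCS.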
Setting the progress ticker to ~10ms gives me behavior similar to the reports, which gives me a chance to find something that should work.
The split planning is so simple that it doesn't take into account work that has already been done, so it always waits only a fixed interval for work on a given stage.
A more robust view would take into account work "globally" on the job, and only split if a stage is "straggling" or similar, but prism shouldn't go that far at this time. And we don't want to slow down all stages just because one needs to be more conservative in how it splits.
I'm now trying out a "back off" for a given stage. If a split needs to happen, progress requests (and split decisions) are made more slowly for all new stages. If stages finish before any progress request is made, the requests are sped up again. So this should even out to some "ideal" rate per stage. But for this issue, a few "quick" splits should happen, and then the aggression is toned down enough for work to complete properly.
This isn't likely to be the final dynamic splitting approach, since it would be best for that to also be tied to the rate of input to output and similar. Combined with better initial splits of the data, that would probably solve most problems.
We've merged in a fix for the next release, but it would be great if you ran the following for us to verify the approach taken under your conditions.
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@master --input "gs://apache-beam-samples/shakespeare/kinglear.txt" --output counts
head -n 10 counts
It will take a little bit longer to start than normal since it will be requesting the code from HEAD, but hopefully this should work.
I'm going to be filing an issue later for the remaining part of the solution, which should avoid over splitting entirely.
Thanks again for the report!
Hey @lostluck !
It worked here!
2024/09/21 09:29:11 Prepared job with id: job-001 and staging token: job-001
2024/09/21 09:29:11 Staged binary artifact with token: job-001
2024/09/21 09:29:11 Submitted job: job-001
2024/09/21 09:29:11 (): starting job-001[go-job-1-1726921751126087000]
2024/09/21 09:29:11 Job[job-001] state: STARTING
2024/09/21 09:29:11 (): running job-001[go-job-1-1726921751126087000]
2024/09/21 09:29:11 Job[job-001] state: RUNNING
2024/09/21 09:29:11 starting worker job-001[go-job-1-1726921751126087000]_go
2024/09/21 09:29:12 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.0.0-20240921024708-818966c6163d/go/pkg/beam/io/textio/textio.go:226 time=2024-09-21T12:29:12.064Z worker.ID=job-001[go-job-1-1726921751126087000]_go worker.endpoint=localhost:63386
2024/09/21 09:29:12 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.0.0-20240921024708-818966c6163d/go/pkg/beam/io/textio/textio.go:226 time=2024-09-21T12:29:12.268Z worker.ID=job-001[go-job-1-1726921751126087000]_go worker.endpoint=localhost:63386
2024/09/21 09:29:12 INFO Writing to counts source=/Users/julianogalgaro/go/pkg/mod/github.com/apache/beam/sdks/v2@v2.0.0-20240921024708-818966c6163d/go/pkg/beam/io/textio/textio.go:343 time=2024-09-21T12:29:12.585Z worker.ID=job-001[go-job-1-1726921751126087000]_go worker.endpoint=localhost:63386
2024/09/21 09:29:12 stopping worker job-001[go-job-1-1726921751126087000]_go
2024/09/21 09:29:12 (): pipeline completed job-001[go-job-1-1726921751126087000]
2024/09/21 09:29:12 (): terminating job-001[go-job-1-1726921751126087000]
2024/09/21 09:29:12 Job[job-001] state: DONE
2024/09/21 09:29:12 control response channel closed
venture: 1
quakes: 1
toucheth: 1
summit: 1
bush: 1
melancholy: 1
escape: 1
seeking: 1
Hecate: 1
bereaved: 1
Thank you so much for your time and explanations.