Since porting to 2.1.0, the pipeline now fails when it encounters an empty `PCollection` for a BigQuery sink/output
polleyg opened this issue · 1 comment
We are porting some pipelines in our applications from 1.8.0 to 2.1.0.
These pipelines are basic: Read from GCS -> ParDo -> Write to BigQuery. There are multiple side outputs being written to BigQuery (think different months/years, which correspond to sharded tables in BigQuery).
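For context, here is a minimal sketch of the shape of these pipelines. This is not our production code: the table names, schema, parsing and routing below are simplified placeholders, and only two of the month shards are shown.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class EmptyShardSketch {
  // One tag per month-sharded destination table (only two shown here).
  static final TupleTag<TableRow> OCT_2017 = new TupleTag<TableRow>() {};
  static final TupleTag<TableRow> NOV_2017 = new TupleTag<TableRow>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder schema; the real tables have many more fields.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("month").setType("STRING"),
        new TableFieldSchema().setName("payload").setType("STRING")));

    PCollectionTuple byMonth = p
        .apply("ReadFromGcs", TextIO.read().from("gs://<redacted>/input/*.csv"))
        .apply("RouteByMonth", ParDo
            .of(new DoFn<String, TableRow>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                String[] cols = c.element().split(",");
                TableRow row = new TableRow().set("month", cols[0]).set("payload", cols[1]);
                // Route each row to the tag for its month; a tag can easily end up
                // with no elements at all for a given set of input files.
                c.output("2017_11".equals(cols[0]) ? NOV_2017 : OCT_2017, row);
              }
            })
            .withOutputTags(OCT_2017, TupleTagList.of(NOV_2017)));

    // One BigQueryIO write per month-sharded table; the schema is always supplied.
    byMonth.get(NOV_2017).apply("WriteNov2017", BigQueryIO.writeTableRows()
        .to("<redacted>:<redacted>.Clicks_transformed_2017_11")
        .withSchema(schema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    // (identical write for OCT_2017 and the other months omitted)

    p.run();
  }
}
```

Each month/year gets its own `BigQueryIO` write like the one above, and whether a given tag receives any elements depends entirely on the input files.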
In some cases, we may not have any elements for one or more of the BigQuery sinks. Running on 1.8.0, this never causes an issue. However, when running on 2.1.0, the pipeline now fails if it encounters an empty `PCollection<TableRow>`:
ID: 2017-10-08_18_49_08-4262554366118214565
```
2017-10-09 (13:13:50) java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: ...
(db49af7a45b5ecaa): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job:
{ "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job:
{ "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
    ... 21 more
Caused by: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job:
{ "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:179)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:114)
```
As a test, I hacked one of the GCS (CSV) input files to ensure at least one element ends up in each of the BigQuery sinks. Et voilà, it worked (ID: 2017-10-08_20_25_01-3317103171891646372).
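For anyone hitting the same thing before a proper fix lands: instead of hand-editing input files, the same effect can be approximated inside the pipeline by flattening a single placeholder row into each per-table `PCollection` before its BigQuery write. This is only a rough sketch building on the names from the snippet above (`byMonth`, `schema`, `NOV_2017`, `p` are placeholders), and the placeholder row then lands in the destination table, so it has to be tolerated or cleaned up afterwards; it is not a workaround we would want to keep.

```java
// Additional imports on top of the sketch above:
//   org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
//   org.apache.beam.sdk.transforms.Create
//   org.apache.beam.sdk.transforms.Flatten
//   org.apache.beam.sdk.values.PCollection
//   org.apache.beam.sdk.values.PCollectionList

// Union a single placeholder row into the possibly-empty shard so the BigQuery
// load job always has at least one element to infer a temp file (and schema) from.
TableRow placeholder = new TableRow().set("month", "2017_11").set("payload", "PLACEHOLDER");

PCollection<TableRow> nov2017Seeded = PCollectionList
    .of(byMonth.get(NOV_2017))
    .and(p.apply("SeedNov2017",
        Create.of(placeholder).withCoder(TableRowJsonCoder.of())))
    .apply("FlattenNov2017", Flatten.<TableRow>pCollections());

nov2017Seeded.apply("WriteNov2017", BigQueryIO.writeTableRows()
    .to("<redacted>:<redacted>.Clicks_transformed_2017_11")
    .withSchema(schema)
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```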
We are skipping 1.9.0 and 2.0.0, so I'm not sure which version this bug was introduced in.
Is there any update?