Unable to create Aggregator on release-2.0.0-beta2
gowrishankarsundersc opened this issue · 2 comments
I am trying to create a custom aggregator that sums long values, similar to `Sum.SumLongFn()`, in Dataflow, but I get the following error. I have also attached the code I used to create the aggregator. This is on release `2.0.0-beta2`, and I am running the pipeline on Google Dataflow.
Running the same pipeline with the `DirectRunner` does not throw this error, although I am not sure whether the aggregator takes effect at all with the `DirectRunner`.
```java
public class SumLongAggregator extends Combine.BinaryCombineLongFn {
  @Override
  public long apply(long left, long right) {
    return left + right;
  }

  @Override
  public long identity() {
    return 0;
  }
}

static class StatefulDoFn extends DoFn<KV<String, String>, String> {
  // Both aggregators use the custom CombineFn; this is what the Dataflow
  // worker rejects with "Unsupported CombineFn".
  private final Aggregator<Long, Long> processedElements =
      createAggregator("processed", new SumLongAggregator());
  private final Aggregator<Long, Long> skippedElements =
      createAggregator("skipped", new SumLongAggregator());

  @StateId("tracker")
  private final StateSpec<Object, ValueState<Integer>> trackerSpec =
      StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("tracker") ValueState<Integer> tracker) {
    processedElements.addValue(1L);
    // A missing tracker value means this key has not been seen yet.
    int wasSeen = firstNonNull(tracker.read(), 0);
    final String id = context.element().getKey();
    if (wasSeen == 0) {
      tracker.write(1);
      context.output(context.element().getValue());
      logger.info("Writing id first time: {}", id);
    } else {
      skippedElements.addValue(1L);
    }
  }
}
```
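To make the intended behavior of `StatefulDoFn` concrete, here is a minimal plain-Java sketch with no Beam dependency: a per-key `tracker`, a `processed` counter bumped for every element, and a `skipped` counter for repeated keys, both folded with the same `identity`/`apply` pair as `SumLongAggregator`. The class names `SumLongSketch` and `DedupSketch`, the `HashMap` standing in for per-key `ValueState`, and the plain fields standing in for aggregators are all assumptions for illustration; on a real runner, state is scoped per key and window, and counter values are collected by the runner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongBinaryOperator;

// Hypothetical stand-in for SumLongAggregator: the same identity/apply pair.
final class SumLongSketch {
    static final long IDENTITY = 0L;
    static final LongBinaryOperator APPLY = (left, right) -> left + right;
}

// Hypothetical model of StatefulDoFn: emits the first value per key, counts repeats.
final class DedupSketch {
    // Stands in for the per-key ValueState<Integer> "tracker".
    private final Map<String, Integer> tracker = new HashMap<>();
    // Accumulators start at the combine fn's identity and are folded with apply.
    private long processed = SumLongSketch.IDENTITY;
    private long skipped = SumLongSketch.IDENTITY;
    private final List<String> output = new ArrayList<>();

    void processElement(String key, String value) {
        processed = SumLongSketch.APPLY.applyAsLong(processed, 1L);
        // Like firstNonNull(tracker.read(), 0): absent means "not seen yet".
        int wasSeen = tracker.getOrDefault(key, 0);
        if (wasSeen == 0) {
            tracker.put(key, 1);
            output.add(value);
        } else {
            skipped = SumLongSketch.APPLY.applyAsLong(skipped, 1L);
        }
    }

    long processed() { return processed; }
    long skipped() { return skipped; }
    List<String> output() { return output; }
}
```

Feeding it `("a", "x")`, `("b", "y")`, `("a", "z")` yields the output `[x, y]` with `processed` at 3 and `skipped` at 1.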
```
2017-02-26 18:10:56 ERROR MonitoringUtil$LoggingHandler:97 - 2017-02-27T02:10:55.848Z: (d2440485b08b19a3): java.lang.IllegalArgumentException: Unsupported CombineFn: com.company.analytics.beam.dataflow.SumLongAggregator
    at com.google.cloud.dataflow.worker.counters.CounterAggregator.constructCounter(CounterAggregator.java:142)
    at com.google.cloud.dataflow.worker.counters.CounterAggregator.<init>(CounterAggregator.java:95)
    at com.google.cloud.dataflow.worker.counters.CounterAggregator$CounterAggregatorFactory.createAggregatorForDoFn(CounterAggregator.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.createAggregator(SimpleDoFnRunner.java:415)
    at org.apache.beam.sdk.transforms.DoFn$Context.setupDelegateAggregator(DoFn.java:249)
    at org.apache.beam.sdk.transforms.DoFn$Context.setupDelegateAggregators(DoFn.java:240)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.<init>(SimpleDoFnRunner.java:280)
    at org.apache.beam.runners.core.SimpleDoFnRunner.<init>(SimpleDoFnRunner.java:140)
    at org.apache.beam.runners.core.DoFnRunners.simpleRunner(DoFnRunners.java:63)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:200)
    at com.google.cloud.dataflow.worker.runners.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:235)
    at com.google.cloud.dataflow.worker.runners.worker.ForwardingParDoFn.processElement(ForwardingParDoFn.java:42)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerLoggingParDoFn.processElement(DataflowWorkerLoggingParDoFn.java:47)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:198)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:321)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:280)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:227)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:213)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:193)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:180)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
I realised that Beam itself ships this under the `Sum` transforms, so I tried `createAggregator("processed", Sum.ofLongs());`, and that works. I am not sure what is different from my version, since I am also extending `Combine.BinaryCombineLongFn`, so I would still like to know what difference caused the earlier error.
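Thanks @gowrishankarsundersc. Indeed, as you observed, aggregators work only with a set of predefined `CombineFn`s: the Dataflow worker turns each aggregator into one of its own counters and rejects any `CombineFn` it does not recognize, which is the `Unsupported CombineFn` error thrown from `CounterAggregator.constructCounter` in your trace. Glad to see you were able to resolve the issue yourself!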
Thanks and let us know if we can help in any way!
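The trace suggests why identical behavior is not enough: `CounterAggregator.constructCounter` has to map the aggregator's `CombineFn` onto a counter the worker knows, so it must recognize the function itself. The toy model below is an assumption about that kind of dispatch, not the actual Dataflow worker code (which this thread does not show); it accepts only a known built-in instance and rejects everything else, even a subclass that sums the same way. All names here (`LongCombineSketch`, `BuiltInSumLong`, `UserSumLong`, `CounterKind`, `CounterFactorySketch`) are hypothetical.

```java
// Hypothetical minimal combine-fn shape: identity plus a binary apply,
// mirroring the two methods SumLongAggregator overrides.
abstract class LongCombineSketch {
    abstract long identity();
    abstract long apply(long left, long right);
}

// Stands in for the built-in fn returned by Sum.ofLongs().
final class BuiltInSumLong extends LongCombineSketch {
    static final BuiltInSumLong INSTANCE = new BuiltInSumLong();
    private BuiltInSumLong() {}
    long identity() { return 0L; }
    long apply(long left, long right) { return left + right; }
}

// Stands in for the user's SumLongAggregator: same behavior, different class.
final class UserSumLong extends LongCombineSketch {
    long identity() { return 0L; }
    long apply(long left, long right) { return left + right; }
}

enum CounterKind { SUM_LONG }

final class CounterFactorySketch {
    // Assumed for illustration: dispatch on the fn's class, not its behavior.
    static CounterKind constructCounter(LongCombineSketch fn) {
        if (fn.getClass() == BuiltInSumLong.class) {
            return CounterKind.SUM_LONG;
        }
        throw new IllegalArgumentException(
            "Unsupported CombineFn: " + fn.getClass().getName());
    }
}
```

In this model, the practical fix is the one found in the thread: pass the runner-recognized `Sum.ofLongs()` rather than a custom subclass with the same arithmetic.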