Serialization doesn't work with forking
Closed this issue · 6 comments
kanterov commented
flo version: 0.4.1
Minimal reproducible example:
```java
package com.spotify.data.clickhouse;

import com.spotify.flo.Task;
import com.spotify.flo.context.FloRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {

  public static void main(String[] args) {
    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
    FloRunner
        .runTask(task(opts))
        .waitAndExit();
  }

  public static Task<String> task(PipelineOptions opts) {
    return Task.named("task")
        .ofType(String.class)
        // The method reference captures the PipelineOptions instance itself,
        // so that instance has to be serialized along with the task.
        .process(opts::toString);
  }
}
```
Workaround: create `resources/flo.conf` and put `flo.forking = false` there.
danielnorberg commented
Flo uses Kryo for serialization, and `org.apache.beam.sdk.options.PipelineOptions` is not Kryo-serializable.
This can be worked around by instantiating the `PipelineOptions` inside the task, or by passing in a closure instead:
```java
import com.spotify.flo.Task;
import com.spotify.flo.TaskBuilder.F0;
import com.spotify.flo.context.FloRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {

  public static void main(String[] args) {
    // The closure captures only args; PipelineOptions is created when it is invoked.
    F0<PipelineOptions> opts = () -> PipelineOptionsFactory.fromArgs(args).create();
    FloRunner
        .runTask(task(opts))
        .waitAndExit();
  }

  public static Task<String> task(F0<PipelineOptions> opts) {
    return Task.named("task")
        .ofType(String.class)
        // Build the options inside the task body instead of capturing an instance.
        .process(() -> opts.get().toString());
  }
}
```
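To see why the closure helps, here is a minimal, dependency-free sketch. It uses Java's built-in serialization as a stand-in for Kryo, a hypothetical `Options` class in place of `PipelineOptions`, and a hypothetical `SerializableSupplier` interface playing the role of flo's `F0`; none of these are flo or Beam APIs. Capturing an already-built instance fails to serialize, while capturing only the `String[]` arguments and building the object inside the closure (the same idea as instantiating it inside the task) round-trips fine.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.function.Supplier;

public class ClosureDemo {

  // Hypothetical stand-in for F0: a supplier that is also Serializable.
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Hypothetical stand-in for PipelineOptions: deliberately NOT Serializable.
  static final class Options {
    private final String[] args;

    Options(String[] args) { this.args = args.clone(); }

    @Override
    public String toString() { return "Options" + Arrays.toString(args); }
  }

  // Write to bytes and read back, as a forking runner has to do with the task.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  // Mirrors the failing example: the method reference captures an Options instance.
  static SerializableSupplier<String> capturingInstance(String[] args) {
    Options opts = new Options(args);
    return opts::toString;
  }

  // Mirrors the workaround: only the serializable String[] is captured,
  // and Options is constructed when the supplier runs.
  static SerializableSupplier<String> buildingLazily(String[] args) {
    SerializableSupplier<Options> opts = () -> new Options(args);
    return () -> opts.get().toString();
  }

  public static void main(String[] argv) throws Exception {
    String[] args = {"--runner=DirectRunner"};
    try {
      roundTrip(capturingInstance(args));
      System.out.println("unexpected: the captured instance was serialized");
    } catch (NotSerializableException e) {
      System.out.println("capturing the instance fails: " + e.getMessage());
    }
    System.out.println("lazy construction works: " + roundTrip(buildingLazily(args)).get());
  }
}
```

Kryo's rules differ from Java serialization's, so this only illustrates the shape of the fix, not what flo does internally.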
danielnorberg commented
danielnorberg commented
Beam really doesn't want `PipelineOptions` to be Java-serialized:

> PipelineOptions objects are not serializable
danielnorberg commented
Serialization of all Jackson-serializable types (including Beam `PipelineOptions`) should just work after #174: https://github.com/spotify/flo/pull/174/files#diff-423cdf97f3242f0814fc0902ee32232bR84
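The comment suggests that after #174, types Kryo cannot handle but Jackson can are serialized through Jackson. The general idea, routing an object through a form that is supported (JSON, in the Jackson case), can be sketched without any libraries. This is a rough analogy, not flo's implementation: Java serialization again stands in for Kryo, a plain argument array stands in for JSON, and `Options` and `OptionsHolder` are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public class ProxyDemo {

  // Hypothetical stand-in for PipelineOptions: not Serializable itself,
  // but convertible to a supported form (its argument array) and back.
  static final class Options {
    private final String[] args;

    Options(String[] args) { this.args = args.clone(); }

    String[] toArgs() { return args.clone(); }

    @Override
    public String toString() { return "Options" + Arrays.toString(args); }
  }

  // Hypothetical holder: the live object is transient, and the custom
  // writeObject/readObject hooks store and rebuild it from the supported form.
  static final class OptionsHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient Options options;

    OptionsHolder(Options options) { this.options = options; }

    Options get() { return options; }

    private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      out.writeObject(options.toArgs()); // String[] is Serializable
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      options = new Options((String[]) in.readObject());
    }
  }

  // Write to bytes and read back, as a forking runner has to do with the task.
  static OptionsHolder roundTrip(OptionsHolder holder) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(holder);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (OptionsHolder) in.readObject();
    }
  }

  public static void main(String[] argv) throws Exception {
    Options original = new Options(new String[] {"--runner=DirectRunner"});
    OptionsHolder copy = roundTrip(new OptionsHolder(original));
    System.out.println(copy.get()); // prints Options[--runner=DirectRunner]
  }
}
```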
danielnorberg commented
Error messages should get much better with #179.