spotify/flo

Serialization doesn't work with forking


flo version: 0.4.1

Minimal reproducible example:

```java
package com.spotify.data.clickhouse;

import com.spotify.flo.Task;
import com.spotify.flo.context.FloRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {
  public static void main(String[] args) {
    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();

    FloRunner
        .runTask(task(opts))
        .waitAndExit();
  }

  public static Task<String> task(PipelineOptions opts) {
    return Task.named("task")
        .ofType(String.class)
        .process(opts::toString);
  }
}
```

Workaround

Disable forking by creating resources/flo.conf containing the line flo.forking = false.

Flo uses Kryo to serialize tasks when forking, and org.apache.beam.sdk.options.PipelineOptions is not Kryo-serializable.
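To see why the original example fails, here is a minimal sketch that uses plain Java serialization as a stand-in for Kryo (flo's serializer is not involved). `Options` and `SerializableSupplier` are hypothetical names standing in for `PipelineOptions` and the closure handed to `process()`. The point is only that a bound method reference like `opts::toString` captures `opts`, so serializing the closure also tries to serialize the non-serializable object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class CaptureDemo {

  // Hypothetical stand-in for PipelineOptions: deliberately not Serializable.
  static class Options {
    @Override
    public String toString() {
      return "Options";
    }
  }

  // A Supplier that Java can serialize, standing in for the closure given to process().
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Returns true if Java serialization of `o` succeeds, false on NotSerializableException.
  static boolean trySerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Mirrors `.process(opts::toString)`: the bound method reference captures `opts`.
  static boolean closureOverOptionsSerializes() {
    Options opts = new Options();
    SerializableSupplier<String> process = opts::toString;
    return trySerialize(process);
  }

  public static void main(String[] args) {
    System.out.println(closureOverOptionsSerializes()); // prints "false"
  }
}
```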

This can be worked around by constructing the PipelineOptions inside the task, or by passing in a closure that creates them instead:

```java
import com.spotify.flo.Task;
import com.spotify.flo.TaskBuilder.F0;
import com.spotify.flo.context.FloRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {

  public static void main(String[] args) {
    // Only `args` (a String[]) is captured; the options are built when get() is called.
    F0<PipelineOptions> opts = () -> PipelineOptionsFactory.fromArgs(args).create();

    FloRunner
        .runTask(task(opts))
        .waitAndExit();
  }

  public static Task<String> task(F0<PipelineOptions> opts) {
    return Task.named("task")
        .ofType(String.class)
        .process(() -> opts.get().toString());
  }
}
```
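Why the closure helps can be sketched the same way, again with Java serialization standing in for Kryo. The hypothetical `Options` and `SerializableSupplier` types take the place of `PipelineOptions` and flo's `F0` (we assume here, without checking flo's source, that `F0` behaves like a serializable supplier). Only the `String[]` arguments travel; the options object is rebuilt when `get()` is called on the deserialized copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.function.Supplier;

public class LazyOptionsDemo {

  // Hypothetical stand-in for PipelineOptions: not Serializable, built from args.
  static class Options {
    private final String[] args;

    Options(String[] args) {
      this.args = args.clone();
    }

    @Override
    public String toString() {
      return "Options" + Arrays.toString(args);
    }
  }

  // Plays the role of flo's F0 in this sketch.
  interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

  // Serializes and deserializes `value`, roughly what shipping a task to a fork involves.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  static String run(String[] args) {
    // Only the String[] is captured; Options is constructed lazily inside get().
    SerializableSupplier<Options> opts = () -> new Options(args);
    SerializableSupplier<String> process = () -> opts.get().toString();
    try {
      // The deserialized copy rebuilds Options on the "other side".
      return roundTrip(process).get();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(run(new String[] {"--example=1"})); // prints "Options[--example=1]"
  }
}
```

The `--example=1` argument is an arbitrary placeholder, not a real Beam flag.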

The difficulty of serializing PipelineOptions seems somewhat intentional:

{@link PipelineOptions} is intentionally not marked {@link java.io.Serializable}

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java#L182

However, it is JSON-serializable (with Jackson). It would perhaps be good if flo could fall back to JSON serialization for such types.
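One possible shape for such a fallback is sketched below. It is not flo's implementation: `JsonCodec`, `JsonEnvelope`, `toWire` and `fromWire` are hypothetical, the hand-written one-field "JSON" (no escaping) stands in for Jackson, and Java serialization stands in for Kryo. Serializable values pass through unchanged; for other types a registered codec turns the value into JSON text that travels inside a serializable envelope and is decoded on the other side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class JsonFallbackDemo {

  // Hypothetical stand-in for PipelineOptions: not Serializable, but has a JSON form.
  static final class Options {
    final String project;

    Options(String project) {
      this.project = project;
    }
  }

  // Hypothetical codec; a Jackson ObjectMapper would fill this role in practice.
  static final class JsonCodec {
    final Function<Object, String> toJson;
    final Function<String, Object> fromJson;

    JsonCodec(Function<Object, String> toJson, Function<String, Object> fromJson) {
      this.toJson = toJson;
      this.fromJson = fromJson;
    }
  }

  // Serializable envelope carrying the original type and its JSON text.
  static final class JsonEnvelope implements Serializable {
    final Class<?> type;
    final String json;

    JsonEnvelope(Class<?> type, String json) {
      this.type = type;
      this.json = json;
    }
  }

  static final Map<Class<?>, JsonCodec> CODECS = new HashMap<>();

  static {
    // Toy JSON for a single string field, without escaping; illustration only.
    String prefix = "{\"project\":\"";
    CODECS.put(Options.class, new JsonCodec(
        o -> prefix + ((Options) o).project + "\"}",
        s -> new Options(s.substring(prefix.length(), s.length() - 2))));
  }

  // Use Java serialization when possible, otherwise fall back to a registered JSON codec.
  static Object toWire(Object value) {
    if (value instanceof Serializable) {
      return value;
    }
    JsonCodec codec = CODECS.get(value.getClass());
    if (codec == null) {
      throw new IllegalArgumentException("No JSON codec for " + value.getClass().getName());
    }
    return new JsonEnvelope(value.getClass(), codec.toJson.apply(value));
  }

  // Decodes an envelope back into the original type; other values are returned as-is.
  static Object fromWire(Object wire) {
    if (wire instanceof JsonEnvelope) {
      JsonEnvelope env = (JsonEnvelope) wire;
      return CODECS.get(env.type).fromJson.apply(env.json);
    }
    return wire;
  }

  // Java-serializes and deserializes a value, standing in for the trip to a forked process.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Options back = (Options) fromWire(roundTrip(toWire(new Options("my-project"))));
    System.out.println(back.project); // prints "my-project"
  }
}
```

A real fallback would also have to cope with objects that implement `Serializable` but hold non-serializable fields, which this sketch simply passes through.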

They really don't want PipelineOptions to be Java-serialized:

PipelineOptions objects are not serializable

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java#L175
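For reference, a standard way for a Java class to refuse Java serialization, even when it implements `Serializable`, is a private `writeObject` hook that throws `NotSerializableException`. The sketch below shows that mechanism with a hypothetical `Handler` class; we have not verified that the linked Beam line is written exactly this way.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class RefuseSerializationDemo {

  // Hypothetical class: declares Serializable, but vetoes serialization in writeObject.
  static class Handler implements Serializable {
    private void writeObject(ObjectOutputStream out) throws IOException {
      throw new NotSerializableException("Handler objects are not serializable");
    }
  }

  // Returns "ok" on success, or the NotSerializableException message on refusal.
  static String trySerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return "ok";
    } catch (NotSerializableException e) {
      return e.getMessage();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(trySerialize(new Handler())); // prints "Handler objects are not serializable"
  }
}
```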

Serialization of all Jackson-serializable types (including Beam PipelineOptions) should just work after #174: https://github.com/spotify/flo/pull/174/files#diff-423cdf97f3242f0814fc0902ee32232bR84

Not entirely fixed, but error messages are improved in #178

Should get much better error messages in #179