Running a graph twice is not safe.
Many of the javadsl methods are not safe when the same graph is run more than once.
For example, fold takes a concrete object as its first parameter, so every materialization of the stream uses that same object. If you pass a mutable List, each run of the graph reuses the same list and the results of different runs get jumbled together.
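To make the failure mode concrete, here is a minimal plain-Java sketch of it. It does not use Akka at all: FoldBlueprint and SharedZeroDemo are hypothetical names invented for this illustration, and each call to run() only stands in for one materialization of a graph. The assumption is that the real fold stage captures its zero value once, the same way this class does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for a reusable stream blueprint; not an Akka class.
final class FoldBlueprint<A, T> {
    private final A zero;                    // captured once, shared by every run
    private final BiFunction<A, T, A> step;

    FoldBlueprint(A zero, BiFunction<A, T, A> step) {
        this.zero = zero;
        this.step = step;
    }

    // Each call plays the role of one materialization of the graph.
    A run(List<T> input) {
        A acc = zero;
        for (T elem : input) {
            acc = step.apply(acc, elem);
        }
        return acc;
    }
}

final class SharedZeroDemo {
    static FoldBlueprint<List<Integer>, Integer> mutableFold() {
        return new FoldBlueprint<>(new ArrayList<Integer>(), (acc, elem) -> {
            acc.add(elem);   // mutates the one list that all runs share
            return acc;
        });
    }

    public static void main(String[] args) {
        FoldBlueprint<List<Integer>, Integer> fold = mutableFold();
        List<Integer> first = fold.run(Arrays.asList(1, 2, 3));
        List<Integer> second = fold.run(Arrays.asList(4, 5, 6));
        System.out.println(first == second);  // true: both runs returned the same object
        System.out.println(second);           // [1, 2, 3, 4, 5, 6]
    }
}
```

Both runs hand back the very same list, so the second result also contains the elements of the first run.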
At the other extreme there is Sink.lazyInit, which has two levels of indirection: the function and the future. With it I can compensate for the lack of methods that take a Creator<> instead of a concrete object, but the code becomes a difficult-to-decipher mess.
You must use immutable collections with all such stages. I can see how this is confusing from the Java point of view, where immutable collections are not commonly used (compared to Scala, where they are pretty much the default), so maybe we could improve on that by being clearer in the docs of the Java APIs.
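As a rough illustration of the immutable style in plain Java, the fold step can return a new list instead of mutating the accumulator. ImmutableAccumulator and its methods are hypothetical names for this sketch, and run() is only a plain loop standing in for one materialization, not an Akka call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ImmutableAccumulator {
    // Fold step: returns a new unmodifiable list; the incoming list is never modified.
    static List<Integer> append(List<Integer> acc, Integer elem) {
        List<Integer> copy = new ArrayList<>(acc);
        copy.add(elem);
        return Collections.unmodifiableList(copy);
    }

    // One "run": folds the input starting from the shared zero value.
    static List<Integer> run(List<Integer> zero, List<Integer> input) {
        List<Integer> acc = zero;
        for (Integer elem : input) {
            acc = append(acc, elem);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> zero = Collections.emptyList();            // immutable, safe to share
        System.out.println(run(zero, Arrays.asList(1, 2, 3)));   // [1, 2, 3]
        System.out.println(run(zero, Arrays.asList(4, 5, 6)));   // [4, 5, 6]
    }
}
```

Copying on every element costs quadratic time for long streams, so a persistent collection library would likely be a better fit in practice. A step function of this shape could presumably be passed to fold unchanged, since it never touches the shared zero.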
Note that this repository is for additional community-contributed stages, so if you feel we should continue discussing this, please open a ticket in the akka/akka repository instead, thanks!
I agree with Johan that it is intended to be used with immutable data structures.
I was thinking if there is any way to make it somewhat usable with mutable collections. Here is a sketch:
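import java.util.function.Supplier;

// Wraps a factory for a mutable value. The shared "zero" instance never holds
// a value itself; the first init() call in each run creates a fresh one.
public class LazyMutableValue<T> {
    private final Supplier<T> factory;

    // Only used by Initialized, which overrides every method that reads the factory.
    protected LazyMutableValue() {
        this.factory = null;
    }

    public LazyMutableValue(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns an initialized wrapper; on the uninitialized zero this creates a new value.
    // Declared with the base type so the private nested class does not leak into the API.
    public LazyMutableValue<T> init() {
        return new Initialized<T>(factory.get());
    }

    // On the uninitialized zero this creates a new value on every call.
    public T getValue() {
        return init().getValue();
    }

    private static class Initialized<T> extends LazyMutableValue<T> {
        private final T value;

        public Initialized(T value) {
            this.value = value;
        }

        // Already initialized: keep using the same value.
        @Override
        public LazyMutableValue<T> init() {
            return this;
        }

        @Override
        public T getValue() {
            return value;
        }
    }
}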
It can be used like:
ActorSystem sys = ActorSystem.create("Sys");
Materializer mat = ActorMaterializer.create(sys);

Source<Integer, NotUsed> source = Source.range(1, 100);
Sink<Integer, CompletionStage<LazyMutableValue<List<Integer>>>> sink =
    Sink.fold(new LazyMutableValue<>(() -> new ArrayList<Integer>()), (acc, elem) -> {
        LazyMutableValue<List<Integer>> nextAcc = acc.init();
        nextAcc.getValue().add(elem);
        return nextAcc;
    });
RunnableGraph<CompletionStage<LazyMutableValue<List<Integer>>>> runnable =
    source.toMat(sink, Keep.right());

CompletionStage<LazyMutableValue<List<Integer>>> result1 = runnable.run(mat);
CompletionStage<LazyMutableValue<List<Integer>>> result2 = runnable.run(mat);
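To check the per-run behavior without an actor system, the same accumulator can be driven by a plain loop. This is only a sketch: the LazyMutableValue below is a compact copy of the class above (with init() declared to return the base type), LazyMutableValueDemo is a hypothetical name, and run() merely imitates what one materialization of the fold would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Compact copy of the sketch from this thread, kept here so the example compiles on its own.
class LazyMutableValue<T> {
    private final Supplier<T> factory;

    LazyMutableValue(Supplier<T> factory) {
        this.factory = factory;
    }

    LazyMutableValue<T> init() {
        return new Initialized<>(factory.get());
    }

    T getValue() {
        return init().getValue();
    }

    private static final class Initialized<T> extends LazyMutableValue<T> {
        private final T value;

        Initialized(T value) {
            super(null);          // the factory is never read on an initialized value
            this.value = value;
        }

        @Override
        LazyMutableValue<T> init() {
            return this;
        }

        @Override
        T getValue() {
            return value;
        }
    }
}

final class LazyMutableValueDemo {
    // Imitates one run of the fold: the first init() creates a fresh list for this run.
    static List<Integer> run(LazyMutableValue<List<Integer>> zero, List<Integer> input) {
        LazyMutableValue<List<Integer>> acc = zero;
        for (Integer elem : input) {
            LazyMutableValue<List<Integer>> next = acc.init();
            next.getValue().add(elem);
            acc = next;
        }
        return acc.getValue();
    }

    public static void main(String[] args) {
        LazyMutableValue<List<Integer>> zero =
            new LazyMutableValue<>(() -> new ArrayList<Integer>());
        System.out.println(run(zero, Arrays.asList(1, 2, 3)));  // [1, 2, 3]
        System.out.println(run(zero, Arrays.asList(4, 5, 6)));  // [4, 5, 6]
    }
}
```

One caveat of this design: for an empty input the result is still the uninitialized zero, so every getValue() call on it builds a new empty list.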