hazelcast/hazelcast-jet

.rebalance() nonfunctional before fused stages

mwheeler-aicradle opened this issue · 0 comments

Hello, I'm new to jet, running v4.4, and having a problem with rebalance() before fused stages.

This is reproducible with:

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.itemStream(10))
        .withIngestionTimestamps()
        .rebalance()
        .filter(simpleEvent -> true)
        .filter(simpleEvent -> true)
        .setName("filter trues")
        .writeTo(Sinks.logger());
    JetInstance jet = Jet.bootstrappedInstance();
    jet.newJob(p);

With two jet members running locally stream items are not distributed from one member to the other. This seems to happen in any case where there is a rebalance before a fused stage. One possible clue, in the hz mc for the fused operation I see that the queue cap is zero for half of the processors.

If I remove one of the filters it works fine: stream items are distributed to both members, then filtered, then logs