hazelcast/hazelcast-jet

ReadMapOrCacheP does not complete

lukasherman opened this issue · 13 comments

Hi Team,
We ran into a regression on Jet 4.5 with a very simple scenario:

Pipeline p1 = Pipeline.create();
p1.readFrom(Sources.map(sourceMapName)).writeTo(Sinks.map(targetMapName));
jetInstance.newJob(p1).join();

This job never completes. Analyzing the result data shows that only some of the items from sourceMapName are copied over to targetMapName.
The complete() method never returns true, and tryGetNextResultSet() never returns false.
A sample unit test with the code above completes correctly.
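
For context, a minimal sketch of the contract involved here (illustrative, assuming a custom processor extending AbstractProcessor; this is not the actual ReadMapOrCacheP source): the execution engine calls complete() repeatedly, and the job can only finish once every source processor returns true from it.

import com.hazelcast.jet.core.AbstractProcessor;

// Illustrative only: a batch source signals it is done by returning true
// from complete(); until then the engine keeps calling it.
class CompleteContractSketch extends AbstractProcessor {
    private boolean allPartitionsRead;   // assumption: set once every partition has been drained

    @Override
    public boolean complete() {
        // emit whatever is ready via tryEmit(...), then report whether we are finished
        return allPartitionsRead;
    }
}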

Do you have a reproducer? I tried a simple test and it passes.

public class Testt extends JetTestSupport {
    @Test
    public void test() {
        JetInstance inst = createJetMember();

        IMap<Object, Object> srcmap = inst.getMap("srcmap");
        for (int i = 0; i < 10_000; i++) {
            srcmap.put(i, i);
        }

        Pipeline p1 = Pipeline.create();
        p1.readFrom(Sources.map(srcmap)).writeTo(Sinks.map("destmap"));
        inst.newJob(p1).join();

        assertEquals(10_000, inst.getMap("destmap").size());

        inst.shutdown();
    }
}

No simple reproducer so far. A simple unit test, same as yours, passes correctly. Some observations:

  • copying 150 000 records
  • fails consistently on the deployed application (3 nodes) or locally (1 node)
  • passes when slowed down a bit by excessive breakpoint logging
  • logging shows that after some time the source simply stops producing events, so the map is only partially copied over

Isn't the source map mutated at the same time?

No, the source map is not mutated.

If it were mutated, there could be duplicated or missing events, but it shouldn't get stuck.

A guess: does the thread dump show any blocked threads?

No stuck threads. It just stopped iterating.
Another observation: reducing the number of partitions to a small number results in an “empty but not terminal batch” assertion.
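
For reference, a minimal sketch of how the partition count can be lowered in a test (hazelcast.partition.count is the standard Hazelcast property; the value and the use of the JetTestSupport helper here are illustrative):

Config config = new Config();
// deliberately tiny partition count, just to shrink the per-partition batches
config.setProperty("hazelcast.partition.count", "2");
JetInstance inst = createJetMember(config);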

Update:

2021-05-15 12:25:03.039  INFO 9204 --- [cached.thread-4] com.hazelcast.jet.impl.MasterJobContext  : Start executing job '100c-1d3f-b103-0001', execution 100c-1d3f-b103-2001, execution graph in DOT format:
digraph DAG {
	"mapSource(468c1a3583194fce8bb506b8eb20fc89)" [localParallelism=1];
	"mapSink(bic_search_index)" [localParallelism=1];
	"mapSource(468c1a3583194fce8bb506b8eb20fc89)" -> "mapSink(bic_search_index)" [label="partitioned", queueSize=1024];
}

After some looping, the job got stuck at line 161:

...
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)

if (pendingItem != null && !tryEmit(pendingItem)) {
    return false;
}

Looks like the outbound queue is full:
[screenshot attached]
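
A minimal sketch of the emit pattern those breakpoints point at (illustrative, not the actual ReadMapOrCacheP source): tryEmit() returns false when the outbox is full, the item that did not fit is kept as pendingItem, and the method returns false so the cooperative worker retries on the next call.

import com.hazelcast.jet.core.AbstractProcessor;
import java.util.Iterator;

class EmitWithBackpressureSketch extends AbstractProcessor {
    private Object pendingItem;

    boolean emitResultSet(Iterator<Object> resultSet) {
        // retry the item that did not fit into the outbox last time
        if (pendingItem != null && !tryEmit(pendingItem)) {
            return false;                // outbox still full, try again on the next call
        }
        pendingItem = null;
        while (resultSet.hasNext()) {
            Object item = resultSet.next();
            if (!tryEmit(item)) {
                pendingItem = item;      // outbox full, remember where we stopped
                return false;
            }
        }
        return true;                     // whole result set emitted
    }
}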

Anything to check on the receiving map side?

Removed MapStore persistence on the receiving map and reran the test. Got a slightly different outcome (the job still does not finish).

Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218) tryGetNextResultSet() returns false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:173) complete (numCompletedPartitions == partitionIds.length) = true

Replaced the receiving map with a random map name.

2021-05-15 13:16:40.499  INFO 9372 --- [ached.thread-30] com.hazelcast.jet.impl.MasterJobContext  : Start executing job '100c-3535-2283-0001', execution 100c-3535-2283-2001, execution graph in DOT format:
digraph DAG {
	"mapSource(dd2a7d2b43e449d9a2126a4884defa7a)" [localParallelism=1];
	"mapSink(141059ec749749fc9dd0727cce2f21e6)" [localParallelism=1];
	"mapSource(dd2a7d2b43e449d9a2126a4884defa7a)" -> "mapSink(141059ec749749fc9dd0727cce2f21e6)" [label="partitioned", queueSize=1024];
}
...
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:195)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218)
return false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:143)
complete (numCompletedPartitions == partitionIds.length) = false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218) return false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:143) complete (numCompletedPartitions == partitionIds.length) = true
2021-05-15 13:16:46.043  INFO 9372 --- [cached.thread-4] com.hazelcast.jet.impl.MasterJobContext  : Execution of job '100c-3535-2283-8001', execution 100c-3535-2283-a001 completed successfully
	Start time: 2021-05-15T13:16:44.704
	Duration: 00:00:01.339
	To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion

The job completes.

Ok guys, root cause narrowed down to:

NearCacheConfig nearCacheConfig = new NearCacheConfig();
config.getMapConfig(targetMapName).setNearCacheConfig(nearCacheConfig);

So looks like we've got two issues here:

  1. AssertionError: empty but not terminal batch. This happens with or without a near cache configured on the target map; it happens in the source.
  2. The job gets stuck if the target map has a near cache configured. I had to disable assertions to reproduce this, otherwise I hit the assertion first.

I reproduced both issues on master, so I'm going to fix them there first and then maybe backport to 4.5.

The reproducer:

public class Testt extends JetTestSupport {
    @Test
    public void test() {
        NearCacheConfig nearCacheConfig = new NearCacheConfig();
        Config config = new Config();
        config.getMapConfig("destmap").setNearCacheConfig(nearCacheConfig);

        JetInstance inst = createJetMember(config);

        IMap<Object, Object> srcmap = inst.getMap("srcmap");
        Map<Object, Object> tmpMap = new HashMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            tmpMap.put(i, i);
            if (tmpMap.size() == 10_000) {
                srcmap.putAll(tmpMap);
                tmpMap.clear();
            }
        }
        srcmap.putAll(tmpMap);

        Pipeline p1 = Pipeline.create();
        p1.readFrom(Sources.map(srcmap)).writeTo(Sinks.map("destmap"));
        inst.newJob(p1).join();

        assertEquals(1_000_000, inst.getMap("destmap").size());

        inst.shutdown();
    }
}

Run the test without -ea to reproduce the 2nd issue. Run it with -ea to reproduce the 1st issue. For the 1st issue, you can even comment out the near cache config.

There's a 3rd issue: we call putAllAsync(buffer) and clear the buffer immediately after this call returns. However, putAllAsync keeps using the buffer to invalidate the near cache. That's why the job gets stuck only with a large data set when there's backpressure: with only a few items the buffer map is cleared before the near-cache invalidation runs, so the cleanup doesn't fail and the future completes. This is easy to fix.
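
A minimal sketch of that pattern and one way out (illustrative; the class and method names are assumptions, not the actual sink code):

import com.hazelcast.map.IMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;

class PutAllBufferSketch {
    private final Map<Object, Object> buffer = new HashMap<>();   // illustrative sink buffer

    // The problematic pattern: putAllAsync() may still read `buffer` after it
    // returns (e.g. to invalidate near caches), yet the buffer is cleared right away.
    void flushBuggy(IMap<Object, Object> map) {
        map.putAllAsync(buffer);
        buffer.clear();
    }

    // One safe variant: hand the async call its own copy so clearing the buffer
    // cannot race with the near-cache invalidation.
    CompletionStage<Void> flushSafe(IMap<Object, Object> map) {
        CompletionStage<Void> done = map.putAllAsync(new HashMap<>(buffer));
        buffer.clear();
        return done;
    }
}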