ReadMapOrCacheP does not complete
lukasherman opened this issue · 13 comments
Hi Team,
Got into a regression issue on Jet 4.5 with a very simple scenario:
Pipeline p1 = Pipeline.create();
p1.readFrom(Sources.map(sourceMapName)).writeTo(Sinks.map(targetMapName));
jetInstance.newJob(p1).join();
This job never completes. Analyzing the result data shows that only some of the items from sourceMapName are copied over to targetMapName. The complete() method never returns true, and tryGetNextResultSet() never returns false.
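For context, a simplified, self-contained sketch (my own illustration, not the actual Jet source) of the completion contract described above: complete() can only return true once every partition has delivered its terminal batch, so if the source stops producing result sets the job hangs exactly as observed.
import java.util.ArrayDeque;
import java.util.Queue;

class ReadLoopSketch {
    private final int totalPartitions;      // plays the role of partitionIds.length
    private int numCompletedPartitions;
    private final Queue<Object[]> resultSets = new ArrayDeque<>();

    ReadLoopSketch(int totalPartitions) {
        this.totalPartitions = totalPartitions;
    }

    // Analogous to ReadMapOrCacheP.complete(): emit whatever batches are ready,
    // then report true only when every partition has been exhausted.
    boolean complete() {
        for (Object[] batch; (batch = resultSets.poll()) != null; ) {
            emit(batch);
        }
        tryGetNextResultSet();
        return numCompletedPartitions == totalPartitions;
    }

    // In the real processor this polls the async map-fetch futures, enqueues the next
    // result set and bumps numCompletedPartitions when a partition returns its last
    // (terminal) batch; if that never happens, complete() never returns true.
    private boolean tryGetNextResultSet() {
        return false;
    }

    private void emit(Object[] batch) {
        // in the real processor the items go to the outbox here
    }
}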
A sample unit test with the code above completes correctly.
Do you have a reproducer? I tried a simple test and it passes.
public class Testt extends JetTestSupport {
    @Test
    public void test() {
        JetInstance inst = createJetMember();
        IMap<Object, Object> srcmap = inst.getMap("srcmap");
        for (int i = 0; i < 10_000; i++) {
            srcmap.put(i, i);
        }
        Pipeline p1 = Pipeline.create();
        p1.readFrom(Sources.map(srcmap)).writeTo(Sinks.map("destmap"));
        inst.newJob(p1).join();
        assertEquals(10_000, inst.getMap("destmap").size());
        inst.shutdown();
    }
}
No simple reproducer so far. A simple unit test - same as yours - passes correctly. Some observations:
- the job copies 150 000 records
- it fails consistently on the deployed application (3 nodes) and locally (1 node)
- it passes when slowed down a bit by excessive breakpoint logging
- logging shows that after some time the source simply stops producing events, so the map is only partially copied over
Isn't the source map mutated at the same time?
No, the source map is not mutated.
If it were mutated, there could be duplicated or missing events, but the job shouldn't get stuck.
A guess: does a thread dump show any blocked threads?
No stuck threads. It just stopped iterating.
Another observation: reducing the number of partitions to a small number results in the "empty but not terminal batch" assertion.
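For reference, the partition count can be lowered for such an experiment via the standard Hazelcast property; the snippet below assumes a programmatic Config (the value 2 is arbitrary):
// Lowering the partition count to provoke the "empty but not terminal batch" case sooner.
Config config = new Config();
config.setProperty("hazelcast.partition.count", "2");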
Update:
2021-05-15 12:25:03.039 INFO 9204 --- [cached.thread-4] com.hazelcast.jet.impl.MasterJobContext : Start executing job '100c-1d3f-b103-0001', execution 100c-1d3f-b103-2001, execution graph in DOT format:
digraph DAG {
"mapSource(468c1a3583194fce8bb506b8eb20fc89)" [localParallelism=1];
"mapSink(bic_search_index)" [localParallelism=1];
"mapSource(468c1a3583194fce8bb506b8eb20fc89)" -> "mapSink(bic_search_index)" [label="partitioned", queueSize=1024];
}
After some looping, the job gets stuck at line 161:
...
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:161)
Looks like the outbound queue is full.
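If a full outbound queue were the culprit, one experiment (my suggestion, not something verified in this thread) would be to raise the default edge queue size above the 1024 shown in the DOT output:
// Raising the default edge queue size for the whole job; 4096 is an arbitrary value.
JetConfig jetConfig = new JetConfig();
jetConfig.setDefaultEdgeConfig(new EdgeConfig().setQueueSize(4096));
JetInstance inst = Jet.newJetInstance(jetConfig);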
Anything to check on the receiving-map side?
Removed MapStore persistence on the receiving map and reran the test. Got a slightly different outcome (the job is still not finishing).
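For completeness, disabling the MapStore programmatically would look roughly like this (assuming the sink map "bic_search_index" from the DOT output above and a MapStoreConfig declared on it):
// Disabling MapStore persistence on the target map for this experiment.
MapStoreConfig mapStoreConfig = config.getMapConfig("bic_search_index").getMapStoreConfig();
mapStoreConfig.setEnabled(false);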
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:173)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218) tryGetNextResultSet() returns false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:173) complete (numCompletedPartitions == partitionIds.length) = true
Replaced the receiving map with a randomly named map.
2021-05-15 13:16:40.499 INFO 9372 --- [ached.thread-30] com.hazelcast.jet.impl.MasterJobContext : Start executing job '100c-3535-2283-0001', execution 100c-3535-2283-2001, execution graph in DOT format:
digraph DAG {
"mapSource(dd2a7d2b43e449d9a2126a4884defa7a)" [localParallelism=1];
"mapSink(141059ec749749fc9dd0727cce2f21e6)" [localParallelism=1];
"mapSource(dd2a7d2b43e449d9a2126a4884defa7a)" -> "mapSink(141059ec749749fc9dd0727cce2f21e6)" [label="partitioned", queueSize=1024];
}
...
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:195)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218)
return false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:143)
complete (numCompletedPartitions == partitionIds.length) = false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.emitResultSet(ReadMapOrCacheP.java:177)
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.tryGetNextResultSet(ReadMapOrCacheP.java:218) return false
Breakpoint reached at com.hazelcast.jet.impl.connector.ReadMapOrCacheP.complete(ReadMapOrCacheP.java:143) complete (numCompletedPartitions == partitionIds.length) = true
2021-05-15 13:16:46.043 INFO 9372 --- [cached.thread-4] com.hazelcast.jet.impl.MasterJobContext : Execution of job '100c-3535-2283-8001', execution 100c-3535-2283-a001 completed successfully
Start time: 2021-05-15T13:16:44.704
Duration: 00:00:01.339
To see additional job metrics enable JobConfig.storeMetricsAfterJobCompletion
The job completes.
Ok guys, root cause narrowed down to:
NearCacheConfig nearCacheConfig = new NearCacheConfig();
config.getMapConfig(targetMapName).setNearCacheConfig(nearCacheConfig);
So it looks like we've got two issues here:
1. AssertionError: empty but not terminal batch. This happens with or without near cache configured on the target map, and it happens in the source.
2. Job stuck, if the target map has near cache configured. I had to disable assertions to reproduce this, otherwise I get the assertion first.
I reproduced both issues on master, so I'm going to fix them there first and then maybe backport to 4.5.
The reproducer:
public class Testt extends JetTestSupport {
    @Test
    public void test() {
        NearCacheConfig nearCacheConfig = new NearCacheConfig();
        Config config = new Config();
        config.getMapConfig("destmap").setNearCacheConfig(nearCacheConfig);
        JetInstance inst = createJetMember(config);
        IMap<Object, Object> srcmap = inst.getMap("srcmap");
        Map<Object, Object> tmpMap = new HashMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            tmpMap.put(i, i);
            if (tmpMap.size() == 10_000) {
                srcmap.putAll(tmpMap);
                tmpMap.clear();
            }
        }
        srcmap.putAll(tmpMap);
        Pipeline p1 = Pipeline.create();
        p1.readFrom(Sources.map(srcmap)).writeTo(Sinks.map("destmap"));
        inst.newJob(p1).join();
        assertEquals(1_000_000, inst.getMap("destmap").size());
        inst.shutdown();
    }
}
Run the test without -ea to reproduce the 2nd issue. Run it with -ea to reproduce the 1st issue. For the 1st issue, you can even comment out the near cache config.
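If in doubt which mode a test run is in, the standard assert-side-effect idiom (my own addition, not part of the reproducer) reports whether -ea is in effect:
// The assignment inside the assert only executes when assertions are enabled.
boolean assertionsEnabled = false;
assert assertionsEnabled = true;
System.out.println("assertions enabled: " + assertionsEnabled);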
There's a 3rd issue: we call putAllAsync(buffer) and clear the buffer immediately after the call returns. However, putAllAsync keeps using the buffer to invalidate the near cache. That's why the job gets stuck only with a large data set, when there's backpressure: with a few items the clear happens before the near-cache invalidation runs, so the cleanup doesn't fail and the future completes. This is easy to fix.
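A minimal sketch of that buffer-reuse hazard and one possible way to avoid it; the class and the AsyncMapFacade interface are hypothetical stand-ins (the real sink uses the map's putAllAsync, and the actual fix in Jet may differ):
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class BufferReuseSketch {
    private final Map<Integer, Integer> buffer = new HashMap<>();

    // Buggy pattern: the async operation may still iterate 'buffer' (e.g. for
    // near-cache invalidation) after we cleared it and started refilling it.
    CompletableFuture<Void> flushBuggy(AsyncMapFacade map) {
        CompletableFuture<Void> future = map.putAllAsync(buffer);
        buffer.clear();               // races with the async near-cache invalidation
        return future;
    }

    // Safer pattern: hand over a snapshot and keep filling a fresh buffer, so the
    // async invalidation iterates a map that nobody else mutates.
    CompletableFuture<Void> flushSafer(AsyncMapFacade map) {
        Map<Integer, Integer> snapshot = new HashMap<>(buffer);
        buffer.clear();
        return map.putAllAsync(snapshot);
    }

    // Hypothetical facade standing in for the map's putAllAsync call.
    interface AsyncMapFacade {
        CompletableFuture<Void> putAllAsync(Map<Integer, Integer> entries);
    }
}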