Fiber starts another Fiber; getting its value via fiber.get() throws an NPE
BigPotato opened this issue · 2 comments
java.lang.NullPointerException
at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.strands.ConditionSynchronizer.awaitNanos(ConditionSynchronizer.java:79) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.strands.dataflow.Val.get(Val.java:188) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.fibers.Fiber.get(Fiber.java:1396) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
When I call fiber.get() or fiber.get(10, TimeUnit.MILLISECONDS), it does not wait; it throws an NPE.
In my system, each user request issues many concurrent RPCs, so I am trying to adopt fibers to improve throughput. This problem has puzzled me for a week and I need some help. Thanks in advance!
This is my code snippet. JMonitor.add("fiber.odd.exception") is triggered, but JMonitor.add("fiber.odd.timeout") is not.
/**
 * Static helpers for running {@code SuspendableRunnable} / {@code SuspendableCallable} jobs on
 * Quasar fibers that all share one {@code FiberScheduler}, plus a {@link Builder} for launching a
 * named batch of jobs and collecting their results.
 *
 * <p>NOTE(review): a {@code NullPointerException} thrown from {@code Strand.parkNanos} has been
 * reported when {@code Fiber.get(...)} or a strands {@code CountDownLatch.await(...)} blocks in code
 * using this helper, and it was attributed to incorrect instrumentation. Confirm that every method
 * on the suspendable call path (including lambdas and anonymous classes) is instrumented.
 */
public class FiberHelper {
    private static final Logger logger = LogManager.getLogger(FiberHelper.class);

    /**
     * Scheduler shared by every fiber created in this class. It is built with
     * {@code availableProcessors() * 2} as its second argument (presumably the parallelism -- confirm),
     * and its uncaught-exception handler logs failures together with the thread name.
     */
    private static final FiberScheduler fiberScheduler = new FiberForkJoinScheduler("fiber-scheduler",
            Runtime.getRuntime().availableProcessors() * 2,
            (Thread t, Throwable e) -> {
                logger.error("fiber exception! " + t.getName(), e);
            }, null, false);

    /**
     * Starts a fiber on the shared scheduler that runs {@code runnable}.
     *
     * @param name     fiber name
     * @param runnable work to execute inside the fiber
     * @return the already-started fiber
     */
    @Suspendable
    public static Fiber runFiber(String name, SuspendableRunnable runnable) {
        return new Fiber<>(name, fiberScheduler, runnable).start();
    }

    /**
     * Starts a fiber on the shared scheduler that runs {@code callable}.
     *
     * @param name     fiber name
     * @param callable work to execute inside the fiber
     * @return the already-started fiber; its value is the callable's result
     */
    @Suspendable
    public static <T> Fiber<T> runFiber(String name, SuspendableCallable<T> callable) {
        return new Fiber<T>(name, fiberScheduler, callable).start();
    }

    /**
     * Starts a fiber that runs {@code callable}, logs its result and then counts down {@code latch}.
     *
     * <p>NOTE(review): if {@code callable.run()} throws, {@code latch} is never counted down, so a
     * waiter on it blocks until its own timeout.
     *
     * @param name     fiber name, also used as the log prefix
     * @param callable work to execute inside the fiber
     * @param latch    counted down once after the callable returns normally
     * @return the already-started fiber
     */
    @Suspendable
    public static <T> Fiber<T> runFiberCallable(String name, SuspendableCallable<T> callable, CountDownLatch latch) {
        return new Fiber<T>(name, fiberScheduler, () -> {
            T r = callable.run();
            logger.info(name + " -> " + r);
            latch.countDown();
            return r;
        }).start();
    }

    /** Returns a fresh, empty {@link Builder}. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Collects named jobs via {@link #submitJob}, starts one fiber per job via {@link #start},
     * and offers ways to wait for the fibers and read their results.
     *
     * <p>The builder's own fields are not synchronized. Only {@code resultMap}, which the job
     * fibers write, is a concurrent map.
     */
    public static class Builder {
        /** Jobs registered by submitJob, keyed by job name. */
        private Map<String, SuspendableCallable> callableMap = Maps.newHashMap();
        /** Counted down once per job that returns normally; null until start() launches jobs. */
        private CountDownLatch latch;
        /** Started fibers keyed by job name; null until start() launches jobs. */
        private Map<String, Fiber> fibers = null;
        /** Job results keyed by job name; a null result is stored as an empty Optional. */
        private Map<String, Optional> resultMap = null;
        /** Set once start() has launched fibers; guards against resubmission and a second start. */
        private boolean isStarted = false;
        /** When true (the only value assigned here), getFiberResultByName delegates to getFiberResult. */
        private boolean switchOn = true;

        /**
         * Registers a job under a unique name.
         *
         * @param name job name, used later to look up the fiber and its result
         * @param r    the job to run
         * @return this builder
         * @throws RuntimeException if the builder was already started or {@code name} is taken
         */
        public Builder submitJob(String name, SuspendableCallable r) {
            if (isStarted) {
                throw new RuntimeException(" this builder is started, cannot submit more jobs!");
            }
            if (callableMap.containsKey(name)) {
                throw new RuntimeException(name + " duplicated in job list!");
            }
            callableMap.put(name, r);
            return this;
        }

        /**
         * Starts one fiber per submitted job. Each fiber stores its result in {@code resultMap}
         * and then counts down {@code latch}.
         *
         * <p>If no jobs were submitted, this only logs and returns; the builder stays unstarted.
         *
         * <p>NOTE(review): a job that throws never counts the latch down, so {@link #await} then
         * waits for the full timeout.
         *
         * @return this builder
         * @throws RuntimeException if called more than once after jobs were launched
         */
        @Suspendable
        public Builder start() {
            if (isStarted) {
                throw new RuntimeException(" this builder is started, cannot call start()");
            }
            if (callableMap.isEmpty()) {
                logger.info("no fiber to start");
                return this;
            }
            // Previously never assigned, so the isStarted guards above and in submitJob could not fire.
            isStarted = true;
            latch = new CountDownLatch(callableMap.size());
            resultMap = Maps.newConcurrentMap();
            fibers = callableMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e ->
                    new Fiber(e.getKey(), fiberScheduler, new SuspendableCallable() {
                        @Override
                        @Suspendable
                        public Object run() throws SuspendExecution, InterruptedException {
                            if (logger.isDebugEnabled()) {
                                logger.debug(Thread.currentThread().getName() + " ###fiber start:" + e.getKey());
                            }
                            Object r = e.getValue().run();
                            if (logger.isDebugEnabled()) {
                                logger.debug(Thread.currentThread().getName() + " ###fiber finish:" + e.getKey() + " -> " + r);
                            }
                            resultMap.put(e.getKey(), Optional.ofNullable(r));
                            logger.info(e.getKey() + " put value to resultMap");
                            latch.countDown();
                            return r;
                        }
                    }).start()
            ));
            return this;
        }

        /** Same as {@link #await(long, TimeUnit)} with a timeout in milliseconds. */
        @Suspendable
        public boolean awaitMs(long ts) {
            return await(ts, TimeUnit.MILLISECONDS);
        }

        /**
         * Waits up to the given timeout for every started job to count down the latch.
         *
         * @param ts   timeout amount
         * @param unit timeout unit
         * @return true if all jobs finished in time, or if no jobs were ever submitted; false on
         *         timeout, on interruption, or if jobs were submitted but start() was not called
         */
        @Suspendable
        public boolean await(long ts, TimeUnit unit) {
            if (latch == null) {
                // Nothing was launched; avoid the NPE and report "done" only when there is no work.
                return callableMap.isEmpty();
            }
            boolean isDone = false;
            try {
                long ts1 = System.currentTimeMillis();
                isDone = latch.await(ts, unit);
                long ts2 = System.currentTimeMillis();
                if (logger.isDebugEnabled()) {
                    String log = fibers.entrySet().stream().map(e -> e.getKey() + " isDone? " + e.getValue().isDone()).collect(Collectors.toList()).toString();
                    logger.debug(ts + " , fiber wait on latch isDone? " + isDone + ", cost " + (ts2 - ts1) + ", " + log);
                }
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here; callers cannot see it.
                logger.error("fail when wait fibers!", e);
            }
            return isDone;
        }

        /**
         * Returns the live map of started fibers keyed by job name.
         *
         * @return the internal map, or null if start() has not launched any jobs
         */
        @Suspendable
        public Map<String, Fiber> getFibers() {
            if (logger.isDebugEnabled()) {
                logger.debug("fibers:" + fibers.keySet());
            }
            return fibers;
        }

        /**
         * Returns a snapshot of the started fibers.
         *
         * @param onlyDone if true, include only fibers whose {@code isDone()} is true
         * @return a new map (empty if start() has not launched any jobs)
         */
        @Suspendable
        public Map<String, Fiber> getFibers(boolean onlyDone) {
            if (fibers == null) {
                return Maps.newHashMap();
            }
            if (!onlyDone) {
                return Maps.newHashMap(fibers);
            }
            return fibers.entrySet()
                    .stream()
                    .filter(e -> e.getValue().isDone())
                    .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
        }

        /**
         * Returns how many jobs have not yet counted down the latch. Before start() launches
         * anything, every submitted job counts as undone.
         */
        public long getUndoneJobCount() {
            return latch == null ? callableMap.size() : latch.getCount();
        }

        /**
         * Reads a job's result, first from {@code resultMap} and then, if that gives null or the
         * default value, by calling {@code fiber.get} with a timeout.
         *
         * @param name         job name
         * @param defaultValue returned when the job is unknown or no value can be obtained
         * @param timeoutMs    timeout for the {@code fiber.get} fallback; values <= 0 mean 10 ms
         * @return the job's value, or {@code defaultValue}
         */
        @Suspendable
        private <T> T getFiberResult(String name, T defaultValue, long timeoutMs) {
            if (fibers == null || !fibers.containsKey(name)) {
                Exception ex = new IllegalStateException(name + " fiber not exist!");
                logger.error("get value exception!", ex);
                return defaultValue;
            }
            Optional<T> result = resultMap.getOrDefault(name, Optional.ofNullable(defaultValue));
            T r = result.isPresent() ? result.get() : defaultValue;
            if (r == null || r == defaultValue) {
                logger.error(name + " can not get value from resultMap!");
                Fiber<T> fiber = fibers.get(name);
                long ts1 = System.currentTimeMillis();
                try {
                    r = fiber.get(timeoutMs > 0 ? timeoutMs : 10, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    JMonitor.add("fiber.odd.timeout");
                    logger.error(name + " fiber get timeout!");
                } catch (Exception e) {
                    JMonitor.add("fiber.odd.exception");
                    // Log the cause: silently swallowing it hid failures such as NPEs raised by fiber.get.
                    logger.error(name + " fiber get exception!", e);
                }
                long ts2 = System.currentTimeMillis();
                logger.error(name + " fiber get cost:"+ (ts2 - ts1));
            }
            return r;
        }

        /**
         * Returns the result of the named job.
         *
         * <p>While {@code switchOn} is true this delegates to {@link #getFiberResult}. Otherwise it
         * reads the fiber directly:
         * <ul>
         *   <li>{@code timeoutMs <= 0}: returns the value only if the fiber is done; if the latch is
         *       already zero but the fiber is not done, it retries once with a 10 ms timeout.</li>
         *   <li>{@code timeoutMs > 0}: calls {@code fiber.get} with that timeout.</li>
         * </ul>
         *
         * @param name         job name
         * @param defaultValue returned when the job is unknown or its value is unavailable
         * @param timeoutMs    timeout in milliseconds; see above for values <= 0
         * @return the job's value, or {@code defaultValue}
         */
        @Suspendable
        public <T> T getFiberResultByName(String name, T defaultValue, long timeoutMs) {
            if (switchOn) {
                return getFiberResult(name, defaultValue, timeoutMs);
            }
            if (fibers == null || !fibers.containsKey(name)) {
                Exception ex = new IllegalStateException(name + " fiber not exist!");
                logger.error("get value exception!", ex);
                return defaultValue;
            }
            Fiber<T> fiber = fibers.get(name);
            T r = defaultValue;
            if (timeoutMs <= 0) {
                if (!fiber.isDone()) {
                    if (latch.getCount() == 0) {
                        logger.info(name + " odd!!! latch count is 0, but the future is not done!");
                        try {
                            r = fiber.get(10, TimeUnit.MILLISECONDS);
                        } catch (TimeoutException e) {
                            logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 3ms, still timeout!");
                            JMonitor.add("fiber.odd.timeout");
                        } catch (Exception ex) {
                            logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 3ms, exception!", ex);
                            JMonitor.add("fiber.odd.exception");
                        }
                    } else {
                        logger.error(name + " fiber is not done!");
                    }
                } else {
                    try {
                        r = fiber.get();
                    } catch (Exception e) {
                        logger.error(name + " fail to get value from fiber!", e);
                    }
                }
            } else {
                try {
                    r = fiber.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    logger.error(name + " fail to get value from fiber with timeout!", e);
                }
            }
            if (logger.isDebugEnabled()) {
                logger.debug(name + " get value -> " + r);
            }
            return r;
        }

        /** Returns the named job's result with a null default and no explicit timeout. */
        @Suspendable
        public <T> T getFiberResultByName(String name) {
            return getFiberResultByName(name, null, -1);
        }

        /** Returns the named job's result with a null default and the given timeout in ms. */
        @Suspendable
        public <T> T getFiberResultByName(String name, long timeoutMs) {
            return getFiberResultByName(name, null, timeoutMs);
        }

        /** Returns the named job's result, or {@code defaultValue}, with no explicit timeout. */
        @Suspendable
        public <T> T getFiberResultByName(String name, T defaultValue) {
            return getFiberResultByName(name, defaultValue, -1);
        }
    }
}
Besides, when I use co.paralleluniverse.strands.concurrent.CountDownLatch and call `isDone = latch.await(ts, unit);`, I get:
java.lang.NullPointerException
at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1357) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at co.paralleluniverse.strands.concurrent.CountDownLatch.await(CountDownLatch.java:268) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
at com.meituan.service.mobile.search.thread.FiberHelper$Builder.await(FiberHelper.java:113) ~[classes/:?]
at com.meituan.service.mobile.search.thread.FiberHelper$Builder.awaitMs(FiberHelper.java:105) ~[classes/:?]
When I use java.util.concurrent.CountDownLatch instead, this exception does not occur.
This must be caused by incorrect instrumentation.