puniverse/quasar

Fiebr start another Fiber, get value from fiber by fiber.get() throw npe

BigPotato opened this issue · 2 comments

java.lang.NullPointerException
	at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.ConditionSynchronizer.awaitNanos(ConditionSynchronizer.java:79) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.strands.dataflow.Val.get(Val.java:188) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]
	at co.paralleluniverse.fibers.Fiber.get(Fiber.java:1396) ~[quasar-core-0.7.9-jdk8.jar:0.7.9]

when I call fiber.get() or fiber.get(10, TimeUnit), it does not wait, throw a npe .

In my system, for each user request, there are lot of concurrent rpc, so I try to adopt Fiber to improve the throughput, but this problem confused me one week, I need some help, thanks in advance!

this is my code snippet, JMonitor.add("fiber.odd.timeout") is triggered, but JMonitor.add("fiber.odd.timeout") is not.

public class FiberHelper {

private static Logger logger = LogManager.getLogger(FiberHelper.class);

private static FiberScheduler fiberScheduler = new FiberForkJoinScheduler("fiber-scheduler", Runtime.getRuntime().availableProcessors() * 2,
        (Thread t, Throwable e) -> {
            logger.error("fiber exception! " + t.getName(), e);
        }, null, false);

@Suspendable
public static Fiber runFiber(String name, SuspendableRunnable runnable) {
    Fiber fiber = new Fiber<>(name, fiberScheduler, runnable).start();
    return fiber;
}

@Suspendable
public static <T> Fiber<T> runFiber(String name, SuspendableCallable<T> callable) {
    return new Fiber<T>(name, fiberScheduler, callable).start();
}

@Suspendable
public static <T> Fiber<T> runFiberCallable(String name, SuspendableCallable<T> callable, CountDownLatch latch) {
    return new Fiber<T>(name, fiberScheduler, () -> {
        T r = callable.run();
        logger.info(name + " -> " + r);
        latch.countDown();
        return r;
    }).start();
}

public static Builder newBuilder() {
    return new Builder();
}
public static class Builder {
    private Map<String, SuspendableCallable> callableMap = Maps.newHashMap();
    private CountDownLatch latch;
    private Map<String, Fiber> fibers = null;
    private Map<String, Optional> resultMap = null;
    private boolean isStarted = false;

    public Builder submitJob(String name, SuspendableCallable r) {
        if (isStarted) {
            throw new RuntimeException(" this builder is started, cannot submit more jobs!");
        }
        if (callableMap.containsKey(name)) {
            throw new RuntimeException(name  + " duplicated in job list!");
        }
        callableMap.put(name, r);
        return this;
    }

    @Suspendable
    public Builder start() {
        if (isStarted) {
            throw new RuntimeException(" this builder is started, cannot call start()");
        }
        if (callableMap.isEmpty()) {
            logger.info("no fiber to start");
            return this;
        }
        latch = new CountDownLatch(callableMap.size());
        resultMap = Maps.newConcurrentMap();
        fibers = callableMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e ->
            new Fiber(e.getKey(), fiberScheduler, new SuspendableCallable() {
                @Override
                @Suspendable
                public Object run() throws SuspendExecution, InterruptedException {
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " ###fiber start:" + e.getKey());
                    }
                    Object r = e.getValue().run();
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " ###fiber finish:" + e.getKey() + " -> " + r);
                    }
                    resultMap.put(e.getKey(), Optional.ofNullable(r));
                    logger.info(e.getKey() + " put value to resultMap");
                    latch.countDown();
                    return r;
                }
            }).start()
        ));
        return this;
    }

    @Suspendable
    public boolean awaitMs(long ts) {
        return await(ts, TimeUnit.MILLISECONDS);
    }

    @Suspendable
    public boolean await(long ts, TimeUnit unit) {
        boolean isDone = false;
        try {
            long ts1 = System.currentTimeMillis();
            isDone = latch.await(ts, unit);
            long ts2 = System.currentTimeMillis();
            if (logger.isDebugEnabled()) {
                String log = fibers.entrySet().stream().map(e -> e.getKey() + " isDone? " + e.getValue().isDone()).collect(Collectors.toList()).toString();
                logger.info(ts + " , fiber wait on latch isDone? " + isDone + ", cost " + (ts2 - ts1) + ", " + log);
            }
        } catch (InterruptedException e) {
            logger.error("fail when wait fibers!", e);
        }
        return isDone;
    }

    @Suspendable
    public Map<String, Fiber> getFibers() {
        if (logger.isDebugEnabled()) {
            logger.debug("fibers:" + fibers.keySet());
        }
        return fibers;
    }

    @Suspendable
    public Map<String, Fiber> getFibers(boolean onlyDone) {
        return fibers.entrySet()
                     .stream()
                     .filter(e -> e.getValue().isDone())
                     .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
    }

    public long getUndoneJobCount() {
        return latch.getCount();
    }

    private boolean switchOn = true;

    @Suspendable
    private <T> T getFiberResult(String name, T defaultValue, long timeoutMs) {
        if (fibers == null || !fibers.containsKey(name)) {
            Exception ex = new IllegalStateException(name  + " fiber not exist!");
            logger.error("get value exception!", ex);
            return defaultValue;
        }

        Optional<T> result = resultMap.getOrDefault(name, Optional.ofNullable(defaultValue));
        T r = result.isPresent() ? result.get() : defaultValue;

        if (r == null || r == defaultValue) {
            logger.error(name + " can not get value from resultMap!");
            Fiber<T> fiber = fibers.get(name);
            long ts1 = System.currentTimeMillis();
            try {
                r = fiber.get(timeoutMs > 0 ? timeoutMs : 10, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                JMonitor.add("fiber.odd.timeout");
                logger.error(name + " fiber get timeout!");
            } catch (Exception e) {
                JMonitor.add("fiber.odd.exception");
                //logger.error(name + " fiber get exception!", e);
            }
            long ts2 = System.currentTimeMillis();
            logger.error(name + " fiber get cost:"+ (ts2 - ts1));
        }

        return r;
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, T defaultValue, long timeoutMs) {
        if (switchOn) {
            return getFiberResult(name, defaultValue, timeoutMs);
        }

        if (fibers == null || !fibers.containsKey(name)) {
            Exception ex = new IllegalStateException(name  + " fiber not exist!");
            logger.error("get value exception!", ex);
            return defaultValue;
        }
        Fiber<T> fiber = fibers.get(name);
        T r = defaultValue;

        if (timeoutMs <= 0) {
            if (!fiber.isDone()) {
                if (latch.getCount() == 0) {
                    logger.info(name + " odd!!! latch count is 0, but the future is not done!");
                    try {
                        r = fiber.get(10, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 3ms, still timeout!");
                        JMonitor.add("fiber.odd.timeout");
                    } catch (Exception ex) {
                        logger.info(name + " odd!!! latch count is 0, but the future is not done! try another 3ms, exception!", ex);
                        JMonitor.add("fiber.odd.exception");
                    }
                } else {
                    logger.error(name + " fiber is not done!");
                }
            } else {
                try {
                    r = fiber.get();
                } catch (Exception e) {
                    logger.error(name + " fail to get value from fiber!", e);
                }
            }
        } else {
            try {
                r = fiber.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                logger.error(name + " fail to get value from fiber with timeout!", e);
            }
        }

        if (logger.isDebugEnabled()) {
            logger.debug(name + " get value -> " + r);
        }
        return r;
    }

    @Suspendable
    public <T> T getFiberResultByName(String name) {
        return getFiberResultByName(name, null, -1);
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, long timeoutMs) {
        return getFiberResultByName(name, null, timeoutMs);
    }

    @Suspendable
    public <T> T getFiberResultByName(String name, T defaultValue) {
        return getFiberResultByName(name, defaultValue, -1);
    }
}

}

besides, when i use co.paralleluniverse.strands.concurrent.CountDownLatch , and I call

isDone = latch.await(ts, unit);

java.lang.NullPointerException at co.paralleluniverse.strands.Strand.parkNanos(Strand.java:670) ~[quasar-core-0.7.9-jdk8.jar:0.7.9] at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039) ~[quasar-core-0.7.9-jdk8.jar:0.7.9] at co.paralleluniverse.strands.concurrent.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1357) ~[quasar-core-0.7.9-jdk8.jar:0.7.9] at co.paralleluniverse.strands.concurrent.CountDownLatch.await(CountDownLatch.java:268) ~[quasar-core-0.7.9-jdk8.jar:0.7.9] at com.meituan.service.mobile.search.thread.FiberHelper$Builder.await(FiberHelper.java:113) ~[classes/:?] at com.meituan.service.mobile.search.thread.FiberHelper$Builder.awaitMs(FiberHelper.java:105) ~[classes/:?]

when i use java.util.concurrent.CountDownLatch, no this exception.

It must be an incorrect instrumentation.