aol/cyclops

futureStream from Queue<T> does not fire on timeout.

DarekDan opened this issue · 2 comments

Describe the bug
I have created a reproducible test case, and it seems that this old issue still persists:
https://stackoverflow.com/questions/42363084/using-cyclops-react-for-batching-on-a-async-queue-stream

To Reproduce
Steps to reproduce the behavior:
Test case attached in comment.

Expected behavior
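The batch should be emitted once the configured timeout elapses (20 ms in the test cases in this issue), even if fewer than 3 elements have arrived.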

Here is a test case:

   @Test
   public void isolatedQueueTest() throws InterruptedException {
       Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
       StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
               .parallel()
               .groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
               .runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
                       bufferedMessages -> log.info("Received {}",
                               bufferedMessages.stream().collect(Collectors.joining(",")))));
       ReactiveSeq.range(0,11)
               .forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
       TimeUnit.MILLISECONDS.sleep(1000);
   }

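The last pair will not be logged: range(0,11) offers 11 elements, so after three full batches of 3 the remaining two ("9" and "10") are left waiting for a timeout that never fires.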

A more accurate test case, which asserts on the number of received elements:

    @Test
    public void isolatedQueueTest() throws InterruptedException {
        Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
        ListX<String> receiver = ListX.empty();
        StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
                .parallel()
                .groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
                .runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
                        bufferedMessages -> receiver.addAll(bufferedMessages.stream().toList())));
        ReactiveSeq.range(0,11)
                .forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
        TimeUnit.MILLISECONDS.sleep(1000);
        pubSubMessageQueue.close();
        TimeUnit.MILLISECONDS.sleep(1000);
        assertEquals(11,receiver.size());
    }

Without the close() call and the second sleep, this test case fails.
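
For comparison, here is a minimal sketch of the size-or-time batching semantics the tests above expect, written against plain `java.util.concurrent` only. It is not how cyclops implements `groupedBySizeAndTime`; the class and method names (`SizeOrTimeBatcher`, `nextBatch`, `drain`) are hypothetical and exist only for illustration. The point it shows is that a partial batch is flushed when the wait runs out, without the queue being closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of cyclops.
public class SizeOrTimeBatcher {

    // Returns a batch of up to maxSize items. The batch is also returned,
    // possibly partial, once maxWait has passed since its first item arrived.
    static List<String> nextBatch(BlockingQueue<String> queue, int maxSize,
                                  long maxWait, TimeUnit unit) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add(queue.take()); // block until the batch has a first item
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        while (batch.size() < maxSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // time limit reached: flush what we have
            }
            String next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // nothing arrived in time: flush the partial batch
            }
            batch.add(next);
        }
        return batch;
    }

    // Offers `count` items, then reads batches until all items are accounted for.
    // The queue is never closed.
    static List<List<String>> drain(int count, int maxSize, long maxWaitMs) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.offer(Integer.toString(i));
        }
        List<List<String>> batches = new ArrayList<>();
        int received = 0;
        try {
            while (received < count) {
                List<String> batch = nextBatch(queue, maxSize, maxWaitMs, TimeUnit.MILLISECONDS);
                batches.add(batch);
                received += batch.size();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while batching", e);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Same shape as the test cases: 11 items, batches of 3, 20 ms limit.
        System.out.println(drain(11, 3, 20));
    }
}
```

With 11 items and a batch size of 3, the trailing items come out as a partial batch once the 20 ms wait expires, so all 11 are received. That is the behavior the assertion `assertEquals(11,receiver.size())` is checking for without the `close()` call.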