futureStream from Queue<T> does not fire on timeout.
DarekDan opened this issue · 2 comments
DarekDan commented
Describe the bug
I have created a reproducible test case, and it seems that this old issue still persists:
https://stackoverflow.com/questions/42363084/using-cyclops-react-for-batching-on-a-async-queue-stream
To Reproduce
Steps to reproduce the behavior:
Test case attached in comment.
Expected behavior
The future should be run after 200ms.
DarekDan commented
Here is a test case:
@Test
public void isolatedQueueTest() throws InterruptedException {
Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
.parallel()
.groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
.runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
bufferedMessages -> log.info("Received {}",
bufferedMessages.stream().collect(Collectors.joining(",")))));
ReactiveSeq.range(0,11)
.forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
TimeUnit.MILLISECONDS.sleep(1000);
}
The last pair will not be logged.
DarekDan commented
More accurate test case
@Test
public void isolatedQueueTest() throws InterruptedException {
Queue<String> pubSubMessageQueue = QueueFactories.<String>unboundedQueue().build();
ListX<String> receiver = ListX.empty();
StreamSource.futureStream(pubSubMessageQueue, new LazyReact(Executors.newCachedThreadPool()))
.parallel()
.groupedBySizeAndTime(3, 20, TimeUnit.MILLISECONDS)
.runFuture(Executors.newCachedThreadPool(), vectors -> vectors.forEach(
bufferedMessages -> receiver.addAll(bufferedMessages.stream().toList())));
ReactiveSeq.range(0,11)
.forEachAsync(i -> pubSubMessageQueue.offer(i.toString()));
TimeUnit.MILLISECONDS.sleep(1000);
pubSubMessageQueue.close();
TimeUnit.MILLISECONDS.sleep(1000);
assertEquals(11,receiver.size());
}
Without close and secondary wait, this test case will fail.