reactive-streams/reactive-streams-jvm

AsyncIterablePublisher violates rule 1.9 on executor failure

mizosoft opened this issue · 2 comments

When the publisher is subscribed to and an onSubscribe signal is about to be scheduled, the executor can reject the drain task. tryScheduleToExecute appears to anticipate this failure, but it assumes that the subscriber's onSubscribe has already been called and calls onError directly. That is not the case when the rejected task is the one that would have delivered onSubscribe. For example, the following TCK test fails:

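import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

@Test
public class AsyncIterablePublisherTest extends PublisherVerification<Long> {

  public AsyncIterablePublisherTest() {
    super(new TestEnvironment());
  }

  @Override
  public Publisher<Long> createPublisher(long elements) {
    // Build a fresh stream per subscription; a single LongStream cannot be iterated twice.
    return new AsyncIterablePublisher<>(() -> LongStream.range(0, elements).iterator(),
        ForkJoinPool.commonPool());
  }

  @Override
  public Publisher<Long> createFailedPublisher() {
    // The executor rejects every task, so even the onSubscribe signal cannot be scheduled.
    return new AsyncIterablePublisher<>(List.of(1L, 2L, 3L), r -> {
      throw new RejectedExecutionException("Sorry but I'm really busy");
    });
  }
}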

The failing tests are optional_spec104_mustSignalOnErrorWhenFails and required_spec109_mayRejectCallsToSubscribeIfPublisherIsUnableOrUnwillingToServeThemRejectionMustTriggerOnErrorAfterOnSubscribe.
I think simply using a subscribed flag fixes the issue: the flag is set to true in doSubscribe and checked in terminateDueToError, which passes a NOOP subscription downstream if the flag is not set. But I'm interested in what really should happen in the case where the executor throws. Doesn't this mean that the publisher can no longer uphold the contract of passing downstream signals on the supplied executor? Shouldn't this be considered fatal, and shouldn't the subscription indicate that by rethrowing?
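To make the subscribed-flag idea concrete, here is a rough sketch. It is not the actual AsyncIterablePublisher code: it uses java.util.concurrent.Flow as a stand-in for the org.reactivestreams interfaces, and SketchSubscription, init, NOOP, and demo are hypothetical names chosen only for illustration. Demand handling and the thread-safety of the real signal queue are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;

final class SubscribedFlagSketch {

  // Hypothetical no-op subscription handed downstream when the real one never started.
  static final Flow.Subscription NOOP = new Flow.Subscription() {
    @Override public void request(long n) { }
    @Override public void cancel() { }
  };

  // Heavily simplified stand-in for the example's subscription (assumption, not the real class).
  static final class SketchSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Long> subscriber;
    private final Executor executor;
    // A real implementation would also need to make these fields safe across threads.
    private boolean subscribed; // true once onSubscribe has been delivered
    private boolean done;

    SketchSubscription(Flow.Subscriber<? super Long> subscriber, Executor executor) {
      this.subscriber = subscriber;
      this.executor = executor;
    }

    // Schedules the onSubscribe signal; subscribe() must not throw, so rejection is caught here.
    void init() {
      try {
        executor.execute(this::doSubscribe);
      } catch (RuntimeException e) {
        terminateDueToError(e);
      }
    }

    private void doSubscribe() {
      subscribed = true;
      subscriber.onSubscribe(this);
    }

    // Guarantees onSubscribe precedes onError, even if the executor rejected the first task.
    private void terminateDueToError(Throwable t) {
      if (done) return;
      done = true;
      if (!subscribed) {
        subscribed = true;
        subscriber.onSubscribe(NOOP);
      }
      subscriber.onError(t);
    }

    @Override public void request(long n) { /* demand handling omitted in this sketch */ }
    @Override public void cancel() { done = true; }
  }

  // Records the order of signals a subscriber sees for the given executor.
  public static List<String> demo(Executor executor) {
    List<String> events = new ArrayList<>();
    Flow.Subscriber<Long> recorder = new Flow.Subscriber<>() {
      @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
      @Override public void onNext(Long item) { events.add("onNext"); }
      @Override public void onError(Throwable t) { events.add("onError"); }
      @Override public void onComplete() { events.add("onComplete"); }
    };
    new SketchSubscription(recorder, executor).init();
    return events;
  }

  public static void main(String[] args) {
    Executor rejecting = r -> { throw new RejectedExecutionException("busy"); };
    System.out.println(demo(rejecting)); // prints [onSubscribe, onError]
  }
}
```

With the rejecting executor, the subscriber still sees onSubscribe before onError, which is what the two failing TCK tests check for. Whether the error should instead be rethrown is a separate question.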
I've skimmed through Java 9's SubmissionPublisher, and it looks like any RuntimeException or Error thrown from the executor is rethrown directly after the subscription is cancelled.
I'd like to know what the wisest thing to do is, as I'm implementing an async publisher with a similar setup.
Thanks for any help in advance!

Hi @mizosoft,

Throwing from onSubscribe is not conformant in general: https://github.com/reactive-streams/reactive-streams-jvm#2.13

@viktorklang Thanks for the response!

I think the executor can throw on the publisher's side when tasks are scheduled, not on the subscriber's. But I've checked the rules again, and it seems that none of subscribe, request, or cancel (from which the tasks are normally scheduled) may throw. So it seems that SubmissionPublisher's behaviour confused me, and the error should be signalled downstream.