tc39/proposal-observable

Observable.prototype.subscribe return Promise<Subscription> instead of Subscription

mgtitimoli opened this issue · 27 comments

Hi!

Today I found myself dealing with a use case where subscribing is an async process, with all that this implies.

I found that the current implementation does not take this possibility into account: if subscribing is not synchronous, then the subscribe method shouldn't return a subscription at the time it is executed, because we still don't know how the process will end.

Nevertheless, I believe this could be solved in a very simple way: by returning a promise that resolves to the Subscription, or rejects with an Error if subscribing failed.

Maybe there is a way to address this case with the current implementation and I'm missing something; if so, I would really appreciate some guidance on this.

Many thanks!

Can you please post a concrete example, with code snippet, of the problem you're describing?

A basic design decision of es-observable is that subscription is completely synchronous. Promises delay reaction to subscription because they fire their thens after a microtask. Consider:

   const subscription = someObservable.subscribe(e => {
     console.log("Fired!");
   });
   // do some actions
   subscription.unsubscribe();

Let's say someObservable fires one microtask after the subscription: if you get a promise back, you can't cancel that subscription in time. This is particularly a problem when writing combinators and operators.

Thus, this can't work, although you can still subscribe asynchronously. Of course, since an observable is a push-based model, your observer code should not be concerned with when the data is available: it subscribes when it chooses, and from then on all the events are up to the producer, which may emit them synchronously or asynchronously.

I don't think this is doable.

A subscription is as much about signalling that you don't want something to happen as it is about cancelling something that's currently happening.

If you subscribe to an observable and then in the same job, synchronously decide you want to unsubscribe from it, you won't be able to do so, because Promises guarantee that you'll have to wait until the next tick before you have a subscription back. By then it might be too late.
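To make the timing argument concrete, here is a minimal sketch, not taken from the proposal, in which a hypothetical producer delivers a value one microtask after subscribing. `subscribeSync` returns its subscription immediately, while the hypothetical `subscribeAsync` wraps the same subscription in a Promise; only the synchronous version can cancel before the value arrives.

```javascript
// Hypothetical producer: delivers "value" one microtask after subscribing,
// unless the subscription has been closed by then.
function subscribeSync(onValue) {
  let closed = false;
  queueMicrotask(() => {
    if (!closed) onValue("value");
  });
  return { unsubscribe() { closed = true; } };
}

// Hypothetical variant in which subscribe returns Promise<Subscription>.
function subscribeAsync(onValue) {
  return Promise.resolve(subscribeSync(onValue));
}

// Synchronous subscription: cancelled in the same job, nothing is delivered.
const syncReceived = [];
const subscription = subscribeSync(v => syncReceived.push(v));
subscription.unsubscribe();

// Promise-based subscription: the then callback runs after the producer's
// microtask, so the value has already been delivered by the time we cancel.
const asyncReceived = [];
subscribeAsync(v => asyncReceived.push(v)).then(sub => sub.unsubscribe());
```

Because the `then` callback is itself a microtask queued after the producer's own microtask, the Promise-based caller always unsubscribes one step too late.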

There are a variety of ways to subscribe to something asynchronously and still return a Subscription. It's very easy with Rx... but even with the most primitive es-observable you can still do it.

new Observable(observer => {
  let sub;

  // wait a whole second and subscribe to an observable
  const id = setTimeout(() => {
    // subscription here
    sub = getSomeAjaxObservable().subscribe({ next(json) { doAThing(json) } });
  }, 1000);
  
  return () => {
    clearTimeout(id);
    if (sub) sub.unsubscribe();
  };
});

I mean, if there's a specific scenario, maybe I could help more.

I understand that changing the return value to a promise might seem to make the rest of the operators difficult to manage, but this is not entirely true, since once that promise is resolved, you can proceed as usual.

The point I'm trying to make is that there are cases where the subscription depends on an async process that could fail. For these cases, directly returning a subscription does not model reality; that's why I proposed analyzing the possibility of returning a promise that resolves with the subscription once it's done, or rejects if it couldn't subscribe.

My proposal allows the producer to be async (and lets the subscriber get information about how subscribing went). It would work as usual, but instead of the subscription being created directly, it would be created after awaiting the producer to return the disposal function.

As I mentioned before, maybe I'm missing something, but as far as I can see, this use case, which in my opinion is pretty common, can't be modelled with the current API.

@mgtitimoli: again, I ask for a concrete example of the problem you're encountering.

As the others have pointed out, Observables are designed to work in both synchronous and asynchronous scenarios. You should be able to interoperate with Promises using the existing specification, and making the subscribe method itself asynchronous would have huge ramifications to the spec and the ecosystem.

We'd like to help, but that's hard if we don't understand the problem. 😃

Sure @appsforartists, I will do my best to explain the use case with a concrete example; I guess this is better than posting some code without the proper background.

I have an absinthe-phoenix server that I'm subscribing to, and the subscription is not instant: it takes 3 messages to establish. I need to upload a file to S3 after the subscription is established. Once the file is uploaded, S3 notifies the server, and the server pushes notifications to the subscription.

So, the subscription is modeled using an observable, the question would be:

How can I know that the subscription has been established so I can upload the file to S3?

And again, I completely understand that what I'm proposing might sound crazy, but take some time to think about it: what I'm describing is a pretty common case.

The other alternative would be to trigger start asynchronously, but doing this without returning a promise would force subscribe to return nothing; that's why I was suggesting the promise approach.

Right now, we are always getting a subscription, no matter if it has been established or not...

  • what would happen if there is still no subscription and you try to unsubscribe? Would the operation be enqueued?
  • what if I need to do something once the subscription has been established (this is what triggered the issue)?
  • this opens a new issue: the same problem exists with unsubscribe. What if I want to know the result, so that if it failed I can retry until it succeeds? (Returning a Promise could also solve this problem.)

I think you've misunderstood the distinction between an Observable and its subscription. If you understand Promises, observable.subscribe(callback) is analogous to promise.then(callback). In each case, when the promise or observable has some data, it calls callback with that data as an argument. The difference is that an observable may call the callback more than once, if it has more than one piece of data to pass along. Regardless, when the observable passes data to the callback (what we call the observer), the subscription isn't involved.

promise.then returns a new promise, so you can chain method calls. subscribe instead returns a subscription: an object that lets you stop listening to the observable (by calling subscription.unsubscribe()). Promises can't be cancelled; because observables have a subscription, they can.
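A minimal sketch of the analogy, assuming a stripped-down `Observable` class written here only for illustration (it is not the proposal's reference implementation): `subscribe` accepts a callback much like `then`, but returns a subscription synchronously, and calling `unsubscribe()` on it stops further deliveries and runs the producer's teardown. The `ticks$` producer is hypothetical.

```javascript
// Minimal Observable stand-in for illustration (not the spec algorithm):
// subscribe() calls the connect function synchronously, returns a
// subscription synchronously, and runs the teardown at most once.
class Observable {
  constructor(connect) {
    this._connect = connect;
  }

  subscribe(observer) {
    // Accept a bare callback, like promise.then(callback).
    if (typeof observer === "function") observer = { next: observer };
    let closed = false;
    let cleanup = null;
    const finish = (notify, arg) => {
      if (closed) return;
      closed = true;
      if (notify) notify.call(observer, arg);
      if (cleanup) cleanup();
    };
    const sink = {
      get closed() { return closed; },
      next(value) { if (!closed && observer.next) observer.next(value); },
      error(err) { finish(observer.error, err); },
      complete() { finish(observer.complete); },
    };
    const teardown = this._connect(sink);
    cleanup = typeof teardown === "function" ? teardown : null;
    if (closed && cleanup) cleanup(); // producer finished during connect
    return { unsubscribe() { finish(null); } };
  }
}

// Hypothetical producer: pushes 0, 1, 2, ... every 10 ms until unsubscribed.
const ticks$ = new Observable(observer => {
  let n = 0;
  const id = setInterval(() => observer.next(n++), 10);
  return () => clearInterval(id); // teardown stops the producer
});

const seen = [];
const subscription = ticks$.subscribe(value => {
  seen.push(value);
  // Unlike a promise, the consumer can stop listening at any point.
  if (value === 2) subscription.unsubscribe();
});
```

Note that no data ever flows through the subscription object itself; it only carries the ability to stop the flow.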

I'm afraid that your question is out-of-scope for this forum. (This is specifically to describe the specification/implementation of the Observable object.) I don't know enough about your question to know if you need Observables at all - if you only care about a single event, a Promise might suit your needs better.

In any case, I recommend closing this issue and asking your question (with more specific detail) in the RxJS community, or on Stack Overflow. I'm sorry I don't have time to be more helpful now, but in any case, this is the wrong venue for that discussion.

I understand, @appsforartists, that it might be difficult to see the case I explained above, which in my opinion is very common when you are dealing with client/server subscriptions. But anyway, based on what you are saying: all subscriptions are synchronous, and it's OK to return the subscription object directly without taking into account that the subscriber can fail asynchronously? And I should somehow write a middleware or some kind of hack to get that info from the subscriber, because there is no chance of this being considered?

No, I'm saying that the subscription object has nothing to do with whether or not an observable is asynchronous, and that you'd have better luck explaining your specific situation in a forum like Stack Overflow than asking it here.

The observer might receive a value synchronously or asynchronously. That depends on the implementation of the connect function, which you control. For instance, here is an observable with a connect function that waits for a promise to resolve before passing a value to the observer:

const database$ = new Observable(
  function connect(observer) {
    let receivedDatabase;

    waitForDatabase().then(
      database => {
        receivedDatabase = database;
        observer.next(database);
      }
    );

    return function disconnect() {
      if (receivedDatabase) {
        receivedDatabase.destroy();
      }
    }
  }
);

const subscription = database$.subscribe(
  (database) => {
    doSomeS3Stuff(database.read('something'))
  }
);

// if later you decide you don't want to connect to the database or get any more
// values from database$:
justKiddingIDontWantValuesAnymore.then(
  () => subscription.unsubscribe()
);

That's just an example, because I don't know the specifics of your scenario. If you only care about a single database object, a Promise might suit your needs better than an Observable.

Please close this issue and ask your question, in as much detail as possible, in a more appropriate forum.

For more information on how Observables work (including what an Observable is, what a Subscription is, and what a connect function is), see this guide I wrote to explain our simplified subset of this Observable spec, or the RxJS Manual.

Thanks @appsforartists, your example almost addresses the issue, but connecting to a database should be a one-time operation that doesn't depend on the observer. So let's modify it a bit: say you have a database that supports observing queries. The client sends a query, and then a persistent connection is established between it and the database, so each time the dataset that matches the query changes, the database pushes the new values to the client. You could model your client using an observable similar to the following one:

const queryListener$ = database.observeQuery(query);

const subscription = queryListener$.subscribe(observer);

So you got a subscription, but the database hasn't answered with the ACK yet. You are at a stage where you believe a connection has been established when it hasn't yet, and it could even fail. This is the case I've been trying to explain since the beginning; does it make sense now?

What if you would like to know whether the connection has been established, in order to do something else immediately after that event?

The Subscription object is not there to indicate that the producer is ready.

The Subscription object is there to represent that a Subscription exists and can now be unsubscribed from.

If you need to know whether a producer is ready, then have another Observable that will produce an event once the producer is ready. (This is potentially an anti-pattern in reactive programming.)
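One way to follow this suggestion with the current API is to make readiness part of what the producer pushes. The sketch below assumes a hypothetical `fakeHandshake` that stands in for a server acknowledgement (such as the absinthe-phoenix handshake described earlier) and a minimal illustrative `Observable` class; the connect function emits a `{ type: "ready" }` notification once the handshake succeeds, then data, and routes a failed handshake to `error`.

```javascript
// Minimal Observable stand-in for illustration (not the spec algorithm).
class Observable {
  constructor(connect) {
    this._connect = connect;
  }

  subscribe(observer) {
    let closed = false;
    let cleanup = null;
    const finish = (notify, arg) => {
      if (closed) return;
      closed = true;
      if (notify) notify.call(observer, arg);
      if (cleanup) cleanup();
    };
    const sink = {
      get closed() { return closed; },
      next(value) { if (!closed && observer.next) observer.next(value); },
      error(err) { finish(observer.error, err); },
      complete() { finish(observer.complete); },
    };
    const teardown = this._connect(sink);
    cleanup = typeof teardown === "function" ? teardown : null;
    if (closed && cleanup) cleanup(); // producer finished during connect
    return { unsubscribe() { finish(null); } };
  }
}

// Hypothetical handshake: resolves after the server's ACK, rejects on refusal.
function fakeHandshake(query) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (query === "bad") reject(new Error("rejected by server"));
      else resolve();
    }, 10);
  });
}

function observeQuery(query) {
  return new Observable(observer => {
    let timer = null;
    fakeHandshake(query).then(
      () => {
        if (observer.closed) return;      // unsubscribed during the handshake
        observer.next({ type: "ready" }); // subscription is now established
        if (observer.closed) return;      // consumer may unsubscribe on "ready"
        let n = 0;
        timer = setInterval(() => observer.next({ type: "data", value: n++ }), 10);
      },
      err => observer.error(err)          // handshake failed
    );
    return () => clearInterval(timer);
  });
}

const log = [];
const sub = observeQuery("files").subscribe({
  next(msg) {
    // msg.type === "ready" is where work such as an upload could start.
    log.push(msg.type === "ready" ? "ready" : `data ${msg.value}`);
    if (msg.type === "data" && msg.value === 1) sub.unsubscribe();
  },
  error(err) { log.push(`error: ${err.message}`); },
});

const failures = [];
observeQuery("bad").subscribe({
  next() {},
  error(err) { failures.push(err.message); },
});
```

The subscription is still returned synchronously, so it can be cancelled while the handshake is in flight; the `ready` notification only tells the consumer when the producer has finished its setup.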

There is a contradiction between the 2 statements you wrote @acutmore:

The Subscription object is there to represent that a Subscription exists and can now be unsubscribed from.

Sure, I agree, but there are cases where the subscription is not established instantaneously.

If you need to know whether a producer is ready, then have another Observable that will produce an event once the producer is ready. (This is potentially an anti-pattern in reactive programming.)

The readiness of the producer coincides with the subscription-ready event, so knowing when the producer is ready would be the same as knowing when the subscription has been established, with the possibility of an error in the meantime.

IMHO this is not an antipattern, but something pretty common one would like to know.

To be clearer:

The Subscription object is there to represent that a local Subscription exists and can now be unsubscribed from. That subscription may have started some async tasks and that does not make it any less of a subscription.

Maybe one way of thinking about it is seeing the subscription object more like a 'SubscriptionRequest'. When I call a function which returns a promise, it returns that promise synchronously - that promise does not mean that all connections have been setup, the promise represents the value or error that will eventually be returned. Subscriptions are also returned synchronously because they are a representation, a placeholder, a box, a token.

Furthermore making .subscribe return a promise adds overhead to every subscription and also makes it impossible to unsubscribe until the producer is ready.

I believe the problem you are trying to solve already has solutions without making such fundamental changes to how observables work.

People, in order to move forward with this (or any) issue there are 2 steps you need to follow:

  1. Analyze the facts, their impact and scope

In this step everyone in the discussion should try to be as open-minded as possible, listen to everyone else's opinion, and do their best to understand what's going on. Check the proofs, and try to refute them with solutions. This can sometimes be really hard, because when we are very used to something we might have found some hacks to overcome its limitations. That's why I mentioned being as open-minded as possible; in other words, if you are about to present a hack or a workaround, it won't refute the fact, because the fact arose from an error or a limitation in a tool that is trying to model reality.

  2. Present alternatives to deal (or not) with the facts

Once everyone has understood the facts, it's time to present some alternatives to deal with them. One of them could certainly be TO NOT DO ANYTHING, which is perfectly fine, but if this path is taken, then everyone using this API should be aware of its limitation. This means that the people behind it have to state clearly that it won't be possible to overcome this issue, and ideally explain why no action has been taken to improve it. Otherwise, if people are open to discussing alternatives, then each of the ones presented has to be evaluated by analyzing its pros and cons, and the most balanced one picked.

As I have already mentioned, I understand it's hard to be open-minded when you have been using something for a while and are so used to it that there is no room in your mind to even think about expanding its API or analyzing its limitations; in a few words, to be impartial. Nevertheless, I remind you that we are discussing a Spec, not a userland lib/tool, so there is a tremendous opportunity to improve the existing tools, to better model reality, and to open them to fields that are currently closed because what we have been using for so long didn't take this into account. If a Spec is created, then be sure that the existing tools will change to align with it, but please let's give ourselves the opportunity to go in this direction and not the opposite one, which would be to model a Spec (and close it) based on the existing tools. One example of what I'm saying is the case of Promises: the Spec removed the concept of Deferred, and what happened next? Nothing; the libs out there aligned with it.

Based on what I could pick up from your comments, you implicitly agreed with the facts in your examples; have a look, for example, at this piece of code:

    return function disconnect() {
      // What would happen if the promise failed?
      // Spoiler: you leaked a connection
      if (receivedDatabase) {
        receivedDatabase.destroy();
      }
    }
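The two gaps annotated in this fragment (a rejected connection is silently dropped, and a connection that resolves after `disconnect()` has already run is never destroyed) can both be handled inside the connect function. A minimal sketch, assuming a hypothetical `waitForDatabase` stub whose handle exposes `destroy()`, a minimal illustrative `Observable` class, and a factory `database$(fail)` instead of a single constant so that both outcomes can be exercised:

```javascript
// Minimal Observable stand-in for illustration (not the spec algorithm).
class Observable {
  constructor(connect) {
    this._connect = connect;
  }

  subscribe(observer) {
    let closed = false;
    let cleanup = null;
    const finish = (notify, arg) => {
      if (closed) return;
      closed = true;
      if (notify) notify.call(observer, arg);
      if (cleanup) cleanup();
    };
    const sink = {
      get closed() { return closed; },
      next(value) { if (!closed && observer.next) observer.next(value); },
      error(err) { finish(observer.error, err); },
      complete() { finish(observer.complete); },
    };
    const teardown = this._connect(sink);
    cleanup = typeof teardown === "function" ? teardown : null;
    if (closed && cleanup) cleanup(); // producer finished during connect
    return { unsubscribe() { finish(null); } };
  }
}

// Hypothetical stub: resolves with a handle after 10 ms, or rejects if `fail`.
const destroyed = [];
function waitForDatabase(fail) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (fail) reject(new Error("connection failed"));
      else resolve({ destroy() { destroyed.push("db"); } });
    }, 10);
  });
}

function database$(fail = false) {
  return new Observable(function connect(observer) {
    let receivedDatabase = null;
    let disconnected = false;

    waitForDatabase(fail).then(
      database => {
        if (disconnected) {
          database.destroy(); // arrived after disconnect: destroy it, don't leak
          return;
        }
        receivedDatabase = database;
        observer.next(database);
      },
      err => observer.error(err) // surface the failure to the observer
    );

    return function disconnect() {
      disconnected = true;
      if (receivedDatabase) receivedDatabase.destroy();
    };
  });
}

// Unsubscribe before the connection resolves: the late handle is destroyed.
database$().subscribe({ next() {} }).unsubscribe();

// Connection fails: the error reaches the observer instead of being dropped.
const errors = [];
database$(true).subscribe({ next() {}, error(err) { errors.push(err.message); } });
```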

So to be clear, the questions you should be asking yourselves in order to understand the impact and scope of this are the ones expressed in this comment.

Sorry, but I won't continue discussing the facts, because this has drained a lot of my energy, but I'll happily move on to step 2 with anyone who is open to discussing this.

In any case, thanks to everyone.

@mgtitimoli There are some good ideas in this thread on how you can tweak your modeling to get the API that you want using a combination of Observables and Promises, but for the reasons mentioned by @benlesh the Subscription object must be returned from subscribe synchronously.

Hi @zenparsing I saw the comment @benlesh did, but there are 2 misconceptions in it:

A subscription is as much about signalling that you don't want something to happen as it is about cancelling something that's currently happening.

I agree with this one, but there is an implicit point in it that is in fact what I exposed:

At the point of returning the subscription, the connection might not have been established yet, and therefore nothing is happening yet. It might start happening if the producer is able to start producing values, or it might not, and in that case the subscription doesn't represent anything.

There are a variety of ways to subscribe to something asynchronously and still return a Subscription

The point of this issue is not to discuss how to subscribe to something asynchronously, but to expose the fact that there are cases where the producer needs to do some work to make the subscription effective, and this could fail (or not), but in any case you can't know about it using the current API without some hack.

The subscribe method returns something to signal that the subscription has effectively taken place. The current API assumes a subscription is always created, does not take into account a possible error in the producer, and doesn't track the process of subscribing either, since it returns the subscription synchronously and therefore also calls start synchronously.

We are losing some important part of the process, and this could end up in leaks and other kind of errors, and we are also closing the possibility of the one who is calling subscribe (or unsubscribe) to take an action to handle this.

@mgtitimoli From the description of your use case, it sounds like there are two async "fundamentals" at play:

  1. The setup of a connection to some external resource, which could succeed or fail, and
  2. The sequence of notifications coming from that external resource.

Perhaps it would make sense to use a promise-returning function to represent the first and an observable to represent the second. For instance:

async function connect() {
  // Attempt to connect to the database asynchronously and then:
  return {
    query(q) {
      return new Observable(...);
    }
  };
}

async function main() {
  let db = await connect();
  let subscription = db.query(databaseQuery).subscribe(next => {
    console.log(`Got query result ${ next }`);
  });
}

Does this capture what you're trying to express?

By keeping the primitives (like Promise and Observable) separate, we have more flexibility to compose them in whatever combination we like.

@zenparsing thanks, that addresses one subcase of this issue: when your producer needs to do something only once and can then start producing values for every observer. In other words, your example shows the case where your producer's requirements don't depend on the observer but on something else that can be resolved beforehand.

I can't see a way to express the other case, that would be:

What can we do when your producer needs to do something before creating a subscription and this something depends on each observer?

An example could be incrementing a counter on each subscription and, only if this has succeeded, starting to get values from somewhere else.

Maybe I'm missing something: why wouldn't the failure to connect just be passed down to your subscribers via an error? I.e., in pseudocode:
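For per-observer setup, the asynchronous step can live inside the connect function, which runs once per `subscribe()` call. A minimal sketch, assuming a hypothetical remote counter (`incrementCounter` / `decrementCounter`) and a minimal illustrative `Observable` class: values only start flowing once the increment succeeds, a failed increment is reported through `error`, and the teardown undoes the increment whether the observer leaves before or after setup completes.

```javascript
// Minimal Observable stand-in for illustration (not the spec algorithm).
class Observable {
  constructor(connect) {
    this._connect = connect;
  }

  subscribe(observer) {
    let closed = false;
    let cleanup = null;
    const finish = (notify, arg) => {
      if (closed) return;
      closed = true;
      if (notify) notify.call(observer, arg);
      if (cleanup) cleanup();
    };
    const sink = {
      get closed() { return closed; },
      next(value) { if (!closed && observer.next) observer.next(value); },
      error(err) { finish(observer.error, err); },
      complete() { finish(observer.complete); },
    };
    const teardown = this._connect(sink);
    cleanup = typeof teardown === "function" ? teardown : null;
    if (closed && cleanup) cleanup(); // producer finished during connect
    return { unsubscribe() { finish(null); } };
  }
}

// Hypothetical remote counter; the async increment may fail.
let remoteCount = 0;
function incrementCounter(shouldFail) {
  return new Promise((resolve, reject) => setTimeout(() => {
    if (shouldFail) reject(new Error("increment failed"));
    else resolve(++remoteCount);
  }, 10));
}
function decrementCounter() { remoteCount--; }

function values$(shouldFail = false) {
  return new Observable(observer => {
    let counted = false;
    let timer = null;
    // Per-observer setup: runs once for every subscribe() call.
    incrementCounter(shouldFail).then(
      () => {
        if (observer.closed) { decrementCounter(); return; } // left during setup
        counted = true;
        let n = 0;
        timer = setInterval(() => observer.next(n++), 10);
      },
      err => observer.error(err) // setup failed: no values, observer is told
    );
    return () => {
      clearInterval(timer);
      if (counted) decrementCounter();
    };
  });
}

const a = [], b = [], errs = [];
const subA = values$().subscribe({
  next(v) { a.push(v); if (v === 1) subA.unsubscribe(); },
});
values$().subscribe({ next: v => b.push(v) }).unsubscribe(); // cancelled during setup
values$(true).subscribe({ next() {}, error: e => errs.push(e.message) });
```

This does not give the caller a promise for the setup step, which is the part of the request the current API does not cover; it only shows that the failure and cleanup paths can be handled without changing `subscribe`.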

var dbConnect$ = Observable.create(observer=>{
    // logic here to establish your connection...
    if(<successful connection>) {
        observer.next();
        observer.complete(); // you could complete here since it just needs to connect once...
    }
    else { // assuming failed connection
        observer.error(new Error("connection failed")); // pass the failure downstream
    }
});

Then anything downstream of this Observable, whether it be another Observable used in conjunction with this one to send/receive messages, or a subscriber consuming those messages, etc, would get the error from the failed connection? Then you can implement whatever error handling you want...

Hi @skokenes,

As I wrote before, the database example doesn't cover the case where your producer has to do something for each observer, and the subscription (the producer being able to start producing values) depends on it. You might also want to know when the subscription has been established, since that event could be a prerequisite for some other operation.

Moreover, in the case I described above you would like to know when this happens, and the same applies to unsubscribing: what if your subscription needs to do something before unsubscribing? You (the caller of unsubscribe) would like to know how this process went, so that, for example, if you couldn't unsubscribe you could implement a retry mechanism until it succeeds.

One thing we haven't explored yet is the addition of a ready event. Although I would prefer to model this by wrapping the Observable.subscribe and Subscription.unsubscribe return values in promises, adding a ready event could be an intermediate solution to this issue. While I still believe we shouldn't be constrained by the libs that depend on this implementation (it should instead be the opposite), taking this path won't break them. However, it would only solve the subscription issue, since we can add this extra method to the observer; unsubscribe would still suffer from the same problem.

If there is no chance of taking this issue into account and adding the modifications required to support these use cases, either by wrapping Observable.subscribe and Subscription.unsubscribe with Promises or partially by adding the ready event, then the only way to handle this would be to use a combination of an async generator and a promise, as follows:

async function* createSubscription(...subscriptionArgs) {
    // Note:
    // subscriptionArgs will certainly include something that holds
    // what has been done (asynchronously) in subscribe,
    // which might be used to get the values

    // sadly there is no other way to trap subscription.return (unsubscribe)
    // than by using a try statement
    try {
        // yield values or error
    } finally {
        // proceed with unsubscription
    }
}

const subscribe = (...subscribeArgs) => new Promise((resolve, reject) => {
     // do whatever you need to do in order to subscribe
     // (each subscription is independent of the other) and...
     // if everything went well
     // => resolve(createSubscription(/* args could depend on the async op or not */));
     // if not
     // => reject(error)
});

We can use this approach because each async generator would be independent of the others (the same behaviour that we have in Observable each time we call the subscribe method).

The unsubscribing mechanism could be implemented by calling the return method of the async generator object, but it would share the same issue as the observable subscription: once it is called, there is no way to call it again (retry) in case it failed.

Nevertheless, I will keep an eye on this; I would love to see Observable improve to take this into account.
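The skeleton above can be filled in to check how it behaves. A minimal runnable sketch, assuming a hypothetical per-subscriber `acceptSubscription` step; the `subscribe` function here is also hypothetical and unrelated to `Observable.prototype.subscribe`. Breaking out of `for await` calls the generator's `return()`, which runs the `finally` block and plays the role of unsubscribe.

```javascript
// Hypothetical per-subscriber setup; resolves once the subscription is accepted
// (a real implementation would reject here if subscribing failed).
function acceptSubscription(id) {
  return new Promise(resolve => setTimeout(() => resolve({ id }), 10));
}

async function* createSubscription(handle, log) {
  try {
    for (let n = 0; ; n++) {
      await new Promise(r => setTimeout(r, 10)); // simulated wait for a value
      yield `${handle.id}:${n}`;
    }
  } finally {
    // Runs when the consumer calls return() (or breaks out of for await).
    log.push(`closed ${handle.id}`);
  }
}

// Hypothetical subscribe: the returned promise settles only after setup.
async function subscribe(id, log) {
  const handle = await acceptSubscription(id);
  return createSubscription(handle, log);
}

async function main() {
  const log = [];
  const subscription = await subscribe("q1", log); // setup has finished here
  for await (const value of subscription) {
    log.push(value);
    if (value === "q1:1") break; // break calls subscription.return()
  }
  return log;
}
```

Unlike an Observable, this model is pull-based: the generator body only advances when the consumer asks for the next value.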

Thanks.

This is probably an inappropriate forum for this discussion to continue as others have pointed out - have you asked this same question on StackOverflow? Here is one last attempt at an answer on how you could meet your requirements based on the existing API.

My understanding of your example is that you have one async operation that happens once, like connecting to a database. Assuming that has worked, then for every subscriber you want a file to be uploaded, and then messages to be returned to the subscriber.

Does this example accomplish that? See this JSBin to see the messages that get logged to the console. The db connects once, then for each subscriber a file gets uploaded and then messages are passed down.

var dbConnect$ = Rx.Observable.create(observer=> {
  // Model your async logic for getting connected, authenticated, etc as a 1s delay, then signal that it worked
  setTimeout(()=>{
    console.log("db connection created!");
     observer.next();
     observer.complete();
  },1000)
})
.share();

var uploadFile$ = dbConnect$
  .ignoreElements()
  .concat(Rx.Observable.create(observer=>{
    // model your async logic for a file getting uploaded as a 1s delay
    setTimeout(()=>{
      console.log("file got updated!");
      observer.next();
      observer.complete();
    },1000)
  }));

var messagesFromServer$ = uploadFile$
  .ignoreElements()
  .concat(Rx.Observable.interval(1000).map(m=>"message " + m));

// subscriber 1 -> initiates a new file upload and then messages
messagesFromServer$.subscribe(s=>console.log("sub 1 got message: " + s));

// subscriber 2 -> initiates a new file upload and then messages
messagesFromServer$.subscribe(s=>console.log("sub 2 got message: " + s));

Let's simplify this a little more: A Subscription can actually wrap a Promise<Subscription>, but not the other way around. So we should be returning Subscription.

const eventualSubscription = Promise.resolve(new Subscription());

const sub = new Subscription(() => {
  eventualSubscription.then(sub => {
      sub.unsubscribe();
  });
});

sub.unsubscribe();
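A runnable version of the wrapping idea, assuming a small hypothetical `Subscription` class (the snippet above uses `new Subscription(...)`, which is not a global in JavaScript runtimes). The wrapper exists synchronously and can be unsubscribed immediately; when the eventual inner subscription arrives, it is torn down. The helper name `fromEventualSubscription` is hypothetical.

```javascript
// Hypothetical Subscription stand-in: runs its teardown at most once.
class Subscription {
  constructor(teardown = () => {}) {
    this._teardown = teardown;
    this.closed = false;
  }
  unsubscribe() {
    if (this.closed) return;
    this.closed = true;
    this._teardown();
  }
}

// Wrap a Promise<Subscription> in a Subscription that exists right away.
function fromEventualSubscription(eventualSubscription) {
  return new Subscription(() => {
    eventualSubscription.then(inner => inner.unsubscribe());
  });
}

const log = [];
const eventual = new Promise(resolve =>
  setTimeout(() => resolve(new Subscription(() => log.push("inner torn down"))), 10));

const sub = fromEventualSubscription(eventual); // available synchronously
sub.unsubscribe(); // legal immediately, even though `eventual` is still pending
log.push("outer unsubscribed");
```

The reverse does not work: a Promise<Subscription> cannot be unsubscribed until it settles, which is the timing problem raised earlier in the thread.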

What you might be asking for is the ability to return a promise from inside of the Observable's teardown function, which is a different animal. That would mean a variety of things, that are probably worth discussing:

  1. teardown functions can return a value or a promise.
  2. teardown is always synchronous now, and needs to be synchronous to support things like EventTarget; how will adding Promises into the mix affect things?
  3. Should unsubscribe() then return a Promise<void>?

These are things we've discussed a little over in the RxJS community, but I'm not sure they're tenable because of Promise's forced asynchrony and the implications on using Observable for APIs that can be completely synchronous like EventTarget.

Returning Promise<void> from unsubscribe will definitely work.

This change would open the door to cases where unsubscribing is not synchronous. Even better, whoever triggers the unsubscription would be able to track the process: knowing when it finished if it succeeded, or getting the error if it failed (and retrying if necessary).

@benlesh regarding subscribe, sorry, but I didn't get what you mean by wrapping a Promise with a Subscription. How would doing this allow the caller to know when the subscription finished, or get the error in case it failed?

And with the things you enumerated to discuss, I totally agree that the main point is

how will adding Promises into the mix affect things?

I'm convinced that what these changes provide is at least worth discussing.

I don't think a Promise will work for unsubscribe. There are APIs we want to be able to represent, like EventTarget, that require totally synchronous unsubscription. And returning a teardown Observable might just get crazy.

The only thing I see that we can't accomplish easily with the current API is a situation where you'd want to call some teardown logic that was asynchronous, but you want to call it in a serial fashion rather than concurrently. But that seems like a pretty extreme edge case. I'd need to see the use cases around that to even understand why someone might want to do that.