andywer/threads.js

Pool initialization rejections cannot be handled currently, and they return errors in the global main handler of javascript.

bhavitsharma opened this issue · 1 comments

There's no way to handle pool init errors. Consider a scenario where a website's CSP doesn't allow us to run a worker; the pool init will fail to make a handshake with the worker because the worker never gets initialized. If we queue tasks to the pool, then the PromiseLike returned from the queue method of the pool class is not catchable.

For example, if you have this code:

  let work;
  try {
    work = new Worker(new URL(`data:text/javascript;base64:${workerJs}`))
  } catch (e) {
    work = new Worker();
  }
  const pool = Pool(() =>
    spawn(work, {
      timeout: 1000,
    }),
    1
  );

  const allTasks = [];
  for (let i = 0; i < 10; i ++) {
    const p = new Promise(async (resolve, reject) => {
      try {
        const t = await pool.queue((e) => {});
        return resolve(t);
      } catch (e) {
        return reject(e);
      }
    });
    allTasks.push(p.catch(e => {console.log(e)}));
  }

// You can never catch it since it always throws errors in the main thread.
await Promise.all(allTasks);

Possible Fix

When a pool fails to init, it inserts an error event to the eventSubjects stream:
https://sourcegraph.com/github.com/andywer/threads.js/-/blob/src/master/pool.ts?L151&subtree=true

this.eventSubject.error(error)

We can't catch the error because the taskCompletion method does not subscribe to the error events. It's only subscribing to the next() events. Adding a case for the error in the below code appropriately rejects the error.
https://sourcegraph.com/github.com/andywer/threads.js/-/blob/src/master/pool.ts?L233&subtree=true#tab=references

// minified.
 taskCompletion(e) {
            return new Promise((t, r) => {
              const n = this.events().subscribe({
                next(o) {
                  o.type === u.PoolEventType.taskCompleted && o.taskID === e
                  ? (n.unsubscribe(), t(o.returnValue))
                  : o.type === u.PoolEventType.taskFailed && o.taskID === e
                  ? (n.unsubscribe(), r(o.error))
                  : o.type === u.PoolEventType.terminated &&
                    (n.unsubscribe(),
                    r(Error("Pool has been terminated before task was run.")));
                },
                error: r
              });
            });

I can create a PR with the fix but want to know if I am doing something wrong.

Monkey patching the taskCompletion function of the pool works fine for now:

  pool["taskCompletion"] = (taskID) => {
    return new Promise((resolve, reject) => {
      const eventSubscription = pool.events().subscribe({
        next(event) {
          if (
            event.type === Pool.EventType.taskCompleted &&
            event.taskID === taskID
          ) {
            eventSubscription.unsubscribe();
            resolve(event.returnValue);
          } else if (
            event.type === Pool.EventType.taskFailed &&
            event.taskID === taskID
          ) {
            eventSubscription.unsubscribe();
            reject(event.error);
          } else if (event.type === Pool.EventType.terminated) {
            eventSubscription.unsubscribe();
            reject(Error("Pool has been terminated before task was run."));
          }
        },
        error: reject,
      });
    });
  };

If the snippet makes sense, then I can raise a PR.