holepunchto/hyperbee

`Session is closed` timing issue in createDiffStream

Closed this issue · 2 comments

I'm trying to convert the range watcher into an RxJS observable, based on this test, and I'm running into a `HypercoreError: SESSION_CLOSED: Session is closed` error when running the following code:

import Hyperbee from "hyperbee";
import Hypercore from "hypercore";
import RAM from "random-access-memory";
import { from, concatMap, catchError, throwError } from "rxjs";
// Hypercore constructed on the `random-access-memory` storage imported above.
const core = new Hypercore(RAM);
// Hyperbee on top of that core; keys and values both use the "utf-8" encoding.
const db = new Hyperbee(core, {
  keyEncoding: "utf-8",
  valueEncoding: "utf-8",
});

// Watcher created with no arguments; wait for it to be ready before it is
// consumed by the functions below.
const watcher = db.watch();
await watcher.ready();

// Reproduction of the reported failure: the watcher is wrapped in an RxJS
// observable and each [current, previous] pair is diffed inside concatMap.
// As reported, this logs SESSION_CLOSED when the two puts below run back to
// back without a delay in between.
const withObservable = async () => {
  const watcher$ = from(watcher);

  watcher$
    .pipe(
      // Each emission is destructured as [current, previous]; the diff stream
      // is wrapped in its own inner observable.
      // NOTE(review): presumably the inner stream is read after the watcher
      // has already moved on, which would explain the closed session — confirm
      // against hyperbee's watcher contract.
      concatMap(([current, previous]) =>
        from(current.createDiffStream(previous)).pipe(
          // Log, then rethrow so the error also reaches the outer handlers.
          catchError((err) => {
            console.error("Error in inner observable:", err);
            return throwError(() => err);
          }),
        ),
      ),
      // Log at the outer pipeline level and rethrow to the subscriber.
      catchError((err) => {
        console.error("Error in main observable:", err);
        return throwError(() => err);
      }),
    )
    .subscribe({
      next: (result) => console.log("result", result),
      error: (err) => console.error("Error in subscription:", err),
      complete: () => console.log("complete"),
    });

  await db.put("a", "1");
  // Per the report, enabling the delay below makes the error disappear and
  // both diffs are logged as expected.
  // await new Promise((r) => setTimeout(r));
  await db.put("b", "2");
};

// Run the reproduction.
await withObservable();

The full error is:

Error in inner observable: HypercoreError: SESSION_CLOSED: Session is closed
    at Hypercore.get (/Users/jun/dev/iios/libraries/api/node_modules/hypercore/index.js:741:38)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Batch.getBlock (/Users/jun/dev/iios/node_modules/hyperbee/index.js:637:19) {
  code: 'SESSION_CLOSED'
}
Error in main observable: HypercoreError: SESSION_CLOSED: Session is closed
    at Hypercore.get (/Users/jun/dev/iios/libraries/api/node_modules/hypercore/index.js:741:38)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Batch.getBlock (/Users/jun/dev/iios/node_modules/hyperbee/index.js:637:19) {
  code: 'SESSION_CLOSED'
}
Error in subscription: HypercoreError: SESSION_CLOSED: Session is closed
    at Hypercore.get (/Users/jun/dev/iios/libraries/api/node_modules/hypercore/index.js:741:38)
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
    at async Batch.getBlock (/Users/jun/dev/iios/node_modules/hyperbee/index.js:637:19) {
  code: 'SESSION_CLOSED'
}

If I add the setTimeout in between the db.put()s then there's no error, and I do get the expected results:

result { left: { seq: 1, key: 'a', value: '1' }, right: null }
result { left: { seq: 2, key: 'b', value: '2' }, right: null }

You are only allowed to use the snapshots while handling the event that yielded them, i.e. inside the body of the async iteration:

for await (const [cur, prev] of db.watch(...)) {
  // only legal to use the snaps here
}

Thanks @mafintosh, your comment helped guide me towards a solution: driving an RxJS Subject and manually pushing values from the `for await...of` loop, like so:

// Subject that processWatcher pushes each collected batch of diffs into.
// NOTE(review): `Subject` is not among the rxjs names imported in the earlier
// snippet — presumably it also needs to be imported from "rxjs"; confirm.
const diffsSubject = new Subject();

/**
 * Drains `watcher` and forwards one array of diff entries per watcher event
 * to `diffsSubject`.
 *
 * The diff stream for each event is fully consumed inside the iteration that
 * produced the snapshots, before the next event is requested. When the watcher
 * finishes, the subject is completed; any error raised along the way is
 * forwarded to the subject instead of being thrown.
 *
 * @param {AsyncIterable} watcher - yields `[current, previous]` pairs, where
 *   `current.createDiffStream(previous)` returns an async iterable of diffs.
 * @returns {Promise<void>} resolves once the subject was completed or errored.
 */
const processWatcher = async (watcher) => {
  try {
    for await (const snapshots of watcher) {
      const [latest, prior] = snapshots;
      const collected = [];
      const diffStream = latest.createDiffStream(prior);
      // Read the whole stream now, while still inside this iteration.
      for await (const entry of diffStream) {
        collected.push(entry);
      }
      diffsSubject.next(collected);
    }
    diffsSubject.complete();
  } catch (err) {
    diffsSubject.error(err);
  }
};

/**
 * Starts draining the watcher into `diffsSubject`, subscribes a logging
 * observer to the subject, then performs two sequential puts on `db`.
 *
 * @returns {Promise<void>} resolves after the second put has been awaited.
 */
const withSubjectAndForAwait = async () => {
  // Intentionally not awaited: processWatcher runs alongside the puts below.
  processWatcher(watcher);

  const loggingObserver = {
    next(diffs) {
      console.log(diffs);
    },
    error(err) {
      console.error("Error:", err);
    },
    complete() {
      console.log("All diffs processed");
    },
  };
  diffsSubject.subscribe(loggingObserver);

  const writes = [
    ["a", "1"],
    ["b", "2"],
  ];
  // Sequential on purpose: each put is awaited before the next one starts.
  for (const [key, value] of writes) {
    await db.put(key, value);
  }
  // ... other db operations ...
};

// Run the Subject-based variant.
await withSubjectAndForAwait();