neo4j/neo4j-javascript-driver

[Bug] Neo4jError: Ignored either because of an error or RESET

gchicoye opened this issue · 8 comments

Bug Report

While updating my Neo4j database (Community Edition) with the following script:

const upsertPromiseArrayCreator = (dbEntries:DbEntry[], tx:ManagedTransaction)
:Promise<QueryResult>[] => dbEntries
  .filter((dbEntry) => dbEntry.action === 'UPSERT')
  .map((dbEntry) => new Promise((resolve, reject) => {
    tx.run(
      `
        MERGE (user:User{first_id: $userId })
        ON CREATE SET user.created_at = timestamp()
        ON MATCH SET user.updated_at = timestamp()
        MERGE (audience:Audience{mediarithmics_id:  $segmentId })
        ON CREATE SET audience.created_at = timestamp()
        ON MATCH SET audience.updated_at = timestamp()
        MERGE (user)-[rel:BELONGS_TO]->(audience)
        ON CREATE SET rel.created_at = timestamp()
      `,
      dbEntry,
    )
      .then((result) => resolve(result))
      .catch((e) => {
        // eslint-disable-next-line no-console
        console.log('upsertPromiseArrayCreator error');
        reject((e as Error).message);
      });
  }));

const deletePromiseArrayCreator = (dbEntries:DbEntry[], tx:ManagedTransaction)
:Promise<QueryResult>[] => dbEntries
  .filter((dbEntry) => dbEntry.action === 'DELETE')
  .map((dbEntry) => new Promise((resolve, reject) => {
    tx.run(
      `MATCH (user:User{first_id:$userId})-[belong:BELONGS_TO]->(audience:Audience{mediarithmics_id: $segmentId})
        DELETE belong`,
      dbEntry,
    )
      .then((result) => resolve(result))
      .catch((e) => {
        // eslint-disable-next-line no-console
        console.log('deletePromiseCreator error');
        reject((e as Error).message);
      });
  }));

export const registerChunk = (dbEntries:DbEntry[])
:Promise<true> => new Promise((resolve, reject) => {
  try {
    const session:Session = driver.session();
    session.executeWrite((tx) => {
      const promises = [
        ...upsertPromiseArrayCreator(dbEntries, tx),
        ...deletePromiseArrayCreator(dbEntries, tx),
      ];
      Promise.all(promises)
        .then(() => {
          session.close();
          mediarithmicsLinesCounter.inc(dbEntries.length);
          resolve(true);
        })
        .catch((e) => {
          session.close();
          // eslint-disable-next-line no-console
          console.log('Error in registerChunk Element', e.message);
          reject((e as Error).message);
        });
    });
  } catch (e) {
    // eslint-disable-next-line no-console
    console.log('Error in registerChunk', (e as Error).message);
    reject((e as Error).message);
  }
});

I get the following error, which I am unable to catch:

node:internal/process/promises:288
            triggerUncaughtException(err, true /* fromPromise */);
            ^

Neo4jError: Ignored either because of an error or RESET

    at captureStacktrace (/app/node_modules/neo4j-driver-core/lib/result.js:611:17)
    at new Result (/app/node_modules/neo4j-driver-core/lib/result.js:105:23)
    at finishTransaction (/app/node_modules/neo4j-driver-core/lib/transaction.js:488:12)
    at Object.commit (/app/node_modules/neo4j-driver-core/lib/transaction.js:301:25)
    at Transaction.commit (/app/node_modules/neo4j-driver-core/lib/transaction.js:202:37)
    at TransactionExecutor._handleTransactionWorkSuccess (/app/node_modules/neo4j-driver-core/lib/internal/transaction-executor.js:155:16)
    at /app/node_modules/neo4j-driver-core/lib/internal/transaction-executor.js:131:42
    at process.processTicksAndRejections (node:internal/process/task_queues:95:5) {
  constructor: [Function: Neo4jError] { isRetriable: [Function (anonymous)] },
  code: 'N/A',
  retriable: false
}

Node.js v18.16.0

On the server side, I get the following log:

2023-04-20 14:21:47.041+0000 ERROR [bolt-1789] Terminating connection due to unexpected error
org.neo4j.bolt.protocol.error.streaming.BoltStreamingWriteException: Failed to finalize batch: Cannot write result response
        at org.neo4j.bolt.protocol.common.transaction.result.ResultHandler.onFinish(ResultHandler.java:116) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.fsm.AbstractStateMachine.after(AbstractStateMachine.java:126) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.fsm.AbstractStateMachine.process(AbstractStateMachine.java:99) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.lambda$submit$4(AtomicSchedulingConnection.java:113) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJob(AtomicSchedulingConnection.java:336) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.doExecuteJobs(AtomicSchedulingConnection.java:270) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJobs(AtomicSchedulingConnection.java:212) ~[neo4j-bolt-5.4.0.jar:5.4.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: io.netty.channel.unix.Errors$NativeIoException: writevAddresses(..) failed: Broken pipe

My Environment

Javascript Runtime Version: 18.16.0
Driver Version: 5.7.0
Neo4j Version and Edition: docker Neo4j:latest - 5.6 Community edition
Operating System: Debian 10 for Server, node:lts-alpine for Client (18.16.0)

Hey, any news on that? Thanks!

Hi, would it be possible to get some insight on this? Thanks!

Hi @gchicoye, do you have some logs or a minimal example?

You can enable the logs by creating the driver with:

const driver = neo4j.driver(<your url>, <your credentials>, {
  ...// other configurations,
  logging: neo4j.logging.console("debug")
})

See:
https://neo4j.com/docs/api/javascript-driver/5.9/function/index.html#static-function-driver

I get a TS error here...

Property 'logging' does not exist on type '{ driver: (url: string, authToken?: AuthToken | undefined, config?: Config | undefined) => Driver; hasReachableServer: (url: string, config?: Pick<Config, "logging"> | undefined) => Promise<...>; ... 68 more ...; notificationFilterMinimumSeverityLevel: EnumRecord<...>; }'.ts(2339)

Hi, any idea on this?

Can you try suppressing the TypeScript type error for now?

const driver = neo4j.driver(<your url>, <your credentials>, {
  ...// other configurations,
  // @ts-ignore
  logging: neo4j.logging.console("debug")
})

I will investigate the ts type error.
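In the meantime, a possible alternative to `@ts-ignore` is to pass the logging configuration as a plain object, so that `neo4j.logging` is never referenced. The sketch below assumes the driver's `logging` option accepts a `{ level, logger }` object, as its configuration documentation describes. `LoggingConfig`, `LogLevel` and `makeDebugLogging` are hypothetical local names used only so the example runs without the driver installed.

```typescript
// Hypothetical local mirror of the shape the driver's `logging` option accepts.
// In real code the type would come from neo4j-driver itself.
type LogLevel = 'error' | 'warn' | 'info' | 'debug';

interface LoggingConfig {
  level: LogLevel;
  logger: (level: LogLevel, message: string) => void;
}

// Builds a debug-level logging config as a plain object.
// `sink` receives each formatted line (for example console.log).
const makeDebugLogging = (sink: (line: string) => void): LoggingConfig => ({
  level: 'debug',
  logger: (level, message) => sink(`${Date.now()} ${level.toUpperCase()} ${message}`),
});

// Intended usage (assumption, not executed here):
// const driver = neo4j.driver(url, credentials, {
//   logging: makeDebugLogging(console.log),
// });
```

Each line handed to `sink` is a millisecond timestamp, the upper-cased level, and the driver's message, separated by spaces.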

Hey,

Done!
I've made a lot of changes to my code; it now looks like this:

const upsertDbEntry = async (dbEntry:DbEntry, txc:Transaction)
:Promise<QueryResult> => txc.run(
  `
      MERGE (user:User{first_id: $userId })
      ON CREATE SET user.created_at = timestamp()
      ON MATCH SET user.updated_at = timestamp()
      MERGE (audience:Audience{mediarithmics_id:  $segmentId })
      ON CREATE SET audience.created_at = timestamp()
      ON MATCH SET audience.updated_at = timestamp()
      MERGE (user)-[rel:BELONGS_TO]->(audience)
      ON CREATE SET rel.created_at = timestamp()
    `,
  dbEntry,
);

const deleteDbEntry = async (dbEntry:DbEntry, txc:Transaction)
:Promise<QueryResult> => txc.run(
  `MATCH (user:User{first_id:$userId})-[belong:BELONGS_TO]->(audience:Audience{mediarithmics_id: $segmentId})
      DELETE belong`,
  dbEntry,
);

export const saveChunkData = async (dbEntries:DbEntry[]):Promise<boolean> => {
  const session = driver.session();
  const txc = session.beginTransaction();
  try {
    for (let i = 0; i < dbEntries.length; i += 1) {
      const dbEntry = dbEntries[i];
      if (dbEntry.action === 'UPSERT') upsertDbEntry(dbEntry, txc);
      if (dbEntry.action === 'DELETE') deleteDbEntry(dbEntry, txc);
      mediarithmicsLinesCounter.inc();
    }
    await txc.commit();
    return true;
  } catch (error) {
    // eslint-disable-next-line no-console
    console.log(error);
    await txc.rollback();
    // eslint-disable-next-line no-console
    console.log('rolled back');
    return false;
  } finally {
    await session.close();
  }
};

export const saveChunkDataWithRace = async (dbEntries:DbEntry[]):Promise<void> => {
  const endPromise:Promise<boolean> = new Promise((resolve) => {
    setTimeout(() => {
      resolve(false);
    }, 60000);
  });
  const promises:Promise<boolean>[] = [
    saveChunkData(dbEntries),
    endPromise,
  ];
  const result = await Promise.race(promises);
  if (!result) {
    // eslint-disable-next-line no-console
    console.log(`${new Date().toString()} - Commit timeout`);
    process.exit(1);
  }
};

I get the following debug logs (the ts-ignore trick did the job :)):

1687880367930 DEBUG Connection [0][bolt-2411] C: RUN 
      MERGE (user:User{first_id: $userId })
      ON CREATE SET user.created_at = timestamp()
      ON MATCH SET user.updated_at = timestamp()
      MERGE (audience:Audience{mediarithmics_id:  $segmentId })
      ON CREATE SET audience.created_at = timestamp()
      ON MATCH SET audience.updated_at = timestamp()
      MERGE (user)-[rel:BELONGS_TO]->(audience)
      ON CREATE SET rel.created_at = timestamp()
     {"action":"UPSERT","userId":"7c5c2e7fb4cb4eba77e2daa02a4ce301","segmentId":"36595"} {}
1687880367930 DEBUG Connection [0][bolt-2411] C: PULL {"n":{"low":1000,"high":0}}
1687880367932 DEBUG Connection [0][bolt-2411] S: SUCCESS {"signature":112,"fields":[{}]}

However, the commit usually never completes, so nothing is written. Nothing happens for minutes, until my timeout function fires and the container restarts...

You should wait for the promise returned by each tx.run call to resolve before committing. You can do this by awaiting the calls:

if (dbEntry.action === 'UPSERT') await upsertDbEntry(dbEntry, txc);
if (dbEntry.action === 'DELETE') await deleteDbEntry(dbEntry, txc);
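
To show the effect of awaiting each call, here is a minimal, self-contained sketch of the loop. It does not use the real driver: `TxLike`, `makeFakeTx`, `saveChunk`, `demo` and the placeholder query strings are hypothetical stand-ins, and the fake transaction only records the order in which operations finish.

```typescript
// Hypothetical stand-ins for the driver's Transaction and the DbEntry type,
// so the sketch runs without a database.
interface DbEntry { action: 'UPSERT' | 'DELETE'; userId: string; segmentId: string }

interface TxLike {
  run(query: string, params: DbEntry): Promise<unknown>;
  commit(): Promise<void>;
  rollback(): Promise<void>;
}

// Placeholders: the real Cypher is the MERGE / DELETE statements from the issue.
const UPSERT_QUERY = 'upsert query goes here';
const DELETE_QUERY = 'delete query goes here';

// Awaits every run before committing, so commit is only issued
// once each query has completed.
const saveChunk = async (entries: DbEntry[], tx: TxLike): Promise<boolean> => {
  try {
    for (const entry of entries) {
      if (entry.action === 'UPSERT') await tx.run(UPSERT_QUERY, entry);
      else if (entry.action === 'DELETE') await tx.run(DELETE_QUERY, entry);
    }
    await tx.commit();
    return true;
  } catch {
    await tx.rollback();
    return false;
  }
};

// Fake transaction: each run finishes asynchronously and logs when it is done.
const makeFakeTx = (log: string[]): TxLike => ({
  run: async (_query, params) => {
    await Promise.resolve(); // simulate waiting for the server's answer
    log.push(`run:${params.action}`);
    return undefined;
  },
  commit: async () => { log.push('commit'); },
  rollback: async () => { log.push('rollback'); },
});

// Runs one UPSERT and one DELETE and returns the recorded order of operations.
const demo = async (): Promise<string[]> => {
  const log: string[] = [];
  await saveChunk(
    [
      { action: 'UPSERT', userId: 'u1', segmentId: 's1' },
      { action: 'DELETE', userId: 'u2', segmentId: 's2' },
    ],
    makeFakeTx(log),
  );
  return log;
};
```

With the awaits in place, the recorded order is both runs followed by the commit. The same principle likely applies to the original `executeWrite` version: the callback should return the promise (for example `return Promise.all(promises)`) so that the driver waits for the queries before committing.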