camunda/camunda-8-js-sdk

404 Error when querying object immediately

hassan-alnator opened this issue · 6 comments

SDK Component

Using the operate API Client with the following code :

const instance = await zbc.createProcessInstance({
    bpmnProcessId: `c8-sdk-demo`,
    variables: {
        humanTaskStatus: 'Needs doing',
    },
})

console.log(`[Zeebe] Finished Process Instance ${instance.processInstanceKey}`)

const flow = await operate.getProcessInstanceSequenceFlows(res.processInstanceKey);
// 404 Error

const instance = await operate.getProcessInstance(res.processInstanceKey);
// 404 Error

Expected Behavior

Access to data related to the process while creating these objects on the fly returns 404 as it takes time to make these objects available which is confusing and results in an unpredictable application behavior as well as making it hard to unit test apps built with the SDK.

Current Behavior

Error for the Sequence Flow

Screenshot 2024-04-30 at 9 02 14 PM

Result of opening the same link in the browser immediately after the error:

Screenshot 2024-04-30 at 9 02 28 PM

Error for the Process Instance

proccess-error

Result of opening the same link in the browser immediately after the error:

broweser-response

Possible Solution

I think the main issue is related to how elastic search works and that it needs time to index the data unless its manually refreshed.

Option 1:

I suggest we introduce a method that allows the developer to refresh the index effected when needed (as optional considering the performance impact on ELK )or await for data to come back knowing that the create function returned 200 OK

Option 2:

another option is adding an option to get data as a streams and check the result in an interval and return when ready then kill the interval or something like function generators like Observables to subscribe to changes.

Option 3 (after browser support is resolved - ISSUE #79):

to elevate the experience of using these apis while building front-ends is using EventSource to get changes when its available

Steps to Reproduce

  1. create instance
  2. query instance
  3. query sequence flow.

Context (Environment)

I am building a wizard application on my self-hosted environment that has each step represented as a userTask to be able to evaluate data and decide on the next step using DMNs and proper logic which requires real-time access to data and process variables to drive the UI.

Screenshot 2024-05-06 at 8 11 12 AM

Yes, this is a "known characteristic" of Camunda 8's distributed, eventually-consistent architecture.

Some great thoughts and suggested approaches.

I like Option 2 for the SDK.

Option 1 involves changes to the core system, which we can't do here.

Option 3 is from a hypothetical future.

Option 2 looks good - it's a realistic ergonomic to address this scenario. In my integration tests, I use awaited timeouts to introduce delays - and I have to tune them sometimes depending on load and location. Having a subscribeable callback would be more ergonomic.

Cleaning up resources and dealing with timeouts are two aspects that I can see.

What do you think about an API like this:

const instance = await zbc.createProcessInstance({
    bpmnProcessId: `c8-sdk-demo`,
    variables: {
        humanTaskStatus: 'Needs doing',
    },
})

console.log(`[Zeebe] Finished Process Instance ${instance.processInstanceKey}`)

const flow = await camunda8.getProcessInstanceSequenceFlows({
  processInstanceKey: res.processInstanceKey,
  timeout: 10000,
  pollInterval: 500
}).catch(() => console.log(`Didn't get it within ten seconds`)

pollInterval is optional, with some sensible default.

Under the hood, the call is executed every pollInterval until we get back a result, or we throw the 404 if we hit the timeout.

Users can "subscribe" to the Promise outcome with a callback function in then, or await it for synchronous looking code.

What are your thoughts on that?

I like that I think it will enhance the experience a lot , so basically something like this :

import  { HTTPError } from 'got';

..... class code ....

public async getProcessInstance(
	processInstanceKey: number | string
): Promise < ProcessInstance > {
	const headers = await this.getHeaders();
	const rest = await this.rest;
	// can also be used later for a benchmarking utility
	const requestStartTime = Date.now();

	while(true) {
		try {
			const response = rest(`process-instances/${processInstanceKey}`, {
				headers,
				parseJson: (text) => losslessParse(text, ProcessInstance),
			});

			return response.json();
		} catch (error) {
			if (error instanceof HTTPError && error.response.statusCode === 404) {
				// If we receive a 404, it means the process instance key is not found or not available yet
				// We'll retry until we reach the timeout
				if (Date.now() - requestStartTime >= timeout) {
					throw new Error('Timeout reached while waiting for process instance sequence flows');
				}
			} else {
				// Handle other types of errors
				console.error('Error fetching process instance sequence flows:', error);
				throw error;
			}
		}

		// Wait for the specified poll interval before retrying
		await new Promise(resolve => setTimeout(resolve, pollInterval));
	}
};

the other concept I had will change the way the sdk is used so maybe its not a good idea for now

// Define a function generator that fetches data at regular intervals
async function* dataGenerator(interval: number): AsyncGenerator<any, void, unknown> {
    while (true) {
        try {
            const data = await fetchData();
            if (data) { // or 404
                // If data is available, yield it
                yield data;
                // Break the loop if data is available
                break;
            }
        } catch (error) {
            console.error('Error fetching data:', error);
        }
        // Wait for the specified interval before trying again
        await new Promise(resolve => setTimeout(resolve, interval));
    }
}

// Usage example
(async () => {
    const generator = dataGenerator(5000); // Fetch data every 5 seconds
    for await (const data of generator) {
        console.log('Data:', data);
        // Use the fetched data here
    }
})();

A related issue are the search API calls - on Operate and Tasklist you can search for processes.

const processes = await operate
  .searchProcessInstances({
    filter: {
      processDefinitionKey,
      state: 'ACTIVE',
    },
})

That is not going to return a 404 while waiting for eventual consistency - it is going to return a 200 with zero results.

At some point it will return a 200 with one or more results.

So these APIs could benefit from a generator API.

What do you think about a search subscription that is an event emitter that wraps the dataGenerator and emits data event?

That might be a more familiar API for most developers. So something like:

const processSubscription = operate
  .subscribeProcessInstances({
    filter: {
      processDefinitionKey,
      state: 'ACTIVE',
    },
})

processSubscription.on('data', process => {
  console.log(process)
  processSubscription.close() 
  // effectively processSubscription.once() with cleanup
})

@jwulf if we have fix multiple issues at once I think it would be great , the only thing I see here is that what if the search is for something that dose not exist , the subscription needs to have a timeout , so maybe we should merge both solution in one and add it to all our apis.

const processSubscription = operate
  .subscribeProcessInstances({
    timeout: 10000, // Optiona With default value of 5000
    pollInterval: 500 // Optional with default value of 300
    filter: {
      processDefinitionKey,
      state: 'ACTIVE',
    },
  })

processSubscription.on('data', process => {
  console.log(process)
  processSubscription.close()
  // effectively processSubscription.once() with cleanup
})

Should the timeout default to something or should it default to infinite?

Think about this use case: refreshing tasks from tasklist as they appear, adding them to a tasklist in a frontend. That is a subscription that you would want to be persistent.

When an timeout for the subscription is specified, it should emit an event when it times out - and then maybe only if no data was received.

The timeout needs to have default to value just to avoid any memory leaks as this has a filter "its looking for something" the default can be something a little high like 50000 MS and the interval will keep refreshing when new data received.

in the use case mentioned (No process or task based filters ) I would design it around ServerSentEvents or WebSockets , something like fetch-event-source as it will let the browser handle the logic of having an interval on the front-end and on the backend its exactly like emit event.

operate
  .subscribeProcessInstances({
    filter: {
      processDefinitionKey,
      state: 'ACTIVE',
    },
    async onopen(response) {
       
    },
    onmessage(msg) {
      
    },
    onclose() {
      
    },
    onerror(err) {
       
    }
});

we can add events to manage the full life cycle of the subscription like the methods above , in the case of any ID based filter the timeout will trigger the on error callback any state or assignee like filters will keep going.