node-ts/bus

Question: are new requests not handled while a process is running?

haayman-imagem opened this issue · 7 comments

I'm sorry for misusing Issues to ask a question, but I don't know of another way.

I've got an order-system running and I'm using workflows to handle the orders: for every row in the order I start a new workflow that takes care of generating the requested product.

Now I have a product that takes a really long time to generate. Even though this generation is done in a separate workflow that consists of 2 steps, I notice that while these steps are running (async), none of the new orders get processed. For every order, the first thing I do is send a confirmation mail, but this only happens after the long process has finished.

I don't think this has anything to do with the event loop, since everything is done async.

So my question is: is it true that no messages are processed while one of the @Handles methods is running? Is there some way to circumvent this?

In short, to give you an idea of the general setup:

order-workflow.ts

  const sendEmailMessage = new SendEmail(order.customer, "received", $t("Order received"), {
    order,
  });
  await this.bus.send(sendEmailMessage);

  for (const row of rows) {
    const message = new StartNewLongRunningJob(order, row);
    await this.bus.send(message);
  }

row-workflow.ts

  @StartedBy<StartNewLongRunningJob, RowWorkflowData, "handlesStartRow">(StartNewLongRunningJob)
  async handlesStartRow({ order, row }: StartNewLongRunningJob): Promise<Partial<RowWorkflowData>> {
    const workflowData: Partial<RowWorkflowData> = {
      orderRowId: row.id,
      row,
      stepOneDone: false
    };

    const message = new startStepOne(row.id);
    await this.bus.send(message);

    return workflowData;
  }

  @Handles<startStepOne, RowWorkflowData, "startStepOne">(startStepOne, (event) => event.orderRowId, "orderRowId")
  async startStepOne({ orderRowId }: startStepOne, workflowData: RowWorkflowData) {
    // do stuff
    const message = new startStepTwo(orderRowId);
    await this.bus.send(message);

    return { stepOneDone: true };
  }

  @Handles<startStepTwo, RowWorkflowData, "startStepTwo">(startStepTwo, (event) => event.orderRowId, "orderRowId")
  async startStepTwo({ orderRowId }: startStepTwo, workflowData: RowWorkflowData) {
    // do stuff
    return this.complete();
  }

hey @haayman-imagem, let me repeat your question to check I'm understanding correctly.

You run an app which has 2 workflows and they both use the same (in-memory?) queue. When you receive an order it triggers the first workflow with message (A). This sends a number of StartNewLongRunningJob commands (B). These are received by another workflow that sends a step1 message (C) and then step 2 (D).

The question is: if another message (A) arrives whilst this process is happening, is it true that it doesn't get processed until messages (B, C, D) have completed?

Visualising this, is your queue:

[A] - order workflow triggered
[B, B, B] - row workflows triggered
[B, B, C] -> [B, C, C] -> [C, C, C] - row workflows sent the first message
[C, C, D] -> [C, D, D] -> [D, D, D] - row workflows completed step 1
...etc

If you send another A message whilst this process is happening, it will go to the back of the queue and be processed once everything in front of it is complete:

[D, D, D, A] -> [D, D, A] -> [D, A] -> [A] -> [B, B, B]

This is expected because queues are generally first-in-first-out (FIFO) in terms of their processing order.
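The queue states above can be reproduced with a small simulation. This is only a sketch of a FIFO queue drained one message at a time, not the bus's actual implementation; the `Label` type, the `handle` and `drain` functions, and the assumption of three rows per order are all made up for illustration.

```typescript
// Hypothetical message labels, matching the A/B/C/D notation used above.
type Label = "A" | "B" | "C" | "D";

// What each message puts on the queue when it is handled
// (assumption for illustration: every order has 3 rows).
function handle(label: Label): Label[] {
  switch (label) {
    case "A": return ["B", "B", "B"]; // order workflow starts one row workflow per row
    case "B": return ["C"];           // row workflow sends step one
    case "C": return ["D"];           // step one sends step two
    case "D": return [];              // step two completes the row workflow
  }
}

// Drain a FIFO queue one message at a time and record the processing order.
// lateArrivals maps a tick number to a message that arrives at that tick.
// Arrivals scheduled after the queue has emptied are ignored in this sketch.
function drain(initial: Label[], lateArrivals: Map<number, Label>): Label[] {
  const queue: Label[] = [...initial];
  const processed: Label[] = [];
  let tick = 0;
  while (queue.length > 0) {
    const late = lateArrivals.get(tick);
    if (late !== undefined) queue.push(late); // new messages join the back
    const next = queue.shift()!;              // always take from the front
    processed.push(next);
    queue.push(...handle(next));
    tick++;
  }
  return processed;
}

// A second order (A) arrives at tick 7, when the queue holds [D, D, D].
const processedOrder = drain(["A"], new Map<number, Label>([[7, "A"]]));
console.log(processedOrder.join(" "));
// A B B B C C C D D D A B B B C C C D D D
```

The second A is only handled after all three D messages from the first order, which is the behaviour described in the question.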

If you want to have A processed immediately without being affected by the processing of B, C and D, I suggest you create a separate service and move the OrderWorkflow into that. This way it'll have its own queue that only processes A messages, and messages B, C and D will be processed by the original queue service in due time.

Hi,

I kind of expected that whenever a new message arrives, a new handler is started immediately. What I'm seeing now is that the message queue isn't read while any of the handlers is running. I don't understand why. Why wait? Why couldn't we just fetch the next message immediately?

By default, the message handlers will run with a concurrency of 1.

You can change this to whatever number is sensible for your application by setting it in your bus configuration:

// Handle messages 10 at a time
bind(BUS_SYMBOLS.BusConfiguration).toConstantValue(new DefaultBusConfiguration(10))
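
To see why the concurrency setting matters for the confirmation mail, here is a rough model of N handler slots pulling from one FIFO queue. It is a simplification and does not use or mirror the bus internals; the `Job` type, the `simulate` function, the job names and the durations are invented for illustration, and all jobs are assumed to be queued at time 0.

```typescript
// Hypothetical job description; duration is in arbitrary time units.
interface Job {
  name: string;
  duration: number;
}

// Model `concurrency` handler slots taking jobs from a FIFO queue.
// Returns the time at which each job starts being handled.
function simulate(jobs: Job[], concurrency: number): Map<string, number> {
  const slotFreeAt: number[] = Array.from({ length: concurrency }, () => 0);
  const startTimes = new Map<string, number>();
  for (const job of jobs) {
    // The next job in the queue goes to whichever slot frees up first.
    let slot = 0;
    for (let i = 1; i < slotFreeAt.length; i++) {
      if (slotFreeAt[i] < slotFreeAt[slot]) slot = i;
    }
    startTimes.set(job.name, slotFreeAt[slot]);
    slotFreeAt[slot] += job.duration;
  }
  return startTimes;
}

const jobs: Job[] = [
  { name: "long-running-step", duration: 600 },
  { name: "confirmation-email", duration: 1 },
];

console.log(simulate(jobs, 1).get("confirmation-email")); // 600
console.log(simulate(jobs, 2).get("confirmation-email")); // 0
```

With a single slot the mail waits for the long-running step to finish; with two slots it starts straight away. The flip side is that several handlers can now touch related workflow state at the same time.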

I'm getting concurrency problems just like #134
My messageLookup in the @Handles is usually something like
(event) => event.orderId,
or
(event) => event.orderRowId,

Do I understand correctly from #134, that I should make my id more unique? Should it incorporate some concurrency id?

hey @haayman-imagem

The lookup that you use on a message should always be able to uniquely identify a single workflow instance. If your orderId or orderRowId lookup can return more than one instance, and that's undesirable, then you'll need to find a unique key.

What is meant by 'single workflow instance'? If I have set concurrency to 10, will there be 10 instances? So then I should know which instance a workflow is running in

Each time your @StartedBy handler of your workflow gets hit - that starts a new workflow instance that has its own workflow state data. So if it's been hit 100 times you'd have 100 workflow instances.

If you're using the postgres persistence, you can see the workflow state data for each of these instances in the underlying workflow table (the name of the table depends on the name of your workflow). When the bus receives a message that's handled by a workflow, it looks up the workflow state data in this table with a query along the lines of:

select * from workflow."your-workflow-name" where data->>'orderId' = '123' and data->>'$status' = 'running'

If that query returns more than 1 result then you'll know that your orderId isn't unique and might need to use a better key.
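
The same check can be sketched in memory. This is only an illustration of the uniqueness rule, not how the persistence layer works: the `WorkflowState` shape, the `findRunning` helper and the sample data are hypothetical, and the `"complete"` status value is an assumption (only `'running'` appears in the query above).

```typescript
// Hypothetical shape of persisted workflow state data.
interface WorkflowState {
  $status: "running" | "complete"; // "complete" is an assumed value
  orderId: number;
  orderRowId: number;
}

type LookupKey = "orderId" | "orderRowId";

// Return every running instance whose state matches the lookup value.
function findRunning(states: WorkflowState[], key: LookupKey, value: number): WorkflowState[] {
  return states.filter((s) => s.$status === "running" && s[key] === value);
}

// One order with two rows means two running row-workflow instances.
const states: WorkflowState[] = [
  { $status: "running", orderId: 123, orderRowId: 1 },
  { $status: "running", orderId: 123, orderRowId: 2 },
  { $status: "complete", orderId: 122, orderRowId: 9 },
];

console.log(findRunning(states, "orderId", 123).length);  // 2
console.log(findRunning(states, "orderRowId", 1).length); // 1
```

Here orderId matches two running instances, so it can't identify a single row workflow, while orderRowId matches exactly one.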