node-ts/bus

The memory-queue is inappropriately deleting messages

Roustalski opened this issue · 1 comments

Hey,

I have a branch demonstrating this issue.

The problem is that if I use busInstance.fail(), any messages sent with the bus just prior to the current message failure using an underlying memory queue get deleted. If you uncomment out the line to initialize the bus instance with a transport of rabbitMq, that transport correctly handles the message sent just prior to the current message failure.

Here is some sample output with the memory queue:

...
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 2, messageIndex: 0 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 1 } +0ms
  @node-ts/bus-core:memory-queue Reading next message { depth: 1, numberMessagesVisible: 1 } +0ms
  @node-ts/bus-core:service-bus Message read from transport {
  message: {
    id: undefined,
    domainMessage: _EmailMaintenanceTeam {
      message: 'A siren has failed its test and requires maintenance',
      sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
      '$name': 'bus-starter/email-maintenance-team',
      '$version': 0
    },
    attributes: {
      correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
      attributes: {},
      stickyAttributes: [Object]
    },
    raw: { seenCount: 0, payload: [_EmailMaintenanceTeam], inFlight: true }
  }
} +1ms
  @node-ts/bus-core:handler-registry Getting handlers for message {
  msg: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  }
} +4ms
  @node-ts/bus-core:handler-registry Found handlers for message {
  msg: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  },
  numMessageHandlers: 1,
  numCustomHandlers: 0
} +0ms
  @node-ts/bus-core:service-bus Publishing event {
  event: _DummyError {
    message: '{}',
    '$name': 'bus-starter/dummy-error',
    '$version': 0
  },
  messageAttributes: {}
} +0ms
  @node-ts/bus-core:service-bus Prepared transport options {
  messageAttributes: {
    correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
    attributes: {},
    stickyAttributes: { workflowId: '9e929743-caea-4944-85fe-2eed6d2d448f' }
  }
} +0ms
  @node-ts/bus-core:memory-queue Added message to queue {
  message: _DummyError {
    message: '{}',
    '$name': 'bus-starter/dummy-error',
    '$version': 0
  },
  queueSize: 2
} +1ms
  @node-ts/bus-core:service-bus Failing message {
  message: {
    id: undefined,
    domainMessage: _EmailMaintenanceTeam {
      message: 'A siren has failed its test and requires maintenance',
      sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
      '$name': 'bus-starter/email-maintenance-team',
      '$version': 0
    },
    attributes: {
      correlationId: '65b1b40c-ec69-46b9-adcc-b30d6afe5248',
      attributes: {},
      stickyAttributes: [Object]
    },
    raw: { seenCount: 0, payload: [_EmailMaintenanceTeam], inFlight: true }
  }
} +0ms
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 2, messageIndex: 0 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 1 } +0ms
  @node-ts/bus-core:service-bus Message dispatched to all handlers {
  message: _EmailMaintenanceTeam {
    message: 'A siren has failed its test and requires maintenance',
    sirenId: '5de2e032-55e4-4d54-b3a0-2deb0c16793f',
    '$name': 'bus-starter/email-maintenance-team',
    '$version': 0
  },
  numHandlers: 1
} +0ms
  @node-ts/bus-core:memory-queue Deleting message { queueDepth: 1, messageIndex: -1 } +0ms
  @node-ts/bus-core:memory-queue Message Deleted { queueDepth: 0 } +0ms
  @node-ts/bus-core:memory-queue Reading next message { depth: 0, numberMessagesVisible: 0 } +0ms
  @node-ts/bus-core:memory-queue No messages available in queue +0ms

Thanks for finding & repro'ing this @Roustalski

This should be resolved in @node-ts/bus-core@1.0.9