cloudamqp/amqp-client.js

ArrayBuffer errors with large amounts of messages

fbushman opened this issue · 7 comments

Hey,

Great library so far, but I encountered an issue that I can't explain.
A bit of background: I'm sending simulated air traffic to a browser that displays it in Leaflet. I'm sending info about 6000 planes, with one update per plane every 10 seconds, so about 600 messages per second. After some time (seemingly random, but sooner at higher volumes) I get a pair of errors out of amqp-websocket-client.js:

ERROR TypeError: First argument to DataView constructor must be an ArrayBuffer
    at new DataView (<anonymous>)
    at AMQPWebSocketClient.handleMessage (amqp-websocket-client.js:59:38)
    at WebSocket.wrapFn (zone.js:769:39)
    at _ZoneDelegate.invokeTask (zone.js:409:31)
    at core.mjs:25299:55
    at AsyncStackTaggingZoneSpec.onInvokeTask (core.mjs:25299:36)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Object.onInvokeTask (core.mjs:25607:33)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Zone.runTask (zone.js:178:47)
ERROR RangeError: offset is out of bounds
    at Uint8Array.set (<anonymous>)
    at AMQPWebSocketClient.handleMessage (amqp-websocket-client.js:58:38)
    at WebSocket.wrapFn (zone.js:769:39)
    at _ZoneDelegate.invokeTask (zone.js:409:31)
    at core.mjs:25299:55
    at AsyncStackTaggingZoneSpec.onInvokeTask (core.mjs:25299:36)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Object.onInvokeTask (core.mjs:25607:33)
    at _ZoneDelegate.invokeTask (zone.js:408:60)
    at Zone.runTask (zone.js:178:47)

As far as I can tell the data is intact. I have another listener on the same exchange (this one in Java) that parses the messages just fine.

Here is the code for the consumer running in the browser:

private queue_name = uuidv4()
private queue_options = { passive: false, durable: false, autoDelete: true, exclusive: true }
private exchange_name = "exchange_name"
private exchange_type = "fanout"
private exchange_options = { passive: false, durable: true, autoDelete: false, internal: false }
private routing_key = "gps"
private subscribe_options = { noAck: false, exclusive: false }

private connectRabbitMQ() {
    const amqp = new AMQPWebSocketClient("ws://localhost:15670", "/", "rabbituser", "rabbitpassword")
    amqp.connect().then(conn => {
      conn.channel().then(channel => {
        this.rabbitmq_channel = channel
        channel.exchangeDeclare(this.exchange_name, this.exchange_type, this.exchange_options)
        channel.queue(this.queue_name, this.queue_options).then(async queue => {
          await queue.bind(this.exchange_name, this.routing_key)
          await queue.subscribe(this.subscribe_options, msg => {
            if (msg.routingKey === "track") {
              let newTrack: TrackInfo = JSON.parse(msg.bodyToString() as string) as TrackInfo;

              // Some handling...

              msg.ack();
            } else if (msg.routingKey === "plot") {
              let newPlot: PlotInfo = JSON.parse(msg.bodyToString() as string) as PlotInfo;

              // Some handling...

              msg.ack();
            } else {
              console.log(msg);
              console.log(msg.bodyToString());
            }
          })
        })
      })
    })
}

I am using your latest version:

"@cloudamqp/amqp-client": "^2.1.1"

Am I missing a step, or is something actually going wrong?

The track-routed messages are on average 63 bytes long.
The plot-routed messages are on average 102 bytes long.

I'm running amqp-client in the browser over websockets, and I got the first of these exceptions. I wouldn't say I was receiving a lot of data at once unless the browser decided to batch a bunch of messages. The error did appear to repeat for every message until the connection was closed by a heartbeat timeout.

[Two screenshots, 2023-02-16, 8:40 AM]

The native .NET clients had no problems with the same traffic.

I'm not very familiar with byte and buffer manipulation in JavaScript, so I'm just spit-balling:

  • How often is this.frameSize === 0 and this.framePos !== 0 in test conditions? i.e., is this code exercised much?
  • Should new DataView(this.frameBuffer) instead pass the underlying ArrayBuffer like new DataView(this.frameBuffer.buffer)? This is how it is done later in the handler, with new AMQPView(this.frameBuffer.buffer, 0, this.frameSize).
  • This fails in the browser with the same error: new DataView(new Uint8Array(32)), so if the type is wrong, I'm surprised that TypeScript isn't erroring on it.
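The second and third bullets can be checked in isolation. What follows is a minimal sketch using only standard typed-array APIs; `viewOf` and `constructFromTypedArray` are hypothetical helper names for illustration, not functions from amqp-client.js.

```typescript
// Hypothetical helper (not part of amqp-client.js): build a DataView over the
// same bytes a Uint8Array exposes, respecting its byteOffset and byteLength.
function viewOf(bytes: Uint8Array): DataView {
  return new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
}

// Passing the typed array itself throws at runtime. The cast is only there so
// the snippet compiles even where the type checker would reject the call.
function constructFromTypedArray(bytes: Uint8Array): string {
  try {
    new DataView(bytes as unknown as ArrayBuffer);
    return "ok";
  } catch (err) {
    return err instanceof TypeError ? "TypeError" : "other";
  }
}

const sampleBytes = new Uint8Array(32);
sampleBytes[3] = 0x2a;

// DataView reads big-endian by default, so bytes 00 00 00 2a decode to 42.
const decoded = viewOf(sampleBytes).getUint32(0);
const outcome = constructFromTypedArray(sampleBytes);
```

Passing `byteOffset` and `byteLength` matters whenever the `Uint8Array` is a window into a larger buffer; `new DataView(bytes.buffer)` alone would expose the whole underlying buffer.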

Thank you for reporting; your assessment is right. I'm also surprised that TypeScript doesn't react to it.

I will try to reproduce the "offset is out of bounds" error too.

There's no test coverage for amqp-websocket-client.ts 🤔.

As for TypeScript, the DataView spec clearly only allows ArrayBuffer | SharedArrayBuffer but because TypeScript is "structurally typed" it can't catch this sort of error. Both ArrayBuffer and Uint8Array implement byteLength and slice(), so it's a match. More at microsoft/TypeScript#52815...
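To illustrate the structural-typing point: a type that names only `byteLength` and `slice()` is satisfied by both classes, so the distinction has to be made at runtime. This is a sketch; `BufferLike` and `kindOf` are made-up names, and the real `DataView` constructor signature differs between TypeScript versions.

```typescript
// Illustrative structural type: only the two members mentioned above.
interface BufferLike {
  readonly byteLength: number;
  slice(begin: number, end?: number): unknown;
}

// Anything with the right shape is accepted, so both calls below type-check.
// The runtime checks are what actually tell the two apart.
function kindOf(value: BufferLike): string {
  if (value instanceof ArrayBuffer) return "ArrayBuffer";
  if (ArrayBuffer.isView(value)) return "view";
  return "other";
}

const fromBuffer = kindOf(new ArrayBuffer(8));
const fromTypedArray = kindOf(new Uint8Array(8));
```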

Also, recent versions of Node.js implement these same buffer and data view types, so maybe move back to a common set of code for most of this logic?

I'm still having errors related to this issue and am getting "ERROR RangeError: offset is out of bounds" exceptions.

Somehow, leftOfFrame ends up negative, so the copyBytes value passed as the length argument to the Uint8Array constructor is negative too:

[Screenshot, 2023-07-18, 8:39:48 AM]

This happens after some indeterminate number of messages. In this case it was 41,360 WebSocket messages. The inspector shows many messages within the last twenty seconds, so it's hard to tell exactly which message it crashed on, but it might have been the first message after 3 long messages that seemed to be parts of a single larger message (the first one starts with I< <amq.ctag). Those large received messages were 4.10 kB + 4.10 kB + 2.40 kB. This is likely multiple individual AMQP messages packed together, because there are multiple amq.ctag strings within those three WebSocket messages, followed by 9 sent messages that look like acknowledgements. The last large WebSocket message ends with Î (0xc38e?) like every other smaller message, so it appears to be complete?
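For reference, here is a sketch of frame reassembly that tolerates both cases described above: several AMQP frames packed into one WebSocket message, and one frame split across messages (including a split inside the 7-byte header). It relies on the AMQP 0-9-1 frame layout (1-byte type, 2-byte channel, 4-byte big-endian payload size, payload, then the 0xCE end byte). `FrameAssembler` is a hypothetical class, not the library's implementation, and it copies on every call for simplicity rather than reusing a preallocated buffer.

```typescript
const FRAME_HEADER_BYTES = 7; // type (1) + channel (2) + payload size (4)
const FRAME_END_BYTE = 0xce;

// Hypothetical reassembler, not taken from amqp-client.js.
class FrameAssembler {
  private pending: Uint8Array = new Uint8Array(0);

  // Feed one WebSocket message; returns every complete frame now available.
  push(chunk: Uint8Array): Uint8Array[] {
    // Join leftover bytes from earlier messages with the new chunk.
    const joined = new Uint8Array(this.pending.length + chunk.length);
    joined.set(this.pending, 0);
    joined.set(chunk, this.pending.length);

    const frames: Uint8Array[] = [];
    let pos = 0;
    // Only read the size field once the whole header has arrived.
    while (joined.length - pos >= FRAME_HEADER_BYTES) {
      const header = new DataView(joined.buffer, joined.byteOffset + pos, FRAME_HEADER_BYTES);
      const total = FRAME_HEADER_BYTES + header.getUint32(3) + 1; // + end byte
      if (joined.length - pos < total) break; // frame continues in a later message
      if (joined[pos + total - 1] !== FRAME_END_BYTE) {
        throw new Error(`Missing frame-end byte at offset ${pos + total - 1}`);
      }
      frames.push(joined.subarray(pos, pos + total));
      pos += total;
    }
    // Keep whatever is left (possibly a partial header) for the next call.
    this.pending = joined.slice(pos);
    return frames;
  }
}
```

Because the remaining byte count is always derived from `joined.length - pos`, it cannot go negative in this design; a partial header is simply carried over until enough bytes arrive.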

Then the next received message is a shorter 644 byte message, which might be the message being handled during this exception within handleMessage:

[Screenshot, 2023-07-18, 9:36:52 AM]

In this case, I am using the default of 4096 for frameMax. Elsewhere, it is mentioned that frameMax is for large headers. I'm not clear on what a header is vs. a message. I doubt any AMQP header itself is larger than 4096 bytes. Would it be possible to detect this directly and throw a clear Error when this parameter needs to be larger?
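A sketch of what such a check could look like, assuming frameMax counts the whole frame including the 7-byte header and the end byte, as the AMQP 0-9-1 specification describes frame-max. Whether the library's parameter follows exactly that definition is an assumption here, and `assertFrameFits` is a hypothetical name.

```typescript
const HEADER_BYTES = 7; // type (1) + channel (2) + payload size (4)
const END_BYTES = 1;    // frame-end marker

// Hypothetical check: returns the total frame size, or throws a descriptive
// error when the announced payload cannot fit within frameMax.
function assertFrameFits(payloadSize: number, frameMax: number): number {
  const total = HEADER_BYTES + payloadSize + END_BYTES;
  if (total > frameMax) {
    throw new RangeError(
      `Frame of ${total} bytes exceeds frameMax of ${frameMax} bytes; ` +
      `increase frameMax or check for a corrupted size field`
    );
  }
  return total;
}

const largestAllowed = assertFrameFits(4088, 4096); // 7 + 4088 + 1 = 4096
```

Running a check like this as soon as the size field is read would turn a later "offset is out of bounds" into an error that names the offending size.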