zettajs/zetta

EventEmitter Memory Leak in Event Broker

Closed this issue · 4 comments

Issue

For every subscription created over the same socket, a new "close" event handler is added to that socket instance. Node's default limit is 10 listeners per event, so the 11th subscription triggers Node's warning of a possible memory leak.

Source

lib/event_broker.js
https://github.com/zettajs/zetta/blob/master/lib/event_broker.js#L125

client.once('close', function() {
  unsubscribe();
});
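
The effect of this block can be reproduced in isolation. The following is a minimal sketch, not zetta's code: a plain EventEmitter stands in for the socket client, and the `subscribe` helper and counter variables are hypothetical names introduced only for illustration.

```javascript
var EventEmitter = require('events').EventEmitter;

// Stand-in for the socket client (assumption: any EventEmitter behaves the same way here).
var client = new EventEmitter();

// Hypothetical helper that mimics the per-subscription pattern quoted above.
function subscribe(client, unsubscribe) {
  client.once('close', function() {
    unsubscribe();
  });
}

var unsubscribed = 0;
for (var i = 0; i < 11; i++) {
  subscribe(client, function() { unsubscribed++; });
}

// One "close" listener per subscription; going past Node's default limit of 10
// listeners per event is what produces the warning.
var listenersBeforeClose = client.listenerCount('close');

// "once" listeners are removed only when the event actually fires.
client.emit('close');
var listenersAfterClose = client.listenerCount('close');
```

On a long-lived socket the "close" event rarely fires, so the listeners stay registered and keep accumulating with each new subscription.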

Steps to Reproduce:

  1. Run an instance of zetta
  2. Connect to /events
  3. Create 11 subscriptions over the same socket

var zetta = require('zetta');
zetta()
  .name('test')
  .listen(3000);

> wscat -c "http://localhost:3000/events"

> {"type":"subscribe", "topic":"**"}
< {"type":"subscribe-ack","timestamp":1498851155023,"topic":"**","subscriptionId":1}

.....


> {"type":"subscribe", "topic":"**"}
< {"type":"subscribe-ack","timestamp":1498851172264,"topic":"**","subscriptionId":11}

(node:7108) Warning: Possible EventEmitter memory leak detected. 11 close listeners added. Use emitter.setMaxListeners() to increase limit

It looks like you might be able to just remove the block of code at line 125. There is already a "close" event handler that runs through all of the saved unsubscribe functions.

Welp, that was wrong.

https://github.com/zettajs/zetta/blob/master/lib/event_broker.js#L322

  // Unsubscribe to all subscriptions if the client disconnects
  client.on('close', function() {
    Object.keys(unsubscriptions).forEach(function(subscriptionId) {
      unsubscriptions[subscriptionId].forEach(function(unsubscribe) {
        unsubscribe();
      });
      delete unsubscriptions[subscriptionId];
    })
  });
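
To show what this handler does on its own, here is a minimal sketch of the bookkeeping pattern, again with a plain EventEmitter standing in for the client. The `track` helper and the `released` array are hypothetical names, and this is not the fix that went into zetta: as noted further down the thread, the real behavior also depends on whether the client is streamEnabled.

```javascript
var EventEmitter = require('events').EventEmitter;

var client = new EventEmitter(); // stand-in for the socket client (assumption)
var unsubscriptions = {};        // subscriptionId -> array of unsubscribe functions
var released = [];               // records which subscriptions were cleaned up

// Hypothetical helper: stores the unsubscribe function instead of adding a listener.
function track(subscriptionId, unsubscribe) {
  if (!unsubscriptions[subscriptionId]) {
    unsubscriptions[subscriptionId] = [];
  }
  unsubscriptions[subscriptionId].push(unsubscribe);
}

// A single "close" listener, registered once per client.
client.on('close', function() {
  Object.keys(unsubscriptions).forEach(function(subscriptionId) {
    unsubscriptions[subscriptionId].forEach(function(unsubscribe) {
      unsubscribe();
    });
    delete unsubscriptions[subscriptionId];
  });
});

for (var id = 1; id <= 11; id++) {
  (function(subscriptionId) {
    track(subscriptionId, function() { released.push(subscriptionId); });
  })(id);
}

// The listener count does not grow with the number of subscriptions.
var closeListeners = client.listenerCount('close');
client.emit('close');
```

With this pattern the number of "close" listeners is independent of how many subscriptions are created, which is why removing the per-subscription handler looked safe at first.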

@kevinswiber or @AdamMagaluk Any insights? I'm not sure what's going on in this unit test that causes a failure (https://travis-ci.org/zettajs/zetta/jobs/248924395). From what I can tell, the created unsubscribe function is called as part of the unsubscriptions collection on "close" and "unsubscribe" events.

I don't understand what the additional "close" handler added on each subscription is accomplishing (and why the unit test fails when it's removed).

I've got it. What I was missing was the difference between streamEnabled clients and those that are not streamEnabled. PR on its way.

Having this fixed is important because we've got a custom Node Red node for creating topic subscriptions. Underneath it is a single, long-lived socket connection over which all subscriptions are managed. Adding new nodes, making simple UI changes to existing nodes, updating the topics of existing nodes, etc. will ALL create new subscriptions over that socket.

Eventually the number of "close" event listeners added to the client would get out of control.