superstreamlabs/memphis.js

EPIPE error

Closed this issue · 1 comment

code: 'EPIPE'
    at processTimers (internal/timers.js:497:7) {
    at listOnTimeout (internal/timers.js:554:17)
    at Timeout._onTimeout (/app/node_modules/memphis-dev/index.js:232:30)
    at Socket.writeAfterFIN [as write] (net.js:456:14)
Error: This socket has been ended by the other party
}
  code: 'EPIPE'
    at processTimers (internal/timers.js:497:7) {
    at listOnTimeout (internal/timers.js:554:17)
    at Timeout._onTimeout (/app/node_modules/memphis-dev/index.js:232:30)
    at Socket.writeAfterFIN [as write] (net.js:456:14)
Error: This socket has been ended by the other party
}
  code: 'EPIPE'
    at processTimers (internal/timers.js:497:7) {
    at listOnTimeout (internal/timers.js:554:17)
    at Timeout._onTimeout (/app/node_modules/memphis-dev/index.js:232:30)
    at Socket.writeAfterFIN [as write] (net.js:456:14)
Error: This socket has been ended by the other party
}

App code -

const memphis = require('memphis-dev');

/**
 * Pause for a number of seconds.
 *
 * @param {number} sec - Delay in seconds; multiplied by 1000 for setTimeout.
 * @returns {Promise<undefined>} Promise that resolves with no value once the timer fires.
 */
const sleep = (sec) => new Promise((done) => {
    const delayMs = sec * 1000;
    setTimeout(() => done(), delayMs);
});

// Reproduction script: connects to the broker, creates a producer and a
// consumer on the same station, logs and acks every consumed message, and
// produces batches of ten messages in an endless loop.
(async () => {
    try {
        // NOTE(review): host, username and token are hard-coded literals here.
        const connectionOptions = {
            host: 'broker.sandbox.memphis.dev',
            username: 'sanity_checks',
            connectionToken: 's0B7YGdbhcBnvHLSR4Gt'
        };
        await memphis.connect(connectionOptions);

        const producerOptions = {
            stationName: 'sanity_checks',
            producerName: 'bot_producer'
        };
        const producer = await memphis.producer(producerOptions);

        const consumerOptions = {
            stationName: 'sanity_checks',
            consumerName: 'bot_consumer',
            consumerGroup: 'bot_consumers_cg'
        };
        const consumer = await memphis.consumer(consumerOptions);

        // Print each payload as text, then acknowledge the message.
        consumer.on('message', (msg) => {
            const text = msg.getData().toString();
            console.log(text);
            msg.ack();
        });

        // Consumer errors are only logged; they do not stop the script.
        consumer.on('error', (err) => {
            console.log(err);
        });

        // Never terminates on its own: only a thrown error leaves this loop.
        for (;;) {
            // Messages are produced one at a time, each awaited before the next.
            let seq = 0;
            while (seq < 10) {
                const payload = Buffer.from(`Message #${seq}: Hello world`);
                await producer.produce({ message: payload });
                seq += 1;
            }
            // sleep() takes seconds, so this waits 300 seconds between batches.
            await sleep(300);
        }
    } catch (failure) {
        // Any failure above is logged and the connection is closed (not awaited).
        console.log(failure);
        memphis.close();
    }
})();

Thanks for reporting this. It has been fixed, and the fix has already been released in version 0.3.5.