/queue-schedule

Primary LanguageJavaScriptMIT LicenseMIT

Queue Shedule

NPM version Build status

Kafka is a high avaliable message queue, but it lacks of consuming message with a slow speed. Some of task with no need to finish it at none, and we want to complete it with a small cost. This is just the reason why we develop Queue Shedule.

Install

npm install queue-schedule

How to use

A basic example is showed as follows:

const kafka = require('kafka-node');
const {expect} = require('chai');
const {KafkaProducer,KafkaConsumer} = require('queue-schedule');
const KAFKA_HOST = process.env.KAFKA_PEERS;
const FIST_DATA = {a:1,b:2};
const SCHEDULE_NAME1 = 'schedule1';
const TOPIC_NAME1 = 'topic.1';
const PARTITION1 = 0;

let hasDone = false;
new KafkaConsumer({
    name: 'kafka',
    topics:[TOPIC_NAME1],
    consumerOption: {
        kafkaHost: KAFKA_HOST,
        fromOffset: 'earliest',
        fetchMaxBytes: 1024*1024,
    },
    doTask:function(messages,callback) {console.log(messages);
        if (!hasDone) {
            const value = messages[0].value;
            let data = null;
            try {
                data = JSON.parse(value);
            } catch (e) {
                hasDone = true;
                console.error('parse message error',e);
                return;
            }
            expect(data).to.have.property('a').and.equal(1);
            console.log('recieve data',data);
            hasDone = true;
        }
        callback();
    },
    readCount : 1,
    pauseTime : 500,
    idleCheckInter: 10 * 1000
}).on(KafkaConsumer.EVENT_CONSUMER_ERROR,function(err) {
    console.error('consumer error',err);
    hasDone = true;
    
});


new KafkaProducer({
    name : SCHEDULE_NAME1,
    topic: TOPIC_NAME1,
    kafkarHost:KAFKA_HOST
}).addData(FIST_DATA,{},function(err) {
    if (err) {
        console.error('write to queue error',err);
        return ;
    }
    console.info('write to kafka finished');
});

API

For detail usage, see the document online here.

License

MIT