Rethink Jobqueue service provides rest-api to create rethinkdb job queue. its use seneca-web and seneca plugin. We can create job queue with full options like (connction, queue options, and job options).
Rethink-Jobqueue-Service provides rest-api for register worker process. so we can register job type based each worker process.
Rethink-Jobqueue-Service provides Symmetric Worker. Using Symmetric worker we can process our job dynamically. We have to just register our job type. we can register and un-register job type using rest-api. Symmetric worker dynamically process each job based on job type and register job process.
Make sure you have installed all of the following prerequisites on your development machine:
-
Git - Download & Install Git. OSX and Linux machines typically have this already installed.
-
Node.js - Download & Install Node.js and the npm package manager. If you encounter any problems, you can also use this GitHub Gist to install Node.js.
-
RethinkDB - Download & Install RethinkDB, and make sure it's running.
The recommended way to get application is to use git to directly clone the Rethink Job queue Service repository:
$ git clone git@github.com:FlowzPlatform/rethink-jobqueue-service.git
This will clone the latest version of the Rethink Job queue Service repository to the local folder.
Another way to use the Rethink Job queue Service is to download a zip copy from the master branch on GitHub. You can also do this using the wget
command:
$ wget https://github.com/FlowzPlatform/rethink-jobqueue-service/archive/master.zip -O rethink-jobqueue-service-master.zip; unzip rethink-jobqueue-service-master.zip; rm rethink-jobqueue-service-master.zip
First go to server folder. and configure config file based on environment. We can configure default rethink database connection, default job queue options, and job options. we can also configure service port.
After configuration Run below command
$ npm install
its installed dependent npm packages like seneca, seneca-web, seneca-mesh, rethinkdb-job-queue, rethinkdb, socket-io, and express
now you can run below command
$ node job-web.js
$ node worker-web.js
$ node worker-need.js
$ node symmetricWorker.js
Your Job creation api run on port 4004 with the development environment configuration, so in your browser navigate to http://localhost:4004
Your Job process registartion api run on port 4005 with the development environment configuration, so in your browser navigate to http://localhost:4005
And your Symmetric Worker run on port 4006.
That's it! Your application should be running. To proceed with your development, check the other sections in this documentation.
Create job api : http://localhost:4004/job/create
Job data will be pass through POST method and content-type should be application/json you can create job using different options
- First option Only job data pass
{
"to":"abcd@yourdomain.com",
"from":"info@yourdomain.com",
"subject":"this is test mail",
"body":"this is message body"
}
- Second option With rethinkdb connction, job type and job data options
{
"connction" : {
"host": "localhost",
"port": 28015,
"db": "jobqueue"
},
"queue" : {
"name": "registartion"
},
"to":"abcd@yourdomain.com",
"from":"info@yourdomain.com",
"subject":"this is test mail",
"body":"this is message body"
}
- Third option - multiple job With rethinkdb connction, job type and job data with each job options
{
"connction" : {
"host":"localhost",
"port": 28015,
"db": "jobqueue"
},
"queue" : {
"name": "registartion"
},
"options" : {
"priority": "normal",
"timeout": 499999,
"retrymax": 1,
"retrydelay": 500000
},
"jobs" : [
{
"subject":"this is test mail-1",
"options" : {
"priority": "high",
"timeout": 700000,
"retrymax": 5,
"retrydelay": 100000
}
},
{
"subject":"this is test mail-2",
"options" : {
"priority": "highest",
"timeout": 700000,
"retrymax": 4,
"retrydelay": 100000,
"name" :"Password-Update-Mail"
}
},
{
"subject":"this is test mail-3",
"options" : {
"priority": "medium",
"timeout": 700000,
"retrymax": 3,
"retrydelay": 100000
}
}
]
}
Find job api : http://localhost:4004/job/find
Find data will be pass through POST method and content-type should be application/json you can find job using different options
- First option Only job data pass
"find": {
"status":"waiting"
}
- Second option With rethinkdb connction, job type and find data
{
"connction" : {
"host": "localhost",
"port": 28015,
"db": "jobqueue"
},
"queue" : {
"name": "registartion"
},
"find": {
"data":{"subject":"this is test mail"}
}
}
Register job type process api : http://localhost:4005/upload-worker-process
Data will be pass through POST method and content-type should be multipart/form-data
In job process code you have to set resolve and reject as response for next job process in job queue
jobtype = RegistrationEmail
jobprocess = {
const nodemailer = require('nodemailer')
let transporter = nodemailer.createTransport({
service: 'SMTP',
host: 'smtp.gmail.com',
port: 465,
secure: true,
auth: {
user: 'your@gmail.com',
pass: 'your-password'
}
})
// setup email data with unicode symbols
let mailOptions = {
from: job.data.from, // sender address
to: job.data.to, // list of receivers
cc: job.data.cc, // list of receivers
subject: job.data.subject, // Subject line
text: job.data.body, // plain text body
html: job.data.body // html body
}
// send mail with defined transport object
transporter.sendMail(mailOptions, (error, info) => {
console.log(info)
if (error) {
reject(error)
}
resolve('Message ' +info.messageId +' sent: '+ info.response)
})
}
register job type api : http://localhost:4006/register-jobtype/RegistrationEmail
job-type name will be pass through PUT method
register job type api : http://localhost:4006/register-jobtype/RegistrationEmail
job-type name will be pass through delete method
Symmetric Worker reads Job summary based on job type option.
job summary as below:
{ waiting: 1,
active: 0,
completed: 0,
cancelled: 0,
failed: 0,
terminated: 0,
total: 1 }
Based on job waiting count, its calculate waiting ratio. if wating ratio greater then our threshold, its emit the need worker event. so execute-worker will execute job process.
- create job-queue using seneca-mesh
Using seneca-mesh we can create job. so on server side we do not need to create any rest-api request. using seneca act we can generate job.
here is example of using seneca-mesh create job queue.
$ node server.js
below code write in job-mesh.js file and run it.
let seneca = require('seneca')
let pluginPin = 'role:job,cmd:create'
let bodyData = {
"to":"abcd@yourdomain.com",
"from":"info@yourdomain.com",
"subject":"this is test mail",
"body":"this is message body"
}
seneca()
.use('mesh')
.ready(function () {
let _senecaObj = this
_senecaObj.act(pluginPin, {msg: bodyData}, (err, done) => {
try {
if (err) {
console.log('Error:', err)
//throw err;
}
console.log('Response:', done)
} catch (e) {
console.log('Error:', e)
}
_senecaObj.close((err) => {
if (err) {
console.log('Close Error:', e)
} else {
console.log('seneca closed.')
}
})
})
})
After creation of job-mesh.js file run below command
$ node job-mesh.js
Under the MIT license. See LICENSE file for more details.