How do I get the processed message (by worker) in express
antick opened this issue · 3 comments
Hi,
This is a much better library than the other amqplib-based libraries out there.
Although, I am not able to figure out one thing with this.
I have the following code in my node express route:
module.exports.testing = controller(async (req, res) => {
  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });
  const publication = await broker.publish('demo_pub', params.text);
  publication.on('error', err => {
    console.error(err);
  });
  publication.on('success', taskId => {
    res.send({
      taskId
    });
  });
});
I'm running a worker on yarn start with this code:
const Broker = require('rascal').BrokerAsPromised;
const config = require('../api-1/config/queue');
(async () => {
  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });
  const subscription = await broker.subscribe('demo_sub');
  subscription.on('message', async (message, content, ackOrNack) => {
    const processed = await doingSomethingWithContent(content);
    console.log(processed);
    ackOrNack();
  }).on('error', console.error);
})();
Everything works fine so far. The worker receives the message, processes it, and prints the result in the terminal.
Now I want to create an Express endpoint that fetches this processed output using the taskId/messageId I received when I published the message. I'm not sure how I can do this. I have tried the following, but it's not working:
module.exports.worker1 = controller(async (req, res) => {
  console.log(req.query.taskId); // not sure where to use this messageId/taskId
  const broker = await Broker.create(config);
  broker.on('error', err => {
    console.error(err);
  });
  const subscription = await broker.subscribe('demo_sub');
  console.log('code wont run after this');
  subscription.on('message', async (message, content, ackOrNack) => {
    console.log('this never executes while hitting this endpoint')
    // ackOrNack();
    res.send({
      message
    });
  }).on('error', console.error);
});
Please can someone help me with this?
Hi @antick, sorry, I missed this. Will respond shortly.
Hi @antick,
After going through your issue, I understand you want to do the following...
Stage 1
- Receive an HTTP request
- Publish a message
- Respond to the HTTP request with a task id associated with the message
Stage 2 (separate worker)
- Receive the message
- Process the message
Stage 3
- Receive an HTTP request, specifying the task id
- Fetch the output of the processed message from stage 2
- Respond to the HTTP request with this output
If my understanding is correct, you need to use a database to store the output of the message in stage 2, and retrieve this from the database in stage 3. You cannot retrieve messages from a queue by id.
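As a rough sketch of stages 2 and 3, the worker could save its output under the task id and the endpoint could look it up. Everything named here is hypothetical: `results` is an in-memory Map standing in for a real database, and `saveResult`, `getResult` and the `status` field are illustrative names, not part of Rascal. It also assumes the id emitted by the publication's 'success' event is the same one the worker sees as `message.properties.messageId`.

```javascript
// Hypothetical stand-in for a database: an in-memory Map keyed by task id.
// The worker and the web app normally run as separate processes, so a real
// setup needs a shared store (a SQL table, Redis, etc.) with async calls.
const results = new Map();

// Stage 2 (worker): call this from the subscription's 'message' handler,
// e.g. saveResult(message.properties.messageId, processed), before ackOrNack().
function saveResult(taskId, output) {
  results.set(taskId, { status: 'done', output });
}

// Stage 3 (web app): look up the output by task id.
// An id that is unknown or not yet processed is reported as 'pending'.
function getResult(taskId) {
  return results.get(taskId) || { status: 'pending' };
}

// Express-style route handler, assuming a request like GET /worker1?taskId=...
function worker1(req, res) {
  res.send(getResult(req.query.taskId));
}
```

With this shape the endpoint never subscribes to the queue; it only reads what the worker has already stored, so the client can poll it until the status changes from 'pending' to 'done'.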
Also, you should only need to call Broker.create once per application, rather than from Express middleware on every request.
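One way to create the broker only once is to cache the promise and reuse it from every route. This is a minimal sketch, not something Rascal provides: `getBroker` and `brokerPromise` are hypothetical names, and `createBroker` is assumed to be a function such as `() => Broker.create(config)`.

```javascript
// Cache the promise (not the resolved broker) so that concurrent
// requests arriving during startup still share a single broker.
let brokerPromise = null;

// createBroker is assumed to return a promise for a broker,
// e.g. () => Broker.create(config).
function getBroker(createBroker) {
  if (!brokerPromise) {
    brokerPromise = createBroker();
  }
  return brokerPromise;
}
```

A route would then use `const broker = await getBroker(() => Broker.create(config));` instead of creating a new broker per request. Note that this sketch also caches a rejected promise, so an application would probably want to fail fast at startup if the broker cannot be created.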
This makes sense. Thanks a lot.