Does not work in an Express app
tinytortoise-dev opened this issue · 4 comments
I'm building a chat app in which a Node.js server keeps a running total of the number of characters in all messages. For instance, if 100 people each send a message of length 10, the total character count is 1000 (100 * 10). If another person then sends a message of length 10, the total should become 1010 (1000 + 10).
Every message is sent to the Node.js server, which counts its length and saves the total in the DB.
A big problem occurs when a lot of people send messages at the same time.
When I used JMeter to send 100 messages of length 10 (as in the example above) AT THE SAME TIME, the total character count was 120. It is supposed to be exactly 1000!
Here's my code (Node version 8.13.0):
'use strict'
const express = require('express');
const router = express.Router();
const co = require('co');
const models = require('models');
const Mutex = require('async-mutex').Mutex;
const CharacterCounter = models.CharacterCounter;
const mutex = new Mutex();
router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;
    const release = yield mutex.acquire();
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      yield CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      release();
    }
    // ... my code continues
It seems that the cause of this problem is in this mutex library. But I don't know the exact solution. Does anyone have an idea to achieve my goal? I've only started working on node.js recently, so a plain explanation would be appreciated.
I am referring to the original code sample you posted.
router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;
    const mutex = new Mutex();
    const release = yield mutex.acquire();
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      release();
    }
    // ... my code continues
You're using the mutex wrong. You are creating a new mutex each time you access the DB, so concurrent requests will access the db concurrently, as the individual mutexes know nothing about each other. In addition, you do not yield when updating.
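To make the difference concrete, here is a minimal sketch that compares a mutex created per request with one shared mutex. Everything in it is an assumption for illustration: `TinyMutex` is a hypothetical stand-in that only mimics the `acquire()` call of async-mutex's `Mutex` (a promise that resolves to a release function), so the example runs without dependencies, and `makeFakeDb` is an in-memory fake for the counter collection.

```javascript
'use strict';

// Hypothetical stand-in for async-mutex's Mutex: acquire() returns a promise
// that resolves to a release function once every earlier holder has released.
class TinyMutex {
  constructor() {
    this._tail = Promise.resolve();
  }
  acquire() {
    let release;
    const next = new Promise(resolve => { release = resolve; });
    const ready = this._tail.then(() => release);
    this._tail = next;
    return ready;
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// In-memory fake of the counter; both operations take a little time,
// like a real DB round trip.
function makeFakeDb() {
  let total = 0;
  return {
    findOne: async () => { await sleep(1); return { currentCount: total }; },
    update: async value => { await sleep(1); total = value; },
    peek: () => total,
  };
}

// Same read-modify-write as in the route handler, guarded by `mutex`.
async function addMessage(db, mutex, text) {
  const release = await mutex.acquire();
  try {
    const { currentCount } = await db.findOne();
    await db.update(currentCount + text.length);
  } finally {
    release();
  }
}

// Fires 20 concurrent "requests", each with a 10-character message.
async function run(makeMutex) {
  const db = makeFakeDb();
  const jobs = [];
  for (let i = 0; i < 20; i++) {
    jobs.push(addMessage(db, makeMutex(), '0123456789'));
  }
  await Promise.all(jobs);
  return db.peek();
}

async function main() {
  const shared = new TinyMutex();
  const perRequest = await run(() => new TinyMutex()); // new mutex every time
  const sharedTotal = await run(() => shared);         // one mutex for all requests
  // sharedTotal is 200; perRequest comes out lower because updates overwrite each other.
  console.log({ perRequest, sharedTotal });
  return { perRequest, sharedTotal };
}

const demo = main();
```

With per-request mutexes, every request acquires its own lock immediately, so all of them read the old count before any update lands. With the shared mutex, each read-modify-write finishes before the next one starts.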
Your edited code above should work fine. If you still have problems and suspect async-mutex to be the issue, please add console.log messages at the beginning and end of the try block to see if this is really a concurrency issue. You can also use a canary boolean variable to check if the critical section is already executing.
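The canary check could look roughly like this. It is only a sketch: `guarded`, `inCriticalSection` and `noLock` are names made up for illustration, and the mutex is passed in so that the helper works with any object whose `acquire()` resolves to a release function, which is how the `Mutex` from async-mutex is used in the code above. The demo uses a deliberately broken lock so that the canary has something to catch.

```javascript
'use strict';

// Canary flag: true only while some caller is inside the critical section.
let inCriticalSection = false;

// Runs `work` under `mutex` and throws if two callers ever overlap.
// `mutex` is any object whose acquire() resolves to a release function.
async function guarded(mutex, work) {
  const release = await mutex.acquire();
  if (inCriticalSection) {
    release();
    throw new Error('critical section entered twice: the lock is not protecting it');
  }
  inCriticalSection = true;
  console.log('enter critical section');
  try {
    return await work();
  } finally {
    inCriticalSection = false;
    console.log('leave critical section');
    release();
  }
}

// Deliberately broken lock for the demo: acquire() never waits.
const noLock = { acquire: async () => () => {} };
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Two overlapping calls through the broken lock: the second one trips the canary.
async function canaryDemo() {
  return Promise.all([
    guarded(noLock, () => sleep(5)).then(() => 'ok', () => 'canary tripped'),
    guarded(noLock, () => sleep(5)).then(() => 'ok', () => 'canary tripped'),
  ]);
}

const canaryResult = canaryDemo();
canaryResult.then(result => console.log(result));
```

In the route handler, the body of the try block would become the `work` callback. If the error never fires under load, the critical section is not being entered concurrently within that process, and the cause of the wrong total is probably somewhere else.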
@DirtyHairy Thank you for commenting! I eventually ran the code below and sent 20 concurrent requests.
Each request carries a message of length 10, so the expected result is 200 (20 * 10). But I got 160...
Looking at the console, I noticed that the initial mutex object appeared twice, which is strange because the mutex object should be a singleton.
Code:
'use strict'
const express = require('express');
const router = express.Router();
const co = require('co');
const models = require('models');
const Mutex = require('async-mutex').Mutex;
const CharacterCounter = models.CharacterCounter;
const mutex = new Mutex();
router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;
    console.log('BEFORE STARTING LOCK\n', mutex);
    const release = yield mutex.acquire();
    console.log('START LOCK\n', mutex);
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      yield CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      console.log('BEFORE FINISHING LOCK\n', mutex);
      release();
      console.log('FINISH LOCK\n', mutex);
    }
    // ... my code continues
Console:
// 1st request arrived
BEFORE STARTING LOCK
Mutex {
_semaphore: Semaphore { _maxConcurrency: 1, _queue: [], _value: 1 } } // this object is an initial object and should appear only once, right?
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 10 } }, { upsert: true, new: false, remove: false,
fields: {} })
// arrived 2nd request but have to wait because 1st request is still processing
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
// 1st request is processed and mutex is unlocked
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 1,
_currentReleaser: [Function] } }
// 3rd request arrived
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 1,
_currentReleaser: [Function] } }
// started lock for 2nd request
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
// 4th request arrived
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
// 5th request arrived
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function] ],
_value: 0,
_currentReleaser: [Function] } }
// 2nd request is being processed
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 20 } }, { upsert: true, new: false, remove: false,
fields: {} })
// 6th request arrived and 2nd request is finished
BEFORE STARTING LOCK
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mutex {
_semaphore: Semaphore { _maxConcurrency: 1, _queue: [], _value: 1 } } // it seems a new mutex instance is created? I don't think this object is supposed to be here...
// (...)
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 90 } }, { upsert: true, new: false, remove: false,
fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 1,
_currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 90 } }, { upsert: true, new: false, remove: false,
fields: {} })
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function] ],
_value: 0,
_currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 100 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 110 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 120 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 130 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 140 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function], [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function] ],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function] ],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 150 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [ [Function] ],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
START LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 160 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 0,
_currentReleaser: [Function] } }
FINISH LOCK
Mutex {
_semaphore:
Semaphore {
_maxConcurrency: 1,
_queue: [],
_value: 1,
_currentReleaser: [Function] } }
When I do the same thing with 10 concurrent requests, the result is always correct. (I get exactly 100.)
And the larger the number of concurrent requests becomes, the more initial mutex objects appear on the console.
Sending 100 concurrent requests gave me 4 or 5 initial mutex objects on the console, and the total character count was 460.
P.S.
"In addition, you do not yield when updating."
Could you explain more about this comment, please? I think that if I don't yield when updating, another process could read the value before the update has been applied.
I only started programming recently, so if I'm saying stupid things and taking up a lot of your time, I apologise.
Mmh, I don't know why your program does not work as intended, but the mutex works fine --- I am pretty sure that the error is elsewhere. Observe that there is never a "START LOCK" before the previous cycle "START - BEFORE FINISH - FINISH" has completed.
I am not sure what you are referring to with "initial mutex object". The logs show that the mutex starts with an empty queue and then starts filling up its queue as more and more requests are waiting for the mutex to become available.
"Could you explain more about this comment, please?"
I was referring to your initial code sample:
try {
  const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
  addedCount = currentCount + originalText.length;
  CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
} catch (e) {
  console.log(e);
} finally {
  release();
}
Here, you immediately release the mutex after calling updateTotalCharacterCount, even though the update has not finished yet. In your more recent code samples, you yield, which causes the request to return to the event loop until the Promise for the update has resolved, so that's no problem anymore.
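The ordering problem can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not your actual model code: `fakeUpdate` is a hypothetical stand-in for updateTotalCharacterCount, the `'released'` entry marks the point where `release()` would be called, and `await` plays the role that `yield` plays under co.

```javascript
'use strict';

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical stand-in for updateTotalCharacterCount: finishes after a delay.
async function fakeUpdate(events) {
  await sleep(5);
  events.push('update finished');
}

// Mirrors the first sample: the update is started, but nothing waits for it.
async function withoutWaiting(events) {
  try {
    fakeUpdate(events);
  } finally {
    events.push('released'); // where release() would run
  }
}

// Mirrors the later samples: the update is awaited before the finally block runs.
async function withWaiting(events) {
  try {
    await fakeUpdate(events);
  } finally {
    events.push('released'); // where release() would run
  }
}

async function compare() {
  const first = [];
  await withoutWaiting(first);
  await sleep(20); // give the stray update time to finish so it shows up in the log
  const second = [];
  await withWaiting(second);
  return { withoutWaiting: first, withWaiting: second };
}

const ordering = compare();
ordering.then(result => console.log(result));
// withoutWaiting: [ 'released', 'update finished' ]
// withWaiting:    [ 'update finished', 'released' ]
```

In the first variant the lock is given up while the write is still in flight, so the next request can read the stale count. In the second, the write has landed by the time the lock is released.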
Now I'm trying to find another solution...
But your reply was much appreciated. Thank you!