Simpler algorithm
Opened this issue · 9 comments
We couldn't quite get thread-stream running well in our services, so we re-implemented something similar. We used a simpler concurrent queue implementation that might be useful for thread-stream as well.
Basically, we do it like this:
const { EventEmitter } = require('events')

const MAX_LEN = 8 * 1024          // max size of a single item
const BUF_END = 256 * MAX_LEN     // the writer wraps to 0 once past this point
const BUF_LEN = BUF_END + MAX_LEN // extra slack so one item may spill past BUF_END
const WRITE_INDEX = 4             // Int32Array slot holding the write position
const READ_INDEX = 8              // Int32Array slot holding the read position
const sharedBuffer = new SharedArrayBuffer(BUF_LEN)
const sharedState = new SharedArrayBuffer(128)
const state = new Int32Array(sharedState)
const buffer = Buffer.from(sharedBuffer)
buffer[0] = 31 // initial item header (0x1f unit separator)
Atomics.store(state, WRITE_INDEX, 1)
Atomics.notify(state, WRITE_INDEX)
// producer
const producer = new EventEmitter()
let write = 1 // matches the initial WRITE_INDEX stored above
let draining = false

async function drain () {
  draining = true
  let read = Atomics.load(state, READ_INDEX)
  // "Full" means the writer has wrapped behind the reader and the next
  // item could overrun unread data.
  while (write < read && write + MAX_LEN > read) {
    const { async, value } = Atomics.waitAsync(state, READ_INDEX, read)
    if (async) {
      await value
    }
    read = Atomics.load(state, READ_INDEX)
  }
  draining = false
  producer.emit('drain')
}

function send (data) {
  const len = MAX_LEN // conservative upper bound; or Buffer.byteLength(data)
  let read = Atomics.load(state, READ_INDEX)
  while (write < read && write + len > read) {
    // NOTE: this blocks the producer thread until the consumer frees space.
    Atomics.wait(state, READ_INDEX, read)
    read = Atomics.load(state, READ_INDEX)
  }
  write += buffer.write(data, write)
  buffer[write++] = 31 // item separator
  if (write > BUF_END) {
    write = 0 // wrap around; the slack up to BUF_LEN absorbs the overflow
  }
  Atomics.store(state, WRITE_INDEX, write)
  Atomics.notify(state, WRITE_INDEX)
  const needDrain = write + MAX_LEN >= BUF_END
  if (needDrain && !draining) {
    drain()
  }
  return !draining
}
// consumer
async function * receive () {
  let read = 1 // start past the initial item header, matching the producer
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)
    if (read > BUF_END) {
      read = 0 // wrap around, mirroring the producer
    }
    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }
    if (write < read) {
      // The writer has wrapped; consume up to the end of the ring first.
      write = BUF_END
    }
    const arr = []
    while (read < write) {
      const idx = buffer.indexOf(31, read) // locate the item separator
      arr.push(buffer.toString('utf-8', read, idx))
      read = idx + 1
    }
    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)
    yield* arr
  }
}
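To make the framing concrete (this illustration is mine, not part of the original snippet): each item is stored as raw UTF-8 bytes terminated by a 0x1f unit-separator byte, and the consumer recovers items by scanning for that byte:

// Illustration only: the wire format used by the sketch above.
const demo = Buffer.alloc(16)
let w = demo.write('hi', 0) // payload bytes
demo[w++] = 31              // 0x1f item separator

// Consumer side: scan for the separator and slice the payload back out.
const end = demo.indexOf(31, 0)
console.log(demo.toString('utf-8', 0, end)) // prints 'hi'

One consequence of this framing is that payloads must never contain a 0x1f byte themselves, which is presumably one reason the later revision below switches to a length prefix.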
I don't think you can make the assumption that the consumer is faster.
I would really like to solve those bugs... however I need reproductions.
> I don't think you can make the assumption that the consumer is faster.
We already kind of do that https://github.com/pinojs/thread-stream/blob/main/index.js#L218. My suggestion is missing 'drain' atm.
I can sort that out as well. I'd like to try applying this to thread-stream but am a bit short on time atm, so I just wanted to post our progress here in case someone wants to have a go at it.
Updated with drain
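For reference, a caller would honor that backpressure roughly like this (my sketch, assuming the send() function and the producer emitter from the snippet above):

// Hypothetical caller: stop writing when send() returns false and
// resume once the producer emits 'drain'.
function sendAll (items, i = 0) {
  while (i < items.length) {
    if (!send(items[i++])) {
      producer.once('drain', () => sendAll(items, i))
      return
    }
  }
}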
> We already kind of do that https://github.com/pinojs/thread-stream/blob/main/index.js#L218. My suggestion is missing 'drain' atm.
This only happens if we go over MAX_STRING, which happens only if the consumer is much slower than the producer.
Note that you use waitAsync, which is not available in Node v14 as far as I am aware.
const assert = require('assert')
const EE = require('events')

const WRITE_INDEX = 4
const READ_INDEX = 8
const HWM = 16 * 1024

async function* read({ sharedState, sharedBuffer }) {
  assert(sharedState)
  assert(sharedBuffer)
  const state = new Int32Array(sharedState)
  const buffer = Buffer.from(sharedBuffer)
  const bufEnd = buffer.length - HWM // slack so one item may spill past the end
  let read = 0
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)
    if (read > bufEnd) {
      read = 0 // wrap around, mirroring the writer
    }
    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }
    if (write < read) {
      write = bufEnd // the writer has wrapped; drain up to the end first
    }
    const arr = []
    while (read < write) {
      // Each item is framed as a 4-byte little-endian length prefix + payload.
      const len = buffer.readInt32LE(read)
      arr.push(buffer.toString('utf-8', read + 4, read + 4 + len))
      read += 4 + len
    }
    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)
    yield* arr
  }
}

class Writer extends EE {
  constructor({ sharedState, sharedBuffer }) {
    assert(sharedState && sharedState.byteLength >= 36) // READ_INDEX = 8 occupies bytes 32..35
    assert(sharedBuffer)
    super()
    this._state = new Int32Array(sharedState)
    this._buffer = Buffer.from(sharedBuffer)
    this._write = 0
    this._bufEnd = this._buffer.length - HWM
  }
  write(data) {
    let read = Atomics.load(this._state, READ_INDEX)
    while (data.length) { // NB: an empty Buffer is truthy, so test .length
      const toWrite = data.slice(0, HWM) // NB: string slice counts code units, not bytes
      data = data.slice(HWM)
      const len = Buffer.byteLength(toWrite)
      while (this._write < read && this._write + 4 + len > read) {
        // TODO (fix): Avoid blocking.
        Atomics.wait(this._state, READ_INDEX, read)
        read = Atomics.load(this._state, READ_INDEX)
      }
      const written =
        typeof toWrite === 'string'
          ? this._buffer.write(toWrite, this._write + 4)
          : toWrite.copy(this._buffer, this._write + 4)
      assert(len === written)
      this._buffer.writeInt32LE(len, this._write)
      this._write += 4 + len
      if (this._write > this._bufEnd) {
        this._write = 0
      }
      Atomics.store(this._state, WRITE_INDEX, this._write)
      Atomics.notify(this._state, WRITE_INDEX)
    }
    // TODO (fix): Implement back-pressure w/ 'drain' event.
  }
}

module.exports = {
  read,
  Writer,
}
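For context, here is one way the module above might be wired up across worker_threads. The module path, buffer sizes, and file layout are my assumptions, not part of the comment, and Atomics.waitAsync requires Node 16+ unless polyfilled:

// Hypothetical usage: the main thread writes, a worker consumes.
// Assumes the module above is saved as ./queue.js.
const { Worker, isMainThread, workerData } = require('worker_threads')
const { read, Writer } = require('./queue')

if (isMainThread) {
  const sharedState = new SharedArrayBuffer(128)
  const sharedBuffer = new SharedArrayBuffer(1024 * 1024)
  new Worker(__filename, { workerData: { sharedState, sharedBuffer } })
  const writer = new Writer({ sharedState, sharedBuffer })
  writer.write('hello')
  writer.write('world')
} else {
  (async () => {
    // Note: this loops forever waiting for more items; terminate manually.
    for await (const msg of read(workerData)) {
      console.log(msg) // 'hello', then 'world'
    }
  })()
}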
IMHO 90% of the complexity comes from avoiding blocking on the writer; moreover, that's the first requirement of this system. If we are blocking the main thread, we could avoid all this threading and just keep everything in the main thread.
> IMHO 90% of the complexity comes from avoiding blocking on the writer; moreover, that's the first requirement of this system. If we are blocking the main thread, we could avoid all this threading and just keep everything in the main thread.
Here you go with a non-blocking writer:
const assert = require('assert')
const EE = require('events')

const WRITE_INDEX = 4
const READ_INDEX = 8

async function* read({ sharedState, sharedBuffer, maxMessageSize }) {
  assert(sharedState)
  assert(sharedBuffer)
  assert(maxMessageSize)
  const state = new Int32Array(sharedState)
  const buffer = Buffer.from(sharedBuffer)
  const bufEnd = buffer.length - maxMessageSize // slack so one item may spill past the end
  let read = 0
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)
    if (read > bufEnd) {
      read = 0 // wrap around, mirroring the writer
    }
    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }
    if (write < read) {
      write = bufEnd // the writer has wrapped; drain up to the end first
    }
    const arr = []
    while (read < write) {
      const len = buffer.readInt32LE(read)
      arr.push(buffer.toString('utf-8', read + 4, read + 4 + len))
      read += 4 + len
    }
    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)
    yield* arr
  }
}

class Writer extends EE {
  constructor({ sharedState, sharedBuffer, maxMessageSize }) {
    assert(sharedState && sharedState.byteLength >= 36) // READ_INDEX = 8 occupies bytes 32..35
    assert(sharedBuffer)
    assert(maxMessageSize)
    super()
    this._state = new Int32Array(sharedState)
    this._buffer = Buffer.from(sharedBuffer)
    this._write = 0
    this._maxMessageSize = maxMessageSize
    this._bufEnd = this._buffer.length - maxMessageSize
    this._needDrain = false
  }
  write(data) {
    let read = Atomics.load(this._state, READ_INDEX)
    while (this._isFull(read)) {
      // Only blocks if the caller keeps writing after we returned false.
      Atomics.wait(this._state, READ_INDEX, read)
      read = Atomics.load(this._state, READ_INDEX)
    }
    const written = this._buffer.write(data, this._write + 4)
    this._buffer.writeInt32LE(written, this._write)
    this._write += 4 + written
    // The maxMessageSize slack guarantees we stay inside the buffer.
    assert(this._write + 1 < this._buffer.length)
    if (this._write > this._bufEnd) {
      this._write = 0
    }
    Atomics.store(this._state, WRITE_INDEX, this._write)
    Atomics.notify(this._state, WRITE_INDEX)
    const needDrain = this._isFull(read)
    if (needDrain && !this._needDrain) {
      this._needDrain = true
      this._drain()
    }
    return !needDrain // stream convention: false means "wait for 'drain'"
  }
  _isFull(read) {
    return this._write < read && this._write + this._maxMessageSize > read
  }
  async _drain() {
    let read = Atomics.load(this._state, READ_INDEX)
    while (this._isFull(read)) {
      const { async, value } = Atomics.waitAsync(this._state, READ_INDEX, read)
      if (async) {
        await value
      }
      read = Atomics.load(this._state, READ_INDEX)
    }
    this._needDrain = false
    this.emit('drain')
  }
}

module.exports = {
  read,
  Writer,
}
We can polyfill Atomics.waitAsync in Node 14.
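A minimal sketch of such a polyfill, assuming a polling fallback is acceptable (production polyfills typically park a helper worker inside Atomics.wait instead of polling):

// Poll-based stand-in for Atomics.waitAsync, mirroring its return shape:
// { async: false, value: 'not-equal' } when the value already differs,
// otherwise { async: true, value: Promise } resolving once it changes.
function waitAsyncPolyfill (typedArray, index, expected, pollMs = 10) {
  if (Atomics.load(typedArray, index) !== expected) {
    return { async: false, value: 'not-equal' }
  }
  return {
    async: true,
    value: new Promise((resolve) => {
      const poll = () => {
        if (Atomics.load(typedArray, index) !== expected) {
          resolve('ok')
        } else {
          setTimeout(poll, pollMs)
        }
      }
      setTimeout(poll, pollMs)
    })
  }
}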
There are a couple of edge cases related to how we handle flushSync() and end() that this is not managing. However, I'd be really happy to see the crappy waitAsync implementation we have here replaced with code similar to yours.
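For illustration, a blocking flushSync() on the Writer above might look like the method below; the name matches thread-stream's API, but the body is only my assumption of what it would need to do:

// Hypothetical addition to the Writer class: block until the consumer
// has caught up with everything written so far.
flushSync () {
  let read = Atomics.load(this._state, READ_INDEX)
  while (read !== this._write) {
    // Wait (with a timeout in ms) for the reader to advance, then re-check.
    Atomics.wait(this._state, READ_INDEX, read, 1000)
    read = Atomics.load(this._state, READ_INDEX)
  }
}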
Something that worries me is that, given we do not have reproduction scripts for your problems, we'll make the same mistakes twice.