elijahr/ringbuf

Peek and skip method

Opened this issue · 8 comments

Hi, thank you for this library. This issue is a feature request.

I would like for the library to provide a peek method, which would behave like pop, but would not actually remove anything from the ring buffer (so, read_available and write_available would be left intact after a peek).

Motivating use case: I would like to read some data from the ring buffer and write it somewhere else, but that write might return short (meaning not all of the data could be written). In that case, the data that could not be written should remain in its original position in the ring buffer. If I instead used pop and then push with the remaining data, it would end up in the wrong position:

# Initialize a sample ring buffer with 10 bytes
ring = RingBuffer(10)
ring.push(bytes(range(10)))

# Read a smaller chunk of, say, 4 bytes (buffer is left with 6 bytes)
chunk = ring.pop(4)

# Try to write the chunk to some stream, which returns how much data could be written
n = stream.write(chunk)

# Error!
# If the write is short (say, n = 3), we're left with 4 - 3 = 1 byte that should not have been removed from the buffer!
# This push will place them in the wrong position, so this is not a solution!
ring.push(chunk[n:])

With my proposed methods, this would not be an issue:

ring = RingBuffer(10)
ring.push(bytes(range(10)))
chunk = ring.peek(4)  # Data is NOT removed from the buffer yet
n = stream.write(chunk)
ring.pop(n)  # Only the actual amount written is removed from the buffer, success!

For the last read, it might be more efficient to provide a skip method. Although this is not necessary, it would save a few memory operations:

ring = RingBuffer(10)
ring.push(bytes(range(10)))
chunk = ring.peek(4)
n = stream.write(chunk)
ring.skip(n)  # No data is returned, but we drop the right amount from the buffer
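
To make the proposed semantics concrete, here is a minimal pure-Python sketch. `PeekableRing` is a hypothetical, single-threaded stand-in written only for illustration; it is not the ringbuf API and makes no lock-free guarantees. The choice to have `push` return the bytes that did not fit is also just an assumption of this sketch.

```python
class PeekableRing:
    """Hypothetical single-threaded byte ring buffer (not the ringbuf API)."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._buf = bytearray(capacity)
        self._capacity = capacity
        self._head = 0  # index of the oldest unread byte
        self._size = 0  # number of unread bytes

    @property
    def read_available(self):
        return self._size

    @property
    def write_available(self):
        return self._capacity - self._size

    def push(self, data):
        """Store as much of data as fits; return the bytes that did not fit."""
        n = min(len(data), self.write_available)
        tail = (self._head + self._size) % self._capacity
        first = min(n, self._capacity - tail)  # part before wrap-around
        self._buf[tail:tail + first] = data[:first]
        self._buf[0:n - first] = data[first:n]  # part after wrap-around
        self._size += n
        return data[n:]

    def peek(self, n):
        """Return up to n of the oldest bytes without consuming them."""
        n = min(n, self._size)
        first = min(n, self._capacity - self._head)
        return bytes(self._buf[self._head:self._head + first]) + bytes(self._buf[:n - first])

    def skip(self, n):
        """Drop up to n of the oldest bytes; return how many were dropped."""
        n = min(n, self._size)
        self._head = (self._head + n) % self._capacity
        self._size -= n
        return n

    def pop(self, n):
        """Equivalent to peek followed by skip."""
        chunk = self.peek(n)
        self.skip(len(chunk))
        return chunk


ring = PeekableRing(10)
ring.push(bytes(range(10)))
chunk = ring.peek(4)      # read_available is still 10
ring.skip(3)              # pretend only 3 bytes could be written
next_chunk = ring.peek(4) # starts at the byte that was not written
```

In this sketch the unwritten byte never leaves its slot, so ordering is preserved without any re-push.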

Ah, upon further inspection (trying to implement it myself), it appears this library is a small wrapper around Boost's spsc_queue, which provides neither of these methods. That's unfortunate, as they would be a very good addition. I wonder why Boost doesn't provide them. Maybe they're not compatible with the "lock-free" part of the implementation? I would think it should be a matter of replacing the copy_and_delete method with a direct call to std::copy.

Would it be feasible to vendor their headers and make the modifications needed for this? This would also get rid of the dependency on all of boost, as we would copy only what we need. Licensing would of course be respected, but that's already the case by pulling all of it anyway.

@i-sf My apologies for the months-late response! Did not see this in my notifications.

This is an interesting proposal. This library was originally intended for parallel thread use, for instance, as a means of placing audio data generated by Python code into a buffer that can be consumed by real-time priority C/C++ code which should not acquire the GIL. I think peek() and skip() could maybe still function in a parallel context since this is a single-consumer queue, though they would not work for multi-consumer since the data could change between peek() and skip() calls.

For your re-pushing use case, might it work to instead just loop until everything has been written to the stream? If you try to push data to the buffer after popping, you are likely to get out-of-order data unless you can guarantee that the queue is empty between read and write. In single-threaded use or with the GIL, that's probably okay, but it's an anti-pattern with SPSC, since one thread should be pushing and one thread should be popping (or one thread doing both, I guess).

So, just using a loop:

# Initialize a sample ring buffer with 10 bytes
ring = RingBuffer(10)
ring.push(bytes(range(10)))

# Read a smaller chunk of, say, 4 bytes (buffer is left with 6 bytes)
chunk = ring.pop(4)

# Try to write the chunk to some stream, which returns how much data could be written
while chunk:
    n = stream.write(chunk)
    chunk = chunk[n:]
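
For reference, the loop above can be exercised against a stand-in stream that only accepts a few bytes per call. `ShortWriter` and `write_all` are hypothetical names made up for this sketch. The `None` check is there because a non-blocking raw stream in Python may return `None` from `write()` when nothing could be written.

```python
class ShortWriter:
    """Hypothetical stream that accepts at most `limit` bytes per write call."""

    def __init__(self, limit):
        self.limit = limit
        self.received = bytearray()

    def write(self, data):
        accepted = bytes(data[:self.limit])
        self.received += accepted
        return len(accepted)


def write_all(stream, chunk):
    """Keep writing until the whole chunk is accepted; return the call count."""
    view = memoryview(chunk)  # slicing a memoryview avoids copying the tail
    calls = 0
    while view:
        n = stream.write(view)
        if n is None:
            # A non-blocking stream could not take anything right now.
            raise BlockingIOError("stream not ready")
        view = view[n:]
        calls += 1
    return calls


stream = ShortWriter(limit=3)
calls = write_all(stream, bytes(range(4)))  # needs two calls: 3 bytes, then 1
```

Note that this approach holds on to the popped chunk until the stream accepts all of it, so it fits best where waiting on the stream is acceptable.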

As for forking spsc_queue code and placing it here - I am not opposed to it. Feel free to put together a pull request.

My use case was with asyncio: one task reads from a client into the buffer, but only when the buffer has capacity for it, and another task takes from the buffer and writes elsewhere without knowing in advance how much it can write. It therefore has to peek, write, and then skip the N bytes that were actually written. If it instead read N bytes but only M could be written, the remaining N - M bytes would be "lost" and couldn't be re-inserted in order.
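
A rough sketch of that pattern, under heavy assumptions: a plain `bytearray` stands in for the ring buffer, slicing plays the role of peek, `del buf[:n]` plays the role of skip, and the short write is simulated with a fixed `max_write`. The names `fill` and `drain` are made up for this illustration.

```python
import asyncio


async def fill(buf, capacity, source):
    """Reader task: only takes from the source what the buffer has room for."""
    pos = 0
    while pos < len(source):
        room = capacity - len(buf)
        if room:
            piece = source[pos:pos + room]
            buf += piece
            pos += len(piece)
        await asyncio.sleep(0)  # yield to the drain task


async def drain(buf, sink, total, max_write):
    """Writer task: peek, attempt the write, then skip only what was accepted."""
    written = 0
    while written < total:
        chunk = bytes(buf[:4])          # peek: the buffer is left untouched
        n = min(len(chunk), max_write)  # stand-in for a short write
        sink += chunk[:n]
        del buf[:n]                     # skip exactly the n bytes written
        written += n
        await asyncio.sleep(0)          # yield to the fill task


async def main():
    buf, sink = bytearray(), bytearray()
    source = bytes(range(20))
    await asyncio.gather(fill(buf, 8, source), drain(buf, sink, len(source), 3))
    return bytes(sink)


result = asyncio.run(main())
```

Because the drain task only removes what the sink accepted, the bytes arrive in their original order even though every write is short.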

In any case, I've ended up writing my own Rust-based solution so I no longer have a need for this.

@i-sf do you mind linking your rust impl?

No, sorry, it's private code in the company I work with. However, I will ask my employer if it's okay to release that part as open-source.

@i-sf sweet thanks!

Put it on our test list 🏄🏼