[meta] first persistence implementation release tracking
Closed this issue · 6 comments
colinsurprenant commented
This issue lists and tracks progress toward the first release of the persistence integration into Logstash.
Related issues
TODOs
Pre-GA
- sweep code TODOs and convert to tasks/issues and prioritize
- Add bootstrap checks to persistent queue #6552 (@original-brownbear)
- performance & benchmarks (@colinsurprenant)
Post-GA
- batched queue and multi write integration (@colinsurprenant)
- blog post (@colinsurprenant)
- code sweep for proper exception handling (@original-brownbear)
- more queue recovery tests
- more *pages tests
- refactor acking bitset to use ES LocalCheckpointService (@original-brownbear)
- refactor *Settings classes into a builder pattern #7053, also see #5958 (comment) (@original-brownbear)
- finalize streaming FilePageIO
- compare and benchmark MmapPageIO vs streaming FilePageIO
- Check Available Disk Space for Persisted Queues #6998 (@original-brownbear)
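For the disk space check item above, here is a minimal sketch of the kind of bootstrap check that could be done; the class/method names and the exact policy are assumptions for illustration, not the implementation tracked in #6998:

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative bootstrap check: refuse to start if the filesystem holding the
// queue data directory cannot accommodate the configured queue.max_bytes.
// Names (checkQueueDiskSpace, maxQueueBytes) are assumptions for this sketch.
public class QueueDiskSpaceCheck {

    public static void checkQueueDiskSpace(Path queueDir, long maxQueueBytes) throws IOException {
        FileStore store = Files.getFileStore(queueDir);
        long usable = store.getUsableSpace();
        if (usable < maxQueueBytes) {
            throw new IllegalStateException(
                "Not enough free disk space for the persistent queue: " + usable +
                " bytes usable on " + store + ", but queue.max_bytes is " + maxQueueBytes);
        }
    }

    public static void main(String[] args) throws IOException {
        // e.g. fail fast at startup, before opening the queue
        checkQueueDiskSpace(Paths.get("data/queue"), 8L * 1024 * 1024 * 1024); // 8g
    }
}
```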
Completed
- optimistic recovery PR #6519 (@colinsurprenant)
- prevent PQ corruption if two logstash instances are started from the same config or location #6505 (@colinsurprenant)
- Pipeline initialization vs Persistent Queue initialization #5916 (@colinsurprenant)
- enable file-based tests/specs (@guyboertje)
- pass existing integration tests (using the current benchmarks setup)
- DLQ/eviction implementation (see #5283) - we agreed to not change the current event dropping behaviour for the first iteration
- fix queue.open bug with empty pages
- refactor Event serialization to use JSON/CBOR (@guyboertje)
- add checksum support to MmapPageIO (@guyboertje)
- optimize Queue.ack() inefficient page traversal (@colinsurprenant)
- optimize Queue.firstUnreadPage() inefficient page traversal (@colinsurprenant)
- clean hardcoded "checkpoint" filenames (@colinsurprenant)
- handle case of single element bigger than page capacity in Queue.write()
- add proper Queue.close() with Page.close() and PageIO.close() and review queue closing sequence and exception handling.
- add blocking Queue.readBatch()
- add timed-out blocking Queue.readBatch() (see the generic sketch after this list)
- timed-out blocking read_batch size maximisation #5652
- Queue Batch object integration into Logstash Batch object (@guyboertje). Rebase this into the feature branch.
- JRuby ext for Queue API
- Shutdown/Flush signalling needs to be reviewed, it cannot work as-is with the current persistence API #5672
- add acks-side and write-side threshold checkpointing? #6225 (@colinsurprenant)
- add usable queue size max capacity check using byte-size limit (e.g. 8g) (@jsvd) Issue #6293
- enable existing integration tests with matrix (with PQ on) #6328 (@suyograo)
- add new integration tests (@talevy)
- add monitoring info and API to expose PQ stats (issue: #6182) (@talevy)
- add user facing docs on PQ (@suyograo)
- review queue closing sequence and exception handling. (@colinsurprenant)
- investigate possible performance issue related to the number of tail pages
- Wrap AckedBatch in Batch, create Clients and move shutdown and flush signaling to pipeline level similar to Colin's prototype impl (@guyboertje)
- figure out how to run logstash tests/specs with PQ code (use in-memory pages impl?) (@guyboertje)
- remove ElementDeserializer class, add Queue.deserialize (@colinsurprenant)
- get rid of ReadElementValue? I think it is useless since Queueable interface provides getSeqNum(). (@colinsurprenant)
- add queue elements size limit (@colinsurprenant)
- queue.type will default to memory with option for persistent, see #5958 (comment)
- use version.yml in build.gradle
- add correct handling of empty pages created by systematic beheading on Queue.open
- refactor Queue.open(), method is big (@colinsurprenant)
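For the blocking and timed-out Queue.readBatch() items above, here is a generic sketch of the timed blocking batch-read pattern, assuming a simple lock/condition based in-memory queue; it illustrates the pattern only and is not the actual Logstash Queue code:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Generic sketch of a timed-out blocking readBatch(): wait up to timeoutMs for
// at least one element to be available, then drain up to 'limit' elements.
public class TimedBatchQueue<E> {
    private final Deque<E> elements = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    public void write(E element) {
        lock.lock();
        try {
            elements.addLast(element);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns up to 'limit' elements; may return an empty batch on timeout.
    public List<E> readBatch(int limit, long timeoutMs) throws InterruptedException {
        List<E> batch = new ArrayList<>(limit);
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (elements.isEmpty() && nanos > 0) {
                nanos = notEmpty.awaitNanos(nanos);
            }
            while (!elements.isEmpty() && batch.size() < limit) {
                batch.add(elements.removeFirst());
            }
            return batch;
        } finally {
            lock.unlock();
        }
    }
}
```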
colinsurprenant commented
I prioritized tasks in p1/p2 /cc @guyboertje
colinsurprenant commented
I added the master mergeability section to track what is required before submitting a PR against master.
colinsurprenant commented
WIP PR submitted in #5958
guyboertje commented
Some numbers:
| queue.type | queue.page_capacity | workers | events | took |
|---|---|---|---|---|
| persisted | 512KB | 1 | 1000000 | 26.1s |
| persisted | 512KB | 4 | 1000000 | 18.7s |
| persisted | 512KB | 8 | 1000000 | 18.5s |
| persisted | 65536KB | 1 | 1000000 | 32.3s |
| persisted | 65536KB | 4 | 1000000 | 24.5s |
| persisted | 65536KB | 8 | 1000000 | 24.4s |
| memory | 512KB | 1 | 1000000 | 25.1s |
| memory | 512KB | 4 | 1000000 | 19.1s |
| memory | 512KB | 8 | 1000000 | 18.5s |
| synchronous | n/a | 1 | 1000000 | 31.4s |
| synchronous | n/a | 4 | 1000000 | 13.4s |
| synchronous | n/a | 8 | 1000000 | 12.9s |
colinsurprenant commented
I also ran some benchmarks using an "empty" config and processing 1M events using the integration/benchmark framework:
input { stdin {} }
filter { clone {} }
output { stdout { codec => line } }
| type | page size | tps (events/sec) | # pages created |
|---|---|---|---|
| synchronous | n/a | 52500 | n/a |
| persisted | 1mb | 50325 | 138 |
| persisted | 512mb | 39166 | 1 |
So there is in fact a slowdown related to the persisted page size. My guess is that it comes from the page bitset operations that have yet to be optimized. (P2 task, will move to P1 😄 )
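To illustrate the suspected cost, here is a rough sketch (assumptions only, not the real code) of per-page ack tracking with a bitset: every ack sets a bit and then checks whether the page is fully acked, and repeating that check across many tail pages on each ack is where the traversal cost would come from:

```java
import java.util.BitSet;

// Rough sketch of per-page ack bookkeeping with a bitset (not the real code):
// each page tracks which of its elements have been acked, and the queue can
// only purge a page once every element in it is acked.
public class PageAckTracker {
    private final long minSeqNum;   // first sequence number stored in this page
    private final int elementCount; // number of elements written to this page
    private final BitSet acked;

    public PageAckTracker(long minSeqNum, int elementCount) {
        this.minSeqNum = minSeqNum;
        this.elementCount = elementCount;
        this.acked = new BitSet(elementCount);
    }

    public void ack(long seqNum) {
        acked.set((int) (seqNum - minSeqNum));
    }

    // cardinality() walks the whole bitset; calling this on every ack, for every
    // tail page, is the kind of repeated work a checkpoint-style tracker
    // (e.g. the LocalCheckpointService refactor listed above) would avoid.
    public boolean isFullyAcked() {
        return acked.cardinality() == elementCount;
    }
}
```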
colinsurprenant commented
Closing this - it isn't really relevant anymore.