elastic/logstash

[meta] first persistence implementation release tracking

Closed this issue · 6 comments

This issue lists and tracks progress toward the first release of the persistence integration into Logstash.

Related issues

TODOs

Pre-GA

Post-GA

Completed

  • optimistic recovery PR #6519 (@colinsurprenant)
  • prevent PQ corruption if two logstash instances are started from the same config or location #6505 (@colinsurprenant)
  • Pipeline initialization vs Persistent Queue initialization #5916 (@colinsurprenant)
  • enable file-based tests/specs (@guyboertje)
  • pass existing integration tests (using the current benchmarks setup)
  • DLQ/eviction implementation (see #5283) - we agreed to not change the current event dropping behaviour for the first iteration
  • fix queue.open bug with empty pages
  • refactor Event serialization to use JSON/CBOR (@guyboertje)
  • add checksum support to MmapPageIO (@guyboertje)
  • optimize Queue.ack() inefficient page traversal (@colinsurprenant)
  • optimize Queue.firstUnreadPage() inefficient page traversal (@colinsurprenant)
  • clean hardcoded "checkpoint" filenames (@colinsurprenant)
  • handle case of single element bigger than page capacity in Queue.write()
  • add proper Queue.close() with Page.close() and PageIO.close() and review queue closing sequence and exception handling.
  • add blocking Queue.readBatch()
  • add timedout blocking Queue.readBatch()
  • timed-out blocking read_batch size maximisation #5652
  • Queue Batch object integration into Logstash Batch object (@guyboertje) Rebase this into feature branch.
  • JRuby ext for Queue API
  • Shutdown/Flush signalling need to be reviewed, cannot work as-is with current persistence API #5672
  • add ack-side and write-side threshold checkpointing? #6225 (@colinsurprenant)
  • add usable queue size max capacity check using byte-size limit (e.g. 8g) (@jsvd) Issue #6293
  • enable existing integration tests with matrix (with PQ on) #6328 (@suyograo)
  • add new integration tests (@talevy)
  • add monitoring info and API to expose PQ stats (issue: #6182) (@talevy)
  • add user facing docs on PQ (@suyograo)
  • review queue closing sequence and exception handling. (@colinsurprenant)
  • investigate possible performance issue related to the number of tail pages
  • Wrap AckedBatch in Batch, create Clients and move shutdown and flush signaling to pipeline level similar to Colin's prototype impl (@guyboertje)
  • figure out how to run logstash tests/specs with PQ code? (use in-memory pages impl?) (@guyboertje)
  • remove ElementDeserializer class add Queue.deserialize (@colinsurprenant)
  • get rid of ReadElementValue? I think it is useless since Queueable interface provides getSeqNum(). (@colinsurprenant)
  • add queue elements size limit (@colinsurprenant)
  • queue.type will default to memory with option for persistent see #5958 (comment)
  • use version.yml in build.gradle
  • add correct handling of empty pages created by systematic beheading on Queue.open
  • refactor Queue.open(), method is big (@colinsurprenant)

I prioritized tasks in p1/p2 /cc @guyboertje

I added the master mergeability section to track what is required before submitting a PR against master.

WIP PR submitted in #5958

Some numbers:

  queue.type: persisted
  queue.page_capacity: 512KB
  workers: 1
  events: 1000000
  took: 26.1s

  queue.type: persisted
  queue.page_capacity: 512KB
  workers: 4
  events: 1000000
  took: 18.7s

  queue.type: persisted
  queue.page_capacity: 512KB
  workers: 8
  events: 1000000
  took: 18.5s

  queue.type: persisted
  queue.page_capacity: 65536KB
  workers: 1
  events: 1000000
  took: 32.3s

  queue.type: persisted
  queue.page_capacity: 65536KB
  workers: 4
  events: 1000000
  took: 24.5s

  queue.type: persisted
  queue.page_capacity: 65536KB
  workers: 8
  events: 1000000
  took: 24.4s

  ------------------

  queue.type: memory
  queue.page_capacity: 512KB
  workers: 1
  events: 1000000
  took: 25.1s

  queue.type: memory
  queue.page_capacity: 512KB
  workers: 4
  events: 1000000
  took: 19.1s

  queue.type: memory
  queue.page_capacity: 512KB
  workers: 8
  events: 1000000
  took: 18.5s

  ------------------

  queue.type: synchronous
  workers: 1
  events: 1000000
  took: 31.4s

  queue.type: synchronous
  workers: 4
  events: 1000000
  took: 13.4s

  queue.type: synchronous
  workers: 8
  events: 1000000
  took: 12.9s
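For reference, the settings enumerated in the runs above correspond to logstash.yml entries. The following is a sketch based on the setting names used in these benchmarks; the mapping of "workers" to `pipeline.workers` is my assumption, and the "synchronous" queue type is specific to this WIP branch rather than a released option:

```yaml
# logstash.yml (sketch) -- one of the persisted-queue runs above
queue.type: persisted        # other runs used "memory" or "synchronous"
queue.page_capacity: 512kb   # 65536kb in the larger-page runs
pipeline.workers: 1          # assumed mapping for "workers" above
```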

I also ran some benchmarks using an "empty" config and processing 1M events using the integration/benchmark framework:

input { stdin {} }
filter { clone {} }
output { stdout { codec => line } }
| type | page size | tps | # pages created |
| --- | --- | --- | --- |
| synchronous | n/a | 52500 | n/a |
| persisted | 1mb | 50325 | 138 |
| persisted | 512mb | 39166 | 1 |

So there is in fact a slowdown related to the persisted page size. My guess is that it comes from the page bitset operations, which have yet to be optimized. (P2 task, will move to P1 😄 )
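To illustrate why bitset operations could scale with page size: a minimal sketch, not the actual Logstash classes, assuming each page tracks acked elements in a per-page bitset indexed by sequence number offset. With a larger `queue.page_capacity`, each page holds more elements, so any naive linear scan over the bitset (e.g. to find the first unacked element) grows proportionally. The class and method names here are hypothetical:

```java
import java.util.BitSet;

// Hypothetical sketch of per-page ack tracking: each page records which
// of its elements have been acked in a BitSet keyed by (seqNum - minSeqNum).
public class PageAckSketch {
    private final BitSet acked;
    private final int capacity;   // number of elements the page can hold
    private final long minSeqNum; // first sequence number stored in this page

    PageAckSketch(long minSeqNum, int capacity) {
        this.minSeqNum = minSeqNum;
        this.capacity = capacity;
        this.acked = new BitSet(capacity);
    }

    void ack(long seqNum) {
        acked.set((int) (seqNum - minSeqNum));
    }

    // Naive scan: O(capacity) in the worst case -- the kind of per-page-size
    // cost that would explain a slowdown with larger pages.
    long firstUnackedSeqNum() {
        for (int i = 0; i < capacity; i++) {
            if (!acked.get(i)) {
                return minSeqNum + i;
            }
        }
        return minSeqNum + capacity; // page fully acked
    }

    boolean fullyAcked() {
        return acked.cardinality() == capacity;
    }

    public static void main(String[] args) {
        PageAckSketch page = new PageAckSketch(100, 8);
        page.ack(100);
        page.ack(101);
        page.ack(103);
        System.out.println(page.firstUnackedSeqNum()); // 102
        System.out.println(page.fullyAcked());         // false
    }
}
```

A constant-time alternative would be `BitSet.nextClearBit(int)`, which avoids the explicit loop; the point of the sketch is only that ack bookkeeping cost is tied to page capacity.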

Closing this; it isn't really relevant anymore.