Implement concurrent filtering and postprocessing
(This bug has morphed a couple of times but has mostly been about optimizing the filtering pipeline in the sonarlog and db layers. What remains now is making use of available parallelism - this is not high priority, perf is pretty good already.)
For the heavy-users script, we end up running the profile command for every job that passes the initial filtering. The profile command is actually pretty slow. I don't know why precisely, but there is one main loop that is quadratic-ish (though there's an assertion that "the inner loop will tend to be very short") and then when we do bucketing, which I do, there's a bucketing loop that is cubic-ish. One would want to profile this. File I/O should not be a factor because this is with the caching daemon.
I'm running this on four weeks of fox data: heavy-users.py fox 4w, which would be roughly May 24 through June 21. There are 38 output lines unless I miscounted.
- Move filter earlier in the pipeline
- Avoid allocating a tremendously long array for all the samples
- Avoid computing bounds when they are not needed, as this is expensive (it must be done before filtering)
- Maybe: optimize the filter by specializing it for common cases (one value of one attribute + time) (see below + branch larstha-526-better-filter for WIP, this looks hard in general and the best we can do may be to implement special logic for particularly important cases)
- Maybe: push filtering down into the reading pipeline to do it concurrently
- Maybe: parallelize postprocessing, though this matters a lot less than efficient reading and filtering
Here's what I'm trying, running locally on naic-monitor:
./sonalyze profile \
-cpuprofile profile.prof \
-data-dir ~/sonar/data/fox.educloud.no \
-from 2024-05-24 -to 2024-06-21 \
-job 698478 -user ec-thallesss \
-bucket 6 \
-fmt csv,gpu
(can't run against the daemon with -cpuprofile and it's not very sensible to run the daemon with -cpuprofile). Unsurprisingly parsing dominates this:
flat flat% sum% cum cum%
0.81s 17.02% 17.02% 1.14s 23.95% sonalyze/db.(*CsvTokenizer).Get
0.44s 9.24% 26.26% 3.37s 70.80% sonalyze/db.ParseSonarLog
0.30s 6.30% 32.56% 0.30s 6.30% sonalyze/common.(*hashtable).getBytes (inline)
0.26s 5.46% 38.03% 0.26s 5.46% runtime/internal/syscall.Syscall6
0.18s 3.78% 41.81% 0.21s 4.41% main.buildFilters.func1
0.16s 3.36% 45.17% 0.17s 3.57% sonalyze/db.parseUint64 (inline)
0.15s 3.15% 48.32% 0.15s 3.15% sonalyze/db.(*CsvTokenizer).MatchTag (inline)
0.14s 2.94% 51.26% 0.20s 4.20% runtime.mapaccess2
0.14s 2.94% 54.20% 0.14s 2.94% sonalyze/common.hashBytes (inline)
0.12s 2.52% 56.72% 0.36s 7.56% sonalyze/sonarlog.ComputeAndFilter
0.10s 2.10% 58.82% 0.12s 2.52% runtime.findObject
0.10s 2.10% 60.92% 0.10s 2.10% runtime.memmove
0.09s 1.89% 62.82% 0.09s 1.89% runtime.(*mspan).base (inline)
0.09s 1.89% 64.71% 0.09s 1.89% runtime.nextFreeFast (inline)
but the hashtable thing is interesting, because the profiler uses hashtables for its sparse matrices. Still, more heat than light here - the call tree for the profile doesn't really show the profiler at all.
Some manual instrumentation:
- running heavy-users.py on naic-monitor (so, remote access but on the local machine), each invocation of the profiler takes about 1.10s and generates a few KB of data
- postprocessing time of the output seems negligible
- the initial sonalyze jobs run takes 0.3s and generates about 175KB of data
It really is the ~38 profiler runs that are expensive here, everything else is noise.
Re the sparse matrix:
- The timestamps will tend to be large and spread out but new rows will tend to be created at the end.
- The process numbers will also tend to be large and spread out but it's slightly less clear whether new columns will be created in order or more arbitrarily.
- We can probably avoid almost all HT probes on set() by keeping min/max for row and col, but will this matter? There's a quadratic number of set() calls but a cubic number of get() calls. And we still don't know if this is where the time goes.
- Is there a problem with profiling this on a VM and not on a real computer?
Profiling locally on my laptop does not shed much light. It still looks like reading data is the main culprit. The processing time barely shows up, the profile computation really not at all.
It would be worth it to investigate whether caching works properly in the daemon. On naic-monitor it has only a 512MB resident set, but the cache size was set to 12GB, and it should really have a lot more data cached, given how often reports are created and so on.
Having isolated reading from the rest of it, here's the top profile of a request where every read hit the cache:
Showing nodes accounting for 1120ms, 89.60% of 1250ms total
Showing top 20 nodes out of 74
flat flat% sum% cum cum%
180ms 14.40% 14.40% 200ms 16.00% main.buildFilters.func1
130ms 10.40% 24.80% 340ms 27.20% sonalyze/sonarlog.ComputeAndFilter
130ms 10.40% 35.20% 570ms 45.60% sonalyze/sonarlog.createInputStreams
90ms 7.20% 42.40% 130ms 10.40% runtime.findObject
80ms 6.40% 48.80% 140ms 11.20% runtime.mapaccess2
70ms 5.60% 54.40% 270ms 21.60% runtime.scanobject
60ms 4.80% 59.20% 60ms 4.80% runtime.(*gcBits).bitp (inline)
60ms 4.80% 64.00% 60ms 4.80% sonalyze/sonarlog.TimeSortableSampleStream.Less
40ms 3.20% 67.20% 40ms 3.20% aeshashbody
40ms 3.20% 70.40% 40ms 3.20% runtime.memclrNoHeapPointers
40ms 3.20% 73.60% 40ms 3.20% runtime.spanOf (inline)
30ms 2.40% 76.00% 30ms 2.40% runtime.memhash32
30ms 2.40% 78.40% 30ms 2.40% runtime.memmove
20ms 1.60% 80.00% 20ms 1.60% memeqbody
20ms 1.60% 81.60% 20ms 1.60% runtime.(*mspan).base (inline)
20ms 1.60% 83.20% 20ms 1.60% runtime.add (inline)
20ms 1.60% 84.80% 70ms 5.60% runtime.mallocgc
20ms 1.60% 86.40% 20ms 1.60% runtime.mapaccess1_fast32
20ms 1.60% 88.00% 60ms 4.80% runtime.mapaccess2_fast32
20ms 1.60% 89.60% 30ms 2.40% runtime.mapassign_fast32
This illustrates that it is the underlying stream creation machinery that is expensive, although frankly the resulting call graph with profile annotations still feels wrong, as the profile package does not show up anywhere.
It's worth observing that createInputStreams precedes ComputeAndFilter in the source code and that the Go profiler can get things wrong, golang/go#41338. So more likely than not we should view these as part of the same problem. Clearly main.buildFilters.func1 is also part of this - it's the constructed filter function called by ComputeAndFilter.
Anyway, the profiler is not the issue here per se, but stream creation and filtering.
Another dimension is that this was for a particularly long time period - four weeks. There will be a lot of records to filter and process, it's not insane that this can take a second or more.
Compiling the filters as a set of closures - to avoid redundant filters - is not much of a winner, basically there will always be at least a couple, for the time span and probably a job, a user, or a host. The function call overhead eats up the winnings.
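For reference, the closure approach has roughly the shape below. This is a minimal sketch, not the sonalyze code: Sample, Query and compileFilter are hypothetical names, and the real record has many more fields. Only criteria that are actually set get a closure, but every surviving closure still costs an indirect call per record, which is where the winnings go.

```go
package main

// Sample is a hypothetical, pared-down record.
type Sample struct {
	Timestamp int64
	Job       uint32
	User      string
}

// Query is a hypothetical filter spec; a zero Job or empty User means
// "do not filter on this attribute".
type Query struct {
	From, To int64 // inclusive time window
	Job      uint32
	User     string
}

// compileFilter builds one closure per active criterion and returns a
// function that is the conjunction of them.
func compileFilter(q Query) func(*Sample) bool {
	preds := []func(*Sample) bool{
		// The time window is always present.
		func(s *Sample) bool { return s.Timestamp >= q.From && s.Timestamp <= q.To },
	}
	if q.Job != 0 {
		preds = append(preds, func(s *Sample) bool { return s.Job == q.Job })
	}
	if q.User != "" {
		preds = append(preds, func(s *Sample) bool { return s.User == q.User })
	}
	return func(s *Sample) bool {
		for _, p := range preds {
			if !p(s) {
				return false
			}
		}
		return true
	}
}
```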
Basically:
- the result from ReadSampleStreams with the same from/to date and host globber is invariant provided no new records have been added in the time window
- the upper part of ComputeAndFilter ditto, it just computes attributes from the unfiltered records
- the lower part of ComputeAndFilter applies the filter, which is much more likely to change from run to run
To prove that ReadSampleStreams would return the same value we need to prove (in the current implementation) that all files are cached, or (more generally) that all files have the same mtime as last time.
Additionally, there's probably a ceiling on how long it could take for a record to arrive, and if the "to" timestamp is more than - say - 30 minutes in the past, we could reuse a query.
Thus there could be a query cache and maybe some way of asking the db about whether there have been new data for some set of files involved in a query. This would remove all of the time for createInputStreams, and maybe 25% of the time for ComputeAndFilter, for a total of about half the running time. But maintaining the cache will be interesting, if there's cache pressure it will have to be purged.
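A rough sketch of what such a query cache could look like, under loud assumptions: queryKey, cachedQuery, queryCache and settled are all hypothetical names, times are Unix seconds, and the result type is a stand-in for the real stream set. Validation compares the current file names and mtimes against what was recorded when the entry was stored. Purging under cache pressure is not shown.

```go
package main

// queryKey identifies a request by its time window and host globber.
type queryKey struct {
	from, to int64
	hostGlob string
}

// cachedQuery remembers a result and the mtime of every file it was built from.
type cachedQuery struct {
	mtimes map[string]int64
	result []string // stand-in for the real stream set
}

type queryCache struct {
	entries map[queryKey]*cachedQuery
}

func newQueryCache() *queryCache {
	return &queryCache{entries: make(map[queryKey]*cachedQuery)}
}

func (c *queryCache) store(k queryKey, mtimes map[string]int64, result []string) {
	c.entries[k] = &cachedQuery{mtimes: mtimes, result: result}
}

// lookup returns the cached result only if the current file set is identical
// to the recorded one: same names and same mtimes.
func (c *queryCache) lookup(k queryKey, current map[string]int64) ([]string, bool) {
	e, ok := c.entries[k]
	if !ok || len(e.mtimes) != len(current) {
		return nil, false
	}
	for name, mtime := range current {
		if old, seen := e.mtimes[name]; !seen || old != mtime {
			return nil, false
		}
	}
	return e.result, true
}

// settled reports whether the window closed more than grace seconds ago,
// i.e. late-arriving records are no longer expected.
func settled(to, now, grace int64) bool {
	return now-to > grace
}
```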
Another thing that would make sense here is amortisation. For the specific use case of heavy-users.py we're going to be running the exact same command except for the job number for a number of jobs. Being able to just ask for multiple profiles by job number would speed this process up because the input record set could be computed once and filtered multiple times, once for each job.
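The amortisation idea can be sketched as a single pass that fans records out by job, rather than one full scan per job. Sample and partitionByJob are hypothetical names for illustration; sonalyze has no such entry point as far as this thread says.

```go
package main

// Sample is a hypothetical, pared-down record.
type Sample struct {
	Timestamp int64
	Job       uint32
}

// partitionByJob scans the input once and collects the records of each
// wanted job, preserving input order within a job.
func partitionByJob(samples []Sample, wanted []uint32) map[uint32][]Sample {
	out := make(map[uint32][]Sample, len(wanted))
	for _, j := range wanted {
		out[j] = nil // register the job so the scan below can test membership
	}
	for _, s := range samples {
		if _, ok := out[s.Job]; ok {
			out[s.Job] = append(out[s.Job], s)
		}
	}
	return out
}
```

With ~38 jobs this turns 38 scans of the input record set into one, at the cost of a map lookup per record.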
Another thing that would help the script (but not fix the problem per se) is to pass in a host filter - the hosts for the job are known from the initial jobs command (in fact right now it must be precisely one) and can be attached to the profile query. This should speed up the record ingestion a lot. Testing this, it turns out to be a massive winner: per-invocation time drops from 1.10s to about 0.08s.
The insight that the host filter should help is important, but it's astonishing that it helps as much as it does. The code looks like it is linear in the size of the input set but there are nonlinear components hidden here:
- hash tables that must grow (and can become quite large in some cases) in the initial partitioning; in the full unfiltered set there is a huge number of streams, I expect
- slices that are appended to in the initial partitioning - there are some long-running jobs here and the individual streams can become very long
- at least one (stable) sort call after the initial partitioning; the stable sort is O(n * lg(n) * lg(n)) + O(n * lg(n)) according to the docs, but the data may already be mostly ordered by time, so we could likely do better.
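The cheapest way to exploit already-ordered input is a linear sortedness check before sorting. A minimal sketch, with Sample and sortByTime as hypothetical names; it only helps when a stream is fully sorted, and a stream that is merely mostly sorted would still take the full stable sort here.

```go
package main

import "sort"

// Sample is a hypothetical record; Seq only exists to make stability visible.
type Sample struct {
	Timestamp int64
	Seq       int
}

// sortByTime stably sorts s by timestamp, skipping the sort when one linear
// pass shows that s is already in order. A stable sort of sorted input would
// leave it unchanged anyway, so the result is the same.
func sortByTime(s []Sample) {
	less := func(i, j int) bool { return s[i].Timestamp < s[j].Timestamp }
	if sort.SliceIsSorted(s, less) {
		return
	}
	sort.SliceStable(s, less)
}
```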
The most robust solution would probably be an in-memory index that is built lazily and attached to each cached blob. Consider filtering records by job number. The first time through we have to scan all records in all the blobs (and we do this late, after building up an initial search set). Subsequently the files will be in memory but if the first scan built up an index then we could go straight to that. The problem is that the index takes space and takes some time to build, and we don't know which attributes we need, so we'll have to not do more than we have to. An index for exact inclusive filtering is a table that maps a key to a set of rows. In our case this could be a set of row indices since the table data are immutable; this would save space and be kind to the GC. (If space is a huge concern then we would do well to have the option of using 16-bit row indices as this will almost always be enough, but not actually always.)
The size of the index would have to be accounted for in the cached content.
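To make the index idea concrete, here is a minimal sketch for one attribute (job number). The blob type and rowsForJob are hypothetical, and the blob is reduced to a single column. The index is built on first use and stores 32-bit row indices rather than pointers, so its payload is about 4 bytes per row plus map overhead.

```go
package main

import "sync"

// blob is a hypothetical cached table. jobs holds the job id of each row and
// is immutable once the blob is cached.
type blob struct {
	jobs []uint32

	once     sync.Once
	jobIndex map[uint32][]uint32 // job id -> row indices, built lazily
}

// rowsForJob returns the indices of the rows belonging to job. The first call
// scans every row to build the index; later calls are one map lookup.
func (b *blob) rowsForJob(job uint32) []uint32 {
	b.once.Do(func() {
		idx := make(map[uint32][]uint32)
		for row, j := range b.jobs {
			idx[j] = append(idx[j], uint32(row))
		}
		b.jobIndex = idx
	})
	return b.jobIndex[job]
}
```

sync.Once is used only so that concurrent readers of a cached blob do not race on the first build; whether the real cache needs that is an assumption.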
To make this effective we would apply the filter (ie use the index) early - during the building of the initial record set. Doing that now with the existing filter structure might also be beneficial. I think it's not done early now for some code structuring reasons - there's some filtering that can't be done until late in the game - but most filters are truly record filters, not inter-record filters, and apply at any time after ingestion.
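Applying a pure record filter during ingestion amounts to something like the following sketch (Sample and ingestFiltered are hypothetical names). Records that fail the filter are never copied into the working set, so no huge intermediate array of unfiltered samples is built.

```go
package main

// Sample is a hypothetical, pared-down record.
type Sample struct {
	Timestamp int64
	Job       uint32
}

// ingestFiltered builds the initial record set from the input blobs, keeping
// only the records accepted by keep. keep must be a pure per-record filter;
// inter-record filters still have to run later.
func ingestFiltered(blobs [][]Sample, keep func(*Sample) bool) []Sample {
	var out []Sample
	for _, blob := range blobs {
		for i := range blob {
			if keep(&blob[i]) {
				out = append(out, blob[i])
			}
		}
	}
	return out
}
```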
Another matter is that a lot of concurrency is being left on the table right now. Most operations in postprocessing are massively parallelizable: we can construct partial sample streams per input blob in parallel, then these can be merged (this step is sequential but need not be awful), and after that everything happens per-stream. If the number of streams is large then the speedup potential by throwing cores at the problem is very significant. For a large data set from a large number of nodes, there will almost always be a large number of streams as there is one stream per (node,jobid,cmd) triplet.
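A sketch of the partition-in-parallel, merge-sequentially structure, with hypothetical types throughout (Sample, streamKey, streams) and one goroutine per blob for brevity; a real version would presumably bound the number of goroutines. Merging in blob order keeps the per-stream record order deterministic.

```go
package main

import "sync"

// Sample is a hypothetical, pared-down record.
type Sample struct {
	Host      string
	Job       uint32
	Cmd       string
	Timestamp int64
}

// streamKey is the (node, jobid, cmd) triplet that identifies a stream.
type streamKey struct {
	Host string
	Job  uint32
	Cmd  string
}

type streams map[streamKey][]Sample

// partition builds the partial streams for one blob.
func partition(blob []Sample) streams {
	out := make(streams)
	for _, s := range blob {
		k := streamKey{s.Host, s.Job, s.Cmd}
		out[k] = append(out[k], s)
	}
	return out
}

// buildStreams partitions every blob concurrently, then merges the partial
// results sequentially in blob order.
func buildStreams(blobs [][]Sample) streams {
	partials := make([]streams, len(blobs))
	var wg sync.WaitGroup
	for i := range blobs {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			partials[i] = partition(blobs[i]) // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()

	merged := make(streams)
	for _, p := range partials {
		for k, v := range p {
			merged[k] = append(merged[k], v...)
		}
	}
	return merged
}
```

After the merge, each entry of the map can be handed to its own worker, since the remaining postprocessing is per-stream.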
Following up on "we would apply the filter (ie use the index) early - during the building of the initial record set": doing that with the existing filter structure is a massive win, bringing the time for a run of the profiler down from about 1200ms to about 180ms (more numbers coming).
Final end-to-end numbers for the move-filtering patch: without host filtering, heavy-users.py fox 2024-05-01 2024-06-30 takes 2m42s with current sonalyze and 0m36s with the new one, about a 4.5x improvement. Warm caches in both cases and plenty of RAM, I tried to be fair. Will land this.
Avoiding the allocation of a huge intermediate array is also a significant win. The main consumers are now various maps and hash functions used by createInputStreams. Getting rid of bounds computation when not needed + specializing the filter function will deal with many of these in common cases.
Actually the filter function specialization is a little trickier than I thought. The default filter function is used to exclude heartbeat records via excludeCommands. But it may still be worth it: running time for the test case drops from 28s to 25s in an informal test, ie 10% from an already pretty good time.
See also branch larstha-526-better-filter.
Avoiding allocating the huge array took 25% off the running time of heavy-users.py (with either warm or cold cache), bringing us down to about 26s for this code for two months of data on a warm cache.
Getting rid of the bounds computation for the profile runs is a massive win, dropping the total running time for the heavy-users.py report by 50%. With this we are now down by over 90% relative to the original running time. To be sure, bounds info is used by several other verbs and they will continue to suffer, but they seem to be less affected than the profiler.
Compiling the filter to a bytecode program and executing that cut the time by half again.
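To illustrate what "bytecode" means here, a minimal sketch follows. The instruction set is invented for the example (a plain conjunction of three tests) and is not the one in sonalyze; Sample, opcode, instr and program are hypothetical names. The point is that the per-record cost becomes a tight loop over a small slice with a switch, instead of a chain of indirect calls.

```go
package main

// Sample is a hypothetical, pared-down record.
type Sample struct {
	Timestamp int64
	Job       uint32
	User      string
}

type opcode uint8

const (
	opTimeRange opcode = iota // reject unless lo <= Timestamp <= hi
	opJobEq                   // reject unless Job == job
	opUserEq                  // reject unless User == user
)

// instr is one filter instruction; only the operands relevant to op are used.
type instr struct {
	op     opcode
	lo, hi int64
	job    uint32
	user   string
}

// program is a conjunction: a record matches if no instruction rejects it.
type program []instr

// match interprets the program against one record.
func (p program) match(s *Sample) bool {
	for i := range p {
		in := &p[i]
		switch in.op {
		case opTimeRange:
			if s.Timestamp < in.lo || s.Timestamp > in.hi {
				return false
			}
		case opJobEq:
			if s.Job != in.job {
				return false
			}
		case opUserEq:
			if s.User != in.user {
				return false
			}
		}
	}
	return true
}
```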
For the two remaining items (concurrency): file reading and parsing is being done in parallel but there is one worker per CPU and there will be plenty of idle time while we're waiting for I/O to complete, if there is a cache miss. If there is a cache hit then the "I/O" is trivial but the workers will be busy. I think a worker pool separate from the I/O workers may be the best thing here. Implementing anything more here is not a priority though, what remains here is background work.
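If this is picked up again, separate pools could look roughly like the sketch below. Everything here is hypothetical: runPipeline, its parameters, and the read/parse callbacks standing in for file I/O and parsing. The I/O pool can be sized larger than the CPU count because its workers mostly wait, while the parse pool stays at about one worker per CPU. Both worker counts are assumed to be at least 1.

```go
package main

import "sync"

// runPipeline reads each named input with ioWorkers goroutines, parses the
// results with cpuWorkers goroutines, and returns the sum of the parse results.
func runPipeline(names []string, ioWorkers, cpuWorkers int,
	read func(string) []byte, parse func([]byte) int) int {

	nameCh := make(chan string)
	blobCh := make(chan []byte, ioWorkers)
	resCh := make(chan int, cpuWorkers)

	var ioWG, cpuWG sync.WaitGroup
	for i := 0; i < ioWorkers; i++ {
		ioWG.Add(1)
		go func() { // I/O worker: blocks on reads, cheap on a cache hit
			defer ioWG.Done()
			for n := range nameCh {
				blobCh <- read(n)
			}
		}()
	}
	for i := 0; i < cpuWorkers; i++ {
		cpuWG.Add(1)
		go func() { // CPU worker: parsing only
			defer cpuWG.Done()
			for b := range blobCh {
				resCh <- parse(b)
			}
		}()
	}

	// Feed the names, then shut the stages down in order.
	go func() {
		for _, n := range names {
			nameCh <- n
		}
		close(nameCh)
		ioWG.Wait()
		close(blobCh)
		cpuWG.Wait()
		close(resCh)
	}()

	total := 0
	for r := range resCh {
		total += r
	}
	return total
}
```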
Let's just defer the rest of this and handle it if it becomes an issue again.