[BUG][Concurrent Search] Search request for top anomalies from AD plugin is failing
sohami opened this issue · 10 comments
Describe the bug
The AD plugin exposes a top anomalies API which internally creates a painless-script-based composite aggregation query. When concurrent search is enabled on a cluster, the search request sometimes fails with an error.
Ref:
Top Anomalies API: https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-top-anomalies
Input to the API:
GET _plugins/_anomaly_detection/detectors/uBtS6HkBmDH36MzijITq/results/_topAnomalies
{
"size": 10,
"category_field": ["host", "service"],
"order": "severity",
"start_time_ms": 1622333056000,
"end_time_ms": 1622937856000
}
Example Query which it creates:
"query": {
"bool": {
"filter": {
"term": {
"detector_id": "uBtS6HkBmDH36MzijITq"
}
}
}
},
"aggs": {
"multi_buckets" : {
"composite": {
"sources": [
{
"host": {
"terms": {
"script": {
"source": """
String value = null;
if (params == null || params._source == null || params._source.entity == null) {
return "";
}
for (item in params._source.entity) {
if (item['name'] == "host") {
value = item['value'];
break;
}
}
return value;
""",
"lang": "painless"
}
}
}
},
{
"service": {
"terms": {
"script": {
"source": """
String value = null;
if (params == null || params._source == null || params._source.entity == null) {
return "";
}
for (item in params._source.entity) {
if (item['name'] == "service") {
value = item['value'];
break;
}
}
return value;
""",
"lang": "painless"
}
}
}
}
]
},
"aggregations": {
"max": {
"max": {
"field": "anomaly_grade"
}
},
"multi_buckets_sort": {
"bucket_sort": {
"sort": [
{ "max": { "order": "desc" } }
],
"size": 10
}
}
}
}
}
Related component
Search:Query Capabilities
To Reproduce
- Create an index with AD plugin result index mapping. Ref here
- Ingest the dummy data
- Run the query shared in the description above.
The failing test in the plugin has a sample of the index mapping for the entity field and the logic to create the data, which can be used to reproduce. Ref here
Expected behavior
The query should be successful
Additional Details
Plugins
Anomaly Detection
Additional context
Looking more into this failure, there seems to be a race condition in composite aggregation in general where the lookup instance is shared across slices, so this will affect all composite aggregation paths with concurrent segment search. We will need to disable the concurrent path when a composite aggregation is used in the request and fall back to the non-concurrent path. The change will be a one-liner, as we already have a mechanism to control this at the per-aggregation level.
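As a rough, standalone sketch of that per-aggregation control (plain Java, not the actual OpenSearch classes; the real hook on the aggregator factory and its name may differ), the idea is that the request falls back to the sequential path as soon as any aggregation in it reports that it does not support concurrent segment search:

// Standalone model of the per-aggregation switch: if any aggregation in the
// request opts out, the whole query phase falls back to the sequential path.
import java.util.List;

public class ConcurrentPathDecisionDemo {

    interface AggregatorFactoryModel {
        // Mirrors the idea of a per-factory "supports concurrent segment search" flag.
        default boolean supportsConcurrentSegmentSearch() {
            return true;
        }
    }

    static class TermsFactoryModel implements AggregatorFactoryModel {}

    static class CompositeFactoryModel implements AggregatorFactoryModel {
        @Override
        public boolean supportsConcurrentSegmentSearch() {
            return false; // the proposed one-liner: composite always takes the sequential path
        }
    }

    static boolean useConcurrentPath(List<AggregatorFactoryModel> factories) {
        return factories.stream().allMatch(AggregatorFactoryModel::supportsConcurrentSegmentSearch);
    }

    public static void main(String[] args) {
        System.out.println(useConcurrentPath(List.of(new TermsFactoryModel())));     // true
        System.out.println(useConcurrentPath(List.of(new CompositeFactoryModel()))); // false
    }
}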
In the description, can you please share the exception and stack trace related to the race condition?
Took a closer look at this and it seems there are actually 2 separate issues related to concurrent segment search here:
- There's an AssertingCodec issue when global ordinals are used with composite aggs. This applies to both cases with and without sub-aggs and is caught by the test cases in CompositeAggIT. The problem here is that DocValues are created on the index_searcher thread during collection but then re-used on the search thread during buildAggregations. I don't think there is necessarily a race condition here since buildAggregations will always happen after collection, even in cases of sub-aggs that use deferred collection. However, we are still using DocValues in a way that is not expected by Lucene, so this requires a fix (a simplified model of this pattern is sketched below).
- When using an aggregation script on the _source field there is a race condition where the leafLookup is shared across threads, and thus the created fieldReader can be modified from multiple threads concurrently. This mostly manifests as an AssertingCodec issue on the StoredFields object; however, if the asserting codec is not used it can also throw a CorruptIndexException when the race condition is hit. This can be found in the reproduction here: jed326@5a15d8e
Problem 1 should be a pretty straightforward fix that I will open a PR for, however problem 2 will require some more investigation. Depending on the outcome of that investigation, one path forward could be to re-enable composite aggregations for concurrent segment search but only when a script is not used (or specifically when it is not scripting on the _source field).
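A minimal, self-contained sketch of the pattern described in the first bullet (plain Java, standing in for the Lucene/OpenSearch classes): a per-leaf doc-values cursor is acquired during collection on an index_searcher thread, cached by the aggregator, and then consumed again from a different thread in buildAggregations, which is exactly the thread-confinement violation the AssertingCodec flags:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DocValuesReuseDemo {

    // Stand-in for a per-leaf DocValues cursor, guarded the way AssertingCodec guards it.
    static class ThreadConfinedDocValues {
        private final Thread acquiredOn = Thread.currentThread();

        long nextValue() {
            // Simplified version of AssertingCodec.assertThread(...)
            if (Thread.currentThread() != acquiredOn) {
                throw new AssertionError("doc values acquired on " + acquiredOn.getName()
                    + " but consumed on " + Thread.currentThread().getName());
            }
            return 42L;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService indexSearcherPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "index_searcher"));
        // "Collection": the cursor is created on the index_searcher thread and cached.
        Callable<ThreadConfinedDocValues> acquireDuringCollection = ThreadConfinedDocValues::new;
        ThreadConfinedDocValues cached = indexSearcherPool.submit(acquireDuringCollection).get();
        indexSearcherPool.shutdown();
        // "buildAggregations": later, the search (here: main) thread reuses the cached cursor.
        cached.nextValue(); // throws AssertionError, mirroring the AssertingCodec failure
    }
}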
@jed326 could you help me understand why/how the LeafSearchLookup is shared across multiple threads in the concurrent segment search codepath? LeafSearchLookup isn't threadsafe and that's the invariant.
SourceLookup::setSegmentAndDocument will get called from the IndexSearcher threads during concurrent search (I believe it comes through the ScriptDocValues), and then this fieldReader reference can get updated from multiple threads in that case.
Then in SourceLookup::loadSourceIfNeeded this fieldReader might not be the correct fieldReader:
And in the AggregationScript here:
This is where that LeafSearchLookup is created.
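To make the sharing concrete, here is a small standalone model (plain Java, not the real SourceLookup, just the same shape): two slices on different index_searcher threads drive one shared lookup, each sets its own segment, and because the cached state is shared, at least one slice then loads against the other slice's segment, which is how the wrong fieldReader gets used in practice:

import java.util.concurrent.CountDownLatch;

public class SharedLookupRaceDemo {

    // Stand-in for SourceLookup: a single mutable "current segment/fieldReader" slot.
    static class SharedSourceLookupModel {
        private volatile int segment = -1;

        void setSegmentAndDocument(int segmentOrd) {
            this.segment = segmentOrd;
        }

        void loadSourceIfNeeded(int expectedSegment) {
            if (segment != expectedSegment) {
                // In the real code this surfaces as an AssertingCodec assertion or a
                // CorruptIndexException, because the wrong StoredFieldsReader gets used.
                throw new IllegalStateException("slice expected segment " + expectedSegment
                    + " but the shared lookup points at segment " + segment);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SharedSourceLookupModel lookup = new SharedSourceLookupModel(); // shared by both slices
        CountDownLatch bothSet = new CountDownLatch(2);

        Runnable slice0 = () -> { lookup.setSegmentAndDocument(0); bothSet.countDown(); await(bothSet); lookup.loadSourceIfNeeded(0); };
        Runnable slice1 = () -> { lookup.setSegmentAndDocument(1); bothSet.countDown(); await(bothSet); lookup.loadSourceIfNeeded(1); };

        Thread t0 = new Thread(slice0, "index_searcher-0");
        Thread t1 = new Thread(slice1, "index_searcher-1");
        t0.start();
        t1.start();
        t0.join();
        t1.join(); // at least one slice observes the other slice's segment and fails
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}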
Shouldn't each thread create its own SourceLookup instance? Is there a reason why they need to share it?
@rishabhmaurya I need to refresh my memory on this, but I believe the problem is that there is only one instance of the AggregationScript class.
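A tiny model of that point (again plain Java, not the actual classes, and only a sketch of the claim above): if the aggregation builds a single script object, that object owns the one lookup every slice touches; a per-slice script, as suggested in the question, would give each slice its own lookup:

import java.util.function.Supplier;

public class ScriptInstanceSharingDemo {

    // Stand-ins for LeafSearchLookup/SourceLookup and AggregationScript.
    static class LookupModel {}

    static class ScriptModel {
        final LookupModel lookup = new LookupModel(); // one lookup per script instance
    }

    public static void main(String[] args) {
        // As described above: one script instance handed to every slice -> one shared lookup.
        ScriptModel shared = new ScriptModel();
        LookupModel seenBySlice0 = shared.lookup;
        LookupModel seenBySlice1 = shared.lookup;
        System.out.println(seenBySlice0 == seenBySlice1); // true -> the race is possible

        // Per-slice alternative: a fresh script per slice -> isolated lookups, no sharing.
        Supplier<ScriptModel> perSliceFactory = ScriptModel::new;
        System.out.println(perSliceFactory.get().lookup == perSliceFactory.get().lookup); // false
    }
}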
Here is the stack trace from investigating this issue:
WARNING: Uncaught exception in thread: Thread[opensearch[node_s5][search][T#1],5,TGRP-TemporaryIT]
java.lang.AssertionError: StoredFieldsReader are only supposed to be consumed in the thread in which they have been acquired. But was acquired in Thread[opensearch[node_s5][index_searcher][T#1],5,TGRP-TemporaryIT] and consumed in Thread[opensearch[node_s5][search][T#1],5,TGRP-TemporaryIT].
at __randomizedtesting.SeedInfo.seed([8D650412C6C5AF37]:0)
at org.apache.lucene.tests.codecs.asserting.AssertingCodec.assertThread(AssertingCodec.java:44)
at org.apache.lucene.tests.codecs.asserting.AssertingStoredFieldsFormat$AssertingStoredFieldsReader.document(AssertingStoredFieldsFormat.java:75)
at org.opensearch.search.lookup.SourceLookup.loadSourceIfNeeded(SourceLookup.java:104)
at org.opensearch.script.AggregationScript.lambda$static$2(AggregationScript.java:77)
at org.opensearch.script.DynamicMap.get(DynamicMap.java:84)
at org.opensearch.painless.PainlessScript$Script.execute(String value = null; if (params == null || params._source == null || params._source.entity == null) { return ""; } for (item in params._source.entity) { if (item["name"] == "keyword-field") { value = item['value']; break; } } return value;:50)
at org.opensearch.search.aggregations.support.values.ScriptBytesValues.advanceExact(ScriptBytesValues.java:71)
at org.opensearch.search.aggregations.bucket.composite.BinaryValuesSource$1.collect(BinaryValuesSource.java:183)
at org.opensearch.search.aggregations.LeafBucketCollector.collect(LeafBucketCollector.java:123)
at org.opensearch.search.aggregations.bucket.composite.CompositeAggregator.runDeferredCollections(CompositeAggregator.java:671)
at org.opensearch.search.aggregations.bucket.composite.CompositeAggregator.buildAggregations(CompositeAggregator.java:247)
at org.opensearch.search.aggregations.Aggregator.buildTopLevel(Aggregator.java:194)
at org.opensearch.search.aggregations.AggregationCollectorManager.reduce(AggregationCollectorManager.java:65)
at org.opensearch.search.aggregations.AggregationCollectorManager.reduce(AggregationCollectorManager.java:28)
at org.apache.lucene.search.MultiCollectorManager.reduce(MultiCollectorManager.java:73)
at org.opensearch.search.query.QueryCollectorManagerContext$QueryCollectorManager.reduce(QueryCollectorManagerContext.java:41)
at org.opensearch.search.query.QueryCollectorManagerContext$QueryCollectorManager.reduce(QueryCollectorManagerContext.java:27)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:726)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:692)
at org.opensearch.search.query.ConcurrentQueryPhaseSearcher.searchWithCollectorManager(ConcurrentQueryPhaseSearcher.java:82)
at org.opensearch.search.query.ConcurrentQueryPhaseSearcher.searchWithCollector(ConcurrentQueryPhaseSearcher.java:52)
at org.opensearch.search.query.QueryPhase$DefaultQueryPhaseSearcher.searchWith(QueryPhase.java:427)
at org.opensearch.search.query.QueryPhaseSearcherWrapper.searchWith(QueryPhaseSearcherWrapper.java:58)
at org.opensearch.search.query.QueryPhase.executeInternal(QueryPhase.java:282)
at org.opensearch.search.query.QueryPhase.execute(QueryPhase.java:155)
at org.opensearch.search.SearchService.loadOrExecuteQueryPhase(SearchService.java:547)
at org.opensearch.search.SearchService.executeQueryPhase(SearchService.java:611)
at org.opensearch.search.SearchService$2.lambda$onResponse$0(SearchService.java:580)
at org.opensearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:74)
at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
at org.opensearch.threadpool.TaskAwareRunnable.doRun(TaskAwareRunnable.java:78)
at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
at org.opensearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:59)
at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:913)
at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)