opensearch-project/OpenSearch

[BUG][Concurrent Search] Search request for top anomalies from AD plugin is failing

sohami opened this issue · 10 comments

Describe the bug

The AD plugin exposes a top anomalies API which internally creates a Painless-script-based composite aggregation query. When concurrent search is enabled on a cluster, the search request sometimes fails with an error.

Ref:
Top Anomalies API: https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-top-anomalies

Input to the API:

GET _plugins/_anomaly_detection/detectors/uBtS6HkBmDH36MzijITq/results/_topAnomalies
{
    "size": 10,
    "category_field": ["host", "service"],
    "order": "severity",
    "start_time_ms": 1622333056000,
    "end_time_ms": 1622937856000
}

Example query that it creates:

"query": {
        "bool": {
            "filter": {
                "term": {
                    "detector_id": "uBtS6HkBmDH36MzijITq"
                }
            }
        }
    },
    "aggs": {
        "multi_buckets" : {
            "composite": {
                "sources": [
                    {
                        "host": {
                            "terms": {
                                "script": {
                                    "source": """
                                    String value = null;
                                    if (params == null || params._source == null || params._source.entity == null) {
                                        return "";
                                    }
                                    for (item in params._source.entity) {
                                        if (item['name'] == "host") {
                                            value = item['value'];
                                            break;
                                        }
                                    }
                                    return value;
                                    """,
                                    "lang": "painless"
                                }
                            }
                        }
                    },
                    {
                        "service": {
                            "terms": {
                                "script": {
                                    "source": """
                                    String value = null;
                                    if (params == null || params._source == null || params._source.entity == null) {
                                        return "";
                                    }
                                    for (item in params._source.entity) {
                                        if (item['name'] == "service") {
                                            value = item['value'];
                                            break;
                                        }
                                    }
                                    return value;
                                    """,
                                    "lang": "painless"
                                }
                            }
                        }
                    }
                ]
            },
            "aggregations": {
                "max": {
                    "max": {
                        "field": "anomaly_grade"
                    }
                },
                "multi_buckets_sort": {
                    "bucket_sort": {
                        "sort": [
                            { "max": { "order": "desc" } } 
                        ],
                        "size": 10                                
                    }
                }
            }
        }
    }

Related component

Search:Query Capabilities

To Reproduce

  1. Create an index with AD plugin result index mapping. Ref here
  2. Ingest the dummy data
  3. Run the query shared in the description above.

The failing test in the plugin has a sample of the index mapping for the entity field and the logic to create the data, which can be used to reproduce. Ref here

Expected behavior

The query should succeed.

Additional Details

Plugins
Anomaly Detection


Additional context

Looking further into this failure, there appears to be a race condition in composite aggregation in general: the lookup instance is shared across slices, so any composite aggregation path will be affected under concurrent segment search. We will need to disable the concurrent path whenever a composite aggregation is present in the request and fall back to the non-concurrent path. The change is a one-liner, as we already have a mechanism to control this at the per-aggregation level.
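The per-aggregation control mentioned above can be modeled with a small sketch (all class and method names below are illustrative stand-ins, not the real OpenSearch API): the concurrent path is only taken if every aggregation factory in the request opts in, so the "one-liner" is simply for the composite factory not to opt in.

```java
// Minimal model of a per-aggregation concurrency opt-out. Names are
// illustrative; the real hook lives on the aggregation factory and is
// consulted before choosing the concurrent segment search code path.
abstract class ToyAggregatorFactory {
    // Default: this aggregation has not been vetted for concurrent search.
    protected boolean supportsConcurrentSegmentSearch() { return false; }
}

class ToyTermsAggregatorFactory extends ToyAggregatorFactory {
    @Override
    protected boolean supportsConcurrentSegmentSearch() { return true; }
}

// Composite keeps the default, so requests containing it fall back to
// the sequential path -- the "one-liner" is just not opting in here.
class ToyCompositeAggregatorFactory extends ToyAggregatorFactory { }

public class ConcurrencyOptOutDemo {
    static boolean useConcurrentPath(ToyAggregatorFactory... factories) {
        for (ToyAggregatorFactory f : factories) {
            if (!f.supportsConcurrentSegmentSearch()) {
                return false; // one non-supporting agg disables the whole path
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(useConcurrentPath(new ToyTermsAggregatorFactory()));     // prints: true
        System.out.println(useConcurrentPath(new ToyCompositeAggregatorFactory())); // prints: false
    }
}
```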

In the description, can you please share the exception and stack trace related to the race condition?

@eirsep Will do

Took a closer look at this and it seems there are actually 2 separate issues related to concurrent segment search here:

  1. There's an AssertingCodec issue when global ordinals are used with composite aggregations. This applies both with and without sub-aggregations and is caught by the test cases in CompositeAggIT. The problem here is that DocValues are created on the index_searcher thread during collection but then re-used on the search thread during buildAggregations. I don't think there is necessarily a race condition here, since buildAggregations always happens after collection, even for sub-aggregations that use deferred collection. However, we are still using DocValues in a way that Lucene does not expect, so this requires a fix.
  2. When using an aggregation script on the _source field there is a race condition: the leafLookup is shared across threads, so the fieldReader it creates can be modified from multiple threads concurrently. This mostly manifests as an AssertingCodec failure on the StoredFields object; however, if the asserting codec is not used, it can also throw a CorruptIndexException when the race is hit. A reproduction can be found here: jed326@5a15d8e

Problem 1 should be a pretty straightforward fix that I will open a PR for; problem 2 will require more investigation. Depending on the outcome of that investigation, one path forward could be to re-enable composite aggregations for concurrent segment search only when a script is not used (or, more narrowly, only when the script does not read the _source field).
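To make the sharing problem in (2) concrete, here is a deterministic toy replay of the bad interleaving. The class and method names are stand-ins for SourceLookup / setSegmentAndDocument / fieldReader, not the real OpenSearch classes: the shared lookup's reader reference gets overwritten by a second slice before the first slice reads, while per-slice lookup instances are unaffected.

```java
import java.util.function.IntUnaryOperator;

// Toy model of the SourceLookup race described above (illustrative names).
// The lookup keeps one mutable fieldReader reference that is replaced on
// every segment change and then used to load a document's source.
class ToyLookup {
    IntUnaryOperator fieldReader;            // stands in for SourceLookup.fieldReader

    void setSegment(int segmentBase) {       // stands in for setSegmentAndDocument
        this.fieldReader = doc -> segmentBase + doc;
    }

    int loadSource(int doc) {                // stands in for loadSourceIfNeeded
        return fieldReader.applyAsInt(doc);
    }
}

public class LookupRaceDemo {
    public static void main(String[] args) {
        // Bad: one shared instance, interleaving replayed deterministically.
        ToyLookup shared = new ToyLookup();
        shared.setSegment(100);              // slice A positions on its segment
        shared.setSegment(200);              // slice B overwrites the reader
        System.out.println(shared.loadSource(5)); // slice A reads 205, expected 105

        // Good: each slice owns its lookup, so readers never interfere.
        ToyLookup sliceA = new ToyLookup();
        ToyLookup sliceB = new ToyLookup();
        sliceA.setSegment(100);
        sliceB.setSegment(200);
        System.out.println(sliceA.loadSource(5)); // prints: 105
    }
}
```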

Going to close this issue as it's been resolved by the following:

There's no issue with using the AD plugin with concurrent segment search today. The next item is to support scripting for composite aggregations, which I will create a new issue to track.

@jed326 could you help me understand why/how the LeafSearchLookup is shared across multiple threads in the concurrent segment search code path?
LeafSearchLookup isn't thread-safe, and that's the invariant.

SourceLookup::setSegmentAndDocument will get called from the IndexSearcher threads during concurrent search (I believe it comes through the ScriptDocValues), and then this fieldReader reference can get updated from multiple threads in that case.

fieldReader = lf.getSequentialStoredFieldsReader()::document;

Then in SourceLookup::loadSourceIfNeeded this fieldReader might not be the correct fieldReader:

fieldReader.accept(docId, sourceFieldVisitor);

And in the AggregationScript here:

this.leafLookup = lookup.getLeafSearchLookup(leafContext);

This is where that LeafSearchLookup is created

Shouldn't each thread create its own SourceLookup instance? Is there a reason why they need to share it?

@rishabhmaurya I need to refresh my memory on this, but I believe the problem is that there is only one instance of the AggregationScript class.
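A sketch of the point above, with illustrative names only (this is not the real AggregationScript API): if a single script instance carries per-leaf state, two slices calling its setter clobber each other, so the fix direction would be to hand each slice its own instance, e.g. via a per-slice factory call.

```java
// Toy model of a single shared script instance (illustrative names).
// The instance carries per-leaf state, which breaks when slices share it.
class ToyScript {
    int leafBase;                           // stands in for leafLookup

    void setLeaf(int leafBase) { this.leafBase = leafBase; }

    int execute(int doc) { return leafBase + doc; }
}

public class ScriptSharingDemo {
    public static void main(String[] args) {
        // Bad: one instance shared by two slices, interleaving replayed
        // deterministically. Slice A expects 10 + 1 = 11 but reads 21.
        ToyScript shared = new ToyScript();
        shared.setLeaf(10);                 // slice A positions on its leaf
        shared.setLeaf(20);                 // slice B overwrites the state
        System.out.println(shared.execute(1)); // prints: 21

        // Good: one instance per slice, as a per-slice factory would provide.
        ToyScript sliceA = new ToyScript();
        ToyScript sliceB = new ToyScript();
        sliceA.setLeaf(10);
        sliceB.setLeaf(20);
        System.out.println(sliceA.execute(1)); // prints: 11
    }
}
```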

Here is the stack trace from investigating this issue:


WARNING: Uncaught exception in thread: Thread[opensearch[node_s5][search][T#1],5,TGRP-TemporaryIT]
java.lang.AssertionError: StoredFieldsReader are only supposed to be consumed in the thread in which they have been acquired. But was acquired in Thread[opensearch[node_s5][index_searcher][T#1],5,TGRP-TemporaryIT] and consumed in Thread[opensearch[node_s5][search][T#1],5,TGRP-TemporaryIT].
    at __randomizedtesting.SeedInfo.seed([8D650412C6C5AF37]:0)
    at org.apache.lucene.tests.codecs.asserting.AssertingCodec.assertThread(AssertingCodec.java:44)
    at org.apache.lucene.tests.codecs.asserting.AssertingStoredFieldsFormat$AssertingStoredFieldsReader.document(AssertingStoredFieldsFormat.java:75)
    at org.opensearch.search.lookup.SourceLookup.loadSourceIfNeeded(SourceLookup.java:104)
    at org.opensearch.script.AggregationScript.lambda$static$2(AggregationScript.java:77)
    at org.opensearch.script.DynamicMap.get(DynamicMap.java:84)
    at org.opensearch.painless.PainlessScript$Script.execute(String value = null; if (params == null || params._source == null || params._source.entity == null) { return ""; } for (item in params._source.entity) { if (item["name"] == "keyword-field") { value = item['value']; break; } } return value;:50)
    at org.opensearch.search.aggregations.support.values.ScriptBytesValues.advanceExact(ScriptBytesValues.java:71)
    at org.opensearch.search.aggregations.bucket.composite.BinaryValuesSource$1.collect(BinaryValuesSource.java:183)
    at org.opensearch.search.aggregations.LeafBucketCollector.collect(LeafBucketCollector.java:123)
    at org.opensearch.search.aggregations.bucket.composite.CompositeAggregator.runDeferredCollections(CompositeAggregator.java:671)
    at org.opensearch.search.aggregations.bucket.composite.CompositeAggregator.buildAggregations(CompositeAggregator.java:247)
    at org.opensearch.search.aggregations.Aggregator.buildTopLevel(Aggregator.java:194)
    at org.opensearch.search.aggregations.AggregationCollectorManager.reduce(AggregationCollectorManager.java:65)
    at org.opensearch.search.aggregations.AggregationCollectorManager.reduce(AggregationCollectorManager.java:28)
    at org.apache.lucene.search.MultiCollectorManager.reduce(MultiCollectorManager.java:73)
    at org.opensearch.search.query.QueryCollectorManagerContext$QueryCollectorManager.reduce(QueryCollectorManagerContext.java:41)
    at org.opensearch.search.query.QueryCollectorManagerContext$QueryCollectorManager.reduce(QueryCollectorManagerContext.java:27)
    at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:726)
    at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:692)
    at org.opensearch.search.query.ConcurrentQueryPhaseSearcher.searchWithCollectorManager(ConcurrentQueryPhaseSearcher.java:82)
    at org.opensearch.search.query.ConcurrentQueryPhaseSearcher.searchWithCollector(ConcurrentQueryPhaseSearcher.java:52)
    at org.opensearch.search.query.QueryPhase$DefaultQueryPhaseSearcher.searchWith(QueryPhase.java:427)
    at org.opensearch.search.query.QueryPhaseSearcherWrapper.searchWith(QueryPhaseSearcherWrapper.java:58)
    at org.opensearch.search.query.QueryPhase.executeInternal(QueryPhase.java:282)
    at org.opensearch.search.query.QueryPhase.execute(QueryPhase.java:155)
    at org.opensearch.search.SearchService.loadOrExecuteQueryPhase(SearchService.java:547)
    at org.opensearch.search.SearchService.executeQueryPhase(SearchService.java:611)
    at org.opensearch.search.SearchService$2.lambda$onResponse$0(SearchService.java:580)
    at org.opensearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:74)
    at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
    at org.opensearch.threadpool.TaskAwareRunnable.doRun(TaskAwareRunnable.java:78)
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
    at org.opensearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:59)
    at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:913)
    at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)