/fastapi-elasticsearch

Primary LanguagePythonGNU Lesser General Public License v2.1LGPL-2.1

Query Helper for Elasticsearch or OpenSearch

Pypi

This is a helper library for creating elasticsearch or opensearch query proxies using FastAPI.

from fastapi_elasticsearch import ElasticsearchAPIQueryBuilder

# Create a new query_builder for the endpoint.
query_builder = ElasticsearchAPIQueryBuilder()

# Decorate a function as a filter.
# The filter can declare parameters.
@query_builder.filter()
def filter_category(c: Optional[List[str]] = Query([],
                                             description="Category name to filter results.")):
    return {
        "terms": {
            "category": c
        }
    } if len(c) > 0 else None

# Then use the query_builder in your endpoint.
@app.get("/search")
async def search(
        es: Elasticsearch = Depends(get_elasticsearch),
        query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return es.search(
        body=query_body,
        index=index_name
    )

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "category": [
              "the-category"
            ]
          }
        }
      ]
    }
  },
  "size": 10,
  "from": 0
}

To use OpenSearch, simply change the client.

from fastapi_elasticsearch import ElasticsearchAPIQueryBuilder
from opensearchpy import OpenSearch

...

@app.get("/search")
async def search(
        os: OpenSearch = Depends(get_opensearch),
        query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return os.search(
        body=query_body,
        index=index_name
    )
...

# Create a new query_builder for the endpoint.
query_builder = ElasticsearchAPIQueryBuilder()

To control the scoring use a matcher.

# Or decorate a function as a matcher
# (will contribute to the query scoring).
# Parameters can also be used.
@query_builder.matcher()
def match_fields(q: Optional[str] = Query(None)):
    return {
        "multi_match": {
            "query": q,
            "fuzziness": "AUTO",
            "fields": [
                "name^2",
                "description"
            ]
        }
    } if q is not None else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "should": [
        {
          "multi_match": {
            "query": "bob",
            "fuzziness": "AUTO",
            "fields": [
              "name^2",
              "description"
            ]
          }
        }
      ],
      "minimum_should_match": 1
    }
  },
  "size": 10,
  "from": 0
}

To control the ordering, it is possible to annotate a function as sorter.

class Direction(str, Enum):
    asc = "asc"
    desc = "desc"

# Decorate a function as a sorter.
# Parameters can be declared.
@query_builder.sorter()
def sort_by(direction: Optional[Direction] = Query(None)):
    return {
        "name": direction
    } if direction is not None else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "match_all": {}
  },
  "size": 10,
  "from": 0,
  "sort": [
    {
      "name": "asc"
    }
  ]
}

To add highlight functionality, it is possible to annotate a function as highlighter.

# Decorate a function as a highlighter.
# Parameters can also be declared.
@query_builder.highlighter()
def highlight(q: Optional[str] = Query(None,
                                       description="Query to match the document text."),
              h: bool = Query(False,
                              description="Highlight matched text and inner hits.")):
    return {
        "name": {
            "fragment_size": 256,
            "number_of_fragments": 1
        }
    } if q is not None and h else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "should": [
        {
          "multi_match": {
            "query": "bob",
            "fuzziness": "AUTO",
            "fields": [
              "name^2"
            ]
          }
        }
      ],
      "minimum_should_match": 1
    }
  },
  "size": 10,
  "from": 0,
  "highlight": {
    "fields": {
      "name": {
        "fragment_size": 256,
        "number_of_fragments": 1
      }
    }
  }
}

Now, a complete example:

app = FastAPI()

query_builder = ElasticsearchAPIQueryBuilder()


@query_builder.filter()
def filter_items():
    return {
        "term": {
            "join_field": "item"
        }
    }


@query_builder.filter()
def filter_category(c: Optional[List[str]] = Query([],
                                                   description="Category name to filter results.")):
    return {
        "terms": {
            "category": c
        }
    } if len(c) > 0 else None


@query_builder.matcher()
def match_fields(q: Optional[str] = Query(None,
                                          description="Query to match the document text.")):
    return {
        "multi_match": {
            "query": q,
            "fuzziness": "AUTO",
            "fields": [
                "name^2",
            ]
        }
    } if q is not None else None


@query_builder.matcher()
def match_fragments(q: Optional[str] = Query(None,
                                             description="Query to match the document text."),
                    h: bool = Query(False,
                                    description="Highlight matched text and inner hits.")):
    if q is not None:
        matcher = {
            "has_child": {
                "type": "fragment",
                "score_mode": "max",
                "query": {
                    "bool": {
                        "minimum_should_match": 1,
                        "should": [
                            {
                                "match": {
                                    "content": {
                                        "query": q,
                                        "fuzziness": "auto"
                                    }
                                }
                            },
                            {
                                "match_phrase": {
                                    "content": {
                                        "query": q,
                                        "slop": 3,
                                        "boost": 50
                                    }
                                }
                            },
                        ]
                    }
                }
            }
        }
        if h:
            matcher["has_child"]["inner_hits"] = {
                "size": 1,
                "_source": "false",
                "highlight": {
                    "fields": {
                        "content": {
                            "fragment_size": 256,
                            "number_of_fragments": 1
                        }
                    }
                }
            }
        return matcher
    else:
        return None


class Direction(str, Enum):
    asc = "asc"
    desc = "desc"


@query_builder.sorter()
def sort_by(direction: Optional[Direction] = Query(None)):
    return {
        "name": direction
    } if direction is not None else None


@query_builder.highlighter()
def highlight(q: Optional[str] = Query(None,
                                       description="Query to match the document text."),
              h: bool = Query(False,
                              description="Highlight matched text and inner hits.")):
    return {
        "name": {
            "fragment_size": 256,
            "number_of_fragments": 1
        }
    } if q is not None and h else None


@app.get("/search")
async def search(query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return es.search(
        body=query_body,
        index=index_name
    )

Also it is possible to customize the generated query body using the decorator search_builder.

from typing import List, Dict

@query_builder.search_builder()
def build_search_body(size: int = 10,
                      start_from: int = 0,
                      source: Union[List, Dict, str] = None,
                      minimum_should_match: int = 1,
                      filters: List[Dict] = [],
                      matchers: List[Dict] = [],
                      highlighters: List[Dict] = [],
                      sorters: List[Dict] = []) -> Dict:
    return {
        "query": {
            ...
        },
        ...
    }

Adopt this project: if you like and want to adopt it, talk to me.