marqo-ai/marqo

[BUG] Model Cache / vectorise error when client parallel indexing

vicilliar opened this issue · 2 comments

Describe the bug
A 500 error is received a few minutes into indexing the simplewiki dataset with the hf/e5-base model using multiple client threads.

Error Messages
From the client terminal:

Reprint test parameters: Namespace(ef_construction=128, m=16, dataset='simplewiki', model='hf/e5-base', device='cuda', os_nodes=3, marqo_ip='34.201.146.141', marqo_tag='0.0.21')
Indexing finished. Total indexing time: 41.96356773376465
Final index stats: {'numberOfDocuments': 487}
MarqoWebError: MarqoWebError Error message: {'message': "Request rejected, as this request attempted to update the model cache, while another request was updating the model cache at the same time.\n Please wait for 10 seconds and send the request again.\n Marqo's documentation can be found here: `https://docs.marqo.ai/latest/`", 'code': 'model_cache_management_error', 'type': 'invalid_request', 'link': None}
status_code: 409, type: invalid_request, code: model_cache_management_error, link:
(the same 409 model_cache_management_error was printed five more times by the other client threads)
MarqoWebError: MarqoWebError Error message: Internal Server Error
status_code: 500, type: unhandled_error_type, code: unhandled_error, link:
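As a workaround for the 409s above, the client can retry rejected requests with exponential backoff, as the error message itself suggests ("Please wait for 10 seconds and send the request again"). A minimal, library-agnostic sketch; `with_retry` is a hypothetical helper, and it keys off the `status_code` attribute the client prints in the errors above:

```python
import time

def with_retry(fn, max_attempts=5, base_delay=1.0, retry_status=(409,)):
    """Call fn(); retry with exponential backoff when the raised
    exception carries a retryable HTTP status_code (e.g. the 409
    model_cache_management_error above). Re-raises on the last attempt
    or on any non-retryable status."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as e:
            status = getattr(e, "status_code", None)
            if status not in retry_status or attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```

Each batch in the reproduction script below could then be wrapped as, e.g., `with_retry(lambda: mq.index(index_name).add_documents(docs, ...))`.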

From the marqo terminal:

INFO:     34.230.156.67:4595 - "POST /indexes/recall-marqo0.0.21-simplewiki-hf.e5-base-m16-ef128/documents?refresh=false&device=cuda&use_existing_tensors=false&non_tensor_fields=docDate&non_tensor_fields=domain&non_tensor_fields=url HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/uvicorn/protocols/http/httptools_impl.py", line 435, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.8/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/usr/local/lib/python3.8/dist-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.8/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.8/dist-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
  File "/usr/local/lib/python3.8/dist-packages/fastapi/routing.py", line 163, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/local/lib/python3.8/dist-packages/starlette/concurrency.py", line 41, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
  File "/usr/local/lib/python3.8/dist-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/app/src/marqo/tensor_search/throttling/redis_throttle.py", line 87, in wrapper
    raise e
  File "/app/src/marqo/tensor_search/throttling/redis_throttle.py", line 83, in wrapper
    result = function(*args, **kwargs)
  File "/app/src/marqo/tensor_search/api.py", line 187, in add_or_replace_documents
    return tensor_search.add_documents_orchestrator(
  File "/app/src/marqo/tensor_search/tensor_search.py", line 245, in add_documents_orchestrator
    return add_documents(config=config, add_docs_params=add_docs_params)
  File "/app/src/marqo/tensor_search/tensor_search.py", line 582, in add_documents
    vector_chunks = s2_inference.vectorise(
  File "/app/src/marqo/s2_inference/s2_inference.py", line 76, in vectorise
    raise RuntimeError(f"Vectorise created an empty list of batches! Content: {content}")
RuntimeError: Vectorise created an empty list of batches! Content: []
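The final 500 comes from `vectorise` being handed empty content (`Content: []`): splitting an empty list into batches yields zero batches, which the guard at s2_inference.py:76 turns into a RuntimeError. A toy illustration of the failure mode; `to_batches` is a simplified stand-in for Marqo's internal batching, not its actual implementation:

```python
def to_batches(content, batch_size=16):
    """Split `content` into fixed-size batches; an empty input
    produces an empty list of batches."""
    return [content[i:i + batch_size] for i in range(0, len(content), batch_size)]

# an empty content list yields no batches, which trips the guard in vectorise
assert to_batches([]) == []
assert to_batches(["a", "b", "c"], batch_size=2) == [["a", "b"], ["c"]]
```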

To Reproduce
Steps to reproduce the behavior:

  1. Run a marqo instance (0.0.21 tag)
  • AMI: Deep Learning AMI Neuron PyTorch 1.13 (Ubuntu 20.04) 20230613
    Quickstart one: ami-000c1abd6dad09158
    TCP Port 8882 exposed
    g4dn.2xlarge
    200 GiB gp3 Storage
  2. Run a managed OpenSearch instance
  • OpenSearch 2.5
    1-AZ
    r6g.2xlarge.search
    32 GiB Memory
    3 nodes, 300 EBS storage per node
    Public access
    Access Policy - JSON configure
    Auto-Tune Off
    No Automatic software update
  3. Run a SageMaker client

  4. Run the following multithreaded indexing script

# Excerpt: assumes earlier setup defines `args` (argparse), `mq` (Marqo client),
# `index_name`, and the loaded `data` list
import math
import pprint
import queue
import threading
import time

index_settings = {
    "index_defaults": {
        "treat_urls_and_pointers_as_images": args.dataset != "simplewiki",
        "model": args.model,
        "normalize_embeddings": True,

        "ann_parameters" : {
            "space_type": "cosinesimil",
            "parameters": {
                "ef_construction": args.ef_construction,
                "m": args.m
            }
        }
    },
    "number_of_shards": args.os_nodes,
    "number_of_replicas": 0
}

try:
    mq.create_index(index_name, settings_dict=index_settings)
    print(f"Successfully created index {index_name}!")

except Exception as e:
    print(f"ERROR CREATING INDEX: {e}")
    print(f"Index {index_name} already exists! Skipping creation.")

print("Confirming index settings are: ")
pprint.pprint(mq.index(index_name).get_settings())

#####################################################
### STEP 3. indexing with marqo
#####################################################

# TODO: track add docs throughput

# Add documents in parallel

def send_docs(docs, error_queue):
    try:
        responses = mq.index(index_name).add_documents(docs, device=args.device, client_batch_size=10, non_tensor_fields=["docDate", "domain", "url"])
    except Exception as e:
        error_queue.put(e)

t0 = time.time()

THREAD_COUNT = 7
split_size = math.ceil(len(data)/THREAD_COUNT)
splits = [data[i: i + split_size] for i in range(0, len(data), split_size)]
# unused; note the ceil-based split can produce fewer than THREAD_COUNT splits,
# so indexing splits[i] for i in range(THREAD_COUNT) could raise IndexError
allocation_dict = {i: split for i, split in enumerate(splits)}

error_queue = queue.Queue()
threads = [threading.Thread(target=send_docs, args=(split_, error_queue)) for split_ in splits]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

Expected behavior
Parallel indexing requests should succeed without the model-cache 409s or the unhandled 500.

Additional context
(screenshots attached in the original issue)

Models endpoint after the bug occurred:

{'models': [{'model_name': 'hf/all_datasets_v4_MiniLM-L6', 'model_device': 'cpu'}, {'model_name': 'hf/all_datasets_v4_MiniLM-L6', 'model_device': 'cuda'}, {'model_name': 'ViT-L/14', 'model_device': 'cpu'}, {'model_name': 'ViT-L/14', 'model_device': 'cuda'}, {'model_name': 'hf/e5-base', 'model_device': 'cuda'}]}

500 error issue split into separate issue: #520