[Bug]: ElasticSearch : Timeout context manager should be used inside a task
FlorentGrenier opened this issue · 22 comments
Bug Description
I'm developing a chatbot, and the bug appears when a second request is sent.
- I have llama-index installed in a conda environment with Python 3.12.3.
- I have streamlit 1.36.0 for the UI.
- I have elastic-transport 8.13.1, elasticsearch 8.14.0 and llama-index-vector-stores-elasticsearch 0.2.0.
I opened an issue on the GitHub repository of the llama_index library, but apparently the bug comes more from the Elasticsearch library.
I also opened a post on the Elasticsearch forum.
Version
elasticsearch : 8.14.0
elastic-transport : 8.13.1
Steps to Reproduce
With llama_index, send a second query to the RetrieverQueryEngine, built from a VectorIndexRetriever, a VectorStoreIndex and an ElasticSearchVectorStore.
Relevant Logs/Tracebacks
Traceback (most recent call last):
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
exec(code, module.__dict__)
File "C:\data\git\.......\streamlit_app.py", line 52, in <module>
response = response_generator.chat(user_query=prompt)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\data\git\.......\src\components\response_synthesis.py", line 84, in chat
content = self.build_context_prompt(self.retriever(user_query=user_query))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\data\git\.......\src\components\response_synthesis.py", line 61, in retriever
retrieved_nodes = retriever.retrieve(user_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper
result = func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 243, in retrieve
nodes = self._retrieve(query_bundle)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 230, in wrapper
result = func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 101, in _retrieve
return self._get_nodes_with_embeddings(query_bundle)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 177, in _get_nodes_with_embeddings
query_result = self._vector_store.query(query, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 412, in query
return asyncio.get_event_loop().run_until_complete(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
result = coro.send(None)
^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
hits = await self._store.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
response = await self.client.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
return await self.perform_request( # type: ignore[return-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
response = await self._perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
meta, resp_body = await self.transport.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
resp = await node.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
async with self.session.request(
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
^^^^^^^^^^^^^^^^
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "C:\Users\...\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
Is there any solution or conclusion here? I run into the same problem when using llama-index.
Ran into same issue while using llama-index
@FlorentGrenier try running the application using uvicorn/gunicorn rather than python app.py; it still shows the error but works.
@carlyrichmond any solution ?
Hi @FlorentGrenier,
I'm not a Python expert, so I'll defer to our engineers on this one. I do see one commenter suggesting this is an issue with the elasticsearch-py library. However, looking at the issue you raised on the LlamaIndex repo, there are some suggestions shared for you to check in this comment. Can you confirm those have been actioned?
@FlorentGrenier Hi! I'm not super familiar with the LlamaIndex integration for Elasticsearch, but the issue that you have occurs because objects associated with two different async loops are getting mixed up.
The reason a second loop is being used is that you are invoking a synchronous retriever interface. That forces LlamaIndex to internally create a second async loop to run the Elasticsearch code on. Once again I apologize about not being super familiar with LlamaIndex, but it does look like most query functions have an async version, which usually starts with "a", such as aretrieve(), aquery() and so on. If you were to use async versions of the Elasticsearch integration functions I think it would eliminate the need to create a second loop. If possible, give that a try and report back if that helps or we need to keep looking.
Hi @FlorentGrenier,
I'm not a Python expert, so I'll defer to our engineers on this one. I do see one commenter suggesting this is an issue with the elasticsearch-py library. However, looking at the issue you raised on the LlamaIndex repo, there are some suggestions shared for you to check in this comment. Can you confirm those have been actioned?
Hi @carlyrichmond, Yes I actioned the solution, but it didn't solve my problem
@FlorentGrenier Hi! I'm not super familiar with the LlamaIndex integration for Elasticsearch, but the issue that you have occurs because objects associated with two different async loops are getting mixed up.
The reason a second loop is being used is that you are invoking a synchronous retriever interface. That forces LlamaIndex to internally create a second async loop to run the Elasticsearch code on. Once again I apologize about not being super familiar with LlamaIndex, but it does look like most query functions have an async version, which usually starts with "a", such as aretrieve(), aquery() and so on. If you were to use async versions of the Elasticsearch integration functions I think it would eliminate the need to create a second loop. If possible, give that a try and report back if that helps or we need to keep looking.
Hi @miguelgrinberg, LlamaIndex does indeed offer asynchronous query functions, and following a solution provided in the LlamaIndex issue, I'm using the achat() method, but unfortunately this doesn't solve the problem.
Here's some more code, for better context:
Streamlit part
if st.session_state.messages[-1]["role"] == "user":
with st.chat_message("assistant"):
with st.spinner("Je recherche dans mes documents..."):
response = asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
response_chat = st.write(response.response)
source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
st.markdown(source_badges)
st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})
Response synthesis part
def load_es_index(self) :
async_client = AsyncElasticsearch(
hosts=[Configs.elastic_url],
basic_auth=(Configs.elastic_username, Configs.elastic_password)
)
vector_store = ElasticsearchStore(
index_name=Configs.index_name,
es_url=Configs.elastic_url,
es_client=async_client
)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store,embed_model=Settings.embed_model)
self.index = index
def chat_async(self, user_query, similarity_top_k, stream ):
memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
system_mesage = [
ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
]
chat_engine = self.index.as_chat_engine(
chat_mode="context",
memory=memory,
llm=Settings.llm,
system_prompt = system_mesage,
context_template=Utils.DEFAULT_CONTEXT_PROMPT,
verbose=True
)
# achat() from llama-index
response = chat_engine.achat(user_query)
return response
@FlorentGrenier your chat_async() function should be defined as async def, and the call to chat_engine.achat() should be preceded with await. You will likely have additional changes to make as a result of changing the chat_async() function from sync to async.
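For illustration, a minimal sketch of that change, based on the snippet above (untested, other callers would need updating too):

async def chat_async(self, user_query, similarity_top_k, stream):
    memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
    chat_engine = self.index.as_chat_engine(
        chat_mode="context",
        memory=memory,
        llm=Settings.llm,
        context_template=Utils.DEFAULT_CONTEXT_PROMPT,
        verbose=True
    )
    # achat() is a coroutine, so it must be awaited on the caller's event loop
    response = await chat_engine.achat(user_query)
    return response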
@miguelgrinberg I've already tried this but nothing changes
@FlorentGrenier Please provide an updated stack trace if you are still getting errors.
@miguelgrinberg Here :
Traceback (most recent call last):
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
exec(code, module.__dict__)
File "C:\data\git\***\streamlit_app.py", line 66, in <module>
response = asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
result = coro.send(None)
^^^^^^^^^^^^^^^
File "C:\data\git\***\src\components\response_synthesis.py", line 177, in chat_async
response = await chat_engine.achat(user_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
return await func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
context_str_template, nodes = await self._agenerate_context(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
nodes = await self._retriever.aretrieve(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
nodes = await self._aretrieve(query_bundle=query_bundle)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
return await self._aget_nodes_with_embeddings(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
query_result = await self._vector_store.aquery(query, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
hits = await self._store.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
response = await self.client.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
return await self.perform_request( # type: ignore[return-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
response = await self._perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
meta, resp_body = await self.transport.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
resp = await node.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
async with self.session.request(
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
The streamlit app is already asynchronous. With the changes you've made, you just moved the place in which the 2nd loop is created higher in the stack trace. You are now doing it yourself in this line:
response = asyncio.run(response_generator.chat_async(user_query=prompt, stream=False))
What you need to do to avoid this issue is to use a fully async application. That likely means converting the function that contains the line of code above to async, so that you can just use await response_generator.chat_async(...). You may also need to convert other functions that call this one to async, all the way up the call stack until you reach Streamlit. What you want is to use the Streamlit async loop, not your own secondary one.
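Sketched minimally, the caller becomes a coroutine as well (handle_prompt is a hypothetical helper name used only for illustration):

async def handle_prompt(response_generator, prompt):
    # Await the coroutine directly instead of wrapping it in asyncio.run(),
    # so the request runs on the same loop that owns the Elasticsearch client.
    return await response_generator.chat_async(user_query=prompt, stream=False)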
I tried this (complete code of my streamlit app):
import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio
import nest_asyncio
nest_asyncio.apply()
st.set_page_config(page_title="AthenAI", page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)
@st.cache_resource(show_spinner=False)
def load_data():
with st.spinner(text="Chargement des documents. Cela devrait prendre 1 à 2 minutes."):
response_generator = ResponseSynthesis(local_index=False)
return response_generator
def generate_source_badges(sources):
badges = ""
for file_name, source_info in sources.items():
badges += f" **Fichier**: `{file_name}` | **Pages**: `{source_info['page']}` | **Dernière modification**: `{source_info['last_modified']}` \n"
return badges
def response_generator_stream(response_chat):
for word in response_chat.split():
yield word + " "
time.sleep(0.05)
async def main():
response_generator = load_data()
st.title("AthenAI 💬")
with st.expander("Information"):
st.info("Version de démonstration", icon="ℹ️")
st.markdown('''
Sa base de connaissance est constitué de 4 documents :
- Un document sur le contrat d'apprentissage
- Un guide CFA
- Un document sur ParcourSup
- Un document sur la signature electronique
''')
if "messages" not in st.session_state.keys():
st.session_state.messages = [
{"role": "assistant", "content": "Posez moi une question sur la documentation de Val!"}
]
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if "source" in message.keys():
st.markdown(message["source"])
if prompt := st.chat_input("Votre question"):
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
if st.session_state.messages[-1]["role"] == "user":
with st.chat_message("assistant"):
with st.spinner("Je recherche dans mes documents..."):
response = await response_generator.chat_async(user_query=prompt, stream=False)
response_chat = response.response
st.write(response_chat)
source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
st.markdown(source_badges)
st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})
if __name__ == '__main__':
asyncio.run(main())
Trace:
RuntimeError: Timeout context manager should be used inside a task
Traceback:
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
exec(code, module.__dict__)
File "C:\data\git\***\streamlit_app.py", line 78, in <module>
asyncio.run(main())
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
result = coro.send(None)
^^^^^^^^^^^^^^^
File "C:\data\git\***\streamlit_app.py", line 66, in main
response = await response_generator.chat_async(user_query=prompt, stream=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\data\git\***\src\components\response_synthesis.py", line 177, in chat_async
response = await chat_engine.achat(user_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
return await func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
context_str_template, nodes = await self._agenerate_context(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
nodes = await self._retriever.aretrieve(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
nodes = await self._aretrieve(query_bundle=query_bundle)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
return await self._aget_nodes_with_embeddings(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
query_result = await self._vector_store.aquery(query, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
hits = await self._store.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
response = await self.client.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
return await self.perform_request( # type: ignore[return-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
response = await self._perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
meta, resp_body = await self.transport.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
resp = await node.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
async with self.session.request(
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
^^^^^^^^^^^^^^^^
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "C:\Users\***\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
@FlorentGrenier remove all references to nest_asyncio from your application and try again.
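In the streamlit_app.py shared above, that means deleting these two lines:

import nest_asyncio
nest_asyncio.apply()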
Done, but still the same error with the same trace... 😅
It can't be the same stack trace if you removed nest_asyncio... Maybe similar, but not identical. Can you please share it? I don't have a test application to use to debug this, so I need to see the stack trace.
Traceback (most recent call last):
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
exec(code, module.__dict__)
File "C:\data\git\AthenIA\streamlit_app.py", line 76, in <module>
asyncio.run(main())
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\futures.py", line 203, in result
raise self._exception.with_traceback(self._exception_tb)
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\asyncio\tasks.py", line 314, in __step_run_and_handle_result
result = coro.send(None)
^^^^^^^^^^^^^^^
File "C:\data\git\AthenIA\streamlit_app.py", line 64, in main
response = await response_generator.chat_async(user_query=prompt, stream=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\data\git\AthenIA\src\components\response_synthesis.py", line 175, in chat_async
response = await chat_engine.achat(user_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\callbacks\utils.py", line 56, in async_wrapper
return await func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 236, in achat
context_str_template, nodes = await self._agenerate_context(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\chat_engine\context.py", line 124, in _agenerate_context
nodes = await self._retriever.aretrieve(message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\base\base_retriever.py", line 274, in aretrieve
nodes = await self._aretrieve(query_bundle=query_bundle)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\instrumentation\dispatcher.py", line 255, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 112, in _aretrieve
return await self._aget_nodes_with_embeddings(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\core\indices\vector_store\retrievers\retriever.py", line 184, in _aget_nodes_with_embeddings
query_result = await self._vector_store.aquery(query, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\llama_index\vector_stores\elasticsearch\base.py", line 452, in aquery
hits = await self._store.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\helpers\vectorstore\_async\vectorstore.py", line 277, in search
response = await self.client.search(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\__init__.py", line 4121, in search
return await self.perform_request( # type: ignore[return-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 271, in perform_request
response = await self._perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elasticsearch\_async\client\_base.py", line 316, in _perform_request
meta, resp_body = await self.transport.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_async_transport.py", line 264, in perform_request
resp = await node.perform_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\elastic_transport\_node\_http_aiohttp.py", line 179, in perform_request
async with self.session.request(
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 1197, in __aenter__
self._resp = await self._coro
^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\client.py", line 507, in _request
with timer:
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\aiohttp\helpers.py", line 715, in __enter__
raise RuntimeError(
RuntimeError: Timeout context manager should be used inside a task
You are still using nest_asyncio:
Traceback (most recent call last):
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\streamlit\runtime\scriptrunner\script_runner.py", line 589, in _run_script
exec(code, module.__dict__)
File "C:\data\git\AthenIA\streamlit_app.py", line 76, in <module>
asyncio.run(main())
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 30, in run
return loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\fgr\AppData\Local\anaconda3\envs\llama-index\Lib\site-packages\nest_asyncio.py", line 98, in run_until_complete
return f.result()
^^^^^^^^^^
I'm really sorry, but I don't understand where....
streamlit_app.py
import streamlit as st
#sys.path.append('C:/data/git/***/src')
from src.components.response_synthesis import ResponseSynthesis
from src.utils.utils import Utils
import time
from streamlit_pills import pills
import asyncio
st.set_page_config(page_title="AthenAI", page_icon='🤖', layout="centered", initial_sidebar_state="auto", menu_items=None)
@st.cache_resource(show_spinner=False)
def load_data():
with st.spinner(text="Chargement des documents. Cela devrait prendre 1 à 2 minutes."):
response_generator = ResponseSynthesis(local_index=False)
return response_generator
def generate_source_badges(sources):
badges = ""
for file_name, source_info in sources.items():
badges += f" **Fichier**: `{file_name}` | **Pages**: `{source_info['page']}` | **Dernière modification**: `{source_info['last_modified']}` \n"
return badges
def response_generator_stream(response_chat):
for word in response_chat.split():
yield word + " "
time.sleep(0.05)
async def main():
response_generator = load_data()
st.title("AthenAI 💬")
with st.expander("Information"):
st.info("Version de démonstration", icon="ℹ️")
st.markdown('''
Sa base de connaissance est constitué de 4 documents :
- Un document sur le contrat d'apprentissage
- Un guide CFA
- Un document sur ParcourSup
- Un document sur la signature electronique
''')
if "messages" not in st.session_state.keys():
st.session_state.messages = [
{"role": "assistant", "content": "Posez moi une question sur la documentation de Val!"}
]
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if "source" in message.keys():
st.markdown(message["source"])
if prompt := st.chat_input("Votre question"):
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
if st.session_state.messages[-1]["role"] == "user":
with st.chat_message("assistant"):
with st.spinner("Je recherche dans mes documents..."):
response = await response_generator.chat_async(user_query=prompt, stream=False)
response_chat = response.response
st.write(response_chat)
source_badges = generate_source_badges(Utils.get_sources(response.source_nodes))
st.markdown(source_badges)
st.session_state.messages.append({"role": "assistant", "content": response_chat, "source": source_badges})
if __name__ == '__main__':
asyncio.run(main())
response_synthesis.py
from elasticsearch import AsyncElasticsearch
from llama_index.core.chat_engine.context import ContextChatEngine
from llama_index.core.chat_engine.simple import SimpleChatEngine
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from llama_index.core.prompts.prompt_type import PromptType
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.core import (
VectorStoreIndex, PromptTemplate, StorageContext,
load_index_from_storage, Settings, get_response_synthesizer
)
from llama_index.core.tools import FunctionTool
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import LLM, ChatMessage, MessageRole
from typing import List, Tuple
from llama_index.core.base.response.schema import RESPONSE_TYPE, NodeWithScore
from src.variables.config import Configs
from src.utils.utils import Utils
import os
import asyncio
class ResponseSynthesis:
def __init__(self, local_index: bool) -> None :
Settings.embed_model = HuggingFaceEmbedding(model_name=Configs.embed_model, embed_batch_size=Configs.batch_size)
Settings.llm = Ollama(model=Configs.model, request_timeout=1000)
if local_index:
self.load_local_index()
else:
self.load_es_index()
def load_es_index(self) -> None :
async_client = AsyncElasticsearch(
hosts=[Configs.elastic_url],
basic_auth=(Configs.elastic_username, Configs.elastic_password)
)
try:
vector_store = ElasticsearchStore(
index_name=Configs.index_name,
es_url=Configs.elastic_url,
es_client=async_client
)
self.index = VectorStoreIndex.from_vector_store(vector_store=vector_store, embed_model=Settings.embed_model)
finally:
async_client.transport.close()
def load_local_index(self) -> None:
if os.path.exists(Configs.local_index_dir):
storage_context = StorageContext.from_defaults(persist_dir=Configs.local_index_dir)
index = load_index_from_storage(storage_context)
self.index = index
else:
raise FileNotFoundError(f"Le dossier de stockage d'index '{Configs.local_index_dir}' n'existe pas. Veuillez d'abord exécuter le script d'indexation.")
def query(self, user_query: str, similarity_top_k: int=Configs.similarity_top_k) -> RESPONSE_TYPE:
query_engine = self.index.as_query_engine(similarity_top_k=similarity_top_k, llm=Settings.llm)
query_engine.update_prompts(
{"response_synthesizer:text_qa_template": self.build_context_prompt()}
)
query_response = query_engine.query(user_query)
return query_response
def retriever(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k) -> List[NodeWithScore] :
retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
retrieved_nodes = retriever.retrieve(user_query)
return retrieved_nodes
def chat(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):
memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
system_mesage = [
ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
]
chat_engine = self.index.as_chat_engine(
chat_mode="context",
memory=memory,
llm=Settings.llm,
system_prompt = system_mesage,
context_template=Utils.DEFAULT_CONTEXT_PROMPT,
verbose=True
)
if stream :
response = chat_engine.stream_chat(user_query)
else :
response = chat_engine.chat(user_query)
return response
async def chat_async(self, user_query: str, similarity_top_k: int = Configs.similarity_top_k, stream: bool = True):
memory = ChatMemoryBuffer.from_defaults(token_limit=3000)
system_mesage = [
ChatMessage(role=MessageRole.SYSTEM, content= Utils.DEFAULT_SYSTEM_PROMPT + Utils.DEFAULT_PREFIX_PROMPT)
]
# my_retriever = self.index.as_retriever(similarity_top_k=similarity_top_k)
chat_engine = self.index.as_chat_engine(
chat_mode="context",
memory=memory,
llm=Settings.llm,
system_prompt = system_mesage,
context_template=Utils.DEFAULT_CONTEXT_PROMPT,
verbose=True
)
response = await chat_engine.achat(user_query)
return response
Okay. I'm not exactly sure, but you do have some sync code; maybe that is causing a second loop to run.
In any case, here is another possible workaround. The issues you are having are caused by aiohttp (the low-level HTTP client) not liking that there are two active loops in the same app. You can try switching from aiohttp to httpx as the HTTP client to see if that avoids the issue. In load_es_index(), try this change:
async_client = AsyncElasticsearch(
hosts=[Configs.elastic_url],
basic_auth=(Configs.elastic_username, Configs.elastic_password),
node_class='httpxasync' # <--- add this argument
)
Also make sure you have the httpx package installed in your virtualenv.
I'm thinking if that does not work I'm going to have to invest some time and build a small streamlit + llama-index app to test this myself, so let me know. It may take me a couple of weeks, but I can look into it if httpx does not solve your issue.
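For reference, a minimal standalone check of the httpx-based node class might look like this (host and credentials are placeholders, not values from this thread):

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    client = AsyncElasticsearch(
        hosts=["http://localhost:9200"],      # placeholder endpoint
        basic_auth=("elastic", "changeme"),   # placeholder credentials
        node_class="httpxasync",              # use httpx instead of aiohttp
    )
    try:
        # A simple request to confirm the client works inside a running loop
        print(await client.info())
    finally:
        await client.close()

asyncio.run(main())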
Hey! I tested your solution with httpx and it solves my problem, thank you!
Closing as it appears your issue here was solved. Thank you!
