Need to handle AstraDB timeouts
devinbost opened this issue · 1 comment
devinbost commented
I got this AstraDB exception:
```
17:34:53.524 [main] INFO a.l.a.v.c.CassandraAssetsManagerProvider -- Executing: CREATE TABLE IF NOT EXISTS "doc_qa_demos"."chatbot2" (
    filename TEXT,
    chunk_text_length TEXT,
    chunk_num_tokens TEXT,
    chunk_id TEXT,
    contents TEXT,
    name TEXT,
    embeddings_vector VECTOR<FLOAT, 1536>,
    PRIMARY KEY (filename, chunk_id));
17:34:57.982 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-vector-agents-0.1.1-SNAPSHOT-nar.nar'}
17:34:58.007 [main] INFO a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-kafka-runtime-0.1.1-SNAPSHOT-nar.nar'}
com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2S
	at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:34)
	at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
	at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
	at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
	at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:54)
	at ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider$CassandraTableAssetManager.execStatements(CassandraAssetsManagerProvider.java:111)
	at ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider$CassandraTableAssetManager.deployAsset(CassandraAssetsManagerProvider.java:102)
	at ai.langstream.api.runner.assets.AssetManagerAndLoader$1.lambda$deployAsset$2(AssetManagerAndLoader.java:58)
	at ai.langstream.api.runner.assets.AssetManagerAndLoader.executeWithContextClassloader(AssetManagerAndLoader.java:26)
	at ai.langstream.api.runner.assets.AssetManagerAndLoader$1.deployAsset(AssetManagerAndLoader.java:58)
	at ai.langstream.impl.deploy.ApplicationDeployer.setupAsset(ApplicationDeployer.java:127)
	at ai.langstream.impl.deploy.ApplicationDeployer.setupAssets(ApplicationDeployer.java:101)
	at ai.langstream.impl.deploy.ApplicationDeployer.setup(ApplicationDeployer.java:86)
	at ai.langstream.runtime.tester.LocalApplicationRunner.deployApplicationWithSecrets(LocalApplicationRunner.java:134)
	at ai.langstream.runtime.tester.Main.main(Main.java:159)
```
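The `PT2S` in the exception is the DataStax Java driver 4.x default request timeout (`basic.request.timeout`, 2 seconds). I'm not sure whether LangStream currently exposes the driver configuration to applications, but if it did (or as a change in the runtime itself), one workaround would be raising the timeout through the driver's standard `application.conf`. A sketch of what that could look like:

```hocon
# DataStax Java driver 4.x configuration (HOCON). Raises the default 2 s
# request timeout that produces "Query timed out after PT2S".
datastax-java-driver {
  basic.request.timeout = 10 seconds

  # DDL such as CREATE TABLE / CREATE CUSTOM INDEX can be much slower than
  # regular queries, so a dedicated execution profile for schema statements
  # could use an even larger value.
  profiles {
    slow-ddl {
      basic.request.timeout = 30 seconds
    }
  }
}
```

This only papers over the problem, though; it doesn't retry on failure.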
Steps to reproduce:

```shell
cd examples/applications/azure-document-ingestion
langstream docker run test -app . -s ../../secrets/secrets-azure-document-ingestion.yaml --langstream-runtime-docker-image public.ecr.aws/y3i6u2n7/datastax-public/langstream-runtime-tester --langstream-runtime-version latest
```
Run the pipeline below:
```yaml
assets:
  - name: "langstream-keyspace"
    asset-type: "astra-keyspace"
    creation-mode: create-if-not-exists
    config:
      keyspace: "${secrets.astra.keyspace}"
      datasource: "AstraDatasource"
  - name: "langstream-docs-table"
    asset-type: "cassandra-table"
    creation-mode: create-if-not-exists
    config:
      table-name: "${secrets.astra.table}"
      keyspace: "${secrets.astra.keyspace}"
      datasource: "AstraDatasource"
      create-statements:
        - |
          CREATE TABLE IF NOT EXISTS "${secrets.astra.keyspace}"."${secrets.astra.table}" (
          filename TEXT,
          chunk_text_length TEXT,
          chunk_num_tokens TEXT,
          chunk_id TEXT,
          contents TEXT,
          name TEXT,
          embeddings_vector VECTOR<FLOAT, 1536>,
          PRIMARY KEY (filename, chunk_id));
        - |
          CREATE CUSTOM INDEX IF NOT EXISTS ${secrets.astra.table}_ann_index ON ${secrets.astra.keyspace}.${secrets.astra.table}(embeddings_vector) USING 'StorageAttachedIndex';
name: "Embeddings processor"
pipeline:
  - name: "Azure blob source"
    type: "azure-blob-storage-source"
    configuration:
      container: "${secrets.azure.container}"
      endpoint: "https://${secrets.azure.storage-account-name}.blob.core.windows.net/${secrets.azure.container}"
      storage-account-name: "${secrets.azure.storage-account-name}"
      storage-account-key: "${secrets.azure.storage-access-key}"
  - name: "Extract text"
    type: "text-extractor"
  - name: "Normalise text"
    type: "text-normaliser"
    configuration:
      make-lowercase: true
      trim-spaces: true
  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 4000
      separators: [ "\n\n", "\n", " ", "" ]
      keep_separator: false
      chunk_overlap: 2000
      length_function: "cl100k_base"
  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
      text-field: text
      copy-properties: true
  - name: "prepare-structure" # This step assumes the doc's filename follows the naming convention.
    type: "compute"
    configuration:
      fields:
        - name: "value.product_name"
          expression: "fn:split(properties.filename, ' ')[0]"
          type: STRING
        - name: "value.product_version"
          expression: "fn:split(properties.filename, ' ')[1]"
          type: STRING
  - name: "compute-embeddings"
    id: "step1"
    type: "compute-ai-embeddings"
    configuration:
      model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model
      embeddings-field: "value.embeddings"
      text: "{{ value.contents }}"
      batch-size: 10
      # flush-interval is in milliseconds. Take this value into consideration when using this agent
      # in the chat response pipeline, since it directly impacts response latency.
      # For latency-sensitive applications, consider setting batch-size to 1 or flush-interval to 0.
      flush-interval: 500
  - name: "Write to Astra"
    type: "vector-db-sink"
    configuration:
      datasource: "AstraDatasource"
      table-name: "${secrets.astra.table}"
      keyspace: "${secrets.astra.keyspace}"
      mapping: "filename=value.name,chunk_text_length=value.chunk_text_length,chunk_num_tokens=value.chunk_num_tokens,chunk_id=value.chunk_id,contents=value.text,name=value.name"
```
with this `configuration.yaml`:

```yaml
configuration:
  resources:
    - type: "vector-database"
      name: "AstraDatasource"
      configuration:
        service: "astra"
        clientId: "token"
        secret: "${ secrets.astra.token }"
        token: "${ secrets.astra.token }"
        database: "${ secrets.astra.database }"
        environment: "PROD"
        password: "${ secrets.astra.token }"
```
The issue is transient (it doesn't always occur), but it's more likely to occur when the table needs to be created.

Desired behavior:

AstraDB queries should be retried when creating new assets. I'm not sure whether this should be handled by the pipeline or lower down in the DB interaction layer. Perhaps we should let the user specify whether the operation is retried on failure.
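For the DB-interaction-layer option, a retry-with-backoff wrapper around the asset-creation statements could look roughly like the sketch below. In the real code this would wrap the `SyncCqlSession.execute` call in `execStatements` and catch `DriverTimeoutException`; here a plain `RuntimeException` stands in for it so the example is self-contained, and all names (`executeWithRetries`, the attempt/backoff parameters) are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Minimal sketch of retrying a CQL statement with exponential backoff.
// Assumption: a timeout on "CREATE ... IF NOT EXISTS" is safe to retry,
// because the statements are idempotent.
public class RetryingExecutor {

    static <T> T executeWithRetries(Callable<T> call, int maxAttempts, Duration initialBackoff)
            throws Exception {
        Duration backoff = initialBackoff;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (RuntimeException e) { // real code: catch DriverTimeoutException
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted, surface the original failure
                }
                Thread.sleep(backoff.toMillis());
                backoff = backoff.multipliedBy(2); // exponential backoff
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a statement that times out twice, then succeeds.
        int[] calls = {0};
        String result = executeWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Query timed out after PT2S");
            }
            return "CREATE TABLE ok after " + calls[0] + " attempts";
        }, 5, Duration.ofMillis(10));
        System.out.println(result);
    }
}
```

Whether retries are enabled (and how many attempts) seems like a natural per-asset config option, defaulting to a small number of retries for `create-if-not-exists` assets.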