LangStream/langstream

Need to handle AstraDB timeouts

devinbost opened this issue · 1 comment

I got this AstraDB exception:

17:34:53.524 [main] INFO  a.l.a.v.c.CassandraAssetsManagerProvider -- Executing: CREATE TABLE IF NOT EXISTS "doc_qa_demos"."chatbot2" (
filename TEXT,
chunk_text_length TEXT,
chunk_num_tokens TEXT,
chunk_id TEXT,
contents TEXT,
name TEXT,
embeddings_vector VECTOR<FLOAT, 1536>,
PRIMARY KEY (filename, chunk_id));
17:34:57.982 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-vector-agents-0.1.1-SNAPSHOT-nar.nar'}
17:34:58.007 [main] INFO  a.langstream.impl.nar.NarFileHandler -- Closing classloader NarFileClassLoader{name='langstream-kafka-runtime-0.1.1-SNAPSHOT-nar.nar'}
com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT2S
        at com.datastax.oss.driver.api.core.DriverTimeoutException.copy(DriverTimeoutException.java:34)
        at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:53)
        at com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor.process(CqlRequestSyncProcessor.java:30)
        at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
        at com.datastax.oss.driver.api.core.cql.SyncCqlSession.execute(SyncCqlSession.java:54)
        at ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider$CassandraTableAssetManager.execStatements(CassandraAssetsManagerProvider.java:111)
        at ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider$CassandraTableAssetManager.deployAsset(CassandraAssetsManagerProvider.java:102)
        at ai.langstream.api.runner.assets.AssetManagerAndLoader$1.lambda$deployAsset$2(AssetManagerAndLoader.java:58)
        at ai.langstream.api.runner.assets.AssetManagerAndLoader.executeWithContextClassloader(AssetManagerAndLoader.java:26)
        at ai.langstream.api.runner.assets.AssetManagerAndLoader$1.deployAsset(AssetManagerAndLoader.java:58)
        at ai.langstream.impl.deploy.ApplicationDeployer.setupAsset(ApplicationDeployer.java:127)
        at ai.langstream.impl.deploy.ApplicationDeployer.setupAssets(ApplicationDeployer.java:101)
        at ai.langstream.impl.deploy.ApplicationDeployer.setup(ApplicationDeployer.java:86)
        at ai.langstream.runtime.tester.LocalApplicationRunner.deployApplicationWithSecrets(LocalApplicationRunner.java:134)
        at ai.langstream.runtime.tester.Main.main(Main.java:159)
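
For context, `PT2S` is the ISO-8601 rendering of the Java driver's default request timeout of 2 seconds (`basic.request.timeout`). Schema changes on Astra (creating a table plus a vector index) can easily take longer than that. As a possible workaround, assuming LangStream passes driver configuration through (I have not verified where it loads `application.conf` from), the timeout could be raised like this:

```hocon
# DataStax Java driver 4.x configuration (application.conf)
datastax-java-driver {
  # Default is 2 seconds; DDL against Astra may need more headroom.
  basic.request.timeout = 30 seconds
}
```

This only widens the window, though; it doesn't address the retry behavior requested below.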

Steps to reproduce:

cd examples/applications/azure-document-ingestion
langstream docker run test -app . -s ../../secrets/secrets-azure-document-ingestion.yaml --langstream-runtime-docker-image public.ecr.aws/y3i6u2n7/datastax-public/langstream-runtime-tester --langstream-runtime-version latest

Run the pipeline below:

assets:
  - name: "langstream-keyspace"
    asset-type: "astra-keyspace"
    creation-mode: create-if-not-exists
    config:
      keyspace: "${secrets.astra.keyspace}"
      datasource: "AstraDatasource"
  - name: "langstream-docs-table"
    asset-type: "cassandra-table"
    creation-mode: create-if-not-exists
    config:
      table-name: "${secrets.astra.table}"
      keyspace: "${secrets.astra.keyspace}"
      datasource: "AstraDatasource"
      create-statements:
        - |
          CREATE TABLE IF NOT EXISTS "${secrets.astra.keyspace}"."${secrets.astra.table}" (
          filename TEXT,
          chunk_text_length TEXT,
          chunk_num_tokens TEXT,
          chunk_id TEXT,
          contents TEXT,
          name TEXT,
          embeddings_vector VECTOR<FLOAT, 1536>,
          PRIMARY KEY (filename, chunk_id));
        - |
          CREATE CUSTOM INDEX IF NOT EXISTS ${secrets.astra.table}_ann_index ON ${secrets.astra.keyspace}.${secrets.astra.table}(embeddings_vector) USING 'StorageAttachedIndex';
name: "Embeddings processor"
pipeline:
  - name: "Azure blob source"
    type: "azure-blob-storage-source"
    configuration:
      container: "${secrets.azure.container}"
      endpoint: "https://${secrets.azure.storage-account-name}.blob.core.windows.net/${secrets.azure.container}"
      storage-account-name: "${secrets.azure.storage-account-name}"
      storage-account-key: "${secrets.azure.storage-access-key}"
  - name: "Extract text"
    type: "text-extractor"
  - name: "Normalise text"
    type: "text-normaliser"
    configuration:
      make-lowercase: true
      trim-spaces: true
  - name: "Split into chunks"
    type: "text-splitter"
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 4000
      separators: [ "\n\n", "\n", " ", "" ]
      keep_separator: false
      chunk_overlap: 2000
      length_function: "cl100k_base"
  - name: "Convert to structured data"
    type: "document-to-json"
    configuration:
      text-field: text
      copy-properties: true
  - name: "prepare-structure" # This step assumes that the filename for the doc follows the naming convention.
    type: "compute"
    configuration:
      fields:
        - name: "value.product_name"
          expression: "fn:split(properties.filename, ' ')[0]"
          type: STRING
        - name: "value.product_version"
          expression: "fn:split(properties.filename, ' ')[1]"
          type: STRING
  - name: "compute-embeddings"
    id: "step1"
    type: "compute-ai-embeddings"
    configuration:
      model: "${secrets.open-ai.embeddings-model}" # This needs to match the name of the model deployment, not the base model
      embeddings-field: "value.embeddings"
      text: "{{ value.contents }}"
      batch-size: 10
      # flush-interval is in milliseconds. Keep this value in mind when using this agent
      # in the chat response pipeline, because it directly affects response latency.
      # For latency-sensitive applications, consider setting batch-size to 1 or flush-interval to 0.
      flush-interval: 500
  - name: "Write to Astra"
    type: "vector-db-sink"
    configuration:
      datasource: "AstraDatasource"
      table-name: "${secrets.astra.table}"
      keyspace: "${secrets.astra.keyspace}"
      mapping: "filename=value.name,chunk_text_length=value.chunk_text_length,chunk_num_tokens=value.chunk_num_tokens,chunk_id=value.chunk_id,contents=value.text,name=value.name"

with this configuration.yaml:

configuration:
  resources:
    - type: "vector-database"
      name: "AstraDatasource"
      configuration:
        service: "astra"
        clientId: "token"
        secret: "${secrets.astra.token}"
        token: "${secrets.astra.token}"
        database: "${secrets.astra.database}"
        environment: "PROD"
        password: "${secrets.astra.token}"

The issue is transient (it doesn't always occur), but it's more likely to happen when the table needs to be created.

Desired behavior:
AstraDB queries should be retried when creating new assets.
I'm not sure whether this should be handled by the pipeline or lower down in the DB interaction layer.
Perhaps we should let the user specify whether a failed query should be retried or not?

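To make the proposal concrete, here is a minimal sketch of what a retry wrapper around the asset-creation statements could look like. This is illustrative only: `executeWithRetry` is a hypothetical helper, not part of the LangStream codebase, and in the real `execStatements` you would likely catch `DriverTimeoutException` specifically rather than all exceptions.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

public class RetrySketch {

    // Retry an action up to maxAttempts times with exponential backoff.
    // In CassandraTableAssetManager this would wrap each session.execute(...)
    // call for the asset's create-statements.
    static <T> T executeWithRetry(Callable<T> action, int maxAttempts, Duration initialBackoff)
            throws Exception {
        Duration backoff = initialBackoff;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) { // real code: catch DriverTimeoutException only
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff.toMillis());
                    backoff = backoff.multipliedBy(2); // double the wait each retry
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulate a DDL statement that times out twice, then succeeds.
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Query timed out after PT2S");
            }
            return "CREATE TABLE ok";
        }, 5, Duration.ofMillis(10));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Since DDL with `IF NOT EXISTS` is idempotent, blind retries here should be safe; exposing `maxAttempts` as an asset config option would cover the "let the user decide" case.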
When you run in Kubernetes, the pod is restarted.

With #615, when you are running in docker mode, the container will exit gracefully.