SuperDuperDB/superduperdb

[BUG] SQL Create vector index error

# <tab: SQLite>
from superduperdb import superduper

db = superduper("sqlite://my_db.db")


# <tab: Text>
import json

with open("text.json", "r") as f:
    data = json.load(f)


# <tab: SQL>
from superduperdb.backends.ibis import Table
from superduperdb import Schema, DataType
from superduperdb.backends.ibis.field_types import FieldType

datatype = "str"

if isinstance(datatype, DataType):
    schema = Schema(identifier="schema", fields={"id": FieldType("str"), "x": datatype})
else:
    schema = Schema(
        identifier="schema", fields={"id": FieldType("str"), "x": FieldType(datatype)}
    )

table_or_collection = Table("documents", schema=schema)

db.apply(table_or_collection)


# <tab: SQL>
from superduperdb import Document


def do_insert(data):
    db.execute(
        table_or_collection.insert(
            [Document({"id": str(i), "x": x}) for i, x in enumerate(data)]
        )
    )


do_insert(data[:-len(data) // 4])


# <tab: SQL>

select = table_or_collection.to_query()


# <tab: Text>
from superduperdb import objectmodel

CHUNK_SIZE = 200


@objectmodel(
    flatten=True,
    model_update_kwargs={"document_embedded": False},
    datatype=FieldType(datatype),
)
def chunker(text):
    text = text.split()
    chunks = [
        " ".join(text[i : i + CHUNK_SIZE]) for i in range(0, len(text), CHUNK_SIZE)
    ]
    return chunks


from superduperdb import Listener

upstream_listener = Listener(
    model=chunker,
    select=select,
    key="x",
)

db.apply(upstream_listener)
indexing_key = upstream_listener.output_key


# <tab: SQL>
select = db.load("table", upstream_listener.outputs).to_query()


# <tab: OpenAI>
from superduperdb.ext.openai import OpenAIEmbedding

model = OpenAIEmbedding(identifier="text-embedding-ada-002")


vector_index_name = "my-vector-index"


# <tab: 1-Modality>
from superduperdb import VectorIndex, Listener

jobs, _ = db.apply(
    VectorIndex(
        vector_index_name,
        indexing_listener=Listener(
            key=indexing_key,  # the `Document` key the model should ingest to create the embeddings
            select=select,  # a `Select` query specifying which data to search over
            model=model,  # a `_Predictor` that converts the data into embeddings
        ),
    )
)

ERROR MESSAGE:

Traceback (most recent call last):
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/jobs/tasks.py", line 51, in method_job
    method(*args, db=db, **kwargs)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/components/model.py", line 553, in predict_in_db
    predict_ids = self._get_ids_from_select(
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/components/model.py", line 511, in _get_ids_from_select
    for r in tqdm.tqdm(db.execute(query)):
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/base/datalayer.py", line 289, in execute
    return self.select(query, *args, **kwargs)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/base/datalayer.py", line 360, in select
    return select.execute(self, reference=reference)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/backends/ibis/query.py", line 313, in execute
    output[column] = output[column].map(type.decode_data)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/env/lib/python3.10/site-packages/pandas/core/series.py", line 4691, in map
    new_values = self._map_values(arg, na_action=na_action)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/env/lib/python3.10/site-packages/pandas/core/base.py", line 921, in _map_values
    return algorithms.map_array(arr, mapper, na_action=na_action, convert=convert)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/env/lib/python3.10/site-packages/pandas/core/algorithms.py", line 1743, in map_array
    return lib.map_infer(values, mapper, convert=convert)
  File "lib.pyx", line 2972, in pandas._libs.lib.map_infer
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/components/component.py", line 459, in wrapper
    return func(self, *args, **kwargs)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/components/datatype.py", line 160, in decode_data
    return self.decoder(item, info=info)
  File "/Users/zhouhaha/workspace/SuperDuperDB/superduperdb/superduperdb/components/vector_index.py", line 224, in __call__
TypeError: a bytes-like object is required, not 'str'

The failure appears to originate in `_get_all_fields`, which merges the schemas of every referenced table into a single flat dict:

    def _get_all_fields(self, db):
        tables = self.get_all_tables()
        component_tables = []
        for tab in tables:
            component_tables.append(db.load('table', tab))

        fields = {}

        for tab in component_tables:
            fields_copy = tab.schema.fields.copy()
            if '_outputs' in tab.identifier and self.renamings:
                fields_copy[tab.identifier] = fields_copy['output']
                del fields_copy['output']
            else:
                for k in fields_copy:
                    if k in self.renamings.values():
                        fields_copy[k] = fields_copy[self.renamings[k]]
                        del fields_copy[k]
            fields.update(fields_copy)  # <-- 'output' keys from different tables collide here
        return fields

Because the output column of every model table stored through the Ibis backend is always named 'output', merging the per-table schemas with `fields.update(fields_copy)` produces key collisions: the field collected from an earlier table is silently overwritten by a later one. The wrong datatype can therefore be attached to a column, which causes seemingly random decoding errors such as the `TypeError: a bytes-like object is required, not 'str'` above.
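
To illustrate the collision, here is a minimal, self-contained sketch (the field values and names are hypothetical placeholders, not the real superduperdb schema objects): two '_outputs' tables both expose their model result under the bare key 'output', so whichever schema is merged last silently overwrites the datatype of the other, and the wrong decoder gets applied to the column.

# Hypothetical per-table schemas, as _get_all_fields would collect them.
chunker_outputs_fields = {"id": "str", "output": "str"}             # text chunks
embedding_outputs_fields = {"id": "str", "output": "vector[1536]"}  # embeddings

fields = {}
fields.update(chunker_outputs_fields)
fields.update(embedding_outputs_fields)

# The chunker's 'output' datatype is gone; its str column would now be decoded
# with the vector decoder, which raises
# "TypeError: a bytes-like object is required, not 'str'".
print(fields["output"])  # -> 'vector[1536]'

One possible direction for a fix (only a sketch; `merge_fields` is a hypothetical helper, not the library API) would be to key every '_outputs' table's 'output' field by its table identifier before merging, independently of `renamings`:

def merge_fields(component_tables):
    """Merge per-table schemas without letting 'output' keys collide."""
    fields = {}
    for tab_identifier, tab_fields in component_tables.items():
        tab_fields = dict(tab_fields)
        if "_outputs" in tab_identifier and "output" in tab_fields:
            # Key the output field by its table so each output datatype survives.
            tab_fields[tab_identifier] = tab_fields.pop("output")
        fields.update(tab_fields)
    return fields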