| Documentation | Blog | | Discord |
Latest News 🔥
- [2023/11] Release Byzer-Retrieval 0.1.4
Byzer-retrieval is a distributed retrieval system which designed as a backend for LLM RAG (Retrieval Augmented Generation). The system supports both full-text search and vector retrieval algorithm, you can also use both of them at the same time and get a fusion score for each document.
This project is implemented based on Lucene + Ray which use Lucene to build the inverted index/vector index and use Ray to build the distributed system.
In contrast to the traditional way,there is no need to deploy so many systems e.g. the Elasticsearch or Milvus, and reduce the cost of deployment and maintenance. You can reuse the cluster which is used for training/serving the LLM model because Byzer-retrieval use CPU/Memory(LLM using GPU/GPU Memory) which will make full use of the resources.
Notice that this project requires JDK 21 or higher, because the new features of JDK 21 e.g. vector API and foreign memory will bring a great performance improvement to the system. We also introduce the virtual threads in Java to improve the concurrency performance of cluster.
Download: https://download.byzer.org/byzer-retrieval/byzer-retrieval-lib-0.1.4.tar.gz
- Support filter query
- Fix minor bugs
Download: https://download.byzer.org/byzer-retrieval/byzer-retrieval-lib-0.1.3.tar.gz
- fix the issue that the jvm options is not set correctly when launching the cluster
- sort the collection when fetch results from the multiple shards otherwise the results will be disordered and cause the rerank error.
Download: https://download.byzer.org/byzer-retrieval/byzer-retrieval-lib-0.1.2.tar.gz
- fix rerank bug
- add shutdown cluster API
- 0.1.1: support multi-requests at the same time
- 0.1.0: first version
-
Install Ray
conda create -n byzer-retrieval python=3.10.11 conda activate byzer-retrieval pip install -r requirements.txt ray start --head --dashboard-host 0.0.0.0
-
Download the latest version of Byzer-Retrieval Package and the JDK-21, extract them to the local disk in every node in Ray.
-
Install Byzer-Retrieval
import ray from byzerllm.utils.retrieval import ByzerRetrieval code_search_path=["/home/winubuntu/softwares/byzer-retrieval-lib/"] env_vars = {"JAVA_HOME": "/home/winubuntu/softwares/jdk-21", "PATH":"/home/winubuntu/softwares/jdk-21/bin:/home/winubuntu/.cargo/bin:/usr/local/cuda/bin:/home/winubuntu/softwares/byzer-lang-all-in-one-linux-amd64-3.1.1-2.3.2/jdk8/bin:/home/winubuntu/miniconda3/envs/byzerllm-dev/bin:/home/winubuntu/miniconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin"} ray.init(address="auto",namespace="default", job_config=ray.job_config.JobConfig(code_search_path=code_search_path, runtime_env={"env_vars": env_vars}) ) retrieval = ByzerRetrieval() # you can use ray.kill(ray.get_actor("RetrievalGateway")) # to kill the RetrievalGateway actor if you want to restart the RetrievalGateway actor. retrieval.launch_gateway() builder = retrieval.cluster_builder() builder.set_name("cluster1").set_location("/tmp/cluster1").set_num_nodes(2).set_node_cpu(1).set_node_memory("3g") builder.set_java_home(env_vars["JAVA_HOME"]).set_path(env_vars["PATH"]).set_enable_zgc() builder.start_cluster()
Now you can check the retrieval cluster in Ray Dashboard or you can use the following code to check the cluster:
retrieval.cluster_info("cluster1")
If there is something wrong,you can validate the Environment e.g. JAVA_HOME
and PATH
before you
can launch the retrieval gateway in every kind of API(Python/Byzer-SQL/Rest).
Try to use the following code to validate the environment especially the PATH
is correct:
import os
os.environ["JAVA_HOME"]="/home/winubuntu/softwares/jdk-21"
os.environ["PATH"]="/home/winubuntu/softwares/jdk-21/bin:/home/winubuntu/.cargo/bin:/usr/local/cuda/bin:/home/winubuntu/softwares/byzer-lang-all-in-one-linux-amd64-3.1.1-2.3.2/jdk8/bin:/home/winubuntu/miniconda3/envs/byzerllm-dev/bin:/home/winubuntu/miniconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin"
os.execvp("bash", args=["bash", "-c", "java -version"])
If this script fails, The PATH
is not correct, and you need to check the PATH
again.
You may miss some key paths e.g. /usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin
in the PATH
.
The above figure shows the architecture of Byzer-retrieval. The user can use Python/Rest/SQL API to launch a RetrievalGateway, and then use the RetrievalGateway to create a retrieval cluster. Actually, you can create any number of clusters, and each cluster can contain any number of tables. Notice that all tables in the same cluster have the same shard number. Each cluster will have a master actor and some worker actors, and all these actors are Java actors. The master actor is as the entry point of the cluster.
If you use python API to insert data into the table, the data will be put in the ray cluster object store, and then the master actor will route the data to the worker actors. The worker actors will build the index in parallel.
Here is the command to build the jar file and dependency from source code:
## use jdk 21 to compile the project
export JAVA_HOME=/home/winubuntu/softwares/jdk-21
export PATH=$JAVA_HOME/bin/:$PATH
mvn clean package -DskipTests
mvn dependency:copy-dependencies -DoutputDirectory=target/dependency
Then copy the jar file and dependency to the ray cluster. Suppose you put
all the jars in /home/winubuntu/softwares/byzer-retrieval-lib/
.
If you want to use Byzer-SQL API, you need to download the
byzer-llm
jar file and put it in$BYZER_HOME/plugin/
directory.
Once you have a retrieval cluster, now try to create a database/table in the cluster:
retrieval.create_table("cluster1",TableSettings(
database="db1",table="table1",
schema='''st(
field(_id,long),
field(name,string),
field(content,string,analyze),
field(raw_content,string),
field(vector,array(float)))''',
location="",num_shards=1,
))
Notice that when the third value of the field is
analyze
, the field will be analyzed when indexing(using WhiteSpaceAnalyzer).
After that, we can insert some data into the table:
data = [
{"_id":1, "name":"a", "content":"b c", "raw_content":"b c","vector":[1.0,2.0,3.0]},
{"_id":2, "name":"d", "content":"b e", "raw_content":"b e", "vector":[1.0,2.6,4.0]}
]
retrieval.build_from_dicts("cluster1","db1","table1",data)
retrieval.commit("cluster1","db1","table1")
Notice that you can use Jieba to analyze the Chinese text, and use Byzer-LLM to get the vector of the text.
In this step, we will insert the data into the table, and build the index. Notice that
- we need to commit the index after building the index to make the index persistent.
- we strongly recommend use JuiceFS to store the index data, because JuiceFS is a distributed file system which is compatible with POSIX, and it is very easy to deploy and use.
For now, we can search the data.
try to search by keyword:
retrieval.search_keyword("cluster1","db1","table1",filters={},
keyword="c",fields=["content"],limit=10)
## output: [{'name': 'a', 'raw_content': 'b c', '_id': 1, '_score': 0.31506687}]
try to search by vector:
retrieval.search_vector("cluster1","db1","table1",filters={},
vector=[1.0,2.0,3.0],vector_field="vector",limit=10)
## output: [{'name': 'a', '_id': 1, '_score': 1.0, 'content': 'b c'},{'name': 'd', '_id': 2, '_score': 0.9989467, 'content': 'b e'}]
or you can search by both keyword and vector:
from byzerllm.records import SearchQuery
retrieval.search("cluster1",
[SearchQuery(database="db1",table="table1",keyword="c",fields=["content"], filters={},
vector=[1.0,2.0,3.0],vectorField="vector",
limit=10)])
## output: [{'name': 'a', 'raw_content': 'b c', '_id': 1, '_score': 0.033333335},
## {'name': 'd', 'raw_content': 'b e', '_id': 2, '_score': 0.016393442}]
If you just want to get the filter some documents, you can use the following code:
retrieval.filter("cluster1",
[SearchQuery("db1","table1",
filters={"and":[{"field":"name","value":"d"}]},
keyword=None,fields=[],
vector=[],vectorField=None,
limit=1)])
If you want to filter then search(with score), you can use the following code:
retrieval.search("cluster1",
[SearchQuery("db1","table1",
filters={"and":[{"field":"name","value":"d"}]},
keyword="b",fields=["content"],
vector=[],vectorField=None,
limit=1)])
If you want to filter then sort the results:
retrieval.filter("cluster1",
[SearchQuery("db1","table1",
filters={"and":[{"field":"name","value":"d"}]},
sorts=[{"created_time":"desc"}],
keyword=None,fields=[],
vector=[],vectorField=None,
limit=1)])
However, the field created_time
must be marked as sort
in the schema, otherwise the sort will not work or throw an exception.
Try to add a field created_time
in the schema like following:
st(
...
field(created_time,long,sort)
...
)
You can also do follow operations to the table:
- truncate: delete all data in the table
- close: close the table and release the resources
- closeAndDeleteFile: close the table and delete the index files
retrieval.truncate("cluster1","db1","table1")
retrieval.close("cluster1","db1","table1")
retrieval.closeAndDeleteFile("cluster1","db1","table1")
You can use the following code to shutdown the cluster:
retrieval.shutdown_cluster(cluster_name="cluster1")
When the Retrieval Cluster is crash or the Ray Cluster is down, we need to recover our cluster. You can manually export cluster metadata, and then save it to the storage. Try to use the following code to export the metadata:
cluster = retrieval.cluster("cluster1")
cluster1_meta = json.loads(ray.get(cluster.clusterInfo.remote()))
# save s to file
with open("/tmp/cluster_info.json","w") as f:
json.dump(cluster1_meta,f,ensure_ascii=False)
Then you can use the following code to recover the cluster once the cluster is down:
import json
import os
import ray
from byzerllm.records import EnvSettings,ClusterSettings,TableSettings,JVMSettings
from byzerllm.utils.retrieval import ByzerRetrieval
with open("/tmp/cluster_info.json","r") as f:
s = json.load(f)
retrieval = ByzerRetrieval()
retrieval.launch_gateway()
retrieval.restore_from_cluster_info(s)
Notice that if the Ray Cluster is down, you need to connect it Ray cluster first, and then restore the retrieval cluster.
import json
import os
import ray
from byzerllm.records import EnvSettings,ClusterSettings,TableSettings,JVMSettings
from byzerllm.utils.retrieval import ByzerRetrieval
with open("/tmp/cluster_info.json","r") as f:
s = json.load(f)
code_search_path=["/home/winubuntu/softwares/byzer-retrieval-lib/"]
env_vars = {"JAVA_HOME": s["envSettings"]["javaHome"],
"PATH":s["envSettings"]["path"]}
ray.init(address="auto",namespace="default",
job_config=ray.job_config.JobConfig(code_search_path=code_search_path,
runtime_env={"env_vars": env_vars})
)
retrieval = ByzerRetrieval()
retrieval.launch_gateway()
retrieval.restore_from_cluster_info(s)
We also provide a rest api to access the retrieval clusters. You can use the following code to start the rest api:
import ray
from byzerllm.utils.retrieval.rest import deploy_retrieval_rest_server
code_search_path=["/home/winubuntu/softwares/byzer-retrieval-lib/"]
env_vars = {"JAVA_HOME": "/home/winubuntu/softwares/jdk-21",
"PATH":"/home/winubuntu/softwares/jdk-21/bin:/home/winubuntu/.cargo/bin:/usr/local/cuda/bin:/home/winubuntu/softwares/byzer-lang-all-in-one-linux-amd64-3.1.1-2.3.2/jdk8/bin:/home/winubuntu/miniconda3/envs/byzerllm-dev/bin:/home/winubuntu/miniconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin"}
ray.init(address="auto",namespace="default",
job_config=ray.job_config.JobConfig(code_search_path=code_search_path,
runtime_env={"env_vars": env_vars}),ignore_reinit_error=True
)
deploy_retrieval_rest_server(host="0.0.0.0",route_prefix="/retrieval")
Then you can use endpoint http://127.0.0.1:8000/retrievel
to access the retrieval cluster.
create a retrieval cluster:
import requests
import json
from byzerllm.utils.retrieval.rest import (ClusterSettingsParam,
EnvSettingsParam,
JVMSettingsParam,
ResourceRequirementParam,
ResourceRequirementSettingsParam)
r = requests.post("http://127.0.0.1:8000/retrieval/cluster/create",json={
"cluster_settings":ClusterSettingsParam(
name="cluster1",
location="/tmp/cluster1",
numNodes=1
).dict(),
"env_settings":EnvSettingsParam(
javaHome=env_vars["JAVA_HOME"],
path=env_vars["PATH"]
).dict(),
"jvm_settings":JVMSettingsParam(
options=[]
).dict(),
"resource_requirement_settings": ResourceRequirementSettingsParam(
resourceRequirements=[ResourceRequirementParam(name="CPU",resourceQuantity=1.0)]).dict()
})
r.text
or recover the cluster is also supported in the rest api, you can use the following code to recover the cluster:
import requests
import json
with open("/tmp/cluster_info.json","r") as f:
s = f.read()
r = requests.post("http://127.0.0.1:8000/retrieval/cluster/restore",params={
"cluster_info":s
})
json.loads(r.text)
get cluster info:
import requests
import json
r = requests.get("http://127.0.0.1:8000/retrieval/cluster/get/cluster1")
json.loads(r.text)
create a table:
import requests
import json
from byzerllm.utils.retrieval.rest import TableSettingsParam
r = requests.post("http://127.0.0.1:8000/retrieval/table/create/cluster1",json=TableSettingsParam(
database="db1",table="table1",
schema="st(field(_id,long),field(name,string),field(content,string,analyze),field(vector,array(float)))",
location="/tmp/cluster1",num_shards=1,
).dict())
r.text
insert data:
import requests
import json
from byzerllm.utils.retrieval.rest import TableSettingsParam
data = [
{"_id":3, "name":"a", "content":"b c", "vector":[1.0,2.0,3.0]},
{"_id":4, "name":"d", "content":"b e", "vector":[1.0,2.6,4.0]}
]
r = requests.post("http://127.0.0.1:8000/retrieval/table/data",json={
"cluster_name":"cluster1",
"database":"db1",
"table":"table1",
"data":data
})
r.text
make the index persistent:
import requests
import json
from byzerllm.utils.retrieval.rest import TableSettingsParam
r = requests.post("http://127.0.0.1:8000/retrieval/table/commit",json={
"cluster_name":"cluster1",
"database":"db1",
"table":"table1"
})
r.text
search:
import requests
import json
from byzerllm.utils.retrieval.rest import SearchQueryParam
r = requests.post("http://127.0.0.1:8000/retrieval/table/search",json={
"cluster_name":"cluster1",
"database":"db1",
"table":"table1",
"query":SearchQueryParam(keyword="c",fields=["content"],
vector=[1.0,2.0,3.0],vectorField="vector",
limit=10).dict()
})
json.loads(r.text)
More details please refer to http://127.0.0.1:8000/retrieval/docs
We also integrate the retrieval system into Byzer-SQL, the first step is setup environment:
!byzerllm setup retrieval;
!byzerllm setup "code_search_path=/home/winubuntu/softwares/byzer-retrieval-lib/";
!byzerllm setup "JAVA_HOME=/home/winubuntu/softwares/jdk-21";
!byzerllm setup "PATH=/home/winubuntu/softwares/jdk-21/bin:/home/winubuntu/.cargo/bin:/usr/local/cuda/bin:/home/winubuntu/softwares/byzer-lang-all-in-one-linux-amd64-3.1.1-2.3.2/jdk8/bin:/home/winubuntu/miniconda3/envs/byzerllm-dev/bin:/home/winubuntu/miniconda3/condabin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin";
Then we can create a retrieval cluster:
run command as Retrieval.``
where action="cluster/create"
and `cluster_settings.name`="cluster1"
and `cluster_settings.location`="/tmp/cluster1"
and `cluster_settings.numNodes`="1";
You can create a table in the cluster:
run command as Retrieval.``
where action="table/create/cluster1"
and `table_settings.database`="db1"
and `table_settings.table`="table2"
and `table_settings.location`="/tmp/cluster1"
and `table_settings.schema`="st(field(_id,long),field(name,string),field(content,string,analyze),field(vector,array(float)))";
After that, we can insert some data into the table:
!byzerllm setup retrieval/data;
set jsondata = '''
{"_id":3, "name":"a", "content":"b c", "vector": [1.0,2.0,3.0] }
{"_id":4, "name":"d", "content":"b e", "vector": [1.0,2.6,4.0] }
''';
load jsonStr.`jsondata` as newdata;
run command as Retrieval.``
where action="table/data"
and clusterName="cluster1"
and database="db1"
and table="table2"
and inputTable="newdata";
Try to register the retrieval as a UDF:
run command as Retrieval.`` where
action="register"
and udfName="search";
You can change the udfName to any name you want, and then you can use the udf to search the data:
select search(
array(to_json(map(
"clusterName","cluster1",
"database","db1",
"table","table2",
"query.keyword","c",
"query.fields","content",
"query.vector","1.0,2.0,3.0",
"query.vectorField","vector",
"query.limit","10"
)))
)
as c as output;
-- output: [ "[{\"name\": \"a\", \"_id\": 3, \"_score\": 0.016666668, \"content\": \"b c\"}, {\"name\": \"d\", \"_id\": 4, \"_score\": 0.016393442, \"content\": \"b e\"}]" ]
try to modify the map key-value as you want, and you can get the search result.
We introduce a new schema language to describe the table schema.
st(
field(_id,long),
field(name,string),
field(content,string,analyze),
field(vector,array(float))
)
st means Struct, field means Field,the first value in field is columnName,and the second is type,
the third is analyze, and it is optional. If you want to analyze the field when indexing, you can set the third value as
analyze
.
For now, simple schema supports type like following:
- st
- field
- string
- float
- double
- integer
- short
- date
- binary
- map
- array
- long
- boolean
- byte
- decimal
st also supports nesting:
st(field(column1,map(string,array(st(field(columnx,string))))))