import logging
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from typing import List, Tuple, Optional
from autorag.utils.util import make_batch
from autorag.vectordb import BaseVectorStore
logger = logging.getLogger("AutoRAG")
class Couchbase(BaseVectorStore):
    """Vector store backed by a Couchbase Search (FTS) vector index.

    Documents are stored as ``{text_key: ..., embedding_key: ...}`` JSON in the
    configured bucket/scope/collection, and similarity queries run against an
    already-existing Search index (scope-level or cluster-level).
    """

    def __init__(
        self,
        embedding_model: str,
        bucket_name: str,
        scope_name: str,
        collection_name: str,
        index_name: str,
        embedding_batch: int = 100,
        connection_string: str = "",
        username: str = "",
        password: str = "",
        ingest_batch: int = 100,
        text_key: Optional[str] = "text",
        embedding_key: Optional[str] = "embedding",
        scoped_index: bool = True,
    ):
        """Connect to the cluster and validate that the bucket and index exist.

        Args:
            embedding_model: Embedding model name forwarded to BaseVectorStore.
            bucket_name: Couchbase bucket holding the documents.
            scope_name: Scope inside the bucket.
            collection_name: Collection inside the scope.
            index_name: Name of the Search index used for vector queries.
            embedding_batch: Batch size for embedding calls.
            connection_string: Couchbase connection string (e.g. "couchbase://host").
            username: Cluster username.
            password: Cluster password.
            ingest_batch: Number of documents upserted per batch.
            text_key: JSON field that stores the raw text.
            embedding_key: JSON field that stores the embedding vector.
            scoped_index: If True, ``index_name`` is a scope-level Search index;
                otherwise it is looked up (and queried) at cluster level.

        Raises:
            ValueError: If the bucket or index does not exist, or the
                bucket/scope/collection cannot be opened.
        """
        super().__init__(
            embedding_model=embedding_model,
            similarity_metric="ip",
            embedding_batch=embedding_batch,
        )
        self.index_name = index_name
        self.bucket_name = bucket_name
        self.scope_name = scope_name
        self.collection_name = collection_name
        self.scoped_index = scoped_index
        self.text_key = text_key
        self.embedding_key = embedding_key
        self.ingest_batch = ingest_batch

        auth = PasswordAuthenticator(username, password)
        self.cluster = Cluster(connection_string, ClusterOptions(auth))
        # Wait until the cluster is ready for use.
        self.cluster.wait_until_ready(timedelta(seconds=5))

        # Check if the bucket exists
        if not self._check_bucket_exists():
            raise ValueError(
                f"Bucket {self.bucket_name} does not exist. "
                " Please create the bucket before searching."
            )

        try:
            self.bucket = self.cluster.bucket(self.bucket_name)
            self.scope = self.bucket.scope(self.scope_name)
            self.collection = self.scope.collection(self.collection_name)
        except Exception as e:
            raise ValueError(
                "Error connecting to couchbase. "
                "Please check the connection and credentials."
            ) from e

        # Check if the index exists. Raises ValueError if it doesn't.
        # (The original wrapped this in `try/except Exception: raise`,
        # which is a no-op — a direct call is equivalent.)
        self._check_index_exists()

        # Reinitialize to ensure a consistent state after the index check.
        self.bucket = self.cluster.bucket(self.bucket_name)
        self.scope = self.bucket.scope(self.scope_name)
        self.collection = self.scope.collection(self.collection_name)

    async def add(self, ids: List[str], texts: List[str]):
        """Embed ``texts`` and upsert them under ``ids`` in batches.

        Each document is stored as ``{text_key: text, embedding_key: vector}``.
        """
        from couchbase.exceptions import DocumentExistsException

        texts = self.truncated_inputs(texts)
        text_embeddings: List[
            List[float]
        ] = await self.embedding.aget_text_embedding_batch(texts)

        documents_to_insert = [
            {_id: {self.text_key: text, self.embedding_key: embedding}}
            for _id, text, embedding in zip(ids, texts, text_embeddings)
        ]

        for batch in make_batch(documents_to_insert, self.ingest_batch):
            insert_batch = {}
            for doc in batch:
                insert_batch.update(doc)
            try:
                self.collection.upsert_multi(insert_batch)
            except DocumentExistsException as e:
                # upsert normally overwrites; log and continue if the SDK
                # still reports an existing document.
                logger.debug(f"Document already exists: {e}")

    async def fetch(self, ids: List[str]) -> List[List[float]]:
        """Return the stored embedding vectors for ``ids``, in input order.

        Raises:
            KeyError: If any id is missing from the fetch result.
        """
        fetched_result = self.collection.get_multi(ids)
        fetched_vectors = {
            k: v.value[self.embedding_key]
            for k, v in fetched_result.results.items()
        }
        # Re-order to match the caller's id order.
        return [fetched_vectors[_id] for _id in ids]

    async def is_exist(self, ids: List[str]) -> List[bool]:
        """Return, for each id in ``ids``, whether a document exists."""
        existed_result = self.collection.exists_multi(ids)
        existed_ids = {k: v.exists for k, v in existed_result.results.items()}
        return [existed_ids[_id] for _id in ids]

    async def query(
        self, queries: List[str], top_k: int, **kwargs
    ) -> Tuple[List[List[str]], List[List[float]]]:
        """Run a vector similarity search for each query string.

        Args:
            queries: Query texts to embed and search with.
            top_k: Number of results to return per query.

        Returns:
            A tuple ``(ids, scores)``; each element is a per-query list of the
            top-k document ids and their search scores.
        """
        import couchbase.search as search
        from couchbase.options import SearchOptions
        from couchbase.vector_search import VectorQuery, VectorSearch

        queries = self.truncated_inputs(queries)
        query_embeddings: List[
            List[float]
        ] = await self.embedding.aget_text_embedding_batch(queries)

        ids, scores = [], []
        for query_embedding in query_embeddings:
            # Create Search Request
            search_req = search.SearchRequest.create(
                VectorSearch.from_vector_query(
                    VectorQuery(
                        self.embedding_key,
                        query_embedding,
                        top_k,
                    )
                )
            )

            # Search against either the scope-level or the cluster-level index.
            if self.scoped_index:
                search_iter = self.scope.search(
                    self.index_name,
                    search_req,
                    SearchOptions(limit=top_k),
                )
            else:
                search_iter = self.cluster.search(
                    self.index_name,
                    search_req,
                    SearchOptions(limit=top_k),
                )

            # Parse the search results.
            # search_iter.rows() can only be iterated once.
            id_list, score_list = [], []
            for result in search_iter.rows():
                id_list.append(result.id)
                score_list.append(result.score)
            ids.append(id_list)
            scores.append(score_list)

        return ids, scores

    async def delete(self, ids: List[str]):
        """Remove the documents with the given ``ids``."""
        self.collection.remove_multi(ids)

    def _check_bucket_exists(self) -> bool:
        """Check if the bucket exists in the linked Couchbase cluster.

        Returns:
            True if the bucket exists, False otherwise.
        """
        bucket_manager = self.cluster.buckets()
        try:
            bucket_manager.get_bucket(self.bucket_name)
            return True
        except Exception as e:
            # BUG FIX: the original passed `e` as a lazy %-argument with no
            # placeholder in the format string, so the exception was never
            # rendered (and logging raised a formatting error internally).
            logger.debug("Error checking if bucket exists: %s", e)
            return False

    def _check_index_exists(self) -> bool:
        """Check if the Search index exists in the linked Couchbase cluster.

        Returns:
            bool: True if the index exists.

        Raises:
            ValueError: If the index does not exist.
        """
        # Scoped indexes live on the scope's index manager; otherwise look
        # cluster-wide. Both branches previously duplicated this whole check.
        index_manager = (
            self.scope.search_indexes()
            if self.scoped_index
            else self.cluster.search_indexes()
        )
        all_indexes = [index.name for index in index_manager.get_all_indexes()]
        if self.index_name not in all_indexes:
            raise ValueError(
                f"Index {self.index_name} does not exist. "
                " Please create the index before searching."
            )
        return True