import asyncio
import os
import pickle
import re
from typing import List, Dict, Tuple, Callable, Union, Iterable, Optional
import numpy as np
import pandas as pd
from llama_index.core.indices.keyword_table.utils import simple_extract_keywords
from nltk import PorterStemmer
from rank_bm25 import BM25Okapi
from transformers import AutoTokenizer, PreTrainedTokenizerBase
from autorag.nodes.retrieval.base import (
evenly_distribute_passages,
BaseRetrieval,
get_bm25_pkl_name,
)
from autorag.utils import validate_corpus_dataset, fetch_contents
from autorag.utils.util import (
get_event_loop,
normalize_string,
result_to_dataframe,
pop_params,
)
def tokenize_ko_kiwi(texts: List[str]) -> List[List[str]]:
try:
from kiwipiepy import Kiwi, Token
except ImportError:
raise ImportError(
"You need to install kiwipiepy to use 'ko_kiwi' tokenizer. "
"Please install kiwipiepy by running 'pip install kiwipiepy'. "
"Or install Korean version of AutoRAG by running 'pip install AutoRAG[ko]'."
)
texts = list(map(lambda x: x.strip().lower(), texts))
kiwi = Kiwi()
tokenized_list: Iterable[List[Token]] = kiwi.tokenize(texts)
return [list(map(lambda x: x.form, token_list)) for token_list in tokenized_list]
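# A minimal usage sketch (illustrative input; the exact morphemes depend on the
# installed kiwipiepy model and version):
#
#     tokenize_ko_kiwi(["안녕하세요 반갑습니다"])
#     # -> e.g. [["안녕", "하", "세요", "반갑", "습니다"]]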
def tokenize_ko_kkma(texts: List[str]) -> List[List[str]]:
try:
from konlpy.tag import Kkma
except ImportError:
raise ImportError(
"You need to install konlpy to use 'ko_kkma' tokenizer. "
"Please install konlpy by running 'pip install konlpy'. "
"Or install Korean version of AutoRAG by running 'pip install AutoRAG[ko]'."
)
tokenizer = Kkma()
tokenized_list: List[List[str]] = list(map(lambda x: tokenizer.morphs(x), texts))
return tokenized_list
def tokenize_ko_okt(texts: List[str]) -> List[List[str]]:
try:
from konlpy.tag import Okt
except ImportError:
raise ImportError(
"You need to install konlpy to use 'ko_kkma' tokenizer. "
"Please install konlpy by running 'pip install konlpy'. "
"Or install Korean version of AutoRAG by running 'pip install AutoRAG[ko]'."
)
tokenizer = Okt()
tokenized_list: List[List[str]] = list(map(lambda x: tokenizer.morphs(x), texts))
return tokenized_list
def tokenize_porter_stemmer(texts: List[str]) -> List[List[str]]:
def tokenize_remove_stopword(text: str, stemmer) -> List[str]:
text = text.lower()
words = list(simple_extract_keywords(text))
return [stemmer.stem(word) for word in words]
stemmer = PorterStemmer()
tokenized_list: List[List[str]] = list(
map(lambda x: tokenize_remove_stopword(x, stemmer), texts)
)
return tokenized_list
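# A minimal usage sketch (illustrative input). Note that simple_extract_keywords
# returns a set, so token order within a passage is not guaranteed:
#
#     tokenize_porter_stemmer(["Running retrievers quickly"])
#     # -> e.g. [["run", "retriev", "quickli"]]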
def tokenize_space(texts: List[str]) -> List[List[str]]:
def tokenize_space_text(text: str) -> List[str]:
text = normalize_string(text)
return re.split(r"\s+", text.strip())
return list(map(tokenize_space_text, texts))
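# A minimal usage sketch (illustrative input; assumes normalize_string lowercases
# and collapses whitespace, as in SQuAD-style normalization):
#
#     tokenize_space(["Hello   World"])
#     # -> [["hello", "world"]]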
def load_bm25_corpus(bm25_path: str) -> Dict:
if bm25_path is None:
return {}
with open(bm25_path, "rb") as f:
bm25_corpus = pickle.load(f)
return bm25_corpus
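# The loaded dict is the one written by bm25_ingest below, with keys
# "tokens", "passage_id", and "tokenizer_name".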
def tokenize_ja_sudachipy(texts: List[str]) -> List[List[str]]:
try:
from sudachipy import dictionary, tokenizer
except ImportError:
raise ImportError(
"You need to install SudachiPy to use 'sudachipy' tokenizer. "
"Please install SudachiPy by running 'pip install sudachipy'."
)
    # Initialize a SudachiPy tokenizer backed by the core dictionary
    tokenizer_obj = dictionary.Dictionary(dict="core").create()
    # Choose the split mode: A (shortest units), B, or C (longest units)
    mode = tokenizer.Tokenizer.SplitMode.A
# Tokenize the input texts
tokenized_list = []
for text in texts:
tokens = tokenizer_obj.tokenize(text, mode)
tokenized_list.append([token.surface() for token in tokens])
return tokenized_list
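# A minimal usage sketch (illustrative input; requires the sudachipy and
# sudachidict_core packages, and the segmentation depends on the dictionary):
#
#     tokenize_ja_sudachipy(["吾輩は猫である"])
#     # -> e.g. [["吾輩", "は", "猫", "で", "ある"]]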
BM25_TOKENIZER = {
"porter_stemmer": tokenize_porter_stemmer,
"ko_kiwi": tokenize_ko_kiwi,
"space": tokenize_space,
"ko_kkma": tokenize_ko_kkma,
"ko_okt": tokenize_ko_okt,
"sudachipy": tokenize_ja_sudachipy,
}
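# The registry above maps tokenizer names to callables; any other name is
# treated as a huggingface tokenizer by select_bm25_tokenizer (defined below).
# A minimal sketch ("bert-base-uncased" is only an illustrative model name):
#
#     select_bm25_tokenizer("porter_stemmer")  # -> tokenize_porter_stemmer
#     select_bm25_tokenizer("bert-base-uncased")  # -> a PreTrainedTokenizer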
class BM25(BaseRetrieval):
def __init__(self, project_dir: str, *args, **kwargs):
"""
Initialize BM25 module.
(Retrieval)
:param project_dir: The project directory path.
        :param bm25_tokenizer: The name of the tokenizer to use for BM25.
            It supports 'porter_stemmer', 'space', 'ko_kiwi', 'ko_kkma', 'ko_okt',
            and 'sudachipy', or you can pass a huggingface tokenizer name.
            Default is 'porter_stemmer'.
:param kwargs: The optional arguments.
"""
super().__init__(project_dir)
# check if bm25_path and file exist
bm25_tokenizer = kwargs.get("bm25_tokenizer", None)
if bm25_tokenizer is None:
bm25_tokenizer = "porter_stemmer"
bm25_path = os.path.join(self.resources_dir, get_bm25_pkl_name(bm25_tokenizer))
assert (
bm25_path is not None
), "bm25_path must be specified for using bm25 retrieval."
assert os.path.exists(
bm25_path
), f"bm25_path {bm25_path} does not exist. Please ingest first."
self.bm25_corpus = load_bm25_corpus(bm25_path)
        assert all(
            key in self.bm25_corpus for key in ("tokens", "passage_id")
        ), "bm25_corpus must contain tokens and passage_id. Please check that you ingested the bm25 corpus correctly."
self.tokenizer = select_bm25_tokenizer(bm25_tokenizer)
assert self.bm25_corpus["tokenizer_name"] == bm25_tokenizer, (
f"The bm25 corpus tokenizer is {self.bm25_corpus['tokenizer_name']}, but your input is {bm25_tokenizer}. "
f"You need to ingest again. Delete bm25 pkl file and re-ingest it."
)
self.bm25_instance = BM25Okapi(self.bm25_corpus["tokens"])
@result_to_dataframe(["retrieved_contents", "retrieved_ids", "retrieve_scores"])
def pure(self, previous_result: pd.DataFrame, *args, **kwargs):
queries = self.cast_to_run(previous_result)
pure_params = pop_params(self._pure, kwargs)
ids, scores = self._pure(queries, *args, **pure_params)
contents = fetch_contents(self.corpus_df, ids)
return contents, ids, scores
def _pure(
self,
queries: List[List[str]],
top_k: int,
ids: Optional[List[List[str]]] = None,
) -> Tuple[List[List[str]], List[List[float]]]:
"""
BM25 retrieval function.
You have to load a pickle file that is already ingested.
:param queries: 2-d list of query strings.
            Each inner list contains the query strings for one row.
:param top_k: The number of passages to be retrieved.
:param ids: The optional list of ids that you want to retrieve.
You don't need to specify this in the general use cases.
Default is None.
        :return: A tuple of a 2-d list of passage ids retrieved by bm25 and a 2-d list of their scores.
            Both have the same length as queries, and each inner list has a length of top_k.
"""
if ids is not None:
score_result = list(
map(
lambda query_list, id_list: get_bm25_scores(
query_list,
id_list,
self.tokenizer,
self.bm25_instance,
self.bm25_corpus,
),
queries,
ids,
)
)
return ids, score_result
# run async bm25_pure function
tasks = [
bm25_pure(
input_queries,
top_k,
self.tokenizer,
self.bm25_instance,
self.bm25_corpus,
)
for input_queries in queries
]
loop = get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
id_result = list(map(lambda x: x[0], results))
score_result = list(map(lambda x: x[1], results))
return id_result, score_result
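# A minimal end-to-end sketch (the project path and previous_result are
# hypothetical; the project's resources directory must already contain a bm25
# pkl file ingested with the same tokenizer):
#
#     retrieval = BM25("path/to/project_dir", bm25_tokenizer="porter_stemmer")
#     result_df = retrieval.pure(previous_result, top_k=5)
#     # result_df columns: retrieved_contents, retrieved_ids, retrieve_scores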
async def bm25_pure(
queries: List[str], top_k: int, tokenizer, bm25_api: BM25Okapi, bm25_corpus: Dict
) -> Tuple[List[str], List[float]]:
"""
Async BM25 retrieval function.
    It is used to run bm25 retrieval asynchronously, one row at a time.
:param queries: A list of query strings.
:param top_k: The number of passages to be retrieved.
:param tokenizer: A tokenizer that will be used to tokenize queries.
:param bm25_api: A bm25 api instance that will be used to retrieve passages.
    :param bm25_corpus: A dictionary containing the bm25 corpus: the passage ids from the corpus and their tokenized contents.
        Its data structure looks like this:

        .. code-block:: python

            {
                "tokens": [],  # 2-d list of tokens
                "passage_id": [],  # 1-d list of passage_id. Type must be str.
            }

    :return: A tuple of the list of passage ids retrieved by bm25 and the list of their scores.
"""
    # Query tokenization is kept synchronous because each row has only a few
    # queries, so making it async would add more overhead than it saves.
tokenized_queries = tokenize(queries, tokenizer)
id_result = []
score_result = []
    for query in tokenized_queries:
        scores = bm25_api.get_scores(query)
        top_n_index = np.argsort(scores)[::-1][:top_k]
        id_result.append([bm25_corpus["passage_id"][i] for i in top_n_index])
        score_result.append([scores[i] for i in top_n_index])
# make a total result to top_k
id_result, score_result = evenly_distribute_passages(id_result, score_result, top_k)
# sort id_result and score_result by score
result = [
(_id, score)
for score, _id in sorted(
zip(score_result, id_result), key=lambda pair: pair[0], reverse=True
)
]
id_result, score_result = zip(*result)
return list(id_result), list(score_result)
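# A minimal async sketch (bm25_instance and bm25_corpus come from an ingested
# pickle; the query string is illustrative):
#
#     ids, scores = await bm25_pure(
#         ["what is bm25?"],
#         5,
#         select_bm25_tokenizer("porter_stemmer"),
#         bm25_instance,
#         bm25_corpus,
#     )
#     # ids and scores are aligned and sorted by descending score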
def get_bm25_scores(
queries: List[str],
ids: List[str],
tokenizer,
bm25_api: BM25Okapi,
bm25_corpus: Dict,
) -> List[float]:
    if not ids:
return []
tokenized_queries = tokenize(queries, tokenizer)
result_dict = {id_: [] for id_ in ids}
    for query in tokenized_queries:
        scores = bm25_api.get_scores(query)
        for id_ in ids:
            result_dict[id_].append(scores[bm25_corpus["passage_id"].index(id_)])
result_df = pd.DataFrame(result_dict)
return result_df.max(axis=0).tolist()
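# A minimal sketch (ids must exist in bm25_corpus["passage_id"]; the names are
# illustrative). For each candidate id, the returned value is its maximum bm25
# score over all queries in the row:
#
#     get_bm25_scores(["what is bm25?"], ["doc-1", "doc-2"],
#                     tokenizer, bm25_instance, bm25_corpus)
#     # -> [max score for "doc-1", max score for "doc-2"]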
def tokenize(queries: List[str], tokenizer) -> List[List[Union[int, str]]]:
if isinstance(tokenizer, PreTrainedTokenizerBase):
tokenized_queries = tokenizer(queries).input_ids
else:
tokenized_queries = tokenizer(queries)
return tokenized_queries
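# A minimal sketch of the two branches (hf_tokenizer is a hypothetical
# huggingface tokenizer; exact token ids depend on the model):
#
#     tokenize(["hello world"], tokenize_space)  # -> [["hello", "world"]]
#     tokenize(["hello world"], hf_tokenizer)    # -> e.g. [[101, 7592, 2088, 102]]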
def bm25_ingest(
corpus_path: str, corpus_data: pd.DataFrame, bm25_tokenizer: str = "porter_stemmer"
):
if not corpus_path.endswith(".pkl"):
raise ValueError(f"Corpus path {corpus_path} is not a pickle file.")
validate_corpus_dataset(corpus_data)
ids = corpus_data["doc_id"].tolist()
# Initialize bm25_corpus
bm25_corpus = pd.DataFrame()
# Load the BM25 corpus if it exists and get the passage ids
if os.path.exists(corpus_path) and os.path.getsize(corpus_path) > 0:
with open(corpus_path, "rb") as r:
corpus = pickle.load(r)
bm25_corpus = pd.DataFrame.from_dict(corpus)
duplicated_passage_rows = bm25_corpus[bm25_corpus["passage_id"].isin(ids)]
new_passage = corpus_data[
~corpus_data["doc_id"].isin(duplicated_passage_rows["passage_id"])
]
else:
new_passage = corpus_data
if not new_passage.empty:
tokenizer = select_bm25_tokenizer(bm25_tokenizer)
if isinstance(tokenizer, PreTrainedTokenizerBase):
tokenized_corpus = tokenizer(new_passage["contents"].tolist()).input_ids
else:
tokenized_corpus = tokenizer(new_passage["contents"].tolist())
new_bm25_corpus = pd.DataFrame(
{
"tokens": tokenized_corpus,
"passage_id": new_passage["doc_id"].tolist(),
}
)
if not bm25_corpus.empty:
bm25_corpus_updated = pd.concat(
[bm25_corpus, new_bm25_corpus], ignore_index=True
)
bm25_dict = bm25_corpus_updated.to_dict("list")
else:
bm25_dict = new_bm25_corpus.to_dict("list")
# add tokenizer name to bm25_dict
bm25_dict["tokenizer_name"] = bm25_tokenizer
with open(corpus_path, "wb") as w:
pickle.dump(bm25_dict, w)
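# A minimal ingestion sketch (path and data are illustrative; corpus_data must
# pass validate_corpus_dataset, so a real corpus may need more columns than
# shown here). Re-running with overlapping doc_ids only tokenizes new passages:
#
#     corpus_df = pd.DataFrame({"doc_id": ["doc-1"], "contents": ["hello world"]})
#     bm25_ingest("resources/bm25_porter_stemmer.pkl", corpus_df)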
def select_bm25_tokenizer(
    bm25_tokenizer: str,
) -> Union[Callable[[List[str]], List[List[str]]], PreTrainedTokenizerBase]:
    if bm25_tokenizer in BM25_TOKENIZER:
        return BM25_TOKENIZER[bm25_tokenizer]
    return AutoTokenizer.from_pretrained(bm25_tokenizer, use_fast=False)