Source code for autorag.nodes.passageaugmenter.base

import functools
import logging
import os
from pathlib import Path
from typing import List, Union, Tuple

import numpy as np
import pandas as pd

from autorag import embedding_models
from autorag.evaluation.metric.util import calculate_cosine_similarity
from autorag.utils import (
	result_to_dataframe,
	validate_qa_dataset,
	fetch_contents,
	sort_by_scores,
	validate_corpus_dataset,
	cast_corpus_dataset,
)
from autorag.utils.util import filter_dict_keys, select_top_k, embedding_query_content

logger = logging.getLogger("AutoRAG")


[docs] def passage_augmenter_node(func): @functools.wraps(func) @result_to_dataframe(["retrieved_contents", "retrieved_ids", "retrieve_scores"]) def wrapper( project_dir: Union[str, Path], previous_result: pd.DataFrame, *args, **kwargs ) -> Tuple[List[List[str]], List[List[str]], List[List[float]]]: logger.info(f"Running passage augmenter node - {func.__name__} module...") validate_qa_dataset(previous_result) data_dir = os.path.join(project_dir, "data") # find queries columns assert ( "query" in previous_result.columns ), "previous_result must have query column." queries = previous_result["query"].tolist() # find ids columns assert ( "retrieved_ids" in previous_result.columns ), "previous_result must have retrieved_ids column." ids = previous_result["retrieved_ids"].tolist() corpus_df = pd.read_parquet( os.path.join(data_dir, "corpus.parquet"), engine="pyarrow" ) validate_corpus_dataset(corpus_df) # get top_k top_k = kwargs.pop("top_k") if func.__name__ == "prev_next_augmenter": corpus_df = cast_corpus_dataset(corpus_df) slim_corpus_df = corpus_df[["doc_id", "metadata"]] slim_corpus_df.loc[:, "metadata"] = slim_corpus_df["metadata"].apply( filter_dict_keys, keys=["prev_id", "next_id"] ) mode = kwargs.pop("mode", "both") num_passages = kwargs.pop("num_passages", 1) # get augmented ids augmented_ids = func( ids_list=ids, corpus_df=slim_corpus_df, mode=mode, num_passages=num_passages, ) # fetch contents from corpus to use augmented ids augmented_contents = fetch_contents(corpus_df, augmented_ids) # set embedding model for getting scores embedding_model_str = kwargs.pop("embedding_model", "openai") embedding_model = embedding_models[embedding_model_str] query_embeddings, contents_embeddings = embedding_query_content( queries, augmented_contents, embedding_model, batch=128 ) # get scores from calculated cosine similarity augmented_scores = [ np.array( [ calculate_cosine_similarity(query_embedding, x) for x in content_embeddings ] ).tolist() for query_embedding, content_embeddings in zip( query_embeddings, contents_embeddings ) ] else: contents = previous_result["retrieved_contents"].tolist() scores = previous_result["retrieve_scores"].tolist() augmented_ids, augmented_contents, augmented_scores = func( ids_list=ids, contents_list=contents, scores_list=scores, **kwargs ) # sort by scores df = pd.DataFrame( { "contents": augmented_contents, "ids": augmented_ids, "scores": augmented_scores, } ) df[["contents", "ids", "scores"]] = df.apply( sort_by_scores, axis=1, result_type="expand" ) # select by top_k results = select_top_k(df, ["contents", "ids", "scores"], top_k) return ( results["contents"].tolist(), results["ids"].tolist(), results["scores"].tolist(), ) return wrapper