Source code for autorag.nodes.retrieval.run

import logging
import os
import pathlib
from copy import deepcopy
from typing import List, Callable, Dict, Tuple, Union

import numpy as np
import pandas as pd

from autorag.evaluation import evaluate_retrieval
from autorag.schema.metricinput import MetricInput
from autorag.strategy import measure_speed, filter_by_threshold, select_best
from autorag.support import get_support_modules
from autorag.utils.util import get_best_row, to_list, apply_recursive

logger = logging.getLogger("AutoRAG")

semantic_module_names = ["vectordb", "VectorDB"]
lexical_module_names = ["bm25", "BM25"]
hybrid_module_names = ["hybrid_rrf", "hybrid_cc", "HybridCC", "HybridRRF"]


def run_retrieval_node(
    modules: List,
    module_params: List[Dict],
    previous_result: pd.DataFrame,
    node_line_dir: str,
    strategies: Dict,
) -> pd.DataFrame:
    """
    Run evaluation and select the best module among retrieval node results.

    :param modules: Retrieval modules to run.
    :param module_params: Retrieval module parameters.
    :param previous_result: Previous result dataframe.
        Could be query expansion's best result or qa data.
    :param node_line_dir: This node line's directory.
    :param strategies: Strategies for retrieval node.
    :return: The best result dataframe.
        It contains previous result columns and retrieval node's result columns.
    """
    if not os.path.exists(node_line_dir):
        os.makedirs(node_line_dir)
    project_dir = pathlib.PurePath(node_line_dir).parent.parent
    qa_df = pd.read_parquet(
        os.path.join(project_dir, "data", "qa.parquet"), engine="pyarrow"
    )
    retrieval_gt = qa_df["retrieval_gt"].tolist()
    retrieval_gt = apply_recursive(lambda x: str(x), to_list(retrieval_gt))

    # make rows to metric_inputs
    metric_inputs = [
        MetricInput(retrieval_gt=ret_gt, query=query, generation_gt=gen_gt)
        for ret_gt, query, gen_gt in zip(
            retrieval_gt, qa_df["query"].tolist(), qa_df["generation_gt"].tolist()
        )
    ]

    save_dir = os.path.join(node_line_dir, "retrieval")  # node name
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    def run(input_modules, input_module_params) -> Tuple[List[pd.DataFrame], List]:
        """
        Run input modules and parameters.

        :param input_modules: Input modules
        :param input_module_params: Input module parameters
        :return: First, it returns a list of result dataframes.
            Second, it returns a list of average execution times per query.
        """
        result, execution_times = zip(
            *map(
                lambda task: measure_speed(
                    task[0].run_evaluator,
                    project_dir=project_dir,
                    previous_result=previous_result,
                    **task[1],
                ),
                zip(input_modules, input_module_params),
            )
        )
        average_times = list(map(lambda x: x / len(result[0]), execution_times))

        # run metrics before filtering
        if strategies.get("metrics") is None:
            raise ValueError(
                "You must provide at least one metric for retrieval evaluation."
            )
        result = list(
            map(
                lambda x: evaluate_retrieval_node(
                    x,
                    metric_inputs,
                    strategies.get("metrics"),
                ),
                result,
            )
        )
        return result, average_times

    def save_and_summary(
        input_modules,
        input_module_params,
        result_list,
        execution_time_list,
        filename_start: int,
    ):
        """
        Save the results and make a summary file.

        :param input_modules: Input modules
        :param input_module_params: Input module parameters
        :param result_list: Result list
        :param execution_time_list: Execution times
        :param filename_start: The first filename to use
        :return: The summary dataframe for the given modules.
        """
        # save results to folder
        filepaths = list(
            map(
                lambda x: os.path.join(save_dir, f"{x}.parquet"),
                range(filename_start, filename_start + len(input_modules)),
            )
        )
        list(
            map(
                lambda x: x[0].to_parquet(x[1], index=False),
                zip(result_list, filepaths),
            )
        )  # execute save to parquet
        filename_list = list(map(lambda x: os.path.basename(x), filepaths))

        summary_df = pd.DataFrame(
            {
                "filename": filename_list,
                "module_name": list(
                    map(lambda module: module.__name__, input_modules)
                ),
                "module_params": input_module_params,
                "execution_time": execution_time_list,
                **{
                    metric: list(
                        map(lambda result: result[metric].mean(), result_list)
                    )
                    for metric in strategies.get("metrics")
                },
            }
        )
        summary_df.to_csv(os.path.join(save_dir, "summary.csv"), index=False)
        return summary_df

    def find_best(results, average_times, filenames):
        # filter by strategies
        if strategies.get("speed_threshold") is not None:
            results, filenames = filter_by_threshold(
                results, average_times, strategies["speed_threshold"], filenames
            )
        selected_result, selected_filename = select_best(
            results,
            strategies.get("metrics"),
            filenames,
            strategies.get("strategy", "mean"),
        )
        return selected_result, selected_filename

    filename_first = 0

    # run semantic modules
    logger.info("Running retrieval node - semantic retrieval module...")
    if any([module.__name__ in semantic_module_names for module in modules]):
        semantic_modules, semantic_module_params = zip(
            *filter(
                lambda x: x[0].__name__ in semantic_module_names,
                zip(modules, module_params),
            )
        )
        semantic_results, semantic_times = run(semantic_modules, semantic_module_params)
        semantic_summary_df = save_and_summary(
            semantic_modules,
            semantic_module_params,
            semantic_results,
            semantic_times,
            filename_first,
        )
        semantic_selected_result, semantic_selected_filename = find_best(
            semantic_results, semantic_times, semantic_summary_df["filename"].tolist()
        )
        semantic_summary_df["is_best"] = (
            semantic_summary_df["filename"] == semantic_selected_filename
        )
        filename_first += len(semantic_modules)
    else:
        (
            semantic_selected_filename,
            semantic_summary_df,
            semantic_results,
            semantic_times,
        ) = None, pd.DataFrame(), [], []

    # run lexical modules
    logger.info("Running retrieval node - lexical retrieval module...")
    if any([module.__name__ in lexical_module_names for module in modules]):
        lexical_modules, lexical_module_params = zip(
            *filter(
                lambda x: x[0].__name__ in lexical_module_names,
                zip(modules, module_params),
            )
        )
        lexical_results, lexical_times = run(lexical_modules, lexical_module_params)
        lexical_summary_df = save_and_summary(
            lexical_modules,
            lexical_module_params,
            lexical_results,
            lexical_times,
            filename_first,
        )
        lexical_selected_result, lexical_selected_filename = find_best(
            lexical_results, lexical_times, lexical_summary_df["filename"].tolist()
        )
        lexical_summary_df["is_best"] = (
            lexical_summary_df["filename"] == lexical_selected_filename
        )
        filename_first += len(lexical_modules)
    else:
        (
            lexical_selected_filename,
            lexical_summary_df,
            lexical_results,
            lexical_times,
        ) = None, pd.DataFrame(), [], []

    logger.info("Running retrieval node - hybrid retrieval module...")
    # Next, run hybrid retrieval
    if any([module.__name__ in hybrid_module_names for module in modules]):
        hybrid_modules, hybrid_module_params = zip(
            *filter(
                lambda x: x[0].__name__ in hybrid_module_names,
                zip(modules, module_params),
            )
        )
        if all(
            ["target_module_params" in x for x in hybrid_module_params]
        ):  # for Runner.run
            # If target_module_params are already given, run hybrid retrieval directly
            hybrid_results, hybrid_times = run(hybrid_modules, hybrid_module_params)
            hybrid_summary_df = save_and_summary(
                hybrid_modules,
                hybrid_module_params,
                hybrid_results,
                hybrid_times,
                filename_first,
            )
            filename_first += len(hybrid_modules)
        else:  # for Evaluator
            # get id and score
            ids_scores = get_ids_and_scores(
                save_dir,
                [semantic_selected_filename, lexical_selected_filename],
                semantic_summary_df,
                lexical_summary_df,
                previous_result,
            )
            hybrid_module_params = list(
                map(lambda x: {**x, **ids_scores}, hybrid_module_params)
            )

            # optimize each module
            real_hybrid_times = [
                get_hybrid_execution_times(semantic_summary_df, lexical_summary_df)
            ] * len(hybrid_module_params)
            hybrid_times = real_hybrid_times.copy()
            hybrid_results = []
            for module, module_param in zip(hybrid_modules, hybrid_module_params):
                module_result_df, module_best_weight = optimize_hybrid(
                    module,
                    module_param,
                    strategies,
                    metric_inputs,
                    project_dir,
                    previous_result,
                )
                module_param["weight"] = module_best_weight
                hybrid_results.append(module_result_df)

            hybrid_summary_df = save_and_summary(
                hybrid_modules,
                hybrid_module_params,
                hybrid_results,
                hybrid_times,
                filename_first,
            )
            filename_first += len(hybrid_modules)
            hybrid_summary_df["execution_time"] = hybrid_times

            best_semantic_summary_row = semantic_summary_df.loc[
                semantic_summary_df["is_best"]
            ].iloc[0]
            best_lexical_summary_row = lexical_summary_df.loc[
                lexical_summary_df["is_best"]
            ].iloc[0]
            target_modules = (
                best_semantic_summary_row["module_name"],
                best_lexical_summary_row["module_name"],
            )
            target_module_params = (
                best_semantic_summary_row["module_params"],
                best_lexical_summary_row["module_params"],
            )
            hybrid_summary_df = edit_summary_df_params(
                hybrid_summary_df, target_modules, target_module_params
            )
    else:
        if any([module.__name__ in hybrid_module_names for module in modules]):
            logger.warning(
                "You must have at least one semantic module and one lexical module "
                "for hybrid evaluation. Skipping hybrid module."
            )
        _, hybrid_summary_df, hybrid_results, hybrid_times = (
            None,
            pd.DataFrame(),
            [],
            [],
        )

    summary = pd.concat(
        [semantic_summary_df, lexical_summary_df, hybrid_summary_df], ignore_index=True
    )
    results = semantic_results + lexical_results + hybrid_results
    average_times = semantic_times + lexical_times + hybrid_times
    filenames = summary["filename"].tolist()

    # filter by strategies
    selected_result, selected_filename = find_best(results, average_times, filenames)
    best_result = pd.concat([previous_result, selected_result], axis=1)

    # add summary.csv 'is_best' column
    summary["is_best"] = summary["filename"] == selected_filename

    # save the result files
    best_result.to_parquet(
        os.path.join(
            save_dir, f"best_{os.path.splitext(selected_filename)[0]}.parquet"
        ),
        index=False,
    )
    summary.to_csv(os.path.join(save_dir, "summary.csv"), index=False)
    return best_result
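
# Usage sketch (illustrative only, not part of this module): roughly how the AutoRAG
# evaluator is expected to call run_retrieval_node. Module names are resolved with
# get_support_modules, as elsewhere in this file, but the parameter values, directory
# layout, and metric names below are assumptions for illustration, not a verified
# invocation.
#
#   modules = [get_support_modules(name) for name in ("vectordb", "bm25", "hybrid_cc")]
#   module_params = [{"top_k": 3}, {"top_k": 3}, {"top_k": 3}]
#   best_df = run_retrieval_node(
#       modules=modules,
#       module_params=module_params,
#       previous_result=qa_df,  # previous node's best result or the QA data
#       node_line_dir="./project/0/retrieve_node_line",  # hypothetical path
#       strategies={"metrics": ["retrieval_f1", "retrieval_recall"]},
#   )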


def evaluate_retrieval_node(
    result_df: pd.DataFrame,
    metric_inputs: List[MetricInput],
    metrics: Union[List[str], List[Dict]],
) -> pd.DataFrame:
    """
    Evaluate retrieval node from retrieval node result dataframe.

    :param result_df: The result dataframe from a retrieval node.
    :param metric_inputs: List of metric input schema for AutoRAG.
    :param metrics: Metric list from input strategies.
    :return: Return result_df with metrics columns.
        The columns will be 'retrieved_contents', 'retrieved_ids', 'retrieve_scores',
        and metric names.
    """

    @evaluate_retrieval(
        metric_inputs=metric_inputs,
        metrics=metrics,
    )
    def evaluate_this_module(df: pd.DataFrame):
        return (
            df["retrieved_contents"].tolist(),
            df["retrieved_ids"].tolist(),
            df["retrieve_scores"].tolist(),
        )

    return evaluate_this_module(result_df)
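
# Illustrative call (hypothetical data; the metric names are assumptions): the input
# dataframe must already carry 'retrieved_contents', 'retrieved_ids', and
# 'retrieve_scores'; the evaluate_retrieval decorator appends one column per metric.
#
#   metric_inputs = [MetricInput(retrieval_gt=[["doc-1"]], query="What is AutoRAG?")]
#   scored_df = evaluate_retrieval_node(
#       result_df, metric_inputs, ["retrieval_f1", "retrieval_recall"]
#   )
#   scored_df["retrieval_f1"].mean()  # used by save_and_summary for the summary row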


def edit_summary_df_params(
    summary_df: pd.DataFrame, target_modules, target_module_params
) -> pd.DataFrame:
    # Remove the injected ids/scores from each hybrid module's params and record the
    # best semantic/lexical modules (and their params) as the hybrid targets instead.
    def delete_ids_scores(x):
        del x["ids"]
        del x["scores"]
        return x

    summary_df["module_params"] = summary_df["module_params"].apply(delete_ids_scores)
    summary_df["new_params"] = [
        {"target_modules": target_modules, "target_module_params": target_module_params}
    ] * len(summary_df)
    summary_df["module_params"] = summary_df.apply(
        lambda row: {**row["module_params"], **row["new_params"]}, axis=1
    )
    summary_df = summary_df.drop(columns=["new_params"])
    return summary_df


def get_ids_and_scores(
    node_dir: str,
    filenames: List[str],
    semantic_summary_df: pd.DataFrame,
    lexical_summary_df: pd.DataFrame,
    previous_result,
) -> Dict[str, Tuple[List[List[str]], List[List[float]]]]:
    project_dir = pathlib.PurePath(node_dir).parent.parent.parent
    best_results_df = list(
        map(
            lambda filename: pd.read_parquet(
                os.path.join(node_dir, filename), engine="pyarrow"
            ),
            filenames,
        )
    )
    ids = tuple(
        map(lambda df: df["retrieved_ids"].apply(list).tolist(), best_results_df)
    )
    scores = tuple(
        map(lambda df: df["retrieve_scores"].apply(list).tolist(), best_results_df)
    )

    # search non-duplicate ids
    semantic_ids = deepcopy(ids[0])
    lexical_ids = deepcopy(ids[1])

    def get_non_duplicate_ids(target_ids, compare_ids) -> List[List[str]]:
        """
        Return, for each query, the ids that appear in compare_ids but are missing
        from target_ids. To get the ids that semantic_ids is missing, pass
        semantic_ids as target_ids.
        """
        result_ids = []
        assert len(target_ids) == len(compare_ids)
        for target_id_list, compare_id_list in zip(target_ids, compare_ids):
            missing_ids = list(set(compare_id_list) - set(target_id_list))
            result_ids.append(missing_ids)
        return result_ids

    lexical_target_ids = get_non_duplicate_ids(lexical_ids, semantic_ids)
    semantic_target_ids = get_non_duplicate_ids(semantic_ids, lexical_ids)

    new_id_tuple = (
        [a + b for a, b in zip(semantic_ids, semantic_target_ids)],
        [a + b for a, b in zip(lexical_ids, lexical_target_ids)],
    )

    # search non-duplicate ids' scores
    new_semantic_scores = get_scores_by_ids(
        semantic_target_ids, semantic_summary_df, project_dir, previous_result
    )
    new_lexical_scores = get_scores_by_ids(
        lexical_target_ids, lexical_summary_df, project_dir, previous_result
    )
    new_score_tuple = (
        [a + b for a, b in zip(scores[0], new_semantic_scores)],
        [a + b for a, b in zip(scores[1], new_lexical_scores)],
    )
    return {
        "ids": new_id_tuple,
        "scores": new_score_tuple,
    }
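
# Worked example of the id merge above (hypothetical ids, one query): if the best
# semantic run retrieved ["a", "b"] and the best lexical run retrieved ["b", "c"],
# then get_non_duplicate_ids(semantic_ids, lexical_ids) yields [["c"]] and
# get_non_duplicate_ids(lexical_ids, semantic_ids) yields [["a"]], so both sides end
# up covering the union {"a", "b", "c"}:
#
#   new semantic ids -> ["a", "b", "c"]    new lexical ids -> ["b", "c", "a"]
#
# The scores for the newly added ids are then filled in by re-running each side's
# best module on just those ids via get_scores_by_ids, so every id carries a score
# from both retrievers before the hybrid fusion runs.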


def get_scores_by_ids(
    ids: List[List[str]], module_summary_df: pd.DataFrame, project_dir, previous_result
) -> List[List[float]]:
    module_name = get_best_row(module_summary_df)["module_name"]
    module_params = get_best_row(module_summary_df)["module_params"]
    module = get_support_modules(module_name)
    result_df = module.run_evaluator(
        project_dir=project_dir,
        previous_result=previous_result,
        ids=ids,
        **module_params,
    )
    return to_list(result_df["retrieve_scores"].tolist())


def find_unique_elems(list1: List[str], list2: List[str]) -> List[str]:
    return list(set(list1).symmetric_difference(set(list2)))
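
# Example (the order of the returned list may vary because it is built from a set):
#
#   >>> sorted(find_unique_elems(["doc1", "doc2"], ["doc2", "doc3"]))
#   ['doc1', 'doc3']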


def get_hybrid_execution_times(lexical_summary, semantic_summary) -> float:
    lexical_execution_time = lexical_summary.loc[lexical_summary["is_best"]].iloc[0][
        "execution_time"
    ]
    semantic_execution_time = semantic_summary.loc[semantic_summary["is_best"]].iloc[0][
        "execution_time"
    ]
    return lexical_execution_time + semantic_execution_time
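
# Arithmetic illustration (hypothetical timings): if the best semantic module averaged
# 0.12 s per query and the best lexical module 0.03 s, the hybrid module is credited
# 0.15 s per query, reflecting that a hybrid run depends on both underlying retrievals.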


def optimize_hybrid(
    hybrid_module_func: Callable,
    hybrid_module_param: Dict,
    strategy: Dict,
    input_metrics: List[MetricInput],
    project_dir,
    previous_result,
):
    if (
        hybrid_module_func.__name__ == "HybridRRF"
        or hybrid_module_func.__name__ == "hybrid_rrf"
    ):
        weight_range = hybrid_module_param.pop("weight_range", (4, 80))
        test_weight_size = weight_range[1] - weight_range[0] + 1
    elif (
        hybrid_module_func.__name__ == "HybridCC"
        or hybrid_module_func.__name__ == "hybrid_cc"
    ):
        weight_range = hybrid_module_param.pop("weight_range", (0.0, 1.0))
        test_weight_size = hybrid_module_param.pop("test_weight_size", 101)
    else:
        raise ValueError(
            "hybrid_module_func must be a hybrid retrieval module "
            "(hybrid_rrf or hybrid_cc)."
        )
    weight_candidates = np.linspace(
        weight_range[0], weight_range[1], test_weight_size
    ).tolist()

    result_list = []
    for weight_value in weight_candidates:
        result_df = hybrid_module_func.run_evaluator(
            project_dir=project_dir,
            previous_result=previous_result,
            weight=weight_value,
            **hybrid_module_param,
        )
        result_list.append(result_df)

    # evaluate here
    if strategy.get("metrics") is None:
        raise ValueError(
            "You must provide at least one metric for retrieval evaluation."
        )
    result_list = list(
        map(
            lambda x: evaluate_retrieval_node(
                x,
                input_metrics,
                strategy.get("metrics"),
            ),
            result_list,
        )
    )

    # select best result
    best_result_df, best_weight = select_best(
        result_list,
        strategy.get("metrics"),
        metadatas=weight_candidates,
        strategy_name=strategy.get("strategy", "normalize_mean"),
    )
    return best_result_df, best_weight
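
# Weight-grid illustration (derived from the defaults above): for hybrid_rrf the
# default weight_range of (4, 80) gives test_weight_size = 77, so
# np.linspace(4, 80, 77) sweeps the integer candidates 4.0, 5.0, ..., 80.0; for
# hybrid_cc the defaults (0.0, 1.0) with test_weight_size = 101 sweep
# 0.00, 0.01, ..., 1.00. Each candidate weight is passed to run_evaluator, scored with
# the strategy metrics, and the best-scoring weight is returned alongside its result
# dataframe (and written back into the module params by run_retrieval_node).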