Source code for autorag.nodes.queryexpansion.run

import logging
import os
import pathlib
from copy import deepcopy
from typing import List, Dict, Optional

import pandas as pd

from autorag.nodes.retrieval.run import evaluate_retrieval_node
from autorag.schema.metricinput import MetricInput
from autorag.strategy import measure_speed, filter_by_threshold, select_best
from autorag.support import get_support_modules
from autorag.utils.util import make_combinations, explode

logger = logging.getLogger("AutoRAG")


[docs] def run_query_expansion_node( modules: List, module_params: List[Dict], previous_result: pd.DataFrame, node_line_dir: str, strategies: Dict, ) -> pd.DataFrame: """ Run evaluation and select the best module among query expansion node results. Initially, retrieval is run using expanded_queries, the result of the query_expansion module. The retrieval module is run as a combination of the retrieval_modules in strategies. If there are multiple retrieval_modules, run them all and choose the best result. If there are no retrieval_modules, run them with the default of bm25. In this way, the best result is selected for each module, and then the best result is selected. :param modules: Query expansion modules to run. :param module_params: Query expansion module parameters. :param previous_result: Previous result dataframe. In this case, it would be qa data. :param node_line_dir: This node line's directory. :param strategies: Strategies for query expansion node. :return: The best result dataframe. """ if not os.path.exists(node_line_dir): os.makedirs(node_line_dir) node_dir = os.path.join(node_line_dir, "query_expansion") if not os.path.exists(node_dir): os.makedirs(node_dir) project_dir = pathlib.PurePath(node_line_dir).parent.parent # run query expansion results, execution_times = zip( *map( lambda task: measure_speed( task[0].run_evaluator, project_dir=project_dir, previous_result=previous_result, **task[1], ), zip(modules, module_params), ) ) average_times = list(map(lambda x: x / len(results[0]), execution_times)) # save results to folder pseudo_module_params = deepcopy(module_params) for i, module_param in enumerate(pseudo_module_params): if "prompt" in module_params: module_param["prompt"] = str(i) filepaths = list( map(lambda x: os.path.join(node_dir, f"{x}.parquet"), range(len(modules))) ) list( map(lambda x: x[0].to_parquet(x[1], index=False), zip(results, filepaths)) ) # execute save to parquet filenames = list(map(lambda x: os.path.basename(x), filepaths)) # make summary file summary_df = pd.DataFrame( { "filename": filenames, "module_name": list(map(lambda module: module.__name__, modules)), "module_params": module_params, "execution_time": average_times, } ) # Run evaluation when there are more than one module. if len(modules) > 1: # pop general keys from strategies (e.g. metrics, speed_threshold) general_key = ["metrics", "speed_threshold", "strategy"] general_strategy = dict( filter(lambda x: x[0] in general_key, strategies.items()) ) extra_strategy = dict( filter(lambda x: x[0] not in general_key, strategies.items()) ) # first, filter by threshold if it is enabled. if general_strategy.get("speed_threshold") is not None: results, filenames = filter_by_threshold( results, average_times, general_strategy["speed_threshold"], filenames ) # check metrics in strategy if general_strategy.get("metrics") is None: raise ValueError( "You must at least one metrics for query expansion evaluation." ) if extra_strategy.get("top_k") is None: extra_strategy["top_k"] = 10 # default value # get retrieval modules from strategy retrieval_callables, retrieval_params = make_retrieval_callable_params( extra_strategy ) # get retrieval_gt retrieval_gt = pd.read_parquet( os.path.join(project_dir, "data", "qa.parquet"), engine="pyarrow" )["retrieval_gt"].tolist() # make rows to metric_inputs metric_inputs = [ MetricInput(retrieval_gt=ret_gt, query=query, generation_gt=gen_gt) for ret_gt, query, gen_gt in zip( retrieval_gt, previous_result["query"].tolist(), previous_result["generation_gt"].tolist(), ) ] # run evaluation evaluation_results = list( map( lambda result: evaluate_one_query_expansion_node( retrieval_callables, retrieval_params, [ setattr(metric_input, "queries", queries) or metric_input for metric_input, queries in zip( metric_inputs, result["queries"].to_list() ) ], general_strategy["metrics"], project_dir, previous_result, general_strategy.get("strategy", "mean"), ), results, ) ) evaluation_df = pd.DataFrame( { "filename": filenames, **{ f"query_expansion_{metric_name}": list( map(lambda x: x[metric_name].mean(), evaluation_results) ) for metric_name in general_strategy["metrics"] }, } ) summary_df = pd.merge( on="filename", left=summary_df, right=evaluation_df, how="left" ) best_result, best_filename = select_best( evaluation_results, general_strategy["metrics"], filenames, strategies.get("strategy", "mean"), ) # change metric name columns to query_expansion_metric_name best_result = best_result.rename( columns={ metric_name: f"query_expansion_{metric_name}" for metric_name in strategies["metrics"] } ) best_result = best_result.drop( columns=["retrieved_contents", "retrieved_ids", "retrieve_scores"] ) else: best_result, best_filename = results[0], filenames[0] best_result = pd.concat([previous_result, best_result], axis=1) # add 'is_best' column at summary file summary_df["is_best"] = summary_df["filename"] == best_filename # save files summary_df.to_csv(os.path.join(node_dir, "summary.csv"), index=False) best_result.to_parquet( os.path.join(node_dir, f"best_{os.path.splitext(best_filename)[0]}.parquet"), index=False, ) return best_result
[docs] def evaluate_one_query_expansion_node( retrieval_funcs: List, retrieval_params: List[Dict], metric_inputs: List[MetricInput], metrics: List[str], project_dir, previous_result: pd.DataFrame, strategy_name: str, ) -> pd.DataFrame: previous_result["queries"] = [ metric_input.queries for metric_input in metric_inputs ] retrieval_results = list( map( lambda x: x[0].run_evaluator( project_dir=project_dir, previous_result=previous_result, **x[1] ), zip(retrieval_funcs, retrieval_params), ) ) evaluation_results = list( map( lambda x: evaluate_retrieval_node( x, metric_inputs, metrics, ), retrieval_results, ) ) best_result, _ = select_best( evaluation_results, metrics, strategy_name=strategy_name ) best_result = pd.concat([previous_result, best_result], axis=1) return best_result
[docs] def make_retrieval_callable_params(strategy_dict: Dict): """ strategy_dict looks like this: .. Code:: json { "metrics": ["retrieval_f1", "retrieval_recall"], "top_k": 50, "retrieval_modules": [ {"module_type": "bm25"}, {"module_type": "vectordb", "embedding_model": ["openai", "huggingface"]} ] } """ node_dict = deepcopy(strategy_dict) retrieval_module_list: Optional[List[Dict]] = node_dict.pop( "retrieval_modules", None ) if retrieval_module_list is None: retrieval_module_list = [ { "module_type": "bm25", } ] node_params = node_dict modules = list( map( lambda module_dict: get_support_modules(module_dict.pop("module_type")), retrieval_module_list, ) ) param_combinations = list( map( lambda module_dict: make_combinations({**module_dict, **node_params}), retrieval_module_list, ) ) return explode(modules, param_combinations)