Source code for autorag.nodes.passagereranker.cohere

import os
from typing import List, Tuple

import cohere
import pandas as pd
from cohere import RerankResponseResultsItem

from autorag.nodes.passagereranker.base import BasePassageReranker
from autorag.utils.util import get_event_loop, process_batch, result_to_dataframe


[docs] class CohereReranker(BasePassageReranker): def __init__(self, project_dir: str, *args, **kwargs): """ Initialize Cohere rerank node. :param project_dir: The project directory path. :param api_key: The API key for Cohere rerank. You can set it in the environment variable COHERE_API_KEY. Or, you can directly set it on the config YAML file using this parameter. Default is env variable "COHERE_API_KEY". :param kwargs: Extra arguments that are not affected """ super().__init__(project_dir) api_key = kwargs.pop("api_key", None) api_key = os.getenv("COHERE_API_KEY", None) if api_key is None else api_key if api_key is None: api_key = os.getenv("CO_API_KEY", None) if api_key is None: raise KeyError( "Please set the API key for Cohere rerank in the environment variable COHERE_API_KEY " "or directly set it on the config YAML file." ) self.cohere_client = cohere.AsyncClientV2(api_key=api_key) def __del__(self): del self.cohere_client super().__del__()
[docs] @result_to_dataframe(["retrieved_contents", "retrieved_ids", "retrieve_scores"]) def pure(self, previous_result: pd.DataFrame, *args, **kwargs): queries, contents, scores, ids = self.cast_to_run(previous_result) top_k = kwargs.pop("top_k") batch = kwargs.pop("batch", 64) model = kwargs.pop("model", "rerank-v3.5") return self._pure(queries, contents, scores, ids, top_k, batch, model)
def _pure( self, queries: List[str], contents_list: List[List[str]], scores_list: List[List[float]], ids_list: List[List[str]], top_k: int, batch: int = 64, model: str = "rerank-v3.5", ) -> Tuple[List[List[str]], List[List[str]], List[List[float]]]: """ Rerank a list of contents with Cohere rerank models. You can get the API key from https://cohere.com/rerank and set it in the environment variable COHERE_API_KEY. :param queries: The list of queries to use for reranking :param contents_list: The list of lists of contents to rerank :param scores_list: The list of lists of scores retrieved from the initial ranking :param ids_list: The list of lists of ids retrieved from the initial ranking :param top_k: The number of passages to be retrieved :param batch: The number of queries to be processed in a batch :param model: The model name for Cohere rerank. You can choose between "rerank-v3.5", "rerank-english-v3.0", and "rerank-multilingual-v3.0". Default is "rerank-v3.5". :return: Tuple of lists containing the reranked contents, ids, and scores """ # Run async cohere_rerank_pure function tasks = [ cohere_rerank_pure(self.cohere_client, model, query, document, ids, top_k) for query, document, ids in zip(queries, contents_list, ids_list) ] loop = get_event_loop() results = loop.run_until_complete(process_batch(tasks, batch_size=batch)) content_result = list(map(lambda x: x[0], results)) id_result = list(map(lambda x: x[1], results)) score_result = list(map(lambda x: x[2], results)) return content_result, id_result, score_result
[docs] async def cohere_rerank_pure( cohere_client: cohere.AsyncClient, model: str, query: str, documents: List[str], ids: List[str], top_k: int, ) -> Tuple[List[str], List[str], List[float]]: """ Rerank a list of contents with Cohere rerank models. :param cohere_client: The Cohere AsyncClient to use for reranking :param model: The model name for Cohere rerank :param query: The query to use for reranking :param documents: The list of contents to rerank :param ids: The list of ids corresponding to the documents :param top_k: The number of passages to be retrieved :return: Tuple of lists containing the reranked contents, ids, and scores """ rerank_results = await cohere_client.rerank( model=model, query=query, documents=documents, top_n=top_k, return_documents=False, ) results: List[RerankResponseResultsItem] = rerank_results.results reranked_scores: List[float] = list(map(lambda x: x.relevance_score, results)) indices = list(map(lambda x: x.index, results)) reranked_contents: List[str] = list(map(lambda i: documents[i], indices)) reranked_ids: List[str] = list(map(lambda i: ids[i], indices)) return reranked_contents, reranked_ids, reranked_scores