import math
from collections import Counter
from typing import Optional

import numpy as np

from core.model_manager import ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
from core.rag.embedding.cached_embedding import CacheEmbedding
from core.rag.models.document import Document
from core.rag.rerank.entity.weight import VectorSetting, Weights
from core.rag.rerank.rerank_base import BaseRerankRunner


class WeightRerankRunner(BaseRerankRunner):
    """
    Rerank documents by a weighted sum of a keyword (TF-IDF cosine) score
    and a vector (embedding cosine) score.
    """

    def __init__(self, tenant_id: str, weights: Weights) -> None:
        """
        :param tenant_id: tenant used to resolve the embedding model instance
        :param weights: vector and keyword weight settings
        """
        self.tenant_id = tenant_id
        self.weights = weights

    def run(
        self,
        query: str,
        documents: list[Document],
        score_threshold: Optional[float] = None,
        top_n: Optional[int] = None,
        user: Optional[str] = None,
    ) -> list[Document]:
        """
        Rerank documents by the weighted keyword + vector score.

        Documents are de-duplicated on ``metadata["doc_id"]`` (first occurrence
        wins). Each kept document gets ``metadata["score"]`` overwritten with
        the combined score and ``metadata["keywords"]`` set as a side effect of
        keyword scoring.

        :param query: search query
        :param documents: documents for reranking
        :param score_threshold: documents scoring below it are dropped;
            a falsy value (None or 0) disables filtering
        :param top_n: maximum number of documents returned; a falsy value
            (None or 0) returns all of them
        :param user: unique user id if needed (not used by this runner)
        :return: documents sorted by descending combined score
        """
        # De-duplicate by doc_id, keeping input order; a set gives O(1) lookups.
        seen_doc_ids = set()
        unique_documents = []
        for document in documents:
            doc_id = document.metadata["doc_id"]
            if doc_id in seen_doc_ids:
                continue
            seen_doc_ids.add(doc_id)
            unique_documents.append(document)

        # Nothing to score: skip keyword extraction and the embedding call.
        if not unique_documents:
            return []

        keyword_scores = self._calculate_keyword_score(query, unique_documents)
        vector_scores = self._calculate_cosine(
            self.tenant_id, query, unique_documents, self.weights.vector_setting
        )

        vector_weight = self.weights.vector_setting.vector_weight
        keyword_weight = self.weights.keyword_setting.keyword_weight

        rerank_documents = []
        for document, keyword_score, vector_score in zip(unique_documents, keyword_scores, vector_scores):
            score = vector_weight * vector_score + keyword_weight * keyword_score
            # Truthiness check is intentional: a threshold of 0 means "no threshold".
            if score_threshold and score < score_threshold:
                continue
            document.metadata["score"] = score
            rerank_documents.append(document)

        rerank_documents.sort(key=lambda doc: doc.metadata["score"], reverse=True)
        return rerank_documents[:top_n] if top_n else rerank_documents

    @staticmethod
    def _sparse_cosine_similarity(vec1: dict, vec2: dict) -> float:
        """
        Cosine similarity between two sparse vectors stored as ``{key: weight}``.

        :return: similarity, or 0.0 when either vector has zero magnitude
        """
        shared_keys = vec1.keys() & vec2.keys()
        numerator = sum(vec1[key] * vec2[key] for key in shared_keys)
        denominator = math.sqrt(sum(value**2 for value in vec1.values())) * math.sqrt(
            sum(value**2 for value in vec2.values())
        )
        if not denominator:
            return 0.0
        return float(numerator) / denominator

    def _calculate_keyword_score(self, query: str, documents: list[Document]) -> list[float]:
        """
        Calculate keyword scores as the cosine similarity between the TF-IDF
        vector of the query keywords and that of each document's keywords.

        Side effect: stores the extracted keywords in
        ``document.metadata["keywords"]`` for every document.

        :param query: search query
        :param documents: documents for reranking
        :return: one similarity per document, in input order
        """
        keyword_table_handler = JiebaKeywordTableHandler()
        query_keywords = keyword_table_handler.extract_keywords(query, None)

        documents_keywords = []
        for document in documents:
            document_keywords = keyword_table_handler.extract_keywords(document.page_content, None)
            document.metadata["keywords"] = document_keywords
            documents_keywords.append(document_keywords)

        total_documents = len(documents)

        # Document frequency: number of documents containing each keyword.
        # set() makes sure a keyword counts once per document.
        document_frequency = Counter()
        for document_keywords in documents_keywords:
            document_frequency.update(set(document_keywords))

        # Smoothed IDF over the keywords that occur in the documents.
        keyword_idf = {
            keyword: math.log((1 + total_documents) / (1 + doc_count)) + 1
            for keyword, doc_count in document_frequency.items()
        }

        def tfidf(keywords) -> dict:
            # Keywords absent from every document get an IDF of 0.
            return {keyword: count * keyword_idf.get(keyword, 0) for keyword, count in Counter(keywords).items()}

        query_tfidf = tfidf(query_keywords)
        return [
            self._sparse_cosine_similarity(query_tfidf, tfidf(document_keywords))
            for document_keywords in documents_keywords
        ]

    def _calculate_cosine(
        self, tenant_id: str, query: str, documents: list[Document], vector_setting: VectorSetting
    ) -> list[float]:
        """
        Calculate vector scores between the query embedding and each document.

        A document that already carries ``metadata["score"]`` keeps that value
        as its vector score; otherwise the cosine similarity between the query
        embedding and ``document.vector`` is used.

        :param tenant_id: tenant used to resolve the embedding model instance
        :param query: search query
        :param documents: documents for reranking
        :param vector_setting: embedding provider / model names
        :return: one score per document, in input order
        """
        model_manager = ModelManager()
        embedding_model = model_manager.get_model_instance(
            tenant_id=tenant_id,
            provider=vector_setting.embedding_provider_name,
            model_type=ModelType.TEXT_EMBEDDING,
            model=vector_setting.embedding_model_name,
        )
        cache_embedding = CacheEmbedding(embedding_model)
        query_vector = cache_embedding.embed_query(query)

        # The query side is loop-invariant: convert and take its norm once.
        query_array = np.array(query_vector)
        query_norm = np.linalg.norm(query_array)

        query_vector_scores = []
        for document in documents:
            if "score" in document.metadata:
                query_vector_scores.append(document.metadata["score"])
                continue

            document_array = np.array(document.vector)
            dot_product = np.dot(query_array, document_array)
            denominator = query_norm * np.linalg.norm(document_array)
            if not denominator:
                # A zero-magnitude vector has no direction; score it 0.0
                # instead of dividing by zero and propagating NaN.
                query_vector_scores.append(0.0)
            else:
                query_vector_scores.append(float(dot_product / denominator))

        return query_vector_scores