diff --git a/haystack_rag/__init__.py b/haystack_rag/__init__.py
deleted file mode 100644
index a331a38..0000000
--- a/haystack_rag/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-# haystack_rag module
-from .main import run_chat_session
-from .rag_pipeline import build_rag_pipeline
-
-__all__ = ["run_chat_session", "build_rag_pipeline"]
\ No newline at end of file
diff --git a/haystack_rag/api.py b/haystack_rag/api.py
deleted file mode 100644
index c30c646..0000000
--- a/haystack_rag/api.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# app.py
-from fastapi import FastAPI, HTTPException, Depends
-from pydantic import BaseModel
-from typing import List, Optional
-import logging
-
-from haystack import Document
-
-# Import necessary components from the provided code
-from .data_handling import initialize_milvus_lite
-from .main import initialize_document_embedder
-from .retrieval import initialize_vector_retriever
-from .embedding import initialize_text_embedder
-
-# Setup logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-# Initialize FastAPI app
-app = FastAPI(title="Document Embedding and Retrieval API")
-
-
-# Define request and response models
-class EmbedRequest(BaseModel):
-    user_id: str
-    content: str
-    meta: Optional[dict] = {}
-
-
-class RetrieveRequest(BaseModel):
-    user_id: str
-    query: str
-
-
-class DocumentResponse(BaseModel):
-    content: str
-    score: Optional[float] = None
-    meta: Optional[dict] = {}
-
-
-class RetrieveResponse(BaseModel):
-    documents: List[DocumentResponse]
-    query: str
-    answer: Optional[str] = None
-
-
-# Helper functions
-def get_document_embedder():
-    return initialize_document_embedder()
-
-
-def get_document_store(user_id: str):
-    return initialize_milvus_lite(user_id)
-
-
-@app.post("/embed", response_model=dict)
-async def embed_document(
-    request: EmbedRequest, embedder=Depends(get_document_embedder)
-):
-    """
-    Embed content and store it in a Milvus collection for the specified user.
-    """
-    try:
-        # Initialize document store for the user
-        document_store = get_document_store(request.user_id)
-
-        # Create a document with user content
-        meta = request.meta.copy()
-        meta["user_id"] = request.user_id  # Ensure user_id is in meta
-        user_doc = Document(content=request.content, meta=meta)
-
-        # Embed the document
-        logger.info(f"Embedding document for user {request.user_id}")
-        embedding_result = embedder.run([user_doc])
-        embedded_docs = embedding_result.get("documents", [])
-
-        if not embedded_docs:
-            raise HTTPException(status_code=500, detail="Failed to embed document")
-
-        # Write to document store
-        logger.info(f"Writing embedded document to Milvus for user {request.user_id}")
-        document_store.write_documents(embedded_docs)
-
-        return {
-            "status": "success",
-            "message": f"Document embedded and stored for user {request.user_id}",
-        }
-
-    except Exception as e:
-        logger.error(f"Error embedding document: {str(e)}")
-        raise HTTPException(
-            status_code=500, detail=f"Error embedding document: {str(e)}"
-        )
-
-
-@app.post("/retrieve", response_model=RetrieveResponse)
-async def retrieve_documents(request: RetrieveRequest):
-    """
-    Retrieve similar documents for a user based on a query without LLM generation.
-    Only retrieves documents using vector similarity.
-    """
-    try:
-        # Get document store for the user
-        document_store = get_document_store(request.user_id)
-
-        # Initialize text embedder for query embedding
-        text_embedder = initialize_text_embedder()
-
-        # Initialize retriever
-        retriever = initialize_vector_retriever(document_store)
-
-        # Embed the query
-        logger.info(f"Embedding query for user {request.user_id}: '{request.query}'")
-        embedding_result = text_embedder.run(text=request.query)
-        query_embedding = embedding_result.get("embedding")
-
-        if not query_embedding:
-            raise HTTPException(status_code=500, detail="Failed to embed query")
-
-        # Retrieve similar documents
-        logger.info(f"Retrieving documents for query: '{request.query}'")
-        retriever_result = retriever.run(query_embedding=query_embedding)
-        retrieved_docs = retriever_result.get("documents", [])
-
-        # Convert to response format
-        documents = []
-        for doc in retrieved_docs:
-            documents.append(
-                DocumentResponse(
-                    content=doc.content,
-                    score=doc.score if hasattr(doc, "score") else None,
-                    meta=doc.meta,
-                )
-            )
-
-        return RetrieveResponse(documents=documents, query=request.query, answer=None)
-
-    except Exception as e:
-        logger.error(f"Error retrieving documents: {str(e)}")
-        raise HTTPException(
-            status_code=500, detail=f"Error retrieving documents: {str(e)}"
-        )
-
-
-# Add these imports at the top if not already there
-from typing import List, Optional, Literal
-
-
-# Define message models for the new endpoint
-class Message(BaseModel):
-    content: str
-    role: str
-    name: Optional[str] = None
-
-
-class EmbedMessagesRequest(BaseModel):
-    user_id: str
-    messages: List[Message]
-    meta: Optional[dict] = {}
-
-
-@app.post("/embed_messages", response_model=dict)
-async def embed_messages(
-    request: EmbedMessagesRequest, embedder=Depends(get_document_embedder)
-):
-    """
-    Process a messages array, extract content from user messages,
-    concatenate them with newlines, then embed and store in Milvus.
-    """
-    try:
-        # Initialize document store for the user
-        document_store = get_document_store(request.user_id)
-
-        # Filter messages to keep only those with role="user"
-        user_messages = [msg for msg in request.messages if msg.role == "user"]
-
-        # Extract content from each user message
-        user_contents = [msg.content for msg in user_messages]
-
-        # Join contents with newline character
-        concatenated_content = "\n".join(user_contents)
-
-        if not concatenated_content.strip():
-            return {
-                "status": "warning",
-                "message": "No user messages found or all user messages were empty",
-            }
-
-        # Create a document with concatenated content
-        meta = request.meta.copy()
-        meta["user_id"] = request.user_id  # Ensure user_id is in meta
-        user_doc = Document(content=concatenated_content, meta=meta)
-
-        # Embed the document
-        logger.info(f"Embedding concatenated user messages for user {request.user_id}")
-        embedding_result = embedder.run([user_doc])
-        embedded_docs = embedding_result.get("documents", [])
-
-        if not embedded_docs:
-            raise HTTPException(status_code=500, detail="Failed to embed document")
-
-        # Write to document store
-        logger.info(f"Writing embedded document to Milvus for user {request.user_id}")
-        document_store.write_documents(embedded_docs)
-
-        return {
-            "status": "success",
-            "message": f"User messages embedded and stored for user {request.user_id}",
-            "processed_messages_count": len(user_messages),
-            "concatenated_length": len(concatenated_content),
-        }
-
-    except Exception as e:
-        logger.error(f"Error embedding messages: {str(e)}")
-        raise HTTPException(
-            status_code=500, detail=f"Error embedding messages: {str(e)}"
-        )
-
-
-if __name__ == "__main__":
-    import uvicorn
-
-    uvicorn.run(app, host="0.0.0.0", port=7999)
- """ - try: - # Initialize document store for the user - document_store = get_document_store(request.user_id) - - # Filter messages to keep only those with role="user" - user_messages = [msg for msg in request.messages if msg.role == "user"] - - # Extract content from each user message - user_contents = [msg.content for msg in user_messages] - - # Join contents with newline character - concatenated_content = "\n".join(user_contents) - - if not concatenated_content.strip(): - return { - "status": "warning", - "message": "No user messages found or all user messages were empty", - } - - # Create a document with concatenated content - meta = request.meta.copy() - meta["user_id"] = request.user_id # Ensure user_id is in meta - user_doc = Document(content=concatenated_content, meta=meta) - - # Embed the document - logger.info(f"Embedding concatenated user messages for user {request.user_id}") - embedding_result = embedder.run([user_doc]) - embedded_docs = embedding_result.get("documents", []) - - if not embedded_docs: - raise HTTPException(status_code=500, detail="Failed to embed document") - - # Write to document store - logger.info(f"Writing embedded document to Milvus for user {request.user_id}") - document_store.write_documents(embedded_docs) - - return { - "status": "success", - "message": f"User messages embedded and stored for user {request.user_id}", - "processed_messages_count": len(user_messages), - "concatenated_length": len(concatenated_content), - } - - except Exception as e: - logger.error(f"Error embedding messages: {str(e)}") - raise HTTPException( - status_code=500, detail=f"Error embedding messages: {str(e)}" - ) - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=7999) diff --git a/haystack_rag/data_handling.py b/haystack_rag/data_handling.py deleted file mode 100644 index d642752..0000000 --- a/haystack_rag/data_handling.py +++ /dev/null @@ -1,96 +0,0 @@ -# data_handling.py -import os -from pathlib import Path -from typing import List, Optional -import logging # Added logging - -from haystack import Document -from milvus_haystack import MilvusDocumentStore - -# Import config variables needed -from config import ( - OPENAI_EMBEDDING_DIM, # Keep for logging/validation if desired, but not passed to init - USER_ID_PREFIX, - MILVUS_PERSIST_BASE_DIR, - MILVUS_INDEX_PARAMS, - MILVUS_SEARCH_PARAMS, - MILVUS_STAND_URI, -) - -logger = logging.getLogger(__name__) # Use logger - - -# get_user_milvus_path function remains the same -def get_user_milvus_path(user_id: str, base_dir: Path = MILVUS_PERSIST_BASE_DIR) -> str: - """ - 获取指定用户的 Milvus Lite 数据库文件路径。 - 该函数会执行以下操作: - 1. 基于- `base_dir` 和 `user_id` 构建一个用户专属的目录路径。 - 2. 确保该目录存在,如果不存在则会创建它。 - 3. 将目录路径与固定的数据库文件名 "milvus_lite.db" 组合。 - 4. 返回最终的完整文件路径(字符串格式)。 - Args: - user_id (str): 用户的唯一标识符。 - base_dir (Path, optional): Milvus 数据持久化的根目录. - 默认为 MILVUS_PERSIST_BASE_DIR. - Returns: - str: 指向用户 Milvus 数据库文件的完整路径字符串。 - """ - user_db_dir = base_dir / user_id - user_db_dir.mkdir(parents=True, exist_ok=True) - return str(user_db_dir / "milvus_lite.db") - - -def initialize_milvus_lite(user_id: str) -> MilvusDocumentStore: - """ - Initializes Milvus Lite DocumentStore for a user using milvus-haystack. - Dimension is inferred by Milvus upon first write, not passed here. 
- """ - print(f"Initializing Milvus Lite store for user: {user_id}") - milvus_uri = get_user_milvus_path(user_id) - print(f"Milvus Lite URI: {milvus_uri}") - # Log the dimension expected based on config, even if not passed directly - print(f"Expecting Embedding Dimension (for first write): {OPENAI_EMBEDDING_DIM}") - - document_store = MilvusDocumentStore( - connection_args={"uri": milvus_uri}, - collection_name=user_id, # Default or customize - index_params=MILVUS_INDEX_PARAMS, # Pass index config - search_params=MILVUS_SEARCH_PARAMS, # Pass search config - drop_old=False, # Keep drop_old for testing convenience - ) - # Note: The actual schema dimension is set when the first document with an embedding is written. - print(f"Milvus Lite store instance created for user {user_id} at {milvus_uri}") - return document_store - - -# add_user_document_to_store and get_user_documents can remain if needed for other purposes, -def add_user_document_to_store( - document_store: MilvusDocumentStore, user_id: str, text: str -): - doc = Document(content=text, meta={"user_id": user_id}) - print(f"Adding document for user {user_id}: '{text[:50]}...'") - document_store.write_documents([doc]) - - -# get_user_documents function remains the same -def get_user_documents( - document_store: MilvusDocumentStore, user_id: str -) -> List[Document]: - print(f"Retrieving all documents for user {user_id}...") - all_docs = document_store.get_all_documents() - print(f"Found {len(all_docs)} documents for user {user_id}.") - return all_docs - - -# Optional: Test code similar to before, but now using the OpenAI dimension -if __name__ == "__main__": - test_user = "test_user_openai_data" - store = initialize_milvus_lite(test_user) - # Add dummy docs (won't be embedded here, just stored) - add_user_document_to_store(store, test_user, "第一个文档,关于 OpenAI。") - add_user_document_to_store(store, test_user, "第二个文档,使用 API。") - docs = get_user_documents(store, test_user) - for d in docs: - print(f" - {d.content} (Meta: {d.meta})") - # Cleanup code similar to before diff --git a/haystack_rag/embedding.py b/haystack_rag/embedding.py deleted file mode 100644 index 1fe008a..0000000 --- a/haystack_rag/embedding.py +++ /dev/null @@ -1,63 +0,0 @@ -# embedding.py -from haystack.components.embedders import OpenAITextEmbedder - -from haystack.utils import Secret - -# 从 config 导入新的变量名 -from config import ( - OPENAI_EMBEDDING_MODEL, - OPENAI_API_KEY_FROM_CONFIG, # 使用配置中的 Key - OPENAI_API_BASE_URL_CONFIG, # 使用配置中的 Base URL - OPENAI_EMBEDDING_KEY, - OPENAI_EMBEDDING_BASE, - HUGGINGFACE_KEY, - HUGGINGFACE_EMBEDDING_MODEL, - OPENAI_EMBEDDING_DIM -) - - -def initialize_text_embedder() -> OpenAITextEmbedder: - """ - Initializes the Haystack OpenAITextEmbedder component. - Reads API Key and Base URL directly from config.py. 
- """ - - # 检查从配置加载的 key 是否有效 (基础检查) - if not OPENAI_API_KEY_FROM_CONFIG or "YOUR_API_KEY" in OPENAI_API_KEY_FROM_CONFIG: - print("警告: OpenAI API Key 未在 config.py 中有效配置。") - # Consider raising an error here if the key is mandatory - # raise ValueError("OpenAI API Key not configured correctly in config.py") - - print(f"Initializing OpenAI Text Embedder with model: {OPENAI_EMBEDDING_MODEL}") - - # 使用配置中的 Base URL - if OPENAI_API_BASE_URL_CONFIG: - print(f"Using custom API base URL from config: {OPENAI_API_BASE_URL_CONFIG}") - else: - print("Using default OpenAI API base URL (None specified in config).") - - text_embedder = OpenAITextEmbedder( - # 直接使用从 config.py 导入的 key 和 base_url - api_key=Secret.from_token(OPENAI_EMBEDDING_KEY), - api_base_url=OPENAI_EMBEDDING_BASE, - model=OPENAI_EMBEDDING_MODEL, - dimensions=OPENAI_EMBEDDING_DIM, - ) - print("Text Embedder initialized.") - return text_embedder - - -# __main__ 部分也需要调整以反映不依赖环境变量 -# Example usage -if __name__ == "__main__": - embedder = initialize_text_embedder() - sample_text = "这是一个示例文本,用于测试嵌入功能。" - try: - result = embedder.run(text=sample_text) - embedding = result["embedding"] - print(f"Sample text: '{sample_text}'") - # print(f"Generated embedding (first 5 dims): {embedding[:5]}") - print(f"Generated embedding dimension: {len(embedding)}") - print(f"Tokens used: {result['meta']['usage']['total_tokens']}") - except Exception as e: - print(f"Error during huggingface API call: {e}") diff --git a/haystack_rag/llm_integration.py b/haystack_rag/llm_integration.py deleted file mode 100644 index 6062ffd..0000000 --- a/haystack_rag/llm_integration.py +++ /dev/null @@ -1,73 +0,0 @@ -# llm_integration.py -from haystack.components.generators.openai import OpenAIGenerator -from haystack.components.builders import PromptBuilder -from haystack.utils import Secret - -# 从 config 导入新的变量名 -from config import ( - OPENAI_LLM_MODEL, - DEFAULT_PROMPT_TEMPLATE, - OPENAI_API_KEY_FROM_CONFIG, # 使用配置中的 Key - OPENAI_API_BASE_URL_CONFIG, # 使用配置中的 Base URL -) - - -def initialize_llm_and_prompt_builder() -> tuple[OpenAIGenerator, PromptBuilder]: - """ - Initializes the OpenAI Generator and PromptBuilder components. - Reads API Key and Base URL directly from config.py. - """ - - if not OPENAI_API_KEY_FROM_CONFIG or "YOUR_API_KEY" in OPENAI_API_KEY_FROM_CONFIG: - print("警告: OpenAI API Key 未在 config.py 中有效配置。") - # Consider raising an error - # raise ValueError("OpenAI API Key not configured correctly in config.py") - - print(f"Initializing OpenAI Generator with model: {OPENAI_LLM_MODEL}") - - if OPENAI_API_BASE_URL_CONFIG: - print(f"Using custom API base URL from config: {OPENAI_API_BASE_URL_CONFIG}") - else: - print("Using default OpenAI API base URL (None specified in config).") - - llm_generator = OpenAIGenerator( - # 直接使用从 config.py 导入的 key 和 base_url - api_key=Secret.from_token(OPENAI_API_KEY_FROM_CONFIG), - model=OPENAI_LLM_MODEL, - api_base_url=OPENAI_API_BASE_URL_CONFIG, - ) - print("OpenAI Generator initialized.") - - print("Initializing Prompt Builder...") - prompt_builder = PromptBuilder(template=DEFAULT_PROMPT_TEMPLATE) - print("Prompt Builder initialized.") - - return llm_generator, prompt_builder - - -# __main__ 部分也需要调整 - -# Example Usage -if __name__ == "__main__": - from haystack import Document - - llm, builder = initialize_llm_and_prompt_builder() - - sample_question = "Haystack 是什么?" 
diff --git a/haystack_rag/main.py b/haystack_rag/main.py
deleted file mode 100644
index d3e2931..0000000
--- a/haystack_rag/main.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# main.py
-import logging
-from haystack import Document
-
-# OpenAIDocumentEmbedder is needed to embed the documents being written
-from haystack.components.embedders import OpenAIDocumentEmbedder
-from haystack.utils import Secret
-
-# Set up the logger
-logger = logging.getLogger(__name__)
-
-# Import the required config values and builder functions
-from config import (
-    DEFAULT_USER_ID,
-    OPENAI_EMBEDDING_MODEL,
-    OPENAI_EMBEDDING_KEY,
-    OPENAI_EMBEDDING_BASE,
-)
-from .rag_pipeline import build_rag_pipeline  # Builds the RAG query pipeline
-
-
-# Helper: initialize the Document Embedder (similar to embedding.py)
-def initialize_document_embedder() -> OpenAIDocumentEmbedder:
-    """Initialize the OpenAIDocumentEmbedder used to embed documents."""
-    if not OPENAI_EMBEDDING_KEY or "YOUR_API_KEY" in OPENAI_EMBEDDING_KEY:
-        print("Warning: OpenAI embedding API key is not properly configured in config.py.")
-        # raise ValueError("OpenAI API Key not configured correctly in config.py")
-
-    print(f"Initializing OpenAI Document Embedder with model: {OPENAI_EMBEDDING_MODEL}")
-    if OPENAI_EMBEDDING_BASE:
-        print(f"Using custom API base URL from config: {OPENAI_EMBEDDING_BASE}")
-    else:
-        print("Using default OpenAI API base URL (None specified in config).")
-
-    document_embedder = OpenAIDocumentEmbedder(
-        api_key=Secret.from_token(OPENAI_EMBEDDING_KEY),
-        model=OPENAI_EMBEDDING_MODEL,
-        api_base_url=OPENAI_EMBEDDING_BASE,
-        # meta_fields_to_embed=["name"]  # if metadata fields should be embedded
-        # embedding_batch_size=10  # batch size can be tuned
-    )
-    print("OpenAI Document Embedder initialized.")
-    return document_embedder
-
-
-def run_chat_session(user_id: str):
-    """
-    Run the main RAG chat session loop.
-    On each user input, the input is first embedded and written to Milvus,
-    then the RAG pipeline is run to generate a reply.
-    """
-    print(f"--- Starting Chat Session for User: {user_id} ---")
-
-    # Build the RAG query pipeline and get the DocumentStore instance
-    rag_query_pipeline, document_store = build_rag_pipeline(user_id)
-
-    # Initialize the Document Embedder used to write user inputs
-    document_embedder = initialize_document_embedder()
-
-    print("\nChatbot is ready! Type your questions or 'exit' to quit.")
-    # Model names of the pipeline components, for reference:
-    # print(f"Using LLM: {rag_query_pipeline.get_component('llm').model}")
-    # Note: the query embedder in the RAG pipeline is named 'text_embedder'
-    # print(f"Using Query Embedder: {rag_query_pipeline.get_component('text_embedder').model}")
-    # print(f"Using Document Embedder (for writing): {document_embedder.model}")
-
-    while True:
-        try:
-            query = input(f"[{user_id}] You: ")
-            if query.lower() == "exit":
-                print("Exiting chat session. Goodbye!")
-                break
-            if not query.strip():
-                continue
-
-            # --- Step 1: Embed the user input and write it to Milvus ---
-            # Wrap the user input in a Haystack Document
-            user_doc_to_write = Document(content=query, meta={"user_id": user_id})
-
-            # Run the embedding with OpenAIDocumentEmbedder;
-            # it expects a list as input, even for a single document
-            embedding_result = document_embedder.run([user_doc_to_write])
-            embedded_docs = embedding_result.get(
-                "documents", []
-            )  # the documents, now carrying embeddings
-
-            if embedded_docs:
-                # Write the embedded document to the DocumentStore
-                document_store.write_documents(embedded_docs)
-            else:
-                print("[Workflow] Warning: Failed to embed document, skipping write.")
-                # Additional error handling or logging could be added here
-
-            # --- Step 2: Generate a reply with the RAG query pipeline ---
-            # Prepare the pipeline input:
-            # text_embedder needs the raw query text, and so does
-            # prompt_builder (used as {{query}} in the template)
-            pipeline_input = {
-                "text_embedder": {"text": query},
-                "prompt_builder": {"query": query},
-            }
-
-            # Run the RAG query pipeline
-            results = rag_query_pipeline.run(pipeline_input)
-
-            # --- Step 3: Process and print the results ---
-            # The generator's output is in the 'replies' key of the 'llm' component
-            if "llm" in results and results["llm"]["replies"]:
-                answer = results["llm"]["replies"][0]
-                # Try to read the token usage (may be in meta)
-                total_tokens = "N/A"
-                try:
-                    # The meta structure may vary by version or configuration
-                    if (
-                        "meta" in results["llm"]
-                        and isinstance(results["llm"]["meta"], list)
-                        and results["llm"]["meta"]
-                    ):
-                        usage_info = results["llm"]["meta"][0].get("usage", {})
-                        total_tokens = usage_info.get("total_tokens", "N/A")
-                except Exception:
-                    pass  # Ignore errors while reading token usage
-
-                print(f"Chatbot: {answer} (Tokens: {total_tokens})")
-            else:
-                print("Chatbot: Sorry, I couldn't generate an answer for that.")
-                logger.debug(f"Pipeline Results: {results}")  # Log for debugging
-
-        except KeyboardInterrupt:
-            print("\nExiting chat session. Goodbye!")
-            break
-        except Exception as e:
-            print(f"\nAn error occurred: {e}")
-            import traceback
-
-            traceback.print_exc()  # Print the full traceback
-
-
-if __name__ == "__main__":
-    current_user_id = DEFAULT_USER_ID
-    run_chat_session(current_user_id)
diff --git a/haystack_rag/rag_pipeline.py b/haystack_rag/rag_pipeline.py
deleted file mode 100644
index 6aed6b5..0000000
--- a/haystack_rag/rag_pipeline.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# rag_pipeline.py
-from haystack import Pipeline
-from haystack import Document  # Import Document
-
-from milvus_haystack import MilvusDocumentStore
-from .data_handling import initialize_milvus_lite
-from .embedding import initialize_text_embedder
-from .retrieval import initialize_vector_retriever
-from .llm_integration import initialize_llm_and_prompt_builder
-from haystack.utils import Secret
-
-
-def build_rag_pipeline(user_id: str) -> tuple[Pipeline, MilvusDocumentStore]:
-    """
-    Build and return the RAG query pipeline and the matching DocumentStore
-    for the given user.
-    """
-    print(f"\n--- Building RAG Pipeline for User: {user_id} ---")
-
-    # 1. Initialize the user's DocumentStore
-    document_store = initialize_milvus_lite(user_id)
-
-    # 2. Initialize the shared components (these could be initialized once at
-    #    application startup; for simplicity they are created on every call)
-    text_embedder = initialize_text_embedder()
-    vector_retriever = initialize_vector_retriever(document_store)
-    llm, prompt_builder = initialize_llm_and_prompt_builder()
-
-    # 3. Create the Haystack Pipeline
-    rag_pipeline = Pipeline()
-
-    # 4. Add the components to the pipeline under explicit names
-    rag_pipeline.add_component(instance=text_embedder, name="text_embedder")
-    rag_pipeline.add_component(instance=vector_retriever, name="retriever")
-    rag_pipeline.add_component(instance=prompt_builder, name="prompt_builder")
-    rag_pipeline.add_component(instance=llm, name="llm")
-
-    # 5. Connect the pipeline components:
-    #    - the user's question text goes into text_embedder
-    #    - text_embedder's embedding feeds the retriever's query-embedding input
-    #    - the retriever's documents feed prompt_builder's documents input
-    #    - the question text also goes to prompt_builder's query input
-    #    - prompt_builder's complete prompt feeds the llm's prompt input
-
-    rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
-    rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
-    rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
-
-    print("--- RAG Pipeline Built Successfully ---")
-    # Return both the pipeline and the document store, because the caller
-    # needs the store to write data
-    return rag_pipeline, document_store
-
-
-# --- Corrected Test Block ---
-if __name__ == "__main__":
-    # We need OpenAIDocumentEmbedder to index test documents
-    from haystack.components.embedders import OpenAIDocumentEmbedder
-
-    # Import necessary config for initializing the Document Embedder
-    from config import (
-        OPENAI_API_KEY_FROM_CONFIG,
-        OPENAI_API_BASE_URL_CONFIG,
-        OPENAI_EMBEDDING_MODEL,
-    )
-
-    # --- Configuration ---
-    test_user = "test_user"
-    test_query = "What is Haystack?"
-    # Sample documents to index for testing
-    docs_to_index = [
-        Document(
-            content="Haystack is an open-source framework for building NLP applications such as question answering and semantic search.",
-            meta={"user_id": test_user, "source": "test_doc_1"},
-        ),
-        Document(
-            content="You can use Haystack to connect different components, such as document stores, retrievers, and generators.",
-            meta={"user_id": test_user, "source": "test_doc_2"},
-        ),
-        Document(
-            content="Milvus is a popular vector database, often used in RAG systems to store embeddings.",
-            meta={"user_id": test_user, "source": "test_doc_3"},
-        ),
-    ]
-
-    print(f"--- Running Test for RAG Pipeline (User: {test_user}) ---")
-
-    # --- 1. Check API Key Availability ---
-    # Pipeline execution requires OpenAI API calls
-    api_key_configured = (
-        OPENAI_API_KEY_FROM_CONFIG and "YOUR_API_KEY" not in OPENAI_API_KEY_FROM_CONFIG
-    )
-    if not api_key_configured:
-        print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
-        print("! WARNING: OpenAI API Key not configured in config.py.     !")
-        print("! Skipping RAG pipeline test execution.                    !")
-        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
-        exit()  # Exit script if key is missing for test run
-    else:
-        print("\n[Test Setup] OpenAI API Key found in config.")
-
-    # --- 2. Build the RAG Pipeline and get the Document Store ---
-    # This function initializes the store (potentially dropping old data)
-    # and builds the *querying* pipeline.
-    try:
-        pipeline, store = build_rag_pipeline(test_user)
-    except Exception as e:
-        print(f"\nError building RAG pipeline: {e}")
-        import traceback
-
-        traceback.print_exc()
-        exit()
-
-    # --- 3. Index Test Documents (with embeddings) ---
-    print("\n[Test Setup] Initializing Document Embedder for indexing test data...")
-    try:
-        # Initialize the Document Embedder directly here for the test
-        document_embedder = OpenAIDocumentEmbedder(
-            api_key=Secret.from_token(OPENAI_API_KEY_FROM_CONFIG),
-            model=OPENAI_EMBEDDING_MODEL,
-            api_base_url=OPENAI_API_BASE_URL_CONFIG,
-        )
-        print("[Test Setup] Document Embedder initialized.")
-
-        print("[Test Setup] Embedding test documents...")
-        embedding_result = document_embedder.run(docs_to_index)
-        embedded_docs = embedding_result.get("documents", [])
-
-        if embedded_docs:
-            print(
-                f"[Test Setup] Writing {len(embedded_docs)} embedded documents to Milvus..."
-            )
-            store.write_documents(embedded_docs)
-            print("[Test Setup] Test documents written successfully.")
-            # Optional: Verify count
-            # print(f"[Test Setup] Document count in store: {store.count_documents()}")
-            documents_indexed = True
-        else:
-            print("[Test Setup] ERROR: Failed to embed test documents.")
-            documents_indexed = False
-
-    except Exception as e:
-        print(f"\nError during test data indexing: {e}")
-        import traceback
-
-        traceback.print_exc()
-        documents_indexed = False
-
-    # --- 4. Run the RAG Pipeline (if setup succeeded) ---
-    if documents_indexed:
-        print(f"\n[Test Run] Running RAG pipeline for query: '{test_query}'")
-
-        # Prepare input for the RAG pipeline instance built by build_rag_pipeline
-        pipeline_input = {
-            "text_embedder": {"text": test_query},  # Input for the query embedder
-            "prompt_builder": {
-                "query": test_query
-            },  # Input for the prompt builder template
-        }
-
-        try:
-            results = pipeline.run(pipeline_input)
-
-            print("\n[Test Run] Pipeline Results:")
-            # Process and print the generator's answer
-            if "llm" in results and results["llm"]["replies"]:
-                answer = results["llm"]["replies"][0]
-                print(f"\nGenerated Answer: {answer}")
-            else:
-                print("\n[Test Run] Could not extract answer from generator.")
-                print(
-                    "Full Pipeline Output:", results
-                )  # Print full output for debugging
-
-        except Exception as e:
-            print(f"\n[Test Run] Error running RAG pipeline: {e}")
-            import traceback
-
-            traceback.print_exc()
-    else:
-        print("\n[Test Run] Skipping RAG pipeline execution due to indexing failure.")
-
-    # --- 5. Cleanup Note ---
-    # Optional: Add instructions or commented-out code for cleaning up the test Milvus data
-    print(
-        f"\n[Test Cleanup] Test finished. Consider manually removing data in: ./milvus_user_data_openai_fixed/{test_user}"
-    )
-    # import shutil
-    # from pathlib import Path
-    # from config import MILVUS_PERSIST_BASE_DIR
-    # test_db_path = MILVUS_PERSIST_BASE_DIR / test_user
-    # if test_db_path.exists():
-    #     print(f"\nAttempting to clean up test data at {test_db_path}...")
-    #     # shutil.rmtree(test_db_path)  # Use with caution
-
-    print("\n--- RAG Pipeline Test Complete ---")
diff --git a/haystack_rag/retrieval.py b/haystack_rag/retrieval.py
deleted file mode 100644
index 569a3b2..0000000
--- a/haystack_rag/retrieval.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# retrieval.py
-# --- Note: the import paths below follow the milvus-haystack package ---
-from milvus_haystack import MilvusDocumentStore  # For type hints
-from milvus_haystack.milvus_embedding_retriever import (
    MilvusEmbeddingRetriever,
-)  # The correct integration import
-
-# Import top_k from config
-from config import RETRIEVER_TOP_K
-
-
-def initialize_vector_retriever(
-    document_store: MilvusDocumentStore,
-) -> MilvusEmbeddingRetriever:
-    """
-    Initializes the MilvusEmbeddingRetriever from the milvus-haystack package.
-    Requires a correctly initialized MilvusDocumentStore instance.
-    """
-    print(f"Initializing Milvus Embedding Retriever with top_k={RETRIEVER_TOP_K}")
-
-    # Create the MilvusEmbeddingRetriever instance.
-    # It needs the document_store instance to perform the actual search;
-    # the top_k parameter controls how many documents are returned.
-    retriever = MilvusEmbeddingRetriever(
-        document_store=document_store,
-        top_k=RETRIEVER_TOP_K,
-        # Further optional parameters, e.g. filters, can be added as needed
-    )
-    print("Milvus Embedding Retriever initialized.")
-    return retriever
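Taken together, a bare retrieval round trip (no LLM generation) with these helpers would look roughly like the sketch below, assuming a populated store and a valid `config.py`; the user id and query text are illustrative.

```python
# Sketch of a standalone similarity search using the helpers above, assuming
# config.py is set up and the user's store already contains documents.
from haystack_rag.data_handling import initialize_milvus_lite
from haystack_rag.embedding import initialize_text_embedder
from haystack_rag.retrieval import initialize_vector_retriever

store = initialize_milvus_lite("alice")
retriever = initialize_vector_retriever(store)
text_embedder = initialize_text_embedder()

# Embed the query text, then search by vector similarity
query_embedding = text_embedder.run(text="notes about Milvus")["embedding"]
docs = retriever.run(query_embedding=query_embedding)["documents"]
for doc in docs:
    print(doc.score, doc.content)
```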