chore(haystack-rag): remove entire RAG module
Some checks failed
Build and Push Docker / build-and-push (push) Failing after 2m44s

This commit is contained in:
gameloader
2025-10-18 10:36:25 +08:00
parent c0505479f0
commit ce1de82ee3
8 changed files with 0 additions and 836 deletions


@@ -1,5 +0,0 @@
# haystack_rag module
from .main import run_chat_session
from .rag_pipeline import build_rag_pipeline
__all__ = ["run_chat_session", "build_rag_pipeline"]
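
For context, the removed package exposed just these two entry points. A minimal usage sketch, assuming the package was importable as haystack_rag (the user id is a hypothetical value):

# Start an interactive RAG chat loop for one user (see main.py below)
from haystack_rag import run_chat_session

run_chat_session("alice")  # "alice" is a hypothetical user id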


@@ -1,223 +0,0 @@
# app.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import logging
from haystack import Document
# Import necessary components from the provided code
from .data_handling import initialize_milvus_lite
from .main import initialize_document_embedder
from .retrieval import initialize_vector_retriever
from .embedding import initialize_text_embedder
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(title="Document Embedding and Retrieval API")
# Define request and response models
class EmbedRequest(BaseModel):
user_id: str
content: str
meta: Optional[dict] = {}
class RetrieveRequest(BaseModel):
user_id: str
query: str
class DocumentResponse(BaseModel):
content: str
score: Optional[float] = None
meta: Optional[dict] = {}
class RetrieveResponse(BaseModel):
documents: List[DocumentResponse]
query: str
answer: Optional[str] = None
# Helper functions
def get_document_embedder():
return initialize_document_embedder()
def get_document_store(user_id: str):
return initialize_milvus_lite(user_id)
@app.post("/embed", response_model=dict)
async def embed_document(
request: EmbedRequest, embedder=Depends(get_document_embedder)
):
"""
Embed content and store it in a Milvus collection for the specified user.
"""
try:
# Initialize document store for the user
document_store = get_document_store(request.user_id)
# Create a document with user content
        meta = dict(request.meta or {})  # tolerate an explicit null meta from the client
meta["user_id"] = request.user_id # Ensure user_id is in meta
user_doc = Document(content=request.content, meta=meta)
# Embed the document
logger.info(f"Embedding document for user {request.user_id}")
embedding_result = embedder.run([user_doc])
embedded_docs = embedding_result.get("documents", [])
if not embedded_docs:
raise HTTPException(status_code=500, detail="Failed to embed document")
# Write to document store
logger.info(f"Writing embedded document to Milvus for user {request.user_id}")
document_store.write_documents(embedded_docs)
return {
"status": "success",
"message": f"Document embedded and stored for user {request.user_id}",
}
    except HTTPException:
        raise  # let deliberate HTTP errors propagate unwrapped
    except Exception as e:
        logger.error(f"Error embedding document: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"Error embedding document: {str(e)}"
        )
@app.post("/retrieve", response_model=RetrieveResponse)
async def retrieve_documents(request: RetrieveRequest):
"""
Retrieve similar documents for a user based on a query without LLM generation.
Only retrieves documents using vector similarity.
"""
try:
# Get document store for the user
document_store = get_document_store(request.user_id)
# Initialize text embedder for query embedding
text_embedder = initialize_text_embedder()
# Initialize retriever
retriever = initialize_vector_retriever(document_store)
# Embed the query
logger.info(f"Embedding query for user {request.user_id}: '{request.query}'")
embedding_result = text_embedder.run(text=request.query)
query_embedding = embedding_result.get("embedding")
if not query_embedding:
raise HTTPException(status_code=500, detail="Failed to embed query")
# Retrieve similar documents
logger.info(f"Retrieving documents for query: '{request.query}'")
retriever_result = retriever.run(query_embedding=query_embedding)
retrieved_docs = retriever_result.get("documents", [])
# Convert to response format
documents = []
for doc in retrieved_docs:
documents.append(
DocumentResponse(
content=doc.content,
score=doc.score if hasattr(doc, "score") else None,
meta=doc.meta,
)
)
return RetrieveResponse(documents=documents, query=request.query, answer=None)
    except HTTPException:
        raise  # let deliberate HTTP errors propagate unwrapped
    except Exception as e:
        logger.error(f"Error retrieving documents: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"Error retrieving documents: {str(e)}"
        )
# Message models for the /embed_messages endpoint (List and Optional are already imported above)
class Message(BaseModel):
content: str
role: str
name: Optional[str] = None
class EmbedMessagesRequest(BaseModel):
user_id: str
messages: List[Message]
meta: Optional[dict] = {}
@app.post("/embed_messages", response_model=dict)
async def embed_messages(
request: EmbedMessagesRequest, embedder=Depends(get_document_embedder)
):
"""
Process a messages array, extract content from user messages,
concatenate them with newlines, then embed and store in Milvus.
"""
try:
# Initialize document store for the user
document_store = get_document_store(request.user_id)
# Filter messages to keep only those with role="user"
user_messages = [msg for msg in request.messages if msg.role == "user"]
# Extract content from each user message
user_contents = [msg.content for msg in user_messages]
# Join contents with newline character
concatenated_content = "\n".join(user_contents)
if not concatenated_content.strip():
return {
"status": "warning",
"message": "No user messages found or all user messages were empty",
}
# Create a document with concatenated content
        meta = dict(request.meta or {})  # tolerate an explicit null meta from the client
meta["user_id"] = request.user_id # Ensure user_id is in meta
user_doc = Document(content=concatenated_content, meta=meta)
# Embed the document
logger.info(f"Embedding concatenated user messages for user {request.user_id}")
embedding_result = embedder.run([user_doc])
embedded_docs = embedding_result.get("documents", [])
if not embedded_docs:
raise HTTPException(status_code=500, detail="Failed to embed document")
# Write to document store
logger.info(f"Writing embedded document to Milvus for user {request.user_id}")
document_store.write_documents(embedded_docs)
return {
"status": "success",
"message": f"User messages embedded and stored for user {request.user_id}",
"processed_messages_count": len(user_messages),
"concatenated_length": len(concatenated_content),
}
    except HTTPException:
        raise  # let deliberate HTTP errors propagate unwrapped
    except Exception as e:
        logger.error(f"Error embedding messages: {str(e)}")
        raise HTTPException(
            status_code=500, detail=f"Error embedding messages: {str(e)}"
        )
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7999)
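
For context, a minimal sketch of how a client might have called this service, assuming it runs locally on port 7999 as in the __main__ block above (the user id, content, and query values are hypothetical):

import requests  # assumes the requests package is available

BASE = "http://localhost:7999"

# Embed and store a document for one user
resp = requests.post(f"{BASE}/embed", json={
    "user_id": "alice",
    "content": "Haystack is an open-source NLP framework.",
    "meta": {"source": "notes"},
})
print(resp.json())  # {"status": "success", ...} on success

# Retrieve similar documents by vector similarity (no LLM generation)
resp = requests.post(f"{BASE}/retrieve", json={
    "user_id": "alice",
    "query": "What is Haystack?",
})
for doc in resp.json()["documents"]:
    print(doc["score"], doc["content"])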


@@ -1,96 +0,0 @@
# data_handling.py
from pathlib import Path
from typing import List, Optional
import logging # Added logging
from haystack import Document
from milvus_haystack import MilvusDocumentStore
# Import config variables needed
from config import (
OPENAI_EMBEDDING_DIM, # Keep for logging/validation if desired, but not passed to init
USER_ID_PREFIX,
MILVUS_PERSIST_BASE_DIR,
MILVUS_INDEX_PARAMS,
MILVUS_SEARCH_PARAMS,
MILVUS_STAND_URI,
)
logger = logging.getLogger(__name__) # Use logger
# get_user_milvus_path function remains the same
def get_user_milvus_path(user_id: str, base_dir: Path = MILVUS_PERSIST_BASE_DIR) -> str:
"""
获取指定用户的 Milvus Lite 数据库文件路径。
该函数会执行以下操作:
1. 基于- `base_dir` 和 `user_id` 构建一个用户专属的目录路径。
2. 确保该目录存在,如果不存在则会创建它。
3. 将目录路径与固定的数据库文件名 "milvus_lite.db" 组合。
4. 返回最终的完整文件路径(字符串格式)。
Args:
user_id (str): 用户的唯一标识符。
base_dir (Path, optional): Milvus 数据持久化的根目录.
默认为 MILVUS_PERSIST_BASE_DIR.
Returns:
str: 指向用户 Milvus 数据库文件的完整路径字符串。
"""
user_db_dir = base_dir / user_id
user_db_dir.mkdir(parents=True, exist_ok=True)
return str(user_db_dir / "milvus_lite.db")
def initialize_milvus_lite(user_id: str) -> MilvusDocumentStore:
"""
Initializes Milvus Lite DocumentStore for a user using milvus-haystack.
Dimension is inferred by Milvus upon first write, not passed here.
"""
print(f"Initializing Milvus Lite store for user: {user_id}")
milvus_uri = get_user_milvus_path(user_id)
print(f"Milvus Lite URI: {milvus_uri}")
# Log the dimension expected based on config, even if not passed directly
print(f"Expecting Embedding Dimension (for first write): {OPENAI_EMBEDDING_DIM}")
document_store = MilvusDocumentStore(
connection_args={"uri": milvus_uri},
collection_name=user_id, # Default or customize
index_params=MILVUS_INDEX_PARAMS, # Pass index config
search_params=MILVUS_SEARCH_PARAMS, # Pass search config
drop_old=False, # Keep drop_old for testing convenience
)
# Note: The actual schema dimension is set when the first document with an embedding is written.
print(f"Milvus Lite store instance created for user {user_id} at {milvus_uri}")
return document_store
# add_user_document_to_store and get_user_documents can remain if needed for other purposes.
def add_user_document_to_store(
document_store: MilvusDocumentStore, user_id: str, text: str
):
doc = Document(content=text, meta={"user_id": user_id})
print(f"Adding document for user {user_id}: '{text[:50]}...'")
document_store.write_documents([doc])
# get_user_documents function remains the same
def get_user_documents(
document_store: MilvusDocumentStore, user_id: str
) -> List[Document]:
print(f"Retrieving all documents for user {user_id}...")
    all_docs = document_store.filter_documents()  # Haystack 2.x stores list all documents via filter_documents()
print(f"Found {len(all_docs)} documents for user {user_id}.")
return all_docs
# Optional: Test code similar to before, but now using the OpenAI dimension
if __name__ == "__main__":
test_user = "test_user_openai_data"
store = initialize_milvus_lite(test_user)
# Add dummy docs (won't be embedded here, just stored)
    add_user_document_to_store(store, test_user, "First document, about OpenAI.")
    add_user_document_to_store(store, test_user, "Second document, using the API.")
docs = get_user_documents(store, test_user)
for d in docs:
print(f" - {d.content} (Meta: {d.meta})")
# Cleanup code similar to before
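
To make the per-user layout concrete, a small sketch of what get_user_milvus_path produces, assuming MILVUS_PERSIST_BASE_DIR is Path("./milvus_user_data") (a hypothetical value; the real one lives in config.py) and the assumed package path haystack_rag:

from pathlib import Path

from haystack_rag.data_handling import get_user_milvus_path  # assumed package path

# Creates ./milvus_user_data/alice/ if needed and returns
# "./milvus_user_data/alice/milvus_lite.db": every user gets an isolated
# Milvus Lite file, and initialize_milvus_lite uses the user id as the
# collection name inside it.
print(get_user_milvus_path("alice", base_dir=Path("./milvus_user_data")))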


@@ -1,63 +0,0 @@
# embedding.py
from haystack.components.embedders import OpenAITextEmbedder
from haystack.utils import Secret
# Import the embedding-related variable names from config
from config import (
    OPENAI_EMBEDDING_MODEL,
    OPENAI_EMBEDDING_KEY,   # embedding API key from config
    OPENAI_EMBEDDING_BASE,  # embedding API base URL from config
    OPENAI_EMBEDDING_DIM,
)
def initialize_text_embedder() -> OpenAITextEmbedder:
"""
Initializes the Haystack OpenAITextEmbedder component.
Reads API Key and Base URL directly from config.py.
"""
    # Basic sanity check that the embedding key loaded from config is valid
    if not OPENAI_EMBEDDING_KEY or "YOUR_API_KEY" in OPENAI_EMBEDDING_KEY:
        print("Warning: OpenAI API key is not properly configured in config.py.")
        # Consider raising an error here if the key is mandatory
        # raise ValueError("OpenAI API Key not configured correctly in config.py")
    print(f"Initializing OpenAI Text Embedder with model: {OPENAI_EMBEDDING_MODEL}")
    # Use the base URL from config
    if OPENAI_EMBEDDING_BASE:
        print(f"Using custom API base URL from config: {OPENAI_EMBEDDING_BASE}")
    else:
        print("Using default OpenAI API base URL (none specified in config).")
    text_embedder = OpenAITextEmbedder(
        # Use the key and base URL imported from config.py directly
        api_key=Secret.from_token(OPENAI_EMBEDDING_KEY),
        api_base_url=OPENAI_EMBEDDING_BASE,
        model=OPENAI_EMBEDDING_MODEL,
        dimensions=OPENAI_EMBEDDING_DIM,
    )
    print("Text Embedder initialized.")
    return text_embedder
# The __main__ section is likewise adjusted so it does not rely on environment variables
# Example usage
if __name__ == "__main__":
embedder = initialize_text_embedder()
    sample_text = "This is a sample text used to test the embedding feature."
try:
result = embedder.run(text=sample_text)
embedding = result["embedding"]
print(f"Sample text: '{sample_text}'")
# print(f"Generated embedding (first 5 dims): {embedding[:5]}")
print(f"Generated embedding dimension: {len(embedding)}")
print(f"Tokens used: {result['meta']['usage']['total_tokens']}")
except Exception as e:
print(f"Error during huggingface API call: {e}")


@@ -1,73 +0,0 @@
# llm_integration.py
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.utils import Secret
# Import the LLM-related variable names from config
from config import (
    OPENAI_LLM_MODEL,
    DEFAULT_PROMPT_TEMPLATE,
    OPENAI_API_KEY_FROM_CONFIG,  # API key from config
    OPENAI_API_BASE_URL_CONFIG,  # base URL from config
)
def initialize_llm_and_prompt_builder() -> tuple[OpenAIGenerator, PromptBuilder]:
"""
Initializes the OpenAI Generator and PromptBuilder components.
Reads API Key and Base URL directly from config.py.
"""
if not OPENAI_API_KEY_FROM_CONFIG or "YOUR_API_KEY" in OPENAI_API_KEY_FROM_CONFIG:
print("警告: OpenAI API Key 未在 config.py 中有效配置。")
# Consider raising an error
# raise ValueError("OpenAI API Key not configured correctly in config.py")
print(f"Initializing OpenAI Generator with model: {OPENAI_LLM_MODEL}")
if OPENAI_API_BASE_URL_CONFIG:
print(f"Using custom API base URL from config: {OPENAI_API_BASE_URL_CONFIG}")
else:
print("Using default OpenAI API base URL (None specified in config).")
llm_generator = OpenAIGenerator(
        # Use the key and base_url imported from config.py directly
api_key=Secret.from_token(OPENAI_API_KEY_FROM_CONFIG),
model=OPENAI_LLM_MODEL,
api_base_url=OPENAI_API_BASE_URL_CONFIG,
)
print("OpenAI Generator initialized.")
print("Initializing Prompt Builder...")
prompt_builder = PromptBuilder(template=DEFAULT_PROMPT_TEMPLATE)
print("Prompt Builder initialized.")
return llm_generator, prompt_builder
# The __main__ section also needs adjusting accordingly
# Example Usage
if __name__ == "__main__":
from haystack import Document
llm, builder = initialize_llm_and_prompt_builder()
    sample_question = "What is Haystack?"
    sample_docs = [
        Document(content="Haystack is an open-source framework for building NLP applications."),
        Document(content="You can use Haystack to connect different components."),
    ]
prompt_builder_output = builder.run(question=sample_question, documents=sample_docs)
prompt = prompt_builder_output["prompt"]
print("\n--- Generated Prompt ---")
print(prompt)
print("\n--- Running OpenAI LLM ---")
try:
# Note: OpenAIGenerator expects 'prompt' as input key by default
llm_output = llm.run(prompt=prompt)
print("LLM Output:", llm_output)
except Exception as e:
print(f"Error during OpenAI API call: {e}")


@@ -1,151 +0,0 @@
# main.py
import logging
from haystack import Document
# OpenAIDocumentEmbedder is needed to embed the documents that will be written
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.utils import Secret
# Set up the logger
logger = logging.getLogger(__name__)
# Import the required config values and builder functions
from config import (
DEFAULT_USER_ID,
OPENAI_API_KEY_FROM_CONFIG,
OPENAI_API_BASE_URL_CONFIG,
OPENAI_EMBEDDING_MODEL,
OPENAI_EMBEDDING_KEY,
OPENAI_EMBEDDING_BASE,
)
from .rag_pipeline import build_rag_pipeline  # builds the RAG query pipeline
# Helper: initialize the Document Embedder (similar to the one in embedding.py)
def initialize_document_embedder() -> OpenAIDocumentEmbedder:
"""初始化用于嵌入文档的 OpenAIDocumentEmbedder。"""
if not OPENAI_API_KEY_FROM_CONFIG or "YOUR_API_KEY" in OPENAI_API_KEY_FROM_CONFIG:
print("警告: OpenAI API Key 未在 config.py 中有效配置。")
# raise ValueError("OpenAI API Key not configured correctly in config.py")
print(f"Initializing OpenAI Document Embedder with model: {OPENAI_EMBEDDING_MODEL}")
if OPENAI_API_BASE_URL_CONFIG:
print(f"Using custom API base URL from config: {OPENAI_API_BASE_URL_CONFIG}")
else:
print("Using default OpenAI API base URL (None specified in config).")
document_embedder = OpenAIDocumentEmbedder(
api_key=Secret.from_token(OPENAI_EMBEDDING_KEY),
model=OPENAI_EMBEDDING_MODEL,
api_base_url=OPENAI_EMBEDDING_BASE,
        # meta_fields_to_embed=["name"]  # if metadata fields should also be embedded
        # embedding_batch_size=10  # the batch size can be tuned
)
print("OpenAI Document Embedder initialized.")
return document_embedder
def run_chat_session(user_id: str):
"""
运行 RAG 聊天会话主循环。
每次用户输入时,先将其嵌入并添加到 Milvus然后运行 RAG 管道生成回复。
"""
print(f"--- Starting Chat Session for User: {user_id} ---")
    # Build the RAG query pipeline and obtain the DocumentStore instance
rag_query_pipeline, document_store = build_rag_pipeline(user_id)
    # Initialize the Document Embedder used to write user inputs
document_embedder = initialize_document_embedder()
print("\nChatbot is ready! Type your questions or 'exit' to quit.")
    # Print info about the models in use
    try:
        pass
        # print(f"Using LLM: {rag_query_pipeline.get_component('generator').model}")
        # Note: the query embedder in the RAG pipeline is named 'text_embedder'
        # print(f"Using Query Embedder: {rag_query_pipeline.get_component('text_embedder').model}")
        # print(f"Using Document Embedder (for writing): {document_embedder.model}")
    except Exception as e:
        print(f"Warning: Could not get component model names - {e}")
while True:
try:
query = input(f"[{user_id}] You: ")
if query.lower() == "exit":
print("Exiting chat session. Goodbye!")
break
if not query.strip():
continue
            # --- Step 1: Embed the user input and write it to Milvus ---
            # print(f"[Workflow] Embedding user input as a document...")
            # Wrap the user input in a Haystack Document
            user_doc_to_write = Document(content=query, meta={"user_id": user_id})
            # Run the embedding with OpenAIDocumentEmbedder;
            # it expects a list as input, even for a single document
            embedding_result = document_embedder.run([user_doc_to_write])
            # Get back the list of documents carrying embeddings
            embedded_docs = embedding_result.get("documents", [])
if embedded_docs:
# print(f"[Workflow] Writing embedded document to Milvus for user {user_id}...")
# 将带有嵌入的文档写入 DocumentStore
document_store.write_documents(embedded_docs)
# print("[Workflow] Document written to Milvus.")
else:
print("[Workflow] Warning: Failed to embed document, skipping write.")
# 可以在这里添加错误处理或日志记录
            # --- Step 2: Generate a reply with the RAG query pipeline ---
            # print("[Workflow] Running RAG query pipeline...")
            # Prepare the pipeline inputs: text_embedder needs the raw query text,
            # and prompt_builder needs it too (it fills the {{query}} template slot)
            pipeline_input = {
                "text_embedder": {"text": query},
                "prompt_builder": {"query": query},
            }
            # Run the RAG query pipeline
            results = rag_query_pipeline.run(pipeline_input)
            # --- Step 3: Process and print the results ---
            # The generator's replies are exposed under its component name ("llm" in this pipeline)
if "llm" in results and results["llm"]["replies"]:
answer = results["llm"]["replies"][0]
                # Try to read the token usage (it may be in meta)
                total_tokens = "N/A"
                try:
                    # The meta structure may vary by version or configuration; check before relying on it
if (
"meta" in results["llm"]
and isinstance(results["llm"]["meta"], list)
and results["llm"]["meta"]
):
usage_info = results["llm"]["meta"][0].get("usage", {})
total_tokens = usage_info.get("total_tokens", "N/A")
except Exception:
                    pass  # ignore errors while reading token usage
print(f"Chatbot: {answer} (Tokens: {total_tokens})")
else:
print("Chatbot: Sorry, I couldn't generate an answer for that.")
logger.debug(f"Pipeline Results: {results}") # 记录调试信息
except KeyboardInterrupt:
print("\nExiting chat session. Goodbye!")
break
except Exception as e:
print(f"\nAn error occurred: {e}")
import traceback
            traceback.print_exc()  # print a detailed traceback
if __name__ == "__main__":
current_user_id = DEFAULT_USER_ID
run_chat_session(current_user_id)
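
Condensed, the pipeline contract used in the loop above is just the following (the user id and query are hypothetical values; component names match those registered in rag_pipeline.py):

from haystack_rag.rag_pipeline import build_rag_pipeline  # assumed package path

pipeline, store = build_rag_pipeline("alice")
results = pipeline.run({
    "text_embedder": {"text": "What is Haystack?"},    # raw query for the query embedder
    "prompt_builder": {"query": "What is Haystack?"},  # same query fills the {{query}} template slot
})
print(results["llm"]["replies"][0])  # first reply from the generator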


@@ -1,195 +0,0 @@
# rag_pipeline.py
from haystack import Pipeline
from haystack import Document  # Document is used in the test block below
from milvus_haystack import MilvusDocumentStore
from .data_handling import initialize_milvus_lite
from .embedding import initialize_text_embedder
from .retrieval import initialize_vector_retriever
from .llm_integration import initialize_llm_and_prompt_builder
from haystack.utils import Secret
def build_rag_pipeline(user_id: str) -> tuple[Pipeline, MilvusDocumentStore]:
"""
为指定用户构建并返回 RAG 查询管道和对应的 DocumentStore。
"""
print(f"\n--- Building RAG Pipeline for User: {user_id} ---")
    # 1. Initialize this user's DocumentStore
document_store = initialize_milvus_lite(user_id)
    # 2. Initialize shared components (these could be initialized once at app startup; for simplicity they are created on every call here)
text_embedder = initialize_text_embedder()
vector_retriever = initialize_vector_retriever(document_store)
llm, prompt_builder = initialize_llm_and_prompt_builder()
    # 3. Create the Haystack Pipeline
rag_pipeline = Pipeline()
    # 4. Add the components to the pipeline under explicit names
rag_pipeline.add_component(instance=text_embedder, name="text_embedder")
rag_pipeline.add_component(instance=vector_retriever, name="retriever")
rag_pipeline.add_component(instance=prompt_builder, name="prompt_builder")
rag_pipeline.add_component(instance=llm, name="llm")
    # 5. Connect the pipeline components:
    #    - the user's question text goes into text_embedder
    #    - text_embedder's output embedding feeds the retriever's query-embedding input
    #    - the retriever's documents feed the prompt_builder's documents input
    #    - the question text also goes to the prompt_builder's query input
    #    - the prompt_builder's finished prompt feeds the llm's prompt input
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
print("--- RAG Pipeline Built Successfully ---")
    # Return both the pipeline and the document store, since the caller needs the store to write data
return rag_pipeline, document_store
# --- Corrected Test Block ---
if __name__ == "__main__":
# We need OpenAIDocumentEmbedder to index test documents
from haystack.components.embedders import OpenAIDocumentEmbedder
# Import necessary config for initializing the Document Embedder
from config import (
OPENAI_API_KEY_FROM_CONFIG,
OPENAI_API_BASE_URL_CONFIG,
OPENAI_EMBEDDING_MODEL,
)
# --- Configuration ---
test_user = "test_user"
    test_query = "What is Haystack?"
# Sample documents to index for testing
    docs_to_index = [
        Document(
            content="Haystack is an open-source framework for building NLP applications such as question answering and semantic search.",
            meta={"user_id": test_user, "source": "test_doc_1"},
        ),
        Document(
            content="You can use Haystack to connect different components, such as document stores, retrievers, and generators.",
            meta={"user_id": test_user, "source": "test_doc_2"},
        ),
        Document(
            content="Milvus is a popular vector database, often used in RAG systems to store embeddings.",
            meta={"user_id": test_user, "source": "test_doc_3"},
        ),
    ]
print(f"--- Running Test for RAG Pipeline (User: {test_user}) ---")
# --- 1. Check API Key Availability ---
# Pipeline execution requires OpenAI API calls
api_key_configured = (
OPENAI_API_KEY_FROM_CONFIG and "YOUR_API_KEY" not in OPENAI_API_KEY_FROM_CONFIG
)
if not api_key_configured:
print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print("! WARNING: OpenAI API Key not configured in config.py. !")
print("! Skipping RAG pipeline test execution. !")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
exit() # Exit script if key is missing for test run
else:
print("\n[Test Setup] OpenAI API Key found in config.")
# --- 2. Build the RAG Pipeline and get the Document Store ---
# This function initializes the store (potentially dropping old data)
# and builds the *querying* pipeline.
try:
pipeline, store = build_rag_pipeline(test_user)
except Exception as e:
print(f"\nError building RAG pipeline: {e}")
import traceback
traceback.print_exc()
exit()
# --- 3. Index Test Documents (with embeddings) ---
print("\n[Test Setup] Initializing Document Embedder for indexing test data...")
try:
# Initialize the Document Embedder directly here for the test
document_embedder = OpenAIDocumentEmbedder(
api_key=Secret.from_token(OPENAI_API_KEY_FROM_CONFIG),
model=OPENAI_EMBEDDING_MODEL,
api_base_url=OPENAI_API_BASE_URL_CONFIG,
)
print("[Test Setup] Document Embedder initialized.")
print("[Test Setup] Embedding test documents...")
embedding_result = document_embedder.run(docs_to_index)
embedded_docs = embedding_result.get("documents", [])
if embedded_docs:
print(
f"[Test Setup] Writing {len(embedded_docs)} embedded documents to Milvus..."
)
store.write_documents(embedded_docs)
print("[Test Setup] Test documents written successfully.")
# Optional: Verify count
# print(f"[Test Setup] Document count in store: {store.count_documents()}")
documents_indexed = True
else:
print("[Test Setup] ERROR: Failed to embed test documents.")
documents_indexed = False
except Exception as e:
print(f"\nError during test data indexing: {e}")
import traceback
traceback.print_exc()
documents_indexed = False
# --- 4. Run the RAG Pipeline (if setup succeeded) ---
if documents_indexed:
print(f"\n[Test Run] Running RAG pipeline for query: '{test_query}'")
# Prepare input for the RAG pipeline instance built by build_rag_pipeline
pipeline_input = {
"text_embedder": {"text": test_query}, # Input for the query embedder
"prompt_builder": {
"query": test_query
}, # Input for the prompt builder template
}
try:
results = pipeline.run(pipeline_input)
print("\n[Test Run] Pipeline Results:")
# Process and print the generator's answer
if "llm" in results and results["llm"]["replies"]:
answer = results["llm"]["replies"][0]
print(f"\nGenerated Answer: {answer}")
else:
print("\n[Test Run] Could not extract answer from generator.")
print(
"Full Pipeline Output:", results
) # Print full output for debugging
except Exception as e:
print(f"\n[Test Run] Error running RAG pipeline: {e}")
import traceback
traceback.print_exc()
else:
print("\n[Test Run] Skipping RAG pipeline execution due to indexing failure.")
# --- 5. Cleanup Note ---
# Optional: Add instructions or commented-out code for cleaning up the test Milvus data
print(
f"\n[Test Cleanup] Test finished. Consider manually removing data in: ./milvus_user_data_openai_fixed/{test_user}"
)
# import shutil
# from pathlib import Path
# from config import MILVUS_PERSIST_BASE_DIR
# test_db_path = MILVUS_PERSIST_BASE_DIR / test_user
# if test_db_path.exists():
# print(f"\nAttempting to clean up test data at {test_db_path}...")
# # shutil.rmtree(test_db_path) # Use with caution
print("\n--- RAG Pipeline Test Complete ---")


@@ -1,30 +0,0 @@
# retrieval.py
# --- Confirm the import paths are up to date ---
from milvus_haystack import MilvusDocumentStore  # for type hints
from milvus_haystack.milvus_embedding_retriever import (
    MilvusEmbeddingRetriever,
)  # use the correct integration import
# Import top_k from config
from config import RETRIEVER_TOP_K
def initialize_vector_retriever(
document_store: MilvusDocumentStore,
) -> MilvusEmbeddingRetriever:
"""
Initializes the MilvusEmbeddingRetriever using milvus-haystack package.
Requires a correctly initialized MilvusDocumentStore instance.
"""
print(f"Initializing Milvus Embedding Retriever with top_k={RETRIEVER_TOP_K}")
    # Initialize the MilvusEmbeddingRetriever instance.
    # It needs the document_store instance to perform the actual searches,
    # and top_k controls how many documents are returned.
    retriever = MilvusEmbeddingRetriever(
        document_store=document_store,
        top_k=RETRIEVER_TOP_K,
        # other optional parameters can be added as needed, e.g. filters_policy
    )
print("Milvus Embedding Retriever initialized.")
return retriever
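
A minimal sketch of how this retriever is driven elsewhere in the module, mirroring the /retrieve endpoint in app.py (the package import path haystack_rag and the query string are assumptions):

from haystack_rag.data_handling import initialize_milvus_lite
from haystack_rag.embedding import initialize_text_embedder
from haystack_rag.retrieval import initialize_vector_retriever

store = initialize_milvus_lite("alice")          # per-user Milvus Lite store
retriever = initialize_vector_retriever(store)   # top_k comes from config
query_embedding = initialize_text_embedder().run(text="What is Haystack?")["embedding"]
for doc in retriever.run(query_embedding=query_embedding)["documents"]:
    print(getattr(doc, "score", None), doc.content)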