# data_handling.py
"""Helpers for creating and using a per-user Milvus Lite document store."""

import logging
import os
from pathlib import Path
from typing import List, Optional

from haystack import Document
from milvus_haystack import MilvusDocumentStore

from config import (
    OPENAI_EMBEDDING_DIM,  # Only logged below; never passed to the store constructor.
    USER_ID_PREFIX,
    MILVUS_PERSIST_BASE_DIR,
    MILVUS_INDEX_PARAMS,
    MILVUS_SEARCH_PARAMS,
    MILVUS_STAND_URI,
)

logger = logging.getLogger(__name__)


def get_user_milvus_path(user_id: str, base_dir: Path = MILVUS_PERSIST_BASE_DIR) -> str:
    """Return the Milvus Lite database file path for the given user.

    The function:
      1. Builds a user-specific directory path from ``base_dir`` and ``user_id``.
      2. Creates that directory (including parents) if it does not exist.
      3. Appends the fixed database file name ``milvus_lite.db``.
      4. Returns the resulting path as a string.

    Args:
        user_id: Unique identifier of the user. Must be a single path
            component (no separators, not ``.`` or ``..``).
        base_dir: Root directory for persisted Milvus data. Defaults to
            ``MILVUS_PERSIST_BASE_DIR``.

    Returns:
        The full path to the user's Milvus Lite database file.

    Raises:
        ValueError: If ``user_id`` is empty or is not a single path component.
    """
    # Reject ids that would resolve to base_dir itself or escape it
    # ("", ".", "..", "a/b", absolute paths): the id becomes a directory name.
    if not user_id or user_id == ".." or Path(user_id).name != user_id:
        raise ValueError(f"Invalid user_id for a storage path: {user_id!r}")

    # Path() makes a plain-string base_dir work as well as a Path.
    user_db_dir = Path(base_dir) / user_id
    user_db_dir.mkdir(parents=True, exist_ok=True)
    return str(user_db_dir / "milvus_lite.db")


def initialize_milvus_lite(user_id: str) -> MilvusDocumentStore:
    """Create a Milvus Lite ``MilvusDocumentStore`` for a user.

    The store is backed by the file returned by ``get_user_milvus_path`` and
    uses ``user_id`` as the collection name. The embedding dimension is not
    passed here; ``OPENAI_EMBEDDING_DIM`` is only logged for reference.

    Args:
        user_id: Unique identifier of the user.

    Returns:
        The configured document store instance.

    Raises:
        ValueError: If ``user_id`` is not usable as a directory name.
    """
    logger.info("Initializing Milvus Lite store for user: %s", user_id)
    milvus_uri = get_user_milvus_path(user_id)
    logger.info("Milvus Lite URI: %s", milvus_uri)
    # Log the dimension expected from config, even though it is not passed directly.
    logger.info(
        "Expecting Embedding Dimension (for first write): %s", OPENAI_EMBEDDING_DIM
    )

    # NOTE(review): Milvus restricts collection names (letters, digits and
    # underscores) -- confirm that user IDs always conform.
    document_store = MilvusDocumentStore(
        connection_args={"uri": milvus_uri},
        collection_name=user_id,
        index_params=MILVUS_INDEX_PARAMS,
        search_params=MILVUS_SEARCH_PARAMS,
        drop_old=False,  # Keep any existing collection data between runs.
    )
    # Per the original author's note, the schema dimension is fixed when the
    # first document with an embedding is written.
    logger.info(
        "Milvus Lite store instance created for user %s at %s", user_id, milvus_uri
    )
    return document_store


def add_user_document_to_store(
    document_store: MilvusDocumentStore, user_id: str, text: str
) -> None:
    """Write a single text document, tagged with the user's id, to the store.

    Args:
        document_store: Store to write into.
        user_id: Stored in the document's ``meta`` under the key ``user_id``.
        text: Document content.
    """
    doc = Document(content=text, meta={"user_id": user_id})
    # Only the first 50 characters are logged to keep log lines short.
    logger.info("Adding document for user %s: '%s...'", user_id, text[:50])
    document_store.write_documents([doc])


def get_user_documents(
    document_store: MilvusDocumentStore, user_id: str
) -> List[Document]:
    """Return every document held in the given store.

    ``user_id`` is used for logging only; no metadata filter is applied, so
    the result is user-specific only when the store itself is per-user (as
    stores created by ``initialize_milvus_lite`` are).

    Args:
        document_store: Store to read from.
        user_id: Identifier used in log messages.

    Returns:
        All documents in the store.
    """
    logger.info("Retrieving all documents for user %s...", user_id)
    # filter_documents() with no filters is the Haystack 2.x way to fetch
    # everything; get_all_documents() belongs to the 1.x store API.
    all_docs = document_store.filter_documents()
    logger.info("Found %d documents for user %s.", len(all_docs), user_id)
    return all_docs


def _demo() -> None:
    """Manual smoke test: create a store, add two documents, list them."""
    # Make the module's log messages visible when run as a script.
    logging.basicConfig(level=logging.INFO)

    test_user = "test_user_openai_data"
    store = initialize_milvus_lite(test_user)

    # Dummy documents: they are stored without being embedded here.
    add_user_document_to_store(store, test_user, "第一个文档,关于 OpenAI。")
    add_user_document_to_store(store, test_user, "第二个文档,使用 API。")

    for d in get_user_documents(store, test_user):
        print(f" - {d.content} (Meta: {d.meta})")
    # NOTE: no cleanup is performed; the database file stays on disk.


if __name__ == "__main__":
    _demo()