diff --git a/lightrag/base.py b/lightrag/base.py
index 28c564459c..0fee411a71 100644
--- a/lightrag/base.py
+++ b/lightrag/base.py
@@ -248,6 +248,10 @@ def _generate_collection_suffix(self) -> str | None:
 
         # Check if model_name exists (model_name is optional in EmbeddingFunc)
         model_name = getattr(self.embedding_func, "model_name", None)
+        if not isinstance(model_name, str):
+            return None
+
+        model_name = model_name.strip()
         if not model_name:
             return None
 
diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py
index 92135ef4a5..c6688dbad8 100644
--- a/lightrag/kg/milvus_impl.py
+++ b/lightrag/kg/milvus_impl.py
@@ -1174,6 +1174,80 @@ def _repair_missing_vector_index(self):
         )
         self._create_indexes_after_collection()
 
+    def _activate_existing_collection(self, collection_name: str) -> None:
+        """Validate and load an existing collection, keeping it active on success.
+
+        ``final_namespace`` is switched to ``collection_name`` while the collection
+        is described, validated and loaded. A load failure that is recognised as a
+        missing vector index triggers an index repair followed by a second load.
+        If any step raises, ``final_namespace`` is restored before re-raising.
+
+        Raises:
+            RuntimeError: If the index repair or the load after it fails.
+        """
+        original_final_namespace = self.final_namespace
+
+        try:
+            self.final_namespace = collection_name
+            self._client.describe_collection(self.final_namespace)
+            self._validate_collection_compatibility()
+
+            try:
+                self._ensure_collection_loaded()
+                return
+            except Exception as load_error:
+                if not self._is_missing_vector_index_error(load_error):
+                    raise
+
+            try:
+                self._repair_missing_vector_index()
+                self._ensure_collection_loaded()
+                logger.info(
+                    f"[{self.workspace}] Repaired missing vector index for existing collection '{self.final_namespace}'"
+                )
+            except Exception as repair_error:
+                raise RuntimeError(
+                    f"Index repair failed for collection '{self.final_namespace}'. "
+                    f"Original error: {repair_error}"
+                ) from repair_error
+        except Exception:
+            self.final_namespace = original_final_namespace
+            raise
+
+    def _try_activate_legacy_collection(self) -> bool:
+        """Reuse a compatible legacy collection when model-isolated naming is enabled.
+
+        Returns:
+            True if the legacy collection exists and was activated. False if the
+            active name already is the legacy name, the legacy collection does not
+            exist, or activating it raised ValueError.
+        """
+        if self.final_namespace == self.legacy_namespace:
+            return False
+
+        legacy_exists = self._client.has_collection(self.legacy_namespace)
+        if not legacy_exists:
+            return False
+
+        logger.info(
+            f"[{self.workspace}] Found legacy Milvus collection '{self.legacy_namespace}' for namespace '{self.namespace}'"
+        )
+
+        try:
+            self._activate_existing_collection(self.legacy_namespace)
+            logger.info(
+                f"[{self.workspace}] Reusing compatible legacy collection '{self.legacy_namespace}' for namespace '{self.namespace}'"
+            )
+            return True
+        except ValueError as legacy_error:
+            logger.warning(
+                f"[{self.workspace}] Legacy collection '{self.legacy_namespace}' is incompatible with the current embedding configuration: {legacy_error}"
+            )
+            logger.warning(
+                f"[{self.workspace}] Creating isolated collection '{self.final_namespace}' instead."
+            )
+            return False
+
     def _ensure_collection_loaded(self):
         """Ensure the collection is loaded into memory for search operations"""
         try:
@@ -1208,28 +1282,8 @@ def _create_collection_if_not_exist(self):
             if collection_exists:
                 # Double-check by trying to describe the collection
                 try:
-                    self._client.describe_collection(self.final_namespace)
-                    self._validate_collection_compatibility()
-                    try:
-                        # Ensure the collection is loaded after validation
-                        self._ensure_collection_loaded()
-                        return
-                    except Exception as load_error:
-                        if not self._is_missing_vector_index_error(load_error):
-                            raise
-
-                    try:
-                        self._repair_missing_vector_index()
-                        self._ensure_collection_loaded()
-                        logger.info(
-                            f"[{self.workspace}] Repaired missing vector index for existing collection '{self.namespace}'"
-                        )
-                        return
-                    except Exception as repair_error:
-                        raise RuntimeError(
-                            f"Index repair failed for collection '{self.final_namespace}'. "
-                            f"Original error: {repair_error}"
-                        ) from repair_error
+                    self._activate_existing_collection(self.final_namespace)
+                    return
                 except Exception as validation_error:
                     # CRITICAL: Collection exists but validation failed
                     # This indicates potential data migration failure or incompatible schema
@@ -1267,8 +1321,13 @@ def _create_collection_if_not_exist(self):
                         f"Original error: {validation_error}"
                     )
 
+            if self._try_activate_legacy_collection():
+                return
+
             # Collection doesn't exist, create new collection
-            logger.info(f"[{self.workspace}] Creating new collection: {self.namespace}")
+            logger.info(
+                f"[{self.workspace}] Creating new collection: {self.final_namespace}"
+            )
             schema = self._create_schema_for_namespace()
 
             # Create collection with schema only first
@@ -1389,18 +1448,22 @@ def __post_init__(self):
                     f"Using passed workspace parameter: '{effective_workspace}'"
                 )
 
-        # Build final_namespace with workspace prefix for data isolation
-        # Keep original namespace unchanged for type detection logic
-        if effective_workspace:
-            self.final_namespace = f"{effective_workspace}_{self.namespace}"
+        # Keep the legacy namespace so compatible deployments can continue using
+        # their existing collection, while model-aware names isolate new embeddings.
+        self.workspace = effective_workspace or ""
+        self.model_suffix = self._generate_collection_suffix()
+        self.legacy_namespace = (
+            f"{self.workspace}_{self.namespace}" if self.workspace else self.namespace
+        )
+
+        if self.model_suffix:
+            self.final_namespace = f"{self.legacy_namespace}_{self.model_suffix}"
+            logger.info(f"[{self.workspace}] Milvus collection: {self.final_namespace}")
+        else:
+            self.final_namespace = self.legacy_namespace
             logger.debug(
-                f"Final namespace with workspace prefix: '{self.final_namespace}'"
+                f"Final namespace without model suffix: '{self.final_namespace}'"
             )
-        else:
-            # When workspace is empty, final_namespace equals original namespace
-            self.final_namespace = self.namespace
-            self.workspace = ""
-            logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
         cosine_threshold = kwargs.get("cosine_better_than_threshold")
         if cosine_threshold is None:
             raise ValueError(
diff --git a/tests/test_milvus_index_creation.py b/tests/test_milvus_index_creation.py
index a091d34b05..925f16e2b9 100644
--- a/tests/test_milvus_index_creation.py
+++ b/tests/test_milvus_index_creation.py
@@ -379,6 +379,108 @@ def test_initialize_uses_existing_database_without_recreating_it(self):
         bootstrap_client.create_database.assert_not_called()
         bootstrap_client.use_database.assert_called_once_with("lightrag")
 
+    def test_model_suffix_is_used_for_milvus_collection_names(self):
+        """Milvus should isolate collections by embedding model when model_name is available."""
+        mock_embedding_func = MagicMock()
+        mock_embedding_func.embedding_dim = 2560
+        mock_embedding_func.model_name = "qwen3-embedding:4b"
+
+        storage = MilvusVectorDBStorage(
+            namespace="entities",
+            workspace="",
+            global_config={
+                "embedding_batch_num": 100,
+                "working_dir": "/tmp/lightrag",
+                "vector_db_storage_cls_kwargs": {
+                    "cosine_better_than_threshold": 0.3,
+                },
+            },
+            embedding_func=mock_embedding_func,
+            meta_fields=set(),
+        )
+
+        assert storage.legacy_namespace == "entities"
+        assert storage.final_namespace == "entities_qwen3_embedding_4b_2560d"
+
+    def test_compatible_legacy_collection_is_reused_when_suffix_available(self):
+        """Compatible legacy collections should still be reused after suffixing is introduced."""
+        mock_embedding_func = MagicMock()
+        mock_embedding_func.embedding_dim = 2560
+        mock_embedding_func.model_name = "qwen3-embedding:4b"
+
+        storage = MilvusVectorDBStorage(
+            namespace="entities",
+            workspace="",
+            global_config={
+                "embedding_batch_num": 100,
+                "working_dir": "/tmp/lightrag",
+                "vector_db_storage_cls_kwargs": {
+                    "cosine_better_than_threshold": 0.3,
+                },
+            },
+            embedding_func=mock_embedding_func,
+            meta_fields=set(),
+        )
+        storage._client = MagicMock()
+        storage._client.has_collection.side_effect = (
+            lambda name: name == storage.legacy_namespace
+        )
+        storage._client.describe_collection.return_value = {}
+
+        with patch.object(storage, "_validate_collection_compatibility"):
+            with patch.object(storage, "_ensure_collection_loaded"):
+                storage._create_collection_if_not_exist()
+
+        storage._client.create_collection.assert_not_called()
+        assert storage.final_namespace == storage.legacy_namespace
+
+    def test_legacy_dimension_mismatch_creates_isolated_collection_when_suffix_available(
+        self,
+    ):
+        """A legacy dimension mismatch should create a new suffixed Milvus collection."""
+        mock_embedding_func = MagicMock()
+        mock_embedding_func.embedding_dim = 2560
+        mock_embedding_func.model_name = "qwen3-embedding:4b"
+
+        storage = MilvusVectorDBStorage(
+            namespace="entities",
+            workspace="",
+            global_config={
+                "embedding_batch_num": 100,
+                "working_dir": "/tmp/lightrag",
+                "vector_db_storage_cls_kwargs": {
+                    "cosine_better_than_threshold": 0.3,
+                },
+            },
+            embedding_func=mock_embedding_func,
+            meta_fields=set(),
+        )
+        storage._client = MagicMock()
+        storage._client.has_collection.side_effect = (
+            lambda name: name == storage.legacy_namespace
+        )
+        storage._client.describe_collection.return_value = {}
+
+        with patch.object(
+            storage,
+            "_validate_collection_compatibility",
+            side_effect=ValueError(
+                "Vector dimension mismatch for collection 'entities': existing=4096, current=2560"
+            ),
+        ):
+            with patch.object(
+                storage, "_create_schema_for_namespace", return_value="schema"
+            ):
+                with patch.object(storage, "_create_indexes_after_collection"):
+                    with patch.object(storage, "_ensure_collection_loaded"):
+                        storage._create_collection_if_not_exist()
+
+        storage._client.create_collection.assert_called_once_with(
+            collection_name="entities_qwen3_embedding_4b_2560d",
+            schema="schema",
+        )
+        assert storage.final_namespace == "entities_qwen3_embedding_4b_2560d"
+
     def test_existing_collection_missing_vector_index_is_repaired(self):
         """Existing collections missing vector indexes should be repaired automatically."""
         mock_embedding_func = MagicMock()