Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lightrag/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ def _generate_collection_suffix(self) -> str | None:

# Check if model_name exists (model_name is optional in EmbeddingFunc)
model_name = getattr(self.embedding_func, "model_name", None)
if not isinstance(model_name, str):
return None

model_name = model_name.strip()
if not model_name:
return None

Expand Down
114 changes: 81 additions & 33 deletions lightrag/kg/milvus_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,65 @@ def _repair_missing_vector_index(self):
)
self._create_indexes_after_collection()

def _activate_existing_collection(self, collection_name: str) -> None:
"""Validate and load an existing collection, keeping it active on success."""
original_final_namespace = self.final_namespace

try:
self.final_namespace = collection_name
self._client.describe_collection(self.final_namespace)
self._validate_collection_compatibility()

try:
self._ensure_collection_loaded()
return
except Exception as load_error:
if not self._is_missing_vector_index_error(load_error):
raise

try:
self._repair_missing_vector_index()
self._ensure_collection_loaded()
logger.info(
f"[{self.workspace}] Repaired missing vector index for existing collection '{self.namespace}'"
)
except Exception as repair_error:
raise RuntimeError(
f"Index repair failed for collection '{self.final_namespace}'. "
f"Original error: {repair_error}"
) from repair_error
except Exception:
self.final_namespace = original_final_namespace
raise

def _try_activate_legacy_collection(self) -> bool:
"""Reuse a compatible legacy collection when model-isolated naming is enabled."""
if self.final_namespace == self.legacy_namespace:
return False

legacy_exists = self._client.has_collection(self.legacy_namespace)
if not legacy_exists:
return False

logger.info(
f"[{self.workspace}] Found legacy Milvus collection '{self.legacy_namespace}' for namespace '{self.namespace}'"
)

try:
self._activate_existing_collection(self.legacy_namespace)
logger.info(
f"[{self.workspace}] Reusing compatible legacy collection '{self.legacy_namespace}' for namespace '{self.namespace}'"
)
return True
Comment on lines +1222 to +1226
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Skip legacy reuse when embedding model identity is unknown

This branch reuses the unsuffixed legacy collection whenever _activate_existing_collection succeeds, but that compatibility path validates only schema and vector dimension, not the embedding model identity. In a migration from one model to another with the same dimension (for example, two different 1536d models), initialization will silently bind to the old legacy collection and mix incompatible vectors, which undermines retrieval correctness and defeats the model-isolation goal.

Useful? React with 👍 / 👎.

except ValueError as legacy_error:
logger.warning(
f"[{self.workspace}] Legacy collection '{self.legacy_namespace}' is incompatible with the current embedding configuration: {legacy_error}"
)
logger.warning(
f"[{self.workspace}] Creating isolated collection '{self.final_namespace}' instead."
)
return False

def _ensure_collection_loaded(self):
"""Ensure the collection is loaded into memory for search operations"""
try:
Expand Down Expand Up @@ -1208,28 +1267,8 @@ def _create_collection_if_not_exist(self):
if collection_exists:
# Double-check by trying to describe the collection
try:
self._client.describe_collection(self.final_namespace)
self._validate_collection_compatibility()
try:
# Ensure the collection is loaded after validation
self._ensure_collection_loaded()
return
except Exception as load_error:
if not self._is_missing_vector_index_error(load_error):
raise

try:
self._repair_missing_vector_index()
self._ensure_collection_loaded()
logger.info(
f"[{self.workspace}] Repaired missing vector index for existing collection '{self.namespace}'"
)
return
except Exception as repair_error:
raise RuntimeError(
f"Index repair failed for collection '{self.final_namespace}'. "
f"Original error: {repair_error}"
) from repair_error
self._activate_existing_collection(self.final_namespace)
return
except Exception as validation_error:
# CRITICAL: Collection exists but validation failed
# This indicates potential data migration failure or incompatible schema
Expand Down Expand Up @@ -1267,8 +1306,13 @@ def _create_collection_if_not_exist(self):
f"Original error: {validation_error}"
)

if self._try_activate_legacy_collection():
return

# Collection doesn't exist, create new collection
logger.info(f"[{self.workspace}] Creating new collection: {self.namespace}")
logger.info(
f"[{self.workspace}] Creating new collection: {self.final_namespace}"
)
schema = self._create_schema_for_namespace()

# Create collection with schema only first
Expand Down Expand Up @@ -1389,18 +1433,22 @@ def __post_init__(self):
f"Using passed workspace parameter: '{effective_workspace}'"
)

# Build final_namespace with workspace prefix for data isolation
# Keep original namespace unchanged for type detection logic
if effective_workspace:
self.final_namespace = f"{effective_workspace}_{self.namespace}"
# Keep the legacy namespace so compatible deployments can continue using
# their existing collection, while model-aware names isolate new embeddings.
self.workspace = effective_workspace or ""
self.model_suffix = self._generate_collection_suffix()
self.legacy_namespace = (
f"{self.workspace}_{self.namespace}" if self.workspace else self.namespace
)

if self.model_suffix:
self.final_namespace = f"{self.legacy_namespace}_{self.model_suffix}"
logger.info(f"[{self.workspace}] Milvus collection: {self.final_namespace}")
else:
self.final_namespace = self.legacy_namespace
logger.debug(
f"Final namespace with workspace prefix: '{self.final_namespace}'"
f"Final namespace without model suffix: '{self.final_namespace}'"
)
else:
# When workspace is empty, final_namespace equals original namespace
self.final_namespace = self.namespace
self.workspace = ""
logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
cosine_threshold = kwargs.get("cosine_better_than_threshold")
if cosine_threshold is None:
raise ValueError(
Expand Down
102 changes: 102 additions & 0 deletions tests/test_milvus_index_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,108 @@ def test_initialize_uses_existing_database_without_recreating_it(self):
bootstrap_client.create_database.assert_not_called()
bootstrap_client.use_database.assert_called_once_with("lightrag")

def test_model_suffix_is_used_for_milvus_collection_names(self):
"""Milvus should isolate collections by embedding model when model_name is available."""
mock_embedding_func = MagicMock()
mock_embedding_func.embedding_dim = 2560
mock_embedding_func.model_name = "qwen3-embedding:4b"

storage = MilvusVectorDBStorage(
namespace="entities",
workspace="",
global_config={
"embedding_batch_num": 100,
"working_dir": "/tmp/lightrag",
"vector_db_storage_cls_kwargs": {
"cosine_better_than_threshold": 0.3,
},
},
embedding_func=mock_embedding_func,
meta_fields=set(),
)

assert storage.legacy_namespace == "entities"
assert storage.final_namespace == "entities_qwen3_embedding_4b_2560d"

def test_compatible_legacy_collection_is_reused_when_suffix_available(self):
"""Compatible legacy collections should still be reused after suffixing is introduced."""
mock_embedding_func = MagicMock()
mock_embedding_func.embedding_dim = 2560
mock_embedding_func.model_name = "qwen3-embedding:4b"

storage = MilvusVectorDBStorage(
namespace="entities",
workspace="",
global_config={
"embedding_batch_num": 100,
"working_dir": "/tmp/lightrag",
"vector_db_storage_cls_kwargs": {
"cosine_better_than_threshold": 0.3,
},
},
embedding_func=mock_embedding_func,
meta_fields=set(),
)
storage._client = MagicMock()
storage._client.has_collection.side_effect = (
lambda name: name == storage.legacy_namespace
)
storage._client.describe_collection.return_value = {}

with patch.object(storage, "_validate_collection_compatibility"):
with patch.object(storage, "_ensure_collection_loaded"):
storage._create_collection_if_not_exist()

storage._client.create_collection.assert_not_called()
assert storage.final_namespace == storage.legacy_namespace

def test_legacy_dimension_mismatch_creates_isolated_collection_when_suffix_available(
self,
):
"""A legacy dimension mismatch should create a new suffixed Milvus collection."""
mock_embedding_func = MagicMock()
mock_embedding_func.embedding_dim = 2560
mock_embedding_func.model_name = "qwen3-embedding:4b"

storage = MilvusVectorDBStorage(
namespace="entities",
workspace="",
global_config={
"embedding_batch_num": 100,
"working_dir": "/tmp/lightrag",
"vector_db_storage_cls_kwargs": {
"cosine_better_than_threshold": 0.3,
},
},
embedding_func=mock_embedding_func,
meta_fields=set(),
)
storage._client = MagicMock()
storage._client.has_collection.side_effect = (
lambda name: name == storage.legacy_namespace
)
storage._client.describe_collection.return_value = {}

with patch.object(
storage,
"_validate_collection_compatibility",
side_effect=ValueError(
"Vector dimension mismatch for collection 'entities': existing=4096, current=2560"
),
):
with patch.object(
storage, "_create_schema_for_namespace", return_value="schema"
):
with patch.object(storage, "_create_indexes_after_collection"):
with patch.object(storage, "_ensure_collection_loaded"):
storage._create_collection_if_not_exist()

storage._client.create_collection.assert_called_once_with(
collection_name="entities_qwen3_embedding_4b_2560d",
schema="schema",
)
assert storage.final_namespace == "entities_qwen3_embedding_4b_2560d"

def test_existing_collection_missing_vector_index_is_repaired(self):
"""Existing collections missing vector indexes should be repaired automatically."""
mock_embedding_func = MagicMock()
Expand Down
Loading