diff --git a/lightrag/api/config.py b/lightrag/api/config.py
index 981653c332..002f8f5d29 100644
--- a/lightrag/api/config.py
+++ b/lightrag/api/config.py
@@ -42,6 +42,7 @@
     DEFAULT_OLLAMA_MODEL_TAG,
     DEFAULT_RERANK_BINDING,
     DEFAULT_ENTITY_TYPES,
+    DEFAULT_ENTITY_ATTRIBUTES,
 )

 # use the .env that is inside the current folder
@@ -474,6 +475,9 @@ def parse_args() -> argparse.Namespace:
     args.cors_origins = get_env_value("CORS_ORIGINS", "*")
     args.summary_language = get_env_value("SUMMARY_LANGUAGE", DEFAULT_SUMMARY_LANGUAGE)
     args.entity_types = get_env_value("ENTITY_TYPES", DEFAULT_ENTITY_TYPES, list)
+    args.entity_attributes = get_env_value(
+        "ENTITY_ATTRIBUTES", DEFAULT_ENTITY_ATTRIBUTES, list
+    )
     args.whitelist_paths = get_env_value("WHITELIST_PATHS", "/health,/api/*")

     # For JWT Auth
diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 242fe57240..ced29d4148 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -1168,6 +1168,7 @@ async def server_rerank_func(
         addon_params={
             "language": args.summary_language,
             "entity_types": args.entity_types,
+            "entity_attributes": args.entity_attributes,
         },
         ollama_server_infos=ollama_server_infos,
     )
diff --git a/lightrag/constants.py b/lightrag/constants.py
index 318a83ea5b..2cc9639654 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -40,6 +40,11 @@
     "NaturalObject",
 ]

+# Default entity attributes: empty means no extra attribute field is extracted.
+# Override with ENTITY_ATTRIBUTES env var, e.g.:
+# ENTITY_ATTRIBUTES='["sentiment","urgency","business_impact","confidence"]'
+DEFAULT_ENTITY_ATTRIBUTES: list[str] = []
+
 # Separator for: description, source_id and relation-key fields(Can not be changed after data inserted)
 GRAPH_FIELD_SEP = "<SEP>"
diff --git a/lightrag/operate.py b/lightrag/operate.py
index 8ca3f5d303..841cdf6fc8 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -388,11 +388,14 @@ def _handle_single_entity_extraction(
     chunk_key: str,
     timestamp: int,
     file_path: str = "unknown_source",
+    entity_attributes: list[str] | None = None,
 ):
-    if len(record_attributes) != 4 or "entity" not in record_attributes[0]:
+    # Accept 4 fields (no attributes) or 5 fields (with attributes JSON as 5th field).
+    expected_fields = 5 if entity_attributes else 4
+    if len(record_attributes) not in (4, 5) or "entity" not in record_attributes[0]:
         if len(record_attributes) > 1 and "entity" in record_attributes[0]:
             logger.warning(
-                f"{chunk_key}: LLM output format error; found {len(record_attributes)}/4 fields on ENTITY `{record_attributes[1]}` @ `{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`"
+                f"{chunk_key}: LLM output format error; found {len(record_attributes)}/{expected_fields} fields on ENTITY `{record_attributes[1]}` @ `{record_attributes[2] if len(record_attributes) > 2 else 'N/A'}`"
             )
             logger.debug(record_attributes)
             return None
@@ -449,6 +452,28 @@
             )
             return None

+        # Parse optional 5th field: compact JSON object with entity attributes.
+        # Produced only when entity_attributes is non-empty and the LLM chose to
+        # output them. Malformed JSON is logged and dropped so extraction still
+        # succeeds for the entity itself.
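To make the two accepted record shapes concrete, here is a minimal standalone sketch of the split-and-validate step this hunk implements. The sample records and the hard-coded delimiter are illustrative only; in the pipeline, `record_attributes` comes from splitting each LLM record on the configured tuple delimiter (`<|#|>` by default).

```python
import json

TUPLE_DELIMITER = "<|#|>"  # mirrors PROMPTS["DEFAULT_TUPLE_DELIMITER"]

# Hypothetical raw records, one per extracted entity.
legacy = "entity<|#|>Acme Corp<|#|>Organization<|#|>Vendor named in the outage report"
extended = (
    "entity<|#|>Acme Corp<|#|>Organization<|#|>Vendor named in the outage report"
    '<|#|>{"sentiment": "negative", "urgency": "high", "confidence": 0.85}'
)

for raw in (legacy, extended):
    record_attributes = raw.split(TUPLE_DELIMITER)
    # Same acceptance rule as the patch: 4 fields, or 5 when attributes are present.
    assert len(record_attributes) in (4, 5) and "entity" in record_attributes[0]
    attributes = None
    if len(record_attributes) >= 5 and record_attributes[4].strip():
        parsed = json.loads(record_attributes[4])  # the patch logs instead of raising
        if isinstance(parsed, dict):
            attributes = parsed
    print(record_attributes[1], attributes)
```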
+        attributes: dict | None = None
+        if len(record_attributes) >= 5 and record_attributes[4].strip():
+            raw_attrs = record_attributes[4].strip()
+            try:
+                parsed = json.loads(raw_attrs)
+                if isinstance(parsed, dict):
+                    attributes = parsed
+                else:
+                    logger.warning(
+                        f"{chunk_key}: entity attributes field is not a JSON object "
+                        f"for '{entity_name}': {raw_attrs[:120]}"
+                    )
+            except json.JSONDecodeError:
+                logger.warning(
+                    f"{chunk_key}: could not parse entity attributes JSON "
+                    f"for '{entity_name}': {raw_attrs[:120]}"
+                )
+
         return dict(
             entity_name=entity_name,
             entity_type=entity_type,
@@ -456,6 +481,7 @@
             source_id=chunk_key,
             file_path=file_path,
             timestamp=timestamp,
+            attributes=attributes,
         )

     except ValueError as e:
@@ -941,6 +967,7 @@ async def _process_extraction_result(
     file_path: str = "unknown_source",
     tuple_delimiter: str = "<|#|>",
     completion_delimiter: str = "<|COMPLETE|>",
+    entity_attributes: list[str] | None = None,
 ) -> tuple[dict, dict]:
     """Process a single extraction result (either initial or gleaning)
     Args:
@@ -1023,7 +1050,8 @@

             # Try to parse as entity
             entity_data = _handle_single_entity_extraction(
-                record_attributes, chunk_key, timestamp, file_path
+                record_attributes, chunk_key, timestamp, file_path,
+                entity_attributes=entity_attributes,
             )
             if entity_data is not None:
                 truncated_name = _truncate_entity_identifier(
@@ -1124,6 +1152,7 @@
     file_paths: list[str],
     source_chunk_ids: list[str],
     truncation_info: str = "",
+    attributes: dict | None = None,
 ):
     try:
         # Update entity in graph storage (critical path)
@@ -1138,6 +1167,11 @@
             "created_at": int(time.time()),
             "truncate": truncation_info,
         }
+        # Store merged attributes when present. Stored as a JSON string so that
+        # all graph backends (NetworkX, Postgres, Neo4j) can round-trip it without
+        # schema changes. Consumers read it back with json.loads.
+        if attributes:
+            updated_entity_data["attributes"] = json.dumps(attributes)
         await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)

         # Update entity in vector database (equally critical)
@@ -1255,6 +1289,8 @@
     entity_types = []
     file_paths_list = []
     seen_paths = set()
+    # Collect attributes from all entity instances; last non-null value per key wins.
+    merged_attributes: dict = {}

     for entity_data in all_entity_data:
         if entity_data.get("description"):
@@ -1266,6 +1302,11 @@
         if file_path and file_path not in seen_paths:
             file_paths_list.append(file_path)
             seen_paths.add(file_path)
+        attrs = entity_data.get("attributes")
+        if isinstance(attrs, dict):
+            merged_attributes.update(
+                {k: v for k, v in attrs.items() if v is not None}
+            )

     # Apply MAX_FILE_PATHS limit
     max_file_paths = global_config.get("max_file_paths", DEFAULT_MAX_FILE_PATHS)
@@ -1327,6 +1368,7 @@
         file_paths_list,
         limited_chunk_ids,
         truncation_info,
+        attributes=merged_attributes or None,
     )

     # Log rebuild completion with truncation info
@@ -1761,6 +1803,30 @@
         reverse=True,
     )[0][0]

+    # Merge attributes across all new entity instances (last non-null value per key wins).
+    # Seed from the existing node so re-processing a document doesn't wipe old attrs.
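The merge semantics introduced here are easy to verify in isolation. A small sketch, assuming an existing node whose `attributes` field was written as a JSON string by an earlier `upsert_node`, plus two freshly extracted instances of the same entity (all values invented):

```python
import json

# Existing node as a graph backend would return it.
already_node = {
    "entity_type": "Organization",
    "attributes": '{"sentiment": "neutral", "urgency": null}',
}
# New extractions of the same entity from the current document.
nodes_data = [
    {"attributes": {"sentiment": "negative", "confidence": 0.7}},
    {"attributes": {"urgency": "high", "confidence": None}},  # None values are skipped
]

merged_attributes: dict = {}
raw = already_node.get("attributes")
if raw:  # seed from the stored JSON so old attributes survive re-processing
    existing = json.loads(raw) if isinstance(raw, str) else raw
    if isinstance(existing, dict):
        merged_attributes.update(existing)

for dp in nodes_data:  # last non-null value per key wins
    attrs = dp.get("attributes")
    if isinstance(attrs, dict):
        merged_attributes.update({k: v for k, v in attrs.items() if v is not None})

print(merged_attributes)
# {'sentiment': 'negative', 'urgency': 'high', 'confidence': 0.7}
```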
+    merged_attributes: dict = {}
+    if already_node:
+        existing_attrs_raw = already_node.get("attributes")
+        if existing_attrs_raw:
+            try:
+                existing_attrs = (
+                    json.loads(existing_attrs_raw)
+                    if isinstance(existing_attrs_raw, str)
+                    else existing_attrs_raw
+                )
+                if isinstance(existing_attrs, dict):
+                    merged_attributes.update(existing_attrs)
+            except (json.JSONDecodeError, TypeError):
+                pass
+
+    for dp in nodes_data:
+        attrs = dp.get("attributes")
+        if isinstance(attrs, dict):
+            merged_attributes.update(
+                {k: v for k, v in attrs.items() if v is not None}
+            )
+
     # 7. Deduplicate nodes by description, keeping first occurrence in the same document
     unique_nodes = {}
     for i, dp in enumerate(nodes_data, start=1):
@@ -1912,6 +1978,8 @@
             created_at=int(time.time()),
             truncate=truncation_info,
         )
+        if merged_attributes:
+            node_data["attributes"] = json.dumps(merged_attributes)
         await knowledge_graph_inst.upsert_node(
             entity_name,
             node_data=node_data,
         )
@@ -2905,13 +2973,32 @@
     entity_types = global_config["addon_params"].get(
         "entity_types", DEFAULT_ENTITY_TYPES
     )
+    entity_attributes: list[str] = global_config["addon_params"].get(
+        "entity_attributes", []
+    ) or []

     examples = "\n".join(PROMPTS["entity_extraction_examples"])

+    # Build the attributes instruction injected into the user prompt.
+    # Empty list → plain 4-field format (backward-compatible).
+    if entity_attributes:
+        attrs_quoted = ", ".join(f'"{a}"' for a in entity_attributes)
+        entity_attributes_instruction = (
+            f"For each entity append a 5th field: a compact single-line JSON object "
+            f"with keys [{attrs_quoted}]. Infer each value from the context; "
+            f"use null if not determinable. "
+            f"Numeric confidence must be 0.0–1.0. "
+            f"Example: {{\"sentiment\": \"negative\", \"urgency\": \"high\", \"confidence\": 0.85}}"
+        )
+    else:
+        entity_attributes_instruction = "No additional attributes are required."
+
     example_context_base = dict(
         tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
         completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
         entity_types=", ".join(entity_types),
+        entity_attributes=", ".join(entity_attributes),
+        entity_attributes_instruction=entity_attributes_instruction,
         language=language,
     )
     # add example's format
@@ -2921,6 +3008,8 @@
         tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
         completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
         entity_types=",".join(entity_types),
+        entity_attributes=", ".join(entity_attributes),
+        entity_attributes_instruction=entity_attributes_instruction,
         examples=examples,
         language=language,
     )
@@ -2981,6 +3070,7 @@ async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
             file_path,
             tuple_delimiter=context_base["tuple_delimiter"],
             completion_delimiter=context_base["completion_delimiter"],
+            entity_attributes=entity_attributes,
         )

         # Process additional gleaning results only 1 time when entity_extract_max_gleaning is greater than zero.
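For reference, this is roughly how the two new keys travel into the prompt. The template below is a toy stand-in for `PROMPTS["entity_extraction"]` (the attribute names are examples, not defaults); only the `str.format` placeholder mechanics match the real code:

```python
# Toy template; the real one in lightrag/prompt.py is far longer.
template = (
    "Extract entities of types: {entity_types}.\n"
    "5. **Entity Attributes:** {entity_attributes_instruction}\n"
    "Finish with {completion_delimiter}."
)

entity_attributes = ["sentiment", "urgency"]  # normally from addon_params
attrs_quoted = ", ".join(f'"{a}"' for a in entity_attributes)

context_base = dict(
    tuple_delimiter="<|#|>",
    completion_delimiter="<|COMPLETE|>",
    entity_types=", ".join(["Person", "Organization"]),
    entity_attributes=", ".join(entity_attributes),
    entity_attributes_instruction=(
        f"For each entity append a 5th field: a compact single-line JSON object "
        f"with keys [{attrs_quoted}]; use null if not determinable."
    ),
)

print(template.format(**context_base))  # extra keys are simply ignored
```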
@@ -3024,6 +3114,7 @@ async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
             file_path,
             tuple_delimiter=context_base["tuple_delimiter"],
             completion_delimiter=context_base["completion_delimiter"],
+            entity_attributes=entity_attributes,
         )

         # Merge results - compare description lengths to choose better version
diff --git a/lightrag/prompt.py b/lightrag/prompt.py
index dcd829d487..eb3500c391 100644
--- a/lightrag/prompt.py
+++ b/lightrag/prompt.py
@@ -20,6 +20,8 @@
     * `entity_description`: Provide a concise yet comprehensive description of the entity's attributes and activities, based *solely* on the information present in the input text.
     * **Output Format - Entities:** Output a total of 4 fields for each entity, delimited by `{tuple_delimiter}`, on a single line. The first field *must* be the literal string `entity`.
         * Format: `entity{tuple_delimiter}entity_name{tuple_delimiter}entity_type{tuple_delimiter}entity_description`
+        * **Attributes (when requested):** If `{entity_attributes}` is non-empty, append a 5th field containing a compact JSON object with keys `{entity_attributes}`. Infer each value from the entity context; use `null` if not determinable. The JSON must be on a single line with no surrounding whitespace. Numeric confidence values must be in the range 0.0–1.0.
+        * Extended format: `entity{tuple_delimiter}entity_name{tuple_delimiter}entity_type{tuple_delimiter}entity_description{tuple_delimiter}{{"attr1": "val1", "attr2": "val2"}}`

 2. **Relationship Extraction & Output:**
     * **Identification:** Identify direct, clearly stated, and meaningful relationships between previously extracted entities.
@@ -32,6 +34,7 @@
     * `relationship_description`: A concise explanation of the nature of the relationship between the source and target entities, providing a clear rationale for their connection.
     * **Output Format - Relationships:** Output a total of 5 fields for each relationship, delimited by `{tuple_delimiter}`, on a single line. The first field *must* be the literal string `relation`.
         * Format: `relation{tuple_delimiter}source_entity{tuple_delimiter}target_entity{tuple_delimiter}relationship_keywords{tuple_delimiter}relationship_description`
+        * Relationships never carry an attributes field; the 5-field format is unchanged regardless of `{entity_attributes}`.

 3. **Delimiter Usage Protocol:**
     * The `{tuple_delimiter}` is a complete, atomic marker and **must not be filled with content**. It serves strictly as a field separator.
@@ -68,6 +71,7 @@
 2. **Output Content Only:** Output *only* the extracted list of entities and relationships. Do not include any introductory or concluding remarks, explanations, or additional text before or after the list.
 3. **Completion Signal:** Output `{completion_delimiter}` as the final line after all relevant entities and relationships have been extracted and presented.
 4. **Output Language:** Ensure the output language is {language}. Proper nouns (e.g., personal names, place names, organization names) must be kept in their original language and not translated.
+5. **Entity Attributes:** {entity_attributes_instruction}

 ---Data to be Processed---
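On the read side, `attributes` comes back from any of the graph backends as a JSON string on the node dict. A defensive decoding sketch; the node shape follows the `upsert_node` payloads above, and `node_attributes` is a hypothetical helper, not part of this patch:

```python
import json

def node_attributes(node_data: dict) -> dict:
    """Decode the optional 'attributes' field stored as a JSON string.

    Tolerates nodes written before the feature existed (missing key)
    and malformed values (returns {} instead of raising).
    """
    raw = node_data.get("attributes")
    if not raw:
        return {}
    try:
        parsed = json.loads(raw) if isinstance(raw, str) else raw
    except (json.JSONDecodeError, TypeError):
        return {}
    return parsed if isinstance(parsed, dict) else {}

node = {  # as stored by upsert_node in this patch
    "entity_type": "Organization",
    "attributes": '{"sentiment": "negative", "urgency": "high", "confidence": 0.85}',
}
print(node_attributes(node)["urgency"])  # -> high
```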