feat(ingestion): introduce TagRegistry domain layer #27991
Conversation
Adds metadata.domain.tags with TagRegistry (per-Source bookkeeping) and TagCanonicalizer (case-corrected name resolution against OM). Migrates the Snowflake connector to the new architecture; other connectors stay on the legacy context.tags flow (strangler pattern). TagRegistry interns shared TagLabel instances by (classification, tag, label_type, state) and rebinds the per-entity dict on scope clear. On a schema with ~120k tag attachments and 21 unique tags, peak heap drops from ~112 MB to ~24 MB. Public get_*_tag_labels methods on the database service base are unchanged; non-Snowflake DB connectors are not touched.
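For readers new to the interning pattern, here is a minimal self-contained sketch of the idea (names like `intern_label` and the simplified `TagLabel` are illustrative, not the PR's actual internals):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TagLabel:
    """Simplified stand-in for the Pydantic TagLabel model."""

    classification: str
    tag: str
    label_type: str = "Automated"
    state: str = "Suggested"


_cache: dict[tuple[str, str, str, str], TagLabel] = {}


def intern_label(
    classification: str,
    tag: str,
    label_type: str = "Automated",
    state: str = "Suggested",
) -> TagLabel:
    """Return one shared TagLabel per key instead of a fresh object per attachment."""
    key = (classification, tag, label_type, state)
    if key not in _cache:
        _cache[key] = TagLabel(classification, tag, label_type, state)
    return _cache[key]


# 120k attachments of the same tag hold references to one object, not 120k copies.
assert intern_label("PII", "Sensitive") is intern_label("PII", "Sensitive")
```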
Pull request overview
This PR introduces a new ingestion “domain” layer for tags/classifications to reduce memory usage by interning TagLabel instances and separating canonicalization (case correction) from bookkeeping. It wires the new TagRegistry/TagCanonicalizer into the database source base and migrates the Snowflake connector to use the new flow, with added unit test coverage and a new tenacity dependency for retry behavior.
Changes:
- Added `metadata.domain.tags` with `TagRegistry` (interning + scope clearing) and `TagCanonicalizer` (system-provider case-corrected resolution with retries).
- Wired registry/canonicalizer into `DatabaseServiceSource` and migrated Snowflake tag ingestion + tag-label lookup to use the registry.
- Added unit tests for the new domain layer and added `tenacity` to ingestion dependencies.
Reviewed changes
Copilot reviewed 9 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| ingestion/src/metadata/domain/__init__.py | Defines the new domain-layer package and its intended boundaries. |
| ingestion/src/metadata/domain/tags/__init__.py | Exposes the tags domain API (TagRegistry, TagCanonicalizer, etc.). |
| ingestion/src/metadata/domain/tags/registry.py | Implements tag payload dedup + TagLabel interning + per-scope label storage/clearing. |
| ingestion/src/metadata/domain/tags/canonicalizer.py | Implements ES-backed canonicalization with retry + caching. |
| ingestion/src/metadata/ingestion/source/database/database_service.py | Adds per-source registry/canonicalizer and topology hooks intended to clear per-scope tag state. |
| ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py | Migrates Snowflake tag ingestion and tag-label lookup/inheritance to the new registry/canonicalizer. |
| ingestion/tests/unit/domain/__init__.py | Test package marker for the new unit-test module path. |
| ingestion/tests/unit/domain/tags/__init__.py | Test package marker for tags domain unit tests. |
| ingestion/tests/unit/domain/tags/test_registry.py | Unit tests validating registry semantics, interning, and concurrency behavior. |
| ingestion/tests/unit/domain/tags/test_canonicalizer.py | Unit tests validating canonicalization, caching, and retry behavior. |
| ingestion/setup.py | Adds tenacity runtime dependency for canonicalizer retries. |
```python
def clear_scope(self, scope_fqn: str) -> None:
    """Drop labels under ``scope_fqn`` and mark the scope cleared.

    Subsequent ``attach`` calls for this scope will raise.
    """
    prefix = scope_fqn + fqn.FQN_SEPARATOR

    with self._scope_state_lock:
        kept = {
            k: v
            for k, v in self._labels_by_entity.items()
            if k != scope_fqn and not k.startswith(prefix)
        }
        dropped = len(self._labels_by_entity) - len(kept)
        self._labels_by_entity = kept
        if dropped:
            logger.debug(
                "TagRegistry: cleared scope %s (%d entity labels dropped)",
                scope_fqn,
                dropped,
            )

    with self._run_state_lock:
        self._cleared_scopes.add(scope_fqn)
```
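The comprehension-plus-rebind above is deliberate: CPython dicts only resize on insertion, so popping keys in place would leave the oversized bucket array allocated. A standalone illustration of the difference (CPython behavior, independent of this PR's code):

```python
import sys

labels = {f"db.schema.table_{i}": ["tag"] for i in range(120_000)}
print(sys.getsizeof(labels))  # several MB for the dict's own table (64-bit CPython)

# Pop-in-place: every key is removed, yet the dict keeps its huge bucket array.
for key in list(labels):
    labels.pop(key)
print(sys.getsizeof(labels))  # unchanged: deletion never shrinks the table

# Rebind: the old dict loses its last reference and is freed whole;
# the survivors (here: none) land in a right-sized new dict.
labels = {k: v for k, v in labels.items()}
print(sys.getsizeof(labels))  # back to an empty-dict footprint
```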
```diff
 def yield_database_tag(self, database_entity: str) -> Iterable[Either[OMetaTagAndClassification]]:
     """Yield database-level tags for the topology."""
     if not self.source_config.includeTags:
         return

-    if database_entity in self.database_tags_map:
-        database_fqn = fqn.build(
-            self.metadata,
-            entity_type=Database,
-            service_name=self.context.get().database_service,
-            database_name=database_entity,
-        )
-        for tag_info in self.database_tags_map[database_entity]:
-            yield from get_ometa_tag_and_classification(
-                tag_fqn=FullyQualifiedEntityName(database_fqn),
-                tags=[tag_info["tag_value"]],
-                classification_name=tag_info["tag_name"],
-                tag_description=SNOWFLAKE_TAG_DESCRIPTION,
-                classification_description=SNOWFLAKE_CLASSIFICATION_DESCRIPTION,
-                metadata=self.metadata,
-                system_tags=True,
-            )
+    if database_entity not in self.database_tags_map:
+        return
+
+    database_fqn = fqn.build(
+        self.metadata,
+        entity_type=Database,
+        service_name=self.context.get().database_service,
+        database_name=database_entity,
+    )
+    for tag_info in self.database_tags_map[database_entity]:
+        try:
+            classification = self.tag_canonicalizer.classification(
+                tag_info["tag_name"], SNOWFLAKE_CLASSIFICATION_DESCRIPTION
+            )
+            tag = self.tag_canonicalizer.tag(
+                classification.name, tag_info["tag_value"], SNOWFLAKE_TAG_DESCRIPTION
+            )
+
+            self.tags_registry.attach(
+                scope_fqn=database_fqn,
+                entity_fqn=database_fqn,
+                classification_name=classification.name,
+                tag_name=tag.name,
+                classification_description=classification.description,
+                tag_description=tag.description,
+            )
+        except Exception as exc:
```
```python
from metadata.domain.tags.canonicalizer import Canonical, TagCanonicalizer
from metadata.domain.tags.registry import ScopeAlreadyClearedError, TagRegistry

__all__ = [
```
nit - I don't like this, because then people like to `import *`
This is way handier than having overly deep imports imho; we have static checks to prevent `import *`
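For context, the re-export pattern under discussion looks roughly like this (the actual `__all__` contents are truncated in the hunk above, so this is a guess from the imports):

```python
# metadata/domain/tags/__init__.py
from metadata.domain.tags.canonicalizer import Canonical, TagCanonicalizer
from metadata.domain.tags.registry import ScopeAlreadyClearedError, TagRegistry

__all__ = ["Canonical", "ScopeAlreadyClearedError", "TagCanonicalizer", "TagRegistry"]

# Callers get a shallow import path:
#     from metadata.domain.tags import TagRegistry
# instead of reaching into the module:
#     from metadata.domain.tags.registry import TagRegistry
```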
```python
logger = ingestion_logger()


_es_retry = retry(
```
thinking this could be more commonly reused in other places besides the canonicalizer, e.g., when we search for tables, or owners
I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.
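The hunk above cuts off at `_es_retry = retry(`, so for context here is a plausible shape for such a tenacity policy (the argument choices are assumptions, not the PR's actual configuration):

```python
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

_es_retry = retry(
    retry=retry_if_exception_type(ConnectionError),  # retry only transient failures
    stop=stop_after_attempt(3),                      # give up after three tries
    wait=wait_exponential(multiplier=1, max=10),     # back off between attempts
    reraise=True,                                    # surface the original exception
)
```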
```python
    return canonical


@_es_retry
def _es_search(self, entity_type: Any, search_string: str) -> Iterable[Any]:
```
this could be inside ometa with the decorator and all
I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.
```python
            error=f"Tag canonicalization failed for {tag_info['tag_name']}.{tag_info['tag_value']}: {exc}",
        )
    )
yield from (Either(right=record) for record in self.tags_registry.drain())
```
I don't follow why the `drain()` here helps vs the vanilla yield we had
It is not about the drain itself. The gains are in how we populate and what we yield. Before, we were yielding the following:

```python
for tag_info in self.schema_tags_map[schema_name]:
    yield from get_ometa_tag_and_classification(
        tag_fqn=FullyQualifiedEntityName(schema_fqn),
        tags=[tag_info["tag_value"]],
        classification_name=tag_info["tag_name"],
        tag_description=SNOWFLAKE_TAG_DESCRIPTION,
        classification_description=SNOWFLAKE_CLASSIFICATION_DESCRIPTION,
        metadata=self.metadata,
        system_tags=True,
    )
```

This was yielding the same tag and classification multiple times, since we are basically saving Tag References. If Tag A was used once on a schema, once on a database, and once on a column, we would send three PUTs for that Tag and Classification.

`drain()` returns the pending tags that have not yet been yielded to the sink, based on this logic, which should avoid us sending duplicates (except classification ones, due to the current coupling between both, but I did not want to delve there due to the urgency):

```python
with self._run_state_lock:
    tag_fqn = model_str(tag_label.tagFQN)
    if tag_fqn not in self._known_tag_fqns:
        self._known_tag_fqns.add(tag_fqn)
        self._pending.append(
            self._build_pending_record(
                classification_name=classification_name,
                classification_description=classification_description,
                tag_name=tag_name,
                tag_description=tag_description,
            )
        )
```

This is geared toward time improvement; the memory improvement is handled within the TagRegistry as well, by avoiding the creation of many repeated Pydantic TagLabel objects and using references to them instead. All this while centralizing the tag logic in one place with a clear API.
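A minimal sketch of the swap-and-return semantics described above (assuming `_pending` holds the queued records; not the PR's exact code):

```python
def drain(self) -> list:
    """Hand the pending, deduped records to the caller exactly once."""
    with self._run_state_lock:
        pending, self._pending = self._pending, []
    return pending
```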
```python
        schema_name=schema_name,
    )
    self.tags_registry.clear_scope(schema_fqn)
    yield from ()
```
Due to how the post-process actually works in the Topology Runner:

```python
yield from self._run_node_post_process(node=node)
```

the post_process hook must itself be a generator, even when it has nothing to yield.
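A standalone illustration of why the empty `yield from ()` matters (the function names are invented for the example):

```python
def cleanup(node: str) -> None:
    print(f"clearing tag scope for {node}")


def post_process_plain(node: str) -> None:
    cleanup(node)  # a plain function returns None


def post_process_generator(node: str):
    cleanup(node)
    yield from ()  # a generator that yields nothing


def run_node_post_process(hook, node: str):
    yield from hook(node)  # what the Topology Runner effectively does


list(run_node_post_process(post_process_generator, "schema"))  # OK: cleanup runs
# list(run_node_post_process(post_process_plain, "schema"))
#     -> TypeError: 'NoneType' object is not iterable
```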
- Drop None defaults from `TagRegistry.attach()` description params
(required `str | None`); normalize None -> "" inside
`_build_pending_record` so the OM schema's required-non-null Markdown
contract is owned at the registry boundary, not at every caller.
- Fix `yield_database_tag` missing registry drain (PR review).
- Fix `clear_scope` lock-order race that left labels visible after the
scope was marked cleared (PR review).
- Resolve basedpyright errors:
- `Either` and `TopologyNode` use Form 3 Pydantic v2 fields
(`Annotated[X, Field(...)] = default`) so static checkers see the
defaults.
- `cast("str", fqn.build(...))` at the 13 sites that feed
`fqn.build(...)` results into FQN-typed args.
- Scoped `# pyright: ignore[reportAttributeAccessIssue]` on
`TopologyContext` dynamic-attribute accesses (matches the codebase
pattern of grandfathering 8.4k+ such errors via baseline).
- Populate `stackTrace` on the three snowflake tag-error
`StackTraceError`s so the Status UI shows the trace, not just the
one-line summary.
- Rewrite three snowflake tag-inheritance tests to drive the real
registry attach + inheritance walk after `get_tag_label` was removed:
- `test_schema_tag_inheritance`
- `test_database_tag_inheritance`
- `test_tag_value_precedence` (one attach intentionally passes
`None` descriptions to exercise the registry's normalization)
🔴 Playwright Results — 4 failure(s), 8 flaky

✅ 4016 passed · ❌ 4 failed · 🟡 8 flaky · ⏭️ 86 skipped

Genuine Failures (failed on all attempts) ❌
…cascade

Reverting `Either.left`/`Either.right` to the original `Annotated[Optional[T], Field(default=None, ...)]` form.

The Form 3 shape (`Annotated[T | None, Field(...)] = None`) introduced in the prior commit caused pyright to eagerly bind `T` from the literal-None default at every no-arg construction site like `Either(left=...)`, resolving them to `Either[Unknown]`. Because `Either[T]` is invariant, those failed to satisfy declared generator return types like `Iterable[Either[CreateTableRequest]]` — surfacing 45 latent reportReturnType errors across sample_data, dbt, sas, qliksense, sigma, common_db_source, common_broker_source, amundsen, sink/metadata_rest.

Form 2 wraps the default inside `Annotated` metadata, where pyright treats it as opaque: it sees "this field has a default" (so construction sites pass) but doesn't eagerly bind `T` from None. Context-driven inference works; no cascade.

`TopologyNode` stays in Form 3 — its fields are concrete-typed (`list[str] | None`, `bool`), with no generic `T` to bind.
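To make the two field shapes concrete, a simplified sketch (illustrative only; the repo's `Either` fields carry more metadata):

```python
from typing import Annotated, Generic, Optional, TypeVar

from pydantic import BaseModel, Field

T = TypeVar("T")


class EitherForm2(BaseModel, Generic[T]):
    # Form 2: the default lives inside the Annotated metadata. Pyright treats
    # it as opaque, so T is inferred from context rather than from None.
    left: Annotated[Optional[T], Field(default=None, description="error side")]
    right: Annotated[Optional[T], Field(default=None, description="value side")]


class EitherForm3(BaseModel, Generic[T]):
    # Form 3: a literal `= None` default. Pyright eagerly binds T from the
    # None literal at no-arg construction sites, producing Either[Unknown]
    # and the invariance cascade described above.
    left: Annotated[Optional[T], Field(description="error side")] = None
    right: Annotated[Optional[T], Field(description="value side")] = None
```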
```python
with self._run_state_lock:
    if scope_fqn in self._cleared_scopes:
        raise ScopeAlreadyClearedError(
            f"Tag attach called for cleared scope '{scope_fqn!r}' for entity '{entity_fqn!r}'"
        )

tag_label = self._intern_tag_label(
    classification_name=classification_name,
    tag_name=tag_name,
    label_type=label_type,
    state=state,
)

with self._scope_state_lock:
    self._labels_by_entity.setdefault(entity_fqn, []).append(tag_label)
```
💡 Edge Case: TOCTOU race between _run_state_lock and _scope_state_lock in attach
In TagRegistry.attach, the check for _cleared_scopes (line 122-126 under _run_state_lock) and the mutation of _labels_by_entity (line 135-136 under _scope_state_lock) are not atomic. A concurrent clear_scope call could interleave between these two critical sections: Thread A passes the cleared-scope guard, Thread B calls clear_scope (marks the scope cleared and rebuilds _labels_by_entity), Thread A then appends a label into the freshly rebuilt dict — effectively inserting a label into a scope that was just cleared.
In the current topology model this is unlikely to trigger because clear_scope runs in post_process after all attach calls for that scope have finished. However, the class advertises itself as "safe for concurrent use across the topology's parallel schema workers" (docstring line 21), which makes this a latent correctness gap if threading assumptions change.
Suggested fix:
Either:
1. Use a single lock for both `_cleared_scopes` and `_labels_by_entity`, or
2. Re-check `_cleared_scopes` inside the `_scope_state_lock` block:

```python
with self._scope_state_lock:
    with self._run_state_lock:
        if scope_fqn in self._cleared_scopes:
            raise ScopeAlreadyClearedError(...)
    self._labels_by_entity.setdefault(entity_fqn, []).append(tag_label)
```
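For comparison, a sketch of option 1 under the same attribute names (illustrative, not the PR's code):

```python
import threading


class ScopeAlreadyClearedError(RuntimeError):
    """Raised when attaching into a scope that was already cleared."""


class SingleLockRegistry:
    """Option 1: one lock makes the cleared-check and the append atomic."""

    def __init__(self) -> None:
        self._state_lock = threading.Lock()
        self._cleared_scopes: set[str] = set()
        self._labels_by_entity: dict[str, list] = {}

    def attach(self, scope_fqn: str, entity_fqn: str, tag_label: object) -> None:
        with self._state_lock:
            if scope_fqn in self._cleared_scopes:
                raise ScopeAlreadyClearedError(scope_fqn)
            self._labels_by_entity.setdefault(entity_fqn, []).append(tag_label)

    def clear_scope(self, scope_fqn: str) -> None:
        with self._state_lock:
            prefix = scope_fqn + "."
            self._labels_by_entity = {
                k: v
                for k, v in self._labels_by_entity.items()
                if k != scope_fqn and not k.startswith(prefix)
            }
            self._cleared_scopes.add(scope_fqn)
```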
Code Review 👍 Approved with suggestions (1 resolved / 2 findings)

Introduces the TagRegistry domain layer to optimize ingestion memory usage and resolves missing tag drains, though a TOCTOU race condition in `attach` remains.

💡 Edge Case: TOCTOU race between `_run_state_lock` and `_scope_state_lock` in `attach`
📄 ingestion/src/metadata/domain/tags/registry.py:122-136 (details and suggested fix in the comment above)

✅ Resolved: Bug: `yield_database_tag` never drains registry — tags lost
```python
@cached_property
def tags_registry(self) -> TagRegistry:
    """Per-Source registry tracking tag/classification ingestion state."""
    return TagRegistry(metadata=self.metadata)


@cached_property
def tag_canonicalizer(self) -> TagCanonicalizer:
    """Per-Source canonicalizer for case-corrected tag/classification names."""
    return TagCanonicalizer(metadata=self.metadata)
```
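`cached_property` computes the value once per source instance on first access and reuses it afterwards, which is what makes the registry and canonicalizer per-Source singletons. A quick illustration:

```python
from functools import cached_property


class Source:
    @cached_property
    def tags_registry(self) -> dict:
        print("building registry")
        return {}


s = Source()
assert s.tags_registry is s.tags_registry  # "building registry" prints once
assert Source().tags_registry is not s.tags_registry  # fresh per instance
```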
```python
with self._scope_state_lock:
    self._labels_by_entity.setdefault(entity_fqn, []).append(tag_label)


with self._run_state_lock:
```
| """Skip tenacity's between-retry sleeps so retry-tests run instantly.""" | ||
| monkeypatch.setattr("tenacity.nap.time.sleep", lambda *_args, **_kwargs: None) |
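As a usage sketch, that `setattr` typically lives in an autouse pytest fixture so every retry test picks it up (the fixture name here is invented):

```python
import pytest


@pytest.fixture(autouse=True)
def no_retry_sleep(monkeypatch):
    """Skip tenacity's between-retry sleeps so retry tests run instantly."""
    monkeypatch.setattr("tenacity.nap.time.sleep", lambda *_args, **_kwargs: None)
```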
|


Fixes open-metadata/openmetadata-collate#3999
Describe your changes:
Why
The legacy `context.tags` list overloads two roles (sink-bound queue + per-entity inheritance lookup) and stores one Pydantic `TagLabel` per attachment. On a Snowflake schema with ~120k tag attachments this peaks at ~112 MB — disproportionate to the ~20 unique tags actually in play.

What

Adds `metadata.domain.tags` with two classes:
- `TagRegistry` — per-Source bookkeeping for tag/classification ingestion
- `TagCanonicalizer` — case-corrected name resolution against system-provider entities in OM

Migrates the Snowflake connector to use them. Other DB connectors continue on the legacy `context.tags` flow (strangler pattern; future PRs migrate per-connector).

How

- `TagRegistry` interns shared `TagLabel` instances by `(classification, tag, labelType, state)` — N attachments dereference to the same Pydantic graph.
- `clear_scope` rebinds `_labels_by_entity` to a fresh dict (not pop-in-place) so the hash-table bucket array releases immediately.

Type of change:
High-level design:
Architecture
`metadata.domain.tags` is a new domain layer (in-memory helpers operating on OM-generated types, with no I/O ownership beyond an injected `OpenMetadata` client for read-only queries). Two classes with separated concerns (a compact sketch of the layout follows):

- `TagRegistry` — three storage groups behind two locks. `_pending` queues sink-bound `OMetaTagAndClassification` payloads (deduped by `_known_tag_fqns`). `_labels_by_entity` maps entity FQN → list of shared `TagLabel` references. `_tag_label_cache` interns the underlying `TagLabel` objects.
- `TagCanonicalizer` — a separate concern: case-corrected lookup against system-provider classifications/tags via ES, with retry. Failures raise; callers wrap in `Either`.
- `DatabaseServiceSource` exposes both as `cached_property` and wires the `clear_schema_tag_scope`/`clear_database_tag_scope` topology hooks.
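A compact sketch of that storage layout (field names from the description above; types simplified, not the real class):

```python
import threading


class TagRegistrySketch:
    """Illustrative layout only; the real class lives in registry.py."""

    def __init__(self, metadata) -> None:
        self.metadata = metadata                          # read-only OM client
        self._run_state_lock = threading.Lock()           # guards _pending & _known_tag_fqns
        self._scope_state_lock = threading.Lock()         # guards _labels_by_entity
        self._pending: list = []                          # sink-bound payload queue
        self._known_tag_fqns: set[str] = set()            # dedup for _pending
        self._cleared_scopes: set[str] = set()            # scopes that reject attach()
        self._labels_by_entity: dict[str, list] = {}      # entity FQN -> shared labels
        self._tag_label_cache: dict[tuple, object] = {}   # interned TagLabel pool
```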
Files added

- `ingestion/src/metadata/domain/__init__.py`
- `ingestion/src/metadata/domain/tags/__init__.py`
- `ingestion/src/metadata/domain/tags/registry.py`
- `ingestion/src/metadata/domain/tags/canonicalizer.py`
- `ingestion/tests/unit/domain/tags/test_registry.py` (34 tests)
- `ingestion/tests/unit/domain/tags/test_canonicalizer.py` (13 tests)
Files updated

- `ingestion/src/metadata/ingestion/source/database/database_service.py` — registry/canonicalizer wiring + topology hooks
- `ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py` — migrated `yield_tag` and `get_*_tag_labels` overrides
- `ingestion/setup.py` — added `tenacity>=8.0,<10` for canonicalizer retries

Tests:
Unit tests

- `test_registry.py` (attach/labels_for/drain/clear_scope/ensure_known/stats, thread safety, interning identity by `is`-equality, cache survives `clear_scope`, FQN-level dedup across `label_type` variants)
- `test_canonicalizer.py` (canonicalization, retry behavior, no cache poisoning on persistent failure)

Manual testing performed
UI screen recording / screenshots:
Not applicable.
Checklist:
Summary by Gitar
- Fixed a race in `clear_scope` by moving the `_cleared_scopes` update inside the state lock.
- Simplified `_build_pending_record` to handle empty strings for descriptions instead of optional checks.
- Improved the `Either` model typing using `Annotated` and `|` unions for better type checking.
- Updated `TopologyNode` to use `Annotated` for field definitions and `list[str] | None` for union types.
- Adjusted `DatabaseServiceSource` and `SnowflakeSource` using `cast` to satisfy type checkers.
- Populated `traceback.format_exc()` in `StackTraceError` objects during tag canonicalization.
- Rewrote tag-inheritance tests to drive real `TagRegistry` state rather than complex mocks, improving test reliability and clarity.

This will update automatically on new commits.