Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
"sqlalchemy>=2.0.0,<3",
"collate-sqllineage>=2.1.1",
"tabulate==0.9.0",
"tenacity>=8.0,<10",
"typing-inspect",
"packaging", # For version parsing
"setuptools>=78.1.1,<81", # <81 required: pkg_resources removed in setuptools 81+
Expand Down
22 changes: 22 additions & 0 deletions ingestion/src/metadata/domain/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OpenMetadata domain utilities.

In-memory helpers operating on OpenMetadata's data model, reusable across
service-source bases and features. A module belongs here when it satisfies
ALL of:

1. Knows OM concepts (operates on OM-generated types or OM-specific ideas).
2. Owns no I/O infrastructure. May use an INJECTED OM client for read-only
queries; the client's lifecycle is the caller's.
3. Framework-independent — no topology, stages, or sinks.
4. Cross-cutting — used by more than one service-source base or feature.
"""
21 changes: 21 additions & 0 deletions ingestion/src/metadata/domain/tags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tag and Classification domain utilities."""

from metadata.domain.tags.canonicalizer import Canonical, TagCanonicalizer
from metadata.domain.tags.registry import ScopeAlreadyClearedError, TagRegistry

__all__ = [
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - I don't like this because then people tend to use `import *`

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is way handier than having overly deep imports, imho; we have static checks to prevent `import *`.

"Canonical",
"ScopeAlreadyClearedError",
"TagCanonicalizer",
"TagRegistry",
]
136 changes: 136 additions & 0 deletions ingestion/src/metadata/domain/tags/canonicalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TagCanonicalizer — case-corrected name resolution against OpenMetadata.

Resolves source-system Classification and Tag names to the canonical form
of any matching system-provider entity in OM (e.g., source reports
``pii.sensitive`` → returns ``PII.Sensitive``). Persistent ES failures
raise after retry exhaustion.
"""

import logging
import threading
from collections.abc import Iterable
from typing import Any, NamedTuple, cast

from tenacity import (
before_sleep_log,
retry,
stop_after_attempt,
wait_random_exponential,
)

from metadata.generated.schema.entity.classification.classification import Classification
from metadata.generated.schema.entity.classification.tag import Tag
from metadata.generated.schema.type.basic import ProviderType
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


_es_retry = retry(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking this could be reused in places other than the canonicalizer, e.g., when we search for tables or owners.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.

stop=stop_after_attempt(5),
wait=wait_random_exponential(multiplier=2, max=30),
reraise=True,
before_sleep=before_sleep_log(logger, logging.WARNING),
)


class Canonical(NamedTuple):
"""Canonical (name, description) pair returned from OpenMetadata."""

name: str
description: str | None


class TagCanonicalizer:
"""Case-corrected name resolution for system Classifications and Tags.

Persistent ES failures raise; callers should wrap in ``Either`` to
surface them to workflow status.
"""

def __init__(self, metadata: OpenMetadata) -> None:
    """Bind the canonicalizer to ``metadata`` and start with empty caches.

    Args:
        metadata: OpenMetadata client used for every search issued by
            this instance.
    """
    # Re-entrant lock taken around every cache read and write.
    self._lock = threading.RLock()
    # Resolved tags, keyed by lowercased tag FQN.
    self._tag_cache: dict[str, Canonical] = {}
    # Resolved classifications, keyed by lowercased classification name.
    self._classification_cache: dict[str, Canonical] = {}
    self._metadata = metadata

def classification(
    self,
    name: str,
    description: str | None = None,
) -> Canonical:
    """Resolve a classification name to its canonical form in OM, cached.

    Searches OM for ``name``. When a system-provider Classification whose
    name equals ``name`` ignoring case is found, its name and description
    are returned (falling back to ``description`` when the entity has
    none). Otherwise ``name`` and ``description`` are returned unchanged.

    Results are cached per lowercased name, so the first resolution of a
    name — including the ``description`` supplied with it — is what every
    later call for that name receives.

    Args:
        name: Classification name as reported by the source system.
        description: Description to use when OM provides none.

    Returns:
        The cached ``Canonical`` for ``name``.

    Raises:
        Any exception raised by ``_es_search``; nothing is cached then.
    """
    key = name.lower()
    with self._lock:
        cached = self._classification_cache.get(key)
    if cached is not None:
        return cached

    # The search runs without holding the lock, so a slow lookup does not
    # block other threads; concurrent callers may search the same key.
    canonical = Canonical(name=name, description=description)
    for entity in self._es_search(Classification, name):
        if entity.provider == ProviderType.system and entity.name.root.lower() == key:
            canonical = Canonical(
                name=entity.name.root,
                description=entity.description.root if entity.description else description,
            )
            break

    with self._lock:
        # Return whichever value won the cache slot so that racing callers
        # all see the same result that later cache hits will return.
        return self._classification_cache.setdefault(key, canonical)

def tag(
    self,
    classification_name: str,
    tag_name: str,
    tag_description: str | None = None,
) -> Canonical:
    """Resolve a tag name to its canonical form in OM, cached.

    ``classification_name`` must already be canonical (call
    ``classification`` first): it is compared case-sensitively against the
    classification of each search hit, while ``tag_name`` is compared
    ignoring case. When a matching system-provider Tag is found, its name
    and description are returned (falling back to ``tag_description`` when
    the entity has none). Otherwise ``tag_name`` and ``tag_description``
    are returned unchanged.

    Results are cached per lowercased tag FQN, so the first resolution of
    a tag is what every later call for it receives.

    NOTE(review): the cache key lowercases the classification part, but
    the match above is case-sensitive on it. A call made with a
    non-canonical ``classification_name`` can therefore cache a fallback
    that a later call with the canonical name will hit.

    Args:
        classification_name: Canonical name of the owning classification.
        tag_name: Tag name as reported by the source system.
        tag_description: Description to use when OM provides none.

    Returns:
        The cached ``Canonical`` for the tag.

    Raises:
        Any exception raised by ``_es_search``; nothing is cached then.
    """
    tag_fqn = cast(
        "str",
        fqn.build(None, Tag, classification_name=classification_name, tag_name=tag_name),
    )
    key = tag_fqn.lower()
    with self._lock:
        cached = self._tag_cache.get(key)
    if cached is not None:
        return cached

    # The search runs without holding the lock, so a slow lookup does not
    # block other threads; concurrent callers may search the same key.
    wanted_tag = tag_name.lower()
    canonical = Canonical(name=tag_name, description=tag_description)
    for entity in self._es_search(Tag, tag_fqn):
        if (
            entity.provider == ProviderType.system
            and entity.classification.name == classification_name
            and entity.name.root.lower() == wanted_tag
        ):
            canonical = Canonical(
                name=entity.name.root,
                description=entity.description.root if entity.description else tag_description,
            )
            break

    with self._lock:
        # Return whichever value won the cache slot so that racing callers
        # all see the same result that later cache hits will return.
        return self._tag_cache.setdefault(key, canonical)

@_es_retry
def _es_search(self, entity_type: Any, search_string: str) -> Iterable[Any]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be inside ometa with the decorator and all

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but did not want to add it there for now. If this works properly, I will need to create another PR to migrate the rest of the databases to use this and the other service types, while also doing some cleaning. For sure this should be moved there.

"""Run an ES search by FQN with retries."""
return self._metadata.es_search_from_fqn(entity_type=entity_type, fqn_search_string=search_string) or []
Loading
Loading