88 changes: 69 additions & 19 deletions ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -32,6 +32,34 @@

logger = utils_logger()

# Explicit type precedence so mixed-type object columns are not mis-typed by lexicographic max().
# dict > list > datetime > numeric > str, matching _data_formats priority.
_TYPE_PRECEDENCE = (
"dict",
"list",
"datetime64[ns]",

SonarQubeCloud check failure on line 40: Define a constant instead of duplicating this literal "datetime64[ns]" 4 times. (https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4VryzuSfAHfRjLQ2Ba&open=AZ4VryzuSfAHfRjLQ2Ba&pullRequest=27951)
"datetime",
"timedelta[ns]",

SonarQubeCloud check failure on line 42: Define a constant instead of duplicating this literal "timedelta[ns]" 4 times. (https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4VryzuSfAHfRjLQ2BZ&open=AZ4VryzuSfAHfRjLQ2BZ&pullRequest=27951)
"float64",
"float32",
"float",
"int64",
"int32",
"int",
"bool",
"str",
"bytes",
)


def _resolve_col_type(type_list: List[str]) -> str: # noqa: UP006
"""Pick the dominant type from type_list using _TYPE_PRECEDENCE instead of lexicographic max()."""
type_set = set(type_list)
for t in _TYPE_PRECEDENCE:
if t in type_set:
return t
return type_list[0] if type_list else "str"
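For illustration, a minimal sketch (not part of the diff; assumes the helper above is in scope) of why this replaces the old lexicographic max() call: max() sorts dtype-name strings alphabetically, so a column mixing dicts and numbers used to resolve to whichever name sorted last.

# Sketch only - contrasts the old and new type resolution for a mixed column.
observed = ["dict", "int", "str"]
print(max(observed))                # "str": alphabetical winner, mis-types the column
print(_resolve_col_type(observed))  # "dict": precedence winner, the struct type dominates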


class _ArrayOfStruct:
"""Marker for a JSON value observed as a list of dicts. Carries the merged struct shape
@@ -226,7 +254,7 @@

@staticmethod
def _get_data_frame(
data_frame: Union[List["DataFrame"], "DataFrame"], # noqa: F821, UP006

SonarQubeCloud check warning on line 257: Use a union type expression for this type hint. (https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4AquEsiO-pOX9cQGKZ&open=AZ4AquEsiO-pOX9cQGKZ&pullRequest=27951)
sample: bool,
shuffle: bool, # noqa: F821, RUF100
):
Expand Down Expand Up @@ -282,7 +310,7 @@
return self._get_columns(self.data_frame)

@classmethod
def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821

SonarQubeCloud check failure on line 313: Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed. (https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4AquEsiO-pOX9cQGKa&open=AZ4AquEsiO-pOX9cQGKa&pullRequest=27951)
"""
method to process column details.

Expand Down Expand Up @@ -334,14 +362,20 @@
"""
data_type = None # default to string
try:
- if data_frame[column_name].dtypes.name == "object" and any(data_frame[column_name].dropna().values):
+ col_series = data_frame[column_name]
+ col_non_null = col_series.dropna()
+ if col_series.dtypes.name == "object" and len(col_non_null) > 0:
try:
# Safely evaluate the input string
- df_row_val_list = data_frame[column_name].dropna().values[:1000]
+ df_row_val_list = col_non_null.values[:1000]
parsed_object_datatype_list = []
for df_row_val in df_row_val_list:
try:
- parsed_object_datatype_list.append(type(ast.literal_eval(str(df_row_val))).__name__.lower())
+ if isinstance(df_row_val, (dict, list)):
+     parsed_object_datatype_list.append(type(df_row_val).__name__.lower())
+ else:
+     parsed_object_datatype_list.append(
+         type(ast.literal_eval(str(df_row_val))).__name__.lower()
+     )
except (ValueError, SyntaxError):
# we try to parse the value as a datetime; if it fails, we fall back to string,
# as literal_eval will fail for plain strings
@@ -354,11 +388,11 @@
if not str(df_row_val).isnumeric():
# check if the row value is time
try:
- datetime.strptime(df_row_val, "%H:%M:%S").time()
+ datetime.strptime(str(df_row_val), "%H:%M:%S").time()
dtype_ = "timedelta[ns]"
except (ValueError, TypeError):
# check if the row value is date / time / datetime
- type(parse(df_row_val)).__name__.lower()
+ type(parse(str(df_row_val))).__name__.lower()
dtype_ = "datetime64[ns]"
parsed_object_datatype_list.append(dtype_)
except (ParserError, TypeError):
@@ -369,19 +403,18 @@
)
parsed_object_datatype_list.append("str")

- data_type = max(parsed_object_datatype_list)
+ # Determine the data type of the parsed object
+ data_type = _resolve_col_type(parsed_object_datatype_list)

except (ValueError, SyntaxError) as exc:
# Handle any exceptions that may occur
logger.debug(
f"ValueError/SyntaxError while parsing column '{column_name}' datatype: {exc}. "
f"Falling back to string."
)
- data_type = "string"
+ data_type = "str"

data_type = cls._data_formats.get(
- data_type or data_frame[column_name].dtypes.name,
+ data_type or col_series.dtypes.name,
)
if not data_type:
logger.debug(f"unknown data type {data_frame[column_name].dtypes.name}. resolving to string.")
@@ -394,7 +427,7 @@
return data_type or DataType.STRING

@classmethod
def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006

SonarQubeCloud check failure on line 430: Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed. (https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ4AquEsiO-pOX9cQGKb&open=AZ4AquEsiO-pOX9cQGKb&pullRequest=27951)
"""Given a sample of `n` json objects, return a json object that represents the unique
structure of all `n` objects. Note that the type of the key will be that of
the last object seen in the sample.
@@ -459,16 +492,33 @@
from pandas import Series # pylint: disable=import-outside-toplevel # noqa: PLC0415

json_column = cast(Series, json_column) # noqa: TC006
- try:
-     json_column = json_column.apply(json.loads)
- except TypeError as exc:
-     # if values are not strings, we will assume they are already json objects
-     # based on the read class logic
-     logger.debug(
-         f"TypeError while parsing JSON column children: {exc}. Assuming values are already JSON objects."
-     )
- json_structure = cls.unique_json_structure(json_column.values.tolist())

+ dict_values = []
+ for value in json_column.dropna().values:
+     if isinstance(value, dict):
+         dict_values.append(value)
+     elif isinstance(value, str):
+         try:
+             parsed = json.loads(value)
+             if isinstance(parsed, dict):
+                 dict_values.append(parsed)
+             else:
+                 logger.debug(
+                     "Skipping non-object JSON value while extracting column children: "
+                     f"parsed type is {type(parsed).__name__}"
+                 )
+         except (TypeError, json.JSONDecodeError) as exc:
+             logger.debug(f"Skipping unparseable string value while extracting column children: {exc}")
+     else:
+         logger.debug(
+             "Skipping non-string, non-dict value while extracting column children: "
+             f"type is {type(value).__name__}"
+         )
+
+ if not dict_values:
+     return []
+
+ json_structure = cls.unique_json_structure(dict_values)
return cls.construct_json_column_children(json_structure)

@classmethod
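For context, a minimal standalone sketch (sample data and names are hypothetical, mirroring the new loop above) of how the reworked extraction treats a mixed JSON column: dict values pass through, JSON-encoded objects are parsed, and everything else is skipped instead of raising.

import json

import pandas as pd

json_column = pd.Series([{"a": 1}, '{"b": 2}', "[1, 2]", 42, None])
dict_values = []
for value in json_column.dropna().values:
    if isinstance(value, dict):
        dict_values.append(value)           # already a JSON object
    elif isinstance(value, str):
        try:
            parsed = json.loads(value)
            if isinstance(parsed, dict):
                dict_values.append(parsed)  # JSON object encoded as text
        except json.JSONDecodeError:
            pass                            # unparseable string: skipped
print(dict_values)  # [{'a': 1}, {'b': 2}] - the list "[1, 2]" and the int 42 are dropped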
37 changes: 37 additions & 0 deletions ingestion/tests/unit/resources/datalake/dbt_catalog.json
@@ -0,0 +1,37 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"dbt_version": "1.5.0",
"generated_at": "2024-01-01T00:00:00.000000Z",
"invocation_id": "abc-123",
"env": {}
},
"nodes": {
"model.my_project.customers": {
"metadata": {"type": "VIEW", "schema": "public", "name": "customers"},
"columns": {
"customer_id": {"type": "integer", "index": 1, "name": "customer_id"},
"name": {"type": "text", "index": 2, "name": "name"}
},
"stats": {},
"unique_id": "model.my_project.customers"
},
"model.my_project.orders": {
"metadata": {"type": "VIEW", "schema": "public", "name": "orders"},
"columns": {
"order_id": {"type": "integer", "index": 1, "name": "order_id"}
},
"stats": {},
"unique_id": "model.my_project.orders"
}
},
"sources": {
"source.my_project.raw.customers": {
"metadata": {"type": "TABLE", "schema": "raw", "name": "customers"},
"columns": {},
"stats": {},
"unique_id": "source.my_project.raw.customers"
}
},
"errors": null
}
42 changes: 42 additions & 0 deletions ingestion/tests/unit/resources/datalake/dbt_manifest.json
@@ -0,0 +1,42 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v11/manifest.json",
"dbt_version": "1.5.0",
"generated_at": "2024-01-01T00:00:00.000000Z",
"invocation_id": "abc-123",
"env": {},
"project_name": "my_project",
"project_id": "xyz",
"adapter_type": "postgres",
"credential_id": null,
"profile_name": "my_profile"
},
"nodes": {
"model.my_project.customers": {
"name": "customers",
"description": "Customer records",
"unique_id": "model.my_project.customers",
"fqn": ["my_project", "customers"]
}
},
"sources": {
"source.my_project.raw.customers": {
"name": "customers",
"description": "Raw customer data",
"unique_id": "source.my_project.raw.customers"
}
},
"macros": {},
"docs": {},
"exposures": {},
"metrics": {},
"groups": {},
"selectors": {},
"disabled": {},
"parent_map": {},
"child_map": {},
"group_map": {},
"saved_queries": {},
"semantic_models": {},
"unit_tests": {}
}
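These two fixtures form a matching manifest/catalog pair; presumably the dbt unit tests load them with plain json.load, along the lines of this sketch (the actual test file is not shown in this diff):

import json

with open("ingestion/tests/unit/resources/datalake/dbt_manifest.json") as f:
    manifest = json.load(f)
with open("ingestion/tests/unit/resources/datalake/dbt_catalog.json") as f:
    catalog = json.load(f)

# The catalog carries the column-level detail that the manifest nodes lack.
node_id = "model.my_project.customers"
print(manifest["nodes"][node_id]["description"])   # Customer records
print(list(catalog["nodes"][node_id]["columns"]))  # ['customer_id', 'name']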