57 changes: 40 additions & 17 deletions ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -226,7 +226,7 @@

@staticmethod
def _get_data_frame(
data_frame: Union[List["DataFrame"], "DataFrame"], # noqa: F821, UP006
sample: bool,
shuffle: bool, # noqa: F821, RUF100
):
@@ -282,7 +282,7 @@
return self._get_columns(self.data_frame)

@classmethod
def _get_columns(cls, data_frame: "DataFrame"): # noqa: F821
"""
method to process column details.

@@ -334,14 +334,20 @@
"""
data_type = None # default to string
try:
-if data_frame[column_name].dtypes.name == "object" and any(data_frame[column_name].dropna().values):
+col_series = data_frame[column_name]
+col_non_null = col_series.dropna()
+if col_series.dtypes.name == "object" and len(col_non_null) > 0:
try:
# Safely evaluate the input string
-df_row_val_list = data_frame[column_name].dropna().values[:1000]
+df_row_val_list = col_non_null.values[:1000]
parsed_object_datatype_list = []
for df_row_val in df_row_val_list:
try:
-parsed_object_datatype_list.append(type(ast.literal_eval(str(df_row_val))).__name__.lower())
+if isinstance(df_row_val, (dict, list)):
+    parsed_object_datatype_list.append(type(df_row_val).__name__.lower())
+else:
+    parsed_object_datatype_list.append(
+        type(ast.literal_eval(str(df_row_val))).__name__.lower()
+    )
except (ValueError, SyntaxError):
# we try to parse the value as a datetime, if it fails, we fallback to string
# as literal_eval will fail for string
@@ -354,11 +360,11 @@
if not str(df_row_val).isnumeric():
# check if the row value is time
try:
-datetime.strptime(df_row_val, "%H:%M:%S").time()
+datetime.strptime(str(df_row_val), "%H:%M:%S").time()
dtype_ = "timedelta[ns]"
except (ValueError, TypeError):
# check if the row value is date / time / datetime
-type(parse(df_row_val)).__name__.lower()
+type(parse(str(df_row_val))).__name__.lower()
dtype_ = "datetime64[ns]"
parsed_object_datatype_list.append(dtype_)
except (ParserError, TypeError):
@@ -378,10 +384,10 @@
f"ValueError/SyntaxError while parsing column '{column_name}' datatype: {exc}. "
f"Falling back to string."
)
data_type = "string"
data_type = "str"

data_type = cls._data_formats.get(
-    data_type or data_frame[column_name].dtypes.name,
+    data_type or col_series.dtypes.name,
)
if not data_type:
logger.debug(f"unknown data type {data_frame[column_name].dtypes.name}. resolving to string.")
@@ -394,7 +400,7 @@
return data_type or DataType.STRING

@classmethod
def unique_json_structure(cls, dicts: List[Dict]) -> Dict: # noqa: UP006
"""Given a sample of `n` json objects, return a json object that represents the unique
structure of all `n` objects. Note that the type of the key will be that of
the last object seen in the sample.
@@ -459,16 +465,33 @@
from pandas import Series # pylint: disable=import-outside-toplevel # noqa: PLC0415

json_column = cast(Series, json_column) # noqa: TC006
-try:
-    json_column = json_column.apply(json.loads)
-except TypeError as exc:
-    # if values are not strings, we will assume they are already json objects
-    # based on the read class logic
-    logger.debug(
-        f"TypeError while parsing JSON column children: {exc}. Assuming values are already JSON objects."
-    )
-json_structure = cls.unique_json_structure(json_column.values.tolist())
+dict_values = []
+for value in json_column.dropna().values:
+    if isinstance(value, dict):
+        dict_values.append(value)
+    elif isinstance(value, str):
+        try:
+            parsed = json.loads(value)
+            if isinstance(parsed, dict):
+                dict_values.append(parsed)
+            else:
+                logger.debug(
+                    "Skipping non-object JSON value while extracting column children: "
+                    f"parsed type is {type(parsed).__name__}"
+                )
+        except (TypeError, json.JSONDecodeError) as exc:
+            logger.debug(f"Skipping unparseable string value while extracting column children: {exc}")
+    else:
+        logger.debug(
+            "Skipping non-string, non-dict value while extracting column children: "
+            f"type is {type(value).__name__}"
+        )
+
+if not dict_values:
+    return []
+
+json_structure = cls.unique_json_structure(dict_values)
return cls.construct_json_column_children(json_structure)

@classmethod
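The root cause addressed in this hunk is worth spelling out: the old guard called any() over the column's non-null values, and any() evaluates each value's truthiness, so a column containing only falsy containers such as {} or [] skipped type parsing entirely and fell back to string. A minimal standalone sketch of the failure mode and the new guard, using plain pandas outside of OpenMetadata:

import pandas as pd

# A column whose only value is an empty dict: object dtype, nothing null.
df = pd.DataFrame({"col": [{}]})
non_null = df["col"].dropna()

# Old guard: any() checks each value's truthiness, so a lone empty dict
# short-circuits to False and the column was never parsed.
assert not any(non_null.values)

# New guard: only asks whether any non-null values exist at all.
assert len(non_null) > 0

# The new isinstance branch types an already-parsed container directly,
# without round-tripping through str() and ast.literal_eval.
value = non_null.values[0]
assert isinstance(value, (dict, list))
assert type(value).__name__.lower() == "dict"  # "dict" maps to DataType.JSON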
37 changes: 37 additions & 0 deletions ingestion/tests/unit/resources/datalake/dbt_catalog.json
@@ -0,0 +1,37 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/catalog/v1.json",
"dbt_version": "1.5.0",
"generated_at": "2024-01-01T00:00:00.000000Z",
"invocation_id": "abc-123",
"env": {}
},
"nodes": {
"model.my_project.customers": {
"metadata": {"type": "VIEW", "schema": "public", "name": "customers"},
"columns": {
"customer_id": {"type": "integer", "index": 1, "name": "customer_id"},
"name": {"type": "text", "index": 2, "name": "name"}
},
"stats": {},
"unique_id": "model.my_project.customers"
},
"model.my_project.orders": {
"metadata": {"type": "VIEW", "schema": "public", "name": "orders"},
"columns": {
"order_id": {"type": "integer", "index": 1, "name": "order_id"}
},
"stats": {},
"unique_id": "model.my_project.orders"
}
},
"sources": {
"source.my_project.raw.customers": {
"metadata": {"type": "TABLE", "schema": "raw", "name": "customers"},
"columns": {},
"stats": {},
"unique_id": "source.my_project.raw.customers"
}
},
"errors": null
}
42 changes: 42 additions & 0 deletions ingestion/tests/unit/resources/datalake/dbt_manifest.json
@@ -0,0 +1,42 @@
{
"metadata": {
"dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v11/manifest.json",
"dbt_version": "1.5.0",
"generated_at": "2024-01-01T00:00:00.000000Z",
"invocation_id": "abc-123",
"env": {},
"project_name": "my_project",
"project_id": "xyz",
"adapter_type": "postgres",
"credential_id": null,
"profile_name": "my_profile"
},
"nodes": {
"model.my_project.customers": {
"name": "customers",
"description": "Customer records",
"unique_id": "model.my_project.customers",
"fqn": ["my_project", "customers"]
}
},
"sources": {
"source.my_project.raw.customers": {
"name": "customers",
"description": "Raw customer data",
"unique_id": "source.my_project.raw.customers"
}
},
"macros": {},
"docs": {},
"exposures": {},
"metrics": {},
"groups": {},
"selectors": {},
"disabled": {},
"parent_map": {},
"child_map": {},
"group_map": {},
"saved_queries": {},
"semantic_models": {},
"unit_tests": {}
}
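For context on how these two fixtures exercise the parser: the tests wrap a single top-level JSON object into a one-row DataFrame, so every top-level key ("metadata", "nodes", "metrics", ...) becomes a column holding an already-parsed Python dict. A rough sketch of that loading step, mirroring the test helper further below (the fixture path is illustrative, relative to the repository root):

import json

import pandas as pd

# Load a single-object JSON artifact the same way the tests do.
with open("ingestion/tests/unit/resources/datalake/dbt_manifest.json", "rb") as f:
    data = json.loads(f.read())

# A lone top-level object is wrapped in a list, yielding a 1-row DataFrame
# where each top-level key is a column.
records = [data] if isinstance(data, dict) else data
df = pd.DataFrame.from_records(records)

print(df.shape)  # (1, <number of top-level keys>)
print(type(df["metrics"].iloc[0]).__name__)  # "dict": an empty dict the old guard mistyped as string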
195 changes: 194 additions & 1 deletion ingestion/tests/unit/utils/test_datalake.py
@@ -235,7 +235,7 @@ def test_create_column_object(self):
"children": formatted_column,
}
column_obj = Column(**column)
-assert len(column_obj.children) == 3
+assert column_obj.children is not None and len(column_obj.children) == 3


class TestParquetDataFrameColumnParser(TestCase):
@@ -811,6 +811,199 @@ def test_fallback_to_json_schema_parser(self):
self.assertIsNotNone(columns)


class TestFetchColTypesWithParsedObjects:
"""fetch_col_types must correctly type object-dtype columns whose values are already
parsed Python dicts or lists, including falsy containers ({}, [])."""

def test_empty_dict_typed_as_json(self):
df = pd.DataFrame({"col": [{}]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON

def test_empty_list_typed_as_array(self):
df = pd.DataFrame({"col": [[]]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY

def test_multiple_empty_dicts_typed_as_json(self):
df = pd.DataFrame({"col": [{}, {}, {}]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON

def test_dict_with_data_typed_as_json(self):
df = pd.DataFrame({"col": [{"k": "v"}]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON

def test_list_with_data_typed_as_array(self):
df = pd.DataFrame({"col": [[1, 2, 3]]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY

def test_large_already_parsed_dict_typed_as_json(self):
large = {str(i): i for i in range(500)}
df = pd.DataFrame({"col": [large]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON

def test_null_column_typed_as_string(self):
df = pd.DataFrame({"col": [None]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING

def test_string_column_typed_as_string(self):
df = pd.DataFrame({"col": ["hello"]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING

def test_int_column_typed_as_int(self):
df = pd.DataFrame({"col": [42]})
assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.INT


class TestGetChildrenWithParsedDicts:
"""get_children must correctly extract children regardless of whether the Series
values are already-parsed Python dicts, JSON strings, or a mix of both."""

def test_already_parsed_dict_returns_children(self):
col = pd.Series([{"name": "Alice", "age": 30}])
children = GenericDataFrameColumnParser.get_children(col)
assert {c["name"] for c in children} == {"name", "age"}

def test_empty_dict_returns_no_children(self):
col = pd.Series([{}])
assert GenericDataFrameColumnParser.get_children(col) == []

def test_all_null_returns_no_children(self):
col = pd.Series([None, None])
assert GenericDataFrameColumnParser.get_children(col) == []

def test_string_json_returns_children(self):
col = pd.Series(['{"name": "Bob", "score": 99}'])
children = GenericDataFrameColumnParser.get_children(col)
assert {c["name"] for c in children} == {"name", "score"}

def test_mixed_string_and_dict_values_returns_union_of_children(self):
col = pd.Series(['{"a": 1, "b": 2}', {"b": 2, "c": 3}])
children = GenericDataFrameColumnParser.get_children(col)
assert {c["name"] for c in children} == {"a", "b", "c"}

def test_malformed_string_values_are_skipped(self):
col = pd.Series(["not-json", {"key": "val"}])
children = GenericDataFrameColumnParser.get_children(col)
assert {c["name"] for c in children} == {"key"}

def test_nested_dict_structure_returns_children(self):
nodes = {"model.Project.my_model": {"name": "my_model", "unique_id": "x", "description": "test"}}
col = pd.Series([nodes])
children = GenericDataFrameColumnParser.get_children(col)
assert len(children) == 1
assert children[0]["name"] == "model.Project.my_model"


class TestSingleObjectJsonFileIngestion:
"""End-to-end column parsing for single-object JSON files.

Reads fixture files with json.loads → DataFrame.from_records → _get_columns → Column objects.
A single top-level JSON object is wrapped into a 1-row DataFrame. Every top-level key
becomes a column whose value is the Python object returned by json.loads — typically a
dict, list, or None. All columns must be typed correctly and children extracted without
errors.
"""

RESOURCES = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources", "datalake") # noqa: PTH118, PTH120

def _load_fixture_as_dataframe(self, filename):
path = os.path.join(self.RESOURCES, filename) # noqa: PTH118
with open(path, "rb") as f: # noqa: PTH123
data = json.loads(f.read())
if isinstance(data, dict):
data = [data]
return pd.DataFrame.from_records(data)

def _parsed_columns(self, filename):
df = self._load_fixture_as_dataframe(filename)
return {col.name.root: col for col in GenericDataFrameColumnParser._get_columns(df)}

def test_dict_valued_columns_typed_as_json(self):
cols = self._parsed_columns("dbt_catalog.json")
assert cols["metadata"].dataType == DataType.JSON
assert cols["nodes"].dataType == DataType.JSON
assert cols["sources"].dataType == DataType.JSON

def test_null_column_typed_as_string(self):
cols = self._parsed_columns("dbt_catalog.json")
assert cols["errors"].dataType == DataType.STRING

def test_non_empty_dict_column_has_children(self):
cols = self._parsed_columns("dbt_catalog.json")
assert cols["nodes"].children is not None and len(cols["nodes"].children) > 0

def test_empty_dict_columns_typed_as_json_not_string(self):
cols = self._parsed_columns("dbt_manifest.json")
for name in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
assert cols[name].dataType == DataType.JSON, f"column '{name}': expected JSON, got {cols[name].dataType}"

def test_empty_dict_columns_have_no_children(self):
cols = self._parsed_columns("dbt_manifest.json")
for name in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
children = cols[name].children
assert not children, f"column '{name}' should have no children"


class TestDbtSingleObjectJsonIngestion:
"""Single-object JSON files (e.g. dbt artifacts) are wrapped into a 1-row DataFrame
where every top-level key becomes a column with a Python dict value. The column parser
must correctly type all columns — including empty-dict columns — without errors."""

@staticmethod
def _make_catalog_df():
return pd.DataFrame(
[
{
"metadata": {"dbt_version": "1.5.0", "generated_at": "2024-01-01"},
"nodes": {"model.Project.tbl": {"name": "tbl", "description": "test"}},
"sources": {},
"errors": None,
}
]
)

@staticmethod
def _make_manifest_df():
return pd.DataFrame(
[
{
"metadata": {"dbt_version": "1.5.0"},
"nodes": {"model.Project.tbl": {"name": "tbl"}},
"sources": {},
"metrics": {},
"groups": {},
"disabled": {},
"group_map": {},
"saved_queries": {},
"semantic_models": {},
"unit_tests": {},
}
]
)

def test_catalog_column_types(self):
df = self._make_catalog_df()
assert GenericDataFrameColumnParser.fetch_col_types(df, "metadata") == DataType.JSON
assert GenericDataFrameColumnParser.fetch_col_types(df, "nodes") == DataType.JSON
assert GenericDataFrameColumnParser.fetch_col_types(df, "sources") == DataType.JSON
assert GenericDataFrameColumnParser.fetch_col_types(df, "errors") == DataType.STRING

def test_manifest_empty_dict_columns_typed_as_json(self):
df = self._make_manifest_df()
for col in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
assert GenericDataFrameColumnParser.fetch_col_types(df, col) == DataType.JSON, f"{col} should be JSON"

def test_catalog_nodes_children_extracted_without_error(self):
df = self._make_catalog_df()
nodes_col = df["nodes"].dropna()[:100]
children = GenericDataFrameColumnParser.get_children(nodes_col)
assert len(children) > 0

def test_catalog_sources_empty_dict_returns_no_children(self):
df = self._make_catalog_df()
sources_col = df["sources"].dropna()[:100]
assert GenericDataFrameColumnParser.get_children(sources_col) == []


class TestCSVQuotedHeaderFix(TestCase):
"""Test CSV parsing with quoted header fix for malformed CSV files"""

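Taken together, the new get_children path can be driven directly from a mixed Series, which is what several of the tests above do. A condensed usage sketch, assuming the import path mirrors the module's file location:

import pandas as pd

from metadata.utils.datalake.datalake_utils import GenericDataFrameColumnParser

# Dicts are used as-is, valid JSON object strings are parsed, and anything
# else (malformed strings, nulls) is skipped with a debug log.
col = pd.Series(['{"a": 1}', {"b": 2}, "not-json", None])
children = GenericDataFrameColumnParser.get_children(col)
assert {child["name"] for child in children} == {"a", "b"}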