diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py
index f7da5cdae60f..6f169549f7a1 100644
--- a/ingestion/src/metadata/utils/datalake/datalake_utils.py
+++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -32,6 +32,34 @@
 logger = utils_logger()
 
 
+# Explicit type precedence so mixed-type object columns are not mis-typed by lexicographic max().
+# dict > list > datetime > numeric > str, matching _data_formats priority.
+_TYPE_PRECEDENCE = (
+    "dict",
+    "list",
+    "datetime64[ns]",
+    "datetime",
+    "timedelta[ns]",
+    "float64",
+    "float32",
+    "float",
+    "int64",
+    "int32",
+    "int",
+    "bool",
+    "str",
+    "bytes",
+)
+
+
+def _resolve_col_type(type_list: List[str]) -> str:  # noqa: UP006
+    """Pick the dominant type from type_list using _TYPE_PRECEDENCE instead of lexicographic max()."""
+    type_set = set(type_list)
+    for t in _TYPE_PRECEDENCE:
+        if t in type_set:
+            return t
+    return type_list[0] if type_list else "str"
+
+
 class _ArrayOfStruct:
     """Marker for a JSON value observed as a list of dicts. Carries the merged struct shape
@@ -334,14 +362,20 @@ def fetch_col_types(cls, data_frame, column_name):
         """
         data_type = None  # default to string
         try:
-            if data_frame[column_name].dtypes.name == "object" and any(data_frame[column_name].dropna().values):
+            col_series = data_frame[column_name]
+            col_non_null = col_series.dropna()
+            if col_series.dtypes.name == "object" and len(col_non_null) > 0:
                 try:
-                    # Safely evaluate the input string
-                    df_row_val_list = data_frame[column_name].dropna().values[:1000]
+                    df_row_val_list = col_non_null.values[:1000]
                     parsed_object_datatype_list = []
                     for df_row_val in df_row_val_list:
                         try:
-                            parsed_object_datatype_list.append(type(ast.literal_eval(str(df_row_val))).__name__.lower())
+                            if isinstance(df_row_val, (dict, list)):
+                                parsed_object_datatype_list.append(type(df_row_val).__name__.lower())
+                            else:
+                                parsed_object_datatype_list.append(
+                                    type(ast.literal_eval(str(df_row_val))).__name__.lower()
+                                )
                         except (ValueError, SyntaxError):
                             # we try to parse the value as a datetime, if it fails, we fallback to string
                             # as literal_eval will fail for string
@@ -354,11 +388,11 @@
                                 if not str(df_row_val).isnumeric():
                                     # check if the row value is time
                                     try:
-                                        datetime.strptime(df_row_val, "%H:%M:%S").time()
+                                        datetime.strptime(str(df_row_val), "%H:%M:%S").time()
                                         dtype_ = "timedelta[ns]"
                                     except (ValueError, TypeError):
                                         # check if the row value is date / time / datetime
-                                        type(parse(df_row_val)).__name__.lower()
+                                        type(parse(str(df_row_val))).__name__.lower()
                                         dtype_ = "datetime64[ns]"
                                 parsed_object_datatype_list.append(dtype_)
                             except (ParserError, TypeError):
@@ -369,8 +403,7 @@
                                 )
                                 parsed_object_datatype_list.append("str")
 
-                    data_type = max(parsed_object_datatype_list)
-                    # Determine the data type of the parsed object
+                    data_type = _resolve_col_type(parsed_object_datatype_list)
 
                 except (ValueError, SyntaxError) as exc:
                     # Handle any exceptions that may occur
@@ -378,10 +411,10 @@
                         f"ValueError/SyntaxError while parsing column '{column_name}' datatype: {exc}. "
                         f"Falling back to string."
                     )
-                    data_type = "string"
+                    data_type = "str"
 
         data_type = cls._data_formats.get(
-            data_type or data_frame[column_name].dtypes.name,
+            data_type or col_series.dtypes.name,
        )
         if not data_type:
             logger.debug(f"unknown data type {data_frame[column_name].dtypes.name}. resolving to string.")
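Reviewer note (not part of the patch): a minimal, runnable sketch of the two `fetch_col_types` bugs fixed above. The old `any()` truthiness guard skipped falsy containers, and lexicographic `max()` let `"str"` outrank `"dict"`; the abbreviated precedence tuple here is illustrative only.

```python
import pandas as pd

# Falsy-container guard: any() is False for a column holding only {} or [],
# so the old code never inspected it and the column fell through to STRING.
col = pd.Series([{}, {}])
print(any(col.dropna().values))   # False -> old guard skipped the column
print(len(col.dropna()) > 0)      # True  -> new guard inspects it

# Lexicographic max() vs explicit precedence (abbreviated tuple for illustration):
TYPE_PRECEDENCE = ("dict", "list", "float", "int", "str")

def resolve(type_list):
    present = set(type_list)
    return next((t for t in TYPE_PRECEDENCE if t in present), "str")

observed = ["dict", "str"]        # a column mixing dicts and plain strings
print(max(observed))              # 'str'  -> mis-typed as STRING
print(resolve(observed))          # 'dict' -> correctly resolved to JSON
```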
@@ -459,16 +492,33 @@ def get_children(cls, json_column) -> List[Dict]:  # noqa: UP006
         from pandas import Series  # pylint: disable=import-outside-toplevel  # noqa: PLC0415
 
         json_column = cast(Series, json_column)  # noqa: TC006
-        try:
-            json_column = json_column.apply(json.loads)
-        except TypeError as exc:
-            # if values are not strings, we will assume they are already json objects
-            # based on the read class logic
-            logger.debug(
-                f"TypeError while parsing JSON column children: {exc}. Assuming values are already JSON objects."
-            )
-        json_structure = cls.unique_json_structure(json_column.values.tolist())
+        dict_values = []
+        for value in json_column.dropna().values:
+            if isinstance(value, dict):
+                dict_values.append(value)
+            elif isinstance(value, str):
+                try:
+                    parsed = json.loads(value)
+                    if isinstance(parsed, dict):
+                        dict_values.append(parsed)
+                    else:
+                        logger.debug(
+                            "Skipping non-object JSON value while extracting column children: "
+                            f"parsed type is {type(parsed).__name__}"
+                        )
+                except (TypeError, json.JSONDecodeError) as exc:
+                    logger.debug(f"Skipping unparseable string value while extracting column children: {exc}")
+            else:
+                logger.debug(
+                    "Skipping non-string, non-dict value while extracting column children: "
+                    f"type is {type(value).__name__}"
+                )
+
+        if not dict_values:
+            return []
+
+        json_structure = cls.unique_json_structure(dict_values)
 
         return cls.construct_json_column_children(json_structure)
 
     @classmethod
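Reviewer note (not part of the patch): the old all-or-nothing `apply(json.loads)` raised as soon as one value was already a parsed dict, leaving the whole column unparsed, so mixed columns lost their children. A minimal repro plus the per-value normalization the new loop performs:

```python
import json
import pandas as pd

col = pd.Series(['{"a": 1}', {"b": 2}])  # mixed: JSON string + already-parsed dict

# Old approach: apply() raises on the dict, the assignment never happens,
# and the raw string is later fed to the structure merge unparsed.
try:
    col.apply(json.loads)
except TypeError as exc:
    print(f"apply(json.loads) failed: {exc}")

# New approach (same shape as the loop above): normalize value by value.
dict_values = []
for value in col.dropna():
    if isinstance(value, dict):
        dict_values.append(value)
    elif isinstance(value, str):
        try:
            parsed = json.loads(value)
            if isinstance(parsed, dict):
                dict_values.append(parsed)
        except json.JSONDecodeError:
            pass  # unparseable strings are skipped
print(dict_values)  # [{'a': 1}, {'b': 2}]
```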
"model.my_project.customers", + "fqn": ["my_project", "customers"] + } + }, + "sources": { + "source.my_project.raw.customers": { + "name": "customers", + "description": "Raw customer data", + "unique_id": "source.my_project.raw.customers" + } + }, + "macros": {}, + "docs": {}, + "exposures": {}, + "metrics": {}, + "groups": {}, + "selectors": {}, + "disabled": {}, + "parent_map": {}, + "child_map": {}, + "group_map": {}, + "saved_queries": {}, + "semantic_models": {}, + "unit_tests": {} +} diff --git a/ingestion/tests/unit/utils/test_datalake.py b/ingestion/tests/unit/utils/test_datalake.py index 334226ff3009..9c9b7cf62f38 100644 --- a/ingestion/tests/unit/utils/test_datalake.py +++ b/ingestion/tests/unit/utils/test_datalake.py @@ -235,7 +235,7 @@ def test_create_column_object(self): "children": formatted_column, } column_obj = Column(**column) - assert len(column_obj.children) == 3 + assert column_obj.children is not None and len(column_obj.children) == 3 class TestParquetDataFrameColumnParser(TestCase): @@ -811,6 +811,237 @@ def test_fallback_to_json_schema_parser(self): self.assertIsNotNone(columns) +class TestFetchColTypesWithParsedObjects: + """fetch_col_types must correctly type object-dtype columns whose values are already + parsed Python dicts or lists, including falsy containers ({}, []).""" + + def test_empty_dict_typed_as_json(self): + df = pd.DataFrame({"col": [{}]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON + + def test_empty_list_typed_as_array(self): + df = pd.DataFrame({"col": [[]]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY + + def test_multiple_empty_dicts_typed_as_json(self): + df = pd.DataFrame({"col": [{}, {}, {}]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON + + def test_dict_with_data_typed_as_json(self): + df = pd.DataFrame({"col": [{"k": "v"}]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON + + def test_list_with_data_typed_as_array(self): + df = pd.DataFrame({"col": [[1, 2, 3]]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY + + def test_large_already_parsed_dict_typed_as_json(self): + large = {str(i): i for i in range(500)} + df = pd.DataFrame({"col": [large]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON + + def test_null_column_typed_as_string(self): + df = pd.DataFrame({"col": [None]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING + + def test_string_column_typed_as_string(self): + df = pd.DataFrame({"col": ["hello"]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING + + def test_int_column_typed_as_int(self): + df = pd.DataFrame({"col": [42]}) + assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.INT + + +class TestFetchColTypesMixedTypes: + """fetch_col_types must resolve the dominant type via explicit precedence, not + lexicographic max(). 
diff --git a/ingestion/tests/unit/utils/test_datalake.py b/ingestion/tests/unit/utils/test_datalake.py
index 334226ff3009..9c9b7cf62f38 100644
--- a/ingestion/tests/unit/utils/test_datalake.py
+++ b/ingestion/tests/unit/utils/test_datalake.py
@@ -235,7 +235,7 @@ def test_create_column_object(self):
             "children": formatted_column,
         }
         column_obj = Column(**column)
-        assert len(column_obj.children) == 3
+        assert column_obj.children is not None and len(column_obj.children) == 3
 
 
 class TestParquetDataFrameColumnParser(TestCase):
@@ -811,6 +811,237 @@ def test_fallback_to_json_schema_parser(self):
         self.assertIsNotNone(columns)
 
 
+class TestFetchColTypesWithParsedObjects:
+    """fetch_col_types must correctly type object-dtype columns whose values are already
+    parsed Python dicts or lists, including falsy containers ({}, [])."""
+
+    def test_empty_dict_typed_as_json(self):
+        df = pd.DataFrame({"col": [{}]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_empty_list_typed_as_array(self):
+        df = pd.DataFrame({"col": [[]]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY
+
+    def test_multiple_empty_dicts_typed_as_json(self):
+        df = pd.DataFrame({"col": [{}, {}, {}]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_dict_with_data_typed_as_json(self):
+        df = pd.DataFrame({"col": [{"k": "v"}]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_list_with_data_typed_as_array(self):
+        df = pd.DataFrame({"col": [[1, 2, 3]]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY
+
+    def test_large_already_parsed_dict_typed_as_json(self):
+        large = {str(i): i for i in range(500)}
+        df = pd.DataFrame({"col": [large]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_null_column_typed_as_string(self):
+        df = pd.DataFrame({"col": [None]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING
+
+    def test_string_column_typed_as_string(self):
+        df = pd.DataFrame({"col": ["hello"]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING
+
+    def test_int_column_typed_as_int(self):
+        df = pd.DataFrame({"col": [42]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.INT
+
+
+class TestFetchColTypesMixedTypes:
+    """fetch_col_types must resolve the dominant type via explicit precedence, not
+    lexicographic max(). The old max() would return 'str' whenever a string value appeared
+    in the column because 'str' > 'dict' and 'str' > 'list' lexicographically."""
+
+    def test_dict_and_string_mix_typed_as_json(self):
+        # Previously: max(["dict", "str"]) == "str" → STRING (wrong)
+        # Now: precedence picks "dict" → JSON (correct)
+        df = pd.DataFrame({"col": [{"a": 1}, "fallback_string"]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_list_and_string_mix_typed_as_array(self):
+        # Previously: max(["list", "str"]) == "str" → STRING (wrong)
+        # Now: precedence picks "list" → ARRAY (correct)
+        df = pd.DataFrame({"col": [[1, 2], "fallback_string"]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.ARRAY
+
+    def test_int_and_float_mix_typed_as_float(self):
+        # "float" precedes "int" in _TYPE_PRECEDENCE — a column mixing numeric types resolves to FLOAT
+        df = pd.DataFrame({"col": ["42", "3.14"]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.FLOAT
+
+    def test_pure_string_column_typed_as_string(self):
+        # Control: no structured types present → still STRING
+        df = pd.DataFrame({"col": ["hello", "world"]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.STRING
+
+    def test_pure_dict_column_typed_as_json(self):
+        # Control: all dicts → JSON with no ambiguity
+        df = pd.DataFrame({"col": [{"a": 1}, {"b": 2}]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+    def test_dict_beats_list_in_mixed_column(self):
+        # dict > list in precedence
+        df = pd.DataFrame({"col": [{"a": 1}, [1, 2]]})
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "col") == DataType.JSON
+
+
+class TestGetChildrenWithParsedDicts:
+    """get_children must correctly extract children regardless of whether the Series
+    values are already-parsed Python dicts, JSON strings, or a mix of both."""
+
+    def test_already_parsed_dict_returns_children(self):
+        col = pd.Series([{"name": "Alice", "age": 30}])
+        children = GenericDataFrameColumnParser.get_children(col)
+        assert {c["name"] for c in children} == {"name", "age"}
+
+    def test_empty_dict_returns_no_children(self):
+        col = pd.Series([{}])
+        assert GenericDataFrameColumnParser.get_children(col) == []
+
+    def test_all_null_returns_no_children(self):
+        col = pd.Series([None, None])
+        assert GenericDataFrameColumnParser.get_children(col) == []
+
+    def test_string_json_returns_children(self):
+        col = pd.Series(['{"name": "Bob", "score": 99}'])
+        children = GenericDataFrameColumnParser.get_children(col)
+        assert {c["name"] for c in children} == {"name", "score"}
+
+    def test_mixed_string_and_dict_values_returns_union_of_children(self):
+        col = pd.Series(['{"a": 1, "b": 2}', {"b": 2, "c": 3}])
+        children = GenericDataFrameColumnParser.get_children(col)
+        assert {c["name"] for c in children} == {"a", "b", "c"}
+
+    def test_malformed_string_values_are_skipped(self):
+        col = pd.Series(["not-json", {"key": "val"}])
+        children = GenericDataFrameColumnParser.get_children(col)
+        assert {c["name"] for c in children} == {"key"}
+
+    def test_nested_dict_structure_returns_children(self):
+        nodes = {"model.Project.my_model": {"name": "my_model", "unique_id": "x", "description": "test"}}
+        col = pd.Series([nodes])
+        children = GenericDataFrameColumnParser.get_children(col)
+        assert len(children) == 1
+        assert children[0]["name"] == "model.Project.my_model"
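Reviewer note (not part of the patch): the union test above relies on merging the key sets observed across all rows, not just the first. A toy merge showing the expected shape; this stand-in is hypothetical and not the library's `unique_json_structure`:

```python
# Keys observed across rows are merged into a single struct shape; later rows
# can contribute keys ("c") that the first row lacks.
rows = [{"a": 1, "b": 2}, {"b": 2, "c": 3}]
merged = {}
for row in rows:
    merged.update(row)
print(sorted(merged))  # ['a', 'b', 'c'] -> three children: a, b, c
```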
+
+
+class TestSingleObjectJsonFileIngestion:
+    """End-to-end column parsing for single-object JSON files.
+
+    Reads fixture files with json.loads → DataFrame.from_records → _get_columns → Column objects.
+    A single top-level JSON object is wrapped into a 1-row DataFrame. Every top-level key
+    becomes a column whose value is the Python object returned by json.loads — typically a
+    dict, list, or None. All columns must be typed correctly and children extracted without
+    errors.
+    """
+
+    RESOURCES = os.path.join(os.path.dirname(os.path.dirname(__file__)), "resources", "datalake")  # noqa: PTH118, PTH120
+
+    def _load_fixture_as_dataframe(self, filename):
+        path = os.path.join(self.RESOURCES, filename)  # noqa: PTH118
+        with open(path, "rb") as f:  # noqa: PTH123
+            data = json.loads(f.read())
+        if isinstance(data, dict):
+            data = [data]
+        return pd.DataFrame.from_records(data)
+
+    def _parsed_columns(self, filename):
+        df = self._load_fixture_as_dataframe(filename)
+        return {col.name.root: col for col in GenericDataFrameColumnParser._get_columns(df)}
+
+    def test_dict_valued_columns_typed_as_json(self):
+        cols = self._parsed_columns("dbt_catalog.json")
+        assert cols["metadata"].dataType == DataType.JSON
+        assert cols["nodes"].dataType == DataType.JSON
+        assert cols["sources"].dataType == DataType.JSON
+
+    def test_null_column_typed_as_string(self):
+        cols = self._parsed_columns("dbt_catalog.json")
+        assert cols["errors"].dataType == DataType.STRING
+
+    def test_non_empty_dict_column_has_children(self):
+        cols = self._parsed_columns("dbt_catalog.json")
+        assert cols["nodes"].children is not None and len(cols["nodes"].children) > 0
+
+    def test_empty_dict_columns_typed_as_json_not_string(self):
+        cols = self._parsed_columns("dbt_manifest.json")
+        for name in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
+            assert cols[name].dataType == DataType.JSON, f"column '{name}': expected JSON, got {cols[name].dataType}"
+
+    def test_empty_dict_columns_have_no_children(self):
+        cols = self._parsed_columns("dbt_manifest.json")
+        for name in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
+            children = cols[name].children
+            assert not children, f"column '{name}' should have no children"
+
+
+class TestDbtSingleObjectJsonIngestion:
+    """Single-object JSON files (e.g. dbt artifacts) are wrapped into a 1-row DataFrame
+    where every top-level key becomes a column with a Python dict value. The column parser
+    must correctly type all columns — including empty-dict columns — without errors."""
+
+    @staticmethod
+    def _make_catalog_df():
+        return pd.DataFrame(
+            [
+                {
+                    "metadata": {"dbt_version": "1.5.0", "generated_at": "2024-01-01"},
+                    "nodes": {"model.Project.tbl": {"name": "tbl", "description": "test"}},
+                    "sources": {},
+                    "errors": None,
+                }
+            ]
+        )
+
+    @staticmethod
+    def _make_manifest_df():
+        return pd.DataFrame(
+            [
+                {
+                    "metadata": {"dbt_version": "1.5.0"},
+                    "nodes": {"model.Project.tbl": {"name": "tbl"}},
+                    "sources": {},
+                    "metrics": {},
+                    "groups": {},
+                    "disabled": {},
+                    "group_map": {},
+                    "saved_queries": {},
+                    "semantic_models": {},
+                    "unit_tests": {},
+                }
+            ]
+        )
+
+    def test_catalog_column_types(self):
+        df = self._make_catalog_df()
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "metadata") == DataType.JSON
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "nodes") == DataType.JSON
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "sources") == DataType.JSON
+        assert GenericDataFrameColumnParser.fetch_col_types(df, "errors") == DataType.STRING
+
+    def test_manifest_empty_dict_columns_typed_as_json(self):
+        df = self._make_manifest_df()
+        for col in ("metrics", "groups", "disabled", "group_map", "saved_queries", "semantic_models", "unit_tests"):
+            assert GenericDataFrameColumnParser.fetch_col_types(df, col) == DataType.JSON, f"{col} should be JSON"
+
+    def test_catalog_nodes_children_extracted_without_error(self):
+        df = self._make_catalog_df()
+        nodes_col = df["nodes"].dropna()[:100]
+        children = GenericDataFrameColumnParser.get_children(nodes_col)
+        assert len(children) > 0
+
+    def test_catalog_sources_empty_dict_returns_no_children(self):
+        df = self._make_catalog_df()
+        sources_col = df["sources"].dropna()[:100]
+        assert GenericDataFrameColumnParser.get_children(sources_col) == []
+
+
 class TestCSVQuotedHeaderFix(TestCase):
     """Test CSV parsing with quoted header fix for malformed CSV files"""