60 commits
191f31f
feat(workflow): add write_status_file on BaseWorkflow + shared succes…
IceS2 Apr 18, 2026
2970a52
feat(cli): forward status_file through execute_workflow
IceS2 Apr 19, 2026
71f98e7
feat(cli): forward status_file param through all 8 run functions
IceS2 Apr 19, 2026
a62a4ef
feat(cli): expose --status-file argparse flag and forward through dis…
IceS2 Apr 19, 2026
8cc4771
chore(e2e-v2): scaffold cli_e2e_v2 package (connector-centric layout)
IceS2 Apr 20, 2026
e545128
feat(e2e-v2): add Status + StepStatus dataclasses
IceS2 Apr 20, 2026
9d9a84f
feat(e2e-v2): add CliExecutionError and SourceBaselineDrift
IceS2 Apr 20, 2026
641adc8
feat(e2e-v2): add Env loader and ServerConfig
IceS2 Apr 20, 2026
708d552
feat(e2e-v2): add WorkflowConfig core + .as_metadata() + .write_tmp()
IceS2 Apr 20, 2026
deb80ff
feat(e2e-v2): add remaining .as_*() variants and .with_filter
IceS2 Apr 20, 2026
9be5b61
feat(e2e-v2): add CliRunner subprocess orchestrator
IceS2 Apr 20, 2026
7b1b833
feat(e2e-v2): add retry_until polling helper
IceS2 Apr 20, 2026
9ed76b6
feat(e2e-v2): add OmClient + TableAssert + ColumnAssert
IceS2 Apr 20, 2026
0742401
feat(e2e-v2): add LineageAssert, ProfileAssert, TestsAssert, ServiceA…
IceS2 Apr 20, 2026
8fc818d
feat(e2e-v2): add Expected* declarative dataclasses + MatchMode
IceS2 Apr 20, 2026
e4e82b5
feat(e2e-v2): add StructuralDiffer + StructuralMismatch
IceS2 Apr 20, 2026
d3fe8c7
feat(e2e-v2): add source baseline protocol, orchestrator, SQL types
IceS2 Apr 20, 2026
c2ee0ab
feat(e2e-v2): add MySqlEnforcer for SQL-family source baselines
IceS2 Apr 20, 2026
357e1f5
refactor(e2e-v2): drop planned operator CLI reference from baseline d…
IceS2 Apr 20, 2026
32e22f9
feat(e2e-v2): add MYSQL_BASELINE + get_policy factory
IceS2 Apr 20, 2026
943bab4
feat(e2e-v2): add stored procedure support across framework
IceS2 Apr 20, 2026
4628226
feat(e2e-v2): expand MYSQL_BASELINE for exhaustive connector coverage
IceS2 Apr 20, 2026
8e74c7e
feat(e2e-v2): extend differ STRICT mode to flag schema-level extras
IceS2 Apr 21, 2026
93a2846
feat(e2e-v2): add MYSQL_EXPECTED catalog factory
IceS2 Apr 21, 2026
74a989d
feat(e2e-v2): add MySQL connector helpers + pytest fixtures
IceS2 Apr 21, 2026
dcc3b22
fix(e2e-v2): convert to relative imports + land top-level conftest
IceS2 Apr 21, 2026
daea036
feat(e2e-v2): add MySQL pilot test file with 11 E2E tests
IceS2 Apr 21, 2026
5d77f0f
feat(e2e-v2): Env.ref() for secret-free rendered YAML
IceS2 Apr 21, 2026
d8b080d
refactor(e2e-v2): class-based Env with ref() / get() terminals
IceS2 Apr 21, 2026
c6506f5
feat(cli-e2e-v2): bucket A — FK, comments, shared ingest fixture
IceS2 Apr 21, 2026
aa13878
refactor(cli-e2e-v2): apply review-pass fixes across framework
IceS2 Apr 21, 2026
c560af3
refactor(cli-e2e-v2): simplification pass — delete YAGNI, dedupe helpers
IceS2 Apr 21, 2026
8afb388
refactor(cli-e2e-v2): introspect via SQLAlchemy Inspector; split SqlB…
IceS2 Apr 21, 2026
7fd5f66
refactor(cli-e2e-v2): baseline via SQLAlchemy Core + shared common ta…
IceS2 Apr 21, 2026
9498f53
refactor(cli-e2e-v2): derive expected tree + EntityAssert base + para…
IceS2 Apr 22, 2026
1ea1ec8
refactor(cli-e2e-v2): clearer debuggability — eventually timeout + st…
IceS2 Apr 22, 2026
a3f6eb8
fix(cli-e2e-v2): separate source-admin credentials from ingest creden…
IceS2 Apr 22, 2026
6745fcb
fix(cli-e2e-v2): gate seed row-count check by table presence
IceS2 Apr 22, 2026
20034cc
feat(cli-e2e-v2): connector onboarding ergonomics + docs
IceS2 Apr 27, 2026
0c94b90
refactor(cli-e2e-v2): debuggability polish — inline failures, retry c…
IceS2 Apr 27, 2026
d689f73
refactor(cli-e2e-v2): typed Diff + DiffKind + uniform differ dispatch…
IceS2 Apr 27, 2026
63f8900
ci(cli-e2e-v2): add py-cli-e2e-tests-v2 workflow with MySQL pilot matrix
IceS2 Apr 27, 2026
c02b44b
chore(cli-e2e-v2): apply ruff lint + format pass post-rebase
IceS2 Apr 27, 2026
9c608bc
refactor(cli-e2e-v2): drop dead introspect() from SourceBaselineEnfor…
IceS2 Apr 28, 2026
5fd72b7
refactor(cli-e2e-v2): centralize OM RootModel list reads via unwrap_r…
IceS2 Apr 28, 2026
3d6de3c
refactor(cli-e2e-v2): drop unimplemented MatchMode.SUBSET
IceS2 Apr 28, 2026
bbd3c46
fix(cli-e2e-v2): pin LineageAssert to direction-typed edges; drop nod…
IceS2 Apr 28, 2026
2f3f104
fix(cli-e2e-v2): cap inline step failures globally in CliExecutionErr…
IceS2 Apr 28, 2026
4dea125
docs(cli-e2e-v2): pin all_types table convention for dialect-specific…
IceS2 Apr 28, 2026
2f29657
refactor(cli-e2e-v2): SqlBaselineEnforcer is ABC; _apply_stored_proce…
IceS2 Apr 28, 2026
b926457
fix(cli-e2e-v2): resolve Edge UUID endpoints via nodes/entity FQN map
IceS2 Apr 29, 2026
1ae1c32
test(cli-e2e-v2): pin credit_score median=680 (textbook middle)
IceS2 Apr 29, 2026
bca81a9
Merge remote-tracking branch 'origin/main' into feat/cli-e2e-v2-slice1
IceS2 Apr 29, 2026
6148095
Merge remote-tracking branch 'origin/main' into feat/cli-e2e-v2-slice1
IceS2 Apr 29, 2026
e3354a7
chore(cli-e2e-v2): satisfy ruff after main merge
IceS2 Apr 29, 2026
f5aeade
Merge remote-tracking branch 'origin/main' into feat/cli-e2e-v2-slice1
IceS2 Apr 30, 2026
38948ce
Merge remote-tracking branch 'origin/main' into feat/cli-e2e-v2-slice1
IceS2 May 6, 2026
0440cee
refactor(cli-e2e-v2): self-contained MySQL via testcontainers
IceS2 May 6, 2026
a8f57c6
fix(workflow): basedpyright cleanup for write_status_file
IceS2 May 6, 2026
0cfce8a
test(cli-e2e-v2): meta-tests proving the framework catches its failur…
IceS2 May 6, 2026
99 changes: 99 additions & 0 deletions .github/workflows/py-cli-e2e-tests-v2.yml
@@ -0,0 +1,99 @@
# Copyright 2026 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# CLI E2E v2 — strangler-fig replacement for py-cli-e2e-tests.yml.
#
# Each connector lives under ingestion/tests/cli_e2e_v2/<connector>/ and
# is a self-contained pytest module (no inheritance). The matrix below
# grows by one entry per connector migration PR; the connector's
# corresponding entry is removed from py-cli-e2e-tests.yml in the same PR.
#
# Triggers: workflow_dispatch only during the stabilization window for
# the MySQL pilot. The schedule cron will be added once the pilot is
# consistently green (see spec §7.1).

name: py-cli-e2e-tests-v2
on:
  workflow_dispatch:
    inputs:
      connectors:
        description: "Connectors to run (JSON array)"
        required: true
        default: '["mysql"]'

permissions:
  id-token: write
  contents: read

jobs:
  py-cli-e2e-tests-v2:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    strategy:
      fail-fast: false
      matrix:
        connector: ${{ fromJSON(inputs.connectors || '["mysql"]') }}
    environment: test

    steps:
      - name: Free Disk Space (Ubuntu)
        uses: jlumbroso/free-disk-space@main
        with:
          tool-cache: false
          android: true
          dotnet: true
          haskell: true
          large-packages: false
          swap-storage: true
          docker-images: false

      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Openmetadata Test Environment
        uses: ./.github/actions/setup-openmetadata-test-environment
        with:
          python-version: '3.10'

      - name: Run CLI E2E v2 tests
        id: e2e-v2-test
        env:
          # MySQL test data lives in a dedicated MySQL container that the
          # session-scoped `mysql_container` pytest fixture (testcontainers)
          # boots, bootstraps with the OM-doc minimum grants, and tears
          # down. Teammates run the same way locally — no env plumbing.
          # Only OM-server admin creds (used to mint the ingestion-bot
          # JWT) need to be exported here; they come from the bundled
          # docker-compose and are not secrets.
          OM_ADMIN_EMAIL: admin@open-metadata.org
          OM_ADMIN_PASSWORD: admin
        run: |
          source env/bin/activate
          cd ingestion
          mkdir -p junit
          pytest -v \
            --junitxml=junit/test-results-v2-${{ matrix.connector }}.xml \
            tests/cli_e2e_v2/${{ matrix.connector }}
        shell: bash

      - name: Upload tests artifact
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: tests-v2-${{ matrix.connector }}
          path: ingestion/junit/test-results-v2-*.xml

      - name: Clean Up
        if: always()
        run: |
          cd ./docker/development
          docker compose down --remove-orphans
          sudo rm -rf ${PWD}/docker-volume
6 changes: 6 additions & 0 deletions ingestion/pyproject.toml
@@ -222,6 +222,12 @@ ignore = [
# lands in a later stage tests don't immediately error out.
"tests/**/*.py" = ["S101", "PLR2004", "PLC0415"]
"ingestion/tests/**/*.py" = ["S101", "PLR2004", "PLC0415"]
# v2 CLI E2E framework uses relative imports by design (connector-centric
# layout — connectors live in subdirs and import from `..core.*` / `.connector`).
# `T201` (print) is allowed in the top-level conftest for the session-start
# posture banner. Path listed twice for the dual-cwd pattern above.
"tests/cli_e2e_v2/**/*.py" = ["S101", "PLR2004", "PLC0415", "TID252", "T201"]
"ingestion/tests/cli_e2e_v2/**/*.py" = ["S101", "PLR2004", "PLC0415", "TID252", "T201"]
# Auto-generated from JSON Schema — never edit, never lint.
"src/metadata/generated/**" = ["ALL"]
"ingestion/src/metadata/generated/**" = ["ALL"]
9 changes: 4 additions & 5 deletions ingestion/src/metadata/cli/app.py
@@ -17,20 +17,22 @@
import traceback
from pathlib import Path

from metadata.cli.common import execute_workflow
from metadata.config.common import load_config_file
from metadata.utils.logger import cli_logger
from metadata.workflow.application import ApplicationWorkflow

logger = cli_logger()


def run_app(config_path: Path) -> None:
def run_app(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the application workflow from a config path
    to a JSON or YAML file
    :param config_path: Path to load JSON config
    """

    config_dict = None
    try:
        config_dict = load_config_file(config_path)
        # no logging for config because apps might have custom secrets
@@ -40,7 +42,4 @@ def run_app(config_path: Path) -> None:
        logger.debug(traceback.format_exc())
        sys.exit(1)

    workflow.execute()
    workflow.stop()
    workflow.print_status()
    workflow.raise_from_status()
    execute_workflow(workflow=workflow, config_dict=config_dict, status_file=status_file)
⚠️ Bug: run_app lost print_status() call — silent CLI output regression

The old run_app explicitly called workflow.print_status() between stop() and raise_from_status(), giving operators a human-readable summary on stdout. The new unified execute_workflow in common.py does not call print_status(), so the app command now runs silently on success. Other run_* functions (ingest, profile, etc.) may have had their own print_status() calls that were similarly dropped during unification.

Suggested fix:

Add `workflow.print_status()` inside `execute_workflow` (after `workflow.stop()` in the finally block, or after the try/finally before `raise_from_status`), so all workflow types get consistent output:

    try:
        workflow.execute()
    finally:
        workflow.stop()
        if status_file is not None:
            workflow.write_status_file(status_file)
    workflow.print_status()
    if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
        workflow.raise_from_status()


4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/classify.py
@@ -28,7 +28,7 @@
logger = cli_logger()


def run_classification(config_path: Path) -> None:
def run_classification(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the sampler workflow from a config path
    to a JSON or YAML file
@@ -48,4 +48,4 @@ def run_classification(config_path: Path) -> None:
        WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.metadata)
        sys.exit(1)

    execute_workflow(workflow=workflow, config_dict=config_dict)
    execute_workflow(workflow=workflow, config_dict=config_dict, status_file=status_file)
17 changes: 13 additions & 4 deletions ingestion/src/metadata/cli/common.py
@@ -13,14 +13,23 @@
Handle workflow execution
"""

from pathlib import Path
from typing import Any, Dict # noqa: UP035

from metadata.workflow.base import BaseWorkflow


def execute_workflow(workflow: BaseWorkflow, config_dict: Dict[str, Any]) -> None:  # noqa: UP006
    """Execute the workflow and raise if needed"""
    workflow.execute()
    workflow.stop()
def execute_workflow(
    workflow: BaseWorkflow,
    config_dict: Dict[str, Any],  # noqa: UP006
    status_file: Path | None = None,
) -> None:
    """Execute the workflow, write status file if requested, raise on failure if configured."""
    try:
        workflow.execute()
    finally:
        workflow.stop()
        if status_file is not None:
            workflow.write_status_file(status_file)
Comment on lines +30 to +33
💡 Edge Case: write_status_file in finally can suppress the original exception

In execute_workflow, write_status_file runs inside the finally block. If workflow.execute() raises and write_status_file also raises (e.g., because steps aren't populated after an early crash), the original exception is replaced by the status-file error, making debugging harder. This only triggers when --status-file is explicitly provided.

Suggested fix:

Wrap the write_status_file call in a try/except to log-and-swallow:

    finally:
        workflow.stop()
        if status_file is not None:
            try:
                workflow.write_status_file(status_file)
            except Exception:
                logger.warning("Failed to write status file", exc_info=True)


    if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
        workflow.raise_from_status()
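To make the helper's contract concrete, here is a standalone sketch with an invented `StubWorkflow` (not the PR's `BaseWorkflow`; real workflows record step failures internally rather than raising from `execute()`). It shows the two guarantees the try/finally buys: `stop()` and the status file always run, and the final raise is gated by `workflowConfig.raiseOnError`:

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory


class StubWorkflow:
    """Invented stand-in: records failures instead of raising from execute()."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.stopped = False

    def execute(self) -> None:
        pass  # real workflows record step failures internally here

    def stop(self) -> None:
        self.stopped = True

    def write_status_file(self, path: Path) -> None:
        path.write_text(json.dumps({"success": not self.fail}))

    def raise_from_status(self) -> None:
        if self.fail:
            raise RuntimeError("step reported errors")


def execute_workflow(workflow, config_dict, status_file=None) -> None:
    # Mirrors the PR's helper: cleanup + status file in finally, gated raise after.
    try:
        workflow.execute()
    finally:
        workflow.stop()
        if status_file is not None:
            workflow.write_status_file(status_file)
    if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
        workflow.raise_from_status()


with TemporaryDirectory() as tmp:
    status_path = Path(tmp) / "status.json"
    wf = StubWorkflow(fail=True)
    # raiseOnError: false -> no exception, but the status file still records failure
    execute_workflow(wf, {"workflowConfig": {"raiseOnError": False}}, status_path)
    assert wf.stopped
    assert json.loads(status_path.read_text())["success"] is False
```

With `raiseOnError` left at its default of `True`, the same failing workflow raises after cleanup instead.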
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/dataquality.py
@@ -28,7 +28,7 @@
logger = cli_logger()


def run_test(config_path: Path) -> None:
def run_test(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the Data Quality Test Suites workflow from a config path
    to a JSON or YAML file
@@ -48,4 +48,4 @@ def run_test(config_path: Path) -> None:
        WorkflowInitErrorHandler.print_init_error(exc, workflow_config_dict, PipelineType.TestSuite)
        sys.exit(1)

    execute_workflow(workflow=workflow, config_dict=workflow_config_dict)
    execute_workflow(workflow=workflow, config_dict=workflow_config_dict, status_file=status_file)
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/ingest.py
@@ -29,7 +29,7 @@
logger = cli_logger()


def run_ingest(config_path: Path) -> None:
def run_ingest(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the ingestion workflow from a config path
    to a JSON or YAML file
@@ -46,4 +46,4 @@ def run_ingest(config_path: Path) -> None:
        WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.metadata)
        sys.exit(1)

    execute_workflow(workflow=workflow, config_dict=config_dict)
    execute_workflow(workflow=workflow, config_dict=config_dict, status_file=status_file)
13 changes: 7 additions & 6 deletions ingestion/src/metadata/cli/ingest_dbt.py
@@ -25,6 +25,7 @@
from dotenv import load_dotenv
from pydantic import BaseModel, Field, field_validator

from metadata.cli.common import execute_workflow
from metadata.ingestion.ometa.credentials import URL
from metadata.utils.logger import cli_logger
from metadata.workflow.metadata import MetadataWorkflow
@@ -291,7 +292,7 @@ def create_dbt_workflow_config(dbt_project_path: Path, om_config: OpenMetadataDB
    return config  # noqa: RET504


def run_ingest_dbt(dbt_project_path: Path) -> None:
def run_ingest_dbt(dbt_project_path: Path, status_file: Path | None = None) -> None:
    """
    Run the dbt artifacts ingestion workflow from a dbt project path

@@ -321,13 +322,13 @@ def run_ingest_dbt(dbt_project_path: Path) -> None:
    logger.info("Creating workflow configuration...")
    workflow_config = create_dbt_workflow_config(dbt_project_path, om_config)

    # Create and execute the MetadataWorkflow (reusing existing infrastructure)
    logger.info("Starting OpenMetadata ingestion workflow...")
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
    execute_workflow(
        workflow=workflow,
        config_dict=workflow_config,
        status_file=status_file,
    )

    logger.info("DBT artifacts ingestion completed successfully")

2 changes: 1 addition & 1 deletion ingestion/src/metadata/cli/lineage.py
@@ -47,7 +47,7 @@ class LineageWorkflow(BaseModel):
    parserType: Optional[QueryParserType] = QueryParserType.Auto  # noqa: N815, UP045


def run_lineage(config_path: Path) -> None:
def run_lineage(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the ingestion workflow from a config path
    to a JSON or YAML file
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/profile.py
@@ -29,7 +29,7 @@
logger = cli_logger()


def run_profiler(config_path: Path) -> None:
def run_profiler(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the Profiler workflow from a config path
    to a JSON or YAML file
@@ -46,4 +46,4 @@ def run_profiler(config_path: Path) -> None:
        WorkflowInitErrorHandler.print_init_error(exc, workflow_config_dict, PipelineType.profiler)
        sys.exit(1)

    execute_workflow(workflow=workflow, config_dict=workflow_config_dict)
    execute_workflow(workflow=workflow, config_dict=workflow_config_dict, status_file=status_file)
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cli/usage.py
@@ -29,7 +29,7 @@
logger = cli_logger()


def run_usage(config_path: Path) -> None:
def run_usage(config_path: Path, status_file: Path | None = None) -> None:
    """
    Run the usage workflow from a config path
    to a JSON or YAML file
@@ -46,4 +46,4 @@ def run_usage(config_path: Path) -> None:
        WorkflowInitErrorHandler.print_init_error(exc, config_dict, PipelineType.usage)
        sys.exit(1)

    execute_workflow(workflow=workflow, config_dict=config_dict)
    execute_workflow(workflow=workflow, config_dict=config_dict, status_file=status_file)
10 changes: 9 additions & 1 deletion ingestion/src/metadata/cmd.py
@@ -77,6 +77,13 @@ def create_common_config_parser_args(parser: argparse.ArgumentParser):
        type=Path,
        required=True,
    )
    parser.add_argument(
        "--status-file",
        help="path to write structured JSON status output (optional)",
        type=Path,
        required=False,
        default=None,
    )
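As a sanity check on the flag's behavior, the same argparse wiring can be reproduced standalone. The parser name and the `-c/--config` argument below are assumptions added for context; only the `--status-file` block mirrors the diff:

```python
import argparse
from pathlib import Path

parser = argparse.ArgumentParser(prog="metadata")  # illustrative parser, not the real cmd.py
parser.add_argument("-c", "--config", type=Path, required=True)  # assumed pre-existing arg
parser.add_argument(
    "--status-file",
    help="path to write structured JSON status output (optional)",
    type=Path,
    required=False,
    default=None,
)

args = parser.parse_args(["-c", "workflow.yaml", "--status-file", "out/status.json"])
# argparse maps --status-file to the `status_file` attribute as a Path;
# omitting the flag leaves it None, so the run_* functions skip the write.
print(args.status_file)  # out/status.json
```

Because the default is `None`, existing invocations without the flag are unaffected, which is why every `run_*` signature can take `status_file: Path | None = None`.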


def create_dbt_parser_args(parser: argparse.ArgumentParser):
@@ -220,6 +227,7 @@ def metadata(args: Optional[List[str]] = None):  # noqa: UP006, UP045
    metadata_workflow = contains_args.get("command")
    config_file: Optional[Path] = contains_args.get("config")  # noqa: UP045
    dbt_project_path: Optional[Path] = contains_args.get("dbt_project_path")  # noqa: UP045
    status_file: Optional[Path] = contains_args.get("status_file")  # noqa: UP045

    path = None
    if config_file:
@@ -234,7 +242,7 @@ def metadata(args: Optional[List[str]] = None):  # noqa: UP006, UP045
    set_loggers_level(log_level)

    if path and metadata_workflow and metadata_workflow in RUN_PATH_METHODS:
        RUN_PATH_METHODS[metadata_workflow](path)
        RUN_PATH_METHODS[metadata_workflow](path, status_file)

    if metadata_workflow == MetadataCommands.SCAFFOLD_CONNECTOR.value:
        has_name = contains_args.get("name")
43 changes: 39 additions & 4 deletions ingestion/src/metadata/workflow/base.py
@@ -12,10 +12,12 @@
Base workflow definition.
"""

import json
import traceback
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from statistics import mean
from typing import Any, Dict, List, Optional, TypeVar, Union # noqa: UP035

@@ -212,13 +214,21 @@ def get_failures(self) -> List[StackTraceError]:  # noqa: UP006
    def workflow_steps(self) -> List[Step]:  # noqa: UP006
        """Steps to report status from"""

    def _step_meets_success_threshold(self, step: Step) -> bool:
        """True iff the step has no failures, or its success ratio meets the workflow's threshold.

        Shared by `raise_from_status_internal` (which raises on failure) and
        `write_status_file` (which reports the CLI's observable success/failure state).
        """
        status = step.get_status()
        if not status.failures:
            return True
        return status.calculate_success() >= self.workflow_config.successThreshold  # pyright: ignore[reportOperatorIssue]

    def raise_from_status_internal(self, raise_warnings=False) -> None:
        """Based on the internal workflow status, raise a WorkflowExecutionError"""
        for step in self.workflow_steps():
            if (
                step.get_status().failures
                and step.get_status().calculate_success() < self.workflow_config.successThreshold
            ):
            if not self._step_meets_success_threshold(step):
                raise WorkflowExecutionError(f"{step.name} reported errors: {Summary.from_step(step)}")

            if raise_warnings and step.status.warnings:
@@ -400,3 +410,28 @@ def print_status(self):
            start_time,
            self._is_debug_enabled(),
        )

    def write_status_file(self, path: Path) -> None:
        """Serialize per-step status to JSON at the given path.

        The `success` field mirrors the CLI's exit-code semantic: True iff every
        step meets its success threshold (the same condition under which
        `raise_from_status_internal` does NOT raise).

        Shape:
            {
                "pipeline_type": str,
                "ingestion_pipeline_fqn": str | None,
                "success": bool,
                "steps": [<StepSummary dicts>]
            }
        """
        ingestion_status = self.build_ingestion_status()
        success = all(self._step_meets_success_threshold(step) for step in self.workflow_steps())
        payload = {
            "pipeline_type": self.config.source.type,  # pyright: ignore[reportAttributeAccessIssue]
            "ingestion_pipeline_fqn": self.config.ingestionPipelineFQN,  # pyright: ignore[reportAttributeAccessIssue]
Comment on lines +432 to +433
⚠️ Edge Case: write_status_file assumes config.source.type — crashes for ApplicationWorkflow

write_status_file accesses self.config.source.type (line 432) and self.config.ingestionPipelineFQN (line 433). For ApplicationWorkflow, config is OpenMetadataApplicationConfig which uses a different schema and may not expose .source.type. If a user passes --status-file to the app subcommand, this will raise AttributeError inside the finally block, masking any original workflow exception.

Suggested fix:

Guard the attribute access with hasattr/getattr, or use a polymorphic method:

    payload = {
        "pipeline_type": getattr(getattr(self.config, "source", None), "type", None),
        "ingestion_pipeline_fqn": getattr(self.config, "ingestionPipelineFQN", None),
        "success": success,
        "steps": ingestion_status.model_dump(),
    }


"success": success,
"steps": ingestion_status.model_dump(),
}
path.write_text(json.dumps(payload, indent=2, default=str))
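Downstream consumers (for example the e2e framework's `Status` dataclass) can gate on this file instead of scraping stdout. A round-trip sketch follows; the field names come from the docstring above, but the sample step entry is invented (real entries are `StepSummary` model dumps):

```python
import json
from pathlib import Path
from tempfile import TemporaryDirectory

# Sample payload matching the documented shape; the step entry is invented.
payload = {
    "pipeline_type": "mysql",
    "ingestion_pipeline_fqn": None,
    "success": False,
    "steps": [{"name": "source", "records": 42, "errors": 1}],
}

with TemporaryDirectory() as tmp:
    status_path = Path(tmp) / "status.json"
    status_path.write_text(json.dumps(payload, indent=2, default=str))

    # A test can gate on the machine-readable success flag and drill into steps.
    status = json.loads(status_path.read_text())
    failed = [s["name"] for s in status["steps"] if s.get("errors")]
    assert status["success"] is False
    assert failed == ["source"]
```

Since `success` mirrors the exit-code semantic, a caller that only needs pass/fail can read this one boolean and ignore the per-step detail entirely.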