feat(cli-e2e-v2): Slice 1 — framework + MySQL pilot connector#27949
…s helper
Serializes each workflow step's status as JSON to the given path. The
success field mirrors the CLI's exit-code semantics: True iff every step
meets its failure threshold (the same condition that prevents
raise_from_status_internal from raising).
The per-step threshold check is extracted into _step_meets_success_threshold
and shared by both raise_from_status_internal and write_status_file, so
error raising and status-file reporting can never diverge.
Payload:
{
  "pipeline_type": str,
  "ingestion_pipeline_fqn": str | None,
  "success": bool,
  "steps": [<StepSummary dicts>]
}
Also adds pytest tests using a new OkSink/OkWorkflow fixture:
- success path (no failures),
- sink failures below threshold (SimpleWorkflow),
- source failures (BrokenWorkflow),
- ingestion_pipeline_fqn propagation via config.model_copy.
Purely additive - no callers yet; the CLI execution layer wires this
up in a following commit.
Adds optional status_file kwarg to execute_workflow. When provided, calls workflow.write_status_file(path) after stop() so the status JSON reflects the final per-step state. The write happens inside a try/finally around execute() so partial status is written on crash — more useful than no status for post-mortem. raise_from_status stays outside the finally so original exceptions propagate unchanged. Backward compatible: existing callers pass no status_file and behavior is unchanged. Run functions in cli/*.py will forward the kwarg in a following commit, and cmd.py will expose the --status-file flag after that.
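The try/finally wiring can be sketched with a stand-in Workflow class (not the real BaseWorkflow API — method bodies here are placeholders):

```python
import json
from pathlib import Path
from typing import Optional

# Stand-in workflow; the real BaseWorkflow has a richer surface.
class Workflow:
    def __init__(self):
        self.steps_ok = True

    def execute(self):
        pass

    def stop(self):
        pass

    def raise_from_status(self):
        if not self.steps_ok:
            raise RuntimeError("workflow failed")

    def write_status_file(self, path: Path):
        path.write_text(json.dumps({"success": self.steps_ok}))

def execute_workflow(workflow: Workflow, status_file: Optional[Path] = None) -> None:
    try:
        workflow.execute()
    finally:
        workflow.stop()
        if status_file is not None:
            # Written even when execute() crashes, so a partial status
            # survives for post-mortem.
            workflow.write_status_file(status_file)
    # Outside the finally: original exceptions propagate unchanged.
    workflow.raise_from_status()
```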
All config-taking CLI commands (ingest, profile, test, lineage, usage, ingest-dbt, app, classify) now accept the optional status_file path and forward it into execute_workflow. Purely additive — callers without the kwarg continue to work unchanged. cmd.py will expose the --status-file argparse flag and thread it into these calls in the next commit.
…patch
Final CLI prep commit. --status-file is now a visible option on every config-taking subcommand (ingest, profile, test, lineage, usage, ingest-dbt, app, classify). metadata() extracts the path from parsed args and passes it to RUN_PATH_METHODS[<cmd>](path, status_file); downstream run functions (updated in the previous commit) forward it into execute_workflow, which calls BaseWorkflow.write_status_file when a path is supplied. Backward compatible: omitting the flag preserves prior behavior end-to-end. The optional path value defaults to None at every layer.
Two top-level groups:
- core/ — framework shared by all connectors (config, runner, fluent, expected, source). Concrete modules land in following commits.
- <connector>/ — everything for one connector (expected catalog, baseline data + enforcer, fixtures, test file). mysql/ is the pilot; future connectors add their own sibling folder.
Empty __init__.py markers only. Connector-centric grouping keeps every file touched when adding/modifying a connector in one folder, so PR reviews and template cloning stay simple.
Typed contract between CliRunner (added in a later commit) and tests. Frozen dataclasses mirroring the JSON shape written by BaseWorkflow.write_status_file. `warnings`/`errors`/`filtered` are int counts (matching the live payload observed in smoke testing); `failures` is the optional detail list. Exposes `.all_failures` for flat assertions and `.step(name)` for targeted per-step lookups.
CliExecutionError: raised by CliRunner (future Task 11) on non-zero subprocess exit; carries exit_code + stderr + config_path + command so pytest's default failure rendering surfaces everything needed for post-mortem. SourceBaselineDrift: raised by ensure_baseline (future Task 17) when source state drifts in check_only mode — cloud sources default to check_only to avoid mutating shared state.
Env.required raises EnvLoadError at fixture time with a clear message when a required E2E_* secret is missing — replaces v1's $E2E_* shell expansion in YAML that produced opaque Pydantic errors when vars were absent. ServerConfig owns the shared sinkConfig + openMetadataServerConfig applied to every test's rendered YAML, reading OM_SERVER_URL and OM_JWT_TOKEN from env with sensible local-Docker defaults.
Immutable dict-backed builder for the YAML config passed to `metadata`. WorkflowConfig.build starts in 'metadata' pipeline mode with the shared sink+server config; .as_metadata(**flags) produces a new instance with markDeletedTables / includeTables / includeViews / includeStoredProcedures set on sourceConfig.config. .write_tmp serializes to a per-invocation numbered YAML file under pytest's tmp_path.
Pipeline-to-CLI-subcommand mapping lives here so CliRunner can dispatch from cfg.cli_subcommand. The 'lineage' pipeline_type maps to the 'ingest' subcommand: metadata lineage is a raw-SQL-parse tool with an entirely different config schema; connector-based DatabaseLineage workflows run through metadata ingest, differentiated only by sourceConfig.config.type and source.type.
Remaining overlays (as_profiler/as_lineage/as_usage/as_test/with_filter/with_overrides) land in a following commit.
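The immutable-overlay idea can be sketched with a frozen dataclass — illustrative only; the real WorkflowConfig carries more fields and a build() classmethod:

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict

# Sketch of the immutable dict-backed builder; field names are assumptions.
@dataclass(frozen=True)
class WorkflowConfig:
    pipeline_type: str = "metadata"
    data: Dict[str, Any] = field(default_factory=dict)

    def as_metadata(self, **flags: Any) -> "WorkflowConfig":
        # Deep-copy the payload so every overlay returns a new instance
        # and the original is never mutated.
        new_data = copy.deepcopy(self.data)
        new_data.setdefault("sourceConfig", {}).setdefault("config", {}).update(flags)
        return WorkflowConfig(pipeline_type="metadata", data=new_data)
```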
Completes WorkflowConfig's overlay API. .as_profiler uses real profiler schema fields (profileSample, computeTableMetrics, computeColumnMetrics) not auto-classification fields. .as_lineage and .as_usage both carry resultLimit. .as_auto_classification is its own switcher with storeSampleData/enableAutoClassification/sampleDataCount/includeViews and maps to cli_subcommand "classify". .with_filter takes separate include/exclude lists at database/schema/table levels and merges (appends) across multiple calls — supports OM's simultaneous include-AND-exclude pattern. _PIPELINE_CLI_SUBCOMMAND and _PIPELINE_SOURCE_CONFIG_TYPE updated with "auto_classification" → "classify" / "AutoClassification".
Per-invocation numbered artifacts (cfg_<pipeline>_<n>.yaml + status_<pipeline>_<n>.json) so multiple runs within a single test keep distinct files. Counter keyed by pipeline_type so same-pipeline repeats increment while cross-pipeline calls both start at 0. Raises CliExecutionError with full stderr + config_path + command on non-zero exit. Also raises (defensive) if the CLI exits 0 but fails to write a status file — catches future regressions in BaseWorkflow.write_status_file.
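The per-pipeline counter behaves like this sketch (class and method names are hypothetical):

```python
import itertools
from collections import defaultdict
from pathlib import Path
from typing import Tuple

# Counter keyed by pipeline_type: same-pipeline repeats increment,
# cross-pipeline calls each start at 0.
class ArtifactNamer:
    def __init__(self, root: Path):
        self.root = root
        self._counters = defaultdict(itertools.count)

    def next_paths(self, pipeline_type: str) -> Tuple[Path, Path]:
        n = next(self._counters[pipeline_type])
        return (self.root / f"cfg_{pipeline_type}_{n}.yaml",
                self.root / f"status_{pipeline_type}_{n}.json")
```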
Deadline-based retry on AssertionError — returns the check's value as soon as it succeeds, raises the final failure once timeout elapses. Other exception types propagate immediately (no retry on programming bugs). Logging contract: first attempt at DEBUG, final failure at ERROR with attempt count + elapsed seconds, middle attempts silent. Replaces v1's time.sleep-in-a-while-loop polling patterns. Consumed by fluent assertion namespaces (.eventually()) added in following commits.
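A stdlib-only sketch of that retry contract — return on first success, retry only AssertionError, DEBUG on first failure, ERROR on the final one; the real helper's signature may differ:

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger(__name__)

def retry_until(check: Callable[[], T], timeout: float = 30.0,
                interval: float = 0.5) -> T:
    """Retry `check` until it stops raising AssertionError or the deadline passes."""
    deadline = time.monotonic() + timeout
    attempt = 0
    while True:
        attempt += 1
        try:
            # Success: return the check's value immediately. Exceptions other
            # than AssertionError propagate at once (no retry on programming bugs).
            return check()
        except AssertionError as exc:
            if attempt == 1:
                log.debug("first attempt failed: %s", exc)
            if time.monotonic() >= deadline:
                log.error("gave up after %d attempts (%.1fs timeout)", attempt, timeout)
                raise
            time.sleep(interval)  # middle attempts are silent
```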
OmClient wraps metadata.ingestion.ometa.OpenMetadata (no new HTTP code per Decision #21); exposes .table(fqn), .service(name), and .raw escape hatch. TableAssert has synchronous exists/get, single-property has_tag/has_owner, drill-down .column(name), entity-level .eventually(timeout) one-shot wrapper, and three namespace properties (.lineage/.profile/.tests) that lazy-import their Task 14 implementations. ColumnAssert has has_tag/has_type/has_description_containing.
…ssert
Completes the fluent surface. Each namespace owns its own .eventually(timeout) one-shot wrapper.
- LineageAssert: has_upstream / has_downstream / has_column_lineage, using OM's get_lineage_by_name and tolerating either upstreamEdges/downstreamEdges or a nodes-only shape in the response.
- ProfileAssert: row_count() returns NumericAssert (at_least/equals/between). .eventually() polls until profile.rowCount is non-None, then hands off.
- TestsAssert/TestCaseAssert: case(name).passes() / .fails() check the latest testCaseResult's testCaseStatus.
- ServiceAssert: exists, get (raw DatabaseService), has_entity_count (tables or schemas) via list_all_entities filtered by service.
All retry via the Task 12 retry_until helper — first attempt at DEBUG, final at ERROR with elapsed time.
Per Decision #4, these are custom wrappers reusing OM's Pydantic value types (DataType, Constraint, DatabaseServiceType) — automatic enum drift-safety while keeping spec modules short (no forced population of the 30+ fields on the real Table/Column entities). Expected tree: ExpectedService → ExpectedDatabase → ExpectedSchema → ExpectedTable → ExpectedColumn. Unset / None fields mean "don't assert" so specs stay terse. description uses str | None with substring semantics (Decision #16). ExpectedTable.ordered defaults False — columns matched by name, tolerant to reordering. MatchMode enum (STRICT / SUPERSET / SUBSET) drives the differ's tolerance for "extras" in the actual OM state. Consumed by StructuralDiffer in the next commit.
assert_service_matches(expected, om, mode=SUPERSET) walks an Expected* tree, fetches actual OM state per-level via OmClient.raw, collects path-qualified Diff entries, and raises StructuralMismatch with a multi-line message listing every discrepancy. Level-by-level walk with get_by_name at each node — a missing parent short-circuits without cascading "column X missing" noise. Column matching is by name (table-level .ordered not yet consumed; landing point for a future by-position mode). STRICT mode flags extra columns beyond the expected set; SUPERSET (default) tolerates extras, treating actual as a superset of expected. Diffs render via bracket-path notation (service[foo].database[bar].table[baz].column[qux].dataType) so pytest failure output is greppable.
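The bracket-path rendering reduces to a one-liner; this helper is illustrative, not the actual differ code:

```python
from typing import Optional, Tuple

# Renders service[foo].database[bar].table[baz].column[qux].dataType
# so pytest failure output stays greppable. Names are illustrative.
def bracket_path(*segments: Tuple[str, str], attr: Optional[str] = None) -> str:
    path = ".".join(f"{kind}[{name}]" for kind, name in segments)
    return f"{path}.{attr}" if attr else path
```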
SourceBaselineEnforcer Protocol defines the three-phase lifecycle (introspect -> compare -> apply) shared by all source families. EnforcementPolicy binds an enforcer to a mode (apply for local Docker, check_only for shared cloud). Trust mode (policy=None + expected=None) short-circuits with a WARNING so a connector can migrate to v2 before its baseline is modeled. ensure_baseline(policy, expected, connector_name) is the uniform entry point. check_only drift raises SourceBaselineDrift with the operator CLI command for manual recovery; apply mode delegates to enforcer.apply. SQL family types (SqlSourceBaseline + BaselineTable + BaselineColumn + Seed + BaselineView) are the only concrete baseline family shipped with MVP. Seed uses a two-field shape: sql (idempotent INSERT) + expected_row_count (COUNT-based drift gate).
First concrete SourceBaselineEnforcer implementation. Via SQLAlchemy:
- introspect() queries INFORMATION_SCHEMA for schemas, tables (BASE TABLE), columns, and views in one pass per category. Uses bindparam(expanding=True) for the IN clauses so multi-schema baselines work correctly.
- compare() walks the SqlSourceBaseline, diffing each schema / table / column / seed row-count / view against the introspected state. Missing-parent entries short-circuit that subtree to avoid cascading noise.
- apply() runs CREATE SCHEMA IF NOT EXISTS, CREATE TABLE IF NOT EXISTS with column + PK definitions, executes idempotent Seed.sql when row count drifts, and runs each view's definition_sql verbatim (baseline supplies CREATE OR REPLACE VIEW ...).
Type normalization is deliberately minimal (strip UNSIGNED + whitespace) — baselines should declare types that match MySQL's INFORMATION_SCHEMA output directly. Factory classmethod from_url(url, baseline) for the common case of building from a connection string.
…rift error
The Task 19 standalone operator CLI (python -m ...source.apply) was removed from the plan — MVP ships only MySQL, which runs mode="apply" locally, so check_only never triggers and the CLI is dead code. When the first cloud connector lands, the recovery mechanism (likely a pytest --source-mode=apply flag reusing existing fixtures) will be designed against real requirements instead of speculative ones. The SourceBaselineDrift error message now points at a generic out-of-band recovery instruction rather than a nonexistent module path.
Declarative MySQL source state: one schema (e2e) with two tables (customers + transactions) and one view (customer_txn_summary), each sized to exercise every major connector feature while staying as small as possible.
- customers (10 rows): INT PK, VARCHAR, email/phone PII columns for auto-classification, INT numeric (age, credit_score) and TEXT nullable (bio) for profiler coverage, DATE and TINYINT, status VARCHAR for DQ enum assertions.
- transactions (10 rows): INT FK to customers.id for join-level table lineage via INFORMATION_SCHEMA.KEY_COLUMN_USAGE, DECIMAL(10,2) for profiler mean/stddev, DATETIME for date-range profiling, status for DQ assertions.
- customer_txn_summary (view): LEFT JOIN of both tables for view-to-table lineage testing.
Seed SQL uses ON DUPLICATE KEY UPDATE for full idempotency and explicit timestamp literals (no NOW()) for determinism — profiler min/max assertions and DQ value checks can rely on fixed values. get_policy() uses functools.lru_cache(maxsize=1) — no mutable module-level state. Importing the module never reads env vars (test discovery and static analysis work with no E2E_MYSQL_* set). Env vars are resolved only on the first call; EnvLoadError surfaces immediately with the missing key name.
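The seed's idempotency property can be demonstrated standalone. SQLite has no ON DUPLICATE KEY UPDATE, so this sketch uses its equivalent ON CONFLICT upsert (SQLite ≥ 3.24); table and column names are illustrative, not the actual baseline:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, status TEXT)")

# Idempotent seed: re-running must not add rows or change the fixed values
# that profiler/DQ assertions rely on.
SEED = """
INSERT INTO customers (id, status) VALUES (1, 'active'), (2, 'inactive')
ON CONFLICT(id) DO UPDATE SET status = excluded.status
"""
for _ in range(3):  # running the seed repeatedly leaves row counts fixed
    conn.execute(SEED)
    conn.commit()

rows = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
```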
Extends the framework to model MySQL stored procedures (and future connectors' equivalents) through every layer:
- core/source/sql.py: BaselineStoredProcedure dataclass + stored_procedures field on SqlSourceBaseline (backward compat via default_factory).
- mysql/enforcer.py: introspect reads INFORMATION_SCHEMA.ROUTINES where ROUTINE_TYPE='PROCEDURE'; compare reports missing procedures as Drifts; apply issues DROP PROCEDURE IF EXISTS + CREATE (no CREATE OR REPLACE in MySQL). DELIMITER is CLI-only — PyMySQL sends the whole CREATE PROCEDURE as one string and MySQL parses the body server-side.
- core/expected/types.py: ExpectedStoredProcedure + stored_procedures field on ExpectedSchema.
- core/expected/differ.py: _diff_stored_procedure walks procedures in each schema, checking presence + substring-match on description.
- core/fluent/stored_procedure_assert.py (new): StoredProcedureAssert with exists / get / has_description_containing / has_code_containing.
- core/fluent/om_client.py: .stored_procedure(fqn) entry point, lazy-import pattern matching .service().
All additive — no existing method bodies changed, no breaking changes.
Three tables + one view + one stored procedure, breadth-first across every MySQL connector feature, deterministically seedable:
- customers (10 rows): adds full PII-regex coverage (email, phone, ssn, address, city, country, zipcode, first_name, last_name, full_name, date_of_birth) plus DQ enum (status), profiler numeric (age, credit_score), profiler date (joined_date, date_of_birth), and nullable columns (bio, phone, ssn, address). The auto-classification pipeline lights up most Presidio recognizers via column-name regex.
- transactions (10 rows): adds CHAR(3) for currency (distinct from VARCHAR in type coverage), CHAR(12) reference_number, ip_address VARCHAR for the IP_ADDRESS PII recognizer, MEDIUMTEXT notes. Keeps FK customer_id -> customers.id for table lineage.
- all_types (3 rows): NEW table exercising every MySQL -> OM DataType mapping: all integer widths, float/double/decimal, every CHAR/VARCHAR/TINYTEXT/TEXT/MEDIUMTEXT/LONGTEXT, every BINARY/VARBINARY/TINYBLOB/BLOB/MEDIUMBLOB/LONGBLOB, DATE/TIME/DATETIME/TIMESTAMP/YEAR, BIT(8), JSON, ENUM, SET. Seed uses hex literals (X'...'), bit literals (b'...'), JSON_OBJECT/JSON_ARRAY, and ENUM/SET string values.
- customer_txn_summary (view): view-to-table lineage across both tables.
- sp_active_customer_count (stored procedure): static SELECT COUNT(*) with status='active' filter. Exercises StoredProcedure entity ingestion when .as_metadata(include_stored_procedures=True) is set.
All seed SQL is deterministic (no NOW/RAND/UUID) and idempotent (ON DUPLICATE KEY UPDATE). The stored procedure uses DROP IF EXISTS + CREATE in apply() — MySQL has no CREATE OR REPLACE PROCEDURE. Expected row counts: customers=10, transactions=10, all_types=3 — fixed so profiler row_count.equals(N) assertions work cleanly. get_policy() continues to lazy-build + cache the EnforcementPolicy.
_check_strict_schema_extras lists actual tables and stored procedures via list_all_entities filtered by databaseSchema FQN, comparing against the declared expected sets. Any actual entity not in the expected set produces a Diff with path suffix "(strict)" for visual distinction in failure output. SUPERSET mode behavior is unchanged — existing test expectations against SUPERSET stay green. Required by filter tests, which need to verify "exclude" semantics: merely walking declared items isn't enough because extras stay invisible. The new check closes that gap at the schema level. Database/service-level extras (e.g., stray schemas, unrelated databases) are not flagged — would need similar list_all_entities walks at those levels. Deferred until a test demands it.
Declarative OM-side expectation mirroring MYSQL_BASELINE. Single factory mysql_expected(service_name, *, tables=None) returns the full catalog (4 tables + 1 view + 1 stored procedure) or a restricted slice when `tables` is supplied. The caller computes the post-filter table list explicitly — the factory does not mirror the ingest-side regex logic, which keeps the filter config <-> expected catalog mapping obvious at each test call site. Each table's column list is a separate private helper so diffs stay readable. 18 columns on customers (full PII coverage), 10 on transactions, 30 on all_types (every MySQL type the connector maps), 5 on the customer_txn_summary view. stored_procedures stays as [sp_active_customer_count] — no current scenario disables SPs. Several DataType variants (MEDIUMINT, TINYTEXT/LONGTEXT/TINYBLOB/MEDIUMBLOB/LONGBLOB) are best-guess mappings; Task 25's first live run is the authoritative validator and will surface any corrections.
Split into two files so test modules can import helpers cleanly
(pytest discourages importing from conftest).
connector.py:
- mysql_service_name(session_uuid, variant="") — builds the service
name. Variant suffix enables per-test isolation for filter tests
that can't share service state.
- build_mysql_config(service_name, server) — factory for building a
base MySQL WorkflowConfig. Reads E2E_MYSQL_* env vars via
Env.required so missing secrets fail-fast at fixture time.
conftest.py:
- mysql_source_ready (session): runs ensure_baseline, creating schema,
tables, views, stored procedure, and deterministic seed rows.
- mysql_cfg (module): shared default config for tests that don't need
variant isolation.
Depends on session_uuid + om_server_config fixtures from the top-level
conftest.py (arriving in Task 23). pytest resolves the chain lazily.
Absolute `from tests.cli_e2e_v2.*` imports across 18 files broke pytest conftest loading because pytest doesn't put ingestion/ on sys.path before importing conftest modules. The repo convention (v1 cli_e2e, integration test conftests) is relative imports within test packages — converting matches that pattern.
Converted:
- 44 occurrences across 18 files (core/* + mysql/* + top-level conftest).
- Depth-aware: same-dir uses .X, sibling-up uses ..sibling.X, etc.
Also lands the top-level conftest.py (Task 23) with session-scoped identity/server/client fixtures + per-test om_client/cli_runner + a registered_services finalizer for OM service cleanup.
Verified: pytest --collect-only on tests/cli_e2e_v2/ now exits 5 (no tests collected) without ConftestImportFailure.
Exercises every MVP-scoped pipeline against the MySQL baseline:
metadata ingest — vanilla structural diff + all_types column
type spot-checks + stored procedure presence
+ service-level entity counts
profiler — exact row counts (customers=10, transactions=10,
all_types=3) per deterministic seed
lineage — view-derived only (customer_txn_summary→customers/
transactions via SQL parse); FK constraints land
on TableConstraint entries, not lineage edges
auto-classification — PII tags on column-name regex matches (email,
ssn, first_name → Sensitive; phone, address,
date_of_birth → NonSensitive)
Plus four filter scenarios using isolated variant-named services:
tables_include exact, tables_exclude exact, schemas_include only e2e,
and regex include+exclude with exclude-priority semantic.
Session-shared tests use mysql_cfg; filter tests build isolated configs
via build_mysql_config(service_name, om_server_config) so STRICT-mode
extras assertions don't get polluted by prior-test state.
DQ pipeline deferred to post-MVP (requires TestCase entities seeded via
OM API before the test command can run).
Live validation happens in Task 25 — today's build only verifies
collection + imports.
Env gains two new methods:
- ref(key) — validates presence and returns "${KEY}" in one step
- set_default(key, default) — os.environ.setdefault wrapper
ServerConfig.to_workflow_config_dict and build_mysql_config now emit
${OM_*} / ${E2E_MYSQL_*} references via Env.ref() instead of embedding
real JWT tokens / passwords / host:port values. The metadata CLI's
load_config_file applies os.path.expandvars to raw YAML before parsing,
so subprocesses resolve references at load time — but tmp_path artifacts
a developer might share in tickets or Slack no longer leak credentials.
Validation lives inside ref() rather than per-connector factories:
Env.ref("E2E_MYSQL_PASSWORD") both validates the var is set AND emits
the reference string. Per-connector code reads as a flat list of refs
with no scattered Env.required calls.
ServerConfig instance fields still hold real resolved values (used by
om_http_client fixture, which authenticates directly without YAML).
ServerConfig.from_env uses Env.set_default to backfill OM_SERVER_URL
and OM_JWT_TOKEN with local-Docker defaults so local runs work without
env var plumbing.
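The expansion step relies on the stdlib's os.path.expandvars; a quick demonstration (the env var value and YAML fragment are illustrative):

```python
import os

# The metadata CLI's load_config_file applies os.path.expandvars to the raw
# YAML text, so ${KEY} references are resolved at subprocess load time and
# real secrets never land in tmp_path artifacts.
os.environ["E2E_MYSQL_PASSWORD"] = "s3cret"  # illustrative value

raw_yaml = "authType:\n  password: ${E2E_MYSQL_PASSWORD}\n"
resolved = os.path.expandvars(raw_yaml)
```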
Replaces the static-method Env API (required / optional / set_default / ref)
with a single class. Construction captures (key, default, required);
Env.ref() returns '${KEY}' for YAML embedding, Env.get() returns the raw
value for non-YAML contexts.
Connector code and framework code converge on one idiom:
Env("E2E_MYSQL_USER").ref() # YAML field
Env("OM_SERVER_URL", default=FALLBACK).get() # raw value
Env("E2E_MYSQL_DATABASE", required=False).get() # optional
No caller touches os.environ directly. ServerConfig.from_env, its
to_workflow_config_dict, build_mysql_config, and mysql/baseline.py's
get_policy are all updated to the new API. The rendered YAML still emits
${E2E_MYSQL_*} / ${OM_*} references so credentials never enter tmp_path
artifacts; metadata CLI's os.path.expandvars expands them at subprocess
load time.
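A minimal sketch of the single-class idiom above; the EnvLoadError message and exact validation behavior are assumptions:

```python
import os
from typing import Optional

class EnvLoadError(Exception):
    """Raised at fixture time when a required env var is missing."""

class Env:
    # Construction captures (key, default, required); resolution is deferred.
    def __init__(self, key: str, default: Optional[str] = None,
                 required: bool = True):
        self.key, self.default, self.required = key, default, required

    def get(self) -> Optional[str]:
        # Raw value for non-YAML contexts (e.g. direct HTTP clients).
        value = os.environ.get(self.key, self.default)
        if value is None and self.required:
            raise EnvLoadError(f"required env var {self.key} is not set")
        return value

    def ref(self) -> str:
        self.get()  # validate presence now — fail fast at fixture time
        return f"${{{self.key}}}"  # literal ${KEY} for YAML embedding
```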
- BaselineColumn: foreign_key + description fields
- BaselineTable: description field
- MySqlEnforcer: emit FK + COMMENT in CREATE TABLE via new helpers
- mysql baseline: FK on transactions.customer_id -> customers.id; table/column comments on customers+transactions; docstring caveat about the CREATE TABLE IF NOT EXISTS limitation
- TableAssert: has_description_containing + has_foreign_key_constraint
- mysql expected: description substring assertions (picked up by the differ walk)
- mysql conftest: module-scoped mysql_metadata_ingested fixture; 7 tests drop inline metadata CLI runs (saves ~6 subprocess runs per module pass)
- new test: test_transactions_foreign_key_constraint
pytest collection: exit=0, 12 tests (up from 11).
Lower the bar for adding a new connector to the slice. The pilot MySQL module had ~250 lines of conftest boilerplate every connector would need to copy; most of it was the same body wrapped around different service names.
Highlights:
- core/fixtures.py — `run_source_baseline(factory, baseline, *, name)` and `metadata_ingest_once(...)` plain-function bodies. Per-connector conftests still own pytest scoping (so the dependency graph builds); the helpers carry just the body. Removes ~70 lines of copy-paste from each connector module.
- core/filter_scenarios.py — `COMMON_FILTER_SCENARIOS` + helper `expected_tables_for(...)` that turns a missing variant key into an actionable AssertionError naming the connector and the fix location. Adding a fifth filter semantic is now one edit; every connector picks it up automatically.
- README.md — quick-start, env-var table, debug-failure-mode catalog (CliExecutionError, StructuralMismatch, JWT mint, baseline drift, filter variant gap, eventually timeout). Each entry: symptom + first-look diagnostic step.
- CONNECTORS.md — seven-file scaffold (baseline / connector / enforcer / expected / conftest / test) with code skeletons. Calls out the admin-vs-ingest credential split and the ON CONFLICT/ON DUPLICATE KEY UPDATE template requirement up front.
MySQL pilot updates that fall out of the new helpers:
- mysql/conftest.py shrinks; fixtures consume `metadata_ingest_once`.
- mysql/test_mysql.py — filter tests parametrize over `COMMON_FILTER_SCENARIOS` with a per-connector `_EXPECTED_TABLES_BY_VARIANT` mapping. Mark-deleted, error-containment, and FK assertion paths added.
- mysql/baseline.py / expected.py — small adjustments to match the derived expected tree shape.
Small config-builder ergonomics:
- pipelines.py / builder.py — minor surface tightening to support the `metadata_ingest_once(pipeline_options=...)` form.
- server.py — sink config split out for clarity (and so connectors that don't push lineage / profiles can override only what they need).
- common_baseline.py / orchestrator.py — non-functional polish driven by the new docs (clearer log lines, narrower types).
…ontext, profile metrics
Every change here is invisible while tests are passing — it earns its keep at 11pm when something fails on CI and you have to figure out what happened without re-running locally.
CliRunner / errors:
- E2ESetupError(Exception) — shared base for "test couldn't run" (CLI exit, baseline drift, JWT mint). Inherits Exception, NOT AssertionError, so pytest reports these as test errors (E) instead of test failures (F). Clear separation: setup trouble vs. assertion miss.
- CliExecutionError gains status_path + step_failures_summary. The exception body now extracts the first 3 step failures from the status JSON (truncated to ~500 chars each, root cause first) above the raw stdout/stderr dump. The wall of capture logs is still underneath for deep dives.
- Every CliRunner.run() invocation logs `cfg=… status=… stdout=…` at INFO so the trio of artifact paths is grep-able from CI logs without digging into pytest's auto-generated tmp_path.
Eventually:
- retry_until re-raises a polling timeout with attempts/elapsed/timeout context wrapped around the last failure. The original exception is chained via __cause__ so the traceback stays intact. Previous behavior surfaced just the last AssertionError, indistinguishable from an immediate failure.
- E2E_POLL_VERBOSE=1 logs every attempt's failure — used when a poll is flaking and you can't tell whether data never materialized or is flapping.
Profile metrics (Bucket C #7):
- ColumnProfileAssert.has_metrics(**expected) — generic kwargs against Column.profile (mean / stddev / distinct_count / max_length / min / max / etc.). Replaces the planned NumericAssert expansion since OM exposes column metrics as a flat dict.
- New eventually-aware ColumnProfileAssert wraps polling for profile.* fetches.
- Lineage assert: minor fluent surface trim.
Fluent API docstring catalog:
- core/fluent/__init__.py grows a top-level docstring listing every fluent terminal with sync vs. eventually grouping and the one-shot arming rule. Discoverable from `help(om_client)` or a quick `cat`.
… + assertrepr hook
Closes Bucket C #5 + #6. The OM-side and source-side diff machinery were drifting apart — separate Diff dataclasses, separate string sentinels for "missing", separate scope-classification helpers. This commit consolidates the type and the differ shape so adding a new node kind (ExpectedView, ExpectedDashboard) is a one-line registry entry.
types.py — single source of truth:
- The Diff dataclass moves from differ.py into source/types.py, used by both source baseline drift and OM-side catalog diffing.
- Drift -> Diff rename in sql_enforcer.py (one Diff type, two domains).
- DiffKind enum (MISSING / UNEXPECTED / VALUE_MISMATCH) replaces the string sentinels (`expected="present", actual="missing"` / `"not present"` / `"exactly declared set"`). Default is VALUE_MISMATCH so the 13 value-mismatch callsites (which already pass expected+actual) didn't need touching.
- Diff.__str__ renders per kind: MISSING → one-liner `path: missing`, UNEXPECTED → `path: unexpected (extras...)`, VALUE_MISMATCH → full expected/actual block.
differ.py — registry-based dispatch:
- Every per-node differ has the uniform signature `_diff_<x>(node, parent_path, om, mode, diffs)`. The single `_diff_node` entry point looks up `_DIFFERS[type(node)]` and calls it. Recursion is uniform: each differ invokes `_diff_node` on its children. Adding ExpectedView is one function above + one entry in `_DIFFERS` below.
- `_classify_path` returns (scope, category) in one pass instead of two separate helpers (`_scope_of` + `_category_summary`).
- Diff is imported from source.types (was a local dataclass).
sql_enforcer.py — type safety:
- `_SqlSnapshot` TypedDict for the snapshot payload. Typos like `state["tabels"]` are caught by basedpyright instead of raising KeyError at runtime against a populated source.
- All Diff() callsites use kind=DiffKind.MISSING (5 sites) or the default VALUE_MISMATCH (3 value-mismatch sites).
conftest.py — pytest hook:
- `pytest_assertrepr_compare` for StructuralMismatch. Fires on `==`/`is` comparisons (rare but real, e.g. `assert run_diff() == NO_DIFFS`) and emits the full grouped diff body instead of pytest's default short repr. A raised StructuralMismatch was already rendering fully because AssertionError messages aren't truncated; the hook covers the comparison case.
All 12 tests still collect cleanly. Smoke-tested rendering across all three DiffKind variants — MISSING / UNEXPECTED-with-extras / VALUE_MISMATCH — and confirmed no residual sentinel strings in the codebase.
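The per-kind rendering contract can be sketched like so — the real Diff carries more context (scope, category) than shown here:

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any

class DiffKind(Enum):
    MISSING = auto()
    UNEXPECTED = auto()
    VALUE_MISMATCH = auto()

@dataclass
class Diff:
    path: str
    expected: Any = None
    actual: Any = None
    # Default spares the value-mismatch callsites, which already pass
    # expected + actual and need no explicit kind.
    kind: DiffKind = DiffKind.VALUE_MISMATCH

    def __str__(self) -> str:
        if self.kind is DiffKind.MISSING:
            return f"{self.path}: missing"
        if self.kind is DiffKind.UNEXPECTED:
            return f"{self.path}: unexpected ({self.actual})"
        return f"{self.path}: expected={self.expected!r} actual={self.actual!r}"
```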
Stand-alone workflow for the v2 framework, parallel to v1's py-cli-e2e-tests.yml. Strangler pattern — connectors migrate one at a time; the matrix grows here as it shrinks in v1. Triggers: workflow_dispatch only during the MySQL pilot stabilization window. Cron will be added once the pilot run is consistently green (spec §7.1). The MySQL pilot needs no CI secrets — the connector talks to the docker-compose MySQL stood up by setup-openmetadata-test-environment, which uses public credentials baked into the bundled compose file. A small "Bootstrap MySQL grants" step issues the v2-framework-specific GRANTs on the e2e schema before pytest runs. Kept in the workflow (not in docker/mysql/mysql-script.sql) so the shared infra image's privilege surface stays minimal — only this workflow's openmetadata_user gets the e2e grants.
Project switched from black + isort + pylint + pycln to ruff (single tool, configured in ingestion/pyproject.toml). After rebasing slice 1 onto main, run the new pre-commit hooks against the framework changes. ruff-check: 8 auto-fixes (unused imports, import order, trivial restructures). One manual fix in core/source/types.py — the SourceBaselineEnforcer Protocol still referenced `Drift` after the earlier rename to `Diff`; ruff's F821 caught it. ruff-format: 28 files reformatted. Pure whitespace / quote / wrap diffs — no semantic changes. 12 pytest tests still collect cleanly. The reformat overlaps slice 1's cli/* + workflow/* + cli_e2e_v2/* paths plus the test_base_workflow.py file touched during rebase conflict resolution.
…cer Protocol
Architectural review caught the Protocol mis-teaching its own contract: the docstring claimed introspect → compare → apply, but the orchestrator only ever called compare(). SqlBaselineEnforcer.introspect() was implemented but had zero callers; the public method just wrapped a private _snapshot() that compare() already invokes internally. Future implementers (Postgres, Metabase) would have faithfully provided introspect() and seen it never invoked. Speculative API surface decays.
Removed:
- introspect() declaration from the SourceBaselineEnforcer Protocol
- SourceState dataclass (only the deleted method returned it)
- SqlBaselineEnforcer.introspect() public method (compare's internal _snapshot is the boundary)
Updated:
- Protocol docstring + types.py module docstring to reflect the actual compare → apply lifecycle. The new docstring explicitly notes that engine-specific state caching belongs inside the enforcer — if a future enforcer (e.g. an HTTP-based Metabase one) wants to share a fetched snapshot between compare and apply, it does so as a private cache, not a public framework phase.
12 tests still collect; ruff clean.
…oot_list

OM's generated Pydantic schema sometimes wraps list-typed entity fields in RootModel[list[X]] (notably `owners`) and sometimes uses plain list[X] | None (today: `tags`, `columns`, `tableConstraints`). The shape can flip between OM minor versions without warning, which historically forced sweeping changes through every test that walked the field.

The differ + fluent layer had ~12 callsites hand-rolling either `field.root if field else []` or `field or []` depending on which shape the field had today. Inconsistency itself is the smell — when the next OM schema bump promotes `tags` to a RootModel, the connector tests would die with AttributeError instead of producing a clean StructuralMismatch.

Added core/_om_compat.py exposing unwrap_root_list(field) -> list. Mirrors the role model_str() plays for scalar RootModel fields (tagFQN, name, description). All list-typed Pydantic field reads now route through it:
- differ._diff_table: owners, tags, columns
- differ._diff_column: tags
- table_assert._fk_matches: constraint.columns + referredColumns
- table_assert.has_tag, has_owner, has_foreign_key_constraint, column, column.has_tag (×2)
- profile_assert._fetch_column_profile: columns

Future RootModel bumps now touch one helper rather than 12 sites. 12 tests still collect; ruff clean.
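A minimal sketch of what such a helper could look like — the real `core/_om_compat.py` may differ; only the name `unwrap_root_list` and its contract (`field -> list`, handling None, plain list, and RootModel shapes) come from this commit:

```python
from typing import Any


def unwrap_root_list(field: Any) -> list:
    """Normalize an OM entity list field to a plain list.

    Handles the three shapes the generated Pydantic models emit:
    None, plain list[X], and RootModel[list[X]] (payload under `.root`).
    """
    if field is None:
        return []
    root = getattr(field, "root", None)
    if isinstance(root, list):
        return root
    return list(field)
```

Callers then write `for tag in unwrap_root_list(table.tags)` regardless of which shape the current OM schema emits for `tags`.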
Spec §4.8 declared three match modes (STRICT / SUPERSET / SUBSET) but
the differ only ever branched on STRICT. Calling
`assert_service_matches(..., mode=MatchMode.SUBSET)` silently produced
SUPERSET semantics — the kind of lying API that wastes triage time
when a test author trusts the enum.
We have no current consumer; the spec's hedge ("rare") was speculative.
Drop it — adding it back is a trivial one-line change when an actual
rare case surfaces. Same speculative-API discipline that killed
introspect() in H1.
Removed:
- MatchMode.SUBSET from the enum (core/expected/types.py)
- SUBSET line from the MatchMode docstring
- SUBSET line from assert_service_matches docstring
12 tests still collect; ruff clean.
…es fallback

The previous matcher fell back from `<direction>Edges` to `nodes` if the typed edge wasn't found. That accommodated API-response variability at the cost of correctness: `nodes` contains the full participant set across BOTH directions, mixed.

Worked example: tableA.lineage.has_downstream(tableB)
- OM returns: upstreamEdges=[tableB], downstreamEdges=[], nodes=[tableA, tableB]
- fallback hits `nodes`, finds tableB → assertion passes
- but tableB is upstream of tableA, not downstream → silent inversion

Investigation of `metadata.ingestion.ometa.mixins.lineage_mixin._get_lineage` confirmed OM populates direction-typed edges as canonical; `nodes` is the full graph participant list, never a substitute.

Removed the `nodes` short-circuit. If a future OM response genuinely fails to populate direction-typed edges while emitting `nodes`, the test fails LOUD with both fields rendered in the assertion message — that's the right discovery path for an OM bug, not a quiet test pass.

12 tests still collect; ruff clean.
…or summary Per-step truncation meant a 5-step pipeline with widespread failures contributed up to 15 lines to the inline summary, defeating the "first 3 lines tell you the root cause" intent. The full failure list is in the status JSON for deep dives; the inline block is the appetizer. Switched to a global cap of 3 across all steps. Failures cascade — step 1's first failure is overwhelmingly the root cause; step 5's third failure is downstream noise. Predictable inline-summary size regardless of step count. 12 tests still collect; ruff clean.
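The global-cap behavior can be sketched as follows (function and parameter names are hypothetical — the real summary builder lives in the workflow output layer):

```python
from itertools import islice


def inline_failure_summary(steps: dict[str, list[str]], cap: int = 3) -> list[str]:
    """Flatten per-step failures in pipeline order and keep the first `cap`.

    Failures cascade, so the earliest failures are the likeliest root cause;
    a later step's third failure is usually downstream noise.
    """
    all_failures = (
        f"{step}: {failure}"
        for step, failures in steps.items()
        for failure in failures
    )
    # Global cap across ALL steps, not `cap` per step: the inline block
    # stays a predictable size regardless of step count.
    return list(islice(all_failures, cap))
```

A five-step pipeline with widespread failures now contributes exactly three lines, not fifteen.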
… type coverage Each connector that exercises types beyond SQLAlchemy core (TINYINT, MEDIUMINT, ENUM on MySQL; ARRAY, JSONB, INET on Postgres; etc.) needs its own declaration table. The architecture review flagged the implicit naming convention as worth pinning: `all_types` with a `BigInteger` PK `id`, three seed rows. Documenting the convention now (vs. extracting a helper) keeps the abstraction surface small while preventing future contributors from reinventing the table name. The actual work — picking the dialect columns to test — stays where it belongs: connector-private.
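A hedged sketch of the convention for MySQL — the dialect columns and seed values below are purely illustrative; only the `all_types` table name, the `BigInteger` PK `id`, and the three-seed-row rule come from the documented convention:

```python
from sqlalchemy import BigInteger, Column, MetaData, Table
from sqlalchemy.dialects.mysql import ENUM, MEDIUMINT, TINYINT

metadata = MetaData()

# Convention: table named `all_types`, BigInteger PK `id`, three seed rows.
# Which dialect columns to exercise stays connector-private.
all_types = Table(
    "all_types",
    metadata,
    Column("id", BigInteger, primary_key=True),
    Column("tiny_col", TINYINT),
    Column("medium_col", MEDIUMINT),
    Column("enum_col", ENUM("a", "b", "c")),
)

seed_rows = [
    {"id": 1, "tiny_col": 1, "medium_col": 100, "enum_col": "a"},
    {"id": 2, "tiny_col": 2, "medium_col": 200, "enum_col": "b"},
    {"id": 3, "tiny_col": 3, "medium_col": 300, "enum_col": "c"},
]
```

A Postgres connector would declare its own `all_types` with ARRAY / JSONB / INET columns under the same name and shape.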
…dure abstract

Surface missing dialect overrides at enforcer instantiation (fixture setup) instead of at the first stored-procedure apply mid-test, where the failure context is harder to triage.

Before: _apply_stored_procedure raised NotImplementedError. A subclass that forgot to override would only fail when the apply() loop reached a non-empty stored_procedures list — multiple test runs after the omission, with a confusing call-site.

After: SqlBaselineEnforcer is an ABC; _apply_stored_procedure is @abstractmethod. Forgetting to override fails at construction with "Can't instantiate abstract class …", before any test runs. Subclasses without stored procedures in their baseline implement it as a `pass` no-op (documented in the abstract method's docstring).

Smoke-tested: MySqlEnforcer still instantiates (fully satisfies the contract; its abstract-methods frozenset is empty). 12 tests collect; ruff clean.
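The before/after can be illustrated with a condensed ABC sketch (class and method names from the commit; the `apply` body is heavily simplified):

```python
from abc import ABC, abstractmethod


class SqlBaselineEnforcer(ABC):
    """Sketch of the ABC change: a missing override now fails at construction."""

    def apply(self, stored_procedures: list[str]) -> None:
        # The real enforcer does far more; shown only to anchor the call path
        # that previously surfaced the NotImplementedError mid-test.
        for ddl in stored_procedures:
            self._apply_stored_procedure(ddl)

    @abstractmethod
    def _apply_stored_procedure(self, ddl: str) -> None:
        """Apply one stored-procedure definition.

        Subclasses whose baseline has no stored procedures implement
        this as a `pass` no-op.
        """


class MySqlEnforcer(SqlBaselineEnforcer):
    def _apply_stored_procedure(self, ddl: str) -> None:
        pass  # dialect-specific CREATE PROCEDURE DDL would run here


class ForgetfulEnforcer(SqlBaselineEnforcer):
    pass  # forgot the override — TypeError at instantiation, before any test runs
```

`MySqlEnforcer()` constructs fine; `ForgetfulEnforcer()` raises TypeError immediately, which is the fixture-setup failure the commit wants.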
OM's Edge schema carries only fromEntity/toEntity UUIDs (no FQN). The
prior _check_edge read fullyQualifiedName off Edge dicts directly,
producing {None} on every lookup; live MySQL pilot was the first run
to exercise the path. Build a uuid->fqn map from nodes + central
entity, then match direction-typed edges by resolved FQN.
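A condensed sketch of the corrected lookup — the helper name is illustrative, while the field names (`entity`, `nodes`, `fromEntity`/`toEntity`, `<direction>Edges`) follow the OM lineage response shape described above:

```python
def check_edge(lineage: dict, target_fqn: str, direction: str) -> bool:
    """Match a direction-typed edge by FQN.

    Edge dicts carry only fromEntity/toEntity UUIDs, so resolve them
    through a uuid -> fqn map built from `nodes` plus the central entity.
    """
    uuid_to_fqn = {
        node["id"]: node["fullyQualifiedName"] for node in lineage.get("nodes", [])
    }
    entity = lineage["entity"]
    uuid_to_fqn[entity["id"]] = entity["fullyQualifiedName"]

    # Downstream edges point away from the central entity (target is toEntity);
    # upstream edges point into it (target is fromEntity).
    key = "toEntity" if direction == "downstream" else "fromEntity"
    edges = lineage.get(f"{direction}Edges") or []
    return any(uuid_to_fqn.get(edge[key]) == target_fqn for edge in edges)
```

Reading `fullyQualifiedName` off the Edge dict itself, as the prior code did, yields `None` for every edge — which is why the lookup set degenerated to `{None}`.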
OM's MySQL median impl previously returned a non-deterministic value (its ROW_NUMBER() OVER () lacked a window ORDER BY and the @counter side-effect ordering was undefined). Live pilot returned 650/680 across runs on identical seed data. Re-pin to 680 with a comment referencing the upstream fix; expect intermittent CI failures until the OM patch lands.
# Conflicts:
#	ingestion/src/metadata/cli/common.py
#	ingestion/src/metadata/cmd.py
#	ingestion/tests/unit/workflow/test_base_workflow.py
Auto-fixes from `pre-commit run --all-files` after merging main into slice 1 (the main commits include the ruff migration #27739/#27774).

Changes:
- Drop string-quoted forward references where `from __future__ import annotations` is in effect (auto-fix).
- Move type-only imports under `TYPE_CHECKING` blocks (auto-fix).
- Convert `Union[A, B, ...]` to `A | B | ...` in `pipelines.py`.
- Annotate class-level mutable defaults with `ClassVar` in `entity_assert.py` and `table_assert.py`.
- Add `check=False` to `subprocess.run` in `cli_runner.py`.
- Mark `StructuralMismatch` and `SourceBaselineDrift` with `# noqa: N818` — public exception names, intentional API surface.

`pyproject.toml` per-file-ignores extended for `tests/cli_e2e_v2/**`:
- `TID252` (relative imports) — by design (connector-centric layout).
- `T201` (print) — top-level conftest's session posture banner.

Both paths listed twice (relative + ingestion-prefixed) per the existing dual-cwd pattern in this file.
Drops the dependency on the shared docker-compose MySQL + the CI GRANT bootstrap step that violated test-framework / shared-infra separation. A session-scoped `mysql_container` fixture now boots `mysql:8.0` per run, creates the `e2e` schema, and provisions a scoped `om_user` whose GRANTs match the documented OM MySQL connector minimum (SELECT, SHOW VIEW, EXECUTE on the schema; PROCESS + SHOW_ROUTINE globally — the latter required for stored-procedure body introspection).

Two users coexist for the session:
- `root` (testcontainers default) for the framework's SqlBaselineEnforcer
- `om_user` for the CLI ingest subprocess, exercising production-realistic permissions

The fixture populates `E2E_MYSQL_*` env vars from the running container so the existing `Env(key).ref()` YAML rendering keeps working — secrets never leak to tmp_path even though they are generated per-session.

CI workflow loses 30 lines; teammates no longer manage source DB ports, credentials, or grants. README + CONNECTORS docs trimmed to essentials. Also drops the now-stale comment in test_mysql.py about the median fix landing upstream — fix #27815 is in-branch.
`build_ingestion_status` was annotated `Optional[IngestionStatus]` but the body unconditionally returns `IngestionStatus(...)` — tighten the return type. Eliminates one error in `write_status_file`. The remaining three `write_status_file` errors stem from `BaseWorkflow`'s `config: Union[Any, Dict]` annotation, which is load-bearing — the `ApplicationWorkflow` subclass uses `OpenMetadataApplicationConfig` instead of `OpenMetadataWorkflowConfig`, so tightening the base annotation breaks subclass polymorphism. Suppress those three with targeted `# pyright: ignore[<rule>]` comments rather than weakening the contract.
…e modes

48 unit tests covering the structural differ, fluent assertions, and polling primitives — built against synthetic stubs, no testcontainers, no network. Run in <2s. Each fluent assertion has paired positive + negative tests so the suite proves bidirectional correctness rather than just "doesn't crash on happy path". The structural-differ tests parametrize one corruption per row and assert the right `DiffKind` + path fragment surfaces.

Notable scenarios:
- StructuralDiffer: missing service / db / schema / table / SP / column; wrong column type; wrong service type; missing tag / owner / description; STRICT mode flagging extras that SUPERSET tolerates.
- TableAssert / ColumnAssert / StoredProcedureAssert: tag, owner, FK, schema definition (case-insensitive), soft-deleted state, code body.
- EventuallyRunner / retry_until: success on first attempt, retry-until-converge, deadline-driven timeout with last-failure-text in the error, non-AssertionError propagation, one-shot arming semantics.

Includes regression coverage for the SHOW_ROUTINE bug discovered earlier today (`test_sp_has_code_containing_raises_on_empty_body`) — proves the assertion catches an empty SP body even before ingestion runs.
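The `retry_until` semantics the suite exercises — retry on AssertionError, deadline-driven timeout carrying the last failure text, immediate propagation of anything else — can be sketched as (signature is an assumption; only the behaviors come from the test list above):

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_until(check: Callable[[], T], timeout: float = 30.0, interval: float = 0.5) -> T:
    """Re-run `check` until it stops raising AssertionError or the deadline hits.

    On timeout, raise with the last failure text so the error explains what
    never converged. Non-AssertionError exceptions propagate immediately.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return check()
        except AssertionError as exc:
            if time.monotonic() >= deadline:
                raise AssertionError(
                    f"did not converge within {timeout}s; last failure: {exc}"
                ) from exc
            time.sleep(interval)
```

A check that converges on the third attempt succeeds; a check raising ValueError escapes on the first call without retrying.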
- workflow.stop()
- workflow.print_status()
- workflow.raise_from_status()
+ execute_workflow(workflow=workflow, config_dict=config_dict, status_file=status_file)
⚠️ Bug: run_app lost print_status() call — silent CLI output regression
The old run_app explicitly called workflow.print_status() between stop() and raise_from_status(), giving operators a human-readable summary on stdout. The new unified execute_workflow in common.py does not call print_status(), so the app command now runs silently on success. Other run_* functions (ingest, profile, etc.) may have had their own print_status() calls that were similarly dropped during unification.
Suggested fix:
Add `workflow.print_status()` inside `execute_workflow` (after `workflow.stop()` in the finally block, or after the try/finally before `raise_from_status`), so all workflow types get consistent output:
try:
    workflow.execute()
finally:
    workflow.stop()
    if status_file is not None:
        workflow.write_status_file(status_file)
    workflow.print_status()
if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
    workflow.raise_from_status()
| "pipeline_type": self.config.source.type, # pyright: ignore[reportAttributeAccessIssue] | ||
| "ingestion_pipeline_fqn": self.config.ingestionPipelineFQN, # pyright: ignore[reportAttributeAccessIssue] |
⚠️ Edge Case: write_status_file assumes config.source.type — crashes for ApplicationWorkflow
write_status_file accesses self.config.source.type (line 432) and self.config.ingestionPipelineFQN (line 433). For ApplicationWorkflow, config is OpenMetadataApplicationConfig which uses a different schema and may not expose .source.type. If a user passes --status-file to the app subcommand, this will raise AttributeError inside the finally block, masking any original workflow exception.
Suggested fix:
Guard the attribute access with hasattr/getattr, or use a polymorphic method:
payload = {
    "pipeline_type": getattr(getattr(self.config, "source", None), "type", None),
    "ingestion_pipeline_fqn": getattr(self.config, "ingestionPipelineFQN", None),
    "success": success,
    "steps": ingestion_status.model_dump(),
}
finally:
    workflow.stop()
    if status_file is not None:
        workflow.write_status_file(status_file)
💡 Edge Case: write_status_file in finally can suppress the original exception
In execute_workflow, write_status_file runs inside the finally block. If workflow.execute() raises and write_status_file also raises (e.g., because steps aren't populated after an early crash), the original exception is replaced by the status-file error, making debugging harder. This only triggers when --status-file is explicitly provided.
Suggested fix:
Wrap the write_status_file call in a try/except to log-and-swallow:
finally:
    workflow.stop()
    if status_file is not None:
        try:
            workflow.write_status_file(status_file)
        except Exception:
            logger.warning("Failed to write status file", exc_info=True)
_Req = TypeVar("_Req", Literal[True], Literal[False])


class Env(Generic[_Req]):
Great typing communicates intent across time. Right now, a signature like Env[True] is a bit opaque. We can make it entirely self-documenting by introducing semantic aliases:
# Constrained — callers can only parameterize Env with Required (True) or NotRequired (False), matching
# the two concrete `required` states. Anything else is a type error.
Required: TypeAlias = Literal[True]
NotRequired: TypeAlias = Literal[False]
_Req = TypeVar("_Req", Required, NotRequired)This allows us to write/hint Env[Required] or Env[NotRequired], whic tells the developer what is expected. While it might be a minor detail for this specific, simple class, adopting this pattern sets a great standard for readability across the codebase.
    required: Literal[False],
) -> Env[Literal[False]]: ...


def __new__(
Overriding `__new__` with a custom signature breaks standard object instantiation conventions and feels hacky here.
We can achieve the exact same type resolution much more cleanly by using @overload on `__init__`. This respects standard interfaces and keeps the logic straightforward without having to touch `__new__`. Here is how that looks:
# Two __init__ overloads — one per Literal[required] value — let the type
# checker pick the right `Env[NotRequired|Required]` specialization at the call
# site.
@overload
def __init__(
    self: Env[Required],
    key: str,
    default: str | None = None,
    *,
    required: Literal[True] = True,
) -> None: ...
@overload
def __init__(
    self: Env[NotRequired],
    key: str,
    default: str | None = None,
    *,
    required: Literal[False],
) -> None: ...
def __init__(
    self,
    key: str,
    default: str | None = None,
    *,
    required: bool = True,
) -> None:
    self.key = key
    if default is not None:
        os.environ.setdefault(key, default)
    if required and not os.environ.get(key):
        raise EnvLoadError(
            f"required env var {key} not set. Set it in your shell or GitHub Actions secrets."
        )
# Two .get() overloads narrow by the specialization of Env:
# Env[Required].get() -> str (construction validated)
# Env[NotRequired].get() -> str | None (caller must handle None)
@overload
def get(self: Env[Required]) -> str: ...
@overload
def get(self: Env[NotRequired]) -> str | None: ...
def get(self) -> str | None:
return os.environ.get(self.key)
Summary
First vertical slice of the v2 CLI E2E test framework — replaces the v1 `test_cli_<connector>.py` per-source approach with a declarative shape-then-assert framework. Ships the framework core plus MySQL as the pilot connector. Other connectors land in follow-up PRs.

What's new

Framework (`tests/cli_e2e_v2/core/`)
- Declarative baseline (`MetaData` + `TableSeed` + `ViewDefinition` + `StoredProcedureDefinition`).
- `SqlBaselineEnforcer` (Inspector-based introspect, dialect-specific apply). Adding a new SQL dialect is ~30 lines.
- `WorkflowConfig` builder with `.pipeline(options)` transition + filter overlay; renders `${E2E_*}` env refs (no secrets in tmp_path).
- `OmClient` → `TableAssert` / `StoredProcedureAssert` / `ServiceAssert`, which descend into `ColumnAssert` / `LineageAssert` / `ProfileAssert`. One-shot `.eventually(timeout)` arms the next terminal for polling.
- Structural differ checks `Expected*` against OM and raises `StructuralMismatch` with category-summary + path-grouped diffs.
- `Expected*` derived from the SQLAlchemy `MetaData` + dialect-specific type map — no hand-rolled column lists.

MySQL pilot (`tests/cli_e2e_v2/mysql/`)
- `mysql_container` testcontainers fixture boots `mysql:8.0`, creates a scoped `om_user` with the documented OM-MySQL minimum GRANTs (SELECT, SHOW VIEW, EXECUTE on the schema; PROCESS + SHOW_ROUTINE globally for SP body introspection), and populates `E2E_MYSQL_*` env vars. Teammates run `pytest` with no env plumbing.

Meta-tests (`tests/cli_e2e_v2/meta/`)
- One corruption per `DiffKind` via parametrize.

CI
- `.github/workflows/py-cli-e2e-tests-v2.yml` runs the suite on push and PR. Drops the v1 GRANT bootstrap step — the testcontainers fixture owns container lifecycle, schema bootstrap, and grants.

What's NOT in this PR
- `test_cli_mysql.py` is removed in a follow-up PR after v2 has run cleanly in CI for one week. This avoids the "delete-the-old-path-in-the-same-PR-as-the-new-one" trap.

Test plan
- `pytest tests/cli_e2e_v2/mysql -v` → 12 passed
- `pytest tests/cli_e2e_v2/meta -v` → 48 passed
- `make py_format_check` → clean
- `nox -s static-checks` → no new errors vs baseline

Notable workflow fixes bundled in
- `fix(workflow): basedpyright cleanup for write_status_file` — three pyright-ignore comments on `BaseWorkflow.config` access in the new `write_status_file` method (added earlier in this branch). The `Union[Any, Dict]` config type is load-bearing for the `ApplicationWorkflow` subclass, which uses `OpenMetadataApplicationConfig`, so tightening the base type isn't safe; targeted ignores keep the annotation correct without breaking polymorphism. Also tightens the `build_ingestion_status` return type from `Optional[IngestionStatus]` to `IngestionStatus` (the body unconditionally returns non-None).

Docs
- `tests/cli_e2e_v2/README.md` — how to run + debug
- `tests/cli_e2e_v2/CONNECTORS.md` — six-file scaffold for adding a new connector, mirrors `mysql/`