From 520d39bc849cbc62213e368002f445e76d0048db Mon Sep 17 00:00:00 2001
From: r266-tech
Date: Sat, 27 Jun 2026 01:17:22 +0800
Subject: [PATCH] Respect vector extension during migration bootstrap

---
 .../hindsight_api/migrations.py               | 185 ++++++++++--------
 hindsight-api-slim/tests/test_vector_index.py |  20 ++
 2 files changed, 128 insertions(+), 77 deletions(-)

diff --git a/hindsight-api-slim/hindsight_api/migrations.py b/hindsight-api-slim/hindsight_api/migrations.py
index d95b07b14..1cd4d6ed5 100644
--- a/hindsight-api-slim/hindsight_api/migrations.py
+++ b/hindsight-api-slim/hindsight_api/migrations.py
@@ -32,6 +32,7 @@
 from ._pg_search import normalize_pg_search_tokenizer, pg_search_bm25_columns
 from ._vector_index import (
     bootstrap_extension,
+    configured_vector_extension,
     detect_vector_extension,
     index_type_keyword,
     index_using_clause,
@@ -60,6 +61,111 @@ def _detect_vector_extension(conn, vector_extension: str = "pgvector") -> str:
     return detect_vector_extension(conn, vector_extension)
 
 
+def _ensure_pgvector_extension_in_public(conn: Connection) -> None:
+    """Ensure the pgvector extension is available before pgvector-backed migrations run.
+
+    If ``vector`` is missing it is created with ``search_path`` set to ``public``. If it
+    exists in another schema a relocation is attempted; a failed relocation is logged
+    and tolerated. Each successful change is committed and each failure rolled back.
+
+    NOTE(review): relocation issues ``DROP EXTENSION vector CASCADE``; PostgreSQL
+    documents CASCADE as also dropping objects that depend on the extension - confirm
+    that is acceptable for databases that already hold vector data.
+
+    Args:
+        conn: Open database connection used for the checks and DDL.
+
+    Raises:
+        RuntimeError: If the extension is absent and cannot be installed.
+    """
+    logger.debug("Checking pgvector extension availability...")
+
+    # First, check if extension already exists
+    ext_check = conn.execute(
+        text(
+            "SELECT extname, nspname FROM pg_extension e "
+            "JOIN pg_namespace n ON e.extnamespace = n.oid "
+            "WHERE extname = 'vector'"
+        )
+    ).fetchone()
+
+    if ext_check:
+        # Extension exists - check if in correct schema
+        ext_schema = ext_check[1]
+        if ext_schema == "public":
+            logger.info("pgvector extension found in public schema - ready to use")
+        else:
+            # Extension in wrong schema - try to fix if we have permissions
+            logger.warning(
+                f"pgvector extension found in schema '{ext_schema}' instead of 'public'. "
+                "Attempting to relocate..."
+            )
+            try:
+                conn.execute(text("DROP EXTENSION vector CASCADE"))
+                conn.execute(text("SET search_path TO public"))
+                conn.execute(text("CREATE EXTENSION vector"))
+                conn.commit()
+                logger.info("pgvector extension relocated to public schema")
+            except Exception as e:
+                # Failed to relocate - log but don't fail if extension exists somewhere
+                logger.warning(
+                    f"Could not relocate pgvector extension to public schema: {e}. "
+                    f"Continuing with extension in '{ext_schema}' schema."
+                )
+                conn.rollback()
+    else:
+        # Extension doesn't exist - try to install
+        logger.info("pgvector extension not found, attempting to install...")
+        try:
+            conn.execute(text("SET search_path TO public"))
+            conn.execute(text("CREATE EXTENSION vector"))
+            conn.commit()
+            logger.info("pgvector extension installed in public schema")
+        except Exception as e:
+            # Installation failed - this is only fatal if extension truly doesn't exist
+            # Check one more time in case another process installed it
+            conn.rollback()
+            ext_recheck = conn.execute(
+                text(
+                    "SELECT nspname FROM pg_extension e "
+                    "JOIN pg_namespace n ON e.extnamespace = n.oid "
+                    "WHERE extname = 'vector'"
+                )
+            ).fetchone()
+
+            if ext_recheck:
+                logger.warning(
+                    "Could not install pgvector extension (permission denied?), "
+                    f"but extension exists in '{ext_recheck[0]}' schema. Continuing..."
+                )
+            else:
+                # Extension truly doesn't exist and we can't install it
+                logger.error(
+                    f"pgvector extension is not installed and cannot be installed: {e}. "
+                    "Please ensure pgvector is installed by a database administrator. "
+                    "See: https://github.com/pgvector/pgvector#installation"
+                )
+                raise RuntimeError(
+                    "pgvector extension is required but not installed. "
+                    "Please install it with: CREATE EXTENSION vector;"
+                ) from e
+
+
+def _bootstrap_vector_extension_for_migrations(conn: Connection, vector_extension: str) -> None:
+    """Bootstrap the configured vector backend before schema migrations run.
+
+    The public-schema pgvector preflight runs only when ``vector_extension`` is
+    ``"pgvector"``; every backend is then passed to ``bootstrap_extension``.
+
+    Args:
+        conn: Open database connection used for the extension DDL.
+        vector_extension: Name of the configured vector backend.
+    """
+    if vector_extension == "pgvector":
+        _ensure_pgvector_extension_in_public(conn)
+    bootstrap_extension(conn, vector_extension)
+
+
 def _drop_per_bank_vector_indexes(conn: Connection, schema_name: str) -> None:
     """Drop per-bank partial memory_units vector indexes after global ScaNN is ready."""
     rows = conn.execute(
@@ -275,83 +381,8 @@ def run_migrations(
             logger.debug("Migration advisory lock acquired")
 
             try:
-                # Ensure pgvector extension is installed globally BEFORE schema migrations
-                # This is critical: the extension must exist database-wide before any schema
-                # migrations run, otherwise custom schemas won't have access to vector types
-                logger.debug("Checking pgvector extension availability...")
-
-                # First, check if extension already exists
-                ext_check = conn.execute(
-                    text(
-                        "SELECT extname, nspname FROM pg_extension e "
-                        "JOIN pg_namespace n ON e.extnamespace = n.oid "
-                        "WHERE extname = 'vector'"
-                    )
-                ).fetchone()
-
-                if ext_check:
-                    # Extension exists - check if in correct schema
-                    ext_schema = ext_check[1]
-                    if ext_schema == "public":
-                        logger.info("pgvector extension found in public schema - ready to use")
-                    else:
-                        # Extension in wrong schema - try to fix if we have permissions
-                        logger.warning(
-                            f"pgvector extension found in schema '{ext_schema}' instead of 'public'. "
-                            f"Attempting to relocate..."
-                        )
-                        try:
-                            conn.execute(text("DROP EXTENSION vector CASCADE"))
-                            conn.execute(text("SET search_path TO public"))
-                            conn.execute(text("CREATE EXTENSION vector"))
-                            conn.commit()
-                            logger.info("pgvector extension relocated to public schema")
-                        except Exception as e:
-                            # Failed to relocate - log but don't fail if extension exists somewhere
-                            logger.warning(
-                                f"Could not relocate pgvector extension to public schema: {e}. "
-                                f"Continuing with extension in '{ext_schema}' schema."
-                            )
-                            conn.rollback()
-                else:
-                    # Extension doesn't exist - try to install
-                    logger.info("pgvector extension not found, attempting to install...")
-                    try:
-                        conn.execute(text("SET search_path TO public"))
-                        conn.execute(text("CREATE EXTENSION vector"))
-                        conn.commit()
-                        logger.info("pgvector extension installed in public schema")
-                    except Exception as e:
-                        # Installation failed - this is only fatal if extension truly doesn't exist
-                        # Check one more time in case another process installed it
-                        conn.rollback()
-                        ext_recheck = conn.execute(
-                            text(
-                                "SELECT nspname FROM pg_extension e "
-                                "JOIN pg_namespace n ON e.extnamespace = n.oid "
-                                "WHERE extname = 'vector'"
-                            )
-                        ).fetchone()
-
-                        if ext_recheck:
-                            logger.warning(
-                                f"Could not install pgvector extension (permission denied?), "
-                                f"but extension exists in '{ext_recheck[0]}' schema. Continuing..."
-                            )
-                        else:
-                            # Extension truly doesn't exist and we can't install it
-                            logger.error(
-                                f"pgvector extension is not installed and cannot be installed: {e}. "
-                                f"Please ensure pgvector is installed by a database administrator. "
-                                f"See: https://github.com/pgvector/pgvector#installation"
-                            )
-                            raise RuntimeError(
-                                "pgvector extension is required but not installed. "
-                                "Please install it with: CREATE EXTENSION vector;"
-                            ) from e
-
-                vector_extension = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector").lower()
-                bootstrap_extension(conn, vector_extension)
+                vector_extension = configured_vector_extension()
+                _bootstrap_vector_extension_for_migrations(conn, vector_extension)
 
                 # Commit any pending transaction on the advisory-lock connection
                 # before running migrations. Some code paths above (e.g., the
diff --git a/hindsight-api-slim/tests/test_vector_index.py b/hindsight-api-slim/tests/test_vector_index.py
index a65fdd74b..dd6f6b219 100644
--- a/hindsight-api-slim/tests/test_vector_index.py
+++ b/hindsight-api-slim/tests/test_vector_index.py
@@ -13,6 +13,7 @@
     validate_extension,
 )
 from hindsight_api.engine.retain import bank_utils
+from hindsight_api.migrations import _bootstrap_vector_extension_for_migrations
 
 
 class RecordingConn:
@@ -69,6 +70,25 @@ def test_bootstrap_extension_scann_installs_vector_before_alloydb_scann():
     ]
 
 
+def test_migration_bootstrap_vchord_skips_pgvector_preflight():
+    conn = RecordingConn()
+
+    _bootstrap_vector_extension_for_migrations(conn, "vchord")
+
+    assert conn.statements == ["CREATE EXTENSION IF NOT EXISTS vchord CASCADE"]
+
+
+def test_migration_bootstrap_scann_uses_dispatcher_without_legacy_pgvector_check():
+    conn = RecordingConn()
+
+    _bootstrap_vector_extension_for_migrations(conn, "scann")
+
+    assert conn.statements == [
+        "CREATE EXTENSION IF NOT EXISTS vector",
+        "CREATE EXTENSION IF NOT EXISTS alloydb_scann CASCADE",
+    ]
+
+
 def test_scann_index_creation_defers_until_table_is_large_enough():
     assert should_defer_index_creation("scann", 0)
     assert should_defer_index_creation("scann", SCANN_MIN_ROWS_FOR_AUTO_INDEX - 1)