Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 85 additions & 77 deletions hindsight-api-slim/hindsight_api/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from ._pg_search import normalize_pg_search_tokenizer, pg_search_bm25_columns
from ._vector_index import (
bootstrap_extension,
configured_vector_extension,
detect_vector_extension,
index_type_keyword,
index_using_clause,
Expand Down Expand Up @@ -60,6 +61,88 @@ def _detect_vector_extension(conn, vector_extension: str = "pgvector") -> str:
return detect_vector_extension(conn, vector_extension)


def _find_pgvector_schema(conn: Connection) -> "str | None":
    """Return the schema holding the ``vector`` extension, or ``None`` if it is not installed."""
    row = conn.execute(
        text(
            "SELECT nspname FROM pg_extension e "
            "JOIN pg_namespace n ON e.extnamespace = n.oid "
            "WHERE extname = 'vector'"
        )
    ).fetchone()
    return row[0] if row else None


def _ensure_pgvector_extension_in_public(conn: Connection) -> None:
    """Ensure pgvector is installed before pgvector-backed migrations run.

    Three cases are handled:

    * The extension already lives in ``public``: nothing to do.
    * The extension lives in another schema: try to move it to ``public``.
      A failed move is logged and tolerated, because the extension still
      exists somewhere.
    * The extension is missing: try to create it. If creation fails, the
      catalog is re-checked (another process may have installed it in the
      meantime) before giving up.

    Args:
        conn: Open database connection. This function commits on success and
            rolls back on failure of the statements it issues.

    Raises:
        RuntimeError: If the extension is not installed and cannot be
            installed by the current role.
    """
    logger.debug("Checking pgvector extension availability...")

    ext_schema = _find_pgvector_schema(conn)

    if ext_schema == "public":
        logger.info("pgvector extension found in public schema - ready to use")
        return

    if ext_schema is not None:
        # Extension in wrong schema - try to fix if we have permissions
        logger.warning(
            "pgvector extension found in schema '%s' instead of 'public'. Attempting to relocate...",
            ext_schema,
        )
        try:
            # Move the extension in place. DROP EXTENSION ... CASCADE would
            # also drop every column and index that depends on the vector
            # type, i.e. silently destroy stored embeddings. pgvector is
            # declared relocatable, so SET SCHEMA is the non-destructive way
            # to do this; if it is rejected we fall through to the warning.
            conn.execute(text("ALTER EXTENSION vector SET SCHEMA public"))
            conn.commit()
            logger.info("pgvector extension relocated to public schema")
        except Exception as e:
            # Failed to relocate - log but don't fail if extension exists somewhere
            logger.warning(
                "Could not relocate pgvector extension to public schema: %s. "
                "Continuing with extension in '%s' schema.",
                e,
                ext_schema,
            )
            conn.rollback()
        return

    # Extension doesn't exist - try to install
    logger.info("pgvector extension not found, attempting to install...")
    try:
        conn.execute(text("SET search_path TO public"))
        conn.execute(text("CREATE EXTENSION vector"))
        conn.commit()
        logger.info("pgvector extension installed in public schema")
    except Exception as e:
        # Installation failed - this is only fatal if the extension truly
        # doesn't exist. Check one more time in case another process
        # installed it.
        conn.rollback()
        existing_schema = _find_pgvector_schema(conn)

        if existing_schema is not None:
            logger.warning(
                "Could not install pgvector extension (permission denied?), "
                "but extension exists in '%s' schema. Continuing...",
                existing_schema,
            )
            return

        # Extension truly doesn't exist and we can't install it
        logger.error(
            "pgvector extension is not installed and cannot be installed: %s. "
            "Please ensure pgvector is installed by a database administrator. "
            "See: https://github.com/pgvector/pgvector#installation",
            e,
        )
        raise RuntimeError(
            "pgvector extension is required but not installed. "
            "Please install it with: CREATE EXTENSION vector;"
        ) from e


def _bootstrap_vector_extension_for_migrations(conn: Connection, vector_extension: str) -> None:
    """Prepare the configured vector backend ahead of schema migrations.

    The pgvector-specific preflight runs only when ``vector_extension`` is
    ``"pgvector"``; the generic ``bootstrap_extension`` dispatcher runs for
    every backend.
    """
    needs_pgvector_preflight = vector_extension == "pgvector"
    if needs_pgvector_preflight:
        _ensure_pgvector_extension_in_public(conn)

    bootstrap_extension(conn, vector_extension)


def _drop_per_bank_vector_indexes(conn: Connection, schema_name: str) -> None:
"""Drop per-bank partial memory_units vector indexes after global ScaNN is ready."""
rows = conn.execute(
Expand Down Expand Up @@ -275,83 +358,8 @@ def run_migrations(
logger.debug("Migration advisory lock acquired")

try:
# Ensure pgvector extension is installed globally BEFORE schema migrations
# This is critical: the extension must exist database-wide before any schema
# migrations run, otherwise custom schemas won't have access to vector types
logger.debug("Checking pgvector extension availability...")

# First, check if extension already exists
ext_check = conn.execute(
text(
"SELECT extname, nspname FROM pg_extension e "
"JOIN pg_namespace n ON e.extnamespace = n.oid "
"WHERE extname = 'vector'"
)
).fetchone()

if ext_check:
# Extension exists - check if in correct schema
ext_schema = ext_check[1]
if ext_schema == "public":
logger.info("pgvector extension found in public schema - ready to use")
else:
# Extension in wrong schema - try to fix if we have permissions
logger.warning(
f"pgvector extension found in schema '{ext_schema}' instead of 'public'. "
f"Attempting to relocate..."
)
try:
conn.execute(text("DROP EXTENSION vector CASCADE"))
conn.execute(text("SET search_path TO public"))
conn.execute(text("CREATE EXTENSION vector"))
conn.commit()
logger.info("pgvector extension relocated to public schema")
except Exception as e:
# Failed to relocate - log but don't fail if extension exists somewhere
logger.warning(
f"Could not relocate pgvector extension to public schema: {e}. "
f"Continuing with extension in '{ext_schema}' schema."
)
conn.rollback()
else:
# Extension doesn't exist - try to install
logger.info("pgvector extension not found, attempting to install...")
try:
conn.execute(text("SET search_path TO public"))
conn.execute(text("CREATE EXTENSION vector"))
conn.commit()
logger.info("pgvector extension installed in public schema")
except Exception as e:
# Installation failed - this is only fatal if extension truly doesn't exist
# Check one more time in case another process installed it
conn.rollback()
ext_recheck = conn.execute(
text(
"SELECT nspname FROM pg_extension e "
"JOIN pg_namespace n ON e.extnamespace = n.oid "
"WHERE extname = 'vector'"
)
).fetchone()

if ext_recheck:
logger.warning(
f"Could not install pgvector extension (permission denied?), "
f"but extension exists in '{ext_recheck[0]}' schema. Continuing..."
)
else:
# Extension truly doesn't exist and we can't install it
logger.error(
f"pgvector extension is not installed and cannot be installed: {e}. "
f"Please ensure pgvector is installed by a database administrator. "
f"See: https://github.com/pgvector/pgvector#installation"
)
raise RuntimeError(
"pgvector extension is required but not installed. "
"Please install it with: CREATE EXTENSION vector;"
) from e

vector_extension = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector").lower()
bootstrap_extension(conn, vector_extension)
vector_extension = configured_vector_extension()
_bootstrap_vector_extension_for_migrations(conn, vector_extension)

# Commit any pending transaction on the advisory-lock connection
# before running migrations. Some code paths above (e.g., the
Expand Down
20 changes: 20 additions & 0 deletions hindsight-api-slim/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
validate_extension,
)
from hindsight_api.engine.retain import bank_utils
from hindsight_api.migrations import _bootstrap_vector_extension_for_migrations


class RecordingConn:
Expand Down Expand Up @@ -69,6 +70,25 @@ def test_bootstrap_extension_scann_installs_vector_before_alloydb_scann():
]


def test_migration_bootstrap_vchord_skips_pgvector_preflight():
    """Bootstrapping vchord must record only the vchord CREATE EXTENSION statement."""
    recorder = RecordingConn()
    expected_statements = ["CREATE EXTENSION IF NOT EXISTS vchord CASCADE"]

    _bootstrap_vector_extension_for_migrations(recorder, "vchord")

    assert recorder.statements == expected_statements


def test_migration_bootstrap_scann_uses_dispatcher_without_legacy_pgvector_check():
    """Bootstrapping scann must record exactly the two dispatcher statements, in order."""
    recorder = RecordingConn()
    expected_statements = [
        "CREATE EXTENSION IF NOT EXISTS vector",
        "CREATE EXTENSION IF NOT EXISTS alloydb_scann CASCADE",
    ]

    _bootstrap_vector_extension_for_migrations(recorder, "scann")

    assert recorder.statements == expected_statements


def test_scann_index_creation_defers_until_table_is_large_enough():
assert should_defer_index_creation("scann", 0)
assert should_defer_index_creation("scann", SCANN_MIN_ROWS_FOR_AUTO_INDEX - 1)
Expand Down
Loading