diff --git a/README.md b/README.md index 3cdceddc0..3e1137864 100644 --- a/README.md +++ b/README.md @@ -254,6 +254,43 @@ Options: --quantization-type TEXT which type of quantization to use valid values [fp32, fp16, bq] --help Show this message and exit. ``` + +### Run awsopensearch serverless from command line + +OpenSearch Serverless (AOSS) is a serverless deployment option for Amazon OpenSearch Service. VDBBench supports AOSS with the `--serverless` flag, which uses AWS SigV4 authentication and automatically skips unsupported operations (cluster settings, force merge, manual refresh, warmup API). + +**Prerequisites:** +- AWS credentials configured (via `~/.aws/credentials`, environment variables, or IAM role) +- `requests-aws4auth` installed: `pip install requests-aws4auth` +- IAM identity policy allowing `aoss:APIAccessAll` on the collection +- AOSS Data Access Policy granting index/collection permissions to the IAM principal + +**Example: Run performance test on OpenSearch Serverless** + +```shell +vectordbbench awsopensearch --db-label aoss \ + --serverless --aws-region us-east-1 \ + --host .aoss.us-east-1.on.aws --port 443 \ + --case-type Performance768D1M \ + --m 16 --ef-construction 200 --ef-search 40 \ + --number-of-shards 8 --number-of-replicas 0 \ + --engine faiss --metric-type cosine \ + --num-concurrency 80,100,120 +``` + +OpenSearch Serverless-specific options: + +| Option | Description | +|--------|-------------| +| `--serverless` | Enable OpenSearch Serverless mode (uses AWS SigV4 auth) | +| `--aws-region` | AWS region for the AOSS collection (default: `us-east-1`) | + +> **Notes:** +> - `--user` and `--password` are not needed for Serverless mode +> - `--engine` is accepted but ignored internally (AOSS manages the engine) +> - `--force-merge-enabled`, `--refresh-interval`, `--flush-threshold-size`, and `--cb-threshold` are ignored for Serverless +> - Data insertion uses smaller batch sizes (100) for Serverless API compatibility + ### Run Elastic Cloud from command line Elastic Cloud supports multiple index types: HNSW, HNSW_INT8, HNSW_INT4, and HNSW_BBQ. diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py index 8d77d263b..8c089b790 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py +++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py @@ -48,6 +48,7 @@ def __init__( log.info(f"AWS_OpenSearch client config: {self.db_config}") log.info(f"AWS_OpenSearch db case config : {self.case_config}") + self._is_serverless = ".aoss." in self.db_config.get("hosts", [{}])[0].get("host", "") client = OpenSearch(**self.db_config) if drop_old: log.info(f"AWS_OpenSearch client drop old index: {self.index_name}") @@ -80,6 +81,9 @@ def _log_index_creation_info(self) -> None: log.info(f"All case_config parameters: {self.case_config.__dict__}") def _configure_cluster_settings(self, client: OpenSearch) -> None: + if self._is_serverless: + log.info("Skipping cluster settings for OpenSearch Serverless") + return cluster_settings_body = { "persistent": { "knn.algo_param.index_thread_qty": self.case_config.index_thread_qty, @@ -89,17 +93,19 @@ def _configure_cluster_settings(self, client: OpenSearch) -> None: client.cluster.put_settings(body=cluster_settings_body) def _build_index_settings(self) -> dict: - return { + settings = { "index": { "knn": True, "number_of_shards": self.case_config.number_of_shards, "number_of_replicas": self.case_config.number_of_replicas, - "translog.flush_threshold_size": self.case_config.flush_threshold_size, - "knn.advanced.approximate_threshold": "-1", - "knn.algo_param.ef_search": self.case_config.ef_search, }, - "refresh_interval": self.case_config.refresh_interval, } + if not self._is_serverless: + settings["index"]["translog.flush_threshold_size"] = self.case_config.flush_threshold_size + settings["index"]["knn.advanced.approximate_threshold"] = "-1" + settings["index"]["knn.algo_param.ef_search"] = self.case_config.ef_search + settings["refresh_interval"] = self.case_config.refresh_interval + return settings def _build_vector_field_config(self) -> dict: method_config = self.case_config.index_param() @@ -108,6 +114,21 @@ def _build_vector_field_config(self) -> dict: if self.case_config.engine == AWSOS_Engine.s3vector: method_config = {"engine": "s3vector"} + # OpenSearch Serverless does not support 'engine' or 'encoder' in method config + if self._is_serverless and "engine" in method_config: + space_type = method_config.pop("space_type", self.case_config.parse_metric()) + method_config.pop("engine", None) + if "parameters" in method_config: + method_config["parameters"].pop("encoder", None) + vector_field_config = { + "type": "knn_vector", + "dimension": self.dim, + "space_type": space_type, + "method": method_config, + } + log.info(f"Serverless vector field config: {vector_field_config}") + return vector_field_config + if self.case_config.on_disk: space_type = self.case_config.parse_metric() vector_field_config = { @@ -222,25 +243,47 @@ def _insert_with_single_client( metadata: list[int], labels_data: list[str] | None = None, ) -> tuple[int, Exception]: - insert_data = [] - for i in range(len(embeddings)): - index_data = {"index": {"_index": self.index_name, self.id_col_name: metadata[i]}} - if self.with_scalar_labels and self.case_config.use_routing and labels_data is not None: - index_data["routing"] = labels_data[i] - insert_data.append(index_data) - - other_data = {self.vector_col_name: embeddings[i]} - if self.with_scalar_labels and labels_data is not None: - other_data[self.label_col_name] = labels_data[i] - insert_data.append(other_data) + embeddings_list = list(embeddings) + batch_size = 100 if self._is_serverless else len(embeddings_list) + total_inserted = 0 - try: - self.client.bulk(body=insert_data) - return len(embeddings), None - except Exception as e: - log.warning(f"Failed to insert data: {self.index_name} error: {e!s}") - time.sleep(10) - return self._insert_with_single_client(embeddings, metadata, labels_data) + for i in range(0, len(embeddings_list), batch_size): + batch_embeddings = embeddings_list[i : i + batch_size] + batch_metadata = metadata[i : i + batch_size] + batch_labels = labels_data[i : i + batch_size] if labels_data else None + + insert_data = [] + for j in range(len(batch_embeddings)): + if self._is_serverless: + index_data = {"index": {"_index": self.index_name}} + else: + index_data = {"index": {"_index": self.index_name, self.id_col_name: batch_metadata[j]}} + + if self.with_scalar_labels and self.case_config.use_routing and batch_labels is not None: + index_data["routing"] = batch_labels[j] + insert_data.append(index_data) + + other_data = {self.vector_col_name: batch_embeddings[j]} + if self._is_serverless: + other_data["id"] = batch_metadata[j] + if self.with_scalar_labels and batch_labels is not None: + other_data[self.label_col_name] = batch_labels[j] + insert_data.append(other_data) + + try: + self.client.bulk(body=insert_data) + total_inserted += len(batch_embeddings) + except Exception as e: + log.warning(f"Failed to insert batch: {self.index_name} error: {e!s}") + time.sleep(10) + try: + self.client.bulk(body=insert_data) + total_inserted += len(batch_embeddings) + except Exception as retry_e: + log.warning(f"Retry failed for batch: {retry_e!s}") + return total_inserted, retry_e + + return total_inserted, None def _insert_with_multiple_clients( self, @@ -402,24 +445,37 @@ def search_embedding( } try: - resp = self.client.search( - index=self.index_name, - body=body, - size=k, - _source=False, - docvalue_fields=[self.id_col_name], - stored_fields="_none_", - preference="_only_local" if self.case_config.number_of_shards == 1 else None, - routing=self.routing_key, - ) - log.debug(f"Search took: {resp['took']}") - log.debug(f"Search shards: {resp['_shards']}") - log.debug(f"Search hits total: {resp['hits']['total']}") - try: - return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]] - except Exception: - # empty results - return [] + if self._is_serverless: + resp = self.client.search( + index=self.index_name, + body=body, + size=k, + _source=["id"], + preference="_only_local" if self.case_config.number_of_shards == 1 else None, + routing=self.routing_key, + ) + try: + return [int(h["_source"]["id"]) for h in resp["hits"]["hits"]] + except Exception: + return [] + else: + resp = self.client.search( + index=self.index_name, + body=body, + size=k, + _source=False, + docvalue_fields=[self.id_col_name], + stored_fields="_none_", + preference="_only_local" if self.case_config.number_of_shards == 1 else None, + routing=self.routing_key, + ) + log.debug(f"Search took: {resp['took']}") + log.debug(f"Search shards: {resp['_shards']}") + log.debug(f"Search hits total: {resp['hits']['total']}") + try: + return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]] + except Exception: + return [] except Exception as e: log.warning(f"Failed to search: {self.index_name} error: {e!s}") raise e from None @@ -468,6 +524,10 @@ def _update_ef_search(self): log.warning(f"Failed to update ef_search parameter: {e}") def _update_replicas(self): + if self._is_serverless: + log.info("Skipping replica updates for OpenSearch Serverless") + return + index_settings = self.client.indices.get_settings(index=self.index_name) current_number_of_replicas = int(index_settings[self.index_name]["settings"]["index"]["number_of_replicas"]) log.info( @@ -490,6 +550,11 @@ def _wait_till_green(self): log.info(f"Index {self.index_name} is green..") def _refresh_index(self): + if self._is_serverless: + log.info("Skipping manual refresh for OpenSearch Serverless, waiting for auto-refresh...") + time.sleep(10) + return + log.debug(f"Starting refresh for index {self.index_name}") while True: try: @@ -505,6 +570,10 @@ def _refresh_index(self): log.debug(f"Completed refresh for index {self.index_name}") def _do_force_merge(self): + if self._is_serverless: + log.info("Skipping force merge for OpenSearch Serverless") + return + log.info(f"Updating the Index thread qty to {self.case_config.index_thread_qty_during_force_merge}.") cluster_settings_body = { @@ -530,6 +599,10 @@ def _do_force_merge(self): log.info(f"Completed force merge for index {self.index_name}") def _load_graphs_to_memory(self, client: OpenSearch): + if self._is_serverless: + log.info("Skipping warmup API for OpenSearch Serverless") + return + if self.case_config.engine != AWSOS_Engine.lucene: log.info("Calling warmup API to load graphs into memory") warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}" diff --git a/vectordb_bench/backend/clients/aws_opensearch/cli.py b/vectordb_bench/backend/clients/aws_opensearch/cli.py index 5bc80a687..59bb68a23 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/cli.py +++ b/vectordb_bench/backend/clients/aws_opensearch/cli.py @@ -25,8 +25,14 @@ def optional_secret_str(value: str | None) -> SecretStr | None: class AWSOpenSearchTypedDict(TypedDict): host: Annotated[str, click.option("--host", type=str, help="Db host", required=True)] port: Annotated[int, click.option("--port", type=int, default=80, help="Db Port")] - user: Annotated[str | None, click.option("--user", type=str, help="Db User")] - password: Annotated[str | None, click.option("--password", type=str, help="Db password")] + user: Annotated[str | None, click.option("--user", type=str, help="Db User (not needed for Serverless)")] + password: Annotated[ + str | None, click.option("--password", type=str, help="Db password (not needed for Serverless)") + ] + is_serverless: Annotated[bool, click.option("--serverless", is_flag=True, help="Use OpenSearch Serverless")] + aws_region: Annotated[ + str, click.option("--aws-region", type=str, default="us-east-1", help="AWS region for Serverless") + ] number_of_shards: Annotated[ int, click.option("--number-of-shards", type=int, help="Number of primary shards for the index", default=1), @@ -172,6 +178,12 @@ class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFl def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]): from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig + is_serverless = parameters.get("serverless", False) + log.info(f"Is Serverless: {is_serverless}") + + if not is_serverless and not parameters.get("user"): + log.warning("Standard OpenSearch mode requires user and password.") + # Set default values for HNSW parameters if not provided and not using s3vector engine = AWSOS_Engine(parameters["engine"]) ef_construction = parameters.get("ef_construction") @@ -192,8 +204,10 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]): db_config=AWSOpenSearchConfig( host=parameters["host"], port=parameters["port"], - user=parameters["user"], - password=optional_secret_str(parameters["password"]), + user=parameters.get("user"), + password=optional_secret_str(parameters.get("password")), + is_serverless=is_serverless, + aws_region=parameters.get("aws_region", "us-east-1"), ), db_case_config=AWSOpenSearchIndexConfig( number_of_shards=parameters["number_of_shards"], diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py index 62c284317..134da869b 100644 --- a/vectordb_bench/backend/clients/aws_opensearch/config.py +++ b/vectordb_bench/backend/clients/aws_opensearch/config.py @@ -16,8 +16,60 @@ class AWSOpenSearchConfig(DBConfig, BaseModel): port: int = 80 user: str | None = None password: SecretStr | None = None + is_serverless: bool = False + aws_region: str = "us-east-1" def to_dict(self) -> dict: + if self.is_serverless: + return self._serverless_config() + return self._standard_config() + + def _serverless_config(self) -> dict: + """Configuration for OpenSearch Serverless using AWS SigV4 authentication.""" + log.info(f"Configuring OpenSearch Serverless - Host: {self.host}, Region: {self.aws_region}") + + try: + import boto3 + except ImportError as e: + raise ImportError("boto3 is required for OpenSearch Serverless. Install with: pip install boto3") from e + + try: + from opensearchpy import RequestsHttpConnection + from requests_aws4auth import AWS4Auth + except ImportError as e: + raise ImportError( + "requests-aws4auth is required for OpenSearch Serverless. " + "Install with: pip install requests-aws4auth" + ) from e + + session = boto3.Session() + credentials = session.get_credentials() + if not credentials: + raise ValueError("AWS credentials not found. Please configure AWS credentials.") + + credentials = credentials.get_frozen_credentials() + auth = AWS4Auth( + credentials.access_key, + credentials.secret_key, + self.aws_region, + "aoss", + session_token=credentials.token, + ) + + return { + "hosts": [{"host": self.host, "port": 443}], + "http_auth": auth, + "use_ssl": True, + "verify_certs": True, + "connection_class": RequestsHttpConnection, + "timeout": 600, + "max_retries": 3, + "retry_on_timeout": True, + "http_compress": False, + } + + def _standard_config(self) -> dict: + """Configuration for standard OpenSearch with basic auth.""" use_ssl = self.port == 443 http_auth = ( (self.user, self.password.get_secret_value())