Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,43 @@ Options:
--quantization-type TEXT which type of quantization to use valid values [fp32, fp16, bq]
--help Show this message and exit.
```

### Run awsopensearch serverless from command line

OpenSearch Serverless (AOSS) is a serverless deployment option for Amazon OpenSearch Service. VDBBench supports AOSS through the `--serverless` flag, which authenticates with AWS SigV4 and automatically skips operations that are not used in Serverless mode (cluster settings, replica updates, force merge, manual refresh, and the warmup API).

**Prerequisites:**
- AWS credentials configured (via `~/.aws/credentials`, environment variables, or IAM role)
- `boto3` and `requests-aws4auth` installed: `pip install boto3 requests-aws4auth`
- IAM identity policy allowing `aoss:APIAccessAll` on the collection
- AOSS Data Access Policy granting index/collection permissions to the IAM principal

**Example: Run performance test on OpenSearch Serverless**

```shell
vectordbbench awsopensearch --db-label aoss \
--serverless --aws-region us-east-1 \
--host <collection-id>.aoss.us-east-1.on.aws --port 443 \
--case-type Performance768D1M \
--m 16 --ef-construction 200 --ef-search 40 \
--number-of-shards 8 --number-of-replicas 0 \
--engine faiss --metric-type cosine \
--num-concurrency 80,100,120
```

OpenSearch Serverless-specific options:

| Option | Description |
|--------|-------------|
| `--serverless` | Enable OpenSearch Serverless mode (uses AWS SigV4 auth) |
| `--aws-region` | AWS region for the AOSS collection (default: `us-east-1`) |

> **Notes:**
> - `--user` and `--password` are not needed for Serverless mode
> - `--engine` is accepted but ignored internally (AOSS manages the engine)
> - `--force-merge-enabled`, `--refresh-interval`, `--flush-threshold-size`, and `--cb-threshold` are ignored for Serverless
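> - Data insertion uses smaller batch sizes (100) for Serverless API compatibility
> - Serverless mode always connects over TLS on port 443, regardless of the value passed to `--port`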

### Run Elastic Cloud from command line

Elastic Cloud supports multiple index types: HNSW, HNSW_INT8, HNSW_INT4, and HNSW_BBQ.
Expand Down
155 changes: 114 additions & 41 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(

log.info(f"AWS_OpenSearch client config: {self.db_config}")
log.info(f"AWS_OpenSearch db case config : {self.case_config}")
self._is_serverless = ".aoss." in self.db_config.get("hosts", [{}])[0].get("host", "")
client = OpenSearch(**self.db_config)
if drop_old:
log.info(f"AWS_OpenSearch client drop old index: {self.index_name}")
Expand Down Expand Up @@ -80,6 +81,9 @@ def _log_index_creation_info(self) -> None:
log.info(f"All case_config parameters: {self.case_config.__dict__}")

def _configure_cluster_settings(self, client: OpenSearch) -> None:
if self._is_serverless:
log.info("Skipping cluster settings for OpenSearch Serverless")
return
cluster_settings_body = {
"persistent": {
"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
Expand All @@ -89,17 +93,19 @@ def _configure_cluster_settings(self, client: OpenSearch) -> None:
client.cluster.put_settings(body=cluster_settings_body)

def _build_index_settings(self) -> dict:
return {
settings = {
"index": {
"knn": True,
"number_of_shards": self.case_config.number_of_shards,
"number_of_replicas": self.case_config.number_of_replicas,
"translog.flush_threshold_size": self.case_config.flush_threshold_size,
"knn.advanced.approximate_threshold": "-1",
"knn.algo_param.ef_search": self.case_config.ef_search,
},
"refresh_interval": self.case_config.refresh_interval,
}
if not self._is_serverless:
settings["index"]["translog.flush_threshold_size"] = self.case_config.flush_threshold_size
settings["index"]["knn.advanced.approximate_threshold"] = "-1"
settings["index"]["knn.algo_param.ef_search"] = self.case_config.ef_search
settings["refresh_interval"] = self.case_config.refresh_interval
return settings

def _build_vector_field_config(self) -> dict:
method_config = self.case_config.index_param()
Expand All @@ -108,6 +114,21 @@ def _build_vector_field_config(self) -> dict:
if self.case_config.engine == AWSOS_Engine.s3vector:
method_config = {"engine": "s3vector"}

# OpenSearch Serverless does not support 'engine' or 'encoder' in method config
if self._is_serverless and "engine" in method_config:
space_type = method_config.pop("space_type", self.case_config.parse_metric())
method_config.pop("engine", None)
if "parameters" in method_config:
method_config["parameters"].pop("encoder", None)
vector_field_config = {
"type": "knn_vector",
"dimension": self.dim,
"space_type": space_type,
"method": method_config,
}
log.info(f"Serverless vector field config: {vector_field_config}")
return vector_field_config

if self.case_config.on_disk:
space_type = self.case_config.parse_metric()
vector_field_config = {
Expand Down Expand Up @@ -222,25 +243,47 @@ def _insert_with_single_client(
metadata: list[int],
labels_data: list[str] | None = None,
) -> tuple[int, Exception]:
insert_data = []
for i in range(len(embeddings)):
index_data = {"index": {"_index": self.index_name, self.id_col_name: metadata[i]}}
if self.with_scalar_labels and self.case_config.use_routing and labels_data is not None:
index_data["routing"] = labels_data[i]
insert_data.append(index_data)

other_data = {self.vector_col_name: embeddings[i]}
if self.with_scalar_labels and labels_data is not None:
other_data[self.label_col_name] = labels_data[i]
insert_data.append(other_data)
embeddings_list = list(embeddings)
batch_size = 100 if self._is_serverless else len(embeddings_list)
total_inserted = 0

try:
self.client.bulk(body=insert_data)
return len(embeddings), None
except Exception as e:
log.warning(f"Failed to insert data: {self.index_name} error: {e!s}")
time.sleep(10)
return self._insert_with_single_client(embeddings, metadata, labels_data)
for i in range(0, len(embeddings_list), batch_size):
batch_embeddings = embeddings_list[i : i + batch_size]
batch_metadata = metadata[i : i + batch_size]
batch_labels = labels_data[i : i + batch_size] if labels_data else None

insert_data = []
for j in range(len(batch_embeddings)):
if self._is_serverless:
index_data = {"index": {"_index": self.index_name}}
else:
index_data = {"index": {"_index": self.index_name, self.id_col_name: batch_metadata[j]}}

if self.with_scalar_labels and self.case_config.use_routing and batch_labels is not None:
index_data["routing"] = batch_labels[j]
insert_data.append(index_data)

other_data = {self.vector_col_name: batch_embeddings[j]}
if self._is_serverless:
other_data["id"] = batch_metadata[j]
if self.with_scalar_labels and batch_labels is not None:
other_data[self.label_col_name] = batch_labels[j]
insert_data.append(other_data)

try:
self.client.bulk(body=insert_data)
total_inserted += len(batch_embeddings)
except Exception as e:
log.warning(f"Failed to insert batch: {self.index_name} error: {e!s}")
time.sleep(10)
try:
self.client.bulk(body=insert_data)
total_inserted += len(batch_embeddings)
except Exception as retry_e:
log.warning(f"Retry failed for batch: {retry_e!s}")
return total_inserted, retry_e

return total_inserted, None

def _insert_with_multiple_clients(
self,
Expand Down Expand Up @@ -402,24 +445,37 @@ def search_embedding(
}

try:
resp = self.client.search(
index=self.index_name,
body=body,
size=k,
_source=False,
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
preference="_only_local" if self.case_config.number_of_shards == 1 else None,
routing=self.routing_key,
)
log.debug(f"Search took: {resp['took']}")
log.debug(f"Search shards: {resp['_shards']}")
log.debug(f"Search hits total: {resp['hits']['total']}")
try:
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception:
# empty results
return []
if self._is_serverless:
resp = self.client.search(
index=self.index_name,
body=body,
size=k,
_source=["id"],
preference="_only_local" if self.case_config.number_of_shards == 1 else None,
routing=self.routing_key,
)
try:
return [int(h["_source"]["id"]) for h in resp["hits"]["hits"]]
except Exception:
return []
else:
resp = self.client.search(
index=self.index_name,
body=body,
size=k,
_source=False,
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
preference="_only_local" if self.case_config.number_of_shards == 1 else None,
routing=self.routing_key,
)
log.debug(f"Search took: {resp['took']}")
log.debug(f"Search shards: {resp['_shards']}")
log.debug(f"Search hits total: {resp['hits']['total']}")
try:
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception:
return []
except Exception as e:
log.warning(f"Failed to search: {self.index_name} error: {e!s}")
raise e from None
Expand Down Expand Up @@ -468,6 +524,10 @@ def _update_ef_search(self):
log.warning(f"Failed to update ef_search parameter: {e}")

def _update_replicas(self):
if self._is_serverless:
log.info("Skipping replica updates for OpenSearch Serverless")
return

index_settings = self.client.indices.get_settings(index=self.index_name)
current_number_of_replicas = int(index_settings[self.index_name]["settings"]["index"]["number_of_replicas"])
log.info(
Expand All @@ -490,6 +550,11 @@ def _wait_till_green(self):
log.info(f"Index {self.index_name} is green..")

def _refresh_index(self):
if self._is_serverless:
log.info("Skipping manual refresh for OpenSearch Serverless, waiting for auto-refresh...")
time.sleep(10)
return

log.debug(f"Starting refresh for index {self.index_name}")
while True:
try:
Expand All @@ -505,6 +570,10 @@ def _refresh_index(self):
log.debug(f"Completed refresh for index {self.index_name}")

def _do_force_merge(self):
if self._is_serverless:
log.info("Skipping force merge for OpenSearch Serverless")
return

log.info(f"Updating the Index thread qty to {self.case_config.index_thread_qty_during_force_merge}.")

cluster_settings_body = {
Expand All @@ -530,6 +599,10 @@ def _do_force_merge(self):
log.info(f"Completed force merge for index {self.index_name}")

def _load_graphs_to_memory(self, client: OpenSearch):
if self._is_serverless:
log.info("Skipping warmup API for OpenSearch Serverless")
return

if self.case_config.engine != AWSOS_Engine.lucene:
log.info("Calling warmup API to load graphs into memory")
warmup_endpoint = f"/_plugins/_knn/warmup/{self.index_name}"
Expand Down
22 changes: 18 additions & 4 deletions vectordb_bench/backend/clients/aws_opensearch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,14 @@ def optional_secret_str(value: str | None) -> SecretStr | None:
class AWSOpenSearchTypedDict(TypedDict):
host: Annotated[str, click.option("--host", type=str, help="Db host", required=True)]
port: Annotated[int, click.option("--port", type=int, default=80, help="Db Port")]
user: Annotated[str | None, click.option("--user", type=str, help="Db User")]
password: Annotated[str | None, click.option("--password", type=str, help="Db password")]
user: Annotated[str | None, click.option("--user", type=str, help="Db User (not needed for Serverless)")]
password: Annotated[
str | None, click.option("--password", type=str, help="Db password (not needed for Serverless)")
]
is_serverless: Annotated[bool, click.option("--serverless", is_flag=True, help="Use OpenSearch Serverless")]
aws_region: Annotated[
str, click.option("--aws-region", type=str, default="us-east-1", help="AWS region for Serverless")
]
number_of_shards: Annotated[
int,
click.option("--number-of-shards", type=int, help="Number of primary shards for the index", default=1),
Expand Down Expand Up @@ -172,6 +178,12 @@ class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFl
def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
from .config import AWSOpenSearchConfig, AWSOpenSearchIndexConfig

is_serverless = parameters.get("serverless", False)
log.info(f"Is Serverless: {is_serverless}")

if not is_serverless and not parameters.get("user"):
log.warning("Standard OpenSearch mode requires user and password.")

# Set default values for HNSW parameters if not provided and not using s3vector
engine = AWSOS_Engine(parameters["engine"])
ef_construction = parameters.get("ef_construction")
Expand All @@ -192,8 +204,10 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
db_config=AWSOpenSearchConfig(
host=parameters["host"],
port=parameters["port"],
user=parameters["user"],
password=optional_secret_str(parameters["password"]),
user=parameters.get("user"),
password=optional_secret_str(parameters.get("password")),
is_serverless=is_serverless,
aws_region=parameters.get("aws_region", "us-east-1"),
),
db_case_config=AWSOpenSearchIndexConfig(
number_of_shards=parameters["number_of_shards"],
Expand Down
52 changes: 52 additions & 0 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,60 @@ class AWSOpenSearchConfig(DBConfig, BaseModel):
port: int = 80
user: str | None = None
password: SecretStr | None = None
is_serverless: bool = False
aws_region: str = "us-east-1"

def to_dict(self) -> dict:
    """Return the client configuration dict for the selected deployment mode.

    Delegates to ``_serverless_config`` when ``is_serverless`` is truthy and
    to ``_standard_config`` otherwise.
    """
    build_config = self._serverless_config if self.is_serverless else self._standard_config
    return build_config()

def _serverless_config(self) -> dict:
    """Build the client configuration for OpenSearch Serverless (SigV4 auth).

    Resolves AWS credentials through a default ``boto3.Session``, freezes
    them, and signs requests for the ``aoss`` service in ``self.aws_region``.

    Returns:
        Keyword arguments for the OpenSearch client: the host on port 443,
        the SigV4 auth object, SSL with certificate verification, the
        ``RequestsHttpConnection`` class, and timeout/retry settings.

    Raises:
        ImportError: If ``boto3``, or ``opensearchpy.RequestsHttpConnection`` /
            ``requests_aws4auth`` cannot be imported.
        ValueError: If the boto3 session yields no credentials.
    """
    log.info(f"Configuring OpenSearch Serverless - Host: {self.host}, Region: {self.aws_region}")

    # Optional dependencies are imported lazily so they are only needed
    # when this method is actually called.
    try:
        import boto3
    except ImportError as exc:
        raise ImportError("boto3 is required for OpenSearch Serverless. Install with: pip install boto3") from exc

    try:
        from opensearchpy import RequestsHttpConnection
        from requests_aws4auth import AWS4Auth
    except ImportError as exc:
        raise ImportError(
            "requests-aws4auth is required for OpenSearch Serverless. "
            "Install with: pip install requests-aws4auth"
        ) from exc

    resolved = boto3.Session().get_credentials()
    if not resolved:
        raise ValueError("AWS credentials not found. Please configure AWS credentials.")

    # Take a frozen snapshot of access key, secret key and session token.
    frozen = resolved.get_frozen_credentials()
    sigv4_auth = AWS4Auth(
        frozen.access_key,
        frozen.secret_key,
        self.aws_region,
        "aoss",
        session_token=frozen.token,
    )

    client_kwargs = {
        "hosts": [{"host": self.host, "port": 443}],
        "http_auth": sigv4_auth,
        "use_ssl": True,
        "verify_certs": True,
        "connection_class": RequestsHttpConnection,
        "timeout": 600,
        "max_retries": 3,
        "retry_on_timeout": True,
        "http_compress": False,
    }
    return client_kwargs

def _standard_config(self) -> dict:
"""Configuration for standard OpenSearch with basic auth."""
use_ssl = self.port == 443
http_auth = (
(self.user, self.password.get_secret_value())
Expand Down
Loading