38 changes: 37 additions & 1 deletion docker/tests/test_lance_spark.py
@@ -492,7 +492,7 @@ def test_properties_persist_after_insert(self, spark):


class TestDDLIndex:
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""
"""Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS)."""

def test_create_btree_index_on_int(self, spark):
"""Test CREATE INDEX with BTree on integer column."""
@@ -562,6 +562,42 @@ def test_create_btree_index_on_string(self, spark):
""").collect()
assert len(query_result) == 3

def test_create_zonemap_index_on_int(self, spark):
"""Test CREATE INDEX with ZoneMap on integer column."""
spark.sql("""
CREATE TABLE default.test_table (
id INT,
name STRING,
value DOUBLE
)
""")

data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
df = spark.createDataFrame(data, ["id", "name", "value"])
df.writeTo("default.test_table").append()

result = spark.sql("""
ALTER TABLE default.test_table
CREATE INDEX idx_id_zonemap USING zonemap (id)
WITH (rows_per_zone = 8)
""").collect()

assert len(result) == 1
assert result[0][1] == "idx_id_zonemap"

indexes = spark.sql("""
SHOW INDEXES IN default.test_table
""").collect()
zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"]
assert len(zonemap_rows) == 1
assert zonemap_rows[0]["index_type"] == "zonemap"

query_result = spark.sql("""
SELECT * FROM default.test_table WHERE id = 50
""").collect()
assert len(query_result) == 1
assert query_result[0].id == 50

def test_create_fts_index(self, spark):
"""Test CREATE INDEX with full-text search (FTS)."""
spark.sql("""
37 changes: 33 additions & 4 deletions docs/src/operations/ddl/create-index.md
@@ -7,7 +7,7 @@ Creates a scalar index on a Lance table to accelerate queries.

## Overview

The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. This operation is performed in a distributed manner, building indexes for each data fragment in parallel.
The `CREATE INDEX` command builds an index on one or more columns of a Lance table. Indexing can improve the performance of queries that filter on the indexed columns. Depending on the index method, Lance Spark either uses a fragment-parallel build path or delegates to Lance's built-in single-phase index creation path.

## Basic Usage

@@ -24,13 +24,22 @@ The following index methods are supported:

| Method | Description |
|---------|-----------------------------------------------------------------------------|
| `zonemap` | Lightweight min/max index for fragment pruning on a scalar column. |
| `btree` | B-tree index for efficient range queries and point lookups on scalar columns. |
| `fts` | Full-text search (inverted) index for text search on string columns. |

## Options

The `CREATE INDEX` command supports options via the `WITH` clause to control index creation. These options are specific to the chosen index method.

### ZoneMap Options

For the `zonemap` method, the following options are supported:

| Option | Type | Description |
|-----------------|------|----------------------------------------------|
| `rows_per_zone` | Long | The approximate number of rows per zonemap zone. |

### BTree Options

For the `btree` method, the following options are supported:
@@ -77,6 +86,15 @@ Create a composite index on multiple columns.
ALTER TABLE lance.db.logs CREATE INDEX idx_ts_level USING btree (timestamp, level);
```

### Lightweight Fragment Pruning

Create a zonemap index when you want lightweight min/max-based fragment pruning:

=== "SQL"
```sql
ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id);
```

### Indexing with Options

Create an index and specify the `zone_size` for the B-tree:
@@ -86,6 +104,15 @@ Create an index and specify the `zone_size` for the B-tree:
ALTER TABLE lance.db.users CREATE INDEX idx_id_zoned USING btree (id) WITH (zone_size = 2048);
```

### Zonemap with Options

Create a zonemap index and specify the approximate number of rows per zone:

=== "SQL"
```sql
ALTER TABLE lance.db.users CREATE INDEX idx_id_zonemap USING zonemap (id) WITH (rows_per_zone = 2048);
```

### Full-Text Search Index

Create an FTS index on a text column:
@@ -117,17 +144,19 @@ The `CREATE INDEX` command returns the following information about the operation
Consider creating an index when:

- You frequently filter a large table on a specific column.
- You want lightweight fragment pruning based on per-zone min/max statistics.
- Your queries involve point lookups or small range scans.

## How It Works

The `CREATE INDEX` command operates as follows:

1. **Distributed Index Building**: For each fragment in the Lance dataset, a separate task is launched to build an index on the specified column(s).
2. **Metadata Merging**: Once all per-fragment indexes are built, their metadata is collected and merged.
1. **Index Build Execution**: Lance Spark chooses an execution path based on the index method. Methods such as `btree` can use fragment-parallel execution, while `zonemap` is built through Lance's single-phase create-index API.
2. **Metadata Finalization**: Lance records the new index metadata as part of the index creation flow.
3. **Transactional Commit**: A new table version is committed with the new index information. The operation is atomic and ensures that concurrent reads are not affected.
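The fragment pruning that a zonemap ultimately enables can be illustrated with a short Python sketch. This is not Lance's implementation; `ZoneStats` and `fragments_matching_equals` are hypothetical names used only to show the per-zone min/max idea:

```python
from dataclasses import dataclass

@dataclass
class ZoneStats:
    """Hypothetical per-zone statistics: owning fragment plus column min/max."""
    fragment_id: int
    min_value: int
    max_value: int

def fragments_matching_equals(zones, target):
    """Keep only fragments where some zone's [min, max] range can contain target."""
    return {z.fragment_id for z in zones if z.min_value <= target <= z.max_value}

# Three fragments, one zone each, covering disjoint id ranges.
zones = [
    ZoneStats(fragment_id=0, min_value=0, max_value=49),
    ZoneStats(fragment_id=1, min_value=50, max_value=99),
    ZoneStats(fragment_id=2, min_value=100, max_value=149),
]

# An `id = 50` filter only needs to scan fragment 1; the others are pruned.
print(fragments_matching_equals(zones, 50))  # {1}
```

A smaller `rows_per_zone` gives tighter min/max ranges (better pruning) at the cost of more zone entries to store and check.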

## Notes and Limitations

- **Index Methods**: The `btree` and `fts` methods are supported for scalar index creation.
- **Index Methods**: The `zonemap`, `btree`, and `fts` methods are supported for scalar index creation.
- **Zonemap Column Count**: Zonemap indexes currently support a single column only. The generic `CREATE INDEX` grammar accepts a column list, but Lance rejects multi-column zonemap creation.
- **Index Replacement**: If you create an index with the same name as an existing one, the old index will be replaced by the new one. This is because the underlying implementation uses `replace(true)`.
2 changes: 1 addition & 1 deletion docs/src/operations/ddl/show-indexes.md
@@ -49,7 +49,7 @@ The `SHOW INDEXES` command returns the following columns:
|-------------------------|---------------|--------------------------------------------------------------------|
| `name` | string | Logical name of the index. |
| `fields` | array<string> | List of column names included in the index. |
| `index_type` | string | Human-readable index type (for example `btree`). |
| `index_type` | string | Human-readable index type (for example `btree` or `zonemap`). |
| `num_indexed_fragments` | long | Number of fragments fully or partially covered by the index. |
| `num_indexed_rows` | long | Approximate number of rows covered by the index. |
| `num_unindexed_fragments` | long | Number of fragments that are not yet indexed. |
38 changes: 37 additions & 1 deletion integration-tests/test_lance_spark.py
@@ -620,7 +620,7 @@ def test_compression_metadata_reaches_lance_file(self, spark):


class TestDDLIndex:
"""Test DDL index operations: CREATE INDEX (BTree, FTS)."""
"""Test DDL index operations: CREATE INDEX (BTree, ZoneMap, FTS)."""

def test_create_btree_index_on_int(self, spark):
"""Test CREATE INDEX with BTree on integer column."""
@@ -690,6 +690,42 @@ def test_create_btree_index_on_string(self, spark):
""").collect()
assert len(query_result) == 3

def test_create_zonemap_index_on_int(self, spark):
"""Test CREATE INDEX with ZoneMap on integer column."""
spark.sql("""
CREATE TABLE default.test_table (
id INT,
name STRING,
value DOUBLE
)
""")

data = [(i, f"Name{i}", float(i * 10)) for i in range(100)]
df = spark.createDataFrame(data, ["id", "name", "value"])
df.writeTo("default.test_table").append()

result = spark.sql("""
ALTER TABLE default.test_table
CREATE INDEX idx_id_zonemap USING zonemap (id)
WITH (rows_per_zone = 8)
""").collect()

assert len(result) == 1
assert result[0][1] == "idx_id_zonemap"

indexes = spark.sql("""
SHOW INDEXES IN default.test_table
""").collect()
zonemap_rows = [row for row in indexes if row["name"] == "idx_id_zonemap"]
assert len(zonemap_rows) == 1
assert zonemap_rows[0]["index_type"] == "zonemap"

query_result = spark.sql("""
SELECT * FROM default.test_table WHERE id = 50
""").collect()
assert len(query_result) == 1
assert query_result[0].id == 50

def test_create_fts_index(self, spark):
"""Test CREATE INDEX with full-text search (FTS)."""
spark.sql("""
@@ -16,7 +16,8 @@
import org.lance.Dataset;
import org.lance.Fragment;
import org.lance.ManifestSummary;
import org.lance.index.IndexDescription;
import org.lance.index.Index;
import org.lance.index.IndexType;
import org.lance.index.scalar.ZoneStats;
import org.lance.ipc.ColumnOrdering;
import org.lance.schema.LanceField;
@@ -382,9 +383,9 @@ private Set<String> findZonemapIndexedColumns(Dataset dataset) {
fieldIdToName.put(field.getId(), field.getName());
}

for (IndexDescription idx : dataset.describeIndices()) {
if ("ZONEMAP".equalsIgnoreCase(idx.getIndexType())) {
for (int fieldId : idx.getFieldIds()) {
for (Index idx : dataset.getIndexes()) {
if (idx.indexType() == IndexType.ZONEMAP) {
for (int fieldId : idx.fields()) {
String name = fieldIdToName.get(fieldId);
if (name != null) {
columns.add(name);
@@ -34,6 +34,8 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -159,7 +161,6 @@ private static Optional<Set<Integer>> analyzeFilter(
return Optional.empty();
}

@SuppressWarnings("unchecked")
private static Optional<Set<Integer>> analyzeComparison(
String column,
Object value,
@@ -171,30 +172,20 @@ private static Optional<Set<Integer>> analyzeComparison(
return Optional.empty();
}

Comparable<Object> target;
try {
target = (Comparable<Object>) value;
} catch (ClassCastException e) {
LOG.warn("Cannot cast filter value {} to Comparable for zonemap pruning", value);
return Optional.empty();
}

Set<Integer> matchingFragments = new HashSet<>();
for (ZoneStats zone : stats) {
if (zoneMatchesComparison(zone, target, type)) {
if (zoneMatchesComparison(zone, value, type)) {
matchingFragments.add(zone.getFragmentId());
}
}

return Optional.of(matchingFragments);
}

@SuppressWarnings("unchecked")
private static boolean zoneMatchesComparison(
ZoneStats zone, Comparable<Object> target, ComparisonType type) {
private static boolean zoneMatchesComparison(ZoneStats zone, Object target, ComparisonType type) {

Comparable<Object> min = (Comparable<Object>) zone.getMin();
Comparable<Object> max = (Comparable<Object>) zone.getMax();
Object min = zone.getMin();
Object max = zone.getMax();

// If min or max is null, the zone contains only nulls for the indexed range;
// non-null comparisons cannot match.
@@ -206,27 +197,26 @@ private static boolean zoneMatchesComparison(
switch (type) {
case EQUALS:
// target ∈ [min, max]
return target.compareTo(min) >= 0 && target.compareTo(max) <= 0;
return compareValues(target, min) >= 0 && compareValues(target, max) <= 0;
case LESS_THAN:
// ∃ row < target ⟺ zone.min < target
return min.compareTo(target) < 0;
return compareValues(min, target) < 0;
case LESS_THAN_OR_EQUAL:
return min.compareTo(target) <= 0;
return compareValues(min, target) <= 0;
case GREATER_THAN:
return max.compareTo(target) > 0;
return compareValues(max, target) > 0;
case GREATER_THAN_OR_EQUAL:
return max.compareTo(target) >= 0;
return compareValues(max, target) >= 0;
default:
return true; // conservative
}
} catch (ClassCastException e) {
} catch (ClassCastException | IllegalArgumentException e) {
// Type mismatch between filter value and zone stats — be conservative
LOG.warn("Type mismatch in zonemap comparison, skipping pruning for zone", e);
return true;
}
}

@SuppressWarnings("unchecked")
private static Optional<Set<Integer>> analyzeIn(
String column, Object[] values, Map<String, List<ZoneStats>> statsByColumn) {

@@ -244,14 +234,7 @@ private static Optional<Set<Integer>> analyzeIn(
break;
}
} else {
try {
Comparable<Object> target = (Comparable<Object>) value;
if (zoneMatchesComparison(zone, target, ComparisonType.EQUALS)) {
matchingFragments.add(zone.getFragmentId());
break;
}
} catch (ClassCastException e) {
// Non-comparable value, conservatively include
if (zoneMatchesComparison(zone, value, ComparisonType.EQUALS)) {
matchingFragments.add(zone.getFragmentId());
break;
}
@@ -262,6 +245,44 @@ private static Optional<Set<Integer>> analyzeIn(
return Optional.of(matchingFragments);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private static int compareValues(Object left, Object right) {
if (left instanceof Number && right instanceof Number) {
return compareNumbers((Number) left, (Number) right);
}
if (isStringLike(left) && isStringLike(right)) {
return left.toString().compareTo(right.toString());
}
return ((Comparable) left).compareTo(right);
}

private static int compareNumbers(Number left, Number right) {
return toBigDecimal(left).compareTo(toBigDecimal(right));
}

private static BigDecimal toBigDecimal(Number value) {
if (value instanceof BigDecimal) {
return (BigDecimal) value;
}
if (value instanceof BigInteger) {
return new BigDecimal((BigInteger) value);
}
if (value instanceof Byte
|| value instanceof Short
|| value instanceof Integer
|| value instanceof Long) {
return BigDecimal.valueOf(value.longValue());
}
if (value instanceof Float || value instanceof Double) {
return BigDecimal.valueOf(value.doubleValue());
}
return new BigDecimal(value.toString());
}

private static boolean isStringLike(Object value) {
return value instanceof CharSequence || value instanceof UTF8String;
}
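The numeric widening above (`compareNumbers` delegating to `toBigDecimal`) has a direct Python analogue using `decimal.Decimal`. This is an illustrative sketch of the same idea, not code from this change:

```python
from decimal import Decimal

def to_decimal(value):
    """Widen ints and floats to Decimal, mirroring the BigDecimal widening idea."""
    if isinstance(value, float):
        # Go through str() so 49.5 becomes Decimal("49.5"), not a long binary expansion.
        return Decimal(str(value))
    return Decimal(value)  # int (and Decimal) are accepted directly

def compare_numbers(left, right):
    """Return <0, 0, or >0, following Java's compareTo contract."""
    a, b = to_decimal(left), to_decimal(right)
    return (a > b) - (a < b)

# A filter literal typed as int can now be compared against float zone statistics.
print(compare_numbers(50, 50.0))   # 0
print(compare_numbers(49, 49.5))   # -1
print(compare_numbers(100, 99.9))  # 1
```

Widening both sides to an exact decimal representation avoids the `ClassCastException` that raw `Comparable` casts hit when the filter literal and the stored zone statistics use different numeric types.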

private static Optional<Set<Integer>> analyzeIsNull(
String column, Map<String, List<ZoneStats>> statsByColumn) {
