40 changes: 40 additions & 0 deletions docs/schema/index-management/index-lifecycle.md
100644 → 100755
@@ -54,6 +54,13 @@ An index can be in one of the following states:
: The index is enabled and can be used to answer queries.
Furthermore, this is the only state in which insertions and deletions in the graph are forwarded to the index.

**WRITE_ONLY_ENABLED**

: The index is enabled for write operations only. Insertions and deletions in the graph are forwarded to the index,
but the index is not used to answer queries.
This is useful when enabling an index on a live system: the index can receive writes and be reindexed concurrently,
and once it has caught up, it can be fully enabled with `ENABLE_INDEX`.

**DISABLED**

: The index is disabled and will not be used to answer queries.
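
One way to read the three states above is as two independent capabilities: whether graph mutations are forwarded to the index, and whether the query engine may use it. The following is a minimal sketch of that reading. `IndexStateModel` and its methods are hypothetical names used only for illustration and are not part of the JanusGraph API. It covers only the three states described here.

```java
final class IndexStateModel {
    /** Hypothetical mirror of the three states discussed above. */
    enum State { ENABLED, WRITE_ONLY_ENABLED, DISABLED }

    /** True if insertions and deletions in the graph reach the index. */
    static boolean forwardsWrites(State s) {
        return s == State.ENABLED || s == State.WRITE_ONLY_ENABLED;
    }

    /** True if the index may be used to answer queries. */
    static boolean answersQueries(State s) {
        return s == State.ENABLED;
    }

    public static void main(String[] args) {
        // Print one row per state so the two capabilities can be compared.
        for (State s : State.values()) {
            System.out.println(s + ": writes=" + forwardsWrites(s)
                    + ", queries=" + answersQueries(s));
        }
    }
}
```

In this model `WRITE_ONLY_ENABLED` is the only state where the two capabilities differ, which is what lets an index catch up before it serves reads.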
@@ -76,6 +83,8 @@ The following actions can be performed on an index to change its state via `mgmt
: Re-builds the index from the ground up using the data stored in the graph.
Depending on the size of the graph, this action can take a long time to complete.
Reindexing is possible in all stable states and automatically enables the index once finished.
If the index is in `WRITE_ONLY_ENABLED` state, reindexing will keep it in `WRITE_ONLY_ENABLED` instead of
automatically enabling it for reads.

**ENABLE_INDEX**
: Enables the index so that it can be used by the query processing engine.
@@ -85,6 +94,12 @@ The following actions can be performed on an index to change its state via `mgmt
Enabling a previously disabled index without `REINDEX` may cause index queries to return ghost vertices, which have
been deleted from the graph while the index was disabled.

**ENABLE_WRITE_ONLY**
: Enables the index for write operations only. The index will receive updates during mutations and reindexing,
but will not be used to answer queries.
This is useful for enabling an index on a live system where data ingestion is ongoing.
Once the index has caught up (e.g., after a `REINDEX`), use `ENABLE_INDEX` to fully enable it for queries.

**DISABLE_INDEX**
: Disables the index temporarily so that it is no longer used to answer queries.
The index also stops receiving updates in the graph.
@@ -102,3 +117,28 @@ The following actions can be performed on an index to change its state via `mgmt
**DROP_INDEX**
: Removes the index from the schema and communicates the change to other instances in the cluster.
After an index has been dropped, a new index is allowed to use the same name again.
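
The state changes described above can be summarized as a small transition function. The sketch below is an illustration only. `IndexTransitionSketch`, `Status`, and `Action` are hypothetical stand-ins for `SchemaStatus` and `SchemaAction`, and the applicability sets are copied from the `SchemaAction` changes in this pull request. The sketch returns an empty `Optional` for a non-applicable action, without claiming that JanusGraph reacts the same way. It leaves out the installed and discarded states and the discard and drop actions.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class IndexTransitionSketch {
    enum Status { REGISTERED, ENABLED, WRITE_ONLY_ENABLED, DISABLED }
    enum Action { REINDEX, ENABLE_INDEX, ENABLE_WRITE_ONLY, DISABLE_INDEX }

    // Which statuses each action may be applied to (subset modelled here).
    private static final Map<Action, Set<Status>> APPLICABLE = new EnumMap<>(Action.class);
    static {
        APPLICABLE.put(Action.REINDEX, EnumSet.allOf(Status.class));
        APPLICABLE.put(Action.ENABLE_INDEX, EnumSet.allOf(Status.class));
        APPLICABLE.put(Action.DISABLE_INDEX, EnumSet.allOf(Status.class));
        APPLICABLE.put(Action.ENABLE_WRITE_ONLY,
                EnumSet.of(Status.REGISTERED, Status.ENABLED, Status.DISABLED));
    }

    /** Returns the resulting status, or empty if the action does not apply. */
    static Optional<Status> apply(Status current, Action action) {
        if (!APPLICABLE.get(action).contains(current)) {
            return Optional.empty();
        }
        switch (action) {
            case ENABLE_INDEX:
                return Optional.of(Status.ENABLED);
            case ENABLE_WRITE_ONLY:
                return Optional.of(Status.WRITE_ONLY_ENABLED);
            case DISABLE_INDEX:
                return Optional.of(Status.DISABLED);
            case REINDEX:
                // A write-only index stays write-only; otherwise reindexing enables it.
                return Optional.of(current == Status.WRITE_ONLY_ENABLED
                        ? Status.WRITE_ONLY_ENABLED : Status.ENABLED);
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }

    public static void main(String[] args) {
        // Walk the write-only path: REGISTERED -> write-only -> reindex -> enabled.
        Status s = Status.REGISTERED;
        for (Action a : new Action[]{Action.ENABLE_WRITE_ONLY, Action.REINDEX, Action.ENABLE_INDEX}) {
            s = apply(s, a).orElseThrow(IllegalStateException::new);
            System.out.println(a + " -> " + s);
        }
    }
}
```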

## Write-Only Index Workflow

When adding an index to a live system with ongoing data ingestion, you can use the write-only workflow to avoid
missing writes while the index is being built:

```java
// 1. After creating the index (not shown), wait for it to reach REGISTERED
ManagementSystem.awaitGraphIndexStatus(graph, "myIndex").call();

// 2. Enable the index for writes only
JanusGraphManagement mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex("myIndex"), SchemaAction.ENABLE_WRITE_ONLY).get();
mgmt.commit();

// 3. Reindex existing data (the index stays in WRITE_ONLY_ENABLED after reindexing)
mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex("myIndex"), SchemaAction.REINDEX).get();
mgmt.commit();

// 4. Fully enable the index for reads and writes
mgmt = graph.openManagement();
mgmt.updateIndex(mgmt.getGraphIndex("myIndex"), SchemaAction.ENABLE_INDEX).get();
mgmt.commit();
```
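
To see why the write-only step matters, the following toy model compares building an index with and without write forwarding during the reindex. It is a deliberately simplified sketch under stated assumptions: the graph and index are plain sets, the reindex copies a snapshot taken when it starts (a real reindex is a scan job), and an index that is not yet enabled receives no writes, as the state descriptions on this page say. `WriteOnlyWorkflowSketch` and `buildIndex` are hypothetical names, not JanusGraph API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class WriteOnlyWorkflowSketch {
    /**
     * Simulates one reindex run with a write arriving mid-run.
     * Returns the index contents once the reindex has finished.
     */
    static Set<String> buildIndex(boolean forwardWritesDuringReindex) {
        Set<String> graph = new HashSet<>(Arrays.asList("v1", "v2"));
        Set<String> index = new HashSet<>();

        // The reindex starts from the data that exists at this moment.
        List<String> snapshot = new ArrayList<>(graph);

        // A new vertex is ingested while the reindex is still running.
        graph.add("v3");
        if (forwardWritesDuringReindex) {
            index.add("v3"); // write-only state: the mutation reaches the index
        }

        // The reindex finishes copying the snapshot into the index.
        index.addAll(snapshot);
        return index;
    }

    public static void main(String[] args) {
        System.out.println("write-only first:   " + new TreeSet<>(buildIndex(true)));
        System.out.println("no write forwarding: " + new TreeSet<>(buildIndex(false)));
    }
}
```

In this model the run without write forwarding ends with an index that lacks the vertex ingested during the reindex, which is the gap the write-only workflow is meant to close.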
565 changes: 178 additions & 387 deletions docs/schema/index-management/index-lifecycle.svg
100644 → 100755
81 changes: 81 additions & 0 deletions janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java
100644 → 100755
@@ -2992,6 +2992,87 @@ public void testIndexUpdatesWithoutReindex() throws InterruptedException, Execut

}

@Test
public void testWriteOnlyIndexMixed() throws InterruptedException, ExecutionException {
final Object[] settings = new Object[]{option(LOG_SEND_DELAY, MANAGEMENT_LOG), Duration.ofMillis(0),
option(KCVSLog.LOG_READ_LAG_TIME, MANAGEMENT_LOG), Duration.ofMillis(50),
option(LOG_READ_INTERVAL, MANAGEMENT_LOG), Duration.ofMillis(250)
};

clopen(settings);
final String defText = "Mountain rocks are great friends";
final int defTime = 5;
final double defHeight = 101.1;
final String[] defPhones = new String[]{"1234", "5678"};

// Create types and mixed index
mgmt.makePropertyKey("time").dataType(Integer.class).make();
final PropertyKey text = mgmt.makePropertyKey("text").dataType(String.class).make();
mgmt.makePropertyKey("height").dataType(Double.class).make();
mgmt.buildIndex("writeOnlyMixed", Vertex.class).addKey(text, getTextMapping(), getFieldMap(text)).buildMixedIndex(INDEX);
finishSchema();

// Add data before index is enabled
addVertex(defTime, defText, defHeight, defPhones);

// Wait for REGISTERED
ManagementSystem.awaitGraphIndexStatus(graph, "writeOnlyMixed").call();

// Enable write-only
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyMixed"), SchemaAction.ENABLE_WRITE_ONLY);
finishSchema();

// Verify WRITE_ONLY_ENABLED status
JanusGraphIndex idx = mgmt.getGraphIndex("writeOnlyMixed");
for (final PropertyKey key : idx.getFieldKeys()) {
assertEquals(SchemaStatus.WRITE_ONLY_ENABLED, idx.getIndexStatus(key));
}

// Add data while write-only
addVertex(defTime, defText, defHeight, defPhones);

// Query should NOT use the write-only index
clopen(settings);
evaluateQuery(tx.query().has("text", Text.CONTAINS, "rocks"),
ElementCategory.VERTEX, 2, new boolean[]{false, true});
newTx();

// Reindex - should stay in WRITE_ONLY_ENABLED
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyMixed"), SchemaAction.REINDEX).get();
mgmt.commit();
finishSchema();

// Verify still WRITE_ONLY_ENABLED after reindex
idx = mgmt.getGraphIndex("writeOnlyMixed");
for (final PropertyKey key : idx.getFieldKeys()) {
assertEquals(SchemaStatus.WRITE_ONLY_ENABLED, idx.getIndexStatus(key));
}

// Query still should NOT use the index
newTx();
evaluateQuery(tx.query().has("text", Text.CONTAINS, "rocks"),
ElementCategory.VERTEX, 2, new boolean[]{false, true});
newTx();

// Now fully enable
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyMixed"), SchemaAction.ENABLE_INDEX);
finishSchema();

// Verify ENABLED status
idx = mgmt.getGraphIndex("writeOnlyMixed");
for (final PropertyKey key : idx.getFieldKeys()) {
assertEquals(SchemaStatus.ENABLED, idx.getIndexStatus(key));
}

// Query SHOULD now use the index and find all data (reindex populated it)
clopen(settings);
evaluateQuery(tx.query().has("text", Text.CONTAINS, "rocks"),
ElementCategory.VERTEX, 2, new boolean[]{true, true}, "writeOnlyMixed");
}

private void addVertex(int time, String text, double height, String[] phones) {
newTx();
final JanusGraphVertex v = tx.addVertex("text", text, "time", time, "height", height);
71 changes: 71 additions & 0 deletions janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphTest.java
100644 → 100755
@@ -2243,6 +2243,77 @@ public void testIndexUpdateSyncWithMultipleInstances() throws InterruptedExcepti

}

@Tag(TestCategory.BRITTLE_TESTS)
@Test
public void testWriteOnlyIndexComposite() throws InterruptedException {
clopen(option(LOG_SEND_DELAY, MANAGEMENT_LOG), Duration.ofMillis(0),
option(KCVSLog.LOG_READ_LAG_TIME, MANAGEMENT_LOG), Duration.ofMillis(50),
option(LOG_READ_INTERVAL, MANAGEMENT_LOG), Duration.ofMillis(250)
);

mgmt.makePropertyKey("name").dataType(String.class).make();
finishSchema();

// Add data before index exists
tx.addVertex("name", "v1");
newTx();

// Create composite index
mgmt.buildIndex("writeOnlyIndex", Vertex.class).addKey(mgmt.getPropertyKey("name")).buildCompositeIndex();
mgmt.commit();

// Wait for REGISTERED
assertTrue(ManagementSystem.awaitGraphIndexStatus(graph, "writeOnlyIndex").status(SchemaStatus.REGISTERED)
.timeout(TestGraphConfigs.getSchemaConvergenceTime(ChronoUnit.SECONDS), ChronoUnit.SECONDS)
.call().getSucceeded());

// Enable write-only
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.ENABLE_WRITE_ONLY);
finishSchema();

// Verify status is WRITE_ONLY_ENABLED
assertEquals(SchemaStatus.WRITE_ONLY_ENABLED, mgmt.getGraphIndex("writeOnlyIndex").getIndexStatus(mgmt.getPropertyKey("name")));

// Add data while index is write-only
tx.addVertex("name", "v2");
newTx();

// Query should NOT use the write-only index
evaluateQuery(tx.query().has("name", "v2"), ElementCategory.VERTEX, 1, new boolean[]{false, true});
newTx();

// Fully enable the index
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.ENABLE_INDEX);
finishSchema();

// Now query SHOULD use the index (but only v2 was written while index was active)
evaluateQuery(tx.query().has("name", "v2"), ElementCategory.VERTEX, 1, new boolean[]{true, true}, "writeOnlyIndex");
// v1 was added before index, so not in index
evaluateQuery(tx.query().has("name", "v1"), ElementCategory.VERTEX, 0, new boolean[]{true, true}, "writeOnlyIndex");
newTx();

// Test DISABLE_INDEX from ENABLED (the index was fully enabled above)
finishSchema();
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.DISABLE_INDEX);
finishSchema();
assertEquals(SchemaStatus.DISABLED, mgmt.getGraphIndex("writeOnlyIndex").getIndexStatus(mgmt.getPropertyKey("name")));

// Test ENABLE_WRITE_ONLY from DISABLED
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.ENABLE_WRITE_ONLY);
finishSchema();
assertEquals(SchemaStatus.WRITE_ONLY_ENABLED, mgmt.getGraphIndex("writeOnlyIndex").getIndexStatus(mgmt.getPropertyKey("name")));

// Test ENABLE_WRITE_ONLY from ENABLED
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.ENABLE_INDEX);
finishSchema();
assertEquals(SchemaStatus.ENABLED, mgmt.getGraphIndex("writeOnlyIndex").getIndexStatus(mgmt.getPropertyKey("name")));
mgmt.updateIndex(mgmt.getGraphIndex("writeOnlyIndex"), SchemaAction.ENABLE_WRITE_ONLY);
finishSchema();
assertEquals(SchemaStatus.WRITE_ONLY_ENABLED, mgmt.getGraphIndex("writeOnlyIndex").getIndexStatus(mgmt.getPropertyKey("name")));
}

@Tag(TestCategory.BRITTLE_TESTS)
@Test
public void testIndexShouldRegisterWhenWeRemoveAnInstance() throws InterruptedException {
17 changes: 12 additions & 5 deletions janusgraph-core/src/main/java/org/janusgraph/core/schema/SchemaAction.java
100644 → 100755
@@ -24,6 +24,7 @@
import static org.janusgraph.core.schema.SchemaStatus.INSTALLED;
import static org.janusgraph.core.schema.SchemaStatus.REGISTERED;
import static org.janusgraph.core.schema.SchemaStatus.ENABLED;
import static org.janusgraph.core.schema.SchemaStatus.WRITE_ONLY_ENABLED;
import static org.janusgraph.core.schema.SchemaStatus.DISABLED;

/**
@@ -42,23 +43,23 @@ public enum SchemaAction {
/**
* Re-builds the index from the graph
*/
- REINDEX(Arrays.asList(REGISTERED, ENABLED, DISABLED)),
+ REINDEX(Arrays.asList(REGISTERED, ENABLED, WRITE_ONLY_ENABLED, DISABLED)),

/**
* Enables the index so that it can be used by the query processing engine. An index must be registered before it
* can be enabled.
*/
- ENABLE_INDEX(Arrays.asList(REGISTERED, DISABLED, ENABLED)),
+ ENABLE_INDEX(Arrays.asList(REGISTERED, DISABLED, ENABLED, WRITE_ONLY_ENABLED)),

/**
* Disables the index in the graph so that it is no longer used.
*/
- DISABLE_INDEX(Arrays.asList(ENABLED, DISABLED, REGISTERED)),
+ DISABLE_INDEX(Arrays.asList(ENABLED, WRITE_ONLY_ENABLED, DISABLED, REGISTERED)),

/**
* Deletes indexed data and leaves the index in an empty state.
*/
- DISCARD_INDEX(Arrays.asList(DISABLED, REGISTERED, DISCARDED)),
+ DISCARD_INDEX(Arrays.asList(DISABLED, WRITE_ONLY_ENABLED, REGISTERED, DISCARDED)),

/**
* Removes the internal index vertex, which completely deletes the index
@@ -68,7 +69,13 @@ public enum SchemaAction {
/**
* Registers the index as empty which qualifies it for deletion.
*/
- MARK_DISCARDED(Arrays.asList(DISABLED, REGISTERED, DISCARDED));
+ MARK_DISCARDED(Arrays.asList(DISABLED, WRITE_ONLY_ENABLED, REGISTERED, DISCARDED)),

/**
* Enables the index for write operations only. The index will receive updates during mutations and reindexing,
* but will not be used to answer queries.
*/
ENABLE_WRITE_ONLY(Arrays.asList(REGISTERED, ENABLED, DISABLED));

private final Set<SchemaStatus> applicableStatuses;

6 changes: 6 additions & 0 deletions janusgraph-core/src/main/java/org/janusgraph/core/schema/SchemaStatus.java
100644 → 100755
@@ -36,6 +36,12 @@ public enum SchemaStatus {
*/
ENABLED,

/**
* The index is enabled for write operations only. It receives updates during mutations and reindexing,
* but is not used to answer queries.
*/
WRITE_ONLY_ENABLED,

/**
* The index is temporarily disabled and not in use
*/
@@ -1000,7 +1000,17 @@ private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<O
break;
case REINDEX:
builder = graph.getBackend().buildEdgeScanJob();
- builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX));
+ SchemaAction reindexFinishAction;
+ if (index instanceof JanusGraphIndex && ((JanusGraphIndex) index).isMixedIndex()) {
+ boolean hasWriteOnlyField = keySubset.stream()
+ .map(k -> ((JanusGraphIndex) index).getIndexStatus(k))
+ .anyMatch(s -> s == SchemaStatus.WRITE_ONLY_ENABLED);
+ reindexFinishAction = hasWriteOnlyField ? SchemaAction.ENABLE_WRITE_ONLY : SchemaAction.ENABLE_INDEX;
+ } else {
+ reindexFinishAction = schemaVertex.getStatus() == SchemaStatus.WRITE_ONLY_ENABLED
+ ? SchemaAction.ENABLE_WRITE_ONLY : SchemaAction.ENABLE_INDEX;
+ }
+ builder.setFinishJob(indexId.getIndexJobFinisher(graph, reindexFinishAction));
builder.setJobId(indexId);
builder.setNumProcessingThreads(numOfThreads);
builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), vertexOnly));
@@ -1016,6 +1026,12 @@ private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List<O
if (!keySubset.isEmpty()) updatedTypes.addAll(dependentTypes);
future = new EmptyScanJobFuture();
break;
case ENABLE_WRITE_ONLY:
setStatus(schemaVertex, SchemaStatus.WRITE_ONLY_ENABLED, keySubset);
updatedTypes.add(schemaVertex);
if (!keySubset.isEmpty()) updatedTypes.addAll(dependentTypes);
future = new EmptyScanJobFuture();
break;
case DISABLE_INDEX:
setStatus(schemaVertex, SchemaStatus.DISABLED, keySubset);
updatedTypes.add(schemaVertex);
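
The `REINDEX` branch in the hunk above picks the action that the job finisher applies once the scan completes. Pulled out of the surrounding management code, that decision can be sketched as two pure functions. This is an illustration under assumptions: `ReindexFinishActionSketch` and its nested enums are hypothetical stand-ins for `SchemaStatus` and `SchemaAction`, and the real code reads the statuses from the index and the schema vertex rather than from arguments.

```java
import java.util.Arrays;
import java.util.Collection;

final class ReindexFinishActionSketch {
    enum Status { REGISTERED, ENABLED, WRITE_ONLY_ENABLED, DISABLED }
    enum Action { ENABLE_INDEX, ENABLE_WRITE_ONLY }

    /** Mixed index: statuses are tracked per key; one write-only key decides. */
    static Action finishActionForMixed(Collection<Status> keyStatuses) {
        boolean hasWriteOnlyField = keyStatuses.stream()
                .anyMatch(s -> s == Status.WRITE_ONLY_ENABLED);
        return hasWriteOnlyField ? Action.ENABLE_WRITE_ONLY : Action.ENABLE_INDEX;
    }

    /** Any other index: a single status on the index itself decides. */
    static Action finishActionForSingleStatus(Status indexStatus) {
        return indexStatus == Status.WRITE_ONLY_ENABLED
                ? Action.ENABLE_WRITE_ONLY : Action.ENABLE_INDEX;
    }

    public static void main(String[] args) {
        System.out.println(finishActionForMixed(
                Arrays.asList(Status.ENABLED, Status.WRITE_ONLY_ENABLED)));
        System.out.println(finishActionForSingleStatus(Status.REGISTERED));
    }
}
```

Under this rule a mixed index with even one write-only key finishes the reindex with `ENABLE_WRITE_ONLY`, so the choice is made once per reindex rather than per key.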