Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/http-shard-handler-virtual-threads.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: HttpShardHandler now uses Java virtual threads for shard fan-out, replacing CompletableFuture callbacks; the now-equivalent ParallelHttpShardHandlerFactory is removed (switch to the default HttpShardHandlerFactory).
type: changed
authors:
- name: Mark Robert Miller
links: []
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,26 @@
import static org.apache.solr.request.SolrQueryRequest.disallowPartialResults;

import java.lang.invoke.MethodHandles;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
Expand All @@ -47,6 +57,7 @@
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.CoreDescriptor;
Expand All @@ -55,6 +66,7 @@
import org.apache.solr.security.AllowListUrlChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
* Solr's default {@link ShardHandler} implementation; uses Jetty's async HTTP Client APIs for
Expand Down Expand Up @@ -91,19 +103,20 @@ public class HttpShardHandler extends ShardHandler {

private final HttpShardHandlerFactory httpShardHandlerFactory;

protected final ConcurrentMap<ShardResponse, CompletableFuture<LBSolrClient.Rsp>>
responseFutureMap;
protected final ConcurrentMap<ShardResponse, Future<?>> pending;
protected final BlockingQueue<ShardResponse> responses;
private final AtomicBoolean canceled = new AtomicBoolean(false);

private final Map<String, List<String>> shardToURLs;
protected LBAsyncSolrClient lbClient;
private final ExecutorService shardExecutor;

public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory) {
this.httpShardHandlerFactory = httpShardHandlerFactory;
this.lbClient = httpShardHandlerFactory.loadbalancer;
this.shardExecutor = httpShardHandlerFactory.shardExecutor;
this.responses = new LinkedBlockingQueue<>();
this.responseFutureMap = new ConcurrentHashMap<>();
this.pending = new ConcurrentHashMap<>();

// maps "localhost:8983|localhost:7574" to a shuffled
// List("http://localhost:8983","http://localhost:7574")
Expand Down Expand Up @@ -265,40 +278,108 @@ protected void makeShardRequest(
SimpleSolrResponse ssr,
ShardResponse srsp,
long startTimeNS) {
CompletableFuture<LBSolrClient.Rsp> future = this.lbClient.requestAsync(lbReq);
// Synchronize on canceled, so that we know precisely whether to add it to the responseFutureMap
// or not.
synchronized (canceled) {
if (canceled.get() && !future.isDone()) {
future.cancel(true);
return;
} else {
responseFutureMap.put(srsp, future);
// Capture submitter context now so the shard task (running on a virtual thread) sees the
// submitter's MDC + every registered InheritableThreadLocalProvider (SolrRequestInfo for PKI,
// OTel trace Context, etc.), matching what MDCAwareThreadPoolExecutor does for pool threads.
// Virtual threads aren't pooled so no restore is needed.
final Map<String, String> submitterMdc = MDC.getCopyOfContextMap();
final List<ExecutorUtil.InheritableThreadLocalProvider> providers =
ExecutorUtil.getThreadLocalProviders();
final List<AtomicReference<Object>> providerCtx;
if (providers.isEmpty()) {
providerCtx = List.of();
} else {
providerCtx = new ArrayList<>(providers.size());
for (ExecutorUtil.InheritableThreadLocalProvider p : providers) {
AtomicReference<Object> ref = new AtomicReference<>();
p.store(ref);
providerCtx.add(ref);
}
}
// Add the callback explicitly after adding the future to the map, because the callback relies
// on the map already having the future.
future.whenComplete(
(LBSolrClient.Rsp rsp, Throwable throwable) -> {
if (rsp != null) {
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
} else if (throwable != null) {
srsp.setException(throwable);
if (throwable instanceof SolrException) {
srsp.setResponseCode(((SolrException) throwable).code());
}

// Build + dispatch the request on the virtual thread (parallel CPU work) and block on the
// returned CompletableFuture there. cancelAll() interrupts the virtual thread; the catch
// below translates that into a graceful jetty-future cancel, which is how Jetty wants to be
// aborted (vs. interrupting a synchronous send mid-flight).
Runnable shardTask =
() -> {
ExecutorUtil.setServerThreadFlag(Boolean.TRUE);
if (submitterMdc != null) {
MDC.setContextMap(submitterMdc);
}
for (int i = 0; i < providers.size(); i++) {
providers.get(i).set(providerCtx.get(i));
}
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
// Synchronize on cancelled so this code and cancelAll() cannot happen at the same time
synchronized (canceled) {
// We don't want to add responses after the requests have been canceled
if (responseFutureMap.containsKey(srsp)) {
responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard));
CompletableFuture<LBSolrClient.Rsp> jettyFuture = null;
try {
try {
// doPrivileged needed because the request setup reads "solr.*" system properties
// and otherwise fails under SecurityManager when invoked from a virtual thread.
@SuppressWarnings("removal")
CompletableFuture<LBSolrClient.Rsp> tmp =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why declare tmp instead of directly setting jettyFuture?

AccessController.doPrivileged(
(PrivilegedExceptionAction<CompletableFuture<LBSolrClient.Rsp>>)
Comment on lines +316 to +321
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh; sad, for something so trivial. Why is this not needed seemingly everywhere else?

Any way, we'll likely be removing such things soon.

() -> lbClient.requestAsync(lbReq));
jettyFuture = tmp;
LBSolrClient.Rsp rsp = jettyFuture.get();
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
} catch (CancellationException ce) {
// jettyFuture was cancelled; leave srsp without an exception/response.
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
if (jettyFuture != null) {
jettyFuture.cancel(true);
}
} catch (ExecutionException ee) {
Throwable cause = ee.getCause() != null ? ee.getCause() : ee;
srsp.setException(cause);
if (cause instanceof SolrException se) {
srsp.setResponseCode(se.code());
}
} catch (PrivilegedActionException pae) {
Throwable cause = pae.getCause() != null ? pae.getCause() : pae;
srsp.setException(cause);
if (cause instanceof SolrException se) {
srsp.setResponseCode(se.code());
}
}
} finally {
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
// Synchronize on canceled so this code and cancelAll() cannot happen at the same time
synchronized (canceled) {
// We don't want to add responses after the requests have been canceled
if (pending.containsKey(srsp)) {
responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard));
}
}
for (int i = 0; i < providers.size(); i++) {
providers.get(i).clean(providerCtx.get(i));
}
MDC.clear();
Comment on lines +358 to +361
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as this is a virtual thread which aren't pool'ed, why do this cleanup?

}
});
};

// Submit cheaply; the coordinator thread races through the loop while CPU-heavy request
// setup happens in parallel on the virtual threads.
synchronized (canceled) {
if (canceled.get()) {
return;
}
try {
Future<?> vtFuture = shardExecutor.submit(shardTask);
pending.put(srsp, vtFuture);
} catch (RejectedExecutionException ree) {
recordShardSubmitError(
srsp,
new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
"Shard executor rejected request for shard: " + shard,
ree));
}
}
}

/** Subclasses could modify the request based on the shard */
Expand Down Expand Up @@ -349,7 +430,7 @@ private ShardResponse take(boolean bailOnError) {
.orElse(previousResponse);
}
} else {
responseFutureMap.remove(rsp);
pending.remove(rsp);

// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
Expand All @@ -375,7 +456,7 @@ private ShardResponse take(boolean bailOnError) {
}

protected boolean responsesPending() {
return !responseFutureMap.isEmpty() || !responses.isEmpty();
return !pending.isEmpty() || !responses.isEmpty();
}

@Override
Expand All @@ -394,13 +475,12 @@ public void cancelAll() {
// We don't want to queue this multiple times if we are already canceled
responses.add(CANCELLATION_NOTIFICATION);
}
// Cancel all outstanding requests
for (CompletableFuture<LBSolrClient.Rsp> future : responseFutureMap.values()) {
for (Future<?> future : pending.values()) {
if (!future.isDone()) {
future.cancel(true);
}
}
responseFutureMap.clear();
pending.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -84,6 +85,13 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory
// This executor is initialized in the init method
protected ExecutorService commExecutor;

/**
* Virtual-thread-per-task executor used for the per-shard scatter/gather fan-out in {@link
* HttpShardHandler}. Distinct from {@link #commExecutor}, which serves as Jetty's HTTP client
* worker executor.
*/
protected ExecutorService shardExecutor;

protected volatile HttpJettySolrClient defaultClient;
protected InstrumentedHttpListenerFactory httpListenerFactory;
protected LBAsyncSolrClient loadbalancer;
Expand Down Expand Up @@ -286,6 +294,10 @@ public void init(PluginInfo info) {
// collection as an optimization. see SOLR-11880 for more details
false);

this.shardExecutor =
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("httpShardVT-", 0).factory());

this.httpListenerFactory = new InstrumentedHttpListenerFactory(this.metricNameStrategy);
int connectionTimeout =
getParameter(
Expand Down Expand Up @@ -341,16 +353,24 @@ protected <T> T getParameter(
@Override
public void close() {
try {
if (loadbalancer != null) {
loadbalancer.close();
// Stop the per-shard virtual-thread executor first so in-flight tasks are interrupted before
// the load balancer / HTTP client they depend on are torn down.
if (shardExecutor != null) {
ExecutorUtil.shutdownNowAndAwaitTermination(shardExecutor);
}
} finally {
try {
if (defaultClient != null) {
IOUtils.closeQuietly(defaultClient);
if (loadbalancer != null) {
loadbalancer.close();
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
try {
if (defaultClient != null) {
IOUtils.closeQuietly(defaultClient);
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(commExecutor);
}
}
}
IOUtils.closeQuietly(asyncRequestsGauge);
Expand Down Expand Up @@ -459,6 +479,9 @@ public void initializeMetrics(SolrMetricsContext parentContext, Attributes attri
commExecutor =
solrMetricsContext.instrumentedExecutorService(
commExecutor, "solr.core.executor", "httpShardExecutor", SolrInfoBean.Category.QUERY);
shardExecutor =
solrMetricsContext.instrumentedExecutorService(
shardExecutor, "solr.core.executor", "httpShardVT", SolrInfoBean.Category.QUERY);
if (defaultClient != null) {
asyncRequestsGauge =
solrMetricsContext.observableLongGauge(
Expand Down
Loading
Loading