CLI: Rewrite ForEachAsync to use threadpool, add timeout #40675
dkbennett wants to merge 9 commits
Pull request overview
This PR rewrites the WSLC ForEachAsync helper to use the Windows thread pool with bounded concurrency and cooperative cancellation, then updates container stats collection and unit coverage to use the new API.
Changes:
- Replaces std::async batch execution with a reusable thread-pool worker implementation.
- Adds timeout/cancel-drain parameters and updates container stats to pass a cancellation handle.
- Expands unit tests for pool sizing, dispatching beyond the initial pool, timeout, and cancellation behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/windows/wslc/core/AsyncExecution.h | Reimplements ForEachAsync with thread-pool workers, cancellation, and timeout handling. |
| src/windows/wslc/tasks/ContainerTasks.cpp | Updates stats collection to the new ForEachAsync signature and timeout parameters. |
| test/windows/wslc/WSLCCLIExecutionUnitTests.cpp | Updates existing tests and adds coverage for new pool and timeout behaviors. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
OneBlue left a comment
I think a generic threadpool that we can use across the codebase has a lot of value, thank you for doing this!
I think we need to design it a bit more defensively though (runaway threads will unfortunately cause us a lot of pain, so we should avoid "timing out" on cancellation).
I also recommend checking out src\windows\common\WslCoreMessageQueue.h, which uses a threadpool as well. I think a good target would be to move that class to a generic threadpool, something like:
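    // Handle to one submitted unit of work.
    template <typename TResult>
    struct WorkItem
    {
        TResult result;
        std::exception_ptr error; // null if the work did not throw
        void Wait();
        void Cancel();
    };

    class ThreadPool
    {
    public:
        ThreadPool(size_t minThreads, size_t maxThreads);

        // Queues `work` and returns a handle typed on its return value.
        template <typename TFunction>
        auto SubmitWork(TFunction&& work) -> WorkItem<std::invoke_result_t<TFunction>>;
    };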
This should cover both use cases.
    void CancelAndDrainInFlight() noexcept
    {
    context->cancelEvent.SetEvent();
    ::WaitForMultipleObjects(static_cast<DWORD>(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs);

I don't recommend having a "cancel timeout" here. If this timeout ever hits, this will leak threads, which will most likely lead to undefined behavior or crashes.
    DWORD cancelDrainMs{};

    WorkerPool(size_t workerCount, TWork onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) :
    context(std::make_shared<TSharedContext>(std::move(onWork))),

Assuming that we don't leak threads, context doesn't need to be a pointer here (could just be a regular class field)
    std::vector<HANDLE> doneHandles;
    std::shared_ptr<TSharedContext> context;
    std::chrono::milliseconds timeout;
    DWORD timeoutMs{};

    // On any exception from Launch, Drain, or the user callbacks (onSuccess/onError),
    // signal cancellation and wait for in-flight workers before rethrowing. This guarantees
    // no background thread pool callbacks outlive the ForEachAsync call.
    auto cancelOnError = wil::scope_exit([&pool] { pool.CancelAndDrainInFlight(); });

I recommend doing this in the WorkerPool's destructor so there's a strong guarantee of no leaked threads.
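The destructor-based guarantee can be sketched as follows. This is only an illustration under stated assumptions: it uses portable std::thread instead of the Windows thread pool, and `DrainingPool`, `Launch`, and `RunAndDrain` are hypothetical names that do not appear in the PR. The point it shows is that the destructor signals cancellation and then waits with no timeout, so no worker can outlive the pool.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for WorkerPool; uses std::thread, not the Windows thread pool.
class DrainingPool
{
public:
    DrainingPool() = default;
    DrainingPool(const DrainingPool&) = delete;
    DrainingPool& operator=(const DrainingPool&) = delete;

    // Starts one worker. The callback receives the cancel flag and is expected to poll it.
    void Launch(std::function<void(const std::atomic<bool>&)> work)
    {
        m_threads.emplace_back([this, work = std::move(work)] { work(m_cancel); });
    }

    // Signals cancellation, then blocks (no timeout) until every worker has exited.
    ~DrainingPool()
    {
        m_cancel.store(true);
        for (auto& thread : m_threads)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }

private:
    std::atomic<bool> m_cancel{false}; // declared first so it outlives m_threads
    std::vector<std::thread> m_threads;
};

// Illustration helper: launches `count` workers that spin until cancelled and
// returns how many had finished once the pool was destroyed.
inline int RunAndDrain(int count)
{
    std::atomic<int> finished{0};
    {
        DrainingPool pool;
        for (int i = 0; i < count; ++i)
        {
            pool.Launch([&finished](const std::atomic<bool>& cancel) {
                while (!cancel.load())
                {
                    std::this_thread::yield();
                }
                ++finished;
            });
        }
    } // ~DrainingPool cancels and joins here
    return finished.load();
}
```

The trade-off is that a worker that never observes cancellation blocks destruction indefinitely, which is the behavior the review prefers over leaking a thread. With the Windows thread pool, the wait would presumably be done with a call such as WaitForThreadpoolWorkCallbacks rather than a join.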
    context(std::make_shared<TSharedContext>(std::move(onWork))),
    timeout(timeout_),
    timeoutMs(timeout_ == std::chrono::milliseconds::max() ? INFINITE : static_cast<DWORD>(timeout_.count())),
    cancelDrainMs(static_cast<DWORD>(cancelDrainTimeout.count()))

Given that we have a Launch() method, I'd recommend only creating the workers there. That way this threadpool can dynamically resize itself as needed (which will allow us to reuse it in other places)
    {
    onSuccess(*batchResult.result);
    worker.workerResult.hasError = true;
    worker.workerResult.error = wil::ResultException{wil::ResultFromCaughtException()};

This catch should cover both catch() cases
    TItem item;
    std::optional<TItem> item;
    std::optional<TResult> result;
    wil::ResultException error{S_OK};

To simplify things, I would recommend storing the error as a std::exception_ptr (null if no error was thrown).
This will have the benefit of allowing us to rethrow non-wil exceptions easily
(and we can get rid of hasError)
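A minimal sketch of the std::exception_ptr approach, assuming a simplified result slot. `WorkResult`, `RunCaptured`, and `GetOrRethrow` are hypothetical names for illustration, not code from the PR. A single catch-all captures any exception type, and a null `error` replaces the separate hasError flag.

```cpp
#include <exception>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical result slot: `error` is null when the work succeeded.
template <typename TResult>
struct WorkResult
{
    std::optional<TResult> result;
    std::exception_ptr error;
};

// Runs `work` and captures any exception, wil or otherwise, in one catch-all.
template <typename TResult, typename TWork>
WorkResult<TResult> RunCaptured(TWork&& work)
{
    WorkResult<TResult> out;
    try
    {
        out.result = std::forward<TWork>(work)();
    }
    catch (...)
    {
        out.error = std::current_exception();
    }
    return out;
}

// Rethrows the original exception on the calling thread, or returns the value.
template <typename TResult>
TResult GetOrRethrow(WorkResult<TResult>& slot)
{
    if (slot.error)
    {
        std::rethrow_exception(slot.error);
    }
    return std::move(*slot.result);
}
```

Because the exception object is preserved as thrown, the caller can still catch it by its original type after it crosses the thread boundary.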
    // keeping the worker and its SharedContext alive for the full duration of the callback
    // regardless of WorkerPool lifetime. Re-create the work item each launch so the
    // context pointer is fresh.
    auto* ctx = new std::shared_ptr<TSharedWorker>(worker);

Could we merge SharedContext and SharedWorker into one structure that the threadpool owns, and pass a pointer to that structure directly as the context? This would simplify things a lot, and get rid of the pointer to a shared_ptr.
If we store them in a std::list, the pointers never get invalidated.
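The pointer-stability property relied on here can be sketched like this. `WorkerState` and `WorkerRegistry` are hypothetical names standing in for the merged structure and its owner. std::list never relocates existing elements on insertion, so a raw pointer handed out as a callback context stays valid while more workers are added.

```cpp
#include <cstddef>
#include <list>

// Hypothetical merged per-worker state owned by the pool.
struct WorkerState
{
    int id{};
    bool done{};
};

class WorkerRegistry
{
public:
    // Adds a worker and returns a raw pointer usable as a callback context.
    // Later insertions do not move this element.
    WorkerState* Add(int id)
    {
        m_workers.push_back(WorkerState{id, false});
        return &m_workers.back();
    }

    const WorkerState* Front() const { return &m_workers.front(); }
    std::size_t Size() const { return m_workers.size(); }

private:
    std::list<WorkerState> m_workers;
};
```

The same code with std::vector would be unsafe, since growth can reallocate and invalidate every previously returned pointer. The pointers are only valid while the registry itself is alive, which is why this pairs with draining in the destructor.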
    NON_MOVABLE(SharedContext);

    TWork onWork;
    wil::unique_event cancelEvent;

I don't think this cancel event should be in the threadpool structure, since the threadpool logic doesn't actually look at it.
Callers can capture a cancel event in their work callback if they need to.
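A caller-owned cancellation signal captured by the work callback might look like the following sketch. It uses a std::atomic<bool> in place of wil::unique_event so that it stays portable, and `CancelFlag` and `MakeCancellableWork` are hypothetical names. The pool would only ever see an opaque callable.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

// Caller-owned cancellation flag; the pool never sees it directly.
using CancelFlag = std::shared_ptr<std::atomic<bool>>;

// Builds a work callback that captures the flag and checks it between steps.
// The callback returns the number of steps it actually completed.
inline std::function<int()> MakeCancellableWork(CancelFlag cancel, int steps)
{
    return [cancel = std::move(cancel), steps] {
        int completed = 0;
        for (int i = 0; i < steps; ++i)
        {
            if (cancel->load())
            {
                break; // cooperative exit point
            }
            ++completed;
        }
        return completed;
    };
}
```

Keeping the flag on the caller's side means the pool stays generic: work that has no notion of cancellation does not pay for it, and callers that need it choose their own mechanism.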
    using TSharedContext = SharedContext<TWork>;

    std::vector<std::shared_ptr<TSharedWorker>> workers;
    std::vector<HANDLE> doneHandles;

If we remove the "timeout on cancel" logic, we can get rid of those
Converting back to draft to do some rework per @OneBlue's comments. There is no great urgency here, so I can take some time to make sure it is the best it can be.
Summary of the Pull Request
This improves the generic ForEachAsync method to use the Windows thread pool and to keep the pool full: a new work item starts as soon as any worker completes, instead of waiting for an entire batch to finish before starting the next one. It also adds a timeout so that workers cannot run indefinitely, and refactors the code for easier debugging.
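The difference from batch execution can be sketched as below. This is not the PR's implementation: it uses std::thread rather than the Windows thread pool, omits the timeout and error handling, and `ForEachBounded` is a hypothetical name. Each worker claims the next unprocessed index as soon as it finishes its current item, so a slow item never holds up the rest of a batch.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Applies `work` to every item using at most `maxWorkers` threads.
// Assumption for this sketch: `work` does not throw.
template <typename TItem, typename TWork>
void ForEachBounded(std::vector<TItem>& items, std::size_t maxWorkers, TWork work)
{
    std::atomic<std::size_t> next{0};
    const std::size_t workerCount = std::min(maxWorkers, items.size());

    std::vector<std::thread> workers;
    workers.reserve(workerCount);
    for (std::size_t w = 0; w < workerCount; ++w)
    {
        workers.emplace_back([&] {
            // Claim indices until the shared counter runs past the end.
            for (std::size_t i = next.fetch_add(1); i < items.size(); i = next.fetch_add(1))
            {
                work(items[i]);
            }
        });
    }

    // Block until all workers have drained the item list.
    for (auto& thread : workers)
    {
        thread.join();
    }
}
```

For example, calling `ForEachBounded(values, 4, [](int& x) { x *= 2; })` on a vector of 100 integers processes every element exactly once with no more than four threads active.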
PR Checklist
Detailed Description of the Pull Request / Additional comments
Validation Steps Performed