From 22f4a6df3774e69fb49b2303e373ad2e56acba99 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 15:58:43 -0700 Subject: [PATCH 1/9] Rewrite ForEachAsync to use threadpool, add timeout --- src/windows/wslc/core/AsyncExecution.h | 241 ++++++++++++++---- src/windows/wslc/tasks/ContainerTasks.cpp | 21 +- .../wslc/WSLCCLIExecutionUnitTests.cpp | 172 ++++++++++++- 3 files changed, 372 insertions(+), 62 deletions(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index d51c7337d..b8fe9a4c4 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -9,91 +9,228 @@ Module Name: Abstract: Provides ForEachAsync, a generic helper for executing a work callback - over a collection concurrently in bounded batches using std::async. + over a collection concurrently using the Windows thread pool with bounded + concurrency and cooperative cancellation. --*/ #pragma once #include -#include +#include +#include #include #include #include +#include #include namespace wsl::windows::wslc { -// Invokes onWork for each element in items concurrently, in batches of batchSize. -// Results are delivered serially to onSuccess. Errors are delivered serially to onError. -// -// This keeps wall time proportional to ceil(N / batchSize) rather than N for operations -// that have inherent per-item latency (e.g. network or IPC calls). -// -// Note: worker threads have no guaranteed per-thread initialization (e.g. COM). Callers -// whose onWork requires per-thread setup (such as CoInitializeEx) are responsible for -// performing it at the start of the onWork lambda. -// -// TWork : TItem -> TResult (called concurrently) -// TSuccess: TResult -> void (called serially) -// TError : (TItem, wil::ResultException) -> void (called serially) -template -void ForEachAsync(const std::vector& items, TWork onWork, TSuccess onSuccess, TError onError, size_t batchSize = 10) -{ - WI_ASSERT(batchSize > 0); - THROW_HR_IF(E_INVALIDARG, batchSize == 0); +namespace detail { - using TResult = decltype(onWork(std::declval())); - - struct BatchResult + template + struct WorkerResult { - explicit BatchResult(TItem capturedItem) : item(std::move(capturedItem)) - { - } - TItem item; std::optional result; wil::ResultException error{S_OK}; bool hasError{false}; }; - for (size_t batchStart = 0; batchStart < items.size(); batchStart += batchSize) + // Holds all state for one thread pool worker. Owned via shared_ptr so the memory + // remains valid if ForEachAsync unwinds while a work item is still running. + template + struct SharedWorker + { + WorkerResult workerResult; + TWork* onWork{nullptr}; + HANDLE cancelHandle{nullptr}; + wil::unique_event done; + wil::unique_threadpool_work work; + }; + + // Manages a fixed pool of SharedWorkers and a shared cancellation event. + template + struct WorkerPool { - const size_t batchEnd = std::min(batchStart + batchSize, items.size()); + NON_COPYABLE(WorkerPool); + NON_MOVABLE(WorkerPool); + + using TResult = decltype(std::declval()(std::declval(), std::declval())); + using TSharedWorker = SharedWorker; + + std::vector> workers; + std::vector doneHandles; + wil::unique_event cancelEvent; + std::chrono::milliseconds timeout; + DWORD cancelDrainMs{}; + + WorkerPool(size_t poolSize, TWork& onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : + timeout(timeout_), cancelDrainMs(static_cast(cancelDrainTimeout.count())) + { + cancelEvent.create(wil::EventOptions::ManualReset); + + workers.reserve(poolSize); + doneHandles.reserve(poolSize); + + for (size_t i = 0; i < poolSize; ++i) + { + auto worker = std::make_shared(); + worker->done.create(wil::EventOptions::ManualReset); + worker->onWork = &onWork; + worker->cancelHandle = cancelEvent.get(); + + // Work item is created once per worker and reused for each dispatched item. + worker->work.reset(::CreateThreadpoolWork(ThreadPoolCallback, worker.get(), nullptr)); + THROW_LAST_ERROR_IF(!worker->work); - std::vector> futures; - futures.reserve(batchEnd - batchStart); + doneHandles.push_back(worker->done.get()); + workers.push_back(std::move(worker)); + } + } + + void Launch(size_t workerIndex, const TItem& item) + { + auto& worker = workers[workerIndex]; + worker->workerResult = WorkerResult{}; + worker->workerResult.item = item; + worker->done.ResetEvent(); + ::SubmitThreadpoolWork(worker->work.get()); + } + + void Drain(size_t workerIndex, TSuccess& onSuccess, TError& onError) + { + auto& worker = workers[workerIndex]; + + // Ensure the callback has fully returned before reading results. + ::WaitForThreadpoolWorkCallbacks(worker->work.get(), FALSE); + if (worker->workerResult.hasError) + { + onError(worker->workerResult.item, worker->workerResult.error); + } + else if (worker->workerResult.result.has_value()) + { + onSuccess(*worker->workerResult.result); + } + } - for (size_t i = batchStart; i < batchEnd; ++i) + // Signals cancellation, waits up to cancelDrainMs for workers to exit, then throws ERROR_TIMEOUT. + // Workers that do not exit within cancelDrainMs are abandoned - they retain shared_ptr ownership + // of their state. onWork implementations must check the cancel event at natural checkpoints and + // exit promptly. + // + // Note: TerminateThread() is not used - it skips C++ destructors, leaves user-mode locks + // permanently held (causing deadlocks), and corrupts COM apartment state. + [[noreturn]] void CancelAndThrow(size_t remainingItems) { - const auto& item = items[i]; - futures.push_back(std::async(std::launch::async, [&onWork, item]() -> BatchResult { - BatchResult result{item}; - try - { - result.result = onWork(item); - } - catch (const wil::ResultException& ex) - { - result.hasError = true; - result.error = ex; - } - return result; - })); + cancelEvent.SetEvent(); + + ::WaitForMultipleObjects(static_cast(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); + + THROW_HR_MSG( + HRESULT_FROM_WIN32(ERROR_TIMEOUT), + "ForEachAsync: worker exceeded timeout of %lld ms (%zu items remaining).", + static_cast(timeout.count()), + remainingItems); } - for (auto& future : futures) + // Thread pool callback - invoked on a pool thread for each submitted work item. + static void CALLBACK ThreadPoolCallback(PTP_CALLBACK_INSTANCE, void* context, PTP_WORK) noexcept { - auto batchResult = future.get(); + auto& worker = *static_cast(context); - if (batchResult.hasError) + try { - onError(batchResult.item, batchResult.error); + worker.workerResult.result = (*worker.onWork)(worker.workerResult.item, worker.cancelHandle); } - else if (batchResult.result.has_value()) + catch (const wil::ResultException& ex) { - onSuccess(*batchResult.result); + worker.workerResult.hasError = true; + worker.workerResult.error = ex; } + + worker.done.SetEvent(); + } + }; + +} // namespace detail + +// Invokes onWork for each element in items concurrently using the Windows thread pool, +// with concurrency bounded to poolSize. Results are delivered serially to onSuccess. +// Errors are delivered serially to onError. +// +// onWork receives a HANDLE to a cancellation event and should check it at natural +// checkpoints using WaitForSingleObject(cancel, 0), returning early if it is set. +// On timeout, the event is signalled and ForEachAsync waits up to cancelDrainTimeout +// for workers to exit before throwing HRESULT_FROM_WIN32(ERROR_TIMEOUT). +// +// poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). +// +// Note: thread pool threads have no guaranteed per-thread initialization. Callers +// whose onWork requires per-thread setup (e.g. CoInitializeEx) must perform it at +// the start of the onWork lambda. +// +// TWork : (TItem, HANDLE cancelEvent) -> TResult (called concurrently) +// TSuccess: TResult -> void (called serially) +// TError : (TItem, wil::ResultException) -> void (called serially) +template +void ForEachAsync( + const std::vector& items, + TWork onWork, + TSuccess onSuccess, + TError onError, + size_t poolSize = 10, + std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), + std::chrono::milliseconds cancelDrainTimeout = std::chrono::seconds(5)) +{ + THROW_HR_IF(E_INVALIDARG, poolSize == 0); + THROW_HR_IF(E_INVALIDARG, poolSize > MAXIMUM_WAIT_OBJECTS); + if (items.empty()) + { + return; + } + + const DWORD timeoutMs = (timeout == std::chrono::milliseconds::max()) ? INFINITE : static_cast(timeout.count()); + const size_t workerCount = std::min(poolSize, items.size()); + + detail::WorkerPool pool{workerCount, onWork, timeout, cancelDrainTimeout}; + + // Fill the pool - submit one item per worker to saturate all workers immediately. + size_t nextItem = 0; + for (; nextItem < workerCount; ++nextItem) + { + pool.Launch(nextItem, items[nextItem]); + } + + // Keep the pool full - as each worker completes, drain its result and immediately + // assign it the next pending item. WaitForMultipleObjects(FALSE) wakes on the first + // completion, so no worker idles while work remains. + while (nextItem < items.size()) + { + const DWORD waitResult = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), FALSE, timeoutMs); + + if (waitResult == WAIT_TIMEOUT) + { + pool.CancelAndThrow(items.size() - nextItem); } + + THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED); + const size_t workerIndex = waitResult - WAIT_OBJECT_0; + pool.Drain(workerIndex, onSuccess, onError); + pool.Launch(workerIndex, items[nextItem++]); + } + + // Wait for all in-flight workers to finish and collect their final results. + const DWORD finalWait = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), TRUE, timeoutMs); + if (finalWait == WAIT_TIMEOUT) + { + pool.CancelAndThrow(0); + } + + THROW_LAST_ERROR_IF(finalWait == WAIT_FAILED); + for (size_t i = 0; i < workerCount; ++i) + { + pool.Drain(i, onSuccess, onError); } } diff --git a/src/windows/wslc/tasks/ContainerTasks.cpp b/src/windows/wslc/tasks/ContainerTasks.cpp index 0570108c9..f22e3fc56 100644 --- a/src/windows/wslc/tasks/ContainerTasks.cpp +++ b/src/windows/wslc/tasks/ContainerTasks.cpp @@ -566,14 +566,21 @@ void ShowContainerStats(CLIExecutionContext& context) } } - // Fetch stats for all containers concurrently in batches. The Docker engine blocks for ~1s + // Fetch stats for all containers concurrently. The Docker engine blocks for ~1s // per request to collect a valid precpu_stats sample, so issuing requests in parallel keeps // wall time proportional to ceil(N / batchSize) rather than N. nlohmann::json statsJson = nlohmann::json::array(); wsl::windows::wslc::ForEachAsync( containers, // Work to be done for each container ID on a separate thread. - [&session](const std::wstring& containerId) { + // cancelHandle is signalled if the overall operation times out, check it before + // the blocking Stats call so we exit cooperatively without waiting a full ~1s sample. + [&session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) { + if (::WaitForSingleObject(cancelHandle, 0) == WAIT_OBJECT_0) + { + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + } + // ContainerService::Stats makes COM calls, so we must ensure COM is initialized on this thread. auto comCleanup = wil::CoInitializeEx(COINIT_MULTITHREADED); return ComputeContainerStatsJson(ContainerService::Stats(session, WideToMultiByte(containerId))); @@ -582,6 +589,12 @@ void ShowContainerStats(CLIExecutionContext& context) [&](const nlohmann::json& entry) { statsJson.push_back(entry); }, // On Error [&](const std::wstring& containerId, wil::ResultException error) { + if (error.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + // Cancellation due to timeout. Let ForEachAsync surface ERROR_TIMEOUT to the caller. + return; + } + if (!userSpecifiedContainers) { switch (error.GetErrorCode()) @@ -599,7 +612,9 @@ void ShowContainerStats(CLIExecutionContext& context) LOG_HR_MSG(error.GetErrorCode(), "Failed to get stats for container %ws", containerId.c_str()); throw error; }, - 10 // Batch Size - chosen to be around typical expected container use while protecting against extreme cases. + 10, // Thread pool size - typical expected container use while protecting against extreme cases. + std::chrono::seconds(30), // Timeout - Docker stats blocks ~1s per sample; 30s gives ample headroom on a taxed system. + std::chrono::seconds(5) // Cancel drain - grace period for workers to observe the cancel event and exit cleanly. ); FormatType format = FormatType::Table; // Default is table diff --git a/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp b/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp index 14c231478..e15b6e78c 100644 --- a/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp +++ b/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp @@ -420,7 +420,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) { return item * 2; }, + [](int item, HANDLE /*cancelHandle*/) { return item * 2; }, [&](int result) { results.push_back(result); }, [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }); @@ -439,7 +439,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) -> int { + [](int item, HANDLE /*cancelHandle*/) -> int { if (item == 2) { THROW_HR(E_FAIL); @@ -454,6 +454,20 @@ class ForEachAsyncUnitTests VERIFY_ARE_EQUAL(2u, succeededItems.size()); } + TEST_METHOD(ForEachAsync_AllItemsFailInvokesErrorForEach) + { + const std::vector items = {1, 2, 3, 4, 5}; + std::atomic errorCount{0}; + + ForEachAsync( + items, + [](int /*item*/, HANDLE /*cancelHandle*/) -> int { THROW_HR(E_FAIL); }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [&](int /*item*/, wil::ResultException /*error*/) { ++errorCount; }); + + VERIFY_ARE_EQUAL(static_cast(items.size()), errorCount.load()); + } + TEST_METHOD(ForEachAsync_EmptyInputProducesNoCallbacks) { const std::vector items; @@ -462,7 +476,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) { return item; }, + [](int item, HANDLE /*cancelHandle*/) { return item; }, [&](int /*result*/) { successCalled = true; }, [&](int /*item*/, wil::ResultException /*error*/) { errorCalled = true; }); @@ -470,17 +484,17 @@ class ForEachAsyncUnitTests VERIFY_IS_FALSE(errorCalled); } - TEST_METHOD(ForEachAsync_BatchSizeOfOneProcessesAllItems) + TEST_METHOD(ForEachAsync_PoolSizeOfOneProcessesAllItems) { const std::vector items = {10, 20, 30, 40, 50}; std::vector results; ForEachAsync( items, - [](int item) { return item; }, + [](int item, HANDLE /*cancelHandle*/) { return item; }, [&](int result) { results.push_back(result); }, [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, - /*batchSize=*/1); + /*poolSize=*/1); VERIFY_ARE_EQUAL(items.size(), results.size()); for (int item : items) @@ -489,6 +503,92 @@ class ForEachAsyncUnitTests } } + TEST_METHOD(ForEachAsync_PoolSizeLargerThanItemCountIsHandled) + { + // When poolSize > items.size(), the pool is clamped to items.size(). + // All items must still be processed without error. + const std::vector items = {1, 2, 3}; + std::vector results; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int result) { results.push_back(result); }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/64); + + VERIFY_ARE_EQUAL(items.size(), results.size()); + for (int item : items) + { + VERIFY_IS_TRUE(std::find(results.begin(), results.end(), item) != results.end()); + } + } + + TEST_METHOD(ForEachAsync_ItemsExceedingPoolSizeAreAllProcessed) + { + // Core regression for the optimization: items beyond poolSize must be dispatched + // once earlier workers complete - not silently dropped. + const std::vector items = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + std::vector results; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int result) { results.push_back(result); }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/3); + + VERIFY_ARE_EQUAL(items.size(), results.size()); + for (int item : items) + { + VERIFY_IS_TRUE(std::find(results.begin(), results.end(), item) != results.end()); + } + } + + TEST_METHOD(ForEachAsync_SuccessAndErrorCallbacksAreInvokedSerially) + { + // onSuccess and onError are documented as serial. Verify this using a non-atomic + // depth counter - concurrent calls would race and produce depth > 1. + const std::vector items = {1, 2, 3, 4, 5, 6, 7, 8}; + int callbackDepth = 0; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int /*result*/) { + ++callbackDepth; + VERIFY_ARE_EQUAL(1, callbackDepth); + --callbackDepth; + }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/8); + } + + TEST_METHOD(ForEachAsync_InvalidPoolSizeThrows) + { + const std::vector items = {1}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [](int /*result*/) {}, + [](int /*item*/, wil::ResultException /*error*/) {}, + /*poolSize=*/0), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_INVALIDARG; }); + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [](int /*result*/) {}, + [](int /*item*/, wil::ResultException /*error*/) {}, + /*poolSize=*/MAXIMUM_WAIT_OBJECTS + 1), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_INVALIDARG; }); + } + TEST_METHOD(ForEachAsync_ErrorInOnErrorPropagatesThrow) { const std::vector items = {1}; @@ -496,12 +596,70 @@ class ForEachAsyncUnitTests VERIFY_THROWS_SPECIFIC( ForEachAsync( items, - [](int /*item*/) -> int { THROW_HR(E_ACCESSDENIED); }, + [](int /*item*/, HANDLE /*cancelHandle*/) -> int { THROW_HR(E_ACCESSDENIED); }, [](int /*result*/) {}, [](int /*item*/, wil::ResultException error) { throw error; }), wil::ResultException, [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_ACCESSDENIED; }); } + + TEST_METHOD(ForEachAsync_TimeoutThrowsErrorTimeout) + { + // Workers block cooperatively on the cancel handle until the timeout fires. + // The drain is near-instant since workers exit as soon as the handle is signalled. + const std::vector items = {1, 2, 3}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int /*item*/, HANDLE cancelHandle) -> int { + WaitForSingleObject(cancelHandle, INFINITE); + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [](int /*item*/, wil::ResultException error) { + // Suppress expected cancellations - ForEachAsync surfaces ERROR_TIMEOUT. + if (error.GetErrorCode() != HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + throw error; + } + }, + /*poolSize=*/10, + /*timeout=*/std::chrono::milliseconds(500), + /*cancelDrainTimeout=*/std::chrono::milliseconds(500)), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_TIMEOUT); }); + } + + TEST_METHOD(ForEachAsync_CancelHandleStopsWorkCooperatively) + { + // Verify that a timeout produces ERROR_TIMEOUT and that workers which observe the + // cancel handle exit cleanly within the drain window. The exact number of workers + // dispatched before the timeout is not asserted - it is scheduler-dependent and + // would be a source of flakiness under load. + const std::vector items = {1, 2, 3, 4, 5}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int /*item*/, HANDLE cancelHandle) -> int { + WaitForSingleObject(cancelHandle, INFINITE); + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [](int /*item*/, wil::ResultException error) { + // Suppress expected cancellations - ForEachAsync surfaces ERROR_TIMEOUT. + if (error.GetErrorCode() != HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + throw error; + } + }, + /*poolSize=*/2, + /*timeout=*/std::chrono::milliseconds(500), + /*cancelDrainTimeout=*/std::chrono::milliseconds(500)), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_TIMEOUT); }); + } }; } // namespace WSLCCLIExecutionUnitTests From 7133a5ad8ba72ed0bde1ec745aee48dd5302d03b Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 16:46:24 -0700 Subject: [PATCH 2/9] Fix shared ptr ownership, address teardown issues --- src/windows/wslc/core/AsyncExecution.h | 91 ++++++++++++++++++-------- 1 file changed, 63 insertions(+), 28 deletions(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index b8fe9a4c4..b07e5e6f9 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -37,19 +37,37 @@ namespace detail { bool hasError{false}; }; - // Holds all state for one thread pool worker. Owned via shared_ptr so the memory - // remains valid if ForEachAsync unwinds while a work item is still running. - template + // SharedContext holds state that must remain valid for the full lifetime of any running + // callback, including after WorkerPool is destroyed on the timeout path. Owned via + // shared_ptr and referenced by every SharedWorker. + template + struct SharedContext + { + NON_COPYABLE(SharedContext); + NON_MOVABLE(SharedContext); + + TWork onWork; + wil::unique_event cancelEvent; + + explicit SharedContext(TWork onWork_) : onWork(std::move(onWork_)) + { + cancelEvent.create(wil::EventOptions::ManualReset); + } + }; + + // Holds per-worker state. Each Launch heap-allocates a shared_ptr as the + // thread pool callback context, giving the callback shared ownership and ensuring this + // memory is not freed while a callback is still running. + template struct SharedWorker { WorkerResult workerResult; - TWork* onWork{nullptr}; - HANDLE cancelHandle{nullptr}; + std::shared_ptr> context; wil::unique_event done; wil::unique_threadpool_work work; }; - // Manages a fixed pool of SharedWorkers and a shared cancellation event. + // Manages a fixed pool of SharedWorkers. template struct WorkerPool { @@ -57,32 +75,28 @@ namespace detail { NON_MOVABLE(WorkerPool); using TResult = decltype(std::declval()(std::declval(), std::declval())); - using TSharedWorker = SharedWorker; + using TSharedWorker = SharedWorker; + using TSharedContext = SharedContext; std::vector> workers; std::vector doneHandles; - wil::unique_event cancelEvent; + std::shared_ptr context; std::chrono::milliseconds timeout; DWORD cancelDrainMs{}; - WorkerPool(size_t poolSize, TWork& onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : - timeout(timeout_), cancelDrainMs(static_cast(cancelDrainTimeout.count())) + WorkerPool(size_t workerCount, TWork onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : + context(std::make_shared(std::move(onWork))), + timeout(timeout_), + cancelDrainMs(static_cast(cancelDrainTimeout.count())) { - cancelEvent.create(wil::EventOptions::ManualReset); + workers.reserve(workerCount); + doneHandles.reserve(workerCount); - workers.reserve(poolSize); - doneHandles.reserve(poolSize); - - for (size_t i = 0; i < poolSize; ++i) + for (size_t i = 0; i < workerCount; ++i) { auto worker = std::make_shared(); worker->done.create(wil::EventOptions::ManualReset); - worker->onWork = &onWork; - worker->cancelHandle = cancelEvent.get(); - - // Work item is created once per worker and reused for each dispatched item. - worker->work.reset(::CreateThreadpoolWork(ThreadPoolCallback, worker.get(), nullptr)); - THROW_LAST_ERROR_IF(!worker->work); + worker->context = context; doneHandles.push_back(worker->done.get()); workers.push_back(std::move(worker)); @@ -95,6 +109,19 @@ namespace detail { worker->workerResult = WorkerResult{}; worker->workerResult.item = item; worker->done.ResetEvent(); + + // Heap-allocate a shared_ptr as the callback context. The callback takes ownership, + // keeping the worker and its SharedContext alive for the full duration of the callback + // regardless of WorkerPool lifetime. Re-create the work item each launch so the + // context pointer is fresh. + auto* ctx = new std::shared_ptr(worker); + worker->work.reset(::CreateThreadpoolWork(ThreadPoolCallback, ctx, nullptr)); + if (!worker->work) + { + delete ctx; + THROW_LAST_ERROR(); + } + ::SubmitThreadpoolWork(worker->work.get()); } @@ -104,6 +131,7 @@ namespace detail { // Ensure the callback has fully returned before reading results. ::WaitForThreadpoolWorkCallbacks(worker->work.get(), FALSE); + if (worker->workerResult.hasError) { onError(worker->workerResult.item, worker->workerResult.error); @@ -115,15 +143,15 @@ namespace detail { } // Signals cancellation, waits up to cancelDrainMs for workers to exit, then throws ERROR_TIMEOUT. - // Workers that do not exit within cancelDrainMs are abandoned - they retain shared_ptr ownership - // of their state. onWork implementations must check the cancel event at natural checkpoints and - // exit promptly. + // Workers that do not exit within cancelDrainMs are abandoned. Each running callback holds a + // shared_ptr to its SharedWorker and SharedContext, so neither is freed while the callback runs. + // onWork implementations must check the cancel event at natural checkpoints and exit promptly. // // Note: TerminateThread() is not used - it skips C++ destructors, leaves user-mode locks // permanently held (causing deadlocks), and corrupts COM apartment state. [[noreturn]] void CancelAndThrow(size_t remainingItems) { - cancelEvent.SetEvent(); + context->cancelEvent.SetEvent(); ::WaitForMultipleObjects(static_cast(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); @@ -135,13 +163,16 @@ namespace detail { } // Thread pool callback - invoked on a pool thread for each submitted work item. + // Takes ownership of the heap-allocated shared_ptr passed as context, + // ensuring the worker and its SharedContext remain alive for the duration of this call. static void CALLBACK ThreadPoolCallback(PTP_CALLBACK_INSTANCE, void* context, PTP_WORK) noexcept { - auto& worker = *static_cast(context); + const std::unique_ptr> owner(static_cast*>(context)); + auto& worker = **owner; try { - worker.workerResult.result = (*worker.onWork)(worker.workerResult.item, worker.cancelHandle); + worker.workerResult.result = worker.context->onWork(worker.workerResult.item, worker.context->cancelEvent.get()); } catch (const wil::ResultException& ex) { @@ -185,6 +216,7 @@ void ForEachAsync( { THROW_HR_IF(E_INVALIDARG, poolSize == 0); THROW_HR_IF(E_INVALIDARG, poolSize > MAXIMUM_WAIT_OBJECTS); + if (items.empty()) { return; @@ -193,7 +225,7 @@ void ForEachAsync( const DWORD timeoutMs = (timeout == std::chrono::milliseconds::max()) ? INFINITE : static_cast(timeout.count()); const size_t workerCount = std::min(poolSize, items.size()); - detail::WorkerPool pool{workerCount, onWork, timeout, cancelDrainTimeout}; + detail::WorkerPool pool{workerCount, std::move(onWork), timeout, cancelDrainTimeout}; // Fill the pool - submit one item per worker to saturate all workers immediately. size_t nextItem = 0; @@ -215,6 +247,7 @@ void ForEachAsync( } THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED); + const size_t workerIndex = waitResult - WAIT_OBJECT_0; pool.Drain(workerIndex, onSuccess, onError); pool.Launch(workerIndex, items[nextItem++]); @@ -222,12 +255,14 @@ void ForEachAsync( // Wait for all in-flight workers to finish and collect their final results. const DWORD finalWait = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), TRUE, timeoutMs); + if (finalWait == WAIT_TIMEOUT) { pool.CancelAndThrow(0); } THROW_LAST_ERROR_IF(finalWait == WAIT_FAILED); + for (size_t i = 0; i < workerCount; ++i) { pool.Drain(i, onSuccess, onError); From 80061aac47a17e6f66860d5f64e5e76a25b29499 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 17:18:10 -0700 Subject: [PATCH 3/9] PR feedback adjustments --- src/windows/wslc/core/AsyncExecution.h | 14 ++++++++++---- src/windows/wslc/tasks/ContainerTasks.cpp | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index b07e5e6f9..e07e518d4 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -31,6 +31,12 @@ namespace detail { template struct WorkerResult { + WorkerResult() = default; + + explicit WorkerResult(const TItem& item_) : item(item_) + { + } + TItem item; std::optional result; wil::ResultException error{S_OK}; @@ -82,11 +88,13 @@ namespace detail { std::vector doneHandles; std::shared_ptr context; std::chrono::milliseconds timeout; + DWORD timeoutMs{}; DWORD cancelDrainMs{}; WorkerPool(size_t workerCount, TWork onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : context(std::make_shared(std::move(onWork))), timeout(timeout_), + timeoutMs(timeout_ == std::chrono::milliseconds::max() ? INFINITE : static_cast(timeout_.count())), cancelDrainMs(static_cast(cancelDrainTimeout.count())) { workers.reserve(workerCount); @@ -222,7 +230,6 @@ void ForEachAsync( return; } - const DWORD timeoutMs = (timeout == std::chrono::milliseconds::max()) ? INFINITE : static_cast(timeout.count()); const size_t workerCount = std::min(poolSize, items.size()); detail::WorkerPool pool{workerCount, std::move(onWork), timeout, cancelDrainTimeout}; @@ -239,7 +246,7 @@ void ForEachAsync( // completion, so no worker idles while work remains. while (nextItem < items.size()) { - const DWORD waitResult = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), FALSE, timeoutMs); + const DWORD waitResult = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), FALSE, pool.timeoutMs); if (waitResult == WAIT_TIMEOUT) { @@ -253,8 +260,7 @@ void ForEachAsync( pool.Launch(workerIndex, items[nextItem++]); } - // Wait for all in-flight workers to finish and collect their final results. - const DWORD finalWait = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), TRUE, timeoutMs); + const DWORD finalWait = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), TRUE, pool.timeoutMs); if (finalWait == WAIT_TIMEOUT) { diff --git a/src/windows/wslc/tasks/ContainerTasks.cpp b/src/windows/wslc/tasks/ContainerTasks.cpp index f22e3fc56..9a437c0df 100644 --- a/src/windows/wslc/tasks/ContainerTasks.cpp +++ b/src/windows/wslc/tasks/ContainerTasks.cpp @@ -568,7 +568,7 @@ void ShowContainerStats(CLIExecutionContext& context) // Fetch stats for all containers concurrently. The Docker engine blocks for ~1s // per request to collect a valid precpu_stats sample, so issuing requests in parallel keeps - // wall time proportional to ceil(N / batchSize) rather than N. + // wall time proportional to ceil(N / poolSize) rather than N. nlohmann::json statsJson = nlohmann::json::array(); wsl::windows::wslc::ForEachAsync( containers, From 44fbcd122d76a8ba6d56ac5ad16c5675690603f0 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 17:42:56 -0700 Subject: [PATCH 4/9] PR feedback adjustments --- src/windows/wslc/core/AsyncExecution.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index e07e518d4..004ab2709 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -187,6 +187,11 @@ namespace detail { worker.workerResult.hasError = true; worker.workerResult.error = ex; } + catch (...) + { + worker.workerResult.hasError = true; + worker.workerResult.error = wil::ResultException{wil::details::ResultFromCaughtException()}; + } worker.done.SetEvent(); } @@ -205,6 +210,10 @@ namespace detail { // // poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). // +// The timeout is a safety net against indefinite hangs, not a strict per-worker limit. +// A worker that hangs while other workers are still completing will be caught in the +// final wait at most one full timeout after all other work has finished. +// // Note: thread pool threads have no guaranteed per-thread initialization. Callers // whose onWork requires per-thread setup (e.g. CoInitializeEx) must perform it at // the start of the onWork lambda. From 2f093fc02390f30c72a7084430374e9dcc7cbbd9 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 17:46:15 -0700 Subject: [PATCH 5/9] Fix session capture to by value --- src/windows/wslc/tasks/ContainerTasks.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/windows/wslc/tasks/ContainerTasks.cpp b/src/windows/wslc/tasks/ContainerTasks.cpp index 9a437c0df..a4bbf2a23 100644 --- a/src/windows/wslc/tasks/ContainerTasks.cpp +++ b/src/windows/wslc/tasks/ContainerTasks.cpp @@ -575,7 +575,7 @@ void ShowContainerStats(CLIExecutionContext& context) // Work to be done for each container ID on a separate thread. // cancelHandle is signalled if the overall operation times out, check it before // the blocking Stats call so we exit cooperatively without waiting a full ~1s sample. - [&session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) { + [session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) { if (::WaitForSingleObject(cancelHandle, 0) == WAIT_OBJECT_0) { THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); From 0be6f83e3d1954f26360b1ddf142226da6cbd48d Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 17:51:58 -0700 Subject: [PATCH 6/9] PR feedback and fix build break --- src/windows/wslc/core/AsyncExecution.h | 8 ++++---- src/windows/wslc/tasks/ContainerTasks.cpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index 004ab2709..b3e0eb61c 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -37,7 +37,7 @@ namespace detail { { } - TItem item; + std::optional item; std::optional result; wil::ResultException error{S_OK}; bool hasError{false}; @@ -142,7 +142,7 @@ namespace detail { if (worker->workerResult.hasError) { - onError(worker->workerResult.item, worker->workerResult.error); + onError(*worker->workerResult.item, worker->workerResult.error); } else if (worker->workerResult.result.has_value()) { @@ -180,7 +180,7 @@ namespace detail { try { - worker.workerResult.result = worker.context->onWork(worker.workerResult.item, worker.context->cancelEvent.get()); + worker.workerResult.result = worker.context->onWork(*worker.workerResult.item, worker.context->cancelEvent.get()); } catch (const wil::ResultException& ex) { @@ -190,7 +190,7 @@ namespace detail { catch (...) { worker.workerResult.hasError = true; - worker.workerResult.error = wil::ResultException{wil::details::ResultFromCaughtException()}; + worker.workerResult.error = wil::ResultException{wil::ResultFromCaughtException()}; } worker.done.SetEvent(); diff --git a/src/windows/wslc/tasks/ContainerTasks.cpp b/src/windows/wslc/tasks/ContainerTasks.cpp index a4bbf2a23..b2786d634 100644 --- a/src/windows/wslc/tasks/ContainerTasks.cpp +++ b/src/windows/wslc/tasks/ContainerTasks.cpp @@ -575,7 +575,7 @@ void ShowContainerStats(CLIExecutionContext& context) // Work to be done for each container ID on a separate thread. // cancelHandle is signalled if the overall operation times out, check it before // the blocking Stats call so we exit cooperatively without waiting a full ~1s sample. - [session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) { + [session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) mutable { if (::WaitForSingleObject(cancelHandle, 0) == WAIT_OBJECT_0) { THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); From c578ac9b66355f5bd17cbbbe0cd99a9c3b821bc2 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 17:55:16 -0700 Subject: [PATCH 7/9] Add DWORD bounds checking --- src/windows/wslc/core/AsyncExecution.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index b3e0eb61c..328b0f2b1 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -233,6 +233,14 @@ void ForEachAsync( { THROW_HR_IF(E_INVALIDARG, poolSize == 0); THROW_HR_IF(E_INVALIDARG, poolSize > MAXIMUM_WAIT_OBJECTS); + THROW_HR_IF(E_INVALIDARG, timeout.count() < 0); + THROW_HR_IF(E_INVALIDARG, cancelDrainTimeout.count() < 0); + + // INFINITE (0xFFFFFFFF) is reserved as the Win32 sentinel; values at or above it + // would either alias INFINITE or wrap, producing the opposite timeout behavior. + constexpr long long c_maxWaitMs = 0xFFFFFFFELL; // INFINITE - 1 + THROW_HR_IF(E_INVALIDARG, timeout != std::chrono::milliseconds::max() && timeout.count() > c_maxWaitMs); + THROW_HR_IF(E_INVALIDARG, cancelDrainTimeout.count() > c_maxWaitMs); if (items.empty()) { From 2efe78bf0e93d20a4add6430455b060ed6ecec21 Mon Sep 17 00:00:00 2001 From: David Bennett Date: Fri, 29 May 2026 22:33:10 -0700 Subject: [PATCH 8/9] Additional fixes --- src/windows/wslc/core/AsyncExecution.h | 40 ++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index 328b0f2b1..5dca5f828 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -104,6 +104,12 @@ namespace detail { { auto worker = std::make_shared(); worker->done.create(wil::EventOptions::ManualReset); + + // Start signaled so that unstarted worker slots never block a + // WaitForMultipleObjects over the whole pool (e.g. in CancelAndDrainInFlight). + // Launch resets the event before submitting, ThreadPoolCallback sets it on exit. + worker->done.SetEvent(); + worker->context = context; doneHandles.push_back(worker->done.get()); @@ -114,9 +120,16 @@ namespace detail { void Launch(size_t workerIndex, const TItem& item) { auto& worker = workers[workerIndex]; - worker->workerResult = WorkerResult{}; - worker->workerResult.item = item; + + // Reset first so the slot appears busy. Re-signal on any failure so that + // CancelAndDrainInFlight does not block on a slot that was never submitted. worker->done.ResetEvent(); + auto signalOnFail = wil::scope_exit([&worker] { worker->done.SetEvent(); }); + + worker->workerResult.item.emplace(item); + worker->workerResult.result.reset(); + worker->workerResult.error = wil::ResultException{S_OK}; + worker->workerResult.hasError = false; // Heap-allocate a shared_ptr as the callback context. The callback takes ownership, // keeping the worker and its SharedContext alive for the full duration of the callback @@ -131,6 +144,7 @@ namespace detail { } ::SubmitThreadpoolWork(worker->work.get()); + signalOnFail.release(); } void Drain(size_t workerIndex, TSuccess& onSuccess, TError& onError) @@ -150,6 +164,15 @@ namespace detail { } } + // Signals cancellation and waits up to cancelDrainMs for all in-flight workers to exit. + // Unstarted worker slots have their done events pre-signaled so they never block. + // Called noexcept from the scope_exit error guard in ForEachAsync. + void CancelAndDrainInFlight() noexcept + { + context->cancelEvent.SetEvent(); + ::WaitForMultipleObjects(static_cast(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); + } + // Signals cancellation, waits up to cancelDrainMs for workers to exit, then throws ERROR_TIMEOUT. // Workers that do not exit within cancelDrainMs are abandoned. Each running callback holds a // shared_ptr to its SharedWorker and SharedContext, so neither is freed while the callback runs. @@ -180,7 +203,7 @@ namespace detail { try { - worker.workerResult.result = worker.context->onWork(*worker.workerResult.item, worker.context->cancelEvent.get()); + worker.workerResult.result.emplace(worker.context->onWork(*worker.workerResult.item, worker.context->cancelEvent.get())); } catch (const wil::ResultException& ex) { @@ -208,6 +231,10 @@ namespace detail { // On timeout, the event is signalled and ForEachAsync waits up to cancelDrainTimeout // for workers to exit before throwing HRESULT_FROM_WIN32(ERROR_TIMEOUT). // +// If onSuccess, onError, or Launch throw, the cancel event is signalled and ForEachAsync +// waits up to cancelDrainTimeout for any in-flight workers to exit before rethrowing. +// This guarantees no background thread pool callbacks outlive the ForEachAsync call. +// // poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). // // The timeout is a safety net against indefinite hangs, not a strict per-worker limit. @@ -251,6 +278,11 @@ void ForEachAsync( detail::WorkerPool pool{workerCount, std::move(onWork), timeout, cancelDrainTimeout}; + // On any exception from Launch, Drain, or the user callbacks (onSuccess/onError), + // signal cancellation and wait for in-flight workers before rethrowing. This guarantees + // no background thread pool callbacks outlive the ForEachAsync call. + auto cancelOnError = wil::scope_exit([&pool] { pool.CancelAndDrainInFlight(); }); + // Fill the pool - submit one item per worker to saturate all workers immediately. size_t nextItem = 0; for (; nextItem < workerCount; ++nextItem) @@ -290,6 +322,8 @@ void ForEachAsync( { pool.Drain(i, onSuccess, onError); } + + cancelOnError.release(); } } // namespace wsl::windows::wslc From 993c46deeb92b82f46673cd62b03b87505c1c9ba Mon Sep 17 00:00:00 2001 From: David Bennett Date: Mon, 1 Jun 2026 09:52:46 -0700 Subject: [PATCH 9/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/windows/wslc/core/AsyncExecution.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index 5dca5f828..74c1b4223 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -233,7 +233,8 @@ namespace detail { // // If onSuccess, onError, or Launch throw, the cancel event is signalled and ForEachAsync // waits up to cancelDrainTimeout for any in-flight workers to exit before rethrowing. -// This guarantees no background thread pool callbacks outlive the ForEachAsync call. +// If workers do not exit in that window, callbacks may outlive this call; onWork +// and any state it captures must remain valid until those callbacks finish. // // poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). //