diff --git a/src/windows/wslc/core/AsyncExecution.h b/src/windows/wslc/core/AsyncExecution.h index d51c7337d..74c1b4223 100644 --- a/src/windows/wslc/core/AsyncExecution.h +++ b/src/windows/wslc/core/AsyncExecution.h @@ -9,92 +9,322 @@ Module Name: Abstract: Provides ForEachAsync, a generic helper for executing a work callback - over a collection concurrently in bounded batches using std::async. + over a collection concurrently using the Windows thread pool with bounded + concurrency and cooperative cancellation. --*/ #pragma once #include -#include +#include +#include #include #include #include +#include #include namespace wsl::windows::wslc { -// Invokes onWork for each element in items concurrently, in batches of batchSize. -// Results are delivered serially to onSuccess. Errors are delivered serially to onError. -// -// This keeps wall time proportional to ceil(N / batchSize) rather than N for operations -// that have inherent per-item latency (e.g. network or IPC calls). -// -// Note: worker threads have no guaranteed per-thread initialization (e.g. COM). Callers -// whose onWork requires per-thread setup (such as CoInitializeEx) are responsible for -// performing it at the start of the onWork lambda. -// -// TWork : TItem -> TResult (called concurrently) -// TSuccess: TResult -> void (called serially) -// TError : (TItem, wil::ResultException) -> void (called serially) -template -void ForEachAsync(const std::vector& items, TWork onWork, TSuccess onSuccess, TError onError, size_t batchSize = 10) -{ - WI_ASSERT(batchSize > 0); - THROW_HR_IF(E_INVALIDARG, batchSize == 0); - - using TResult = decltype(onWork(std::declval())); +namespace detail { - struct BatchResult + template + struct WorkerResult { - explicit BatchResult(TItem capturedItem) : item(std::move(capturedItem)) + WorkerResult() = default; + + explicit WorkerResult(const TItem& item_) : item(item_) { } - TItem item; + std::optional item; std::optional result; wil::ResultException error{S_OK}; bool hasError{false}; }; - for (size_t batchStart = 0; batchStart < items.size(); batchStart += batchSize) + // SharedContext holds state that must remain valid for the full lifetime of any running + // callback, including after WorkerPool is destroyed on the timeout path. Owned via + // shared_ptr and referenced by every SharedWorker. + template + struct SharedContext + { + NON_COPYABLE(SharedContext); + NON_MOVABLE(SharedContext); + + TWork onWork; + wil::unique_event cancelEvent; + + explicit SharedContext(TWork onWork_) : onWork(std::move(onWork_)) + { + cancelEvent.create(wil::EventOptions::ManualReset); + } + }; + + // Holds per-worker state. Each Launch heap-allocates a shared_ptr as the + // thread pool callback context, giving the callback shared ownership and ensuring this + // memory is not freed while a callback is still running. + template + struct SharedWorker + { + WorkerResult workerResult; + std::shared_ptr> context; + wil::unique_event done; + wil::unique_threadpool_work work; + }; + + // Manages a fixed pool of SharedWorkers. + template + struct WorkerPool { - const size_t batchEnd = std::min(batchStart + batchSize, items.size()); + NON_COPYABLE(WorkerPool); + NON_MOVABLE(WorkerPool); + + using TResult = decltype(std::declval()(std::declval(), std::declval())); + using TSharedWorker = SharedWorker; + using TSharedContext = SharedContext; + + std::vector> workers; + std::vector doneHandles; + std::shared_ptr context; + std::chrono::milliseconds timeout; + DWORD timeoutMs{}; + DWORD cancelDrainMs{}; + + WorkerPool(size_t workerCount, TWork onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : + context(std::make_shared(std::move(onWork))), + timeout(timeout_), + timeoutMs(timeout_ == std::chrono::milliseconds::max() ? INFINITE : static_cast(timeout_.count())), + cancelDrainMs(static_cast(cancelDrainTimeout.count())) + { + workers.reserve(workerCount); + doneHandles.reserve(workerCount); + + for (size_t i = 0; i < workerCount; ++i) + { + auto worker = std::make_shared(); + worker->done.create(wil::EventOptions::ManualReset); + + // Start signaled so that unstarted worker slots never block a + // WaitForMultipleObjects over the whole pool (e.g. in CancelAndDrainInFlight). + // Launch resets the event before submitting, ThreadPoolCallback sets it on exit. + worker->done.SetEvent(); + + worker->context = context; + + doneHandles.push_back(worker->done.get()); + workers.push_back(std::move(worker)); + } + } + + void Launch(size_t workerIndex, const TItem& item) + { + auto& worker = workers[workerIndex]; + + // Reset first so the slot appears busy. Re-signal on any failure so that + // CancelAndDrainInFlight does not block on a slot that was never submitted. + worker->done.ResetEvent(); + auto signalOnFail = wil::scope_exit([&worker] { worker->done.SetEvent(); }); + + worker->workerResult.item.emplace(item); + worker->workerResult.result.reset(); + worker->workerResult.error = wil::ResultException{S_OK}; + worker->workerResult.hasError = false; + + // Heap-allocate a shared_ptr as the callback context. The callback takes ownership, + // keeping the worker and its SharedContext alive for the full duration of the callback + // regardless of WorkerPool lifetime. Re-create the work item each launch so the + // context pointer is fresh. + auto* ctx = new std::shared_ptr(worker); + worker->work.reset(::CreateThreadpoolWork(ThreadPoolCallback, ctx, nullptr)); + if (!worker->work) + { + delete ctx; + THROW_LAST_ERROR(); + } + + ::SubmitThreadpoolWork(worker->work.get()); + signalOnFail.release(); + } + + void Drain(size_t workerIndex, TSuccess& onSuccess, TError& onError) + { + auto& worker = workers[workerIndex]; + + // Ensure the callback has fully returned before reading results. + ::WaitForThreadpoolWorkCallbacks(worker->work.get(), FALSE); + + if (worker->workerResult.hasError) + { + onError(*worker->workerResult.item, worker->workerResult.error); + } + else if (worker->workerResult.result.has_value()) + { + onSuccess(*worker->workerResult.result); + } + } - std::vector> futures; - futures.reserve(batchEnd - batchStart); + // Signals cancellation and waits up to cancelDrainMs for all in-flight workers to exit. + // Unstarted worker slots have their done events pre-signaled so they never block. + // Called noexcept from the scope_exit error guard in ForEachAsync. + void CancelAndDrainInFlight() noexcept + { + context->cancelEvent.SetEvent(); + ::WaitForMultipleObjects(static_cast(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); + } - for (size_t i = batchStart; i < batchEnd; ++i) + // Signals cancellation, waits up to cancelDrainMs for workers to exit, then throws ERROR_TIMEOUT. + // Workers that do not exit within cancelDrainMs are abandoned. Each running callback holds a + // shared_ptr to its SharedWorker and SharedContext, so neither is freed while the callback runs. + // onWork implementations must check the cancel event at natural checkpoints and exit promptly. + // + // Note: TerminateThread() is not used - it skips C++ destructors, leaves user-mode locks + // permanently held (causing deadlocks), and corrupts COM apartment state. + [[noreturn]] void CancelAndThrow(size_t remainingItems) { - const auto& item = items[i]; - futures.push_back(std::async(std::launch::async, [&onWork, item]() -> BatchResult { - BatchResult result{item}; - try - { - result.result = onWork(item); - } - catch (const wil::ResultException& ex) - { - result.hasError = true; - result.error = ex; - } - return result; - })); + context->cancelEvent.SetEvent(); + + ::WaitForMultipleObjects(static_cast(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); + + THROW_HR_MSG( + HRESULT_FROM_WIN32(ERROR_TIMEOUT), + "ForEachAsync: worker exceeded timeout of %lld ms (%zu items remaining).", + static_cast(timeout.count()), + remainingItems); } - for (auto& future : futures) + // Thread pool callback - invoked on a pool thread for each submitted work item. + // Takes ownership of the heap-allocated shared_ptr passed as context, + // ensuring the worker and its SharedContext remain alive for the duration of this call. + static void CALLBACK ThreadPoolCallback(PTP_CALLBACK_INSTANCE, void* context, PTP_WORK) noexcept { - auto batchResult = future.get(); + const std::unique_ptr> owner(static_cast*>(context)); + auto& worker = **owner; - if (batchResult.hasError) + try + { + worker.workerResult.result.emplace(worker.context->onWork(*worker.workerResult.item, worker.context->cancelEvent.get())); + } + catch (const wil::ResultException& ex) { - onError(batchResult.item, batchResult.error); + worker.workerResult.hasError = true; + worker.workerResult.error = ex; } - else if (batchResult.result.has_value()) + catch (...) { - onSuccess(*batchResult.result); + worker.workerResult.hasError = true; + worker.workerResult.error = wil::ResultException{wil::ResultFromCaughtException()}; } + + worker.done.SetEvent(); } + }; + +} // namespace detail + +// Invokes onWork for each element in items concurrently using the Windows thread pool, +// with concurrency bounded to poolSize. Results are delivered serially to onSuccess. +// Errors are delivered serially to onError. +// +// onWork receives a HANDLE to a cancellation event and should check it at natural +// checkpoints using WaitForSingleObject(cancel, 0), returning early if it is set. +// On timeout, the event is signalled and ForEachAsync waits up to cancelDrainTimeout +// for workers to exit before throwing HRESULT_FROM_WIN32(ERROR_TIMEOUT). +// +// If onSuccess, onError, or Launch throw, the cancel event is signalled and ForEachAsync +// waits up to cancelDrainTimeout for any in-flight workers to exit before rethrowing. +// If workers do not exit in that window, callbacks may outlive this call; onWork +// and any state it captures must remain valid until those callbacks finish. +// +// poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). +// +// The timeout is a safety net against indefinite hangs, not a strict per-worker limit. +// A worker that hangs while other workers are still completing will be caught in the +// final wait at most one full timeout after all other work has finished. +// +// Note: thread pool threads have no guaranteed per-thread initialization. Callers +// whose onWork requires per-thread setup (e.g. CoInitializeEx) must perform it at +// the start of the onWork lambda. +// +// TWork : (TItem, HANDLE cancelEvent) -> TResult (called concurrently) +// TSuccess: TResult -> void (called serially) +// TError : (TItem, wil::ResultException) -> void (called serially) +template +void ForEachAsync( + const std::vector& items, + TWork onWork, + TSuccess onSuccess, + TError onError, + size_t poolSize = 10, + std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), + std::chrono::milliseconds cancelDrainTimeout = std::chrono::seconds(5)) +{ + THROW_HR_IF(E_INVALIDARG, poolSize == 0); + THROW_HR_IF(E_INVALIDARG, poolSize > MAXIMUM_WAIT_OBJECTS); + THROW_HR_IF(E_INVALIDARG, timeout.count() < 0); + THROW_HR_IF(E_INVALIDARG, cancelDrainTimeout.count() < 0); + + // INFINITE (0xFFFFFFFF) is reserved as the Win32 sentinel; values at or above it + // would either alias INFINITE or wrap, producing the opposite timeout behavior. + constexpr long long c_maxWaitMs = 0xFFFFFFFELL; // INFINITE - 1 + THROW_HR_IF(E_INVALIDARG, timeout != std::chrono::milliseconds::max() && timeout.count() > c_maxWaitMs); + THROW_HR_IF(E_INVALIDARG, cancelDrainTimeout.count() > c_maxWaitMs); + + if (items.empty()) + { + return; } + + const size_t workerCount = std::min(poolSize, items.size()); + + detail::WorkerPool pool{workerCount, std::move(onWork), timeout, cancelDrainTimeout}; + + // On any exception from Launch, Drain, or the user callbacks (onSuccess/onError), + // signal cancellation and wait for in-flight workers before rethrowing. This guarantees + // no background thread pool callbacks outlive the ForEachAsync call. + auto cancelOnError = wil::scope_exit([&pool] { pool.CancelAndDrainInFlight(); }); + + // Fill the pool - submit one item per worker to saturate all workers immediately. + size_t nextItem = 0; + for (; nextItem < workerCount; ++nextItem) + { + pool.Launch(nextItem, items[nextItem]); + } + + // Keep the pool full - as each worker completes, drain its result and immediately + // assign it the next pending item. WaitForMultipleObjects(FALSE) wakes on the first + // completion, so no worker idles while work remains. + while (nextItem < items.size()) + { + const DWORD waitResult = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), FALSE, pool.timeoutMs); + + if (waitResult == WAIT_TIMEOUT) + { + pool.CancelAndThrow(items.size() - nextItem); + } + + THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED); + + const size_t workerIndex = waitResult - WAIT_OBJECT_0; + pool.Drain(workerIndex, onSuccess, onError); + pool.Launch(workerIndex, items[nextItem++]); + } + + const DWORD finalWait = ::WaitForMultipleObjects(static_cast(workerCount), pool.doneHandles.data(), TRUE, pool.timeoutMs); + + if (finalWait == WAIT_TIMEOUT) + { + pool.CancelAndThrow(0); + } + + THROW_LAST_ERROR_IF(finalWait == WAIT_FAILED); + + for (size_t i = 0; i < workerCount; ++i) + { + pool.Drain(i, onSuccess, onError); + } + + cancelOnError.release(); } } // namespace wsl::windows::wslc diff --git a/src/windows/wslc/tasks/ContainerTasks.cpp b/src/windows/wslc/tasks/ContainerTasks.cpp index 0570108c9..b2786d634 100644 --- a/src/windows/wslc/tasks/ContainerTasks.cpp +++ b/src/windows/wslc/tasks/ContainerTasks.cpp @@ -566,14 +566,21 @@ void ShowContainerStats(CLIExecutionContext& context) } } - // Fetch stats for all containers concurrently in batches. The Docker engine blocks for ~1s + // Fetch stats for all containers concurrently. The Docker engine blocks for ~1s // per request to collect a valid precpu_stats sample, so issuing requests in parallel keeps - // wall time proportional to ceil(N / batchSize) rather than N. + // wall time proportional to ceil(N / poolSize) rather than N. nlohmann::json statsJson = nlohmann::json::array(); wsl::windows::wslc::ForEachAsync( containers, // Work to be done for each container ID on a separate thread. - [&session](const std::wstring& containerId) { + // cancelHandle is signalled if the overall operation times out, check it before + // the blocking Stats call so we exit cooperatively without waiting a full ~1s sample. + [session, userSpecifiedContainers](const std::wstring& containerId, HANDLE cancelHandle) mutable { + if (::WaitForSingleObject(cancelHandle, 0) == WAIT_OBJECT_0) + { + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + } + // ContainerService::Stats makes COM calls, so we must ensure COM is initialized on this thread. auto comCleanup = wil::CoInitializeEx(COINIT_MULTITHREADED); return ComputeContainerStatsJson(ContainerService::Stats(session, WideToMultiByte(containerId))); @@ -582,6 +589,12 @@ void ShowContainerStats(CLIExecutionContext& context) [&](const nlohmann::json& entry) { statsJson.push_back(entry); }, // On Error [&](const std::wstring& containerId, wil::ResultException error) { + if (error.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + // Cancellation due to timeout. Let ForEachAsync surface ERROR_TIMEOUT to the caller. + return; + } + if (!userSpecifiedContainers) { switch (error.GetErrorCode()) @@ -599,7 +612,9 @@ void ShowContainerStats(CLIExecutionContext& context) LOG_HR_MSG(error.GetErrorCode(), "Failed to get stats for container %ws", containerId.c_str()); throw error; }, - 10 // Batch Size - chosen to be around typical expected container use while protecting against extreme cases. + 10, // Thread pool size - typical expected container use while protecting against extreme cases. + std::chrono::seconds(30), // Timeout - Docker stats blocks ~1s per sample; 30s gives ample headroom on a taxed system. + std::chrono::seconds(5) // Cancel drain - grace period for workers to observe the cancel event and exit cleanly. ); FormatType format = FormatType::Table; // Default is table diff --git a/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp b/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp index 14c231478..e15b6e78c 100644 --- a/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp +++ b/test/windows/wslc/WSLCCLIExecutionUnitTests.cpp @@ -420,7 +420,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) { return item * 2; }, + [](int item, HANDLE /*cancelHandle*/) { return item * 2; }, [&](int result) { results.push_back(result); }, [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }); @@ -439,7 +439,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) -> int { + [](int item, HANDLE /*cancelHandle*/) -> int { if (item == 2) { THROW_HR(E_FAIL); @@ -454,6 +454,20 @@ class ForEachAsyncUnitTests VERIFY_ARE_EQUAL(2u, succeededItems.size()); } + TEST_METHOD(ForEachAsync_AllItemsFailInvokesErrorForEach) + { + const std::vector items = {1, 2, 3, 4, 5}; + std::atomic errorCount{0}; + + ForEachAsync( + items, + [](int /*item*/, HANDLE /*cancelHandle*/) -> int { THROW_HR(E_FAIL); }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [&](int /*item*/, wil::ResultException /*error*/) { ++errorCount; }); + + VERIFY_ARE_EQUAL(static_cast(items.size()), errorCount.load()); + } + TEST_METHOD(ForEachAsync_EmptyInputProducesNoCallbacks) { const std::vector items; @@ -462,7 +476,7 @@ class ForEachAsyncUnitTests ForEachAsync( items, - [](int item) { return item; }, + [](int item, HANDLE /*cancelHandle*/) { return item; }, [&](int /*result*/) { successCalled = true; }, [&](int /*item*/, wil::ResultException /*error*/) { errorCalled = true; }); @@ -470,17 +484,17 @@ class ForEachAsyncUnitTests VERIFY_IS_FALSE(errorCalled); } - TEST_METHOD(ForEachAsync_BatchSizeOfOneProcessesAllItems) + TEST_METHOD(ForEachAsync_PoolSizeOfOneProcessesAllItems) { const std::vector items = {10, 20, 30, 40, 50}; std::vector results; ForEachAsync( items, - [](int item) { return item; }, + [](int item, HANDLE /*cancelHandle*/) { return item; }, [&](int result) { results.push_back(result); }, [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, - /*batchSize=*/1); + /*poolSize=*/1); VERIFY_ARE_EQUAL(items.size(), results.size()); for (int item : items) @@ -489,6 +503,92 @@ class ForEachAsyncUnitTests } } + TEST_METHOD(ForEachAsync_PoolSizeLargerThanItemCountIsHandled) + { + // When poolSize > items.size(), the pool is clamped to items.size(). + // All items must still be processed without error. + const std::vector items = {1, 2, 3}; + std::vector results; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int result) { results.push_back(result); }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/64); + + VERIFY_ARE_EQUAL(items.size(), results.size()); + for (int item : items) + { + VERIFY_IS_TRUE(std::find(results.begin(), results.end(), item) != results.end()); + } + } + + TEST_METHOD(ForEachAsync_ItemsExceedingPoolSizeAreAllProcessed) + { + // Core regression for the optimization: items beyond poolSize must be dispatched + // once earlier workers complete - not silently dropped. + const std::vector items = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + std::vector results; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int result) { results.push_back(result); }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/3); + + VERIFY_ARE_EQUAL(items.size(), results.size()); + for (int item : items) + { + VERIFY_IS_TRUE(std::find(results.begin(), results.end(), item) != results.end()); + } + } + + TEST_METHOD(ForEachAsync_SuccessAndErrorCallbacksAreInvokedSerially) + { + // onSuccess and onError are documented as serial. Verify this using a non-atomic + // depth counter - concurrent calls would race and produce depth > 1. + const std::vector items = {1, 2, 3, 4, 5, 6, 7, 8}; + int callbackDepth = 0; + + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [&](int /*result*/) { + ++callbackDepth; + VERIFY_ARE_EQUAL(1, callbackDepth); + --callbackDepth; + }, + [](int /*item*/, wil::ResultException /*error*/) { VERIFY_FAIL(L"Unexpected error"); }, + /*poolSize=*/8); + } + + TEST_METHOD(ForEachAsync_InvalidPoolSizeThrows) + { + const std::vector items = {1}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [](int /*result*/) {}, + [](int /*item*/, wil::ResultException /*error*/) {}, + /*poolSize=*/0), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_INVALIDARG; }); + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int item, HANDLE /*cancelHandle*/) { return item; }, + [](int /*result*/) {}, + [](int /*item*/, wil::ResultException /*error*/) {}, + /*poolSize=*/MAXIMUM_WAIT_OBJECTS + 1), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_INVALIDARG; }); + } + TEST_METHOD(ForEachAsync_ErrorInOnErrorPropagatesThrow) { const std::vector items = {1}; @@ -496,12 +596,70 @@ class ForEachAsyncUnitTests VERIFY_THROWS_SPECIFIC( ForEachAsync( items, - [](int /*item*/) -> int { THROW_HR(E_ACCESSDENIED); }, + [](int /*item*/, HANDLE /*cancelHandle*/) -> int { THROW_HR(E_ACCESSDENIED); }, [](int /*result*/) {}, [](int /*item*/, wil::ResultException error) { throw error; }), wil::ResultException, [](const wil::ResultException& ex) { return ex.GetErrorCode() == E_ACCESSDENIED; }); } + + TEST_METHOD(ForEachAsync_TimeoutThrowsErrorTimeout) + { + // Workers block cooperatively on the cancel handle until the timeout fires. + // The drain is near-instant since workers exit as soon as the handle is signalled. + const std::vector items = {1, 2, 3}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int /*item*/, HANDLE cancelHandle) -> int { + WaitForSingleObject(cancelHandle, INFINITE); + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [](int /*item*/, wil::ResultException error) { + // Suppress expected cancellations - ForEachAsync surfaces ERROR_TIMEOUT. + if (error.GetErrorCode() != HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + throw error; + } + }, + /*poolSize=*/10, + /*timeout=*/std::chrono::milliseconds(500), + /*cancelDrainTimeout=*/std::chrono::milliseconds(500)), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_TIMEOUT); }); + } + + TEST_METHOD(ForEachAsync_CancelHandleStopsWorkCooperatively) + { + // Verify that a timeout produces ERROR_TIMEOUT and that workers which observe the + // cancel handle exit cleanly within the drain window. The exact number of workers + // dispatched before the timeout is not asserted - it is scheduler-dependent and + // would be a source of flakiness under load. + const std::vector items = {1, 2, 3, 4, 5}; + + VERIFY_THROWS_SPECIFIC( + ForEachAsync( + items, + [](int /*item*/, HANDLE cancelHandle) -> int { + WaitForSingleObject(cancelHandle, INFINITE); + THROW_HR(HRESULT_FROM_WIN32(ERROR_CANCELLED)); + }, + [](int /*result*/) { VERIFY_FAIL(L"Unexpected success callback"); }, + [](int /*item*/, wil::ResultException error) { + // Suppress expected cancellations - ForEachAsync surfaces ERROR_TIMEOUT. + if (error.GetErrorCode() != HRESULT_FROM_WIN32(ERROR_CANCELLED)) + { + throw error; + } + }, + /*poolSize=*/2, + /*timeout=*/std::chrono::milliseconds(500), + /*cancelDrainTimeout=*/std::chrono::milliseconds(500)), + wil::ResultException, + [](const wil::ResultException& ex) { return ex.GetErrorCode() == HRESULT_FROM_WIN32(ERROR_TIMEOUT); }); + } }; } // namespace WSLCCLIExecutionUnitTests