-
Notifications
You must be signed in to change notification settings - Fork 1.8k
CLI: Rewrite ForEachAsync to use threadpool, add timeout #40675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
22f4a6d
7133a5a
80061aa
44fbcd1
2f093fc
0be6f83
c578ac9
2efe78b
993c46d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,44 +9,31 @@ Module Name: | |
| Abstract: | ||
|
|
||
| Provides ForEachAsync, a generic helper for executing a work callback | ||
| over a collection concurrently in bounded batches using std::async. | ||
| over a collection concurrently using the Windows thread pool with bounded | ||
| concurrency and cooperative cancellation. | ||
|
|
||
| --*/ | ||
| #pragma once | ||
|
|
||
| #include <algorithm> | ||
| #include <future> | ||
| #include <chrono> | ||
| #include <memory> | ||
| #include <optional> | ||
| #include <utility> | ||
| #include <vector> | ||
| #include <wil/resource.h> | ||
| #include <wil/result_macros.h> | ||
|
|
||
| namespace wsl::windows::wslc { | ||
|
|
||
| // Invokes onWork for each element in items concurrently, in batches of batchSize. | ||
| // Results are delivered serially to onSuccess. Errors are delivered serially to onError. | ||
| // | ||
| // This keeps wall time proportional to ceil(N / batchSize) rather than N for operations | ||
| // that have inherent per-item latency (e.g. network or IPC calls). | ||
| // | ||
| // Note: worker threads have no guaranteed per-thread initialization (e.g. COM). Callers | ||
| // whose onWork requires per-thread setup (such as CoInitializeEx) are responsible for | ||
| // performing it at the start of the onWork lambda. | ||
| // | ||
| // TWork : TItem -> TResult (called concurrently) | ||
| // TSuccess: TResult -> void (called serially) | ||
| // TError : (TItem, wil::ResultException) -> void (called serially) | ||
| template <typename TItem, typename TWork, typename TSuccess, typename TError> | ||
| void ForEachAsync(const std::vector<TItem>& items, TWork onWork, TSuccess onSuccess, TError onError, size_t batchSize = 10) | ||
| { | ||
| WI_ASSERT(batchSize > 0); | ||
| THROW_HR_IF(E_INVALIDARG, batchSize == 0); | ||
|
|
||
| using TResult = decltype(onWork(std::declval<TItem>())); | ||
| namespace detail { | ||
|
|
||
| struct BatchResult | ||
| template <typename TItem, typename TResult> | ||
| struct WorkerResult | ||
| { | ||
| explicit BatchResult(TItem capturedItem) : item(std::move(capturedItem)) | ||
| WorkerResult() = default; | ||
|
|
||
| explicit WorkerResult(const TItem& item_) : item(item_) | ||
| { | ||
| } | ||
|
|
||
|
|
@@ -56,44 +43,244 @@ void ForEachAsync(const std::vector<TItem>& items, TWork onWork, TSuccess onSucc | |
| bool hasError{false}; | ||
| }; | ||
|
|
||
| for (size_t batchStart = 0; batchStart < items.size(); batchStart += batchSize) | ||
| // SharedContext holds state that must remain valid for the full lifetime of any running | ||
| // callback, including after WorkerPool is destroyed on the timeout path. Owned via | ||
| // shared_ptr and referenced by every SharedWorker. | ||
| template <typename TWork> | ||
| struct SharedContext | ||
| { | ||
| const size_t batchEnd = std::min(batchStart + batchSize, items.size()); | ||
| NON_COPYABLE(SharedContext); | ||
| NON_MOVABLE(SharedContext); | ||
|
|
||
| TWork onWork; | ||
| wil::unique_event cancelEvent; | ||
|
|
||
| explicit SharedContext(TWork onWork_) : onWork(std::move(onWork_)) | ||
| { | ||
| cancelEvent.create(wil::EventOptions::ManualReset); | ||
| } | ||
| }; | ||
|
|
||
| // Holds per-worker state. Each Launch heap-allocates a shared_ptr<SharedWorker> as the | ||
| // thread pool callback context, giving the callback shared ownership and ensuring this | ||
| // memory is not freed while a callback is still running. | ||
| template <typename TWork, typename TItem, typename TResult> | ||
| struct SharedWorker | ||
| { | ||
| WorkerResult<TItem, TResult> workerResult; | ||
|
dkbennett marked this conversation as resolved.
|
||
| std::shared_ptr<SharedContext<TWork>> context; | ||
| wil::unique_event done; | ||
| wil::unique_threadpool_work work; | ||
| }; | ||
|
|
||
| // Manages a fixed pool of SharedWorkers. | ||
| template <typename TItem, typename TWork, typename TSuccess, typename TError> | ||
| struct WorkerPool | ||
| { | ||
| NON_COPYABLE(WorkerPool); | ||
| NON_MOVABLE(WorkerPool); | ||
|
|
||
| using TResult = decltype(std::declval<TWork>()(std::declval<TItem>(), std::declval<HANDLE>())); | ||
| using TSharedWorker = SharedWorker<TWork, TItem, TResult>; | ||
| using TSharedContext = SharedContext<TWork>; | ||
|
|
||
| std::vector<std::shared_ptr<TSharedWorker>> workers; | ||
| std::vector<HANDLE> doneHandles; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we remove the "timeout on cancel" logic, we can get rid of those |
||
| std::shared_ptr<TSharedContext> context; | ||
| std::chrono::milliseconds timeout; | ||
| DWORD timeoutMs{}; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| DWORD cancelDrainMs{}; | ||
|
|
||
| std::vector<std::future<BatchResult>> futures; | ||
| futures.reserve(batchEnd - batchStart); | ||
| WorkerPool(size_t workerCount, TWork onWork, std::chrono::milliseconds timeout_, std::chrono::milliseconds cancelDrainTimeout) : | ||
| context(std::make_shared<TSharedContext>(std::move(onWork))), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assuming that we don't leak threads, |
||
| timeout(timeout_), | ||
| timeoutMs(timeout_ == std::chrono::milliseconds::max() ? INFINITE : static_cast<DWORD>(timeout_.count())), | ||
| cancelDrainMs(static_cast<DWORD>(cancelDrainTimeout.count())) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that we have a |
||
| { | ||
| workers.reserve(workerCount); | ||
| doneHandles.reserve(workerCount); | ||
|
|
||
| for (size_t i = 0; i < workerCount; ++i) | ||
| { | ||
| auto worker = std::make_shared<TSharedWorker>(); | ||
| worker->done.create(wil::EventOptions::ManualReset); | ||
| worker->context = context; | ||
|
|
||
| doneHandles.push_back(worker->done.get()); | ||
| workers.push_back(std::move(worker)); | ||
| } | ||
| } | ||
|
|
||
| for (size_t i = batchStart; i < batchEnd; ++i) | ||
| void Launch(size_t workerIndex, const TItem& item) | ||
| { | ||
| const auto& item = items[i]; | ||
| futures.push_back(std::async(std::launch::async, [&onWork, item]() -> BatchResult { | ||
| BatchResult result{item}; | ||
| try | ||
| { | ||
| result.result = onWork(item); | ||
| } | ||
| catch (const wil::ResultException& ex) | ||
| { | ||
| result.hasError = true; | ||
| result.error = ex; | ||
| } | ||
| return result; | ||
| })); | ||
| auto& worker = workers[workerIndex]; | ||
| worker->workerResult = WorkerResult<TItem, TResult>{}; | ||
| worker->workerResult.item = item; | ||
|
dkbennett marked this conversation as resolved.
Outdated
dkbennett marked this conversation as resolved.
Outdated
dkbennett marked this conversation as resolved.
Outdated
|
||
| worker->done.ResetEvent(); | ||
|
|
||
| // Heap-allocate a shared_ptr as the callback context. The callback takes ownership, | ||
| // keeping the worker and its SharedContext alive for the full duration of the callback | ||
| // regardless of WorkerPool lifetime. Re-create the work item each launch so the | ||
| // context pointer is fresh. | ||
| auto* ctx = new std::shared_ptr<TSharedWorker>(worker); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we merge SharedContext and SharedWorker into one structure that the threadpool owns, and pass a pointer to that structure directly as the context ? This would simplify things a lot, and get rid of the If we store them as an std::list, the pointers never get invalidated |
||
| worker->work.reset(::CreateThreadpoolWork(ThreadPoolCallback, ctx, nullptr)); | ||
| if (!worker->work) | ||
| { | ||
| delete ctx; | ||
| THROW_LAST_ERROR(); | ||
| } | ||
|
|
||
| ::SubmitThreadpoolWork(worker->work.get()); | ||
| } | ||
|
|
||
| for (auto& future : futures) | ||
| void Drain(size_t workerIndex, TSuccess& onSuccess, TError& onError) | ||
| { | ||
| auto batchResult = future.get(); | ||
| auto& worker = workers[workerIndex]; | ||
|
|
||
| // Ensure the callback has fully returned before reading results. | ||
| ::WaitForThreadpoolWorkCallbacks(worker->work.get(), FALSE); | ||
|
|
||
| if (batchResult.hasError) | ||
| if (worker->workerResult.hasError) | ||
| { | ||
| onError(batchResult.item, batchResult.error); | ||
| onError(worker->workerResult.item, worker->workerResult.error); | ||
| } | ||
| else if (batchResult.result.has_value()) | ||
| else if (worker->workerResult.result.has_value()) | ||
| { | ||
| onSuccess(*batchResult.result); | ||
| onSuccess(*worker->workerResult.result); | ||
| } | ||
| } | ||
|
|
||
| // Signals cancellation, waits up to cancelDrainMs for workers to exit, then throws ERROR_TIMEOUT. | ||
| // Workers that do not exit within cancelDrainMs are abandoned. Each running callback holds a | ||
| // shared_ptr to its SharedWorker and SharedContext, so neither is freed while the callback runs. | ||
| // onWork implementations must check the cancel event at natural checkpoints and exit promptly. | ||
| // | ||
| // Note: TerminateThread() is not used - it skips C++ destructors, leaves user-mode locks | ||
| // permanently held (causing deadlocks), and corrupts COM apartment state. | ||
| [[noreturn]] void CancelAndThrow(size_t remainingItems) | ||
| { | ||
| context->cancelEvent.SetEvent(); | ||
|
|
||
| ::WaitForMultipleObjects(static_cast<DWORD>(doneHandles.size()), doneHandles.data(), TRUE, cancelDrainMs); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: We should check the return code of |
||
|
|
||
| THROW_HR_MSG( | ||
| HRESULT_FROM_WIN32(ERROR_TIMEOUT), | ||
| "ForEachAsync: worker exceeded timeout of %lld ms (%zu items remaining).", | ||
| static_cast<long long>(timeout.count()), | ||
| remainingItems); | ||
| } | ||
|
|
||
| // Thread pool callback - invoked on a pool thread for each submitted work item. | ||
| // Takes ownership of the heap-allocated shared_ptr<TSharedWorker> passed as context, | ||
| // ensuring the worker and its SharedContext remain alive for the duration of this call. | ||
| static void CALLBACK ThreadPoolCallback(PTP_CALLBACK_INSTANCE, void* context, PTP_WORK) noexcept | ||
| { | ||
| const std::unique_ptr<std::shared_ptr<TSharedWorker>> owner(static_cast<std::shared_ptr<TSharedWorker>*>(context)); | ||
| auto& worker = **owner; | ||
|
|
||
| try | ||
| { | ||
| worker.workerResult.result = worker.context->onWork(worker.workerResult.item, worker.context->cancelEvent.get()); | ||
| } | ||
| catch (const wil::ResultException& ex) | ||
| { | ||
| worker.workerResult.hasError = true; | ||
| worker.workerResult.error = ex; | ||
| } | ||
|
dkbennett marked this conversation as resolved.
|
||
| catch (...) | ||
| { | ||
| worker.workerResult.hasError = true; | ||
| worker.workerResult.error = wil::ResultException{wil::details::ResultFromCaughtException()}; | ||
|
dkbennett marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| worker.done.SetEvent(); | ||
| } | ||
| }; | ||
|
|
||
| } // namespace detail | ||
|
|
||
| // Invokes onWork for each element in items concurrently using the Windows thread pool, | ||
| // with concurrency bounded to poolSize. Results are delivered serially to onSuccess. | ||
| // Errors are delivered serially to onError. | ||
| // | ||
| // onWork receives a HANDLE to a cancellation event and should check it at natural | ||
| // checkpoints using WaitForSingleObject(cancel, 0), returning early if it is set. | ||
| // On timeout, the event is signalled and ForEachAsync waits up to cancelDrainTimeout | ||
| // for workers to exit before throwing HRESULT_FROM_WIN32(ERROR_TIMEOUT). | ||
| // | ||
| // poolSize must not exceed MAXIMUM_WAIT_OBJECTS (64). | ||
| // | ||
| // The timeout is a safety net against indefinite hangs, not a strict per-worker limit. | ||
| // A worker that hangs while other workers are still completing will be caught in the | ||
| // final wait at most one full timeout after all other work has finished. | ||
| // | ||
| // Note: thread pool threads have no guaranteed per-thread initialization. Callers | ||
| // whose onWork requires per-thread setup (e.g. CoInitializeEx) must perform it at | ||
| // the start of the onWork lambda. | ||
| // | ||
| // TWork : (TItem, HANDLE cancelEvent) -> TResult (called concurrently) | ||
| // TSuccess: TResult -> void (called serially) | ||
| // TError : (TItem, wil::ResultException) -> void (called serially) | ||
| template <typename TItem, typename TWork, typename TSuccess, typename TError> | ||
| void ForEachAsync( | ||
| const std::vector<TItem>& items, | ||
| TWork onWork, | ||
| TSuccess onSuccess, | ||
| TError onError, | ||
| size_t poolSize = 10, | ||
| std::chrono::milliseconds timeout = std::chrono::milliseconds::max(), | ||
| std::chrono::milliseconds cancelDrainTimeout = std::chrono::seconds(5)) | ||
| { | ||
| THROW_HR_IF(E_INVALIDARG, poolSize == 0); | ||
| THROW_HR_IF(E_INVALIDARG, poolSize > MAXIMUM_WAIT_OBJECTS); | ||
|
dkbennett marked this conversation as resolved.
|
||
|
|
||
| if (items.empty()) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| const size_t workerCount = std::min(poolSize, items.size()); | ||
|
|
||
| detail::WorkerPool<TItem, TWork, TSuccess, TError> pool{workerCount, std::move(onWork), timeout, cancelDrainTimeout}; | ||
|
|
||
| // Fill the pool - submit one item per worker to saturate all workers immediately. | ||
| size_t nextItem = 0; | ||
| for (; nextItem < workerCount; ++nextItem) | ||
| { | ||
| pool.Launch(nextItem, items[nextItem]); | ||
| } | ||
|
|
||
| // Keep the pool full - as each worker completes, drain its result and immediately | ||
| // assign it the next pending item. WaitForMultipleObjects(FALSE) wakes on the first | ||
| // completion, so no worker idles while work remains. | ||
| while (nextItem < items.size()) | ||
| { | ||
| const DWORD waitResult = ::WaitForMultipleObjects(static_cast<DWORD>(workerCount), pool.doneHandles.data(), FALSE, pool.timeoutMs); | ||
|
|
||
| if (waitResult == WAIT_TIMEOUT) | ||
| { | ||
| pool.CancelAndThrow(items.size() - nextItem); | ||
|
dkbennett marked this conversation as resolved.
|
||
| } | ||
|
|
||
| THROW_LAST_ERROR_IF(waitResult == WAIT_FAILED); | ||
|
|
||
| const size_t workerIndex = waitResult - WAIT_OBJECT_0; | ||
| pool.Drain(workerIndex, onSuccess, onError); | ||
| pool.Launch(workerIndex, items[nextItem++]); | ||
|
dkbennett marked this conversation as resolved.
|
||
| } | ||
|
|
||
| const DWORD finalWait = ::WaitForMultipleObjects(static_cast<DWORD>(workerCount), pool.doneHandles.data(), TRUE, pool.timeoutMs); | ||
|
|
||
| if (finalWait == WAIT_TIMEOUT) | ||
| { | ||
| pool.CancelAndThrow(0); | ||
| } | ||
|
|
||
| THROW_LAST_ERROR_IF(finalWait == WAIT_FAILED); | ||
|
|
||
| for (size_t i = 0; i < workerCount; ++i) | ||
| { | ||
| pool.Drain(i, onSuccess, onError); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this cancel event should be in the threadpool structure, since the threadpool logic doesn't actually look at it.
Callers can capture a cancel event in their work callback in they need to