diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 62e6820e9c..106012798e 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -82,6 +82,7 @@ class HookedExecutor final : public folly::Executor { state_(std::make_shared()) {} ~HookedExecutor() override { + shuttingDown_.store(true, std::memory_order_release); if (!join()) { LOG(WARNING) << "Timed out waiting for hooked executor " << name_ << " to finish after " << joinTimeout_.count() << " ms."; @@ -89,6 +90,13 @@ class HookedExecutor final : public folly::Executor { dumpOutstandingTasks(); } } + // We do NOT assume ownership of parent lifecycle, + // so we do NOT call join() blindly. + // + // Instead, we only try a "soft fence" by pushing a no-op barrier. + if (parent_) { + parent_->add([] {}); + } } uint8_t getNumPriorities() const override { @@ -144,12 +152,18 @@ class HookedExecutor final : public folly::Executor { public: void add(folly::Func func) override { GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + if (UNLIKELY(shuttingDown_.load(std::memory_order_acquire))) { + return; + } state_->inFlight.fetch_add(1, std::memory_order_relaxed); parent_->add(wrap(std::move(func), 0)); } void addWithPriority(folly::Func func, int8_t priority) override { GLUTEN_CHECK(parent_ != nullptr, "Parent executor is null."); + if (UNLIKELY(shuttingDown_.load(std::memory_order_acquire))) { + return; + } state_->inFlight.fetch_add(1, std::memory_order_relaxed); parent_->addWithPriority(wrap(std::move(func), priority), priority); } @@ -194,6 +208,7 @@ class HookedExecutor final : public folly::Executor { bool debug_; std::chrono::milliseconds joinTimeout_; std::shared_ptr state_; + std::atomic shuttingDown_{false}; }; std::unique_ptr makeHookedExecutor(