Feature/fdb remote deregister#281
Pull request overview
This pull request updates the remote-client lifecycle to support safer client de-registration, makes ReadLimiter initialisation lazy, and adjusts connection-router singleton lifecycle to better handle shutdown/teardown scenarios for local-catalogue + remote-store configurations.
Changes:
- Make ReadLimiter lazily initialised (with a default limit) and update related initialisation call sites.
- Make client de-registration idempotent and ensure derived destructors de-register before tearing down state accessed by handle().
- Update connection router singleton lifetime/initialisation and adjust client-handling thread locking.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/fdb5/remote/client/RemoteStore.h | Removes unused per-request message queue map, leaving retrieval queue map. |
| src/fdb5/remote/client/RemoteStore.cc | Adds explicit de-registration in destructor and simplifies retrieval message handling. |
| src/fdb5/remote/client/ReadLimiter.h | Exposes lazy init and adds defaultReadLimit() API. |
| src/fdb5/remote/client/ReadLimiter.cc | Implements thread-safe lazy singleton initialisation via std::call_once. |
| src/fdb5/remote/client/ClientConnectionRouter.cc | Changes router singleton initialisation/lifetime and explicit mutex lock guards. |
| src/fdb5/remote/client/ClientConnection.h | Removes unused mutex member. |
| src/fdb5/remote/client/ClientConnection.cc | Refines locking around promise fulfilment and client dispatch in listener threads. |
| src/fdb5/remote/client/Client.h | Adds idempotent deregister() and an atomic guard flag. |
| src/fdb5/remote/client/Client.cc | Implements deregister() and uses it from destructor. |
| src/fdb5/api/RemoteFDB.h | Moves to out-of-line destructor definition. |
| src/fdb5/api/RemoteFDB.cc | Removes eager ReadLimiter init and de-registers in destructor. |
Codecov Report
❌ Patch coverage is …
Coverage diff, develop vs. #281:
| Metric | develop | #281 | +/- |
|---|---|---|---|
| Coverage | 71.15% | 71.05% | -0.11% |
| Files | 370 | 370 | |
| Lines | 23455 | 23535 | +80 |
| Branches | 2463 | 2470 | +7 |
| Hits | 16690 | 16722 | +32 |
| Misses | 6765 | 6813 | +48 |
  Client::~Client() {
-     connection_->remove(id_);
+     deregister();
  }

Are there any exceptions that might be thrown during this deregister()? Is that safe, given we are in a destructor?
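To make the destructor question concrete, here is a minimal sketch of an idempotent deregister() guarded by an atomic flag, called from a destructor that lets nothing escape. FakeConnection and SketchClient are hypothetical stand-ins, not the FDB classes, and the logging is only illustrative; the real Client may handle failures differently.

```cpp
#include <atomic>
#include <cstdint>
#include <exception>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for the connection a client registers with.
struct FakeConnection {
    int removeCalls = 0;
    bool throwOnRemove = false;

    void remove(uint32_t /*id*/) {
        ++removeCalls;
        if (throwOnRemove) {
            throw std::runtime_error("remove failed");
        }
    }
};

// Hypothetical client: de-registers at most once, and never throws from its destructor.
class SketchClient {
public:
    SketchClient(FakeConnection& connection, uint32_t id) : connection_(connection), id_(id) {}

    // Destructors are implicitly noexcept, so an escaping exception would call std::terminate.
    ~SketchClient() {
        try {
            deregister();
        }
        catch (const std::exception& e) {
            std::cerr << "deregister failed in destructor: " << e.what() << std::endl;
        }
        catch (...) {
            std::cerr << "deregister failed in destructor: unknown error" << std::endl;
        }
    }

    // Idempotent: exchange() returns the previous value, so only the first caller proceeds.
    void deregister() {
        if (deregistered_.exchange(true)) {
            return;
        }
        connection_.remove(id_);
    }

private:
    FakeConnection& connection_;
    uint32_t id_;
    std::atomic<bool> deregistered_{false};
};
```

With this shape a derived class can call deregister() early in its own destructor, and the base destructor's second call becomes a no-op.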
void ReadLimiter::init(size_t memoryLimit) {
    std::call_once(initFlag_, [memoryLimit] { instance_ = new ReadLimiter(memoryLimit); });

Note that this silently becomes a NOP if already initialised.
Is there a reason we can't just make the limit on the ReadLimiter updatable? Then we can use a static ReadLimiter limiter inside the instance() function rather than messing around with memory allocation/call_once.
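The alternative suggested in this comment could look roughly like the sketch below: a function-local static instance plus an updatable limit. SketchReadLimiter, setLimit() and the 1 GiB default are assumptions made for illustration; only the defaultReadLimit() name is taken from the PR summary, and its real value is not shown in this thread.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical limiter with an updatable limit instead of a one-shot init().
class SketchReadLimiter {
public:
    // Assumed default of 1 GiB, chosen only for this example.
    static std::size_t defaultReadLimit() { return std::size_t{1} << 30; }

    static SketchReadLimiter& instance() {
        // Function-local static: initialised once, thread-safely, since C++11.
        static SketchReadLimiter limiter;
        return limiter;
    }

    // A later call updates the limit rather than being silently ignored.
    void setLimit(std::size_t bytes) { limit_.store(bytes); }
    std::size_t limit() const { return limit_.load(); }

private:
    SketchReadLimiter() = default;
    std::atomic<std::size_t> limit_{defaultReadLimit()};
};
```

One trade-off to keep in mind: a function-local static is destroyed during static destruction at exit, so this form only fits if nothing touches the limiter that late.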
  ClientConnectionRouter& ClientConnectionRouter::instance() {
-     static ClientConnectionRouter router;
-     return router;
+     std::call_once(initFlag_, [] { instance_ = new ClientConnectionRouter(); });

I am missing something. Why are we changing the way we construct the static instance?
Are we having issues with the static instance being cleaned up too soon? If so, we need a BIG comment in the places where this is done - not least to note that we will be leaking the object.
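For reference, the deliberately leaked singleton pattern the reviewer describes, with the kind of comment being asked for, might be sketched as follows. SketchRouter is a hypothetical class; whether the final ClientConnectionRouter leaks the object or releases it some other way is not settled by this thread.

```cpp
#include <mutex>

// Hypothetical router used only to illustrate the pattern.
class SketchRouter {
public:
    static SketchRouter& instance() {
        // ============================ NOTE ============================
        // The instance is heap-allocated and INTENTIONALLY NEVER DELETED.
        // A function-local static would be destroyed during static
        // destruction, possibly while other objects still use it at exit.
        // Leaking it avoids that ordering problem; the OS reclaims the
        // memory when the process ends.
        // ==============================================================
        std::call_once(initFlag_, [] { instance_ = new SketchRouter(); });
        return *instance_;
    }

    SketchRouter(const SketchRouter&) = delete;
    SketchRouter& operator=(const SketchRouter&) = delete;

private:
    SketchRouter() = default;

    static std::once_flag initFlag_;
    static SketchRouter* instance_;
};

std::once_flag SketchRouter::initFlag_;
SketchRouter* SketchRouter::instance_ = nullptr;
```

The cost is that the destructor never runs, so any cleanup it would perform (closing connections, flushing state) has to happen explicitly elsewhere.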
9362fff to 1bb317b
  void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const {
      eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
-     write(Message::Error, false, clientID, requestID, msg.data(), msg.length());
+     write(Message::Error, true, clientID, requestID, msg.data(), msg.length());

This flip makes me nervous, enough that I would want a proper design discussion about it. Why are we changing whether (all) error messages are sent on the control vs. data connection? That feels like a big change.

Initially the control connection was blocking (no listener thread), so I had to use the data connection for the errors.
With the current implementation we are free to choose, and the data connection could be busier with large data transfers, so I considered this a potential improvement to error latency.
if (connections_.empty()) {
    std::lock_guard<std::mutex> lock(initMutex);
    instance_.reset();
}
if (hdr.message == Message::Error) {  // this is an error response to a blocking request,
                                      // set the exception on the promise
    std::string errmsg =
        (hdr.payloadSize == 0)
            ? "remote error - no error message provided"
            : std::string{static_cast<const char*>(payload.data()), payload.size()};
    try {
        pp->second.set_exception(
            std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint())));
    }
    catch (const std::exception& e) {
        Log::error()
            << "ERROR: " << msgHeader(hdr, controlEndpoint_) << " - received error \"" << errmsg
            << "\" for blocking request - unable to set the exception on the promise: "
            << e.what() << std::endl;
    }
}
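The try/catch around set_exception() matters because std::promise throws std::future_error if it has already been satisfied (or has no shared state). A minimal standalone sketch of that behaviour follows; deliverError is a hypothetical helper and std::runtime_error stands in for RemoteFDBException.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical helper: tries to fail a pending request with the given message.
// Returns false if the promise could not accept the exception.
inline bool deliverError(std::promise<int>& promise, const std::string& errmsg) {
    try {
        promise.set_exception(std::make_exception_ptr(std::runtime_error(errmsg)));
        return true;
    }
    catch (const std::future_error&) {
        // Thrown when a value or exception was already set, or when there is no shared state.
        return false;
    }
}
```

A caller waiting on the matching future sees the stored exception rethrown from get(); a second error for the same request is reported as undeliverable instead of terminating the listener thread.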
uint32_t id = generateRequestID();
std::lock_guard<std::mutex> lock(messageMutex_);
auto entry = messageQueues_.emplace(id, std::make_shared<MessageQueue>(HelperClass::queueSize()));
ASSERT(entry.second);
std::shared_ptr<MessageQueue> messageQueue(entry.first->second);
case Message::Error: {
    std::lock_guard<std::mutex> lock(messageMutex_);
    // Received Error message without error description. Remove the corresponding entry from the message queue
    // and let the caller know & complain
    auto it = messageQueues_.find(requestID);
    if (it != messageQueues_.end()) {
        it->second->interrupt(
            std::make_exception_ptr(RemoteFDBException("no error description provided", controlEndpoint())));
        // Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker
        // thread).
        messageQueues_.erase(it);
    }
    return false;
}
  case Message::Error: {
      std::lock_guard<std::mutex> lock(messageMutex_);
      auto it = messageQueues_.find(requestID);
-     if (it == messageQueues_.end()) {
-         return false;
+     if (it != messageQueues_.end()) {
+         std::string errmsg{static_cast<const char*>(payload.data()), payload.size()};
+         it->second->interrupt(std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint())));
+         // Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker
+         // thread).
+         messageQueues_.erase(it);
      }
-     std::string msg;
-     msg.resize(payload.size(), ' ');
-     payload.copy(&msg[0], payload.size());
-     it->second->interrupt(std::make_exception_ptr(RemoteFDBException(msg, controlEndpoint())));
-     // Remove entry (shared_ptr --> message queue will be destroyed when it
-     // goes out of scope in the worker thread).
-     messageQueues_.erase(it);
-     return true;
+     return false;
  }
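The "interrupt, then erase" step relies on the worker holding its own shared_ptr to the queue, so erasing the map entry does not destroy a queue that is still being read. A single-threaded sketch of that ownership pattern is below; SketchQueue and SketchStore are hypothetical, the queue here stores only the interrupting exception, and std::runtime_error stands in for RemoteFDBException.

```cpp
#include <cstddef>
#include <cstdint>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical minimal queue: it only records the exception that interrupted it.
class SketchQueue {
public:
    void interrupt(std::exception_ptr e) {
        std::lock_guard<std::mutex> lock(mutex_);
        error_ = e;
    }

    // Rethrows the stored exception, if any; a real consumer would do this while popping.
    void checkInterrupt() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (error_) {
            std::rethrow_exception(error_);
        }
    }

private:
    std::mutex mutex_;
    std::exception_ptr error_;
};

// Hypothetical store holding one queue per request ID.
class SketchStore {
public:
    // The caller (worker) keeps the returned shared_ptr alive while it reads.
    std::shared_ptr<SketchQueue> open(uint32_t requestID) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto entry = queues_.emplace(requestID, std::make_shared<SketchQueue>());
        return entry.first->second;
    }

    // Interrupt the matching queue and drop the map's reference; unknown IDs are ignored.
    void handleError(uint32_t requestID, const std::string& errmsg) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = queues_.find(requestID);
        if (it != queues_.end()) {
            it->second->interrupt(std::make_exception_ptr(std::runtime_error(errmsg)));
            queues_.erase(it);  // the queue itself lives on until the worker releases it
        }
    }

    std::size_t pending() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queues_.size();
    }

private:
    std::mutex mutex_;
    std::map<uint32_t, std::shared_ptr<SketchQueue>> queues_;
};
```

Checking for the entry before dereferencing it, as the new branch in the diff does, also means an error for an unknown request ID is simply ignored.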
c1888d6 to e4d1c46
fix race condition on archiving FieldLocations to local catalogue
fix TocCatalogueWriter selectIndex mutex
addressed PR comments
reverted wipe permission change
e4d1c46 to e65b90d
Description
Fixes for local catalogue + remote stores:
- improved client de-registration from the connection
- small cleanup
Contributor Declaration
By opening this pull request, I affirm the following:
🌈🌦️📖🚧 Documentation FDB 🚧📖🌦️🌈
https://sites.ecmwf.int/docs/fdb/pull-requests/PR-281