From c9b52f06ef534f36cb72ea6ef34aae46610edf7c Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 13 May 2026 17:01:11 +0200 Subject: [PATCH 1/6] feat(remote): deregister connection on dtor --- src/fdb5/api/RemoteFDB.cc | 4 ++++ src/fdb5/api/RemoteFDB.h | 2 +- src/fdb5/remote/client/Client.cc | 8 +++++++- src/fdb5/remote/client/Client.h | 7 +++++++ src/fdb5/remote/client/RemoteStore.cc | 2 ++ 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/fdb5/api/RemoteFDB.cc b/src/fdb5/api/RemoteFDB.cc index 2b9b0a701..e9f3bc5ea 100644 --- a/src/fdb5/api/RemoteFDB.cc +++ b/src/fdb5/api/RemoteFDB.cc @@ -228,6 +228,10 @@ RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : Loc ReadLimiter::init(memoryLimit); } +RemoteFDB::~RemoteFDB() { + deregister(); +} + // ----------------------------------------------------------------------------------------------------- // forwardApiCall captures the asynchronous behaviour: diff --git a/src/fdb5/api/RemoteFDB.h b/src/fdb5/api/RemoteFDB.h index 236f51d31..98c99481f 100644 --- a/src/fdb5/api/RemoteFDB.h +++ b/src/fdb5/api/RemoteFDB.h @@ -43,7 +43,7 @@ class RemoteFDB : public LocalFDB, public Client { public: // method RemoteFDB(const eckit::Configuration& config, const std::string& name); - ~RemoteFDB() override {} + ~RemoteFDB() override; ListIterator inspect(const metkit::mars::MarsRequest& request) override; diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index e3e51e784..1d32ced44 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -69,8 +69,14 @@ void Client::refreshConnection() { connection_->add(*this); } +void Client::deregister() { + if (!deregistered_.exchange(true)) { + connection_->remove(id_); + } +} + Client::~Client() { - connection_->remove(id_); + deregister(); } void Client::controlWriteCheckResponse(const Message msg, const uint32_t requestID, const bool dataListener, diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 320a6a62c..4fbee3bb6 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -17,6 +17,7 @@ #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" +#include #include #include // std::pair #include @@ -91,6 +92,11 @@ class Client { protected: + /// Deregister this client from its connection. Idempotent. + /// Derived classes with state accessed by handle() should call this + /// in their destructor, before that state is destroyed. + void deregister(); + std::shared_ptr connection_; private: @@ -100,6 +106,7 @@ class Client { private: uint32_t id_; + std::atomic deregistered_{false}; mutable std::mutex blockingRequestMutex_; }; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d4836d26e..1f73c0d71 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -234,6 +234,8 @@ RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) : } RemoteStore::~RemoteStore() { + deregister(); + // If we have launched a thread with an async and we manage to get here, this is // an error. n.b. if we don't do something, we will block in the destructor // of std::future. From dc0783794531bf29e0ebf73f82873feec4bfba95 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 13 May 2026 17:02:18 +0200 Subject: [PATCH 2/6] feat(remote): remove dead code --- src/fdb5/remote/client/ClientConnection.h | 2 -- src/fdb5/remote/client/RemoteStore.h | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index d0ffb102b..65a2dcdc3 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -105,8 +105,6 @@ class ClientConnection : protected Connection { std::thread listeningControlThread_; std::thread listeningDataThread_; - std::mutex requestMutex_; - // requestID std::mutex idMutex_; uint32_t id_; diff --git a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h index 9b6eb27e2..376c1f306 100644 --- a/src/fdb5/remote/client/RemoteStore.h +++ b/src/fdb5/remote/client/RemoteStore.h @@ -186,6 +186,7 @@ class RemoteStore : public Store, public Client { // complete, errored or otherwise killed, it needs to be removed from the map. // The shared_ptr allows this removal to be asynchronous with the actual task // cleaning up and returning to the client. + /// @note `messageQueues_` is never populated; it's a dead code. std::map> messageQueues_; std::map> retrieveMessageQueues_; From 3862b4ec5574d08461ac5bdb9b7bddacc0c6f0fe Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 13 May 2026 17:19:19 +0200 Subject: [PATCH 3/6] fix(remote): mutex scopes --- src/fdb5/remote/client/ClientConnection.cc | 89 +++++++++++----------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 466717b00..3de376817 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -356,39 +356,40 @@ void ClientConnection::listeningControlThreadLoop() { ASSERT(hdr.control() || single_); - std::lock_guard lock(promisesMutex_); - - auto pp = promises_.find(hdr.requestID); - if (pp != promises_.end()) { - if (hdr.payloadSize == 0) { - ASSERT(hdr.message == Message::Received); - pp->second.set_value(Buffer(0)); - } - else { - pp->second.set_value(std::move(payload)); + { + // only hold it for the promise lookup/fulfillment, then release before calling handle(). + std::lock_guard lock(promisesMutex_); + + auto pp = promises_.find(hdr.requestID); + if (pp != promises_.end()) { + if (hdr.payloadSize == 0) { + ASSERT(hdr.message == Message::Received); + pp->second.set_value(Buffer(0)); + } + else { + pp->second.set_value(std::move(payload)); + } + promises_.erase(pp); + handled = true; } - promises_.erase(pp); - handled = true; } - else { - Client* client = nullptr; - { - std::lock_guard lock(clientsMutex_); - - auto it = clients_.find(hdr.clientID()); - if (it == clients_.end()) { - std::ostringstream ss; - ss << "ERROR: CONTROL connection=" << controlEndpoint_ - << " received [clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID - << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; - ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); - } - client = it->second; + + if (!handled) { + std::lock_guard lock(clientsMutex_); + + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::ostringstream ss; + ss << "ERROR: CONTROL connection=" << controlEndpoint_ + << " received [clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID + << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; + ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << "Retrieving... " << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); } + auto* client = it->second; if (hdr.payloadSize == 0) { handled = client->handle(hdr.message, hdr.requestID); } @@ -464,25 +465,23 @@ void ClientConnection::listeningDataThreadLoop() { else { if (hdr.clientID()) { bool handled = false; - Client* client = nullptr; - { - std::lock_guard lock(clientsMutex_); + // Hold clientsMutex_ across handle() to prevent the Client + // from being destroyed (via remove()) while handle() is in flight. + std::lock_guard lock(clientsMutex_); - auto it = clients_.find(hdr.clientID()); - if (it == clients_.end()) { - std::ostringstream ss; - ss << "ERROR: DATA connection=" << dataEndpoint_ << " received [clientID=" << hdr.clientID() - << ",requestID=" << hdr.requestID << ",message=" << hdr.message - << ",payload=" << hdr.payloadSize << "]" << std::endl; - ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); - } - client = it->second; + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::ostringstream ss; + ss << "ERROR: DATA connection=" << dataEndpoint_ << " received [clientID=" << hdr.clientID() + << ",requestID=" << hdr.requestID << ",message=" << hdr.message + << ",payload=" << hdr.payloadSize << "]" << std::endl; + ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << "Retrieving... " << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); } - ASSERT(client); + auto* client = it->second; ASSERT(!hdr.control()); if (hdr.payloadSize == 0) { handled = client->handle(hdr.message, hdr.requestID); From 00e78d5c3dac66d56b65b0a9f6c7c7c81dccfb96 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Fri, 15 May 2026 14:00:08 +0200 Subject: [PATCH 4/6] fix(remote): thread safe ReadLimiter init --- src/fdb5/api/RemoteFDB.cc | 5 +++-- src/fdb5/remote/client/ReadLimiter.cc | 6 +++--- src/fdb5/remote/client/RemoteStore.cc | 16 +++++++++++++++- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/fdb5/api/RemoteFDB.cc b/src/fdb5/api/RemoteFDB.cc index e9f3bc5ea..9ea45de42 100644 --- a/src/fdb5/api/RemoteFDB.cc +++ b/src/fdb5/api/RemoteFDB.cc @@ -220,8 +220,9 @@ RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : Loc config_.set("fieldLocationEndpoints", fieldLocationEndpoints); config_.overrideSchema(static_cast(controlEndpoint()) + "/schema", schema); - /// @note: We must instantiate the ReadLimiter before any RemoteStores due to their static initialisation. - /// @todo: this may change in future. + // Initialise ReadLimiter with user-configured limit. + // RemoteStore constructors call init() with a default limit as a fallback + // for standalone use (e.g. type=local, store=remote). static size_t memoryLimit = Resource("$FDB_READ_LIMIT;fdbReadLimit", config_.userConfig().getUnsigned("limits.read", size_t(1) * 1024 * 1024 * 1024)); // 1GiB diff --git a/src/fdb5/remote/client/ReadLimiter.cc b/src/fdb5/remote/client/ReadLimiter.cc index 51e34eae3..e9f7f6bb5 100644 --- a/src/fdb5/remote/client/ReadLimiter.cc +++ b/src/fdb5/remote/client/ReadLimiter.cc @@ -9,6 +9,7 @@ */ #include "fdb5/remote/client/ReadLimiter.h" +#include #include #include "eckit/config/Resource.h" #include "fdb5/remote/client/RemoteStore.h" @@ -18,6 +19,7 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- namespace { ReadLimiter* instance_ = nullptr; +std::once_flag initFlag_; } // namespace bool ReadLimiter::isInitialised() { @@ -30,9 +32,7 @@ ReadLimiter& ReadLimiter::instance() { } void ReadLimiter::init(size_t memoryLimit) { - if (!instance_) { - instance_ = new ReadLimiter(memoryLimit); - } + std::call_once(initFlag_, [memoryLimit] { instance_ = new ReadLimiter(memoryLimit); }); } ReadLimiter::ReadLimiter(size_t memoryLimit) : memoryUsed_{0}, memoryLimit_{memoryLimit} {} diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 1f73c0d71..1f0bf3def 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -24,6 +24,7 @@ #include "fdb5/remote/client/ReadLimiter.h" #include "fdb5/rules/Rule.h" +#include "eckit/config/Resource.h" #include "eckit/exception/Exceptions.h" #include "eckit/filesystem/URI.h" #include "eckit/io/Length.h" @@ -219,18 +220,31 @@ Client::EndpointList storeEndpoints(const Config& config) { return out; } +/// Default ReadLimiter memory limit used when RemoteStore is constructed standalone +/// (e.g. type=local, store=remote). When used via RemoteFDB, init() is called in its +/// constructor first with user-configured limits; std::call_once inside init() makes +/// subsequent calls no-ops, so the user's limit wins. +size_t defaultReadLimit() { + static size_t limit = + eckit::Resource("$FDB_READ_LIMIT;fdbReadLimit", size_t{1} * 1024 * 1024 * 1024); // 1 GiB default + return limit; +} + } // namespace //---------------------------------------------------------------------------------------------------------------------- RemoteStore::RemoteStore(const Key& dbKey, const Config& config) : - Client(config, storeEndpoints(config)), dbKey_(dbKey), config_(config) {} + Client(config, storeEndpoints(config)), dbKey_(dbKey), config_(config) { + ReadLimiter::init(defaultReadLimit()); +} // this is used only in retrieval, with an URI already referring to an accessible Store RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) : Client(config, eckit::net::Endpoint(uri.hostport()), uri.hostport()), config_(config) { // no need to set the local_ flag on the read path ASSERT(uri.scheme() == "fdb"); + ReadLimiter::init(defaultReadLimit()); } RemoteStore::~RemoteStore() { From e65b90d39858a37df51c7020f7923c5ebd7c266e Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Mon, 25 May 2026 10:03:41 +0200 Subject: [PATCH 5/6] Message::Error handling on client side fix race condition on archiving FieldLocations to local catalogue fix TocCatalogueWriter selectIndex mutex addressed PR comments reverted wipe permission change --- CMakeLists.txt | 7 +- src/fdb5/api/RemoteFDB.cc | 77 +++--- src/fdb5/api/RemoteFDB.h | 1 + src/fdb5/remote/Connection.cc | 22 +- src/fdb5/remote/Connection.h | 12 +- src/fdb5/remote/client/Client.cc | 26 +- src/fdb5/remote/client/Client.h | 9 - src/fdb5/remote/client/ClientConnection.cc | 246 +++++++++--------- src/fdb5/remote/client/ClientConnection.h | 1 - .../remote/client/ClientConnectionRouter.cc | 16 +- src/fdb5/remote/client/ReadLimiter.cc | 54 ++-- src/fdb5/remote/client/ReadLimiter.h | 11 +- src/fdb5/remote/client/RemoteCatalogue.cc | 8 +- src/fdb5/remote/client/RemoteStore.cc | 117 +++------ src/fdb5/remote/client/RemoteStore.h | 4 +- src/fdb5/toc/TocCatalogueWriter.cc | 27 +- src/fdb5/toc/TocCatalogueWriter.h | 10 +- 17 files changed, 350 insertions(+), 298 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 463e37056..b090e1a54 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,10 +6,15 @@ project( fdb5 LANGUAGES C CXX ) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) -# set(CMAKE_COMPILE_WARNING_AS_ERROR ON) + # add_compile_options(-fsanitize=address) # add_compile_options(-fsanitize=address,undefined -fno-omit-frame-pointer) # add_link_options(-fsanitize=address) + +# add_compile_options(-fsanitize=thread) +# add_compile_options(-fsanitize=thread,undefined -fno-omit-frame-pointer) +# add_link_options(-fsanitize=thread) + # set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wno-unused-parameter -Wno-unused-variable -Wno-sign-compare") # set(CMAKE_CXX_FLAGS "-Wno-unused-parameter -Wno-unused-variable -Wno-reorder -Wno-sign-compare -Wvla-cxx-extension") diff --git a/src/fdb5/api/RemoteFDB.cc b/src/fdb5/api/RemoteFDB.cc index 9ea45de42..cc87520ce 100644 --- a/src/fdb5/api/RemoteFDB.cc +++ b/src/fdb5/api/RemoteFDB.cc @@ -160,7 +160,7 @@ const net::Endpoint& RemoteFDB::storeEndpoint(const net::Endpoint& fieldLocation RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : LocalFDB(config, name), Client(config) { - Buffer buf = controlWriteReadResponse(remote::Message::Stores, generateRequestID()); + Buffer buf = controlWriteReadResponse(Message::Stores, generateRequestID()); MemoryStream s(buf); size_t numStores; s >> numStores; @@ -211,7 +211,7 @@ RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : Loc fieldLocationEndpoints.push_back(""); } - Buffer buf2 = controlWriteReadResponse(remote::Message::Schema, generateRequestID()); + Buffer buf2 = controlWriteReadResponse(Message::Schema, generateRequestID()); MemoryStream s2(buf2); Schema* schema = Reanimator::reanimate(s2); @@ -219,14 +219,6 @@ RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : Loc config_.set("stores", stores); config_.set("fieldLocationEndpoints", fieldLocationEndpoints); config_.overrideSchema(static_cast(controlEndpoint()) + "/schema", schema); - - // Initialise ReadLimiter with user-configured limit. - // RemoteStore constructors call init() with a default limit as a fallback - // for standalone use (e.g. type=local, store=remote). - static size_t memoryLimit = - Resource("$FDB_READ_LIMIT;fdbReadLimit", - config_.userConfig().getUnsigned("limits.read", size_t(1) * 1024 * 1024 * 1024)); // 1GiB - ReadLimiter::init(memoryLimit); } RemoteFDB::~RemoteFDB() { @@ -255,14 +247,16 @@ auto RemoteFDB::forwardApiCall(const HelperClass& helper, const FDBToolRequest& // Ensure we have an entry in the message queue before we trigger anything that // will result in return messages - uint32_t id = generateRequestID(); - auto entry = messageQueues_.emplace(id, std::make_shared(HelperClass::queueSize())); - ASSERT(entry.second); - std::shared_ptr messageQueue(entry.first->second); + std::shared_ptr messageQueue; + { + std::lock_guard lock(messageMutex_); + auto entry = messageQueues_.emplace(id, std::make_shared(HelperClass::queueSize())); + ASSERT(entry.second); + messageQueue = entry.first->second; + } // Encode the request and send it to the server - Buffer encodeBuffer(HelperClass::bufferSize()); MemoryStream s(encodeBuffer); s << request; @@ -320,64 +314,69 @@ const Configuration& RemoteFDB::clientConfig() const { return config(); } -bool RemoteFDB::handle(remote::Message message, uint32_t requestID) { +bool RemoteFDB::handle(Message message, uint32_t requestID) { switch (message) { case Message::Complete: { - + std::lock_guard lock(messageMutex_); auto it = messageQueues_.find(requestID); if (it == messageQueues_.end()) { return false; } it->second->close(); - // Remove entry (shared_ptr --> message queue will be destroyed when it - // goes out of scope in the worker thread). + // Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker + // thread). messageQueues_.erase(it); return true; } case Message::Error: { - - std::ostringstream ss; - ss << "RemoteFDB - client id: " << clientId() - << " - received an error without error description for requestID " << requestID << std::endl; - throw RemoteFDBException(ss.str(), controlEndpoint()); - - return false; + std::lock_guard lock(messageMutex_); + // Received Error message without error description. Remove the corresponding entry from the message queue + // and let the caller know & complain + auto it = messageQueues_.find(requestID); + if (it != messageQueues_.end()) { + it->second->interrupt( + std::make_exception_ptr(RemoteFDBException("no error description provided", controlEndpoint()))); + // Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker + // thread). + messageQueues_.erase(it); + } + return true; } default: + Log::error() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID << "]" + << std::endl; return false; } } -bool RemoteFDB::handle(remote::Message message, uint32_t requestID, Buffer&& payload) { +bool RemoteFDB::handle(Message message, uint32_t requestID, Buffer&& payload) { switch (message) { case Message::Blob: { + std::lock_guard lock(messageMutex_); auto it = messageQueues_.find(requestID); if (it == messageQueues_.end()) { return false; } - it->second->emplace(std::move(payload)); return true; } - case Message::Error: { - + std::lock_guard lock(messageMutex_); auto it = messageQueues_.find(requestID); - if (it == messageQueues_.end()) { - return false; + if (it != messageQueues_.end()) { + std::string errmsg{static_cast(payload.data()), payload.size()}; + it->second->interrupt(std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint()))); + // Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker + // thread). + messageQueues_.erase(it); } - std::string msg; - msg.resize(payload.size(), ' '); - payload.copy(&msg[0], payload.size()); - it->second->interrupt(std::make_exception_ptr(RemoteFDBException(msg, controlEndpoint()))); - // Remove entry (shared_ptr --> message queue will be destroyed when it - // goes out of scope in the worker thread). - messageQueues_.erase(it); return true; } default: + Log::warning() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID + << ",payloadSize=" << payload.size() << "]" << std::endl; return false; } } diff --git a/src/fdb5/api/RemoteFDB.h b/src/fdb5/api/RemoteFDB.h index 98c99481f..b402db906 100644 --- a/src/fdb5/api/RemoteFDB.h +++ b/src/fdb5/api/RemoteFDB.h @@ -96,6 +96,7 @@ class RemoteFDB : public LocalFDB, public Client { // The shared_ptr allows this removal to be asynchronous with the actual task // cleaning up and returning to the client. std::unordered_map> messageQueues_; + std::mutex messageMutex_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index 4d7b2d846..cd1a8891e 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -15,7 +15,7 @@ namespace fdb5::remote { Connection::Connection() : single_(false) {} void Connection::teardown() { - closingSocket_ = true; + closingSocket_.store(true); if (!valid()) { return; @@ -91,11 +91,29 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { && readUnsafe(socket, &tail, sizeof(tail))) { ASSERT(tail == MessageHeader::EndMarker); + + if (hdr.message == Message::Exit) { + closingSocket_.store(true); + } + if (hdr.message == Message::Error) { + eckit::net::Endpoint remoteEndpoint{socket.remoteHost(), socket.remotePort()}; + std::ostringstream ss; + if (payload.size() == 0) { + ss << "Received an error without error description for clientID " << hdr.clientID() << " requestID " + << hdr.requestID << std::endl; + throw RemoteFDBException(ss.str(), remoteEndpoint); + } + std::string errmsg{static_cast(payload.data()), payload.size()}; + ss << "Received error message: \"" << errmsg << "\" from " << remoteEndpoint << " for clientID " + << hdr.clientID() << " requestID " << hdr.requestID << std::endl; + eckit::Log::warning() << ss.str(); + } return payload; } } hdr.message = Message::Exit; + closingSocket_.store(true); return eckit::Buffer{0}; } @@ -130,7 +148,7 @@ void Connection::write(const Message msg, const bool control, const uint32_t cli void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, msg.data(), msg.length()); + write(Message::Error, true, clientID, requestID, msg.data(), msg.length()); } eckit::Buffer Connection::readControl(MessageHeader& hdr) const { diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index f851b8eaf..221473752 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -19,6 +19,7 @@ #include #include "eckit/exception/Exceptions.h" +#include "eckit/net/Endpoint.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" #include "eckit/serialisation/MemoryStream.h" @@ -50,6 +51,15 @@ class TCPException : public eckit::Exception { //---------------------------------------------------------------------------------------------------------------------- +class RemoteFDBException : public eckit::RemoteException { +public: + + RemoteFDBException(const std::string& msg, const eckit::net::Endpoint& endpoint) : + eckit::RemoteException(msg, endpoint) {} +}; + +//---------------------------------------------------------------------------------------------------------------------- + class Connection { public: // types @@ -106,7 +116,7 @@ class Connection { private: // members - bool closingSocket_ = false; + mutable std::atomic closingSocket_{false}; mutable std::mutex controlMutex_; mutable std::mutex dataMutex_; diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 1d32ced44..a096bc723 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -59,7 +59,7 @@ Client::Client(const eckit::Configuration& config, } void Client::refreshConnection() { - if (connection_->valid()) { + if (connection_->valid()) { // Connection is still valid, no need to refresh return; } eckit::Log::warning() << "Connection to " << connection_->controlEndpoint() @@ -92,8 +92,16 @@ void Client::controlWriteCheckResponse(const Message msg, const uint32_t request } auto f = connection_->controlWrite(*this, msg, requestID, dataListener, payloads); - f.wait(); - ASSERT(f.get().size() == 0); + try { + f.wait(); + ASSERT(f.get().size() == 0); + } + catch (const std::exception& e) { + std::ostringstream ss; + ss << "Error while waiting for response to control message " << msg << " with requestID " << requestID << ": " + << e.what(); + throw RemoteFDBException(ss.str(), connection_->controlEndpoint()); + } } eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t requestID, const void* const payload, @@ -109,8 +117,16 @@ eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t } auto f = connection_->controlWrite(*this, msg, requestID, false, payloads); - f.wait(); - return eckit::Buffer{f.get()}; + try { + f.wait(); + return eckit::Buffer{f.get()}; + } + catch (const std::exception& e) { + std::ostringstream ss; + ss << "Error while waiting for response to control message " << msg << " with requestID " << requestID << ": " + << e.what(); + throw RemoteFDBException(ss.str(), connection_->controlEndpoint()); + } } void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) { diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 4fbee3bb6..7bf09ad1f 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -28,15 +28,6 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -class RemoteFDBException : public eckit::RemoteException { -public: - - RemoteFDBException(const std::string& msg, const eckit::net::Endpoint& endpoint) : - eckit::RemoteException(msg, endpoint) {} -}; - -//---------------------------------------------------------------------------------------------------------------------- - class Client { public: // types diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 3de376817..191a59715 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -330,6 +330,13 @@ SessionID ClientConnection::verifyServerStartupResponse() { return serverSession; } +std::string msgHeader(MessageHeader& hdr, net::Endpoint& endpoint) { + std::ostringstream ss; + ss << (hdr.control() ? "CONTROL" : "DATA") << " connection=" << endpoint << " [message=" << hdr.message + << ",clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]"; + return ss.str(); +} + void ClientConnection::listeningControlThreadLoop() { try { @@ -339,29 +346,47 @@ void ClientConnection::listeningControlThreadLoop() { while (true) { Buffer payload = Connection::readControl(hdr); - - LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message - << ",clientID=" << hdr.clientID() << ",control=" << hdr.control() - << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" - << std::endl; + LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - " + << msgHeader(hdr, controlEndpoint_) << std::endl; if (hdr.message == Message::Exit) { - LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop() -- Control thread stopping" + LOG_DEBUG_LIB(LibFdb5) << "CONTROL connection=" << controlEndpoint_ << " - thread stopping" << std::endl; return; } - else { - if (hdr.clientID()) { - bool handled = false; - - ASSERT(hdr.control() || single_); - { - // only hold it for the promise lookup/fulfillment, then release before calling handle(). - std::lock_guard lock(promisesMutex_); - - auto pp = promises_.find(hdr.requestID); - if (pp != promises_.end()) { + if (hdr.clientID()) { + ASSERT(hdr.control() || single_); + + bool found = false; + bool handled = false; + { + // is the message a response to a blocking request? + // acquire the mutex and look for the request ID in the promises map + // only hold the mutex for the promise lookup/fulfillment, then release before calling handle(). + std::lock_guard lock(promisesMutex_); + auto pp = promises_.find(hdr.requestID); + if (pp != promises_.end()) { + found = true; + if (hdr.message == Message::Error) { // this is an error response to a blocking request, + // set the exception on the promise + std::string errmsg = + (hdr.payloadSize == 0) + ? "remote error - no error message provided" + : std::string{static_cast(payload.data()), payload.size()}; + try { + pp->second.set_exception( + std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint()))); + } + catch (const std::exception& e) { + Log::error() + << "ERROR: " << msgHeader(hdr, controlEndpoint_) << " - received error \"" << errmsg + << "\" for blocking request - unable to set the exception on the promise: " + << e.what() << std::endl; + } + handled = true; + } + else { if (hdr.payloadSize == 0) { ASSERT(hdr.message == Message::Received); pp->second.set_value(Buffer(0)); @@ -369,61 +394,56 @@ void ClientConnection::listeningControlThreadLoop() { else { pp->second.set_value(std::move(payload)); } - promises_.erase(pp); handled = true; } + promises_.erase(pp); + } + } + if (!found) { + // if not a response to a blocking request, then it must be a message for a client, look up the + // client and call handle() + std::lock_guard lock(clientsMutex_); + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::ostringstream ss; + ss << "ERROR: " << msgHeader(hdr, controlEndpoint_) << " - ClientID not found. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); } - if (!handled) { - std::lock_guard lock(clientsMutex_); - - auto it = clients_.find(hdr.clientID()); - if (it == clients_.end()) { - std::ostringstream ss; - ss << "ERROR: CONTROL connection=" << controlEndpoint_ - << " received [clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID - << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; - ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); - } - - auto* client = it->second; - if (hdr.payloadSize == 0) { - handled = client->handle(hdr.message, hdr.requestID); - } - else { - handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); - } + auto* client = it->second; + if (hdr.payloadSize == 0) { + handled = client->handle(hdr.message, hdr.requestID); + } + else { + handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); } + } - if (!handled) { - std::ostringstream ss; - if (hdr.message == Message::Error) { - ss << "RemoteFDB received an unhandled error on CONTROL connection. [clientID=" - << hdr.clientID() << ",requestID=" << hdr.requestID << "]"; - if (hdr.payloadSize) { - std::string msg; - msg.resize(payload.size(), ' '); - payload.copy(msg.data(), payload.size()); - ss << ": " << msg; - } - throw RemoteFDBException(ss.str(), controlEndpoint_); - } - else { - ss << "ERROR: CONTROL connection=" << controlEndpoint_ - << "Unexpected message recieved [message=" << hdr.message - << ",clientID=" << hdr.clientID() << ",requestID=" << hdr.requestID << "]. ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Client Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); + if (!handled) { + std::ostringstream ss; + ss << "ERROR: " << msgHeader(hdr, controlEndpoint_); + + if (hdr.message == Message::Error) { + ss << " - received an unhandled error"; + if (hdr.payloadSize) { + std::string errmsg{static_cast(payload.data()), payload.size()}; + ss << ": \"" << errmsg << "\""; } + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw RemoteFDBException(ss.str(), controlEndpoint_); + } + else { + ss << " - received unexpected message. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); } } } } - // We don't want to let exceptions escape inside a worker thread. } catch (const std::exception& e) { @@ -434,14 +454,6 @@ void ClientConnection::listeningControlThreadLoop() { } } -void ClientConnection::closeConnection() { - LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::closeConnection() -- Data thread stopping" << std::endl; - std::lock_guard lock(clientsMutex_); - for (auto& [id, client] : clients_) { - client->closeConnection(); - } -} - void ClientConnection::listeningDataThreadLoop() { try { @@ -453,63 +465,63 @@ void ClientConnection::listeningDataThreadLoop() { while (true) { Buffer payload = Connection::readData(hdr); - - LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningDataThreadLoop - got [message=" << hdr.message - << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" + LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningDataThreadLoop - " << msgHeader(hdr, dataEndpoint_) << std::endl; if (hdr.message == Message::Exit) { - closeConnection(); + LOG_DEBUG_LIB(LibFdb5) << "DATA connection=" << dataEndpoint_ << " - thread stopping" << std::endl; + std::lock_guard lock(clientsMutex_); + for (auto& [id, client] : clients_) { + client->closeConnection(); + } return; } - else { - if (hdr.clientID()) { - bool handled = false; - // Hold clientsMutex_ across handle() to prevent the Client - // from being destroyed (via remove()) while handle() is in flight. - std::lock_guard lock(clientsMutex_); - auto it = clients_.find(hdr.clientID()); - if (it == clients_.end()) { - std::ostringstream ss; - ss << "ERROR: DATA connection=" << dataEndpoint_ << " received [clientID=" << hdr.clientID() - << ",requestID=" << hdr.requestID << ",message=" << hdr.message - << ",payload=" << hdr.payloadSize << "]" << std::endl; - ss << "ClientID (" << hdr.clientID() << ") not found. ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); - } + if (hdr.clientID()) { + ASSERT(!hdr.control()); - auto* client = it->second; - ASSERT(!hdr.control()); - if (hdr.payloadSize == 0) { - handled = client->handle(hdr.message, hdr.requestID); - } - else { - handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); - } + bool handled = false; - if (!handled) { - std::ostringstream ss; - if (hdr.message == Message::Error) { - ss << "RemoteFDB received an unhandled error on DATA connection. [clientID=" - << hdr.clientID() << ",requestID=" << hdr.requestID << "]"; - if (hdr.payloadSize) { - std::string msg; - msg.resize(payload.size(), ' '); - payload.copy(msg.data(), payload.size()); - ss << ": " << msg; - } - throw RemoteFDBException(ss.str(), dataEndpoint_); - } - else { - ss << "ERROR: DATA connection=" << dataEndpoint_ << " Unexpected message recieved (" - << hdr.message << "). ABORTING"; - Log::status() << ss.str() << std::endl; - Log::error() << "Client Retrieving... " << ss.str() << std::endl; - throw SeriousBug(ss.str(), Here()); + // Hold clientsMutex_ across handle() to prevent the Client + // from being destroyed (via remove()) while handle() is in flight. + std::lock_guard lock(clientsMutex_); + + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::ostringstream ss; + ss << "ERROR: " << msgHeader(hdr, dataEndpoint_) << " - ClientID not found. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); + } + + auto* client = it->second; + if (hdr.payloadSize == 0) { + handled = client->handle(hdr.message, hdr.requestID); + } + else { + handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); + } + + if (!handled) { + std::ostringstream ss; + ss << "ERROR: " << msgHeader(hdr, dataEndpoint_); + + if (hdr.message == Message::Error) { + ss << " - received an unhandled error"; + if (hdr.payloadSize) { + std::string errmsg{static_cast(payload.data()), payload.size()}; + ss << ": \"" << errmsg << "\""; } + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw RemoteFDBException(ss.str(), dataEndpoint_); + } + else { + ss << " - received unexpected message. ABORTING"; + Log::status() << ss.str() << std::endl; + Log::error() << ss.str() << std::endl; + throw SeriousBug(ss.str(), Here()); } } } diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index 65a2dcdc3..2c5baf89b 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -81,7 +81,6 @@ class ClientConnection : protected Connection { void listeningControlThreadLoop(); void listeningDataThreadLoop(); void dataWriteThreadLoop(); - void closeConnection(); const eckit::net::TCPSocket& controlSocket() const override { return controlClient_; } diff --git a/src/fdb5/remote/client/ClientConnectionRouter.cc b/src/fdb5/remote/client/ClientConnectionRouter.cc index 01e861c43..6086e8e8b 100644 --- a/src/fdb5/remote/client/ClientConnectionRouter.cc +++ b/src/fdb5/remote/client/ClientConnectionRouter.cc @@ -1,7 +1,8 @@ #include "fdb5/remote/client/ClientConnectionRouter.h" -namespace { +#include +namespace { class ConnectionError : public eckit::Exception { public: @@ -24,7 +25,11 @@ ConnectionError::ConnectionError(const eckit::net::Endpoint& endpoint) { reason(s.str()); eckit::Log::status() << what() << std::endl; } + +std::mutex initMutex; +std::unique_ptr instance_{nullptr}; } // namespace + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -86,7 +91,7 @@ std::shared_ptr ClientConnectionRouter::connection( std::shared_ptr ClientConnectionRouter::refresh(const eckit::Configuration& config, const std::shared_ptr& connection) { - std::lock_guard lock(connectionMutex_); + std::lock_guard lock(connectionMutex_); const auto iter = connections_.find(connection->controlEndpoint()); if (iter == connections_.end() || !iter->second->valid()) { auto newConnection = @@ -110,8 +115,11 @@ void ClientConnectionRouter::deregister(ClientConnection& connection) { } ClientConnectionRouter& ClientConnectionRouter::instance() { - static ClientConnectionRouter router; - return router; + std::lock_guard lock(initMutex); + if (!instance_) { + instance_.reset(new ClientConnectionRouter()); + } + return *instance_; } void ClientConnectionRouter::teardown(std::exception_ptr e) { diff --git a/src/fdb5/remote/client/ReadLimiter.cc b/src/fdb5/remote/client/ReadLimiter.cc index e9f7f6bb5..38217e092 100644 --- a/src/fdb5/remote/client/ReadLimiter.cc +++ b/src/fdb5/remote/client/ReadLimiter.cc @@ -18,21 +18,28 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- namespace { -ReadLimiter* instance_ = nullptr; -std::once_flag initFlag_; +std::mutex instanceMutex_; +std::unique_ptr instance_{nullptr}; } // namespace -bool ReadLimiter::isInitialised() { - return instance_ != nullptr; +void ReadLimiter::init(size_t memoryLimit) { + std::lock_guard lock(instanceMutex_); + if (instance_ == nullptr) { + instance_.reset(new ReadLimiter(memoryLimit)); + } } ReadLimiter& ReadLimiter::instance() { - ASSERT(instance_); + std::lock_guard lock(instanceMutex_); + if (instance_ == nullptr) { + instance_.reset(new ReadLimiter(defaultReadLimit())); + } return *instance_; } -void ReadLimiter::init(size_t memoryLimit) { - std::call_once(initFlag_, [memoryLimit] { instance_ = new ReadLimiter(memoryLimit); }); +size_t ReadLimiter::defaultReadLimit() { + static size_t limit = eckit::Resource("$FDB_READ_LIMIT;fdbReadLimit", size_t{1_GiB}); // 1 GiB default + return limit; } ReadLimiter::ReadLimiter(size_t memoryLimit) : memoryUsed_{0}, memoryLimit_{memoryLimit} {} @@ -53,7 +60,7 @@ void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fie } { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); requests_.emplace_back(RequestInfo{client, id, std::move(requestBuffer), requestSize, resultSize}); } @@ -61,7 +68,7 @@ void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fie } bool ReadLimiter::tryNextRequest() { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); if (requests_.empty()) { return false; } @@ -84,7 +91,7 @@ bool ReadLimiter::tryNextRequest() { void ReadLimiter::finishRequest(uint32_t clientID, uint32_t requestID) { { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); auto it = activeRequests_.find(clientID); if (it == activeRequests_.end()) { @@ -104,37 +111,38 @@ void ReadLimiter::finishRequest(uint32_t clientID, uint32_t requestID) { /// @note: Only called when a RemoteStore is destroyed, which is currently on exit. void ReadLimiter::evictClient(size_t clientID) { - { - std::lock_guard lock(mutex_); + std::lock_guard lock(instanceMutex_); + if (instance_ != nullptr) { + auto& instance = *instance_; + std::lock_guard lock(instance.mutex_); // Remove the client's active requests - auto it = activeRequests_.find(clientID); + auto it = instance.activeRequests_.find(clientID); - if (it != activeRequests_.end()) { + if (it != instance.activeRequests_.end()) { for (auto requestID : it->second) { - memoryUsed_ -= resultSizes_[{clientID, requestID}]; - resultSizes_.erase({clientID, requestID}); + instance.memoryUsed_ -= instance.resultSizes_[{clientID, requestID}]; + instance.resultSizes_.erase({clientID, requestID}); } - activeRequests_.erase(it); + instance.activeRequests_.erase(it); } // Clean up any pending requests attributed to this client ///@note O(n), room for optimisation. - auto it2 = requests_.begin(); - while (it2 != requests_.end()) { + auto it2 = instance.requests_.begin(); + while (it2 != instance.requests_.end()) { if (it2->client->id() == clientID) { - it2 = requests_.erase(it2); + it2 = instance.requests_.erase(it2); } else { ++it2; // Only increment if we didn't erase } } + instance.tryNextRequest(); } - - tryNextRequest(); } void ReadLimiter::print(std::ostream& out) const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); out << "ReadLimiter(memoryUsed=" << memoryUsed_ << ", memoryLimit=" << memoryLimit_ << ") {" << std::endl; diff --git a/src/fdb5/remote/client/ReadLimiter.h b/src/fdb5/remote/client/ReadLimiter.h index bcacc403a..724e4e649 100644 --- a/src/fdb5/remote/client/ReadLimiter.h +++ b/src/fdb5/remote/client/ReadLimiter.h @@ -40,8 +40,7 @@ struct RequestInfo { class ReadLimiter { public: - static bool isInitialised(); - + static void init(size_t memoryLimit); static ReadLimiter& instance(); ReadLimiter(const ReadLimiter&) = delete; @@ -49,8 +48,6 @@ class ReadLimiter { ReadLimiter(ReadLimiter&&) = delete; ReadLimiter& operator=(ReadLimiter&&) = delete; - static void init(size_t memoryLimit); - // Add a new request to the queue of requests to be sent. Will not be sent until we know we have buffer space. void add(RemoteStore* client, uint32_t id, const FieldLocation& fieldLocation, const Key& remapKey); // use const *? @@ -66,7 +63,7 @@ class ReadLimiter { // request). /// @todo: This is somewhat pointless right now because the RemoteStores appear to be infinitely long lived... /// Revisit if this changes. - void evictClient(size_t clientID); + static void evictClient(size_t clientID); // Debugging void print(std::ostream& out) const; @@ -75,12 +72,14 @@ class ReadLimiter { ReadLimiter(size_t memoryLimit); + static size_t defaultReadLimit(); + // Send the request to the server void sendRequest(const RequestInfo& request) const; private: - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; size_t memoryUsed_; const size_t memoryLimit_; diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index a6b5c3713..d17c9e6e3 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -196,14 +196,14 @@ const eckit::Configuration& RemoteCatalogue::clientConfig() const { } bool RemoteCatalogue::handle(Message message, uint32_t requestID) { - Log::warning() << *this << " - Received [message=" << ((uint)message) << ",requestID=" << requestID << "]" - << std::endl; + Log::warning() << *this << " - Received unexpected [message=" << ((uint)message) << ",requestID=" << requestID + << "]" << std::endl; return false; } bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { - LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint)message) << ",requestID=" << requestID - << ",payloadSize=" << payload.size() << "]" << std::endl; + Log::warning() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID + << ",payloadSize=" << payload.size() << "]" << std::endl; return false; } diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 1f0bf3def..a1b33a407 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -27,6 +27,7 @@ #include "eckit/config/Resource.h" #include "eckit/exception/Exceptions.h" #include "eckit/filesystem/URI.h" +#include "eckit/io/Buffer.h" #include "eckit/io/Length.h" #include "eckit/io/Offset.h" #include "eckit/log/Log.h" @@ -34,6 +35,7 @@ #include "eckit/runtime/Main.h" #include "eckit/serialisation/MemoryStream.h" #include "eckit/serialisation/Reanimator.h" +#include "eckit/utils/Literals.h" #include #include @@ -128,9 +130,7 @@ class FDBRemoteDataHandle : public DataHandle { // If we are in the DataHandle, then there MUST be data to read RemoteStore::StoredMessage msg = std::make_pair(remote::Message{}, eckit::Buffer{0}); - // eckit::Log::info() << "RemoteDataHandle::read() -- popping next" << std::endl; ASSERT(queue_->pop(msg) != -1); - // eckit::Log::info() << "RemoteDataHandle::read() -- popped next" << std::endl; // Handle any remote errors communicated from the server if (msg.first == Message::Error) { @@ -206,9 +206,13 @@ class FDBRemoteDataHandle : public DataHandle { Client::EndpointList storeEndpoints(const Config& config) { ASSERT(config.has("stores")); - ASSERT(config.has("fieldLocationEndpoints")); const auto stores = config.getStringVector("stores"); - const auto fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); + + // endpoints used in RemoteFieldLocations can differ from the store endpoints (e.g. different networks; canonical + // store names) if so, the canonical names must be provided in the fieldLocationEndpoints config, otherwise the + // store endpoints are used. + const auto fieldLocationEndpoints = + config.has("fieldLocationEndpoints") ? config.getStringVector("fieldLocationEndpoints") : stores; ASSERT(stores.size() == fieldLocationEndpoints.size()); @@ -219,32 +223,18 @@ Client::EndpointList storeEndpoints(const Config& config) { } return out; } - -/// Default ReadLimiter memory limit used when RemoteStore is constructed standalone -/// (e.g. type=local, store=remote). When used via RemoteFDB, init() is called in its -/// constructor first with user-configured limits; std::call_once inside init() makes -/// subsequent calls no-ops, so the user's limit wins. -size_t defaultReadLimit() { - static size_t limit = - eckit::Resource("$FDB_READ_LIMIT;fdbReadLimit", size_t{1} * 1024 * 1024 * 1024); // 1 GiB default - return limit; -} - } // namespace //---------------------------------------------------------------------------------------------------------------------- RemoteStore::RemoteStore(const Key& dbKey, const Config& config) : - Client(config, storeEndpoints(config)), dbKey_(dbKey), config_(config) { - ReadLimiter::init(defaultReadLimit()); -} + Client(config, storeEndpoints(config)), dbKey_(dbKey), config_(config) {} // this is used only in retrieval, with an URI already referring to an accessible Store RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) : Client(config, eckit::net::Endpoint(uri.hostport()), uri.hostport()), config_(config) { // no need to set the local_ flag on the read path ASSERT(uri.scheme() == "fdb"); - ReadLimiter::init(defaultReadLimit()); } RemoteStore::~RemoteStore() { @@ -258,9 +248,7 @@ RemoteStore::~RemoteStore() { eckit::Main::instance().terminate(); } - if (ReadLimiter::isInitialised()) { - ReadLimiter::instance().evictClient(id()); - } + ReadLimiter::evictClient(id()); } eckit::URI RemoteStore::uri() const { @@ -367,16 +355,12 @@ void RemoteStore::print(std::ostream& out) const { } void RemoteStore::closeConnection() { + std::lock_guard lock(messageMutex_); for (auto& kv : messageQueues_) { if (!kv.second->closed()) { kv.second->interrupt(std::make_exception_ptr(eckit::Exception("Unexpected closure of store", Here()))); } } - for (auto& kv : retrieveMessageQueues_) { - if (!kv.second->closed()) { - kv.second->interrupt(std::make_exception_ptr(eckit::Exception("Unexpected closure of store", Here()))); - } - } } const eckit::Configuration& RemoteStore::clientConfig() const { @@ -387,38 +371,31 @@ bool RemoteStore::handle(Message message, uint32_t requestID) { switch (message) { case Message::Complete: { - // eckit::Log::info() << "RemoteStore::handle COMPLETE" << std::endl; - auto it = messageQueues_.find(requestID); - if (it != messageQueues_.end()) { - // eckit::Log::info() << "RemoteStore::handle COMPLETE close and erase queue" << std::endl; - it->second->close(); - - // Remove entry (shared_ptr --> message queue will be destroyed when it - // goes out of scope in the worker thread). - messageQueues_.erase(it); - // eckit::Log::info() << "RemoteStore::handle COMPLETE closed and erased queue" << std::endl; + std::lock_guard lock(messageMutex_); + auto id = messageQueues_.find(requestID); + if (id == messageQueues_.end()) { + return false; } - else { - std::lock_guard lock(retrieveMessageMutex_); - auto id = retrieveMessageQueues_.find(requestID); - ASSERT(id != retrieveMessageQueues_.end()); - id->second->emplace(std::make_pair(message, Buffer(0))); - - retrieveMessageQueues_.erase(id); - } + id->second->emplace(std::make_pair(message, Buffer(0))); + messageQueues_.erase(id); return true; } case Message::Error: { - - std::ostringstream ss; - ss << "RemoteStore client id: " << id() << " - received an error without error description for requestID " - << requestID << std::endl; - throw RemoteFDBException(ss.str(), controlEndpoint()); - - return false; + // Received Error message without error description. Remove the corresponding entry from the message queue + // and let the caller know & complain + std::lock_guard lock(messageMutex_); + auto it = messageQueues_.find(requestID); + if (it != messageQueues_.end()) { + it->second->interrupt( + std::make_exception_ptr(RemoteFDBException("no error description provided", controlEndpoint()))); + messageQueues_.erase(it); + } + return true; } default: + Log::warning() << *this << " - Received unexpected [message=" << ((uint)message) + << ",requestID=" << requestID << "]" << std::endl; return false; } } @@ -426,8 +403,8 @@ bool RemoteStore::handle(Message message, uint32_t requestID, eckit::Buffer&& pa switch (message) { - case Message::Store: { // received a Field location from the remote store, can forward to the archiver for the - // indexing + case Message::Store: { + // received a FieldLocation from the remote store, can forward to the archiver for the indexing MemoryStream s(payload); std::unique_ptr location(eckit::Reanimator::reanimate(s)); if (defaultEndpoint().empty()) { @@ -440,34 +417,25 @@ bool RemoteStore::handle(Message message, uint32_t requestID, eckit::Buffer&& pa } } case Message::Blob: { - auto it = messageQueues_.find(requestID); - if (it != messageQueues_.end()) { - it->second->emplace(message, std::move(payload)); - } - else { - std::lock_guard lock(retrieveMessageMutex_); - auto id = retrieveMessageQueues_.find(requestID); - ASSERT(id != retrieveMessageQueues_.end()); - id->second->emplace(std::make_pair(message, std::move(payload))); - } + std::lock_guard lock(messageMutex_); + auto id = messageQueues_.find(requestID); + ASSERT(id != messageQueues_.end()); + id->second->emplace(std::make_pair(message, std::move(payload))); return true; } case Message::Error: { - + std::lock_guard lock(messageMutex_); auto it = messageQueues_.find(requestID); if (it != messageQueues_.end()) { - std::string msg; - msg.resize(payload.size(), ' '); - payload.copy(&msg[0], payload.size()); - it->second->interrupt(std::make_exception_ptr(RemoteFDBException(msg, controlEndpoint()))); - - // Remove entry (shared_ptr --> message queue will be destroyed when it - // goes out of scope in the worker thread). + std::string errmsg{static_cast(payload.data()), payload.size()}; + it->second->interrupt(std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint()))); messageQueues_.erase(it); } return true; } default: + Log::warning() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID + << ",payloadSize=" << payload.size() << "]" << std::endl; return false; } } @@ -483,9 +451,8 @@ eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation, c static size_t queueSize = 320; std::shared_ptr queue = nullptr; { - std::lock_guard lock(retrieveMessageMutex_); - - auto entry = retrieveMessageQueues_.emplace(id, std::make_shared(queueSize)); + std::lock_guard lock(messageMutex_); + auto entry = messageQueues_.emplace(id, std::make_shared(queueSize)); ASSERT(entry.second); queue = entry.first->second; diff --git a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h index 376c1f306..b040b88b7 100644 --- a/src/fdb5/remote/client/RemoteStore.h +++ b/src/fdb5/remote/client/RemoteStore.h @@ -186,11 +186,9 @@ class RemoteStore : public Store, public Client { // complete, errored or otherwise killed, it needs to be removed from the map. // The shared_ptr allows this removal to be asynchronous with the actual task // cleaning up and returning to the client. - /// @note `messageQueues_` is never populated; it's a dead code. std::map> messageQueues_; - std::map> retrieveMessageQueues_; - std::mutex retrieveMessageMutex_; + std::mutex messageMutex_; Locations locations_; }; diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc index 7a42d488f..61f81e46b 100644 --- a/src/fdb5/toc/TocCatalogueWriter.cc +++ b/src/fdb5/toc/TocCatalogueWriter.cc @@ -52,6 +52,13 @@ TocCatalogueWriter::~TocCatalogueWriter() { // selectIndex is called during schema traversal and in case of out-of-order fieldLocation archival bool TocCatalogueWriter::selectIndex(const Key& idxKey) { + std::lock_guard lock(indexMutex_); + return selectIndexUnsafe(idxKey); +} + +// selectIndexUnsafe is not mutex protected, and should only be called by mutex-protected functions (archive, index, +// selectIndex and reconsolidateIndexesAndTocs) +bool TocCatalogueWriter::selectIndexUnsafe(const Key& idxKey) { currentIndexKey_ = idxKey; @@ -139,14 +146,16 @@ void TocCatalogueWriter::close() { } void TocCatalogueWriter::index(const Key& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) { + std::lock_guard lock(indexMutex_); + archivedLocations_++; if (current_.null()) { ASSERT(!currentIndexKey_.empty()); - selectIndex(currentIndexKey_); + selectIndexUnsafe(currentIndexKey_); } - Field field(TocFieldLocation(uri, offset, length, Key()), currentIndex().timestamp()); + Field field(TocFieldLocation(uri, offset, length, Key()), currentIndexUnsafe().timestamp()); current_.put(key, field); @@ -211,6 +220,7 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() { close(); // Add masking entries for all the indexes and subtocs visited so far + std::lock_guard lock(indexMutex_); Buffer buf(sizeof(TocRecord) * (subtocs.size() + maskable_indexes)); buf.zero(); @@ -238,10 +248,14 @@ void TocCatalogueWriter::reconsolidateIndexesAndTocs() { } const Index& TocCatalogueWriter::currentIndex() { + std::lock_guard lock(indexMutex_); + return currentIndexUnsafe(); +} +const Index& TocCatalogueWriter::currentIndexUnsafe() { if (current_.null()) { ASSERT(!currentIndexKey_.empty()); - selectIndex(currentIndexKey_); + selectIndexUnsafe(currentIndexKey_); } return current_; @@ -323,12 +337,13 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con void TocCatalogueWriter::archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) { + std::lock_guard lock(indexMutex_); archivedLocations_++; if (current_.null()) { ASSERT(!currentIndexKey_.empty()); - if (!selectIndex(currentIndexKey_)) { + if (!selectIndexUnsafe(currentIndexKey_)) { createIndex(currentIndexKey_, datumKey.size()); } } @@ -336,13 +351,13 @@ void TocCatalogueWriter::archive(const Key& idxKey, const Key& datumKey, // in case of async archival (out of order store/catalogue archival), currentIndexKey_ can differ from the // indexKey used for store archival. Reset it if (currentIndexKey_ != idxKey) { - if (!selectIndex(idxKey)) { + if (!selectIndexUnsafe(idxKey)) { createIndex(idxKey, datumKey.size()); } } } - Field field(std::move(fieldLocation), currentIndex().timestamp()); + Field field(std::move(fieldLocation), currentIndexUnsafe().timestamp()); current_.put(datumKey, field); diff --git a/src/fdb5/toc/TocCatalogueWriter.h b/src/fdb5/toc/TocCatalogueWriter.h index b1a4e68e7..b8e704903 100644 --- a/src/fdb5/toc/TocCatalogueWriter.h +++ b/src/fdb5/toc/TocCatalogueWriter.h @@ -16,12 +16,13 @@ #ifndef fdb5_TocCatalogueWriter_H #define fdb5_TocCatalogueWriter_H +#include + #include "eckit/os/AutoUmask.h" #include "fdb5/database/Index.h" -#include "fdb5/toc/TocRecord.h" - #include "fdb5/toc/TocCatalogue.h" +#include "fdb5/toc/TocRecord.h" #include "fdb5/toc/TocSerialisationVersion.h" namespace fdb5 { @@ -81,6 +82,9 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { private: // methods + bool selectIndexUnsafe(const Key& idxKey); + const Index& currentIndexUnsafe(); + void closeIndexes(); void flushIndexes(); void compactSubTocIndexes(); @@ -110,6 +114,8 @@ class TocCatalogueWriter : public TocCatalogue, public CatalogueWriter { eckit::AutoUmask umask_; size_t archivedLocations_; + + std::mutex indexMutex_; }; //---------------------------------------------------------------------------------------------------------------------- From 99cb2c06e0fa543119cefdb27d45652a11151cc1 Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Tue, 9 Jun 2026 16:44:10 +0100 Subject: [PATCH 6/6] addressed PR comments on ReadLimiter --- src/fdb5/remote/client/ReadLimiter.cc | 46 +++++++++++++++------------ src/fdb5/remote/client/ReadLimiter.h | 14 ++++---- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/fdb5/remote/client/ReadLimiter.cc b/src/fdb5/remote/client/ReadLimiter.cc index 38217e092..4b9655635 100644 --- a/src/fdb5/remote/client/ReadLimiter.cc +++ b/src/fdb5/remote/client/ReadLimiter.cc @@ -22,14 +22,11 @@ std::mutex instanceMutex_; std::unique_ptr instance_{nullptr}; } // namespace -void ReadLimiter::init(size_t memoryLimit) { - std::lock_guard lock(instanceMutex_); - if (instance_ == nullptr) { - instance_.reset(new ReadLimiter(memoryLimit)); - } -} ReadLimiter& ReadLimiter::instance() { + // the instance cannot be a static ReadLimiter, which is causing the following error on exit, + // when the instance is destroyed and the mutex is destroyed before the instance: + // libc++abi: terminating due to uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument std::lock_guard lock(instanceMutex_); if (instance_ == nullptr) { instance_.reset(new ReadLimiter(defaultReadLimit())); @@ -42,6 +39,10 @@ size_t ReadLimiter::defaultReadLimit() { return limit; } +void ReadLimiter::setMemoryLimit(size_t memoryLimit) { + memoryLimit_ = memoryLimit; +} + ReadLimiter::ReadLimiter(size_t memoryLimit) : memoryUsed_{0}, memoryLimit_{memoryLimit} {} void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fieldLocation, const Key& remapKey) { @@ -60,7 +61,7 @@ void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fie } { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); requests_.emplace_back(RequestInfo{client, id, std::move(requestBuffer), requestSize, resultSize}); } @@ -68,7 +69,6 @@ void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fie } bool ReadLimiter::tryNextRequest() { - std::lock_guard lock(mutex_); if (requests_.empty()) { return false; } @@ -91,7 +91,7 @@ bool ReadLimiter::tryNextRequest() { void ReadLimiter::finishRequest(uint32_t clientID, uint32_t requestID) { { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); auto it = activeRequests_.find(clientID); if (it == activeRequests_.end()) { @@ -113,36 +113,40 @@ void ReadLimiter::finishRequest(uint32_t clientID, uint32_t requestID) { void ReadLimiter::evictClient(size_t clientID) { std::lock_guard lock(instanceMutex_); if (instance_ != nullptr) { - auto& instance = *instance_; - std::lock_guard lock(instance.mutex_); + std::lock_guard lock(instance_->mutex_); // Remove the client's active requests - auto it = instance.activeRequests_.find(clientID); + auto it = instance_->activeRequests_.find(clientID); - if (it != instance.activeRequests_.end()) { + if (it != instance_->activeRequests_.end()) { for (auto requestID : it->second) { - instance.memoryUsed_ -= instance.resultSizes_[{clientID, requestID}]; - instance.resultSizes_.erase({clientID, requestID}); + instance_->memoryUsed_ -= instance_->resultSizes_[{clientID, requestID}]; + instance_->resultSizes_.erase({clientID, requestID}); } - instance.activeRequests_.erase(it); + instance_->activeRequests_.erase(it); } // Clean up any pending requests attributed to this client ///@note O(n), room for optimisation. - auto it2 = instance.requests_.begin(); - while (it2 != instance.requests_.end()) { + auto it2 = instance_->requests_.begin(); + while (it2 != instance_->requests_.end()) { if (it2->client->id() == clientID) { - it2 = instance.requests_.erase(it2); + it2 = instance_->requests_.erase(it2); } else { ++it2; // Only increment if we didn't erase } } - instance.tryNextRequest(); + instance_->tryNextRequest(); + + if (instance_->activeRequests_.empty() && instance_->requests_.empty()) { + // If there are no more active or pending requests, we can reset the instance to free memory. + instance_.reset(); + } } } void ReadLimiter::print(std::ostream& out) const { - std::lock_guard lock(mutex_); + std::lock_guard lock(mutex_); out << "ReadLimiter(memoryUsed=" << memoryUsed_ << ", memoryLimit=" << memoryLimit_ << ") {" << std::endl; diff --git a/src/fdb5/remote/client/ReadLimiter.h b/src/fdb5/remote/client/ReadLimiter.h index 724e4e649..d619abaf7 100644 --- a/src/fdb5/remote/client/ReadLimiter.h +++ b/src/fdb5/remote/client/ReadLimiter.h @@ -40,8 +40,8 @@ struct RequestInfo { class ReadLimiter { public: - static void init(size_t memoryLimit); static ReadLimiter& instance(); + void setMemoryLimit(size_t memoryLimit); ReadLimiter(const ReadLimiter&) = delete; ReadLimiter& operator=(const ReadLimiter&) = delete; @@ -52,10 +52,6 @@ class ReadLimiter { void add(RemoteStore* client, uint32_t id, const FieldLocation& fieldLocation, const Key& remapKey); // use const *? - // Attempt to send the next request in the queue. Returns true if a request was sent. - // If not enough memory is available, or there is no next request, returns false. - bool tryNextRequest(); - void finishRequest(uint32_t clientID, uint32_t requestID); // When a RemoteStore is destroyed, it must evict any unconsumed requests. @@ -74,15 +70,19 @@ class ReadLimiter { static size_t defaultReadLimit(); + // Attempt to send the next request in the queue. Returns true if a request was sent. + // If not enough memory is available, or there is no next request, returns false. + bool tryNextRequest(); + // Send the request to the server void sendRequest(const RequestInfo& request) const; private: - mutable std::recursive_mutex mutex_; + mutable std::mutex mutex_; size_t memoryUsed_; - const size_t memoryLimit_; + size_t memoryLimit_; // Enqueued requests std::deque requests_;