Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ project( fdb5 LANGUAGES C CXX )

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_COMPILE_WARNING_AS_ERROR ON)

# add_compile_options(-fsanitize=address)
# add_compile_options(-fsanitize=address,undefined -fno-omit-frame-pointer)
# add_link_options(-fsanitize=address)

# add_compile_options(-fsanitize=thread)
# add_compile_options(-fsanitize=thread,undefined -fno-omit-frame-pointer)
# add_link_options(-fsanitize=thread)

# set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wno-unused-parameter -Wno-unused-variable -Wno-sign-compare")
# set(CMAKE_CXX_FLAGS "-Wno-unused-parameter -Wno-unused-variable -Wno-reorder -Wno-sign-compare -Wvla-cxx-extension")

Expand Down
78 changes: 41 additions & 37 deletions src/fdb5/api/RemoteFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const net::Endpoint& RemoteFDB::storeEndpoint(const net::Endpoint& fieldLocation

RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : LocalFDB(config, name), Client(config) {

Buffer buf = controlWriteReadResponse(remote::Message::Stores, generateRequestID());
Buffer buf = controlWriteReadResponse(Message::Stores, generateRequestID());
MemoryStream s(buf);
size_t numStores;
s >> numStores;
Expand Down Expand Up @@ -211,21 +211,18 @@ RemoteFDB::RemoteFDB(const Configuration& config, const std::string& name) : Loc
fieldLocationEndpoints.push_back("");
}

Buffer buf2 = controlWriteReadResponse(remote::Message::Schema, generateRequestID());
Buffer buf2 = controlWriteReadResponse(Message::Schema, generateRequestID());
MemoryStream s2(buf2);

Schema* schema = Reanimator<Schema>::reanimate(s2);

config_.set("stores", stores);
config_.set("fieldLocationEndpoints", fieldLocationEndpoints);
config_.overrideSchema(static_cast<std::string>(controlEndpoint()) + "/schema", schema);
}

/// @note: We must instantiate the ReadLimiter before any RemoteStores due to their static initialisation.
/// @todo: this may change in future.
static size_t memoryLimit =
Resource<size_t>("$FDB_READ_LIMIT;fdbReadLimit",
config_.userConfig().getUnsigned("limits.read", size_t(1) * 1024 * 1024 * 1024)); // 1GiB
ReadLimiter::init(memoryLimit);
RemoteFDB::~RemoteFDB() {
deregister();
}

// -----------------------------------------------------------------------------------------------------
Expand All @@ -250,14 +247,16 @@ auto RemoteFDB::forwardApiCall(const HelperClass& helper, const FDBToolRequest&

// Ensure we have an entry in the message queue before we trigger anything that
// will result in return messages

uint32_t id = generateRequestID();
auto entry = messageQueues_.emplace(id, std::make_shared<MessageQueue>(HelperClass::queueSize()));
ASSERT(entry.second);
std::shared_ptr<MessageQueue> messageQueue(entry.first->second);
std::shared_ptr<MessageQueue> messageQueue;
{
std::lock_guard<std::mutex> lock(messageMutex_);
auto entry = messageQueues_.emplace(id, std::make_shared<MessageQueue>(HelperClass::queueSize()));
ASSERT(entry.second);
messageQueue = entry.first->second;
}

// Encode the request and send it to the server

Buffer encodeBuffer(HelperClass::bufferSize());
MemoryStream s(encodeBuffer);
s << request;
Expand Down Expand Up @@ -315,64 +314,69 @@ const Configuration& RemoteFDB::clientConfig() const {
return config();
}

bool RemoteFDB::handle(remote::Message message, uint32_t requestID) {
bool RemoteFDB::handle(Message message, uint32_t requestID) {

switch (message) {
case Message::Complete: {

std::lock_guard<std::mutex> lock(messageMutex_);
auto it = messageQueues_.find(requestID);
if (it == messageQueues_.end()) {
return false;
}

it->second->close();
// Remove entry (shared_ptr --> message queue will be destroyed when it
// goes out of scope in the worker thread).
// Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker
// thread).
messageQueues_.erase(it);
return true;
}
case Message::Error: {

std::ostringstream ss;
ss << "RemoteFDB - client id: " << clientId()
<< " - received an error without error description for requestID " << requestID << std::endl;
throw RemoteFDBException(ss.str(), controlEndpoint());

return false;
std::lock_guard<std::mutex> lock(messageMutex_);
// Received Error message without error description. Remove the corresponding entry from the message queue
// and let the caller know & complain
auto it = messageQueues_.find(requestID);
if (it != messageQueues_.end()) {
it->second->interrupt(
std::make_exception_ptr(RemoteFDBException("no error description provided", controlEndpoint())));
// Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker
// thread).
messageQueues_.erase(it);
}
return true;
}
Comment on lines +333 to 346
default:
Log::error() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID << "]"
<< std::endl;
return false;
}
}
bool RemoteFDB::handle(remote::Message message, uint32_t requestID, Buffer&& payload) {
bool RemoteFDB::handle(Message message, uint32_t requestID, Buffer&& payload) {

switch (message) {
case Message::Blob: {
std::lock_guard<std::mutex> lock(messageMutex_);
auto it = messageQueues_.find(requestID);
if (it == messageQueues_.end()) {
return false;
}

it->second->emplace(std::move(payload));
return true;
}

case Message::Error: {

std::lock_guard<std::mutex> lock(messageMutex_);
auto it = messageQueues_.find(requestID);
if (it == messageQueues_.end()) {
return false;
if (it != messageQueues_.end()) {
std::string errmsg{static_cast<const char*>(payload.data()), payload.size()};
it->second->interrupt(std::make_exception_ptr(RemoteFDBException(errmsg, controlEndpoint())));
// Remove entry (shared_ptr --> message queue will be destroyed when it goes out of scope in the worker
// thread).
messageQueues_.erase(it);
}
std::string msg;
msg.resize(payload.size(), ' ');
payload.copy(&msg[0], payload.size());
it->second->interrupt(std::make_exception_ptr(RemoteFDBException(msg, controlEndpoint())));
// Remove entry (shared_ptr --> message queue will be destroyed when it
// goes out of scope in the worker thread).
messageQueues_.erase(it);
return true;
}
Comment on lines +365 to 376
default:
Log::warning() << *this << " - Received unexpected [message=" << message << ",requestID=" << requestID
<< ",payloadSize=" << payload.size() << "]" << std::endl;
return false;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/api/RemoteFDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RemoteFDB : public LocalFDB, public Client {
public: // method

RemoteFDB(const eckit::Configuration& config, const std::string& name);
~RemoteFDB() override {}
~RemoteFDB() override;

ListIterator inspect(const metkit::mars::MarsRequest& request) override;

Expand Down Expand Up @@ -96,6 +96,7 @@ class RemoteFDB : public LocalFDB, public Client {
// The shared_ptr allows this removal to be asynchronous with the actual task
// cleaning up and returning to the client.
std::unordered_map<uint32_t, std::shared_ptr<MessageQueue>> messageQueues_;
std::mutex messageMutex_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
22 changes: 20 additions & 2 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace fdb5::remote {
Connection::Connection() : single_(false) {}

void Connection::teardown() {
closingSocket_ = true;
closingSocket_.store(true);

if (!valid()) {
return;
Expand Down Expand Up @@ -91,11 +91,29 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const {
&& readUnsafe(socket, &tail, sizeof(tail))) {

ASSERT(tail == MessageHeader::EndMarker);

if (hdr.message == Message::Exit) {
closingSocket_.store(true);
}
if (hdr.message == Message::Error) {
eckit::net::Endpoint remoteEndpoint{socket.remoteHost(), socket.remotePort()};
std::ostringstream ss;
if (payload.size() == 0) {
ss << "Received an error without error description for clientID " << hdr.clientID() << " requestID "
<< hdr.requestID << std::endl;
throw RemoteFDBException(ss.str(), remoteEndpoint);
}
std::string errmsg{static_cast<const char*>(payload.data()), payload.size()};
ss << "Received error message: \"" << errmsg << "\" from " << remoteEndpoint << " for clientID "
<< hdr.clientID() << " requestID " << hdr.requestID << std::endl;
eckit::Log::warning() << ss.str();
}
return payload;
}
}

hdr.message = Message::Exit;
closingSocket_.store(true);
return eckit::Buffer{0};
}

Expand Down Expand Up @@ -130,7 +148,7 @@ void Connection::write(const Message msg, const bool control, const uint32_t cli

void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const {
eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
write(Message::Error, false, clientID, requestID, msg.data(), msg.length());
write(Message::Error, true, clientID, requestID, msg.data(), msg.length());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flip makes me nervous, and such that I would want to have a proper design discussion about it. Why are we changing whether (all) error messages are being sent on the control vs. data connection. That feels like a big change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially the control connection was blocking (no listener thread) and thus I had to use the data connection for the errors.
With the current implementation, we have the freedom to chose and the data connection could be busier with the large data transfer, so I considered this a potential improvement to the error latency.

}

eckit::Buffer Connection::readControl(MessageHeader& hdr) const {
Expand Down
12 changes: 11 additions & 1 deletion src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "eckit/exception/Exceptions.h"
#include "eckit/net/Endpoint.h"
#include "eckit/net/TCPSocket.h"
#include "eckit/os/BackTrace.h"
#include "eckit/serialisation/MemoryStream.h"
Expand Down Expand Up @@ -50,6 +51,15 @@ class TCPException : public eckit::Exception {

//----------------------------------------------------------------------------------------------------------------------

class RemoteFDBException : public eckit::RemoteException {
public:

RemoteFDBException(const std::string& msg, const eckit::net::Endpoint& endpoint) :
eckit::RemoteException(msg, endpoint) {}
};

//----------------------------------------------------------------------------------------------------------------------

class Connection {

public: // types
Expand Down Expand Up @@ -106,7 +116,7 @@ class Connection {

private: // members

bool closingSocket_ = false;
mutable std::atomic<bool> closingSocket_{false};

mutable std::mutex controlMutex_;
mutable std::mutex dataMutex_;
Expand Down
34 changes: 28 additions & 6 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Client::Client(const eckit::Configuration& config,
}

void Client::refreshConnection() {
if (connection_->valid()) {
if (connection_->valid()) { // Connection is still valid, no need to refresh
return;
}
eckit::Log::warning() << "Connection to " << connection_->controlEndpoint()
Expand All @@ -69,8 +69,14 @@ void Client::refreshConnection() {
connection_->add(*this);
}

void Client::deregister() {
if (!deregistered_.exchange(true)) {
connection_->remove(id_);
}
}

Client::~Client() {
connection_->remove(id_);
deregister();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any exceptions that might be thrown during this deregister()? Is that safe, given we are in a destructor?

}

void Client::controlWriteCheckResponse(const Message msg, const uint32_t requestID, const bool dataListener,
Expand All @@ -86,8 +92,16 @@ void Client::controlWriteCheckResponse(const Message msg, const uint32_t request
}

auto f = connection_->controlWrite(*this, msg, requestID, dataListener, payloads);
f.wait();
ASSERT(f.get().size() == 0);
try {
f.wait();
ASSERT(f.get().size() == 0);
}
catch (const std::exception& e) {
std::ostringstream ss;
ss << "Error while waiting for response to control message " << msg << " with requestID " << requestID << ": "
<< e.what();
throw RemoteFDBException(ss.str(), connection_->controlEndpoint());
}
}

eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t requestID, const void* const payload,
Expand All @@ -103,8 +117,16 @@ eckit::Buffer Client::controlWriteReadResponse(const Message msg, const uint32_t
}

auto f = connection_->controlWrite(*this, msg, requestID, false, payloads);
f.wait();
return eckit::Buffer{f.get()};
try {
f.wait();
return eckit::Buffer{f.get()};
}
catch (const std::exception& e) {
std::ostringstream ss;
ss << "Error while waiting for response to control message " << msg << " with requestID " << requestID << ": "
<< e.what();
throw RemoteFDBException(ss.str(), connection_->controlEndpoint());
}
}

void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) {
Expand Down
16 changes: 7 additions & 9 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "fdb5/remote/Messages.h"
#include "fdb5/remote/client/ClientConnection.h"

#include <atomic>
#include <mutex>
#include <utility> // std::pair
#include <vector>
Expand All @@ -27,15 +28,6 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

class RemoteFDBException : public eckit::RemoteException {
public:

RemoteFDBException(const std::string& msg, const eckit::net::Endpoint& endpoint) :
eckit::RemoteException(msg, endpoint) {}
};

//----------------------------------------------------------------------------------------------------------------------

class Client {
public: // types

Expand Down Expand Up @@ -91,6 +83,11 @@ class Client {

protected:

/// Deregister this client from its connection. Idempotent.
/// Derived classes with state accessed by handle() should call this
/// in their destructor, before that state is destroyed.
void deregister();

std::shared_ptr<ClientConnection> connection_;

private:
Expand All @@ -100,6 +97,7 @@ class Client {
private:

uint32_t id_;
std::atomic<bool> deregistered_{false};
mutable std::mutex blockingRequestMutex_;
};

Expand Down
Loading
Loading