Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ project( fdb5 LANGUAGES C CXX )

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# set(CMAKE_COMPILE_WARNING_AS_ERROR ON)

# add_compile_options(-fsanitize=address)
# add_compile_options(-fsanitize=address,undefined -fno-omit-frame-pointer)
# add_link_options(-fsanitize=address)

# add_compile_options(-fsanitize=thread)
# add_compile_options(-fsanitize=thread,undefined -fno-omit-frame-pointer)
# add_link_options(-fsanitize=thread)

# set(CMAKE_CXX_FLAGS "-Wall -Wextra -Wno-unused-parameter -Wno-unused-variable -Wno-sign-compare")
# set(CMAKE_CXX_FLAGS "-Wno-unused-parameter -Wno-unused-variable -Wno-reorder -Wno-sign-compare -Wvla-cxx-extension")

Expand Down
2 changes: 1 addition & 1 deletion src/chunked_data_view/ListIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ std::optional<std::tuple<fdb5::Key, std::unique_ptr<eckit::DataHandle>>> ListIte
auto has_next = listIterator_.next(elem);

if (has_next) {
return std::make_tuple(elem.combinedKey(), std::unique_ptr<eckit::DataHandle>(elem.location().dataHandle()));
return std::make_tuple(elem.combinedKey(), std::unique_ptr<eckit::DataHandle>(elem.location().dataHandle("")));
}

return std::nullopt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ListIteratorWrapperImpl : public ListIteratorInterface {
public:

explicit ListIteratorWrapperImpl(fdb5::ListIterator listIterator) : listIterator_(std::move(listIterator)) {};

std::optional<std::tuple<fdb5::Key, std::unique_ptr<eckit::DataHandle>>> next() override;
};

Expand Down
106 changes: 59 additions & 47 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <string>
#include <vector>

#include <uuid/uuid.h>

#include "eckit/config/Resource.h"
#include "eckit/exception/Exceptions.h"
#include "eckit/io/DataHandle.h"
Expand All @@ -39,7 +41,6 @@
#include "fdb5/database/FieldLocation.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/WipeCoordinator.h"
#include "fdb5/database/WipeState.h"
#include "fdb5/io/FieldHandle.h"
#include "fdb5/io/HandleGatherer.h"
#include "fdb5/message/MessageDecoder.h"
Expand All @@ -66,24 +67,24 @@ FDB::FDB(FDB&&) = default;

FDB& FDB::operator=(FDB&&) = default;

void FDB::archive(eckit::message::Message msg) {
void FDB::archive(eckit::message::Message msg, const std::string& tracingID) {
fdb5::Key key = MessageDecoder::messageToKey(msg);
archive(key, msg.data(), msg.length());
archive(key, msg.data(), msg.length(), tracingID);
}
void FDB::archive(eckit::DataHandle& handle) {
void FDB::archive(eckit::DataHandle& handle, const std::string& tracingID) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A passing comment: I would use class TracingContext or something similar rather than just a std::string. It seems plausible that this should be something that we might want to be extendable / structured.

eckit::message::Message msg;
eckit::message::Reader reader(handle);

while ((msg = reader.next())) {
archive(msg);
archive(msg, tracingID);
}
}
void FDB::archive(const void* data, size_t length) {
void FDB::archive(const void* data, size_t length, const std::string& tracingID) {
eckit::MemoryHandle handle(data, length);
archive(handle);
}

void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& handle) {
void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& handle, const std::string& tracingID) {
eckit::message::Message msg;
eckit::message::Reader reader(handle);

Expand All @@ -99,7 +100,7 @@ void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& h
LOG_DEBUG_LIB(LibFdb5) << ss.str();
throw eckit::UserError(ss.str(), Here());
}
archive(key, msg.data(), msg.length());
archive(key, msg.data(), msg.length(), tracingID);
}
if (cube.countVacant()) {
std::ostringstream ss;
Expand All @@ -114,7 +115,7 @@ void FDB::archive(const metkit::mars::MarsRequest& request, eckit::DataHandle& h
}
}

void FDB::archive(const Key& key, const void* data, size_t length) {
void FDB::archive(const Key& key, const void* data, size_t length, const std::string& tracingID) {
eckit::Timer timer;
timer.start();

Expand All @@ -134,15 +135,15 @@ void FDB::archive(const Key& key, const void* data, size_t length) {
keyInternal.unset("stepunits");
}

internal_->archive(keyInternal, data, length);
internal_->archive(keyInternal, data, length, tracingID);
dirty_ = true;

timer.stop();
stats_.addArchive(length, timer);
}

void FDB::reindex(const Key& key, const FieldLocation& location) {
internal_->reindex(key, location);
void FDB::reindex(const Key& key, const FieldLocation& location, const std::string& tracingID) {
internal_->reindex(key, location, tracingID);
dirty_ = true;
}

Expand All @@ -162,23 +163,23 @@ bool FDB::sorted(const metkit::mars::MarsRequest& request) {
return sorted;
}

eckit::DataHandle* FDB::read(const eckit::URI& uri) {
eckit::DataHandle* FDB::read(const eckit::URI& uri, const std::string& tracingID) {
auto location = std::unique_ptr<FieldLocation>(FieldLocationFactory::instance().build(uri.scheme(), uri));
return location->dataHandle();
return location->dataHandle(tracingID);
}

eckit::DataHandle* FDB::read(const std::vector<eckit::URI>& uris, bool sorted) {
eckit::DataHandle* FDB::read(const std::vector<eckit::URI>& uris, const std::string& tracingID, bool sorted) {
HandleGatherer result(sorted);

for (const eckit::URI& uri : uris) {
auto location = std::unique_ptr<FieldLocation>(FieldLocationFactory::instance().build(uri.scheme(), uri));
result.add(location->dataHandle());
result.add(location->dataHandle(tracingID));
}

return result.dataHandle();
}

eckit::DataHandle* FDB::read(ListIterator& it, bool sorted) {
eckit::DataHandle* FDB::read(ListIterator& it, const std::string& tracingID, bool sorted) {
eckit::Timer timer;
timer.start();

Expand Down Expand Up @@ -216,53 +217,55 @@ eckit::DataHandle* FDB::read(ListIterator& it, bool sorted) {
for (std::size_t i = 0; i < cube.size(); i++) {
ListElement element;
if (cube.find(i, element)) {
result.add(element.location().dataHandle());
result.add(element.location().dataHandle(tracingID));
}
}
}
}
else {
while (it.next(el)) {
result.add(el.location().dataHandle());
result.add(el.location().dataHandle(tracingID));
}
}
return result.dataHandle();
}

eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request) {
eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request, const std::string& tracingID) {
static bool seekable = eckit::Resource<bool>("fdbSeekableDataHandle;$FDB_SEEKABLE_DATA_HANDLE", false);

ListIterator it = inspect(request);
return seekable ? new FieldHandle(it) : read(it, sorted(request));
ListIterator it = inspect(request, tracingID);
return seekable ? new FieldHandle(it, tracingID) : read(it, sorted(request), tracingID);
}

ListIterator FDB::inspect(const metkit::mars::MarsRequest& request) {
return internal_->inspect(request);
ListIterator FDB::inspect(const metkit::mars::MarsRequest& request, const std::string& tracingID) {
return internal_->inspect(request, tracingID);
}

ListIterator FDB::list(const FDBToolRequest& request, const ListMode mode, const int level) {
return {internal_->list(request, level), mode};
ListIterator FDB::list(const FDBToolRequest& request, const ListMode mode, const std::string& tracingID,
const int level) {
return {internal_->list(request, tracingID, level), mode};
}

ListIterator FDB::list(const FDBToolRequest& request, const bool deduplicate, const int level) {
return list(request, deduplicate ? ListMode::Deduplicate : ListMode::Full, level);
return list(request, deduplicate ? ListMode::Deduplicate : ListMode::Full, generateTracingID("list"), level);
}

DumpIterator FDB::dump(const FDBToolRequest& request, bool simple) {
return internal_->dump(request, simple);
return internal_->dump(request, generateTracingID(), simple);
}

StatusIterator FDB::status(const FDBToolRequest& request) {
return internal_->status(request);
StatusIterator FDB::status(const FDBToolRequest& request, const std::string& tracingID) {
return internal_->status(request, tracingID);
}

WipeIterator FDB::wipe(const FDBToolRequest& request, bool doit, bool porcelain, bool unsafeWipeAll) {
WipeIterator FDB::wipe(const FDBToolRequest& request, const std::string& tracingID, bool doit, bool porcelain,
bool unsafeWipeAll) {

auto internal = internal_->shared();

auto async = [internal, request, doit, porcelain, unsafeWipeAll](eckit::Queue<WipeElement>& queue) {
auto async = [internal, request, tracingID, doit, porcelain, unsafeWipeAll](eckit::Queue<WipeElement>& queue) {
// Visit the catalogues to determine what they would wipe
WipeStateIterator it = internal->wipe(request, doit, porcelain, unsafeWipeAll);
WipeStateIterator it = internal->wipe(request, tracingID, doit, porcelain, unsafeWipeAll);

// Coordinate the wipe across catalogues and stores
WipeCoordinator coordinator{internal->config()};
Expand All @@ -279,20 +282,21 @@ WipeIterator FDB::wipe(const FDBToolRequest& request, bool doit, bool porcelain,
return WipeIterator(new APIAsyncIterator<WipeElement>(internal_->shared(), async));
}

PurgeIterator FDB::purge(const FDBToolRequest& request, bool doit, bool porcelain) {
return internal_->purge(request, doit, porcelain);
PurgeIterator FDB::purge(const FDBToolRequest& request, const std::string& tracingID, bool doit, bool porcelain) {
return internal_->purge(request, tracingID, doit, porcelain);
}

StatsIterator FDB::stats(const FDBToolRequest& request) {
return internal_->stats(request);
StatsIterator FDB::stats(const FDBToolRequest& request, const std::string& tracingID) {
return internal_->stats(request, tracingID);
}

ControlIterator FDB::control(const FDBToolRequest& request, ControlAction action, ControlIdentifiers identifiers) {
return internal_->control(request, action, identifiers);
ControlIterator FDB::control(const FDBToolRequest& request, ControlAction action, ControlIdentifiers identifiers,
const std::string& tracingID) {
return internal_->control(request, tracingID, action, identifiers);
}

MoveIterator FDB::move(const FDBToolRequest& request, const eckit::URI& dest) {
return internal_->move(request, dest);
MoveIterator FDB::move(const FDBToolRequest& request, const eckit::URI& dest, const std::string& tracingID) {
return internal_->move(request, tracingID, dest);
}

FDBStats FDB::stats() const {
Expand All @@ -311,31 +315,31 @@ void FDB::print(std::ostream& s) const {
s << *internal_;
}

void FDB::flush() {
void FDB::flush(const std::string& tracingID) {
if (dirty_) {
eckit::Timer timer;
timer.start();

internal_->flush();
internal_->flush(tracingID);
dirty_ = false;

timer.stop();
stats_.addFlush(timer);
}
}

IndexAxis FDB::axes(const FDBToolRequest& request, int level) {
IndexAxis FDB::axes(const FDBToolRequest& request, const std::string& tracingID, int level) {
IndexAxis axes;
AxesElement elem;
auto it = axesIterator(request, level);
auto it = axesIterator(request, tracingID, level);
while (it.next(elem)) {
axes.merge(elem.axes());
}
return axes;
}

AxesIterator FDB::axesIterator(const FDBToolRequest& request, int level) {
return internal_->axesIterator(request, level);
AxesIterator FDB::axesIterator(const FDBToolRequest& request, const std::string& tracingID, int level) {
return internal_->axesIterator(request, tracingID, level);
}

bool FDB::dirty() const {
Expand All @@ -354,6 +358,14 @@ void FDB::registerFlushCallback(FlushCallback callback) {
internal_->registerFlushCallback(callback);
}

std::string FDB::generateTracingID(const std::string& prefix) {
uuid_t uuid;
uuid_generate(uuid);
char uuidStr[37];
uuid_unparse(uuid, uuidStr);
return (prefix.empty() ? "" : prefix + "-") + std::string(uuidStr);
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5
Loading