Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions phlex/app/load_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ namespace phlex::experimental {
// internal reference counting in classification.hpp.
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks,clang-analyzer-cplusplus.NewDelete)
create_driver = plugin_loader<detail::driver_shim_t>(spec, "create_driver");
driver_proxy const proxy{};
driver_bundle result;
create_driver(proxy, config, &result);
create_driver(driver_proxy{}, config, &result);
return result;
}
}
2 changes: 2 additions & 0 deletions phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cet_make_library(
provider_node.cpp
registrar.cpp
registration_api.cpp
source.cpp
store_counters.cpp
LIBRARIES
PUBLIC
Expand Down Expand Up @@ -66,6 +67,7 @@ install(
provider_node.hpp
registrar.hpp
registration_api.hpp
source.hpp
store_counters.hpp
upstream_predicates.hpp
DESTINATION include/phlex/core
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ namespace phlex::experimental {
make_bookkeeping_edges();

auto [provider_input_ports, multilayer_join_index_ports] =
make_computational_edges(nodes_, filters_);
make_computational_edges(nodes_, filters_, graph_);

if (provider_input_ports.empty()) {
assert(multilayer_join_index_ports.empty());
Expand Down
13 changes: 10 additions & 3 deletions phlex/core/framework_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

#include "phlex/phlex_core_export.hpp"

#include "phlex/core/declared_fold.hpp"
#include "phlex/core/declared_unfold.hpp"
#include "phlex/core/filter.hpp"
#include "phlex/core/glue.hpp"
#include "phlex/core/index_router.hpp"
Expand All @@ -13,6 +11,7 @@
#include "phlex/driver.hpp"
#include "phlex/model/data_cell_tracker.hpp"
#include "phlex/model/data_layer_hierarchy.hpp"
#include "phlex/model/fixed_hierarchy.hpp"
#include "phlex/model/flush_messages.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/module.hpp"
Expand All @@ -23,8 +22,10 @@
#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/info.h"

#include <concepts>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
Expand Down Expand Up @@ -58,7 +59,7 @@ namespace phlex::experimental {
return {config, graph_, nodes_, registration_errors_};
}

source_graph_proxy<void_tag> source_proxy(configuration const& config)
source_bundle source_proxy(configuration const& config)
{
return {config, graph_, nodes_, registration_errors_};
}
Expand Down Expand Up @@ -113,6 +114,12 @@ namespace phlex::experimental {
return make_glue().provide(std::move(name), std::move(f), c);
}

template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)
{
return make_glue().template source<Source>(std::move(name), std::forward<Args>(args)...);
}

template <typename T, typename... Args>
glue<T> make(Args&&... args)
{
Expand Down
11 changes: 11 additions & 0 deletions phlex/core/glue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "phlex/core/concepts.hpp"
#include "phlex/core/registrar.hpp"
#include "phlex/core/registration_api.hpp"
#include "phlex/core/source.hpp"
#include "phlex/metaprogramming/delegate.hpp"

#include "oneapi/tbb/flow_graph.h"
Expand Down Expand Up @@ -150,6 +151,16 @@ namespace phlex::experimental {
c};
}

template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this creates a new thing in the graph on which it is called, consider renaming to add_source.
This probably should be done for the other node types as well, so an issue that addresses this should be created.

{
auto [_, inserted] =
nodes_.sources.try_emplace(name, std::make_unique<Source>(std::forward<Args>(args)...));
if (not inserted) {
detail::add_to_error_messages(errors_, name); // From registrar.hpp
}
}

private:
// Non-owning references to framework-owned resources; glue<T> is a short-lived builder.
tbb::flow::graph& graph_; // NOLINT(cppcoreguidelines-avoid-const-or-ref-data-members)
Expand Down
11 changes: 11 additions & 0 deletions phlex/core/graph_proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ namespace phlex::experimental {
std::move(name), std::move(pred), std::move(unf), c, std::move(destination_data_layer));
}

/// @brief Registers a source (used by the framework to create provider nodes)
template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment as put in graph::source applies here: consider renaming to add_source.

requires(not is_bound_object<T>)
{
// The bound object is created when invoking source<Source>(...), so we explicitly indicate that
// no bound object should be used in the create_glue(...) call.
return create_glue(false).template source<Source>(std::move(name),
std::forward<Args>(args)...);
}

/// @brief Registers an output node.
auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial)
{
Expand Down
94 changes: 85 additions & 9 deletions phlex/core/make_computational_edges.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "phlex/core/make_computational_edges.hpp"

#include "fmt/format.h"
#include "oneapi/tbb/flow_graph.h"
#include "spdlog/spdlog.h"

#include <algorithm>
#include <cassert>
#include <ranges>
#include <span>
#include <stdexcept>

using namespace std::string_literals;

Expand All @@ -26,14 +28,24 @@ namespace phlex::experimental {
return nullptr;
}

index_router::provider_input_ports_t edges_from_explicit_providers(
index_router::head_ports_t head_ports, provider_nodes& explicit_providers)
provider_bundles find_matching_implicit_providers(source_map const& sources,
product_selector const& input_product)
{
assert(!head_ports.empty());
provider_bundles result;
for (auto const& src : sources | std::views::values) {
result.append_range(src->create_providers(input_product));
}
return result;
}

// FIXME: Should return a list of head ports that cannot be matched to an explicit provider.
std::pair<index_router::provider_input_ports_t, index_router::head_ports_t>
edges_from_explicit_providers(index_router::head_ports_t head_ports,
provider_nodes& explicit_providers)
{
assert(!head_ports.empty());

index_router::provider_input_ports_t result;
index_router::head_ports_t unconsumed_head_ports;
for (auto const& [node_name, ports] : head_ports) {
for (auto const& [input_product, port] : ports) {
// Find the provider that has the right product name (hidden in the
Expand All @@ -47,12 +59,55 @@ namespace phlex::experimental {
input_product.to_string());
make_edge(matched_provider->output_port(), *port);
} else {
throw std::runtime_error("No provider found for product: "s +
input_product.to_string());
unconsumed_head_ports[node_name].push_back({input_product, port});
}
}
}
return result;
return {std::move(result), std::move(unconsumed_head_ports)};
}

std::pair<index_router::provider_input_ports_t, index_router::head_ports_t>
edges_from_implicit_providers(index_router::head_ports_t head_ports,
provider_nodes& providers,
source_map const& sources,
tbb::flow::graph& g)
{
index_router::provider_input_ports_t result;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming result to resulting_input_ports or the like.

index_router::head_ports_t unconsumed_head_ports;
for (auto const& [node_name, ports] : head_ports) {
for (auto const& [input_product, port] : ports) {
// If we have a source node that can produce this product, use it.
auto bundles = find_matching_implicit_providers(sources, input_product);
if (bundles.empty()) {
unconsumed_head_ports[node_name].push_back({input_product, port});
continue;
}

// For now we require only one implicit provider. This will change in the future.
if (bundles.size() > 1ull) {
auto error_msg = fmt::format(
"Multiple implicit providers found for product '{}', required by node '{}':\n",
input_product.to_string(),
node_name);
throw std::runtime_error(error_msg);
}

auto& bundle = bundles[0];
auto const& spec = bundle.specification();
auto node = std::make_unique<provider_node>(spec.creator(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider filing an issue that makes the argument list of the constructor of provider_node shorter by passing in the provider_bundle rather than passing in the innards of the provider_bundle.

bundle.get_concurrency().value,
g,
bundle.release_provider_function(),
spec,
bundle.layer(),
bundle.stage());
auto const provider_name = node->name().to_string();
result.try_emplace(provider_name, input_product, node->input_port());
make_edge(node->output_port(), *port);
providers.try_emplace(provider_name, std::move(node));
}
}
return {std::move(result), std::move(unconsumed_head_ports)};
}

index_router::head_ports_t edges_within_computational_graph(
Expand Down Expand Up @@ -111,7 +166,9 @@ namespace phlex::experimental {
}

std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
make_computational_edges(node_catalog& nodes, std::map<std::string, filter>& filters)
make_computational_edges(node_catalog& nodes,
std::map<std::string, filter>& filters,
tbb::flow::graph& g)
{
auto const producers = nodes.producers();
auto const consumers = nodes.consumers();
Expand All @@ -124,8 +181,27 @@ namespace phlex::experimental {

edges_to_outputs(nodes.providers, producers, nodes.outputs);

auto provider_input_ports =
auto [explicit_provider_input_ports, unconsumed_head_ports] =
edges_from_explicit_providers(std::move(head_ports), nodes.providers);

auto [implicit_provider_input_ports, unmatched_head_ports] = edges_from_implicit_providers(
std::move(unconsumed_head_ports), nodes.providers, nodes.sources, g);

if (not unmatched_head_ports.empty()) {
std::string error_msg{"No provider found for the following required products:\n"};
for (auto const& [node_name, ports] : unmatched_head_ports) {
for (auto const& [input_product, _] : ports) {
error_msg += fmt::format(
" - Node '{}' requires product '{}'\n", node_name, input_product.to_string());
}
}
throw std::runtime_error(error_msg);
}

// Combine implicit and explicit provider input ports.
auto provider_input_ports = std::move(explicit_provider_input_ports);
provider_input_ports.merge(std::move(implicit_provider_input_ports));

auto multilayer_join_index_ports = multilayer_ports(consumers);

return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports));
Expand Down
6 changes: 5 additions & 1 deletion phlex/core/make_computational_edges.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "phlex/core/index_router.hpp"
#include "phlex/core/node_catalog.hpp"

#include "oneapi/tbb/flow_graph.h"

#include <map>
#include <string>
#include <tuple>
Expand All @@ -28,7 +30,9 @@ namespace phlex::experimental {

PHLEX_CORE_EXPORT
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
make_computational_edges(node_catalog& nodes, std::map<std::string, filter>& filters);
make_computational_edges(node_catalog& nodes,
std::map<std::string, filter>& filters,
tbb::flow::graph& g);

}

Expand Down
2 changes: 2 additions & 0 deletions phlex/core/node_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/provider_node.hpp"
#include "phlex/core/registrar.hpp"
#include "phlex/core/source.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

#include "boost/pfr.hpp"
Expand All @@ -39,6 +40,7 @@ namespace phlex::experimental {
simple_ptr_map<declared_unfold_ptr> unfolds{};
simple_ptr_map<declared_transform_ptr> transforms{};
simple_ptr_map<provider_node_ptr> providers{};
simple_ptr_map<source_ptr> sources{};
};
}

Expand Down
28 changes: 28 additions & 0 deletions phlex/core/source.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "phlex/core/source.hpp"

#include <utility>

namespace phlex::experimental {
provider_bundle::provider_bundle(provider_function_t f,
concurrency c,
product_specification spec,
std::string layer,
std::string stage) :
provider_function_{std::move(f)},
concurrency_{c},
spec_{std::move(spec)},
layer_{std::move(layer)},
stage_{std::move(stage)}
{
}

std::function<provider_function_t> provider_bundle::release_provider_function()
{
return std::move(provider_function_);
}

product_specification const& provider_bundle::specification() const noexcept { return spec_; }
identifier const& provider_bundle::layer() const noexcept { return layer_; }
identifier const& provider_bundle::stage() const noexcept { return stage_; }
concurrency provider_bundle::get_concurrency() const noexcept { return concurrency_; }
}
Loading
Loading