Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions phlex/app/load_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ namespace phlex::experimental {
// internal reference counting in classification.hpp.
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks,clang-analyzer-cplusplus.NewDelete)
create_driver = plugin_loader<detail::driver_shim_t>(spec, "create_driver");
driver_proxy const proxy{};
driver_bundle result;
create_driver(proxy, config, &result);
create_driver(driver_proxy{}, config, &result);
return result;
}
}
1 change: 1 addition & 0 deletions phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ install(
provider_node.hpp
registrar.hpp
registration_api.hpp
source.hpp
store_counters.hpp
upstream_predicates.hpp
DESTINATION include/phlex/core
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ namespace phlex::experimental {
make_bookkeeping_edges();

auto [provider_input_ports, multilayer_join_index_ports] =
make_computational_edges(nodes_, filters_);
make_computational_edges(nodes_, filters_, graph_);

if (provider_input_ports.empty()) {
assert(multilayer_join_index_ports.empty());
Expand Down
13 changes: 10 additions & 3 deletions phlex/core/framework_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

#include "phlex/phlex_core_export.hpp"

#include "phlex/core/declared_fold.hpp"
#include "phlex/core/declared_unfold.hpp"
#include "phlex/core/filter.hpp"
#include "phlex/core/glue.hpp"
#include "phlex/core/index_router.hpp"
Expand All @@ -13,6 +11,7 @@
#include "phlex/driver.hpp"
#include "phlex/model/data_cell_tracker.hpp"
#include "phlex/model/data_layer_hierarchy.hpp"
#include "phlex/model/fixed_hierarchy.hpp"
#include "phlex/model/flush_messages.hpp"
#include "phlex/model/product_store.hpp"
#include "phlex/module.hpp"
Expand All @@ -23,8 +22,10 @@
#include "oneapi/tbb/flow_graph.h"
#include "oneapi/tbb/info.h"

#include <concepts>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
Expand Down Expand Up @@ -58,7 +59,7 @@ namespace phlex::experimental {
return {config, graph_, nodes_, registration_errors_};
}

source_graph_proxy<void_tag> source_proxy(configuration const& config)
source_bundle source_proxy(configuration const& config)
{
return {config, graph_, nodes_, registration_errors_};
}
Expand Down Expand Up @@ -113,6 +114,12 @@ namespace phlex::experimental {
return make_glue().provide(std::move(name), std::move(f), c);
}

template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)
{
return make_glue().template source<Source>(std::move(name), std::forward<Args>(args)...);
}

template <typename T, typename... Args>
glue<T> make(Args&&... args)
{
Expand Down
11 changes: 11 additions & 0 deletions phlex/core/glue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "phlex/core/concepts.hpp"
#include "phlex/core/registrar.hpp"
#include "phlex/core/registration_api.hpp"
#include "phlex/core/source.hpp"
#include "phlex/metaprogramming/delegate.hpp"

#include "oneapi/tbb/flow_graph.h"
Expand Down Expand Up @@ -150,6 +151,16 @@ namespace phlex::experimental {
c};
}

template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this creates a new thing in the graph on which it is called, consider renaming to add_source.
This probably should be done for the other node types as well, so an issue that addresses this should be created.

{
auto [_, inserted] =
nodes_.sources.try_emplace(name, std::make_unique<Source>(std::forward<Args>(args)...));
if (not inserted) {
detail::add_to_error_messages(errors_, name); // From registrar.hpp
}
}

private:
// Non-owning references to framework-owned resources; glue<T> is a short-lived builder.
tbb::flow::graph& graph_; // NOLINT(cppcoreguidelines-avoid-const-or-ref-data-members)
Expand Down
11 changes: 11 additions & 0 deletions phlex/core/graph_proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ namespace phlex::experimental {
std::move(name), std::move(pred), std::move(unf), c, std::move(destination_data_layer));
}

/// @brief Registers a source (used by the framework to create provider nodes)
template <std::derived_from<source> Source, typename... Args>
void source(std::string name, Args&&... args)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment as put in graph::source applies here: consider renaming to add_source.

requires(not is_bound_object<T>)
{
// The bound object is created when invoking source<Source>(...), so we explicitly indicate that
// no bound object should be used in the create_glue(...) call.
return create_glue(false).template source<Source>(std::move(name),
std::forward<Args>(args)...);
}

/// @brief Registers an output node.
auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial)
{
Expand Down
99 changes: 88 additions & 11 deletions phlex/core/make_computational_edges.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "phlex/core/make_computational_edges.hpp"

#include "fmt/format.h"
#include "oneapi/tbb/flow_graph.h"
#include "spdlog/spdlog.h"

#include <algorithm>
#include <cassert>
#include <ranges>
#include <span>
#include <stdexcept>

using namespace std::string_literals;

Expand All @@ -26,33 +28,87 @@ namespace phlex::experimental {
return nullptr;
}

index_router::provider_input_ports_t edges_from_explicit_providers(
index_router::head_ports_t head_ports, provider_nodes& explicit_providers)
provider_bundles find_matching_implicit_providers(source_map const& sources,
product_selector const& input_product)
{
assert(!head_ports.empty());
provider_bundles result;
for (auto const& src : sources | std::views::values) {
result.append_range(src->create_providers(input_product));
}
return result;
}

// FIXME: Should return a list of head ports that cannot be matched to an explicit provider.
std::pair<index_router::provider_input_ports_t, index_router::head_ports_t>
edges_from_explicit_providers(index_router::head_ports_t head_ports,
provider_nodes& explicit_providers)
{
assert(!head_ports.empty());

index_router::provider_input_ports_t result;
index_router::provider_input_ports_t provider_input_ports;
index_router::head_ports_t unconsumed_head_ports;
for (auto const& [node_name, ports] : head_ports) {
for (auto const& [input_product, port] : ports) {
// Find the provider that has the right product name (hidden in the
// output port) and the right family (hidden in the input port).
if (auto* matched_provider = find_matching_provider(explicit_providers, input_product)) {
auto const provider_name = matched_provider->name().to_string();
result.try_emplace(provider_name, input_product, matched_provider->input_port());
provider_input_ports.try_emplace(
provider_name, input_product, matched_provider->input_port());
spdlog::debug("Connecting provider {} to node {} (product: {})",
provider_name,
node_name,
input_product.to_string());
make_edge(matched_provider->output_port(), *port);
} else {
throw std::runtime_error("No provider found for product: "s +
input_product.to_string());
unconsumed_head_ports[node_name].push_back({input_product, port});
}
}
}
return result;
return {std::move(provider_input_ports), std::move(unconsumed_head_ports)};
}

std::pair<index_router::provider_input_ports_t, index_router::head_ports_t>
edges_from_implicit_providers(index_router::head_ports_t head_ports,
provider_nodes& providers,
source_map const& sources,
tbb::flow::graph& g)
{
index_router::provider_input_ports_t provider_input_ports;
index_router::head_ports_t unconsumed_head_ports;
for (auto const& [node_name, ports] : head_ports) {
for (auto const& [input_product, port] : ports) {
// If we have a source node that can produce this product, use it.
auto bundles = find_matching_implicit_providers(sources, input_product);
if (bundles.empty()) {
unconsumed_head_ports[node_name].push_back({input_product, port});
continue;
}

// For now we require only one implicit provider. This will change in the future.
if (bundles.size() > 1ull) {
auto error_msg = fmt::format(
"Multiple implicit providers found for product '{}', required by node '{}':\n",
input_product.to_string(),
node_name);
throw std::runtime_error(error_msg);
}

auto& bundle = bundles[0];
auto const& spec = bundle.spec;
auto node = std::make_unique<provider_node>(spec.creator(),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider filing an issue that makes the argument list of the constructor of provider_node shorter by passing in the provider_bundle rather than passing in the innards of the provider_bundle.

bundle.max_concurrency.value,
g,
std::move(bundle.provider_function),
spec,
identifier{bundle.layer},
identifier{bundle.stage});
auto const provider_name = node->name().to_string();
provider_input_ports.try_emplace(provider_name, input_product, node->input_port());
make_edge(node->output_port(), *port);
providers.try_emplace(provider_name, std::move(node));
}
}
return {std::move(provider_input_ports), std::move(unconsumed_head_ports)};
}

index_router::head_ports_t edges_within_computational_graph(
Expand Down Expand Up @@ -111,7 +167,9 @@ namespace phlex::experimental {
}

std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
make_computational_edges(node_catalog& nodes, std::map<std::string, filter>& filters)
make_computational_edges(node_catalog& nodes,
std::map<std::string, filter>& filters,
tbb::flow::graph& g)
{
auto const producers = nodes.producers();
auto const consumers = nodes.consumers();
Expand All @@ -124,8 +182,27 @@ namespace phlex::experimental {

edges_to_outputs(nodes.providers, producers, nodes.outputs);

auto provider_input_ports =
auto [explicit_provider_input_ports, unconsumed_head_ports] =
edges_from_explicit_providers(std::move(head_ports), nodes.providers);

auto [implicit_provider_input_ports, unmatched_head_ports] = edges_from_implicit_providers(
std::move(unconsumed_head_ports), nodes.providers, nodes.sources, g);

if (not unmatched_head_ports.empty()) {
std::string error_msg{"No provider found for the following required products:\n"};
for (auto const& [node_name, ports] : unmatched_head_ports) {
for (auto const& [input_product, _] : ports) {
error_msg += fmt::format(
" - Node '{}' requires product '{}'\n", node_name, input_product.to_string());
}
}
throw std::runtime_error(error_msg);
}

// Combine implicit and explicit provider input ports.
auto provider_input_ports = std::move(explicit_provider_input_ports);
provider_input_ports.merge(std::move(implicit_provider_input_ports));

auto multilayer_join_index_ports = multilayer_ports(consumers);

return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports));
Expand Down
6 changes: 5 additions & 1 deletion phlex/core/make_computational_edges.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "phlex/core/index_router.hpp"
#include "phlex/core/node_catalog.hpp"

#include "oneapi/tbb/flow_graph.h"

#include <map>
#include <string>
#include <tuple>
Expand All @@ -28,7 +30,9 @@ namespace phlex::experimental {

PHLEX_CORE_EXPORT
std::tuple<index_router::provider_input_ports_t, std::map<std::string, named_index_ports>>
make_computational_edges(node_catalog& nodes, std::map<std::string, filter>& filters);
make_computational_edges(node_catalog& nodes,
std::map<std::string, filter>& filters,
tbb::flow::graph& g);

}

Expand Down
2 changes: 2 additions & 0 deletions phlex/core/node_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/provider_node.hpp"
#include "phlex/core/registrar.hpp"
#include "phlex/core/source.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

#include "boost/pfr.hpp"
Expand All @@ -39,6 +40,7 @@ namespace phlex::experimental {
simple_ptr_map<declared_unfold_ptr> unfolds{};
simple_ptr_map<declared_transform_ptr> transforms{};
simple_ptr_map<provider_node_ptr> providers{};
simple_ptr_map<source_ptr> sources{};
};
}

Expand Down
49 changes: 49 additions & 0 deletions phlex/core/source.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef PHLEX_CORE_SOURCE_HPP
#define PHLEX_CORE_SOURCE_HPP

#include "phlex/phlex_core_export.hpp"

#include "phlex/concurrency.hpp"
#include "phlex/core/product_selector.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/index_generator.hpp"
#include "phlex/model/product_specification.hpp"
#include "phlex/model/products.hpp"
#include "phlex/model/type_id.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

#include <functional>
#include <memory>
#include <string>

namespace phlex::experimental {

// ==============================================================================

// Function type for type-erased data-product types (used by implicit providers)
using provider_function_t = product_ptr(data_cell_index const&);

struct provider_bundle {
std::function<provider_function_t> provider_function;
concurrency max_concurrency;
product_specification spec;
std::string layer;
std::string stage;
};

using provider_bundles = std::vector<provider_bundle>;

// ==============================================================================
class source {
public:
virtual ~source() = default;
// FIXME: Should these functions be 'const'?
virtual provider_bundles create_providers(product_selector const&) = 0;
virtual index_generator indices() = 0;
};

using source_ptr = std::unique_ptr<source>;
using source_map = simple_ptr_map<source_ptr>;
}

#endif // PHLEX_CORE_SOURCE_HPP
Loading
Loading