diff --git a/form/CMakeLists.txt b/form/CMakeLists.txt index 4d5136566..4e6d6eb04 100644 --- a/form/CMakeLists.txt +++ b/form/CMakeLists.txt @@ -30,7 +30,7 @@ target_link_libraries(form_module PRIVATE phlex::module form) target_include_directories(form_module PRIVATE ${PROJECT_SOURCE_DIR}) add_library(form_source MODULE form_source.cpp) -target_link_libraries(form_source PRIVATE phlex::module form) +target_link_libraries(form_source PRIVATE phlex::source form) target_include_directories(form_source PRIVATE ${PROJECT_SOURCE_DIR}) install(TARGETS form_module form_source LIBRARY DESTINATION lib) diff --git a/phlex/CMakeLists.txt b/phlex/CMakeLists.txt index 76e54ae2e..a115868bd 100644 --- a/phlex/CMakeLists.txt +++ b/phlex/CMakeLists.txt @@ -19,6 +19,20 @@ cet_make_library( phlex::configuration ) +cet_make_library( + LIBRARY_NAME + phlex_source + EXPORT_NAME + source + INTERFACE + NO_SOURCE + LIBRARIES + Boost::boost + Boost::json + phlex::core + phlex::configuration +) + install(FILES concurrency.hpp module.hpp source.hpp DESTINATION include/phlex) install(FILES detail/plugin_macros.hpp DESTINATION include/phlex/detail) diff --git a/phlex/app/load_module.cpp b/phlex/app/load_module.cpp index f204c7355..48d52f72c 100644 --- a/phlex/app/load_module.cpp +++ b/phlex/app/load_module.cpp @@ -121,9 +121,8 @@ namespace phlex::experimental { // internal reference counting in classification.hpp. // NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks,clang-analyzer-cplusplus.NewDelete) create_driver = plugin_loader(spec, "create_driver"); - driver_proxy const proxy{}; driver_bundle result; - create_driver(proxy, config, &result); + create_driver(driver_proxy{}, config, &result); return result; } } diff --git a/phlex/core/CMakeLists.txt b/phlex/core/CMakeLists.txt index dd8043e29..1e8f0f457 100644 --- a/phlex/core/CMakeLists.txt +++ b/phlex/core/CMakeLists.txt @@ -66,6 +66,7 @@ install( provider_node.hpp registrar.hpp registration_api.hpp + source.hpp store_counters.hpp upstream_predicates.hpp DESTINATION include/phlex/core diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 54838a78c..9e50fa0d7 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -176,7 +176,7 @@ namespace phlex::experimental { make_bookkeeping_edges(); auto [provider_input_ports, multilayer_join_index_ports] = - make_computational_edges(nodes_, filters_); + make_computational_edges(nodes_, filters_, graph_); if (provider_input_ports.empty()) { assert(multilayer_join_index_ports.empty()); diff --git a/phlex/core/framework_graph.hpp b/phlex/core/framework_graph.hpp index f0b8ff1bb..5a359ab16 100644 --- a/phlex/core/framework_graph.hpp +++ b/phlex/core/framework_graph.hpp @@ -3,8 +3,6 @@ #include "phlex/phlex_core_export.hpp" -#include "phlex/core/declared_fold.hpp" -#include "phlex/core/declared_unfold.hpp" #include "phlex/core/filter.hpp" #include "phlex/core/glue.hpp" #include "phlex/core/index_router.hpp" @@ -13,6 +11,7 @@ #include "phlex/driver.hpp" #include "phlex/model/data_cell_tracker.hpp" #include "phlex/model/data_layer_hierarchy.hpp" +#include "phlex/model/fixed_hierarchy.hpp" #include "phlex/model/flush_messages.hpp" #include "phlex/model/product_store.hpp" #include "phlex/module.hpp" @@ -23,8 +22,10 @@ #include "oneapi/tbb/flow_graph.h" #include "oneapi/tbb/info.h" +#include #include #include +#include #include #include #include @@ -58,7 +59,7 @@ namespace phlex::experimental { return {config, graph_, nodes_, registration_errors_}; } - source_graph_proxy source_proxy(configuration const& config) + source_bundle source_proxy(configuration const& config) { return {config, graph_, nodes_, registration_errors_}; } @@ -113,6 +114,12 @@ namespace phlex::experimental { return make_glue().provide(std::move(name), std::move(f), c); } + template Source, typename... Args> + void source(std::string name, Args&&... args) + { + return make_glue().template source(std::move(name), std::forward(args)...); + } + template glue make(Args&&... args) { diff --git a/phlex/core/glue.hpp b/phlex/core/glue.hpp index a1101be6a..75c54412e 100644 --- a/phlex/core/glue.hpp +++ b/phlex/core/glue.hpp @@ -7,6 +7,7 @@ #include "phlex/core/concepts.hpp" #include "phlex/core/registrar.hpp" #include "phlex/core/registration_api.hpp" +#include "phlex/core/source.hpp" #include "phlex/metaprogramming/delegate.hpp" #include "oneapi/tbb/flow_graph.h" @@ -150,6 +151,16 @@ namespace phlex::experimental { c}; } + template Source, typename... Args> + void source(std::string name, Args&&... args) + { + auto [_, inserted] = + nodes_.sources.try_emplace(name, std::make_unique(std::forward(args)...)); + if (not inserted) { + detail::add_to_error_messages(errors_, "Source", name); // From registrar.hpp + } + } + private: // Non-owning references to framework-owned resources; glue is a short-lived builder. tbb::flow::graph& graph_; // NOLINT(cppcoreguidelines-avoid-const-or-ref-data-members) diff --git a/phlex/core/graph_proxy.hpp b/phlex/core/graph_proxy.hpp index 1258ea8e1..8e3821bbb 100644 --- a/phlex/core/graph_proxy.hpp +++ b/phlex/core/graph_proxy.hpp @@ -107,6 +107,17 @@ namespace phlex::experimental { std::move(name), std::move(pred), std::move(unf), c, std::move(destination_data_layer)); } + /// @brief Registers a source (used by the framework to create provider nodes) + template Source, typename... Args> + void source(std::string name, Args&&... args) + requires(not is_bound_object) + { + // The bound object is created when invoking source(...), so we explicitly indicate that + // no bound object should be used in the create_glue(...) call. + return create_glue(false).template source(std::move(name), + std::forward(args)...); + } + /// @brief Registers an output node. auto output(std::string name, is_output_like auto f, concurrency c = concurrency::serial) { diff --git a/phlex/core/make_computational_edges.cpp b/phlex/core/make_computational_edges.cpp index 8205301d0..c73e279c8 100644 --- a/phlex/core/make_computational_edges.cpp +++ b/phlex/core/make_computational_edges.cpp @@ -1,5 +1,6 @@ #include "phlex/core/make_computational_edges.hpp" +#include "fmt/format.h" #include "oneapi/tbb/flow_graph.h" #include "spdlog/spdlog.h" @@ -7,6 +8,7 @@ #include #include #include +#include using namespace std::string_literals; @@ -26,33 +28,87 @@ namespace phlex::experimental { return nullptr; } - index_router::provider_input_ports_t edges_from_explicit_providers( - index_router::head_ports_t head_ports, provider_nodes& explicit_providers) + provider_bundles find_matching_implicit_providers(source_map const& sources, + product_selector const& input_product) { - assert(!head_ports.empty()); + provider_bundles result; + for (auto const& src : sources | std::views::values) { + result.append_range(src->create_providers(input_product)); + } + return result; + } - // FIXME: Should return a list of head ports that cannot be matched to an explicit provider. + std::pair + edges_from_explicit_providers(index_router::head_ports_t head_ports, + provider_nodes& explicit_providers) + { + assert(!head_ports.empty()); - index_router::provider_input_ports_t result; + index_router::provider_input_ports_t provider_input_ports; + index_router::head_ports_t unconsumed_head_ports; for (auto const& [node_name, ports] : head_ports) { for (auto const& [input_product, port] : ports) { // Find the provider that has the right product name (hidden in the // output port) and the right family (hidden in the input port). if (auto* matched_provider = find_matching_provider(explicit_providers, input_product)) { auto const provider_name = matched_provider->name().to_string(); - result.try_emplace(provider_name, input_product, matched_provider->input_port()); + provider_input_ports.try_emplace( + provider_name, input_product, matched_provider->input_port()); spdlog::debug("Connecting provider {} to node {} (product: {})", provider_name, node_name, input_product.to_string()); make_edge(matched_provider->output_port(), *port); } else { - throw std::runtime_error("No provider found for product: "s + - input_product.to_string()); + unconsumed_head_ports[node_name].push_back({input_product, port}); } } } - return result; + return {std::move(provider_input_ports), std::move(unconsumed_head_ports)}; + } + + std::pair + edges_from_implicit_providers(index_router::head_ports_t head_ports, + provider_nodes& providers, + source_map const& sources, + tbb::flow::graph& g) + { + index_router::provider_input_ports_t provider_input_ports; + index_router::head_ports_t unconsumed_head_ports; + for (auto const& [node_name, ports] : head_ports) { + for (auto const& [input_product, port] : ports) { + // If we have a source node that can produce this product, use it. + auto bundles = find_matching_implicit_providers(sources, input_product); + if (bundles.empty()) { + unconsumed_head_ports[node_name].push_back({input_product, port}); + continue; + } + + // For now we require only one implicit provider. This will change in the future. + if (bundles.size() > 1ull) { + auto error_msg = fmt::format( + "Multiple implicit providers found for product '{}', required by node '{}':\n", + input_product.to_string(), + node_name); + throw std::runtime_error(error_msg); + } + + auto& bundle = bundles[0]; + auto const& spec = bundle.spec; + auto node = std::make_unique(spec.creator(), + bundle.max_concurrency.value, + g, + std::move(bundle.provider_function), + spec, + identifier{bundle.layer}, + identifier{bundle.stage}); + auto const provider_name = node->name().to_string(); + provider_input_ports.try_emplace(provider_name, input_product, node->input_port()); + make_edge(node->output_port(), *port); + providers.try_emplace(provider_name, std::move(node)); + } + } + return {std::move(provider_input_ports), std::move(unconsumed_head_ports)}; } index_router::head_ports_t edges_within_computational_graph( @@ -111,7 +167,9 @@ namespace phlex::experimental { } std::tuple> - make_computational_edges(node_catalog& nodes, std::map& filters) + make_computational_edges(node_catalog& nodes, + std::map& filters, + tbb::flow::graph& g) { auto const producers = nodes.producers(); auto const consumers = nodes.consumers(); @@ -124,8 +182,27 @@ namespace phlex::experimental { edges_to_outputs(nodes.providers, producers, nodes.outputs); - auto provider_input_ports = + auto [explicit_provider_input_ports, unconsumed_head_ports] = edges_from_explicit_providers(std::move(head_ports), nodes.providers); + + auto [implicit_provider_input_ports, unmatched_head_ports] = edges_from_implicit_providers( + std::move(unconsumed_head_ports), nodes.providers, nodes.sources, g); + + if (not unmatched_head_ports.empty()) { + std::string error_msg{"No provider found for the following required products:\n"}; + for (auto const& [node_name, ports] : unmatched_head_ports) { + for (auto const& [input_product, _] : ports) { + error_msg += fmt::format( + " - Node '{}' requires product '{}'\n", node_name, input_product.to_string()); + } + } + throw std::runtime_error(error_msg); + } + + // Combine implicit and explicit provider input ports. + auto provider_input_ports = std::move(explicit_provider_input_ports); + provider_input_ports.merge(std::move(implicit_provider_input_ports)); + auto multilayer_join_index_ports = multilayer_ports(consumers); return std::make_tuple(std::move(provider_input_ports), std::move(multilayer_join_index_ports)); diff --git a/phlex/core/make_computational_edges.hpp b/phlex/core/make_computational_edges.hpp index 2a5ec165e..8efdcd426 100644 --- a/phlex/core/make_computational_edges.hpp +++ b/phlex/core/make_computational_edges.hpp @@ -19,6 +19,8 @@ #include "phlex/core/index_router.hpp" #include "phlex/core/node_catalog.hpp" +#include "oneapi/tbb/flow_graph.h" + #include #include #include @@ -28,7 +30,9 @@ namespace phlex::experimental { PHLEX_CORE_EXPORT std::tuple> - make_computational_edges(node_catalog& nodes, std::map& filters); + make_computational_edges(node_catalog& nodes, + std::map& filters, + tbb::flow::graph& g); } diff --git a/phlex/core/node_catalog.hpp b/phlex/core/node_catalog.hpp index d8ec0d2d2..274a50893 100644 --- a/phlex/core/node_catalog.hpp +++ b/phlex/core/node_catalog.hpp @@ -13,6 +13,7 @@ #include "phlex/core/products_consumer.hpp" #include "phlex/core/provider_node.hpp" #include "phlex/core/registrar.hpp" +#include "phlex/core/source.hpp" #include "phlex/utilities/simple_ptr_map.hpp" #include "boost/pfr.hpp" @@ -39,6 +40,7 @@ namespace phlex::experimental { simple_ptr_map unfolds{}; simple_ptr_map transforms{}; simple_ptr_map providers{}; + simple_ptr_map sources{}; }; } diff --git a/phlex/core/registrar.cpp b/phlex/core/registrar.cpp index a9878349e..ad80c4f35 100644 --- a/phlex/core/registrar.cpp +++ b/phlex/core/registrar.cpp @@ -3,8 +3,10 @@ #include "fmt/format.h" namespace phlex::experimental::detail { - void add_to_error_messages(std::vector& errors, std::string const& name) + void add_to_error_messages(std::vector& errors, + std::string const& entity, + std::string const& name) { - errors.push_back(fmt::format("Node with name '{}' already exists", name)); + errors.push_back(fmt::format("{} with name '{}' already exists", entity, name)); } } diff --git a/phlex/core/registrar.hpp b/phlex/core/registrar.hpp index d8c0b3abd..d96c1a38d 100644 --- a/phlex/core/registrar.hpp +++ b/phlex/core/registrar.hpp @@ -61,6 +61,7 @@ namespace phlex::experimental { namespace detail { PHLEX_CORE_EXPORT void add_to_error_messages(std::vector& errors, + std::string const& entity, std::string const& name); } @@ -115,7 +116,7 @@ namespace phlex::experimental { auto name = ptr->name().to_string(); auto [_, inserted] = nodes_->try_emplace(name, std::move(ptr)); if (not inserted) { - detail::add_to_error_messages(*errors_, name); + detail::add_to_error_messages(*errors_, "Node", name); } } diff --git a/phlex/core/source.hpp b/phlex/core/source.hpp new file mode 100644 index 000000000..2ed7c93a1 --- /dev/null +++ b/phlex/core/source.hpp @@ -0,0 +1,48 @@ +#ifndef PHLEX_CORE_SOURCE_HPP +#define PHLEX_CORE_SOURCE_HPP + +#include "phlex/phlex_core_export.hpp" + +#include "phlex/concurrency.hpp" +#include "phlex/core/product_selector.hpp" +#include "phlex/model/algorithm_name.hpp" +#include "phlex/model/index_generator.hpp" +#include "phlex/model/product_specification.hpp" +#include "phlex/model/products.hpp" +#include "phlex/model/type_id.hpp" +#include "phlex/utilities/simple_ptr_map.hpp" + +#include +#include +#include + +namespace phlex::experimental { + + // ============================================================================== + + // Function type for type-erased data-product types (used by implicit providers) + using provider_function_t = product_ptr(data_cell_index const&); + + struct PHLEX_CORE_EXPORT provider_bundle { + std::function provider_function; + concurrency max_concurrency; + product_specification spec; + std::string layer; + std::string stage; + }; + + using provider_bundles = std::vector; + + // ============================================================================== + class PHLEX_CORE_EXPORT source { + public: + virtual ~source() = default; + virtual provider_bundles create_providers(product_selector const&) = 0; + virtual index_generator indices() = 0; + }; + + using source_ptr = std::unique_ptr; + using source_map = simple_ptr_map; +} + +#endif // PHLEX_CORE_SOURCE_HPP diff --git a/phlex/detail/plugin_macros.hpp b/phlex/detail/plugin_macros.hpp index cc08d7dcd..d9471be71 100644 --- a/phlex/detail/plugin_macros.hpp +++ b/phlex/detail/plugin_macros.hpp @@ -7,13 +7,16 @@ // `bugprone-macro-parentheses` is appropriate for expression-like macros, but these macros expand // to C++ signatures, where parenthesizing parameters breaks parsing. We suppress the check for this // block because line continuations make per-line suppression impractical. + +// ================================================================================================ +// Algorithm registration macros #define PHLEX_DETAIL_NARGS(...) BOOST_PP_DEC(BOOST_PP_VARIADIC_SIZE(__VA_OPT__(, ) __VA_ARGS__)) #define PHLEX_DETAIL_CREATE_1ARG(token_type, func_name, m) \ - void func_name(token_type& m, phlex::configuration const&) + void func_name(token_type m, phlex::configuration const&) -#define PHLEX_DETAIL_CREATE_2ARGS(token_type, func_name, m, pset) \ - void func_name(token_type& m, phlex::configuration const& config) +#define PHLEX_DETAIL_CREATE_2ARGS(token_type, func_name, m, cfg) \ + void func_name(token_type m, phlex::configuration const& cfg) #define PHLEX_DETAIL_SELECT_SIGNATURE(token_type, func_name, ...) \ BOOST_PP_IF(BOOST_PP_EQUAL(PHLEX_DETAIL_NARGS(__VA_ARGS__), 1), \ @@ -27,13 +30,46 @@ #define PHLEX_DETAIL_REGISTER_PLUGIN(token_type, func_name, dll_alias, ...) \ extern "C" PHLEX_DETAIL_SELECT_SIGNATURE(token_type, dll_alias, __VA_ARGS__) +// ================================================================================================ +// Registration macros for source plugins and explicit-provider plugins +// +// Source plugin entry-points cannot use extern "C" directly because the user-facing proxy types +// (providers_graph_proxy, source_graph_proxy) are C++ templates. Instead we: +// 1. Forward-declare the user's C++ implementation (takes the proxy by reference). +// 2. Define a thin extern "C" shim that accepts source_bundle by value (matching +// source_creator_t exactly), constructs the appropriate proxy from the bundle, +// and calls the user's implementation. +// 3. Open the user's implementation definition for the body that follows the macro. +#define PHLEX_DETAIL_CREATE_SOURCE_1ARG(token_type, func_name, m) \ + void func_name(token_type m, phlex::configuration const&) + +#define PHLEX_DETAIL_CREATE_SOURCE_2ARGS(token_type, func_name, m, cfg) \ + void func_name(token_type m, phlex::configuration const& cfg) + +#define PHLEX_DETAIL_SELECT_SOURCE_SIGNATURE(token_type, func_name, ...) \ + BOOST_PP_IF(BOOST_PP_EQUAL(PHLEX_DETAIL_NARGS(__VA_ARGS__), 1), \ + PHLEX_DETAIL_CREATE_SOURCE_1ARG, \ + PHLEX_DETAIL_CREATE_SOURCE_2ARGS) \ + (token_type, func_name, __VA_ARGS__) + +#define PHLEX_DETAIL_REGISTER_SOURCE_PLUGIN(token_type, func_name, dll_alias, ...) \ + static PHLEX_DETAIL_SELECT_SOURCE_SIGNATURE(token_type, func_name, __VA_ARGS__); \ + extern "C" void dll_alias(phlex::experimental::source_bundle __phlex_bundle, \ + phlex::configuration const& __phlex_config) \ + { \ + func_name(token_type{__phlex_bundle}, __phlex_config); \ + } \ + PHLEX_DETAIL_SELECT_SOURCE_SIGNATURE(token_type, func_name, __VA_ARGS__) + +// ================================================================================================ +// Driver registration plugin macros #define PHLEX_DETAIL_CREATE_DRIVER_1ARG(func_name, d) \ - phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy const& d, \ + phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy d, \ phlex::configuration const&) -#define PHLEX_DETAIL_CREATE_DRIVER_2ARGS(func_name, d, config) \ - phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy const& d, \ - phlex::configuration const& config) +#define PHLEX_DETAIL_CREATE_DRIVER_2ARGS(func_name, d, cfg) \ + phlex::experimental::driver_bundle func_name(phlex::experimental::driver_proxy d, \ + phlex::configuration const& cfg) #define PHLEX_DETAIL_SELECT_DRIVER_SIGNATURE(func_name, ...) \ BOOST_PP_IF(BOOST_PP_EQUAL(PHLEX_DETAIL_NARGS(__VA_ARGS__), 1), \ @@ -47,7 +83,7 @@ // linkage), and then open the user's implementation definition for the body that follows. #define PHLEX_DETAIL_REGISTER_DRIVER_PLUGIN(func_name, dll_alias, ...) \ static PHLEX_DETAIL_SELECT_DRIVER_SIGNATURE(func_name, __VA_ARGS__); \ - extern "C" void dll_alias(phlex::experimental::driver_proxy const& __phlex_proxy, \ + extern "C" void dll_alias(phlex::experimental::driver_proxy __phlex_proxy, \ phlex::configuration const& __phlex_config, \ phlex::experimental::driver_bundle* __phlex_out) \ { \ diff --git a/phlex/driver.hpp b/phlex/driver.hpp index 7542f8e72..77156e6d6 100644 --- a/phlex/driver.hpp +++ b/phlex/driver.hpp @@ -21,7 +21,7 @@ namespace phlex::experimental { using next_index_t = std::function; // Shim type for the extern "C" entry-point: out-parameter avoids returning a C++ type // across a C-linkage boundary. - using driver_shim_t = void(driver_proxy const&, configuration const&, driver_bundle*); + using driver_shim_t = void(driver_proxy, configuration const&, driver_bundle*); }; /// @brief Bundles the driver function and data hierarchy for the framework. diff --git a/phlex/source.hpp b/phlex/source.hpp index 425377bfc..a79870a32 100644 --- a/phlex/source.hpp +++ b/phlex/source.hpp @@ -9,35 +9,73 @@ #include namespace phlex::experimental { - /// @brief Proxy for registering source (provider) nodes. + + struct source_bundle { + // Non-owning references to framework-owned resources; source_bundle is a short-lived struct. + // NOLINTBEGIN(cppcoreguidelines-avoid-const-or-ref-data-members) + configuration const& config; + tbb::flow::graph& graph; + node_catalog& nodes; + std::vector& registration_errors; + // NOLINTEND(cppcoreguidelines-avoid-const-or-ref-data-members) + }; + + /// @brief Proxy for registering explicit provider nodes. /// /// Passed to @c PHLEX_REGISTER_PROVIDERS plugin entry points. Only provide /// registration is accessible. Users never construct this type directly. template - class source_graph_proxy : graph_proxy { + class providers_graph_proxy : graph_proxy { using base = graph_proxy; public: + providers_graph_proxy(source_bundle bundle) : + base{bundle.config, bundle.graph, bundle.nodes, bundle.registration_errors} + { + } + using base::graph_proxy; template - source_graph_proxy make(Args&&... args) + providers_graph_proxy make(Args&&... args) requires(not is_bound_object) { - return this->template bind_to(std::forward(args)...); + return this->template bind_to(std::forward(args)...); } // Only provide(...) should be accessible using base::provide; }; + /// @brief Proxy for registering source nodes. + /// + /// Passed to @c PHLEX_REGISTER_SOURCE plugin entry points. Only source + /// registration is accessible. Users never construct this type directly. + template + class source_graph_proxy : graph_proxy { + using base = graph_proxy; + + public: + source_graph_proxy(source_bundle bundle) : + base{bundle.config, bundle.graph, bundle.nodes, bundle.registration_errors} + { + } + + // Only source(...) should be accessible + using base::source; + }; + namespace detail { - using source_creator_t = void(source_graph_proxy, configuration const&); + using source_creator_t = void(source_bundle, configuration const&); } } #define PHLEX_REGISTER_PROVIDERS(...) \ - PHLEX_DETAIL_REGISTER_PLUGIN( \ + PHLEX_DETAIL_REGISTER_SOURCE_PLUGIN( \ + phlex::experimental::providers_graph_proxy, create, create_source, __VA_ARGS__) + +#define PHLEX_REGISTER_SOURCE(...) \ + PHLEX_DETAIL_REGISTER_SOURCE_PLUGIN( \ phlex::experimental::source_graph_proxy, create, create_source, __VA_ARGS__) #endif // PHLEX_SOURCE_HPP diff --git a/plugins/python/CMakeLists.txt b/plugins/python/CMakeLists.txt index d21131767..27128c204 100644 --- a/plugins/python/CMakeLists.txt +++ b/plugins/python/CMakeLists.txt @@ -18,7 +18,7 @@ add_library( src/lifelinewrap.cpp src/errorwrap.cpp ) -target_link_libraries(pymodule PRIVATE phlex::module Python::Python Python::NumPy) +target_link_libraries(pymodule PRIVATE phlex::module phlex::source Python::Python Python::NumPy) target_compile_definitions(pymodule PRIVATE NPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION) install(TARGETS pymodule LIBRARY DESTINATION lib) diff --git a/plugins/python/src/wrap.hpp b/plugins/python/src/wrap.hpp index a93f1e479..81cef492c 100644 --- a/plugins/python/src/wrap.hpp +++ b/plugins/python/src/wrap.hpp @@ -44,7 +44,7 @@ namespace phlex::experimental { struct py_phlex_module; // Phlex' source wrapper to register providers - typedef source_graph_proxy phlex_source_t; + typedef providers_graph_proxy phlex_source_t; PyObject* wrap_source(phlex_source_t& src); // returns new reference // PyType_Ready() modifies PyTypeObject in-place; the Python C API requires non-const. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/test/benchmarks/CMakeLists.txt b/test/benchmarks/CMakeLists.txt index 524845c63..ba0dc004b 100644 --- a/test/benchmarks/CMakeLists.txt +++ b/test/benchmarks/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(fibonacci_numbers SHARED fibonacci_numbers.cpp) target_include_directories(fibonacci_numbers PRIVATE ${PROJECT_SOURCE_DIR}) add_library(benchmarks_provider MODULE benchmarks_provider.cpp) -target_link_libraries(benchmarks_provider PRIVATE phlex::model phlex::module) +target_link_libraries(benchmarks_provider PRIVATE phlex::model phlex::source) add_library(last_index MODULE last_index.cpp) target_link_libraries(last_index PRIVATE phlex::module) diff --git a/test/core_misc_test.cpp b/test/core_misc_test.cpp index 259aebe45..1a092a67f 100644 --- a/test/core_misc_test.cpp +++ b/test/core_misc_test.cpp @@ -89,7 +89,7 @@ TEST_CASE("add_to_error_messages tests", "[core]") using namespace phlex::experimental::detail; std::vector errors; - add_to_error_messages(errors, "duplicate_node"); + add_to_error_messages(errors, "Node", "duplicate_node"); REQUIRE(errors.size() == 1); CHECK(errors[0].find("duplicate_node") != std::string::npos); diff --git a/test/max-parallelism/CMakeLists.txt b/test/max-parallelism/CMakeLists.txt index 2fd7b98cb..2997ca3fb 100644 --- a/test/max-parallelism/CMakeLists.txt +++ b/test/max-parallelism/CMakeLists.txt @@ -4,7 +4,7 @@ add_library(check_parallelism MODULE check_parallelism.cpp) target_link_libraries(check_parallelism PRIVATE Boost::json phlex::core) add_library(provide_parallelism MODULE provide_parallelism.cpp) -target_link_libraries(provide_parallelism PRIVATE phlex::core) +target_link_libraries(provide_parallelism PRIVATE phlex::source) configure_file(check_parallelism_default.jsonnet.in check_parallelism_default.jsonnet @ONLY) cet_test( diff --git a/test/max-parallelism/provide_parallelism.cpp b/test/max-parallelism/provide_parallelism.cpp index 3e5b90bc1..0d9cdd995 100644 --- a/test/max-parallelism/provide_parallelism.cpp +++ b/test/max-parallelism/provide_parallelism.cpp @@ -1,11 +1,42 @@ +#include "phlex/core/source.hpp" #include "phlex/source.hpp" #include "phlex/utilities/max_allowed_parallelism.hpp" -PHLEX_REGISTER_PROVIDERS(s) +#include +#include +#include + +namespace { + class max_parallelism_source : public phlex::experimental::source { + public: + phlex::experimental::provider_bundles create_providers( + phlex::product_selector const& selector) override + { + using namespace phlex::experimental; + provider_bundles bundles; + std::string const layer = "job"; + std::string const stage = "CURRENT"; + product_specification spec{"input", "max_parallelism", make_type_id()}; + + if (selector.match(spec, identifier{layer}, identifier{stage})) { + bundles.push_back(provider_bundle{.provider_function = + [](phlex::data_cell_index const&) { + return product_for( + max_allowed_parallelism::active_value()); + }, + .max_concurrency = phlex::concurrency::unlimited, + .spec = std::move(spec), + .layer = layer, + .stage = stage}); + } + return bundles; + } + + phlex::index_generator indices() override { co_return; } + }; +} + +PHLEX_REGISTER_SOURCE(s, config) { - using namespace phlex; - s.provide( - "provide_max_parallelism", - [](data_cell_index const&) { return experimental::max_allowed_parallelism::active_value(); }) - .output_product("input", "max_parallelism", "job"); + s.source(config.get("module_label")); } diff --git a/test/mock-workflow/CMakeLists.txt b/test/mock-workflow/CMakeLists.txt index 53a7b2340..b5a860872 100644 --- a/test/mock-workflow/CMakeLists.txt +++ b/test/mock-workflow/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories(algorithm INTERFACE ${PROJECT_SOURCE_DIR}) target_link_libraries(algorithm INTERFACE timed_busy phlex::module spdlog::spdlog) add_library(id_provider MODULE id_provider.cpp) -target_link_libraries(id_provider PRIVATE phlex::module) +target_link_libraries(id_provider PRIVATE phlex::source) function(add_module NAME) add_library(${NAME} MODULE ${NAME}.cpp) diff --git a/test/plugins/CMakeLists.txt b/test/plugins/CMakeLists.txt index 381fe4166..623ef3934 100644 --- a/test/plugins/CMakeLists.txt +++ b/test/plugins/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(module MODULE module.cpp) target_link_libraries(module PRIVATE phlex::module) add_library(ij_source MODULE ij_source.cpp) -target_link_libraries(ij_source PRIVATE phlex::module) +target_link_libraries(ij_source PRIVATE phlex::source) add_library(output MODULE output.cpp) target_link_libraries(output PRIVATE phlex::module) diff --git a/test/provider_test.cpp b/test/provider_test.cpp index 7384f47fd..cdf8faf53 100644 --- a/test/provider_test.cpp +++ b/test/provider_test.cpp @@ -1,4 +1,5 @@ #include "phlex/core/framework_graph.hpp" +#include "phlex/core/source.hpp" #include "phlex/model/data_cell_index.hpp" #include "plugins/layer_generator.hpp" @@ -8,11 +9,13 @@ #include "spdlog/spdlog.h" using namespace phlex; +using Catch::Matchers::ContainsSubstring; namespace toy { struct VertexCollection { std::size_t data; }; + auto make_collection(std::size_t i) { return VertexCollection{i}; } } namespace { @@ -20,47 +23,128 @@ namespace { toy::VertexCollection give_me_vertices(data_cell_index const& id) { spdlog::info("give_me_vertices: {}", id.number()); - return toy::VertexCollection{id.number()}; + return toy::make_collection(id.number()); } -} -namespace { + // Type-erased provider function + experimental::product_ptr give_me_vertices_erased(data_cell_index const& id) + { + spdlog::info("give_me_vertices_erased: {}", id.number()); + return std::make_unique>( + toy::make_collection(id.number())); + } + + // Vertices source for implicit provider test + class vertices_source : public experimental::source { + public: + experimental::provider_bundles create_providers(product_selector const& selector) override + { + using namespace experimental; + provider_bundles bundles; + std::string const layer = "spill"; + std::string const stage = "previous_process"; + product_specification spec{ + "vertices_maker", "happy_vertices", make_type_id()}; + + if (selector.match(spec, identifier{layer}, identifier{stage})) { + bundles.push_back(provider_bundle{.provider_function = give_me_vertices_erased, + .max_concurrency = concurrency::unlimited, + .spec = std::move(spec), + .layer = layer, + .stage = stage}); + } + return bundles; + } + index_generator indices() override { co_return; } + }; + unsigned pass_on(toy::VertexCollection const& vertices) { return vertices.data; } } -TEST_CASE("provider_test") +TEST_CASE("Explicit providers") { - constexpr auto max_events{3u}; + constexpr auto num_spills{3u}; experimental::layer_generator gen; - gen.add_layer("spill", {"job", max_events, 1u}); + gen.add_layer("spill", {"job", num_spills, 1u}); experimental::framework_graph g{driver_for_test(gen)}; g.provide("my_name_here", give_me_vertices, concurrency::unlimited) - .output_product("input", "happy_vertices", "spill"); + .output_product("vertices_maker", "happy_vertices", "spill"); g.transform("passer", pass_on, concurrency::unlimited) .input_family( - product_selector{.creator = "input", .layer = "spill", .suffix = "happy_vertices"}); + product_selector{.creator = "vertices_maker", .layer = "spill", .suffix = "happy_vertices"}); g.execute(); - CHECK(g.execution_count("passer") == max_events); - CHECK(g.execution_count("my_name_here") == max_events); + CHECK(g.execution_count("passer") == num_spills); + CHECK(g.execution_count("my_name_here") == num_spills); +} + +TEST_CASE("Implicit providers") +{ + constexpr auto num_spills{3u}; + + experimental::layer_generator gen; + gen.add_layer("spill", {"job", num_spills, 1u}); + + experimental::framework_graph g{driver_for_test(gen)}; + + g.source("vertices_source"); + + g.transform("passer", pass_on, concurrency::unlimited) + .input_family( + product_selector{.creator = "vertices_maker", .layer = "spill", .suffix = "happy_vertices"}); + + g.execute(); + + CHECK(g.execution_count("vertices_maker") == num_spills); + CHECK(g.execution_count("passer") == num_spills); +} + +TEST_CASE("Throw when two sources with the same name are registered") +{ + experimental::framework_graph g; + g.source("vertices_source"); + g.source("vertices_source"); + + CHECK_THROWS_WITH(g.execute(), + ContainsSubstring("Source with name 'vertices_source' already exists")); +} + +TEST_CASE("Throw when two implicit providers are found for the same product") +{ + experimental::framework_graph g; + + // Register two sources that can provide the same product + g.source("vertices_source_1"); + g.source("vertices_source_2"); + + g.transform("passer", pass_on, concurrency::unlimited) + .input_family( + product_selector{.creator = "vertices_maker", .layer = "spill", .suffix = "happy_vertices"}); + + CHECK_THROWS_WITH( + g.execute(), + ContainsSubstring( + "Multiple implicit providers found for product 'vertices_maker/happy_vertices") && + ContainsSubstring("spill") && ContainsSubstring("passer")); } TEST_CASE("Throw when no provider found for required product") { experimental::framework_graph g; - // Register an observer that needs a product from a creator that does not exist - // in the graph. Since there is no matching provider, make_computational_edges - // should throw "No provider found for product...". + // Register an observer that needs a product from a creator that does not exist in the graph. + // Since there is no matching provider, make_computational_edges should throw listing all + // unmatched products. g.observe( "observer", [](unsigned int const) {}, concurrency::unlimited) .input_family(product_selector{.creator = "nonexistent_creator", .layer = "job"}); CHECK_THROWS_WITH(g.execute(), - Catch::Matchers::ContainsSubstring("No provider found for product")); + ContainsSubstring("No provider found for the following required products:") && + ContainsSubstring("nonexistent_creator") && ContainsSubstring("job")); } diff --git a/test/python/CMakeLists.txt b/test/python/CMakeLists.txt index aa79543c4..d91ed37d8 100644 --- a/test/python/CMakeLists.txt +++ b/test/python/CMakeLists.txt @@ -171,7 +171,7 @@ endif() # C++ helper to provide a driver add_library(cppsource4py MODULE source.cpp) -target_link_libraries(cppsource4py PRIVATE phlex::module) +target_link_libraries(cppsource4py PRIVATE phlex::source) # phlex-based tests (no cppyy dependency) add_test(NAME py:add COMMAND phlex::phlex -c ${CMAKE_CURRENT_SOURCE_DIR}/pyadd.jsonnet)