diff --git a/src/vt/collective/reduce/reduce.impl.h b/src/vt/collective/reduce/reduce.impl.h index ccd37b57c4..d3865e1ff7 100644 --- a/src/vt/collective/reduce/reduce.impl.h +++ b/src/vt/collective/reduce/reduce.impl.h @@ -78,7 +78,7 @@ void Reduce::reduceRootRecv(MsgT* msg) { auto const from_node = theContext()->getFromNodeCurrentTask(); auto m = promoteMsg(msg); - runnable::makeRunnable(m, false, handler, from_node) + runnable::makeRunnable(std::move(m), false, handler, from_node) .withTDEpochFromMsg() .run(); } @@ -216,6 +216,7 @@ void Reduce::startReduce(detail::ReduceStamp id, bool use_num_contrib) { ); if (ready) { + auto saved_msg=state.msgs[0]; // Combine messages if (state.msgs.size() > 1) { auto size = state.msgs.size(); @@ -256,14 +257,14 @@ void Reduce::startReduce(detail::ReduceStamp id, bool use_num_contrib) { // this needs to run inline.. threaded not allowed for reduction // combination - runnable::makeRunnable(state.msgs[0], false, handler, from_node) + runnable::makeRunnable(std::move(state.msgs[0]), false, handler, from_node) .withTDEpochFromMsg() .run(); } // Send to parent // Collection is of MsgPtr, re-type and drop collection owner. - auto msg = state.msgs[0]; + auto msg = saved_msg; MsgPtr typed_msg = msg.template to(); state.msgs.clear(); state.num_contrib_ = 1; diff --git a/src/vt/messaging/active.cc b/src/vt/messaging/active.cc index 9ab3235191..be44cc4e58 100644 --- a/src/vt/messaging/active.cc +++ b/src/vt/messaging/active.cc @@ -500,7 +500,7 @@ EventType ActiveMessenger::doMessageSend( if (is_obj) { objgroup::dispatchObjGroup(base, han, dest, nullptr); } else { - runnable::makeRunnable(base, true, envelopeGetHandler(msg->env), dest) + runnable::makeRunnable(std::move(base), true, envelopeGetHandler(msg->env), dest) .withTDEpochFromMsg(is_term) .withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_) .enqueue(); @@ -963,7 +963,8 @@ void ActiveMessenger::prepareActiveMsgToRun( if (is_obj) { objgroup::dispatchObjGroup(base, handler, from_node, cont); } else { - runnable::makeRunnable(base, not is_term, handler, from_node) + auto m = base; + runnable::makeRunnable(std::move(m), not is_term, handler, from_node) .withContinuation(cont) .withTDEpochFromMsg(is_term) .withLBData(&bare_handler_lb_data_, bare_handler_dummy_elm_id_for_lb_data_) diff --git a/src/vt/messaging/message/smart_ptr.h b/src/vt/messaging/message/smart_ptr.h index b7d69eabf9..fd0312d8e4 100644 --- a/src/vt/messaging/message/smart_ptr.h +++ b/src/vt/messaging/message/smart_ptr.h @@ -155,6 +155,11 @@ struct MsgSharedPtr final { /*N.B. retain ORIGINAL-type implementation*/ impl_); } + template + MsgSharedPtr* reinterpretAs() { + return reinterpret_cast*>(this); + } + /// [obsolete] Use to() as MsgVirtualPtr <-> MsgSharedPtr. /// Both methods are equivalent in function. template diff --git a/src/vt/messaging/pending_send.cc b/src/vt/messaging/pending_send.cc index d19ae2401b..5416423425 100644 --- a/src/vt/messaging/pending_send.cc +++ b/src/vt/messaging/pending_send.cc @@ -59,17 +59,23 @@ PendingSend::PendingSend(PendingSend&& in) noexcept std::swap(msg_, in.msg_); std::swap(epoch_action_, in.epoch_action_); std::swap(send_action_, in.send_action_); + std::swap(send_move_action_, in.send_move_action_); } void PendingSend::sendMsg() { - if (send_action_ == nullptr) { + if (send_action_ == nullptr and send_move_action_ == nullptr) { theMsg()->doMessageSend(msg_); } else { - send_action_(msg_); + if (send_action_) { + send_action_(msg_); + } else { + send_move_action_(std::move(msg_)); + } } consumeMsg(); msg_ = nullptr; send_action_ = nullptr; + send_move_action_ = nullptr; } EpochType PendingSend::getProduceEpochFromMsg() const { @@ -95,7 +101,7 @@ void PendingSend::consumeMsg() { } void PendingSend::release() { - bool send_msg = msg_ != nullptr || send_action_ != nullptr; + bool send_msg = msg_ != nullptr || send_action_ != nullptr || send_move_action_ != nullptr; vtAssert(!send_msg || !epoch_action_, "cannot have both a message and epoch action"); if (send_msg) { sendMsg(); diff --git a/src/vt/messaging/pending_send.h b/src/vt/messaging/pending_send.h index 8c83be82bc..aee00f2f2d 100644 --- a/src/vt/messaging/pending_send.h +++ b/src/vt/messaging/pending_send.h @@ -70,6 +70,7 @@ namespace vt { namespace messaging { struct PendingSend final { /// Function for complex action on send---takes a message to operate on using SendActionType = std::function&)>; + using SendActionMoveType = std::function&&)>; using EpochActionType = std::function; /** @@ -91,6 +92,21 @@ struct PendingSend final { produceMsg(); } + /** + * \brief Construct a pending send. + * + * \param[in] in_msg the message to send + * \param[in] in_move_action the action to run, where the msg is moved. + */ + PendingSend( + MsgSharedPtr&& in_msg, + SendActionMoveType in_move_action = nullptr + ) : msg_(std::move(in_msg)) + , send_move_action_(in_move_action) + { + produceMsg(); + } + /** * \brief Construct a pending send that invokes a callback. * @@ -182,6 +198,7 @@ struct PendingSend final { private: MsgPtr msg_ = nullptr; SendActionType send_action_ = {}; + SendActionMoveType send_move_action_ = {}; EpochActionType epoch_action_ = {}; EpochType epoch_produced_ = no_epoch; }; diff --git a/src/vt/objgroup/manager.static.h b/src/vt/objgroup/manager.static.h index ac24a78aec..a9b3052668 100644 --- a/src/vt/objgroup/manager.static.h +++ b/src/vt/objgroup/manager.static.h @@ -93,7 +93,7 @@ decltype(auto) invoke( auto const& elm_id = holder->getElmID(); auto elm = holder->getPtr(); auto lb_data = &holder->getLBData(); - return runnable::makeRunnableVoid(false, han, this_node) + runnable::makeRunnableVoid(false, han, this_node) .withObjGroup(elm) .withLBData(lb_data, elm_id) .runLambda(f, static_cast(elm), msg.get()); @@ -108,13 +108,13 @@ namespace detail { template void dispatchImpl( - MsgSharedPtr const& msg, HandlerType han, NodeType from_node, + MsgSharedPtr msg, HandlerType han, NodeType from_node, ActionType cont, ObjT* obj ) { auto holder = detail::getHolderBase(han); auto const& elm_id = holder->getElmID(); auto lb_data = &holder->getLBData(); - runnable::makeRunnable(msg, true, han, from_node) + runnable::makeRunnable(std::move(msg), true, han, from_node) .withContinuation(cont) .withObjGroup(obj) .withLBData(lb_data, elm_id) diff --git a/src/vt/pipe/callback/handler_send/callback_send.impl.h b/src/vt/pipe/callback/handler_send/callback_send.impl.h index fd172c0eab..0e97bd1e98 100644 --- a/src/vt/pipe/callback/handler_send/callback_send.impl.h +++ b/src/vt/pipe/callback/handler_send/callback_send.impl.h @@ -110,7 +110,7 @@ CallbackSend::triggerDispatch(SignalDataType* data, PipeType const& pid) { if (this_node == send_node_) { auto msg = reinterpret_cast(data); auto m = promoteMsg(msg); - runnable::makeRunnable(m, true, handler_, this_node) + runnable::makeRunnable(std::move(m), true, handler_, this_node) .withTDEpochFromMsg() .enqueue(); } else { diff --git a/src/vt/pipe/callback/handler_send/callback_send_tl.impl.h b/src/vt/pipe/callback/handler_send/callback_send_tl.impl.h index 91c0581ba0..a108960c94 100644 --- a/src/vt/pipe/callback/handler_send/callback_send_tl.impl.h +++ b/src/vt/pipe/callback/handler_send/callback_send_tl.impl.h @@ -71,7 +71,7 @@ void CallbackSendTypeless::trigger(MsgT* msg, PipeType const& pipe) { ); auto pmsg = promoteMsg(msg); if (this_node == send_node_) { - runnable::makeRunnable(pmsg, true, handler_, this_node) + runnable::makeRunnable(std::move(pmsg), true, handler_, this_node) .withTDEpochFromMsg() .enqueue(); } else { diff --git a/src/vt/runnable/make_runnable.h b/src/vt/runnable/make_runnable.h index e3b713b83e..632519aa91 100644 --- a/src/vt/runnable/make_runnable.h +++ b/src/vt/runnable/make_runnable.h @@ -67,19 +67,18 @@ struct RunnableMaker { * \internal \brief Construct the builder. Shall not be called directly. * * \param[in] in_impl the runnable - * \param[in] in_msg the associated message + * \param[in] in_has_msg whether we have a message * \param[in] in_handler the handler * \param[in] in_han_type the type of handler * \param[in] in_from_node the from node for the runnable */ RunnableMaker( - RunnableNew* in_impl, MsgSharedPtr const& in_msg, + RunnableNew* in_impl, bool in_has_msg, HandlerType in_handler, NodeType in_from_node ) : impl_(in_impl), - msg_(in_msg), + has_msg_(in_has_msg), handler_(in_handler), - from_node_(in_from_node), - has_msg_(in_msg != nullptr) + from_node_(in_from_node) { } RunnableMaker(RunnableMaker const&) = delete; RunnableMaker(RunnableMaker&&) = default; @@ -119,7 +118,7 @@ struct RunnableMaker { RunnableMaker&& withTDEpochFromMsg(bool is_term = false) { is_term_ = is_term; if (not is_term) { - impl_->addContextTD(msg_); + impl_->addContextTD(impl_->getMsg()); } return std::move(*this); } @@ -219,7 +218,7 @@ struct RunnableMaker { template RunnableMaker&& withLBData(ElmT* elm) { #if vt_check_enabled(lblite) - impl_->addContextLB(elm, msg_.get()); + impl_->addContextLB(elm, reinterpret_cast(impl_->getMsg().get())); #endif return std::move(*this); } @@ -239,7 +238,7 @@ struct RunnableMaker { uint64_t idx1, uint64_t idx2, uint64_t idx3, uint64_t idx4 ) { impl_->addContextTrace( - msg_, trace_event, handler_, from_node_, idx1, idx2, idx3, idx4 + impl_->getMsg(), trace_event, handler_, from_node_, idx1, idx2, idx3, idx4 ); return std::move(*this); } @@ -309,13 +308,12 @@ struct RunnableMaker { private: RunnableNew* impl_ = nullptr; - MsgSharedPtr const& msg_; + bool has_msg_ = false; HandlerType handler_ = uninitialized_handler; bool set_handler_ = false; NodeType from_node_ = uninitialized_destination; bool is_done_ = false; bool is_term_ = false; - bool has_msg_ = true; }; /** @@ -331,19 +329,19 @@ struct RunnableMaker { */ template RunnableMaker makeRunnable( - MsgSharedPtr const& msg, bool is_threaded, HandlerType handler, NodeType from + MsgSharedPtr&& msg, bool is_threaded, HandlerType handler, NodeType from ) { - auto r = new RunnableNew(msg, is_threaded); + auto r = new RunnableNew(std::move(msg), is_threaded); #if vt_check_enabled(trace_enabled) auto const han_type = HandlerManager::getHandlerRegistryType(handler); if (han_type == auto_registry::RegistryTypeEnum::RegVrt or han_type == auto_registry::RegistryTypeEnum::RegGeneral or han_type == auto_registry::RegistryTypeEnum::RegObjGroup) { - r->addContextTrace(msg, handler, from); + r->addContextTrace(r->getMsg(), handler, from); } #endif r->addContextSetContext(r, from); - return RunnableMaker{r, msg, handler, from}; + return RunnableMaker{r, true, handler, from}; } /** @@ -362,7 +360,7 @@ inline RunnableMaker makeRunnableVoid( auto r = new RunnableNew(is_threaded); // @todo: figure out how to trace this? r->addContextSetContext(r, from); - return RunnableMaker{r, nullptr, handler, from}; + return RunnableMaker{r, false, handler, from}; } }} /* end namespace vt::runnable */ diff --git a/src/vt/runnable/runnable.h b/src/vt/runnable/runnable.h index 3151a4fe43..e39e4453fd 100644 --- a/src/vt/runnable/runnable.h +++ b/src/vt/runnable/runnable.h @@ -106,8 +106,8 @@ struct RunnableNew { * \param[in] in_is_threaded whether the handler can be run with a thread */ template - RunnableNew(MsgSharedPtr const& in_msg, bool in_is_threaded) - : msg_(in_msg.template to()) + RunnableNew(MsgSharedPtr&& in_msg, bool in_is_threaded) + : msg_(std::move(*in_msg.template reinterpretAs())) #if vt_check_enabled(fcontext) , is_threaded_(in_is_threaded) #endif @@ -130,6 +130,8 @@ struct RunnableNew { RunnableNew& operator=(RunnableNew&&) = default; public: + MsgSharedPtr const& getMsg() { return msg_; } + /** * \brief Add a new \c SetContext for this handler * diff --git a/src/vt/serialization/messaging/serialized_messenger.impl.h b/src/vt/serialization/messaging/serialized_messenger.impl.h index 5b9c296ae0..526aa1af47 100644 --- a/src/vt/serialization/messaging/serialized_messenger.impl.h +++ b/src/vt/serialization/messaging/serialized_messenger.impl.h @@ -95,7 +95,7 @@ template user_msg.template to(), handler, sys_msg->from_node, nullptr ); } else { - runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) + runnable::makeRunnable(std::move(user_msg), true, handler, sys_msg->from_node) .withTDEpochFromMsg() .enqueue(); } @@ -146,7 +146,7 @@ template msg.template to(), handler, node, action ); } else { - runnable::makeRunnable(msg, true, handler, node) + runnable::makeRunnable(std::move(msg), true, handler, node) .withTDEpoch(epoch, not is_valid_epoch) .withContinuation(action) .enqueue(); @@ -195,7 +195,7 @@ template user_msg.template to(), handler, sys_msg->from_node, nullptr ); } else { - runnable::makeRunnable(user_msg, true, handler, sys_msg->from_node) + runnable::makeRunnable(std::move(user_msg), true, handler, sys_msg->from_node) .withTDEpochFromMsg() .enqueue(); } @@ -428,14 +428,14 @@ template ); auto base_msg = user_msg.template to(); - return messaging::PendingSend(base_msg, [=](MsgPtr in) { + return messaging::PendingSend(std::move(base_msg), [=](MsgPtr&& in) mutable { bool const is_obj = HandlerManager::isHandlerObjGroup(typed_handler); if (is_obj) { objgroup::dispatchObjGroup( user_msg.template to(), typed_handler, node, nullptr ); } else { - runnable::makeRunnable(user_msg, true, typed_handler, node) + runnable::makeRunnable(std::move(user_msg), true, typed_handler, node) .withTDEpochFromMsg() .enqueue(); } diff --git a/src/vt/topos/location/location.h b/src/vt/topos/location/location.h index 1e465e0e34..ff874fb758 100644 --- a/src/vt/topos/location/location.h +++ b/src/vt/topos/location/location.h @@ -96,7 +96,7 @@ struct EntityLocationCoord : LocationCoord { using LocRecType = LocRecord; using LocCacheType = LocLookup; using LocEntityMsg = LocEntity; - using LocalRegisteredContType = std::unordered_set; + using LocalRegisteredContType = std::unordered_map; using LocalRegisteredMsgContType = std::unordered_map; using ActionListType = std::list; using PendingType = PendingLocationLookup; @@ -138,10 +138,12 @@ struct EntityLocationCoord : LocationCoord { * \param[in] home the home node for this entity * \param[in] msg_action function to trigger when message arrives for it * \param[in] migrated whether it migrated in: \c entityEmigrated is preferred + * \param[in] obj pointer to the object associated with this entity */ void registerEntity( EntityID const& id, NodeType const& home, - LocMsgActionType msg_action = nullptr, bool const& migrated = false + LocMsgActionType msg_action = nullptr, bool const& migrated = false, + void* obj = nullptr ); /** @@ -191,11 +193,13 @@ struct EntityLocationCoord : LocationCoord { * \param[in] id the entity ID * \param[in] home_node the home node for the entity * \param[in] msg_action function to trigger when message arrives for it + * \param[in] obj pointer to the object associated with this entity */ void entityImmigrated( EntityID const& id, NodeType const& home_node, NodeType const& __attribute__((unused)) from_node, - LocMsgActionType msg_action = nullptr + LocMsgActionType msg_action = nullptr, + void* obj = nullptr ); /** @@ -240,7 +244,7 @@ struct EntityLocationCoord : LocationCoord { template *f> void routeMsgHandler( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ); /** @@ -248,8 +252,8 @@ struct EntityLocationCoord : LocationCoord { * * \param[in] m message shared pointer */ - template - void routePreparedMsgHandler(MsgSharedPtr const& msg); + template *f> + void routePreparedMsgHandler(MsgSharedPtr&& msg); /** * \brief Route a message with a custom handler @@ -257,15 +261,16 @@ struct EntityLocationCoord : LocationCoord { * \param[in] m message shared pointer */ template - void routePreparedMsg(MsgSharedPtr const& msg); + void routePreparedMsg(MsgSharedPtr&& msg); /** * \brief Route a message with a custom handler where the element is local * * \param[in] m message shared pointer + * \param[in] obj the object pointer */ - template - void routeMsgHandlerLocal(MsgSharedPtr const& msg); + template *f> + void routeMsgHandlerLocal(MsgSharedPtr&& msg, void* obj); /** * \brief Route a message to the default handler @@ -278,7 +283,7 @@ struct EntityLocationCoord : LocationCoord { template void routeMsg( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg, + MsgSharedPtr&& msg, NodeType from_node = uninitialized_destination ); @@ -405,7 +410,7 @@ struct EntityLocationCoord : LocationCoord { template void routeMsgEager( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ); /** @@ -419,7 +424,7 @@ struct EntityLocationCoord : LocationCoord { template void routeMsgNode( EntityID const& id, NodeType const& home_node, NodeType const& to_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ); /** @@ -441,6 +446,8 @@ struct EntityLocationCoord : LocationCoord { */ LocInstType getInst() const; + void* getObjContext() const { return obj_context_; } + private: LocInstType this_inst = no_loc_inst; @@ -461,6 +468,9 @@ struct EntityLocationCoord : LocationCoord { // List of nodes that inquire about an entity that require an update LocAsksType loc_asks_; + + /// Current object pointer context + void* obj_context_ = nullptr; }; }} // end namespace vt::location diff --git a/src/vt/topos/location/location.impl.h b/src/vt/topos/location/location.impl.h index f3b195c3c1..8d4fdc3deb 100644 --- a/src/vt/topos/location/location.impl.h +++ b/src/vt/topos/location/location.impl.h @@ -95,7 +95,7 @@ template template void EntityLocationCoord::registerEntity( EntityID const& id, NodeType const& home, LocMsgActionType msg_action, - bool const& migrated + bool const& migrated, void* obj ) { auto const& this_node = theContext()->getNode(); auto reg_iter = local_registered_.find(id); @@ -112,7 +112,7 @@ void EntityLocationCoord::registerEntity( this_inst, home, migrated, id ); - local_registered_.insert(id); + local_registered_[id] = obj; recs_.insert(id, home, LocRecType{id, eLocState::Local, this_node}); @@ -260,11 +260,11 @@ void EntityLocationCoord::entityEmigrated( template void EntityLocationCoord::entityImmigrated( EntityID const& id, NodeType const& home_node, NodeType const& from, - LocMsgActionType msg_action + LocMsgActionType msg_action, void* obj ) { // @todo: currently `from' is unused, but is passed to this method in case we // need it in the future - return registerEntity(id, home_node, msg_action, true); + return registerEntity(id, home_node, msg_action, true, obj); } template @@ -337,7 +337,7 @@ template template void EntityLocationCoord::routeMsgEager( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ) { auto const& this_node = theContext()->getNode(); NodeType route_to_node = uninitialized_destination; @@ -387,7 +387,7 @@ void EntityLocationCoord::routeMsgEager( home_node, route_to_node, id ); - return routeMsgNode(id, home_node, route_to_node, msg); + return routeMsgNode(id, home_node, route_to_node, std::move(msg)); } template @@ -505,7 +505,7 @@ template template void EntityLocationCoord::routeMsgNode( EntityID const& id, NodeType const& home_node, NodeType const& to_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ) { auto const& this_node = theContext()->getNode(); auto const epoch = theMsg()->getEpochContextMsg(msg); @@ -549,23 +549,27 @@ void EntityLocationCoord::routeMsgNode( theTerm()->produce(epoch); - auto trigger_msg_handler_action = [=](EntityID const& hid) { - bool const& has_handler = msg->hasHandler(); - auto const& from = msg->getLocFromNode(); + auto trigger_msg_handler_action = [=](EntityID const& hid, MsgSharedPtr&& m) mutable { + bool const& has_handler = m->hasHandler(); + auto const& from = m->getLocFromNode(); + auto ask_node = m->getAskNode(); if (has_handler) { - auto const handler = msg->getHandler(); + auto const handler = m->getHandler(); vt_debug_print( verbose, location, "EntityLocationCoord: apply direct handler action: " "id={}, from={}, handler={}, ref={}\n", - hid, from, handler, envelopeGetRef(msg->env) + hid, from, handler, envelopeGetRef(m->env) ); - runnable::makeRunnable(msg, true, handler, from) + obj_context_ = local_registered_.find(hid)->second; + runnable::makeRunnable(std::move(m), true, handler, from) .withTDEpochFromMsg() .run(); - } else { + obj_context_ = nullptr; + } + else { auto reg_han_iter = local_registered_msg_han_.find(hid); vtAssert( reg_han_iter != local_registered_msg_han_.end(), @@ -575,11 +579,9 @@ void EntityLocationCoord::routeMsgNode( verbose, location, "EntityLocationCoord: no direct handler: id={}\n", hid ); - reg_han_iter->second.applyRegisteredActionMsg(msg.get()); + reg_han_iter->second.applyRegisteredActionMsg(m.get()); } - auto ask_node = msg->getAskNode(); - if (ask_node != uninitialized_destination) { auto delivered_node = theContext()->getNode(); sendEagerUpdate(hid, ask_node, home_node, delivered_node); @@ -602,7 +604,7 @@ void EntityLocationCoord::routeMsgNode( ); theMsg()->pushEpoch(epoch); - trigger_msg_handler_action(id); + trigger_msg_handler_action(id, std::move(msg)); theMsg()->popEpoch(epoch); theTerm()->consume(epoch); } else { @@ -614,7 +616,7 @@ void EntityLocationCoord::routeMsgNode( EntityID id_ = id; // buffer the message here, the entity will be registered in the future - insertPendingEntityAction(id_, [=](NodeType resolved) { + insertPendingEntityAction(id_, [=](NodeType resolved) mutable { auto const& my_node = theContext()->getNode(); vt_debug_print( @@ -626,14 +628,14 @@ void EntityLocationCoord::routeMsgNode( theMsg()->pushEpoch(epoch); if (resolved == my_node) { - trigger_msg_handler_action(id_); + trigger_msg_handler_action(id_, std::move(msg)); } else { /* * Recurse with the new updated node information. This occurs * typically when an non-migrated registration occurs off the home * node and messages are buffered, awaiting forwarding information. */ - routeMsgNode(id_, home_node, resolved,msg); + routeMsgNode(id_, home_node, resolved, std::move(msg)); } theMsg()->popEpoch(epoch); theTerm()->consume(epoch); @@ -655,22 +657,23 @@ template template *f> void EntityLocationCoord::routeMsgHandler( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg + MsgSharedPtr&& msg ) { setupMessageForRouting(id, home_node, msg); - routePreparedMsgHandler(msg); + routePreparedMsgHandler(std::move(msg)); } template -template +template *f> void EntityLocationCoord::routePreparedMsgHandler( - MsgSharedPtr const& msg + MsgSharedPtr&& msg ) { - if (local_registered_.find(msg->getEntity()) == local_registered_.end()) { - return routePreparedMsg(msg); + auto iter = local_registered_.find(msg->getEntity()); + if (iter == local_registered_.end()) { + return routePreparedMsg(std::move(msg)); } else { - return routeMsgHandlerLocal(msg); + return routeMsgHandlerLocal(std::move(msg), iter->second); } } @@ -698,19 +701,19 @@ void EntityLocationCoord::setupMessageForRouting( } template -template +template *f> void EntityLocationCoord::routeMsgHandlerLocal( - MsgSharedPtr const& msg + MsgSharedPtr&& msg, void* obj ) { - runnable::makeRunnable(msg, true, msg->getHandler(), theContext()->getNode()) - .withTDEpochFromMsg() - .run(); + obj_context_ = obj; + f(msg.get()); + obj_context_ = nullptr; } template template void EntityLocationCoord::routePreparedMsg( - MsgSharedPtr const& msg + MsgSharedPtr&& msg ) { auto const msg_size = sizeof(*msg); bool const use_eager = useEagerProtocol(msg); @@ -727,15 +730,17 @@ void EntityLocationCoord::routePreparedMsg( if (use_eager) { theMsg()->pushEpoch(epoch); - routeMsgEager(msg->getEntity(), msg->getHomeNode(), msg); + routeMsgEager(msg->getEntity(), msg->getHomeNode(), std::move(msg)); theMsg()->popEpoch(epoch); } else { theTerm()->produce(epoch); // non-eager protocol: get location first then send message after resolution - getLocation(msg->getEntity(), msg->getHomeNode(), [=](NodeType node) { + auto entity = msg->getEntity(); + auto home_node = msg->getHomeNode(); + getLocation(entity, home_node, [this, epoch, m = std::move(msg)](NodeType node) mutable { theMsg()->pushEpoch(epoch); routeMsgNode( - msg->getEntity(), msg->getHomeNode(), node, msg + m->getEntity(), m->getHomeNode(), node, std::move(m) ); theMsg()->popEpoch(epoch); theTerm()->consume(epoch); @@ -747,7 +752,7 @@ template template void EntityLocationCoord::routeMsg( EntityID const& id, NodeType const& home_node, - MsgSharedPtr const& msg, NodeType from_node + MsgSharedPtr&& msg, NodeType from_node ) { auto const from = from_node == uninitialized_destination ? theContext()->getNode() : @@ -758,7 +763,7 @@ void EntityLocationCoord::routeMsg( msg->setHomeNode(home_node); msg->setLocFromNode(from); - routePreparedMsg(msg); + routePreparedMsg(std::move(msg)); } template @@ -842,9 +847,9 @@ template theTerm()->produce(epoch); LocationManager::applyInstance>( - inst, [=](EntityLocationCoord* loc) { + inst, [=](EntityLocationCoord* loc) mutable { theMsg()->pushEpoch(epoch); - loc->routeMsg(entity_id, home_node, msg, from_node); + loc->routeMsg(entity_id, home_node, std::move(msg), from_node); theMsg()->popEpoch(epoch); theTerm()->consume(epoch); } diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h index 3a1a965fc6..904495b40d 100644 --- a/src/vt/vrt/collection/manager.impl.h +++ b/src/vt/vrt/collection/manager.impl.h @@ -219,8 +219,7 @@ template #endif auto m = promoteMsg(msg); - - runnable::makeRunnable(m, true, han, from) + runnable::makeRunnable(std::move(m), true, han, from) .withTDEpoch(theMsg()->getEpochContextMsg(msg)) .withCollection(base) #if vt_check_enabled(trace_enabled) @@ -292,32 +291,16 @@ template auto const& col = entity_proxy.getCollectionProxy(); auto const& elm = entity_proxy.getElementProxy(); auto const& idx = elm.getIndex(); - auto elm_holder = theCollection()->findElmHolder(col); - - bool const exists = elm_holder->exists(idx); - - vt_debug_print( - terse, vrt_coll, - "collectionMsgTypedHandler: exists={}, idx={}, cur_epoch={:x}\n", - exists, idx, cur_epoch - ); - - vtAssertInfo(exists, "Proxy must exist", cur_epoch, idx); - - auto& inner_holder = elm_holder->lookup(idx); - auto const sub_handler = col_msg->getVrtHandler(); - auto const col_ptr = inner_holder.getRawPtr(); vt_debug_print( - verbose, vrt_coll, - "collectionMsgTypedHandler: sub_handler={}\n", sub_handler + terse, vrt_coll, + "collectionMsgTypedHandler: idx={}, cur_epoch={:x}, sub_handler={}\n", + idx, cur_epoch, sub_handler ); - vtAssertInfo( - col_ptr != nullptr, "Must be valid pointer", - sub_handler, HandlerManager::isHandlerMember(sub_handler), cur_epoch, idx, exists - ); + auto lm = theLocMan()->getCollectionLM(col); + auto obj = reinterpret_cast*>(lm->getObjContext()); // Dispatch the handler after pushing the contextual epoch theMsg()->pushEpoch(cur_epoch); @@ -328,7 +311,7 @@ template trace_event = col_msg->getFromTraceEvent(); #endif collectionAutoMsgDeliver( - msg, col_ptr, sub_handler, from, trace_event, false + msg, obj, sub_handler, from, trace_event, false ); theMsg()->popEpoch(cur_epoch); } @@ -1132,10 +1115,10 @@ messaging::PendingSend CollectionManager::sendMsgUntypedHandler( >(idx, home_node, msg); return messaging::PendingSend{ - msg, [](MsgSharedPtr& inner_msg){ - auto typed_msg = inner_msg.template to(); - auto lm2 = theLocMan()->getCollectionLM(typed_msg->getLocInst()); - lm2->template routePreparedMsgHandler(typed_msg); + std::move(*(msg.template reinterpretAs())), [](MsgSharedPtr&& inner_msg){ + MsgSharedPtr* typed_msg = inner_msg.template reinterpretAs(); + auto lm2 = theLocMan()->getCollectionLM((*typed_msg)->getLocInst()); + lm2->template routePreparedMsgHandler>(std::move(*typed_msg)); } }; } @@ -1179,6 +1162,7 @@ bool CollectionManager::insertCollectionElement( ); if (!destroyed) { + void* obj_ptr = vc.get(); elm_holder->insert(idx, typename Holder::InnerHolder{ std::move(vc) }); @@ -1186,7 +1170,8 @@ bool CollectionManager::insertCollectionElement( if (is_migrated_in) { theLocMan()->getCollectionLM(proxy)->entityImmigrated( idx, home_node, migrated_from, - CollectionManager::collectionMsgHandler + CollectionManager::collectionMsgHandler, + obj_ptr ); elm_holder->applyListeners( listener::ElementEventEnum::ElementMigratedIn, idx, home_node @@ -1194,7 +1179,8 @@ bool CollectionManager::insertCollectionElement( } else { theLocMan()->getCollectionLM(proxy)->registerEntity( idx, home_node, - CollectionManager::collectionMsgHandler + CollectionManager::collectionMsgHandler, + false, obj_ptr ); elm_holder->applyListeners( listener::ElementEventEnum::ElementCreated, idx, home_node diff --git a/src/vt/vrt/context/context_vrtinfo.cc b/src/vt/vrt/context/context_vrtinfo.cc index 38347a49b9..9434fdf6f5 100644 --- a/src/vt/vrt/context/context_vrtinfo.cc +++ b/src/vt/vrt/context/context_vrtinfo.cc @@ -92,7 +92,7 @@ bool VirtualInfo::enqueueWorkUnit(VirtualMessage* raw_msg) { // @todo: fix the from node auto const& from_node = 0; auto m = promoteMsg(raw_msg); - runnable::makeRunnable(m, false, sub_handler, from_node) + runnable::makeRunnable(std::move(m), false, sub_handler, from_node) .withTDEpochFromMsg() .withElementHandler(vc_ptr) .run(); diff --git a/src/vt/vrt/context/context_vrtmanager.impl.h b/src/vt/vrt/context/context_vrtmanager.impl.h index 3e71c60895..5cf7b7cd81 100644 --- a/src/vt/vrt/context/context_vrtmanager.impl.h +++ b/src/vt/vrt/context/context_vrtmanager.impl.h @@ -167,7 +167,7 @@ messaging::PendingSend VirtualContextManager::sendSerialMsg( innermsg->setProxy(toProxy); theLocMan()->vrtContextLoc->routeMsgHandler< SerialMsgT, SerializedMessenger::payloadMsgHandler - >(toProxy, home_node, innermsg); + >(toProxy, home_node, std::move(innermsg)); return messaging::PendingSend(nullptr); }, // custom data transfer lambda if above the eager threshold @@ -279,7 +279,7 @@ messaging::PendingSend VirtualContextManager::sendMsg( [=](MsgPtr mymsg){ // route the message to the destination using the location manager auto msg_shared = promoteMsg(reinterpret_cast(mymsg.get())); - theLocMan()->vrtContextLoc->routeMsg(toProxy, home_node, msg_shared); + theLocMan()->vrtContextLoc->routeMsg(toProxy, home_node, std::move(msg_shared)); } ); } diff --git a/tests/perf/make_runnable_micro.cc b/tests/perf/make_runnable_micro.cc index 9991d630e9..8f16c2d7bb 100644 --- a/tests/perf/make_runnable_micro.cc +++ b/tests/perf/make_runnable_micro.cc @@ -87,7 +87,7 @@ struct NodeObj { void perfRunBenchmark() { for (int i = 0; i < num_iters; i++) { - auto r = runnable::makeRunnable(msgs[i], false, han, 0) + auto r = runnable::makeRunnable(std::move(msgs[i]), false, han, 0) .withContinuation(nullptr) .withTDEpochFromMsg(false); r.enqueue(); diff --git a/tests/unit/location/test_location_common.h b/tests/unit/location/test_location_common.h index 8a7438f6ce..d8780520bc 100644 --- a/tests/unit/location/test_location_common.h +++ b/tests/unit/location/test_location_common.h @@ -123,7 +123,7 @@ void routeTestHandler(EntityMsg* msg) { ); // route message vt::theLocMan()->virtual_loc->routeMsg( - msg->entity_, msg->home_, test_msg + msg->entity_, msg->home_, std::move(test_msg) ); } @@ -147,18 +147,20 @@ void verifyCacheConsistency( // create an entity message to route auto msg = vt::makeMessage(entity, my_node); + // check the routing protocol to be used by the manager. + bool is_eager = theLocMan()->virtual_loc->useEagerProtocol(msg); + auto msg_from = msg->from_; + auto sizeof_msg = sizeof(*msg); // perform the checks only after all entity messages have been // correctly delivered runInEpochCollective([&]{ if (my_node not_eq home) { // route entity message - vt::theLocMan()->virtual_loc->routeMsg(entity, home, msg); + vt::theLocMan()->virtual_loc->routeMsg(entity, home, std::move(msg)); } }); if (my_node not_eq home) { - // check the routing protocol to be used by the manager. - bool is_eager = theLocMan()->virtual_loc->useEagerProtocol(msg); // check for cache updates bool is_entity_cached = isCached(entity); @@ -167,7 +169,7 @@ void verifyCacheConsistency( normal, location, "verifyCacheConsistency: iter={}, entityID={}, home={}, bytes={}, " "in cache={}\n", - iter, entity, msg->from_, sizeof(*msg), is_entity_cached + iter, entity, msg_from, sizeof_msg, is_entity_cached ); if (not is_eager) { diff --git a/tests/unit/scheduler/test_scheduler_timings.cc b/tests/unit/scheduler/test_scheduler_timings.cc index 3521cd7973..a5f9f91098 100644 --- a/tests/unit/scheduler/test_scheduler_timings.cc +++ b/tests/unit/scheduler/test_scheduler_timings.cc @@ -87,7 +87,7 @@ void myHandler(MyMsg* msg) { auto handler = auto_registry::makeAutoHandler(); auto msg = vt::makeMessage(); msg->ms = time; - auto maker = vt::runnable::makeRunnable(msg, false, handler, 0) + auto maker = vt::runnable::makeRunnable(std::move(msg), false, handler, 0) .withLBData(std::get<1>(v[i]).get(), id); auto runnable = maker.getRunnableImpl(); runnable->setupHandler(handler); @@ -123,7 +123,7 @@ TEST_F(TestSchedTimings, test_sched_msg) { auto handler = auto_registry::makeAutoHandler(); - auto maker = vt::runnable::makeRunnable(next_msg, false, handler, 0); + auto maker = vt::runnable::makeRunnable(std::move(next_msg), false, handler, 0); auto runnable = maker.getRunnableImpl(); runnable->setupHandler(handler);