From aeef5ac9e5b5452e483a1385edfb9200cade3837 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 20 Jan 2026 11:06:29 -0600 Subject: [PATCH 01/10] configure/ofi: remove PAC_LIB_DEPS for libfabric We usually don't need link with static dependency with dynamic libraries. Add pkgconfig dependency sometime cause build issues when the dependency library paths are not in system library paths by default. Thus, removing it brings less issues. We may need add dependency libraries when building static libraries. So TODO: add it back for static build only. --- src/mpid/ch4/netmod/ofi/subconfigure.m4 | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/mpid/ch4/netmod/ofi/subconfigure.m4 b/src/mpid/ch4/netmod/ofi/subconfigure.m4 index e63f11a6c0c..f9768c8894d 100644 --- a/src/mpid/ch4/netmod/ofi/subconfigure.m4 +++ b/src/mpid/ch4/netmod/ofi/subconfigure.m4 @@ -340,18 +340,6 @@ AM_COND_IF([BUILD_CH4_NETMOD_OFI],[ PAC_LIBS_ADD([-lfabric]) fi - # check for libfabric dependence libs - pcdir="" - if test "${ofi_embedded}" = "yes" ; then - pcdir="${main_top_builddir}/modules/libfabric" - elif test -f ${with_libfabric}/lib/pkgconfig/libfabric.pc ; then - pcdir="${with_libfabric}/lib/pkgconfig" - fi - PAC_LIB_DEPS(fabric, $pcdir) - if test "x$ac_libfabric_deps" != "x"; then - PAC_APPEND_FLAG([${ac_libfabric_deps}],[WRAPPER_LIBS]) - fi - AC_ARG_ENABLE(ofi-domain, [ --enable-ofi-domain - Use fi_domain for vci contexts. This is the default. Use --disable-ofi-domain to use fi_contexts within From 1cfca775708300dd4fe9337159a936f892f7d0fb Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 15 Jan 2026 14:57:31 -0600 Subject: [PATCH 02/10] pmi: missing exit in PMI_Abort The process manager, e.g. hydra may still kill all the processes after receiving the the abort message, but applications will actually get a return before being killed. This can be confusing. Let's exit and never return in PMI_Abort. PMI2_Abort already does that. --- src/pmi/src/pmi_v1.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pmi/src/pmi_v1.c b/src/pmi/src/pmi_v1.c index 802aa61e1d5..81915cb36ff 100644 --- a/src/pmi/src/pmi_v1.c +++ b/src/pmi/src/pmi_v1.c @@ -440,6 +440,7 @@ PMI_API_PUBLIC int PMI_Abort(int exit_code, const char error_msg[]) pmi_errno = PMIU_cmd_send(PMI_fd, &pmicmd); + PMIU_Exit(exit_code); return pmi_errno; } From c62cc516cbee668646fab31080e56b4d27647004 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 20 Jan 2026 12:36:06 -0600 Subject: [PATCH 03/10] ch4/ofi: check MAX RETRY in MPIDI_OFI_CALL_RETRY_AM When user set finite MPIR_CVAR_CH4_OFI_MAX_EAGAIN_RETRY, we should check so MPIDI_OFI_CALL_RETRY_AM don't end up in infinite loop. For tcp provider at least, fi_send to a dead process result in infinite FI_EAGAIN. --- src/mpid/ch4/netmod/ofi/ofi_impl.h | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/mpid/ch4/netmod/ofi/ofi_impl.h b/src/mpid/ch4/netmod/ofi/ofi_impl.h index 625021a284e..0e7bf4ac910 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_impl.h +++ b/src/mpid/ch4/netmod/ofi/ofi_impl.h @@ -134,6 +134,7 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret); #define MPIDI_OFI_CALL_RETRY_AM(FUNC,vci_,STR) \ do { \ ssize_t _ret; \ + int _retry = MPIR_CVAR_CH4_OFI_MAX_EAGAIN_RETRY; \ do { \ _ret = FUNC; \ if (likely(_ret==0)) break; \ @@ -144,9 +145,12 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret); "**ofid_"#STR" %s %s", \ MPIDI_OFI_DEFAULT_NIC_NAME, \ fi_strerror(-_ret)); \ + if (_retry > 0) { \ + _retry--; \ + MPIR_ERR_CHKANDJUMP(_retry == 0, mpi_errno, MPIX_ERR_EAGAIN, "**eagain"); \ + } \ mpi_errno = MPIDI_OFI_progress_do_queue(vci_); \ - if (mpi_errno != MPI_SUCCESS) \ - MPIR_ERR_CHECK(mpi_errno); \ + MPIR_ERR_CHECK(mpi_errno); \ } while (_ret == -FI_EAGAIN); \ } while (0) From 193c97a29d2d7fc7eeab8861c3350d979e8347c0 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Thu, 15 Jan 2026 15:23:34 -0600 Subject: [PATCH 04/10] ch4/ofi: avoid using MPIR_ERR_SETFATALANDJUMP To allow ulfm to work, we need turn off fatal error codes. --- src/mpid/ch4/netmod/ofi/ofi_events.c | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.c b/src/mpid/ch4/netmod/ofi/ofi_events.c index 1e6e6b35166..2eeb55c7e3a 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.c +++ b/src/mpid/ch4/netmod/ofi/ofi_events.c @@ -544,10 +544,9 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) break; default: - MPIR_ERR_SETFATALANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", - "**ofid_poll %s %s", - MPIDI_OFI_DEFAULT_NIC_NAME, - fi_strerror(e.err)); + MPIR_ERR_SETANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", + "**ofid_poll %s %s", + MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); } break; @@ -588,17 +587,17 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) break; default: - MPIR_ERR_SETFATALANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", - "**ofid_poll %s %s", - MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); + MPIR_ERR_SETANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", + "**ofid_poll %s %s", + MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); } break; default: - MPIR_ERR_SETFATALANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", - "**ofid_poll %s %s", - MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(errno)); + MPIR_ERR_SETANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", + "**ofid_poll %s %s", + MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(errno)); } fn_exit: From 81aa293719541d157767567159ec945543b4de57 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 19 Jan 2026 18:49:45 -0600 Subject: [PATCH 05/10] ch4/ofi: set error in request for MPIDI_OFI_EVENT_AM_SEND Return error in progress will abort whichever calls that invoked progress. Rather, we should return the error in request's status whenever we can so the error can be handled in the proper context. Do this for short am send for now. It is needed to use short active message as a way to probe dead processes. And add a fixme that we need apply this to other requests as well. --- src/mpid/ch4/netmod/ofi/ofi_events.c | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/mpid/ch4/netmod/ofi/ofi_events.c b/src/mpid/ch4/netmod/ofi/ofi_events.c index 2eeb55c7e3a..b8a6bebacc6 100644 --- a/src/mpid/ch4/netmod/ofi/ofi_events.c +++ b/src/mpid/ch4/netmod/ofi/ofi_events.c @@ -509,6 +509,7 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) struct fi_cq_err_entry e; char err_data[MPIDI_OFI_MAX_ERR_DATA_SIZE]; MPIR_Request *req; + int event_id; ssize_t ret_cqerr; MPIR_FUNC_ENTER; @@ -556,7 +557,7 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) /* Clean up the request. Reference MPIDI_OFI_recv_event. * NOTE: assuming only the receive request can be cancelled and reach here */ - int event_id = MPIDI_OFI_REQUEST(req, event_id); + event_id = MPIDI_OFI_REQUEST(req, event_id); switch (event_id) { case MPIDI_OFI_EVENT_DYNPROC_DONE: dynproc_done_event(vci, e.op_context, req); @@ -587,9 +588,24 @@ int MPIDI_OFI_handle_cq_error(int vci, int nic, ssize_t ret) break; default: - MPIR_ERR_SETANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", - "**ofid_poll %s %s", - MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); + req = MPIDI_OFI_context_to_request(e.op_context); + event_id = MPIDI_OFI_REQUEST(req, event_id); + switch (event_id) { + case MPIDI_OFI_EVENT_AM_SEND: + /* set req->status.MPI_ERROR */ + MPIR_ERR_SET2(req->status.MPI_ERROR, MPI_ERR_OTHER, "**ofid_poll", + "**ofid_poll %s %s", + MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); + mpi_errno = am_isend_event(vci, NULL, req); + break; + default: + /* FIXME: application can't handle error in progress due to loss of + * context. We should try best to set error in req->status instead. + */ + MPIR_ERR_SETANDJUMP2(mpi_errno, MPI_ERR_OTHER, "**ofid_poll", + "**ofid_poll %s %s", + MPIDI_OFI_DEFAULT_NIC_NAME, fi_strerror(e.err)); + } } break; From 9f38bfc0eda36a3f3d73e0a2e9062762fa865ee8 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Fri, 16 Jan 2026 14:52:39 -0600 Subject: [PATCH 06/10] ch4/ulfm: add new MPID_Comm_agree algorithm Add an algorithm that does not depend on PMI_Get "dead_processes". This algorithm relies on a try_probe(rank) method that can return MPIX_ERR_PROC_FAILED if rank is dead. The algorithm first performs a reduction and then a broadcast. Both uses active messages to allow dead processes. --- src/mpi/comm/ulfm_impl.c | 4 + src/mpid/ch4/include/mpidch4.h | 4 + src/mpid/ch4/include/mpidpre.h | 1 + src/mpid/ch4/src/Makefile.mk | 1 + src/mpid/ch4/src/ch4_comm.c | 1 + src/mpid/ch4/src/ch4_impl.h | 2 + src/mpid/ch4/src/ch4_init.c | 6 + src/mpid/ch4/src/ch4_ulfm.c | 344 +++++++++++++++++++++++++++++++++ src/mpid/ch4/src/mpidig.h | 2 + 9 files changed, 365 insertions(+) create mode 100644 src/mpid/ch4/src/ch4_ulfm.c diff --git a/src/mpi/comm/ulfm_impl.c b/src/mpi/comm/ulfm_impl.c index 4c4a4fecbaa..d330fea0c10 100644 --- a/src/mpi/comm/ulfm_impl.c +++ b/src/mpi/comm/ulfm_impl.c @@ -193,6 +193,9 @@ int MPIR_Comm_shrink_impl(MPIR_Comm * comm_ptr, MPIR_Comm ** newcomm_ptr) int MPIR_Comm_agree_impl(MPIR_Comm * comm_ptr, int *flag) { +#ifdef MPID_COMM_AGREE + return MPID_Comm_agree(comm_ptr, flag); +#else int mpi_errno = MPI_SUCCESS, mpi_errno_tmp = MPI_SUCCESS; MPIR_Group *comm_grp = NULL, *failed_grp = NULL, *new_group_ptr = NULL, *global_failed = NULL; int result, success = 1; @@ -264,4 +267,5 @@ int MPIR_Comm_agree_impl(MPIR_Comm * comm_ptr, int *flag) return mpi_errno; fn_fail: goto fn_exit; +#endif } diff --git a/src/mpid/ch4/include/mpidch4.h b/src/mpid/ch4/include/mpidch4.h index 103b3bd6900..e82c2cabcbb 100644 --- a/src/mpid/ch4/include/mpidch4.h +++ b/src/mpid/ch4/include/mpidch4.h @@ -419,6 +419,10 @@ int MPID_Waitall_enqueue(int count, MPI_Request * array_of_requests, MPI_Status * array_of_statuses); int MPID_Abort(struct MPIR_Comm *comm, int mpi_errno, int exit_code, const char *error_msg); +#define MPID_COMM_AGREE 1 +int MPID_Comm_agree(MPIR_Comm * comm, int *flag); + + /* This function is not exposed to the upper layers but functions in a way * similar to the functions above. Other CH4-level functions should call this * function to query locality. This function will determine whether to call the diff --git a/src/mpid/ch4/include/mpidpre.h b/src/mpid/ch4/include/mpidpre.h index dd6c370c34c..99fe3c08fab 100644 --- a/src/mpid/ch4/include/mpidpre.h +++ b/src/mpid/ch4/include/mpidpre.h @@ -575,6 +575,7 @@ typedef struct MPIDIG_comm_t { typedef struct MPIDI_Devcomm_t { struct { + int comm_agree_epoch; /* The first fields are used by the AM(MPIDIG) apis */ MPIDIG_comm_t am; /* for netmod internal send/recv (e.g. am_tag_{send,recv}, pipeline, rndv_{read,write} */ diff --git a/src/mpid/ch4/src/Makefile.mk b/src/mpid/ch4/src/Makefile.mk index 0a334395467..815b9f7b1bf 100644 --- a/src/mpid/ch4/src/Makefile.mk +++ b/src/mpid/ch4/src/Makefile.mk @@ -51,6 +51,7 @@ mpi_core_sources += src/mpid/ch4/src/ch4_globals.c \ src/mpid/ch4/src/ch4_stream_enqueue.c \ src/mpid/ch4/src/ch4_persist.c \ src/mpid/ch4/src/ch4_vci.c \ + src/mpid/ch4/src/ch4_ulfm.c \ src/mpid/ch4/src/mpidig_init.c \ src/mpid/ch4/src/mpidig_recvq.c \ src/mpid/ch4/src/mpidig_pt2pt_callbacks.c \ diff --git a/src/mpid/ch4/src/ch4_comm.c b/src/mpid/ch4/src/ch4_comm.c index 061e758236e..872e8f1bd7b 100644 --- a/src/mpid/ch4/src/ch4_comm.c +++ b/src/mpid/ch4/src/ch4_comm.c @@ -148,6 +148,7 @@ int MPID_Comm_commit_pre_hook(MPIR_Comm * comm) mpi_errno = MPIDIG_init_comm(comm); MPIR_ERR_CHECK(mpi_errno); + MPIDI_COMM(comm, comm_agree_epoch) = 0; /* initialize next_am_tag for internal messaging */ int total_tag_bits = get_num_bits(MPIR_Process.attrs.tag_ub); MPIDI_COMM(comm, next_am_tag) = 0; diff --git a/src/mpid/ch4/src/ch4_impl.h b/src/mpid/ch4/src/ch4_impl.h index bf68132c93c..6a4125e0b1a 100644 --- a/src/mpid/ch4/src/ch4_impl.h +++ b/src/mpid/ch4/src/ch4_impl.h @@ -19,6 +19,8 @@ int MPIDI_init_per_vci(int vci); int MPIDI_destroy_per_vci(int vci); int MPIDIG_get_context_index(uint64_t context_id); uint64_t MPIDIG_generate_win_id(MPIR_Comm * comm_ptr); +int MPIDI_ulfm_init(void); +int MPIDI_ulfm_finalize(void); /* define CH4_CALL to call netmod or shm API based on is_local */ #ifdef MPIDI_CH4_DIRECT_NETMOD diff --git a/src/mpid/ch4/src/ch4_init.c b/src/mpid/ch4/src/ch4_init.c index c94c573cb1d..25bdd343c88 100644 --- a/src/mpid/ch4/src/ch4_init.c +++ b/src/mpid/ch4/src/ch4_init.c @@ -594,6 +594,9 @@ int MPID_Init(int requested, int *provided) mpi_errno = MPIDU_stream_workq_init(); MPIR_ERR_CHECK(mpi_errno); + mpi_errno = MPIDI_ulfm_init(); + MPIR_ERR_CHECK(mpi_errno); + /* Create genq for GPU collectives */ mpi_errno = MPIDU_genq_private_pool_create(MPIR_CVAR_CH4_GPU_COLL_SWAP_BUFFER_SZ, @@ -733,6 +736,9 @@ int MPID_Finalize(void) mpi_errno = MPIDU_stream_workq_finalize(); MPIR_ERR_CHECK(mpi_errno); + mpi_errno = MPIDI_ulfm_finalize(); + MPIR_ERR_CHECK(mpi_errno); + for (int i = 0; i < MAX_CH4_MUTEXES; i++) { int err; MPID_Thread_mutex_destroy(&MPIDI_global.m[i], &err); diff --git a/src/mpid/ch4/src/ch4_ulfm.c b/src/mpid/ch4/src/ch4_ulfm.c new file mode 100644 index 00000000000..013d99a38b6 --- /dev/null +++ b/src/mpid/ch4/src/ch4_ulfm.c @@ -0,0 +1,344 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include "mpidimpl.h" +#include "uthash.h" + +/* A ULFM Comm_agree algorithm without relying on PMI. Instead, it relies on + * a send_probe function that sends an active message to a peer that should + * return MPIX_ERR_PROC_FAILED if the peer is dead. + * + * The algorithm: + * + * * A power-of-2 hierarichical level with the least surviving rank as a level + * leader. For example, at level 0, every 2 ranks form a group; at level 1, + * every 4 ranks form a group; and so on. + * + * * First, start at level 0, each surviving rank probe its peer rank and collect + * rank maps for its group at the level. The leader rank increment its level + * and continue while the other rank go to the second part waitint for broadcast + * message from its peer. The surviving rank continue level up until one root + * rank remain at level q (ceil(log2(comm_size))) and collects the entire rank + * map. + * + * * Second, root decrement its level and send its peer the whole rank map. Every + * rank upon receiving broadcast decrement its level and continue the broadcast + * until every rank reach level 0 and finish. + * + * * If any of the broadcast fails (peer dead during the process), it will return + * MPIX_ERR_PROC_FAILED. Otherwise, it returns MPI_SUCCESS. + */ + +#define DIR_UP 0 +#define DIR_DOWN 1 + +typedef struct { + int context_id; + int comm_size; + + int flag; + /* {up,down}_probes[level] is the latest epoch for the {up,down} probe + * at this level that we received. */ + int *up_probes; + int *down_probes; + /* process_dead_map[rank] is true if rank is dead */ + bool *process_dead_map; + /* failed_procs is a dynamic array of failed processes */ + int num_failed; + int num_failed_max; + int *failed_procs; + + UT_hash_handle hh; +} comm_agree_entry; + +static comm_agree_entry *comm_agree_hash = NULL; + +static comm_agree_entry *find_comm_agree_entry(int context_id, int comm_size); +static void update_failed_proc(comm_agree_entry * entry, int rank); +static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, + int epoch, int level, int level_dir); + +int MPID_Comm_agree(MPIR_Comm * comm, int *flag) +{ + int mpi_errno = MPI_SUCCESS; + + MPIR_Assert(comm->comm_kind == MPIR_COMM_KIND__INTRACOMM); + MPID_THREAD_CS_ENTER(VCI, MPIDI_VCI_LOCK(0)); + + comm_agree_entry *entry = find_comm_agree_entry(comm->context_id, comm->local_size); + int epoch = (++MPIDI_COMM(comm, comm_agree_epoch)); + + entry->flag &= *flag; + + int rank, comm_size; + rank = comm->rank; + comm_size = comm->local_size; + + int q = MPL_log2(comm_size); + if ((1 << q) < comm_size) { + q++; + } + + /* probe at pof2 level */ + int level = 0; + + /* first ascending the level as we probe the peer. */ + while (level < q) { + int peer_root = ((rank >> level) ^ 1) << level; + mpi_errno = send_probe(entry, comm, epoch, level, DIR_UP); + if (mpi_errno == MPI_SUCCESS) { + /* probe sent, wait for reply */ + while (entry->up_probes[level] < epoch) { + mpi_errno = MPIDI_progress_test_vci(0); + MPIR_ERR_CHECK(mpi_errno); + } + + if (rank > peer_root) { + /* done at this level if peer is the lower rank */ + break; + } + } + level += 1; + } + + /* descending the level as we broadcast to peer the final map for failed processes */ + + bool has_new_failures = false; + /* skip level q since the last two surviving process already exchanged the + * whole info */ + if (level == q) { + level--; + } else if (level < q - 1) { + /* all other processes wait for the broadcast before proceed */ + while (entry->down_probes[level] < epoch) { + mpi_errno = MPIDI_progress_test_vci(0); + MPIR_ERR_CHECK(mpi_errno); + } + } + while (level > 0) { + level--; + mpi_errno = send_probe(entry, comm, epoch, level, DIR_DOWN); + if (mpi_errno == MPIX_ERR_PROC_FAILED) { + has_new_failures = true; + } + } + + if (has_new_failures) { + mpi_errno = MPIX_ERR_PROC_FAILED; + } + + fn_exit: + MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI_LOCK(0)); + return mpi_errno; + fn_fail: + goto fn_exit; +} + +/* ulfm active message probe */ +static int ulfm_probe_origin_cb(MPIR_Request * req); +static int ulfm_probe_target_msg_cb(void *am_hdr, void *data, MPI_Aint data_sz, uint32_t attr, + MPIR_Request ** req); + +int MPIDI_ulfm_init(void) +{ + MPIDIG_am_reg_cb(MPIDI_ULFM_PROBE, &ulfm_probe_origin_cb, &ulfm_probe_target_msg_cb); + return MPI_SUCCESS; +} + +int MPIDI_ulfm_finalize(void) +{ + comm_agree_entry *entry, *tmp; + + HASH_ITER(hh, comm_agree_hash, entry, tmp) { + HASH_DEL(comm_agree_hash, entry); + + MPL_free(entry->up_probes); + MPL_free(entry->down_probes); + MPL_free(entry->process_dead_map); + MPL_free(entry->failed_procs); + + MPL_free(entry); + } + + return MPI_SUCCESS; +} + +struct probe_hdr { + int context_id; + int comm_size; + int flag; + int epoch; + int level; + int level_dir; /* DIR_UP or DIR_DOWN */ + int origin_rank; + int num_failed_procs; +}; + +static int ulfm_probe_origin_cb(MPIR_Request * sreq) +{ + MPIR_cc_dec(sreq->cc_ptr); + return MPI_SUCCESS; +} + +static int ulfm_probe_target_msg_cb(void *am_hdr, void *data, MPI_Aint data_sz, uint32_t attr, + MPIR_Request ** req) +{ + struct probe_hdr *hdr = am_hdr; + comm_agree_entry *entry = find_comm_agree_entry(hdr->context_id, hdr->comm_size); + if (hdr->level_dir == DIR_UP) { + entry->up_probes[hdr->level] = hdr->epoch; + entry->flag &= hdr->flag; + } else { + entry->down_probes[hdr->level] = hdr->epoch; + entry->flag = hdr->flag; + } + + if (hdr->num_failed_procs > 0) { + int *failed_procs = data; + for (int i = 0; i < hdr->num_failed_procs; i++) { + update_failed_proc(entry, failed_procs[i]); + } + } + + if (attr & MPIDIG_AM_ATTR__IS_ASYNC) { + *req = NULL; + } + + return MPI_SUCCESS; +} + +/* ---- routines for sending probe ---- */ + +static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, + int epoch, int level, int level_dir) +{ + int mpi_errno = MPI_SUCCESS; + + int my_rank = comm->rank; + int peer_root = ((my_rank >> level) ^ 1) << level; + int peer_max = peer_root + (1 << level); + if (peer_max > entry->comm_size) { + peer_max = entry->comm_size; + } + /* try sending probe to peer group in rank order, stop at the first + * peer that is successful (i.e. peer is not dead). + */ + struct probe_hdr hdr; + hdr.context_id = entry->context_id; + hdr.comm_size = entry->comm_size; + hdr.flag = entry->flag; + hdr.epoch = epoch; + hdr.level = level; + hdr.level_dir = level_dir; + hdr.origin_rank = my_rank; /* for debugging purpose */ + hdr.num_failed_procs = entry->num_failed; + + int newly_failed_procs = 0; + for (int r = peer_root; r < peer_max; r++) { + if (!entry->process_dead_map[r]) { + MPIR_Request *sreq; + MPIDI_CH4_REQUEST_CREATE(sreq, MPIR_REQUEST_KIND__SEND, + 0 /* vci */ , 1 /* ref_count */); + MPIR_ERR_CHKANDJUMP(!sreq, mpi_errno, MPI_ERR_OTHER, "**nomemreq"); + MPIDI_NM_am_request_init(sreq); + + mpi_errno = MPIDI_NM_am_isend(r, comm, MPIDI_ULFM_PROBE, &hdr, sizeof(hdr), + entry->failed_procs, entry->num_failed, MPIR_INT_INTERNAL, + 0, 0, sreq); + if (mpi_errno == MPI_SUCCESS) { + while (!MPIR_Request_is_complete(sreq)) { + mpi_errno = MPIDI_progress_test_vci(0); + MPIR_Assert(mpi_errno == MPI_SUCCESS); + } + if (sreq->status.MPI_ERROR == MPI_SUCCESS) { + /* at least one process is alive, we are done */ + MPIR_Request_free_unsafe(sreq); + /* in the broadcast stage we need know if there is new failures */ + if (newly_failed_procs && level_dir == DIR_DOWN) { + mpi_errno = MPIX_ERR_PROC_FAILED; + } + goto fn_exit; + } else { + newly_failed_procs++; + update_failed_proc(entry, r); + } + } else { + newly_failed_procs++; + update_failed_proc(entry, r); + } + } + } + /* all processes in the peer group failed. We only need know is Comm_agree + * during the UP (reduction) stage */ + if (level_dir == DIR_UP) { + mpi_errno = MPIX_ERR_PROC_FAILED; + } + + fn_exit: + return mpi_errno; + fn_fail: + /* TODO: check failed rank will return MPIX_ERR_PROC_FAILED */ + goto fn_exit; +} + +/* routines for managing received probes */ + +static comm_agree_entry *find_comm_agree_entry(int context_id, int comm_size) +{ + comm_agree_entry *entry = NULL; + + /* Look up entry in hash table by context_id */ + HASH_FIND_INT(comm_agree_hash, &context_id, entry); + + if (entry == NULL) { + /* Entry not found, allocate and add to hash */ + entry = MPL_malloc(sizeof(comm_agree_entry), MPL_MEM_OTHER); + if (!entry) { + goto fn_exit; + } + + /* initialize the entry */ + entry->context_id = context_id; + entry->comm_size = comm_size; + + entry->flag = 0xffffffff; + + int pof2 = MPL_pof2(comm_size); + entry->up_probes = MPL_calloc(pof2, sizeof(int), MPL_MEM_OTHER); + MPIR_Assert(entry->up_probes); + entry->down_probes = MPL_calloc(pof2, sizeof(int), MPL_MEM_OTHER); + MPIR_Assert(entry->down_probes); + + entry->process_dead_map = MPL_calloc(comm_size, sizeof(int), MPL_MEM_OTHER); + MPIR_Assert(entry->process_dead_map); + + entry->num_failed = 0; + entry->num_failed_max = 0; + entry->failed_procs = NULL; + + /* Add to hash table */ + HASH_ADD_INT(comm_agree_hash, context_id, entry, MPL_MEM_OTHER); + } + + fn_exit: + return entry; +} + +static void update_failed_proc(comm_agree_entry * entry, int rank) +{ + if (!entry->process_dead_map[rank]) { + entry->process_dead_map[rank] = true; + /* check the size of dynamic array */ + if (entry->num_failed + 1 > entry->num_failed_max) { + int new_size = entry->num_failed_max ? entry->num_failed_max * 2 : 10; + entry->failed_procs = MPL_realloc(entry->failed_procs, new_size * sizeof(int), + MPL_MEM_OTHER); + MPIR_Assert(entry->failed_procs); + entry->num_failed_max = new_size; + } + /* add the entry */ + entry->failed_procs[entry->num_failed++] = rank; + } +} diff --git a/src/mpid/ch4/src/mpidig.h b/src/mpid/ch4/src/mpidig.h index db850ea931e..c9af3620be7 100644 --- a/src/mpid/ch4/src/mpidig.h +++ b/src/mpid/ch4/src/mpidig.h @@ -59,6 +59,8 @@ enum { MPIDIG_COMM_ABORT, + MPIDI_ULFM_PROBE, + MPIDI_IPC_ACK, MPIDI_IPC_WRITE, From 6af3c848393c94342cec6f094afc1ab875090bf3 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 19 Jan 2026 12:06:20 -0600 Subject: [PATCH 07/10] ch4/ulfm: add MPID_Comm_get_failed MPID_Comm_agree will update a list of failed processes locally. This provides an alternative implementation of ULFM that does not rely on PMI_dead_processes. --- src/mpi/comm/ulfm_impl.c | 6 +++ src/mpid/ch4/include/mpidch4.h | 1 + src/mpid/ch4/src/ch4_ulfm.c | 91 ++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+) diff --git a/src/mpi/comm/ulfm_impl.c b/src/mpi/comm/ulfm_impl.c index d330fea0c10..4b070b539b9 100644 --- a/src/mpi/comm/ulfm_impl.c +++ b/src/mpi/comm/ulfm_impl.c @@ -14,6 +14,7 @@ * implementation. Since ULFM require local discovery, we should remove that */ +#ifndef MPID_COMM_AGREE /* maintain a list of failed process in comm_world */ /* NOTE: we need maintain the order of failed_procs as the show up. We do it here because * it isn't fair to require PMI to do it. @@ -67,9 +68,13 @@ static void parse_failed_procs_string(char *failed_procs_string) token = strtok(NULL, delim); } } +#endif int MPIR_Comm_get_failed_impl(MPIR_Comm * comm_ptr, MPIR_Group ** failed_group_ptr) { +#ifdef MPID_COMM_AGREE + return MPID_Comm_get_failed(comm_ptr, failed_group_ptr); +#else int mpi_errno = MPI_SUCCESS; MPIR_FUNC_ENTER; @@ -119,6 +124,7 @@ int MPIR_Comm_get_failed_impl(MPIR_Comm * comm_ptr, MPIR_Group ** failed_group_p return mpi_errno; fn_fail: goto fn_exit; +#endif } /* comm shrink impl; assumes that standard error checking has already taken diff --git a/src/mpid/ch4/include/mpidch4.h b/src/mpid/ch4/include/mpidch4.h index e82c2cabcbb..2a0b09aa217 100644 --- a/src/mpid/ch4/include/mpidch4.h +++ b/src/mpid/ch4/include/mpidch4.h @@ -421,6 +421,7 @@ int MPID_Abort(struct MPIR_Comm *comm, int mpi_errno, int exit_code, const char #define MPID_COMM_AGREE 1 int MPID_Comm_agree(MPIR_Comm * comm, int *flag); +int MPID_Comm_get_failed(MPIR_Comm * comm_ptr, MPIR_Group ** failed_group_ptr); /* This function is not exposed to the upper layers but functions in a way diff --git a/src/mpid/ch4/src/ch4_ulfm.c b/src/mpid/ch4/src/ch4_ulfm.c index 013d99a38b6..2d4ab00666e 100644 --- a/src/mpid/ch4/src/ch4_ulfm.c +++ b/src/mpid/ch4/src/ch4_ulfm.c @@ -55,10 +55,49 @@ typedef struct { static comm_agree_entry *comm_agree_hash = NULL; +static int num_global_failed_procs; +static MPIR_Lpid *global_failed_procs; + static comm_agree_entry *find_comm_agree_entry(int context_id, int comm_size); static void update_failed_proc(comm_agree_entry * entry, int rank); static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, int epoch, int level, int level_dir); +static int update_global_failed_procs(MPIR_Comm * comm, int num_failed, int *failed_procs); + +int MPID_Comm_get_failed(MPIR_Comm * comm, MPIR_Group ** failed_group_ptr) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_CHKPMEM_DECL(); + + if (num_global_failed_procs == 0) { + *failed_group_ptr = MPIR_Group_empty; + } else { + int num = 0; + MPIR_Lpid *procs = NULL; + for (int i = 0; i < num_global_failed_procs; i++) { + int r = MPIR_Group_lpid_to_rank(comm->local_group, global_failed_procs[i]); + if (r != MPI_UNDEFINED) { + if (procs == NULL) { + MPIR_CHKPMEM_MALLOC(procs, num_global_failed_procs * sizeof(MPIR_Lpid), + MPL_MEM_COMM); + } + procs[num++] = global_failed_procs[i]; + } + } + if (num > 0) { + mpi_errno = MPIR_Group_create_map(num, -1, comm->session_ptr, procs, failed_group_ptr); + MPIR_ERR_CHECK(mpi_errno); + } else { + *failed_group_ptr = MPIR_Group_empty; + } + } + + fn_exit: + return mpi_errno; + fn_fail: + MPIR_CHKPMEM_REAP(); + goto fn_exit; +} int MPID_Comm_agree(MPIR_Comm * comm, int *flag) { @@ -125,6 +164,9 @@ int MPID_Comm_agree(MPIR_Comm * comm, int *flag) } } + mpi_errno = update_global_failed_procs(comm, entry->num_failed, entry->failed_procs); + MPIR_ERR_CHECK(mpi_errno); + if (has_new_failures) { mpi_errno = MPIX_ERR_PROC_FAILED; } @@ -342,3 +384,52 @@ static void update_failed_proc(comm_agree_entry * entry, int rank) entry->failed_procs[entry->num_failed++] = rank; } } + +/* -- routine for update global_failed_procs -- */ + +/* qsort compare function */ +static int sort_fn(const void *a, const void *b) +{ + return (*(MPIR_Lpid *) a - *(MPIR_Lpid *) b); +} + +static int update_global_failed_procs(MPIR_Comm * comm, int num_failed, int *failed_procs) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_CHKLMEM_DECL(); + + int num_new_failed = 0; + MPIR_Lpid *new_failed = NULL; + for (int i = 0; i < num_failed; i++) { + MPIR_Lpid lpid = MPIR_comm_rank_to_lpid(comm, failed_procs[i]); + bool is_new = true; + for (int j = 0; j < num_global_failed_procs; j++) { + if (global_failed_procs[j] == lpid) { + is_new = false; + break; + } + } + if (is_new) { + if (!new_failed) { + MPIR_CHKLMEM_MALLOC(new_failed, num_failed * sizeof(MPIR_Lpid)); + } + new_failed[num_new_failed++] = lpid; + } + } + + if (num_new_failed > 0) { + int num = num_global_failed_procs + num_new_failed; + global_failed_procs = MPL_realloc(global_failed_procs, num * sizeof(MPIR_Lpid), + MPL_MEM_OTHER); + for (int i = 0; i < num_new_failed; i++) { + global_failed_procs[num_global_failed_procs++] = new_failed[i]; + } + qsort(global_failed_procs, num, sizeof(MPIR_Lpid), sort_fn); + } + + fn_exit: + MPIR_CHKLMEM_FREEALL(); + return mpi_errno; + fn_fail: + goto fn_exit; +} From 0ed1ddb4404ba6f3be0d023b19115b8a254c2ee5 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 19 Jan 2026 19:27:09 -0600 Subject: [PATCH 08/10] ulfm: re-implement MPIR_Comm_shrink_impl If we assume user always call MPIX_Comm_shrink after MPIX_Comm_agree, we can simply reduce MPIX_Comm_shrink to MPI_Comm_create_group. There is always a possibility that user call MPIX_Comm_shrink without reaching a consensus on failed processes or there may be new processes fail during the call, however I argue, there is no robust way to handle this situation anyway other than just return an error to user and let user handle it. --- src/mpi/comm/ulfm_impl.c | 78 ++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 51 deletions(-) diff --git a/src/mpi/comm/ulfm_impl.c b/src/mpi/comm/ulfm_impl.c index 4b070b539b9..7881ffe8ac5 100644 --- a/src/mpi/comm/ulfm_impl.c +++ b/src/mpi/comm/ulfm_impl.c @@ -127,73 +127,49 @@ int MPIR_Comm_get_failed_impl(MPIR_Comm * comm_ptr, MPIR_Group ** failed_group_p #endif } -/* comm shrink impl; assumes that standard error checking has already taken - * place in the calling function */ +/* Supposedly caller already agreed on the result of MPIX_Comm_get_failed + * by running MPIX_Comm_agree. Thus, shrink is merely MPI_Comm_create_group. + */ int MPIR_Comm_shrink_impl(MPIR_Comm * comm_ptr, MPIR_Comm ** newcomm_ptr) { int mpi_errno = MPI_SUCCESS; - MPIR_Group *global_failed = NULL, *comm_grp = NULL, *new_group_ptr = NULL; - int attempts = 0; - MPIR_FUNC_ENTER; /* TODO - Implement this function for intercommunicators */ - MPIR_Comm_group_impl(comm_ptr, &comm_grp); + MPIR_Assert(comm_ptr->comm_kind == MPIR_COMM_KIND__INTRACOMM); - do { - int coll_attr = 0; - - MPID_Comm_get_all_failed_procs(comm_ptr, &global_failed, MPIR_SHRINK_TAG); - /* Ignore the mpi_errno value here as it will definitely communicate - * with failed procs */ + MPIR_Group *comm_grp; + mpi_errno = MPIR_Comm_group_impl(comm_ptr, &comm_grp); + MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPIR_Group_difference_impl(comm_grp, global_failed, &new_group_ptr); - MPIR_ERR_CHECK(mpi_errno); - if (MPIR_Group_empty != global_failed) - MPIR_Group_release(global_failed); - - mpi_errno = MPIR_Comm_create_group_impl(comm_ptr, new_group_ptr, MPIR_SHRINK_TAG, - newcomm_ptr); - if (*newcomm_ptr == NULL) { - coll_attr = MPIR_ERR_PROC_FAILED; - } else if (mpi_errno) { - coll_attr = - MPIX_ERR_PROC_FAILED == - MPIR_ERR_GET_CLASS(mpi_errno) ? MPIR_ERR_PROC_FAILED : MPIR_ERR_OTHER; - MPIR_Comm_release(*newcomm_ptr); - } + MPIR_Group *global_failed; + mpi_errno = MPIR_Comm_get_failed_impl(comm_ptr, &global_failed); + MPIR_ERR_CHECK(mpi_errno); - mpi_errno = MPII_Allreduce_group(MPI_IN_PLACE, &coll_attr, 1, MPIR_INT_INTERNAL, MPI_MAX, - comm_ptr, new_group_ptr, MPIR_SHRINK_TAG, - MPIR_COLL_ATTR_SYNC); - MPIR_Group_release(new_group_ptr); + MPIR_Group *new_group_ptr; + mpi_errno = MPIR_Group_difference_impl(comm_grp, global_failed, &new_group_ptr); + MPIR_ERR_CHECK(mpi_errno); - if (coll_attr) { - if (*newcomm_ptr != NULL && MPIR_Object_get_ref(*newcomm_ptr) > 0) { - MPIR_Object_set_ref(*newcomm_ptr, 1); - MPIR_Comm_release(*newcomm_ptr); - } - if (MPIR_Object_get_ref(new_group_ptr) > 0) { - MPIR_Object_set_ref(new_group_ptr, 1); - MPIR_Group_release(new_group_ptr); - } - } else { - mpi_errno = MPI_SUCCESS; - goto fn_exit; - } - } while (++attempts < 5); + if (MPIR_Group_empty != global_failed) { + MPIR_Group_release(global_failed); + } - goto fn_fail; + mpi_errno = MPIR_Comm_create_group_impl(comm_ptr, new_group_ptr, MPIR_SHRINK_TAG, newcomm_ptr); + /* FIXME: what if user have not run MPIX_Comm_agree or there are new failed procs? + * We need handle MPIX_ERR_PROC_FAILED. + */ + MPIR_ERR_CHECK(mpi_errno); fn_exit: - MPIR_Group_release(comm_grp); + if (new_group_ptr) { + MPIR_Group_release(new_group_ptr); + } + if (comm_grp) { + MPIR_Group_release(comm_grp); + } MPIR_FUNC_EXIT; return mpi_errno; fn_fail: - if (*newcomm_ptr) - MPIR_Object_set_ref(*newcomm_ptr, 0); - MPIR_Object_set_ref(global_failed, 0); - MPIR_Object_set_ref(new_group_ptr, 0); goto fn_exit; } From 56513ebe7fc74a4c44b61bcb41e6e5ca20324e0f Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 20 Jan 2026 13:56:55 -0600 Subject: [PATCH 09/10] ch4/ulfm: retry probe when we stuck in wait Even when we first probe a process succeeded, the process still may die before it enters MPIX_Comm_agree and send us the probe. Regularly retry probe to prevent stuck in the waiting for a probe that never arrives. Potentially we may stuck during the broadcast stage as well, but that only mean the a process died *during* MPIX_Comm_agree. Hopefully, the chances of that is low. Compared to the previous case, the process may be doing arbitrary amount of work before entering MPIX_Comm_agree and may die before then. --- src/mpid/ch4/src/ch4_ulfm.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/mpid/ch4/src/ch4_ulfm.c b/src/mpid/ch4/src/ch4_ulfm.c index 2d4ab00666e..225c5c4e06e 100644 --- a/src/mpid/ch4/src/ch4_ulfm.c +++ b/src/mpid/ch4/src/ch4_ulfm.c @@ -99,6 +99,8 @@ int MPID_Comm_get_failed(MPIR_Comm * comm, MPIR_Group ** failed_group_ptr) goto fn_exit; } +#define ULFM_MAX_RETRY 1000 + int MPID_Comm_agree(MPIR_Comm * comm, int *flag) { int mpi_errno = MPI_SUCCESS; @@ -125,13 +127,21 @@ int MPID_Comm_agree(MPIR_Comm * comm, int *flag) /* first ascending the level as we probe the peer. */ while (level < q) { - int peer_root = ((rank >> level) ^ 1) << level; + int peer_root; + fn_retry: + peer_root = ((rank >> level) ^ 1) << level; mpi_errno = send_probe(entry, comm, epoch, level, DIR_UP); if (mpi_errno == MPI_SUCCESS) { /* probe sent, wait for reply */ + int count = 0; while (entry->up_probes[level] < epoch) { mpi_errno = MPIDI_progress_test_vci(0); MPIR_ERR_CHECK(mpi_errno); + count++; + if (count > ULFM_MAX_RETRY && entry->up_probes[level] < epoch) { + /* potentially the process died since we last probed, probe it again */ + goto fn_retry; + } } if (rank > peer_root) { @@ -151,6 +161,7 @@ int MPID_Comm_agree(MPIR_Comm * comm, int *flag) level--; } else if (level < q - 1) { /* all other processes wait for the broadcast before proceed */ + /* TODO: add retry probe incast the process died during Comm_agree */ while (entry->down_probes[level] < epoch) { mpi_errno = MPIDI_progress_test_vci(0); MPIR_ERR_CHECK(mpi_errno); From cfae45c1bb60b8a47d5c4185a90fef12756ad177 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 21 Jan 2026 14:55:40 -0600 Subject: [PATCH 10/10] ch4/ulfm: verify origin_rank of the received probes The probe may succeed to a dead rank then later we receive a probe from a substitute rank from the peer group. Since we never sent this substitute rank a probe, it may hang. Always verify the origin rank for received probes and send a make-up probe if it is from an unexpected rank. --- src/mpid/ch4/src/ch4_ulfm.c | 46 ++++++++++++++++++++++++------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/src/mpid/ch4/src/ch4_ulfm.c b/src/mpid/ch4/src/ch4_ulfm.c index 225c5c4e06e..20b6bc9e3e3 100644 --- a/src/mpid/ch4/src/ch4_ulfm.c +++ b/src/mpid/ch4/src/ch4_ulfm.c @@ -39,10 +39,11 @@ typedef struct { int comm_size; int flag; - /* {up,down}_probes[level] is the latest epoch for the {up,down} probe - * at this level that we received. */ - int *up_probes; - int *down_probes; + /* {up,down}_probes[level] is the latest received probe at this level. */ + struct { + int epoch; + int origin_rank; + } *up_probes, *down_probes; /* process_dead_map[rank] is true if rank is dead */ bool *process_dead_map; /* failed_procs is a dynamic array of failed processes */ @@ -61,7 +62,7 @@ static MPIR_Lpid *global_failed_procs; static comm_agree_entry *find_comm_agree_entry(int context_id, int comm_size); static void update_failed_proc(comm_agree_entry * entry, int rank); static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, - int epoch, int level, int level_dir); + int epoch, int level, int level_dir, int *peer_rank); static int update_global_failed_procs(MPIR_Comm * comm, int num_failed, int *failed_procs); int MPID_Comm_get_failed(MPIR_Comm * comm, MPIR_Group ** failed_group_ptr) @@ -127,22 +128,30 @@ int MPID_Comm_agree(MPIR_Comm * comm, int *flag) /* first ascending the level as we probe the peer. */ while (level < q) { - int peer_root; + int peer_root, peer_rank; fn_retry: peer_root = ((rank >> level) ^ 1) << level; - mpi_errno = send_probe(entry, comm, epoch, level, DIR_UP); + mpi_errno = send_probe(entry, comm, epoch, level, DIR_UP, &peer_rank); if (mpi_errno == MPI_SUCCESS) { /* probe sent, wait for reply */ int count = 0; - while (entry->up_probes[level] < epoch) { + while (entry->up_probes[level].epoch < epoch) { mpi_errno = MPIDI_progress_test_vci(0); MPIR_ERR_CHECK(mpi_errno); count++; - if (count > ULFM_MAX_RETRY && entry->up_probes[level] < epoch) { + if (count > ULFM_MAX_RETRY && entry->up_probes[level].epoch < epoch) { /* potentially the process died since we last probed, probe it again */ goto fn_retry; } } + /* make sure we got from the expected rank, or we need resend the probe */ + if (entry->up_probes[level].origin_rank > peer_rank) { + mpi_errno = send_probe(entry, comm, epoch, level, DIR_UP, &peer_rank); + /* TODO: handle the error + * The process send us a prove then died? */ + MPIR_Assert(mpi_errno == MPI_SUCCESS && + peer_rank == entry->up_probes[level].origin_rank); + } if (rank > peer_root) { /* done at this level if peer is the lower rank */ @@ -162,14 +171,15 @@ int MPID_Comm_agree(MPIR_Comm * comm, int *flag) } else if (level < q - 1) { /* all other processes wait for the broadcast before proceed */ /* TODO: add retry probe incast the process died during Comm_agree */ - while (entry->down_probes[level] < epoch) { + while (entry->down_probes[level].epoch < epoch) { mpi_errno = MPIDI_progress_test_vci(0); MPIR_ERR_CHECK(mpi_errno); } } while (level > 0) { + int peer_rank; level--; - mpi_errno = send_probe(entry, comm, epoch, level, DIR_DOWN); + mpi_errno = send_probe(entry, comm, epoch, level, DIR_DOWN, &peer_rank); if (mpi_errno == MPIX_ERR_PROC_FAILED) { has_new_failures = true; } @@ -241,10 +251,12 @@ static int ulfm_probe_target_msg_cb(void *am_hdr, void *data, MPI_Aint data_sz, struct probe_hdr *hdr = am_hdr; comm_agree_entry *entry = find_comm_agree_entry(hdr->context_id, hdr->comm_size); if (hdr->level_dir == DIR_UP) { - entry->up_probes[hdr->level] = hdr->epoch; + entry->up_probes[hdr->level].epoch = hdr->epoch; + entry->up_probes[hdr->level].origin_rank = hdr->origin_rank; entry->flag &= hdr->flag; } else { - entry->down_probes[hdr->level] = hdr->epoch; + entry->down_probes[hdr->level].epoch = hdr->epoch; + entry->down_probes[hdr->level].origin_rank = hdr->origin_rank; entry->flag = hdr->flag; } @@ -265,7 +277,7 @@ static int ulfm_probe_target_msg_cb(void *am_hdr, void *data, MPI_Aint data_sz, /* ---- routines for sending probe ---- */ static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, - int epoch, int level, int level_dir) + int epoch, int level, int level_dir, int *peer_rank) { int mpi_errno = MPI_SUCCESS; @@ -312,6 +324,7 @@ static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, if (newly_failed_procs && level_dir == DIR_DOWN) { mpi_errno = MPIX_ERR_PROC_FAILED; } + *peer_rank = r; goto fn_exit; } else { newly_failed_procs++; @@ -326,6 +339,7 @@ static int send_probe(comm_agree_entry * entry, MPIR_Comm * comm, /* all processes in the peer group failed. We only need know is Comm_agree * during the UP (reduction) stage */ if (level_dir == DIR_UP) { + *peer_rank = -1; mpi_errno = MPIX_ERR_PROC_FAILED; } @@ -359,9 +373,9 @@ static comm_agree_entry *find_comm_agree_entry(int context_id, int comm_size) entry->flag = 0xffffffff; int pof2 = MPL_pof2(comm_size); - entry->up_probes = MPL_calloc(pof2, sizeof(int), MPL_MEM_OTHER); + entry->up_probes = MPL_calloc(pof2, sizeof(*entry->up_probes), MPL_MEM_OTHER); MPIR_Assert(entry->up_probes); - entry->down_probes = MPL_calloc(pof2, sizeof(int), MPL_MEM_OTHER); + entry->down_probes = MPL_calloc(pof2, sizeof(*entry->down_probes), MPL_MEM_OTHER); MPIR_Assert(entry->down_probes); entry->process_dead_map = MPL_calloc(comm_size, sizeof(int), MPL_MEM_OTHER);