From 87cb73748d19ce17208da050169a275a8cd4f714 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 17 Mar 2026 21:54:32 -0500 Subject: [PATCH 1/8] coll/cga: fix reduce chunk_count The chunk_count should fit within chunk_size. Also fix checking of contig datatypes. --- src/mpi/coll/algorithms/circ_graph/cga_request_queue.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 9cdc6d1b4d3..3cbf57ffcd4 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -253,9 +253,6 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, } else { /* reduction chunks have to contain whole datatypes */ chunk_count = chunk_size / type_size; - if (chunk_size > 0 && chunk_size % type_size > 0) { - chunk_count++; - } num_chunks = count / chunk_count; last_chunk_count = count % chunk_count; @@ -287,7 +284,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, if (!queue->need_pack) { /* datatype must be contig, no pack_buf, need tmpbuf to receive chunk data */ queue->u.reduce.tmpbuf_size = chunk_size; - } else if (type_size == true_extent) { + } else if (type_size == true_extent && type_size == extent) { /* datatype is contig, skip tmpbuf, we can do reduce from pack_buf */ queue->u.reduce.tmpbuf_size = 0; } else { From 44dfdcda03d5cecc2e6e6981400e3ee14f3399f4 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 18 Mar 2026 10:56:40 -0500 Subject: [PATCH 2/8] coll/cga: use MPI_Aint to prevent overflow It's possible users may set some unreasonable value for MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE, such as 1, that may create integer overflow issue. --- .../algorithms/circ_graph/cga_request_queue.c | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 3cbf57ffcd4..17db1e5ab0c 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -22,7 +22,7 @@ * Otherwise, the algorithm is inefficient or inconsistent. */ -static int calc_chunks(MPI_Aint data_size, MPI_Aint chunk_size, int *last_msg_size_out); +static MPI_Aint calc_chunks(MPI_Aint data_size, MPI_Aint chunk_size, MPI_Aint * last_msg_size_out); static int get_pending_id(MPII_cga_request_queue * queue, int block, int root); static int get_pending_req_id(MPII_cga_request_queue * queue, int block, int root); @@ -84,8 +84,9 @@ static void debug_queue(MPII_cga_request_queue * queue); /* Routines for managing non-blocking send/recv of chunks */ static int init_request_queue_common(MPII_cga_request_queue * queue, - int q_len, int num_pending, int num_chunks, int all_size, - int chunk_size, int last_chunk_size, bool inverse_order, + int q_len, int num_pending, MPI_Aint num_chunks, int all_size, + MPI_Aint chunk_size, MPI_Aint last_chunk_size, + bool inverse_order, void *buf, MPI_Aint count, MPI_Datatype datatype, MPIR_Comm * comm, int coll_attr) { @@ -179,8 +180,8 @@ int MPII_cga_init_bcast_queue(MPII_cga_request_queue * queue, int num_pending, MPIR_Datatype_get_size_macro(datatype, type_size); data_size = count * type_size; - int last_chunk_size; - int num_chunks = calc_chunks(data_size, chunk_size, &last_chunk_size); + MPI_Aint last_chunk_size; + MPI_Aint num_chunks = calc_chunks(data_size, chunk_size, &last_chunk_size); bool inverse_order = false; mpi_errno = init_request_queue_common(queue, q_len, num_pending, num_chunks, 1, @@ -210,8 +211,8 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin MPI_Aint type_size; MPIR_Datatype_get_size_macro(datatype, type_size); - int last_chunk_size; - int num_chunks = calc_chunks(count * type_size, chunk_size, &last_chunk_size); + MPI_Aint last_chunk_size; + MPI_Aint num_chunks = calc_chunks(count * type_size, chunk_size, &last_chunk_size); int all_size = comm_size; bool inverse_order = false; @@ -234,7 +235,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, { int mpi_errno = MPI_SUCCESS; - int chunk_size = MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE; + MPI_Aint chunk_size = MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE; int q_len = MPIR_CVAR_CIRC_GRAPH_Q_LEN; /* minimum q_len is 2 */ @@ -1228,10 +1229,10 @@ static void debug_queue(MPII_cga_request_queue * queue) /* ---- math routines ---- */ -static int calc_chunks(MPI_Aint buf_size, MPI_Aint chunk_size, int *last_msg_size_out) +static MPI_Aint calc_chunks(MPI_Aint buf_size, MPI_Aint chunk_size, MPI_Aint * last_msg_size_out) { - int n; - int last_msg_size; + MPI_Aint n; + MPI_Aint last_msg_size; /* note: bcast zero sized messages is valid */ if (chunk_size == 0 || buf_size == 0) { From 84748c4ad566d3801c756ce2e6965056de5e4a71 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 18 Mar 2026 10:49:09 -0500 Subject: [PATCH 3/8] coll/cga: use malloc for packbuf if CHUNK_SIZE is 0 If MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE is 0, it forces the circ_graph algorithm to use a single chunk. Since we can't ensure the data will fit in a pre-allocated genq buffer pool, we fallback to use malloc. --- .../algorithms/circ_graph/cga_request_queue.c | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 17db1e5ab0c..1a3a089bb4b 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -35,7 +35,7 @@ static void remove_pending_req_id(MPII_cga_request_queue * queue, int block, int */ static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int root); static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int root, void *packbuf); -static int alloc_packbuf(void **packbuf_out); +static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out); /* free buffers depend on where they are stored */ static void clear_request(MPII_cga_request_queue * queue, int req_id); static void clear_pending(MPII_cga_request_queue * queue, int pending_id); @@ -245,6 +245,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, MPI_Aint type_size; MPIR_Datatype_get_size_macro(datatype, type_size); + MPIR_Assert(type_size > 0); MPI_Aint num_chunks, chunk_count, last_chunk_count, last_chunk_size; if (chunk_size == 0) { @@ -875,10 +876,17 @@ static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int r queue->pending_blocks[pending_id].persist_packbuf = packbuf; } -static int alloc_packbuf(void **packbuf_out) +static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out) { - int mpi_errno; - mpi_errno = MPIDU_genq_private_pool_force_alloc_cell(MPIR_cga_chunk_pool, packbuf_out); + int mpi_errno = MPI_SUCCESS; + if (MPIR_cga_chunk_pool) { + mpi_errno = MPIDU_genq_private_pool_force_alloc_cell(MPIR_cga_chunk_pool, packbuf_out); + } else { + *packbuf_out = MPL_malloc(queue->chunk_size, MPL_MEM_COLL); + if (!(*packbuf_out)) { + MPIR_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**nomem"); + } + } return mpi_errno; } @@ -887,7 +895,11 @@ static void clear_request(MPII_cga_request_queue * queue, int req_id) #define REQi queue->requests[req_id] if (queue->coll_type == MPII_CGA_REDUCE) { if (REQi.packbuf) { - MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, REQi.packbuf); + if (MPIR_cga_chunk_pool) { + MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, REQi.packbuf); + } else { + MPL_free(REQi.packbuf); + } REQi.packbuf = NULL; } if (REQi.tmpbuf) { @@ -902,7 +914,11 @@ static void clear_pending(MPII_cga_request_queue * queue, int pending_id) { #define PENDING queue->pending_blocks[pending_id] if (PENDING.persist_packbuf) { - MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, PENDING.persist_packbuf); + if (MPIR_cga_chunk_pool) { + MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, PENDING.persist_packbuf); + } else { + MPL_free(PENDING.persist_packbuf); + } PENDING.persist_packbuf = NULL; } #undef PENDING @@ -1031,7 +1047,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) if (queue->need_pack) { void *pack_buf = get_persist_packbuf(queue, block, root); if (!pack_buf) { - mpi_errno = alloc_packbuf(&pack_buf); + mpi_errno = alloc_packbuf(queue, &pack_buf); MPIR_ERR_CHECK(mpi_errno); REQi.packbuf = pack_buf; @@ -1065,7 +1081,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) /* make sure all recvs are in order */ TEST_PENDING(check_pending_ops(queue, i, &flag)); void *pack_buf; - mpi_errno = alloc_packbuf(&pack_buf); + mpi_errno = alloc_packbuf(queue, &pack_buf); MPIR_ERR_CHECK(mpi_errno); REQi.packbuf = pack_buf; From 4a32670c33f1b418761af65fa4bdfed3d4f21ecc Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 16 Mar 2026 15:16:48 -0500 Subject: [PATCH 4/8] coll/cga: allow switching coll_type To support allreduce as a composition of Reduce + Bcast, we need allow switching coll_type in the middle of the algorithm. Previously if there is an dependent recv operation, it is always issued before any sends. If we allow switching coll_type, there will case of: recv->recv->...->send->recv->send->send->... So we need check every previously issued request in order to clear recv before send. --- .../algorithms/circ_graph/cga_request_queue.c | 46 ++++++++++++++----- .../coll/algorithms/circ_graph/circ_graph.h | 2 + 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 1a3a089bb4b..1ce6531bdb3 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -298,6 +298,20 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, return mpi_errno; } +int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type) +{ + /* for now, we only support switching from REDUCE to BCAST (as an ALLREDUCE composition) */ + MPIR_Assert(queue->coll_type == MPII_CGA_REDUCE && coll_type == MPII_CGA_BCAST); + queue->coll_type = MPII_CGA_BCAST; + queue->inverse_order = false; + /* at this point, pending_blocks must be tracking 0, 1, ..., num_pending-1 */ + MPIR_Assert(queue->pending_head_block == -1); + queue->pending_head_block = queue->num_pending; + queue->pending_head = 0; + + return MPI_SUCCESS; +} + #define GET_BLOCK_SIZE(block) (((block) == queue->num_chunks - 1) ? queue->last_chunk_size : queue->chunk_size) /* ---- bcast ---- */ @@ -614,6 +628,7 @@ static int issue_nb_op(MPII_cga_request_queue * queue, enum MPII_cga_op_type op_ int req_id = queue->q_head; #define REQi queue->requests[req_id] REQi.op_type = op_type; + REQi.coll_type = queue->coll_type; REQi.peer_rank = peer_rank; REQi.block = block; REQi.root = root; @@ -621,7 +636,7 @@ static int issue_nb_op(MPII_cga_request_queue * queue, enum MPII_cga_op_type op_ REQi.issued = false; REQi.tmpbuf = NULL; - if (op_type == MPII_CGA_OP_RECV && queue->coll_type == MPII_CGA_REDUCE) { + if (op_type == MPII_CGA_OP_RECV && REQi.coll_type == MPII_CGA_REDUCE) { /* reduce need recv (or unpack) into a tmp buffer before reduce_local */ if (queue->u.reduce.tmpbuf_size > 0) { REQi.tmpbuf = MPL_malloc(queue->u.reduce.tmpbuf_size, MPL_MEM_OTHER); @@ -893,7 +908,7 @@ static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out) static void clear_request(MPII_cga_request_queue * queue, int req_id) { #define REQi queue->requests[req_id] - if (queue->coll_type == MPII_CGA_REDUCE) { + if (REQi.coll_type == MPII_CGA_REDUCE) { if (REQi.packbuf) { if (MPIR_cga_chunk_pool) { MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, REQi.packbuf); @@ -952,10 +967,17 @@ static int clear_pending_recvs(MPII_cga_request_queue * queue, int cur_req_id, b int root = queue->requests[cur_req_id].root; int req_id = get_pending_req_id(queue, block, root); - if (req_id >= 0 && req_id != cur_req_id && queue->requests[req_id].op_type == MPII_CGA_OP_RECV) { - *flag = (queue->requests[req_id].op_stage == MPII_CGA_STAGE_NULL); - } else { - *flag = true; + while (true) { + MPIR_Assert(req_id >= 0); + if (req_id == cur_req_id) { + *flag = true; + break; + } + if (queue->requests[req_id].op_type == MPII_CGA_OP_RECV) { + *flag = false; + break; + } + req_id = queue->requests[req_id].next_req_id; } return MPI_SUCCESS; @@ -1051,7 +1073,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) MPIR_ERR_CHECK(mpi_errno); REQi.packbuf = pack_buf; - if (queue->coll_type == MPII_CGA_BCAST) { + if (REQi.coll_type == MPII_CGA_BCAST) { add_persist_packbuf(queue, block, root, pack_buf); } @@ -1059,7 +1081,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) MPIR_ERR_CHECK(mpi_errno); REQi.op_stage = MPII_CGA_STAGE_COPY; } else { - MPIR_Assert(queue->coll_type == MPII_CGA_BCAST); + MPIR_Assert(REQi.coll_type == MPII_CGA_BCAST); REQi.packbuf = pack_buf; /* make sure all sends are in order */ TEST_PENDING(check_pending_ops(queue, i, &flag)); @@ -1085,7 +1107,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) MPIR_ERR_CHECK(mpi_errno); REQi.packbuf = pack_buf; - if (queue->coll_type == MPII_CGA_BCAST) { + if (REQi.coll_type == MPII_CGA_BCAST) { add_persist_packbuf(queue, block, root, pack_buf); } @@ -1120,7 +1142,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) REQi.issued = true; REQi.op_stage = MPII_CGA_STAGE_REQUEST; } else { - if (queue->coll_type == MPII_CGA_REDUCE) { + if (REQi.coll_type == MPII_CGA_REDUCE) { /* can't have multiple reduce into the same buffer */ TEST_PENDING(clear_pending_recvs(queue, i, &flag)); /* blocking reduce will be done in the next loop. @@ -1143,7 +1165,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) goto fn_complete; } else { if (queue->need_pack) { - if (queue->coll_type == MPII_CGA_REDUCE && !REQi.tmpbuf) { + if (REQi.coll_type == MPII_CGA_REDUCE && !REQi.tmpbuf) { /* contig (gpu) case, we can directly reduce from pack_buf */ REQi.op_stage = MPII_CGA_STAGE_REDUCE; } else { @@ -1154,7 +1176,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) REQi.op_stage = MPII_CGA_STAGE_COPY; } } else { - if (queue->coll_type == MPII_CGA_REDUCE) { + if (REQi.coll_type == MPII_CGA_REDUCE) { REQi.op_stage = MPII_CGA_STAGE_REDUCE; } else { goto fn_complete; diff --git a/src/mpi/coll/algorithms/circ_graph/circ_graph.h b/src/mpi/coll/algorithms/circ_graph/circ_graph.h index 9ed8bcf440e..13c6bb09326 100644 --- a/src/mpi/coll/algorithms/circ_graph/circ_graph.h +++ b/src/mpi/coll/algorithms/circ_graph/circ_graph.h @@ -101,6 +101,7 @@ typedef struct { struct { enum MPII_cga_op_type op_type; + enum MPII_cga_type coll_type; enum MPII_cga_op_stage op_stage; /* sends and recvs need be issued in order. It is difficult to figure out whether * the request has been issued yet from op_stage. Use an explicit flag as shortcut. */ @@ -137,6 +138,7 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, void *recvbuf, MPI_Aint count, MPI_Datatype datatype, MPI_Op op, MPIR_Comm * comm, int coll_attr); +int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type); int MPII_cga_bcast_isend(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag); int MPII_cga_bcast_irecv(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag); From 09396ced03b171d8c2a4d8d81d9d9b1bad8a19a5 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 17 Mar 2026 17:49:05 -0500 Subject: [PATCH 5/8] coll/cga: packbuf dependency in allreduce 1. the persist_packbuf is only checked during the bcast stage. 2. add a flag, persist_packbuf_loaded, as a dependency to prevent a bcast send to proceed without previous packbuf copy incomplete. 3. To keep all sends and all recvs are issued in order, we only need check the op_stage is past START or COPY (for sends). --- .../algorithms/circ_graph/cga_request_queue.c | 70 ++++++++++++++++--- .../coll/algorithms/circ_graph/circ_graph.h | 1 + 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 1ce6531bdb3..326efb31d54 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -34,7 +34,9 @@ static void remove_pending_req_id(MPII_cga_request_queue * queue, int block, int * - for reduce, packbuf is stored in the requests[].packbuf, and freed with the request */ static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int root); +static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root); static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int root, void *packbuf); +static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root); static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out); /* free buffers depend on where they are stored */ static void clear_request(MPII_cga_request_queue * queue, int req_id); @@ -864,6 +866,14 @@ static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int return queue->pending_blocks[pending_id].persist_packbuf; } +static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root) +{ + int pending_id = get_pending_id(queue, block, root); + MPIR_Assert(pending_id >= 0); + MPIR_Assert(queue->pending_blocks[pending_id].persist_packbuf); + return queue->pending_blocks[pending_id].persist_packbuf_loaded; +} + static void add_pending_req_id(MPII_cga_request_queue * queue, int block, int root, int req_id) { int pending_id = get_pending_id(queue, block, root); @@ -889,6 +899,15 @@ static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int r int pending_id = get_pending_id(queue, block, root); MPIR_Assert(pending_id >= 0); queue->pending_blocks[pending_id].persist_packbuf = packbuf; + queue->pending_blocks[pending_id].persist_packbuf_loaded = false; +} + +static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root) +{ + int pending_id = get_pending_id(queue, block, root); + MPIR_Assert(pending_id >= 0); + MPIR_Assert(queue->pending_blocks[pending_id].persist_packbuf); + queue->pending_blocks[pending_id].persist_packbuf_loaded = true; } static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out) @@ -1001,12 +1020,24 @@ static int check_pending_ops(MPII_cga_request_queue * queue, int cur_req_id, boo if (req_id < 0) { req_id = queue->q_len - 1; } - /* we only need check 1 previous request since it is an all or none condition */ + /* The goal is to ensure all sends or recvs to the same rank are issued in order. + * Thus, we check that no previous send (or recv) are in a stage before issuing. + */ if (queue->requests[req_id].op_stage != MPII_CGA_STAGE_NULL && queue->requests[req_id].op_type == op_type && queue->requests[req_id].peer_rank == peer_rank) { - *flag = false; - goto fn_exit; + if (op_type == MPII_CGA_OP_SEND) { + if (queue->requests[req_id].op_stage == MPII_CGA_STAGE_START || + queue->requests[req_id].op_stage == MPII_CGA_STAGE_COPY) { + *flag = false; + goto fn_exit; + } + } else { /* MPII_CGA_OP_RECV */ + if (queue->requests[req_id].op_stage == MPII_CGA_STAGE_START) { + *flag = false; + goto fn_exit; + } + } } } @@ -1067,22 +1098,37 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) /* send need clear previous recvs or the data is incorrect */ TEST_PENDING(clear_pending_recvs(queue, i, &flag)); if (queue->need_pack) { - void *pack_buf = get_persist_packbuf(queue, block, root); - if (!pack_buf) { + void *pack_buf = NULL; + bool new_pack_buf = false; + if (REQi.coll_type == MPII_CGA_REDUCE) { + /* reduce can't reuse pack buffer */ mpi_errno = alloc_packbuf(queue, &pack_buf); MPIR_ERR_CHECK(mpi_errno); REQi.packbuf = pack_buf; - if (REQi.coll_type == MPII_CGA_BCAST) { + new_pack_buf = true; + } else { /* MPII_CGA_BCAST */ + pack_buf = get_persist_packbuf(queue, block, root); + if (!pack_buf) { + mpi_errno = alloc_packbuf(queue, &pack_buf); + MPIR_ERR_CHECK(mpi_errno); + + REQi.packbuf = pack_buf; add_persist_packbuf(queue, block, root, pack_buf); + new_pack_buf = true; + } else { + /* make sure the packbuf is loaded */ + if (!is_persist_packbuf_loaded(queue, block, root)) { + goto fn_cont; + } + REQi.packbuf = pack_buf; } - + } + if (new_pack_buf) { mpi_errno = issue_pack(queue, block, root, pack_buf, &REQi.u.async_req); MPIR_ERR_CHECK(mpi_errno); REQi.op_stage = MPII_CGA_STAGE_COPY; } else { - MPIR_Assert(REQi.coll_type == MPII_CGA_BCAST); - REQi.packbuf = pack_buf; /* make sure all sends are in order */ TEST_PENDING(check_pending_ops(queue, i, &flag)); mpi_errno = issue_isend_packed(queue, block, root, REQi.peer_rank, @@ -1136,6 +1182,9 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) if (REQi.op_type == MPII_CGA_OP_SEND) { /* make sure all sends are in order */ TEST_PENDING(check_pending_ops(queue, i, &flag)); + if (REQi.coll_type == MPII_CGA_BCAST) { + set_persist_packbuf_loaded(queue, block, root); + } mpi_errno = issue_isend_packed(queue, block, root, REQi.peer_rank, REQi.packbuf, &REQi.u.req); MPIR_ERR_CHECK(mpi_errno); @@ -1165,6 +1214,9 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) goto fn_complete; } else { if (queue->need_pack) { + if (REQi.coll_type == MPII_CGA_BCAST) { + set_persist_packbuf_loaded(queue, block, root); + } if (REQi.coll_type == MPII_CGA_REDUCE && !REQi.tmpbuf) { /* contig (gpu) case, we can directly reduce from pack_buf */ REQi.op_stage = MPII_CGA_STAGE_REDUCE; diff --git a/src/mpi/coll/algorithms/circ_graph/circ_graph.h b/src/mpi/coll/algorithms/circ_graph/circ_graph.h index 13c6bb09326..a47404363d5 100644 --- a/src/mpi/coll/algorithms/circ_graph/circ_graph.h +++ b/src/mpi/coll/algorithms/circ_graph/circ_graph.h @@ -94,6 +94,7 @@ typedef struct { struct { int req_id; /* points to the index of the pending requests */ void *persist_packbuf; /* for bcast, avoid packing for every send */ + bool persist_packbuf_loaded; /* avoid sending unloaded packbuf */ } *pending_blocks; int pending_head; int pending_head_block; From e5be20c0ab95f5095f1469baf3de1f3def545c4e Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Mon, 16 Mar 2026 16:13:11 -0500 Subject: [PATCH 6/8] coll: add intra_circ_graph allreduce algorithm Implemented as a composition of Reduce + Bcast. --- src/mpi/coll/allreduce/Makefile.mk | 1 + .../allreduce/allreduce_intra_circ_graph.c | 136 ++++++++++++++++++ src/mpi/coll/coll_algorithms.txt | 2 + src/mpi/coll/cvars.txt | 1 + src/mpi/coll/include/csel_container.h | 1 + src/mpi/coll/src/csel_container.c | 2 + test/mpi/maint/coll_cvars.txt | 3 + 7 files changed, 146 insertions(+) create mode 100644 src/mpi/coll/allreduce/allreduce_intra_circ_graph.c diff --git a/src/mpi/coll/allreduce/Makefile.mk b/src/mpi/coll/allreduce/Makefile.mk index 5db420bd059..d1b81f2c874 100644 --- a/src/mpi/coll/allreduce/Makefile.mk +++ b/src/mpi/coll/allreduce/Makefile.mk @@ -18,4 +18,5 @@ mpi_core_sources += \ src/mpi/coll/allreduce/allreduce_intra_ring.c \ src/mpi/coll/allreduce/allreduce_intra_k_reduce_scatter_allgather.c \ src/mpi/coll/allreduce/allreduce_intra_ccl.c \ + src/mpi/coll/allreduce/allreduce_intra_circ_graph.c \ src/mpi/coll/allreduce/allreduce_inter_reduce_exchange_bcast.c diff --git a/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c new file mode 100644 index 00000000000..b5780f74e21 --- /dev/null +++ b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c @@ -0,0 +1,136 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include "mpiimpl.h" +#include "circ_graph.h" + +/* Algorithm: Circulant graph allreduce + * + * This algorithm is a combination of reduce_circ_graph + bcast_circ_graph. + * + * It is not round-efficient for small to medium messages, but can be efficient for + * large message when there are enough chunks to saturate the pipeline. + */ + +int MPIR_Allreduce_intra_circ_graph(const void *sendbuf, void *recvbuf, + MPI_Aint count, MPI_Datatype datatype, + MPI_Op op, MPIR_Comm * comm, int coll_attr) +{ + int mpi_errno = MPI_SUCCESS; + MPIR_CHKLMEM_DECL(); + + int rank, comm_size; + MPIR_COMM_RANK_SIZE(comm, rank, comm_size); + + MPIR_Assert(MPIR_Op_is_commutative(op)); + + if (sendbuf != MPI_IN_PLACE) { + mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype); + MPIR_ERR_CHECK(mpi_errno); + } + + /* calculate the schedule */ + MPII_circ_graph cga; + mpi_errno = MPII_circ_graph_create(&cga, comm_size, rank); + MPIR_ERR_CHECK(mpi_errno); + + /* Run schedule */ + MPII_cga_request_queue queue; + int min_pending_blocks = cga.q * 2; + mpi_errno = MPII_cga_init_reduce_queue(&queue, min_pending_blocks, + recvbuf, count, datatype, op, comm, coll_attr); + MPIR_ERR_CHECK(mpi_errno); + + /* First run the reduce schedule */ + int n = queue.num_chunks; + int p = cga.p; + int q = cga.q; + int x = (q - ((n - 1) % q)) % q; + int offset = n - 1; + + for (int i = n + q + x - 2; i >= x; i--) { + int k = i % q; + + int send_block = cga.R[k] + offset; + if (send_block >= 0) { + int peer = (rank - cga.Skip[k] + p) % p; + if (rank != 0) { + if (send_block >= n) { + send_block = n - 1; + } + + mpi_errno = MPII_cga_reduce_send(&queue, send_block, peer); + MPIR_ERR_CHECK(mpi_errno); + } + } + + int recv_block = cga.S[k] + offset; + if (recv_block >= 0) { + int peer = (rank + cga.Skip[k]) % p; + if (peer != 0) { + if (recv_block >= n) { + recv_block = n - 1; + } + + mpi_errno = MPII_cga_reduce_recv(&queue, recv_block, peer); + MPIR_ERR_CHECK(mpi_errno); + } + } + + if (k == 0) { + offset -= q; + } + } + + MPII_cga_switch_coll_type(&queue, MPII_CGA_BCAST); + offset = -x; + + /* Then run the bcast schedule */ + for (int i = x; i < n - 1 + q + x; i++) { + int k = i % q; + + int send_block = cga.S[k] + offset; + if (send_block >= 0) { + int peer = (rank + cga.Skip[k]) % p; + if (peer != 0) { + if (send_block >= n) { + send_block = n - 1; + } + + mpi_errno = MPII_cga_bcast_send(&queue, send_block, peer); + MPIR_ERR_CHECK(mpi_errno); + } + } + + int recv_block = cga.R[k] + offset; + if (recv_block >= 0) { + int peer = (rank - cga.Skip[k] + p) % p; + if (rank != 0) { + if (recv_block >= n) { + recv_block = n - 1; + } + + mpi_errno = MPII_cga_bcast_recv(&queue, recv_block, peer); + MPIR_ERR_CHECK(mpi_errno); + } + } + + if (k == q - 1) { + offset += q; + } + } + + /* wait for all pending requests */ + mpi_errno = MPII_cga_waitall(&queue); + MPIR_ERR_CHECK(mpi_errno); + + MPII_circ_graph_free(&cga); + MPIR_CHKLMEM_FREEALL(); + + fn_exit: + return mpi_errno; + fn_fail: + goto fn_exit; +} diff --git a/src/mpi/coll/coll_algorithms.txt b/src/mpi/coll/coll_algorithms.txt index e0c9edeec60..3feeaf568ff 100644 --- a/src/mpi/coll/coll_algorithms.txt +++ b/src/mpi/coll/coll_algorithms.txt @@ -364,6 +364,8 @@ allreduce-intra: ccl extra_params: ccl cvar_params: CCL + circ_graph + restrictions: commutative allreduce-inter: reduce_exchange_bcast iallreduce-intra: diff --git a/src/mpi/coll/cvars.txt b/src/mpi/coll/cvars.txt index f8b3ab97bea..b166fff6fd8 100644 --- a/src/mpi/coll/cvars.txt +++ b/src/mpi/coll/cvars.txt @@ -1374,6 +1374,7 @@ cvars: ring - Force ring algorithm k_reduce_scatter_allgather - Force reduce scatter allgather algorithm ccl - Force CCL algorithm + circ_graph - Force circulant graph algorithm - name : MPIR_CVAR_ALLREDUCE_RECURSIVE_MULTIPLYING_KVAL category : COLLECTIVE diff --git a/src/mpi/coll/include/csel_container.h b/src/mpi/coll/include/csel_container.h index 4df1ea8b0f6..15aa846efa6 100644 --- a/src/mpi/coll/include/csel_container.h +++ b/src/mpi/coll/include/csel_container.h @@ -34,6 +34,7 @@ typedef enum { MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ring, MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather, MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl, + MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph, MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast, MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_allcomm_nb, MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Alltoall_intra_brucks, diff --git a/src/mpi/coll/src/csel_container.c b/src/mpi/coll/src/csel_container.c index 6a352782417..d53c67f8258 100644 --- a/src/mpi/coll/src/csel_container.c +++ b/src/mpi/coll/src/csel_container.c @@ -431,6 +431,8 @@ void *MPII_Create_container(struct json_object *obj) MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather; else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_ccl")) cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl; + else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_circ_graph")) + cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph; else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_inter_reduce_exchange_bcast")) cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast; diff --git a/test/mpi/maint/coll_cvars.txt b/test/mpi/maint/coll_cvars.txt index d3c4ba0bad6..86e8ed7ddf0 100644 --- a/test/mpi/maint/coll_cvars.txt +++ b/test/mpi/maint/coll_cvars.txt @@ -257,6 +257,9 @@ algorithms: .MPIR_CVAR_COLL_SHM_LIMIT_PER_NODE=131072 .MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE=16384 .MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL=4,8 + circ_graph + .MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE=0,1024,262144 + .MPIR_CVAR_CIRC_GRAPH_Q_LEN=2,8 intra-nonblocking: sched_naive sched_smp From 5866780b62c274be7daf3c4e1659373bae68f51f Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 17 Mar 2026 22:03:21 -0500 Subject: [PATCH 7/8] test: modify coll/allred.c to avoid exceeding precision When the count argument is big, the reduction result, especially for MPI_PROD and MPI_FLOAT, it can easily exceed the floating point precision. The test become unstable for algorithm that doesn't perform reduction in the same order as the provided reference solution. Avoid big value by mod 10 in the data so we can test arbitrary large data size without worry about loss of precision. --- test/mpi/coll/allred.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/mpi/coll/allred.c b/test/mpi/coll/allred.c index b44807f6696..e40b1a2be94 100644 --- a/test/mpi/coll/allred.c +++ b/test/mpi/coll/allred.c @@ -230,7 +230,7 @@ static void set_index_sum(MPI_Datatype mpi_type, int category, void *arr, int va int type_size = get_mpi_type_size(mpi_type); char *p = arr; for (int i = 0; i < count; i++) { - set_category_value(category, p, type_size, i + val); + set_category_value(category, p, type_size, (i % 10) + val); p += type_size; } } @@ -240,7 +240,7 @@ static void set_index_factor(MPI_Datatype mpi_type, int category, void *arr, int int type_size = get_mpi_type_size(mpi_type); char *p = arr; for (int i = 0; i < count; i++) { - set_category_value(category, p, type_size, i * val); + set_category_value(category, p, type_size, (i % 10) * val); p += type_size; } } @@ -259,7 +259,7 @@ static void set_index_power(MPI_Datatype mpi_type, int category, void *arr, int int type_size = get_mpi_type_size(mpi_type); char *p = arr; for (int i = 0; i < count; i++) { - set_category_value(category, p, type_size, get_pow(i, val)); + set_category_value(category, p, type_size, get_pow(i % 10, val)); p += type_size; } } From 1856eeef51f6804d2990bde2133b7b8c39d8b3d8 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 18 Mar 2026 15:57:05 -0500 Subject: [PATCH 8/8] coll/cga: add staging to avoid direct gpu reduction In allreduce, intermediate reduction should avoid directly operate on GPU buffers to cut unnecessary latency. --- .../algorithms/circ_graph/cga_request_queue.c | 141 +++++++++++++++++- .../coll/algorithms/circ_graph/circ_graph.h | 11 +- .../allreduce/allreduce_intra_circ_graph.c | 4 +- 3 files changed, 146 insertions(+), 10 deletions(-) diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c index 326efb31d54..044657e16b2 100644 --- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c +++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c @@ -35,8 +35,11 @@ static void remove_pending_req_id(MPII_cga_request_queue * queue, int block, int */ static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int root); static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root); +static bool is_in_staging(MPII_cga_request_queue * queue, int block, int root); static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int root, void *packbuf); static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root); +static void set_in_staging(MPII_cga_request_queue * queue, int block, int root); + static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out); /* free buffers depend on where they are stored */ static void clear_request(MPII_cga_request_queue * queue, int req_id); @@ -50,6 +53,8 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root, void *packbuf, MPIR_gpu_req * areq); static int issue_unpack(MPII_cga_request_queue * queue, int block, int root, void *packbuf, void *tmpbuf, MPIR_gpu_req * areq); +static int issue_staging(MPII_cga_request_queue * queue, int block, int root, + bool is_load, MPIR_gpu_req * areq); static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int root, int peer_rank, MPIR_Request ** req); static int issue_isend_packed(MPII_cga_request_queue * queue, int block, int root, @@ -100,6 +105,7 @@ static int init_request_queue_common(MPII_cga_request_queue * queue, MPIR_GPU_query_pointer_attr(buf, &queue->attr); queue->need_pack = (!dt_contig || MPL_gpu_attr_is_dev(&queue->attr)); + queue->need_staging = false; queue->comm = comm; queue->coll_attr = coll_attr; queue->num_chunks = num_chunks; @@ -110,6 +116,7 @@ static int init_request_queue_common(MPII_cga_request_queue * queue, queue->count = count; queue->datatype = datatype; queue->inverse_order = inverse_order; + queue->dt_contig = dt_contig; mpi_errno = MPIR_Sched_next_tag(comm, &queue->tag); MPIR_ERR_CHECK(mpi_errno); @@ -284,6 +291,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, queue->u.reduce.type_size = type_size; queue->u.reduce.type_extent = extent; queue->u.reduce.true_lb = true_lb; + queue->u.reduce.true_extent = true_extent; queue->u.reduce.chunk_extent = chunk_count * extent; if (!queue->need_pack) { /* datatype must be contig, no pack_buf, need tmpbuf to receive chunk data */ @@ -300,6 +308,24 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, return mpi_errno; } +int MPII_cga_init_allreduce_queue(MPII_cga_request_queue * queue, int num_pending, + void *recvbuf, MPI_Aint count, MPI_Datatype datatype, + MPI_Op op, MPIR_Comm * comm, int coll_attr) +{ + int mpi_errno = MPII_cga_init_reduce_queue(queue, num_pending, recvbuf, count, datatype, + op, comm, coll_attr); + if (queue->need_pack && MPL_gpu_attr_is_dev(&queue->attr)) { + /* alloc queue->hostbuf */ + MPI_Aint buf_size = (count - 1) * queue->u.reduce.type_extent + queue->u.reduce.true_extent; + queue->u.reduce.staging_buf = MPL_malloc(buf_size, MPL_MEM_COLL);; + if (!queue->u.reduce.staging_buf) { + MPIR_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**nomem"); + } + queue->need_staging = true; + } + return mpi_errno; +} + int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type) { /* for now, we only support switching from REDUCE to BCAST (as an ALLREDUCE composition) */ @@ -527,7 +553,13 @@ static int reduce_local(MPII_cga_request_queue * queue, int block, void *buf) int mpi_errno = MPI_SUCCESS; void *buf_in = (char *) buf - queue->u.reduce.true_lb; - void *buf_inout = (char *) queue->buf + block * queue->u.reduce.chunk_extent; + void *buf_inout; + if (queue->need_staging) { + buf_inout = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb + + block * queue->u.reduce.chunk_extent; + } else { + buf_inout = (char *) queue->buf + block * queue->u.reduce.chunk_extent; + } MPI_Aint count = GET_BLOCK_SIZE(block) / queue->u.reduce.type_size; mpi_errno = MPIR_Reduce_local(buf_in, buf_inout, count, queue->datatype, queue->u.reduce.op); @@ -590,6 +622,9 @@ int MPII_cga_testall(MPII_cga_request_queue * queue, bool * is_done) /* free the memory */ MPL_free(queue->pending_blocks); MPL_free(queue->requests); + if (queue->need_staging) { + MPL_free(queue->u.reduce.staging_buf); + } *is_done = true; @@ -636,6 +671,7 @@ static int issue_nb_op(MPII_cga_request_queue * queue, enum MPII_cga_op_type op_ REQi.root = root; REQi.op_stage = MPII_CGA_STAGE_START; REQi.issued = false; + REQi.staging_areq.type = MPIR_NULL_REQUEST; REQi.tmpbuf = NULL; if (op_type == MPII_CGA_OP_RECV && REQi.coll_type == MPII_CGA_REDUCE) { @@ -695,8 +731,11 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root, MPI_Aint nbytes = GET_BLOCK_SIZE(block); void *src_buf = queue->buf; + if (queue->need_staging && is_in_staging(queue, block, root)) { + src_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb; + } if (root > 0) { - src_buf = (char *) queue->buf + root * queue->buf_extent; + src_buf = (char *) src_buf + root * queue->buf_extent; } int engine_type = MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH; /* TODO: add a cvar */ @@ -708,6 +747,38 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root, return mpi_errno; } +static int issue_staging(MPII_cga_request_queue * queue, int block, int root, + bool is_load, MPIR_gpu_req * areq) +{ + int mpi_errno = MPI_SUCCESS; + + MPI_Aint nbytes = GET_BLOCK_SIZE(block); + MPI_Aint chunk_count = nbytes / queue->u.reduce.type_size; + + char *src_buf, *dst_buf; + if (is_load) { + src_buf = (char *) queue->buf; + dst_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb; + } else { + src_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb; + dst_buf = (char *) queue->buf; + } + if (root > 0) { + src_buf += root * queue->buf_extent; + dst_buf += root * queue->buf_extent; + } + MPI_Aint offset = block * queue->chunk_size; + src_buf += offset; + dst_buf += offset; + + int engine_type = MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH; /* TODO: add a cvar */ + mpi_errno = MPIR_Ilocalcopy_gpu(src_buf, chunk_count, queue->datatype, 0, &queue->attr, + dst_buf, chunk_count, queue->datatype, 0, NULL, engine_type, 1, + areq); + + return mpi_errno; +} + static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int root, int peer_rank, MPIR_Request ** req) { @@ -715,11 +786,14 @@ static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int roo MPI_Aint nbytes = GET_BLOCK_SIZE(block); MPI_Aint offset = block * queue->chunk_size; - const void *send_buf; + char *send_buf = (char *) queue->buf;; + if (is_in_staging(queue, block, root)) { + send_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb; + } if (root == 0) { - send_buf = (char *) queue->buf + offset; + send_buf += offset; } else { - send_buf = (char *) queue->buf + root * queue->buf_extent + offset; + send_buf += root * queue->buf_extent + offset; } mpi_errno = MPIC_Isend(send_buf, nbytes, MPIR_BYTE_INTERNAL, peer_rank, queue->tag, queue->comm, req, queue->coll_attr); @@ -874,6 +948,16 @@ static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, return queue->pending_blocks[pending_id].persist_packbuf_loaded; } +static bool is_in_staging(MPII_cga_request_queue * queue, int block, int root) +{ + if (!queue->need_staging) { + return false; + } + int pending_id = get_pending_id(queue, block, root); + MPIR_Assert(pending_id >= 0); + return queue->pending_blocks[pending_id].in_staging; +} + static void add_pending_req_id(MPII_cga_request_queue * queue, int block, int root, int req_id) { int pending_id = get_pending_id(queue, block, root); @@ -910,6 +994,13 @@ static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block queue->pending_blocks[pending_id].persist_packbuf_loaded = true; } +static void set_in_staging(MPII_cga_request_queue * queue, int block, int root) +{ + int pending_id = get_pending_id(queue, block, root); + MPIR_Assert(pending_id >= 0); + queue->pending_blocks[pending_id].in_staging = true; +} + static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out) { int mpi_errno = MPI_SUCCESS; @@ -955,6 +1046,7 @@ static void clear_pending(MPII_cga_request_queue * queue, int pending_id) } PENDING.persist_packbuf = NULL; } + PENDING.in_staging = false; #undef PENDING } @@ -992,7 +1084,8 @@ static int clear_pending_recvs(MPII_cga_request_queue * queue, int cur_req_id, b *flag = true; break; } - if (queue->requests[req_id].op_type == MPII_CGA_OP_RECV) { + if (queue->requests[req_id].op_type == MPII_CGA_OP_RECV && + queue->requests[req_id].op_stage != MPII_CGA_STAGE_UNSTAGE) { *flag = false; break; } @@ -1097,7 +1190,10 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) if (REQi.op_type == MPII_CGA_OP_SEND) { /* send need clear previous recvs or the data is incorrect */ TEST_PENDING(clear_pending_recvs(queue, i, &flag)); - if (queue->need_pack) { + /* with contig staging_buf, we can skip pack_buf, but only if it's in REDUCE or rank 0 */ +#define IN_REDUCE_OR_RANK_0 (REQi.coll_type == MPII_CGA_REDUCE || queue->comm->rank == 0) +#define CONTIG_AND_IN_STAGING (queue->dt_contig && is_in_staging(queue, block, root)) + if (queue->need_pack && !(IN_REDUCE_OR_RANK_0 && CONTIG_AND_IN_STAGING)) { void *pack_buf = NULL; bool new_pack_buf = false; if (REQi.coll_type == MPII_CGA_REDUCE) { @@ -1145,6 +1241,14 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) REQi.op_stage = MPII_CGA_STAGE_REQUEST; } } else { /* MPII_CGA_OP_RECV */ + if (REQi.coll_type == MPII_CGA_REDUCE && queue->need_staging && + !is_in_staging(queue, block, root)) { + /* issue concurrent loading of staging buffer */ + mpi_errno = issue_staging(queue, block, root, true /* is_load */ , + &REQi.staging_areq); + MPIR_ERR_CHECK(mpi_errno); + set_in_staging(queue, block, root); + } if (queue->need_pack) { /* make sure all recvs are in order */ TEST_PENDING(check_pending_ops(queue, i, &flag)); @@ -1205,6 +1309,14 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) if (!MPIR_Request_is_complete(REQi.u.req)) { goto fn_cont; } + /* a reduction recv may issue a concurrent loading staging_buf, check before proceed */ + if (REQi.staging_areq.type != MPIR_NULL_REQUEST) { + int done; + MPIR_async_test(&REQi.staging_areq, &done); + if (!done) { + goto fn_cont; + } + } /* -- transition -- */ MPIR_Assert(REQi.u.req->status.MPI_ERROR == MPI_SUCCESS); @@ -1243,6 +1355,21 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i) mpi_errno = reduce_local(queue, block, buf); MPIR_ERR_CHECK(mpi_errno); + if (queue->need_staging && queue->comm->rank == 0) { + mpi_errno = issue_staging(queue, block, root, false /* unstaging */ , + &REQi.u.async_req); + MPIR_ERR_CHECK(mpi_errno); + + REQi.op_stage = MPII_CGA_STAGE_UNSTAGE; + } else { + goto fn_complete; + } + } else if (REQi.op_stage == MPII_CGA_STAGE_UNSTAGE) { + int done; + MPIR_async_test(&REQi.u.async_req, &done); + if (!done) { + goto fn_cont; + } goto fn_complete; } else { MPIR_Assert(0); diff --git a/src/mpi/coll/algorithms/circ_graph/circ_graph.h b/src/mpi/coll/algorithms/circ_graph/circ_graph.h index a47404363d5..98bfbeba544 100644 --- a/src/mpi/coll/algorithms/circ_graph/circ_graph.h +++ b/src/mpi/coll/algorithms/circ_graph/circ_graph.h @@ -56,6 +56,7 @@ enum MPII_cga_op_stage { MPII_CGA_STAGE_COPY, /* waiting for the async local copy */ MPII_CGA_STAGE_REQUEST, /* waiting for the send/recv request */ MPII_CGA_STAGE_REDUCE, /* at reduce_local pending dependency */ + MPII_CGA_STAGE_UNSTAGE, /* when allreduce staging is used, rank 0 need unstage after reduce_local */ }; typedef struct { @@ -69,7 +70,8 @@ typedef struct { MPI_Aint count; MPI_Datatype datatype; MPI_Aint buf_extent; /* count * extent, needed by allgather */ - + bool dt_contig; + bool need_staging; bool need_pack; MPL_pointer_attr_t attr; @@ -80,8 +82,10 @@ typedef struct { MPI_Aint type_extent; MPI_Aint chunk_extent; /* for calc buf offset at a block */ MPI_Aint true_lb; /* adjustment for tmp_buf */ + MPI_Aint true_extent; MPI_Aint tmpbuf_size; /* (chunk_count - 1) * extent + true_extent, but 0 if not needed */ MPI_Op op; + void *staging_buf; /* if recvbuf is in GPU, use staging buf for intermediate results */ } reduce; } u; @@ -95,6 +99,7 @@ typedef struct { int req_id; /* points to the index of the pending requests */ void *persist_packbuf; /* for bcast, avoid packing for every send */ bool persist_packbuf_loaded; /* avoid sending unloaded packbuf */ + bool in_staging; /* for allreduce, whether data is in staging_buf */ } *pending_blocks; int pending_head; int pending_head_block; @@ -111,6 +116,7 @@ typedef struct { MPIR_gpu_req async_req; MPIR_Request *req; } u; + MPIR_gpu_req staging_areq; /* allreduce recv may issue concurrent loading of staging buffer */ void *packbuf; /* if need_pack, allocated chunk buffer */ void *tmpbuf; /* reduce need recv into a tmpbuf before reduce_local */ int block; @@ -139,6 +145,9 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending, void *recvbuf, MPI_Aint count, MPI_Datatype datatype, MPI_Op op, MPIR_Comm * comm, int coll_attr); +int MPII_cga_init_allreduce_queue(MPII_cga_request_queue * queue, int num_pending, + void *recvbuf, MPI_Aint count, MPI_Datatype datatype, + MPI_Op op, MPIR_Comm * comm, int coll_attr); int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type); int MPII_cga_bcast_isend(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag); diff --git a/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c index b5780f74e21..cceda830bc1 100644 --- a/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c +++ b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c @@ -39,8 +39,8 @@ int MPIR_Allreduce_intra_circ_graph(const void *sendbuf, void *recvbuf, /* Run schedule */ MPII_cga_request_queue queue; int min_pending_blocks = cga.q * 2; - mpi_errno = MPII_cga_init_reduce_queue(&queue, min_pending_blocks, - recvbuf, count, datatype, op, comm, coll_attr); + mpi_errno = MPII_cga_init_allreduce_queue(&queue, min_pending_blocks, + recvbuf, count, datatype, op, comm, coll_attr); MPIR_ERR_CHECK(mpi_errno); /* First run the reduce schedule */