An MPI Implementation of the NBX Algorithm

First, assume that every process has data it needs to send to some of the other processes; here we can think of this data as pulses. Each process therefore holds an adjacency list, and the contents of that list represent its communication needs with the other processes. I use 4 processes here. As for creating the adjacency list, I generate it randomly, as in the following code:

#include <stdio.h>
#include <mpi.h>
#include <vector>
#include <random>
#include <iostream>
#include <algorithm> // for std::find

// Randomly generate an adjacency list: entry i holds the data this rank
// wants to send to rank i (an empty entry means no message for that rank).
std::vector<std::vector<int>> generateRandomAdjacencyList(int numNodes, int myid)
{
    std::vector<std::vector<int>> adjacencyList(numNodes);

    // Random setup; mix the rank into the seed so that every process draws a
    // different sequence even if std::random_device is deterministic.
    std::random_device rd;
    std::mt19937 gen(rd() + static_cast<unsigned int>(myid));
    std::uniform_int_distribution<int> dist(0, numNodes - 1);

    for (int i = 0; i < numNodes; ++i)
    {
        // Pick a random length for this entry.
        int numConnections = dist(gen);

        // Generate that many distinct random values. Building the list with
        // push_back (instead of pre-sizing it with zeros) keeps the duplicate
        // check from wrongly rejecting the value 0.
        std::vector<int> connections;
        connections.reserve(numConnections);
        while (static_cast<int>(connections.size()) < numConnections)
        {
            int connectedNode = dist(gen);
            if (std::find(connections.begin(), connections.end(), connectedNode) == connections.end())
                connections.push_back(connectedNode);
        }

        adjacencyList[i] = connections;
    }
    // Return the adjacency list.
    return adjacencyList;
}

// Print the adjacency list.
void printAdjacencyList(const std::vector<std::vector<int>>& adjacencyList, int myid)
{
    for (std::size_t i = 0; i < adjacencyList.size(); ++i)
    {
        std::cout << "source process " << myid << ", target process: " << i << ", content is ";
        for (std::size_t j = 0; j < adjacencyList[i].size(); ++j)
        {
            std::cout << adjacencyList[i][j] << " ";
        }
        std::cout << std::endl;
    }
}

int main(int argc, char* argv[])
{
    int myid, numprocs;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

    int numNodes = numprocs;
    // Generate a random adjacency list.
    std::vector<std::vector<int>> adjacencyList = generateRandomAdjacencyList(numNodes, myid);

    printAdjacencyList(adjacencyList, myid);

    MPI_Finalize();

    return 0;
}

The output is shown in the original post's screenshot (not reproduced here): each rank prints one line per target process with the randomly generated contents.
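To reproduce it, the program can be compiled and launched in the usual way, e.g. (assuming an MPI installation that provides the standard wrapper compiler and launcher; the source file name nbx_adjlist.cpp is just a placeholder):

mpic++ nbx_adjlist.cpp -o nbx_adjlist
mpirun -np 4 ./nbx_adjlist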

Let us write some pseudocode for NBX:

prepare list I of destinations of data
for each i in I, send message to destination (nonblocking synchronous send: MPI_Issend)

while not done
    nonblocking probe for an incoming message (any source)
        receive it if one is detected
    if need_to_check_barrier
        done = ibarrier has completed
    else
        need_to_check_barrier = all sends finished

The complete code is then as follows. Note that the nonblocking synchronous send (MPI_Issend) is essential: such a send only completes once the matching receive has started, so once all of a rank's sends have finished, every message it sent is accounted for, and the nonblocking barrier (MPI_Ibarrier) can safely be used to detect global termination.

#include <stdio.h>
#include <mpi.h>
#include <vector>
#include <random>
#include <iostream>
#include <algorithm> // for std::find

// Randomly generate an adjacency list (same helper as above): entry i holds
// the data this rank wants to send to rank i.
std::vector<std::vector<int>> generateRandomAdjacencyList(int numNodes, int myid)
{
    std::vector<std::vector<int>> adjacencyList(numNodes);

    // Random setup; mix the rank into the seed so that every process draws a
    // different sequence even if std::random_device is deterministic.
    std::random_device rd;
    std::mt19937 gen(rd() + static_cast<unsigned int>(myid));
    std::uniform_int_distribution<int> dist(0, numNodes - 1);

    for (int i = 0; i < numNodes; ++i)
    {
        // Pick a random length for this entry.
        int numConnections = dist(gen);

        // Generate that many distinct random values. Building the list with
        // push_back (instead of pre-sizing it with zeros) keeps the duplicate
        // check from wrongly rejecting the value 0.
        std::vector<int> connections;
        connections.reserve(numConnections);
        while (static_cast<int>(connections.size()) < numConnections)
        {
            int connectedNode = dist(gen);
            if (std::find(connections.begin(), connections.end(), connectedNode) == connections.end())
                connections.push_back(connectedNode);
        }

        adjacencyList[i] = connections;
    }
    // Return the adjacency list.
    return adjacencyList;
}

// Print the adjacency list.
void printAdjacencyList(const std::vector<std::vector<int>>& adjacencyList, int myid)
{
    for (std::size_t i = 0; i < adjacencyList.size(); ++i)
    {
        std::cout << "source process " << myid << ", target process: " << i << ", content is ";
        for (std::size_t j = 0; j < adjacencyList[i].size(); ++j)
        {
            std::cout << adjacencyList[i][j] << " ";
        }
        std::cout << std::endl;
    }
}

int main(int argc, char* argv[])
{
    int myid, numprocs;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &myid);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);

    // Generate a random adjacency list.
    std::vector<std::vector<int>> adjacencyList = generateRandomAdjacencyList(numprocs, myid);

    // Print what was generated.
    printAdjacencyList(adjacencyList, myid);

    std::vector<MPI_Request> requests_list; // one request per nonempty send
    int tag = 0;                            // message tag used for all NBX traffic

    // Start a nonblocking synchronous send to every rank we have data for.
    for (int ii = 0; ii < numprocs; ii++)
    {
        int send_size = static_cast<int>(adjacencyList[ii].size());
        if (send_size == 0)
            continue;

        // Send this entry of the adjacency list to process ii and keep the request.
        MPI_Request request;
        MPI_Issend(adjacencyList[ii].data(), send_size, MPI_INT, ii, tag, MPI_COMM_WORLD, &request);
        requests_list.push_back(request);
    }

    bool do_work = true;
    bool check_barrier = false;
    MPI_Request ibarrier_request;
    int flag;
    MPI_Status status;
    std::vector<int> received;

    do
    {
        // Check whether a message from any source is waiting for us.
        MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &flag, &status);
        if (flag)
        {
            // Find out how large the message is, then receive it. The
            // datatype must match the sender's MPI_INT.
            int count;
            MPI_Get_count(&status, MPI_INT, &count);
            std::vector<int> buffer(count);
            MPI_Recv(buffer.data(), count, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            received.insert(received.end(), buffer.begin(), buffer.end());
        }

        if (check_barrier)
        {
            // Once MPI_Ibarrier has completed on every rank, no message can
            // still be in flight, so we are done.
            MPI_Test(&ibarrier_request, &flag, MPI_STATUS_IGNORE);
            do_work = !flag;
        }
        else
        {
            // Check whether all of our MPI_Issend calls have completed; a
            // synchronous send completes only after its receive has started.
            MPI_Testall(static_cast<int>(requests_list.size()), requests_list.data(), &flag, MPI_STATUSES_IGNORE);
            if (flag)
            {
                // All of our data is on its way, so enter the nonblocking barrier.
                MPI_Ibarrier(MPI_COMM_WORLD, &ibarrier_request);
                check_barrier = true;
            }
        }
    } while (do_work);

    // Print what this rank received.
    std::cout << "process " << myid << ", received data is ";
    for (std::size_t jj = 0; jj < received.size(); ++jj)
    {
        std::cout << received[jj] << " ";
    }
    std::cout << std::endl;
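
    // (Optional sanity check; an illustrative addition, not part of the
    // original post.) Every message consists of MPI_INT values, so once the
    // NBX loop has terminated, the total number of integers sent across all
    // ranks must equal the total number received.
    int totals[2] = { 0, static_cast<int>(received.size()) };
    for (int ii = 0; ii < numprocs; ii++)
        totals[0] += static_cast<int>(adjacencyList[ii].size());
    int global_totals[2] = { 0, 0 };
    MPI_Reduce(totals, global_totals, 2, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    if (myid == 0 && global_totals[0] != global_totals[1])
        std::cout << "WARNING: global sent/received counts differ!" << std::endl;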
    MPI_Finalize();

    return 0;

}

The run results are again shown in the original post's screenshot (not reproduced here): each rank prints one line listing all the data it received.
