首先,假设每一进程都有需要发送给其他进程的数据,我们在这里可以理解为脉冲,那其实每一个进程都有一个邻接表,而邻接表的内容代表需要和其他进程通信的需求。我这里使用4个进程。关于邻接表的创建,我打算随机生成。如代码
#include <stdio.h>
#include <mpi.h>
#include <vector>
#include <random>
#include <iostream>
//随机生成一个邻接表
std::vector<std::vector<int>> generateRandomAdjacencyList(int numNodes,int myid)
{
std::vector<std::vector<int>> adjacencyList(numNodes);
// 生成随机连接
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int> dist(0, 3);
for (int i = 0; i < numNodes; ++i)
{
//生成一个随机长度
int numConnections = dist(gen);
std::vector<int> connections(numConnections);
// 生成不重复的随机连接
for (int j = 0; j < numConnections; ++j)
{
int connectedNode;
do
{
connectedNode = dist(gen);
} while (std::find(connections.begin(), connections.end(), connectedNode) != connections.end());
connections[j] = connectedNode;
}
adjacencyList[i] = connections;
}
//返回邻接表
return adjacencyList;
}
// 打印邻接表
void printAdjacencyList(const std::vector<std::vector<int>>& adjacencyList,int myid)
{
for (int i = 0; i < adjacencyList.size(); ++i)
{
std::cout << "soruce process " << myid << ",target process: "<<i<<",content is ";
for (int j = 0; j < adjacencyList[i].size(); ++j)
{
std::cout << adjacencyList[i][j] << " ";
}
std::cout << std::endl;
}
}
int main(int argc, char* argv[])
{
int myid, numprocs;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
int numNodes = numprocs;
// 生成随机邻接表
std::vector<std::vector<int>> adjacencyList = generateRandomAdjacencyList(numNodes, myid);
printAdjacencyList(adjacencyList ,myid);
MPI_Finalize();
return 0;
}
运行结果如图:
我们写一些NBX的伪代码:
prepare list I of destinations of data
for each i in I, send message to destination (nonblocking synchronous comm: MPI_Issend)
while not done
nonblocking probe to check for incoming messages (any source)
receive if detected
if need_to_check_barrier
done = ibarrier has completed
else
need_to_check_barrier = all sends finished
那么整体的代码应该为
#include <stdio.h>
#include <mpi.h>
#include <vector>
#include <random>
#include <iostream>
//随机生成一个邻接表
std::vector<std::vector<int>> generateRandomAdjacencyList(int numNodes,int myid)
{
std::vector<std::vector<int>> adjacencyList(numNodes);
// 生成随机连接
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<int> dist(0, 3);
for (int i = 0; i < numNodes; ++i)
{
//生成一个随机长度
int numConnections = dist(gen);
std::vector<int> connections(numConnections);
// 生成不重复的随机连接
for (int j = 0; j < numConnections; ++j)
{
int connectedNode;
do
{
connectedNode = dist(gen);
} while (std::find(connections.begin(), connections.end(), connectedNode) != connections.end());
connections[j] = connectedNode;
}
adjacencyList[i] = connections;
}
//返回邻接表
return adjacencyList;
}
// 打印邻接表
void printAdjacencyList(const std::vector<std::vector<int>>& adjacencyList,int myid)
{
for (int i = 0; i < adjacencyList.size(); ++i)
{
std::cout << "soruce process " << myid << ",target process: "<<i<<",content is ";
for (int j = 0; j < adjacencyList[i].size(); ++j)
{
std::cout << adjacencyList[i][j] << " ";
}
std::cout << std::endl;
}
}
int main(int argc, char* argv[])
{
int myid, numprocs;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
// 生成随机邻接表
std::vector<std::vector<int>> adjacencyList = generateRandomAdjacencyList(numprocs, myid);
// 打印生成的结果
printAdjacencyList(adjacencyList ,myid);
int send_size; //发送内容的长度
std::vector<MPI_Request> requests(numprocs);
std::vector<MPI_Request> requests_list;
int comm = 0;
// 向其他进程发送邻接表数据
for (int ii = 0; ii < numprocs; ii++)
{
send_size = adjacencyList[ii].size();
if (send_size == 0)
continue;
// 发送邻接表数据给进程 ii
MPI_Issend(&(adjacencyList[ii][0]), send_size, MPI_INT, ii, comm, MPI_COMM_WORLD, &requests[ii]);
// 将发送请求保存到请求列表中
requests_list.push_back(requests[ii]);
}
bool do_work = true;
bool check_barrier = false;
MPI_Request ibarrier_request;
int flag;
MPI_Status status;
std::vector<int> received;
do
{
// 检查是否有任何进程发送的消息
MPI_Iprobe(MPI_ANY_SOURCE, comm, MPI_COMM_WORLD, &flag, &status);
if (flag)
{
// 读取和接收消息
int count; //读取内容的大小
MPI_Get_count(&status, MPI_INT, &count);
std::vector<int> buffer(count);
MPI_Recv(&buffer[0], count, MPI_UNSIGNED, status.MPI_SOURCE, comm, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int ss = 0; ss < count; ss++)
{
received.push_back(buffer[ss]);
}
}
if (check_barrier)
{
// 检查 MPI_Ibarrier 是否由所有进程达到,如果是,则 do_work = false
MPI_Test(&ibarrier_request, &flag, MPI_STATUS_IGNORE);
do_work = !flag;
}
else
{
// 检查所有 MPI_Issend 是否已完成,如果是,则 check_barrier = true
int request_size = requests_list.size();
int requests_left = request_size;
for (int ii = 0; ii < request_size; ii++)
{
if (requests_list[ii] == MPI_REQUEST_NULL)
{
requests_left--;
continue;
}
MPI_Test(&requests_list[ii], &flag, MPI_STATUS_IGNORE);
if (flag)
{
requests_left--;
}
}
if (requests_left == 0)
{
// 当所有的数据发送完毕后 ,触发 MPI_Ibarrier
MPI_Ibarrier(MPI_COMM_WORLD, &ibarrier_request);
check_barrier = true;
}
}
} while (do_work);
// 打印结果
std::cout << "process " << myid << ", receive data is ";
for (int jj = 0; jj < received.size(); ++jj)
{
std::cout << received[jj] << " ";
}
std::cout << std::endl;
MPI_Finalize();
return 0;
}
那么运行结果如图
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/107658.html