使⽤Python进⾏MPI并⾏编程
mpi4py的安装
我们将使⽤ MPI for Python 包mpi4py。如果您有⼀个⼲净的geo_scipy环境,如本⽹站上 Ryan 的 Python 安装说明所述,您应该能够使⽤ conda 安装它⽽不会出现任何问题。⾸先要做的是打开终端外壳并激活geo_scipy:
source activate geo_scipy
(或者您可以从 Anaconda 应⽤程序启动它)
然后安装mpi4py:
conda install mpi4py
什么是 mpi4py?
MPI for Python 为 Python 语⾔提供 MPI 绑定,允许程序员利⽤多处理器计算系统。mpi4py 是在 MPI-1/2 规范之上构建的,并提供了⼀个紧密遵循 MPI-2 C++ 绑定的⾯向对象的接⼝。
linspace numpy
mpi4py 的⽂档
mpi4py 的⽂档可以在这⾥到:
但是,它仍在进⾏中,其中⼤部分假设您已经熟悉 MPI 标准。因此,您可能还需要查阅 MPI 标准⽂档:
MPI ⽂档仅涵盖 C 和 Fortran 实现,但 Python 语法的扩展很简单,并且在⼤多数情况下⽐等效的 C 或 Fortran 语句要简单得多。
另⼀个有⽤的寻求帮助的地⽅是 mpi4py 的 API 参考:
特别是,Class Comm 部分列出了您可以与 communicator 对象⼀起使⽤的所有⽅法:
## 使⽤ MPI 运⾏ Python 脚本
使⽤ MPI 命令的 Python 程序必须使⽤命令提供的 MPI 解释器运⾏mpirun。在某些系统上,改为调⽤此命令,mpiexec并且 mpi4py 似乎包括两者。
通过使⽤Unix comamnd 检查mpirun您的 anaconda ⽬录中的geo_scipy以确保您的环境正确:which
$ which mpirun
/anaconda/envs/geo_scipy/bin/mpirun
您可以使⽤以下命令运⾏ MPI Python 脚本mpirun:
mpirun -n 4 python script.py
这⾥-n 4告诉 MPI 使⽤四个进程,这是我笔记本电脑上的内核数。然后我们告诉 MPI 运⾏名为script.py.
如果您在台式计算机上运⾏此程序,则应将-n参数调整为系统上的内核数或作业所需的最⼤进程数,以较⼩者为准。或者在⼤型集上,您可以指定程序需要的内核数或特定集上可⽤的最⼤内核数。
沟通者和等级
我们的第⼀个 Python ⽰例的 MPI 将简单地从 mpi4py 包中导⼊ MPI,创建⼀个通信器并获取每个进程的等级:
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print('My rank is ',rank)
将此保存到⽂件调⽤中comm.py,然后运⾏它:
mpirun -n 4 python comm.py
在这⾥,我们使⽤了名为 的默认通信MPI.COMM_WORLD器,它由所有处理器组成。对于许多 MPI 代码,这是您需要的主要通信器。但是,您可以使⽤MPI.COMM_WORLD. 有关更多信息,请参阅⽂档。
点对点通信
现在我们将研究如何将数据从⼀个进程传递到另⼀个进程。这是⼀个⾮常简单的⽰例,我们将字典从进程 0 传递到进程 1:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = {'a': 7, 'b': 3.14}
comm.send(data, dest=1)
elif rank == 1:
data = v(source=0)
print('On process 1, data is ',data)
这⾥我们发送了⼀个字典,但你也可以发送⼀个带有类似代码的整数:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
idata = 1
comm.send(idata, dest=1)
elif rank == 1:
idata = v(source=0)
print('On process 1, data is ',idata)
请注意如何comm.send和v有⼩写s和r。
现在让我们看⼀个更复杂的例⼦,我们发送⼀个 numpy 数组:
rank = comm.Get_rank()
if rank == 0:
# in real code, this section might
# read in data parameters from a file
numData = 10
comm.send(numData, dest=1)
data = np.linspace(0.0,3.14,numData)
comm.Send(data, dest=1)
elif rank == 1:
numData = v(source=0)
print('Number of data to receive: ',numData)
data = np.empty(numData, dtype='d') # allocate space to receive the array
comm.Recv(data, source=0)
print('data received: ',data)
请注意⽤于发送和接收 numpy 数组的comm.Send和如何comm.Recv使⽤⼤写S和R.
集体沟通
⼴播:
⼴播接受⼀个变量并将它的精确副本发送到通信器上的所有进程。这⾥有些例⼦:
⼴播字典:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = {'key1' : [1,2, 3],
'key2' : ( 'abc', 'xyz')}
else:
data = None
data = comm.bcast(data, root=0)
print('Rank: ',rank,', data: ' ,data)
您可以使⽤该⽅法⼴播⼀个 numpy 数组Bcast(再次注意⼤写B)。在这⾥,我们将修改上⾯的点对点代码,改为将数组⼴播data到通信器中的所有进程(⽽不仅仅是从进程 0 到 1):
rank = comm.Get_rank()
if rank == 0:
# create a data array on process 0
# in real code, this section might
# read in data parameters from a file
numData = 10
data = np.linspace(0.0,3.14,numData)
else:
numData = None
# broadcast numData and allocate array on other ranks:
numData = comm.bcast(numData, root=0)
if rank != 0:
data = np.empty(numData, dtype='d')
comm.Bcast(data, root=0) # broadcast the array from rank 0 to all others
print('Rank: ',rank, ', data received: ',data)
散射:
现在让我们尝试⼀个例⼦。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size() # new: gives number of ranks in comm
rank = comm.Get_rank()
numDataPerRank = 10
data = None
if rank == 0:
data = np.linspace(1,size*numDataPerRank,numDataPerRank*size)
# when size=4 (using -n 4), data = [1.0:40.0]
recvbuf = np.empty(numDataPerRank, dtype='d') # allocate space for recvbuf comm.Scatter(data, recvbuf, root=0)
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
在此⽰例中,排名为 0 的进程创建了数组data。由于这只是⼀个玩具⽰例,我们制作data了⼀个简单的 linspace 数组,但在研究代码中,数据可能是从⽂件中读取的,或者是由⼯作流的前⼀部分⽣成的。 data然后使⽤ 分散到所有等级(包括等级 0)comm.Scatter。请注意,我们⾸先必须初始化(或分配)接收缓冲区数组recvbuf。
搜集:
例如,这⾥有⼀个与上⾯的分散⽰例相反的代码。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
numDataPerRank = 10
sendbuf = np.linspace(rank*numDataPerRank+1,(rank+1)*numDataPerRank,numDataPerRank)
print('Rank: ',rank, ', sendbuf: ',sendbuf)
recvbuf = None
if rank == 0:
recvbuf = np.empty(numDataPerRank*size, dtype='d')
comm.Gather(sendbuf, recvbuf, root=0)
if rank == 0:
print('Rank: ',rank, ', recvbuf received: ',recvbuf)
减少:
MP reduce操作从每个进程的数组中获取值,并将它们简化为根进程上的单个结果。这本质上就像从每个进程向根进程发送⼀个有点复杂的命令,然后让根进程执⾏归约操作。值得庆幸的是,MPI reduce ⽤⼀个简洁的命令完成了所有这些⼯作。
对于 numpy 数组,语法是
~~~python comm.Reduce(send_data, recv_data, op=, root=0) ~~~哪⾥send_data是通信器上所有进程发送的数据,recv_data是根进程上将接收所有数据的数组。请注意,发送和接收数组具有相同的⼤⼩。⾥⾯的所有数据副本 send_data都会按照指定的减
少<operation>。⼀些常⽤的操作是: - MPI_SUM - 对元素求和。- MPI_PROD - 将所有元素相乘。- MPI_MAX - 返回最⼤元素。-
MPI_MIN - 返回最⼩元素。
这是⼀个代码⽰例:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论