Zdrojové súbory na cvičenie si stiahnite spoločne ako archív.
Na použitie MPI potrebujete podporu MPI v Pythone (mpi4py) a samotnú implementáciu MPI. Na predmete používame Intel MPI, ktorá je tiež dostupná ako Python balíček (impi-rt).
|
1 |
pip install mpi4py impi-rt |
Na spustenie paralelnej aplikácie sa používa program mpiexec. Nájdete ho na ceste ...cesta_k_Pythonu\Library\bin\mpiexec.exe. Pri spusteniach aplikácií na tomto cvičení musíte používať celú cestu k programu. Alternatívne si ho môžete pridať do premennej PATH.
Keďže som impi-rt inštaloval globálne, mpiexec mám na ceste C:\Python314\Library\bin\mpiexec.exe. Do PATH ho natrvalo pridám pomocou týchto príkazov v PowerShelli:
|
1 2 3 |
$oldPath = [Environment]::GetEnvironmentVariable("Path", "User") $newPath = "C:\Python314\Library\bin;$oldPath" [Environment]::SetEnvironmentVariable("Path", $newPath, "User") |
Nasleduje ukážka point-to-point komunikácie, v ktorej sú posielané Python objekty. Knižnica mpi4py ich pred odosielaním a po prijatí automaticky serializuje/deserializuje pomocou modulu pickle. Takéto posielanie je síce programátorsky prívetivé, ale nie veľmi výkonné.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
"""A ping-pong program in python, using pickle'd communication. This program file is part of the book and course "Parallel Computing" by Victor Eijkhout, copyright 2013-5 """ from mpi4py import MPI import time import sys comm = MPI.COMM_WORLD ntids = comm.Get_size() mytid = comm.Get_rank() ntests = 100 for s in [1, 10, 100, 1000, 10000, 100000]: if mytid == 0: data = [2.0 * i for i in range(s)] starttime = MPI.Wtime() for _ in range(ntests): comm.send(data, dest=ntids - 1) rdata = comm.recv(source=ntids - 1) elapsed = MPI.Wtime() - starttime print(f"Size={s}, elapsed time: {elapsed}", file=sys.stderr) # Show that this won't work with numpy if data != rdata: print("oops", data, rdata) elif mytid == ntids - 1: for _ in range(ntests): zdata = comm.recv(source=0) comm.send(zdata, dest=0) |
Všimnite si:
0 a ntids - 1).0 pošle zoznam data procesu ntids - 1 a ten ho pošle späť procesu 0.Vašou úlohou je prepísať tento kód, aby procesy na komunikáciu používali numpy typy. Využite k tomu kostru programu nižšie.
Tipy:
dtype=np.float64.arange() a empty().|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
"""A ping-pong program in python, using numpy communication. This program file is part of the book and course "Parallel Computing" by Victor Eijkhout, copyright 2013-5 """ from mpi4py import MPI import numpy as np import time import sys comm = MPI.COMM_WORLD ntids = comm.Get_size() mytid = comm.Get_rank() ntests = 100 for s in [1, 10, 100, 1000, 10000, 100000]: if mytid == 0: # TODO: Alokujte polia data = None rdata = None for i in range(s): data[i] = i + 1 starttime = MPI.Wtime() for test in range(ntests): # TODO: Pridajte komunikaciu pass elapsed = MPI.Wtime() - starttime print(f"Size={s}, elapsed time: {elapsed}", file=sys.stderr) if not np.array_equal(data, rdata): print("oops", data, rdata) elif mytid == ntids - 1: zdata = np.empty(s, dtype=np.float64) for test in range(ntests): # TODO: Pridajte komunikaciu pass |
Dokončite nižšie uvedený kód skalárneho súčinu dvoch vektorov. Ako dátové typy použite numpy polia. Aký je rozdiel medzi paralelne vypočítaným súčinom a kontrolným súčinom vypočítaným v procese 0?
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
"""This module implements a scalar product of two vectors using MPI. The two vectors are randomly generated and then broadcast to all processors. This solution is not optimal, as the data is broadcasted to all processors, even though only a part of it is needed. """ __author__ = "Tomáš Vavro, Marián Šebeňa, Roderik Ploszek" __licence__ = "MIT" from mpi4py import MPI import numpy as np N = 101 # pocet prvkov vo vektoroch world = MPI.COMM_WORLD world_size = world.Get_size() rank = world.Get_rank() per_processor_range = N // world_size rng = np.random.default_rng(seed=1) # initialize the vectors and calculate workflows if rank == 0: # TODO: Vygenerujte polia s N nahodnymi cislami `a` a `b`. a = None b = None # TODO: Vypocitajte rozlozenie prace `processor_ranges` pre kazdy procesor. # Pouzite celociselny typ. Priklad: Ak mame 8 procesov a 20 udajov na # spracovanie, jedno z moznych rozlozeni je [3, 3, 3, 3, 2, 2, 2, 2]. processor_ranges = np.array() # Vypocitame ocakavany skalarny sucin na overenie spravnosti vysledku. expected_dot_product = np.dot(a, b) else: # TODO: Kazdy prijimaci proces alokuje dve nulove (prazdne) polia (`a` # a `b`). a = None b = None # TODO: Kazdy prijimaci proces alokuje nulove pole pre rozlozenie prace. processor_ranges = None expected_dot_product = None # TODO: Rozposlite oba vektory a informacie o rozlozeni prace VSETKYM procesom. # calculate the partial dot product start = np.sum(processor_ranges[:rank]) result = 0 for i in range(start, start + processor_ranges[rank]): result += a[i] * b[i] # TODO: Pockajte na dokoncenie vypoctov. # TODO: Alokujte pole o velkosti 1 pre vysledok redukcie. total_result = None # TODO: Zredukujte ciastocne vysledky z kazdeho procesu. if rank == 0: print(f"Vypocitany skalarny sucin: {total_result[0]}") print(f"Ocakavany skalarny sucin: {expected_dot_product}") |
V predchádzajúcej úlohe sme pri výpočte skalárneho súčinu rozposlali oba vektory v celej dĺžke všetkým procesom, aj keď každý proces počítal iba s menšou časťou každého vektora.
Teraz implementujte riešenie skalárneho súčinu vektorov, ktoré jednotlivým procesom pošle iba tie časti vektorov, ktoré skutočne potrebujú k výpočtu. Použite na to operáciu scatter.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
"""This module implements a scalar product of two vectors using MPI. The two vectors are randomly generated and then scattered between processors. The final result is calculated by gathering the partial results and summing them up on the root processor. """ __author__ = "Tomáš Vavro, Marián Šebeňa" __licence__ = "MIT" from mpi4py import MPI import numpy as np N = 101 # pocet prvkov vo vektoroch world = MPI.COMM_WORLD world_size = world.Get_size() rank = world.Get_rank() per_processor_range = N // world_size rng = np.random.default_rng(seed=1) # Inicializacia vektorov (poli) a = None b = None if rank == 0: # TODO: Vygenerujte polia s N nahodnymi cislami `a` a `b`. a = None b = None # TODO: Rozsirte polia nulami na velkost delitelnu poctom procesov # (padding). Pozrite dokumentaciu metody np.pad(). # Vyskusajte, co sa stane bez rozsirenia. # TODO: Alokujte ciastkove polia pre kazdy proces (aka bude ich velkost?) local_a = None local_b = None # TODO: Rozptylte data do jednotlivych procesov (operacia scatter). # Vypocet ciastocneho sucinu result = np.dot(local_a, local_b) # TODO: Alokujte pole s velkostou 1 pre vysledok redukcie. total_result = None # TODO: Zredukujte ciastocne vysledky z kazdeho procesu. if rank == 0: print(f"Vypocitany skalarny sucin: {total_result.sum()}") print(f"Ocakavany skalarny sucin: {np.dot(a, b)}") |
