Nakoľko sa v dnešnom cvičení budeme venovať definíciám rôznych abstraktných údajových typov, bolo by viac než vhodné vedieť, ako sa pracuje s definíciou tried v jazyku Python. Ak si niekto potrebuje zopakovať túto oblasť, odporúčame napríklad časť 9.3 tutoriálu jazyka Python.
Najdôležitejšie veci špecifické pre jazyk Python:
self. Pomocou neho pristupujeme k atribútom objektu. Ak má nejaká metóda viac argumentov, self je vždy prvý. __init__(self, ...). Nejedná sa ale o konštruktor! V čase volania tejto funkcie už jestvuje v pamäti vytvorená inštancia triedy; táto metóda sa vyvoláva na konci celého procesu vytvárania objektu iba za účelom inicializácie jeho atribútov. Funkcia nemá návratovú hodnotu (presnejšie, iba hodnota None je povolená ako návratová). Zdrojové súbory na cvičenie, ktoré vidíte vo výpisoch nižšie, si môžete stiahnuť spoločne ako archív.
V module ppds sa nachádzajú (okrem iných) nasledovné triedy použiteľné pre synchronizáciu:
Mutex Event Mutex implementuje zámok (lock), v ponímaní prednášok semafor inicializovaný na hodnotu 1. Pre zvýšenie čitateľnosti kódu odporúčame na mieste vyžadujúcom mutex využívať túto triedu. Použitie je jednoduché:
|
1 2 3 4 5 6 |
from fei.ppds import Mutex m = Mutex() m.lock() m.unlock() |
Volanie Mutex() vytvorí inštanciu triedy Mutex. Objekt je inicializovaný, mutex je odomknutý. Nad objektom môžete využívať funkcie lock() a unlock(): prvá (z názvu) mutex uzamyká, druhá ho odomyká.
Trieda Event slúži na synchronizáciu pomocou udalostí. Použitie je tiež triviálne:
|
1 2 3 4 5 6 7 |
from fei.ppds import Event m = Event() m.signal() # alebo m.set() m.wait() m.clear() |
Konštruktor vytvorí objekt triedy Event. Pomocou metódy set() alebo signal() sa signalizuje udalosť; interne sa v objekte nastavuje príznak nastatia udalosti. Volanie wait() blokuje všetky vlákna, ktoré ho vyvolávajú, až pokým nenastane udalosť (teda pokým sa nezavolá metóda set() alebo signal()). Ak sa pomocou set() alebo signal() signalizuje udalosť, všetky vlákna, ktoré sú blokované vo funkcii wait(), budú odblokované. Taktiež platí, že každé volanie wait() po set() alebo signal() nie je blokujúce. To je zrejmé, pretože už udalosť nastala.
Objekt triedy Event je znovupoužiteľný. To znamená, že je možnosť „vynulovať“ nastavenie udalosti pomocou metódy clear(). Po jej zavolaní začnú byť znovu všetky volania wait() blokujúce, až pokým nebude nad týmto objektom (znovu) vyvolaná metóda set() alebo signal().
V tejto úlohe máte pripravený ADT SimpleBarrier podľa špecifikácie z prednášky. V nej sme na synchronizáciu použili ADT Semafor. Vašou úlohou je prepracovať implementáciu turniketu tak, aby ste použili signalizáciu pomocou udalosti (ADT Event).
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from random import randint from time import sleep from fei.ppds import Thread, Semaphore, Mutex, Event """Vypisovat na monitor budeme pomocou funkcie 'print' importovanej z modulu 'ppds'. To kvoli tomu, aby neboli 'rozbite' vypisy. """ from fei.ppds import print class SimpleBarrier: def __init__(self, N: int): """Inicializuje barieru. :param N: pocet vlakien potrebnych na otvorenie bariery """ self.N = N self.cnt = 0 self.mutex = Mutex() # TODO: Prepracujte implementaciu turniketu, aby pouzival udalost (ADT # Event) self.turnstile = Semaphore(0) def wait(self): self.mutex.lock() self.cnt += 1 if self.cnt == self.N: self.cnt = 0 self.turnstile.signal(self.N) """Skuste experimentovat s nasledovnym riadkom. Ako sa sprava bariera, ked je zakomentovany a ako ked je odkomentovany? V com je rozdiel? """ # sleep(0) self.mutex.unlock() self.turnstile.wait() def barrier_example(barrier: SimpleBarrier, thread_id: int): """Predpokladajme, ze nas program vytvara a spusta 5 vlakien, ktore vykonavaju tuto funkciu, ktorej argumentom je zdielany objekt jednoduchej bariery. """ sleep(randint(1, 10) / 10) print(f'vlakno {thread_id} pred barierou') barrier.wait() print(f'vlakno {thread_id} po bariere') if __name__ == "__main__": # priklad pouzitia ADT SimpleBarrier sb = SimpleBarrier(5) # kod, v ktorom sa vytvara a spusta 5 vlakien threads = [Thread(barrier_example, sb, tid) for tid in range(5)] # pockame na ukoncenie cinnosti vlakien [t.join() for t in threads] |
Na ďalšie otestovanie ADT SimpleBarrier implementujte znovupoužiteľnú bariéru podľa špecifikácie z prednášky. Cieľom cvičenia nie je prepisovať kód z prednášky, ale zamyslieť sa nad úlohou a snažiť sa ju vyriešiť. V prípade nejasností je samozrejme potrebné a rozumné hľadať pomoc (napríklad v prednáške).
V implementácii využite svoj ADT SimpleBarrier (implementácia pomocou udalosti) z predchádzajúcej úlohy. Svoje riešenie overte experimentálne pomocou tohto kódu:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
from time import sleep from random import randint from fei.ppds import Thread, Mutex, Semaphore, Event """Vypisovat na monitor budeme pri zamknutom mutexe pomocou funkcie 'print' z modulu 'fei.ppds', aby sme nemali rozbite vypisy. """ from fei.ppds import print def rendezvous(thread_name: str): sleep(randint(1, 10) / 10) print(f'rendezvous: {thread_name}') def some_code_after_barrier(thread_name: str): print(f'after_barrier: {thread_name}') sleep(randint(1, 10) / 10) def barrier_example(thread_name: str): """Kazde vlakno vykonava kod funkcie 'barrier_example'. Doplnte synchronizaciu tak, aby sa vsetky vlakna pockali nielen pred vykonanim funkcie 'some_code_after_barrier', ale aj *vzdy* pred zacatim vykonavania funkcie 'rendezvous'. """ for _ in range(3): # ... TODO faza synchronizacie ? rendezvous(thread_name) # ... TODO faza synchronizacie ? some_code_after_barrier(thread_name) # ... TODO faza synchronizacie ? if __name__ == "__main__": """Nezabudnime vytvorit aj zdielane synchronizacne objekty, a dat ich ako argumenty kazdemu vlaknu, ktore chceme pomocou nich synchronizovat. """ # --- KONFIGURACIA --- N = 5 # TODO: Vytvorte potrebne instancie bariery, napriklad: # barriers = [SimpleBarrier(N), SimpleBarrier(N)] # Vytvorime vlakna, ktore chceme synchronizovat. threads = [Thread(barrier_example, f'Thread {i}') for i in range(N)] # Pockame na ich ukoncenie. [t.join() for t in threads] |
Vytvorte N vlákien. Vlákno i nech reprezentuje uzol, v ktorom sa vypočítava prvok na pozícii i+2 Fibonacciho postupnosti (0, 1, 1, 2, 3, 5, 8, 13, 21, …). Nech všetky vlákna zdieľajú spoločný zoznam, do ktorého sa postupne počas výpočtu ukladajú vypočítané prvky postupnosti. Prvé dva prvky 0 a 1 nech sú v tomto zozname pevne dané. Navrhnite pomocou doteraz prebraných synchronizačných nástrojov takú synchronizáciu, aby vlákno i+2 mohlo vykonávať výpočet jemu určeného Fibonacciho čísla až potom, ako svoje výsledky uložia do zoznamu vlákna, ktoré vypočítavajú predošlé čísla postupnosti (teda po skončení výpočtu vlákien i a i+1). Pri synchronizácii nezabúdajte na krajné prípady!
Nižšie je vzorové riešenie, v ktorom sú použité udalosti:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
from time import sleep from random import randint from fei.ppds import Thread, print, Event def fib_node_event(i: int, fib_list: list[int], signals: list[Event]): """Vlakno i vypocitava prvok na pozicii i + 2. Caka na udalosti i a i + 1, potom vypocita vysledok a nastavi udalost i + 2. """ # Mierne zdrzanie, aby sme dali sancu synchronizacii... sleep(randint(1, 10) / 10) # Cakanie na pripravenost predchodcov signals[i].wait() signals[i + 1].wait() # Vypocet fib_list[i + 2] = fib_list[i] + fib_list[i + 1] print(f"Vlakno {i:2}: F({i+2:2}) = {fib_list[i+2]}") # Signalizacia pre vsetkych nasledovnikov naraz signals[i + 2].signal() def solve_fibonacci_event(N: int): """Vypise `N` clenov Fibonacciho postupnosti. Vypis zacne na prvku s indexom 2. :param N: pocet vytvorenych vlakien """ fib_list = [0] * (N + 2) fib_list[0], fib_list[1] = 0, 1 # Inicializacia udalosti signals = [Event() for _ in range(N + 2)] threads = [] for i in range(N): t = Thread(fib_node_event, i, fib_list, signals) threads.append(t) # Prve dva prvky postupnosti su hotove hned signals[0].signal() signals[1].signal() for t in threads: t.join() print(f"\nVysledna postupnost: {fib_list}") if __name__ == "__main__": print("Spustam riesenie s udalostami...") solve_fibonacci_event(10) |
Vašou úlohou je prepracovať vzorové riešenie s využitím semaforov. Nižšie je kostra kódu, do ktorej doplňte potrebné riadky.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
from time import sleep from random import randint from fei.ppds import Thread, Semaphore, print def fib_node_sem(i: int, fib_list: list[int], signals: list[Semaphore]): """Vypis prvok Fibonacciho postupnosti na pozicii i + 2. ULOHA: Doplnte synchronizaciu pomocou semaforov. Nezabudnite, ze kazde vypocitane cislo (okrem poslednych dvoch) bude potrebovat DVE nasledujuce vlakna. :param i: cislo vlakna :param fib_list: zoznam clenov Fibonacciho postupnosti :param signal: zoznam synchronizacnych semaforov """ # Mierne zdrzanie, aby sme dali sancu synchronizacii... sleep(randint(1, 10) / 10) # 1. Cakanie na predchadzajuce dva vysledky # --- TODO DOPLNTE KOD --- # 2. Vypocet fib_list[i + 2] = fib_list[i] + fib_list[i + 1] print(f"Vlakno {i:2}: F({i+2:2}) = {fib_list[i+2]}") # 3. Signalizacia pre nasledovnikov # Pomocka: Kolkokrat treba zavolat signal()? # --- TODO DOPLNTE KOD --- def solve_fibonacci_sem(N): """Vypise `N` clenov Fibonacciho postupnosti. Vypis zacne na prvku s indexom 2. :param N: pocet vytvorenych vlakien """ fib_list = [0] * (N + 2) fib_list[0], fib_list[1] = 0, 1 # TODO: Inicializujte zoznam semaforov signals = [] threads = [] for i in range(N): t = Thread(fib_node_sem, i, fib_list, signals) threads.append(t) # "Nabite" semafory pre prve dva cleny (index 0 a 1) # Pozor: Index 0 potrebuje len vlakno 0, ale index 1 potrebuje # vlakna 0 aj 1! # --- TODO DOPLNTE KOD --- for t in threads: t.join() print(f"\nVysledna postupnost: {fib_list}") if __name__ == "__main__": print("Spustam riesenie so semaformi...") solve_fibonacci_sem(10) |
Otázky na zamyslenie:
1) Aký je najmenší počet synchronizačných objektov (semafory, mutexy, udalosti) potrebných na riešenie tejto úlohy?
2) Ktoré z prebratých synchronizačných vzorov (vzájomné vylúčenie, signalizácia, rendezvous, bariéra) sa dajú (rozumne) využiť pri riešení tejto úlohy? Konkrétne popíšte, ako sa ten-ktorý synchronizačný vzor využíva vo vašom riešení.
