V tomto cvičení preberieme základy asynchrónneho programovania. Začneme jednoduchým synchrónnym príkladom, ktorý budeme postupne upravovať, až z neho na záver bude asynchrónny program. V druhej časti cvičenia budeme aplikovať asynchrónne programovanie na posielanie HTTP žiadostí. Porovnáme tiež zrýchlenie asynchrónnej implementácie oproti synchrónnej verzii.
Cvičenie je vypracované podľa learnpython.com.
Začneme jednoduchým príkladom, v ktorom máme definovanú jednoduchú úlohu reprezentovanú funkciou task()
. Táto úloha vyberá z fronty dáta na spracovanie (v našom príklade sa jedná o celé čísla) a po vybratí z fronty ich spracuje (v cykle spočíta jednotky a vypíše na obrazovku). Ak je fronta prázdna, úloha sa ukončí.
V hlavnej funkcii programu potom vytvoríme dve úlohy, ktoré následne v cykle spustíme.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import queue def task(name, work_queue): if work_queue.empty(): print(f"Task {name} nothing to do") else: while not work_queue.empty(): count = work_queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 print(f"Task {name} total: {total}") def main(): work_queue = queue.Queue() # Do fronty dame objekty na spracovanie (cisla) for work in [15, 10, 5, 2]: work_queue.put(work) # Pripravime dve synchronne ulohy tasks = [(task, "One", work_queue), (task, "Two", work_queue)] # Spustime pripravene ulohy for t, n, q in tasks: t(n, q) if __name__ == "__main__": main() |
Z výstupu vidíme, že na začiatku sa práce chopila prvá úloha, ktorá spracovala všetky údaje z fronty a ukončila činnosť. Druhá úloha už nemala s čím pracovať. Úlohy idú za sebou synchrónne.
Výstup:
1 2 3 4 5 6 7 8 9 |
Task One running Task One total: 15 Task One running Task One total: 10 Task One running Task One total: 5 Task One running Task One total: 2 Task Two nothing to do |
V tejto časti budeme implementovať vlastnú kooperatívnu konkurenciu. Potrebujeme k tomu urobiť dve veci. Funkciu task()
zmeníme na generátor. To nám umožní prerušiť vykonávanie úlohy zavolaním výrazu yield
. Oproti predošlému kódu nevoláme task()
ako samotnú úlohu, ale najprv vytvoríme generátorový objekt a až ten voláme.
Druhá vec, ktorú potrebujeme, je riadiaci cyklus, ktorý bude postupne vyvolávať úlohy a predávať im riadenie pomocou volania funkcie next()
. Pomocou tohto mechanizmu implementujeme kooperatívny multitasking, v ktorom máme niekoľko úloh, ktoré sa samy prerušia, aby mohla ísť druhá úloha. Riadiaci cyklus predstavuje primitívny „plánovač“, ktorý zavolá ďalšiu úlohu.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import queue def task(name, queue): while not queue.empty(): count = queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 yield print(f"Task {name} total: {total}") def main(): work_queue = queue.Queue() # Do fronty dame objekty na spracovanie (cisla) for work in [15, 10, 5, 2]: work_queue.put(work) tasks = [task("One", work_queue), task("Two", work_queue)] # Spustime pripravene ulohy done = False while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main() |
Výstup:
1 2 3 4 5 6 7 8 9 |
Task One running Task Two running Task Two total: 10 Task Two running Task One total: 15 Task One running Task Two total: 5 Task One total: 2 |
Uvažujte: Z výstupu vidíme, že síce úloha 1 začala pracovať prvá, ale prvý súčet mala hotová úloha 2. Prečo je to tak?
Môže sa zdať, že sme vytvorili asynchrónny program, ale zatiaľ je všetko synchrónne – úlohy sa striedajú.
V ďalšej ukážke pridáme do kódu blokujúce volanie – v našom prípade volanie time.sleep()
. Blokujúce volanie spôsobí, že procesor prestane vykonávať kód nášho programu a bude čakať, kým nepríde odpoveď z tohto volania. Obvykle sa jedná o komunikáciu so vstupno-výstupnými zariadeniami alebo sieťovými soketmi. Funkcia sleep()
simuluje takéto volanie.
Ďalej sme do kódu pridali meranie času. Jedno je v úlohe, ktoré meria dĺžku práce jednej úlohy a druhé v hlavnej funkcii, ktoré meria celkovú dĺžku behu všetkých úloh. Funkcia time.perf_counter()
vráti hodnotu najpresnejšieho počítadla času dostupného v systéme.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
import time import queue def task(name, queue): while not queue.empty(): delay = queue.get() print(f"Task {name} running") time_start = time.perf_counter() time.sleep(delay) elapsed = time.perf_counter() - time_start print(f"Task {name} elapsed time: {elapsed:.1f}") yield def main(): # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: work_queue.put(work) tasks = [task("One", work_queue), task("Two", work_queue)] # Run the tasks done = False start_time = time.perf_counter() while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True elapsed = time.perf_counter() - start_time print(f"\nTotal elapsed time: {elapsed:.1f}") if __name__ == "__main__": main() |
Výstup:
1 2 3 4 5 6 7 8 9 10 |
Task One running Task One elapsed time: 15.0 Task Two running Task Two elapsed time: 10.0 Task One running Task One elapsed time: 5.0 Task Two running Task Two elapsed time: 2.0 Total elapsed time: 32.0 |
Poradie behu úloh je také isté ako v predchádzajúcom príklade. Tentoraz ale úlohy museli čakať na výsledok blokujúceho volania. Pomohla nám kooperatívna konkurencia? Vidíme, že súčet dĺžok behu jednotlivých úloh je rovnaký ako celková dĺžka behu úloh. Jednotlivé úlohy boli blokované a pred blokovaním neodovzdali riadenie, takže sme nič nezískali.
V poslednej časti kód transformujeme na plne asynchrónny kód. Využijeme k tomu modul asyncio
, ktorý implementuje event loop a metódy na prácu s korutinami a kľúčové slová jazyka Python async
a await
, ktoré označujú asynchrónny kód.
Funkcia task()
je označená príkazom async
. Znamená to, že sa jedná o natívnu korutinu. Jej beh je možné prerušiť pri riadkoch označených príkazom await
. V našom príklade je to na dvoch miestach:
Queue
za asyncio.Queue
, ktorá je asynchrónne čakateľná (angl. awaitable))Aj funkcia main
je korutina. Je prerušiteľná na dvoch miestach:
Vyskúšajte: Čo sa stane, ak nepoužijeme príkaz
await
pri vkladaní objektu do fronty?
Zavolaním task("one", work_queue)
vytvoríme objekt korutiny. Zatiaľ je to iba objekt v pamäti, ktorý nebeží, podobne ako generátorový objekt. Pripravené úlohy spustíme funkciou asyncio.gather()
. Táto funkcia z nich najprv vytvorí objekty Task
(tým ich naplánuje) a potom ich konkurentne spustí.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
import asyncio import time async def task(name, queue): while not queue.empty(): delay = await queue.get() print(f"Task {name} running") time_start = time.perf_counter() await asyncio.sleep(delay) elapsed = time.perf_counter() - time_start print(f"Task {name} elapsed time: {elapsed:.1f}") async def main(): # Create the queue of work work_queue = asyncio.Queue() # Put some work in the queue for work in [15, 10, 5, 2]: await work_queue.put(work) start_time = time.perf_counter() await asyncio.gather( task("One", work_queue), task("Two", work_queue), ) elapsed = time.perf_counter() - start_time print(f"\nTotal elapsed time: {elapsed:.1f}") if __name__ == "__main__": asyncio.run(main()) |
Výstup:
1 2 3 4 5 6 7 8 9 10 11 |
Task One running Task Two running Task Two elapsed time: 10.0 Task Two running Task One elapsed time: 15.0 Task One running Task Two elapsed time: 5.0 Task One elapsed time: 2.0 Total elapsed time: 17.0 |
Z výsledku môžeme vidieť, že celkový čas behu je menší ako súčet behu všetkých úloh spolu. Je to vďaka tomu, že úlohy teraz naozaj bežali konkurentne.
Zatiaľ to boli všetko iba ukážkové príklady. Poďme sa pozrieť na kód, ktorý pracuje so skutočnými vstupno-výstupnými volaniami.
Vyskúšame pripojenie na niekoľko webových stránok. Adresy týchto stránok nahradili čísla vo fronte. Na pripojenie k stránke používame metódu zo štandardnej knižnice urllib.request.urlopen()
.
Úloha task
je znovu generátor a spracovanie ďalšej stránky voláme pomocou next()
v cykle. Výsledkom je sekvenčné spracovanie jednotlivých požiadaviek. Každá úloha je pri volaní metódy urllib.request.urlopen()
zablokovaná a celkový čas behu programu je rovnaký ako súčet behu jednotlivých požiadaviek.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import queue import urllib.request import time def task(name, work_queue): while not work_queue.empty(): url = work_queue.get() print(f"Task {name} getting URL: {url}") time_start = time.perf_counter() urllib.request.urlopen(url) elapsed = time.perf_counter() - time_start print(f"Task {name} elapsed time: {elapsed:.1f}") yield def main(): # Create the queue of work work_queue = queue.Queue() # Put some work in the queue for url in [ "http://google.com", "http://stuba.sk", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: work_queue.put(url) tasks = [task("One", work_queue), task("Two", work_queue)] # Run the tasks done = False start_time = time.perf_counter() while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True elapsed = time.perf_counter() - start_time print(f"\nTotal elapsed time: {elapsed:.1f}") if __name__ == "__main__": main() |
Výstup:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Task One getting URL: http://google.com Task One elapsed time: 0.2 Task Two getting URL: http://stuba.sk Task Two elapsed time: 0.5 Task One getting URL: http://linkedin.com Task One elapsed time: 0.5 Task Two getting URL: http://apple.com Task Two elapsed time: 0.3 Task One getting URL: http://microsoft.com Task One elapsed time: 0.6 Task Two getting URL: http://facebook.com Task Two elapsed time: 0.4 Task One getting URL: http://twitter.com Task One elapsed time: 0.4 Total elapsed time: 2.9 |
Na asynchrónne HTTP požiadavky použijeme externý modul aiohttp
. Musíte si ho doinštalovať do vášho prostredia (napr. pomocou pip3 install aiohttp
). Z úlohy task
sa znova stane korutina vďaka príkazu async
. Najprv v nej vytvoríme „sedenie“ (angl. session), v rámci ktorého sa budeme pripájať. Všimnite si príkaz async
. Umožní prerušiť vykonávanie korutiny pri vstupe a výstupe z
withwith
bloku. Čakaním na objekt response.text()
prečítame telo odpovede.
Zvyšok programu je rovnaký ako pri poslednej ukážke. Vytvoríme dve úlohy a odmeriame čas ich vykonávania. Keďže teraz sa môžu korutiny striedať pri čakaní na odpoveď, celkový čas je menší ako pri synchrónnej ukážke – skoro o polovicu. Dosiahli sme to vďaka asynchrónnemu programovaniu v jednom vlákne.
Vyskúšajte vytvoriť jednu úlohu na jednu URL. Aký bude celkový čas vykonávania?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import asyncio import aiohttp import time async def task(name, work_queue): async with aiohttp.ClientSession() as session: while not work_queue.empty(): url = await work_queue.get() print(f"Task {name} getting URL: {url}") time_start = time.perf_counter() async with session.get(url) as response: await response.text() elapsed = time.perf_counter() - time_start print(f"Task {name} elapsed time: {elapsed:.1f}") async def main(): # Create the queue of work work_queue = asyncio.Queue() # Put some work in the queue for url in [ "http://google.com", "http://stuba.sk", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: await work_queue.put(url) # Run the tasks start_time = time.perf_counter() await asyncio.gather( task("One", work_queue), task("Two", work_queue), ) elapsed = time.perf_counter() - start_time print(f"\nTotal elapsed time: {elapsed:.1f}") if __name__ == "__main__": asyncio.run(main()) |
Výstup:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Task One getting URL: http://google.com Task Two getting URL: http://stuba.sk Task One elapsed time: 0.2 Task One getting URL: http://linkedin.com Task Two elapsed time: 0.5 Task Two getting URL: http://apple.com Task One elapsed time: 0.5 Task One getting URL: http://microsoft.com Task Two elapsed time: 0.3 Task Two getting URL: http://facebook.com Task Two elapsed time: 0.2 Task Two getting URL: http://twitter.com Task One elapsed time: 0.7 Task Two elapsed time: 0.4 Total elapsed time: 1.5 |
Prepíšte tento kód, aby boli požiadavky zasielané asynchrónne:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# Written by Anton Caceres # https://github.com/MA3STR0/PythonAsyncWorkshop from urllib.request import urlopen import time URLS = [ 'http://dsl.sk', 'http://stuba.sk', 'http://shmu.sk', 'http://root.cz', ] def request_greetings(): responses = [] for url in URLS: resp = urlopen(url) responses.append(resp.read().decode('utf-8')) texts = '\n'.join(responses) return texts if __name__ == "__main__": t1 = time.time() greetings = request_greetings() print(time.time() - t1, "seconds passed") print(greetings) |
Napíšte vlastnú (ľubovoľnú) jednovláknovú aplikáciu v dvoch verziách: synchrónnej a asynchrónnej (pomocou natívnych korutín). V priloženej dokumentácií objasnite cieľ aplikácie a urobte výkonnostné porovnanie synchrónnej a asynchrónnej verzie. Nezabudnite zdôvodniť získané výsledky (zrýchlenie, spomalenie, nezmenený výkon).