Vytvor asynchrónnu aplikáciu, ktorá bude asynchrónne spúšťať viacero pajplajn súbežne. Každá pajplajna pozostáva z viacerých sériovo zapojených korutín:
Náležitosti riešenia:
logging (správna voľba úrovne hlásenia)async/await a asyncio.run()TaskGroup, nie asyncio.gather() a pomocou tohto mechanizmu zruš všetky pajplajn pri chybe niektorej z nichPreštuduj priložený programový kód a vhodne doplň na predpripravené miesta chýbajúcu funkcionalitu. Kód je k dispozícii na skopírovanie nižšie alebo v archíve na stiahnutie.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import asyncio import logging from typing import AsyncGenerator, List # ------------------------------------------------- # Konfigurácia retry # ------------------------------------------------- MAX_RETRIES = 3 BASE_DELAY = 0.5 # ------------------------------------------------- # LOGGING # ------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)-8s | %(message)s", ) logger = logging.getLogger(__name__) # ------------------------------------------------- # VÝNIMKY # ------------------------------------------------- class TransientAPIError(RuntimeError): """Použi túto výnimku pre retry API volaní.""" pass # ------------------------------------------------- # TODO 1: Asynchrónny API stream # ------------------------------------------------- async def api_stream(client_id: str) -> AsyncGenerator[str, None]: logger.info(f"[{client_id}] API stream: pripojenie") responses = [ "10,20,30", "40,invalid,50", "CRASH", # simuluje prechodnú chybu API "60,70", ] try: """ - Simuluj externý API stream (prejdi všetky hodnoty responses) - Na simuláciu dosiahnutia zdroja použi await asyncio.sleep(.6) - Ak nastane API chyba, náhodne zvoľ medzi pokračovaním a generovaním výnimky TransientAPIError - Zaloguj informáciu "[{client_id}] API data -> {data}" - Použi yield na vrátanie údajov """ except asyncio.CancelledError: logger.warning(f"[{client_id}] API stream ZRUŠENÝ") raise finally: logger.info(f"[{client_id}] API stream: odpojenie") # ------------------------------------------------- # TODO 2: Pipeline kroky # ------------------------------------------------- async def parse_data(raw: str, name: str) -> List[str]: await asyncio.sleep(0.2) """TODO: rozdeľ vstupný reťazec""" async def validate_data(items: List[str], name: str) -> List[int]: await asyncio.sleep(0.2) """ TODO: - premeň len platné čísla - ak nič neostane -> raise ValueError """ async def transform_data(data: List[int], name: str) -> List[int]: await asyncio.sleep(0.2) """TODO: transformuj dáta (napr. * 2)""" async def aggregate_data(data: List[int], name: str) -> int: await asyncio.sleep(0.2) """TODO: agreguj dáta (napr. sum)""" async def store_result(result: int, name: str) -> None: """simuluj uloženie výsledku""" logger.info(f"[{name}] Store result = {result}") await asyncio.sleep(0.2) # ------------------------------------------------- # TODO 3: Pipeline s retry a graceful shutdown logikou # ------------------------------------------------- async def pipeline(name: str) -> None: logger.info(f"[{name}] Pipeline štart") retries = 0 try: while retries < MAX_RETRIES: """ TODO: - použi async for nad api_stream * nad API údajmi volaj zreťazenie korutín * zaloguj [{name}] Pipeline úspešne dokončená" * ukonči pajplajnu - ošetri TransientAPIError (retry) exception as e * vypočítaj delay vzťahom BASE_DELAY * (2 ** retries) * zaloguj "[{name}] API chyba ({retries}/{MAX_RETRIES}), retry za {delay:.1f}s -> {e}" * počkaj """ raise RuntimeError(f"[{name}] Vyčerpaný maximálny počet retry") except asyncio.CancelledError: logger.warning(f"[{name}] Pipeline GRACEFUL SHUTDOWN") raise except Exception as exc: logger.error(f"[{name}] Pipeline ZLYHALA: {exc}") raise # ------------------------------------------------- # TODO 4: Spustenie viacerých pipeline # ------------------------------------------------- async def main() -> None: logger.info("=== Spúšťam pipeline ===") """ TODO: - použi asyncio.TaskGroup - spusti aspoň 3 pipeline konkurentne - zachyť ExceptionGroup (except*) * zaloguj: "=== TaskGroup ukončený kvôli chybe ===" * každú výnimku v skupine zaloguj: " -> {e}" - zaloguj korektné ukončenie programu """ if __name__ == "__main__": asyncio.run(main()) |
