ParallelStream nám umožňuje využiť viacprocesorové počítače. Framework apache Spark umožňuje vykonanie paralelneho algoritmu distribuovať cloud počítacov. Algoritmus nemusí byť dokonca napísaný len v jazyku java. Framework podporuje aj jazyky Python, Scala, a R.
Quick start https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
Architektúra klastra https://spark.apache.org/docs/latest/cluster-overview.html
Spustenie na klastri https://spark.apache.org/docs/latest/submitting-applications.html
Spark framework poskytuje pre paralelné (funkcionálne) spracovanie dát dve API a ich kolekcie
RDD (resilent distributed dataset) – Základné Spark API https://spark.apache.org/docs/latest/rdd-programming-guide.html
Dataset – Spark SQL API určené špeciálne na spracovanie štrukturovaných dát. Poskytuje lepšiu podporu pre prácu s databázami (schema informácie…) https://spark.apache.org/docs/latest/sql-programming-guide.html
RDD API poskytuje viacero typov funkcií pre prácu s RDD
faktory metódy – slúžia na vytvorenie RDD z iných vstupných dátových zdojov. Napr. paralelize,
transformácie – slúžia na transformáciu jedného RDD na iné RDD. Napr. map
akcie – slúžia na získanie nedistribuovaných výstupných hodnôt/dát z RDD. Napr. reduce
Prehľad najpoužívanejších
transformácií: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
akcií: http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
Maven dependency pre build java aplikácií:
1 2 3 4 5 |
<dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> |
1 2 |
SparkConf conf = new SparkConf().setAppName("meno aplikacie").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); |
Pozn. argument „local[2]“ nastavuje, že výpočet má bežať na lokálnom počítači paralelne na dvoch výpočtových uzloch (CPU) ak sú k dispozícii.
Pozri: https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark
Pre podporu paralelných výpočtov poskytuje Spark Java API vlastnú RDD kolekciu – distribuovaný dataset (anlógia Java8 ParallelStreamu). RDD kolekcie môžeme vytvoriť rôznymi spôsobmi. Zo zoznamu ju získame pomocou kontext metódy parallelize.
Reimplementácia 1. príkladu z predchádzajúcej prednášky pomocou Spark RDD API. Ilustruje:
factory method paralellize na vytvorenie RDD zo zoznamu.
transformáciu map
akciu reduce
Pozn. všimite si, že pred ukončením aplikácie sa patrí uzavrieť prácu s spark-kontextom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class UctyRddApp { public static void main(String[] args) { List<Ucet> ls = new ArrayList<>(); ls.add(new Ucet("U001", 100)); ls.add(new Ucet("U002", 200)); ls.add(new Ucet("U003", 300)); ls.add(new Ucet("U004", 400)); SparkConf conf = new SparkConf().setAppName("Ucty"); // ak to spustame s spark-submit bude uz master zadany if (conf.get("spark.master", null) == null) { conf.setMaster("local[2]"); } JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Ucet> rdd = sc.parallelize(ls); // ls.stream() double d = rdd.map(u->u.getStav()).reduce((a,b)->a+b); // map(u->u.getStav()).reduce(0.0, (a,b)->a+b) System.out.println("" + d); sc.stop(); sc.close(); } } |
Pozn. Namiesto kolekcie Stream použijete RDD kolekciu objektov triedy Ucet. RDD kolekcia však vyžaduje aby objekty, ktoré v nej boli serializovateľné, teda:
1 |
public class Ucet implements Serializable { |
Spustiť a otestovať aplikáciu môžete priamo v netbeans alebo z prikazového riadku skriptom spark-submit
1 |
./bin/spark-submit --class asos.UctyRddApp /home/igor/EDU/ASOS/2019/Spark/UctyRdd/target/UctyRdd-1.0.jar |
Pozri: https://spark.apache.org/docs/latest/submitting-applications.html.
Java aplikácia ilustrujúca niektoré ďalšie transformácie a akcie:
factory method textFile
akcia count
akcia first
akcia collect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark-words-app"); // ak to spustame s spark-submit bude uz master zadany if (conf.get("spark.master", null) == null) { conf.setMaster("local[2]"); } JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> distFile = sc.textFile("src/test/csv/data.txt"); // pocet datovych clenov v sade System.out.println("Pocet riadkov: " + distFile.count()); //1. first vracia 1. clen datovej sady System.out.println("Prvy riadok: " + distFile.first()); //2. vytvori List vsetkych clenov RDD List<String> lines = distFile.collect(); for (String s: lines) { System.out.println(s); } sc.stop(); sc.close(); } |
transformácia filter
1 2 3 4 5 |
// 3. filter JavaRDD<String> filtered = distFile.filter(f->f.contains("Ahoj")); for (String s: filtered.collect()) System.out.println("" + s); |
transformácia flatMap
1 2 3 4 5 6 |
// rozdeli riadky na slova - vystupná sada obsahuje slová JavaRDD<String> words = distFile.flatMap(s -> Arrays.asList(s.split(" ")).iterator()); for (String w : words.collect()) { System.out.println(w); } |
Pozn. distFile.map(s -> s.split(“ „)) nefunguje tak ako by sme chceli: vyskúšajte
Ak by sme chceli zistiť koľkokrát sa v súbore vyskytujú jednotlivé slová zrejme by sme vytvorili java.util.map, v ktorej klúčom bude slovo a hodnota je počet.
Distribuovanou obdobou mapy v Sparku je JavaPairRDD, čo je v podstate RDD usporiadaných dvojíc (scala.Tuple2). Pracujú s ňou napr. metódy
mapToPair
reduceByKey
collectAsMap
http://spark.apache.org/docs/latest/rdd-programming-guide.html#working-with-key-value-pairs
http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html
1 2 3 4 5 6 |
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); Map<String, Integer> r = counts.collectAsMap(); for (String k: r.keySet()) System.out.println(k + ":" + r.get(k)); |
mapValues todo
Spark poskytuje RDD API aj pre ďalšie jazyky ako Python, Scala a R. Scala je interpretovaný funkcionálny jazyk vykonávaný JVM.
umožňuje rýchlo zvládnuť API a použiť ho pre analýzu dát v interaktívnom mode. Spustenie scala-shellu:
1 |
./bin/spark-shell |
1 2 3 4 5 6 7 8 9 |
val lines = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor1.txt") lines.first() lines.collect() val words = lines.flatMap(r=>r.split(" ")) words.collect() words.map(w=>(w,1)).reduceByKey( (a,b) => a+b ).collect() |
Ako mieru podobnosti dvoch množín môžeme použiť napr. symetrickú diferenciu. Pre prácu RDD ako množinami poskytuje Spark typické množinové operácie:
1 2 3 4 5 6 7 8 9 |
val s1 = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor1.txt") val s2 = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor2.txt") val w1 = s1.flatMap(r=>r.split(" ")).filter(a=>a.length()>0).distinct() val w2 = s2.flatMap(r=>r.split(" ")).filter(a=>a.length()>0).distinct() w1.union(w2).distinct().count() - w1.intersection(w2).count() w1.subtract(w2).collect() |
Map-Reduce umožňuje paralelizovať aj zložitejšie výpočty
Vstup: sada usporiadaných dvojíc čísel (bodov v rovine): (xi , yi) pre i=1..n
Výstup: koeficienty a, c regresnej priamky y = a +c*x aproximujúcej vstupnú množinu.
Numerické riešenie: Koeficienty možno vyrátať pomocou jednoduchých vzorcov (pozri napr. http://it4kt.cnl.sk/c/nm/lecturer/13.html):
c = (n.sxy – sx.sy) / (n.sx2 – sx.sx)
a = (sy – c.sx) / n
kde
sx = ∑xi
sy = ∑yi
sxy = ∑xi.yi
sx2 = ∑xi.xi
Implementáia riešenia pomocou Spark v jazyku Scala
1 2 3 4 5 6 7 8 9 10 11 |
val pts = Array( (0,-1), (1,1), (2,3)) val pdd = sc.parallelize(pts) val n = pdd.map(p=>1).reduce((a,b)=>a+b) val x = pdd.map(p=>p._1).reduce((a,b)=>a+b) val y = pdd.map(p=>p._2).reduce((a,b)=>a+b) val xy = pdd.map(p=>p._1*p._2).reduce((a,b)=>a+b) val x2 = pdd.map(p=>p._1*p._1).reduce((a,b)=>a+b) val c = (n*xy - x*y)/ (n*x2 - x*x) val a = (y - c*x)/n |
vyýpočty súm môžeme urobiť jediným príkazom
1 |
val r = sc.parallelize(pts).map(p => (1, p._1, p._2, p._1*p._1, p._1*p._2)).reduce((a,b) => (a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5)) |
Java applikácia – DU
Necessary cookies are absolutely essential for the website to function properly. This category only includes cookies that ensures basic functionalities and security features of the website. These cookies do not store any personal information.