ParallelStream nám umožňuje využiť viacprocesorové počítače. Framework apache Spark umožňuje vykonanie paralelneho algoritmu distribuovať cloud počítacov. Algoritmus nemusí byť dokonca napísaný len v jazyku java. Framework podporuje aj jazyky Python, Scala, a R.
Quick start https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
Architektúra klastra https://spark.apache.org/docs/latest/cluster-overview.html
Spustenie na klastri https://spark.apache.org/docs/latest/submitting-applications.html
Spark framework poskytuje pre paralelné (funkcionálne) spracovanie dát dve API a ich kolekcie
RDD (resilent distributed dataset) – Základné Spark API https://spark.apache.org/docs/latest/rdd-programming-guide.html
Dataset – Spark SQL API určené špeciálne na spracovanie štrukturovaných dát. Poskytuje lepšiu podporu pre prácu s databázami (schema informácie…) https://spark.apache.org/docs/latest/sql-programming-guide.html
RDD API poskytuje viacero typov funkcií pre prácu s RDD
faktory metódy – slúžia na vytvorenie RDD z iných vstupných dátových zdojov. Napr. paralelize,
transformácie – slúžia na transformáciu jedného RDD na iné RDD. Napr. map
akcie – slúžia na získanie nedistribuovaných výstupných hodnôt/dát z RDD. Napr. reduce
Prehľad najpoužívanejších
transformácií: http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
akcií: http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
Maven dependency pre build java aplikácií:
1 2 3 4 5 |
<dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> |
1 2 |
SparkConf conf = new SparkConf().setAppName("meno aplikacie").setMaster("local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); |
Pozn. argument „local[2]“ nastavuje, že výpočet má bežať na lokálnom počítači paralelne na dvoch výpočtových uzloch (CPU) ak sú k dispozícii.
Pozri: https://spark.apache.org/docs/latest/rdd-programming-guide.html#initializing-spark
Pre podporu paralelných výpočtov poskytuje Spark Java API vlastnú RDD kolekciu – distribuovaný dataset (anlógia Java8 ParallelStreamu). RDD kolekcie môžeme vytvoriť rôznymi spôsobmi. Zo zoznamu ju získame pomocou kontext metódy parallelize.
Reimplementácia 1. príkladu z predchádzajúcej prednášky pomocou Spark RDD API. Ilustruje:
factory method paralellize na vytvorenie RDD zo zoznamu.
transformáciu map
akciu reduce
Pozn. všimite si, že pred ukončením aplikácie sa patrí uzavrieť prácu s spark-kontextom
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class UctyRddApp { public static void main(String[] args) { List<Ucet> ls = new ArrayList<>(); ls.add(new Ucet("U001", 100)); ls.add(new Ucet("U002", 200)); ls.add(new Ucet("U003", 300)); ls.add(new Ucet("U004", 400)); SparkConf conf = new SparkConf().setAppName("Ucty"); // ak to spustame s spark-submit bude uz master zadany if (conf.get("spark.master", null) == null) { conf.setMaster("local[2]"); } JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Ucet> rdd = sc.parallelize(ls); // ls.stream() double d = rdd.map(u->u.getStav()).reduce((a,b)->a+b); // map(u->u.getStav()).reduce(0.0, (a,b)->a+b) System.out.println("" + d); sc.stop(); sc.close(); } } |
Pozn. Namiesto kolekcie Stream použijete RDD kolekciu objektov triedy Ucet. RDD kolekcia však vyžaduje aby objekty, ktoré v nej boli serializovateľné, teda:
1 |
public class Ucet implements Serializable { |
Spustiť a otestovať aplikáciu môžete priamo v netbeans alebo z prikazového riadku skriptom spark-submit
1 |
./bin/spark-submit --class asos.UctyRddApp /home/igor/EDU/ASOS/2019/Spark/UctyRdd/target/UctyRdd-1.0.jar |
Pozri: https://spark.apache.org/docs/latest/submitting-applications.html.
Java aplikácia ilustrujúca niektoré ďalšie transformácie a akcie:
factory method textFile
akcia count
akcia first
akcia collect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("spark-words-app"); // ak to spustame s spark-submit bude uz master zadany if (conf.get("spark.master", null) == null) { conf.setMaster("local[2]"); } JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> distFile = sc.textFile("src/test/csv/data.txt"); // pocet datovych clenov v sade System.out.println("Pocet riadkov: " + distFile.count()); //1. first vracia 1. clen datovej sady System.out.println("Prvy riadok: " + distFile.first()); //2. vytvori List vsetkych clenov RDD List<String> lines = distFile.collect(); for (String s: lines) { System.out.println(s); } sc.stop(); sc.close(); } |
transformácia filter
1 2 3 4 5 |
// 3. filter JavaRDD<String> filtered = distFile.filter(f->f.contains("Ahoj")); for (String s: filtered.collect()) System.out.println("" + s); |
transformácia flatMap
1 2 3 4 5 6 |
// rozdeli riadky na slova - vystupná sada obsahuje slová JavaRDD<String> words = distFile.flatMap(s -> Arrays.asList(s.split(" ")).iterator()); for (String w : words.collect()) { System.out.println(w); } |
Pozn. distFile.map(s -> s.split(“ „)) nefunguje tak ako by sme chceli: vyskúšajte
Ak by sme chceli zistiť koľkokrát sa v súbore vyskytujú jednotlivé slová zrejme by sme vytvorili java.util.map, v ktorej klúčom bude slovo a hodnota je počet.
Distribuovanou obdobou mapy v Sparku je JavaPairRDD, čo je v podstate RDD usporiadaných dvojíc (scala.Tuple2). Pracujú s ňou napr. metódy
mapToPair
reduceByKey
collectAsMap
http://spark.apache.org/docs/latest/rdd-programming-guide.html#working-with-key-value-pairs
http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html
1 2 3 4 5 6 |
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); Map<String, Integer> r = counts.collectAsMap(); for (String k: r.keySet()) System.out.println(k + ":" + r.get(k)); |
mapValues todo
Spark poskytuje RDD API aj pre ďalšie jazyky ako Python, Scala a R. Scala je interpretovaný funkcionálny jazyk vykonávaný JVM.
umožňuje rýchlo zvládnuť API a použiť ho pre analýzu dát v interaktívnom mode. Spustenie scala-shellu:
1 |
./bin/spark-shell |
1 2 3 4 5 6 7 8 9 |
val lines = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor1.txt") lines.first() lines.collect() val words = lines.flatMap(r=>r.split(" ")) words.collect() words.map(w=>(w,1)).reduceByKey( (a,b) => a+b ).collect() |
Ako mieru podobnosti dvoch množín môžeme použiť napr. symetrickú diferenciu. Pre prácu RDD ako množinami poskytuje Spark typické množinové operácie:
1 2 3 4 5 6 7 8 9 |
val s1 = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor1.txt") val s2 = sc.textFile("/home/igor/EDU/ASOS/2018/Predn10/subor2.txt") val w1 = s1.flatMap(r=>r.split(" ")).filter(a=>a.length()>0).distinct() val w2 = s2.flatMap(r=>r.split(" ")).filter(a=>a.length()>0).distinct() w1.union(w2).distinct().count() - w1.intersection(w2).count() w1.subtract(w2).collect() |
Map-Reduce umožňuje paralelizovať aj zložitejšie výpočty
Vstup: sada usporiadaných dvojíc čísel (bodov v rovine): (xi , yi) pre i=1..n
Výstup: koeficienty a, c regresnej priamky y = a +c*x aproximujúcej vstupnú množinu.
Numerické riešenie: Koeficienty možno vyrátať pomocou jednoduchých vzorcov (pozri napr. http://it4kt.cnl.sk/c/nm/lecturer/13.html):
c = (n.sxy – sx.sy) / (n.sx2 – sx.sx)
a = (sy – c.sx) / n
kde
sx = ∑xi
sy = ∑yi
sxy = ∑xi.yi
sx2 = ∑xi.xi
Implementáia riešenia pomocou Spark v jazyku Scala
1 2 3 4 5 6 7 8 9 10 11 |
val pts = Array( (0,-1), (1,1), (2,3)) val pdd = sc.parallelize(pts) val n = pdd.map(p=>1).reduce((a,b)=>a+b) val x = pdd.map(p=>p._1).reduce((a,b)=>a+b) val y = pdd.map(p=>p._2).reduce((a,b)=>a+b) val xy = pdd.map(p=>p._1*p._2).reduce((a,b)=>a+b) val x2 = pdd.map(p=>p._1*p._1).reduce((a,b)=>a+b) val c = (n*xy - x*y)/ (n*x2 - x*x) val a = (y - c*x)/n |
vyýpočty súm môžeme urobiť jediným príkazom
1 |
val r = sc.parallelize(pts).map(p => (1, p._1, p._2, p._1*p._1, p._1*p._2)).reduce((a,b) => (a._1+b._1, a._2+b._2, a._3+b._3, a._4+b._4, a._5+b._5)) |
Java applikácia – DU