Wednesday, May 27, 2015
Work on Altiscale cluster
JSON Metadata Spark Job
Generated using /home/joal/refinery-job.jar:
/opt/spark/bin/spark-submit --verbose --master yarn --deploy-mode client \
  --driver-memory 2G --executor-memory 2G --executor-cores 1 --num-executors 64 \
  --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) \
  --class \
  /home/joal/refinery-job.jar /user/halfak/streaming/enwiki-20150304/xml-bz2 /user/joal/enwiki_20150304_json_metadata
JSON data is stored in 500 files (about 128 MB each) at /user/joal/enwiki_20150304_json_metadata. It contains one JSON line per revision, with a limited set of fields: id, parent_id, timestamp, page, contributor, minor, bytes, and a special field called prev_rev containing the id and timestamp of the previously parsed revision (if on the same page) in XML order. Reducing the information to that level lets us generate interesting statistics about the enwiki dump using Spark with reasonable execution time.
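As a rough illustration of what prev_rev makes possible, the sketch below models one metadata line as plain Scala case classes and checks whether a revision is older than the one parsed just before it. The names RevMeta, PrevRev and isUnordered and the sample values are hypothetical, only the fields needed for the check are modeled, and the timestamps are assumed to be ISO 8601 strings, which sort chronologically when compared as text. It can be pasted into a Scala REPL and needs neither Spark nor a JSON library.

```scala
// Hypothetical model of one metadata line; only the fields needed here are included.
final case class PrevRev(id: Long, timestamp: String)
final case class RevMeta(id: Long, timestamp: String, prevRev: Option[PrevRev])

object RevOrder {
  // A revision is "unordered" when it is older than the revision parsed just
  // before it on the same page. Assumes ISO 8601 timestamps, so that string
  // order matches chronological order.
  def isUnordered(rev: RevMeta): Boolean =
    rev.prevRev.exists(prev => rev.timestamp < prev.timestamp)
}

// Made-up sample values, for illustration only.
val first = RevMeta(1L, "2015-01-01T10:00:00Z", None)
val inOrder = RevMeta(2L, "2015-01-02T10:00:00Z", Some(PrevRev(1L, "2015-01-01T10:00:00Z")))
val outOfOrder = RevMeta(3L, "2014-12-31T10:00:00Z", Some(PrevRev(2L, "2015-01-02T10:00:00Z")))

// Only the last sample is reported as out of order.
Seq(first, inOrder, outOfOrder).filter(RevOrder.isUnordered).foreach(println)
```

A revision with no prev_rev (the first one parsed for a page) is never counted as unordered, because there is nothing to compare it with.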
Spark Shell
Launch spark-shell with refinery-job.jar in the classpath to take advantage of its JSON libraries:
/opt/spark/bin/spark-shell --master yarn --num-executors 64 --executor-memory 4G \
  --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) --jars /home/joal/refinery-job.jar
This launches a HEAVY spark-shell instance (64 workers, 4G of RAM each). Tweak the --num-executors 64 and --executor-memory 4G parameters to adjust.
Spark Analysis
Import some useful Spark and JSON libraries:
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
Load the parsed metadata files into the revs value, caching it for faster reuse:
val revs = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).cache()
And now get some results:
// Revisions count
revs.count
Long = 599684931

// Unique pages count
revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count()
Long = 35332741

/*****************************************/
// Function printing the log10 distribution of a pair RDD.
// We assume each pair contains an id (Long) and a Long value.
def printlog10Dist(rdd: RDD[(Long, Long)]): Unit = {
  rdd.
    map(p => (math.log10(p._2).toInt, 1L)).
    aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2).
    collect.foreach(println)
}

/*****************************************/
// Revisions per page
val revsPerPage = revs.
  map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, 1L)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// Revisions per page distribution, log10 base
printlog10Dist(revsPerPage)
(0,29051606)
(1,5358977)
(2,859891)
(3,61327)
(4,908)
(5,31)
(6,1)

// Top 32 pages by number of revisions (pageId, num_revisions)
revsPerPage.sortBy(p => p._2, ascending=false).take(32).foreach(println)
(1952670,1058630)
(5137507,821551)
(13784401,615671)
(16283969,582865)
(2535910,388984)
(352651,341462)
(972034,329975)
(2189161,326340)
(564696,326107)
(16927404,320463)
(2535875,308040)
(40297,282846)
(5149102,254172)
(11022716,253033)
(32101143,188999)
(2515121,170923)
(31530695,169290)
(6041086,161979)
(1470141,157437)
(12056747,153411)
(11238105,148006)
(1226609,145709)
(4626266,143302)
(8993207,141341)
(3252662,133308)
(3741656,133192)
(5030553,129121)
(3514978,119653)
(11005908,116607)
(9870625,115515)
(7535778,112431)
(6768170,106106)

/*****************************************/
// SUM(Bytes) per page
val sumBytesPerPage = revs.
  map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// SUM(Bytes) per page distribution, log10 base
printlog10Dist(sumBytesPerPage)
(-2147483648,2577)
(0,103907)
(1,9381750)
(2,8964865)
(3,9582612)
(4,4477007)
(5,2072308)
(6,599491)
(7,131373)
(8,16307)
(9,520)
(10,23)
(11,1)

// Top 24 pages by SUM(Bytes) (pageId, sum_bytes)
sumBytesPerPage.sortBy(p => p._2, ascending=false).take(24).foreach(println)
(5137507,255748072182)
(2535910,60746530013)
(972034,60114973986)
(5149102,43014476215)
(2535875,36922541407)
(564696,35003941405)
(40297,32445227518)
(11424955,22193085230)
(32101143,21488079051)
(3706897,18549825060)
(1470141,17916266438)
(986140,17793871027)
(3252662,17645815648)
(16927404,17511855999)
(6768170,16721293033)
(3741656,13512581014)
(75321,12517129056)
(2515121,11889235356)
(6041086,11859473845)
(9870625,11845661846)
(6905700,11410275184)
(17820752,11347901892)
(10701605,10671585654)
(3514978,10141543074)

/*****************************************/
// Bytes per revision
val bytesPerRev = revs.
  map(v => ((v \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
  aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)

// Bytes per revision distribution, log10 base
printlog10Dist(bytesPerRev)
(-2147483648,1348990)
(0,666045)
(1,28060977)
(2,58117910)
(3,263271820)
(4,229162391)
(5,19020550)
(6,36243)
(7,5)

// Top 5 revisions by bytes (revId, bytes)
bytesPerRev.sortBy(p => p._2, ascending=false).take(5).foreach(println)
(35775401,10597086)
(39456244,10369813)
(26720552,10245346)
(35891845,10241883)
(21299582,10112256)

/*****************************************/
// Unordered revisions: timestamp earlier than that of the previously parsed revision
val unord_revs = revs.filter(json => (((json \ "prev_rev") != JNothing) && ((json \ "timestamp").asInstanceOf[JString].s < (json \ "prev_rev" \ "timestamp").asInstanceOf[JString].s)))
unord_revs.count()
Long = 51411

unord_revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count()
Long = 36176
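The -2147483648 row in the two bytes distributions above is a side effect of the bucketing rather than a real log10 bucket. The plain-Scala sketch below reproduces the same bucketing on a small in-memory collection, without Spark, to show where that row comes from. The helper name log10Buckets and the sample pairs are made up for illustration.

```scala
// Plain-Scala stand-in for the Spark helper: bucket each value by the
// integer part of its log10 and count how many values land in each bucket.
def log10Buckets(pairs: Seq[(Long, Long)]): Map[Int, Long] =
  pairs
    .map { case (_, value) => math.log10(value.toDouble).toInt }
    .groupBy(identity)
    .map { case (bucket, hits) => (bucket, hits.size.toLong) }

// Made-up (id, value) pairs, for illustration only.
val sample = Seq((1L, 0L), (2L, 5L), (3L, 42L), (4L, 99L), (5L, 1234L))
val buckets = log10Buckets(sample)

// log10(0) is -Infinity, and converting that to Int saturates at Int.MinValue,
// so zero values end up in the -2147483648 bucket.
buckets.toSeq.sortBy(_._1).foreach(println)
```

Read this way, the first row of each bytes distribution counts the entries whose value is exactly 0 (pages whose bytes sum to 0, or revisions with 0 bytes).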
Next things to do
- Deeper analysis on unordered revisions (big / small pages, many revisions in between unordered ones ...)
- Same analysis with subset having namespace = 0 (articles)
val articles = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).
  filter(j => (j \ "page" \ "namespace").asInstanceOf[JInt].num.toLong == 0L).cache()
