Research talk:Measuring edit productivity/Work log/2015-05-27
Wednesday, May 27, 2015
Work on Altiscale cluster
JSON Metadata Spark Job
Generated using /home/joal/refinery-job.jar:
/opt/spark/bin/spark-submit --verbose --master yarn --deploy-mode client \
  --driver-memory 2G --executor-memory 2G --executor-cores 1 --num-executors 64 \
  --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) \
  --class org.wikimedia.analytics.refinery.job.MediaWikiDumpXMLToJSONMetadata \
  /home/joal/refinery-job.jar /user/halfak/streaming/enwiki-20150304/xml-bz2 /user/joal/enwiki_20150304_json_metadata
The JSON data is stored in 500 files (about 128MB each) at /user/joal/enwiki_20150304_json_metadata. It contains one JSON line per revision, with a limited set of fields: id, parent_id, timestamp, page, contributor, minor, bytes, and a special field called prev_rev containing the id and timestamp of the previously parsed revision (if in the same page) in the XML order. Reducing the information to that level allows us to generate interesting statistics about the enwiki dump using Spark with reasonable execution time.
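As an illustration, a single metadata line could look like the following. This is a hypothetical example: the top-level field names are the ones listed above, page is shown with the id and namespace sub-fields used later in this log, and all values (as well as the contributor sub-fields) are invented.

```json
{"id": 654321098, "parent_id": 654320000, "timestamp": "2015-03-01T12:34:56Z", "page": {"id": 12345, "namespace": 0}, "contributor": {"id": 42}, "minor": false, "bytes": 2048, "prev_rev": {"id": 654320000, "timestamp": "2015-03-01T12:00:00Z"}}
```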
Spark Shell
Launch a spark-shell with refinery-job.jar on the classpath to take advantage of the JSON libraries:
/opt/spark/bin/spark-shell --master yarn --num-executors 64 --executor-memory 4G \
  --driver-class-path $(find /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-* | head -n 1) \
  --jars /home/joal/refinery-job.jar
This launches a HEAVY spark-shell instance (64 workers, 4G of RAM each). Please tweak the --num-executors 64 --executor-memory 4G parameters to adjust.
Spark Analysis
Import some useful Spark and JSON libraries:
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
Load the parsed metadata files into a revs RDD, caching it for faster reuse:
val revs = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).cache()
And now get some results:
// Revisions count
revs.count
Long = 599684931
// Unique pages count
revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count()
Long = 35332741
/*****************************************/
// Function printing the log10-based distribution of a pair RDD.
// We assume each pair contains an id (Long) and a Long value.
def printlog10Dist(rdd: RDD[(Long, Long)]): Unit = {
rdd.
map(p => (math.log10(p._2).toInt, 1L)).
aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2).
collect.foreach(println)
}
/*****************************************/
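A note on the (-2147483648, ...) buckets that appear in some distributions below: math.log10(0) is -Infinity, and truncating that Double to Int yields Int.MinValue, so keys whose value is exactly 0 land in that bucket. A minimal pure-Scala sketch of the bucketing step (no Spark needed):

```scala
// Bucket a value the way printlog10Dist does: truncated log10.
def bucket(v: Long): Int = math.log10(v).toInt

println(bucket(7))    // 0  (single digits)
println(bucket(42))   // 1  (tens)
println(bucket(1000)) // 3  (thousands)

// log10(0) is -Infinity, which truncates to Int.MinValue...
println(bucket(0))    // -2147483648

// ...while log10 of a negative value is NaN, which truncates to 0.
println(bucket(-5))   // 0
```

Under this reading, a (-2147483648, n) line would correspond to n keys whose value is exactly 0, while negative values (if any) would silently fall into the 0 bucket.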
// Revisions per page
val revsPerPage = revs.
map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, 1L)).
aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)
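The aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2) pattern used throughout this log is simply a per-key sum (reduceByKey((v1, v2) => v1 + v2) would be equivalent here). A minimal pure-Scala sketch of the same aggregation on a local collection, with made-up page ids:

```scala
// (pageId, 1L) pairs, one per revision, as produced by the map step above.
val pairs = Seq((11L, 1L), (22L, 1L), (11L, 1L), (11L, 1L))

// Per-key sum, mirroring aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2).
val revCounts: Map[Long, Long] =
  pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

println(revCounts(11L)) // 3
println(revCounts(22L)) // 1
```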
// Revisions per page distribution, log10 base
printlog10Dist(revsPerPage)
(0,29051606)
(1,5358977)
(2,859891)
(3,61327)
(4,908)
(5,31)
(6,1)
// Top 32 pages by number of revisions (pageId, num_revisions)
revsPerPage.sortBy(p => p._2, ascending=false).take(32).foreach(println)
(1952670,1058630)
(5137507,821551)
(13784401,615671)
(16283969,582865)
(2535910,388984)
(352651,341462)
(972034,329975)
(2189161,326340)
(564696,326107)
(16927404,320463)
(2535875,308040)
(40297,282846)
(5149102,254172)
(11022716,253033)
(32101143,188999)
(2515121,170923)
(31530695,169290)
(6041086,161979)
(1470141,157437)
(12056747,153411)
(11238105,148006)
(1226609,145709)
(4626266,143302)
(8993207,141341)
(3252662,133308)
(3741656,133192)
(5030553,129121)
(3514978,119653)
(11005908,116607)
(9870625,115515)
(7535778,112431)
(6768170,106106)
/*****************************************/
// SUM(Bytes) per page
val sumBytesPerPage = revs.
map(v => ((v \ "page" \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)
// SUM(Bytes) per page distribution, log10 base
printlog10Dist(sumBytesPerPage)
(-2147483648,2577)
(0,103907)
(1,9381750)
(2,8964865)
(3,9582612)
(4,4477007)
(5,2072308)
(6,599491)
(7,131373)
(8,16307)
(9,520)
(10,23)
(11,1)
// Top 24 pages by SUM(Bytes) (pageId, sum_bytes)
sumBytesPerPage.sortBy(p => p._2, ascending=false).take(24).foreach(println)
(5137507,255748072182)
(2535910,60746530013)
(972034,60114973986)
(5149102,43014476215)
(2535875,36922541407)
(564696,35003941405)
(40297,32445227518)
(11424955,22193085230)
(32101143,21488079051)
(3706897,18549825060)
(1470141,17916266438)
(986140,17793871027)
(3252662,17645815648)
(16927404,17511855999)
(6768170,16721293033)
(3741656,13512581014)
(75321,12517129056)
(2515121,11889235356)
(6041086,11859473845)
(9870625,11845661846)
(6905700,11410275184)
(17820752,11347901892)
(10701605,10671585654)
(3514978,10141543074)
/*****************************************/
// Bytes per revision
val bytesPerRev = revs.
map(v => ((v \ "id").asInstanceOf[JInt].num.toLong, (v \ "bytes").asInstanceOf[JInt].num.toLong)).
aggregateByKey(0L)((v1, v2) => v1 + v2, (v1, v2) => v1 + v2)
// Bytes per revision distribution, log10 base
printlog10Dist(bytesPerRev)
(-2147483648,1348990)
(0,666045)
(1,28060977)
(2,58117910)
(3,263271820)
(4,229162391)
(5,19020550)
(6,36243)
(7,5)
// Top 5 revisions by bytes (revId, bytes)
bytesPerRev.sortBy(p => p._2, ascending=false).take(5).foreach(println)
(35775401,10597086)
(39456244,10369813)
(26720552,10245346)
(35891845,10241883)
(21299582,10112256)
/*****************************************/
// Unordered revisions
val unord_revs = revs.filter(json => (((json \ "prev_rev") != JNothing)
&& ((json \ "timestamp").asInstanceOf[JString].s < (json \ "prev_rev" \ "timestamp").asInstanceOf[JString].s)))
unord_revs.count()
Long = 51411
unord_revs.map(v => (v \ "page" \ "id").asInstanceOf[JInt].num.toLong).distinct.count()
Long = 36176
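The filter above compares timestamps as plain strings. This works because dump timestamps are fixed-width ISO 8601 strings, so lexicographic order matches chronological order. A minimal sketch of the check, with made-up timestamps:

```scala
// Fixed-width ISO 8601 timestamps compare lexicographically
// in the same order as chronologically.
val prevTs = "2015-05-27T10:00:00Z"  // previously parsed revision (XML order)
val ts     = "2015-05-26T09:30:00Z"  // current revision, actually older

// A revision is "unordered" when it is older than the revision
// parsed just before it on the same page.
def isUnordered(revTs: String, prevRevTs: String): Boolean = revTs < prevRevTs

println(isUnordered(ts, prevTs))  // true
println(isUnordered(prevTs, ts))  // false
```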
Next things to do
- Deeper analysis of unordered revisions (big / small pages, many revisions in between unordered ones, ...)
- The same analysis on the subset having namespace = 0 (articles):
val articles = sc.textFile("/user/joal/enwiki_20150304_json_metadata").map(parse(_)).
filter(j => (j \ "page" \ "namespace").asInstanceOf[JInt].num.toLong == 0L).cache()
JAllemandou (WMF) (talk | contribs) 20:52, 27 May 2015
signed by Halfak (WMF) (talk) 06:44, 28 May 2015 (UTC)