WikiXRay parser
From Meta, a Wikimedia project coordination wiki
Cut & paste the following code in a text file, and save it as dump_sax.py. Don't forget to give your file executable privileges.
It uses a tweak to speed up the parsing process, consisting on filtering the text events so that the characters method is called only when the complete text block has been buffered. You can see further details of this recipe in the Python CookBook (though I retrieved mine from the second edition of the printed version of the Python Cookbook, by O'Reilly; obviously, they explicitly give you permission to use the code for this purpose).
############################################# # WikiXRay: Quantitative Analysis of Wikipedia language versions ############################################# # http://wikixray.berlios.de ############################################# # Copyright (c) 2006-7 Universidad Rey Juan Carlos (Madrid, Spain) ############################################# # This program is free software. You can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 or later of the GPL. ############################################# # Author: Jose Felipe Ortega Soto import sys,os,codecs, datetime, random import dbaccess from xml.sax import saxutils,make_parser from xml.sax.handler import feature_namespaces, ContentHandler from xml.sax.saxutils import XMLFilterBase, XMLGenerator from optparse import OptionParser class text_normalize_filter(XMLFilterBase): """ SAX filter to ensure that contiguous texts nodes are merged into one That hopefully speeds up the parsing process a lot, specially when reading revisions with long text Receip by Uche Ogbuji, James Kew and Peter Cogolo Retrieved from "Python Cookbook, 2nd ed., by Alex Martelli, Anna Martelli Ravenscroft, and David Ascher (O'Reillly Media, 2005) 0-596-00797-3" """ def __init__(self, upstream, downstream): XMLFilterBase.__init__(self, upstream) self._downstream=downstream self._accumulator=[] def _complete_text_node(self): if self._accumulator: self._downstream.characters(''.join(self._accumulator)) self._accumulator=[] def characters(self, text): self._accumulator.append(text) def ignorableWhiteSpace(self, ws): self._accumulator.append(text) def _wrap_complete(method_name): def method(self, *a, **k): self._complete_text_node() getattr(self._downstream, method_name)(*a, **k) method.__name__= method_name setattr(text_normalize_filter, method_name, method) for n in '''startElement endElement endDocument'''.split(): _wrap_complete(n) class wikiHandler(ContentHandler): """Parse an XML file generated by Wikipedia Export page into SQL data suitable to be imported by MySQL""" def __init__(self, options): self.fileErrPath="./errors.log"; self.options=options if self.options.monitor and not self.options.fileout and not self.options.streamout: self.acceso = dbaccess.get_Connection(self.options.machine, self.options.port,\ self.options.user, self.options.passwd, self.options.database) self.nspace_dict={}; self.codens=''; self.page_dict={}; self.rev_dict = {} self.stack=[]; self.current_text = ''; self.current_elem=None; self.revfile=None self.pagefile=None self.page_num = 0; self.rev_num=0; self.last_page_len=0; self.rev_count=0 self.prior_rev_id='NULL'; self.isRedirect='0'; self.isStub='0'; self.isMinor='0' self.revinsert=''; self.pageinsert=''; self.textinsert='' self.revinsertrows=0; self.revinsertsize=0; self.pageinsertrows=0 self.pageinsertsize=0; self.textinsertrows=0; self.textinsertsize=0 self.start=datetime.datetime.now(); self.timeCheck=None; self.timeDelta=None def startElement(self, name, attrs): ## Here we define which tags we want to catch ## In this case, we only want to recall the name of the tags in a stack ## so we can later look up the parent node of a new tag ## (for instance, to discriminate among page id, rev id and contributor id ## all of them with the name=="id") if name=='page' or name=='revision' or name=='contributor': self.stack.append(name) elif name=='namespace': self.codens=attrs.get('key') elif name=='minor': self.isMinor='1' self.current_text='' self.current_elem=name return def endElement(self, name): ## Defining tasks to manage contents from the last readed tag ## Catching the namespace of this page if name=='namespace': self.nspace_dict[self.current_text]=self.codens elif name=='id': if self.stack[-1]=='contributor': ##Detecting contributor's attributes inside a revision self.rev_dict['rev_user']=self.current_text elif self.stack[-1]=='revision': self.rev_dict[name]=self.current_text elif self.stack[-1]=='page': self.page_dict[name]=self.current_text else: self.f=open(self.fileErrPath,'w') if len(self.stack)>0: self.f.write("Unsupported parent tag for '"+name+"': "+self.stack[-1]) self.f.close() elif name=='ip': self.rev_dict['rev_user']='0' self.rev_dict['username']=self.current_text elif name=='timestamp': ##Adequate formatting of timestamps self.rev_dict['timestamp']=self.current_text.replace('Z','').replace('T',' ') elif name=='contributor': ##Pop contributor tag from the stack self.stack.pop() elif name=='revision': self.rev_count+=1 ##Store whether this is a redirect or stub page or not if len(self.rev_dict['text'])>0: if self.rev_dict['text'][0:9].upper()=='#REDIRECT': self.isRedirect='1' else: self.isRedirect='0' ## Takes from the first argument the threshold for stub's length if str(2*len(self.rev_dict['text']))<=self.options.stubth: self.isStub='1' else: self.isStub='0' ####CONSTRUCTION OF EXTENDED INSERTS FOR REVISIONS (STANDARD VERSION)###### ##Values order: (rev_id, rev_page, [[rev_text_id=rev_id]], rev_comment, ##rev_user, rev_user_text, rev_timestamp, rev_is_minor) # Build current row for revinsert try: newrevinsert="("+self.rev_dict['id']+","+self.page_dict['id']+","+self.rev_dict['id'] if self.rev_dict.has_key('comment'): newrevinsert+=","+'"'+self.rev_dict['comment'].replace("\\","\\\\").replace("'","\\'").replace('"', '\\"')+'"' else: newrevinsert+=",''" newrevinsert+=","+self.rev_dict['rev_user']+","+'"'+self.rev_dict['username'].\ replace("\\","\\\\").replace("'","\\'").replace('"', '\\"')+\ '"'+","+'"'+self.rev_dict['timestamp']+\ '"'+","+self.isMinor+")" # In case that any field is missing or flawed, skip this revision and log to standard error except (KeyError), e: self.printfile = codecs.open("error_"+self.options.database,'a','utf_8') self.printfile.write("Offending rev_dict was = \n") self.printfile.write(str(self.rev_dict)) self.printfile.write("\n") self.printfile.write("Offending page_dict was = \n") self.printfile.write(str(self.page_dict)) self.printfile.write("\n") self.printfile.write("====================================================\n") self.printfile.write(str(e)+"\n") self.printfile.write("====================================================\n\n") self.printfile.close() return if self.revinsertrows==0: #Always allow at least one row in extended inserts self.revinsert="INSERT INTO revision VALUES"+newrevinsert self.revinsertrows+=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.revinsertsize=len(self.revinsert)*2 elif (self.revinsertsize+(2*len(newrevinsert))<=self.options.imaxsize*1024) and\ ((self.revinsertrows+1)<=self.options.imaxrows): #Append new row to self.revinsert self.revinsert+=","+newrevinsert self.revinsertrows+=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.revinsertsize=len(self.revinsert)*2 else: #We must finish and write currrent insert and begin a new one if self.options.fileout: self.revinsert+=";\n" # Write output to SQL file self.revfile = codecs.open(self.options.revfile,'a','utf_8') self.revfile.write(revinsert) self.revfile.close() elif self.options.streamout: # DON'T WRITE SQL TO FILES, GENERATE ENCONDED SQL STREAM FOR MYSQL self.revinsert+=";" print self.revinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.revinsert.encode('utf-8')) except (Exception), e: print e else: break self.revinsert="INSERT INTO revision VALUES"+newrevinsert self.revinsertrows=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.revinsertsize=len(self.revinsert)*2 ################################################## ##CONSTRUCTION OF EXTENDED INSERTS FOR TABLE TEXT ##Template for each row: ## (old_id, old_text, old_flags) newtextinsert="("+self.rev_dict['id']+','+'"'+\ self.rev_dict['text'].replace("\\","\\\\").replace("'","\\'").replace('"', '\\"')+\ '",'+'"utf8")' if self.textinsertrows==0: #Always allow at least one row in extended inserts self.textinsert="INSERT INTO text VALUES"+newtextinsert self.textinsertrows+=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.textinsertsize=len(self.textinsert)*2 elif (self.textinsertsize+(2*len(newtextinsert))<=self.options.imaxsize*1024) and\ ((self.textinsertrows+1)<=self.options.imaxrows): #Append new row to self.revinsert self.textinsert+=","+newtextinsert self.textinsertrows+=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.textinsertsize=len(self.textinsert)*2 else: #We must finish and write currrent insert and begin a new one if self.options.fileout: self.textinsert+=";\n" # Write output to SQL file self.textfile = codecs.open(self.options.textfile,'a','utf_8') self.textfile.write(textinsert) self.textfile.close() elif self.options.streamout: # DON'T WRITE SQL TO FILES, GENERATE ENCONDED SQL STREAM FOR MYSQL self.textinsert+=";" print self.textinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.textinsert.encode('utf-8')) except (Exception), e: print e else: break self.textinsert="INSERT INTO text VALUES"+newtextinsert self.textinsertrows=1 #Conservative approach: assuming 2 bytes per UTF-8 character self.textinsertsize=len(self.textinsert)*2 ################################################## ################################################## ##Store this rev_id to recall it when processing the following revision, if it exists self.prior_rev_id=self.rev_dict['id'] ##Store this rev_len to recall it for the current page_len, in case this is the last revision for that page self.last_page_len=2*len(self.rev_dict['text']) self.rev_dict.clear() self.stack.pop() self.isMinor='0' self.rev_num+=1 if self.options.verbose and self.options.log is None: # Display status report if self.rev_num % 1000 == 0: self.timeCheck=datetime.datetime.now() self.timeDelta=self.timeCheck-self.start if self.timeDelta.seconds==0: print >> sys.stderr, "page %d (%f pags. per sec.), revision %d (%f revs. per sec.)"\ % (self.page_num, 1e6*float(self.page_num)/self.timeDelta.microseconds,\ self.rev_num, 1e6*float(self.rev_num)/self.timeDelta.microseconds) else: print >> sys.stderr, "page %d (%f pags. per sec.), revision %d (%f revs. per sec.)"\ % (self.page_num, float(self.page_num)/self.timeDelta.seconds,\ self.rev_num, float(self.rev_num)/self.timeDelta.seconds) if self.options.verbose and self.options.log is not None: # TODO: Print report status to log file pass elif name=='page': ################################################ #We must write the las revinsert before finishing this page if self.options.fileout: self.revinsert+=";\n" # Write output to SQL file self.revfile = codecs.open(self.options.revfile,'a','utf_8') self.revfile.write(self.revinsert) self.revfile.close() elif self.options.streamout: # DON'T WRITE SQL TO FILES, GENERATE ENCONDED SQL STREAM FOR MYSQL self.revinsert+=";" print self.revinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.revinsert.encode('utf-8')) except (Exception), e: print e else: break #Reset status vars self.revinsertrows=0 self.revinsertsize=0 ################################################ ##Same for Insert into text table if self.options.fileout: self.textinsert+=";\n" # Write output to SQL file self.textfile = codecs.open(self.options.textfile,'a','utf_8') self.textfile.write(self.textinsert) self.textfile.close() elif self.options.streamout: # DON'T WRITE SQL TO FILES, GENERATE ENCONDED SQL STREAM FOR MYSQL self.textinsert+=";" print self.textinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.textinsert.encode('utf-8')) except (Exception), e: print e else: break #Reset status vars self.textinsertrows=0 self.textinsertsize=0 ################################################ ##Recovering namespace for this page if self.nspace_dict.has_key(self.page_dict['title'].split(':')[0]): self.page_dict['namespace']=self.nspace_dict[self.page_dict['title'].split(':')[0]] else: self.page_dict['namespace']='0' ################################################### #CONSTRUCTION OF EXTENDED INSERT FOR PAGES (STANDARD VERSION) ################################################### ##Values order for page (page_id, page_namespace, page_title, page_restrictions, ##page_counter[[unused]], ##page_is_redirect, page_is_new, page_random, page_touched[[default to '']], ##page_latest, page_len) newpageinsert="("+self.page_dict['id']+","+\ self.page_dict['namespace']+',"'+\ self.page_dict['title'].replace("\\","\\\\").replace("'","\\'").replace('"', '\\"')+'"' if self.page_dict.has_key('restrictions'): newpageinsert+=","+'"'+self.page_dict['restrictions']+'"' else: newpageinsert+=",''" newpageinsert+=","+'0'+","+self.isRedirect+"," if self.rev_count>1: newpageinsert+="1," else: newpageinsert+="0," newpageinsert+=str(random.random())+","+\ "''"+","+self.prior_rev_id+","+str(self.last_page_len) newpageinsert+=")" if self.pageinsertrows==0: self.pageinsert="INSERT INTO page VALUES"+newpageinsert self.pageinsertrows+=1 self.pageinsertsize=len(self.pageinsert)*2 elif (self.pageinsertsize+(2*len(newpageinsert))<=self.options.imaxsize*1024) and\ (self.pageinsertrows+1<=self.options.imaxrows): #Append current row to extended insert self.pageinsert+=","+newpageinsert self.pageinsertrows+=1 self.pageinsertsize=len(self.pageinsert)*2 else: #We must write this extended insert and begin a new one if self.options.fileout: #Write extended insert to file self.pageinsert+=";\n" self.pagefile = codecs.open(self.options.pagefile,'a','utf_8') self.pagefile.write(self.pageinsert) self.pagefile.close() elif self.options.streamout: #Write extended insert to sys.stdout (stream to MySQL) self.pageinsert+=";" print self.pageinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.pageinsert.encode('utf-8')) except (Exception), e: print e else: break self.pageinsert="INSERT INTO page VALUES"+newpageinsert self.pageinsertrows=1 self.pageinsertsize=len(self.pageinsert)*2 ##Clear temp variables for the next page self.page_dict.clear() self.prior_rev_id='NULL' self.last_page_len=0 self.rev_count=0 self.isRedirect='0' self.isStub='0' self.stack.pop() self.page_num += 1 else: ##General tag processing if len(self.stack)>0 and (self.stack[-1]=='revision' or self.stack[-1]=='contributor'): self.rev_dict[self.current_elem]=self.current_text elif len(self.stack)>0 and self.stack[-1]=='page': self.page_dict[self.current_elem]=self.current_text self.current_elem=None return def characters(self, ch): if self.current_elem != None: self.current_text = self.current_text + ch def endDocument(self): ################################################ #We must write the last pageinsert before finishing this dump if self.options.fileout: # Write output to SQL file self.pageinsert+=";\n" self.pagefile = codecs.open(self.options.pagefile,'a','utf_8') self.pagefile.write(self.pageinsert) self.pagefile.close() elif self.options.streamout: # DON'T WRITE SQL TO FILES, GENERATE ENCONDED SQL STREAM FOR MYSQL self.pageinsert+=";" print self.pageinsert.encode('utf-8') elif self.options.monitor: while 1: try: dbaccess.raw_query_SQL(self.acceso[1], self.pageinsert.encode('utf-8')) except (Exception), e: print e else: break #Reset status vars self.pageinsertrows=0 self.pageinsertsize=0 ########IF WE USE MONITOR MODE, CLOSE DB CONNECTION if self.options.monitor and not self.options.fileout and not self.options.streamout: dbaccess.close_Connection(self.acceso[1]) ################################################ #Checking out total time consumed and display end message self.timeCheck=datetime.datetime.now() self.timeDelta=self.timeCheck-self.start print >> sys.stderr, "\n" print >> sys.stderr, "File successfully parsed..." print >> sys.stderr, "page %d (%f pags./sec.), revision %d (%f revs./sec.)" % (self.page_num,\ float(self.page_num)/self.timeDelta.seconds, self.rev_num, float(self.rev_num)/self.timeDelta.seconds) ##Main zone if __name__ == '__main__': usage = "usage: %prog [options]" parserc = OptionParser(usage) parserc.add_option("-t","--stubth", dest="stubth", type="int", metavar="STUBTH", default=256, help="Max. size in bytes to consider an article as stub [default: %default]") parserc.add_option("--pagefile", dest="pagefile", default="page.sql", metavar="FILE", help="Name of the SQL file created for the page table [default: %default]") parserc.add_option("--revfile", dest="revfile", default="revision.sql", metavar="FILE", help="Name of the SQL file created for the revision table [default: %default]") parserc.add_option("--textfile", dest="textfile", default="text.sql", metavar="FILE", help="Name of the SQL file created for the text table [default: %default]") parserc.add_option("--skipnamespaces", dest="skipns", metavar="NAMESPACES", help="List of namespaces whose content will be ignored [comma separated values, without " "blanks; e.g. --skipnamespaces=name1,name2,name3]") parserc.add_option("-i","--inject", dest="inject", metavar="STRING", help="Optional string to inject at the very start of articles' text; string " "must be provided within quotes (e.g. --inject='my string') or double quotes") parserc.add_option("-f","--fileout", dest="fileout", action="store_true", default=False, help="Create SQL files from parsed XML dump") parserc.add_option("-s","--streamout", dest="streamout", action="store_true", default=False, help="Generate an output SQL stream suitable for a direct import into MySQL database") parserc.add_option("-m", "--monitor", dest="monitor", action="store_true", default=True, help="Insert SQL code directly into MySQL database [default]") parserc.add_option("-u", "--user", dest="user", metavar="MySQL_USER", help="Username to connect to MySQL database") parserc.add_option("-p", "--passwd", dest="passwd", metavar="MySQL_PASSWORD", help="Password for MySQL user to access the database") parserc.add_option("-d", "--database", dest="database", metavar="DBNAME", help="Name of the MySQL database") parserc.add_option("--port", dest="port", metavar="MySQL_SERVER_PORT", default=3306, type="int", help="Listening port of MySQL server") parserc.add_option("--machine", dest="machine", metavar="SERVER_NAME", default="localhost", help="Name of MySQL server") parserc.add_option("-v", "--verbose", action="store_true", dest="verbose", default=True, help="Display standard status reports about the parsing process [default]") parserc.add_option("-q", "--quiet", action="store_false", dest="verbose", help="Do not display any status reports") parserc.add_option("-l","--log", dest="log", metavar="LOGFILE", help="Store status reports in a log file; do not display them") parserc.add_option("--insertmaxsize", dest="imaxsize", metavar="MAXSIZE", type="int", default=156, help="Max size in KB of the MySQL extended inserts [default: %default] " "[max: 256]") parserc.add_option("--insertmaxnum", dest="imaxrows", metavar="MAXROWS", type="int", default=50000, help="Max number of individual rows allowed in the MySQL extended " "inserts [default: %default][max: 250000]") (options, args) = parserc.parse_args() if not options.verbose and options.log!=None: parserc.error("Error! Illegal combination: options -q and --log options are mutually exclusive") if options.monitor and not options.fileout and not options.streamout and (options.user==None or options.passwd==None or options.database==None): parserc.error("Error! You must provide user, password and database name to execute monitor mode") if options.imaxsize>256 or options.imaxsize<=0: parserc.error("Error! Illegal value: optional param --insertmaxsize must be between 1 and 256") if options.imaxrows>250000 or options.imaxrows<=0: parserc.error("Error! Illegal value: optinal param --insertmaxnum must be between 1 250000") # Adapt stdout to Unicode UTF-8 sys.stdout=codecs.EncodedFile(sys.stdout,'utf-8') # Create a parser parser = make_parser() # Tell the parser we are not interested in XML namespaces parser.setFeature(feature_namespaces, 0) # Create the downstream_handler using our class wh = wikiHandler(options) #Create de filter based in our parser and content handler filter_handler = text_normalize_filter(parser, wh) #Parse the XML dump filter_handler.parse(sys.stdin)