User:DRBot/source/mwclient.py
< User:DRBot | source
"""Small MediaWiki client that drives the wiki's HTML login, edit and upload forms.

This module targets Python 2: it relies on ``urllib2``, ``cookielib``,
``htmlentitydefs`` and the ``HTMLParser`` module.
"""

import urllib
import urllib2
import urlparse
import cookielib
import random
from htmlentitydefs import name2codepoint
from HTMLParser import HTMLParser
import time

__ver__ = '0.3.1u'

# Characters used to build the random multipart boundary.
_BOUNDARY_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'


def _to_utf8(value):
    """Return *value* as a UTF-8 byte string; byte strings pass through unchanged.

    Calling ``.encode('utf-8')`` directly on a non-ASCII byte string triggers
    an implicit ASCII decode in Python 2 and raises UnicodeDecodeError, so
    only unicode objects are encoded here.
    """
    if isinstance(value, unicode):
        return value.encode('utf-8')
    return value


def _quote(value):
    """Percent-encode *value* for use in a URL or form body.

    Unicode input is encoded as UTF-8 first, because ``urllib.quote`` raises
    KeyError on non-ASCII unicode characters.
    """
    return urllib.quote(_to_utf8(value))


class MediawikiError(StandardError):
    """Raised when the wiki does not behave as expected (e.g. login fails)."""
    pass


class Request(urllib2.Request):
    """``urllib2.Request`` that always sends the ``MwClient-<version>`` User-Agent."""

    def __init__(self, url, data=None, headers=None, origin_req_host=None,
                 unverifiable=False):
        """Build the request; arguments are forwarded to ``urllib2.Request``.

        ``headers`` defaults to ``None`` (treated as an empty dict) so that no
        mutable default object is shared between calls.
        """
        if headers is None:
            headers = {}
        urllib2.Request.__init__(self, url, data, headers, origin_req_host,
                                 unverifiable)
        self.add_header('User-Agent', 'MwClient-' + __ver__)


class PostRequest(Request):
    """Request carrying a URL-encoded form body."""

    def __init__(self, url, data=None):
        """Create a POST request to *url*; *data* is handled by :meth:`add_data`."""
        Request.__init__(self, url)
        self.add_header('Content-type',
                        'application/x-www-form-urlencoded; charset=UTF-8')
        self.add_data(data)

    def add_data(self, data):
        """Attach the request body.

        ``None`` leaves the request without a body. A string is sent as-is
        (unicode is encoded as UTF-8). Any other value is treated as a
        mapping and serialized as ``key=value`` pairs joined with ``&``, with
        keys and values percent-encoded.
        """
        if data is None:
            return
        if isinstance(data, basestring):
            return Request.add_data(self, _to_utf8(data))
        pairs = [_quote(k) + '=' + _quote(v) for k, v in data.iteritems()]
        return Request.add_data(self, '&'.join(pairs))


def Open(url):
    """Open *url* anonymously (no cookies) and return the urllib2 response."""
    return urllib2.urlopen(Request(url))


def encode_multipart(fields, files):
    """Encode form fields and file contents as a multipart/form-data body.

    ``fields`` is an iterable of ``(name, value)`` pairs and ``files`` an
    iterable of ``(name, filename, content)`` triples. Unicode names, values
    and filenames are encoded as UTF-8; ``content`` is inserted untouched.

    Returns ``(boundary, body)``, where ``boundary`` is the randomly generated
    boundary string to advertise in the Content-Type header.
    """
    boundary = '----%s----' % ''.join(
        random.choice(_BOUNDARY_CHARS) for _ in xrange(32))
    res = []
    for k, v in fields:
        res.append('--' + boundary)
        res.append('Content-Disposition: form-data; name="%s"' % _to_utf8(k))
        res.append('')
        res.append(_to_utf8(v))
    for k, v, f in files:
        res.append('--' + boundary)
        res.append('Content-Disposition: form-data; name="%s"; filename="%s"'
                   % (_to_utf8(k), _to_utf8(v)))
        res.append('Content-Type: application/octet-stream')
        res.append('')
        res.append(f)
    res.append('--%s--' % boundary)
    res.append('')
    # Every element is a byte string at this point, so the join cannot
    # trigger an implicit unicode coercion of binary file content.
    return boundary, '\r\n'.join(res)


class Session(object):
    """Logged-in session that keeps its cookies in a ``cookielib.CookieJar``."""

    def __init__(self, baseuri=None, wikicode=None, username=None,
                 password=None):
        """Store the connection settings and log in immediately.

        ``baseuri`` is the URL that ``?title=...`` is appended to;
        ``wikicode`` is the prefix of the ``<wikicode>UserID`` cookie that
        :meth:`checklogin` looks for.
        """
        self.base = baseuri
        self.wikicode = wikicode
        self.username = username
        self.password = password
        self.cookie = cookielib.CookieJar()
        self.login()

    # Re-running the constructor replaces the settings and logs in again.
    setoptions = __init__

    def _send(self, req):
        """Send *req* with the session cookies and store any cookies returned."""
        self.cookie.add_cookie_header(req)
        res = urllib2.urlopen(req)
        self.cookie.extract_cookies(res, req)
        return res

    def login(self):
        """Submit the Special:Userlogin form.

        Raises MediawikiError if no login cookie is present afterwards.
        """
        # Fetch the login page first so cookies set there are sent back with
        # the credentials.
        req = Request(self.base + '?title=Special:Userlogin')
        self.cookie.extract_cookies(urllib2.urlopen(req), req)
        req = PostRequest(
            self.base + '?title=Special:Userlogin&action=submitlogin&type=login')
        req.add_data({
            'wpName': self.username,
            'wpPassword': self.password,
            'wpRemember': '1',
            'wpLoginattempt': 'Log in',
        })
        self._send(req)
        if not self.checklogin():
            raise MediawikiError('Login failed!')

    def checklogin(self):
        """Return True if the cookie jar holds the ``<wikicode>UserID`` cookie."""
        cookie_name = self.wikicode + 'UserID'
        return any(c.name == cookie_name for c in self.cookie)

    def open(self, title, raw=False):
        """Fetch a page with the session cookies and return the response.

        With ``raw`` false, *title* is appended to ``<base>?title=``; with
        ``raw`` true, *title* is used as the complete URL. An HTTP 500, 502,
        503 or 504 is retried once after a 10 second pause; other HTTP errors
        propagate. If the login cookie has disappeared, the session logs in
        again and repeats the request.
        """
        if raw:
            req = Request(title)
        else:
            req = Request(self.base + '?title=' + title)
        self.cookie.add_cookie_header(req)
        try:
            res = urllib2.urlopen(req)
        except urllib2.HTTPError as e:
            if e.code not in (500, 502, 503, 504):
                raise
            time.sleep(10)
            res = urllib2.urlopen(req)
        self.cookie.extract_cookies(res, req)
        if not self.checklogin():
            self.login()
            # Pass ``raw`` along: without it a raw URL would be re-requested
            # as a page title after re-login.
            return self.open(title, raw)
        return res

    def post_raw(self, action, data):
        """POST *data* to the path *action* on the host of the base URL.

        Only the scheme and host of ``self.base`` are kept; *action* supplies
        the rest of the URL. Logs in again and retries if the login cookie
        is gone after the request.
        """
        host = '://'.join(urlparse.urlparse(self.base)[:2])
        res = self._send(PostRequest(host + action, data))
        if not self.checklogin():
            self.login()
            return self.post_raw(action, data)
        return res

    def upload(self, fo, filename, description, license='', ignore=False):
        """Upload the contents of file object *fo* through Special:Upload.

        ``ignore`` adds ``wpIgnoreWarning=true`` to the form. If the login
        cookie is gone after the request, the session logs in again, rewinds
        *fo* and retries. Returns the urllib2 response.
        """
        post = {
            'wpDestFile': filename,
            'wpUploadDescription': description,
            'wpLicense': license,
            'wpUpload': 'Upload file',
            'wpSourceType': 'file',
        }
        if ignore:
            post['wpIgnoreWarning'] = 'true'
        boundary, data = encode_multipart(
            post.iteritems(), (('wpUploadFile', filename, fo.read()),))
        req = PostRequest(self.base + '?title=Special:Upload', data)
        # Replaces the urlencoded content type set by PostRequest.
        req.add_header('Content-Type',
                       'multipart/form-data; boundary=' + boundary)
        self._send(req)
        if not self.checklogin():
            self.login()
            fo.seek(0)
            return self.upload(fo, filename, description, license, ignore)
        return self._last_response(req)

    def _last_response(self, req):
        """Placeholder overwritten below; see ``upload``."""
        raise NotImplementedError


def _session_upload(self, fo, filename, description, license='', ignore=False):
    """Upload the contents of file object *fo* through Special:Upload.

    ``ignore`` adds ``wpIgnoreWarning=true`` to the form. If the login cookie
    is gone after the request, the session logs in again, rewinds *fo* and
    retries. Returns the urllib2 response.
    """
    post = {
        'wpDestFile': filename,
        'wpUploadDescription': description,
        'wpLicense': license,
        'wpUpload': 'Upload file',
        'wpSourceType': 'file',
    }
    if ignore:
        post['wpIgnoreWarning'] = 'true'
    boundary, data = encode_multipart(
        post.iteritems(), (('wpUploadFile', filename, fo.read()),))
    req = PostRequest(self.base + '?title=Special:Upload', data)
    # Replaces the urlencoded content type set by PostRequest.
    req.add_header('Content-Type', 'multipart/form-data; boundary=' + boundary)
    res = self._send(req)
    if not self.checklogin():
        self.login()
        fo.seek(0)
        return self.upload(fo, filename, description, license, ignore)
    return res


# Bind the single, correct implementation of ``upload`` and drop the scaffold.
Session.upload = _session_upload
del Session._last_response
del _session_upload


class Page(HTMLParser):
    """Parser for a wiki edit page that can submit the form back.

    After parsing, ``data`` holds the non-submit ``<input>`` fields of the
    form with id ``editform``, ``action`` holds that form's action URL
    (``None`` if no such form was seen) and ``textdata`` holds the text
    chunks found inside the form's ``<textarea>``.
    """

    def __init__(self, url=None, session=None, section=''):
        """Fetch and parse *url* if one is given.

        With a session, *url* is treated as a page title and requested with
        ``&action=edit&section=<section>``; without one, *url* is opened
        directly. With no *url* nothing is fetched.
        """
        HTMLParser.__init__(self)
        self.in_form = False
        self.in_text = False
        self.action = None
        self.data = {}
        self.textdata = []
        self.session = session
        self.section = section
        if url:
            if session:
                u = session.open(
                    _quote(url) + '&action=edit&section=' + section)
            else:
                u = Open(url)
            try:
                self.raw = u.read().decode('utf-8', 'ignore')
            finally:
                u.close()
            self.feed(self.raw)

    def handle_starttag(self, tag, attrs):
        """Track the edit form, collect its input fields, detect the textarea."""
        if tag == 'form' and (u'id', u'editform') in attrs:
            self.in_form = True
            self.action = dict(attrs)['action']
        if (tag == 'input' and self.in_form
                and (u'type', u'submit') not in attrs):
            fields = dict(attrs)
            if u'name' in fields:
                # A valueless attribute is reported as None; store u'' so
                # the field can still be encoded when the form is posted.
                self.data[fields[u'name']] = fields.get(u'value') or u''
        self.in_text = self.in_form and tag == 'textarea'

    def handle_endtag(self, tag):
        """Leave the form on ``</form>`` and stop text capture on any end tag."""
        if self.in_form and tag == 'form':
            self.in_form = False
        # The original expression kept in_text true after </textarea>, so
        # text following the textarea was captured as page content.
        self.in_text = False

    def handle_data(self, data):
        """Collect text that appears inside the edit textarea."""
        if self.in_text:
            self.textdata.append(data)

    def handle_entityref(self, name):
        """Decode a named entity; unknown names are kept as ``&name;``."""
        if name in name2codepoint:
            self.handle_data(unichr(name2codepoint[name]))
        else:
            self.handle_data(u'&%s;' % name)

    def handle_charref(self, name):
        """Decode a decimal (``#65``) or hexadecimal (``#x41``) reference.

        References that cannot be converted are kept as ``&#name;``.
        """
        try:
            if name[:1] in ('x', 'X'):
                codepoint = int(name[1:], 16)
            else:
                codepoint = int(name)
            self.handle_data(unichr(codepoint))
        except (ValueError, OverflowError):
            self.handle_data(u'&#%s;' % name)

    def __str__(self):
        """Return the captured textarea text as one unicode string."""
        return u''.join(self.textdata)

    def edit(self, data, summary=u''):
        """Build, and if a session is attached send, the save request.

        *data* becomes the ``wpTextbox1`` field and *summary* the
        ``wpSummary`` field. Returns ``(response, (url, body))`` when the page
        has a session, otherwise just ``(url, body)``.

        Raises MediawikiError if no edit form was parsed for this page.
        """
        if self.action is None:
            raise MediawikiError('No edit form found; cannot submit an edit')
        self.data['wpTextbox1'] = data
        self.data['wpSummary'] = summary
        self.data['wpSave'] = 'Save page'
        target = _to_utf8(self.action) + '&section=' + self.section
        body = '&'.join(_quote(k) + '=' + _quote(v)
                        for k, v in self.data.iteritems())
        e = (target, body)
        if self.session:
            return self.session.post_raw(*e), e
        return e


def log(data):
    """Logging hook; intentionally does nothing."""
    pass