# Copyright 2005-2008 Ka-Ping Yee
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Python module for web browsing and scraping.

Done:
- navigate to absolute and relative URLs
- follow links in page or region
- find strings or regular expressions: first, all, split
- find tags: first, last, next, previous, all, splittag
- find elements: first, last, next, previous, enclosing, all
- set form fields
- submit forms
- strip tags from arbitrary strings of HTML
- support HTTPS
- handle entities > 255 and Unicode documents
- accept and store cookies during redirection
- store and send cookies according to domain and path
- Python 3 support

To do:
- split by element
- detect ends of elements in most cases even if matching end tags are missing
- make the line breaks in striptags correctly reflect whitespace significance
- handle <![CDATA[ ... ]]>
- submit forms with file upload
- use Regions in striptags instead of duplicating work?
- remove dependency on urllib.urlencode
"""

__author__ = 'Ka-Ping Yee <ping@zesty.ca>'
__version__ = '2019-05-25'

# This module works with both Python 2 and Python 3.
try:
    from urlparse import urlsplit, urljoin
except:
    from urllib.parse import urlsplit, urljoin
try:
    from htmlentitydefs import name2codepoint
except:
    from html.entities import name2codepoint
try:
    basestring
except:
    basestring = str
try:
    unicode
except:
    unicode = str
try:
    unichr
except:
    unichr = chr

import sys, re

def regex(template, *params, **kwargs):
    """Compile a regular expression, substituting in any passed parameters
    for placeholders of the form __0__, __1__, __2__, etc. in the template.
    Specify the named argument 'flags' to set regular expression compilation
    flags; by default, DOTALL is set ('.' matches anything including '\n')."""
    flags = kwargs.get('flags', re.DOTALL)
    for i, param in enumerate(params):
        template = template.replace('__%d__' % i, param)
    return re.compile(template, flags)

def iregex(template, *params, **kwargs):
    """Compile a regular expression, substituting in any passed parameters
    for placeholders of the form __0__, __1__, __2__, etc. in the template.
    Specify the named argument 'flags' to set regular expression compilation
    flags; by default, DOTALL and IGNORECASE are set."""
    kwargs['flags'] = kwargs.get('flags', 0) | re.IGNORECASE
    return regex(template, *params, **kwargs)
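# Illustrative usage (this example is not part of the original module): the
# __0__-style placeholders are replaced verbatim before compilation, so a
# variable fragment such as a tag name can be spliced into a pattern
# template without hand-escaping formatting characters.
#
#     >>> pat = iregex(r'<__0__\b[^>]*>', 'a')
#     >>> bool(pat.search('<A HREF="/index.html">'))
#     True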
class ScrapeError(Exception):
    pass

def request(scheme, method, host, path, headers, data='', verbose=0):
    """Make an HTTP or HTTPS request; return the entire reply as a string."""
    request = method + ' ' + path + ' HTTP/1.0\r\n'
    for name, value in headers.items():
        capname = '-'.join([part.capitalize() for part in name.split('-')])
        request += capname + ': ' + str(value) + '\r\n'
    request += '\r\n' + data
    host, port = host.split('@')[-1], [80, 443][scheme == 'https']
    if ':' in host:
        host, port = host.split(':', 1)

    import socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if verbose >= 3:
        sys.stderr.write('connect: %s %s' % (host, port))
    sock.connect((host, int(port)))
    file = scheme == 'https' and socket.ssl(sock) or sock.makefile('rw')
    if verbose >= 3:
        sys.stderr.write(('\r\n' + request.rstrip()).replace(
            '\r\n', '\nrequest: ').lstrip())
    file.write(request)
    if hasattr(file, 'flush'):
        file.flush()
    chunks = []
    try:
        while not (chunks and len(chunks[-1]) == 0):
            chunks.append(file.read(16384))
    except socket.error:
        pass
    return ''.join(chunks)

def shellquote(text):
    """Quote a string literal for /bin/sh."""
    return "'" + text.replace("'", "'\\''") + "'"

def curl(url, data=None, agent=None, referrer=None, cookies=None, verbose=0):
    """Use curl to make a request; return the entire reply as a string."""
    import os, tempfile
    fd, tempname = tempfile.mkstemp(prefix='scrape')
    command = 'curl --include --insecure --silent --max-redirs 0'
    if data:
        if not isinstance(data, str):  # Unicode not allowed here
            data = urlencode(data)
        command += ' --data ' + shellquote(data)
    if agent:
        command += ' --user-agent ' + shellquote(agent)
    if referrer:
        command += ' --referer ' + shellquote(referrer)
    if cookies:
        command += ' --cookie ' + shellquote(cookies)
    command += ' ' + shellquote(url)
    if verbose >= 3:
        sys.stderr.write('execute: ' + command)
    os.system(command + ' > ' + tempname)
    reply = open(tempname).read()
    os.remove(tempname)
    return reply

def getcookies(cookiejar, host, path):
    """Get a dictionary of the cookies from 'cookiejar' that apply to the
    given request host and request path."""
    cookies = {}
    for cdomain in cookiejar:
        if ('.' + host).endswith(cdomain):
            for cpath in cookiejar[cdomain]:
                if path.startswith(cpath):
                    for key, value in cookiejar[cdomain][cpath].items():
                        cookies[key] = value
    return cookies

def setcookies(cookiejar, host, lines):
    """Store cookies in 'cookiejar' according to the given Set-Cookie
    header lines."""
    for line in lines:
        pairs = [(part.strip().split('=', 1) + [''])[:2]
                 for part in line.split(';')]
        (name, value), attrs = pairs[0], dict(pairs[1:])
        cookiejar.setdefault(attrs.get('domain', host), {}
            ).setdefault(attrs.get('path', '/'), {})[name] = value

RAW = object()  # This sentinel value for 'charset' means "don't decode".
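# Illustrative usage (this example is not part of the original module): the
# cookie jar is a plain nested dictionary keyed by domain, then path, then
# cookie name, so it can be inspected or prepopulated directly.
#
#     >>> jar = {}
#     >>> setcookies(jar, 'www.example.com',
#     ...            ['sid=abc123; domain=.example.com; path=/'])
#     >>> jar
#     {'.example.com': {'/': {'sid': 'abc123'}}}
#     >>> getcookies(jar, 'sub.example.com', '/cart')
#     {'sid': 'abc123'}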
def fetch(url, data='', agent=None, referrer=None, charset=None, verbose=0,
          cookiejar={}):
    """Make an HTTP or HTTPS request.  If 'data' is given, do a POST;
    otherwise do a GET.  If 'agent' and/or 'referrer' are given, include
    them as User-Agent and Referer headers in the request, respectively.
    'cookiejar' should have the form {domain: {path: {name: value, ...}}};
    cookies will be sent from it and received cookies will be stored in it.
    Return the 5-element tuple (url, status, message, headers, content)
    where 'url' is the final URL retrieved, 'status' is the integer status
    code, 'message' is the reply status message, 'headers' is a dictionary
    of HTTP headers, and 'content' is a string containing the received
    content.  For multiple occurrences of the same header, 'headers' will
    contain a single key-value pair where the values are joined together
    with newlines.  If the Content-Type header specifies a 'charset'
    parameter, 'content' will be a Unicode string, decoded using the given
    charset.  Giving the 'charset' argument overrides any received
    'charset' parameter; a charset of RAW ensures that the content is left
    undecoded in an 8-bit string."""
    scheme, host, path, query, fragment = urlsplit(url)
    host = host.split('@')[-1]
    path = path or '/'

    # Prepare the POST data.
    method = data and 'POST' or 'GET'
    if data and not isinstance(data, str):  # Unicode not allowed here
        data = urlencode(data)

    # Get the cookies to send with this request.
    cookieheader = '; '.join([
        '%s=%s' % pair for pair in getcookies(cookiejar, host, path).items()])

    # Make the HTTP or HTTPS request using Python or cURL.
    if verbose:
        sys.stderr.write('fetch: ' + url)
    import socket
    if scheme == 'http' or scheme == 'https' and hasattr(socket, 'ssl'):
        if query:
            path += '?' + query
        headers = {'host': host, 'accept': '*/*'}
        if data:
            headers['content-type'] = 'application/x-www-form-urlencoded'
            headers['content-length'] = len(data)
        if agent:
            headers['user-agent'] = agent
        if referrer:
            headers['referer'] = referrer
        if cookieheader:
            headers['cookie'] = cookieheader
        reply = request(scheme, method, host, path, headers, data, verbose)
    elif scheme == 'https':
        reply = curl(url, data, agent, referrer, cookieheader, verbose)
    else:
        raise ValueError(scheme + ' not supported')

    # Take apart the HTTP reply.
    headers, head, content = {}, reply, ''
    if '\r\n\r\n' in reply:
        head, content = (reply.split('\r\n\r\n', 1) + [''])[:2]
    else:  # Non-conformant reply.  Bummer!
        match = re.search('\r?\n[ \t]*\r?\n', reply)
        if match:
            head, content = head[:match.start()], head[match.end():]
    head = head.replace('\r\n', '\n').replace('\r', '\n')
    response, head = head.split('\n', 1)
    status = int(response.split()[1])
    message = ' '.join(response.split()[2:])
    for line in head.split('\n'):
        if verbose >= 3:
            sys.stderr.write('reply: ' + line.rstrip())
        name, value = line.split(': ', 1)
        name = name.lower()
        if name in headers:
            headers[name] += '\n' + value
        else:
            headers[name] = value
    if verbose >= 2:
        sys.stderr.write('content: %d byte%s\n' % (
            len(content), len(content) != 1 and 's' or ''))

    # Store any received cookies.
    if 'set-cookie' in headers:
        setcookies(cookiejar, host, headers['set-cookie'].split('\n'))

    # Handle the 'charset' parameter.
    if 'content-type' in headers and not charset:
        for param in headers['content-type'].split(';')[1:]:
            if param.strip().startswith('charset='):
                charset = param.strip()[8:]
                break
    if charset and charset is not RAW:
        content = content.encode('latin-1').decode(charset, 'ignore')

    return url, status, message, headers, content
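# Illustrative usage (this example is not part of the original module; the
# URL and the status and headers shown are assumptions about a live server):
#
#     >>> jar = {}
#     >>> url, status, message, headers, content = fetch(
#     ...     'http://example.com/', agent='scrape-example', cookiejar=jar)
#     >>> status, headers.get('content-type')
#     (200, 'text/html; charset=UTF-8')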
class Session:
    """A Web-browsing session.  Exposed attributes:

        agent   - the User-Agent string (clients can set this attribute)
        url     - the last successfully fetched URL
        status  - the status code of the last request
        message - the status message of the last request
        headers - the headers of the last request as a dictionary
        content - the content of the last fetched document
        doc     - the Region spanning the last fetched document
    """

    def __init__(self, agent=None, verbose=0):
        """Specify 'agent' to set the User-Agent.  Set 'verbose' to 1, 2, or
        3 to display status messages on stderr during document retrieval."""
        self.agent = agent
        self.url = self.status = self.message = self.content = self.doc = None
        self.verbose = verbose
        self.headers = {}
        self.cookiejar = {}
        self.history = []

    def go(self, url, data='', redirects=10, referrer=True, charset=None):
        """Navigate to a given URL.  If the URL is relative, it is resolved
        with respect to the current URL.  If 'data' is provided, do a POST;
        otherwise do a GET.  Follow redirections up to 'redirects' times.
        If 'referrer' is given, send it as the referrer; if 'referrer' is
        True (default), send the current URL as the referrer; if 'referrer'
        is a false value, send no referrer.  If 'charset' is given, it
        overrides any received 'charset' parameter; setting 'charset' to RAW
        leaves the content undecoded in an 8-bit string.  If the document is
        successfully fetched, return a Region spanning the entire document.
        Any relevant previously stored cookies will be included in the
        request, and any received cookies will be stored for future use."""
        historyentry = (self.url, self.status, self.message,
                        self.headers, self.content, self.doc)
        url = self.resolve(url)
        if referrer is True:
            referrer = self.url
        while 1:
            self.url, self.status, self.message, self.headers, self.content = \
                fetch(url, data, self.agent, referrer, charset,
                      self.verbose, self.cookiejar)
            if redirects:
                if self.status in [301, 302] and 'location' in self.headers:
                    url, data = urljoin(url, self.headers['location']), ''
                    redirects -= 1
                    continue
            break
        self.history.append(historyentry)
        self.doc = Region(self.content)
        return self.doc

    def back(self):
        """Restore the state of this session before the previous request."""
        (self.url, self.status, self.message, self.headers,
         self.content, self.doc) = self.history.pop()
        return self.url

    def follow(self, anchor, region=None):
        """Find the first link that has the given anchor text, and follow it.
        The anchor may be given as a string or a compiled RE.  If 'region'
        is specified, only that region is searched for a matching link,
        instead of the whole document."""
        link = (region or self.doc).first('a', content=anchor)
        if not link:
            raise ScrapeError('link %r not found' % anchor)
        if not link.get('href', ''):
            raise ScrapeError('link %r has no href' % link)
        return self.go(link['href'])
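    # Illustrative usage (this example is not part of the original module;
    # the URL and anchor text are hypothetical):
    #
    #     >>> s = Session(agent='scrape-example')
    #     >>> doc = s.go('http://example.com/')    # fetch; returns a Region
    #     >>> doc = s.follow('More information')   # click the first such link
    #     >>> s.back()                             # restore the prior state
    #     'http://example.com/'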
    def submit(self, region, paramdict=None, url=None, redirects=10,
               **params):
        """Submit a form, optionally by clicking a given button.  The
        'region' argument can be the form itself or a button in the form to
        click.  Obtain the parameters to submit by (a) starting with the
        'paramdict' dictionary if specified, or the default parameter values
        as returned by get_params; then (b) adding or replacing parameters
        in this dictionary according to the keyword arguments.  The 'url'
        argument overrides the form's action attribute and submits the form
        elsewhere.  After submission, follow redirections up to 'redirects'
        times."""
        form = region.tagname == 'form' and region or region.enclosing('form')
        if not form:
            raise ScrapeError('%r is not contained in a form' % region)
        if paramdict is not None:
            p = paramdict.copy()
        else:
            p = form.params
        if 'name' in region:
            p[region['name']] = region.get('value', '')
        p.update(params)
        method = form['method'].lower() or 'get'
        url = url or form.get('action', self.url)
        if method == 'get':
            return self.go(url + '?' + urlencode(p), '', redirects)
        elif method == 'post':
            return self.go(url, p, redirects)
        else:
            raise ScrapeError('unknown form method %r' % method)

    def resolve(self, url):
        """Resolve a URL with respect to the current location."""
        if self.url and not (
            url.startswith('http://') or url.startswith('https://')):
            url = urljoin(self.url, url)
        return url

    def setcookie(self, cookieline):
        """Put a cookie in this session's cookie jar.  'cookieline' should
        have the format "<name>=<value>; domain=<domain>; path=<path>"."""
        scheme, host, path, query, fragment = urlsplit(self.url)
        host = host.split('@')[-1].split(':')[0]
        setcookies(self.cookiejar, host, [cookieline])

# This pattern has been carefully tuned, but re.search can still cause a
# stack overflow.  Try re.search('(a|b)*', 'a'*10000), for example.
tagcontent_re = r'''(('[^']*'|"[^"]*"|--([^-]+|-[^-]+)*--|-(?!-)|[^'">-])*)'''

def tag_re(tagname_re):
    return '<' + tagname_re + tagcontent_re + '>'

anytag_re = tag_re(r'(\?|!\w*|/?[a-zA-Z_:][\w:.-]*)')
tagpat = re.compile(anytag_re)

# This pattern matches a character entity reference (a decimal numeric
# reference, a hexadecimal numeric reference, or a named reference).
charrefpat = re.compile(r'&(#(\d+|x[\da-fA-F]+)|[\w.:-]+);?')

def htmldecode(text):
    """Decode HTML entities in the given text."""
    if type(text) is unicode:
        uchr = unichr
    else:
        uchr = lambda value: value > 127 and unichr(value) or chr(value)
    def entitydecode(match, uchr=uchr):
        entity = match.group(1)
        if entity.startswith('#x'):
            return uchr(int(entity[2:], 16))
        elif entity.startswith('#'):
            return uchr(int(entity[1:]))
        elif entity in name2codepoint:
            return uchr(name2codepoint[entity])
        else:
            return match.group(0)
    return charrefpat.sub(entitydecode, text)

def htmlencode(text):
    """Use HTML entities to encode special characters in the given text."""
    text = text.replace('&', '&amp;')
    text = text.replace('"', '&quot;')
    text = text.replace('<', '&lt;')
    text = text.replace('>', '&gt;')
    return text

urlquoted = dict((chr(i), '%%%02X' % i) for i in range(256))
urlquoted.update(dict((c, c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' +
                                      'abcdefghijklmnopqrstuvwxyz' +
                                      '0123456789._-'))

def urlquote(text):
    return ''.join(map(urlquoted.get, text))

def urlencode(params):
    pairs = ['%s=%s' % (urlquote(key), urlquote(value).replace('%20', '+'))
             for key, value in params.items()]
    return '&'.join(pairs)

def no_groups(re):
    return re.replace('(', '(?:').replace('(?:?', '(?')

tagsplitter = re.compile(no_groups(anytag_re))
parasplitter = re.compile(no_groups(tag_re('(p|table|form)')), re.I)
linesplitter = re.compile(no_groups(tag_re('(div|br|tr)')), re.I)
cdatapat = re.compile(r'<(!\s*--|style\b|script\b)', re.I)
endcdatapat = {'!': re.compile(r'--\s*>'),
               'script': re.compile(r'</script[^>]*>', re.I),
               'style': re.compile(r'</style[^>]*>', re.I)}

def striptags(html):
    """Strip HTML tags from the given string, yielding line breaks for DIV,
    BR, or TR tags and blank lines for P, TABLE, or FORM tags."""
    # Remove comments and elements with CDATA content (<script> and <style>).