# Copyright 2005-2008 Ka-Ping Yee
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Python module for web browsing and scraping.
Done:
- navigate to absolute and relative URLs
- follow links in page or region
- find strings or regular expressions: first, all, split
- find tags: first, last, next, previous, all, splittag
- find elements: first, last, next, previous, enclosing, all
- set form fields
- submit forms
- strip tags from arbitrary strings of HTML
- support HTTPS
- handle entities > 255 and Unicode documents
- accept and store cookies during redirection
- store and send cookies according to domain and path
- Python 3 support
To do:
- split by element
- detect ends of elements in most cases even if matching end tags are missing
- make the line breaks in striptags correctly reflect whitespace significance
- handle
- submit forms with file upload
- use Regions in striptags instead of duplicating work?
- remove dependency on urllib.urlencode
"""
__author__ = 'Ka-Ping Yee'
__version__ = '2019-05-25'
# This module works with both Python 2 and Python 3.
try:
    from urlparse import urlsplit, urljoin  # Python 2
except ImportError:
    from urllib.parse import urlsplit, urljoin  # Python 3
try:
    from htmlentitydefs import name2codepoint  # Python 2
except ImportError:
    from html.entities import name2codepoint  # Python 3
try:
    basestring
except NameError:  # Python 3 has no basestring
    basestring = str
try:
    unicode
except NameError:  # Python 3: str is already Unicode
    unicode = str
try:
    unichr
except NameError:  # Python 3: chr covers all code points
    unichr = chr
import sys, re
def regex(template, *params, **kwargs):
"""Compile a regular expression, substituting in any passed parameters
for placeholders of the form __0__, __1__, __2__, etc. in the template.
Specify the named argument 'flags' to set regular expression compilation
flags; by default, DOTALL is set ('.' matches anything including '\n')."""
flags = kwargs.get('flags', re.DOTALL)
for i, param in enumerate(params):
template = template.replace('__%d__' % i, param)
return re.compile(template, flags)
def iregex(template, *params, **kwargs):
"""Compile a regular expression, substituting in any passed parameters
for placeholders of the form __0__, __1__, __2__, etc. in the template.
Specify the named argument 'flags' to set regular expression compilation
flags; by default, DOTALL and IGNORECASE are set."""
kwargs['flags'] = kwargs.get('flags', 0) | re.IGNORECASE
return regex(template, *params, **kwargs)
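# For example (an illustrative sketch; the pattern and input are made up):
#     >>> pat = regex(r'<__0__[^>]*>', 'div')
#     >>> bool(pat.search('<div class="nav">'))
#     True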
class ScrapeError(Exception):
pass
def request(scheme, method, host, path, headers, data='', verbose=0):
"""Make an HTTP or HTTPS request; return the entire reply as a string."""
request = method + ' ' + path + ' HTTP/1.0\r\n'
for name, value in headers.items():
capname = '-'.join([part.capitalize() for part in name.split('-')])
request += capname + ': ' + str(value) + '\r\n'
request += '\r\n' + data
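    # Strip any userinfo ('user:pass@') from the host, and pick the default
    # port: [80, 443] indexed by the boolean gives 80 for HTTP, 443 for HTTPS.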
host, port = host.split('@')[-1], [80, 443][scheme == 'https']
if ':' in host:
host, port = host.split(':', 1)
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if verbose >= 3:
        sys.stderr.write('connect: %s %s\n' % (host, port))
sock.connect((host, int(port)))
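    # socket.ssl is the legacy Python 2 SSL wrapper; fetch() only routes
    # HTTPS requests here when hasattr(socket, 'ssl') is true, and falls
    # back to curl() otherwise.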
file = scheme == 'https' and socket.ssl(sock) or sock.makefile('rw')
    if verbose >= 3:
        sys.stderr.write(('\r\n' + request.rstrip()).replace(
            '\r\n', '\nrequest: ').lstrip() + '\n')
file.write(request)
if hasattr(file, 'flush'):
file.flush()
chunks = []
try:
while not (chunks and len(chunks[-1]) == 0):
chunks.append(file.read(16384))
except socket.error:
pass
return ''.join(chunks)
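# For example (a sketch; example.com stands in for a real server), the call
#     request('http', 'GET', 'example.com', '/', {'host': 'example.com'})
# connects to example.com:80, sends the raw bytes
#     'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
# and returns whatever the server writes back, status line, headers, and
# body together.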
def shellquote(text):
"""Quote a string literal for /bin/sh."""
return "'" + text.replace("'", "'\\''") + "'"
def curl(url, data=None, agent=None, referrer=None, cookies=None, verbose=0):
"""Use curl to make a request; return the entire reply as a string."""
import os, tempfile
fd, tempname = tempfile.mkstemp(prefix='scrape')
command = 'curl --include --insecure --silent --max-redirs 0'
if data:
if not isinstance(data, str): # Unicode not allowed here
data = urlencode(data)
command += ' --data ' + shellquote(data)
if agent:
command += ' --user-agent ' + shellquote(agent)
if referrer:
command += ' --referer ' + shellquote(referrer)
if cookies:
command += ' --cookie ' + shellquote(cookies)
command += ' ' + shellquote(url)
    if verbose >= 3:
        sys.stderr.write('execute: ' + command + '\n')
    os.system(command + ' > ' + tempname)
    os.close(fd)  # close the descriptor from mkstemp; the file is read by name
    reply = open(tempname).read()
    os.remove(tempname)
return reply
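# A POST through this path (a sketch; the URL and data are placeholders)
# runs a command like:
#     curl --include --insecure --silent --max-redirs 0 \
#         --data 'q=x' 'http://example.com/'
# and returns the raw reply, status line and headers included.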
def getcookies(cookiejar, host, path):
"""Get a dictionary of the cookies from 'cookiejar' that apply to the
given request host and request path."""
cookies = {}
for cdomain in cookiejar:
if ('.' + host).endswith(cdomain):
for cpath in cookiejar[cdomain]:
if path.startswith(cpath):
for key, value in cookiejar[cdomain][cpath].items():
cookies[key] = value
return cookies
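# For example (a hypothetical jar):
#     >>> jar = {'.example.com': {'/': {'session': 'abc'}}}
#     >>> getcookies(jar, 'www.example.com', '/cart')
#     {'session': 'abc'}
# because '.www.example.com' ends with '.example.com' and '/cart' starts
# with the stored path '/'.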
def setcookies(cookiejar, host, lines):
"""Store cookies in 'cookiejar' according to the given Set-Cookie
header lines."""
for line in lines:
pairs = [(part.strip().split('=', 1) + [''])[:2]
for part in line.split(';')]
(name, value), attrs = pairs[0], dict(pairs[1:])
cookiejar.setdefault(attrs.get('domain', host), {}
).setdefault(attrs.get('path', '/'), {})[name] = value
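# For example (a hypothetical Set-Cookie line):
#     >>> jar = {}
#     >>> setcookies(jar, 'www.example.com',
#     ...            ['session=abc; domain=.example.com; path=/'])
#     >>> jar
#     {'.example.com': {'/': {'session': 'abc'}}}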
RAW = object() # This sentinel value for 'charset' means "don't decode".
def fetch(url, data='', agent=None, referrer=None, charset=None, verbose=0,
cookiejar={}):
"""Make an HTTP or HTTPS request. If 'data' is given, do a POST;
otherwise do a GET. If 'agent' and/or 'referrer' are given, include
them as User-Agent and Referer headers in the request, respectively.
'cookiejar' should have the form {domain: {path: {name: value, ...}}};
cookies will be sent from it and received cookies will be stored in it.
Return the 5-element tuple (url, status, message, headers, content)
where 'url' is the final URL retrieved, 'status' is the integer status
code, 'message' is the reply status message, 'headers' is a dictionary of
HTTP headers, and 'content' is a string containing the received content.
For multiple occurrences of the same header, 'headers' will contain a
single key-value pair where the values are joined together with newlines.
If the Content-Type header specifies a 'charset' parameter, 'content'
will be a Unicode string, decoded using the given charset. Giving the
'charset' argument overrides any received 'charset' parameter; a charset
of RAW ensures that the content is left undecoded in an 8-bit string."""
scheme, host, path, query, fragment = urlsplit(url)
host = host.split('@')[-1]
path = path or '/'
# Prepare the POST data.
method = data and 'POST' or 'GET'
if data and not isinstance(data, str): # Unicode not allowed here
data = urlencode(data)
# Get the cookies to send with this request.
cookieheader = '; '.join([
'%s=%s' % pair for pair in getcookies(cookiejar, host, path).items()])
# Make the HTTP or HTTPS request using Python or cURL.
    if verbose:
        sys.stderr.write('fetch: ' + url + '\n')
import socket
if scheme == 'http' or scheme == 'https' and hasattr(socket, 'ssl'):
if query:
path += '?' + query
headers = {'host': host, 'accept': '*/*'}
if data:
headers['content-type'] = 'application/x-www-form-urlencoded'
headers['content-length'] = len(data)
if agent:
headers['user-agent'] = agent
if referrer:
headers['referer'] = referrer
if cookieheader:
headers['cookie'] = cookieheader
reply = request(scheme, method, host, path, headers, data, verbose)
elif scheme == 'https':
reply = curl(url, data, agent, referrer, cookieheader, verbose)
else:
raise ValueError(scheme + ' not supported')
# Take apart the HTTP reply.
headers, head, content = {}, reply, ''
if '\r\n\r\n' in reply:
head, content = (reply.split('\r\n\r\n', 1) + [''])[:2]
else: # Non-conformant reply. Bummer!
match = re.search('\r?\n[ \t]*\r?\n', reply)
if match:
head, content = head[:match.start()], head[match.end():]
head = head.replace('\r\n', '\n').replace('\r', '\n')
response, head = head.split('\n', 1)
status = int(response.split()[1])
message = ' '.join(response.split()[2:])
for line in head.split('\n'):
        if verbose >= 3:
            sys.stderr.write('reply: ' + line.rstrip() + '\n')
name, value = line.split(': ', 1)
name = name.lower()
if name in headers:
headers[name] += '\n' + value
else:
headers[name] = value
    if verbose >= 2:
        sys.stderr.write('content: %d byte%s\n' % (
            len(content), len(content) != 1 and 's' or ''))
# Store any received cookies.
if 'set-cookie' in headers:
setcookies(cookiejar, host, headers['set-cookie'].split('\n'))
# Handle the 'charset' parameter.
if 'content-type' in headers and not charset:
for param in headers['content-type'].split(';')[1:]:
if param.strip().startswith('charset='):
charset = param.strip()[8:]
break
if charset and charset is not RAW:
content = content.encode('latin-1').decode(charset, 'ignore')
return url, status, message, headers, content
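# Typical use (a sketch; the URL is a placeholder and the status and headers
# depend entirely on the server):
#     >>> url, status, message, headers, content = fetch('http://example.com/')
#     >>> status, headers.get('content-type')
#     (200, 'text/html; charset=UTF-8')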
class Session:
"""A Web-browsing session. Exposed attributes:
agent - the User-Agent string (clients can set this attribute)
url - the last successfully fetched URL
status - the status code of the last request
message - the status message of the last request
headers - the headers of the last request as a dictionary
content - the content of the last fetched document
doc - the Region spanning the last fetched document
"""
def __init__(self, agent=None, verbose=0):
"""Specify 'agent' to set the User-Agent. Set 'verbose' to 1, 2, or
3 to display status messages on stderr during document retrieval."""
self.agent = agent
self.url = self.status = self.message = self.content = self.doc = None
        self.verbose = verbose
self.headers = {}
self.cookiejar = {}
self.history = []
def go(self, url, data='', redirects=10, referrer=True, charset=None):
"""Navigate to a given URL. If the URL is relative, it is resolved
with respect to the current URL. If 'data' is provided, do a POST;
otherwise do a GET. Follow redirections up to 'redirects' times.
If 'referrer' is given, send it as the referrer; if 'referrer' is
True (default), send the current URL as the referrer; if 'referrer'
is a false value, send no referrer. If 'charset' is given, it
overrides any received 'charset' parameter; setting 'charset' to RAW
leaves the content undecoded in an 8-bit string. If the document is
successfully fetched, return a Region spanning the entire document.
Any relevant previously stored cookies will be included in the
request, and any received cookies will be stored for future use."""
historyentry = (self.url, self.status, self.message,
self.headers, self.content, self.doc)
url = self.resolve(url)
if referrer is True:
referrer = self.url
while 1:
self.url, self.status, self.message, self.headers, self.content = \
fetch(url, data, self.agent, referrer, charset, self.verbose,
self.cookiejar)
if redirects:
if self.status in [301, 302] and 'location' in self.headers:
url, data = urljoin(url, self.headers['location']), ''
redirects -= 1
continue
break
self.history.append(historyentry)
self.doc = Region(self.content)
return self.doc
def back(self):
"""Restore the state of this session before the previous request."""
(self.url, self.status, self.message,
self.headers, self.content, self.doc) = self.history.pop()
return self.url
def follow(self, anchor, region=None):
"""Find the first link that has the given anchor text, and follow it.
The anchor may be given as a string or a compiled RE. If 'region' is
specified, only that region is searched for a matching link, instead
of the whole document."""
link = (region or self.doc).first('a', content=anchor)
if not link:
raise ScrapeError('link %r not found' % anchor)
if not link.get('href', ''):
raise ScrapeError('link %r has no href' % link)
return self.go(link['href'])
def submit(self, region, paramdict=None, url=None, redirects=10, **params):
"""Submit a form, optionally by clicking a given button. The 'region'
argument can be the form itself or a button in the form to click.
Obtain the parameters to submit by (a) starting with the 'paramdict'
dictionary if specified, or the default parameter values as returned
by get_params; then (b) adding or replacing parameters in this
dictionary according to the keyword arguments. The 'url' argument
overrides the form's action attribute and submits the form elsewhere.
After submission, follow redirections up to 'redirects' times."""
form = region.tagname == 'form' and region or region.enclosing('form')
if not form:
raise ScrapeError('%r is not contained in a form' % region)
if paramdict is not None:
p = paramdict.copy()
else:
p = form.params
if 'name' in region:
p[region['name']] = region.get('value', '')
p.update(params)
        method = form.get('method', '').lower() or 'get'
url = url or form.get('action', self.url)
if method == 'get':
return self.go(url + '?' + urlencode(p), '', redirects)
elif method == 'post':
return self.go(url, p, redirects)
else:
raise ScrapeError('unknown form method %r' % method)
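    # Sketch of a typical submit (the form and its field name are
    # hypothetical):
    #     >>> form = session.doc.first('form')
    #     >>> session.submit(form, q='scraping')
    # fills in the form's default parameters, overrides 'q', and issues a
    # GET or POST according to the form's method attribute.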
def resolve(self, url):
"""Resolve a URL with respect to the current location."""
if self.url and not (
url.startswith('http://') or url.startswith('https://')):
url = urljoin(self.url, url)
return url
def setcookie(self, cookieline):
"""Put a cookie in this session's cookie jar. 'cookieline' should
have the format "=; domain=; path="."""
scheme, host, path, query, fragment = urlsplit(self.url)
host = host.split('@')[-1].split(':')[0]
setcookies(self.cookiejar, host, [cookieline])
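# A short browsing session (a sketch; the URL and anchor text are
# placeholders):
#     >>> s = Session(agent='scrape.py example')
#     >>> doc = s.go('http://example.com/')
#     >>> s.follow('More information')  # follows the first matching link
#     >>> s.back()                      # restores the previous page state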
# This pattern has been carefully tuned, but re.search can still cause a
# stack overflow. Try re.search('(a|b)*', 'a'*10000), for example.
tagcontent_re = r'''(('[^']*'|"[^"]*"|--([^-]+|-[^-]+)*--|-(?!-)|[^'">-])*)'''
def tag_re(tagname_re):
return '<' + tagname_re + tagcontent_re + '>'
anytag_re = tag_re(r'(\?|!\w*|/?[a-zA-Z_:][\w:.-]*)')
tagpat = re.compile(anytag_re)
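# For example (an illustrative input):
#     >>> tagpat.search('plain <a href="x">text').group()
#     '<a href="x">'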
# This pattern matches a character entity reference (a decimal numeric
# reference, a hexadecimal numeric reference, or a named reference).
charrefpat = re.compile(r'&(#(\d+|x[\da-fA-F]+)|[\w.:-]+);?')
def htmldecode(text):
"""Decode HTML entities in the given text."""
if type(text) is unicode:
uchr = unichr
else:
uchr = lambda value: value > 127 and unichr(value) or chr(value)
def entitydecode(match, uchr=uchr):
entity = match.group(1)
if entity.startswith('#x'):
return uchr(int(entity[2:], 16))
elif entity.startswith('#'):
return uchr(int(entity[1:]))
elif entity in name2codepoint:
return uchr(name2codepoint[entity])
else:
return match.group(0)
return charrefpat.sub(entitydecode, text)
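# For example, under Python 3 (where str is Unicode):
#     >>> htmldecode('a &lt; b &amp; &#169;')
#     'a < b & ©'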
def htmlencode(text):
"""Use HTML entities to encode special characters in the given text."""
    text = text.replace('&', '&amp;')
    text = text.replace('"', '&quot;')
    text = text.replace('<', '&lt;')
    text = text.replace('>', '&gt;')
return text
urlquoted = dict((chr(i), '%%%02X' % i) for i in range(256))
urlquoted.update(dict((c, c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' +
'abcdefghijklmnopqrstuvwxyz' +
'0123456789._-'))
def urlquote(text):
return ''.join(map(urlquoted.get, text))
def urlencode(params):
pairs = ['%s=%s' % (urlquote(key), urlquote(value).replace('%20', '+'))
for key, value in params.items()]
return '&'.join(pairs)
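# For example:
#     >>> urlencode({'q': 'a b/c'})
#     'q=a+b%2Fc'
# Spaces become '+' (urlquote's '%20' is rewritten) and other unsafe
# characters stay percent-encoded.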
def no_groups(re):
return re.replace('(', '(?:').replace('(?:?', '(?')
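# For example, no_groups(r'(a|b)(?:c)') returns r'(?:a|b)(?:c)': every '('
# becomes '(?:', and the second replace repairs groups that already started
# with '(?' so they keep their original meaning.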
tagsplitter = re.compile(no_groups(anytag_re))
parasplitter = re.compile(no_groups(tag_re('(p|table|form)')), re.I)
linesplitter = re.compile(no_groups(tag_re('(div|br|tr)')), re.I)
cdatapat = re.compile(r'<(!\s*--|style\b|script\b)', re.I)
endcdatapat = {'!': re.compile(r'--\s*>'),
               'script': re.compile(r'</script[^>]*>', re.I),
               'style': re.compile(r'</style[^>]*>', re.I)}
def striptags(html):
"""Strip HTML tags from the given string, yielding line breaks for DIV,
BR, or TR tags and blank lines for P, TABLE, or FORM tags."""
    # Remove comments and elements with CDATA content (<script>, <style>).