updates
This commit is contained in:
Executable
+629
@@ -0,0 +1,629 @@
|
||||
"""HTML handling.
|
||||
|
||||
Copyright 2003-2006 John J. Lee <jjl@pobox.com>
|
||||
|
||||
This code is free software; you can redistribute it and/or modify it under
|
||||
the terms of the BSD or ZPL 2.1 licenses (see the file COPYING.txt
|
||||
included with the distribution).
|
||||
|
||||
"""
|
||||
|
||||
import codecs
|
||||
import copy
|
||||
import htmlentitydefs
|
||||
import re
|
||||
|
||||
import _sgmllib_copy as sgmllib
|
||||
|
||||
import _beautifulsoup
|
||||
import _form
|
||||
from _headersutil import split_header_words, is_html as _is_html
|
||||
import _request
|
||||
import _rfc3986
|
||||
|
||||
DEFAULT_ENCODING = "latin-1"
|
||||
|
||||
COMPRESS_RE = re.compile(r"\s+")
|
||||
|
||||
|
||||
class CachingGeneratorFunction(object):
|
||||
"""Caching wrapper around a no-arguments iterable."""
|
||||
|
||||
def __init__(self, iterable):
|
||||
self._cache = []
|
||||
# wrap iterable to make it non-restartable (otherwise, repeated
|
||||
# __call__ would give incorrect results)
|
||||
self._iterator = iter(iterable)
|
||||
|
||||
def __call__(self):
|
||||
cache = self._cache
|
||||
for item in cache:
|
||||
yield item
|
||||
for item in self._iterator:
|
||||
cache.append(item)
|
||||
yield item
|
||||
|
||||
|
||||
class EncodingFinder:
|
||||
def __init__(self, default_encoding):
|
||||
self._default_encoding = default_encoding
|
||||
def encoding(self, response):
|
||||
# HTTPEquivProcessor may be in use, so both HTTP and HTTP-EQUIV
|
||||
# headers may be in the response. HTTP-EQUIV headers come last,
|
||||
# so try in order from first to last.
|
||||
for ct in response.info().getheaders("content-type"):
|
||||
for k, v in split_header_words([ct])[0]:
|
||||
if k == "charset":
|
||||
encoding = v
|
||||
try:
|
||||
codecs.lookup(v)
|
||||
except LookupError:
|
||||
continue
|
||||
else:
|
||||
return encoding
|
||||
return self._default_encoding
|
||||
|
||||
|
||||
class ResponseTypeFinder:
|
||||
def __init__(self, allow_xhtml):
|
||||
self._allow_xhtml = allow_xhtml
|
||||
def is_html(self, response, encoding):
|
||||
ct_hdrs = response.info().getheaders("content-type")
|
||||
url = response.geturl()
|
||||
# XXX encoding
|
||||
return _is_html(ct_hdrs, url, self._allow_xhtml)
|
||||
|
||||
|
||||
class Args(object):
|
||||
|
||||
# idea for this argument-processing trick is from Peter Otten
|
||||
|
||||
def __init__(self, args_map):
|
||||
self.__dict__["dictionary"] = dict(args_map)
|
||||
|
||||
def __getattr__(self, key):
|
||||
try:
|
||||
return self.dictionary[key]
|
||||
except KeyError:
|
||||
return getattr(self.__class__, key)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key == "dictionary":
|
||||
raise AttributeError()
|
||||
self.dictionary[key] = value
|
||||
|
||||
|
||||
def form_parser_args(
|
||||
select_default=False,
|
||||
form_parser_class=None,
|
||||
request_class=None,
|
||||
backwards_compat=False,
|
||||
):
|
||||
return Args(locals())
|
||||
|
||||
|
||||
class Link:
|
||||
def __init__(self, base_url, url, text, tag, attrs):
|
||||
assert None not in [url, tag, attrs]
|
||||
self.base_url = base_url
|
||||
self.absolute_url = _rfc3986.urljoin(base_url, url)
|
||||
self.url, self.text, self.tag, self.attrs = url, text, tag, attrs
|
||||
def __cmp__(self, other):
|
||||
try:
|
||||
for name in "url", "text", "tag", "attrs":
|
||||
if getattr(self, name) != getattr(other, name):
|
||||
return -1
|
||||
except AttributeError:
|
||||
return -1
|
||||
return 0
|
||||
def __repr__(self):
|
||||
return "Link(base_url=%r, url=%r, text=%r, tag=%r, attrs=%r)" % (
|
||||
self.base_url, self.url, self.text, self.tag, self.attrs)
|
||||
|
||||
|
||||
class LinksFactory:
|
||||
|
||||
def __init__(self,
|
||||
link_parser_class=None,
|
||||
link_class=Link,
|
||||
urltags=None,
|
||||
):
|
||||
import _pullparser
|
||||
if link_parser_class is None:
|
||||
link_parser_class = _pullparser.TolerantPullParser
|
||||
self.link_parser_class = link_parser_class
|
||||
self.link_class = link_class
|
||||
if urltags is None:
|
||||
urltags = {
|
||||
"a": "href",
|
||||
"area": "href",
|
||||
"frame": "src",
|
||||
"iframe": "src",
|
||||
}
|
||||
self.urltags = urltags
|
||||
self._response = None
|
||||
self._encoding = None
|
||||
|
||||
def set_response(self, response, base_url, encoding):
|
||||
self._response = response
|
||||
self._encoding = encoding
|
||||
self._base_url = base_url
|
||||
|
||||
def links(self):
|
||||
"""Return an iterator that provides links of the document."""
|
||||
response = self._response
|
||||
encoding = self._encoding
|
||||
base_url = self._base_url
|
||||
p = self.link_parser_class(response, encoding=encoding)
|
||||
|
||||
try:
|
||||
for token in p.tags(*(self.urltags.keys()+["base"])):
|
||||
if token.type == "endtag":
|
||||
continue
|
||||
if token.data == "base":
|
||||
base_href = dict(token.attrs).get("href")
|
||||
if base_href is not None:
|
||||
base_url = base_href
|
||||
continue
|
||||
attrs = dict(token.attrs)
|
||||
tag = token.data
|
||||
text = None
|
||||
# XXX use attr_encoding for ref'd doc if that doc does not
|
||||
# provide one by other means
|
||||
#attr_encoding = attrs.get("charset")
|
||||
url = attrs.get(self.urltags[tag]) # XXX is "" a valid URL?
|
||||
if not url:
|
||||
# Probably an <A NAME="blah"> link or <AREA NOHREF...>.
|
||||
# For our purposes a link is something with a URL, so
|
||||
# ignore this.
|
||||
continue
|
||||
|
||||
url = _rfc3986.clean_url(url, encoding)
|
||||
if tag == "a":
|
||||
if token.type != "startendtag":
|
||||
# hmm, this'd break if end tag is missing
|
||||
text = p.get_compressed_text(("endtag", tag))
|
||||
# but this doesn't work for e.g.
|
||||
# <a href="blah"><b>Andy</b></a>
|
||||
#text = p.get_compressed_text()
|
||||
|
||||
yield Link(base_url, url, text, tag, token.attrs)
|
||||
except sgmllib.SGMLParseError, exc:
|
||||
raise _form.ParseError(exc)
|
||||
|
||||
class FormsFactory:
|
||||
|
||||
"""Makes a sequence of objects satisfying HTMLForm interface.
|
||||
|
||||
After calling .forms(), the .global_form attribute is a form object
|
||||
containing all controls not a descendant of any FORM element.
|
||||
|
||||
For constructor argument docs, see ParseResponse argument docs.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
select_default=False,
|
||||
form_parser_class=None,
|
||||
request_class=None,
|
||||
backwards_compat=False,
|
||||
):
|
||||
self.select_default = select_default
|
||||
if form_parser_class is None:
|
||||
form_parser_class = _form.FormParser
|
||||
self.form_parser_class = form_parser_class
|
||||
if request_class is None:
|
||||
request_class = _request.Request
|
||||
self.request_class = request_class
|
||||
self.backwards_compat = backwards_compat
|
||||
self._response = None
|
||||
self.encoding = None
|
||||
self.global_form = None
|
||||
|
||||
def set_response(self, response, encoding):
|
||||
self._response = response
|
||||
self.encoding = encoding
|
||||
self.global_form = None
|
||||
|
||||
def forms(self):
|
||||
encoding = self.encoding
|
||||
forms = _form.ParseResponseEx(
|
||||
self._response,
|
||||
select_default=self.select_default,
|
||||
form_parser_class=self.form_parser_class,
|
||||
request_class=self.request_class,
|
||||
encoding=encoding,
|
||||
_urljoin=_rfc3986.urljoin,
|
||||
_urlparse=_rfc3986.urlsplit,
|
||||
_urlunparse=_rfc3986.urlunsplit,
|
||||
)
|
||||
self.global_form = forms[0]
|
||||
return forms[1:]
|
||||
|
||||
class TitleFactory:
|
||||
def __init__(self):
|
||||
self._response = self._encoding = None
|
||||
|
||||
def set_response(self, response, encoding):
|
||||
self._response = response
|
||||
self._encoding = encoding
|
||||
|
||||
def _get_title_text(self, parser):
|
||||
import _pullparser
|
||||
text = []
|
||||
tok = None
|
||||
while 1:
|
||||
try:
|
||||
tok = parser.get_token()
|
||||
except _pullparser.NoMoreTokensError:
|
||||
break
|
||||
if tok.type == "data":
|
||||
text.append(str(tok))
|
||||
elif tok.type == "entityref":
|
||||
t = unescape("&%s;" % tok.data,
|
||||
parser._entitydefs, parser.encoding)
|
||||
text.append(t)
|
||||
elif tok.type == "charref":
|
||||
t = unescape_charref(tok.data, parser.encoding)
|
||||
text.append(t)
|
||||
elif tok.type in ["starttag", "endtag", "startendtag"]:
|
||||
tag_name = tok.data
|
||||
if tok.type == "endtag" and tag_name == "title":
|
||||
break
|
||||
text.append(str(tok))
|
||||
return COMPRESS_RE.sub(" ", "".join(text).strip())
|
||||
|
||||
def title(self):
|
||||
import _pullparser
|
||||
p = _pullparser.TolerantPullParser(
|
||||
self._response, encoding=self._encoding)
|
||||
try:
|
||||
try:
|
||||
p.get_tag("title")
|
||||
except _pullparser.NoMoreTokensError:
|
||||
return None
|
||||
else:
|
||||
return self._get_title_text(p)
|
||||
except sgmllib.SGMLParseError, exc:
|
||||
raise _form.ParseError(exc)
|
||||
|
||||
|
||||
def unescape(data, entities, encoding):
|
||||
if data is None or "&" not in data:
|
||||
return data
|
||||
|
||||
def replace_entities(match):
|
||||
ent = match.group()
|
||||
if ent[1] == "#":
|
||||
return unescape_charref(ent[2:-1], encoding)
|
||||
|
||||
repl = entities.get(ent[1:-1])
|
||||
if repl is not None:
|
||||
repl = unichr(repl)
|
||||
if type(repl) != type(""):
|
||||
try:
|
||||
repl = repl.encode(encoding)
|
||||
except UnicodeError:
|
||||
repl = ent
|
||||
else:
|
||||
repl = ent
|
||||
return repl
|
||||
|
||||
return re.sub(r"&#?[A-Za-z0-9]+?;", replace_entities, data)
|
||||
|
||||
def unescape_charref(data, encoding):
|
||||
name, base = data, 10
|
||||
if name.startswith("x"):
|
||||
name, base= name[1:], 16
|
||||
uc = unichr(int(name, base))
|
||||
if encoding is None:
|
||||
return uc
|
||||
else:
|
||||
try:
|
||||
repl = uc.encode(encoding)
|
||||
except UnicodeError:
|
||||
repl = "&#%s;" % data
|
||||
return repl
|
||||
|
||||
|
||||
class MechanizeBs(_beautifulsoup.BeautifulSoup):
|
||||
_entitydefs = htmlentitydefs.name2codepoint
|
||||
# don't want the magic Microsoft-char workaround
|
||||
PARSER_MASSAGE = [(re.compile('(<[^<>]*)/>'),
|
||||
lambda(x):x.group(1) + ' />'),
|
||||
(re.compile('<!\s+([^<>]*)>'),
|
||||
lambda(x):'<!' + x.group(1) + '>')
|
||||
]
|
||||
|
||||
def __init__(self, encoding, text=None, avoidParserProblems=True,
|
||||
initialTextIsEverything=True):
|
||||
self._encoding = encoding
|
||||
_beautifulsoup.BeautifulSoup.__init__(
|
||||
self, text, avoidParserProblems, initialTextIsEverything)
|
||||
|
||||
def handle_charref(self, ref):
|
||||
t = unescape("&#%s;"%ref, self._entitydefs, self._encoding)
|
||||
self.handle_data(t)
|
||||
def handle_entityref(self, ref):
|
||||
t = unescape("&%s;"%ref, self._entitydefs, self._encoding)
|
||||
self.handle_data(t)
|
||||
def unescape_attrs(self, attrs):
|
||||
escaped_attrs = []
|
||||
for key, val in attrs:
|
||||
val = unescape(val, self._entitydefs, self._encoding)
|
||||
escaped_attrs.append((key, val))
|
||||
return escaped_attrs
|
||||
|
||||
class RobustLinksFactory:
|
||||
|
||||
compress_re = COMPRESS_RE
|
||||
|
||||
def __init__(self,
|
||||
link_parser_class=None,
|
||||
link_class=Link,
|
||||
urltags=None,
|
||||
):
|
||||
if link_parser_class is None:
|
||||
link_parser_class = MechanizeBs
|
||||
self.link_parser_class = link_parser_class
|
||||
self.link_class = link_class
|
||||
if urltags is None:
|
||||
urltags = {
|
||||
"a": "href",
|
||||
"area": "href",
|
||||
"frame": "src",
|
||||
"iframe": "src",
|
||||
}
|
||||
self.urltags = urltags
|
||||
self._bs = None
|
||||
self._encoding = None
|
||||
self._base_url = None
|
||||
|
||||
def set_soup(self, soup, base_url, encoding):
|
||||
self._bs = soup
|
||||
self._base_url = base_url
|
||||
self._encoding = encoding
|
||||
|
||||
def links(self):
|
||||
bs = self._bs
|
||||
base_url = self._base_url
|
||||
encoding = self._encoding
|
||||
for ch in bs.recursiveChildGenerator():
|
||||
if (isinstance(ch, _beautifulsoup.Tag) and
|
||||
ch.name in self.urltags.keys()+["base"]):
|
||||
link = ch
|
||||
attrs = bs.unescape_attrs(link.attrs)
|
||||
attrs_dict = dict(attrs)
|
||||
if link.name == "base":
|
||||
base_href = attrs_dict.get("href")
|
||||
if base_href is not None:
|
||||
base_url = base_href
|
||||
continue
|
||||
url_attr = self.urltags[link.name]
|
||||
url = attrs_dict.get(url_attr)
|
||||
if not url:
|
||||
continue
|
||||
url = _rfc3986.clean_url(url, encoding)
|
||||
text = link.fetchText(lambda t: True)
|
||||
if not text:
|
||||
# follow _pullparser's weird behaviour rigidly
|
||||
if link.name == "a":
|
||||
text = ""
|
||||
else:
|
||||
text = None
|
||||
else:
|
||||
text = self.compress_re.sub(" ", " ".join(text).strip())
|
||||
yield Link(base_url, url, text, link.name, attrs)
|
||||
|
||||
|
||||
class RobustFormsFactory(FormsFactory):
|
||||
def __init__(self, *args, **kwds):
|
||||
args = form_parser_args(*args, **kwds)
|
||||
if args.form_parser_class is None:
|
||||
args.form_parser_class = _form.RobustFormParser
|
||||
FormsFactory.__init__(self, **args.dictionary)
|
||||
|
||||
def set_response(self, response, encoding):
|
||||
self._response = response
|
||||
self.encoding = encoding
|
||||
|
||||
|
||||
class RobustTitleFactory:
|
||||
def __init__(self):
|
||||
self._bs = self._encoding = None
|
||||
|
||||
def set_soup(self, soup, encoding):
|
||||
self._bs = soup
|
||||
self._encoding = encoding
|
||||
|
||||
def title(self):
|
||||
title = self._bs.first("title")
|
||||
if title == _beautifulsoup.Null:
|
||||
return None
|
||||
else:
|
||||
inner_html = "".join([str(node) for node in title.contents])
|
||||
return COMPRESS_RE.sub(" ", inner_html.strip())
|
||||
|
||||
|
||||
class Factory:
|
||||
"""Factory for forms, links, etc.
|
||||
|
||||
This interface may expand in future.
|
||||
|
||||
Public methods:
|
||||
|
||||
set_request_class(request_class)
|
||||
set_response(response)
|
||||
forms()
|
||||
links()
|
||||
|
||||
Public attributes:
|
||||
|
||||
Note that accessing these attributes may raise ParseError.
|
||||
|
||||
encoding: string specifying the encoding of response if it contains a text
|
||||
document (this value is left unspecified for documents that do not have
|
||||
an encoding, e.g. an image file)
|
||||
is_html: true if response contains an HTML document (XHTML may be
|
||||
regarded as HTML too)
|
||||
title: page title, or None if no title or not HTML
|
||||
global_form: form object containing all controls that are not descendants
|
||||
of any FORM element, or None if the forms_factory does not support
|
||||
supplying a global form
|
||||
|
||||
"""
|
||||
|
||||
LAZY_ATTRS = ["encoding", "is_html", "title", "global_form"]
|
||||
|
||||
def __init__(self, forms_factory, links_factory, title_factory,
|
||||
encoding_finder=EncodingFinder(DEFAULT_ENCODING),
|
||||
response_type_finder=ResponseTypeFinder(allow_xhtml=False),
|
||||
):
|
||||
"""
|
||||
|
||||
Pass keyword arguments only.
|
||||
|
||||
default_encoding: character encoding to use if encoding cannot be
|
||||
determined (or guessed) from the response. You should turn on
|
||||
HTTP-EQUIV handling if you want the best chance of getting this right
|
||||
without resorting to this default. The default value of this
|
||||
parameter (currently latin-1) may change in future.
|
||||
|
||||
"""
|
||||
self._forms_factory = forms_factory
|
||||
self._links_factory = links_factory
|
||||
self._title_factory = title_factory
|
||||
self._encoding_finder = encoding_finder
|
||||
self._response_type_finder = response_type_finder
|
||||
|
||||
self.set_response(None)
|
||||
|
||||
def set_request_class(self, request_class):
|
||||
"""Set request class (mechanize.Request by default).
|
||||
|
||||
HTMLForm instances returned by .forms() will return instances of this
|
||||
class when .click()ed.
|
||||
|
||||
"""
|
||||
self._forms_factory.request_class = request_class
|
||||
|
||||
def set_response(self, response):
|
||||
"""Set response.
|
||||
|
||||
The response must either be None or implement the same interface as
|
||||
objects returned by mechanize.urlopen().
|
||||
|
||||
"""
|
||||
self._response = response
|
||||
self._forms_genf = self._links_genf = None
|
||||
self._get_title = None
|
||||
for name in self.LAZY_ATTRS:
|
||||
try:
|
||||
delattr(self, name)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def __getattr__(self, name):
|
||||
if name not in self.LAZY_ATTRS:
|
||||
return getattr(self.__class__, name)
|
||||
|
||||
if name == "encoding":
|
||||
self.encoding = self._encoding_finder.encoding(
|
||||
copy.copy(self._response))
|
||||
return self.encoding
|
||||
elif name == "is_html":
|
||||
self.is_html = self._response_type_finder.is_html(
|
||||
copy.copy(self._response), self.encoding)
|
||||
return self.is_html
|
||||
elif name == "title":
|
||||
if self.is_html:
|
||||
self.title = self._title_factory.title()
|
||||
else:
|
||||
self.title = None
|
||||
return self.title
|
||||
elif name == "global_form":
|
||||
self.forms()
|
||||
return self.global_form
|
||||
|
||||
def forms(self):
|
||||
"""Return iterable over HTMLForm-like objects.
|
||||
|
||||
Raises mechanize.ParseError on failure.
|
||||
"""
|
||||
# this implementation sets .global_form as a side-effect, for benefit
|
||||
# of __getattr__ impl
|
||||
if self._forms_genf is None:
|
||||
try:
|
||||
self._forms_genf = CachingGeneratorFunction(
|
||||
self._forms_factory.forms())
|
||||
except: # XXXX define exception!
|
||||
self.set_response(self._response)
|
||||
raise
|
||||
self.global_form = getattr(
|
||||
self._forms_factory, "global_form", None)
|
||||
return self._forms_genf()
|
||||
|
||||
def links(self):
|
||||
"""Return iterable over mechanize.Link-like objects.
|
||||
|
||||
Raises mechanize.ParseError on failure.
|
||||
"""
|
||||
if self._links_genf is None:
|
||||
try:
|
||||
self._links_genf = CachingGeneratorFunction(
|
||||
self._links_factory.links())
|
||||
except: # XXXX define exception!
|
||||
self.set_response(self._response)
|
||||
raise
|
||||
return self._links_genf()
|
||||
|
||||
class DefaultFactory(Factory):
|
||||
"""Based on sgmllib."""
|
||||
def __init__(self, i_want_broken_xhtml_support=False):
|
||||
Factory.__init__(
|
||||
self,
|
||||
forms_factory=FormsFactory(),
|
||||
links_factory=LinksFactory(),
|
||||
title_factory=TitleFactory(),
|
||||
response_type_finder=ResponseTypeFinder(
|
||||
allow_xhtml=i_want_broken_xhtml_support),
|
||||
)
|
||||
|
||||
def set_response(self, response):
|
||||
Factory.set_response(self, response)
|
||||
if response is not None:
|
||||
self._forms_factory.set_response(
|
||||
copy.copy(response), self.encoding)
|
||||
self._links_factory.set_response(
|
||||
copy.copy(response), response.geturl(), self.encoding)
|
||||
self._title_factory.set_response(
|
||||
copy.copy(response), self.encoding)
|
||||
|
||||
class RobustFactory(Factory):
|
||||
"""Based on BeautifulSoup, hopefully a bit more robust to bad HTML than is
|
||||
DefaultFactory.
|
||||
|
||||
"""
|
||||
def __init__(self, i_want_broken_xhtml_support=False,
|
||||
soup_class=None):
|
||||
Factory.__init__(
|
||||
self,
|
||||
forms_factory=RobustFormsFactory(),
|
||||
links_factory=RobustLinksFactory(),
|
||||
title_factory=RobustTitleFactory(),
|
||||
response_type_finder=ResponseTypeFinder(
|
||||
allow_xhtml=i_want_broken_xhtml_support),
|
||||
)
|
||||
if soup_class is None:
|
||||
soup_class = MechanizeBs
|
||||
self._soup_class = soup_class
|
||||
|
||||
def set_response(self, response):
|
||||
Factory.set_response(self, response)
|
||||
if response is not None:
|
||||
data = response.read()
|
||||
soup = self._soup_class(self.encoding, data)
|
||||
self._forms_factory.set_response(
|
||||
copy.copy(response), self.encoding)
|
||||
self._links_factory.set_soup(
|
||||
soup, response.geturl(), self.encoding)
|
||||
self._title_factory.set_soup(soup, self.encoding)
|
||||
Reference in New Issue
Block a user