"""Convenient HTTP UserAgent class.
This is a subclass of urllib2.OpenerDirector.
Copyright 2003-2006 John J. Lee <jjl@pobox.com>
This code is free software; you can redistribute it and/or modify it under
the terms of the BSD or ZPL 2.1 licenses (see the file LICENSE
included with the distribution).
"""
from __future__ import absolute_import
import copy
from . import _auth, _gzip, _opener, _response, _sockettimeout, _urllib2
from .polyglot import iteritems, itervalues
class UserAgentBase(_opener.OpenerDirector):
    """Convenient user-agent class.

    Do not use .add_handler() to add a handler for something already dealt
    with by this code.

    The only reason at present for the distinction between UserAgent and
    UserAgentBase is so that classes that depend on .seek()able responses
    (e.g. mechanize.Browser) can inherit from UserAgentBase.  The subclass
    UserAgent exposes a .set_seekable_responses() method that allows switching
    off the adding of a .seek() method to responses.

    Public attributes:

    addheaders: list of (name, value) pairs specifying headers to send with
     every request, unless they are overridden in the Request instance.

    >>> ua = UserAgentBase()
    >>> ua.addheaders = [
    ...  ("User-agent", "Mozilla/5.0 (compatible)"),
    ...  ("From", "responsible.person@example.com")]

    """

    # Maps a handler name to the class instantiated for it.  Names without a
    # leading underscore are URL schemes; names with one are everything else.
    handler_classes = {
        # scheme handlers
        "http": _urllib2.HTTPHandler,
        # CacheFTPHandler is buggy, at least in 2.3, so we don't use it
        "ftp": _urllib2.FTPHandler,
        "file": _urllib2.FileHandler,

        # other handlers
        "_unknown": _urllib2.UnknownHandler,
        # HTTP{S,}Handler depend on HTTPErrorProcessor too
        "_http_error": _urllib2.HTTPErrorProcessor,
        "_http_default_error": _urllib2.HTTPDefaultErrorHandler,

        # feature handlers
        "_basicauth": _urllib2.HTTPBasicAuthHandler,
        "_digestauth": _urllib2.HTTPDigestAuthHandler,
        "_redirect": _urllib2.HTTPRedirectHandler,
        "_cookies": _urllib2.HTTPCookieProcessor,
        "_refresh": _urllib2.HTTPRefreshProcessor,
        "_equiv": _urllib2.HTTPEquivProcessor,
        "_proxy": _urllib2.ProxyHandler,
        "_proxy_basicauth": _urllib2.ProxyBasicAuthHandler,
        "_proxy_digestauth": _urllib2.ProxyDigestAuthHandler,
        "_robots": _urllib2.HTTPRobotRulesProcessor,
        "_gzip": _gzip.HTTPGzipProcessor,

        # debug handlers
        "_debug_redirect": _urllib2.HTTPRedirectDebugProcessor,
        "_debug_response_body": _urllib2.HTTPResponseDebugProcessor,
    }

    # Names of the handlers that __init__ installs.
    default_schemes = ["http", "ftp", "file"]
    default_others = ["_unknown", "_http_error", "_http_default_error"]
    default_features = [
        "_gzip",
        "_redirect",
        "_cookies",
        "_refresh",
        "_equiv",
        "_basicauth",
        "_digestauth",
        "_proxy",
        "_proxy_basicauth",
        "_proxy_digestauth",
        "_robots",
    ]
    # HTTPS is only registered when _urllib2 provides a handler for it.
    if hasattr(_urllib2, 'HTTPSHandler'):
        handler_classes["https"] = _urllib2.HTTPSHandler
        default_schemes.append("https")

    def __init__(self):
        """Install one handler instance for every default handler name."""
        _opener.OpenerDirector.__init__(self)

        # name -> installed handler instance
        ua_handlers = self._ua_handlers = {}
        for scheme in (self.default_schemes + self.default_others +
                       self.default_features):
            klass = self.handler_classes[scheme]
            ua_handlers[scheme] = klass()
        for handler in tuple(itervalues(ua_handlers)):
            self.add_handler(handler)

        # Yuck.
        # Ensure correct default constructor args were passed to
        # HTTPRefreshProcessor and HTTPEquivProcessor.
        if "_refresh" in ua_handlers:
            self.set_handle_refresh(True)
        if "_equiv" in ua_handlers:
            self.set_handle_equiv(True)

        # Ensure default password managers are installed.
        pm = ppm = None
        if "_basicauth" in ua_handlers or "_digestauth" in ua_handlers:
            pm = _urllib2.HTTPPasswordMgrWithDefaultRealm()
        if ("_proxy_basicauth" in ua_handlers or
                "_proxy_digestauth" in ua_handlers):
            ppm = _auth.HTTPProxyPasswordMgr()
        self.set_password_manager(pm)
        self.set_proxy_password_manager(ppm)

        # set default certificate manager
        if "https" in ua_handlers:
            cm = _urllib2.HTTPSClientCertMgr()
            self.set_client_cert_manager(cm)

    def close(self):
        """Close the opener and drop the name -> handler mapping.

        After this call self._ua_handlers is None, so methods that consult
        that mapping can no longer be used on this instance.
        """
        _opener.OpenerDirector.close(self)
        self._ua_handlers = None

    # XXX
    # def set_timeout(self, timeout):
    #     self._timeout = timeout
    # def set_http_connection_cache(self, conn_cache):
    #     self._http_conn_cache = conn_cache
    # def set_ftp_connection_cache(self, conn_cache):
    #     # XXX ATM, FTP has cache as part of handler; should it be separate?
    #     self._ftp_conn_cache = conn_cache

    def set_handled_schemes(self, schemes):
        """Set sequence of URL scheme (protocol) strings.

        For example: ua.set_handled_schemes(["http", "ftp"])

        If this fails (with ValueError) because you've passed an unknown
        scheme, the set of handled schemes will not be changed.

        """
        # Validate every requested scheme before touching any handler, so a
        # ValueError leaves the current configuration intact.
        want = {}
        for scheme in schemes:
            if scheme.startswith("_"):
                raise ValueError("not a scheme '%s'" % scheme)
            if scheme not in self.handler_classes:
                raise ValueError("unknown scheme '%s'" % scheme)
            want[scheme] = None

        # get rid of scheme handlers we don't want
        # (iterate over a snapshot: _replace_handler mutates the mapping)
        for scheme in tuple(self._ua_handlers):
            if scheme.startswith("_"):
                continue  # not a scheme handler
            if scheme not in want:
                self._replace_handler(scheme, None)
            else:
                del want[scheme]  # already got it
        # add the scheme handlers that are missing
        for scheme in want:
            self._set_handler(scheme, True)

    def set_cookiejar(self, cookiejar):
        """Set a mechanize.CookieJar, or None."""
        self._set_handler("_cookies", obj=cookiejar)

    # XXX could use Greg Stein's httpx for some of this instead?
    # or httplib2??
    def set_proxies(self, proxies=None, proxy_bypass=None):
        """Configure proxy settings.

        :arg proxies: dictionary mapping URL scheme to proxy specification.
            None means use the default system-specific settings.
        :arg proxy_bypass: function taking hostname, returning whether proxy
            should be used.  None means use the default system-specific
            settings.

        The default is to try to obtain proxy settings from the system (see
        the documentation for urllib.urlopen for information about the
        system-specific methods used -- note that's urllib, not urllib2).

        To avoid all use of proxies, pass an empty proxies dict.

        >>> ua = UserAgentBase()
        >>> def proxy_bypass(hostname):
        ...     return hostname == "noproxy.com"
        >>> ua.set_proxies(
        ...     {"http": "joe:password@myproxy.example.com:3128",
        ...      "ftp": "proxy.example.com"},
        ...     proxy_bypass)

        """
        self._set_handler(
            "_proxy",
            True,
            constructor_kwds=dict(proxies=proxies, proxy_bypass=proxy_bypass))

    def add_password(self, url, user, password, realm=None):
        """Register credentials for url with the current password manager."""
        self._password_manager.add_password(realm, url, user, password)

    def add_proxy_password(self, user, password, hostport=None, realm=None):
        """Register credentials with the current proxy password manager."""
        self._proxy_password_manager.add_password(realm, hostport, user,
                                                  password)

    def add_client_certificate(self, url, key_file, cert_file):
        """Add an SSL client certificate, for HTTPS client auth.

        key_file and cert_file must be filenames of the key and certificate
        files, in PEM format.  You can use e.g. OpenSSL to convert a p12 (PKCS
        12) file to PEM format:

        openssl pkcs12 -clcerts -nokeys -in cert.p12 -out cert.pem
        openssl pkcs12 -nocerts -in cert.p12 -out key.pem

        Note that client certificate password input is very inflexible ATM.
        At the moment this seems to be console only, which is presumably the
        default behaviour of libopenssl.  In future mechanize may support
        third-party libraries that (I assume) allow more options here.

        """
        self._client_cert_manager.add_key_cert(url, key_file, cert_file)

    # the following are rarely useful -- use add_password / add_proxy_password
    # instead
    def set_password_manager(self, password_manager):
        """Set a mechanize.HTTPPasswordMgrWithDefaultRealm, or None."""
        self._password_manager = password_manager
        self._set_handler("_basicauth", obj=password_manager)
        self._set_handler("_digestauth", obj=password_manager)

    def set_proxy_password_manager(self, password_manager):
        """Set a mechanize.HTTPProxyPasswordMgr, or None."""
        self._proxy_password_manager = password_manager
        self._set_handler("_proxy_basicauth", obj=password_manager)
        self._set_handler("_proxy_digestauth", obj=password_manager)

    def set_client_cert_manager(self, cert_manager):
        """Set a mechanize.HTTPClientCertMgr, or None.

        Raises KeyError if no "https" handler is currently installed.
        """
        handler = self._ua_handlers["https"]
        self._client_cert_manager = handler.client_cert_manager = cert_manager

    def set_ca_data(self, cafile=None, capath=None, cadata=None, context=None):
        '''
        Set the SSL Context used for connecting to SSL servers.

        This method accepts the same arguments as the
        :py:meth:`ssl.SSLContext.load_verify_locations()` method from
        the Python standard library. You can also pass a pre-built
        :class:`ssl.SSLContext` via the `context` keyword argument.

        Note that to use this feature, you must be using Python >=
        2.7.9.

        Raises RuntimeError if the ssl module lacks
        create_default_context, and KeyError if no "https" handler is
        currently installed.
        '''
        import ssl
        if context is None:
            try:
                context = ssl.create_default_context(
                    cafile=cafile, capath=capath, cadata=cadata)
            except AttributeError:
                raise RuntimeError('python >= 2.7.9 required')
        handler = self._ua_handlers["https"]
        handler.ssl_context = context

    # these methods all take a boolean parameter
    def set_handle_robots(self, handle):
        """Set whether to observe rules from robots.txt."""
        self._set_handler("_robots", handle)

    def set_handle_redirect(self, handle):
        """Set whether to handle HTTP 30x redirections."""
        self._set_handler("_redirect", handle)

    def set_handle_refresh(self, handle, max_time=None, honor_time=True):
        """Set whether to handle HTTP Refresh headers."""
        self._set_handler(
            "_refresh",
            handle,
            constructor_kwds={"max_time": max_time,
                              "honor_time": honor_time})

    def set_handle_equiv(self, handle, head_parser_class=None):
        """Set whether to treat HTML http-equiv headers like HTTP headers.

        Response objects may be .seek()able if this is set (currently returned
        responses are, raised HTTPError exception responses are not).

        """
        # Only pass head_parser_class through when the caller supplied one, so
        # the handler's own default applies otherwise.
        if head_parser_class is not None:
            constructor_kwds = {"head_parser_class": head_parser_class}
        else:
            constructor_kwds = {}
        self._set_handler("_equiv", handle, constructor_kwds=constructor_kwds)

    def set_request_gzip(self, handle):
        """Add header indicating to server that we handle gzip
        content encoding. Note that if the server sends gzip'ed content,
        it is handled automatically in any case, regardless of this setting.
        """
        # The gzip handler is always (re)installed; `handle` only controls
        # the request_gzip constructor argument.
        self._set_handler(
            "_gzip", True, constructor_kwds={'request_gzip': bool(handle)})

    set_handle_gzip = set_request_gzip  # legacy

    def set_debug_redirects(self, handle):
        """
        Log information about HTTP redirects (including refreshes).

        Logging is performed using module logging.  The logger name is
        `"mechanize.http_redirects"`.  To actually print some debug output,
        eg:

        .. code-block:: python

            import sys, logging
            logger = logging.getLogger("mechanize.http_redirects")
            logger.addHandler(logging.StreamHandler(sys.stdout))
            logger.setLevel(logging.INFO)

        Other logger names relevant to this module:

        * `mechanize.http_responses`
        * `mechanize.cookies`

        To turn on everything:

        .. code-block:: python

            import sys, logging
            logger = logging.getLogger("mechanize")
            logger.addHandler(logging.StreamHandler(sys.stdout))
            logger.setLevel(logging.INFO)

        """
        self._set_handler("_debug_redirect", handle)

    def set_debug_responses(self, handle):
        """Log HTTP response bodies.

        See :meth:`set_debug_redirects()` for details of logging.

        Response objects may be .seek()able if this is set (currently returned
        responses are, raised HTTPError exception responses are not).

        """
        self._set_handler("_debug_response_body", handle)

    def set_debug_http(self, handle):
        """Print HTTP headers to sys.stdout."""
        level = int(bool(handle))
        for scheme in "http", "https":
            h = self._ua_handlers.get(scheme)
            if h is not None:
                h.set_http_debuglevel(level)

    def _copy_state(self, other):
        """Copy addheaders and shallow copies of all handlers onto other.

        Raises ValueError if this instance has been closed.
        """
        if self._ua_handlers is None:
            raise ValueError('Cannot copy state from a closed UserAgentBase')
        other.addheaders = self.addheaders[:]
        # handler instance -> name, to rebuild other's name -> handler mapping
        rmap = {v: k for k, v in iteritems(self._ua_handlers)}

        def clone_handler(h):
            ans = copy.copy(h)
            ans.add_parent(other)
            try:
                other._ua_handlers[rmap[h]] = ans
            except KeyError:
                # handler is in self.handlers but not in self._ua_handlers
                pass
            return ans

        other._ua_handlers.clear()
        other.handlers = [clone_handler(h) for h in self.handlers]
        other._handler_index_valid = False

    def handlers_by_class(self, cls):
        """Yield each installed handler that is an instance of cls."""
        for h in self.handlers:
            if isinstance(h, cls):
                yield h

    def _set_handler(self,
                     name,
                     handle=None,
                     obj=None,
                     constructor_args=(),
                     constructor_kwds=None):
        """Install, replace or remove the handler registered under name.

        handle: whether the handler should be installed.  None means
            "installed exactly when obj is not None".
        obj: if not None, passed as the sole constructor argument.
        constructor_args, constructor_kwds: constructor arguments used when
            obj is None.  constructor_kwds=None means no keyword arguments.
        """
        if handle is None:
            handle = obj is not None
        if handle:
            handler_class = self.handler_classes[name]
            if obj is not None:
                newhandler = handler_class(obj)
            else:
                newhandler = handler_class(*constructor_args,
                                           **(constructor_kwds or {}))
        else:
            newhandler = None
        self._replace_handler(name, newhandler)

    def _replace_handler(self, name, newhandler=None):
        """Remove the handler registered under name, then add newhandler.

        With newhandler=None the old handler is only removed.
        """
        # first, if handler was previously added, remove it
        if name is not None:
            handler = self._ua_handlers.pop(name, None)
            if handler is not None:
                try:
                    self.handlers.remove(handler)
                except ValueError:
                    pass
                else:
                    # self.handlers changed; flag the handler index as stale,
                    # as _copy_state does after replacing the handler list.
                    self._handler_index_valid = False
        # then add the replacement, if any
        if newhandler is not None:
            self.add_handler(newhandler)
            self._ua_handlers[name] = newhandler
class UserAgent(UserAgentBase):
    """UserAgentBase whose responses can optionally be made .seek()able.

    Seekable responses are off by default; switch them on or off with
    set_seekable_responses().
    """

    def __init__(self):
        UserAgentBase.__init__(self)
        self._seekable = False

    def set_seekable_responses(self, handle):
        """Make response objects .seek()able."""
        self._seekable = bool(handle)

    def open(self,
             fullurl,
             data=None,
             timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
        """Open fullurl and return the response.

        When seekable responses are enabled, the open is routed through
        _opener.wrapped_open with _response.seek_wrapped_response.
        Otherwise UserAgentBase.open is called directly.  timeout is
        forwarded in both cases.
        """
        if not self._seekable:
            # Forward timeout here too, so a caller-supplied value is not
            # silently replaced by the default.
            return UserAgentBase.open(self, fullurl, data, timeout)

        def bound_open(fullurl,
                       data=None,
                       timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):
            return UserAgentBase.open(self, fullurl, data, timeout)

        return _opener.wrapped_open(bound_open,
                                    _response.seek_wrapped_response,
                                    fullurl, data, timeout)