Aggiornato Cloudscraper

This commit is contained in:
Alhaziel01
2022-09-15 15:42:27 +02:00
parent 667f7f31ff
commit de8d6ff46d
9 changed files with 588 additions and 540 deletions

View File

@@ -1,20 +1,14 @@
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
import logging import logging
import re
import requests import requests
import sys import sys
import ssl import ssl
from collections import OrderedDict
from copy import deepcopy
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from requests.sessions import Session from requests.sessions import Session
from requests_toolbelt.utils import dump from requests_toolbelt.utils import dump
from time import sleep
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
try: try:
@@ -28,37 +22,23 @@ except ImportError:
import copy_reg as copyreg import copy_reg as copyreg
try: try:
from HTMLParser import HTMLParser from urlparse import urlparse
except ImportError: except ImportError:
if sys.version_info >= (3, 4): from urllib.parse import urlparse
import html
else:
from html.parser import HTMLParser
try:
from urlparse import urlparse, urljoin
except ImportError:
from urllib.parse import urlparse, urljoin
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
from .exceptions import ( from .exceptions import (
CloudflareLoopProtection, CloudflareLoopProtection,
CloudflareCode1020, CloudflareIUAMError
CloudflareIUAMError,
CloudflareSolveError,
CloudflareChallengeError,
CloudflareCaptchaError,
CloudflareCaptchaProvider
) )
from .interpreters import JavaScriptInterpreter from .cloudflare import Cloudflare
from .captcha import Captcha
from .user_agent import User_Agent from .user_agent import User_Agent
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
__version__ = '1.2.58' __version__ = '1.2.62'
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
@@ -79,6 +59,8 @@ class CipherSuiteAdapter(HTTPAdapter):
self.ssl_context = kwargs.pop('ssl_context', None) self.ssl_context = kwargs.pop('ssl_context', None)
self.cipherSuite = kwargs.pop('cipherSuite', None) self.cipherSuite = kwargs.pop('cipherSuite', None)
self.source_address = kwargs.pop('source_address', None) self.source_address = kwargs.pop('source_address', None)
self.server_hostname = kwargs.pop('server_hostname', None)
self.ecdhCurve = kwargs.pop('ecdhCurve', 'prime256v1')
if self.source_address: if self.source_address:
if isinstance(self.source_address, str): if isinstance(self.source_address, str):
@@ -91,14 +73,32 @@ class CipherSuiteAdapter(HTTPAdapter):
if not self.ssl_context: if not self.ssl_context:
self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
self.ssl_context.orig_wrap_socket = self.ssl_context.wrap_socket
self.ssl_context.wrap_socket = self.wrap_socket
if self.server_hostname:
self.ssl_context.server_hostname = self.server_hostname
self.ssl_context.set_ciphers(self.cipherSuite) self.ssl_context.set_ciphers(self.cipherSuite)
self.ssl_context.set_ecdh_curve('prime256v1') self.ssl_context.set_ecdh_curve(self.ecdhCurve)
self.ssl_context.options |= (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1) self.ssl_context.options |= (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)
super(CipherSuiteAdapter, self).__init__(**kwargs) super(CipherSuiteAdapter, self).__init__(**kwargs)
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
def wrap_socket(self, *args, **kwargs):
if hasattr(self.ssl_context, 'server_hostname') and self.ssl_context.server_hostname:
kwargs['server_hostname'] = self.ssl_context.server_hostname
self.ssl_context.check_hostname = False
else:
self.ssl_context.check_hostname = True
return self.ssl_context.orig_wrap_socket(*args, **kwargs)
# ------------------------------------------------------------------------------- #
def init_poolmanager(self, *args, **kwargs): def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context kwargs['ssl_context'] = self.ssl_context
kwargs['source_address'] = self.source_address kwargs['source_address'] = self.source_address
@@ -118,15 +118,21 @@ class CloudScraper(Session):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.debug = kwargs.pop('debug', False) self.debug = kwargs.pop('debug', False)
self.disableCloudflareV1 = kwargs.pop('disableCloudflareV1', False)
self.delay = kwargs.pop('delay', None) self.delay = kwargs.pop('delay', None)
self.cipherSuite = kwargs.pop('cipherSuite', None)
self.ssl_context = kwargs.pop('ssl_context', None)
self.interpreter = kwargs.pop('interpreter', 'native')
self.captcha = kwargs.pop('captcha', {}) self.captcha = kwargs.pop('captcha', {})
self.doubleDown = kwargs.pop('doubleDown', True)
self.interpreter = kwargs.pop('interpreter', 'native')
self.requestPreHook = kwargs.pop('requestPreHook', None) self.requestPreHook = kwargs.pop('requestPreHook', None)
self.requestPostHook = kwargs.pop('requestPostHook', None) self.requestPostHook = kwargs.pop('requestPostHook', None)
self.cipherSuite = kwargs.pop('cipherSuite', None)
self.ecdhCurve = kwargs.pop('ecdhCurve', 'prime256v1')
self.source_address = kwargs.pop('source_address', None) self.source_address = kwargs.pop('source_address', None)
self.doubleDown = kwargs.pop('doubleDown', True) self.server_hostname = kwargs.pop('server_hostname', None)
self.ssl_context = kwargs.pop('ssl_context', None)
self.allow_brotli = kwargs.pop( self.allow_brotli = kwargs.pop(
'allow_brotli', 'allow_brotli',
@@ -159,8 +165,10 @@ class CloudScraper(Session):
'https://', 'https://',
CipherSuiteAdapter( CipherSuiteAdapter(
cipherSuite=self.cipherSuite, cipherSuite=self.cipherSuite,
ssl_context=self.ssl_context, ecdhCurve=self.ecdhCurve,
source_address=self.source_address server_hostname=self.server_hostname,
source_address=self.source_address,
ssl_context=self.ssl_context
) )
) )
@@ -199,21 +207,7 @@ class CloudScraper(Session):
try: try:
print(dump.dump_all(req).decode('utf-8', errors='backslashreplace')) print(dump.dump_all(req).decode('utf-8', errors='backslashreplace'))
except ValueError as e: except ValueError as e:
print("Debug Error: {}".format(getattr(e, 'message', e))) print(f"Debug Error: {getattr(e, 'message', e)}")
# ------------------------------------------------------------------------------- #
# Unescape / decode html entities
# ------------------------------------------------------------------------------- #
@staticmethod
def unescape(html_text):
if sys.version_info >= (3, 0):
if sys.version_info >= (3, 4):
return html.unescape(html_text)
return HTMLParser().unescape(html_text)
return HTMLParser().unescape(html_text)
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
# Decode Brotli on older versions of urllib3 manually # Decode Brotli on older versions of urllib3 manually
@@ -225,10 +219,10 @@ class CloudScraper(Session):
resp._content = brotli.decompress(resp.content) resp._content = brotli.decompress(resp.content)
else: else:
logging.warning( logging.warning(
'You\'re running urllib3 {}, Brotli content detected, ' f'You\'re running urllib3 {requests.packages.urllib3.__version__}, Brotli content detected, '
'Which requires manual decompression, ' 'Which requires manual decompression, '
'But option allow_brotli is set to False, ' 'But option allow_brotli is set to False, '
'We will not continue to decompress.'.format(requests.packages.urllib3.__version__) 'We will not continue to decompress.'
) )
return resp return resp
@@ -275,480 +269,44 @@ class CloudScraper(Session):
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
if self.requestPostHook: if self.requestPostHook:
response = self.requestPostHook(self, response) newResponse = self.requestPostHook(self, response)
if self.debug: if response != newResponse: # Give me walrus in 3.7!!!
self.debugRequest(response) response = newResponse
if self.debug:
print('==== requestPostHook Debug ====')
self.debugRequest(response)
# ------------------------------------------------------------------------------- #
if not self.disableCloudflareV1:
cloudflareV1 = Cloudflare(self)
# Check if Cloudflare anti-bot is on
if self.is_Challenge_Request(response):
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
# Try to solve the challenge and send it back # Check if Cloudflare v1 anti-bot is on
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
if self._solveDepthCnt >= self.solveDepth: if cloudflareV1.is_Challenge_Request(response):
_ = self._solveDepthCnt # ------------------------------------------------------------------------------- #
self.simpleException( # Try to solve the challenge and send it back
CloudflareLoopProtection, # ------------------------------------------------------------------------------- #
"!!Loop Protection!! We have tried to solve {} time(s) in a row.".format(_)
)
self._solveDepthCnt += 1 if self._solveDepthCnt >= self.solveDepth:
_ = self._solveDepthCnt
self.simpleException(
CloudflareLoopProtection,
f"!!Loop Protection!! We have tried to solve {_} time(s) in a row."
)
response = self.Challenge_Response(response, **kwargs) self._solveDepthCnt += 1
else:
if not response.is_redirect and response.status_code not in [429, 503]: response = cloudflareV1.Challenge_Response(response, **kwargs)
self._solveDepthCnt = 0 else:
if not response.is_redirect and response.status_code not in [429, 503]:
self._solveDepthCnt = 0
return response return response
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare Bot Fight Mode challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_BFM_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and re.search(
r"\/cdn-cgi\/bm\/cv\/\d+\/api\.js.*?"
r"window\['__CF\$cv\$params'\]\s*=\s*{",
resp.text,
re.M | re.S
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_IUAM_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'<form .*?="challenge-form" action="/.*?__cf_chl_jschl_tk__=\S+"',
resp.text,
re.M | re.S
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains new Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_New_IUAM_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'cpo.src\s*=\s*"/cdn-cgi/challenge-platform/\S+orchestrate/jsch/v1',
resp.text,
re.M | re.S
)
and re.search(r'window._cf_chl_enter\s*[\(=]', resp.text, re.M | re.S)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains a v2 hCaptcha Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_New_Captcha_Challenge(resp):
try:
return (
CloudScraper.is_Captcha_Challenge(resp)
and re.search(
r'cpo.src\s*=\s*"/cdn-cgi/challenge-platform/\S+orchestrate/captcha/v1',
resp.text,
re.M | re.S
)
and re.search(r'\s*id="trk_captcha_js"', resp.text, re.M | re.S)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains a Cloudflare hCaptcha challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_Captcha_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code == 403
and re.search(
r'action="/\S+__cf_chl_captcha_tk__=\S+',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains Firewall 1020 Error
# ------------------------------------------------------------------------------- #
@staticmethod
def is_Firewall_Blocked(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code == 403
and re.search(
r'<span class="cf-error-code">1020</span>',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# Wrapper for is_Captcha_Challenge, is_IUAM_Challenge, is_Firewall_Blocked
# ------------------------------------------------------------------------------- #
def is_Challenge_Request(self, resp):
if self.is_Firewall_Blocked(resp):
self.simpleException(
CloudflareCode1020,
'Cloudflare has blocked this request (Code 1020 Detected).'
)
if self.is_New_Captcha_Challenge(resp):
self.simpleException(
CloudflareChallengeError,
'Detected a Cloudflare version 2 Captcha challenge, This feature is not available in the opensource (free) version.'
)
if self.is_New_IUAM_Challenge(resp):
self.simpleException(
CloudflareChallengeError,
'Detected a Cloudflare version 2 challenge, This feature is not available in the opensource (free) version.'
)
if self.is_Captcha_Challenge(resp) or self.is_IUAM_Challenge(resp):
if self.debug:
print('Detected a Cloudflare version 1 challenge.')
return True
return False
# ------------------------------------------------------------------------------- #
# Try to solve cloudflare javascript challenge.
# ------------------------------------------------------------------------------- #
def IUAM_Challenge_Response(self, body, url, interpreter):
try:
formPayload = re.search(
r'<form (?P<form>.*?="challenge-form" '
r'action="(?P<challengeUUID>.*?'
r'__cf_chl_jschl_tk__=\S+)"(.*?)</form>)',
body,
re.M | re.DOTALL
).groupdict()
if not all(key in formPayload for key in ['form', 'challengeUUID']):
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
)
payload = OrderedDict()
for challengeParam in re.findall(r'^\s*<input\s(.*?)/>', formPayload['form'], re.M | re.S):
inputPayload = dict(re.findall(r'(\S+)="(\S+)"', challengeParam))
if inputPayload.get('name') in ['r', 'jschl_vc', 'pass']:
payload.update({inputPayload['name']: inputPayload['value']})
except AttributeError:
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
)
hostParsed = urlparse(url)
try:
payload['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
interpreter
).solveChallenge(body, hostParsed.netloc)
except Exception as e:
self.simpleException(
CloudflareIUAMError,
"Unable to parse Cloudflare anti-bots page: {}".format(getattr(e, 'message', e))
)
return {
'url': "{}://{}{}".format(hostParsed.scheme, hostParsed.netloc, self.unescape(formPayload['challengeUUID'])),
'data': payload
}
# ------------------------------------------------------------------------------- #
# Try to solve the Captcha challenge via 3rd party.
# ------------------------------------------------------------------------------- #
def captcha_Challenge_Response(self, provider, provider_params, body, url):
try:
formPayload = re.search(
r'<form (?P<form>.*?="challenge-form" '
r'action="(?P<challengeUUID>.*?__cf_chl_captcha_tk__=\S+)"(.*?)</form>)',
body,
re.M | re.DOTALL
).groupdict()
if not all(key in formPayload for key in ['form', 'challengeUUID']):
self.simpleException(
CloudflareCaptchaError,
"Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
)
payload = OrderedDict(
re.findall(
r'(name="r"\svalue|data-ray|data-sitekey|name="cf_captcha_kind"\svalue)="(.*?)"',
formPayload['form']
)
)
captchaType = 'reCaptcha' if payload['name="cf_captcha_kind" value'] == 're' else 'hCaptcha'
except (AttributeError, KeyError):
self.simpleException(
CloudflareCaptchaError,
"Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
)
# ------------------------------------------------------------------------------- #
# Pass proxy parameter to provider to solve captcha.
# ------------------------------------------------------------------------------- #
if self.proxies and self.proxies != self.captcha.get('proxy'):
self.captcha['proxy'] = self.proxies
# ------------------------------------------------------------------------------- #
# Pass User-Agent if provider supports it to solve captcha.
# ------------------------------------------------------------------------------- #
self.captcha['User-Agent'] = self.headers['User-Agent']
# ------------------------------------------------------------------------------- #
# Submit job to provider to request captcha solve.
# ------------------------------------------------------------------------------- #
captchaResponse = Captcha.dynamicImport(
provider.lower()
).solveCaptcha(
captchaType,
url,
payload['data-sitekey'],
provider_params
)
# ------------------------------------------------------------------------------- #
# Parse and handle the response of solved captcha.
# ------------------------------------------------------------------------------- #
dataPayload = OrderedDict([
('r', payload.get('name="r" value', '')),
('cf_captcha_kind', payload['name="cf_captcha_kind" value']),
('id', payload.get('data-ray')),
('g-recaptcha-response', captchaResponse)
])
if captchaType == 'hCaptcha':
dataPayload.update({'h-captcha-response': captchaResponse})
hostParsed = urlparse(url)
return {
'url': "{}://{}{}".format(hostParsed.scheme, hostParsed.netloc, self.unescape(formPayload['challengeUUID'])),
'data': dataPayload
}
# ------------------------------------------------------------------------------- #
# Attempt to handle and send the challenge response back to cloudflare
# ------------------------------------------------------------------------------- #
def Challenge_Response(self, resp, **kwargs):
if self.is_Captcha_Challenge(resp):
# ------------------------------------------------------------------------------- #
# double down on the request as some websites are only checking
# if cfuid is populated before issuing Captcha.
# ------------------------------------------------------------------------------- #
if self.doubleDown:
resp = self.decodeBrotli(
self.perform_request(resp.request.method, resp.url, **kwargs)
)
if not self.is_Captcha_Challenge(resp):
return resp
# ------------------------------------------------------------------------------- #
# if no captcha provider raise a runtime error.
# ------------------------------------------------------------------------------- #
if not self.captcha or not isinstance(self.captcha, dict) or not self.captcha.get('provider'):
self.simpleException(
CloudflareCaptchaProvider,
"Cloudflare Captcha detected, unfortunately you haven't loaded an anti Captcha provider "
"correctly via the 'captcha' parameter."
)
# ------------------------------------------------------------------------------- #
# if provider is return_response, return the response without doing anything.
# ------------------------------------------------------------------------------- #
if self.captcha.get('provider') == 'return_response':
return resp
# ------------------------------------------------------------------------------- #
# Submit request to parser wrapper to solve captcha
# ------------------------------------------------------------------------------- #
submit_url = self.captcha_Challenge_Response(
self.captcha.get('provider'),
self.captcha,
resp.text,
resp.url
)
else:
# ------------------------------------------------------------------------------- #
# Cloudflare requires a delay before solving the challenge
# ------------------------------------------------------------------------------- #
if not self.delay:
try:
delay = float(
re.search(
r'submit\(\);\r?\n\s*},\s*([0-9]+)',
resp.text
).group(1)
) / float(1000)
if isinstance(delay, (int, float)):
self.delay = delay
except (AttributeError, ValueError):
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM possibility malformed, issue extracing delay value."
)
sleep(self.delay)
# ------------------------------------------------------------------------------- #
submit_url = self.IUAM_Challenge_Response(
resp.text,
resp.url,
self.interpreter
)
# ------------------------------------------------------------------------------- #
# Send the Challenge Response back to Cloudflare
# ------------------------------------------------------------------------------- #
if submit_url:
def updateAttr(obj, name, newValue):
try:
obj[name].update(newValue)
return obj[name]
except (AttributeError, KeyError):
obj[name] = {}
obj[name].update(newValue)
return obj[name]
cloudflare_kwargs = deepcopy(kwargs)
cloudflare_kwargs['allow_redirects'] = False
cloudflare_kwargs['data'] = updateAttr(
cloudflare_kwargs,
'data',
submit_url['data']
)
urlParsed = urlparse(resp.url)
cloudflare_kwargs['headers'] = updateAttr(
cloudflare_kwargs,
'headers',
{
'Origin': '{}://{}'.format(urlParsed.scheme, urlParsed.netloc),
'Referer': resp.url
}
)
challengeSubmitResponse = self.request(
'POST',
submit_url['url'],
**cloudflare_kwargs
)
if challengeSubmitResponse.status_code == 400:
self.simpleException(
CloudflareSolveError,
'Invalid challenge answer detected, Cloudflare broken?'
)
# ------------------------------------------------------------------------------- #
# Return response if Cloudflare is doing content pass through instead of 3xx
# else request with redirect URL also handle protocol scheme change http -> https
# ------------------------------------------------------------------------------- #
if not challengeSubmitResponse.is_redirect:
return challengeSubmitResponse
else:
cloudflare_kwargs = deepcopy(kwargs)
cloudflare_kwargs['headers'] = updateAttr(
cloudflare_kwargs,
'headers',
{'Referer': challengeSubmitResponse.url}
)
if not urlparse(challengeSubmitResponse.headers['Location']).netloc:
redirect_location = urljoin(
challengeSubmitResponse.url,
challengeSubmitResponse.headers['Location']
)
else:
redirect_location = challengeSubmitResponse.headers['Location']
return self.request(
resp.request.method,
redirect_location,
**cloudflare_kwargs
)
# ------------------------------------------------------------------------------- #
# We shouldn't be here...
# Re-request the original query and/or process again....
# ------------------------------------------------------------------------------- #
return self.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #
@classmethod @classmethod
@@ -761,7 +319,7 @@ class CloudScraper(Session):
if sess: if sess:
for attr in ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']: for attr in ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']:
val = getattr(sess, attr, None) val = getattr(sess, attr, None)
if val: if val is not None:
setattr(scraper, attr, val) setattr(scraper, attr, val)
return scraper return scraper
@@ -782,7 +340,7 @@ class CloudScraper(Session):
'doubleDown', 'doubleDown',
'captcha', 'captcha',
'interpreter', 'interpreter',
'source_address' 'source_address',
'requestPreHook', 'requestPreHook',
'requestPostHook' 'requestPostHook'
] if field in kwargs ] if field in kwargs
@@ -793,7 +351,7 @@ class CloudScraper(Session):
resp = scraper.get(url, **kwargs) resp = scraper.get(url, **kwargs)
resp.raise_for_status() resp.raise_for_status()
except Exception: except Exception:
logging.error('"{}" returned an error. Could not collect tokens.'.format(url)) logging.error(f'"{url}" returned an error. Could not collect tokens.')
raise raise
domain = urlparse(resp.url).netloc domain = urlparse(resp.url).netloc
@@ -801,11 +359,12 @@ class CloudScraper(Session):
cookie_domain = None cookie_domain = None
for d in scraper.cookies.list_domains(): for d in scraper.cookies.list_domains():
if d.startswith('.') and d in ('.{}'.format(domain)): if d.startswith('.') and d in (f'.{domain}'):
cookie_domain = d cookie_domain = d
break break
else: else:
cls.simpleException( cls.simpleException(
cls,
CloudflareIUAMError, CloudflareIUAMError,
"Unable to find Cloudflare cookies. Does the site actually " "Unable to find Cloudflare cookies. Does the site actually "
"have Cloudflare IUAM (I'm Under Attack Mode) enabled?" "have Cloudflare IUAM (I'm Under Attack Mode) enabled?"
@@ -813,7 +372,6 @@ class CloudScraper(Session):
return ( return (
{ {
'__cfduid': scraper.cookies.get('__cfduid', '', domain=cookie_domain),
'cf_clearance': scraper.cookies.get('cf_clearance', '', domain=cookie_domain) 'cf_clearance': scraper.cookies.get('cf_clearance', '', domain=cookie_domain)
}, },
scraper.headers['User-Agent'] scraper.headers['User-Agent']
@@ -834,9 +392,9 @@ class CloudScraper(Session):
if ssl.OPENSSL_VERSION_INFO < (1, 1, 1): if ssl.OPENSSL_VERSION_INFO < (1, 1, 1):
print( print(
"DEPRECATION: The OpenSSL being used by this python install ({}) does not meet the minimum supported " f"DEPRECATION: The OpenSSL being used by this python install ({ssl.OPENSSL_VERSION}) does not meet the minimum supported "
"version (>= OpenSSL 1.1.1) in order to support TLS 1.3 required by Cloudflare, " "version (>= OpenSSL 1.1.1) in order to support TLS 1.3 required by Cloudflare, "
"You may encounter an unexpected Captcha or cloudflare 1020 blocks.".format(ssl.OPENSSL_VERSION) "You may encounter an unexpected Captcha or cloudflare 1020 blocks."
) )
# ------------------------------------------------------------------------------- # # ------------------------------------------------------------------------------- #

View File

@@ -103,7 +103,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.get( lambda: self.session.get(
'{}/res.php'.format(self.host), f'{self.host}/res.php',
params={ params={
'key': self.api_key, 'key': self.api_key,
'action': 'reportbad', 'action': 'reportbad',
@@ -138,7 +138,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.get( lambda: self.session.get(
'{}/res.php'.format(self.host), f'{self.host}/res.php',
params={ params={
'key': self.api_key, 'key': self.api_key,
'action': 'get', 'action': 'get',
@@ -195,7 +195,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/in.php'.format(self.host), f'{self.host}/in.php',
data=data, data=data,
allow_redirects=False, allow_redirects=False,
timeout=30 timeout=30

View File

@@ -36,7 +36,7 @@ class captchaSolver(reCaptcha):
def checkErrorStatus(response): def checkErrorStatus(response):
if response.status_code in [500, 502]: if response.status_code in [500, 502]:
raise reCaptchaServiceUnavailable( raise reCaptchaServiceUnavailable(
'9kw: Server Side Error {}'.format(response.status_code) f'9kw: Server Side Error {response.status_code}'
) )
error_codes = { error_codes = {

View File

@@ -25,12 +25,12 @@ class Captcha(ABC):
def dynamicImport(cls, name): def dynamicImport(cls, name):
if name not in captchaSolvers: if name not in captchaSolvers:
try: try:
__import__('{}.{}'.format(cls.__module__, name)) __import__(f'{cls.__module__}.{name}')
if not isinstance(captchaSolvers.get(name), Captcha): if not isinstance(captchaSolvers.get(name), Captcha):
raise ImportError('The anti captcha provider was not initialized.') raise ImportError('The anti captcha provider was not initialized.')
except ImportError as e: except ImportError as e:
sys.tracebacklimit = 0 sys.tracebacklimit = 0
logging.error('Unable to load {} anti captcha provider -> {}'.format(name, e)) logging.error(f'Unable to load {name} anti captcha provider -> {e}')
raise raise
return captchaSolvers[name] return captchaSolvers[name]

View File

@@ -36,7 +36,7 @@ class captchaSolver(Captcha):
def checkErrorStatus(response): def checkErrorStatus(response):
if response.status_code in [500, 502]: if response.status_code in [500, 502]:
raise CaptchaServiceUnavailable( raise CaptchaServiceUnavailable(
'CapMonster: Server Side Error {}'.format(response.status_code) f'CapMonster: Server Side Error {response.status_code}'
) )
payload = response.json() payload = response.json()
@@ -66,7 +66,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/getTaskResult'.format(self.host), f'{self.host}/getTaskResult',
json={ json={
'clientKey': self.clientKey, 'clientKey': self.clientKey,
'taskId': taskID 'taskId': taskID
@@ -101,9 +101,9 @@ class captchaSolver(Captcha):
'task': { 'task': {
'websiteURL': url, 'websiteURL': url,
'websiteKey': siteKey, 'websiteKey': siteKey,
'softId': 37,
'type': 'NoCaptchaTask' if captchaType == 'reCaptcha' else 'HCaptchaTask' 'type': 'NoCaptchaTask' if captchaType == 'reCaptcha' else 'HCaptchaTask'
} },
'softId': 37
} }
if self.proxy: if self.proxy:
@@ -113,7 +113,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/createTask'.format(self.host), f'{self.host}/createTask',
json=data, json=data,
allow_redirects=False, allow_redirects=False,
timeout=30 timeout=30

View File

@@ -68,7 +68,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/user'.format(self.host), f'{self.host}/user',
headers={'Accept': 'application/json'}, headers={'Accept': 'application/json'},
data={ data={
'username': self.username, 'username': self.username,
@@ -100,7 +100,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/captcha/{}/report'.format(self.host, jobID), f'{self.host}/captcha/{jobID}/report',
headers={'Accept': 'application/json'}, headers={'Accept': 'application/json'},
data={ data={
'username': self.username, 'username': self.username,
@@ -137,7 +137,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.get( lambda: self.session.get(
'{}/captcha/{}'.format(self.host, jobID), f'{self.host}/captcha/{jobID}',
headers={'Accept': 'application/json'} headers={'Accept': 'application/json'}
), ),
check_success=_checkRequest, check_success=_checkRequest,
@@ -203,7 +203,7 @@ class captchaSolver(Captcha):
response = polling2.poll( response = polling2.poll(
lambda: self.session.post( lambda: self.session.post(
'{}/captcha'.format(self.host), f'{self.host}/captcha',
headers={'Accept': 'application/json'}, headers={'Accept': 'application/json'},
data=data, data=data,
allow_redirects=False allow_redirects=False

View File

@@ -0,0 +1,490 @@
# Cloudflare V1
import re
import sys
import time
from copy import deepcopy
from collections import OrderedDict
# ------------------------------------------------------------------------------- #
try:
from HTMLParser import HTMLParser
except ImportError:
if sys.version_info >= (3, 4):
import html
else:
from html.parser import HTMLParser
try:
from urlparse import urlparse, urljoin
except ImportError:
from urllib.parse import urlparse, urljoin
# ------------------------------------------------------------------------------- #
from .exceptions import (
CloudflareCode1020,
CloudflareIUAMError,
CloudflareSolveError,
CloudflareChallengeError,
CloudflareCaptchaError,
CloudflareCaptchaProvider
)
# ------------------------------------------------------------------------------- #
from .captcha import Captcha
from .interpreters import JavaScriptInterpreter
# ------------------------------------------------------------------------------- #
class Cloudflare():
    """Detect and answer Cloudflare "version 1" challenge pages.

    The helper works on behalf of a scraper session object. It reads that
    object's ``debug``, ``delay``, ``doubleDown``, ``captcha``, ``proxies``,
    ``headers`` and ``interpreter`` attributes and calls its
    ``simpleException``, ``request``, ``perform_request`` and
    ``decodeBrotli`` methods.

    NOTE(review): ``simpleException`` is defined outside this module; the
    code below is written as if it always raises -- confirm against the
    session class.
    """

    def __init__(self, cloudscraper):
        """Remember the scraper session whose requests are being protected."""
        self.cloudscraper = cloudscraper

    # ------------------------------------------------------------------------------- #
    # Unescape / decode html entities
    # ------------------------------------------------------------------------------- #

    @staticmethod
    def unescape(html_text):
        """Return ``html_text`` with HTML entities decoded.

        ``html.unescape`` is used on Python 3.4+; older interpreters fall
        back to ``HTMLParser().unescape``, mirroring the conditional imports
        at the top of this module.
        """
        if sys.version_info >= (3, 4):
            return html.unescape(html_text)
        return HTMLParser().unescape(html_text)

    # ------------------------------------------------------------------------------- #
    # check if the response contains a valid Cloudflare challenge
    # ------------------------------------------------------------------------------- #

    @staticmethod
    def is_IUAM_Challenge(resp):
        """Tell whether ``resp`` carries a version 1 IUAM (JavaScript) challenge.

        Returns a truthy ``re.Match`` when the ``Server`` header starts with
        ``cloudflare``, the status is 429 or 503 and the challenge form is
        present in ``resp.text``; otherwise a falsy value (``False`` or
        ``None``). An object lacking the expected attributes yields ``False``.
        """
        try:
            return (
                resp.headers.get('Server', '').startswith('cloudflare')
                and resp.status_code in [429, 503]
                and re.search(
                    r'<form .*?="challenge-form" action="/.*?__cf_chl_jschl_tk__=\S+"',
                    resp.text,
                    re.M | re.S
                )
            )
        except AttributeError:
            return False

    # ------------------------------------------------------------------------------- #
    # check if the response contains new Cloudflare challenge
    # ------------------------------------------------------------------------------- #

    @staticmethod
    def is_New_IUAM_Challenge(resp):
        """Tell whether ``resp`` carries a version 2 JavaScript challenge.

        Same server/status conditions as ``is_IUAM_Challenge``, but the body
        must reference the ``orchestrate/jsch/v1`` challenge-platform script.
        Returns a ``re.Match`` when detected, a falsy value otherwise.
        """
        try:
            return (
                resp.headers.get('Server', '').startswith('cloudflare')
                and resp.status_code in [429, 503]
                and re.search(
                    r'''cpo.src\s*=\s*['"]/cdn-cgi/challenge-platform/\S+orchestrate/jsch/v1''',
                    resp.text,
                    re.M | re.S
                )
            )
        except AttributeError:
            return False

    # ------------------------------------------------------------------------------- #
    # check if the response contains a v2 hCaptcha Cloudflare challenge
    # ------------------------------------------------------------------------------- #

    def is_New_Captcha_Challenge(self, resp):
        """Tell whether ``resp`` carries a version 2 captcha/managed challenge.

        The response must first satisfy ``is_Captcha_Challenge`` and then
        reference the ``orchestrate/(captcha|managed)/v1`` script. Returns a
        ``re.Match`` when detected, a falsy value otherwise.
        """
        try:
            return (
                self.is_Captcha_Challenge(resp)
                and re.search(
                    r'''cpo.src\s*=\s*['"]/cdn-cgi/challenge-platform/\S+orchestrate/(captcha|managed)/v1''',
                    resp.text,
                    re.M | re.S
                )
            )
        except AttributeError:
            return False

    # ------------------------------------------------------------------------------- #
    # check if the response contains a Cloudflare hCaptcha challenge
    # ------------------------------------------------------------------------------- #

    @staticmethod
    def is_Captcha_Challenge(resp):
        """Tell whether ``resp`` is a Cloudflare captcha challenge page.

        Requires a ``cloudflare`` server header, status 403 and a form action
        containing a ``__cf_chl_tk`` / ``__cf_chl_f_tk`` token. Returns a
        ``re.Match`` when detected, a falsy value otherwise.
        """
        try:
            return (
                resp.headers.get('Server', '').startswith('cloudflare')
                and resp.status_code == 403
                and re.search(
                    r'''action="/\S+__cf_chl(|_f)_tk=\S+''',
                    resp.text,
                    re.M | re.DOTALL
                )
            )
        except AttributeError:
            return False

    # ------------------------------------------------------------------------------- #
    # check if the response contains Firewall 1020 Error
    # ------------------------------------------------------------------------------- #

    @staticmethod
    def is_Firewall_Blocked(resp):
        """Tell whether ``resp`` is a Cloudflare firewall block (error 1020).

        Requires a ``cloudflare`` server header, status 403 and the 1020
        error-code span in the body. Returns a ``re.Match`` when detected, a
        falsy value otherwise.
        """
        try:
            return (
                resp.headers.get('Server', '').startswith('cloudflare')
                and resp.status_code == 403
                and re.search(
                    r'<span class="cf-error-code">1020</span>',
                    resp.text,
                    re.M | re.DOTALL
                )
            )
        except AttributeError:
            return False

    # ------------------------------------------------------------------------------- #
    # Wrapper for is_Firewall_Blocked, is_New_Captcha_Challenge,
    # is_New_IUAM_Challenge, is_Captcha_Challenge and is_IUAM_Challenge
    # ------------------------------------------------------------------------------- #

    def is_Challenge_Request(self, resp):
        """Classify ``resp`` and report whether a solvable challenge is present.

        Firewall blocks and version 2 challenges are reported through
        ``self.cloudscraper.simpleException``. Returns ``True`` for a version
        1 captcha or IUAM challenge and ``False`` when no challenge is found.
        """
        if self.is_Firewall_Blocked(resp):
            self.cloudscraper.simpleException(
                CloudflareCode1020,
                'Cloudflare has blocked this request (Code 1020 Detected).'
            )

        if self.is_New_Captcha_Challenge(resp):
            self.cloudscraper.simpleException(
                CloudflareChallengeError,
                'Detected a Cloudflare version 2 Captcha challenge, This feature is not available in the opensource (free) version.'
            )

        if self.is_New_IUAM_Challenge(resp):
            self.cloudscraper.simpleException(
                CloudflareChallengeError,
                'Detected a Cloudflare version 2 challenge, This feature is not available in the opensource (free) version.'
            )

        if self.is_Captcha_Challenge(resp) or self.is_IUAM_Challenge(resp):
            if self.cloudscraper.debug:
                print('Detected a Cloudflare version 1 challenge.')
            return True

        return False

    # ------------------------------------------------------------------------------- #
    # Try to solve cloudflare javascript challenge.
    # ------------------------------------------------------------------------------- #

    def IUAM_Challenge_Response(self, body, url, interpreter):
        """Build the POST target and form data answering a JavaScript challenge.

        Args:
            body: HTML text of the challenge page.
            url: URL the challenge page was served from.
            interpreter: name handed to ``JavaScriptInterpreter.dynamicImport``.

        Returns:
            A dict with ``'url'`` (absolute submit URL) and ``'data'`` (an
            ``OrderedDict`` holding the ``r``, ``jschl_vc`` and ``pass``
            form inputs that were found, plus ``jschl_answer``).

        Parsing or solving failures are reported through
        ``self.cloudscraper.simpleException`` with ``CloudflareIUAMError``.
        """
        try:
            # A missing form makes re.search return None, so .groupdict()
            # raises AttributeError, which is handled below.
            formPayload = re.search(
                r'<form (?P<form>.*?="challenge-form" '
                r'action="(?P<challengeUUID>.*?'
                r'__cf_chl_jschl_tk__=\S+)"(.*?)</form>)',
                body,
                re.M | re.DOTALL
            ).groupdict()

            if not all(key in formPayload for key in ['form', 'challengeUUID']):
                self.cloudscraper.simpleException(
                    CloudflareIUAMError,
                    "Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
                )

            payload = OrderedDict()
            for challengeParam in re.findall(r'^\s*<input\s(.*?)/>', formPayload['form'], re.M | re.S):
                inputPayload = dict(re.findall(r'(\S+)="(\S+)"', challengeParam))
                if inputPayload.get('name') in ['r', 'jschl_vc', 'pass']:
                    payload[inputPayload['name']] = inputPayload['value']

        except AttributeError:
            self.cloudscraper.simpleException(
                CloudflareIUAMError,
                "Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
            )

        hostParsed = urlparse(url)

        try:
            payload['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
                interpreter
            ).solveChallenge(body, hostParsed.netloc)
        except Exception as e:
            self.cloudscraper.simpleException(
                CloudflareIUAMError,
                f"Unable to parse Cloudflare anti-bots page: {getattr(e, 'message', e)}"
            )

        return {
            'url': f"{hostParsed.scheme}://{hostParsed.netloc}{self.unescape(formPayload['challengeUUID'])}",
            'data': payload
        }

    # ------------------------------------------------------------------------------- #
    # Hand the session's proxy and User-Agent over to the captcha provider settings.
    # ------------------------------------------------------------------------------- #

    def _forward_session_to_captcha(self):
        """Copy the session's proxies and User-Agent into its ``captcha`` dict."""
        scraper = self.cloudscraper

        # The proxies live on the scraper session, not on this helper object
        # (which only has the ``cloudscraper`` attribute).
        if scraper.proxies and scraper.proxies != scraper.captcha.get('proxy'):
            scraper.captcha['proxy'] = scraper.proxies

        scraper.captcha['User-Agent'] = scraper.headers['User-Agent']

    # ------------------------------------------------------------------------------- #
    # Try to solve the Captcha challenge via 3rd party.
    # ------------------------------------------------------------------------------- #

    def captcha_Challenge_Response(self, provider, provider_params, body, url):
        """Build the POST target and form data answering a captcha challenge.

        Args:
            provider: captcha provider name; lower-cased before being handed
                to ``Captcha.dynamicImport``.
            provider_params: settings passed through to ``solveCaptcha``.
            body: HTML text of the challenge page.
            url: URL the challenge page was served from.

        Returns:
            A dict with ``'url'`` (absolute submit URL) and ``'data'`` (an
            ``OrderedDict`` of the form fields, including the solved token).

        Parsing failures are reported through
        ``self.cloudscraper.simpleException`` with ``CloudflareCaptchaError``.
        """
        try:
            # A missing form makes re.search return None, so .groupdict()
            # raises AttributeError, which is handled below.
            formPayload = re.search(
                r'<form (?P<form>.*?="challenge-form" '
                r'action="(?P<challengeUUID>.*?__cf_chl_captcha_tk__=\S+)"(.*?)</form>)',
                body,
                re.M | re.DOTALL
            ).groupdict()

            if not all(key in formPayload for key in ['form', 'challengeUUID']):
                self.cloudscraper.simpleException(
                    CloudflareCaptchaError,
                    "Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
                )

            payload = OrderedDict(
                re.findall(
                    r'(name="r"\svalue|data-ray|data-sitekey|name="cf_captcha_kind"\svalue)="(.*?)"',
                    formPayload['form']
                )
            )

            captchaType = 'reCaptcha' if payload['name="cf_captcha_kind" value'] == 're' else 'hCaptcha'

        except (AttributeError, KeyError):
            self.cloudscraper.simpleException(
                CloudflareCaptchaError,
                "Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
            )

        # ------------------------------------------------------------------------------- #
        # Pass proxy and User-Agent to the provider settings before solving.
        # ------------------------------------------------------------------------------- #

        self._forward_session_to_captcha()

        # ------------------------------------------------------------------------------- #
        # Submit job to provider to request captcha solve.
        # ------------------------------------------------------------------------------- #

        captchaResponse = Captcha.dynamicImport(
            provider.lower()
        ).solveCaptcha(
            captchaType,
            url,
            payload['data-sitekey'],
            provider_params
        )

        # ------------------------------------------------------------------------------- #
        # Parse and handle the response of solved captcha.
        # ------------------------------------------------------------------------------- #

        dataPayload = OrderedDict([
            ('r', payload.get('name="r" value', '')),
            ('cf_captcha_kind', payload['name="cf_captcha_kind" value']),
            ('id', payload.get('data-ray')),
            ('g-recaptcha-response', captchaResponse)
        ])

        # hCaptcha submissions carry the token under both field names.
        if captchaType == 'hCaptcha':
            dataPayload['h-captcha-response'] = captchaResponse

        hostParsed = urlparse(url)

        return {
            'url': f"{hostParsed.scheme}://{hostParsed.netloc}{self.unescape(formPayload['challengeUUID'])}",
            'data': dataPayload
        }

    # ------------------------------------------------------------------------------- #
    # Attempt to handle and send the challenge response back to cloudflare
    # ------------------------------------------------------------------------------- #

    def Challenge_Response(self, resp, **kwargs):
        """Solve the challenge in ``resp`` and return the follow-up response.

        Args:
            resp: the challenge response; its ``request.method``, ``url`` and
                ``text`` are read.
            **kwargs: keyword arguments of the original request; they are
                deep-copied before ``data`` / ``headers`` are amended.

        Returns:
            The response to the challenge submission, the response obtained
            by following its redirect, ``resp`` itself (captcha no longer
            present, or provider ``'return_response'``), or a plain re-request
            of the original URL when no submit data was produced.
        """
        if self.is_Captcha_Challenge(resp):
            # ------------------------------------------------------------------------------- #
            # double down on the request as some websites are only checking
            # if cfuid is populated before issuing Captcha.
            # ------------------------------------------------------------------------------- #

            if self.cloudscraper.doubleDown:
                resp = self.cloudscraper.decodeBrotli(
                    self.cloudscraper.perform_request(resp.request.method, resp.url, **kwargs)
                )

            if not self.is_Captcha_Challenge(resp):
                return resp

            # ------------------------------------------------------------------------------- #
            # if no captcha provider raise a runtime error.
            # ------------------------------------------------------------------------------- #

            if (
                not self.cloudscraper.captcha
                or not isinstance(self.cloudscraper.captcha, dict)
                or not self.cloudscraper.captcha.get('provider')
            ):
                self.cloudscraper.simpleException(
                    CloudflareCaptchaProvider,
                    "Cloudflare Captcha detected, unfortunately you haven't loaded an anti Captcha provider "
                    "correctly via the 'captcha' parameter."
                )

            # ------------------------------------------------------------------------------- #
            # if provider is return_response, return the response without doing anything.
            # ------------------------------------------------------------------------------- #

            if self.cloudscraper.captcha.get('provider') == 'return_response':
                return resp

            # ------------------------------------------------------------------------------- #
            # Submit request to parser wrapper to solve captcha
            # ------------------------------------------------------------------------------- #

            submit_url = self.captcha_Challenge_Response(
                self.cloudscraper.captcha.get('provider'),
                self.cloudscraper.captcha,
                resp.text,
                resp.url
            )
        else:
            # ------------------------------------------------------------------------------- #
            # Cloudflare requires a delay before solving the challenge
            # ------------------------------------------------------------------------------- #

            if not self.cloudscraper.delay:
                try:
                    # The page states the delay in milliseconds; a missing
                    # match raises AttributeError on .group().
                    self.cloudscraper.delay = float(
                        re.search(
                            r'submit\(\);\r?\n\s*},\s*([0-9]+)',
                            resp.text
                        ).group(1)
                    ) / float(1000)
                except (AttributeError, ValueError):
                    self.cloudscraper.simpleException(
                        CloudflareIUAMError,
                        "Cloudflare IUAM possibility malformed, issue extracing delay value."
                    )

            time.sleep(self.cloudscraper.delay)

            # ------------------------------------------------------------------------------- #

            submit_url = self.IUAM_Challenge_Response(
                resp.text,
                resp.url,
                self.cloudscraper.interpreter
            )

        # ------------------------------------------------------------------------------- #
        # Send the Challenge Response back to Cloudflare
        # ------------------------------------------------------------------------------- #

        if submit_url:

            def updateAttr(obj, name, newValue):
                # Merge newValue into obj[name]; start from an empty dict
                # when the entry is missing or has no .update() (e.g. None).
                try:
                    obj[name].update(newValue)
                except (AttributeError, KeyError):
                    obj[name] = {}
                    obj[name].update(newValue)
                return obj[name]

            cloudflare_kwargs = deepcopy(kwargs)
            cloudflare_kwargs['allow_redirects'] = False
            cloudflare_kwargs['data'] = updateAttr(
                cloudflare_kwargs,
                'data',
                submit_url['data']
            )

            urlParsed = urlparse(resp.url)
            cloudflare_kwargs['headers'] = updateAttr(
                cloudflare_kwargs,
                'headers',
                {
                    'Origin': f'{urlParsed.scheme}://{urlParsed.netloc}',
                    'Referer': resp.url
                }
            )

            challengeSubmitResponse = self.cloudscraper.request(
                'POST',
                submit_url['url'],
                **cloudflare_kwargs
            )

            if challengeSubmitResponse.status_code == 400:
                self.cloudscraper.simpleException(
                    CloudflareSolveError,
                    'Invalid challenge answer detected, Cloudflare broken?'
                )

            # ------------------------------------------------------------------------------- #
            # Return the response as-is when Cloudflare passes the content through
            # instead of answering with a 3xx redirect.
            # ------------------------------------------------------------------------------- #

            if not challengeSubmitResponse.is_redirect:
                return challengeSubmitResponse

            # ------------------------------------------------------------------------------- #
            # Otherwise follow the redirect ourselves; a relative Location is resolved
            # against the submit URL, an absolute one (e.g. http -> https) is used as-is.
            # ------------------------------------------------------------------------------- #

            cloudflare_kwargs = deepcopy(kwargs)
            cloudflare_kwargs['headers'] = updateAttr(
                cloudflare_kwargs,
                'headers',
                {'Referer': challengeSubmitResponse.url}
            )

            if not urlparse(challengeSubmitResponse.headers['Location']).netloc:
                redirect_location = urljoin(
                    challengeSubmitResponse.url,
                    challengeSubmitResponse.headers['Location']
                )
            else:
                redirect_location = challengeSubmitResponse.headers['Location']

            return self.cloudscraper.request(
                resp.request.method,
                redirect_location,
                **cloudflare_kwargs
            )

        # ------------------------------------------------------------------------------- #
        # We shouldn't be here...
        # Re-request the original query and/or process again....
        # ------------------------------------------------------------------------------- #

        return self.cloudscraper.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- #

View File

@@ -28,9 +28,9 @@ def _pythonVersion():
if interpreter == 'PyPy': if interpreter == 'PyPy':
interpreter_version = \ interpreter_version = \
'{}.{}.{}'.format(sys.pypy_version_info.major, sys.pypy_version_info.minor, sys.pypy_version_info.micro) f'{sys.pypy_version_info.major}.{sys.pypy_version_info.minor}.{sys.pypy_version_info.micro}'
if sys.pypy_version_info.releaselevel != 'final': if sys.pypy_version_info.releaselevel != 'final':
interpreter_version = '{}{}'.format(interpreter_version, sys.pypy_version_info.releaselevel) interpreter_version = f'{interpreter_version}{sys.pypy_version_info.releaselevel}'
return { return {
'name': interpreter, 'name': interpreter,
'version': interpreter_version 'version': interpreter_version

View File

@@ -93,14 +93,14 @@ class User_Agent():
else: else:
if self.browser and self.browser not in self.browsers: if self.browser and self.browser not in self.browsers:
sys.tracebacklimit = 0 sys.tracebacklimit = 0
raise RuntimeError('Sorry "{}" browser is not valid, valid browsers are [{}].'.format(self.browser), ", ".join(self.browsers)) raise RuntimeError(f'Sorry "{self.browser}" browser is not valid, valid browsers are [{", ".join(self.browsers)}].')
if not self.platform: if not self.platform:
self.platform = random.SystemRandom().choice(self.platforms) self.platform = random.SystemRandom().choice(self.platforms)
if self.platform not in self.platforms: if self.platform not in self.platforms:
sys.tracebacklimit = 0 sys.tracebacklimit = 0
raise RuntimeError('Sorry the platform "{}" is not valid, valid platforms are [{)}]'.format(self.platform, ", ".join(self.platforms))) raise RuntimeError(f'Sorry the platform "{self.platform}" is not valid, valid platforms are [{", ".join(self.platforms)}]')
filteredAgents = self.filterAgents(user_agents['user_agents']) filteredAgents = self.filterAgents(user_agents['user_agents'])
@@ -111,7 +111,7 @@ class User_Agent():
if not filteredAgents[self.browser]: if not filteredAgents[self.browser]:
sys.tracebacklimit = 0 sys.tracebacklimit = 0
raise RuntimeError('Sorry "{}" browser was not found with a platform of "{}".'.format(self.browser, self.platform)) raise RuntimeError(f'Sorry "{self.browser}" browser was not found with a platform of "{self.platform}".')
self.cipherSuite = user_agents['cipherSuite'][self.browser] self.cipherSuite = user_agents['cipherSuite'][self.browser]
self.headers = user_agents['headers'][self.browser] self.headers = user_agents['headers'][self.browser]