fix: cloudscraper ver. dev

This commit is contained in:
greko17
2019-12-02 23:59:00 +01:00
parent 4651e0d179
commit d0b02f327d
27 changed files with 27182 additions and 11697 deletions

382
lib/cloudscraper/__init__.py Executable file → Normal file
View File

@@ -1,20 +1,21 @@
## Modded version of cloudscrape 1.1.24
## https://github.com/venomous/cloudscraper
import logging
import re
import OpenSSL
import sys
import ssl
import requests
try:
import copyreg
except ImportError:
import copy_reg as copyreg
from copy import deepcopy
from time import sleep
from collections import OrderedDict
from requests.sessions import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from .interpreters import JavaScriptInterpreter
from .reCaptcha import reCaptcha
@@ -35,153 +36,210 @@ try:
except ImportError:
from urllib.parse import urlparse
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
__version__ = '1.1.24'
__version__ = '1.2.15'
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# class CipherSuiteAdapter(HTTPAdapter):
#
# def __init__(self, cipherSuite=None, **kwargs):
# self.cipherSuite = cipherSuite
#
# self.ssl_context = create_urllib3_context(
# ssl_version=ssl.PROTOCOL_TLS,
# ciphers=self.cipherSuite
# )
#
# super(CipherSuiteAdapter, self).__init__(**kwargs)
class CipherSuiteAdapter(HTTPAdapter):
def __init__(self, cipherSuite=None, **kwargs):
self.cipherSuite = cipherSuite
__attrs__ = [
'ssl_context',
'max_retries',
'config',
'_pool_connections',
'_pool_maxsize',
'_pool_block'
]
if hasattr(ssl, 'PROTOCOL_TLS'):
self.ssl_context = create_urllib3_context(
ssl_version=getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2),
ciphers=self.cipherSuite
)
else:
self.ssl_context = create_urllib3_context(ssl_version=ssl.PROTOCOL_TLSv1)
def __init__(self, *args, **kwargs):
self.ssl_context = kwargs.pop('ssl_context', None)
self.cipherSuite = kwargs.pop('cipherSuite', None)
if not self.ssl_context:
self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
self.ssl_context.set_ciphers(self.cipherSuite)
self.ssl_context.options |= (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)
super(CipherSuiteAdapter, self).__init__(**kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super(CipherSuiteAdapter, self).init_poolmanager(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def proxy_manager_for(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super(CipherSuiteAdapter, self).proxy_manager_for(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class CloudScraper(Session):
def __init__(self, *args, **kwargs):
self.allow_brotli = kwargs.pop('allow_brotli', True if 'brotli' in sys.modules.keys() else False)
self.debug = kwargs.pop('debug', False)
self.delay = kwargs.pop('delay', None)
self.interpreter = kwargs.pop('interpreter', 'js2py')
self.cipherSuite = kwargs.pop('cipherSuite', None)
self.interpreter = kwargs.pop('interpreter', 'native')
self.recaptcha = kwargs.pop('recaptcha', {})
self.allow_brotli = kwargs.pop(
'allow_brotli',
True if 'brotli' in sys.modules.keys() else False
)
self.user_agent = User_Agent(
allow_brotli=self.allow_brotli,
browser=kwargs.pop('browser', None)
)
self.cipherSuite = None
self._solveDepthCnt = 0
self.solveDepth = kwargs.pop('solveDepth', 3)
super(CloudScraper, self).__init__(*args, **kwargs)
# pylint: disable=E0203
if 'requests' in self.headers['User-Agent']:
# ------------------------------------------------------------------------------- #
# Set a random User-Agent if no custom User-Agent has been set
self.headers = User_Agent(allow_brotli=self.allow_brotli).headers
# ------------------------------------------------------------------------------- #
self.headers = self.user_agent.headers
self.mount('https://', CipherSuiteAdapter(self.loadCipherSuite()))
self.mount(
'https://',
CipherSuiteAdapter(
cipherSuite=self.loadCipherSuite() if not self.cipherSuite else self.cipherSuite
)
)
##########################################################################################################################################################
# purely to allow us to pickle dump
copyreg.pickle(ssl.SSLContext, lambda obj: (obj.__class__, (obj.protocol,)))
# ------------------------------------------------------------------------------- #
# Allow us to pickle our session back with all variables
# ------------------------------------------------------------------------------- #
def __getstate__(self):
return self.__dict__
# ------------------------------------------------------------------------------- #
# debug the request via the response
# ------------------------------------------------------------------------------- #
@staticmethod
def debugRequest(req):
try:
print(dump.dump_all(req).decode('utf-8'))
except: # noqa
pass
except ValueError as e:
print("Debug Error: {}".format(getattr(e, 'message', e)))
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Decode Brotli on older versions of urllib3 manually
# ------------------------------------------------------------------------------- #
def decodeBrotli(self, resp):
if requests.packages.urllib3.__version__ < '1.25.1' and resp.headers.get('Content-Encoding') == 'br':
if self.allow_brotli and resp._content:
resp._content = brotli.decompress(resp.content)
else:
logging.warning(
'You\'re running urllib3 {}, Brotli content detected, '
'Which requires manual decompression, '
'But option allow_brotli is set to False, '
'We will not continue to decompress.'.format(requests.packages.urllib3.__version__)
)
return resp
# ------------------------------------------------------------------------------- #
# construct a cipher suite of ciphers the system actually supports
# ------------------------------------------------------------------------------- #
def loadCipherSuite(self):
if self.cipherSuite:
return self.cipherSuite
self.cipherSuite = ''
if hasattr(ssl, 'PROTOCOL_TLS'):
ciphers = [
'TLS13-AES-128-GCM-SHA256',
'TLS13-AES-256-GCM-SHA384',
'TLS13-CHACHA20-POLY1305-SHA256',
'ECDHE-ECDSA-CHACHA20-POLY1305',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES128-SHA',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-SHA',
'ECDHE-ECDSA-AES256-SHA384',
# Slip in some additional intermediate compatibility ciphers, This should help out users for non Cloudflare based sites.
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-RSA-AES256-GCM-SHA384',
'DHE-RSA-AES128-GCM-SHA256',
'DHE-RSA-AES256-GCM-SHA384'
]
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
for cipher in ciphers:
if hasattr(ssl, 'Purpose') and hasattr(ssl.Purpose, 'SERVER_AUTH'):
for cipher in self.user_agent.cipherSuite[:]:
try:
ctx.set_ciphers(cipher)
self.cipherSuite = '{}:{}'.format(self.cipherSuite, cipher).rstrip(':').lstrip(':')
except ssl.SSLError:
pass
context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
context.set_ciphers(cipher)
except (OpenSSL.SSL.Error, ssl.SSLError):
self.user_agent.cipherSuite.remove(cipher)
return self.cipherSuite
if self.user_agent.cipherSuite:
self.cipherSuite = ':'.join(self.user_agent.cipherSuite)
return self.cipherSuite
##########################################################################################################################################################
sys.tracebacklimit = 0
raise RuntimeError("The OpenSSL on this system does not meet the minimum cipher requirements.")
# ------------------------------------------------------------------------------- #
# Our hijacker request function
# ------------------------------------------------------------------------------- #
def request(self, method, url, *args, **kwargs):
ourSuper = super(CloudScraper, self)
resp = ourSuper.request(method, url, *args, **kwargs)
# pylint: disable=E0203
if kwargs.get('proxies') and kwargs.get('proxies') != self.proxies:
self.proxies = kwargs.get('proxies')
if requests.packages.urllib3.__version__ < '1.25.1' and resp.headers.get('Content-Encoding') == 'br':
if self.allow_brotli and resp._content:
resp._content = brotli.decompress(resp.content)
else:
logging.warning('Brotli content detected, But option is disabled, we will not continue.')
return resp
resp = self.decodeBrotli(
super(CloudScraper, self).request(method, url, *args, **kwargs)
)
# ------------------------------------------------------------------------------- #
# Debug request
# ------------------------------------------------------------------------------- #
if self.debug:
self.debugRequest(resp)
# Check if Cloudflare anti-bot is on
if self.isChallengeRequest(resp):
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.
self.request('GET', resp.url)
resp = ourSuper.request(method, url, *args, **kwargs)
else:
# Solve Challenge
resp = self.sendChallengeResponse(resp, **kwargs)
if self.is_Challenge_Request(resp):
# ------------------------------------------------------------------------------- #
# Try to solve the challenge and send it back
# ------------------------------------------------------------------------------- #
if self._solveDepthCnt >= self.solveDepth:
sys.tracebacklimit = 0
_ = self._solveDepthCnt
self._solveDepthCnt = 0
raise RuntimeError("!!Loop Protection!! We have tried to solve {} time(s) in a row.".format(_))
self._solveDepthCnt += 1
resp = self.Challenge_Response(resp, **kwargs)
else:
if resp.status_code not in [302, 429, 503]:
self._solveDepthCnt = 0
return resp
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_IUAM_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'action="/.*?__cf_chl_jschl_tk__=\S+".*?name="jschl_vc"\svalue=.*?',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare reCaptcha challenge
# ------------------------------------------------------------------------------- #
@@ -203,41 +261,40 @@ class CloudScraper(Session):
return False
@staticmethod
def isChallengeRequest(resp):
if resp.headers.get('Server', '').startswith('cloudflare'):
return (
resp.status_code in [403, 429, 503]
and (
all(s in resp.content for s in [b'jschl_vc', b'jschl_answer'])
or
all(s in resp.content for s in [b'why_captcha', b'/cdn-cgi/l/chk_captcha'])
)
)
# ------------------------------------------------------------------------------- #
# Wrapper for is_reCaptcha_Challenge and is_IUAM_Challenge
# ------------------------------------------------------------------------------- #
def is_Challenge_Request(self, resp):
if self.is_reCaptcha_Challenge(resp) or self.is_IUAM_Challenge(resp):
return True
return False
# ------------------------------------------------------------------------------- #
# Try to solve cloudflare javascript challenge.
# ------------------------------------------------------------------------------- #
@staticmethod
def IUAM_Challenge_Response(body, domain, interpreter):
def IUAM_Challenge_Response(body, url, interpreter):
try:
challengeUUID = re.search(
r'__cf_chl_jschl_tk__=(?P<challengeUUID>\S+)"',
r'id="challenge-form" action="(?P<challengeUUID>\S+)"',
body, re.M | re.DOTALL
).groupdict().get('challengeUUID')
params = OrderedDict(re.findall(r'name="(r|jschl_vc|pass)"\svalue="(.*?)"', body))
).groupdict().get('challengeUUID', '')
payload = OrderedDict(re.findall(r'name="(r|jschl_vc|pass)"\svalue="(.*?)"', body))
except AttributeError:
sys.tracebacklimit = 0
raise RuntimeError("Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly.")
raise RuntimeError(
"Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
)
hostParsed = urlparse(url)
try:
params['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
payload['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
interpreter
).solveChallenge(body, domain)
).solveChallenge(body, hostParsed.netloc)
except Exception as e:
raise RuntimeError(
'Unable to parse Cloudflare anti-bots page: {}'.format(
@@ -246,16 +303,23 @@ class CloudScraper(Session):
)
return {
'url': 'https://{}/'.format(domain),
'params': {'__cf_chl_jschl_tk__': challengeUUID},
'data': params
'url': '{}://{}{}'.format(
hostParsed.scheme,
hostParsed.netloc,
challengeUUID
),
'data': payload
}
# ------------------------------------------------------------------------------- #
# Try to solve the reCaptcha challenge via 3rd party.
# ------------------------------------------------------------------------------- #
@staticmethod
def reCaptcha_Challenge_Response(provider, provider_params, body, url):
try:
params = re.search(
r'(name="r"\svalue="(?P<r>\S+)"|).*?__cf_chl_captcha_tk__=(?P<challengeUUID>\S+)".*?'
payload = re.search(
r'(name="r"\svalue="(?P<r>\S+)"|).*?challenge-form" action="(?P<challengeUUID>\S+)".*?'
r'data-ray="(?P<data_ray>\S+)".*?data-sitekey="(?P<site_key>\S+)"',
body, re.M | re.DOTALL
).groupdict()
@@ -265,24 +329,30 @@ class CloudScraper(Session):
"Cloudflare reCaptcha detected, unfortunately we can't extract the parameters correctly."
)
hostParsed = urlparse(url)
return {
'url': url,
'params': {'__cf_chl_captcha_tk__': params.get('challengeUUID')},
'url': '{}://{}{}'.format(
hostParsed.scheme,
hostParsed.netloc,
payload.get('challengeUUID', '')
),
'data': OrderedDict([
('r', ''),
('id', params.get('data_ray')),
('r', payload.get('r', '')),
('id', payload.get('data_ray')),
(
'g-recaptcha-response',
reCaptcha.dynamicImport(
provider.lower()
).solveCaptcha(url, params.get('site_key'), provider_params)
).solveCaptcha(url, payload.get('site_key'), provider_params)
)
])
}
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Attempt to handle and send the challenge response back to cloudflare
# ------------------------------------------------------------------------------- #
def sendChallengeResponse(self, resp, **original_kwargs):
def Challenge_Response(self, resp, **kwargs):
if self.is_reCaptcha_Challenge(resp):
# ------------------------------------------------------------------------------- #
# double down on the request as some websites are only checking
@@ -290,7 +360,7 @@ class CloudScraper(Session):
# ------------------------------------------------------------------------------- #
resp = self.decodeBrotli(
super(CloudScraper, self).request(resp.request.method, resp.url, **original_kwargs)
super(CloudScraper, self).request(resp.request.method, resp.url, **kwargs)
)
if not self.is_reCaptcha_Challenge(resp):
@@ -346,7 +416,7 @@ class CloudScraper(Session):
submit_url = self.IUAM_Challenge_Response(
resp.text,
urlparse(resp.url).netloc,
resp.url,
self.interpreter
)
@@ -364,33 +434,34 @@ class CloudScraper(Session):
obj[name].update(newValue)
return obj[name]
cloudflare_kwargs = deepcopy(original_kwargs)
cloudflare_kwargs = deepcopy(kwargs)
cloudflare_kwargs['allow_redirects'] = False
cloudflare_kwargs['params'] = updateAttr(cloudflare_kwargs, 'params', submit_url['params'])
cloudflare_kwargs['data'] = updateAttr(cloudflare_kwargs, 'data', submit_url['data'])
cloudflare_kwargs['headers'] = updateAttr(cloudflare_kwargs, 'headers', {'Referer': resp.url})
cloudflare_kwargs['data'] = updateAttr(
cloudflare_kwargs,
'data',
submit_url['data']
)
cloudflare_kwargs['headers'] = updateAttr(
cloudflare_kwargs,
'headers',
{
'Referer': resp.url
}
)
self.request(
return self.request(
'POST',
submit_url['url'],
**cloudflare_kwargs
)
# ------------------------------------------------------------------------------- #
# Request the original query request and return it
# We shouldn't be here.... Re-request the original query and process again....
# ------------------------------------------------------------------------------- #
return self.request(resp.request.method, resp.url, **original_kwargs)
# ------------------------------------------------------------------------------- #
# Request the original query request and return it
# ------------------------------------------------------------------------------- #
# return self.request(resp.request.method, resp.url, **kwargs)
return self.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- #
##########################################################################################################################################################
@classmethod
def create_scraper(cls, sess=None, **kwargs):
"""
@@ -399,25 +470,30 @@ class CloudScraper(Session):
scraper = cls(**kwargs)
if sess:
attrs = ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']
for attr in attrs:
for attr in ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']:
val = getattr(sess, attr, None)
if val:
setattr(scraper, attr, val)
return scraper
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Functions for integrating cloudscraper with other applications and scripts
# ------------------------------------------------------------------------------- #
@classmethod
def get_tokens(cls, url, **kwargs):
scraper = cls.create_scraper(
debug=kwargs.pop('debug', False),
delay=kwargs.pop('delay', None),
interpreter=kwargs.pop('interpreter', 'js2py'),
allow_brotli=kwargs.pop('allow_brotli', True),
recaptcha=kwargs.pop('recaptcha', {})
**{
field: kwargs.pop(field, None) for field in [
'allow_brotli',
'browser',
'debug',
'delay',
'interpreter',
'recaptcha'
] if field in kwargs
}
)
try:
@@ -436,7 +512,11 @@ class CloudScraper(Session):
cookie_domain = d
break
else:
raise ValueError('Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?')
sys.tracebacklimit = 0
raise RuntimeError(
"Unable to find Cloudflare cookies. Does the site actually "
"have Cloudflare IUAM (I'm Under Attack Mode) enabled?"
)
return (
{
@@ -446,7 +526,7 @@ class CloudScraper(Session):
scraper.headers['User-Agent']
)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
@classmethod
def get_cookie_string(cls, url, **kwargs):
@@ -457,7 +537,7 @@ class CloudScraper(Session):
return '; '.join('='.join(pair) for pair in tokens.items()), user_agent
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
create_scraper = CloudScraper.create_scraper
get_tokens = CloudScraper.get_tokens

63
lib/cloudscraper/interpreters/__init__.py Executable file → Normal file
View File

@@ -1,4 +1,3 @@
import re
import sys
import logging
import abc
@@ -8,20 +7,24 @@ if sys.version_info >= (3, 4):
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
interpreters = {}
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
# ------------------------------------------------------------------------------- #
class JavaScriptInterpreter(ABC):
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def __init__(self, name):
interpreters[name] = self
# ------------------------------------------------------------------------------- #
@classmethod
def dynamicImport(cls, name):
if name not in interpreters:
@@ -35,55 +38,17 @@ class JavaScriptInterpreter(ABC):
return interpreters[name]
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def eval(self, jsEnv, js):
pass
# ------------------------------------------------------------------------------- #
def solveChallenge(self, body, domain):
try:
js = re.search(
r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
body
).group(1)
except Exception:
raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))
js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
js += '\na.value;'
jsEnv = '''
String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
var document = {{
createElement: function () {{
return {{ firstChild: {{ href: "https://{domain}/" }} }}
}},
getElementById: function () {{
return {{"innerHTML": "{innerHTML}"}};
}}
}};
'''
try:
innerHTML = re.search(
r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
body,
re.MULTILINE | re.DOTALL
)
innerHTML = innerHTML.group(2) if innerHTML else ''
except: # noqa
logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
try:
result = self.eval(
re.sub(r'\s{2,}', ' ', jsEnv.format(domain=domain, innerHTML=innerHTML), flags=re.MULTILINE | re.DOTALL),
js
)
float(result)
return float(self.eval(body, domain))
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
return result

198
lib/cloudscraper/interpreters/chakracore.py Executable file → Normal file
View File

@@ -1,95 +1,103 @@
from __future__ import absolute_import
import os
import sys
import ctypes.util
from ctypes import c_void_p, c_size_t, byref, create_string_buffer, CDLL
from . import JavaScriptInterpreter
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('chakracore')
def eval(self, jsEnv, js):
chakraCoreLibrary = None
# check current working directory.
for _libraryFile in ['libChakraCore.so', 'libChakraCore.dylib', 'ChakraCore.dll']:
if os.path.isfile(os.path.join(os.getcwd(), _libraryFile)):
chakraCoreLibrary = os.path.join(os.getcwd(), _libraryFile)
continue
if not chakraCoreLibrary:
chakraCoreLibrary = ctypes.util.find_library('ChakraCore')
if not chakraCoreLibrary:
sys.tracebacklimit = 0
raise RuntimeError(
'ChakraCore library not found in current path or any of your system library paths, '
'please download from https://www.github.com/VeNoMouS/cloudscraper/tree/ChakraCore/, '
'or https://github.com/Microsoft/ChakraCore/'
)
try:
chakraCore = CDLL(chakraCoreLibrary)
except OSError:
sys.tracebacklimit = 0
raise RuntimeError('There was an error loading the ChakraCore library {}'.format(chakraCoreLibrary))
if sys.platform != 'win32':
chakraCore.DllMain(0, 1, 0)
chakraCore.DllMain(0, 2, 0)
script = create_string_buffer('{}{}'.format(jsEnv, js).encode('utf-16'))
runtime = c_void_p()
chakraCore.JsCreateRuntime(0, 0, byref(runtime))
context = c_void_p()
chakraCore.JsCreateContext(runtime, byref(context))
chakraCore.JsSetCurrentContext(context)
fname = c_void_p()
chakraCore.JsCreateString(
'iuam-challenge.js',
len('iuam-challenge.js'),
byref(fname)
)
scriptSource = c_void_p()
chakraCore.JsCreateExternalArrayBuffer(
script,
len(script),
0,
0,
byref(scriptSource)
)
jsResult = c_void_p()
chakraCore.JsRun(scriptSource, 0, fname, 0x02, byref(jsResult))
resultJSString = c_void_p()
chakraCore.JsConvertValueToString(jsResult, byref(resultJSString))
stringLength = c_size_t()
chakraCore.JsCopyString(resultJSString, 0, 0, byref(stringLength))
resultSTR = create_string_buffer(stringLength.value + 1)
chakraCore.JsCopyString(
resultJSString,
byref(resultSTR),
stringLength.value + 1,
0
)
chakraCore.JsDisposeRuntime(runtime)
return resultSTR.value
ChallengeInterpreter()
from __future__ import absolute_import
import os
import sys
import ctypes.util
from ctypes import c_void_p, c_size_t, byref, create_string_buffer, CDLL
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
# ------------------------------------------------------------------------------- #
def __init__(self):
super(ChallengeInterpreter, self).__init__('chakracore')
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
chakraCoreLibrary = None
# check current working directory.
for _libraryFile in ['libChakraCore.so', 'libChakraCore.dylib', 'ChakraCore.dll']:
if os.path.isfile(os.path.join(os.getcwd(), _libraryFile)):
chakraCoreLibrary = os.path.join(os.getcwd(), _libraryFile)
continue
if not chakraCoreLibrary:
chakraCoreLibrary = ctypes.util.find_library('ChakraCore')
if not chakraCoreLibrary:
sys.tracebacklimit = 0
raise RuntimeError(
'ChakraCore library not found in current path or any of your system library paths, '
'please download from https://www.github.com/VeNoMouS/cloudscraper/tree/ChakraCore/, '
'or https://github.com/Microsoft/ChakraCore/'
)
try:
chakraCore = CDLL(chakraCoreLibrary)
except OSError:
sys.tracebacklimit = 0
raise RuntimeError('There was an error loading the ChakraCore library {}'.format(chakraCoreLibrary))
if sys.platform != 'win32':
chakraCore.DllMain(0, 1, 0)
chakraCore.DllMain(0, 2, 0)
script = create_string_buffer(template(body, domain).encode('utf-16'))
runtime = c_void_p()
chakraCore.JsCreateRuntime(0, 0, byref(runtime))
context = c_void_p()
chakraCore.JsCreateContext(runtime, byref(context))
chakraCore.JsSetCurrentContext(context)
fname = c_void_p()
chakraCore.JsCreateString(
'iuam-challenge.js',
len('iuam-challenge.js'),
byref(fname)
)
scriptSource = c_void_p()
chakraCore.JsCreateExternalArrayBuffer(
script,
len(script),
0,
0,
byref(scriptSource)
)
jsResult = c_void_p()
chakraCore.JsRun(scriptSource, 0, fname, 0x02, byref(jsResult))
resultJSString = c_void_p()
chakraCore.JsConvertValueToString(jsResult, byref(resultJSString))
stringLength = c_size_t()
chakraCore.JsCopyString(resultJSString, 0, 0, byref(stringLength))
resultSTR = create_string_buffer(stringLength.value + 1)
chakraCore.JsCopyString(
resultJSString,
byref(resultSTR),
stringLength.value + 1,
0
)
chakraCore.JsDisposeRuntime(runtime)
return resultSTR.value
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

View File

@@ -0,0 +1,58 @@
import logging
import re
# ------------------------------------------------------------------------------- #
def template(body, domain):
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
try:
js = re.search(
r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
body
).group(1)
except Exception:
raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))
js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
js += '\na.value;'
jsEnv = '''
String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
var document = {{
createElement: function () {{
return {{ firstChild: {{ href: "https://{domain}/" }} }}
}},
getElementById: function () {{
return {{"innerHTML": "{innerHTML}"}};
}}
}};
'''
try:
innerHTML = re.search(
r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
body,
re.MULTILINE | re.DOTALL
)
innerHTML = innerHTML.group(2) if innerHTML else ''
except: # noqa
logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
return '{}{}'.format(
re.sub(
r'\s{2,}',
' ',
jsEnv.format(
domain=domain,
innerHTML=innerHTML
),
re.MULTILINE | re.DOTALL
),
js
)
# ------------------------------------------------------------------------------- #

79
lib/cloudscraper/interpreters/js2py.py Executable file → Normal file
View File

@@ -1,35 +1,44 @@
from __future__ import absolute_import
import js2py
import logging
import base64
from . import JavaScriptInterpreter
from .jsunfuck import jsunfuck
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('js2py')
def eval(self, jsEnv, js):
### blocca lo script
# from core.support import dbg; dbg()
if js2py.eval_js('(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]') == '1':
logging.warning('WARNING - Please upgrade your js2py https://github.com/PiotrDabkowski/Js2Py, applying work around for the meantime.')
js = jsunfuck(js)
def atob(s):
return base64.b64decode('{}'.format(s)).decode('utf-8')
js2py.disable_pyimport()
context = js2py.EvalJs({'atob': atob})
result = context.eval('{}{}'.format(jsEnv, js))
return result
ChallengeInterpreter()
from __future__ import absolute_import
import js2py
import logging
import base64
from . import JavaScriptInterpreter
from .encapsulated import template
from .jsunfuck import jsunfuck
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
# ------------------------------------------------------------------------------- #
def __init__(self):
super(ChallengeInterpreter, self).__init__('js2py')
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
jsPayload = template(body, domain)
if js2py.eval_js('(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]') == '1':
logging.warning('WARNING - Please upgrade your js2py https://github.com/PiotrDabkowski/Js2Py, applying work around for the meantime.')
jsPayload = jsunfuck(jsPayload)
def atob(s):
return base64.b64decode('{}'.format(s)).decode('utf-8')
js2py.disable_pyimport()
context = js2py.EvalJs({'atob': atob})
result = context.eval(jsPayload)
return result
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

194
lib/cloudscraper/interpreters/jsunfuck.py Executable file → Normal file
View File

@@ -1,97 +1,97 @@
MAPPING = {
'a': '(false+"")[1]',
'b': '([]["entries"]()+"")[2]',
'c': '([]["fill"]+"")[3]',
'd': '(undefined+"")[2]',
'e': '(true+"")[3]',
'f': '(false+"")[0]',
'g': '(false+[0]+String)[20]',
'h': '(+(101))["to"+String["name"]](21)[1]',
'i': '([false]+undefined)[10]',
'j': '([]["entries"]()+"")[3]',
'k': '(+(20))["to"+String["name"]](21)',
'l': '(false+"")[2]',
'm': '(Number+"")[11]',
'n': '(undefined+"")[1]',
'o': '(true+[]["fill"])[10]',
'p': '(+(211))["to"+String["name"]](31)[1]',
'q': '(+(212))["to"+String["name"]](31)[1]',
'r': '(true+"")[1]',
's': '(false+"")[3]',
't': '(true+"")[0]',
'u': '(undefined+"")[0]',
'v': '(+(31))["to"+String["name"]](32)',
'w': '(+(32))["to"+String["name"]](33)',
'x': '(+(101))["to"+String["name"]](34)[1]',
'y': '(NaN+[Infinity])[10]',
'z': '(+(35))["to"+String["name"]](36)',
'A': '(+[]+Array)[10]',
'B': '(+[]+Boolean)[10]',
'C': 'Function("return escape")()(("")["italics"]())[2]',
'D': 'Function("return escape")()([]["fill"])["slice"]("-1")',
'E': '(RegExp+"")[12]',
'F': '(+[]+Function)[10]',
'G': '(false+Function("return Date")()())[30]',
'I': '(Infinity+"")[0]',
'M': '(true+Function("return Date")()())[30]',
'N': '(NaN+"")[0]',
'O': '(NaN+Function("return{}")())[11]',
'R': '(+[]+RegExp)[10]',
'S': '(+[]+String)[10]',
'T': '(NaN+Function("return Date")()())[30]',
'U': '(NaN+Function("return{}")()["to"+String["name"]]["call"]())[11]',
' ': '(NaN+[]["fill"])[11]',
'"': '("")["fontcolor"]()[12]',
'%': 'Function("return escape")()([]["fill"])[21]',
'&': '("")["link"](0+")[10]',
'(': '(undefined+[]["fill"])[22]',
')': '([0]+false+[]["fill"])[20]',
'+': '(+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]])+[])[2]',
',': '([]["slice"]["call"](false+"")+"")[1]',
'-': '(+(.+[0000000001])+"")[2]',
'.': '(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]',
'/': '(false+[0])["italics"]()[10]',
':': '(RegExp()+"")[3]',
';': '("")["link"](")[14]',
'<': '("")["italics"]()[0]',
'=': '("")["fontcolor"]()[11]',
'>': '("")["italics"]()[2]',
'?': '(RegExp()+"")[2]',
'[': '([]["entries"]()+"")[0]',
']': '([]["entries"]()+"")[22]',
'{': '(true+[]["fill"])[20]',
'}': '([]["fill"]+"")["slice"]("-1")'
}
SIMPLE = {
'false': '![]',
'true': '!![]',
'undefined': '[][[]]',
'NaN': '+[![]]',
'Infinity': '+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]]+[+[]])' # +"1e1000"
}
CONSTRUCTORS = {
'Array': '[]',
'Number': '(+[])',
'String': '([]+[])',
'Boolean': '(![])',
'Function': '[]["fill"]',
'RegExp': 'Function("return/"+false+"/")()'
}
def jsunfuck(jsfuckString):
for key in sorted(MAPPING, key=lambda k: len(MAPPING[k]), reverse=True):
if MAPPING.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(MAPPING.get(key), '"{}"'.format(key))
for key in sorted(SIMPLE, key=lambda k: len(SIMPLE[k]), reverse=True):
if SIMPLE.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(SIMPLE.get(key), '{}'.format(key))
# for key in sorted(CONSTRUCTORS, key=lambda k: len(CONSTRUCTORS[k]), reverse=True):
# if CONSTRUCTORS.get(key) in jsfuckString:
# jsfuckString = jsfuckString.replace(CONSTRUCTORS.get(key), '{}'.format(key))
return jsfuckString
MAPPING = {
'a': '(false+"")[1]',
'b': '([]["entries"]()+"")[2]',
'c': '([]["fill"]+"")[3]',
'd': '(undefined+"")[2]',
'e': '(true+"")[3]',
'f': '(false+"")[0]',
'g': '(false+[0]+String)[20]',
'h': '(+(101))["to"+String["name"]](21)[1]',
'i': '([false]+undefined)[10]',
'j': '([]["entries"]()+"")[3]',
'k': '(+(20))["to"+String["name"]](21)',
'l': '(false+"")[2]',
'm': '(Number+"")[11]',
'n': '(undefined+"")[1]',
'o': '(true+[]["fill"])[10]',
'p': '(+(211))["to"+String["name"]](31)[1]',
'q': '(+(212))["to"+String["name"]](31)[1]',
'r': '(true+"")[1]',
's': '(false+"")[3]',
't': '(true+"")[0]',
'u': '(undefined+"")[0]',
'v': '(+(31))["to"+String["name"]](32)',
'w': '(+(32))["to"+String["name"]](33)',
'x': '(+(101))["to"+String["name"]](34)[1]',
'y': '(NaN+[Infinity])[10]',
'z': '(+(35))["to"+String["name"]](36)',
'A': '(+[]+Array)[10]',
'B': '(+[]+Boolean)[10]',
'C': 'Function("return escape")()(("")["italics"]())[2]',
'D': 'Function("return escape")()([]["fill"])["slice"]("-1")',
'E': '(RegExp+"")[12]',
'F': '(+[]+Function)[10]',
'G': '(false+Function("return Date")()())[30]',
'I': '(Infinity+"")[0]',
'M': '(true+Function("return Date")()())[30]',
'N': '(NaN+"")[0]',
'O': '(NaN+Function("return{}")())[11]',
'R': '(+[]+RegExp)[10]',
'S': '(+[]+String)[10]',
'T': '(NaN+Function("return Date")()())[30]',
'U': '(NaN+Function("return{}")()["to"+String["name"]]["call"]())[11]',
' ': '(NaN+[]["fill"])[11]',
'"': '("")["fontcolor"]()[12]',
'%': 'Function("return escape")()([]["fill"])[21]',
'&': '("")["link"](0+")[10]',
'(': '(undefined+[]["fill"])[22]',
')': '([0]+false+[]["fill"])[20]',
'+': '(+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]])+[])[2]',
',': '([]["slice"]["call"](false+"")+"")[1]',
'-': '(+(.+[0000000001])+"")[2]',
'.': '(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]',
'/': '(false+[0])["italics"]()[10]',
':': '(RegExp()+"")[3]',
';': '("")["link"](")[14]',
'<': '("")["italics"]()[0]',
'=': '("")["fontcolor"]()[11]',
'>': '("")["italics"]()[2]',
'?': '(RegExp()+"")[2]',
'[': '([]["entries"]()+"")[0]',
']': '([]["entries"]()+"")[22]',
'{': '(true+[]["fill"])[20]',
'}': '([]["fill"]+"")["slice"]("-1")'
}
SIMPLE = {
'false': '![]',
'true': '!![]',
'undefined': '[][[]]',
'NaN': '+[![]]',
'Infinity': '+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]]+[+[]])' # +"1e1000"
}
CONSTRUCTORS = {
'Array': '[]',
'Number': '(+[])',
'String': '([]+[])',
'Boolean': '(![])',
'Function': '[]["fill"]',
'RegExp': 'Function("return/"+false+"/")()'
}
def jsunfuck(jsfuckString):
for key in sorted(MAPPING, key=lambda k: len(MAPPING[k]), reverse=True):
if MAPPING.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(MAPPING.get(key), '"{}"'.format(key))
for key in sorted(SIMPLE, key=lambda k: len(SIMPLE[k]), reverse=True):
if SIMPLE.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(SIMPLE.get(key), '{}'.format(key))
# for key in sorted(CONSTRUCTORS, key=lambda k: len(CONSTRUCTORS[k]), reverse=True):
# if CONSTRUCTORS.get(key) in jsfuckString:
# jsfuckString = jsfuckString.replace(CONSTRUCTORS.get(key), '{}'.format(key))
return jsfuckString

View File

@@ -0,0 +1,120 @@
from __future__ import absolute_import
import re
import operator as op
from . import JavaScriptInterpreter
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('native')
def eval(self, body, domain):
# ------------------------------------------------------------------------------- #
operators = {
'+': op.add,
'-': op.sub,
'*': op.mul,
'/': op.truediv
}
# ------------------------------------------------------------------------------- #
def jsfuckToNumber(jsFuck):
t = ''
split_numbers = re.compile(r'-?\d+').findall
for i in re.findall(
r'\((?:\d|\+|\-)*\)',
jsFuck.replace('!+[]', '1').replace('!![]', '1').replace('[]', '0').lstrip('+').replace('(+', '(')
):
t = '{}{}'.format(t, sum(int(x) for x in split_numbers(i)))
return int(t)
# ------------------------------------------------------------------------------- #
def divisorMath(payload, needle, domain):
jsfuckMath = payload.split('/')
if needle in jsfuckMath[1]:
expression = re.findall(r"^(.*?)(.)\(function", jsfuckMath[1])[0]
expression_value = operators[expression[1]](
float(jsfuckToNumber(expression[0])),
float(ord(domain[jsfuckToNumber(jsfuckMath[1][
jsfuckMath[1].find('"("+p+")")}') + len('"("+p+")")}'):-2
])]))
)
else:
expression_value = jsfuckToNumber(jsfuckMath[1])
expression_value = jsfuckToNumber(jsfuckMath[0]) / float(expression_value)
return expression_value
# ------------------------------------------------------------------------------- #
def challengeSolve(body, domain):
jschl_answer = 0
jsfuckChallenge = re.search(
r"setTimeout\(function\(\){\s+var.*?f,\s*(?P<variable>\w+).*?:(?P<init>\S+)};"
r".*?\('challenge-form'\);\s+;(?P<challenge>.*?a\.value)"
r"(?:.*id=\"cf-dn-.*?>(?P<k>\S+)<)?",
body,
re.DOTALL | re.MULTILINE
).groupdict()
jsfuckChallenge['challenge'] = re.finditer(
r'{}.*?([+\-*/])=(.*?);(?=a\.value|{})'.format(
jsfuckChallenge['variable'],
jsfuckChallenge['variable']
),
jsfuckChallenge['challenge']
)
# ------------------------------------------------------------------------------- #
if '/' in jsfuckChallenge['init']:
val = jsfuckChallenge['init'].split('/')
jschl_answer = jsfuckToNumber(val[0]) / float(jsfuckToNumber(val[1]))
else:
jschl_answer = jsfuckToNumber(jsfuckChallenge['init'])
# ------------------------------------------------------------------------------- #
for expressionMatch in jsfuckChallenge['challenge']:
oper, expression = expressionMatch.groups()
if '/' in expression:
expression_value = divisorMath(expression, 'function(p)', domain)
else:
if 'Element' in expression:
expression_value = divisorMath(jsfuckChallenge['k'], '"("+p+")")}', domain)
else:
expression_value = jsfuckToNumber(expression)
jschl_answer = operators[oper](jschl_answer, expression_value)
# ------------------------------------------------------------------------------- #
if not jsfuckChallenge['k'] and '+ t.length' in body:
jschl_answer += len(domain)
# ------------------------------------------------------------------------------- #
return '{0:.10f}'.format(jschl_answer)
# ------------------------------------------------------------------------------- #
return challengeSolve(body, domain)
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

93
lib/cloudscraper/interpreters/nodejs.py Executable file → Normal file
View File

@@ -1,46 +1,47 @@
import base64
import logging
import subprocess
from . import JavaScriptInterpreter
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('nodejs')
def eval(self, jsEnv, js):
try:
js = 'var atob = function(str) {return Buffer.from(str, "base64").toString("binary");};' \
'var challenge = atob("%s");' \
'var context = {atob: atob};' \
'var options = {filename: "iuam-challenge.js", timeout: 4000};' \
'var answer = require("vm").runInNewContext(challenge, context, options);' \
'process.stdout.write(String(answer));' \
% base64.b64encode('{}{}'.format(jsEnv, js).encode('UTF-8')).decode('ascii')
return subprocess.check_output(['node', '-e', js])
except OSError as e:
if e.errno == 2:
raise EnvironmentError(
'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`). Your Node binary may be called `nodejs` rather than `node`, '
'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems. (Please read the cloudscraper'
' README\'s Dependencies section: https://github.com/VeNoMouS/cloudscraper#dependencies.'
)
raise
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. %s' % BUG_REPORT)
raise
pass
ChallengeInterpreter()
import base64
import subprocess
import sys
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
# ------------------------------------------------------------------------------- #
def __init__(self):
super(ChallengeInterpreter, self).__init__('nodejs')
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
try:
js = 'var atob = function(str) {return Buffer.from(str, "base64").toString("binary");};' \
'var challenge = atob("%s");' \
'var context = {atob: atob};' \
'var options = {filename: "iuam-challenge.js", timeout: 4000};' \
'var answer = require("vm").runInNewContext(challenge, context, options);' \
'process.stdout.write(String(answer));' \
% base64.b64encode(template(body, domain).encode('UTF-8')).decode('ascii')
return subprocess.check_output(['node', '-e', js])
except OSError as e:
if e.errno == 2:
raise EnvironmentError(
'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`). Your Node binary may be called `nodejs` rather than `node`, '
'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems. (Please read the cloudscraper'
' README\'s Dependencies section: https://github.com/VeNoMouS/cloudscraper#dependencies.'
)
raise
except Exception:
sys.tracebacklimit = 0
raise RuntimeError('Error executing Cloudflare IUAM Javascript in nodejs')
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

59
lib/cloudscraper/interpreters/v8.py Executable file → Normal file
View File

@@ -1,26 +1,33 @@
from __future__ import absolute_import
import sys
try:
import v8eval
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError('Please install the python module v8eval either via pip or download it from https://github.com/sony/v8eval')
from . import JavaScriptInterpreter
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('v8')
def eval(self, jsEnv, js):
try:
return v8eval.V8().eval('{}{}'.format(jsEnv, js))
except: # noqa
RuntimeError('We encountered an error running the V8 Engine.')
ChallengeInterpreter()
from __future__ import absolute_import
import sys
try:
import v8eval
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError('Please install the python module v8eval either via pip or download it from https://github.com/sony/v8eval')
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('v8')
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
try:
return v8eval.V8().eval(template(body, domain))
except (TypeError, v8eval.V8Error):
RuntimeError('We encountered an error running the V8 Engine.')
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

464
lib/cloudscraper/old/__init__.py Executable file
View File

@@ -0,0 +1,464 @@
## Modded version of cloudscrape 1.1.24
## https://github.com/venomous/cloudscraper
import logging
import re
import sys
import ssl
import requests
from copy import deepcopy
from time import sleep
from collections import OrderedDict
from requests.sessions import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from .interpreters import JavaScriptInterpreter
from .reCaptcha import reCaptcha
from .user_agent import User_Agent
try:
from requests_toolbelt.utils import dump
except ImportError:
pass
try:
import brotli
except ImportError:
pass
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
##########################################################################################################################################################
__version__ = '1.1.24'
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# class CipherSuiteAdapter(HTTPAdapter):
#
# def __init__(self, cipherSuite=None, **kwargs):
# self.cipherSuite = cipherSuite
#
# self.ssl_context = create_urllib3_context(
# ssl_version=ssl.PROTOCOL_TLS,
# ciphers=self.cipherSuite
# )
#
# super(CipherSuiteAdapter, self).__init__(**kwargs)
class CipherSuiteAdapter(HTTPAdapter):
def __init__(self, cipherSuite=None, **kwargs):
self.cipherSuite = cipherSuite
if hasattr(ssl, 'PROTOCOL_TLS'):
self.ssl_context = create_urllib3_context(
ssl_version=getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2),
ciphers=self.cipherSuite
)
else:
self.ssl_context = create_urllib3_context(ssl_version=ssl.PROTOCOL_TLSv1)
super(CipherSuiteAdapter, self).__init__(**kwargs)
##########################################################################################################################################################
def init_poolmanager(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super(CipherSuiteAdapter, self).init_poolmanager(*args, **kwargs)
##########################################################################################################################################################
def proxy_manager_for(self, *args, **kwargs):
kwargs['ssl_context'] = self.ssl_context
return super(CipherSuiteAdapter, self).proxy_manager_for(*args, **kwargs)
##########################################################################################################################################################
class CloudScraper(Session):
def __init__(self, *args, **kwargs):
self.allow_brotli = kwargs.pop('allow_brotli', True if 'brotli' in sys.modules.keys() else False)
self.debug = kwargs.pop('debug', False)
self.delay = kwargs.pop('delay', None)
self.interpreter = kwargs.pop('interpreter', 'js2py')
self.recaptcha = kwargs.pop('recaptcha', {})
self.cipherSuite = None
super(CloudScraper, self).__init__(*args, **kwargs)
if 'requests' in self.headers['User-Agent']:
# Set a random User-Agent if no custom User-Agent has been set
self.headers = User_Agent(allow_brotli=self.allow_brotli).headers
self.mount('https://', CipherSuiteAdapter(self.loadCipherSuite()))
##########################################################################################################################################################
@staticmethod
def debugRequest(req):
try:
print(dump.dump_all(req).decode('utf-8'))
except: # noqa
pass
##########################################################################################################################################################
def loadCipherSuite(self):
if self.cipherSuite:
return self.cipherSuite
self.cipherSuite = ''
if hasattr(ssl, 'PROTOCOL_TLS'):
ciphers = [
'TLS13-AES-128-GCM-SHA256',
'TLS13-AES-256-GCM-SHA384',
'TLS13-CHACHA20-POLY1305-SHA256',
'ECDHE-ECDSA-CHACHA20-POLY1305',
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES128-SHA',
'ECDHE-ECDSA-AES128-SHA256',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-ECDSA-AES256-SHA',
'ECDHE-ECDSA-AES256-SHA384',
# Slip in some additional intermediate compatibility ciphers, This should help out users for non Cloudflare based sites.
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384',
'ECDHE-RSA-AES256-GCM-SHA384',
'DHE-RSA-AES128-GCM-SHA256',
'DHE-RSA-AES256-GCM-SHA384'
]
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
for cipher in ciphers:
try:
ctx.set_ciphers(cipher)
self.cipherSuite = '{}:{}'.format(self.cipherSuite, cipher).rstrip(':').lstrip(':')
except ssl.SSLError:
pass
return self.cipherSuite
##########################################################################################################################################################
def request(self, method, url, *args, **kwargs):
ourSuper = super(CloudScraper, self)
resp = ourSuper.request(method, url, *args, **kwargs)
if requests.packages.urllib3.__version__ < '1.25.1' and resp.headers.get('Content-Encoding') == 'br':
if self.allow_brotli and resp._content:
resp._content = brotli.decompress(resp.content)
else:
logging.warning('Brotli content detected, But option is disabled, we will not continue.')
return resp
# Debug request
if self.debug:
self.debugRequest(resp)
# Check if Cloudflare anti-bot is on
if self.isChallengeRequest(resp):
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.
self.request('GET', resp.url)
resp = ourSuper.request(method, url, *args, **kwargs)
else:
# Solve Challenge
resp = self.sendChallengeResponse(resp, **kwargs)
return resp
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare reCaptcha challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_reCaptcha_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code == 403
and re.search(
r'action="/.*?__cf_chl_captcha_tk__=\S+".*?data\-sitekey=.*?',
resp.text,
re.M | re.DOTALL
)
)
except AttributeError:
pass
return False
@staticmethod
def isChallengeRequest(resp):
if resp.headers.get('Server', '').startswith('cloudflare'):
return (
resp.status_code in [403, 429, 503]
and (
all(s in resp.content for s in [b'jschl_vc', b'jschl_answer'])
or
all(s in resp.content for s in [b'why_captcha', b'/cdn-cgi/l/chk_captcha'])
)
)
return False
# ------------------------------------------------------------------------------- #
# Try to solve cloudflare javascript challenge.
# ------------------------------------------------------------------------------- #
@staticmethod
def IUAM_Challenge_Response(body, domain, interpreter):
try:
challengeUUID = re.search(
r'__cf_chl_jschl_tk__=(?P<challengeUUID>\S+)"',
body, re.M | re.DOTALL
).groupdict().get('challengeUUID')
params = OrderedDict(re.findall(r'name="(r|jschl_vc|pass)"\svalue="(.*?)"', body))
except AttributeError:
sys.tracebacklimit = 0
raise RuntimeError("Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly.")
try:
params['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
interpreter
).solveChallenge(body, domain)
except Exception as e:
raise RuntimeError(
'Unable to parse Cloudflare anti-bots page: {}'.format(
getattr(e, 'message', e)
)
)
return {
'url': 'https://{}/'.format(domain),
'params': {'__cf_chl_jschl_tk__': challengeUUID},
'data': params
}
@staticmethod
def reCaptcha_Challenge_Response(provider, provider_params, body, url):
try:
params = re.search(
r'(name="r"\svalue="(?P<r>\S+)"|).*?__cf_chl_captcha_tk__=(?P<challengeUUID>\S+)".*?'
r'data-ray="(?P<data_ray>\S+)".*?data-sitekey="(?P<site_key>\S+)"',
body, re.M | re.DOTALL
).groupdict()
except (AttributeError):
sys.tracebacklimit = 0
raise RuntimeError(
"Cloudflare reCaptcha detected, unfortunately we can't extract the parameters correctly."
)
return {
'url': url,
'params': {'__cf_chl_captcha_tk__': params.get('challengeUUID')},
'data': OrderedDict([
('r', ''),
('id', params.get('data_ray')),
(
'g-recaptcha-response',
reCaptcha.dynamicImport(
provider.lower()
).solveCaptcha(url, params.get('site_key'), provider_params)
)
])
}
##########################################################################################################################################################
def sendChallengeResponse(self, resp, **original_kwargs):
if self.is_reCaptcha_Challenge(resp):
# ------------------------------------------------------------------------------- #
# double down on the request as some websites are only checking
# if cfuid is populated before issuing reCaptcha.
# ------------------------------------------------------------------------------- #
resp = self.decodeBrotli(
super(CloudScraper, self).request(resp.request.method, resp.url, **original_kwargs)
)
if not self.is_reCaptcha_Challenge(resp):
return resp
# ------------------------------------------------------------------------------- #
# if no reCaptcha provider raise a runtime error.
# ------------------------------------------------------------------------------- #
if not self.recaptcha or not isinstance(self.recaptcha, dict) or not self.recaptcha.get('provider'):
sys.tracebacklimit = 0
raise RuntimeError(
"Cloudflare reCaptcha detected, unfortunately you haven't loaded an anti reCaptcha provider "
"correctly via the 'recaptcha' parameter."
)
# ------------------------------------------------------------------------------- #
# if provider is return_response, return the response without doing anything.
# ------------------------------------------------------------------------------- #
if self.recaptcha.get('provider') == 'return_response':
return resp
self.recaptcha['proxies'] = self.proxies
submit_url = self.reCaptcha_Challenge_Response(
self.recaptcha.get('provider'),
self.recaptcha,
resp.text,
resp.url
)
else:
# ------------------------------------------------------------------------------- #
# Cloudflare requires a delay before solving the challenge
# ------------------------------------------------------------------------------- #
if not self.delay:
try:
delay = float(
re.search(
r'submit\(\);\r?\n\s*},\s*([0-9]+)',
resp.text
).group(1)
) / float(1000)
if isinstance(delay, (int, float)):
self.delay = delay
except (AttributeError, ValueError):
sys.tracebacklimit = 0
raise RuntimeError("Cloudflare IUAM possibility malformed, issue extracing delay value.")
sleep(self.delay)
# ------------------------------------------------------------------------------- #
submit_url = self.IUAM_Challenge_Response(
resp.text,
urlparse(resp.url).netloc,
self.interpreter
)
# ------------------------------------------------------------------------------- #
# Send the Challenge Response back to Cloudflare
# ------------------------------------------------------------------------------- #
if submit_url:
def updateAttr(obj, name, newValue):
try:
obj[name].update(newValue)
return obj[name]
except (AttributeError, KeyError):
obj[name] = {}
obj[name].update(newValue)
return obj[name]
cloudflare_kwargs = deepcopy(original_kwargs)
cloudflare_kwargs['allow_redirects'] = False
cloudflare_kwargs['params'] = updateAttr(cloudflare_kwargs, 'params', submit_url['params'])
cloudflare_kwargs['data'] = updateAttr(cloudflare_kwargs, 'data', submit_url['data'])
cloudflare_kwargs['headers'] = updateAttr(cloudflare_kwargs, 'headers', {'Referer': resp.url})
self.request(
'POST',
submit_url['url'],
**cloudflare_kwargs
)
# ------------------------------------------------------------------------------- #
# Request the original query request and return it
# ------------------------------------------------------------------------------- #
return self.request(resp.request.method, resp.url, **original_kwargs)
# ------------------------------------------------------------------------------- #
# Request the original query request and return it
# ------------------------------------------------------------------------------- #
# return self.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- #
##########################################################################################################################################################
@classmethod
def create_scraper(cls, sess=None, **kwargs):
"""
Convenience function for creating a ready-to-go CloudScraper object.
"""
scraper = cls(**kwargs)
if sess:
attrs = ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']
for attr in attrs:
val = getattr(sess, attr, None)
if val:
setattr(scraper, attr, val)
return scraper
##########################################################################################################################################################
# Functions for integrating cloudscraper with other applications and scripts
@classmethod
def get_tokens(cls, url, **kwargs):
scraper = cls.create_scraper(
debug=kwargs.pop('debug', False),
delay=kwargs.pop('delay', None),
interpreter=kwargs.pop('interpreter', 'js2py'),
allow_brotli=kwargs.pop('allow_brotli', True),
recaptcha=kwargs.pop('recaptcha', {})
)
try:
resp = scraper.get(url, **kwargs)
resp.raise_for_status()
except Exception:
logging.error('"{}" returned an error. Could not collect tokens.'.format(url))
raise
domain = urlparse(resp.url).netloc
# noinspection PyUnusedLocal
cookie_domain = None
for d in scraper.cookies.list_domains():
if d.startswith('.') and d in ('.{}'.format(domain)):
cookie_domain = d
break
else:
raise ValueError('Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?')
return (
{
'__cfduid': scraper.cookies.get('__cfduid', '', domain=cookie_domain),
'cf_clearance': scraper.cookies.get('cf_clearance', '', domain=cookie_domain)
},
scraper.headers['User-Agent']
)
##########################################################################################################################################################
@classmethod
def get_cookie_string(cls, url, **kwargs):
"""
Convenience function for building a Cookie HTTP header value.
"""
tokens, user_agent = cls.get_tokens(url, **kwargs)
return '; '.join('='.join(pair) for pair in tokens.items()), user_agent
##########################################################################################################################################################
create_scraper = CloudScraper.create_scraper
get_tokens = CloudScraper.get_tokens
get_cookie_string = CloudScraper.get_cookie_string

View File

@@ -0,0 +1,89 @@
import re
import sys
import logging
import abc
if sys.version_info >= (3, 4):
ABC = abc.ABC # noqa
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
interpreters = {}
class JavaScriptInterpreter(ABC):
@abc.abstractmethod
def __init__(self, name):
interpreters[name] = self
@classmethod
def dynamicImport(cls, name):
if name not in interpreters:
try:
__import__('{}.{}'.format(cls.__module__, name))
if not isinstance(interpreters.get(name), JavaScriptInterpreter):
raise ImportError('The interpreter was not initialized.')
except ImportError:
logging.error('Unable to load {} interpreter'.format(name))
raise
return interpreters[name]
@abc.abstractmethod
def eval(self, jsEnv, js):
pass
def solveChallenge(self, body, domain):
try:
js = re.search(
r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
body
).group(1)
except Exception:
raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))
js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
js += '\na.value;'
jsEnv = '''
String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
var document = {{
createElement: function () {{
return {{ firstChild: {{ href: "https://{domain}/" }} }}
}},
getElementById: function () {{
return {{"innerHTML": "{innerHTML}"}};
}}
}};
'''
try:
innerHTML = re.search(
r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
body,
re.MULTILINE | re.DOTALL
)
innerHTML = innerHTML.group(2) if innerHTML else ''
except: # noqa
logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
try:
result = self.eval(
re.sub(r'\s{2,}', ' ', jsEnv.format(domain=domain, innerHTML=innerHTML), flags=re.MULTILINE | re.DOTALL),
js
)
float(result)
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
return result

View File

@@ -0,0 +1,95 @@
from __future__ import absolute_import
import os
import sys
import ctypes.util
from ctypes import c_void_p, c_size_t, byref, create_string_buffer, CDLL
from . import JavaScriptInterpreter
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('chakracore')
def eval(self, jsEnv, js):
chakraCoreLibrary = None
# check current working directory.
for _libraryFile in ['libChakraCore.so', 'libChakraCore.dylib', 'ChakraCore.dll']:
if os.path.isfile(os.path.join(os.getcwd(), _libraryFile)):
chakraCoreLibrary = os.path.join(os.getcwd(), _libraryFile)
continue
if not chakraCoreLibrary:
chakraCoreLibrary = ctypes.util.find_library('ChakraCore')
if not chakraCoreLibrary:
sys.tracebacklimit = 0
raise RuntimeError(
'ChakraCore library not found in current path or any of your system library paths, '
'please download from https://www.github.com/VeNoMouS/cloudscraper/tree/ChakraCore/, '
'or https://github.com/Microsoft/ChakraCore/'
)
try:
chakraCore = CDLL(chakraCoreLibrary)
except OSError:
sys.tracebacklimit = 0
raise RuntimeError('There was an error loading the ChakraCore library {}'.format(chakraCoreLibrary))
if sys.platform != 'win32':
chakraCore.DllMain(0, 1, 0)
chakraCore.DllMain(0, 2, 0)
script = create_string_buffer('{}{}'.format(jsEnv, js).encode('utf-16'))
runtime = c_void_p()
chakraCore.JsCreateRuntime(0, 0, byref(runtime))
context = c_void_p()
chakraCore.JsCreateContext(runtime, byref(context))
chakraCore.JsSetCurrentContext(context)
fname = c_void_p()
chakraCore.JsCreateString(
'iuam-challenge.js',
len('iuam-challenge.js'),
byref(fname)
)
scriptSource = c_void_p()
chakraCore.JsCreateExternalArrayBuffer(
script,
len(script),
0,
0,
byref(scriptSource)
)
jsResult = c_void_p()
chakraCore.JsRun(scriptSource, 0, fname, 0x02, byref(jsResult))
resultJSString = c_void_p()
chakraCore.JsConvertValueToString(jsResult, byref(resultJSString))
stringLength = c_size_t()
chakraCore.JsCopyString(resultJSString, 0, 0, byref(stringLength))
resultSTR = create_string_buffer(stringLength.value + 1)
chakraCore.JsCopyString(
resultJSString,
byref(resultSTR),
stringLength.value + 1,
0
)
chakraCore.JsDisposeRuntime(runtime)
return resultSTR.value
ChallengeInterpreter()

View File

@@ -0,0 +1,35 @@
from __future__ import absolute_import
import js2py
import logging
import base64
from . import JavaScriptInterpreter
from .jsunfuck import jsunfuck
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('js2py')
def eval(self, jsEnv, js):
### blocca lo script
# from core.support import dbg; dbg()
if js2py.eval_js('(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]') == '1':
logging.warning('WARNING - Please upgrade your js2py https://github.com/PiotrDabkowski/Js2Py, applying work around for the meantime.')
js = jsunfuck(js)
def atob(s):
return base64.b64decode('{}'.format(s)).decode('utf-8')
js2py.disable_pyimport()
context = js2py.EvalJs({'atob': atob})
result = context.eval('{}{}'.format(jsEnv, js))
return result
ChallengeInterpreter()

View File

@@ -0,0 +1,97 @@
MAPPING = {
'a': '(false+"")[1]',
'b': '([]["entries"]()+"")[2]',
'c': '([]["fill"]+"")[3]',
'd': '(undefined+"")[2]',
'e': '(true+"")[3]',
'f': '(false+"")[0]',
'g': '(false+[0]+String)[20]',
'h': '(+(101))["to"+String["name"]](21)[1]',
'i': '([false]+undefined)[10]',
'j': '([]["entries"]()+"")[3]',
'k': '(+(20))["to"+String["name"]](21)',
'l': '(false+"")[2]',
'm': '(Number+"")[11]',
'n': '(undefined+"")[1]',
'o': '(true+[]["fill"])[10]',
'p': '(+(211))["to"+String["name"]](31)[1]',
'q': '(+(212))["to"+String["name"]](31)[1]',
'r': '(true+"")[1]',
's': '(false+"")[3]',
't': '(true+"")[0]',
'u': '(undefined+"")[0]',
'v': '(+(31))["to"+String["name"]](32)',
'w': '(+(32))["to"+String["name"]](33)',
'x': '(+(101))["to"+String["name"]](34)[1]',
'y': '(NaN+[Infinity])[10]',
'z': '(+(35))["to"+String["name"]](36)',
'A': '(+[]+Array)[10]',
'B': '(+[]+Boolean)[10]',
'C': 'Function("return escape")()(("")["italics"]())[2]',
'D': 'Function("return escape")()([]["fill"])["slice"]("-1")',
'E': '(RegExp+"")[12]',
'F': '(+[]+Function)[10]',
'G': '(false+Function("return Date")()())[30]',
'I': '(Infinity+"")[0]',
'M': '(true+Function("return Date")()())[30]',
'N': '(NaN+"")[0]',
'O': '(NaN+Function("return{}")())[11]',
'R': '(+[]+RegExp)[10]',
'S': '(+[]+String)[10]',
'T': '(NaN+Function("return Date")()())[30]',
'U': '(NaN+Function("return{}")()["to"+String["name"]]["call"]())[11]',
' ': '(NaN+[]["fill"])[11]',
'"': '("")["fontcolor"]()[12]',
'%': 'Function("return escape")()([]["fill"])[21]',
'&': '("")["link"](0+")[10]',
'(': '(undefined+[]["fill"])[22]',
')': '([0]+false+[]["fill"])[20]',
'+': '(+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]])+[])[2]',
',': '([]["slice"]["call"](false+"")+"")[1]',
'-': '(+(.+[0000000001])+"")[2]',
'.': '(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]',
'/': '(false+[0])["italics"]()[10]',
':': '(RegExp()+"")[3]',
';': '("")["link"](")[14]',
'<': '("")["italics"]()[0]',
'=': '("")["fontcolor"]()[11]',
'>': '("")["italics"]()[2]',
'?': '(RegExp()+"")[2]',
'[': '([]["entries"]()+"")[0]',
']': '([]["entries"]()+"")[22]',
'{': '(true+[]["fill"])[20]',
'}': '([]["fill"]+"")["slice"]("-1")'
}
SIMPLE = {
'false': '![]',
'true': '!![]',
'undefined': '[][[]]',
'NaN': '+[![]]',
'Infinity': '+(+!+[]+(!+[]+[])[!+[]+!+[]+!+[]]+[+!+[]]+[+[]]+[+[]]+[+[]])' # +"1e1000"
}
CONSTRUCTORS = {
'Array': '[]',
'Number': '(+[])',
'String': '([]+[])',
'Boolean': '(![])',
'Function': '[]["fill"]',
'RegExp': 'Function("return/"+false+"/")()'
}
def jsunfuck(jsfuckString):
for key in sorted(MAPPING, key=lambda k: len(MAPPING[k]), reverse=True):
if MAPPING.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(MAPPING.get(key), '"{}"'.format(key))
for key in sorted(SIMPLE, key=lambda k: len(SIMPLE[k]), reverse=True):
if SIMPLE.get(key) in jsfuckString:
jsfuckString = jsfuckString.replace(SIMPLE.get(key), '{}'.format(key))
# for key in sorted(CONSTRUCTORS, key=lambda k: len(CONSTRUCTORS[k]), reverse=True):
# if CONSTRUCTORS.get(key) in jsfuckString:
# jsfuckString = jsfuckString.replace(CONSTRUCTORS.get(key), '{}'.format(key))
return jsfuckString

View File

@@ -0,0 +1,46 @@
import base64
import logging
import subprocess
from . import JavaScriptInterpreter
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('nodejs')
def eval(self, jsEnv, js):
try:
js = 'var atob = function(str) {return Buffer.from(str, "base64").toString("binary");};' \
'var challenge = atob("%s");' \
'var context = {atob: atob};' \
'var options = {filename: "iuam-challenge.js", timeout: 4000};' \
'var answer = require("vm").runInNewContext(challenge, context, options);' \
'process.stdout.write(String(answer));' \
% base64.b64encode('{}{}'.format(jsEnv, js).encode('UTF-8')).decode('ascii')
return subprocess.check_output(['node', '-e', js])
except OSError as e:
if e.errno == 2:
raise EnvironmentError(
'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`). Your Node binary may be called `nodejs` rather than `node`, '
'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems. (Please read the cloudscraper'
' README\'s Dependencies section: https://github.com/VeNoMouS/cloudscraper#dependencies.'
)
raise
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. %s' % BUG_REPORT)
raise
pass
ChallengeInterpreter()

View File

@@ -0,0 +1,26 @@
from __future__ import absolute_import
import sys
try:
import v8eval
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError('Please install the python module v8eval either via pip or download it from https://github.com/sony/v8eval')
from . import JavaScriptInterpreter
class ChallengeInterpreter(JavaScriptInterpreter):
def __init__(self):
super(ChallengeInterpreter, self).__init__('v8')
def eval(self, jsEnv, js):
try:
return v8eval.V8().eval('{}{}'.format(jsEnv, js))
except: # noqa
RuntimeError('We encountered an error running the V8 Engine.')
ChallengeInterpreter()

View File

@@ -0,0 +1,48 @@
import re
import sys
import logging
import abc
if sys.version_info >= (3, 4):
ABC = abc.ABC # noqa
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
captchaSolvers = {}
class reCaptcha(ABC):
@abc.abstractmethod
def __init__(self, name):
captchaSolvers[name] = self
@classmethod
def dynamicImport(cls, name):
if name not in captchaSolvers:
try:
__import__('{}.{}'.format(cls.__module__, name))
if not isinstance(captchaSolvers.get(name), reCaptcha):
raise ImportError('The anti reCaptcha provider was not initialized.')
except ImportError:
logging.error("Unable to load {} anti reCaptcha provider".format(name))
raise
return captchaSolvers[name]
@abc.abstractmethod
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
pass
def solveCaptcha(self, ret, reCaptchaParams):
try:
site_key = re.search('data-sitekey="(.+?)"', ret.text).group(1)
except Exception as e:
raise ValueError("Unable to parse Cloudflare\'s reCaptcha variable 'data-sitekey': {} {}".format(e.message, BUG_REPORT))
return self.getCaptchaAnswer(ret.url, site_key, reCaptchaParams)

View File

@@ -0,0 +1,42 @@
from __future__ import absolute_import
import sys
try:
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask, NoCaptchaTask, Proxy
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError("Please install the python module 'python_anticaptcha' via pip or download it https://github.com/ad-m/python-anticaptcha")
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('anticaptcha')
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
if not reCaptchaParams.get('api_key'):
raise ValueError("reCaptcha provider 'anticaptcha' was not provided an 'api_key' parameter.")
client = AnticaptchaClient(reCaptchaParams.get('api_key'))
if reCaptchaParams.get('proxy', False) and reCaptchaParams.get('proxies'):
client.session.proxies = reCaptchaParams.get('proxies')
task = NoCaptchaTask(
site_url,
site_key,
proxy=Proxy.parse_url(
reCaptchaParams.get('proxies').get('https')
)
)
else:
task = NoCaptchaTaskProxylessTask(site_url, site_key)
job = client.createTask(task)
job.join()
return job.get_solution_response()
captchaSolver()

View File

@@ -0,0 +1,198 @@
from __future__ import absolute_import
import json
import requests
try:
import polling
except ImportError:
import sys
sys.tracebacklimit = 0
raise RuntimeError("Please install the python module 'polling' via pip or download it from https://github.com/justiniso/polling/")
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('deathbycaptcha')
self.host = 'http://api.dbcapi.me/api'
self.session = requests.Session()
##########################################################################################################################################################
def checkErrorStatus(self, response):
errors = dict(
[
(400, "DeathByCaptcha: 400 Bad Request"),
(403, "DeathByCaptcha: 403 Forbidden - Invalid credentails or insufficient credits."),
# (500, "DeathByCaptcha: 500 Internal Server Error."),
(503, "DeathByCaptcha: 503 Service Temporarily Unavailable.")
]
)
if response.status_code in errors:
raise RuntimeError(errors.get(response.status_code))
##########################################################################################################################################################
def login(self, username, password):
self.username = username
self.password = password
def _checkRequest(response):
if response.status_code == 200:
if response.json().get('is_banned'):
raise RuntimeError('DeathByCaptcha: Your account is banned.')
if response.json().get('balanace') == 0:
raise RuntimeError('DeathByCaptcha: insufficient credits.')
return response
self.checkErrorStatus(response)
return None
response = polling.poll(
lambda: self.session.post(
'{}/user'.format(self.host),
headers={'Accept': 'application/json'},
data={
'username': self.username,
'password': self.password
}
),
check_success=_checkRequest,
step=10,
timeout=120
)
self.debugRequest(response)
##########################################################################################################################################################
def reportJob(self, jobID):
if not jobID:
raise RuntimeError("DeathByCaptcha: Error bad job id to report failed reCaptcha.")
def _checkRequest(response):
if response.status_code == 200:
return response
self.checkErrorStatus(response)
return None
response = polling.poll(
lambda: self.session.post(
'{}/captcha/{}/report'.format(self.host, jobID),
headers={'Accept': 'application/json'},
data={
'username': self.username,
'password': self.password
}
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return True
else:
raise RuntimeError("DeathByCaptcha: Error report failed reCaptcha.")
##########################################################################################################################################################
def requestJob(self, jobID):
if not jobID:
raise RuntimeError("DeathByCaptcha: Error bad job id to request reCaptcha.")
def _checkRequest(response):
if response.status_code in [200, 303] and response.json().get('text'):
return response
self.checkErrorStatus(response)
return None
response = polling.poll(
lambda: self.session.get(
'{}/captcha/{}'.format(self.host, jobID),
headers={'Accept': 'application/json'}
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return response.json().get('text')
else:
raise RuntimeError("DeathByCaptcha: Error failed to solve reCaptcha.")
##########################################################################################################################################################
def requestSolve(self, site_url, site_key):
def _checkRequest(response):
if response.status_code in [200, 303] and response.json().get("is_correct") and response.json().get('captcha'):
return response
self.checkErrorStatus(response)
return None
response = polling.poll(
lambda: self.session.post(
'{}/captcha'.format(self.host),
headers={'Accept': 'application/json'},
data={
'username': self.username,
'password': self.password,
'type': '4',
'token_params': json.dumps({
'googlekey': site_key,
'pageurl': site_url
})
},
allow_redirects=False
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return response.json().get('captcha')
else:
raise RuntimeError('DeathByCaptcha: Error no job id was returned.')
##########################################################################################################################################################
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
jobID = None
for param in ['username', 'password']:
if not reCaptchaParams.get(param):
raise ValueError("DeathByCaptcha: Missing '{}' parameter.".format(param))
setattr(self, param, reCaptchaParams.get(param))
if reCaptchaParams.get('proxy'):
self.session.proxies = reCaptchaParams.get('proxies')
try:
jobID = self.requestSolve(site_url, site_key)
return self.requestJob(jobID)
except polling.TimeoutException:
try:
if jobID:
self.reportJob(jobID)
except polling.TimeoutException:
raise RuntimeError("DeathByCaptcha: reCaptcha solve took to long and also failed reporting the job.")
raise RuntimeError("DeathByCaptcha: reCaptcha solve took to long to execute, aborting.")
captchaSolver()

View File

@@ -0,0 +1,47 @@
import os
import json
import random
import logging
from collections import OrderedDict
##########################################################################################################################################################
class User_Agent():
##########################################################################################################################################################
def __init__(self, *args, **kwargs):
self.headers = None
self.loadUserAgent(*args, **kwargs)
##########################################################################################################################################################
def loadUserAgent(self, *args, **kwargs):
browser = kwargs.pop('browser', None)
user_agents = json.load(
open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r'),
object_pairs_hook=OrderedDict
)
if browser and not user_agents.get(browser):
logging.error('Sorry "{}" browser User-Agent was not found.'.format(browser))
raise
if not browser:
browser = random.SystemRandom().choice(list(user_agents))
user_agent_version = random.SystemRandom().choice(list(user_agents.get(browser).get('releases')))
if user_agents.get(browser).get('releases').get(user_agent_version).get('headers'):
self.headers = user_agents.get(browser).get('releases').get(user_agent_version).get('headers')
else:
self.headers = user_agents.get(browser).get('default_headers')
self.headers['User-Agent'] = random.SystemRandom().choice(user_agents.get(browser).get('releases').get(user_agent_version).get('User-Agent'))
if not kwargs.get('allow_brotli', False):
if 'br' in self.headers['Accept-Encoding']:
self.headers['Accept-Encoding'] = ','.join([encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br']).strip()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,206 @@
from __future__ import absolute_import
import requests
try:
import polling
except ImportError:
import sys
sys.tracebacklimit = 0
raise RuntimeError("Please install the python module 'polling' via pip or download it from https://github.com/justiniso/polling/")
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('2captcha')
self.host = 'https://2captcha.com'
self.session = requests.Session()
# ------------------------------------------------------------------------------- #
@staticmethod
def checkErrorStatus(response, request_type):
if response.status_code in [500, 502]:
raise RuntimeError('2Captcha: Server Side Error {}'.format(response.status_code))
errors = {
'in.php': {
"ERROR_WRONG_USER_KEY": "You've provided api_key parameter value is in incorrect format, it should contain 32 symbols.",
"ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
"ERROR_ZERO_BALANCE": "You don't have sufficient funds on your account.",
"ERROR_PAGEURL": "pageurl parameter is missing in your request.",
"ERROR_NO_SLOT_AVAILABLE":
"No Slots Available.\nYou can receive this error in two cases:\n"
"1. If you solve ReCaptcha: the queue of your captchas that are not distributed to workers is too long. "
"Queue limit changes dynamically and depends on total amount of captchas awaiting solution and usually it's between 50 and 100 captchas.\n"
"2. If you solve Normal Captcha: your maximum rate for normal captchas is lower than current rate on the server."
"You can change your maximum rate in your account's settings.",
"ERROR_IP_NOT_ALLOWED": "The request is sent from the IP that is not on the list of your allowed IPs.",
"IP_BANNED": "Your IP address is banned due to many frequent attempts to access the server using wrong authorization keys.",
"ERROR_BAD_TOKEN_OR_PAGEURL":
"You can get this error code when sending ReCaptcha V2. "
"That happens if your request contains invalid pair of googlekey and pageurl. "
"The common reason for that is that ReCaptcha is loaded inside an iframe hosted on another domain/subdomain.",
"ERROR_GOOGLEKEY":
"You can get this error code when sending ReCaptcha V2. "
"That means that sitekey value provided in your request is incorrect: it's blank or malformed.",
"MAX_USER_TURN": "You made more than 60 requests within 3 seconds.Your account is banned for 10 seconds. Ban will be lifted automatically."
},
'res.php': {
"ERROR_CAPTCHA_UNSOLVABLE":
"We are unable to solve your captcha - three of our workers were unable solve it "
"or we didn't get an answer within 90 seconds (300 seconds for ReCaptcha V2). "
"We will not charge you for that request.",
"ERROR_WRONG_USER_KEY": "You've provided api_key parameter value in incorrect format, it should contain 32 symbols.",
"ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
"ERROR_WRONG_ID_FORMAT": "You've provided captcha ID in wrong format. The ID can contain numbers only.",
"ERROR_WRONG_CAPTCHA_ID": "You've provided incorrect captcha ID.",
"ERROR_BAD_DUPLICATES":
"Error is returned when 100% accuracy feature is enabled. "
"The error means that max numbers of tries is reached but min number of matches not found.",
"REPORT_NOT_RECORDED": "Error is returned to your complain request if you already complained lots of correctly solved captchas.",
"ERROR_IP_ADDRES":
"You can receive this error code when registering a pingback (callback) IP or domain."
"That happes if your request is coming from an IP address that doesn't match the IP address of your pingback IP or domain.",
"ERROR_TOKEN_EXPIRED": "You can receive this error code when sending GeeTest. That error means that challenge value you provided is expired.",
"ERROR_EMPTY_ACTION": "Action parameter is missing or no value is provided for action parameter."
}
}
if response.json().get('status') is False and response.json().get('request') in errors.get(request_type):
raise RuntimeError('{} {}'.format(response.json().get('request'), errors.get(request_type).get(response.json().get('request'))))
# ------------------------------------------------------------------------------- #
def reportJob(self, jobID):
if not jobID:
raise RuntimeError("2Captcha: Error bad job id to request reCaptcha.")
def _checkRequest(response):
if response.status_code in [200, 303] and response.json().get('status') == 1:
return response
self.checkErrorStatus(response, 'res.php')
return None
response = polling.poll(
lambda: self.session.get(
'{}/res.php'.format(self.host),
params={
'key': self.api_key,
'action': 'reportbad',
'id': jobID,
'json': '1'
}
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return True
else:
raise RuntimeError("2Captcha: Error - Failed to report bad reCaptcha solve.")
# ------------------------------------------------------------------------------- #
def requestJob(self, jobID):
if not jobID:
raise RuntimeError("2Captcha: Error bad job id to request reCaptcha.")
def _checkRequest(response):
if response.status_code in [200, 303] and response.json().get('status') == 1:
return response
self.checkErrorStatus(response, 'res.php')
return None
response = polling.poll(
lambda: self.session.get(
'{}/res.php'.format(self.host),
params={
'key': self.api_key,
'action': 'get',
'id': jobID,
'json': '1'
}
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json().get('request')
else:
raise RuntimeError("2Captcha: Error failed to solve reCaptcha.")
# ------------------------------------------------------------------------------- #
def requestSolve(self, site_url, site_key):
def _checkRequest(response):
if response.status_code in [200, 303] and response.json().get("status") == 1 and response.json().get('request'):
return response
self.checkErrorStatus(response, 'in.php')
return None
response = polling.poll(
lambda: self.session.post(
'{}/in.php'.format(self.host),
data={
'key': self.api_key,
'method': 'userrecaptcha',
'googlekey': site_key,
'pageurl': site_url,
'json': '1',
'soft_id': '5507698'
},
allow_redirects=False
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json().get('request')
else:
raise RuntimeError('2Captcha: Error no job id was returned.')
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
jobID = None
if not reCaptchaParams.get('api_key'):
raise ValueError("2Captcha: Missing api_key parameter.")
self.api_key = reCaptchaParams.get('api_key')
if reCaptchaParams.get('proxy'):
self.session.proxies = reCaptchaParams.get('proxies')
try:
jobID = self.requestSolve(site_url, site_key)
return self.requestJob(jobID)
except polling.TimeoutException:
try:
if jobID:
self.reportJob(jobID)
except polling.TimeoutException:
raise RuntimeError("2Captcha: reCaptcha solve took to long and also failed reporting the job.")
raise RuntimeError("2Captcha: reCaptcha solve took to long to execute, aborting.")
# ------------------------------------------------------------------------------- #
captchaSolver()

22
lib/cloudscraper/reCaptcha/__init__.py Executable file → Normal file
View File

@@ -1,4 +1,3 @@
import re
import sys
import logging
import abc
@@ -8,20 +7,20 @@ if sys.version_info >= (3, 4):
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
captchaSolvers = {}
# ------------------------------------------------------------------------------- #
class reCaptcha(ABC):
@abc.abstractmethod
def __init__(self, name):
captchaSolvers[name] = self
# ------------------------------------------------------------------------------- #
@classmethod
def dynamicImport(cls, name):
if name not in captchaSolvers:
@@ -35,14 +34,13 @@ class reCaptcha(ABC):
return captchaSolvers[name]
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
pass
def solveCaptcha(self, ret, reCaptchaParams):
try:
site_key = re.search('data-sitekey="(.+?)"', ret.text).group(1)
except Exception as e:
raise ValueError("Unable to parse Cloudflare\'s reCaptcha variable 'data-sitekey': {} {}".format(e.message, BUG_REPORT))
# ------------------------------------------------------------------------------- #
return self.getCaptchaAnswer(ret.url, site_key, reCaptchaParams)
def solveCaptcha(self, site_url, site_key, reCaptchaParams):
return self.getCaptchaAnswer(site_url, site_key, reCaptchaParams)

80
lib/cloudscraper/reCaptcha/anticaptcha.py Executable file → Normal file
View File

@@ -1,42 +1,38 @@
from __future__ import absolute_import
import sys
try:
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask, NoCaptchaTask, Proxy
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError("Please install the python module 'python_anticaptcha' via pip or download it https://github.com/ad-m/python-anticaptcha")
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('anticaptcha')
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
if not reCaptchaParams.get('api_key'):
raise ValueError("reCaptcha provider 'anticaptcha' was not provided an 'api_key' parameter.")
client = AnticaptchaClient(reCaptchaParams.get('api_key'))
if reCaptchaParams.get('proxy', False) and reCaptchaParams.get('proxies'):
client.session.proxies = reCaptchaParams.get('proxies')
task = NoCaptchaTask(
site_url,
site_key,
proxy=Proxy.parse_url(
reCaptchaParams.get('proxies').get('https')
)
)
else:
task = NoCaptchaTaskProxylessTask(site_url, site_key)
job = client.createTask(task)
job.join()
return job.get_solution_response()
captchaSolver()
from __future__ import absolute_import
import sys
try:
from python_anticaptcha import AnticaptchaClient, NoCaptchaTaskProxylessTask
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError("Please install the python module 'python_anticaptcha' via pip or download it from https://github.com/ad-m/python-anticaptcha")
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('anticaptcha')
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
if not reCaptchaParams.get('api_key'):
raise ValueError("reCaptcha provider 'anticaptcha' was not provided an 'api_key' parameter.")
client = AnticaptchaClient(reCaptchaParams.get('api_key'))
if reCaptchaParams.get('proxy'):
client.session.proxies = reCaptchaParams.get('proxies')
task = NoCaptchaTaskProxylessTask(site_url, site_key)
if not hasattr(client, 'createTaskSmee'):
sys.tracebacklimit = 0
raise RuntimeError("Please upgrade 'python_anticaptcha' via pip or download it from https://github.com/ad-m/python-anticaptcha")
job = client.createTaskSmee(task)
return job.get_solution_response()
captchaSolver()

17
lib/cloudscraper/reCaptcha/deathbycaptcha.py Executable file → Normal file
View File

@@ -20,9 +20,10 @@ class captchaSolver(reCaptcha):
self.host = 'http://api.dbcapi.me/api'
self.session = requests.Session()
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def checkErrorStatus(self, response):
@staticmethod
def checkErrorStatus(response):
errors = dict(
[
(400, "DeathByCaptcha: 400 Bad Request"),
@@ -35,7 +36,7 @@ class captchaSolver(reCaptcha):
if response.status_code in errors:
raise RuntimeError(errors.get(response.status_code))
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def login(self, username, password):
self.username = username
@@ -71,7 +72,7 @@ class captchaSolver(reCaptcha):
self.debugRequest(response)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def reportJob(self, jobID):
if not jobID:
@@ -104,7 +105,7 @@ class captchaSolver(reCaptcha):
else:
raise RuntimeError("DeathByCaptcha: Error report failed reCaptcha.")
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def requestJob(self, jobID):
if not jobID:
@@ -133,7 +134,7 @@ class captchaSolver(reCaptcha):
else:
raise RuntimeError("DeathByCaptcha: Error failed to solve reCaptcha.")
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def requestSolve(self, site_url, site_key):
def _checkRequest(response):
@@ -169,7 +170,7 @@ class captchaSolver(reCaptcha):
else:
raise RuntimeError('DeathByCaptcha: Error no job id was returned.')
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, site_url, site_key, reCaptchaParams):
jobID = None
@@ -195,4 +196,6 @@ class captchaSolver(reCaptcha):
raise RuntimeError("DeathByCaptcha: reCaptcha solve took to long to execute, aborting.")
# ------------------------------------------------------------------------------- #
captchaSolver()

142
lib/cloudscraper/user_agent/__init__.py Executable file → Normal file
View File

@@ -1,47 +1,95 @@
import os
import json
import random
import logging
from collections import OrderedDict
##########################################################################################################################################################
class User_Agent():
##########################################################################################################################################################
def __init__(self, *args, **kwargs):
self.headers = None
self.loadUserAgent(*args, **kwargs)
##########################################################################################################################################################
def loadUserAgent(self, *args, **kwargs):
browser = kwargs.pop('browser', None)
user_agents = json.load(
open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r'),
object_pairs_hook=OrderedDict
)
if browser and not user_agents.get(browser):
logging.error('Sorry "{}" browser User-Agent was not found.'.format(browser))
raise
if not browser:
browser = random.SystemRandom().choice(list(user_agents))
user_agent_version = random.SystemRandom().choice(list(user_agents.get(browser).get('releases')))
if user_agents.get(browser).get('releases').get(user_agent_version).get('headers'):
self.headers = user_agents.get(browser).get('releases').get(user_agent_version).get('headers')
else:
self.headers = user_agents.get(browser).get('default_headers')
self.headers['User-Agent'] = random.SystemRandom().choice(user_agents.get(browser).get('releases').get(user_agent_version).get('User-Agent'))
if not kwargs.get('allow_brotli', False):
if 'br' in self.headers['Accept-Encoding']:
self.headers['Accept-Encoding'] = ','.join([encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br']).strip()
import json
import os
import random
import sys
import ssl
from collections import OrderedDict
# ------------------------------------------------------------------------------- #
class User_Agent():
# ------------------------------------------------------------------------------- #
def __init__(self, *args, **kwargs):
self.headers = None
self.cipherSuite = []
self.loadUserAgent(*args, **kwargs)
# ------------------------------------------------------------------------------- #
def loadHeaders(self, user_agents, user_agent_version):
if user_agents.get(self.browser).get('releases').get(user_agent_version).get('headers'):
self.headers = user_agents.get(self.browser).get('releases').get(user_agent_version).get('headers')
else:
self.headers = user_agents.get(self.browser).get('default_headers')
# ------------------------------------------------------------------------------- #
def filterAgents(self, releases):
filtered = {}
for release in releases:
if self.mobile and releases[release]['User-Agent']['mobile']:
filtered[release] = filtered.get(release, []) + releases[release]['User-Agent']['mobile']
if self.desktop and releases[release]['User-Agent']['desktop']:
filtered[release] = filtered.get(release, []) + releases[release]['User-Agent']['desktop']
return filtered
# ------------------------------------------------------------------------------- #
def loadUserAgent(self, *args, **kwargs):
self.browser = kwargs.pop('browser', None)
if isinstance(self.browser, dict):
self.custom = self.browser.get('custom', None)
self.desktop = self.browser.get('desktop', True)
self.mobile = self.browser.get('mobile', True)
self.browser = self.browser.get('browser', None)
else:
self.custom = kwargs.pop('custom', None)
self.desktop = kwargs.pop('desktop', True)
self.mobile = kwargs.pop('mobile', True)
if not self.desktop and not self.mobile:
sys.tracebacklimit = 0
raise RuntimeError("Sorry you can't have mobile and desktop disabled at the same time.")
user_agents = json.load(
open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r'),
object_pairs_hook=OrderedDict
)
if self.custom:
self.cipherSuite = '{}:!ECDHE+SHA:!AES128-SHA'.format(ssl._DEFAULT_CIPHERS).split(':')
self.headers = OrderedDict([
('User-Agent', self.custom),
('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'),
('Accept-Language', 'en-US,en;q=0.9'),
('Accept-Encoding', 'gzip, deflate, br')
])
else:
if self.browser and not user_agents.get(self.browser):
sys.tracebacklimit = 0
raise RuntimeError('Sorry "{}" browser User-Agent was not found.'.format(self.browser))
if not self.browser:
self.browser = random.SystemRandom().choice(list(user_agents))
self.cipherSuite = user_agents.get(self.browser).get('cipherSuite', [])
filteredAgents = self.filterAgents(user_agents.get(self.browser).get('releases'))
user_agent_version = random.SystemRandom().choice(list(filteredAgents))
self.loadHeaders(user_agents, user_agent_version)
self.headers['User-Agent'] = random.SystemRandom().choice(filteredAgents[user_agent_version])
if not kwargs.get('allow_brotli', False):
if 'br' in self.headers['Accept-Encoding']:
self.headers['Accept-Encoding'] = ','.join([encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br']).strip()

24232
lib/cloudscraper/user_agent/browsers.json Executable file → Normal file

File diff suppressed because it is too large Load Diff