Fix DooD Stream

This commit is contained in:
Alhaziel01
2021-05-31 17:56:33 +02:00
parent f7e4d1568b
commit 949f398a64
21 changed files with 10851 additions and 20 deletions

View File

@@ -0,0 +1,260 @@
from __future__ import absolute_import
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaAPIError,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID,
CaptchaReportError
)
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from . import Captcha
class captchaSolver(Captcha):
def __init__(self):
super(captchaSolver, self).__init__('2captcha')
self.host = 'https://2captcha.com'
self.session = requests.Session()
# ------------------------------------------------------------------------------- #
@staticmethod
def checkErrorStatus(response, request_type):
if response.status_code in [500, 502]:
raise CaptchaServiceUnavailable(f'2Captcha: Server Side Error {response.status_code}')
errors = {
'in.php': {
"ERROR_WRONG_USER_KEY": "You've provided api_key parameter value is in incorrect format, it should contain 32 symbols.",
"ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
"ERROR_ZERO_BALANCE": "You don't have sufficient funds on your account.",
"ERROR_PAGEURL": "pageurl parameter is missing in your request.",
"ERROR_NO_SLOT_AVAILABLE":
"No Slots Available.\nYou can receive this error in two cases:\n"
"1. If you solve ReCaptcha: the queue of your captchas that are not distributed to workers is too long. "
"Queue limit changes dynamically and depends on total amount of captchas awaiting solution and usually it's between 50 and 100 captchas.\n"
"2. If you solve Normal Captcha: your maximum rate for normal captchas is lower than current rate on the server."
"You can change your maximum rate in your account's settings.",
"ERROR_IP_NOT_ALLOWED": "The request is sent from the IP that is not on the list of your allowed IPs.",
"IP_BANNED": "Your IP address is banned due to many frequent attempts to access the server using wrong authorization keys.",
"ERROR_BAD_TOKEN_OR_PAGEURL":
"You can get this error code when sending ReCaptcha V2. "
"That happens if your request contains invalid pair of googlekey and pageurl. "
"The common reason for that is that ReCaptcha is loaded inside an iframe hosted on another domain/subdomain.",
"ERROR_GOOGLEKEY":
"You can get this error code when sending ReCaptcha V2. "
"That means that sitekey value provided in your request is incorrect: it's blank or malformed.",
"MAX_USER_TURN": "You made more than 60 requests within 3 seconds.Your account is banned for 10 seconds. Ban will be lifted automatically."
},
'res.php': {
"ERROR_CAPTCHA_UNSOLVABLE":
"We are unable to solve your captcha - three of our workers were unable solve it "
"or we didn't get an answer within 90 seconds (300 seconds for ReCaptcha V2). "
"We will not charge you for that request.",
"ERROR_WRONG_USER_KEY": "You've provided api_key parameter value in incorrect format, it should contain 32 symbols.",
"ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
"ERROR_WRONG_ID_FORMAT": "You've provided captcha ID in wrong format. The ID can contain numbers only.",
"ERROR_WRONG_CAPTCHA_ID": "You've provided incorrect captcha ID.",
"ERROR_BAD_DUPLICATES":
"Error is returned when 100% accuracy feature is enabled. "
"The error means that max numbers of tries is reached but min number of matches not found.",
"REPORT_NOT_RECORDED": "Error is returned to your complain request if you already complained lots of correctly solved captchas.",
"ERROR_IP_ADDRES":
"You can receive this error code when registering a pingback (callback) IP or domain."
"That happes if your request is coming from an IP address that doesn't match the IP address of your pingback IP or domain.",
"ERROR_TOKEN_EXPIRED": "You can receive this error code when sending GeeTest. That error means that challenge value you provided is expired.",
"ERROR_EMPTY_ACTION": "Action parameter is missing or no value is provided for action parameter."
}
}
rPayload = response.json()
if rPayload.get('status') == 0 and rPayload.get('request') in errors.get(request_type):
raise CaptchaAPIError(
f"{rPayload['request']} {errors.get(request_type).get(rPayload['request'])}"
)
# ------------------------------------------------------------------------------- #
def reportJob(self, jobID):
if not jobID:
raise CaptchaBadJobID(
"2Captcha: Error bad job id to request Captcha."
)
def _checkRequest(response):
self.checkErrorStatus(response, 'res.php')
if response.ok and response.json().get('status') == 1:
return response
return None
response = polling2.poll(
lambda: self.session.get(
f'{self.host}/res.php',
params={
'key': self.api_key,
'action': 'reportbad',
'id': jobID,
'json': '1'
},
timeout=30
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return True
else:
raise CaptchaReportError(
"2Captcha: Error - Failed to report bad Captcha solve."
)
# ------------------------------------------------------------------------------- #
def requestJob(self, jobID):
if not jobID:
raise CaptchaBadJobID("2Captcha: Error bad job id to request Captcha.")
def _checkRequest(response):
self.checkErrorStatus(response, 'res.php')
if response.ok and response.json().get('status') == 1:
return response
return None
response = polling2.poll(
lambda: self.session.get(
f'{self.host}/res.php',
params={
'key': self.api_key,
'action': 'get',
'id': jobID,
'json': '1'
},
timeout=30
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json().get('request')
else:
raise CaptchaTimeout(
"2Captcha: Error failed to solve Captcha."
)
# ------------------------------------------------------------------------------- #
def requestSolve(self, captchaType, url, siteKey):
def _checkRequest(response):
self.checkErrorStatus(response, 'in.php')
if response.ok and response.json().get("status") == 1 and response.json().get('request'):
return response
return None
data = {
'key': self.api_key,
'pageurl': url,
'json': 1,
'soft_id': 2905
}
data.update(
{
'method': 'userrcaptcha',
'googlekey': siteKey
} if captchaType == 'reCaptcha' else {
'method': 'hcaptcha',
'sitekey': siteKey
}
)
if self.proxy:
data.update(
{
'proxy': self.proxy,
'proxytype': self.proxyType
}
)
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/in.php',
data=data,
allow_redirects=False,
timeout=30
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json().get('request')
else:
raise CaptchaBadJobID(
'2Captcha: Error no job id was returned.'
)
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
jobID = None
if not captchaParams.get('api_key'):
raise CaptchaParameter(
"2Captcha: Missing api_key parameter."
)
self.api_key = captchaParams.get('api_key')
if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))
if not hostParsed.scheme:
raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')
if not hostParsed.netloc:
raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')
self.proxyType = hostParsed.scheme
self.proxy = hostParsed.netloc
else:
self.proxy = None
try:
jobID = self.requestSolve(captchaType, url, siteKey)
return self.requestJob(jobID)
except polling2.TimeoutException:
try:
if jobID:
self.reportJob(jobID)
except polling2.TimeoutException:
raise CaptchaTimeout(
f"2Captcha: Captcha solve took to long and also failed reporting the job the job id {jobID}."
)
raise CaptchaTimeout(
f"2Captcha: Captcha solve took to long to execute job id {jobID}, aborting."
)
# ------------------------------------------------------------------------------- #
captchaSolver()

View File

@@ -0,0 +1,212 @@
from __future__ import absolute_import
import re
import requests
try:
import polling
except ImportError:
raise ImportError(
"Please install the python module 'polling' via pip or download it from "
"https://github.com/justiniso/polling/"
)
from ..exceptions import (
reCaptchaServiceUnavailable,
reCaptchaAPIError,
reCaptchaTimeout,
reCaptchaParameter,
reCaptchaBadJobID
)
from . import reCaptcha
class captchaSolver(reCaptcha):
def __init__(self):
super(captchaSolver, self).__init__('9kw')
self.host = 'https://www.9kw.eu/index.cgi'
self.maxtimeout = 180
self.session = requests.Session()
# ------------------------------------------------------------------------------- #
@staticmethod
def checkErrorStatus(response):
if response.status_code in [500, 502]:
raise reCaptchaServiceUnavailable(
f'9kw: Server Side Error {response.status_code}'
)
error_codes = {
1: 'No API Key available.',
2: 'No API key found.',
3: 'No active API key found.',
4: 'API Key has been disabled by the operator. ',
5: 'No user found.',
6: 'No data found.',
7: 'Found No ID.',
8: 'found No captcha.',
9: 'No image found.',
10: 'Image size not allowed.',
11: 'credit is not sufficient.',
12: 'what was done.',
13: 'No answer contain.',
14: 'Captcha already been answered.',
15: 'Captcha to quickly filed.',
16: 'JD check active.',
17: 'Unknown problem.',
18: 'Found No ID.',
19: 'Incorrect answer.',
20: 'Do not timely filed (Incorrect UserID).',
21: 'Link not allowed.',
22: 'Prohibited submit.',
23: 'Entering prohibited.',
24: 'Too little credit.',
25: 'No entry found.',
26: 'No Conditions accepted.',
27: 'No coupon code found in the database.',
28: 'Already unused voucher code.',
29: 'maxTimeout under 60 seconds.',
30: 'User not found.',
31: 'An account is not yet 24 hours in system.',
32: 'An account does not have the full rights.',
33: 'Plugin needed a update.',
34: 'No HTTPS allowed.',
35: 'No HTTP allowed.',
36: 'Source not allowed.',
37: 'Transfer denied.',
38: 'Incorrect answer without space',
39: 'Incorrect answer with space',
40: 'Incorrect answer with not only numbers',
41: 'Incorrect answer with not only A-Z, a-z',
42: 'Incorrect answer with not only 0-9, A-Z, a-z',
43: 'Incorrect answer with not only [0-9,- ]',
44: 'Incorrect answer with not only [0-9A-Za-z,- ]',
45: 'Incorrect answer with not only coordinates',
46: 'Incorrect answer with not only multiple coordinates',
47: 'Incorrect answer with not only data',
48: 'Incorrect answer with not only rotate number',
49: 'Incorrect answer with not only text',
50: 'Incorrect answer with not only text and too short',
51: 'Incorrect answer with not enough chars',
52: 'Incorrect answer with too many chars',
53: 'Incorrect answer without no or yes',
54: 'Assignment was not found.'
}
if response.text.startswith('{'):
if response.json().get('error'):
raise reCaptchaAPIError(error_codes.get(int(response.json().get('error'))))
else:
error_code = int(re.search(r'^00(?P<error_code>\d+)', response.text).groupdict().get('error_code', 0))
if error_code:
raise reCaptchaAPIError(error_codes.get(error_code))
# ------------------------------------------------------------------------------- #
def requestJob(self, jobID):
if not jobID:
raise reCaptchaBadJobID(
"9kw: Error bad job id to request reCaptcha against."
)
def _checkRequest(response):
if response.ok and response.json().get('answer') != 'NO DATA':
return response
self.checkErrorStatus(response)
return None
response = polling.poll(
lambda: self.session.get(
self.host,
params={
'apikey': self.api_key,
'action': 'usercaptchacorrectdata',
'id': jobID,
'info': 1,
'json': 1
}
),
check_success=_checkRequest,
step=10,
timeout=(self.maxtimeout + 10)
)
if response:
return response.json().get('answer')
else:
raise reCaptchaTimeout("9kw: Error failed to solve reCaptcha.")
# ------------------------------------------------------------------------------- #
def requestSolve(self, captchaType, url, siteKey):
def _checkRequest(response):
if response.ok and response.text.startswith('{') and response.json().get('captchaid'):
return response
self.checkErrorStatus(response)
return None
captchaMap = {
'reCaptcha': 'recaptchav2',
'hCaptcha': 'hcaptcha'
}
response = polling.poll(
lambda: self.session.post(
self.host,
data={
'apikey': self.api_key,
'action': 'usercaptchaupload',
'interactive': 1,
'file-upload-01': siteKey,
'oldsource': captchaMap[captchaType],
'pageurl': url,
'maxtimeout': self.maxtimeout,
'json': 1
},
allow_redirects=False
),
check_success=_checkRequest,
step=5,
timeout=(self.maxtimeout + 10)
)
if response:
return response.json().get('captchaid')
else:
raise reCaptchaBadJobID('9kw: Error no valid job id was returned.')
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, captchaType, url, siteKey, reCaptchaParams):
jobID = None
if not reCaptchaParams.get('api_key'):
raise reCaptchaParameter("9kw: Missing api_key parameter.")
self.api_key = reCaptchaParams.get('api_key')
if reCaptchaParams.get('maxtimeout'):
self.maxtimeout = reCaptchaParams.get('maxtimeout')
if reCaptchaParams.get('proxy'):
self.session.proxies = reCaptchaParams.get('proxies')
try:
jobID = self.requestSolve(captchaType, url, siteKey)
return self.requestJob(jobID)
except polling.TimeoutException:
raise reCaptchaTimeout(
f"9kw: reCaptcha solve took to long to execute 'captchaid' {jobID}, aborting."
)
# ------------------------------------------------------------------------------- #
captchaSolver()

View File

@@ -0,0 +1,47 @@
import abc
import logging
import sys
if sys.version_info >= (3, 4):
ABC = abc.ABC # noqa
else:
ABC = abc.ABCMeta('ABC', (), {})
# ------------------------------------------------------------------------------- #
captchaSolvers = {}
# ------------------------------------------------------------------------------- #
class Captcha(ABC):
@abc.abstractmethod
def __init__(self, name):
captchaSolvers[name] = self
# ------------------------------------------------------------------------------- #
@classmethod
def dynamicImport(cls, name):
if name not in captchaSolvers:
try:
__import__(f'{cls.__module__}.{name}')
if not isinstance(captchaSolvers.get(name), Captcha):
raise ImportError('The anti captcha provider was not initialized.')
except ImportError as e:
sys.tracebacklimit = 0
logging.error(f'Unable to load {name} anti captcha provider -> {e}')
raise
return captchaSolvers[name]
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
pass
# ------------------------------------------------------------------------------- #
def solveCaptcha(self, captchaType, url, siteKey, captchaParams):
return self.getCaptchaAnswer(captchaType, url, siteKey, captchaParams)

View File

@@ -0,0 +1,109 @@
from __future__ import absolute_import
from ..exceptions import (
CaptchaParameter,
CaptchaTimeout,
CaptchaAPIError
)
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
try:
from python_anticaptcha import (
AnticaptchaClient,
NoCaptchaTaskProxylessTask,
HCaptchaTaskProxyless,
NoCaptchaTask,
HCaptchaTask,
AnticaptchaException
)
except ImportError:
raise ImportError(
"Please install/upgrade the python module 'python_anticaptcha' via "
"pip install python-anticaptcha or https://github.com/ad-m/python-anticaptcha/"
)
import sys
from . import Captcha
class captchaSolver(Captcha):
def __init__(self):
if sys.modules['python_anticaptcha'].__version__ < '0.6':
raise ImportError(
"Please upgrade the python module 'python_anticaptcha' via "
"pip install -U python-anticaptcha or https://github.com/ad-m/python-anticaptcha/"
)
super(captchaSolver, self).__init__('anticaptcha')
# ------------------------------------------------------------------------------- #
def parseProxy(self, url, user_agent):
parsed = urlparse(url)
return dict(
proxy_type=parsed.scheme,
proxy_address=parsed.hostname,
proxy_port=parsed.port,
proxy_login=parsed.username,
proxy_password=parsed.password,
user_agent=user_agent
)
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
if not captchaParams.get('api_key'):
raise CaptchaParameter("anticaptcha: Missing api_key parameter.")
client = AnticaptchaClient(captchaParams.get('api_key'))
if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
captchaMap = {
'reCaptcha': NoCaptchaTask,
'hCaptcha': HCaptchaTask
}
proxy = self.parseProxy(
captchaParams.get('proxy', {}).get('https'),
captchaParams.get('User-Agent', '')
)
task = captchaMap[captchaType](
url,
siteKey,
**proxy
)
else:
captchaMap = {
'reCaptcha': NoCaptchaTaskProxylessTask,
'hCaptcha': HCaptchaTaskProxyless
}
task = captchaMap[captchaType](url, siteKey)
if not hasattr(client, 'createTaskSmee'):
raise NotImplementedError(
"Please upgrade 'python_anticaptcha' via pip or download it from "
"https://github.com/ad-m/python-anticaptcha/tree/hcaptcha"
)
job = client.createTaskSmee(task, timeout=180)
try:
job.join(maximum_time=180)
except (AnticaptchaException) as e:
raise CaptchaTimeout(f"{getattr(e, 'message', e)}")
if 'solution' in job._last_result:
return job.get_solution_response()
else:
raise CaptchaAPIError('Job did not return `solution` key in payload.')
# ------------------------------------------------------------------------------- #
captchaSolver()

View File

@@ -0,0 +1,190 @@
from __future__ import absolute_import
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaAPIError,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID
)
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from . import Captcha
class captchaSolver(Captcha):
def __init__(self):
super(captchaSolver, self).__init__('capmonster')
self.host = 'https://api.capmonster.cloud'
self.session = requests.Session()
# ------------------------------------------------------------------------------- #
@staticmethod
def checkErrorStatus(response):
if response.status_code in [500, 502]:
raise CaptchaServiceUnavailable(
f'CapMonster: Server Side Error {response.status_code}'
)
payload = response.json()
if payload['errorId'] == 1:
if 'errorDescription' in payload:
raise CaptchaAPIError(
payload['errorDescription']
)
else:
raise CaptchaAPIError(payload['errorCode'])
# ------------------------------------------------------------------------------- #
def requestJob(self, taskID):
if not taskID:
raise CaptchaBadJobID(
'CapMonster: Error bad task id to request Captcha.'
)
def _checkRequest(response):
self.checkErrorStatus(response)
if response.ok and response.json()['status'] == 'ready':
return True
return None
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/getTaskResult',
json={
'clientKey': self.clientKey,
'taskId': taskID
},
timeout=30
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json()['solution']['gRecaptchaResponse']
else:
raise CaptchaTimeout(
"CapMonster: Error failed to solve Captcha."
)
# ------------------------------------------------------------------------------- #
def requestSolve(self, captchaType, url, siteKey):
def _checkRequest(response):
self.checkErrorStatus(response)
if response.ok and response.json()['taskId']:
return True
return None
data = {
'clientKey': self.clientKey,
'task': {
'websiteURL': url,
'websiteKey': siteKey,
'softId': 37,
'type': 'NoCaptchaTask' if captchaType == 'reCaptcha' else 'HCaptchaTask'
}
}
if self.proxy:
data['task'].update(self.proxy)
else:
data['task']['type'] = f"{data['task']['type']}Proxyless"
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/createTask',
json=data,
allow_redirects=False,
timeout=30
),
check_success=_checkRequest,
step=5,
timeout=180
)
if response:
return response.json()['taskId']
else:
raise CaptchaBadJobID(
'CapMonster: Error no task id was returned.'
)
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
taskID = None
if not captchaParams.get('clientKey'):
raise CaptchaParameter(
"CapMonster: Missing clientKey parameter."
)
self.clientKey = captchaParams.get('clientKey')
if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))
if not hostParsed.scheme:
raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')
if not hostParsed.netloc:
raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')
ports = {
'http': 80,
'https': 443
}
self.proxy = {
'proxyType': hostParsed.scheme,
'proxyAddress': hostParsed.hostname,
'proxyPort': hostParsed.port if hostParsed.port else ports[self.proxy['proxyType']],
'proxyLogin': hostParsed.username,
'proxyPassword': hostParsed.password,
}
else:
self.proxy = None
try:
taskID = self.requestSolve(captchaType, url, siteKey)
return self.requestJob(taskID)
except polling2.TimeoutException:
try:
if taskID:
self.reportJob(taskID)
except polling2.TimeoutException:
raise CaptchaTimeout(
"CapMonster: Captcha solve took to long and also failed "
f"reporting the task with task id {taskID}."
)
raise CaptchaTimeout(
"CapMonster: Captcha solve took to long to execute "
f"task id {taskID}, aborting."
)
# ------------------------------------------------------------------------------- #
captchaSolver()

View File

@@ -0,0 +1,268 @@
from __future__ import absolute_import
import json
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID,
CaptchaReportError
)
from . import Captcha
class captchaSolver(Captcha):
def __init__(self):
super(captchaSolver, self).__init__('deathbycaptcha')
self.host = 'http://api.dbcapi.me/api'
self.session = requests.Session()
# ------------------------------------------------------------------------------- #
@staticmethod
def checkErrorStatus(response):
errors = dict(
[
(400, "DeathByCaptcha: 400 Bad Request"),
(403, "DeathByCaptcha: 403 Forbidden - Invalid credentails or insufficient credits."),
# (500, "DeathByCaptcha: 500 Internal Server Error."),
(503, "DeathByCaptcha: 503 Service Temporarily Unavailable.")
]
)
if response.status_code in errors:
raise CaptchaServiceUnavailable(errors.get(response.status_code))
# ------------------------------------------------------------------------------- #
def login(self, username, password):
self.username = username
self.password = password
def _checkRequest(response):
if response.ok:
if response.json().get('is_banned'):
raise CaptchaServiceUnavailable('DeathByCaptcha: Your account is banned.')
if response.json().get('balanace') == 0:
raise CaptchaServiceUnavailable('DeathByCaptcha: insufficient credits.')
return response
self.checkErrorStatus(response)
return None
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/user',
headers={'Accept': 'application/json'},
data={
'username': self.username,
'password': self.password
}
),
check_success=_checkRequest,
step=10,
timeout=120
)
self.debugRequest(response)
# ------------------------------------------------------------------------------- #
def reportJob(self, jobID):
if not jobID:
raise CaptchaBadJobID(
"DeathByCaptcha: Error bad job id to report failed reCaptcha."
)
def _checkRequest(response):
if response.status_code == 200:
return response
self.checkErrorStatus(response)
return None
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/captcha/{jobID}/report',
headers={'Accept': 'application/json'},
data={
'username': self.username,
'password': self.password
}
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return True
else:
raise CaptchaReportError(
"DeathByCaptcha: Error report failed reCaptcha."
)
# ------------------------------------------------------------------------------- #
def requestJob(self, jobID):
if not jobID:
raise CaptchaBadJobID(
"DeathByCaptcha: Error bad job id to request reCaptcha."
)
def _checkRequest(response):
if response.ok and response.json().get('text'):
return response
self.checkErrorStatus(response)
return None
response = polling2.poll(
lambda: self.session.get(
f'{self.host}/captcha/{jobID}',
headers={'Accept': 'application/json'}
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return response.json().get('text')
else:
raise CaptchaTimeout(
"DeathByCaptcha: Error failed to solve reCaptcha."
)
# ------------------------------------------------------------------------------- #
def requestSolve(self, captchaType, url, siteKey):
def _checkRequest(response):
if response.ok and response.json().get("is_correct") and response.json().get('captcha'):
return response
self.checkErrorStatus(response)
return None
data = {
'username': self.username,
'password': self.password,
}
if captchaType == 'reCaptcha':
jPayload = {
'googlekey': siteKey,
'pageurl': url
}
if self.proxy:
jPayload.update({
'proxy': self.proxy,
'proxytype': self.proxyType
})
data.update({
'type': '4',
'token_params': json.dumps(jPayload)
})
else:
jPayload = {
'sitekey': siteKey,
'pageurl': url
}
if self.proxy:
jPayload.update({
'proxy': self.proxy,
'proxytype': self.proxyType
})
data.update({
'type': '7',
'hcaptcha_params': json.dumps(jPayload)
})
response = polling2.poll(
lambda: self.session.post(
f'{self.host}/captcha',
headers={'Accept': 'application/json'},
data=data,
allow_redirects=False
),
check_success=_checkRequest,
step=10,
timeout=180
)
if response:
return response.json().get('captcha')
else:
raise CaptchaBadJobID(
'DeathByCaptcha: Error no job id was returned.'
)
# ------------------------------------------------------------------------------- #
def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
jobID = None
for param in ['username', 'password']:
if not captchaParams.get(param):
raise CaptchaParameter(
f"DeathByCaptcha: Missing '{param}' parameter."
)
setattr(self, param, captchaParams.get(param))
if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))
if not hostParsed.scheme:
raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')
if not hostParsed.netloc:
raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')
self.proxyType = hostParsed.scheme.upper()
self.proxy = captchaParams.get('proxy', {}).get('https')
else:
self.proxy = None
try:
jobID = self.requestSolve(captchaType, url, siteKey)
return self.requestJob(jobID)
except polling2.TimeoutException:
try:
if jobID:
self.reportJob(jobID)
except polling2.TimeoutException:
raise CaptchaTimeout(
f"DeathByCaptcha: Captcha solve took to long and also failed reporting the job id {jobID}."
)
raise CaptchaTimeout(
f"DeathByCaptcha: Captcha solve took to long to execute job id {jobID}, aborting."
)
# ------------------------------------------------------------------------------- #
captchaSolver()