recaptcha librerie

This commit is contained in:
marco
2021-11-21 13:34:57 +01:00
parent 264b2d4292
commit 377044a879
32 changed files with 9845 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
# Copyright (C) 2017-2018 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
# flake8: noqa
from . import librecaptcha, extract_strings, user_agents
from .recaptcha import ReCaptcha
from .librecaptcha import __version__, get_token, has_gui
from .user_agents import USER_AGENTS, random_user_agent
from .__main__ import main

View File

@@ -0,0 +1,280 @@
# Copyright (C) 2017, 2019 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from . import errors
from .errors import UserError, UserExit
from .librecaptcha import get_token, __version__
from .user_agents import random_user_agent
import os
import re
import sys
def get_cmd():
    """Return the name under which this program was invoked.

    Falls back to ``"librecaptcha"`` when ``sys.argv`` is empty. An
    invocation that starts with ``./`` is returned unchanged; anything else
    is reduced to its final path component.
    """
    argv = sys.argv
    if len(argv) == 0:
        return "librecaptcha"
    program = argv[0]
    return program if program.startswith("./") else os.path.basename(program)
# Name under which the program was invoked; interpolated into the usage text
# below and into the "For usage information" hint printed by `main()`.
CMD = get_cmd()
# Help text printed by `usage()`. `{0}` is replaced with `CMD`.
USAGE = """\
Usage:
{0} [options] [--] <api-key> <site-url> [<user-agent>]
{0} -h | --help | --version
Arguments:
<api-key> The reCAPTCHA API key to use. This is usually the value of the
"data-sitekey" HTML attribute.
<site-url> The URL of the site that contains the reCAPTCHA challenge.
Should start with http:// or https://. Everything after the
hostname is optional. For example: https://example.com
<user-agent> A user-agent string. The client that will use the obtained
reCAPTCHA token should have this user-agent string. If not
provided, a random user-agent string will be chosen and shown.
Options:
-g --gui Use the GTK 3 GUI (as opposed to the CLI).
--debug Show debugging information while running.
-h --help Show this help message.
--version Show the program version.
""".format(CMD)
def usage(file=None):
    """Print the usage message.

    :param file: Stream to write to. When omitted, the *current*
        ``sys.stdout`` is used; it is looked up at call time so that a
        redirected standard output is honored.
    """
    # A `file=sys.stdout` default would be evaluated once, at import time,
    # and keep pointing at the old stream after `sys.stdout` is replaced.
    if file is None:
        file = sys.stdout
    print(USAGE, end="", file=file)
def usage_error(exit=True):
    """Write the usage message to standard error.

    :param exit: When true, terminate the process with exit status 1 after
        the message has been printed.
    """
    usage(sys.stderr)
    if not exit:
        return
    sys.exit(1)
class ParsedArgs:
    """Plain container for the outcome of command-line parsing."""

    def __init__(self):
        # Error message when parsing failed; None otherwise.
        self.parse_error = None
        # Positional arguments, in the order they appear on the command line.
        self.api_key = self.site_url = self.user_agent = None
        # Flags switched on by options.
        self.gui = self.debug = self.help = self.version = False
class ArgParser:
    """Hand-written parser for the program's command-line arguments.

    Call ``parse()`` once; it returns the populated ``ParsedArgs``. Problems
    are reported through ``ParsedArgs.parse_error`` rather than exceptions.
    """

    def __init__(self, args):
        self.args = args
        self.index = 0
        self.positional_index = 0
        self.parsed = ParsedArgs()
        self.options_done = False
        self.end_early = False

    @property
    def arg(self):
        """The argument at the current index, or ``None`` past the end."""
        try:
            current = self.args[self.index]
        except IndexError:
            return None
        return current

    @property
    def done(self):
        """Whether parsing should stop (early exit or input exhausted)."""
        if self.end_early:
            return True
        return self.index >= len(self.args)

    def advance(self):
        """Move on to the next argument."""
        self.index += 1

    def error(self, message):
        """Record ``message`` as the parse error and stop parsing."""
        self.end_early = True
        self.parsed.parse_error = message

    def parse_long_option(self, arg):
        """Handle a ``--name`` option."""
        name = arg[len("--"):]
        if name in ("debug", "gui"):
            setattr(self.parsed, name, True)
        elif name in ("help", "version"):
            # These options short-circuit the rest of the command line.
            setattr(self.parsed, name, True)
            self.end_early = True
        else:
            self.error("Unrecognized option: {}".format(arg))

    def parse_short_option_char(self, char):
        """Handle one character of a ``-abc`` option cluster."""
        if char == "h":
            self.parsed.help = True
            self.end_early = True
        elif char == "g":
            self.parsed.gui = True
        else:
            self.error("Unrecognized option: -{}".format(char))

    def parse_short_option(self, arg):
        """Handle every character of a ``-abc`` option cluster."""
        for char in arg[len("-"):]:
            self.parse_short_option_char(char)

    def try_parse_option(self):
        """Treat the current argument as an option if it looks like one.

        Returns ``True`` when the argument was consumed as an option (or as
        the ``--`` separator) and ``False`` when it is a positional.
        """
        current = self.arg
        if current == "--":
            self.options_done = True
            return True
        handlers = (
            (r"--[^-]", self.parse_long_option),
            (r"-[^-]", self.parse_short_option),
        )
        for pattern, handler in handlers:
            if re.match(pattern, current):
                handler(current)
                return True
        return False

    def parse_positional(self):
        """Store the current argument in the next positional slot."""
        value = self.arg
        slots = {0: "api_key", 1: "site_url", 2: "user_agent"}
        slot = slots.get(self.positional_index)
        if slot is None:
            self.error("Unexpected positional argument: {}".format(value))
        else:
            setattr(self.parsed, slot, value)

    def parse_single(self):
        """Consume the current argument as an option or a positional."""
        if self.options_done or not self.try_parse_option():
            self.parse_positional()
            self.positional_index += 1

    def handle_end(self):
        """Report a missing required positional, unless parsing ended early."""
        if self.end_early:
            return
        seen = self.positional_index
        if seen >= 2:
            return
        if seen < 1:
            self.error("Missing positional argument: <api-key>")
        else:
            self.error("Missing positional argument: <site-url>")

    def parse(self):
        """Parse all arguments and return the resulting ``ParsedArgs``."""
        while not self.done:
            self.parse_single()
            self.advance()
        self.handle_end()
        return self.parsed
# Exception types raised by `get_token()` that `run()` converts into
# `UserError`.
USER_ERRORS = (
    errors.GtkImportError,
    errors.SiteUrlParseError,
    errors.UnsupportedChallengeError,
)
# Printed by `run()` once a token has been obtained.
GOT_TOKEN_MSG = """\
Received token. This token should usually be submitted with the form as the
value of the "g-recaptcha-response" field.
"""
def run(args: ParsedArgs):
    """Obtain a reCAPTCHA token and print it to standard output.

    A random user-agent string is chosen when ``args.user_agent`` is
    ``None``; in that case the chosen string is printed alongside the token.

    :raises UserError: When ``get_token()`` raises one of ``USER_ERRORS``;
        the original exception is chained as the cause.
    """
    random_ua = args.user_agent is None
    user_agent = random_user_agent() if random_ua else args.user_agent
    if args.debug:
        print("User-agent string: {}".format(user_agent), file=sys.stderr)

    try:
        uvtoken = get_token(
            api_key=args.api_key,
            site_url=args.site_url,
            user_agent=user_agent,
            gui=args.gui,
            debug=args.debug,
        )
    except USER_ERRORS as exc:
        raise UserError(str(exc)) from exc

    print(GOT_TOKEN_MSG)
    if random_ua:
        # Three lines: the note, the user-agent string, and a blank line.
        print(
            "Note: The following user-agent string was used:",
            user_agent,
            "",
            sep="\n",
        )
    print("Token:")
    print(uvtoken)
# Printed by `run_or_exit()` before re-raising an exception it does not
# otherwise handle.
UNEXPECTED_ERR_MSG = """\
An unexpected error occurred. The exception traceback is shown below:
"""
def run_or_exit(args: ParsedArgs):
    """Call ``run()`` and turn expected failures into exit statuses.

    With ``args.debug`` set, ``run()`` is called directly and every exception
    propagates. Otherwise:

    * ``UserExit`` and ``KeyboardInterrupt`` exit with status 2;
    * any other ``UserError`` exits with status 1, printing its message first
      when ``show_by_default`` is true;
    * any other exception is re-raised after a notice is printed.
    """
    if not args.debug:
        try:
            return run(args)
        except UserError as exc:
            # `UserExit` is a `UserError` subclass with its own exit status.
            if isinstance(exc, UserExit):
                sys.exit(2)
            if exc.show_by_default:
                print(exc.message, file=sys.stderr)
            sys.exit(1)
        except KeyboardInterrupt:
            print(file=sys.stderr)
            sys.exit(2)
        except Exception:
            print(UNEXPECTED_ERR_MSG, file=sys.stderr)
            raise
    return run(args)
def main():
    """Program entry point: parse ``sys.argv`` and act on the result."""
    parsed = ArgParser(sys.argv[1:]).parse()

    if parsed.parse_error is not None:
        print(parsed.parse_error, file=sys.stderr)
        hint = "For usage information, run: {} --help".format(CMD)
        print(hint, file=sys.stderr)
        sys.exit(1)

    if parsed.help:
        usage()
    elif parsed.version:
        print(__version__)
    else:
        run_or_exit(parsed)
# Run the command-line interface only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()

317
lib/librecaptcha/cli.py Normal file
View File

@@ -0,0 +1,317 @@
# Copyright (C) 2017, 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from .recaptcha import ChallengeGoal, GridDimensions, ImageGridChallenge
from .recaptcha import DynamicSolver, MultiCaptchaSolver, Solver
from .recaptcha import ReCaptcha, Solution
from .typing import List
from PIL import Image, ImageDraw, ImageFont
from threading import Thread
from queue import Queue
import io
import os
import random
import readline # noqa: F401
import subprocess
import sys
import time
# Typeface names tried in order by `get_font()`; the first one that loads is
# used.
TYPEFACES = [
    "FreeSans",
    "LiberationSans-Regular",
    "DejaVuSans",
    "Arial",
    "arial",
]
def get_font(size: int) -> ImageFont.ImageFont:
    """Return the first loadable typeface from ``TYPEFACES`` at ``size``.

    If none of the typefaces can be loaded (``ImageFont.truetype`` raises
    ``OSError`` for each), Pillow's default font is returned instead.
    """
    for name in TYPEFACES:
        try:
            font = ImageFont.truetype(name, size=size)
        except OSError:
            continue
        return font
    return ImageFont.load_default()
# Size passed to `get_font()`; `draw_indices()` also derives its label
# offsets and padding from this value.
FONT_SIZE = 16
# Font used for the tile numbers drawn by `draw_indices()`.
FONT = get_font(FONT_SIZE)
def read_indices(prompt: str, max_index: int) -> List[int]:
    """Prompt until the user enters a valid list of tile numbers.

    The user types whitespace-separated 1-based numbers. The prompt is
    repeated until every token is an integer and every number lies in
    ``1..max_index``.

    :returns: The entered numbers converted to 0-based indices, in the order
        typed. An empty line yields an empty list.
    """
    while True:
        tokens = input(prompt).split()
        try:
            zero_based = [int(token) - 1 for token in tokens]
        except ValueError:
            print("Invalid input.")
        else:
            if any(i < 0 or i >= max_index for i in zero_based):
                print("Numbers out of bounds.")
            else:
                return zero_based
def draw_lines(image: Image.Image, dimensions: GridDimensions):
    """Draw white separator lines between the tiles of ``image``, in place.

    Horizontal lines are drawn first, then vertical ones. Each line sits one
    pixel before the boundary between two rows or columns.
    """
    pen = ImageDraw.Draw(image)
    width, height = image.width, image.height
    rows, columns = dimensions.rows, dimensions.columns

    horizontal = [
        ((0, y), (width, y))
        for y in (height * r // rows - 1 for r in range(1, rows))
    ]
    vertical = [
        ((x, 0), (x, height))
        for x in (width * c // columns - 1 for c in range(1, columns))
    ]
    for start, end in horizontal + vertical:
        pen.line([start, end], fill=(255, 255, 255), width=2)
def _text_size(text: str):
    """Return the ``(width, height)`` that ``text`` occupies in ``FONT``."""
    if hasattr(FONT, "getbbox"):
        # `ImageFont.getsize()` was removed in Pillow 10. The right and
        # bottom edges of the bounding box, measured from the drawing origin,
        # are used as its replacement here.
        _, _, right, bottom = FONT.getbbox(text)
        return right, bottom
    # Older Pillow releases without `getbbox()`.
    return FONT.getsize(text)


def draw_indices(image: Image.Image, dimensions: GridDimensions):
    """Overlay each tile of ``image`` with its 1-based number, in place.

    Tiles are numbered row by row, starting at 1. Each number is drawn in
    white on a half-transparent black rectangle placed just inside the
    bottom-left corner of its tile.
    """
    draw = ImageDraw.Draw(image, "RGBA")
    pad = round(FONT_SIZE / 10)
    for i in range(dimensions.rows * dimensions.columns):
        row, column = divmod(i, dimensions.columns)
        # Bottom-left corner of the tile.
        corner = (
            image.width * column // dimensions.columns,
            image.height * (row + 1) // dimensions.rows,
        )
        text_loc = (
            corner[0] + round(FONT_SIZE / 2),
            corner[1] - round(FONT_SIZE * 1.5),
        )
        text = str(i + 1)
        text_width, text_height = _text_size(text)
        draw.rectangle([
            (text_loc[0] - pad, text_loc[1]),
            (
                text_loc[0] + text_width + pad,
                text_loc[1] + text_height + pad,
            ),
        ], fill=(0, 0, 0, 128))
        draw.text(text_loc, text, fill=(255, 255, 255), font=FONT)
def print_temporary(string: str, file=None):
    """Print a status message that ``clear_temporary()`` can erase later.

    On a terminal the message is written without a trailing newline so the
    line can be cleared afterwards; on any other stream a newline is added.

    :param string: The message to print.
    :param file: Stream to write to. When omitted, the *current*
        ``sys.stdout`` is used (looked up at call time, so a redirected
        standard output is honored).
    """
    if file is None:
        file = sys.stdout
    end = "" if file.isatty() else "\n"
    print(string, file=file, end=end, flush=True)
def clear_temporary(file=None):
    """Erase the current terminal line, e.g. one left by ``print_temporary()``.

    Does nothing when the stream is not a terminal.

    :param file: Stream to write to. When omitted, the *current*
        ``sys.stdout`` is used (looked up at call time, so a redirected
        standard output is honored).
    """
    if file is None:
        file = sys.stdout
    if not file.isatty():
        return
    # Carriage return, then the ANSI "erase to end of line" sequence.
    print("\r\x1b[K", file=file, end="", flush=True)
# Whether to try the external `display` command for showing images. Starts
# out true only on POSIX systems; `try_display_cmd()` sets it to False when
# the command cannot be found.
HAS_DISPLAY_CMD = (os.name == "posix")
def run_display_cmd():
    """Start the external ``display`` program, reading from its stdin.

    The program's standard output and standard error are discarded.
    (Presumably this is ImageMagick's ``display`` -- confirm.)

    :returns: The ``subprocess.Popen`` object for the started process.
    """
    command = ["display", "-"]
    discard = subprocess.DEVNULL
    return subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=discard,
        stderr=discard,
    )
def try_display_cmd(image: Image.Image):
    """Try to show ``image`` with the external ``display`` command.

    The image is encoded as PNG and written to the command's standard input.

    :returns: The running viewer process, or ``None`` when the command is
        unavailable or the image could not be handed over. Callers are
        expected to fall back to another way of showing the image on
        ``None``.
    """
    global HAS_DISPLAY_CMD
    if not HAS_DISPLAY_CMD:
        return None

    img_buffer = io.BytesIO()
    image.save(img_buffer, "png")
    img_bytes = img_buffer.getvalue()

    try:
        proc = run_display_cmd()
    except FileNotFoundError:
        # Remember that the command does not exist so it isn't retried.
        HAS_DISPLAY_CMD = False
        return None

    try:
        proc.stdin.write(img_bytes)
        proc.stdin.close()
    except OSError:
        # The viewer went away before it read the image (for example a
        # BrokenPipeError because it exited immediately). Reap the process
        # and report failure so the caller can fall back.
        proc.kill()
        proc.wait()
        return None
    return proc
class SolverCli:
    """Base class for the command-line front end of one solver.

    Keeps track of the external image viewers it has started so that they
    can be closed again with ``hide_images()``.
    """

    def __init__(self, cli: "Cli", solver: Solver):
        self.cli = cli
        self.solver = solver
        # Viewer processes started by `show_image()`.
        self.__image_procs = []

    def show_image(self, image):
        """Show ``image`` with ``display`` if possible, else ``image.show()``."""
        viewer = try_display_cmd(image)
        if viewer is not None:
            self.__image_procs.append(viewer)
            return
        image.show()

    def hide_images(self):
        """Terminate every viewer process started by ``show_image()``."""
        viewers = self.__image_procs
        for viewer in viewers:
            viewer.terminate()
        del viewers[:]

    def run(self):
        """Run the wrapped solver."""
        self.solver.run()
class DynamicCli(SolverCli):
    """Command-line front end for a ``DynamicSolver`` challenge.

    The user first chooses tiles from the whole grid. Every selected tile
    then yields a replacement image, which the user accepts or rejects one
    at a time until no replacements are pending.
    """

    def __init__(self, cli: "Cli", solver: DynamicSolver):
        super().__init__(cli, solver)
        self.image_open = False
        # Holds (tile index, replacement image) pairs that are ready to show.
        self.image_queue = Queue()
        # Number of selected tiles whose replacement hasn't been handled yet.
        self.num_pending = 0

    def run(self):
        """Run the whole challenge and return ``self.solver.finish()``."""
        challenge = self.solver.get_challenge()
        self.cli.handle_challenge(challenge)

        image = challenge.image
        num_rows = challenge.dimensions.rows
        num_columns = challenge.dimensions.columns
        num_tiles = challenge.dimensions.count
        draw_indices(image, challenge.dimensions)
        self.show_image(image)

        print("Take a look at the grid of tiles that just appeared. ", end="")
        print("({} rows, {} columns)".format(num_rows, num_columns))
        print("Which tiles should be selected?")
        print("(Top-left is 1; bottom-right is {}.)".format(num_tiles))
        indices = read_indices(
            "Enter numbers separated by spaces: ",
            num_tiles,
        )
        print()
        self.hide_images()
        self.select_initial(indices)
        self.new_tile_loop()
        return self.solver.finish()

    def new_tile_loop(self):
        """Show replacement images one by one until none are pending.

        Accepting an image selects that tile again, which queues another
        replacement.
        """
        while self.num_pending > 0:
            print_temporary("Waiting for next image...")
            index, image = self.image_queue.get()
            clear_temporary()
            self.num_pending -= 1
            self.show_image(image)

            print("Take a look at the image that just appeared.")
            # Only an answer starting with "y"/"Y" counts as acceptance.
            accept = input(
                "Should this image be selected? [y/N] ",
            )[:1].lower() == "y"
            print()

            self.hide_images()
            if accept:
                self.select_tile(index)

    def select_initial(self, indices):
        """Select the tiles the user picked from the initial grid."""
        print_temporary("Selecting images...")
        for i, index in enumerate(indices):
            if i > 0:
                # Avoid sending initial requests simultaneously.
                time.sleep(random.uniform(0.5, 1))
            self.select_tile(index)
        clear_temporary()

    def select_tile(self, index: int):
        """Select tile ``index`` and queue its replacement image.

        The replacement is put on ``image_queue`` once ``tile.delay`` seconds
        have passed; the wait happens on a daemon thread so this method
        returns immediately.
        """
        self.num_pending += 1
        tile = self.solver.select_tile(index)

        def add_to_queue():
            self.image_queue.put((index, tile.image))

        def target():
            time.sleep(tile.delay)
            add_to_queue()

        if tile.delay > 0:
            Thread(target=target, daemon=True).start()
        else:
            # Nothing to wait for. Enqueue directly rather than through
            # `target()`: `time.sleep()` raises ValueError for a negative
            # delay.
            add_to_queue()
class MultiCaptchaCli(SolverCli):
    """Command-line front end for a ``MultiCaptchaSolver`` challenge.

    The solver hands out image-grid challenges one after another until it
    returns a ``Solution``.
    """

    def __init__(self, cli: "Cli", solver: MultiCaptchaSolver):
        super().__init__(cli, solver)

    def run(self) -> Solution:
        """Answer challenges until the solver produces a ``Solution``.

        :raises TypeError: If the solver returns something that is neither a
            ``Solution`` nor an ``ImageGridChallenge``.
        """
        result = self.solver.first_challenge()
        while True:
            if isinstance(result, Solution):
                return result
            if not isinstance(result, ImageGridChallenge):
                raise TypeError("Unexpected type: {}".format(type(result)))
            chosen = self.handle_challenge(result)
            result = self.solver.select_indices(chosen)

    def handle_challenge(self, challenge: ImageGridChallenge) -> List[int]:
        """Show one challenge and return the 0-based tiles the user picked."""
        self.cli.handle_challenge(challenge)

        dimensions = challenge.dimensions
        num_rows = dimensions.rows
        num_columns = dimensions.columns
        num_tiles = dimensions.count

        grid = challenge.image
        draw_lines(grid, dimensions)
        draw_indices(grid, dimensions)
        self.show_image(grid)

        print("Take a look at the grid of tiles that just appeared. ", end="")
        print("({} rows, {} columns)".format(num_rows, num_columns))
        print("Which tiles should be selected?")
        print("(Top-left is 1; bottom-right is {}.)".format(num_tiles))
        chosen = read_indices(
            "Enter numbers separated by spaces: ",
            num_tiles,
        )
        print()
        self.hide_images()
        return chosen
class Cli:
    """Command-line driver that runs solvers until a token is obtained."""

    def __init__(self, rc: ReCaptcha):
        self.rc = rc
        # True until the first challenge has been announced.
        self._first = True

    def run(self) -> str:
        """Solve challenges until ``rc`` returns a string, and return it."""
        outcome = self.rc.first_solver()
        while not isinstance(outcome, str):
            outcome = self.rc.send_solution(self.run_solver(outcome))
        return outcome

    def run_solver(self, solver: Solver) -> Solution:
        """Run the front end that matches the exact type of ``solver``."""
        front_ends = {
            DynamicSolver: DynamicCli,
            MultiCaptchaSolver: MultiCaptchaCli,
        }
        front_end = front_ends[type(solver)]
        return front_end(self, solver).run()

    def show_goal(self, goal: ChallengeGoal):
        """Print the challenge objective, or a warning if it is unknown."""
        plain = goal.plain
        if not plain:
            print("WARNING: Could not determine challenge objective.")
            print("Challenge information: {}".format(goal.fallback))
        else:
            print("CHALLENGE OBJECTIVE: {}".format(plain))

    def handle_challenge(self, challenge: ImageGridChallenge):
        """Announce a challenge; called by the solver front ends."""
        if self._first:
            self._first = False
        else:
            print("You must solve another challenge.")
            print()
        self.show_goal(challenge.goal)

100
lib/librecaptcha/errors.py Normal file
View File

@@ -0,0 +1,100 @@
# Copyright (C) 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
# User-facing error messages. The `[:-1]` slices drop the final newline of
# each triple-quoted literal.

# Returned by `GtkImportError.__str__()`.
GUI_MISSING_MESSAGE = """\
Error: Could not load the GUI. Is PyGObject installed?
Try (re)installing librecaptcha[gtk] with pip.
For more details, add the --debug option.
"""[:-1]
# Template used by `ChallengeBlockedError.__str__()`; `{}` receives the
# challenge type.
CHALLENGE_BLOCKED_MESSAGE = """\
Error: Unsupported challenge type: {}
Requests are most likely being blocked; see the previously displayed messages.
"""[:-1]
# Template used by `UnknownChallengeError.__str__()`; `{}` receives the
# challenge type.
UNKNOWN_CHALLENGE_MESSAGE = """\
Error: Unsupported challenge type: {}
See the previously displayed messages for more information.
"""[:-1]
class UserError(Exception):
    """An error that is expected and meant to be shown to the user, such as
    invalid user-supplied data. When librecaptcha runs as a program, these
    errors are reported without a traceback unless --debug is given.
    """

    def __init__(self, message):
        super().__init__(message)

    @property
    def show_by_default(self) -> bool:
        """Whether the message should be shown to the user by default.
        Exception types may return ``False`` here when a detailed message
        has already been displayed.
        """
        return True

    @property
    def message(self):
        """The message this exception was created with."""
        return self.args[0]
class UserExit(UserError):
    """Raised to make librecaptcha terminate when it runs as a program. The
    message of this exception is not shown by default.
    """

    def __init__(self, message="Program terminated."):
        super().__init__(message)
class GtkImportError(ImportError):
    """Import failure of the GUI dependencies; its text is always
    ``GUI_MISSING_MESSAGE``.
    """

    def __str__(self) -> str:
        return GUI_MISSING_MESSAGE
class SiteUrlParseError(ValueError):
    """A ``ValueError`` subclass; by its name, raised when a site URL
    cannot be parsed (the raising code is not in this module).
    """
class UnsupportedChallengeError(Exception):
    """Error carrying the type of a challenge that is not supported."""

    def __init__(self, challenge_type: str):
        self.challenge_type = challenge_type

    def __str__(self):
        template = "Error: Unsupported challenge type: {}"
        return template.format(self.challenge_type)
class ChallengeBlockedError(UnsupportedChallengeError):
    """Unsupported-challenge error whose text is built from
    ``CHALLENGE_BLOCKED_MESSAGE``.
    """

    @property
    def show_by_default(self) -> bool:
        # A detailed message is already shown in `librecaptcha.get_token()`.
        return False

    def __str__(self) -> str:
        challenge_type = self.challenge_type
        return CHALLENGE_BLOCKED_MESSAGE.format(challenge_type)
class UnknownChallengeError(UnsupportedChallengeError):
    """Unsupported-challenge error whose text is built from
    ``UNKNOWN_CHALLENGE_MESSAGE``.
    """

    @property
    def show_by_default(self) -> bool:
        # A detailed message is already shown in `librecaptcha.get_token()`.
        return False

    def __str__(self) -> str:
        challenge_type = self.challenge_type
        return UNKNOWN_CHALLENGE_MESSAGE.format(challenge_type)

View File

@@ -0,0 +1,113 @@
# Copyright (C) 2017, 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from .typing import List
import requests
import json
import os
import os.path
import re
import sys
# When False, `extract_strings_slimit()` redirects file descriptor 2 to the
# null device while it constructs the slimit parser.
SHOW_WARNINGS = False
def load_javascript(url: str, user_agent: str) -> str:
    """Download ``url`` and return the response body as text.

    A progress message is written to standard error first.

    :param url: Address to fetch.
    :param user_agent: Value sent in the ``User-Agent`` request header.
    """
    print("Downloading <{}>...".format(url), file=sys.stderr)
    request_headers = {"User-Agent": user_agent}
    response = requests.get(url, headers=request_headers)
    return response.text
def extract_strings_slimit(javascript: str) -> List[str]:
    """Return the string literals found in ``javascript``, using slimit.

    Literals are returned in the order they are met in a depth-first walk of
    the parse tree, with their first and last character (the quotes)
    removed.

    :raises TypeError: If the parse tree contains an item that is neither a
        slimit node nor a list or tuple.
    """
    from slimit.parser import Parser
    from slimit import ast

    def quiet_parser():
        # File descriptor hackiness to silence warnings: fd 2 points at the
        # null device while the parser is constructed and is restored
        # afterwards, even on failure.
        devnull_fd = os.open(os.devnull, os.O_RDWR)
        saved_fd = os.dup(2)
        try:
            os.dup2(devnull_fd, 2)
            return Parser()
        finally:
            os.dup2(saved_fd, 2)
            os.close(devnull_fd)
            os.close(saved_fd)

    parser = Parser() if SHOW_WARNINGS else quiet_parser()

    # Hack to work around https://github.com/rspivak/slimit/issues/52
    KEYWORDS = r"(?:catch|delete|return|throw)"
    javascript = re.sub(rf"(\.\s*{KEYWORDS})\b", r"\1_", javascript)
    javascript = re.sub(rf"\b({KEYWORDS})(\s*:)", r"'\1'\2", javascript)

    found = []

    def collect(item):
        if item is None:
            return
        is_node = isinstance(item, ast.Node)
        if not (is_node or isinstance(item, (list, tuple))):
            raise TypeError("Unexpected item: {!r}".format(item))
        if isinstance(item, ast.String):
            found.append(item.value[1:-1])
        # Nodes expose their children through a method; lists and tuples
        # are iterated directly.
        for child in (item.children() if is_node else item):
            collect(child)

    collect(parser.parse(javascript))
    return found
def extract_strings(javascript: str) -> List[str]:
    """Return the string literals found in ``javascript``.

    Uses esprima when it can be imported and falls back to
    ``extract_strings_slimit()`` otherwise. A progress message is written to
    standard error first.
    """
    print("Extracting strings...", file=sys.stderr)
    try:
        import esprima
    except ImportError:
        return extract_strings_slimit(javascript)

    literals = []

    def on_node(node, *args):
        # Keep only literals whose value is a string.
        if node.type != "Literal":
            return
        if isinstance(node.value, str):
            literals.append(node.value)

    esprima.parseScript(javascript, delegate=on_node)
    return literals
def extract_and_save(
    url: str,
    path: str,
    version: str,
    rc_version: str,
    user_agent: str,
) -> List[str]:
    """Download the script at ``url``, extract its strings and save them.

    The file at ``path`` receives a first line of the form
    ``<version>/<rc_version>`` followed by the strings encoded as JSON.
    Missing parent directories are created.

    The download and extraction happen *before* the file is opened, so a
    failure in either step leaves any existing file untouched instead of
    replacing it with a header-only file.

    :returns: The extracted strings.
    """
    directory = os.path.dirname(path)
    if directory:
        # `os.makedirs("")` would fail for a path without a directory part.
        os.makedirs(directory, exist_ok=True)

    js = load_javascript(url, user_agent)
    strings = extract_strings(js)
    strings_json = json.dumps(strings)

    print('Saving strings to "{}"...'.format(path), file=sys.stderr)
    with open(path, "w") as f:
        print("{}/{}".format(version, rc_version), file=f)
        f.write(strings_json)
    return strings

787
lib/librecaptcha/gui.py Normal file
View File

@@ -0,0 +1,787 @@
# Copyright (C) 2019 cyclopsian
# Copyright (C) 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from .errors import UserExit, GtkImportError
from .recaptcha import ChallengeGoal, GridDimensions, ImageGridChallenge
from .recaptcha import DynamicSolver, MultiCaptchaSolver, Solver
from .recaptcha import ReCaptcha, Solution
from .typing import Callable, Iterable, List
from PIL import Image
from collections import namedtuple
from typing import Any, Optional, Union
import html
import re
import sys
import threading
# Import GTK 3 through PyGObject. An ImportError raised here is replaced by
# GtkImportError, with the original exception chained as its cause.
try:
    import gi
    gi.require_version("Gtk", "3.0")
    from gi.repository import Gtk, Gdk, GdkPixbuf, GLib
except ImportError as e:
    raise GtkImportError from e
def tiles_from_image(
    image: Image.Image,
    dimensions: GridDimensions,
) -> Iterable[Image.Image]:
    """Yield the tiles of ``image`` row by row, left to right.

    Every tile is ``image.width // columns`` wide and
    ``image.height // rows`` high.
    """
    columns = dimensions.columns
    tile_width = image.width // columns
    tile_height = image.height // dimensions.rows
    for index in range(dimensions.rows * columns):
        row, column = divmod(index, columns)
        left = column * tile_width
        top = row * tile_height
        box = (left, top, left + tile_width, top + tile_height)
        yield image.crop(box)
def image_to_gdk_pixbuf(image: Image.Image):
    """Convert a Pillow image into a ``GdkPixbuf.Pixbuf``.

    Images in a mode other than ``RGB`` or ``RGBA`` are converted first:
    to ``RGBA`` when they have an ``A`` band, otherwise to ``RGB``.
    """
    if image.mode not in ("RGB", "RGBA"):
        # The pixbuf is declared below as 8-bit RGB(A) with a row stride of
        # 3 or 4 bytes per pixel; the raw bytes of any other mode would not
        # match that layout.
        target_mode = "RGBA" if "A" in image.getbands() else "RGB"
        image = image.convert(target_mode)

    width, height = image.size
    has_alpha = (image.mode == "RGBA")
    bpp = 4 if has_alpha else 3  # bytes per pixel
    image_bytes = GLib.Bytes(image.tobytes())
    return GdkPixbuf.Pixbuf.new_from_bytes(
        image_bytes, GdkPixbuf.Colorspace.RGB,
        has_alpha, 8, width, height, width * bpp,
    )
# Stylesheet installed by `load_css()`, which sets this name to None after
# loading it so that the stylesheet is only installed once.
CSS = """\
grid {
margin-top: 1px;
margin-left: 1px;
}
.challenge-button {
margin: 0;
padding: 0;
border-radius: 0;
box-shadow: none;
-gtk-icon-shadow: none;
border-width: 1px;
margin-top: -1px;
margin-left: -1px;
}
.challenge-check, .challenge-header {
margin-left: 1px;
margin-top: 1px;
color: @theme_selected_fg_color;
background-image: linear-gradient(
@theme_selected_bg_color,
@theme_selected_bg_color
);
}
.challenge-header {
padding: 12px;
}
.challenge-check {
border-radius: 50%;
}
"""
def load_css():
    """Install the module stylesheet for the default screen, once.

    ``CSS`` is set to ``None`` after it has been loaded; later calls see
    that and return without doing anything.
    """
    global CSS
    if CSS is not None:
        provider = Gtk.CssProvider.new()
        provider.load_from_data(CSS.encode())
        CSS = None
        screen = Gdk.Screen.get_default()
        Gtk.StyleContext.add_provider_for_screen(
            screen, provider,
            Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
        )
# Type of a dispatch function: it takes one message and returns nothing.
Dispatch = Callable[[Any], None]
class DynamicTile:
    """Grid tile that shows either a clickable image or a spinner.

    The content lives inside a ``Gtk.Box`` and is rebuilt whenever a
    different presentation is passed to ``update()``.
    """

    def __init__(self, dispatch: Dispatch):
        self.dispatch = dispatch
        self.pres: Optional["DynamicTilePres"] = None
        self.box = Gtk.Box.new(Gtk.Orientation.VERTICAL, 0)
        self.inner = None
        self.inner_size = (0, 0)  # (width, height)

    @property
    def widget(self):
        """The top-level widget of this tile."""
        return self.box

    def update(self, pres: "DynamicTilePres"):
        """Rebuild the content unless ``pres`` equals the current one."""
        if not pres.same(self.pres):
            old_inner = self.inner
            if old_inner is not None:
                self.box.remove(old_inner)
            self.make_inner(pres.image)
            self.box.show_all()
            self.pres = pres

    def make_inner(self, image: Image.Image):
        """Create the content: a spinner when ``image`` is ``None``, else an
        image button.
        """
        if image is None:
            self.make_spinner()
        else:
            picture = Gtk.Image.new_from_pixbuf(image_to_gdk_pixbuf(image))
            button = Gtk.Button.new()
            button.get_style_context().add_class("challenge-button")
            button.add(picture)

            def on_clicked(_button):
                return self.pres.on_click(self.dispatch)

            def on_size_allocate(obj, size):
                # Remembered so a later spinner can occupy the same area.
                self.inner_size = (size.width, size.height)

            button.connect("clicked", on_clicked)
            button.connect("size-allocate", on_size_allocate)
            self.set_inner(button)

    def make_spinner(self):
        """Show a 32x32 spinner centered in the last known content area."""
        width, height = map(lambda n: max(n, 32), self.inner_size)
        left = (width - 32) // 2
        top = (height - 32) // 2

        spinner = Gtk.Spinner.new()
        spinner.set_size_request(32, 32)
        # Margins pad the spinner out to the full width and height.
        spinner.set_margin_top(top)
        spinner.set_margin_start(left)
        spinner.set_margin_bottom(height - top - 32)
        spinner.set_margin_end(width - left - 32)
        self.set_inner(spinner)
        spinner.start()
        return spinner

    def set_inner(self, widget):
        """Make ``widget`` the content and add it to the box."""
        self.inner = widget
        self.box.add(widget)
class MultiCaptchaTile:
    """Grid tile built from a ``Gtk.ToggleButton`` that contains an image and
    a check-mark icon layered in a ``Gtk.Fixed``.
    """

    def __init__(self, dispatch: Dispatch):
        self.dispatch = dispatch
        self.pres: Optional["MultiCaptchaTilePres"] = None
        self.image = Gtk.Image.new()
        self.check = self.make_check()
        # Both children are placed at (0, 0), so they overlap.
        self.fixed = Gtk.Fixed.new()
        self.fixed.put(self.image, 0, 0)
        self.fixed.put(self.check, 0, 0)
        self.button = Gtk.ToggleButton.new()
        self.button.get_style_context().add_class("challenge-button")
        self.button.add(self.fixed)
        # Handler id of the "toggled" connection; set on the first update.
        self.toggle_id = None
        # Full-size pixbuf and a copy scaled to 90%, shown while selected.
        self.pixbuf = None
        self.small_pixbuf = None
        self.button.show_all()

    @property
    def widget(self):
        """The top-level widget of this tile."""
        return self.button

    def update(self, pres: "MultiCaptchaTilePres"):
        """Bring the widgets in line with ``pres``.

        Does nothing when ``pres`` equals the current presentation.
        """
        if pres.same(self.pres):
            return None
        if self.pres is None:
            # First update: connect the click handler.
            def on_toggle(obj):
                # Revert the state change that triggered this signal, then
                # forward the click through the presentation.
                self._set_active(not obj.get_active())
                self.pres.on_click(self.dispatch)
            self.toggle_id = self.button.connect("toggled", on_toggle)
        # Rebuild the pixbufs only when the image object has changed.
        if (self.pres and self.pres.image) is not pres.image:
            self.pixbuf = image_to_gdk_pixbuf(pres.image)
            width = self.pixbuf.get_width()
            height = self.pixbuf.get_height()
            self.image.set_size_request(width, height)
            self.small_pixbuf = self.pixbuf.scale_simple(
                width * 0.9, height * 0.9, GdkPixbuf.InterpType.BILINEAR,
            )
        if pres.selected:
            self.check.show()
            self.image.set_from_pixbuf(self.small_pixbuf)
        else:
            self.check.hide()
            self.image.set_from_pixbuf(self.pixbuf)
        self._set_active(pres.selected)
        self.pres = pres

    def make_check(self):
        """Create the check-mark icon (excluded from ``show_all()``)."""
        check = Gtk.Image.new_from_icon_name(
            "object-select-symbolic", Gtk.IconSize.DND,
        )
        check.set_pixel_size(24)
        check.set_no_show_all(True)
        check.get_style_context().add_class("challenge-check")
        return check

    def _set_active(self, selected: bool):
        """Set the toggle state with the "toggled" handler blocked."""
        self.button.handler_block(self.toggle_id)
        self.button.set_active(selected)
        self.button.handler_unblock(self.toggle_id)
class ChallengeTile:
    """Grid cell that hosts a ``DynamicTile`` or a ``MultiCaptchaTile``,
    chosen by the exact type of the presentation it is given.
    """

    def __init__(self, dispatch: Dispatch):
        self.dispatch = dispatch
        self.box = Gtk.Box.new(Gtk.Orientation.VERTICAL, 0)
        self.tile = None

    @property
    def widget(self):
        """The top-level widget of this cell."""
        return self.box

    def update(self, pres: "TilePres"):
        """Swap in a tile of the right kind if needed, then update it."""
        tile_types = {
            DynamicTilePres: DynamicTile,
            MultiCaptchaTilePres: MultiCaptchaTile,
        }
        wanted = tile_types[type(pres)]
        if type(self.tile) is not wanted:
            previous = self.tile
            if previous is not None:
                self.box.remove(previous.widget)
            replacement = wanted(self.dispatch)
            self.tile = replacement
            self.box.add(replacement.widget)
            self.box.show_all()
        self.tile.update(pres)
class ImageGridChallengeDialog:
    """GTK dialog with a header label, a grid of tiles and one action
    button, all driven by presentation objects passed to ``update()``.
    """

    def __init__(self, dispatch: Dispatch):
        self.dispatch = dispatch
        self.pres: Optional["ImageGridChallengePres"] = None

        dialog = Gtk.Dialog.new()
        self.dialog = dialog
        dialog.set_resizable(False)
        dialog.set_title("librecaptcha")
        dialog.set_icon_name("view-refresh-symbolic")

        self.verify = dialog.add_button("", Gtk.ResponseType.OK)
        self.verify.get_style_context().add_class("suggested-action")

        def on_click(obj):
            # This will get reset in `self.update()`, but it prevents multiple
            # clicks from taking effect if the UI temporarily pauses.
            self.verify.set_sensitive(False)

        self.verify.connect("clicked", on_click)

        self.header = Gtk.Label.new("")
        self.header.set_xalign(0)
        self.header.get_style_context().add_class("challenge-header")

        self.content = dialog.get_content_area()
        self.content.set_spacing(6)
        for side in ("start", "end", "top", "bottom"):
            set_margin = getattr(self.content, "set_margin_" + side)
            set_margin(6)
        self.content.pack_start(self.header, False, False, 0)

        self.grid = None
        self.tiles = []
        dialog.show_all()

    def run(self) -> bool:
        """Run the dialog; ``True`` means it ended with the OK response,
        ``False`` that it was closed some other way.
        """
        response = self.dialog.run()
        return response == Gtk.ResponseType.OK

    def destroy(self):
        """Destroy the underlying dialog."""
        self.dialog.destroy()

    def update(self, pres: "ImageGridChallengePres"):
        """Bring the dialog in line with ``pres``.

        Does nothing when ``pres`` equals the current presentation. The grid
        is rebuilt only when the dimensions change.
        """
        previous = self.pres
        if pres.same(previous):
            return

        dimensions = pres.dimensions
        if dimensions != (previous and previous.dimensions):
            if self.grid is not None:
                self.content.remove(self.grid)
            self.grid = self.make_grid(dimensions)
            self.grid.show_all()
            self.content.pack_start(self.grid, True, True, 0)

        if not pres.same_goal(previous):
            self.header.set_markup(pres.goal)
        if not pres.same_verify_label(previous):
            self.verify.set_label(pres.verify_label)
        self.verify.set_sensitive(pres.is_verify_enabled)

        for tile, tile_pres in zip(self.tiles, pres.tiles):
            tile.update(tile_pres)
        self.pres = pres

    def make_grid(self, dimensions: GridDimensions):
        """Create a grid of fresh tiles, replacing ``self.tiles``."""
        grid = Gtk.Grid.new()
        tiles = self.tiles = []
        columns = dimensions.columns
        for index in range(dimensions.rows * columns):
            row, column = divmod(index, columns)
            tile = ChallengeTile(self.dispatch)
            grid.attach(tile.widget, column, row, 1, 1)
            tiles.append(tile)
        return grid
def format_goal(goal: "ChallengeGoal") -> str:
    """Return markup text describing ``goal``.

    * When ``goal.raw`` is ``None``, the escaped ``goal.fallback`` is
      returned.
    * When ``goal.raw`` has the form ``...<strong>...</strong>...``, the
      emphasized part is wrapped in an ``xx-large`` ``<span>`` and, if there
      is text before it, put on its own line.
    * Otherwise the escaped ``goal.raw`` is returned.

    All text taken from ``goal`` is HTML-escaped so it cannot be mistaken
    for markup. (The result appears to be meant for
    ``Gtk.Label.set_markup()`` -- confirm against the caller.)
    """
    if goal.raw is None:
        # Escape here as well: the fallback is arbitrary text and may
        # contain "<" or "&", like every other branch's input.
        return html.escape(goal.fallback)
    match = re.fullmatch(r"(.*)<strong>(.*)</strong>(.*)", goal.raw)
    if not match:
        return html.escape(goal.raw)
    groups = match.groups()
    return '{}<span size="xx-large">{}</span>{}'.format(
        *map(html.escape, [
            # Add a line break only when there is leading text.
            groups[0] and groups[0] + "\n",
            groups[1],
            groups[2],
        ]),
    )
def format_goal_with_note(goal: ChallengeGoal, note: str) -> str:
    """Return ``format_goal(goal)`` followed by ``note`` on a new line."""
    formatted_goal = format_goal(goal)
    return "{}\n{}".format(formatted_goal, note)
# Messages
#
# Each message is an immutable record that is passed to a dispatch function.
# The typename given to `namedtuple()` matches the name it is bound to, so
# that `repr()` output identifies the message correctly.
Start = namedtuple("Start", [])
FinishChallenge = namedtuple("FinishChallenge", [])
SelectTile = namedtuple("SelectTile", [
    "index",  # int
])
ReplaceTile = namedtuple("ReplaceTile", [
    "index",  # int
    "image",  # Image.Image
])
SetState = namedtuple("SetState", [
    "state",  # State
])
SetNextChallenge = namedtuple("SetNextChallenge", [
    "challenge",  # ImageGridChallenge
])
def gtk_run(f: Callable[[], Any]):
    """Call ``f`` with a temporary ``sys.excepthook`` and return its result.

    While ``f`` runs, the installed hook first calls the previous hook and
    then exits the process with status 1. The previous hook is restored
    afterwards, whether or not ``f`` raised.
    """
    previous_hook = sys.excepthook

    def exit_after_reporting(*args, **kwargs):
        previous_hook(*args, **kwargs)
        sys.exit(1)

    sys.excepthook = exit_after_reporting
    try:
        return f()
    finally:
        sys.excepthook = previous_hook
class Gui:
    """GTK front end: connects a ``Store`` to the challenge dialog and runs
    the dialog until the state becomes a token string.
    """

    def __init__(self, rc: ReCaptcha):
        self.store = Store(self.final_dispatch, rc)
        self.view = ImageGridChallengeDialog(self.dispatch)
        # True while an idle callback to `_update()` is scheduled.
        self.update_pending = False

    @property
    def dispatch(self) -> Dispatch:
        """The store's dispatch function."""
        return self.store.dispatch

    @property
    def state(self) -> Optional["State"]:
        """The store's current state."""
        return self.store.state

    @property
    def pres(self) -> Optional["ImageGridChallengePres"]:
        """Presentation for the current state, via the module-level
        ``pres()`` function.
        """
        return pres(self.state)

    @property
    def token(self) -> Optional[str]:
        """The state if it is a string, otherwise ``None``."""
        state = self.state
        return state if isinstance(state, str) else None

    def final_dispatch(self, msg):
        """Apply ``msg`` to the state and schedule a view update.

        At most one update is scheduled at a time.
        """
        self.store.state = reduce_state(self.state, msg)
        if self.update_pending:
            return
        self.update_pending = True
        GLib.idle_add(self._update)

    def _update(self):
        """Idle callback: refresh the view from the current presentation."""
        self.update_pending = False
        current = self.pres
        if current is not None:
            self.view.update(current)
        # Returning False keeps GLib from calling this again.
        return False

    def run(self) -> str:
        """Run the dialog until a token is available and return it.

        :raises UserExit: If the dialog is closed without the OK response.
        """
        load_css()
        self.dispatch(Start())
        try:
            while self.token is None:
                confirmed = gtk_run(self.view.run)
                if not confirmed:
                    raise UserExit
                self.dispatch(FinishChallenge())
        finally:
            self.view.destroy()
            # Process the remaining GTK events before returning.
            while Gtk.events_pending():
                Gtk.main_iteration()
        return self.token
class Store:
    """Holds the current state and the entry point of the dispatch chain.

    Messages go through ``SolverMiddleware`` first, then
    ``WarningMiddleware``, and finally reach ``final_dispatch``.
    """

    state: Optional["State"]
    dispatch: Dispatch

    def __init__(self, final_dispatch: Dispatch, rc: ReCaptcha):
        self.state = None
        # The chain is built from the innermost link outwards.
        warning = WarningMiddleware(self, final_dispatch)
        solver = SolverMiddleware(self, warning.dispatch, rc)
        self.dispatch = solver.dispatch
def pres(state: "State") -> Optional["ImageGridChallengePres"]:
    """Wrap ``state`` in its presentation class.

    Returns None when ``state`` is not exactly a ``DynamicState`` or a
    ``MultiCaptchaState`` (subclasses are not matched).
    """
    state_type = type(state)
    if state_type is DynamicState:
        return DynamicPres(state)
    if state_type is MultiCaptchaState:
        return MultiCaptchaPres(state)
    return None
def state_from_solver(solver: Solver) -> "SolverState":
    """Build the initial state for a freshly created ``solver``.

    Raises ``KeyError`` when the exact type of ``solver`` is neither
    ``DynamicSolver`` nor ``MultiCaptchaSolver``.
    """
    solver_type = type(solver)
    if solver_type is DynamicSolver:
        state_class = DynamicState
    elif solver_type is MultiCaptchaSolver:
        state_class = MultiCaptchaState
    else:
        raise KeyError(solver_type)
    return state_class.from_new_solver(solver)
def reduce_state(state: "State", msg) -> "State":
    """Return the state that results from applying ``msg`` to ``state``.

    A ``SetState`` message replaces the state outright. Any other message
    is handed to ``state.reduce`` when ``state`` is one of
    ``SOLVER_STATE_TYPES``; otherwise ``state`` is returned unchanged.
    """
    if type(msg) is SetState:
        return msg.state
    if type(state) not in SOLVER_STATE_TYPES:
        return state
    return state.reduce(msg)
class SolverMiddleware:
    """Middleware that turns UI messages into solver calls.

    Messages it does not handle itself are forwarded to ``next``.
    """

    def __init__(self, store: Store, next: Dispatch, rc: ReCaptcha):
        self.store = store
        self.next = next
        self.rc = rc
        # The solver for the challenge currently shown, if any.
        self.solver = None
        # Serializes ``select_tile`` calls made from worker threads.
        self._select_tile_lock = threading.Lock()

    def dispatch(self, msg):
        """Route ``msg`` according to its type and the active solver."""
        if type(msg) is Start:
            self.solver = self.rc.first_solver()
            self.next(SetState(state_from_solver(self.solver)))
            return
        if isinstance(self.solver, DynamicSolver):
            self.dispatch_dynamic(msg)
            return
        if isinstance(self.solver, MultiCaptchaSolver):
            self.dispatch_multicaptcha(msg)
            return
        self.next(msg)

    def dispatch_dynamic(self, msg):
        """Handle a message while a ``DynamicSolver`` is active."""
        msg_type = type(msg)
        if msg_type is FinishChallenge:
            # A finish request is ignored while tiles are still waiting
            # for their replacement image.
            if self.store.state.num_waiting <= 0:
                self.send_solution(self.solver.finish())
            return
        if msg_type is SelectTile:
            self.dynamic_select_tile(msg)
            return
        self.next(msg)

    def dynamic_select_tile(self, msg: SelectTile):
        """Blank the clicked tile and fetch its replacement in a thread."""
        tile_index = msg.index

        def fetch_replacement():
            with self._select_tile_lock:
                tile = self.solver.select_tile(tile_index)

            def show_replacement():
                self.next(ReplaceTile(index=tile_index, image=tile.image))
                # One-shot timeout: do not reschedule.
                return False

            # ``tile.delay`` is in seconds; GLib expects milliseconds.
            GLib.timeout_add(round(tile.delay * 1000), show_replacement)

        # Mark the tile as pending before the request is started.
        self.next(ReplaceTile(index=tile_index, image=None))
        if self.store.state.num_waiting <= 0:
            raise RuntimeError("num_waiting should be greater than 0")
        worker = threading.Thread(target=fetch_replacement, daemon=True)
        worker.start()

    def dispatch_multicaptcha(self, msg):
        """Handle a message while a ``MultiCaptchaSolver`` is active."""
        if type(msg) is not FinishChallenge:
            self.next(msg)
            return
        self.multicaptcha_finish()

    def multicaptcha_finish(self):
        """Submit the selected tiles and advance or send the solution."""
        outcome = self.solver.select_indices(self.store.state.indices)
        if isinstance(outcome, Solution):
            self.send_solution(outcome)
            return
        if isinstance(outcome, ImageGridChallenge):
            self.next(SetNextChallenge(outcome))
            return
        raise TypeError("Unexpected type: {}".format(type(outcome)))

    def send_solution(self, solution: Solution):
        """Send ``solution`` and publish the resulting state.

        ``rc.send_solution`` returns either a ``str`` (published as the new
        state as-is) or a new solver, which becomes the active solver and
        is converted to its initial state.
        """
        self.solver = None
        outcome = self.rc.send_solution(solution)
        if isinstance(outcome, str):
            new_state = outcome
        else:
            self.solver = outcome
            new_state = state_from_solver(outcome)
        self.next(SetState(new_state))
class WarningMiddleware:
    """Middleware that warns when a challenge goal could not be found.

    Every message is forwarded to ``next`` unchanged.
    """

    def __init__(self, store: Store, next: Dispatch):
        # ``store`` is accepted for a uniform middleware signature but is
        # not used by this class.
        self.next = next

    def dispatch(self, msg):
        """Inspect ``msg`` for a new challenge goal, then forward it."""
        msg_type = type(msg)
        if msg_type is SetNextChallenge:
            self.check_goal(msg.challenge.goal)
        elif msg_type is SetState and type(msg.state) in SOLVER_STATE_TYPES:
            self.check_goal(msg.state.challenge.goal)
        self.next(msg)

    def check_goal(self, goal: ChallengeGoal):
        """Print a warning to stderr when ``goal.raw`` is None."""
        if goal.raw is None:
            template = "WARNING: Could not determine challenge objective in: {}"
            print(template.format(goal.fallback), file=sys.stderr)
class DynamicState(namedtuple("DynamicState", [
    "challenge",  # Challenge
    "tile_images",  # List[Optional[Image.Image]]
    "num_waiting",  # int
])):
    """Immutable UI state for a dynamic challenge.

    ``num_waiting`` counts the entries of ``tile_images`` that are None,
    i.e. tiles whose replacement image has not arrived yet.
    """

    @classmethod
    def from_new_solver(cls, solver: Solver):
        """Create the initial state from ``solver``'s challenge."""
        challenge = solver.get_challenge()
        tile_images = list(
            tiles_from_image(challenge.image, challenge.dimensions)
        )
        return cls(
            challenge=challenge,
            tile_images=tile_images,
            num_waiting=0,
        )

    def replace_tile(
        self,
        index: int,
        image: Optional[Image.Image],
    ) -> "DynamicState":
        """Return a copy with tile ``index`` set to ``image``.

        ``num_waiting`` goes up by one when a present image becomes None
        and down by one when a None entry receives an image.
        """
        was_waiting = self.tile_images[index] is None
        now_waiting = image is None
        num_waiting = self.num_waiting
        if now_waiting and not was_waiting:
            num_waiting += 1
        elif was_waiting and not now_waiting:
            num_waiting -= 1
        tile_images = list(self.tile_images)
        tile_images[index] = image
        return self._replace(
            tile_images=tile_images,
            num_waiting=num_waiting,
        )

    def reduce(self, msg) -> "DynamicState":
        """Apply ``msg``; only ``ReplaceTile`` changes this state."""
        if type(msg) is not ReplaceTile:
            return self
        return self.replace_tile(msg.index, msg.image)
class MultiCaptchaState(namedtuple("MultiCaptchaState", [
    "challenge",  # Challenge
    "tile_images",  # List[Image.Image]
    "selected",  # List[bool]
])):
    """Immutable UI state for one step of a multicaptcha challenge."""

    @classmethod
    def from_new_solver(cls, solver: Solver):
        """Create the initial state from ``solver``'s first challenge."""
        return cls.from_challenge(solver.first_challenge())

    @classmethod
    def from_challenge(cls, challenge: ImageGridChallenge):
        """Create a state for ``challenge`` with no tiles selected."""
        tile_images = list(
            tiles_from_image(challenge.image, challenge.dimensions)
        )
        nothing_selected = [False] * challenge.dimensions.count
        return cls(
            challenge=challenge,
            tile_images=tile_images,
            selected=nothing_selected,
        )

    def toggle_tile(self, index: int) -> "MultiCaptchaState":
        """Return a copy with the selection flag of tile ``index`` flipped."""
        flags = list(self.selected)
        flags[index] ^= True
        return self._replace(selected=flags)

    @property
    def indices(self) -> List[int]:
        """Positions of all selected tiles, in ascending order."""
        return [
            position
            for position, is_selected in enumerate(self.selected)
            if is_selected
        ]

    @property
    def any_selected(self) -> bool:
        """True when at least one tile is selected."""
        return any(self.selected)

    def same_any_selected(self, other) -> bool:
        """Whether ``other`` is known to have the same ``any_selected``.

        The identity check on ``selected`` is a shortcut that avoids
        scanning both lists when they are the same object.
        """
        if type(self) is not type(other):
            return False
        if self.selected is other.selected:
            return True
        return self.any_selected is other.any_selected

    def reduce(self, msg) -> "MultiCaptchaState":
        """Apply ``msg``: toggle a tile or switch to the next challenge."""
        msg_type = type(msg)
        if msg_type is SelectTile:
            return self.toggle_tile(msg.index)
        if msg_type is SetNextChallenge:
            return self.from_challenge(msg.challenge)
        return self
# State classes that wrap an in-progress challenge; membership is tested
# with ``type(state) in SOLVER_STATE_TYPES``.
SOLVER_STATE_TYPES = (MultiCaptchaState, DynamicState)
# Type alias covering any of the solver-backed state classes above.
SolverState = Union[SOLVER_STATE_TYPES]
# Full application state: a solver state, a ``str`` (``Gui.token`` treats a
# string state as the final token), or None (the store's initial value).
State = Union[SolverState, str, None]
class ImageGridChallengePres:
    """Base presentation wrapper around a solver state.

    The ``same*`` methods let a view check whether part of the presentation
    changed; they compare by identity or type rather than by value.
    """

    def __init__(self, state: SolverState):
        self.state = state

    def same(self, other) -> bool:
        """True when ``other`` has the same class and wraps the same state
        object."""
        if type(self) is not type(other):
            return False
        return self.state is other.state

    @property
    def dimensions(self) -> GridDimensions:
        """Grid dimensions of the wrapped challenge."""
        return self.state.challenge.dimensions

    @property
    def goal(self) -> str:
        """Goal text to display; must be provided by subclasses."""
        raise NotImplementedError

    def same_goal(self, other) -> bool:
        """Whether ``other`` has the same goal; must be provided by
        subclasses."""
        raise NotImplementedError

    @property
    def verify_label(self) -> str:
        """Label for the verify button."""
        # NOTE(review): the underscore presumably marks a GTK mnemonic
        # character; confirm against the dialog code.
        return "Ver_ify"

    def same_verify_label(self, other) -> bool:
        """True when ``other`` has the same class (the label is constant)."""
        return type(self) is type(other)

    @property
    def is_verify_enabled(self) -> bool:
        """Whether the verify button should be enabled; always True here."""
        return True
class DynamicPres(ImageGridChallengePres):
    """Presentation of a ``DynamicState``."""

    def __init__(self, state: DynamicState):
        super().__init__(state)

    @property
    def goal(self) -> str:
        """Formatted goal followed by the dynamic-challenge hint."""
        hint = "Click verify once there are none left."
        return format_goal_with_note(self.state.challenge.goal, hint)

    # Comparing the goal objects by identity is cheaper than comparing
    # `self.goal` with `other.goal`, because no goal string has to be
    # formatted.
    def same_goal(self, other) -> bool:
        """True when ``other`` has the same class and the same goal object."""
        if type(self) is not type(other):
            return False
        return self.state.challenge.goal is other.state.challenge.goal

    @property
    def is_verify_enabled(self) -> bool:
        """Verify is enabled only when no tile is awaiting a replacement."""
        return self.state.num_waiting <= 0

    @property
    def tiles(self) -> Iterable["DynamicTilePres"]:
        """Yield one ``DynamicTilePres`` per tile, in grid order."""
        for position, tile_image in enumerate(self.state.tile_images):
            yield DynamicTilePres(index=position, image=tile_image)
class MultiCaptchaPres(ImageGridChallengePres):
    """Presentation of a ``MultiCaptchaState``."""

    def __init__(self, state: MultiCaptchaState):
        super().__init__(state)

    @property
    def goal(self) -> str:
        """Formatted goal followed by the skip hint.

        The hint is wrapped in a 30%-alpha span once any tile is selected.
        """
        hint = "If there are none, click skip."
        if any(self.state.selected):
            hint = '<span alpha="30%">{}</span>'.format(hint)
        return format_goal_with_note(self.state.challenge.goal, hint)

    # Cheaper than comparing `self.goal` with `other.goal`, because no goal
    # string has to be formatted.
    def same_goal(self, other) -> bool:
        """True when ``other`` would produce the same goal text."""
        if type(self) is not type(other):
            return False
        if self.state.challenge.goal is not other.state.challenge.goal:
            return False
        return self.state.same_any_selected(other.state)

    @property
    def verify_label(self) -> str:
        """"Ver_ify" when a tile is selected, otherwise "Sk_ip"."""
        if not self.state.any_selected:
            return "Sk_ip"
        return "Ver_ify"

    def same_verify_label(self, other) -> bool:
        """True when ``other`` would produce the same verify label."""
        if type(self) is not type(other):
            return False
        return self.state.same_any_selected(other.state)

    @property
    def tiles(self) -> Iterable["MultiCaptchaTilePres"]:
        """Yield one ``MultiCaptchaTilePres`` per tile, in grid order."""
        state = self.state
        pairs = zip(state.tile_images, state.selected)
        for position, pair in enumerate(pairs):
            tile_image, is_selected = pair
            yield MultiCaptchaTilePres(
                index=position, image=tile_image, selected=is_selected,
            )
class TilePres:
    """Presentation of a single grid tile."""

    index: int
    image: Image.Image

    def __init__(self, index: int, image: Image.Image):
        self.index = index
        self.image = image

    def same(self, other) -> bool:
        """True when ``other`` has the same class and index and refers to
        the same image object."""
        if type(self) is not type(other):
            return False
        if self.index != other.index:
            return False
        return self.image is other.image

    def on_click(self, dispatch: Dispatch):
        """Dispatch a ``SelectTile`` message for this tile."""
        message = SelectTile(index=self.index)
        dispatch(message)
class DynamicTilePres(TilePres):
    """Tile of a dynamic challenge; adds nothing to ``TilePres``."""
class MultiCaptchaTilePres(TilePres):
    """Tile of a multicaptcha challenge, with a selection flag."""

    selected: bool

    def __init__(self, index: int, image: Image.Image, selected: bool):
        super().__init__(index, image)
        self.selected = selected

    def same(self, other) -> bool:
        """True when the base comparison holds and the selection flags are
        equal."""
        if not super().same(other):
            return False
        return self.selected == other.selected

View File

@@ -0,0 +1,84 @@
# Copyright (C) 2017, 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from . import cli
from .errors import ChallengeBlockedError, UnknownChallengeError
from .errors import GtkImportError
from .recaptcha import ReCaptcha
# Package version string.
__version__ = "0.7.4-dev"
# Message for the case where the GUI module cannot be loaded.
# NOTE(review): not referenced in this module; presumably printed by the
# command-line entry point. Confirm against the caller.
GUI_MISSING_MESSAGE = """\
Error: Could not load the GUI. Is PyGObject installed?
Try (re)installing librecaptcha[gtk] with pip.
For more details, add the --debug option.
"""
# Printed by get_token() on ChallengeBlockedError; "{}" receives the
# challenge type.
CHALLENGE_BLOCKED_MESSAGE = """\
ERROR: Received challenge type "{}".
This is usually an indication that reCAPTCHA requests from this network are
being blocked.
Try installing Tor (the full installation, not just the browser bundle) and
running this program over Tor with the "torsocks" command.
Alternatively, try waiting a while before requesting another challenge over
this network.
"""
# Printed by get_token() on UnknownChallengeError; "{}" receives the
# challenge type.
UNKNOWN_CHALLENGE_MESSAGE = """\
ERROR: Received unrecognized challenge type "{}".
Currently, the only supported challenge types are "dynamic" and "multicaptcha".
Please file an issue if this problem persists.
"""
def _get_gui():
    """Import the ``gui`` submodule on first use and return it.

    The import is deferred so that this module can be loaded without the
    GUI; ``has_gui`` expects a failed import to raise ``GtkImportError``.
    """
    from . import gui as gui_module
    return gui_module
def has_gui():
    """Return True when the GUI module can be imported, False when the
    import raises ``GtkImportError``."""
    try:
        _get_gui()
    except GtkImportError:
        available = False
    else:
        available = True
    return available
def get_token(
    api_key: str,
    site_url: str,
    user_agent: str, *,
    gui=False,
    debug=False,
) -> str:
    """Run a reCAPTCHA challenge interactively and return the token.

    Uses the GTK ``Gui`` when ``gui`` is true and the ``Cli`` otherwise.
    On ``ChallengeBlockedError`` or ``UnknownChallengeError`` an explanatory
    message is printed and the exception is re-raised.
    """
    # Resolve the UI class first so a GUI import failure happens before
    # the ReCaptcha object is constructed.
    ui_class = _get_gui().Gui if gui else cli.Cli
    recaptcha = ReCaptcha(
        api_key=api_key,
        site_url=site_url,
        user_agent=user_agent,
        debug=debug,
    )
    ui = ui_class(recaptcha)
    try:
        token = ui.run()
    except ChallengeBlockedError as e:
        print(CHALLENGE_BLOCKED_MESSAGE.format(e.challenge_type))
        raise
    except UnknownChallengeError as e:
        print(UNKNOWN_CHALLENGE_MESSAGE.format(e.challenge_type))
        raise
    return token

View File

@@ -0,0 +1,613 @@
# Copyright (C) 2017, 2019, 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from .errors import ChallengeBlockedError, UnknownChallengeError
from .errors import SiteUrlParseError
from .extract_strings import extract_and_save
from .typing import Dict, Iterable, List, Tuple
import requests
from collections import namedtuple
from html.parser import HTMLParser
from typing import Optional, Union
from urllib.parse import urlparse
import base64
import io
import json
import os
import os.path
import re
import sys
import time
# Prefix that get_full_url() prepends to relative endpoint paths.
BASE_URL = "https://www.google.com/recaptcha/api2/"
# Script fetched by get_rc_version() to discover the current release.
API_JS_URL = "https://www.google.com/recaptcha/api.js"
# URL of the release's JavaScript bundle; "{}" is filled with the release
# version. The trailing [:-1] drops the newline that ends the literal.
JS_URL_TEMPLATE = """\
https://www.gstatic.com/recaptcha/releases/{}/recaptcha__en.js
"""[:-1]
# Version tag compared (together with the release version) against the
# first line of the cache file in get_js_strings().
STRINGS_VERSION = "0.1.0"
# Location of the cached strings extracted from the JavaScript bundle.
STRINGS_PATH = os.path.join(
os.path.expanduser("~"), ".cache", "librecaptcha", "cached-strings",
)
# Used by DynamicSolver.get_timeout(): the delay reported for a tile is
# whatever remains of this interval since the tile's last request.
DYNAMIC_SELECT_DELAY = 4.5 # seconds
# Number of strings after a goal ID that find_challenge_goal_text()
# inspects for the goal text.
FIND_GOAL_SEARCH_DISTANCE = 10
def get_testing_url(url: str) -> str:
    """Return ``url`` with its scheme and host replaced so that it points
    at ``http://localhost:55476``; path, query and fragment are kept."""
    parts = urlparse(url)
    local_parts = parts._replace(scheme="http", netloc="localhost:55476")
    return local_parts.geturl()
# When LIBRECAPTCHA_USE_TEST_SERVER is set to a non-empty value, point every
# endpoint at the local test server instead of the real hosts.
if os.getenv("LIBRECAPTCHA_USE_TEST_SERVER"):
    BASE_URL, API_JS_URL, JS_URL_TEMPLATE = (
        get_testing_url(BASE_URL),
        get_testing_url(API_JS_URL),
        get_testing_url(JS_URL_TEMPLATE),
    )
def get_full_url(url: str) -> str:
    """Join ``BASE_URL`` and the relative ``url`` with exactly one slash
    between them."""
    base = BASE_URL.rstrip("/")
    relative = url.lstrip("/")
    return base + "/" + relative
def get_rc_site_url(url: str) -> str:
    """Reduce ``url`` to the form ``scheme://hostname:port``.

    When the URL has no explicit port, 80 is used for http and 443 for
    https. Raises ``SiteUrlParseError`` when the URL has no hostname, no
    scheme, or a scheme other than http/https.
    """
    parts = urlparse(url)
    scheme = parts.scheme
    hostname = parts.hostname
    if not hostname:
        raise SiteUrlParseError("Error: Site URL has no hostname.")
    if not scheme:
        raise SiteUrlParseError("Error: Site URL has no scheme.")
    if scheme not in ["http", "https"]:
        raise SiteUrlParseError(
            "Error: Site URL has invalid scheme: {}".format(scheme),
        )
    port = parts.port
    if port is None:
        port = 443 if scheme == "https" else 80
    return "{}://{}:{}".format(scheme, hostname, port)
def rc_base64(string: str) -> str:
    """Encode ``string`` with the URL-safe base64 alphabet (``-`` and ``_``)
    and replace ``=`` padding with ``.``.

    A ``str`` is encoded to bytes first; any other value is passed to the
    encoder unchanged.
    """
    raw = string.encode() if isinstance(string, str) else string
    encoded = base64.urlsafe_b64encode(raw).decode()
    return encoded.replace("=", ".")
def load_rc_json(text: str):
    """Parse a response whose first line is not part of the JSON payload.

    Everything after the first newline is decoded as JSON. Raises
    ``IndexError`` when ``text`` contains no newline.
    """
    first_line_and_body = text.split("\n", 1)
    body = first_line_and_body[1]
    return json.loads(body)
def get_meta(pmeta, probable_index: int):
    """Return the first non-empty list found inside ``pmeta``.

    The element at ``probable_index`` is tried first (when that index is
    below ``len(pmeta)``); after that the elements are scanned in order.
    Raises ``TypeError`` when ``pmeta`` is not a list and ``RuntimeError``
    when no element is a non-empty list.
    """
    if not isinstance(pmeta, list):
        raise TypeError("pmeta is not a list: {!r}".format(pmeta))

    candidates = []
    if probable_index < len(pmeta):
        candidates.append(pmeta[probable_index])
    candidates.extend(pmeta)

    for candidate in candidates:
        if candidate and isinstance(candidate, list):
            return candidate
    raise RuntimeError("Could not find meta; pmeta: {!r}".format(pmeta))
def get_rresp(uvresp):
    """Return the first non-empty list in ``uvresp`` whose first element is
    ``"rresp"``, or None when there is none.

    Raises ``TypeError`` when ``uvresp`` is not a list.
    """
    if not isinstance(uvresp, list):
        raise TypeError("uvresp is not a list: {!r}".format(uvresp))
    matching = (
        item for item in uvresp
        if item and isinstance(item, list) and item[0] == "rresp"
    )
    return next(matching, None)
def get_js_strings(user_agent: str, rc_version: str) -> List[str]:
    """Return the strings extracted from the release's JavaScript bundle.

    The cache file at ``STRINGS_PATH`` is used when its first line equals
    ``"<STRINGS_VERSION>/<rc_version>"`` and the rest is valid JSON.
    Otherwise the strings are extracted again via ``extract_and_save``.
    """
    expected_header = "{}/{}".format(STRINGS_VERSION, rc_version)
    try:
        with open(STRINGS_PATH) as cache_file:
            header, body = cache_file.read().split("\n", 1)
        if header != expected_header:
            raise OSError("Incorrect version: {}".format(header))
        return json.loads(body)
    except (OSError, ValueError, json.JSONDecodeError):
        # Missing, stale or corrupt cache: fall through and rebuild it.
        pass

    strings = extract_and_save(
        url=JS_URL_TEMPLATE.format(rc_version),
        path=STRINGS_PATH,
        version=STRINGS_VERSION,
        rc_version=rc_version,
        user_agent=user_agent,
    )
    # Emit a blank line on stderr after the extraction step.
    print(file=sys.stderr)
    return strings
def get_rc_version(user_agent: str) -> str:
    """Fetch ``API_JS_URL`` and return the release version found in it.

    The version is the path segment that follows ``/recaptcha/releases/``.
    Raises ``RuntimeError`` when the response contains no such path.
    """
    response = requests.get(API_JS_URL, headers={"User-Agent": user_agent})
    found = re.search(r"/recaptcha/releases/(.+?)/", response.text)
    if found is None:
        raise RuntimeError("Could not extract version from api.js.")
    return found.group(1)
# def get_image(data: bytes) -> Image.Image:
# image = Image.open(io.BytesIO(data))
# if image.mode in ["RGB", "RGBA"]:
# return image
# return image.convert("RGB")
def varint_encode(n: int, out: bytearray) -> None:
    """Append the base-128 varint encoding of ``n`` to ``out``.

    Seven bits are emitted per byte, least-significant group first; every
    byte except the last has its high bit set. Nothing is returned: the
    result is written into ``out`` (the previous ``-> bytes`` annotation
    was incorrect).

    Raises ``ValueError`` when ``n`` is negative.
    """
    if n < 0:
        raise ValueError("n must be nonnegative")
    # Emit continuation bytes while more than seven bits remain.
    while n > 127:
        out.append((n & 127) | 128)
        n >>= 7
    out.append(n)
def protobuf_encode(fields: Iterable[Tuple[int, bytes]]) -> bytes:
    """Serialize ``(field_number, value)`` pairs as length-delimited
    protobuf fields, in the order given."""
    buffer = bytearray()
    for field_number, payload in fields:
        # The low three bits of the key carry the wire type; 2 means
        # length-delimited.
        key = (field_number << 3) | 2
        varint_encode(key, buffer)
        varint_encode(len(payload), buffer)
        buffer.extend(payload)
    return bytes(buffer)
def format_reload_protobuf(
    rc_version: str,
    token: str,
    reason: str,
    api_key: str,
) -> bytes:
    """Build the protobuf body of a ``reload`` request.

    Each argument is encoded to bytes and stored under its field number:
    1 = rc_version, 2 = token, 6 = reason, 14 = api_key.
    """
    # Note: We're not sending fields 3, 5, and 16.
    numbered_values = (
        (1, rc_version),
        (2, token),
        (6, reason),
        (14, api_key),
    )
    return protobuf_encode([
        (number, text.encode()) for number, text in numbered_values
    ])
class GridDimensions(namedtuple("GridDimensions", [
    "rows",  # int
    "columns",  # int
])):
    """Number of rows and columns of an image grid."""

    @property
    def count(self) -> int:
        """Total number of tiles in the grid."""
        rows, columns = self
        return rows * columns
# A finished answer; ``response`` is what the solver collected and what is
# passed to ``ReCaptcha._verify``.
Solution = namedtuple("Solution", ["response"])

# One image-grid challenge.
#   goal: ChallengeGoal
#   image: Image.Image
#   dimensions: GridDimensions
ImageGridChallenge = namedtuple(
    "ImageGridChallenge", ["goal", "image", "dimensions"],
)

# Replacement for a tile of a dynamic challenge.
#   image: Image.Image
#   delay: float
DynamicTile = namedtuple("DynamicTile", ["image", "delay"])
class DynamicSolver:
    """Solver for "dynamic" challenges, where each selected tile is
    replaced by a new image fetched from the server."""

    def __init__(self, recaptcha: "ReCaptcha", pmeta):
        self.rc = recaptcha
        # Server-side indices of every tile selected so far, in order.
        self.selections = []
        self.meta = get_meta(pmeta, 1)
        tile_count = self.num_tiles
        # Maps a grid position to the index the server knows the tile
        # currently shown there by.
        self.tile_index_map = list(range(tile_count))
        # time.monotonic() of the last replace request per grid position.
        self.last_request_map = [0] * tile_count
        # Highest server-side tile index handed out so far.
        self.latest_index = tile_count - 1
        self.challenge_retrieved = False

    def get_challenge(self) -> ImageGridChallenge:
        """Fetch and return the challenge; may only be called once."""
        if self.challenge_retrieved:
            raise RuntimeError("Challenge was already retrieved")
        self.challenge_retrieved = True
        goal = self.rc.get_challenge_goal(self.meta)
        first_image = self._first_image()
        return ImageGridChallenge(
            goal=goal,
            image=first_image,
            dimensions=self.dimensions,
        )

    def select_tile(self, index: int) -> DynamicTile:
        """Select the tile at grid position ``index``.

        Returns the replacement image together with the delay (in seconds)
        reported by ``get_timeout`` for that position.
        """
        if not self.challenge_retrieved:
            raise RuntimeError("Challenge must be retrieved first")
        replacement = self._replace_tile(index)
        return DynamicTile(image=replacement, delay=self.get_timeout(index))

    def finish(self) -> Solution:
        """Return the solution made of all selections recorded so far."""
        if not self.challenge_retrieved:
            raise RuntimeError("Challenge must be retrieved first")
        return Solution(self.selections)

    @property
    def final_timeout(self):
        """Largest remaining delay over all grid positions."""
        return max(
            self.get_timeout(position) for position in range(self.num_tiles)
        )

    @property
    def dimensions(self) -> GridDimensions:
        """Grid size, read from ``meta[3]`` (rows) and ``meta[4]``
        (columns)."""
        meta = self.meta
        return GridDimensions(rows=meta[3], columns=meta[4])

    @property
    def num_tiles(self):
        """Number of tiles in the grid."""
        return self.dimensions.count

    def get_timeout(self, index: int):
        """Seconds left until ``DYNAMIC_SELECT_DELAY`` has passed since the
        last replace request for position ``index`` (never negative)."""
        since_last_request = time.monotonic() - self.last_request_map[index]
        return max(DYNAMIC_SELECT_DELAY - since_last_request, 0)

    def _first_image(self):
        """Download the initial grid image; returns the response body."""
        # None values are filled in by ``ReCaptcha.get``.
        response = self.rc.get("payload", params={
            "p": None,
            "k": None,
        })
        return response.content

    def _replace_tile(self, index: int):
        """Report the tile at ``index`` as selected and download the image
        that replaces it; returns the response body."""
        server_index = self.tile_index_map[index]
        self.selections.append(server_index)
        # None values are filled in by ``ReCaptcha.post``.
        reply = self.rc.post("replaceimage", data={
            "v": None,
            "c": None,
            "ds": "[{}]".format(server_index),
        })
        self.last_request_map[index] = time.monotonic()
        payload = load_rc_json(reply.text)

        # The tile now shown at this position gets the next unused index.
        self.latest_index += 1
        self.tile_index_map[index] = self.latest_index

        self.rc.current_token = payload[1]
        self.rc.current_p = payload[5]
        replacement_id = payload[2][0]

        # The server might not return any image, but it seems unlikely in
        # practice. If it becomes a problem we can handle this case.
        image_response = self.rc.get("payload", params={
            "p": None,
            "k": None,
            "id": replacement_id,
        })
        return image_response.content
class MultiCaptchaSolver:
    """Solver for "multicaptcha" challenges: a sequence of image grids,
    each answered with a group of selected tile indices."""

    def __init__(self, recaptcha: "ReCaptcha", pmeta):
        self.rc = recaptcha
        # One sorted list of selected indices per answered challenge.
        self.selection_groups = []
        self.challenge_type = None
        # ID sent with the next image request.
        # NOTE(review): the initial value "2" is taken as-is from the
        # original code; its meaning is not visible here.
        self.id = "2"
        # Metadata of the challenges that have not been shown yet.
        self.metas = list(get_meta(pmeta, 5)[0])
        # Index of the challenge currently shown; -1 before the first one.
        self.challenge_index = -1

    def first_challenge(self) -> ImageGridChallenge:
        """Fetch and return the first challenge; may only be called once."""
        if self.challenge_index >= 0:
            raise RuntimeError("Already retrieved first challenge")
        return self._get_challenge(self._first_image())

    def select_indices(self, indices) -> Union[ImageGridChallenge, Solution]:
        """Record ``indices`` as the answer to the current challenge.

        Returns the next challenge, or the complete ``Solution`` when no
        challenge metadata is left.
        """
        if self.challenge_index < 0:
            raise RuntimeError("First challenge wasn't retrieved")
        self.selection_groups.append(sorted(indices))
        if self.metas:
            return self._get_challenge(self._replace_image())
        return Solution(self.selection_groups)

    def _get_challenge(self, image):
        """Consume the next metadata entry and pair it with ``image``."""
        self.challenge_index += 1
        meta = self.metas.pop(0)
        grid = GridDimensions(rows=meta[3], columns=meta[4])
        goal = self.rc.get_challenge_goal(meta)
        return ImageGridChallenge(
            goal=goal,
            image=image,
            dimensions=grid,
        )

    def _first_image(self):
        """Download the first grid image; returns the response body."""
        response = self.rc.get("payload", params={
            "c": self.rc.current_token,
            "k": self.rc.api_key,
        })
        return response.content

    def _replace_image(self):
        """Submit the latest selection group and download the next grid
        image; returns the response body."""
        latest_group = self.selection_groups[-1]
        reply = self.rc.post("replaceimage", data={
            "v": None,
            "c": self.rc.current_token,
            "ds": json.dumps([latest_group], separators=",:"),
        })
        payload = load_rc_json(reply.text)
        self.rc.current_token = payload[1]

        # The image request below uses the *previous* "p" and ID values;
        # the new ones are stored for the following round.
        previous_p = self.rc.current_p
        self.rc.current_p = payload[5]
        previous_id = self.id
        self.id = (payload[2] or [None])[0]

        image_response = self.rc.get("payload", params={
            "p": previous_p,
            "k": None,
            "id": previous_id,
        })
        return image_response.content
# Either of the solver classes that ReCaptcha._get_solver() can construct.
Solver = Union[DynamicSolver, MultiCaptchaSolver]
class ChallengeGoal(namedtuple("ChallengeGoal", [
    "raw",  # Optional[str]
    "meta",
])):
    """The objective of a challenge.

    ``raw`` is the goal text (or None when it could not be determined) and
    ``meta`` is the challenge metadata it was looked up from.
    """

    @property
    def plain(self) -> Optional[str]:
        """``raw`` with ``<strong>``/``</strong>`` tags removed, or None."""
        text = self.raw
        if text is None:
            return None
        for tag in ("<strong>", "</strong>"):
            text = text.replace(tag, "")
        return text

    @property
    def fallback(self) -> str:
        """JSON dump of ``meta``, for use when no goal text is available."""
        return json.dumps(self.meta)
class ReCaptcha:
    """Session with the reCAPTCHA endpoints under ``BASE_URL``.

    Keeps the API key, the release version, and the current token and "p"
    values, and hands out solver objects for the challenges it receives.
    """

    def __init__(self, api_key, site_url, user_agent, debug=False,
                 make_requests=True):
        """Set up the session.

        When ``make_requests`` is true, the release version and the
        bundle's strings are fetched immediately; otherwise ``rc_version``
        and ``js_strings`` stay None.
        """
        self.api_key = api_key
        self.site_url = get_rc_site_url(site_url)
        self.debug = debug
        # Value of the "co" parameter: the encoded site URL.
        self.co = rc_base64(self.site_url)

        self.first_token = None
        self.current_token = None
        self.current_p = None
        self.user_agent = user_agent

        self.js_strings = None
        self.rc_version = None
        if make_requests:
            self.rc_version = get_rc_version(self.user_agent)
            self.js_strings = get_js_strings(self.user_agent, self.rc_version)
        # Index of the most recently created solver; -1 before the first.
        self.solver_index = -1

    def first_solver(self) -> Solver:
        """Request the first challenge and return its solver.

        Raises ``RuntimeError`` when called more than once.
        """
        if self.solver_index >= 0:
            raise RuntimeError("First solver was already retrieved")
        self._request_first_token()
        rresp = self._get_first_rresp()
        return self._get_solver(rresp)

    def send_solution(self, solution: Solution) -> Union[Solver, str]:
        """Submit ``solution``.

        Returns a new solver when the response contains another challenge,
        otherwise the verification token. Raises ``RuntimeError`` when the
        response contains neither.
        """
        if self.solver_index < 0:
            raise RuntimeError("First solver wasn't retrieved")
        uvtoken, rresp = self._verify(solution.response)
        if rresp is not None:
            return self._get_solver(rresp)
        if not uvtoken:
            raise RuntimeError("Got neither uvtoken nor new rresp.")
        return uvtoken

    def debug_print(self, *args, **kwargs):
        """Print to stderr when debugging is enabled.

        A single callable argument is called first and its result printed,
        so expensive messages are only built when needed.
        """
        if not self.debug:
            return
        if len(args) == 1 and callable(args[0]):
            args = (args[0](),)
        print(*args, file=sys.stderr, **kwargs)

    def get_challenge_goal(self, meta) -> ChallengeGoal:
        """Build the ``ChallengeGoal`` for ``meta``; ``meta[0]`` is the ID
        the goal text is looked up by."""
        raw = self.find_challenge_goal_text(meta[0])
        return ChallengeGoal(raw=raw, meta=meta)

    def find_challenge_goal_text(self, id: str, raw=False) -> Optional[str]:
        """Search ``js_strings`` for the goal text belonging to ``id``.

        For every occurrence of ``id``, the following
        ``FIND_GOAL_SEARCH_DISTANCE`` strings are checked for the phrase
        "select all" (case-insensitive). Among all matches the one closest
        to its ``id`` occurrence wins (earliest occurrence on ties).
        Returns None when nothing matches. ``raw`` is unused and kept for
        interface compatibility.
        """
        candidates = []
        start = 0
        try:
            while True:
                index = self.js_strings.index(id, start)
                for offset in range(FIND_GOAL_SEARCH_DISTANCE):
                    next_str = self.js_strings[index + offset + 1]
                    if re.search(r"\bselect all\b", next_str, re.I):
                        candidates.append((offset, index, next_str))
                start = index + FIND_GOAL_SEARCH_DISTANCE + 1
        except (ValueError, IndexError):
            # ValueError: no further occurrence of ``id``.
            # IndexError: the search window ran past the end of the list.
            pass

        if not candidates:
            return None
        return min(candidates)[2]

    def get_headers(self, headers: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Return ``headers`` with a default ``User-Agent`` added when it
        has none. The input mapping is never modified."""
        headers = headers or {}
        if "User-Agent" in headers:
            return headers
        merged = dict(headers)
        merged["User-Agent"] = self.user_agent
        return merged

    @staticmethod
    def _fill_placeholders(mapping, values):
        """Replace entries of ``mapping`` that are present with value None
        by the corresponding entry of ``values`` (in place)."""
        for key, value in values.items():
            if key in mapping and mapping[key] is None:
                mapping[key] = value

    @staticmethod
    def _raise_unless_allowed(response, allow_errors):
        """Raise for an HTTP error status unless ``allow_errors`` is True
        or contains the response's status code."""
        if allow_errors is True:
            return
        if response.status_code in (allow_errors or {}):
            return
        response.raise_for_status()

    def get(self, url, *, params=None, headers=None, allow_errors=None,
            **kwargs):
        """Send a GET request to the endpoint ``url`` (relative to
        ``BASE_URL``).

        ``params`` entries "k", "v" and "p" that are present with value
        None are replaced by the API key, release version and current "p"
        value. The caller's dict is copied and never modified.
        """
        params = {"k": None, "v": None} if params is None else dict(params)
        self._fill_placeholders(params, {
            "k": self.api_key,
            "v": self.rc_version,
            "p": self.current_p,
        })
        headers = self.get_headers(headers)

        r = requests.get(
            get_full_url(url), params=params, headers=headers,
            **kwargs,
        )
        self.debug_print(lambda: "[http] [get] {}".format(r.url))
        self._raise_unless_allowed(r, allow_errors)
        return r

    def post(self, url, *, params=None, data=None, headers=None,
             allow_errors=None, no_debug_response=False, **kwargs):
        """Send a POST request to the endpoint ``url`` (relative to
        ``BASE_URL``).

        A None "k" in ``params`` is replaced by the API key. When ``data``
        is a dict, None "v" and "c" entries are replaced by the release
        version and current token. Dicts passed by the caller are copied
        and never modified; non-dict ``data`` is sent unchanged.
        """
        params = {"k": None} if params is None else dict(params)
        if data is None:
            data = {"v": None}
        elif isinstance(data, dict):
            data = dict(data)
        self._fill_placeholders(params, {"k": self.api_key})
        if isinstance(data, dict):
            self._fill_placeholders(data, {
                "v": self.rc_version,
                "c": self.current_token,
            })
        headers = self.get_headers(headers)

        r = requests.post(
            get_full_url(url), params=params, data=data, headers=headers,
            **kwargs,
        )
        self.debug_print(lambda: "[http] [post] {}".format(r.url))
        self.debug_print(lambda: "[http] [post] [data] {!r}".format(data))
        if not no_debug_response:
            self.debug_print(
                lambda: "[http] [post] [response] {}".format(r.text),
            )
        self._raise_unless_allowed(r, allow_errors)
        return r

    def _request_first_token(self):
        """Load the anchor page and store the token it contains in
        ``current_token``.

        Raises ``RuntimeError`` when the page has no element with the ID
        "recaptcha-token" carrying a non-empty value.
        """
        class Parser(HTMLParser):
            def __init__(p_self):
                p_self.token = None
                super().__init__()

            def handle_starttag(p_self, tag, attrs):
                attrs = dict(attrs)
                if attrs.get("id") == "recaptcha-token":
                    p_self.token = attrs.get("value")

        # Note: We're not sending "cb".
        text = self.get("anchor", params={
            "ar": "1",
            "k": None,
            "co": self.co,
            "hl": "en",
            "v": None,
            "size": "normal",
            "sa": "action",
        }).text
        parser = Parser()
        parser.feed(text)

        if not parser.token:
            raise RuntimeError(
                "Could not get first token. Response:\n{}".format(text),
            )
        self.current_token = parser.token

    def _verify(self, response):
        """Send ``response`` to the ``userverify`` endpoint.

        Returns ``(token, rresp)`` where ``token`` is element 1 of the
        reply and ``rresp`` is the embedded new challenge or None.
        """
        response_text = json.dumps({"response": response}, separators=",:")
        response_b64 = rc_base64(response_text)

        self.debug_print("Sending verify request...")
        # Note: We're not sending "t", "ct", and "bg".
        r = self.post("userverify", data={
            "v": None,
            "c": None,
            "response": response_b64,
        })

        uvresp = load_rc_json(r.text)
        self.debug_print(lambda: "Got verify response: {!r}".format(uvresp))
        rresp = get_rresp(uvresp)
        uvresp_token = uvresp[1]
        return (uvresp_token, rresp)

    def _get_first_rresp(self):
        """Request the first challenge via ``reload`` and return the
        decoded reply."""
        self.debug_print("Getting first rresp...")
        r = self.post("reload", data=format_reload_protobuf(
            rc_version=self.rc_version,
            token=self.current_token,
            reason="fi",
            api_key=self.api_key,
        ), headers={
            "Content-Type": "application/x-protobuffer",
        })
        rresp = load_rc_json(r.text)
        self.debug_print(lambda: "Got first rresp: {!r}".format(rresp))
        return rresp

    def _get_solver(self, rresp) -> Solver:
        """Create the solver for the challenge described by ``rresp``.

        Updates ``current_token`` and ``current_p`` from the reply. Raises
        ``ChallengeBlockedError`` for the "default" and "doscaptcha" types
        and ``UnknownChallengeError`` for any other unsupported type.
        """
        self.solver_index += 1
        challenge_type = rresp[5]
        self.debug_print(lambda: "Challenge type: {}".format(challenge_type))
        pmeta = rresp[4]
        self.debug_print(lambda: "pmeta: {}".format(pmeta))
        self.current_token = rresp[1]
        self.current_p = rresp[9]
        self.debug_print(
            lambda: "Current token: {}".format(self.current_token),
        )

        solver_class = {
            "dynamic": DynamicSolver,
            "multicaptcha": MultiCaptchaSolver,
        }.get(challenge_type)

        if solver_class is not None:
            return solver_class(self, pmeta)

        if challenge_type in ["default", "doscaptcha"]:
            raise ChallengeBlockedError(challenge_type)
        raise UnknownChallengeError(challenge_type)

View File

@@ -0,0 +1,40 @@
# Copyright (C) 2021 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from collections.abc import Callable, Iterable
import typing
# Use the builtin containers as generic aliases when the running Python
# allows subscripting them; otherwise fall back to the `typing` versions.
try:
    list[int]
except Exception:
    List, Dict, Tuple = typing.List, typing.Dict, typing.Tuple
else:
    List, Dict, Tuple = list, dict, tuple

# Same idea for the ABCs: keep the `collections.abc` classes when they can
# be subscripted, otherwise replace them with the `typing` equivalents.
try:
    Callable[[], int]
except Exception:
    Callable = typing.Callable

try:
    Iterable[int]
except Exception:
    Iterable = typing.Iterable

View File

@@ -0,0 +1,36 @@
# This file was automatically generated by update_user_agents.py using data
# from <https://techblog.willshouse.com/2012/01/03/most-common-user-agents/>.
# flake8: noqa
# Browser User-Agent header values. This list is generated data (see the
# note at the top of the file); regenerate it rather than editing by hand.
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.146 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.104 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv:85.0) Gecko/20100101 Firefox/85.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36 Edg/88.0.705.63",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.16; rv:85.0) Gecko/20100101 Firefox/85.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36 Edg/88.0.705.74",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:85.0) Gecko/20100101 Firefox/85.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36 Edg/88.0.705.68",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv:78.0) Gecko/20100101 Firefox/78.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_2_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.2 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36 OPR/73.0.3856.344"
]

View File

@@ -0,0 +1,23 @@
# Copyright (C) 2017-2019 taylor.fish <contact@taylor.fish>
#
# This file is part of librecaptcha.
#
# librecaptcha is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# librecaptcha is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with librecaptcha. If not, see <https://www.gnu.org/licenses/>.
from .user_agent_data import USER_AGENTS
import random
def random_user_agent():
    """Return one entry of ``USER_AGENTS``, picked with ``random.choice``."""
    chosen = random.choice(USER_AGENTS)
    return chosen