Tutoriels API

Résoudre plusieurs CAPTCHA en lot avec CaptchaAI

Quand vous traitez quelques pages, résoudre les CAPTCHA l'un après l'autre peut suffire. Dès que vous passez à des centaines de pages, cette approche devient un goulot d'étranglement. L'objectif n'est plus seulement de résoudre un challenge, mais d'organiser un pipeline de soumission, de polling et de récupération des résultats à débit plus élevé.

CaptchaAI permet ce fonctionnement en parallèle, à condition de bien séparer la soumission, le suivi des identifiants de tâche et la collecte des résultats.


Séquentiel contre parallèle

Sequential (slow):
  Submit #1 → Poll → Result (15s)
  Submit #2 → Poll → Result (15s)
  Submit #3 → Poll → Result (15s)
  Total: ~45s for 3 solves

Parallel (fast):
  Submit #1 ─┐
  Submit #2 ─┤→ Poll all → Results arrive
  Submit #3 ─┘
  Total: ~15s for 3 solves

Concurrence de base

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


def submit_task(method, **params):
    """Submit a single CAPTCHA task."""
    data = {"key": API_KEY, "method": method, "json": 1}
    data.update(params)
    resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
    result = resp.json()
    if result.get("status") != 1:
        raise RuntimeError(f"Submit error: {result.get('request')}")
    return result["request"]


def poll_result(task_id, timeout=120):
    """Poll until result is ready."""
    start = time.time()
    while time.time() - start < timeout:
        time.sleep(5)
        resp = requests.get(f"{BASE_URL}/res.php", params={
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }, timeout=15)
        data = resp.json()
        if data["request"] != "CAPCHA_NOT_READY":
            return data["request"]
    raise TimeoutError(f"Task {task_id} timeout")


def solve_one(sitekey, pageurl):
    """Submit and poll a single task."""
    task_id = submit_task("userrecaptcha", googlekey=sitekey, pageurl=pageurl)
    token = poll_result(task_id)
    return {"url": pageurl, "token": token}


def batch_solve(tasks, max_workers=10):
    """Solve multiple CAPTCHAs in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(solve_one, t["sitekey"], t["url"]): t
            for t in tasks
        }
        for future in as_completed(futures):
            task = futures[future]
            try:
                result = future.result()
                results.append(result)
                print(f"Solved: {result['url']}")
            except Exception as e:
                print(f"Failed: {task['url']} - {e}")
                results.append({"url": task["url"], "token": None, "error": str(e)})

    return results


# Usage
tasks = [
    {"sitekey": "SITE_KEY_1", "url": "https://example.com/page1"},
    {"sitekey": "SITE_KEY_2", "url": "https://example.com/page2"},
    {"sitekey": "SITE_KEY_3", "url": "https://example.com/page3"},
]

results = batch_solve(tasks, max_workers=5)
print(f"Solved {sum(1 for r in results if r.get('token'))}/{len(tasks)}")

Solveur batch asynchrone

Si vous avez besoin d'un niveau de concurrence plus élevé, asyncio devient rapidement plus efficace que des threads classiques.

import asyncio
import aiohttp
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


async def submit_task_async(session, method, **params):
    data = {"key": API_KEY, "method": method, "json": 1}
    data.update(params)
    async with session.post(f"{BASE_URL}/in.php", data=data) as resp:
        result = await resp.json()
        if result.get("status") != 1:
            raise RuntimeError(f"Submit error: {result.get('request')}")
        return result["request"]


async def poll_result_async(session, task_id, timeout=120):
    start = time.time()
    while time.time() - start < timeout:
        await asyncio.sleep(5)
        params = {
            "key": API_KEY, "action": "get",
            "id": task_id, "json": 1,
        }
        async with session.get(f"{BASE_URL}/res.php", params=params) as resp:
            data = await resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                return data["request"]
    raise TimeoutError(f"Task {task_id} timeout")


async def solve_one_async(session, sitekey, pageurl):
    task_id = await submit_task_async(
        session, "userrecaptcha",
        googlekey=sitekey, pageurl=pageurl,
    )
    token = await poll_result_async(session, task_id)
    return {"url": pageurl, "token": token}


async def batch_solve_async(tasks, max_concurrent=20):
    """Solve many CAPTCHAs concurrently with asyncio."""
    semaphore = asyncio.Semaphore(max_concurrent)
    results = []

    async def solve_with_limit(task):
        async with semaphore:
            try:
                result = await solve_one_async(
                    session, task["sitekey"], task["url"],
                )
                return result
            except Exception as e:
                return {"url": task["url"], "token": None, "error": str(e)}

    async with aiohttp.ClientSession() as session:
        coros = [solve_with_limit(t) for t in tasks]
        results = await asyncio.gather(*coros)

    return results


# Usage
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(50)
]

results = asyncio.run(batch_solve_async(tasks, max_concurrent=20))
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")

Pattern submit puis poll

Pour augmenter le débit, vous pouvez soumettre toutes les tâches d'abord, puis lancer le polling dans un second temps.

import requests
import time

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"


def batch_submit(tasks):
    """Submit all tasks first, return task IDs."""
    submitted = []
    for task in tasks:
        try:
            data = {
                "key": API_KEY,
                "method": "userrecaptcha",
                "googlekey": task["sitekey"],
                "pageurl": task["url"],
                "json": 1,
            }
            resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
            result = resp.json()
            if result.get("status") == 1:
                submitted.append({
                    "task_id": result["request"],
                    "url": task["url"],
                })
            time.sleep(0.1)  # Brief delay between submits
        except Exception as e:
            print(f"Submit failed for {task['url']}: {e}")
    return submitted


def batch_poll(submitted, timeout=120):
    """Poll all submitted tasks until complete."""
    pending = {s["task_id"]: s for s in submitted}
    results = []
    start = time.time()

    while pending and time.time() - start < timeout:
        time.sleep(5)
        for task_id in list(pending.keys()):
            try:
                resp = requests.get(f"{BASE_URL}/res.php", params={
                    "key": API_KEY, "action": "get",
                    "id": task_id, "json": 1,
                }, timeout=15)
                data = resp.json()
                if data["request"] != "CAPCHA_NOT_READY":
                    info = pending.pop(task_id)
                    results.append({
                        "url": info["url"],
                        "token": data["request"],
                    })
            except Exception:
                pass

    # Mark remaining as failed
    for task_id, info in pending.items():
        results.append({"url": info["url"], "token": None, "error": "timeout"})

    return results


# Usage
tasks = [
    {"sitekey": "KEY", "url": f"https://example.com/page{i}"}
    for i in range(20)
]

submitted = batch_submit(tasks)
print(f"Submitted {len(submitted)} tasks")

results = batch_poll(submitted)
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")

Repères de débit

Tâches simultanées Vitesse approximative Idéal pour
1-5 3-5 résolutions/min Tests et petits scripts
5-20 15-60 résolutions/min Scraping ou automation en production
20-50 60-150 résolutions/min Pipelines à gros volume
50-100 150-300 résolutions/min Charges élevées bien encadrées

Depannage

Probleme Cause probable Correctif
Limite de taux 429 Trop de soumissions par seconde Ajoutez un petit délai entre les envois
Beaucoup de timeouts Polling trop court Montez à 120 ou 180 secondes
Rendement qui plafonne Réseau ou modèle de concurrence mal adapté Passez à aiohttp pour les gros volumes
Résultats associés au mauvais job Mauvais suivi des task IDs Indexez tout par task_id

FAQ

Combien de tâches lancer en même temps ?

Commencez de façon conservative, par exemple 10 à 20, puis augmentez selon vos temps de résolution, votre réseau et votre taux de succès.

Threads ou async ?

Les threads suffisent pour des lots modestes. Au-delà, asyncio et aiohttp donnent souvent un meilleur ratio entre débit et coût d'exécution.

Le batch coûte-t-il plus cher ?

Non. Vous payez toujours à la tâche. Le gain se situe surtout sur le temps total de traitement et l'efficacité du pipeline.


Guides connexes


Passez à la résolution haute cadence : essayez CaptchaAI pour traiter vos lots de CAPTCHA plus efficacement.

Les commentaires sont désactivés pour cet article.