Tutoriels

Création de pipelines CAPTCHA clients avec CaptchaAI

Lorsque vous gérez le scraping ou l'automatisation pour plusieurs clients, chaque projet finit par atteindre des CAPTCHA. Au lieu d'écrire du code de résolution unique par projet, créez un pipeline réutilisable. Ce guide parcourt l'architecture.


Architecture des pipelines

┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│  Client A    │──▶ │               │    │              │
│  Client B    │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│  Client C    │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                           │                    │
                           ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │  Result Store │◀── │  Polling      │
                    │  (Redis/DB)   │    │  Workers      │
                    └───────────────┘    └──────────────┘

Composants :

  1. ** Prise de tâches ** – reçoit les demandes de résolution des scrapers clients
  2. File d'attente : met en mémoire tampon les tâches et applique les limites de concurrence par client.
  3. ** Travailleurs du solveur ** – soumettez à CaptchaAI et interrogez les résultats
  4. Magasin de résultats – contient les jetons résolus pour la récupération par le consommateur

Pipeline Python

Classe de base du solveur

import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    callback: Optional[callable] = None

@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None


class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}
        self.lock = Lock()

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params
        }

        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()

            if result.get("status") == 1:
                return result["request"]
            else:
                print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
                return None
        except requests.RequestException as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval

            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1
                }, timeout=10)
                result = resp.json()

                if result.get("status") == 1:
                    return result["request"]
                elif result.get("request") == "CAPCHA_NOT_READY":
                    continue
                else:
                    print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
                    return None
            except requests.RequestException:
                continue

        return None

    def process_queue(self):
        while self.queue or self.active:
            # Fill active slots
            with self.lock:
                while self.queue and len(self.active) < self.max_concurrent:
                    request = self.queue.popleft()
                    task_id = self.submit_task(request)
                    if task_id:
                        self.active[task_id] = request

            # Poll active tasks
            completed = []
            for task_id, request in list(self.active.items()):
                token = self.poll_result(task_id, max_wait=10)
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token
                    )
                    if request.callback:
                        request.callback(result)
                    completed.append(task_id)

            with self.lock:
                for task_id in completed:
                    del self.active[task_id]

Utilisation multi-clients

pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-target.com/form"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

pipeline.process_queue()

Pipeline Node.js

const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;

    this.activeCount++;
    const task = this.queue.shift();

    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });

    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }

    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;

    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;

      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });

        if (resp.data.status === 1) return resp.data.request;
        if (resp.data.request !== "CAPCHA_NOT_READY") {
          throw new Error(resp.data.error_text || resp.data.request);
        }
      } catch (err) {
        if (err.response) throw err;
      }
    }

    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}

// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-target.com/form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);

  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();

Configuration par client

Suivez les paramètres par client tels que le proxy, les préférences du solveur et les limites de débit :

CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha"
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile"
    }
}

def build_params(client_id, params):
    config = CLIENT_CONFIG.get(client_id, {})
    if config.get("proxy"):
        params["proxy"] = config["proxy"]
        params["proxytype"] = config["proxytype"]
    return params

Stratégie de gestion des erreurs

Erreur Réponse
ERROR_ZERO_BALANCE Arrêtez la file d'attente, alertez tous les clients
ERROR_NO_SLOT_AVAILABLE Remettre la tâche en file d'attente avec retard
ERROR_WRONG_CAPTCHA_ID Jeter, enregistrer l'erreur
ERROR_CAPTCHA_UNSOLVABLE Réessayez une fois, puis échouez
Délai d'expiration du réseau Réessayer avec interruption (max 3 tentatives)

Dépannage

Problème Parce que Corriger
La file d'attente s'étend sans limite Emplacements actifs pleins Augmentez max_concurrent ou ajoutez des travailleurs
Le rappel ne se déclenche pas La tâche a échoué silencieusement Vérifier le retour d'erreur dans la boucle d'interrogation
Jetons mixtes entre clients Magasin de résultats partagés Résultats clés par client_id + task_id
Erreurs de limite de débit (429) Trop de soumissions simultanées Réduire la concurrence, ajouter un délai de soumission

FAQ

Combien de tâches simultanées dois-je exécuter par client ?

Commencez par 5 à 10. Surveillez les temps de résolution et les taux d'erreur, puis ajustez. CaptchaAI prend en charge une concurrence élevée, mais votre pool de proxys peut être le goulot d'étranglement.

Dois-je utiliser une clé API distincte par client ?

Cela simplifie la facturation. Utilisez le paramètre CaptchaAI soft_id si vous avez besoin d'un suivi sous une seule touche.

Comment gérer les files d’attente de nuit ?

Conserver la file d'attente (Redis ou base de données). Au redémarrage, rechargez les tâches en attente et reprenez le traitement.


Construisez votre pipeline CAPTCHA avec CaptchaAI

Commencez à créer des pipelines de clients àcaptchaai.com.


Guides associés

Les commentaires sont désactivés pour cet article.