Lorsque vous gérez le scraping ou l'automatisation pour plusieurs clients, chaque projet finit par atteindre des CAPTCHA. Au lieu d'écrire du code de résolution unique par projet, créez un pipeline réutilisable. Ce guide parcourt l'architecture.
Architecture des pipelines
┌──────────────┐ ┌───────────────┐ ┌──────────────┐
│ Client A │──▶ │ │ │ │
│ Client B │──▶ │ Task Queue │──▶ │ CaptchaAI │
│ Client C │──▶ │ │ │ API │
└──────────────┘ └───────────────┘ └──────────────┘
│ │
▼ ▼
┌───────────────┐ ┌──────────────┐
│ Result Store │◀── │ Polling │
│ (Redis/DB) │ │ Workers │
└───────────────┘ └──────────────┘
Composants :
- ** Prise de tâches ** – reçoit les demandes de résolution des scrapers clients
- File d'attente : met en mémoire tampon les tâches et applique les limites de concurrence par client.
- ** Travailleurs du solveur ** – soumettez à CaptchaAI et interrogez les résultats
- Magasin de résultats – contient les jetons résolus pour la récupération par le consommateur
Pipeline Python
Classe de base du solveur
import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"
@dataclass
class SolveRequest:
client_id: str
method: str
params: dict
callback: Optional[callable] = None
@dataclass
class SolveResult:
client_id: str
task_id: str
token: Optional[str] = None
error: Optional[str] = None
class CaptchaPipeline:
def __init__(self, api_key: str, max_concurrent: int = 10):
self.api_key = api_key
self.max_concurrent = max_concurrent
self.queue = deque()
self.active = {}
self.lock = Lock()
def enqueue(self, request: SolveRequest):
with self.lock:
self.queue.append(request)
def submit_task(self, request: SolveRequest) -> Optional[str]:
data = {
"key": self.api_key,
"method": request.method,
"json": 1,
**request.params
}
try:
resp = requests.post(SUBMIT_URL, data=data, timeout=15)
result = resp.json()
if result.get("status") == 1:
return result["request"]
else:
print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
return None
except requests.RequestException as e:
print(f"[{request.client_id}] Network error: {e}")
return None
def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
elapsed = 0
interval = 5
while elapsed < max_wait:
time.sleep(interval)
elapsed += interval
try:
resp = requests.get(RESULT_URL, params={
"key": self.api_key,
"action": "get",
"id": task_id,
"json": 1
}, timeout=10)
result = resp.json()
if result.get("status") == 1:
return result["request"]
elif result.get("request") == "CAPCHA_NOT_READY":
continue
else:
print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
return None
except requests.RequestException:
continue
return None
def process_queue(self):
while self.queue or self.active:
# Fill active slots
with self.lock:
while self.queue and len(self.active) < self.max_concurrent:
request = self.queue.popleft()
task_id = self.submit_task(request)
if task_id:
self.active[task_id] = request
# Poll active tasks
completed = []
for task_id, request in list(self.active.items()):
token = self.poll_result(task_id, max_wait=10)
if token:
result = SolveResult(
client_id=request.client_id,
task_id=task_id,
token=token
)
if request.callback:
request.callback(result)
completed.append(task_id)
with self.lock:
for task_id in completed:
del self.active[task_id]
Utilisation multi-clients
pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)
# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
client_id="client_a",
method="userrecaptcha",
params={
"googlekey": "6Le-SITEKEY-A",
"pageurl": "https://client-a-target.com/form"
},
callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))
# Client B — Turnstile
pipeline.enqueue(SolveRequest(
client_id="client_b",
method="turnstile",
params={
"sitekey": "0x4AAAA-SITEKEY-B",
"pageurl": "https://client-b-target.com/login"
},
callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))
pipeline.process_queue()
Pipeline Node.js
const axios = require("axios");
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";
class CaptchaPipeline {
constructor(apiKey, maxConcurrent = 10) {
this.apiKey = apiKey;
this.maxConcurrent = maxConcurrent;
this.queue = [];
this.activeCount = 0;
}
enqueue(clientId, method, params) {
return new Promise((resolve, reject) => {
this.queue.push({ clientId, method, params, resolve, reject });
this._processNext();
});
}
async _processNext() {
if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;
this.activeCount++;
const task = this.queue.shift();
try {
const token = await this._solve(task);
task.resolve({ clientId: task.clientId, token });
} catch (err) {
task.reject(err);
} finally {
this.activeCount--;
this._processNext();
}
}
async _solve(task) {
const submitResp = await axios.post(SUBMIT_URL, null, {
params: {
key: this.apiKey,
method: task.method,
json: 1,
...task.params,
},
timeout: 15000,
});
if (submitResp.data.status !== 1) {
throw new Error(submitResp.data.error_text || submitResp.data.request);
}
const taskId = submitResp.data.request;
return this._poll(taskId);
}
async _poll(taskId, maxWait = 120000) {
const interval = 5000;
let elapsed = 0;
while (elapsed < maxWait) {
await new Promise((r) => setTimeout(r, interval));
elapsed += interval;
try {
const resp = await axios.get(RESULT_URL, {
params: {
key: this.apiKey,
action: "get",
id: taskId,
json: 1,
},
timeout: 10000,
});
if (resp.data.status === 1) return resp.data.request;
if (resp.data.request !== "CAPCHA_NOT_READY") {
throw new Error(resp.data.error_text || resp.data.request);
}
} catch (err) {
if (err.response) throw err;
}
}
throw new Error(`Timeout waiting for task ${taskId}`);
}
}
// Usage
(async () => {
const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);
const results = await Promise.allSettled([
pipeline.enqueue("client_a", "userrecaptcha", {
googlekey: "6Le-SITEKEY-A",
pageurl: "https://client-a-target.com/form",
}),
pipeline.enqueue("client_b", "turnstile", {
sitekey: "0x4AAAA-SITEKEY-B",
pageurl: "https://client-b-target.com/login",
}),
]);
results.forEach((r) => {
if (r.status === "fulfilled") {
console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
} else {
console.error(`Failed: ${r.reason.message}`);
}
});
})();
Configuration par client
Suivez les paramètres par client tels que le proxy, les préférences du solveur et les limites de débit :
CLIENT_CONFIG = {
"client_a": {
"proxy": "host:port:user:pass",
"proxytype": "HTTP",
"max_concurrent": 5,
"default_method": "userrecaptcha"
},
"client_b": {
"proxy": None,
"proxytype": None,
"max_concurrent": 10,
"default_method": "turnstile"
}
}
def build_params(client_id, params):
config = CLIENT_CONFIG.get(client_id, {})
if config.get("proxy"):
params["proxy"] = config["proxy"]
params["proxytype"] = config["proxytype"]
return params
Stratégie de gestion des erreurs
| Erreur | Réponse |
|---|---|
ERROR_ZERO_BALANCE |
Arrêtez la file d'attente, alertez tous les clients |
ERROR_NO_SLOT_AVAILABLE |
Remettre la tâche en file d'attente avec retard |
ERROR_WRONG_CAPTCHA_ID |
Jeter, enregistrer l'erreur |
ERROR_CAPTCHA_UNSOLVABLE |
Réessayez une fois, puis échouez |
| Délai d'expiration du réseau | Réessayer avec interruption (max 3 tentatives) |
Dépannage
| Problème | Parce que | Corriger |
|---|---|---|
| La file d'attente s'étend sans limite | Emplacements actifs pleins | Augmentez max_concurrent ou ajoutez des travailleurs |
| Le rappel ne se déclenche pas | La tâche a échoué silencieusement | Vérifier le retour d'erreur dans la boucle d'interrogation |
| Jetons mixtes entre clients | Magasin de résultats partagés | Résultats clés par client_id + task_id |
| Erreurs de limite de débit (429) | Trop de soumissions simultanées | Réduire la concurrence, ajouter un délai de soumission |
FAQ
Combien de tâches simultanées dois-je exécuter par client ?
Commencez par 5 à 10. Surveillez les temps de résolution et les taux d'erreur, puis ajustez. CaptchaAI prend en charge une concurrence élevée, mais votre pool de proxys peut être le goulot d'étranglement.
Dois-je utiliser une clé API distincte par client ?
Cela simplifie la facturation. Utilisez le paramètre CaptchaAI soft_id si vous avez besoin d'un suivi sous une seule touche.
Comment gérer les files d’attente de nuit ?
Conserver la file d'attente (Redis ou base de données). Au redémarrage, rechargez les tâches en attente et reprenez le traitement.
Construisez votre pipeline CAPTCHA avec CaptchaAI
Commencez à créer des pipelines de clients àcaptchaai.com.
Guides associés
- Résolution parallèle de CAPTCHA
- Implémentation de la logique de nouvelle tentative
- Traitement distribué de la file d'attente Redis
- Script de surveillance du bilan de santé