Lors du scraping à grande échelle, vous avez besoin de plusieurs résolutions CAPTCHA individuelles. Un système de file d'attente dissocie la soumission CAPTCHA de la récupération des résultats, permettant une résolution parallèle sur des centaines de tâches.
Pourquoi utiliser une file d'attente ?
La résolution de CAPTCHA à requête unique fait perdre du temps à attendre. Un système de file d'attente :
- Soumet tous les CAPTCHA immédiatement
- Interroge plusieurs ID de tâches en parallèle
- Les tentatives échouées sont résolues automatiquement
- Contrôle la simultanéité pour respecter les limites de débit de l'API
- Fournit un suivi des progrès et des rappels
File d'attente de thread de base
import time
import threading
import requests
from queue import Queue, Empty
API_KEY = "YOUR_API_KEY"
class CaptchaQueue:
"""Thread-based CAPTCHA solving queue."""
def __init__(self, api_key, max_workers=10):
self.api_key = api_key
self.task_queue = Queue()
self.result_queue = Queue()
self.max_workers = max_workers
self.workers = []
def submit(self, method, callback=None, **params):
"""Add a CAPTCHA task to the queue."""
task = {
"method": method,
"params": params,
"callback": callback,
}
self.task_queue.put(task)
def start(self):
"""Start worker threads."""
for _ in range(self.max_workers):
t = threading.Thread(target=self._worker, daemon=True)
t.start()
self.workers.append(t)
def wait(self):
"""Wait for all tasks to complete."""
self.task_queue.join()
def get_results(self):
"""Get all available results."""
results = []
while not self.result_queue.empty():
try:
results.append(self.result_queue.get_nowait())
except Empty:
break
return results
def _worker(self):
while True:
try:
task = self.task_queue.get(timeout=1)
except Empty:
continue
try:
result = self._solve(task["method"], **task["params"])
entry = {"status": "solved", "result": result, "task": task}
self.result_queue.put(entry)
if task["callback"]:
task["callback"](result)
except Exception as e:
entry = {"status": "error", "error": str(e), "task": task}
self.result_queue.put(entry)
finally:
self.task_queue.task_done()
def _solve(self, method, **params):
submit = requests.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}, timeout=30).json()
if submit.get("status") != 1:
raise Exception(f"Submit error: {submit.get('request')}")
task_id = submit["request"]
for _ in range(30):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}, timeout=30).json()
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()
# Submit multiple CAPTCHAs
urls_and_sitekeys = [
("https://example.com/page1", "SITEKEY_1"),
("https://example.com/page2", "SITEKEY_2"),
("https://example.com/page3", "SITEKEY_3"),
]
for url, sitekey in urls_and_sitekeys:
queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)
queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for r in results:
print(f" {r['status']}: {r.get('result', r.get('error', ''))[:50]}")
File d'attente asynchrone avec asyncio
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class AsyncCaptchaQueue:
"""Async CAPTCHA solving queue with concurrency control."""
def __init__(self, api_key, max_concurrent=10):
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def solve_batch(self, tasks):
"""Solve a batch of CAPTCHA tasks concurrently."""
coros = [self._solve_task(task) for task in tasks]
self.results = await asyncio.gather(*coros, return_exceptions=True)
return self.results
async def _solve_task(self, task):
async with self.semaphore:
return await self._solve(task["method"], **task["params"])
async def _solve(self, method, **params):
async with aiohttp.ClientSession() as session:
# Submit
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit error: {data.get('request')}")
task_id = data["request"]
# Poll
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
raise Exception("CAPTCHA unsolvable")
raise TimeoutError("Solve timed out")
# Usage
async def main():
queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(10)
]
results = await queue.solve_batch(tasks)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i}: ERROR — {result}")
else:
print(f"Task {i}: {result[:50]}...")
asyncio.run(main())
Modèle producteur-consommateur
Pour les charges de travail de scraping continu où les pages sont découvertes dynamiquement :
import asyncio
import aiohttp
API_KEY = "YOUR_API_KEY"
class ProducerConsumerQueue:
"""Continuous CAPTCHA solving with producer-consumer pattern."""
def __init__(self, api_key, queue_size=100, num_consumers=5):
self.api_key = api_key
self.queue = asyncio.Queue(maxsize=queue_size)
self.num_consumers = num_consumers
self.solved_count = 0
self.error_count = 0
self.running = True
async def produce(self, tasks):
"""Producer: feed CAPTCHA tasks into the queue."""
for task in tasks:
await self.queue.put(task)
# Signal consumers to stop
for _ in range(self.num_consumers):
await self.queue.put(None)
async def consume(self, result_handler):
"""Consumer: solve CAPTCHAs and call result handler."""
async with aiohttp.ClientSession() as session:
while True:
task = await self.queue.get()
if task is None:
self.queue.task_done()
break
try:
result = await self._solve(session, task["method"], **task["params"])
self.solved_count += 1
if result_handler:
await result_handler(task, result)
except Exception as e:
self.error_count += 1
print(f"Error: {e}")
finally:
self.queue.task_done()
async def run(self, tasks, result_handler=None):
"""Run the producer-consumer pipeline."""
# Start producer
producer = asyncio.create_task(self.produce(tasks))
# Start consumers
consumers = [
asyncio.create_task(self.consume(result_handler))
for _ in range(self.num_consumers)
]
# Wait for everything to finish
await producer
await asyncio.gather(*consumers)
print(f"Complete: {self.solved_count} solved, {self.error_count} errors")
async def _solve(self, session, method, **params):
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(f"Submit: {data.get('request')}")
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError("Timed out")
# Usage
async def handle_result(task, token):
url = task["params"]["pageurl"]
print(f"Solved for {url}: {token[:30]}...")
async def main():
queue = ProducerConsumerQueue(API_KEY, num_consumers=5)
tasks = [
{"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
for i in range(20)
]
await queue.run(tasks, result_handler=handle_result)
asyncio.run(main())
File d'attente prioritaire
Quand certains CAPTCHA sont plus importants que d’autres :
import asyncio
from dataclasses import dataclass, field
API_KEY = "YOUR_API_KEY"
@dataclass(order=True)
class PriorityTask:
priority: int
task: dict = field(compare=False)
class PriorityCaptchaQueue:
"""CAPTCHA queue with priority levels."""
def __init__(self, api_key, num_workers=5):
self.api_key = api_key
self.queue = asyncio.PriorityQueue()
self.num_workers = num_workers
self.results = {}
async def submit(self, task_id, method, priority=5, **params):
"""Submit with priority (lower number = higher priority)."""
await self.queue.put(PriorityTask(
priority=priority,
task={"id": task_id, "method": method, "params": params},
))
async def process(self):
"""Process all queued tasks by priority."""
workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]
# Wait for queue to drain
await self.queue.join()
# Cancel workers
for w in workers:
w.cancel()
return self.results
async def _worker(self):
import aiohttp
async with aiohttp.ClientSession() as session:
while True:
item = await self.queue.get()
task = item.task
try:
result = await self._solve(session, task["method"], **task["params"])
self.results[task["id"]] = {"status": "solved", "token": result}
except Exception as e:
self.results[task["id"]] = {"status": "error", "error": str(e)}
finally:
self.queue.task_done()
async def _solve(self, session, method, **params):
import aiohttp
async with session.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key, "method": method, "json": 1, **params,
}) as resp:
data = await resp.json(content_type=None)
if data.get("status") != 1:
raise Exception(data.get("request"))
task_id = data["request"]
for _ in range(30):
await asyncio.sleep(5)
async with session.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get", "id": task_id, "json": 1,
}) as resp:
result = await resp.json(content_type=None)
if result.get("status") == 1:
return result["request"]
raise TimeoutError()
# Usage
async def main():
pq = PriorityCaptchaQueue(API_KEY, num_workers=3)
# High priority — checkout pages
await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")
# Normal priority — product pages
for i in range(5):
await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")
# Low priority — info pages
for i in range(3):
await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")
results = await pq.process()
for task_id, result in results.items():
print(f"{task_id}: {result['status']}")
asyncio.run(main())
Surveillance et reporting
import time
from dataclasses import dataclass, field
@dataclass
class QueueMetrics:
submitted: int = 0
solved: int = 0
failed: int = 0
total_solve_time: float = 0.0
start_time: float = field(default_factory=time.time)
@property
def avg_solve_time(self):
return self.total_solve_time / self.solved if self.solved else 0
@property
def success_rate(self):
total = self.solved + self.failed
return (self.solved / total * 100) if total else 0
@property
def throughput(self):
elapsed = time.time() - self.start_time
return self.solved / elapsed * 60 if elapsed > 0 else 0
def report(self):
return (
f"Submitted: {self.submitted} | "
f"Solved: {self.solved} | "
f"Failed: {self.failed} | "
f"Avg time: {self.avg_solve_time:.1f}s | "
f"Success: {self.success_rate:.1f}% | "
f"Throughput: {self.throughput:.0f}/min"
)
Dépannage
| Symptôme | Parce que | Corriger |
|---|---|---|
| La file d'attente s'allonge mais les tâches ne se terminent pas | Trop de travailleurs submergent l'API | Réduire max_workers / max_concurrent |
ERROR_NO_SLOT_AVAILABLE |
Limite de simultanéité de l'API atteinte | Ajouter un délai entre les soumissions |
| Tâches bloquées dans la file d'attente | Les threads de travail sont morts exceptionnellement | Envelopper la boucle de travail dans try/except |
| La mémoire augmente avec le temps | Résultats non consommés | Appelez périodiquement get_results() |
| Blocs de file d'attente asynchrone | await manquant |
Assurez-vous que tous les appels asynchrones sont attendus |
Questions fréquemment posées
Combien de résolutions simultanées puis-je exécuter ?
CaptchaAI gère la concurrence côté serveur. Commencez avec 10 travailleurs simultanés et augmentez en fonction des limites de votre plan. Vérifiez ERROR_NO_SLOT_AVAILABLE pour savoir quand accélérer.
Dois-je utiliser le threading ou l’asyncio ?
Utilisez asyncio pour les nouveaux projets - il gère plus efficacement la résolution de I/O-bound CAPTCHA. Utilisez le threading si vous l'intégrez dans du code synchrone existant.
Comment gérer les limites de débit de l'API ?
Utilisez un sémaphore (asynchrone) ou une file d'attente limitée (threading) pour limiter les requêtes simultanées. Ajoutez un court délai entre les soumissions si vous appuyez sur ERROR_NO_SLOT_AVAILABLE.
Résumé
Une file d'attente de résolution CAPTCHA dissocie la soumission de l'interrogation, permettant une résolution parallèle avecCaptchaAI. Choisissez le threading pour le code synchrone, l'asyncio pour le Python moderne et le producteur-consommateur pour les charges de travail continues.
Articles connexes
- Guide complet du dramaturge Python Captchaai
- Création de pipelines Captcha client Captchaai
- Construire une automatisation responsable Captchaai