Un pool statique de workers est rarement optimal. Quand le trafic est faible, vous payez trop cher pour de la capacité inutile. Quand la charge monte, les mêmes workers deviennent un goulot d'étranglement. L'auto-scaling permet d'ajuster le nombre de workers à la demande réelle, en fonction de la file d'attente, de la latence observée et de l'état global du système.
L'objectif n'est pas seulement d'ajouter des workers, mais de le faire au bon moment, avec de bons signaux, et sans vider le solde du compte ni provoquer des oscillations permanentes entre scale-up et scale-down.
Signaux utiles pour piloter le scaling
| Signal | Monter quand | Redescendre quand |
|---|---|---|
| Profondeur de file | > 20 tâches en attente | < 5 tâches |
| Utilisation workers | > 80 % occupés | < 20 % |
| Latence de résolution | P95 > 60 s | P95 < 20 s |
| Taux d'erreur | > 5 % | Stable sous 1 % |
| Solde | N/A | Solde trop bas, stop du scale-up |
Auto-scaling avec des threads
Si votre worker passe surtout son temps à appeler des API et à attendre des résultats, une stratégie basée sur les threads peut suffire. C'est souvent le cas avec CaptchaAI, où la charge est principalement I/O.
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Auto-scaling avec des processus
Quand vous ajoutez du prétraitement d'image, du parsing lourd ou d'autres tâches CPU, il peut être plus propre d'isoler les workers dans des processus distincts.
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Ajouter une logique liée au solde
Un bon auto-scaler ne regarde pas seulement la file d'attente. Si le solde descend trop bas, continuer à monter en charge peut simplement accélérer l'épuisement du compte sans résoudre le problème de fond.
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
Intégrez cette vérification directement dans la boucle de scaling :
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
Comparer les stratégies de scaling
| Stratégie | Idéale pour | Latence | Complexité |
|---|---|---|---|
| Thread pool | Appels API et I/O | Faible | Faible |
| Process pool | Prétraitement CPU | Moyenne | Moyenne |
| Kubernetes HPA | Infra cloud native | Plus haute | Élevée |
| KEDA | Scaling event-driven | Moyenne | Moyenne |
Depannage
| Probleme | Cause probable | Correctif |
|---|---|---|
| Les workers continuent de monter | La file ne se vide pas vraiment | Vérifiez que les workers consomment bien les tâches |
| Le scale-down est trop agressif | Seuils trop bas | Augmentez le délai avant réduction |
| Des processus zombies restent actifs | Nettoyage incomplet | Appelez _cleanup_dead() régulièrement |
| Le solde fond trop vite | Trop de workers actifs | Ajoutez un garde-fou sur le solde |
FAQ
Quel ratio viser entre file d'attente et workers ?
Un bon point de départ est souvent 1 worker pour 5 à 10 tâches en attente, puis un ajustement selon le type de CAPTCHA et la latence réelle observée.
Faut-il choisir des threads ou des processus ?
Utilisez des threads si votre charge est surtout orientée I/O. Passez aux processus si vous ajoutez du prétraitement d'image ou d'autres opérations CPU en plus de la résolution.
À quelle vitesse faut-il scaler ?
Montez plutôt vite, mais redescendez lentement. C'est ce qui évite les oscillations continuelles quand la charge bouge autour d'un seuil.
Guides connexes
- Files d'attente de tâches Kubernetes
- Surveillance Prometheus/Grafana
Si vous voulez mieux absorber les pics de charge, récupérez votre clé CaptchaAI et ajoutez un auto-scaling piloté par des signaux concrets plutôt qu'un nombre fixe de workers.