DevOps & Mise à l'Échelle

Mettre à l'échelle automatiquement des workers de résolution CAPTCHA

Un pool statique de workers est rarement optimal. Quand le trafic est faible, vous payez trop cher pour de la capacité inutile. Quand la charge monte, les mêmes workers deviennent un goulot d'étranglement. L'auto-scaling permet d'ajuster le nombre de workers à la demande réelle, en fonction de la file d'attente, de la latence observée et de l'état global du système.

L'objectif n'est pas seulement d'ajouter des workers, mais de le faire au bon moment, avec de bons signaux, et sans vider le solde du compte ni provoquer des oscillations permanentes entre scale-up et scale-down.


Signaux utiles pour piloter le scaling

Signal Monter quand Redescendre quand
Profondeur de file > 20 tâches en attente < 5 tâches
Utilisation workers > 80 % occupés < 20 %
Latence de résolution P95 > 60 s P95 < 20 s
Taux d'erreur > 5 % Stable sous 1 %
Solde N/A Solde trop bas, stop du scale-up

Auto-scaling avec des threads

Si votre worker passe surtout son temps à appeler des API et à attendre des résultats, une stratégie basée sur les threads peut suffire. C'est souvent le cas avec CaptchaAI, où la charge est principalement I/O.

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

Auto-scaling avec des processus

Quand vous ajoutez du prétraitement d'image, du parsing lourd ou d'autres tâches CPU, il peut être plus propre d'isoler les workers dans des processus distincts.

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Ajouter une logique liée au solde

Un bon auto-scaler ne regarde pas seulement la file d'attente. Si le solde descend trop bas, continuer à monter en charge peut simplement accélérer l'épuisement du compte sans résoudre le problème de fond.

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Intégrez cette vérification directement dans la boucle de scaling :

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

Comparer les stratégies de scaling

Stratégie Idéale pour Latence Complexité
Thread pool Appels API et I/O Faible Faible
Process pool Prétraitement CPU Moyenne Moyenne
Kubernetes HPA Infra cloud native Plus haute Élevée
KEDA Scaling event-driven Moyenne Moyenne

Depannage

Probleme Cause probable Correctif
Les workers continuent de monter La file ne se vide pas vraiment Vérifiez que les workers consomment bien les tâches
Le scale-down est trop agressif Seuils trop bas Augmentez le délai avant réduction
Des processus zombies restent actifs Nettoyage incomplet Appelez _cleanup_dead() régulièrement
Le solde fond trop vite Trop de workers actifs Ajoutez un garde-fou sur le solde

FAQ

Quel ratio viser entre file d'attente et workers ?

Un bon point de départ est souvent 1 worker pour 5 à 10 tâches en attente, puis un ajustement selon le type de CAPTCHA et la latence réelle observée.

Faut-il choisir des threads ou des processus ?

Utilisez des threads si votre charge est surtout orientée I/O. Passez aux processus si vous ajoutez du prétraitement d'image ou d'autres opérations CPU en plus de la résolution.

À quelle vitesse faut-il scaler ?

Montez plutôt vite, mais redescendez lentement. C'est ce qui évite les oscillations continuelles quand la charge bouge autour d'un seuil.


Guides connexes

  • Files d'attente de tâches Kubernetes
  • Surveillance Prometheus/Grafana

Si vous voulez mieux absorber les pics de charge, récupérez votre clé CaptchaAI et ajoutez un auto-scaling piloté par des signaux concrets plutôt qu'un nombre fixe de workers.

Les commentaires sont désactivés pour cet article.