Le suivi de vos mesures de résolution de CAPTCHA permet d'optimiser les coûts, de détecter les problèmes à un stade précoce et de planifier la capacité. Ce guide crée un système de surveillance qui enregistre chaque résolution et génère des rapports d'utilisation.
Que surveiller
| Métrique | Pourquoi c'est important |
|---|---|
| Nombre de résolutions | Suivre le volume d'utilisation |
| Taux de réussite | Détecter les problèmes de qualité |
| Temps de réponse | Identifier les ralentissements |
| Taux de dépenses | Gestion budgétaire |
| Répartition des erreurs | Déboguer les schémas d'échec |
| Solde | Prévenir les pannes |
| Répartition par méthode | Comprendre les habitudes d'utilisation |
Collecteur de métriques
import time
import csv
import datetime
import threading
from collections import defaultdict
class MetricsCollector:
    """Collect and store CaptchaAI solve metrics.

    Each recorded attempt updates per-method in-memory counters and appends
    one row to a CSV log. Both the counter update and the file append happen
    while holding ``self.lock``, and the summary is computed from a snapshot
    taken under the same lock.

    Args:
        log_file: Path of the CSV log. A header row is written when the file
            is missing or empty; existing content is left untouched.
    """

    # Column order of every row written to the log.
    _HEADER = (
        "timestamp", "method", "duration_s",
        "status", "error_code", "task_id",
    )

    def __init__(self, log_file="captchaai_metrics.csv"):
        self.log_file = log_file
        self.lock = threading.Lock()
        self.session_stats = defaultdict(lambda: {
            "count": 0, "success": 0, "error": 0,
            "timeout": 0, "total_time": 0,
        })
        self._init_log()

    def _init_log(self):
        """Write the CSV header unless the log already has content."""
        try:
            with open(self.log_file, "r", newline="") as f:
                has_content = bool(f.read(1))
        except FileNotFoundError:
            has_content = False
        # An empty pre-existing file also needs the header, otherwise a
        # DictReader would treat the first data row as column names.
        if not has_content:
            with open(self.log_file, "w", newline="") as f:
                csv.writer(f).writerow(self._HEADER)

    def record(self, method, duration, status, error_code="", task_id=""):
        """Record a solve attempt.

        Args:
            method: Grouping key for the in-memory counters and the value of
                the ``method`` column.
            duration: Elapsed seconds; logged with two decimals.
            status: ``"success"`` and ``"timeout"`` have their own counters;
                any other value is counted as an error.
            error_code: Optional value for the ``error_code`` column.
            task_id: Optional value for the ``task_id`` column.
        """
        with self.lock:
            # Update in-memory stats
            stats = self.session_stats[method]
            stats["count"] += 1
            stats["total_time"] += duration
            if status == "success":
                stats["success"] += 1
            elif status == "timeout":
                stats["timeout"] += 1
            else:
                stats["error"] += 1

            # Naive UTC timestamp: same on-disk format as utcnow(), without
            # relying on the deprecated call.
            now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

            # Write to CSV
            with open(self.log_file, "a", newline="") as f:
                csv.writer(f).writerow([
                    now.isoformat(),
                    method, f"{duration:.2f}",
                    status, error_code, task_id,
                ])

    def get_session_summary(self):
        """Get current session statistics.

        Returns:
            A dict mapping each method to ``total``, ``success``, ``errors``,
            ``timeouts`` (ints) plus ``success_rate`` and ``avg_time``
            (pre-formatted strings such as ``"33.3%"`` and ``"3.0s"``).
        """
        # Copy under the lock so a concurrent record() cannot resize the
        # dict while it is being iterated.
        with self.lock:
            snapshot = {
                method: dict(stats)
                for method, stats in self.session_stats.items()
            }

        summary = {}
        for method, stats in snapshot.items():
            count = stats["count"]
            avg_time = stats["total_time"] / count if count > 0 else 0
            success_rate = stats["success"] / count * 100 if count > 0 else 0
            summary[method] = {
                "total": count,
                "success": stats["success"],
                "errors": stats["error"],
                "timeouts": stats["timeout"],
                "success_rate": f"{success_rate:.1f}%",
                "avg_time": f"{avg_time:.1f}s",
            }
        return summary
Solveur instrumenté
Enveloppez votre solveur pour collecter automatiquement des métriques :
import requests
import time
class MonitoredSolver:
    """Solver with automatic metric collection.

    Wraps the submit (``in.php``) and poll (``res.php``) requests and reports
    every attempt, successful or not, to a metrics collector.

    Args:
        api_key: API key sent with each request.
        metrics: Object exposing ``record(method, duration, status,
            error_code, task_id)`` and ``get_session_summary()``. A new
            ``MetricsCollector`` is created when omitted.
    """

    class _ApiError(RuntimeError):
        """API-reported failure that keeps the raw error code for metrics."""

        def __init__(self, message, code):
            super().__init__(message)
            self.code = code

    def __init__(self, api_key, metrics=None):
        self.api_key = api_key
        self.base = "https://ocr.captchaai.com"
        self.metrics = MetricsCollector() if metrics is None else metrics

    def solve(self, method, **params):
        """Submit a task, wait for its result and record the attempt.

        Args:
            method: Value of the ``method`` form field; also the metrics key.
            **params: Extra form fields merged into the submit request.

        Returns:
            The ``request`` value of the successful poll response.

        Raises:
            RuntimeError: The API rejected the submit or reported a solve
                error.
            TimeoutError: No result arrived within the poll timeout.
            Exception: Anything raised by the HTTP layer or JSON decoding is
                recorded and re-raised unchanged.
        """
        # Monotonic clock: elapsed time is unaffected by system clock changes.
        start = time.monotonic()
        task_id = ""
        status = "error"
        error_code = ""
        try:
            # Submit
            data = {"key": self.api_key, "method": method, "json": 1}
            data.update(params)
            resp = requests.post(
                f"{self.base}/in.php", data=data, timeout=30,
            )
            result = resp.json()
            if result.get("status") != 1:
                code = result.get("request", "UNKNOWN")
                raise self._ApiError(f"Submit error: {code}", code)
            task_id = result["request"]

            # Poll
            token = self._poll(task_id)
            status = "success"
            return token
        except TimeoutError:
            status = "timeout"
            raise
        except self._ApiError as e:
            # Log the bare API code so the error report groups by code
            # rather than by a truncated message.
            error_code = str(e.code)[:50]
            raise
        except Exception as e:
            error_code = str(e)[:50]
            raise
        finally:
            # Runs on every exit path, so failed attempts are recorded too.
            duration = time.monotonic() - start
            self.metrics.record(method, duration, status, error_code, task_id)

    def _poll(self, task_id, timeout=120):
        """Poll ``res.php`` every 5 seconds until the task resolves.

        Args:
            task_id: Identifier returned by the submit request.
            timeout: Seconds after which no further poll is started.

        Returns:
            The ``request`` value of the first response with ``status == 1``.

        Raises:
            RuntimeError: The response is neither "not ready" nor a success.
            TimeoutError: The deadline passed without a final answer.
        """
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key, "action": "get",
                "id": task_id, "json": 1,
            }, timeout=15)
            data = resp.json()
            answer = data.get("request", "UNKNOWN")
            if answer == "CAPCHA_NOT_READY":
                continue
            if data.get("status") == 1 and "request" in data:
                return answer
            raise self._ApiError(f"Solve error: {answer}", answer)
        raise TimeoutError("Poll timeout")

    def print_summary(self):
        """Print current session metrics."""
        summary = self.metrics.get_session_summary()
        print("\n=== CaptchaAI Usage Summary ===")
        for method, stats in summary.items():
            print(f"\n{method}:")
            for key, value in stats.items():
                print(f" {key}: {value}")
# Usage: one collector instance is handed to the solver explicitly.
metrics = MetricsCollector()
solver = MonitoredSolver("YOUR_API_KEY", metrics)

# Run ten solves; a failure is printed and the loop carries on.
task_params = {
    "googlekey": "SITE_KEY",
    "pageurl": "https://example.com",
}
for _ in range(10):
    try:
        token = solver.solve("userrecaptcha", **task_params)
    except Exception as exc:
        print(f"Failed: {exc}")

# Show the per-method totals gathered during the loop.
solver.print_summary()
Générateur de rapports d'utilisation
Générez des rapports quotidiens/hebdomadaires à partir des métriques collectées :
import csv
import datetime
from collections import defaultdict
class UsageReport:
    """Generate usage reports from metrics CSV.

    Reads a CSV that has a header row and the columns ``timestamp`` (ISO
    8601), ``method``, ``duration_s``, ``status`` and ``error_code``, and
    prints plain-text summaries to stdout.

    Args:
        log_file: Path of the metrics CSV to read.
    """

    def __init__(self, log_file="captchaai_metrics.csv"):
        self.log_file = log_file

    def _load_data(self, days=None):
        """Load metrics, optionally filtered by date range.

        Args:
            days: Keep only rows newer than this many days. A falsy value
                (``None`` or ``0``) disables the filter.

        Returns:
            A list of row dicts, each extended with ``_ts`` (naive UTC
            ``datetime``) and ``_duration`` (``float``). Empty when the log
            file does not exist.
        """
        cutoff = None
        if days:
            now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            cutoff = now - datetime.timedelta(days=days)

        records = []
        try:
            # newline="" is what the csv module expects for its input files.
            f = open(self.log_file, "r", newline="")
        except FileNotFoundError:
            # Nothing logged yet: report on an empty data set, not a crash.
            return records
        with f:
            for row in csv.DictReader(f):
                ts = datetime.datetime.fromisoformat(row["timestamp"])
                if ts.tzinfo is not None:
                    # Normalize aware timestamps so they compare with the
                    # naive UTC cutoff instead of raising TypeError.
                    ts = ts.astimezone(datetime.timezone.utc).replace(tzinfo=None)
                if cutoff and ts < cutoff:
                    continue
                row["_ts"] = ts
                row["_duration"] = float(row["duration_s"])
                records.append(row)
        return records

    @staticmethod
    def _aggregate(records, key_of):
        """Group records by ``key_of(record)`` into count/success/time totals."""
        groups = defaultdict(lambda: {"count": 0, "success": 0, "total_time": 0})
        for rec in records:
            bucket = groups[key_of(rec)]
            bucket["count"] += 1
            if rec["status"] == "success":
                bucket["success"] += 1
            bucket["total_time"] += rec["_duration"]
        return groups

    @staticmethod
    def _print_table(title, label, width, groups):
        """Print one aligned row per group, sorted by group key."""
        print(title)
        print(f"{label:<{width}} {'Total':>6} {'Success':>8} {'Rate':>7} {'Avg Time':>9}")
        for key in sorted(groups):
            stats = groups[key]
            count = stats["count"]
            rate = stats["success"] / count * 100 if count > 0 else 0
            avg = stats["total_time"] / count if count > 0 else 0
            print(f"{key:<{width}} {count:>6} {stats['success']:>8} {rate:>6.1f}% {avg:>8.1f}s")

    def daily_summary(self, days=7):
        """Summarize usage per day.

        Args:
            days: Look-back window passed to the loader.
        """
        records = self._load_data(days=days)
        by_day = self._aggregate(records, lambda rec: rec["_ts"].date().isoformat())
        self._print_table(
            f"=== Daily Summary (last {days} days) ===", "Date", 12, by_day,
        )

    def method_breakdown(self, days=30):
        """Summarize usage by CAPTCHA type.

        Args:
            days: Look-back window passed to the loader.
        """
        records = self._load_data(days=days)
        by_method = self._aggregate(records, lambda rec: rec["method"])
        self._print_table(
            f"\n=== Method Breakdown (last {days} days) ===", "Method", 25, by_method,
        )

    def error_breakdown(self, days=7):
        """Show error distribution.

        Counts non-success rows that carry an ``error_code`` and prints them
        most frequent first. Prints nothing when no such row exists.

        Args:
            days: Look-back window passed to the loader.
        """
        records = self._load_data(days=days)
        errors = defaultdict(int)
        for rec in records:
            if rec["status"] != "success" and rec["error_code"]:
                errors[rec["error_code"]] += 1
        if errors:
            print(f"\n=== Error Breakdown (last {days} days) ===")
            for error, count in sorted(errors.items(), key=lambda x: -x[1]):
                print(f" {error}: {count}")
# Usage: read the default metrics file and print each report in turn.
report = UsageReport()
report.daily_summary(7)       # per-day totals for the past week
report.method_breakdown(30)   # per-method totals for the past month
report.error_breakdown(7)     # error codes seen in the past week
Suivi de l'historique des soldes
import requests
import time
import csv
import datetime
class BalanceDashboard:
    """Track balance over time for spending analysis.

    Args:
        api_key: API key sent with the balance request.
        log_file: Headerless CSV that receives one ``timestamp,balance`` row
            per call to ``record``.
    """

    def __init__(self, api_key, log_file="balance_history.csv"):
        self.api_key = api_key
        self.log_file = log_file

    def record(self):
        """Fetch the current balance and append it to the history file.

        Returns:
            The balance as a float.

        Raises:
            ValueError: The ``request`` field of the response is not numeric.
        """
        resp = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": self.api_key,
            "action": "getbalance",
            "json": 1,
        }, timeout=15)  # bounded wait, matching the other res.php calls in this file
        balance = float(resp.json()["request"])

        # Naive UTC timestamp: same on-disk format as utcnow(), without
        # relying on the deprecated call.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        with open(self.log_file, "a", newline="") as f:
            csv.writer(f).writerow([
                now.isoformat(),
                f"{balance:.4f}",
            ])
        return balance

    def get_spending(self, hours=24):
        """Calculate spending over time period.

        Args:
            hours: Size of the look-back window.

        Returns:
            The first balance recorded inside the window minus the last one,
            in file order. ``0`` when the history file is missing or holds
            fewer than two samples in the window. Any balance increase inside
            the window lowers the result.
        """
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        cutoff = now - datetime.timedelta(hours=hours)

        balances = []
        try:
            f = open(self.log_file, "r", newline="")
        except FileNotFoundError:
            return 0
        with f:
            for row in csv.reader(f):
                if not row:
                    # A blank line parses to an empty list; skip it rather
                    # than fail on row[0].
                    continue
                ts = datetime.datetime.fromisoformat(row[0])
                if ts > cutoff:
                    balances.append(float(row[1]))

        if len(balances) < 2:
            return 0
        return balances[0] - balances[-1]
Dépannage
| Problème | Cause | Solution |
|---|---|---|
| Le fichier CSV devient trop volumineux | Surveillance à long terme | Rotation quotidienne/hebdomadaire des fichiers |
| Enregistrements de résolution manquants | Solveur non instrumenté | Enveloppez tous les solveurs avec MonitoredSolver |
| Les statistiques ne correspondent pas à la facturation | Enregistrements d'erreurs manquants | Assurez-vous que le bloc finally enregistre toujours |
| Taux d'erreur élevé dans le tableau de bord | Paramètres API incorrects | Vérifier le rapport de répartition des erreurs |
FAQ
Quelle quantité de données dois-je conserver ?
Conservez les mesures détaillées pendant 30 jours et les données résumées pendant 90 jours. Archivez les données plus anciennes pour réduire la taille des fichiers.
Puis-je exporter des métriques vers Prometheus/Grafana ?
Oui. Le MetricsCollector peut être étendu pour transmettre des métriques à Prometheus à l'aide de la bibliothèque prometheus_client. Voir le guide de surveillance Prometheus.
Dois-je surveiller en production ?
Oui. La surcharge liée à la journalisation de chaque résolution au format CSV est négligeable (<1 ms par écriture). La visibilité qu’elle offre est essentielle pour les pipelines de production.
Guides connexes
- Vérification du solde et recharge automatique
- Arbre de décision de gestion des erreurs
Connaissez vos chiffres — démarrez la surveillance CaptchaAI aujourd'hui.