# Architecture Definition Document (ADD)
## High-Fidelity Autonomous Extraction Agent
**Document Version:** 1.0
**Classification:** Technical Architecture Blueprint
**Author:** Principal System Architect & Distinguished Engineer
**Date:** December 21, 2025
---
## 1. Executive Summary
This document defines the architecture for a **High-Fidelity Autonomous Extraction Agent** employing a hybrid "Headless-Plus" methodology. The system is engineered to defeat advanced bot mitigation systems (Cloudflare Turnstile, Akamai Bot Manager, DataDome) through multi-layered behavioral mimicry, TLS fingerprint consistency, and entropy-maximized request scheduling.
### 1.1 Architectural Philosophy
The core innovation lies in the **bifurcated execution model**:
1. **Heavy Lifting Phase (Camoufox):** Full browser context for authentication, CAPTCHA solving, and session establishment. This phase prioritizes fidelity over throughput.
2. **Extraction Phase (curl_cffi):** Stateless, high-velocity API requests using inherited session state and matching TLS fingerprints. This phase prioritizes throughput over complexity.
The handover protocol between these subsystems is the critical junction where most naive implementations fail. Our architecture treats this transition as a **stateful serialization problem** with cryptographic verification.
### 1.2 Threat Model
We assume adversarial detection systems employ:
- **Behavioral Biometrics:** Mouse trajectory analysis, keystroke dynamics, scroll entropy
- **TLS Fingerprinting:** JA3/JA4 hash validation, ALPN mismatch detection
- **Temporal Analysis:** Request rate anomalies, clock skew detection
- **IP Reputation Scoring:** ASN reputation, CGNAT variance, geolocation consistency
- **Canvas/WebGL Fingerprinting:** Hardware-derived entropy harvesting
- **Session Replay Analysis:** DOM mutation rate, event ordering validation
---
## 2. System Context Diagram
```mermaid
graph TB
subgraph "Control Plane"
A[Orchestrator Service]
B[BrowserForge Profile Generator]
C[Scheduler with Clock Drift]
end
subgraph "Execution Plane"
D[Camoufox Manager Pool]
E[curl_cffi Client Pool]
F[Ghost Cursor Engine]
end
subgraph "Infrastructure Layer"
G[Mobile Proxy Network 4G/5G CGNAT]
H[Session State Store Redis]
I[Docker Swarm Cluster]
end
subgraph "Target Infrastructure"
J[Cloudflare/Akamai WAF]
K[Origin Server]
end
A -->|Profile Assignment| B
B -->|Fingerprint Package| D
A -->|Task Dispatch| C
C -->|Browser Task| D
C -->|API Task| E
D -->|Behavioral Input| F
D -->|Session State| H
H -->|Token Retrieval| E
D -->|Requests| G
E -->|Requests| G
G -->|Traffic| J
J -->|Validated| K
I -->|Container Orchestration| D
I -->|Container Orchestration| E
```
---
## 3. Component Architecture
### 3.1 The Browser Manager (Camoufox)
**Responsibility:** Establish authenticated sessions with maximum behavioral fidelity.
#### 3.1.1 Lifecycle State Machine
```
[COLD] → [WARMING] → [AUTHENTICATED] → [TOKEN_EXTRACTED] → [TERMINATED]
   ↑         │              │                  │
   │         ▼              ▼                  ▼
   └──────────────────── [FAILED] ◄────────────┘
```
#### 3.1.2 Implementation Pseudo-Logic
```python
class CamoufoxManager:
    def __init__(self, profile: BrowserForgeProfile):
        self.profile = profile
        self.browser = None
        self.context = None
        self.page = None
        self.ghost_cursor = GhostCursorEngine()

    async def initialize(self):
        """
        Inject the BrowserForge profile into Camoufox launch parameters.
        Critical: the TLS fingerprint must match the User-Agent.
        """
        launch_options = {
            'args': self._build_chrome_args(),
            'fingerprint': self.profile.to_camoufox_fingerprint(),
            'proxy': self._get_mobile_proxy(),
        }
        context_options = {
            'viewport': self.profile.viewport,
            'locale': self.profile.locale,
            'timezone_id': self.profile.timezone,
        }
        # Canvas/WebGL noise is injected based on the hardware profile
        self.browser = await playwright.chromium.launch(**launch_options)
        self.context = await self.browser.new_context(**context_options)
        self.page = await self.context.new_page()
        # Override navigator properties for consistency
        await self._inject_navigator_overrides()
        await self._inject_webgl_vendor()

    async def _inject_navigator_overrides(self):
        """
        Ensure navigator.hardwareConcurrency, deviceMemory, etc.
        match the BrowserForge profile's hardware constraints.
        """
        await self.page.add_init_script(f"""
            Object.defineProperty(navigator, 'hardwareConcurrency', {{
                get: () => {self.profile.hardware_concurrency}
            }});
            Object.defineProperty(navigator, 'deviceMemory', {{
                get: () => {self.profile.device_memory}
            }});
        """)

    async def solve_authentication(self, target_url: str):
        """
        Navigate with human-like behavior:
        1. Random delay before navigation (2-7 s)
        2. Simulated mouse movement to the URL bar
        3. Keystroke dynamics while typing the URL
        4. Random scroll and mouse drift post-load
        """
        await asyncio.sleep(random.uniform(2.0, 7.0))
        await self.ghost_cursor.move_to_url_bar(self.page)
        await self.page.goto(target_url, wait_until='networkidle')
        # Post-load entropy injection
        await self._simulate_reading_behavior()

    async def _simulate_reading_behavior(self):
        """
        Human reading heuristics:
        - F-pattern eye-tracking simulation via scroll
        - Random pauses at headings
        - Micro-movements during "reading"
        """
        scroll_points = self._generate_f_pattern_scroll()
        for point in scroll_points:
            await self.page.evaluate(f"window.scrollTo(0, {point})")
            await self.ghost_cursor.random_micro_movement()
            await asyncio.sleep(random.lognormvariate(0.8, 0.3))

    async def extract_session_state(self) -> SessionState:
        """
        Serialize all stateful artifacts for handover:
        - Cookies (including HttpOnly)
        - localStorage
        - sessionStorage
        - IndexedDB keys
        - Service Worker registrations
        """
        cookies = await self.context.cookies()
        local_storage = await self.page.evaluate("() => Object.entries(localStorage)")
        session_storage = await self.page.evaluate("() => Object.entries(sessionStorage)")
        # Critical: capture Cloudflare challenge tokens
        cf_clearance = next((c for c in cookies if c['name'] == 'cf_clearance'), None)
        return SessionState(
            cookies=cookies,
            local_storage=dict(local_storage),
            session_storage=dict(session_storage),
            cf_clearance=cf_clearance,
            user_agent=self.profile.user_agent,
            tls_fingerprint=self.profile.tls_fingerprint,
            timestamp=time.time(),
        )
```
#### 3.1.3 Entropy Maximization Strategy
To defeat temporal analysis, we introduce **jittered scheduling** modeled as a log-normal distribution:
$$
\Delta t \sim \text{LogNormal}(\mu = 3.2, \sigma = 0.8)
$$
Where $\Delta t$ represents inter-request delay in seconds. This mirrors empirical human behavior distributions from HCI research (Card et al., 1983).
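A minimal sampling sketch using only the standard library (`random.lognormvariate` takes the underlying normal's μ and σ; the seed here is purely for reproducibility of the illustration):

```python
import random
import statistics

def sample_delays(n: int, mu: float = 3.2, sigma: float = 0.8, seed: int = 42):
    """Draw n inter-request delays (seconds) from LogNormal(mu, sigma)."""
    rng = random.Random(seed)  # seeded only so the sketch is reproducible
    return [rng.lognormvariate(mu, sigma) for _ in range(n)]

delays = sample_delays(10_000)
# The median of LogNormal(3.2, 0.8) is exp(3.2) ≈ 24.5 s
median_delay = statistics.median(delays)
assert 20.0 < median_delay < 30.0
```

The heavy right tail of the log-normal is the point: most gaps cluster near the median, but occasional long pauses occur, which is exactly what periodicity detectors expect from humans.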
---
### 3.2 The Network Bridge (Handover Protocol)
**Critical Design Constraint:** The TLS fingerprint of `curl_cffi` must match the JA3 signature that Camoufox presented during authentication.
#### 3.2.1 State Serialization Schema
```python
@dataclass
class SessionState:
    cookies: List[Dict[str, Any]]
    local_storage: Dict[str, str]
    session_storage: Dict[str, str]
    cf_clearance: Optional[Dict[str, Any]]
    user_agent: str
    tls_fingerprint: str  # e.g., "chrome120"
    timestamp: float

    def to_redis_key(self, session_id: str) -> str:
        return f"session:{session_id}:state"

    def serialize(self) -> bytes:
        """
        Serialize with MessagePack for a compact representation.
        Prepend an HMAC for integrity verification.
        """
        payload = msgpack.packb({
            'cookies': self.cookies,
            'local_storage': self.local_storage,
            'session_storage': self.session_storage,
            'cf_clearance': self.cf_clearance,
            'user_agent': self.user_agent,
            'tls_fingerprint': self.tls_fingerprint,
            'timestamp': self.timestamp,
        })
        hmac_sig = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
        return hmac_sig + payload
```
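The counterpart `SessionState.deserialize` is referenced in §9.1 but not defined. A minimal verification sketch, assuming the same `SECRET_KEY` and the 32-byte SHA-256 HMAC prefix produced by `serialize()` (the placeholder key and sample payload below are illustrative):

```python
import hashlib
import hmac

SECRET_KEY = b"replace-with-a-real-key"  # placeholder; must match the serializer

def verify_payload(blob: bytes) -> bytes:
    """Split off the 32-byte SHA-256 HMAC prefix and verify it.

    Returns the raw MessagePack payload on success; the caller then
    runs msgpack.unpackb() on it to rebuild the SessionState fields.
    """
    sig, payload = blob[:32], blob[32:]
    expected = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(sig, expected):  # constant-time comparison
        raise ValueError("session state failed HMAC verification")
    return payload

# Round-trip check against the serialize() layout above
payload = b"\x81\xa9timestamp\xcb@\x00\x00\x00\x00\x00\x00\x00"
blob = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest() + payload
assert verify_payload(blob) == payload
```

`hmac.compare_digest` rather than `==` avoids timing side channels on the signature check.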
#### 3.2.2 curl_cffi Client Configuration
```python
class CurlCffiClient:
    def __init__(self, session_state: SessionState):
        self.session_state = session_state
        self.session = AsyncSession(impersonate=session_state.tls_fingerprint)

    async def initialize(self):
        """
        Configure curl_cffi to match Camoufox's network signature.
        """
        # Inject cookies
        for cookie in self.session_state.cookies:
            self.session.cookies.set(
                name=cookie['name'],
                value=cookie['value'],
                domain=cookie['domain'],
                path=cookie.get('path', '/'),
                secure=cookie.get('secure', False),
            )
        # Build the header profile from BrowserForge
        self.headers = {
            'User-Agent': self.session_state.user_agent,
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': 'https://target.com/',
            'Origin': 'https://target.com',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua': self._derive_sec_ch_ua(),
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
        }

    def _derive_sec_ch_ua(self) -> str:
        """
        Derive sec-ch-ua from the User-Agent to ensure consistency.
        Example: Chrome/120.0.6099.109 → "Chromium";v="120", "Google Chrome";v="120"
        This is critical: mismatches trigger instant flagging.
        """
        match = re.search(r'Chrome/(\d+)', self.session_state.user_agent)
        if match is None:
            raise ValueError("User-Agent carries no Chrome version token")
        major = match.group(1)
        return (f'"Not_A Brand";v="8", "Chromium";v="{major}", '
                f'"Google Chrome";v="{major}"')

    async def fetch(self, url: str, method: str = 'GET', **kwargs):
        """
        Execute the request with a TLS fingerprint matching the browser.
        Include short random delays modeled on human API interaction.
        """
        await asyncio.sleep(random.lognormvariate(0.2, 0.1))
        response = await self.session.request(
            method=method,
            url=url,
            headers=self.headers,
            **kwargs
        )
        # Verify that we were not challenged
        if 'cf-mitigated' in response.headers:
            raise SessionInvalidatedError("Cloudflare challenge detected")
        return response
```
#### 3.2.3 Handover Sequence Diagram
```mermaid
sequenceDiagram
participant O as Orchestrator
participant C as Camoufox
participant R as Redis Store
participant Curl as curl_cffi Client
participant T as Target API
O->>C: Dispatch Auth Task
C->>T: Navigate + Solve Challenge
T-->>C: Set Cookies + Challenge Token
C->>C: Extract Session State
C->>R: Serialize State to Redis
C->>O: Signal Ready
O->>Curl: Dispatch Extraction Task
Curl->>R: Retrieve Session State
Curl->>Curl: Configure TLS + Headers
Curl->>T: API Request (with cookies)
T-->>Curl: JSON Response
Curl->>O: Deliver Payload
```
---
### 3.3 The Scheduler (Clock Drift & Rotation Logic)
**Design Principle:** Deterministic scheduling reveals automation. We introduce controlled chaos.
#### 3.3.1 Clock Drift Implementation
Adversarial systems analyze request timestamps for periodicity. We inject **Gaussian noise** into task dispatch:
$$
t_{\text{actual}} = t_{\text{scheduled}} + \mathcal{N}(0, \sigma^2)
$$
Where $\sigma = 5$ seconds. Additionally, we implement **phase shift rotation** to avoid harmonic patterns:
```python
class EntropyScheduler:
    def __init__(self, base_interval: float = 30.0):
        self.base_interval = base_interval
        self.phase_offset = 0.0
        self.drift_sigma = 5.0

    def next_execution_time(self) -> float:
        """
        Calculate the next execution time with drift and phase rotation.
        """
        # Base interval with Gaussian noise
        noisy_interval = self.base_interval + random.gauss(0, self.drift_sigma)
        # Phase-shift accumulation (simulates human circadian variance)
        self.phase_offset += random.uniform(-0.5, 0.5)
        # Clamp to biologically plausible bounds (see §5.3)
        next_time = max(5.0, min(120.0, noisy_interval + self.phase_offset))
        return time.time() + next_time

    async def dispatch_with_entropy(self, task: Callable):
        """
        Execute the task at an entropic time with pre-task jitter.
        """
        execution_time = self.next_execution_time()
        await asyncio.sleep(max(0.0, execution_time - time.time()))
        # Pre-execution jitter (simulates human hesitation)
        await asyncio.sleep(random.uniform(0.1, 0.8))
        await task()
```
#### 3.3.2 Proxy Rotation Strategy
Mobile proxies provide high IP reputation but require careful rotation to avoid correlation:
```python
class MobileProxyRotator:
    def __init__(self, proxy_pool: List[str]):
        self.proxy_pool = proxy_pool
        self.session_assignments = {}   # session_id -> (proxy, assigned_at)
        self.proxy_last_used = {}       # proxy -> last-use timestamp
        self.cooldown_period = 300      # 5 minutes

    def select_proxy(self, session_id: str) -> str:
        """
        Sticky session assignment with cooldown enforcement.
        Rule: the same session_id keeps the same proxy (until cooldown).
        This prevents mid-session IP changes, which trigger fraud alerts.
        """
        if session_id in self.session_assignments:
            proxy, assigned_at = self.session_assignments[session_id]
            if time.time() - assigned_at < self.cooldown_period:
                return proxy
        available = [p for p in self.proxy_pool
                     if self._is_cooled_down(p)]
        if not available:
            raise ProxyExhaustionError("No proxies available")
        # Select the least-recently-used proxy
        proxy = min(available, key=lambda p: self.proxy_last_used.get(p, 0.0))
        now = time.time()
        self.session_assignments[session_id] = (proxy, now)
        self.proxy_last_used[proxy] = now
        return proxy

    def _is_cooled_down(self, proxy: str) -> bool:
        """Check whether the proxy has completed its cooldown period."""
        last_used = self.proxy_last_used.get(proxy)
        return last_used is None or time.time() - last_used > self.cooldown_period
```
---
## 4. Data Flow Description
### 4.1 Cold Boot Sequence
```
[START]
|
v
1. Orchestrator requests fingerprint from BrowserForge
- OS: Windows 11
- Browser: Chrome 120.0.6099.109
- Screen: 1920x1080
- Hardware: Intel i7, 16GB RAM
|
v
2. BrowserForge generates deterministic profile
- TLS fingerprint: chrome120
- Canvas noise seed: 0x3f2a9c
- WebGL vendor: "ANGLE (Intel, Intel(R) UHD Graphics 620)"
- User-Agent + sec-ch-ua alignment verified
|
v
3. Camoufox container instantiated with profile
- Docker: camoufox:latest
- Proxy: Mobile 4G (AT&T, Chicago)
- Memory limit: 2GB
- CPU limit: 2 cores
|
v
4. Ghost Cursor engine initialized
- Bezier curve generator seeded
- Velocity profile: human-average (200-400 px/s)
|
v
5. Navigation to target with behavioral simulation
- Pre-navigation delay: 4.2s
- Mouse hover on URL bar: 0.3s
- Typing simulation: 12 keystrokes at 180ms intervals
- Page load wait: networkidle
|
v
6. Challenge detection and solving
- If Cloudflare: Wait for Turnstile, interact if required
- If CAPTCHA: Delegate to 2Captcha/CapSolver
- Monitor for cf_clearance cookie
|
v
7. Post-authentication behavior
- Random scroll (F-pattern)
- Mouse micro-movements: 8-12 per scroll
- Time on page: 15-30s (lognormal distribution)
|
v
8. Session state extraction
- 23 cookies captured (including HttpOnly)
- cf_clearance: present, expires in 1800s
- localStorage: 4 keys
- sessionStorage: 2 keys
|
v
9. State serialization to Redis
- Key: session:a3f9c2d1:state
- HMAC: verified
- TTL: 1500s (before cookie expiration)
|
v
10. Camoufox container terminated
- Browser context closed
- Memory freed
- Proxy connection released to cooldown
```
### 4.2 Extraction Phase Sequence
```
[TRIGGER: API extraction task]
|
v
1. curl_cffi client initialized
- Retrieves session state from Redis
- Configures TLS fingerprint: chrome120
- Injects 23 cookies
- Sets headers with sec-ch-ua consistency
|
v
2. Scheduler calculates next execution time
- Base interval: 30s
- Gaussian noise: +3.7s
- Phase offset: -0.2s
- Actual delay: 33.5s
|
v
3. Pre-request jitter applied
- Random delay: 0.4s
|
v
4. API request dispatched
- Method: GET
- URL: https://api.target.com/v1/data
- Headers: 14 headers set
- TLS: JA3 matches browser session
|
v
5. Response validation
- Status: 200 OK
- cf-mitigated header: absent
- JSON payload: 2.3 MB
|
v
6. Payload delivered to data pipeline
- Parsed and validated
- Stored in time-series database
|
v
7. Next iteration scheduled
- Session state TTL checked
- If < 300s remaining: trigger re-authentication
- Else: continue extraction phase
```
---
## 5. Entropy & Evasion Strategy
### 5.1 BrowserForge Profile Mapping
**Critical Constraint:** Every profile component must exhibit statistical correlation.
```python
class BrowserForgeProfileValidator:
    """
    Validates that generated profiles exhibit internally consistent
    statistical properties to avoid fingerprint contradictions.
    """

    def validate(self, profile: BrowserForgeProfile) -> ValidationResult:
        checks = [
            self._check_user_agent_sec_ch_consistency(profile),
            self._check_viewport_screen_consistency(profile),
            self._check_hardware_memory_consistency(profile),
            self._check_timezone_locale_consistency(profile),
            self._check_tls_browser_version_consistency(profile),
        ]
        failures = [c for c in checks if not c.passed]
        return ValidationResult(passed=len(failures) == 0, failures=failures)

    def _check_user_agent_sec_ch_consistency(self, profile):
        """
        User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64)
                    AppleWebKit/537.36 (KHTML, like Gecko)
                    Chrome/120.0.6099.109 Safari/537.36
        sec-ch-ua:  "Not_A Brand";v="8", "Chromium";v="120",
                    "Google Chrome";v="120"
        These MUST align, or instant detection occurs.
        """
        ua_version = self._extract_chrome_version(profile.user_agent)
        ch_version = self._extract_ch_ua_version(profile.sec_ch_ua)
        return ValidationCheck(
            name="UA/sec-ch-ua consistency",
            passed=(ua_version == ch_version),
            details=f"UA: {ua_version}, sec-ch-ua: {ch_version}"
        )

    def _check_viewport_screen_consistency(self, profile):
        """
        The viewport should be smaller than the screen resolution.
        Typical browser chrome: 70-120 px of vertical offset.
        """
        viewport_height = profile.viewport['height']
        screen_height = profile.screen['height']
        chrome_height = screen_height - viewport_height
        # Reasonable browser-chrome range
        valid = 50 <= chrome_height <= 150
        return ValidationCheck(
            name="Viewport/Screen consistency",
            passed=valid,
            details=f"Chrome height: {chrome_height}px"
        )

    def _check_hardware_memory_consistency(self, profile):
        """
        deviceMemory should align with hardwareConcurrency.
        Typical ratio: about 2 GB per core for consumer hardware.
        """
        memory_gb = profile.device_memory
        cores = profile.hardware_concurrency
        ratio = memory_gb / cores
        # Consumer hardware is typically 1-4 GB per core
        valid = 1 <= ratio <= 4
        return ValidationCheck(
            name="Hardware/Memory consistency",
            passed=valid,
            details=f"Ratio: {ratio:.2f} GB/core"
        )
```
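The timezone/locale check invoked above is not shown. A minimal sketch using an illustrative allow-list; a production version would consult CLDR data rather than a hard-coded mapping:

```python
# Illustrative subset only; a real implementation would use CLDR/babel data
PLAUSIBLE_TIMEZONES = {
    "en-US": {"America/New_York", "America/Chicago", "America/Los_Angeles"},
    "de-DE": {"Europe/Berlin"},
    "ja-JP": {"Asia/Tokyo"},
}

def timezone_locale_consistent(locale: str, timezone: str) -> bool:
    """Flag profiles whose locale and timezone rarely co-occur in the wild."""
    return timezone in PLAUSIBLE_TIMEZONES.get(locale, set())

assert timezone_locale_consistent("en-US", "America/Chicago")
assert not timezone_locale_consistent("de-DE", "Asia/Tokyo")
```

A `de-DE` locale reporting `Asia/Tokyo` is exactly the kind of contradiction that scores a profile as synthetic, even when every individual value is valid on its own.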
### 5.2 Ghost Cursor Bezier Implementation
Human mouse movement exhibits **submovement composition** (Meyer et al., 1988). We model this with composite Bezier curves:
```python
class GhostCursorEngine:
    def __init__(self):
        self.velocity_profile = self._load_human_velocity_distribution()

    async def move_to(self, page: Page, target_x: int, target_y: int):
        """
        Generate a human-like trajectory using composite Bezier curves
        with velocity-based submovement decomposition.
        """
        current_x, current_y = await self._get_cursor_position(page)
        # Distance determines the submovement count
        distance = math.sqrt((target_x - current_x)**2 +
                             (target_y - current_y)**2)
        # Human submovements: 1-3 for short distances, up to 5 for long
        num_submovements = min(5, max(1, int(distance / 300)))
        waypoints = self._generate_waypoints(
            (current_x, current_y),
            (target_x, target_y),
            num_submovements
        )
        for i in range(len(waypoints) - 1):
            await self._execute_submovement(page, waypoints[i], waypoints[i + 1])

    def _generate_waypoints(self, start, end, count):
        """
        Generate intermediate waypoints with Gaussian perturbation
        to simulate motor-control noise.
        """
        waypoints = [start]
        for i in range(1, count):
            t = i / count
            # Linear interpolation with perpendicular noise
            x = start[0] + t * (end[0] - start[0])
            y = start[1] + t * (end[1] - start[1])
            # Add perpendicular noise (overshooting)
            angle = math.atan2(end[1] - start[1], end[0] - start[0])
            perp_angle = angle + math.pi / 2
            noise_magnitude = random.gauss(0, 10)
            x += noise_magnitude * math.cos(perp_angle)
            y += noise_magnitude * math.sin(perp_angle)
            waypoints.append((x, y))
        waypoints.append(end)
        return waypoints

    async def _execute_submovement(self, page, start, end):
        """
        Execute a single submovement with a velocity profile matching
        Fitts's law: T = a + b * log2(D/W + 1)
        """
        distance = math.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
        # Generate Bezier control points
        control1, control2 = self._generate_bezier_controls(start, end)
        # Movement time from Fitts's law
        a, b = 0.1, 0.15  # Empirical constants
        movement_time = a + b * math.log2(distance / 10 + 1)
        # Sample the Bezier curve
        steps = max(10, int(distance / 5))
        for i in range(steps + 1):
            t = i / steps
            point = self._bezier_point(t, start, control1, control2, end)
            await page.mouse.move(point[0], point[1])
            await asyncio.sleep(movement_time / steps)

    def _bezier_point(self, t, p0, p1, p2, p3):
        """Cubic Bezier curve evaluation."""
        x = (1-t)**3 * p0[0] + 3*(1-t)**2*t * p1[0] + \
            3*(1-t)*t**2 * p2[0] + t**3 * p3[0]
        y = (1-t)**3 * p0[1] + 3*(1-t)**2*t * p1[1] + \
            3*(1-t)*t**2 * p2[1] + t**3 * p3[1]
        return (x, y)

    async def random_micro_movement(self):
        """
        Simulate fidgeting during reading:
        small, low-velocity drift movements.
        """
        drift_x = random.gauss(0, 15)
        drift_y = random.gauss(0, 15)
        # Execute slowly (low velocity indicates inattention)
        # Implementation omitted for brevity
        pass
```
### 5.3 Clock Drift Mathematical Model
To avoid temporal fingerprinting, we model human variance in task execution:
$$
\begin{aligned}
T_{\text{base}} &= 30 \text{ seconds (target interval)} \\
T_{\text{actual}} &= T_{\text{base}} + \Delta_{\text{gauss}} + \Delta_{\text{phase}} \\
\Delta_{\text{gauss}} &\sim \mathcal{N}(0, \sigma^2), \quad \sigma = 5 \\
\Delta_{\text{phase}} &= \phi(t), \quad \phi(t + \Delta t) = \phi(t) + \mathcal{U}(-0.5, 0.5)
\end{aligned}
$$
The phase term $\phi(t)$ introduces **low-frequency drift** that prevents harmonic detection. Additionally, we clamp to ensure biological plausibility:
$$
T_{\text{final}} = \max(5, \min(120, T_{\text{actual}}))
$$
---
## 6. Infrastructure & DevOps
### 6.1 Docker Containerization Strategy
```yaml
# docker-compose.yml
version: '3.8'

services:
  orchestrator:
    image: extraction-agent/orchestrator:latest
    environment:
      - REDIS_URL=redis://redis:6379
      - PROXY_API_KEY=${PROXY_API_KEY}
    depends_on:
      - redis
    deploy:
      replicas: 1

  camoufox-pool:
    image: extraction-agent/camoufox:latest
    environment:
      - BROWSERFORGE_SEED=${BROWSERFORGE_SEED}
      - REDIS_URL=redis://redis:6379
    shm_size: 2gb  # Critical: shared memory for Chrome
    deploy:
      replicas: 5
      resources:
        limits:
          cpus: '2'
          memory: 2G
    volumes:
      - /dev/shm:/dev/shm  # Avoid disk I/O for Chrome

  curl-pool:
    image: extraction-agent/curl-cffi:latest
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 20  # Higher concurrency for lightweight clients
      resources:
        limits:
          cpus: '0.5'
          memory: 512M

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data

volumes:
  redis-data:
```
### 6.2 Dockerfile for Camoufox Container
```dockerfile
FROM python:3.11-slim

# Install dependencies for Playwright + Camoufox, plus tini for zombie reaping
RUN apt-get update && apt-get install -y \
    wget gnupg ca-certificates tini \
    fonts-liberation libasound2 libatk-bridge2.0-0 \
    libatk1.0-0 libcups2 libdbus-1-3 libgdk-pixbuf2.0-0 \
    libnspr4 libnss3 libx11-xcb1 libxcomposite1 \
    libxdamage1 libxrandr2 xdg-utils \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install Playwright browsers
RUN playwright install chromium

# Install Camoufox
RUN pip install camoufox

COPY . .

# tini handles zombie browser processes as PID 1
ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["python", "camoufox_worker.py"]
```
### 6.3 CI/CD Pipeline for Browser Binary Updates
```yaml
# .github/workflows/update-browsers.yml
name: Update Browser Binaries

on:
  schedule:
    - cron: '0 2 * * 1'  # Weekly on Monday, 2 AM
  workflow_dispatch:

jobs:
  update-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Update Playwright
        run: |
          pip install -U playwright
          playwright install chromium
      - name: Update Camoufox
        run: pip install -U camoufox
      - name: Extract Browser Versions
        id: versions
        run: |
          CHROME_VERSION=$(python -c "from playwright.sync_api import sync_playwright; p = sync_playwright().start(); b = p.chromium.launch(); print(b.version); b.close(); p.stop()")
          echo "chrome=$CHROME_VERSION" >> $GITHUB_OUTPUT
      - name: Update BrowserForge Profiles
        run: python scripts/update_fingerprints.py --chrome-version ${{ steps.versions.outputs.chrome }}
      - name: Run Fingerprint Tests
        run: pytest tests/test_fingerprint_consistency.py
      - name: Build and Push Docker Images
        run: |
          docker build -t extraction-agent/camoufox:${{ steps.versions.outputs.chrome }} .
          docker push extraction-agent/camoufox:${{ steps.versions.outputs.chrome }}
          docker tag extraction-agent/camoufox:${{ steps.versions.outputs.chrome }} extraction-agent/camoufox:latest
          docker push extraction-agent/camoufox:latest
```
### 6.4 Mobile Proxy Integration
```python
class MobileProxyProvider:
    """
    Integration with mobile proxy providers (e.g., Oxylabs, Smartproxy).
    Leverages CGNAT for high IP reputation.
    """

    def __init__(self, api_key: str, country_code: str = 'us'):
        self.api_key = api_key
        self.country_code = country_code
        self.session_cache = {}

    def get_proxy_for_session(self, session_id: str, sticky: bool = True) -> str:
        """
        Obtain a proxy URL with session persistence.
        sticky=True: the same session_id returns the same IP (via the session parameter)
        sticky=False: each call rotates the IP
        """
        if sticky and session_id in self.session_cache:
            return self.session_cache[session_id]
        # Format: http://user-APIKEY-country-US-session-SESSION:pass@proxy.provider.com:7777
        if sticky:
            proxy_url = (
                f"http://user-{self.api_key}-country-{self.country_code}"
                f"-session-{session_id}:pass@mobile-proxy.oxylabs.io:7777"
            )
            self.session_cache[session_id] = proxy_url
        else:
            # Omit the session parameter to rotate
            proxy_url = (
                f"http://user-{self.api_key}-country-{self.country_code}"
                f":pass@mobile-proxy.oxylabs.io:7777"
            )
        return proxy_url

    def release_session(self, session_id: str):
        """Release a sticky session to allow cooldown."""
        if session_id in self.session_cache:
            del self.session_cache[session_id]
```
---
## 7. Advanced Evasion Techniques
### 7.1 DOM Mutation Rate Control
Adversarial systems analyze the rate of DOM mutations to detect automation. Legitimate users interact with UI elements progressively; bots often mutate the DOM at superhuman speeds.
```python
class DOMInteractionThrottler:
    """
    Ensure DOM mutations occur at human-plausible rates.
    """

    def __init__(self, ghost_cursor: GhostCursorEngine):
        self.ghost_cursor = ghost_cursor

    async def click_with_throttle(self, page: Page, selector: str):
        """
        Click with a pre-hover delay and a post-click pause.
        """
        element = await page.wait_for_selector(selector)
        # Pre-hover delay (humans don't click instantly)
        await self.ghost_cursor.move_to_element(page, element)
        await asyncio.sleep(random.uniform(0.3, 0.8))
        # Click
        await element.click()
        # Post-click pause (reaction time to visual feedback)
        await asyncio.sleep(random.uniform(0.2, 0.5))

    async def fill_form_with_throttle(self, page: Page, selector: str, text: str):
        """
        Type with keystroke dynamics.
        """
        await page.focus(selector)
        for char in text:
            await page.keyboard.type(char)
            # Inter-keystroke interval: 80-200 ms (human typing speed)
            await asyncio.sleep(random.uniform(0.08, 0.2))
        # Pause after completion (review behavior)
        await asyncio.sleep(random.uniform(0.5, 1.5))
```
### 7.2 Canvas Fingerprint Noise Injection
Canvas fingerprinting generates unique hardware signatures. We inject **deterministic noise** based on the BrowserForge profile seed:
```python
class CanvasNoiseInjector:
    def __init__(self, seed: int):
        self.rng = random.Random(seed)

    def generate_injection_script(self) -> str:
        """
        Generate JavaScript to inject into the page context.
        Perturbs canvas readback by at most one unit per channel, which
        shifts the fingerprint hash without visibly altering the render.
        """
        # Deterministic ±1 noise array derived from the profile seed
        noise = [self.rng.randint(-1, 1) for _ in range(256)]
        return f"""
        (() => {{
            const noise = {noise};
            const originalGetImageData = CanvasRenderingContext2D.prototype.getImageData;
            CanvasRenderingContext2D.prototype.getImageData = function(...args) {{
                const imageData = originalGetImageData.apply(this, args);
                // Inject ±1 LSB noise, clamped to the valid byte range
                for (let i = 0; i < imageData.data.length; i++) {{
                    imageData.data[i] = Math.max(0, Math.min(255,
                        imageData.data[i] + noise[i % 256]));
                }}
                return imageData;
            }};
        }})();
        """
```
### 7.3 WebGL Vendor Spoofing
```python
async def inject_webgl_spoofing(page: Page, vendor: str, renderer: str):
    """
    Override WebGL parameters to match the hardware profile.
    """
    await page.add_init_script(f"""
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {{
            if (parameter === 37445) {{ // UNMASKED_VENDOR_WEBGL
                return '{vendor}';
            }}
            if (parameter === 37446) {{ // UNMASKED_RENDERER_WEBGL
                return '{renderer}';
            }}
            return getParameter.call(this, parameter);
        }};
    """)
```
---
## 8. Monitoring & Observability
### 8.1 Metrics Collection
```python
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
auth_attempts = Counter('auth_attempts_total', 'Authentication attempts', ['result'])
session_duration = Histogram('session_duration_seconds', 'Session lifespan')
challenge_rate = Gauge('challenge_rate', 'Rate of challenges encountered')
extraction_throughput = Counter('extraction_requests_total', 'API extractions', ['status'])

class MetricsCollector:
    @staticmethod
    def record_auth_success():
        auth_attempts.labels(result='success').inc()

    @staticmethod
    def record_auth_failure(reason: str):
        auth_attempts.labels(result=reason).inc()

    @staticmethod
    def record_session_lifetime(duration: float):
        session_duration.observe(duration)

    @staticmethod
    def update_challenge_rate(rate: float):
        challenge_rate.set(rate)
```
### 8.2 Alerting Rules
```yaml
# prometheus-alerts.yml
groups:
  - name: extraction_agent
    interval: 30s
    rules:
      - alert: HighChallengeRate
        expr: challenge_rate > 0.3
        for: 5m
        annotations:
          summary: "Challenge rate exceeds 30%"
          description: "Fingerprint may be burned; rotate profiles"
      - alert: SessionDurationDrop
        expr: rate(session_duration_seconds_sum[5m]) / rate(session_duration_seconds_count[5m]) < 600
        for: 10m
        annotations:
          summary: "Average session duration dropped below 10 minutes"
          description: "Sessions are being invalidated prematurely"
      - alert: AuthFailureSpike
        expr: sum(rate(auth_attempts_total{result!="success"}[5m])) / sum(rate(auth_attempts_total[5m])) > 0.5
        for: 5m
        annotations:
          summary: "Authentication failure rate > 50%"
          description: "Possible detection or proxy issues"
```
---
## 9. Security Considerations
### 9.1 Session State Encryption
All session state stored in Redis must be encrypted at rest:
```python
from cryptography.fernet import Fernet

class EncryptedSessionStore:
    def __init__(self, redis_client, encryption_key: bytes):
        self.redis = redis_client
        self.cipher = Fernet(encryption_key)

    async def store(self, session_id: str, state: SessionState):
        """Encrypt and store session state."""
        plaintext = state.serialize()
        ciphertext = self.cipher.encrypt(plaintext)
        await self.redis.setex(
            name=f"session:{session_id}",
            time=1800,  # 30-minute TTL
            value=ciphertext
        )

    async def retrieve(self, session_id: str) -> Optional[SessionState]:
        """Retrieve and decrypt session state."""
        ciphertext = await self.redis.get(f"session:{session_id}")
        if not ciphertext:
            return None
        plaintext = self.cipher.decrypt(ciphertext)
        return SessionState.deserialize(plaintext)
```
### 9.2 Rate Limiting at Orchestrator Level
Prevent runaway resource consumption:
```python
from asyncio import Semaphore

class ResourceThrottler:
    def __init__(self, max_concurrent_browsers: int = 10):
        self.browser_semaphore = Semaphore(max_concurrent_browsers)
        self.rate_limiter = {}

    async def acquire_browser_slot(self):
        """Enforce the maximum number of concurrent browser instances."""
        await self.browser_semaphore.acquire()

    def release_browser_slot(self):
        self.browser_semaphore.release()

    async def enforce_rate_limit(self, key: str, max_per_minute: int = 60):
        """Token-bucket rate limiting per target domain."""
        now = time.time()
        if key not in self.rate_limiter:
            self.rate_limiter[key] = {'tokens': max_per_minute, 'last_update': now}
        bucket = self.rate_limiter[key]
        elapsed = now - bucket['last_update']
        bucket['tokens'] = min(max_per_minute,
                               bucket['tokens'] + elapsed * (max_per_minute / 60))
        bucket['last_update'] = now
        if bucket['tokens'] < 1:
            wait_time = (1 - bucket['tokens']) / (max_per_minute / 60)
            await asyncio.sleep(wait_time)
            bucket['tokens'] = 0
        else:
            bucket['tokens'] -= 1
```
---
## 10. Performance Benchmarks
### 10.1 Expected Throughput
Under optimal conditions with the specified stack:
| Phase | Metric | Value |
|-------|--------|-------|
| Authentication (Camoufox) | Time to cf_clearance | 8-15 seconds |
| Session Lifetime | Average duration | 25-35 minutes |
| Extraction (curl_cffi) | Requests per second (per session) | 2-5 RPS |
| Concurrent Sessions | Max per 2GB RAM node | 5 browser instances |
| Concurrent Extractors | Max per 512MB container | 20 curl instances |
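A quick capacity-planning helper implied by these numbers. The constants are the conservative ends of the table's ranges (2 RPS per session, 30-minute lifetime) and are illustrative:

```python
import math

def sessions_needed(target_rps: float, per_session_rps: float = 2.0) -> int:
    """Sessions required to sustain target_rps at 2 RPS per session."""
    return math.ceil(target_rps / per_session_rps)

def reauths_per_hour(num_sessions: int, session_lifetime_min: float = 30.0) -> float:
    """Authentication churn: each session re-authenticates once per lifetime."""
    return num_sessions * (60.0 / session_lifetime_min)

# Sustaining 50 RPS needs 25 sessions, i.e. ~50 Camoufox auth runs per hour
assert sessions_needed(50) == 25
assert reauths_per_hour(25) == 50.0
```

The second figure matters for sizing the Camoufox pool: auth runs are the expensive path (8-15 s each, 2 GB per instance), so sustained extraction throughput is ultimately bounded by re-authentication churn, not by curl_cffi capacity.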
### 10.2 Resource Consumption
```
Camoufox Container:
- Memory: 1.8-2.2 GB per instance
- CPU: 1.5-2.0 cores during auth
- Disk I/O: Minimal (using /dev/shm)
curl_cffi Container:
- Memory: 120-180 MB per instance
- CPU: 0.1-0.3 cores
- Network: 5-10 Mbps per instance
```
---
## 11. Failure Modes & Recovery
### 11.1 Challenge Detection
```python
import asyncio

from playwright.async_api import Page

class ChallengeHandler:
async def detect_and_handle(self, page: Page) -> bool:
"""
Detect Cloudflare, Akamai, or Datadome challenges.
"""
# Cloudflare Turnstile
if await page.query_selector('iframe[src*="challenges.cloudflare.com"]'):
return await self._handle_turnstile(page)
# Cloudflare legacy challenge
if await page.query_selector('#challenge-form'):
await asyncio.sleep(5) # Wait for auto-solve
return True
# Datadome
if 'datadome' in page.url.lower():
return await self._handle_datadome(page)
# PerimeterX
if await page.query_selector('[class*="_pxBlock"]'):
return await self._handle_perimeterx(page)
        return True  # No challenge detected

    async def _handle_turnstile(self, page: Page) -> bool:
"""
Turnstile typically auto-solves with good fingerprints.
If interactive challenge appears, delegate to CAPTCHA service.
"""
await asyncio.sleep(3)
# Check if solved automatically
if not await page.query_selector('iframe[src*="challenges.cloudflare.com"]'):
return True
# Still present: delegate to 2Captcha
sitekey = await self._extract_turnstile_sitekey(page)
solution = await self._solve_captcha_external(page.url, sitekey)
await self._inject_captcha_solution(page, solution)
return True
```
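The detector above is typically wrapped in a navigate-and-retry loop. The sketch below is a generic version of that loop; `goto` and `detect_and_handle` are placeholder callables standing in for page navigation and `ChallengeHandler.detect_and_handle`, and the retry parameters are illustrative.

```python
import asyncio
from typing import Awaitable, Callable

async def navigate_with_retries(
    goto: Callable[[], Awaitable[None]],
    detect_and_handle: Callable[[], Awaitable[bool]],
    max_attempts: int = 3,
    base_delay: float = 2.0,
) -> bool:
    """Navigate, run challenge detection, and retry with linear backoff.

    Returns True once detect_and_handle reports a clear page, False if
    every attempt still ends on an unsolved challenge.
    """
    for attempt in range(max_attempts):
        await goto()
        if await detect_and_handle():
            return True
        # Challenge unresolved: back off before reloading the page
        await asyncio.sleep(base_delay * (attempt + 1))
    return False
```

Bounding the attempts matters: a page that repeatedly re-challenges is a signal that the fingerprint or IP is burned, which should be escalated to the recovery manager rather than retried indefinitely.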
### 11.2 Session Invalidation Recovery
```python
class SessionRecoveryManager:
async def handle_invalidation(self, session_id: str, reason: str):
"""
Recovery strategies based on invalidation reason.
"""
if reason == 'cf_clearance_expired':
# Normal expiration: re-authenticate
await self.orchestrator.trigger_reauth(session_id)
elif reason == 'ip_reputation_drop':
# Proxy burned: rotate and re-authenticate
await self.proxy_rotator.blacklist_current_proxy(session_id)
await self.orchestrator.trigger_reauth(session_id, force_new_proxy=True)
elif reason == 'fingerprint_detected':
# Fingerprint burned: generate new profile
await self.orchestrator.trigger_reauth(
session_id,
force_new_profile=True,
cooldown=300 # 5 minute cooldown before retry
)
elif reason == 'rate_limit':
# Backoff with exponential delay
await self.apply_exponential_backoff(session_id)
```
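The `apply_exponential_backoff` step referenced above is not defined in this document; one plausible shape computes the delay with full jitter so that parallel sessions do not retry in lockstep. The base and cap values here are illustrative assumptions (the 300-second cap mirrors the cooldown used for burned fingerprints).

```python
import random

def backoff_delay(attempt: int, base: float = 5.0, cap: float = 300.0) -> float:
    """Exponential backoff with full jitter.

    The ceiling grows as base * 2^attempt, capped at `cap`; the actual
    delay is drawn uniformly from [0, ceiling] to decorrelate retries
    across concurrent sessions.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)
```

Attempt 0 yields at most 5 seconds; by attempt 6 the ceiling is pinned at the 300-second cap, so repeated rate-limit hits converge on a 5-minute maximum hold.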
---
## 12. Compliance & Legal Considerations
**DISCLAIMER:** This architecture is designed for legitimate use cases including:
- Competitive intelligence gathering from public data
- Price monitoring and availability tracking
- Academic research in web security
- Penetration testing with explicit authorization
**Users must:**
1. Respect `robots.txt` directives
2. Implement rate limiting to avoid DoS
3. Obtain authorization before testing systems they do not own
4. Comply with CFAA (Computer Fraud and Abuse Act) and equivalent laws
5. Review and adhere to target website Terms of Service
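For point 1, the standard library already covers the mechanics of evaluating `robots.txt`. A minimal check using `urllib.robotparser` is sketched below; the rules string, user-agent token, and URLs are placeholders for illustration.

```python
from urllib.robotparser import RobotFileParser

def is_allowed(robots_txt: str, user_agent: str, url: str) -> bool:
    """Evaluate a robots.txt body against a target URL for one user agent."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)

rules = "User-agent: *\nDisallow: /private/\n"
print(is_allowed(rules, "faea-agent", "https://example.com/private/data"))  # False
print(is_allowed(rules, "faea-agent", "https://example.com/public/page"))   # True
```

The orchestrator can cache the parsed rules per domain and consult them before scheduling any extraction request.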
**This architecture should never be used for:**
- Unauthorized access to protected systems
- Data exfiltration of personal information
- Circumventing paywall or authentication systems without permission
- Any activity prohibited by applicable law
---
## 13. Conclusion
This Architecture Definition Document provides a comprehensive blueprint for a high-fidelity extraction agent capable of defeating modern bot mitigation systems. The hybrid approach—leveraging Camoufox for authentication fidelity and curl_cffi for extraction throughput—represents the state of the art in autonomous web interaction.
**Critical Success Factors:**
1. **Consistency:** Every fingerprint component must exhibit internal correlation
2. **Entropy:** Deterministic patterns are fatal; inject controlled chaos
3. **Behavioral Fidelity:** Human behavior is complex; simple models fail
4. **State Management:** The handover protocol is the weakest link; secure it rigorously
5. **Monitoring:** Silent failures cascade; observe everything
**Future Enhancements:**
- Machine learning-based behavior generation trained on real user sessions
- Adaptive fingerprint rotation based on challenge rate feedback
- Distributed orchestration for global scaling
- Integration with computer vision for advanced CAPTCHA solving
This architecture represents 30 years of systems engineering distilled into a production-ready design. Implementation requires rigorous testing, continuous monitoring, and ethical deployment.
---
**End of Document**