Architecture Definition Document (ADD)

High-Fidelity Autonomous Extraction Agent

Document Version: 1.0
Classification: Technical Architecture Blueprint
Author: Principal System Architect & Distinguished Engineer
Date: December 21, 2025


1. Executive Summary

This document defines the architecture for a High-Fidelity Autonomous Extraction Agent employing a hybrid "Headless-Plus" methodology. The system is engineered to defeat advanced bot mitigation systems (Cloudflare Turnstile, Akamai Bot Manager, DataDome) through multi-layered behavioral mimicry, TLS fingerprint consistency, and entropy-maximized request scheduling.

1.1 Architectural Philosophy

The core innovation lies in the bifurcated execution model:

  1. Heavy Lifting Phase (Camoufox): Full browser context for authentication, CAPTCHA solving, and session establishment. This phase prioritizes fidelity over throughput.
  2. Extraction Phase (curl_cffi): Stateless, high-velocity API requests using inherited session state and matching TLS fingerprints. This phase prioritizes throughput over complexity.

The handover protocol between these subsystems is the critical junction where most naive implementations fail. Our architecture treats this transition as a stateful serialization problem with cryptographic verification.

1.2 Threat Model

We assume adversarial detection systems employ:

  • Behavioral Biometrics: Mouse trajectory analysis, keystroke dynamics, scroll entropy
  • TLS Fingerprinting: JA3/JA4 hash validation, ALPN mismatch detection
  • Temporal Analysis: Request rate anomalies, clock skew detection
  • IP Reputation Scoring: ASN reputation, CGNAT variance, geolocation consistency
  • Canvas/WebGL Fingerprinting: Hardware-derived entropy harvesting
  • Session Replay Analysis: DOM mutation rate, event ordering validation

2. System Context Diagram

graph TB
    subgraph "Control Plane"
        A[Orchestrator Service]
        B[BrowserForge Profile Generator]
        C[Scheduler with Clock Drift]
    end
    
    subgraph "Execution Plane"
        D[Camoufox Manager Pool]
        E[curl_cffi Client Pool]
        F[Ghost Cursor Engine]
    end
    
    subgraph "Infrastructure Layer"
        G[Mobile Proxy Network 4G/5G CGNAT]
        H[Session State Store Redis]
        I[Docker Swarm Cluster]
    end
    
    subgraph "Target Infrastructure"
        J[Cloudflare/Akamai WAF]
        K[Origin Server]
    end
    
    A -->|Profile Assignment| B
    B -->|Fingerprint Package| D
    A -->|Task Dispatch| C
    C -->|Browser Task| D
    C -->|API Task| E
    D -->|Behavioral Input| F
    D -->|Session State| H
    H -->|Token Retrieval| E
    D -->|Requests| G
    E -->|Requests| G
    G -->|Traffic| J
    J -->|Validated| K
    I -->|Container Orchestration| D
    I -->|Container Orchestration| E

3. Component Architecture

3.1 The Browser Manager (Camoufox)

Responsibility: Establish authenticated sessions with maximum behavioral fidelity.

3.1.1 Lifecycle State Machine

[COLD] → [WARMING] → [AUTHENTICATED] → [TOKEN_EXTRACTED] → [TERMINATED]
   ↓                                                             ↑
   └─────────────────── [FAILED] ──────────────────────────────┘

3.1.2 Implementation Pseudo-Logic

class CamoufoxManager:
    def __init__(self, profile: BrowserForgeProfile):
        self.profile = profile
        self.context = None
        self.page = None
        self.ghost_cursor = GhostCursorEngine()
        
    async def initialize(self):
        """
        Inject BrowserForge profile into Camoufox launch parameters.
        Critical: Match TLS fingerprint to User-Agent.
        """
        launch_options = {
            'args': self._build_launch_args(),
            'fingerprint': self.profile.to_camoufox_fingerprint(),
            'proxy': self._get_mobile_proxy(),
            'viewport': self.profile.viewport,
            'locale': self.profile.locale,
            'timezone': self.profile.timezone,
        }
        
        # Camoufox is Firefox-based: launch it through its own async wrapper
        # (camoufox.async_api.AsyncCamoufox), not playwright.chromium.
        # Canvas/WebGL noise is derived from the hardware profile at launch.
        self.browser = await AsyncCamoufox(**launch_options).start()
        self.context = await self.browser.new_context()
        self.page = await self.context.new_page()
        
        # Override navigator properties for consistency
        await self._inject_navigator_overrides()
        await self._inject_webgl_vendor()
        
    async def _inject_navigator_overrides(self):
        """
        Ensure navigator.hardwareConcurrency, deviceMemory, etc.
        match the BrowserForge profile's hardware constraints.
        """
        await self.page.add_init_script(f"""
            Object.defineProperty(navigator, 'hardwareConcurrency', {{
                get: () => {self.profile.hardware_concurrency}
            }});
            Object.defineProperty(navigator, 'deviceMemory', {{
                get: () => {self.profile.device_memory}
            }});
        """)
        
    async def solve_authentication(self, target_url: str):
        """
        Navigate with human-like behavior:
        1. Random delay before navigation (2-7s)
        2. Mouse movement to URL bar simulation
        3. Keystroke dynamics for typing URL
        4. Random scroll and mouse drift post-load
        """
        await asyncio.sleep(random.uniform(2.0, 7.0))
        await self.ghost_cursor.move_to_url_bar(self.page)
        await self.page.goto(target_url, wait_until='networkidle')
        
        # Post-load entropy injection
        await self._simulate_reading_behavior()
        
    async def _simulate_reading_behavior(self):
        """
        Human reading heuristics:
        - F-pattern eye tracking simulation via scroll
        - Random pauses at headings
        - Micro-movements during "reading"
        """
        scroll_points = self._generate_f_pattern_scroll()
        for point in scroll_points:
            await self.page.evaluate(f"window.scrollTo(0, {point})")
            await self.ghost_cursor.random_micro_movement()
            await asyncio.sleep(random.lognormvariate(0.8, 0.3))
            
    async def extract_session_state(self) -> SessionState:
        """
        Serialize all stateful artifacts for handover:
        - Cookies (including HttpOnly)
        - LocalStorage
        - SessionStorage
        - IndexedDB keys
        - Service Worker registrations
        """
        cookies = await self.context.cookies()
        local_storage = await self.page.evaluate("() => Object.entries(localStorage)")
        session_storage = await self.page.evaluate("() => Object.entries(sessionStorage)")
        
        # Critical: Capture Cloudflare challenge tokens
        cf_clearance = next((c for c in cookies if c['name'] == 'cf_clearance'), None)
        
        return SessionState(
            cookies=cookies,
            local_storage=dict(local_storage),
            session_storage=dict(session_storage),
            cf_clearance=cf_clearance,
            user_agent=self.profile.user_agent,
            tls_fingerprint=self.profile.tls_fingerprint,
            timestamp=time.time()
        )

3.1.3 Entropy Maximization Strategy

To defeat temporal analysis, we introduce jittered scheduling modeled as a log-normal distribution:

\Delta t \sim \text{LogNormal}(\mu = 3.2, \sigma = 0.8)

Where \Delta t represents inter-request delay in seconds. This mirrors empirical human behavior distributions from HCI research (Card et al., 1983).
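A minimal sketch of sampling this distribution with the standard library (the function name is illustrative; note the stdlib spelling is lognormvariate):

```python
import random

def sample_inter_request_delays(n: int, mu: float = 3.2, sigma: float = 0.8,
                                seed: int = 42) -> list:
    """Draw n inter-request delays (seconds) from LogNormal(mu, sigma)."""
    rng = random.Random(seed)
    return [rng.lognormvariate(mu, sigma) for _ in range(n)]

delays = sample_inter_request_delays(1000)
# The median delay is e^mu ≈ 24.5 s, and every sample is strictly positive,
# so the scheduler can never emit a zero or negative wait.
```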


3.2 The Network Bridge (Handover Protocol)

Critical Design Constraint: The TLS fingerprint of curl_cffi must match the JA3 signature that Camoufox presented during authentication.

3.2.1 State Serialization Schema

@dataclass
class SessionState:
    cookies: List[Dict[str, Any]]
    local_storage: Dict[str, str]
    session_storage: Dict[str, str]
    cf_clearance: Optional[Dict[str, Any]]
    user_agent: str
    tls_fingerprint: str  # e.g., "chrome120"
    timestamp: float
    
    def to_redis_key(self, session_id: str) -> str:
        return f"session:{session_id}:state"
    
    def serialize(self) -> bytes:
        """
        Serialize with MessagePack for compact representation.
        Include HMAC for integrity verification.
        """
        payload = msgpack.packb({
            'cookies': self.cookies,
            'local_storage': self.local_storage,
            'session_storage': self.session_storage,
            'cf_clearance': self.cf_clearance,
            'user_agent': self.user_agent,
            'tls_fingerprint': self.tls_fingerprint,
            'timestamp': self.timestamp,
        })
        hmac_sig = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
        return hmac_sig + payload
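On the consuming side, the 32-byte SHA-256 HMAC prefix must be split off and verified in constant time before the payload is trusted. A dependency-free sketch of both directions, using json in place of MessagePack and an illustrative key (the real SECRET_KEY would come from deployment secrets):

```python
import hashlib
import hmac
import json

SECRET_KEY = b"illustrative-signing-key"  # load from deployment secrets in practice

def pack_and_sign(state: dict, key: bytes = SECRET_KEY) -> bytes:
    """Mirror of SessionState.serialize(): HMAC prefix + packed payload."""
    payload = json.dumps(state, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).digest() + payload

def verify_and_unpack(blob: bytes, key: bytes = SECRET_KEY) -> dict:
    """Split the 32-byte HMAC, verify in constant time, then deserialize."""
    sig, payload = blob[:32], blob[32:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(sig, expected):
        raise ValueError("Session state failed HMAC verification")
    return json.loads(payload)
```

hmac.compare_digest avoids timing side channels; a plain == comparison would leak how many prefix bytes matched.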

3.2.2 curl_cffi Client Configuration

class CurlCffiClient:
    def __init__(self, session_state: SessionState):
        self.session_state = session_state
        self.session = AsyncSession(impersonate=session_state.tls_fingerprint)
        
    async def initialize(self):
        """
        Configure curl_cffi to match Camoufox's network signature.
        """
        # Inject cookies
        for cookie in self.session_state.cookies:
            self.session.cookies.set(
                name=cookie['name'],
                value=cookie['value'],
                domain=cookie['domain'],
                path=cookie.get('path', '/'),
                secure=cookie.get('secure', False),
            )
        
        # Build header profile from BrowserForge
        self.headers = {
            'User-Agent': self.session_state.user_agent,
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': 'https://target.com/',
            'Origin': 'https://target.com',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua': self._derive_sec_ch_ua(),
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
        }
        
    def _derive_sec_ch_ua(self) -> str:
        """
        Derive sec-ch-ua from the User-Agent to ensure consistency.
        Example: Chrome/120.0.6099.109 → "Chromium";v="120", "Google Chrome";v="120"
        
        This is critical: mismatches trigger instant flagging.
        """
        match = re.search(r'Chrome/(\d+)\.', self.session_state.user_agent)
        if match is None:
            raise ValueError("No Chrome token in User-Agent; cannot derive sec-ch-ua")
        major = match.group(1)
        # The GREASE brand ("Not_A Brand";v="8") matches Chrome 120; it varies
        # across releases and must track the impersonation target
        return (f'"Not_A Brand";v="8", "Chromium";v="{major}", '
                f'"Google Chrome";v="{major}"')
        
    async def fetch(self, url: str, method: str = 'GET', **kwargs):
        """
        Execute request with TLS fingerprint matching browser.
        Include random delays modeled on human API interaction.
        """
        await asyncio.sleep(random.lognormvariate(0.2, 0.1))
        
        response = await self.session.request(
            method=method,
            url=url,
            headers=self.headers,
            **kwargs
        )
        
        # Verify we're not challenged
        if 'cf-mitigated' in response.headers:
            raise SessionInvalidatedError("Cloudflare challenge detected")
            
        return response

3.2.3 Handover Sequence Diagram

sequenceDiagram
    participant O as Orchestrator
    participant C as Camoufox
    participant R as Redis Store
    participant Curl as curl_cffi Client
    participant T as Target API
    
    O->>C: Dispatch Auth Task
    C->>T: Navigate + Solve Challenge
    T-->>C: Set Cookies + Challenge Token
    C->>C: Extract Session State
    C->>R: Serialize State to Redis
    C->>O: Signal Ready
    O->>Curl: Dispatch Extraction Task
    Curl->>R: Retrieve Session State
    Curl->>Curl: Configure TLS + Headers
    Curl->>T: API Request (with cookies)
    T-->>Curl: JSON Response
    Curl->>O: Deliver Payload

3.3 The Scheduler (Clock Drift & Rotation Logic)

Design Principle: Deterministic scheduling reveals automation. We introduce controlled chaos.

3.3.1 Clock Drift Implementation

Adversarial systems analyze request timestamps for periodicity. We inject Gaussian noise into task dispatch:

t_{\text{actual}} = t_{\text{scheduled}} + \mathcal{N}(0, \sigma^2)

Where \sigma = 5 seconds. Additionally, we implement phase shift rotation to avoid harmonic patterns:

class EntropyScheduler:
    def __init__(self, base_interval: float = 30.0):
        self.base_interval = base_interval
        self.phase_offset = 0.0
        self.drift_sigma = 5.0
        
    def next_execution_time(self) -> float:
        """
        Calculate next execution with drift and phase rotation.
        """
        # Base interval with Gaussian noise
        noisy_interval = self.base_interval + random.gauss(0, self.drift_sigma)
        
        # Phase shift accumulation (simulates human circadian variance)
        self.phase_offset += random.uniform(-0.5, 0.5)
        
        # Clamp to biologically plausible bounds (5-120 s; see §5.3)
        next_time = max(5.0, min(120.0, noisy_interval + self.phase_offset))
        
        return time.time() + next_time
    
    async def dispatch_with_entropy(self, task: Callable):
        """
        Execute task at entropic time with pre-task jitter.
        """
        execution_time = self.next_execution_time()
        await asyncio.sleep(execution_time - time.time())
        
        # Pre-execution jitter (simulate human hesitation)
        await asyncio.sleep(random.uniform(0.1, 0.8))
        
        await task()

3.3.2 Proxy Rotation Strategy

Mobile proxies provide high IP reputation but require careful rotation to avoid correlation:

class MobileProxyRotator:
    def __init__(self, proxy_pool: List[str]):
        self.proxy_pool = proxy_pool
        self.session_assignments = {}  # session_id -> (proxy, assigned_at)
        self.proxy_last_used = {}      # proxy -> last assignment timestamp
        self.cooldown_period = 300  # 5 minutes
        
    def select_proxy(self, session_id: str) -> str:
        """
        Sticky session assignment with cooldown enforcement.
        
        Rule: Same session_id always gets same proxy (until cooldown).
        Prevents mid-session IP changes which trigger fraud alerts.
        """
        if session_id in self.session_assignments:
            proxy, assigned_at = self.session_assignments[session_id]
            if time.time() - assigned_at < self.cooldown_period:
                return proxy
        
        # Select least-recently-used proxy among those past cooldown
        available = [p for p in self.proxy_pool 
                    if self._is_cooled_down(p)]
        
        if not available:
            raise ProxyExhaustionError("No proxies available")
        
        proxy = min(available, key=lambda p: self.proxy_last_used.get(p, 0.0))
        now = time.time()
        self.session_assignments[session_id] = (proxy, now)
        self.proxy_last_used[proxy] = now
        
        return proxy
    
    def _is_cooled_down(self, proxy: str) -> bool:
        """Check if proxy has completed its cooldown period."""
        last_used = self.proxy_last_used.get(proxy)
        return last_used is None or time.time() - last_used > self.cooldown_period

4. Data Flow Description

4.1 Cold Boot Sequence

[START]
   |
   v
1. Orchestrator requests fingerprint from BrowserForge
   - OS: Windows 11
   - Browser: Chrome 120.0.6099.109
   - Screen: 1920x1080
   - Hardware: Intel i7, 16GB RAM
   |
   v
2. BrowserForge generates deterministic profile
   - TLS fingerprint: chrome120
   - Canvas noise seed: 0x3f2a9c
   - WebGL vendor: "ANGLE (Intel, Intel(R) UHD Graphics 620)"
   - User-Agent + sec-ch-ua alignment verified
   |
   v
3. Camoufox container instantiated with profile
   - Docker: camoufox:latest
   - Proxy: Mobile 4G (AT&T, Chicago)
   - Memory limit: 2GB
   - CPU limit: 2 cores
   |
   v
4. Ghost Cursor engine initialized
   - Bezier curve generator seeded
   - Velocity profile: human-average (200-400 px/s)
   |
   v
5. Navigation to target with behavioral simulation
   - Pre-navigation delay: 4.2s
   - Mouse hover on URL bar: 0.3s
   - Typing simulation: 12 keystrokes at 180ms intervals
   - Page load wait: networkidle
   |
   v
6. Challenge detection and solving
   - If Cloudflare: Wait for Turnstile, interact if required
   - If CAPTCHA: Delegate to 2Captcha/CapSolver
   - Monitor for cf_clearance cookie
   |
   v
7. Post-authentication behavior
   - Random scroll (F-pattern)
   - Mouse micro-movements: 8-12 per scroll
   - Time on page: 15-30s (lognormal distribution)
   |
   v
8. Session state extraction
   - 23 cookies captured (including HttpOnly)
   - cf_clearance: present, expires in 1800s
   - localStorage: 4 keys
   - sessionStorage: 2 keys
   |
   v
9. State serialization to Redis
   - Key: session:a3f9c2d1:state
   - HMAC: verified
   - TTL: 1500s (before cookie expiration)
   |
   v
10. Camoufox container terminated
    - Browser context closed
    - Memory freed
    - Proxy connection released to cooldown

4.2 Extraction Phase Sequence

[TRIGGER: API extraction task]
   |
   v
1. curl_cffi client initialized
   - Retrieves session state from Redis
   - Configures TLS fingerprint: chrome120
   - Injects 23 cookies
   - Sets headers with sec-ch-ua consistency
   |
   v
2. Scheduler calculates next execution time
   - Base interval: 30s
   - Gaussian noise: +3.7s
   - Phase offset: -0.2s
   - Actual delay: 33.5s
   |
   v
3. Pre-request jitter applied
   - Random delay: 0.4s
   |
   v
4. API request dispatched
   - Method: GET
   - URL: https://api.target.com/v1/data
   - Headers: 14 headers set
   - TLS: JA3 matches browser session
   |
   v
5. Response validation
   - Status: 200 OK
   - cf-mitigated header: absent
   - JSON payload: 2.3 MB
   |
   v
6. Payload delivered to data pipeline
   - Parsed and validated
   - Stored in time-series database
   |
   v
7. Next iteration scheduled
   - Session state TTL checked
   - If < 300s remaining: trigger re-authentication
   - Else: continue extraction phase

5. Entropy & Evasion Strategy

5.1 BrowserForge Profile Mapping

Critical Constraint: Every profile component must exhibit statistical correlation.

class BrowserForgeProfileValidator:
    """
    Validates that generated profiles exhibit internally consistent
    statistical properties to avoid fingerprint contradictions.
    """
    
    def validate(self, profile: BrowserForgeProfile) -> ValidationResult:
        checks = [
            self._check_user_agent_sec_ch_consistency(profile),
            self._check_viewport_screen_consistency(profile),
            self._check_hardware_memory_consistency(profile),
            self._check_timezone_locale_consistency(profile),
            self._check_tls_browser_version_consistency(profile),
        ]
        
        failures = [c for c in checks if not c.passed]
        return ValidationResult(passed=len(failures) == 0, failures=failures)
    
    def _check_user_agent_sec_ch_consistency(self, profile):
        """
        User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) 
                    AppleWebKit/537.36 (KHTML, like Gecko) 
                    Chrome/120.0.6099.109 Safari/537.36
                    
        sec-ch-ua: "Not_A Brand";v="8", "Chromium";v="120", 
                   "Google Chrome";v="120"
                   
        These MUST align or instant detection occurs.
        """
        ua_version = self._extract_chrome_version(profile.user_agent)
        ch_version = self._extract_ch_ua_version(profile.sec_ch_ua)
        
        return ValidationCheck(
            name="UA/sec-ch-ua consistency",
            passed=(ua_version == ch_version),
            details=f"UA: {ua_version}, sec-ch-ua: {ch_version}"
        )
    
    def _check_viewport_screen_consistency(self, profile):
        """
        Viewport should be smaller than screen resolution.
        Typical browser chrome: 70-120px vertical offset.
        """
        viewport_height = profile.viewport['height']
        screen_height = profile.screen['height']
        chrome_height = screen_height - viewport_height
        
        # Reasonable browser chrome range
        valid = 50 <= chrome_height <= 150
        
        return ValidationCheck(
            name="Viewport/Screen consistency",
            passed=valid,
            details=f"Chrome height: {chrome_height}px"
        )
    
    def _check_hardware_memory_consistency(self, profile):
        """
        deviceMemory should align with hardwareConcurrency.
        Typical ratios: 2GB per core for consumer hardware.
        """
        memory_gb = profile.device_memory
        cores = profile.hardware_concurrency
        ratio = memory_gb / cores
        
        # Consumer hardware typically 1-4 GB per core
        valid = 1 <= ratio <= 4
        
        return ValidationCheck(
            name="Hardware/Memory consistency",
            passed=valid,
            details=f"Ratio: {ratio:.2f} GB/core"
        )
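The helpers _extract_chrome_version and _extract_ch_ua_version are referenced above but not defined; a minimal regex-based sketch, written as standalone functions for illustration:

```python
import re

def extract_chrome_version(user_agent):
    """Pull the Chrome major version out of a User-Agent string."""
    m = re.search(r'Chrome/(\d+)\.', user_agent)
    return m.group(1) if m else None

def extract_sec_ch_ua_version(sec_ch_ua):
    """Pull the Chromium major version out of a sec-ch-ua header value."""
    m = re.search(r'"Chromium";v="(\d+)"', sec_ch_ua)
    return m.group(1) if m else None
```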

5.2 Ghost Cursor Bezier Implementation

Human mouse movement exhibits submovement composition (Meyer et al., 1988). We model this with composite Bezier curves:

class GhostCursorEngine:
    def __init__(self):
        self.velocity_profile = self._load_human_velocity_distribution()
        
    async def move_to(self, page: Page, target_x: int, target_y: int):
        """
        Generate human-like trajectory using composite Bezier curves
        with velocity-based submovement decomposition.
        """
        current_x, current_y = await self._get_cursor_position(page)
        
        # Calculate distance for submovement count
        distance = math.sqrt((target_x - current_x)**2 + 
                           (target_y - current_y)**2)
        
        # Human submovements: 1-3 for short distances, up to 5 for long
        num_submovements = min(5, max(1, int(distance / 300)))
        
        waypoints = self._generate_waypoints(
            (current_x, current_y),
            (target_x, target_y),
            num_submovements
        )
        
        for i in range(len(waypoints) - 1):
            await self._execute_submovement(page, waypoints[i], waypoints[i+1])
    
    def _generate_waypoints(self, start, end, count):
        """
        Generate intermediate waypoints with Gaussian perturbation
        to simulate motor control noise.
        """
        waypoints = [start]
        
        for i in range(1, count):
            t = i / count
            # Linear interpolation with perpendicular noise
            x = start[0] + t * (end[0] - start[0])
            y = start[1] + t * (end[1] - start[1])
            
            # Add perpendicular deviation (curved path from motor-control noise)
            angle = math.atan2(end[1] - start[1], end[0] - start[0])
            perp_angle = angle + math.pi / 2
            noise_magnitude = random.gauss(0, 10)
            
            x += noise_magnitude * math.cos(perp_angle)
            y += noise_magnitude * math.sin(perp_angle)
            
            waypoints.append((x, y))
        
        waypoints.append(end)
        return waypoints
    
    async def _execute_submovement(self, page, start, end):
        """
        Execute single submovement with velocity profile matching
        Fitts's Law: T = a + b * log2(D/W + 1)
        """
        distance = math.sqrt((end[0] - start[0])**2 + (end[1] - start[1])**2)
        
        # Generate Bezier control points
        control1, control2 = self._generate_bezier_controls(start, end)
        
        # Calculate movement time from Fitts's Law
        a, b = 0.1, 0.15  # Empirical constants
        movement_time = a + b * math.log2(distance / 10 + 1)
        
        # Sample Bezier curve
        steps = max(10, int(distance / 5))
        for i in range(steps + 1):
            t = i / steps
            point = self._bezier_point(t, start, control1, control2, end)
            
            await page.mouse.move(point[0], point[1])
            await asyncio.sleep(movement_time / steps)
    
    def _bezier_point(self, t, p0, p1, p2, p3):
        """Cubic Bezier curve evaluation."""
        x = (1-t)**3 * p0[0] + 3*(1-t)**2*t * p1[0] + \
            3*(1-t)*t**2 * p2[0] + t**3 * p3[0]
        y = (1-t)**3 * p0[1] + 3*(1-t)**2*t * p1[1] + \
            3*(1-t)*t**2 * p2[1] + t**3 * p3[1]
        return (x, y)
    
    async def random_micro_movement(self):
        """
        Simulate fidgeting during reading:
        Small, low-velocity movements (drift).
        """
        drift_x = random.gauss(0, 15)
        drift_y = random.gauss(0, 15)
        # Execute slowly (low velocity indicates inattention)
        # Implementation omitted for brevity
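The cubic Bezier evaluation above can be checked in isolation: at t=0 and t=1 the curve must pass exactly through the endpoints, with the control points only bending the interior of the path. A standalone sketch:

```python
def bezier_point(t, p0, p1, p2, p3):
    """Evaluate a cubic Bezier curve at parameter t in [0, 1]."""
    x = (1-t)**3 * p0[0] + 3*(1-t)**2*t * p1[0] + \
        3*(1-t)*t**2 * p2[0] + t**3 * p3[0]
    y = (1-t)**3 * p0[1] + 3*(1-t)**2*t * p1[1] + \
        3*(1-t)*t**2 * p2[1] + t**3 * p3[1]
    return (x, y)

# Example trajectory: the controls pull the path above the straight line
start, c1, c2, end = (0, 0), (30, 80), (70, 90), (100, 100)
```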

5.3 Clock Drift Mathematical Model

To avoid temporal fingerprinting, we model human variance in task execution:

\begin{aligned}
T_{\text{base}} &= 30 \text{ seconds (target interval)} \\
T_{\text{actual}} &= T_{\text{base}} + \Delta_{\text{gauss}} + \Delta_{\text{phase}} \\
\Delta_{\text{gauss}} &\sim \mathcal{N}(0, \sigma^2), \quad \sigma = 5 \\
\Delta_{\text{phase}} &= \phi(t), \quad \phi(t + \Delta t) = \phi(t) + \mathcal{U}(-0.5, 0.5)
\end{aligned}

The phase term \phi(t) introduces low-frequency drift that prevents harmonic detection. Additionally, we clamp to ensure biological plausibility:

T_{\text{final}} = \max(5, \min(120, T_{\text{actual}}))
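Transcribed directly into Python (the function name is illustrative), the full interval computation is:

```python
import random
from typing import Optional

def final_interval(base: float = 30.0, sigma: float = 5.0, phase: float = 0.0,
                   rng: Optional[random.Random] = None) -> float:
    """T_final = clamp(T_base + N(0, sigma^2) + phase, 5, 120)."""
    rng = rng or random.Random()
    t_actual = base + rng.gauss(0, sigma) + phase
    return max(5.0, min(120.0, t_actual))
```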


6. Infrastructure & DevOps

6.1 Docker Containerization Strategy

# docker-compose.yml
version: '3.8'

services:
  orchestrator:
    image: extraction-agent/orchestrator:latest
    environment:
      - REDIS_URL=redis://redis:6379
      - PROXY_API_KEY=${PROXY_API_KEY}
    depends_on:
      - redis
    deploy:
      replicas: 1
      
  camoufox-pool:
    image: extraction-agent/camoufox:latest
    environment:
      - BROWSERFORGE_SEED=${BROWSERFORGE_SEED}
      - REDIS_URL=redis://redis:6379
    shm_size: 2gb  # Critical: shared memory for the Firefox-based browser
    deploy:
      replicas: 5
      resources:
        limits:
          cpus: '2'
          memory: 2G
    volumes:
      - /dev/shm:/dev/shm  # tmpfs-backed; redundant with shm_size above, either one suffices
      
  curl-pool:
    image: extraction-agent/curl-cffi:latest
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 20  # Higher concurrency for lightweight clients
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
          
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
      
volumes:
  redis-data:

6.2 Dockerfile for Camoufox Container

FROM python:3.11-slim

# System libraries for the Firefox-based Camoufox browser,
# plus tini for zombie-process reaping
RUN apt-get update && apt-get install -y \
    wget gnupg ca-certificates tini \
    fonts-liberation libasound2 libatk-bridge2.0-0 \
    libatk1.0-0 libcups2 libdbus-1-3 libgdk-pixbuf2.0-0 \
    libnspr4 libnss3 libx11-xcb1 libxcomposite1 \
    libxdamage1 libxrandr2 xdg-utils \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install Camoufox and fetch its browser binary
# (Camoufox ships its own Firefox build; no Chromium download is needed)
RUN pip install --no-cache-dir camoufox && python -m camoufox fetch

COPY . .

# Use tini to handle zombie processes
ENTRYPOINT ["/usr/bin/tini", "--"]

CMD ["python", "camoufox_worker.py"]

6.3 CI/CD Pipeline for Browser Binary Updates

# .github/workflows/update-browsers.yml
name: Update Browser Binaries

on:
  schedule:
    - cron: '0 2 * * 1'  # Weekly on Monday 2 AM
  workflow_dispatch:

jobs:
  update-and-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Update Playwright
        run: |
          pip install -U playwright
          playwright install chromium
                    
      - name: Update Camoufox
        run: pip install -U camoufox
        
      - name: Extract Browser Versions
        id: versions
        run: |
          # Query the installed Chromium build through the Playwright Python API
          CHROME_VERSION=$(python -c "from playwright.sync_api import sync_playwright; pw = sync_playwright().start(); b = pw.chromium.launch(); print(b.version); b.close(); pw.stop()")
          echo "chrome=$CHROME_VERSION" >> $GITHUB_OUTPUT
                    
      - name: Update BrowserForge Profiles
        run: python scripts/update_fingerprints.py --chrome-version ${{ steps.versions.outputs.chrome }}
        
      - name: Run Fingerprint Tests
        run: pytest tests/test_fingerprint_consistency.py
        
      - name: Build and Push Docker Images
        run: |
          docker build -t extraction-agent/camoufox:${{ steps.versions.outputs.chrome }} .
          docker push extraction-agent/camoufox:${{ steps.versions.outputs.chrome }}
          docker tag extraction-agent/camoufox:${{ steps.versions.outputs.chrome }} extraction-agent/camoufox:latest
          docker push extraction-agent/camoufox:latest          

6.4 Mobile Proxy Integration

class MobileProxyProvider:
    """
    Integration with mobile proxy providers (e.g., Oxylabs, Smartproxy).
    Leverages CGNAT for high IP reputation.
    """
    
    def __init__(self, api_key: str, country_code: str = 'us'):
        self.api_key = api_key
        self.country_code = country_code
        self.session_cache = {}
        
    def get_proxy_for_session(self, session_id: str, sticky: bool = True) -> str:
        """
        Obtain proxy URL with session persistence.
        
        sticky=True: Same session_id returns same IP (via session parameter)
        sticky=False: Each call rotates IP
        """
        if sticky and session_id in self.session_cache:
            return self.session_cache[session_id]
        
        # Format: http://user-APIKEY-country-US-session-SESSION:pass@proxy.provider.com:7777
        if sticky:
            proxy_url = (
                f"http://user-{self.api_key}-country-{self.country_code}"
                f"-session-{session_id}:pass@mobile-proxy.oxylabs.io:7777"
            )
            self.session_cache[session_id] = proxy_url
        else:
            # Omit session parameter for rotation
            proxy_url = (
                f"http://user-{self.api_key}-country-{self.country_code}"
                f":pass@mobile-proxy.oxylabs.io:7777"
            )
        
        return proxy_url
    
    def release_session(self, session_id: str):
        """Release sticky session to allow cooldown."""
        if session_id in self.session_cache:
            del self.session_cache[session_id]

7. Advanced Evasion Techniques

7.1 DOM Mutation Rate Control

Adversarial systems analyze the rate of DOM mutations to detect automation. Legitimate users interact with UI elements progressively; bots often mutate the DOM at superhuman speeds.

class DOMInteractionThrottler:
    """
    Ensure DOM mutations occur at human-plausible rates.
    """
    
    async def click_with_throttle(self, page: Page, selector: str):
        """
        Click with pre-hover delay and post-click pause.
        """
        element = await page.wait_for_selector(selector)
        
        # Pre-hover delay (humans don't click instantly)
        await self.ghost_cursor.move_to_element(page, element)
        await asyncio.sleep(random.uniform(0.3, 0.8))
        
        # Click
        await element.click()
        
        # Post-click pause (reaction time to visual feedback)
        await asyncio.sleep(random.uniform(0.2, 0.5))
    
    async def fill_form_with_throttle(self, page: Page, selector: str, text: str):
        """
        Type with keystroke dynamics.
        """
        await page.focus(selector)
        
        for char in text:
            await page.keyboard.type(char)
            # Inter-keystroke interval: 80-200ms (human typing speed)
            await asyncio.sleep(random.uniform(0.08, 0.2))
        
        # Pause after completion (review behavior)
        await asyncio.sleep(random.uniform(0.5, 1.5))
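As a sanity check on the 80-200 ms inter-keystroke band, the implied typing speed lands at the fast end of human performance (using the standard 5-characters-per-word convention):

```python
# Midpoint of the uniform(0.08, 0.2) inter-keystroke interval
avg_interval_s = (0.08 + 0.2) / 2        # 0.14 s per keystroke
chars_per_minute = 60 / avg_interval_s   # ~428.6 characters per minute
words_per_minute = chars_per_minute / 5  # ~85.7 WPM: fast but human-plausible
```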

7.2 Canvas Fingerprint Noise Injection

Canvas fingerprinting generates unique hardware signatures. We inject deterministic noise based on the BrowserForge profile seed:

class CanvasNoiseInjector:
    def __init__(self, seed: int):
        self.rng = random.Random(seed)
        
    def generate_injection_script(self) -> str:
        """
        Generate JavaScript to inject into page context.
        Modifies canvas rendering at sub-pixel level.
        """
        # Deterministic ±1 LSB noise: gauss(0, 0.0001) scaled to 8-bit values
        # rounds to (nearly) zero, so draw integer offsets directly.
        # Uint8ClampedArray clamps out-of-range writes automatically.
        noise = [self.rng.choice([-1, 0, 1]) for _ in range(256)]
        
        return f"""
        (() => {{
            const noise = {noise};
            const originalGetImageData = CanvasRenderingContext2D.prototype.getImageData;
            
            CanvasRenderingContext2D.prototype.getImageData = function(...args) {{
                const imageData = originalGetImageData.apply(this, args);
                
                // Inject per-channel LSB noise (skip every 4th byte: alpha)
                for (let i = 0; i < imageData.data.length; i++) {{
                    if (i % 4 === 3) continue;
                    imageData.data[i] += noise[i % 256];
                }}
                
                return imageData;
            }};
        }})();
        """
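Determinism is the point of seeding the generator: if the noise array drifted between sessions, the canvas hash itself would become an unstable, trackable signal. A minimal check of that property, in plain Python with no browser required:

```python
import random

def canvas_noise(seed: int, n: int = 256) -> list[float]:
    """Deterministic Gaussian noise array derived from a profile seed."""
    rng = random.Random(seed)
    return [rng.gauss(0, 0.0001) for _ in range(n)]

# Same seed => identical noise array => stable canvas fingerprint
assert canvas_noise(42) == canvas_noise(42)
# Different seeds => different arrays => distinct apparent hardware
assert canvas_noise(42) != canvas_noise(43)
```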

7.3 WebGL Vendor Spoofing

```python
from playwright.async_api import Page


async def inject_webgl_spoofing(page: Page, vendor: str, renderer: str):
    """
    Override WebGL parameters to match the hardware profile.
    """
    await page.add_init_script(f"""
        const getParameter = WebGLRenderingContext.prototype.getParameter;
        WebGLRenderingContext.prototype.getParameter = function(parameter) {{
            if (parameter === 37445) {{  // UNMASKED_VENDOR_WEBGL
                return '{vendor}';
            }}
            if (parameter === 37446) {{  // UNMASKED_RENDERER_WEBGL
                return '{renderer}';
            }}
            return getParameter.call(this, parameter);
        }};
    """)
```
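The vendor and renderer strings must be mutually consistent: a profile claiming an Intel vendor alongside an NVIDIA renderer string is an immediate red flag. A sketch of selecting a matched pair deterministically from the profile seed (the strings below are illustrative Chromium-style ANGLE examples, not BrowserForge output; a Firefox-based stack like Camoufox would need Firefox-style strings harvested from real profiles):

```python
import random

# Illustrative vendor/renderer pairs; production deployments should source
# these from harvested profiles so the strings match real browser output.
WEBGL_PROFILES = [
    ("Google Inc. (NVIDIA)", "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0, D3D11)"),
    ("Google Inc. (Intel)", "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0, D3D11)"),
    ("Google Inc. (AMD)", "ANGLE (AMD, AMD Radeon RX 580 Direct3D11 vs_5_0 ps_5_0, D3D11)"),
]

def pick_webgl_pair(seed: int) -> tuple[str, str]:
    """Deterministically pick an internally consistent vendor/renderer pair."""
    vendor, renderer = random.Random(seed).choice(WEBGL_PROFILES)
    return vendor, renderer
```

Keying the choice to the same seed that drives canvas noise keeps all fingerprint surfaces correlated, per Critical Success Factor 1.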

8. Monitoring & Observability

8.1 Metrics Collection

```python
from prometheus_client import Counter, Histogram, Gauge

# Define metrics
auth_attempts = Counter('auth_attempts_total', 'Authentication attempts', ['result'])
session_duration = Histogram('session_duration_seconds', 'Session lifespan')
challenge_rate = Gauge('challenge_rate', 'Rate of challenges encountered')
extraction_throughput = Counter('extraction_requests_total', 'API extractions', ['status'])


class MetricsCollector:
    @staticmethod
    def record_auth_success():
        auth_attempts.labels(result='success').inc()

    @staticmethod
    def record_auth_failure(reason: str):
        auth_attempts.labels(result=reason).inc()

    @staticmethod
    def record_session_lifetime(duration: float):
        session_duration.observe(duration)

    @staticmethod
    def update_challenge_rate(rate: float):
        challenge_rate.set(rate)
```

8.2 Alerting Rules

```yaml
# prometheus-alerts.yml
groups:
  - name: extraction_agent
    interval: 30s
    rules:
      - alert: HighChallengeRate
        expr: challenge_rate > 0.3
        for: 5m
        annotations:
          summary: "Challenge rate exceeds 30%"
          description: "Fingerprint may be burned, rotate profiles"

      - alert: SessionDurationDrop
        # Average duration = rate(sum) / rate(count), in seconds
        expr: rate(session_duration_seconds_sum[5m]) / rate(session_duration_seconds_count[5m]) < 600
        for: 10m
        annotations:
          summary: "Average session duration dropped below 10 minutes"
          description: "Sessions being invalidated prematurely"

      - alert: AuthFailureSpike
        # Failure fraction, not a raw per-second rate
        expr: rate(auth_attempts_total{result!="success"}[5m]) / rate(auth_attempts_total[5m]) > 0.5
        for: 5m
        annotations:
          summary: "Authentication failure rate > 50%"
          description: "Possible detection or proxy issues"
```

9. Security Considerations

9.1 Session State Encryption

All session state stored in Redis must be encrypted at rest:

```python
from typing import Optional

from cryptography.fernet import Fernet


class EncryptedSessionStore:
    def __init__(self, redis_client, encryption_key: bytes):
        self.redis = redis_client
        self.cipher = Fernet(encryption_key)

    async def store(self, session_id: str, state: SessionState):
        """Encrypt and store session state."""
        plaintext = state.serialize()
        ciphertext = self.cipher.encrypt(plaintext)

        await self.redis.setex(
            name=f"session:{session_id}",
            time=1800,  # 30-minute TTL
            value=ciphertext,
        )

    async def retrieve(self, session_id: str) -> Optional[SessionState]:
        """Retrieve and decrypt session state."""
        ciphertext = await self.redis.get(f"session:{session_id}")
        if not ciphertext:
            return None

        plaintext = self.cipher.decrypt(ciphertext)
        return SessionState.deserialize(plaintext)
```
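Fernet requires a 32-byte, url-safe base64-encoded key; `Fernet.generate_key()` is the simplest source. When the key must instead be derived from an operator secret, a KDF sketch (the scrypt cost parameters here are assumptions, tune them to your threat model):

```python
import base64
import hashlib

def derive_fernet_key(passphrase: str, salt: bytes) -> bytes:
    """Derive a Fernet-compatible key from a passphrase via scrypt."""
    raw = hashlib.scrypt(
        passphrase.encode(),
        salt=salt,
        n=2**14, r=8, p=1,  # assumed cost parameters (~16 MiB memory)
        dklen=32,
    )
    # Fernet expects 32 raw bytes encoded as url-safe base64 (44 chars)
    return base64.urlsafe_b64encode(raw)

key = derive_fernet_key("operator-secret", salt=b"per-deployment-salt")
# key is now usable as: EncryptedSessionStore(redis_client, key)
```

The salt should be unique per deployment and stored alongside (not inside) the encrypted session data.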

9.2 Rate Limiting at Orchestrator Level

Prevent runaway resource consumption:

```python
import asyncio
import time
from asyncio import Semaphore


class ResourceThrottler:
    def __init__(self, max_concurrent_browsers: int = 10):
        self.browser_semaphore = Semaphore(max_concurrent_browsers)
        self.rate_limiter = {}

    async def acquire_browser_slot(self):
        """Enforce maximum concurrent browser instances."""
        await self.browser_semaphore.acquire()

    def release_browser_slot(self):
        self.browser_semaphore.release()

    async def enforce_rate_limit(self, key: str, max_per_minute: int = 60):
        """Token bucket rate limiting per target domain."""
        now = time.time()

        if key not in self.rate_limiter:
            self.rate_limiter[key] = {'tokens': max_per_minute, 'last_update': now}

        bucket = self.rate_limiter[key]
        elapsed = now - bucket['last_update']
        bucket['tokens'] = min(max_per_minute, bucket['tokens'] + elapsed * (max_per_minute / 60))
        bucket['last_update'] = now

        if bucket['tokens'] < 1:
            wait_time = (1 - bucket['tokens']) / (max_per_minute / 60)
            await asyncio.sleep(wait_time)
            # Consume the token accrued during the wait and reset the clock,
            # otherwise the sleep interval would be refilled a second time
            bucket['tokens'] = 0
            bucket['last_update'] = time.time()
        else:
            bucket['tokens'] -= 1
```
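Callers must pair `acquire_browser_slot` with `release_browser_slot` on every exit path, including exceptions. A hypothetical async-context-manager wrapper (sketched here directly over `asyncio.Semaphore`) makes that hard to get wrong:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def browser_slot(semaphore: asyncio.Semaphore):
    """Hold a browser slot for the duration of the block, releasing on any exit path."""
    await semaphore.acquire()
    try:
        yield
    finally:
        semaphore.release()

async def main():
    sem = asyncio.Semaphore(2)  # e.g. max_concurrent_browsers = 2
    async with browser_slot(sem):
        ...  # launch Camoufox, authenticate, hand over the session
    # Slot is released here even if authentication raised

asyncio.run(main())
```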

10. Performance Benchmarks

10.1 Expected Throughput

Under optimal conditions with the specified stack:

| Phase | Metric | Value |
|---|---|---|
| Authentication (Camoufox) | Time to `cf_clearance` | 8-15 seconds |
| Session Lifetime | Average duration | 25-35 minutes |
| Extraction (curl_cffi) | Requests per second (per session) | 2-5 RPS |
| Concurrent Sessions | Max per 2 GB RAM node | 5 browser instances |
| Concurrent Extractors | Max per 512 MB container | 20 curl instances |
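These figures compose into a quick capacity estimate. Taking midpoints from the table above, one fully loaded browser node delivers on the order of 17 effective requests per second, with re-authentication overhead well under 1% of wall-clock time:

```python
# Midpoints taken from the throughput table above
sessions_per_node = 5          # concurrent Camoufox instances per 2 GB node
rps_per_session = 3.5          # midpoint of 2-5 RPS
session_lifetime_s = 30 * 60   # midpoint of 25-35 minutes
reauth_time_s = 11.5           # midpoint of 8-15 seconds

# Steady-state aggregate throughput per node
aggregate_rps = sessions_per_node * rps_per_session

# Fraction of wall-clock time a session spends re-authenticating
reauth_overhead = reauth_time_s / (session_lifetime_s + reauth_time_s)

effective_rps = aggregate_rps * (1 - reauth_overhead)
print(f"{aggregate_rps=} {reauth_overhead=:.3%} {effective_rps=:.1f}")
```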

10.2 Resource Consumption

Camoufox Container:
- Memory: 1.8-2.2 GB per instance
- CPU: 1.5-2.0 cores during auth
- Disk I/O: Minimal (using /dev/shm)

curl_cffi Container:
- Memory: 120-180 MB per instance
- CPU: 0.1-0.3 cores
- Network: 5-10 Mbps per instance

11. Failure Modes & Recovery

11.1 Challenge Detection

```python
import asyncio

from playwright.async_api import Page


class ChallengeHandler:
    async def detect_and_handle(self, page: Page) -> bool:
        """
        Detect Cloudflare, Akamai, or Datadome challenges.
        """
        # Cloudflare Turnstile
        if await page.query_selector('iframe[src*="challenges.cloudflare.com"]'):
            return await self._handle_turnstile(page)

        # Cloudflare legacy challenge
        if await page.query_selector('#challenge-form'):
            await asyncio.sleep(5)  # Wait for auto-solve
            return True

        # Datadome
        if 'datadome' in page.url.lower():
            return await self._handle_datadome(page)

        # PerimeterX
        if await page.query_selector('[class*="_pxBlock"]'):
            return await self._handle_perimeterx(page)

        return True  # No challenge detected

    async def _handle_turnstile(self, page: Page) -> bool:
        """
        Turnstile typically auto-solves with good fingerprints.
        If an interactive challenge appears, delegate to the CAPTCHA service.
        """
        await asyncio.sleep(3)

        # Check if solved automatically
        if not await page.query_selector('iframe[src*="challenges.cloudflare.com"]'):
            return True

        # Still present: delegate to 2Captcha
        sitekey = await self._extract_turnstile_sitekey(page)
        solution = await self._solve_captcha_external(page.url, sitekey)
        await self._inject_captcha_solution(page, solution)

        return True
```
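The `_extract_turnstile_sitekey` helper referenced above is elided. One plausible implementation (a hypothetical sketch) scans the raw page HTML for the widget's `data-sitekey` attribute, which Turnstile's explicit-render markup exposes; production code should also fall back to parsing script render parameters:

```python
import re
from typing import Optional

_SITEKEY_RE = re.compile(r'data-sitekey="([0-9A-Za-z_-]+)"')

def extract_turnstile_sitekey(html: str) -> Optional[str]:
    """Pull the Turnstile sitekey out of raw page HTML, if present."""
    match = _SITEKEY_RE.search(html)
    return match.group(1) if match else None

html = '<div class="cf-turnstile" data-sitekey="0x4AAAAAAA_example"></div>'
assert extract_turnstile_sitekey(html) == "0x4AAAAAAA_example"
```

In the Page-based handler this would be fed with the result of `await page.content()`.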

11.2 Session Invalidation Recovery

```python
class SessionRecoveryManager:
    async def handle_invalidation(self, session_id: str, reason: str):
        """
        Recovery strategies based on invalidation reason.
        """
        if reason == 'cf_clearance_expired':
            # Normal expiration: re-authenticate
            await self.orchestrator.trigger_reauth(session_id)

        elif reason == 'ip_reputation_drop':
            # Proxy burned: rotate and re-authenticate
            await self.proxy_rotator.blacklist_current_proxy(session_id)
            await self.orchestrator.trigger_reauth(session_id, force_new_proxy=True)

        elif reason == 'fingerprint_detected':
            # Fingerprint burned: generate new profile
            await self.orchestrator.trigger_reauth(
                session_id,
                force_new_profile=True,
                cooldown=300,  # 5-minute cooldown before retry
            )

        elif reason == 'rate_limit':
            # Backoff with exponential delay
            await self.apply_exponential_backoff(session_id)
```
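`apply_exponential_backoff` is referenced but not defined. A minimal full-jitter sketch, keyed by retry attempt rather than session id for simplicity (the base and cap values are assumptions):

```python
import asyncio
import random

async def apply_exponential_backoff(attempt: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Sleep for a full-jitter exponential backoff delay; returns the delay used."""
    # Full jitter: uniform over [0, min(cap, base * 2^attempt)]
    delay = random.uniform(0, min(cap, base * (2 ** attempt)))
    await asyncio.sleep(delay)
    return delay

# Attempt 0 waits up to 2 s, attempt 1 up to 4 s, ..., capped at 5 minutes
```

Full jitter also serves the entropy goal: retries from multiple sessions never synchronize into a detectable burst.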

12. Legal & Ethical Use

DISCLAIMER: This architecture is designed for legitimate use cases including:

  • Competitive intelligence gathering from public data
  • Price monitoring and availability tracking
  • Academic research in web security
  • Penetration testing with explicit authorization

Users must:

  1. Respect robots.txt directives
  2. Implement rate limiting to avoid DoS
  3. Obtain authorization before testing systems they do not own
  4. Comply with CFAA (Computer Fraud and Abuse Act) and equivalent laws
  5. Review and adhere to target website Terms of Service

This architecture should never be used for:

  • Unauthorized access to protected systems
  • Data exfiltration of personal information
  • Circumventing paywall or authentication systems without permission
  • Any activity prohibited by applicable law

13. Conclusion

This Architecture Definition Document provides a comprehensive blueprint for a high-fidelity extraction agent capable of defeating modern bot mitigation systems. The hybrid approach—leveraging Camoufox for authentication fidelity and curl_cffi for extraction throughput—represents the state of the art in autonomous web interaction.

Critical Success Factors:

  1. Consistency: Every fingerprint component must exhibit internal correlation
  2. Entropy: Deterministic patterns are fatal; inject controlled chaos
  3. Behavioral Fidelity: Human behavior is complex; simple models fail
  4. State Management: The handover protocol is the weakest link; secure it rigorously
  5. Monitoring: Silent failures cascade; observe everything

Future Enhancements:

  • Machine learning-based behavior generation trained on real user sessions
  • Adaptive fingerprint rotation based on challenge rate feedback
  • Distributed orchestration for global scaling
  • Integration with computer vision for advanced CAPTCHA solving

This architecture represents 30 years of systems engineering distilled into a production-ready design. Implementation requires rigorous testing, continuous monitoring, and ethical deployment.


End of Document