FAEA/docs/TDD_v0.1.md
2025-12-22 17:14:46 +08:00


Technical Design Document (TDD)

High-Fidelity Autonomous Extraction Agent (FAEA)

Document Version: 0.1
Based On: ADD v0.1
Classification: Technical Implementation Blueprint
Author: System Architect
Date: December 22, 2025


Table of Contents

  1. Overview
  2. System Architecture
  3. Module Specifications
  4. Data Models & Interfaces
  5. API Contracts
  6. Database Design
  7. Core Algorithms
  8. Configuration Management
  9. Error Handling Strategy
  10. Testing Strategy
  11. Deployment Specifications
  12. Performance Requirements
  13. Security Implementation
  14. Appendices

1. Overview

1.1 Purpose

This Technical Design Document translates the architectural vision from ADD v0.1 into implementable specifications. It provides developers with detailed class hierarchies, interface definitions, algorithm implementations, and testing strategies.

1.2 Scope

In Scope:

  - Camoufox browser manager implementation
  - curl_cffi client pool management
  - Redis session state management
  - BrowserForge profile generation
  - Ghost Cursor behavioral engine
  - Entropy scheduler implementation

Out of Scope:

  - ML-based behavior generation (future)
  - Distributed orchestration (future)
  - Computer vision CAPTCHA solving
  - Third-party CAPTCHA API implementation

1.3 Document Conventions

[MUST] - Mandatory requirement
[SHOULD] - Strongly recommended
[MAY] - Optional enhancement

2. System Architecture

2.1 High-Level Module Decomposition

graph TB
    subgraph "Core Modules"
        A[faea.core.orchestrator]
        B[faea.core.scheduler]
        C[faea.core.config]
    end
    
    subgraph "Browser Subsystem"
        D[faea.browser.camoufox_manager]
        E[faea.browser.profile_generator]
        F[faea.browser.ghost_cursor]
        G[faea.browser.challenge_handler]
    end
    
    subgraph "Network Subsystem"
        H[faea.network.curl_client]
        I[faea.network.proxy_rotator]
        J[faea.network.session_store]
    end
    
    subgraph "Utilities"
        K[faea.utils.entropy]
        L[faea.utils.crypto]
        M[faea.utils.metrics]
    end
    
    A --> B
    A --> D
    A --> H
    D --> E
    D --> F
    D --> G
    D --> J
    H --> J
    H --> I
    B --> K

2.2 Package Structure

faea/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── orchestrator.py          # Main coordination logic
│   ├── scheduler.py             # Entropy-based task scheduling
│   └── config.py                # Configuration management
├── browser/
│   ├── __init__.py
│   ├── camoufox_manager.py      # Browser lifecycle management
│   ├── profile_generator.py     # BrowserForge integration
│   ├── ghost_cursor.py          # Human-like mouse movements
│   └── challenge_handler.py     # CAPTCHA/challenge solving
├── network/
│   ├── __init__.py
│   ├── curl_client.py           # curl_cffi wrapper
│   ├── proxy_rotator.py         # Mobile proxy management
│   └── session_store.py         # Redis state management
├── utils/
│   ├── __init__.py
│   ├── entropy.py               # Random number generation
│   ├── crypto.py                # Encryption utilities
│   └── metrics.py               # Prometheus metrics
├── models/
│   ├── __init__.py
│   ├── session.py               # SessionState dataclass
│   ├── profile.py               # BrowserForgeProfile
│   └── task.py                  # Task definitions
└── exceptions/
    ├── __init__.py
    └── errors.py                # Custom exceptions

3. Module Specifications

3.1 Core Module: Orchestrator

File: faea/core/orchestrator.py

from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
import asyncio
import time

class TaskType(Enum):
    """Task type enumeration."""
    AUTHENTICATION = auto()
    EXTRACTION = auto()
    RE_AUTHENTICATION = auto()

@dataclass
class Task:
    """Represents a dispatchable task."""
    id: str
    type: TaskType
    target_url: str
    session_id: Optional[str] = None
    priority: int = 5  # 1-10, lower is higher priority
    retry_count: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)

class OrchestratorState(Enum):
    """Orchestrator lifecycle states."""
    IDLE = auto()
    RUNNING = auto()
    PAUSED = auto()
    SHUTTING_DOWN = auto()

class Orchestrator:
    """
    Central coordination of authentication and extraction workflows.
    
    [MUST] Implement graceful shutdown with in-flight task completion.
    [MUST] Enforce rate limits per target domain.
    [SHOULD] Support priority queue for task dispatch.
    """
    
    def __init__(
        self,
        config: "Config",
        browser_pool: "CamoufoxPool",
        curl_pool: "CurlClientPool",
        session_store: "SessionStore",
        scheduler: "EntropyScheduler"
    ):
        self.config = config
        self.browser_pool = browser_pool
        self.curl_pool = curl_pool
        self.session_store = session_store
        self.scheduler = scheduler
        self.state = OrchestratorState.IDLE
        self._task_queue: asyncio.PriorityQueue[tuple[int, float, Task]] = asyncio.PriorityQueue()
        self._active_tasks: dict[str, asyncio.Task] = {}
    
    async def start(self) -> None:
        """
        Start the orchestrator main loop.
        
        Lifecycle:
        1. Initialize resource pools
        2. Start task dispatcher
        3. Start health monitor
        """
        self.state = OrchestratorState.RUNNING
        await asyncio.gather(
            self._dispatch_loop(),
            self._health_monitor_loop()
        )
    
    async def submit_task(self, task: Task) -> str:
        """
        Submit task to queue.
        
        Returns: Task ID for tracking
        """
        # (priority, created_at, task): created_at breaks priority ties so that
        # equal-priority entries never fall through to comparing Task objects,
        # which are not orderable.
        await self._task_queue.put((task.priority, task.created_at, task))
        return task.id
    
    async def _dispatch_loop(self) -> None:
        """Main dispatch loop with entropy-based timing."""
        while self.state == OrchestratorState.RUNNING:
            _priority, _created_at, task = await self._task_queue.get()
            
            if task.type in (TaskType.AUTHENTICATION, TaskType.RE_AUTHENTICATION):
                self._active_tasks[task.id] = asyncio.create_task(self._handle_auth_task(task))
            elif task.type == TaskType.EXTRACTION:
                self._active_tasks[task.id] = asyncio.create_task(self._handle_extraction_task(task))
    
    async def _handle_auth_task(self, task: Task) -> None:
        """
        Handle authentication task.
        
        Flow:
        1. Acquire browser from pool
        2. Generate profile if needed
        3. Navigate and authenticate
        4. Extract session state
        5. Store in Redis
        6. Release browser
        """
        browser = await self.browser_pool.acquire()
        try:
            profile = await self._get_or_create_profile(task.session_id)
            await browser.initialize(profile)
            await browser.solve_authentication(task.target_url)
            session_state = await browser.extract_session_state()
            await self.session_store.store(task.session_id, session_state)
        finally:
            await self.browser_pool.release(browser)
    
    async def _handle_extraction_task(self, task: Task) -> None:
        """
        Handle extraction task.
        
        Flow:
        1. Retrieve session state
        2. Validate session TTL
        3. Acquire curl client
        4. Execute request
        5. Handle response
        """
        session_state = await self.session_store.retrieve(task.session_id)
        
        if not session_state or session_state.is_expired():
            # Trigger re-authentication
            await self.submit_task(Task(
                id=f"reauth-{task.session_id}",
                type=TaskType.RE_AUTHENTICATION,
                target_url=task.target_url,
                session_id=task.session_id
            ))
            return
        
        client = await self.curl_pool.acquire(session_state)
        try:
            response = await client.fetch(task.target_url)
            await self._process_response(task, response)
        finally:
            await self.curl_pool.release(client)
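
The dispatch loop drains an asyncio.PriorityQueue. Because Python compares queue entries element-wise, two tasks with equal priority fall through to comparing the Task objects themselves and raise TypeError unless a comparable tie-breaker sits between priority and task. A minimal, self-contained sketch of that convention (DemoTask and dispatch_order are illustrative stand-ins, not part of the module):

```python
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class DemoTask:
    """Minimal stand-in for the Task dataclass; deliberately not orderable."""
    id: str
    priority: int
    created_at: float = field(default_factory=time.time)

async def dispatch_order(tasks: list[DemoTask]) -> list[str]:
    """Enqueue tasks as (priority, created_at, task) tuples and return the pop order."""
    q: asyncio.PriorityQueue = asyncio.PriorityQueue()
    for t in tasks:
        # created_at breaks priority ties, so DemoTask itself is never compared.
        await q.put((t.priority, t.created_at, t))
    order = []
    while not q.empty():
        _prio, _created, task = await q.get()
        order.append(task.id)
    return order

tasks = [
    DemoTask("b", priority=5, created_at=1.0),
    DemoTask("a", priority=1, created_at=2.0),
    DemoTask("c", priority=5, created_at=0.5),
]
order = asyncio.run(dispatch_order(tasks))  # lowest priority value first, FIFO within a priority
```

The lower priority value wins, and among equal priorities the earlier created_at dispatches first, giving FIFO behaviour without making Task comparable.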

3.2 Browser Module: CamoufoxManager

File: faea/browser/camoufox_manager.py

from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Optional, Any
import asyncio
import random
import time

class BrowserState(Enum):
    """Browser instance lifecycle states."""
    COLD = auto()          # Not initialized
    WARMING = auto()       # Starting up
    READY = auto()         # Ready for navigation
    AUTHENTICATED = auto() # Session established
    EXTRACTING = auto()    # Extracting state
    TERMINATED = auto()    # Shutdown complete
    FAILED = auto()        # Error state

@dataclass
class LaunchOptions:
    """Camoufox launch configuration."""
    headless: bool = True
    viewport_width: int = 1920
    viewport_height: int = 1080
    locale: str = "en-US"
    timezone: str = "America/New_York"
    proxy: Optional[str] = None
    args: list[str] = field(default_factory=list)

class CamoufoxManager:
    """
    Manages Camoufox browser instance lifecycle.
    
    [MUST] Match TLS fingerprint to User-Agent.
    [MUST] Inject BrowserForge profile consistently.
    [MUST] Extract ALL session state (cookies, storage, etc.).
    [SHOULD] Implement 60s timeout for all operations.
    """
    
    STATE_TRANSITIONS = {
        BrowserState.COLD: [BrowserState.WARMING, BrowserState.FAILED],
        BrowserState.WARMING: [BrowserState.READY, BrowserState.FAILED],
        BrowserState.READY: [BrowserState.AUTHENTICATED, BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.AUTHENTICATED: [BrowserState.EXTRACTING, BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.EXTRACTING: [BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.FAILED: [BrowserState.TERMINATED],
        BrowserState.TERMINATED: [],
    }
    
    def __init__(self, profile: "BrowserForgeProfile"):
        self.profile = profile
        self.state = BrowserState.COLD
        self.context = None
        self.page = None
        self.ghost_cursor = GhostCursorEngine()
        self._state_lock = asyncio.Lock()
    
    async def _transition_state(self, new_state: BrowserState) -> None:
        """Atomic state transition with validation."""
        async with self._state_lock:
            if new_state not in self.STATE_TRANSITIONS[self.state]:
                raise InvalidStateTransitionError(
                    f"Cannot transition from {self.state} to {new_state}"
                )
            self.state = new_state
    
    async def initialize(self) -> None:
        """
        Initialize browser with full fingerprint injection.
        
        Steps:
        1. Build launch arguments from profile
        2. Launch Playwright context
        3. Inject navigator overrides
        4. Inject canvas/WebGL noise
        5. Initialize Ghost Cursor
        """
        await self._transition_state(BrowserState.WARMING)
        
        try:
            launch_options = self._build_launch_options()
            # Use Camoufox-specific launcher
            self.context = await camoufox.async_launch(**launch_options)
            self.page = await self.context.new_page()
            
            # Fingerprint injection sequence
            await self._inject_navigator_overrides()
            await self._inject_canvas_noise()
            await self._inject_webgl_overrides()
            
            await self._transition_state(BrowserState.READY)
        except Exception as e:
            await self._transition_state(BrowserState.FAILED)
            raise BrowserInitializationError(str(e)) from e
    
    def _build_launch_options(self) -> dict[str, Any]:
        """Build Camoufox-compatible launch options from profile."""
        return {
            "headless": True,
            "fingerprint": {
                "screen": self.profile.screen,
                "navigator": {
                    "userAgent": self.profile.user_agent,
                    "platform": self.profile.platform,
                    "hardwareConcurrency": self.profile.hardware_concurrency,
                    "deviceMemory": self.profile.device_memory,
                },
            },
            "proxy": {"server": self.profile.proxy} if self.profile.proxy else None,
            "viewport": {
                "width": self.profile.viewport["width"],
                "height": self.profile.viewport["height"],
            },
            "locale": self.profile.locale,
            "timezoneId": self.profile.timezone,
        }
    
    async def _inject_navigator_overrides(self) -> None:
        """Inject navigator property overrides."""
        script = f"""
        Object.defineProperty(navigator, 'hardwareConcurrency', {{
            get: () => {self.profile.hardware_concurrency}
        }});
        Object.defineProperty(navigator, 'deviceMemory', {{
            get: () => {self.profile.device_memory}
        }});
        Object.defineProperty(navigator, 'maxTouchPoints', {{
            get: () => {self.profile.max_touch_points}
        }});
        """
        await self.page.add_init_script(script)
    
    async def solve_authentication(
        self, 
        target_url: str,
        timeout: float = 60.0
    ) -> None:
        """
        Navigate to target with human-like behavior.
        
        Behavioral Sequence:
        1. Pre-navigation delay (2-7s)
        2. Navigation with networkidle wait
        3. Challenge detection and handling
        4. Post-load reading simulation
        """
        await self._transition_state(BrowserState.AUTHENTICATED)
        
        # Pre-navigation hesitation
        await asyncio.sleep(random.uniform(2.0, 7.0))
        
        # Navigate
        await self.page.goto(target_url, wait_until="networkidle", timeout=timeout * 1000)
        
        # Detect and handle challenges
        challenge_handler = ChallengeHandler(self.page, self.ghost_cursor)
        await challenge_handler.detect_and_handle()
        
        # Simulate reading behavior
        await self._simulate_reading_behavior()
    
    async def _simulate_reading_behavior(self) -> None:
        """
        Generate F-pattern scroll behavior.
        
        Based on Nielsen Norman Group eye-tracking research:
        - Horizontal movement across top
        - Shorter horizontal scan lower
        - Vertical scan down left side
        """
        viewport_height = self.profile.viewport["height"]
        page_height = await self.page.evaluate("document.body.scrollHeight")
        
        # Generate F-pattern scroll points
        scroll_points = self._generate_f_pattern_scroll(page_height, viewport_height)
        
        for point in scroll_points:
            await self.page.evaluate(f"window.scrollTo(0, {point})")
            await self.ghost_cursor.random_micro_movement(self.page)
            # Log-normal delay simulates reading time
            await asyncio.sleep(random.lognormvariate(0.8, 0.3))
    
    def _generate_f_pattern_scroll(
        self, 
        page_height: int, 
        viewport_height: int
    ) -> list[int]:
        """Generate scroll positions for F-pattern simulation."""
        points = [0]  # Start at top
        
        # Add scroll points with decreasing attention
        current = 0
        while current < page_height - viewport_height:
            # Scroll increment decreases over time (attention decay)
            increment = random.randint(100, 300) * (1 - current / page_height * 0.5)
            current += int(increment)
            points.append(min(current, page_height - viewport_height))
        
        return points
    
    async def extract_session_state(self) -> "SessionState":
        """
        Extract all session artifacts.
        
        [MUST] Capture:
        - All cookies (including HttpOnly)
        - localStorage
        - sessionStorage
        - IndexedDB keys (if applicable)
        
        [MUST] Verify cf_clearance presence for Cloudflare targets.
        """
        await self._transition_state(BrowserState.EXTRACTING)
        
        # Get cookies
        cookies = await self.context.cookies()
        
        # Get storage
        local_storage = await self.page.evaluate(
            "() => Object.fromEntries(Object.entries(localStorage))"
        )
        session_storage = await self.page.evaluate(
            "() => Object.fromEntries(Object.entries(sessionStorage))"
        )
        
        # Extract critical tokens
        cf_clearance = next(
            (c for c in cookies if c["name"] == "cf_clearance"), 
            None
        )
        
        return SessionState(
            cookies=cookies,
            local_storage=local_storage,
            session_storage=session_storage,
            cf_clearance=cf_clearance,
            user_agent=self.profile.user_agent,
            tls_fingerprint=self.profile.tls_fingerprint,
            timestamp=time.time()
        )
    
    async def terminate(self) -> None:
        """Graceful shutdown."""
        await self._transition_state(BrowserState.TERMINATED)
        if self.context:
            await self.context.close()
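
initialize() above calls _inject_canvas_noise(), which this revision does not define. Camoufox patches canvas rendering natively at the engine level, so a JS shim is illustrative only; as an assumed fallback shape, the seed from the profile could drive a deterministic script that perturbs toDataURL reads (build_canvas_noise_script is a hypothetical helper):

```python
# Sketch (assumption): seeded canvas noise via an init script. Uses a
# mulberry32-style PRNG so reads are stable within one profile but
# differ across profiles.
_CANVAS_NOISE_JS = """
(() => {
    let s = __SEED__ >>> 0;
    const rand = () => {
        s = (s + 0x6D2B79F5) >>> 0;
        let t = Math.imul(s ^ (s >>> 15), 1 | s);
        t = (t + Math.imul(t ^ (t >>> 7), 61 | t)) ^ t;
        return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
    };
    const origToDataURL = HTMLCanvasElement.prototype.toDataURL;
    HTMLCanvasElement.prototype.toDataURL = function (...args) {
        const ctx = this.getContext('2d');
        if (ctx && this.width && this.height) {
            const img = ctx.getImageData(0, 0, this.width, this.height);
            // Flip the lowest bit of ~5% of red channels: invisible to the
            // eye, but enough to change the canvas hash per profile.
            for (let i = 0; i < img.data.length; i += 4) {
                if (rand() < 0.05) img.data[i] ^= 1;
            }
            ctx.putImageData(img, 0, 0);
        }
        return origToDataURL.apply(this, args);
    };
})();
"""

def build_canvas_noise_script(seed: int) -> str:
    """Return a deterministic init script for the given profile seed."""
    return _CANVAS_NOISE_JS.replace("__SEED__", str(int(seed)))
```

Wiring this in would amount to `await self.page.add_init_script(build_canvas_noise_script(self.profile.canvas_noise_seed))` inside the injection sequence.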

3.3 Browser Module: Ghost Cursor Engine

File: faea/browser/ghost_cursor.py

import asyncio
import math
import random
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class Point:
    """2D point representation."""
    x: float
    y: float
    
    def distance_to(self, other: "Point") -> float:
        return math.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2)

class GhostCursorEngine:
    """
    Human-like mouse movement generator using composite Bezier curves.
    
    Based on motor control research:
    - Meyer et al. (1988): Submovement composition
    - Fitts's Law: Movement time = a + b * log2(D/W + 1)
    
    [MUST] Generate non-deterministic paths.
    [MUST] Include micro-movements during "reading".
    [SHOULD] Vary velocity based on distance (Fitts's Law).
    """
    
    # Fitts's Law constants (empirically derived)
    FITTS_A = 0.1  # Base time
    FITTS_B = 0.15  # Scaling factor
    
    # Velocity bounds (pixels/second)
    MIN_VELOCITY = 200
    MAX_VELOCITY = 400
    
    def __init__(self, seed: Optional[int] = None):
        self.rng = random.Random(seed)
    
    async def move_to(
        self, 
        page: "Page", 
        target: Point,
        origin: Optional[Point] = None
    ) -> None:
        """
        Move cursor to target with human-like trajectory.
        
        Algorithm:
        1. Calculate distance for submovement count
        2. Generate waypoints with Gaussian noise
        3. Execute submovements with Bezier curves
        4. Apply Fitts's Law timing
        """
        if origin is None:
            origin = await self._get_cursor_position(page)
        
        distance = origin.distance_to(target)
        
        # Determine submovement count (1-5 based on distance)
        num_submovements = min(5, max(1, int(distance / 300)))
        
        # Generate intermediate waypoints
        waypoints = self._generate_waypoints(origin, target, num_submovements)
        
        # Execute each submovement
        for i in range(len(waypoints) - 1):
            await self._execute_submovement(
                page, 
                waypoints[i], 
                waypoints[i + 1]
            )
    
    def _generate_waypoints(
        self, 
        start: Point, 
        end: Point, 
        count: int
    ) -> list[Point]:
        """
        Generate waypoints with perpendicular Gaussian noise.
        
        Simulates motor control imprecision and overshooting.
        """
        waypoints = [start]
        
        for i in range(1, count):
            t = i / count
            
            # Linear interpolation
            x = start.x + t * (end.x - start.x)
            y = start.y + t * (end.y - start.y)
            
            # Add perpendicular noise (overshooting behavior)
            angle = math.atan2(end.y - start.y, end.x - start.x)
            perp_angle = angle + math.pi / 2
            noise_magnitude = self.rng.gauss(0, 10)
            
            x += noise_magnitude * math.cos(perp_angle)
            y += noise_magnitude * math.sin(perp_angle)
            
            waypoints.append(Point(x, y))
        
        waypoints.append(end)
        return waypoints
    
    async def _execute_submovement(
        self, 
        page: "Page", 
        start: Point, 
        end: Point
    ) -> None:
        """
        Execute single submovement with cubic Bezier curve.
        
        Timing derived from Fitts's Law.
        """
        distance = start.distance_to(end)
        
        # Generate control points for Bezier curve
        control1, control2 = self._generate_bezier_controls(start, end)
        
        # Calculate movement time from Fitts's Law
        target_width = 10  # Assumed target width
        movement_time = self.FITTS_A + self.FITTS_B * math.log2(distance / target_width + 1)
        
        # Sample curve and execute
        steps = max(10, int(distance / 5))
        step_delay = movement_time / steps
        
        for i in range(steps + 1):
            t = i / steps
            point = self._bezier_point(t, start, control1, control2, end)
            await page.mouse.move(point.x, point.y)
            await asyncio.sleep(step_delay)
    
    def _generate_bezier_controls(
        self, 
        start: Point, 
        end: Point
    ) -> Tuple[Point, Point]:
        """Generate control points with randomized curvature."""
        # Midpoint
        mid_x = (start.x + end.x) / 2
        mid_y = (start.y + end.y) / 2
        
        # Offset perpendicular to line
        dx = end.x - start.x
        dy = end.y - start.y
        length = math.sqrt(dx * dx + dy * dy)
        
        if length == 0:
            return Point(mid_x, mid_y), Point(mid_x, mid_y)
        
        # Perpendicular unit vector
        perp_x = -dy / length
        perp_y = dx / length
        
        # Random offset magnitude (0-30% of distance)
        offset1 = self.rng.uniform(-0.3, 0.3) * length
        offset2 = self.rng.uniform(-0.3, 0.3) * length
        
        control1 = Point(
            start.x + 0.3 * dx + offset1 * perp_x,
            start.y + 0.3 * dy + offset1 * perp_y
        )
        control2 = Point(
            start.x + 0.7 * dx + offset2 * perp_x,
            start.y + 0.7 * dy + offset2 * perp_y
        )
        
        return control1, control2
    
    def _bezier_point(
        self, 
        t: float, 
        p0: Point, 
        p1: Point, 
        p2: Point, 
        p3: Point
    ) -> Point:
        """Evaluate cubic Bezier curve at parameter t."""
        # B(t) = (1-t)³P₀ + 3(1-t)²tP₁ + 3(1-t)t²P₂ + t³P₃
        u = 1 - t
        tt = t * t
        uu = u * u
        uuu = uu * u
        ttt = tt * t
        
        x = uuu * p0.x + 3 * uu * t * p1.x + 3 * u * tt * p2.x + ttt * p3.x
        y = uuu * p0.y + 3 * uu * t * p1.y + 3 * u * tt * p2.y + ttt * p3.y
        
        return Point(x, y)
    
    async def random_micro_movement(self, page: "Page") -> None:
        """
        Small drift movements simulating hand tremor/fidgeting.
        
        Used during "reading" pauses.
        """
        current = await self._get_cursor_position(page)
        
        # Small Gaussian drift
        drift = Point(
            current.x + self.rng.gauss(0, 5),
            current.y + self.rng.gauss(0, 5)
        )
        
        # Slow movement (low velocity = inattention)
        await page.mouse.move(drift.x, drift.y)
        await asyncio.sleep(self.rng.uniform(0.05, 0.15))
    
    async def _get_cursor_position(self, page: "Page") -> Point:
        """
        Get current cursor position from page.
        
        Requires an init script that mirrors mousemove events into
        window.mouseX/window.mouseY; returns (0, 0) before any movement.
        """
        pos = await page.evaluate("""
            () => ({x: window.mouseX || 0, y: window.mouseY || 0})
        """)
        return Point(pos["x"], pos["y"])
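
The cubic Bezier evaluation above has two invariants worth checking in isolation: B(0) = P0, B(1) = P3, and every intermediate point stays inside the convex hull of the control points. A standalone replica of _bezier_point (same formula, free functions for testability):

```python
import math
from dataclasses import dataclass

@dataclass
class Pt:
    x: float
    y: float

def bezier_point(t: float, p0: Pt, p1: Pt, p2: Pt, p3: Pt) -> Pt:
    """Evaluate B(t) = (1-t)^3 P0 + 3(1-t)^2 t P1 + 3(1-t) t^2 P2 + t^3 P3."""
    u = 1.0 - t
    x = u**3 * p0.x + 3 * u**2 * t * p1.x + 3 * u * t**2 * p2.x + t**3 * p3.x
    y = u**3 * p0.y + 3 * u**2 * t * p1.y + 3 * u * t**2 * p2.y + t**3 * p3.y
    return Pt(x, y)

p0, p1, p2, p3 = Pt(0, 0), Pt(30, 100), Pt(70, 100), Pt(100, 0)
start = bezier_point(0.0, p0, p1, p2, p3)  # coincides with p0
end = bezier_point(1.0, p0, p1, p2, p3)    # coincides with p3
mid = bezier_point(0.5, p0, p1, p2, p3)    # bowed toward the control points
```

With these symmetric controls the midpoint lands at (50, 75): on the axis of symmetry horizontally, and three quarters of the way to the control height, which is the characteristic "arc" Ghost Cursor relies on.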

3.4 Network Module: Session Store

File: faea/network/session_store.py

from dataclasses import dataclass
from typing import Optional
import time

import msgpack

@dataclass  
class SessionState:
    """
    Complete session state for handover between browser and curl.
    
    [MUST] Include all cookies (especially cf_clearance).
    [MUST] Include TLS fingerprint for matching.
    """
    cookies: list[dict]
    local_storage: dict[str, str]
    session_storage: dict[str, str]
    cf_clearance: Optional[dict]
    user_agent: str
    tls_fingerprint: str  # e.g., "chrome120"
    timestamp: float
    
    def is_expired(self, ttl_buffer: int = 300) -> bool:
        """Check if session is expired or near expiration."""
        if not self.cf_clearance:
            return True
        # cf_clearance typically expires in 1800s
        expires = self.cf_clearance.get("expires", 0)
        return time.time() > (expires - ttl_buffer)
    
    def serialize(self) -> bytes:
        """
        Serialize with MessagePack.
        
        Integrity and authenticity are provided by the Fernet layer in
        SessionStore (Fernet includes an HMAC-SHA256 tag), so no separate
        HMAC is applied here.
        """
        payload = msgpack.packb({
            "cookies": self.cookies,
            "local_storage": self.local_storage,
            "session_storage": self.session_storage,
            "cf_clearance": self.cf_clearance,
            "user_agent": self.user_agent,
            "tls_fingerprint": self.tls_fingerprint,
            "timestamp": self.timestamp,
        })
        return payload
    
    @classmethod
    def deserialize(cls, data: bytes) -> "SessionState":
        """Deserialize from MessagePack."""
        obj = msgpack.unpackb(data, raw=False)
        return cls(**obj)

class SessionStore:
    """
    Redis-backed encrypted session storage.
    
    [MUST] Encrypt at rest with Fernet.
    [MUST] Set TTL shorter than cookie expiration.
    [SHOULD] Implement connection pooling.
    """
    
    def __init__(self, redis_client, encryption_key: bytes):
        from cryptography.fernet import Fernet
        self.redis = redis_client
        self.cipher = Fernet(encryption_key)
    
    async def store(self, session_id: str, state: SessionState) -> None:
        """Encrypt and store session."""
        plaintext = state.serialize()
        ciphertext = self.cipher.encrypt(plaintext)
        
        # TTL = 25 minutes (before 30-min cookie expiration)
        await self.redis.setex(
            name=f"session:{session_id}",
            time=1500,
            value=ciphertext
        )
    
    async def retrieve(self, session_id: str) -> Optional[SessionState]:
        """Retrieve and decrypt session."""
        ciphertext = await self.redis.get(f"session:{session_id}")
        if not ciphertext:
            return None
        
        plaintext = self.cipher.decrypt(ciphertext)
        return SessionState.deserialize(plaintext)
    
    async def delete(self, session_id: str) -> None:
        """Delete session."""
        await self.redis.delete(f"session:{session_id}")
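
The interaction between the cookie's own expiry, the 300 s ttl_buffer in is_expired, and the 1500 s Redis TTL is easy to get wrong. A minimal replica of the arithmetic (session_expired is a hypothetical stand-in, stdlib only) makes the window explicit:

```python
def session_expired(cookie_expires: float, now: float, ttl_buffer: int = 300) -> bool:
    """Mirror of SessionState.is_expired: treat the session as dead once
    `now` is within `ttl_buffer` seconds of the cookie's expiry."""
    return now > (cookie_expires - ttl_buffer)

issued_at = 1_000_000.0
cookie_expires = issued_at + 1800  # typical cf_clearance lifetime (~30 min)

fresh = session_expired(cookie_expires, issued_at + 100)   # 1700 s left: still valid
near = session_expired(cookie_expires, issued_at + 1501)   # inside the 300 s buffer: expired
# The 1500 s Redis TTL in store() means the key vanishes exactly when the
# buffer window opens (1800 - 300 = 1500), so a retrieved session is never
# inside the buffer.
```

In other words, a session that survives the Redis TTL check cannot also fail the buffer check, which is the invariant the 25-minute TTL is chosen to preserve.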

3.5 Network Module: curl_cffi Client

File: faea/network/curl_client.py

from curl_cffi.requests import AsyncSession
from typing import Optional, Any

class CurlClient:
    """
    curl_cffi-based HTTP client with TLS fingerprint matching.
    
    [MUST] Use same TLS fingerprint as browser session.
    [MUST] Include all cookies from session state.
    [MUST] Build consistent sec-ch-ua headers.
    """
    
    def __init__(self, session_state: "SessionState"):
        self.session_state = session_state
        self.session: Optional[AsyncSession] = None
        self.headers: dict[str, str] = {}
    
    async def initialize(self) -> None:
        """Configure client to match browser fingerprint."""
        # Create session with matching TLS impersonation
        self.session = AsyncSession(
            impersonate=self.session_state.tls_fingerprint
        )
        
        # Inject all cookies
        for cookie in self.session_state.cookies:
            self.session.cookies.set(
                name=cookie["name"],
                value=cookie["value"],
                domain=cookie["domain"],
                path=cookie.get("path", "/"),
                secure=cookie.get("secure", False),
            )
        
        # Build header profile
        self.headers = self._build_headers()
    
    def _build_headers(self) -> dict[str, str]:
        """Build headers matching browser profile."""
        return {
            "User-Agent": self.session_state.user_agent,
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "sec-ch-ua": self._derive_sec_ch_ua(),
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": self._extract_platform(),
        }
    
    def _derive_sec_ch_ua(self) -> str:
        """
        Derive sec-ch-ua from User-Agent.
        
        Chrome/120.0.6099.109 → "Chromium";v="120", "Google Chrome";v="120"
        """
        import re
        match = re.search(r"Chrome/(\d+)", self.session_state.user_agent)
        if match:
            version = match.group(1)
            return f'"Not_A Brand";v="8", "Chromium";v="{version}", "Google Chrome";v="{version}"'
        return '"Not_A Brand";v="8"'
    
    def _extract_platform(self) -> str:
        """Extract platform from User-Agent."""
        ua = self.session_state.user_agent
        if "Windows" in ua:
            return '"Windows"'
        elif "Mac" in ua:
            return '"macOS"'
        elif "Linux" in ua:
            return '"Linux"'
        return '"Unknown"'
    
    async def fetch(
        self, 
        url: str, 
        method: str = "GET",
        **kwargs
    ) -> "Response":
        """
        Execute request with fingerprint matching.
        
        Includes pre-request jitter for entropy.
        """
        import asyncio
        import random
        
        # Pre-request jitter
        await asyncio.sleep(random.lognormvariate(0.2, 0.1))
        
        response = await self.session.request(
            method=method,
            url=url,
            headers=self.headers,
            **kwargs
        )
        
        # Check for challenge response
        if "cf-mitigated" in response.headers:
            raise SessionInvalidatedError("Cloudflare challenge detected")
        
        return response
    
    async def close(self) -> None:
        """Close session."""
        if self.session:
            await self.session.close()
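
Because _derive_sec_ch_ua is pure string manipulation, it is worth exercising in isolation. A standalone replica of the derivation (same regex and output shape as the method above):

```python
import re

def derive_sec_ch_ua(user_agent: str) -> str:
    """Build a sec-ch-ua value from the Chrome major version in a UA string."""
    match = re.search(r"Chrome/(\d+)", user_agent)
    if match:
        v = match.group(1)
        return f'"Not_A Brand";v="8", "Chromium";v="{v}", "Google Chrome";v="{v}"'
    return '"Not_A Brand";v="8"'

ua = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/120.0.6099.109 Safari/537.36")
header = derive_sec_ch_ua(ua)
```

Note that real Chrome greases this header, rotating the "Not_A Brand" entry and its ordering across major versions, so a fixed template is an approximation; a production implementation should mirror the exact greased string for the impersonated version.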

## 4. Data Models & Interfaces

### 4.1 Core Data Classes

```python
# faea/models/profile.py

from dataclasses import dataclass, field
from typing import Dict, Any, Optional

@dataclass
class BrowserForgeProfile:
    """
    Complete browser fingerprint profile.

    All fields must exhibit internal consistency.
    """
    # User-Agent and related
    user_agent: str
    sec_ch_ua: str
    platform: str

    # Screen configuration (Any: pixelRatio is a float, the rest are ints)
    screen: Dict[str, Any] = field(default_factory=lambda: {
        "width": 1920,
        "height": 1080,
        "colorDepth": 24,
        "pixelRatio": 1.0,
    })
    viewport: Dict[str, int] = field(default_factory=lambda: {
        "width": 1920,
        "height": 969,  # Screen height minus browser chrome
    })

    # Hardware profile
    hardware_concurrency: int = 8
    device_memory: int = 8
    max_touch_points: int = 0

    # WebGL spoofing
    webgl_vendor: str = "Google Inc. (Intel)"
    webgl_renderer: str = "ANGLE (Intel, Intel(R) UHD Graphics 620)"

    # Canvas noise seed
    canvas_noise_seed: int = 0

    # Network identity
    tls_fingerprint: str = "chrome120"
    proxy: Optional[str] = None

    # Locale/timezone
    locale: str = "en-US"
    timezone: str = "America/New_York"

    def validate(self) -> "ValidationResult":
        """Validate internal consistency (ProfileValidator is provided by the validation subsystem)."""
        validator = ProfileValidator()
        return validator.validate(self)

@dataclass
class ValidationResult:
    """Profile validation result."""
    passed: bool
    failures: list["ValidationCheck"] = field(default_factory=list)

@dataclass
class ValidationCheck:
    """Single validation check result."""
    name: str
    passed: bool
    details: str
```

### 4.2 Exception Hierarchy

```python
# faea/exceptions/errors.py

class FAEABaseError(Exception):
    """Base exception for all FAEA errors."""
    pass

class BrowserError(FAEABaseError):
    """Browser subsystem errors."""
    pass

class BrowserInitializationError(BrowserError):
    """Failed to initialize browser."""
    pass

class InvalidStateTransitionError(BrowserError):
    """Invalid state machine transition."""
    pass

class ChallengeError(BrowserError):
    """Challenge detection/solving failed."""
    pass

class NetworkError(FAEABaseError):
    """Network subsystem errors."""
    pass

class SessionNotFoundError(NetworkError):
    """Session not found in store."""
    pass

class SessionInvalidatedError(NetworkError):
    """Session was invalidated by target."""
    pass

class ProxyError(NetworkError):
    """Proxy-related errors."""
    pass

class ProxyExhaustionError(ProxyError):
    """All proxies exhausted or in cooldown."""
    pass

class ConfigurationError(FAEABaseError):
    """Configuration errors."""
    pass
```

## 5. API Contracts

### 5.1 Internal Module Interfaces

```python
# Type definitions for module boundaries

from typing import Callable, Optional, Protocol

class BrowserPoolProtocol(Protocol):
    """Interface for browser pool management."""

    async def acquire(self) -> "CamoufoxManager":
        """Acquire browser from pool."""
        ...

    async def release(self, browser: "CamoufoxManager") -> None:
        """Release browser back to pool."""
        ...

    @property
    def available(self) -> int:
        """Number of available browsers."""
        ...

class SessionStoreProtocol(Protocol):
    """Interface for session storage."""

    async def store(self, session_id: str, state: "SessionState") -> None:
        ...

    async def retrieve(self, session_id: str) -> Optional["SessionState"]:
        ...

    async def delete(self, session_id: str) -> None:
        ...

class SchedulerProtocol(Protocol):
    """Interface for entropy scheduler."""

    def next_execution_time(self) -> float:
        """Calculate next execution timestamp."""
        ...

    async def dispatch_with_entropy(self, task: Callable) -> None:
        """Dispatch task at entropic time."""
        ...
```
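
These Protocols are satisfied structurally — no inheritance is required. As an illustration, here is a minimal in-memory store that type-checks against `SessionStoreProtocol`; the stripped-down `SessionState` stand-in and the class name are for this sketch only, not part of the codebase:

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class SessionState:  # stand-in for the full session-state model
    user_agent: str

class InMemorySessionStore:
    """Dict-backed store; satisfies SessionStoreProtocol structurally."""

    def __init__(self) -> None:
        self._data: Dict[str, SessionState] = {}

    async def store(self, session_id: str, state: SessionState) -> None:
        self._data[session_id] = state

    async def retrieve(self, session_id: str) -> Optional[SessionState]:
        return self._data.get(session_id)

    async def delete(self, session_id: str) -> None:
        self._data.pop(session_id, None)  # deleting an absent key is a no-op

async def _demo() -> Optional[SessionState]:
    store = InMemorySessionStore()
    await store.store("s1", SessionState(user_agent="Test/1.0"))
    await store.delete("missing")
    return await store.retrieve("s1")

state = asyncio.run(_demo())
# → SessionState(user_agent='Test/1.0')
```

An implementation like this is handy in unit tests where standing up the Redis-backed store would be unnecessary.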

## 6. Database Design

### 6.1 Redis Key Schema

```text
# Session State
session:{session_id}                 → Encrypted SessionState (binary)
  TTL: 1500 seconds

# Proxy Usage Tracking
proxy:usage:{proxy_id}               → JSON {session_id, last_used_ts}
  TTL: 600 seconds

# Profile Cache
profile:{profile_hash}               → Serialized BrowserForgeProfile
  TTL: 86400 seconds (1 day)

# Rate Limiting
ratelimit:{domain}:{minute}          → Counter
  TTL: 120 seconds

# Metrics Aggregation
metrics:auth:success                 → Counter
metrics:auth:failure:{reason}        → Counter
metrics:extraction:success           → Counter
metrics:extraction:challenge         → Counter
```

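To keep callers from hand-formatting keys, the patterns above can be centralized in small builder helpers. A minimal sketch — the function names are illustrative, not part of the codebase:

```python
def session_key(session_id: str) -> str:
    """session:{session_id} — encrypted SessionState, TTL 1500 s."""
    return f"session:{session_id}"

def proxy_usage_key(proxy_id: str) -> str:
    """proxy:usage:{proxy_id} — usage record, TTL 600 s."""
    return f"proxy:usage:{proxy_id}"

def profile_key(profile_hash: str) -> str:
    """profile:{profile_hash} — cached BrowserForgeProfile, TTL 1 day."""
    return f"profile:{profile_hash}"

def ratelimit_key(domain: str, epoch_seconds: float) -> str:
    """ratelimit:{domain}:{minute} — bucket the timestamp by minute."""
    return f"ratelimit:{domain}:{int(epoch_seconds // 60)}"

print(ratelimit_key("api.target.com", 125.0))
# → ratelimit:api.target.com:2
```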
### 6.2 Redis Configuration

```conf
# redis.conf highlights
maxmemory 4gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec

# Cluster mode (production)
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
```

## 7. Core Algorithms

### 7.1 Entropy Scheduler

```python
# faea/core/scheduler.py

import asyncio
import random
import time
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class SchedulerConfig:
    """Scheduler configuration."""
    base_interval: float = 30.0  # seconds
    drift_sigma: float = 5.0     # Gaussian noise σ
    min_interval: float = 5.0    # Minimum delay
    max_interval: float = 120.0  # Maximum delay

class EntropyScheduler:
    """
    Entropy-based task scheduling to defeat temporal analysis.

    Model:
    T_actual = T_base + N(0, σ²) + φ(t)

    Where φ(t) is a slowly-drifting phase offset.
    """

    def __init__(self, config: Optional[SchedulerConfig] = None):
        self.config = config or SchedulerConfig()
        self.phase_offset = 0.0

    def next_execution_time(self) -> float:
        """Calculate next execution time with drift."""
        # Gaussian noise
        noise = random.gauss(0, self.config.drift_sigma)

        # Phase shift accumulation (low-frequency drift)
        self.phase_offset += random.uniform(-0.5, 0.5)

        # Calculate interval
        interval = self.config.base_interval + noise + self.phase_offset

        # Clamp to bounds
        interval = max(self.config.min_interval,
                       min(self.config.max_interval, interval))

        return time.time() + interval

    async def dispatch_with_entropy(self, task: Callable) -> None:
        """Dispatch task at entropic time."""
        target_time = self.next_execution_time()
        delay = target_time - time.time()

        if delay > 0:
            await asyncio.sleep(delay)

        # Pre-execution jitter (human hesitation)
        await asyncio.sleep(random.uniform(0.1, 0.8))

        await task()
```
### 7.2 Proxy Rotation Algorithm

```python
# faea/network/proxy_rotator.py

import time
from dataclasses import dataclass
from typing import Dict, List

from faea.exceptions.errors import ProxyExhaustionError

@dataclass
class ProxyUsage:
    """Track proxy usage."""
    proxy: str
    session_id: str
    last_used: float
    request_count: int = 0

class ProxyRotator:
    """
    Sticky session proxy rotation with cooldown.

    Rules:
    1. Same session_id → same proxy (until cooldown)
    2. After cooldown → LRU selection
    3. Blacklisted proxies excluded
    """

    def __init__(
        self,
        proxy_pool: List[str],
        cooldown_period: int = 300  # 5 minutes
    ):
        self.proxy_pool = proxy_pool
        self.cooldown_period = cooldown_period
        self.usage: Dict[str, ProxyUsage] = {}
        self.blacklist: set[str] = set()

    def select_proxy(self, session_id: str) -> str:
        """Select proxy with sticky session support."""
        # Check for existing sticky session
        if session_id in self.usage:
            usage = self.usage[session_id]
            if time.time() - usage.last_used < self.cooldown_period:
                usage.last_used = time.time()
                usage.request_count += 1
                return usage.proxy

        # Select least-recently-used available proxy
        proxy = self._select_lru_proxy()

        self.usage[session_id] = ProxyUsage(
            proxy=proxy,
            session_id=session_id,
            last_used=time.time()
        )

        return proxy

    def _select_lru_proxy(self) -> str:
        """Select least-recently-used non-blacklisted proxy."""
        available = [
            p for p in self.proxy_pool
            if p not in self.blacklist and self._is_cooled_down(p)
        ]

        if not available:
            raise ProxyExhaustionError("No proxies available")

        # Sort by last use time (ascending)
        return min(available, key=lambda p: self._last_use_time(p))

    def _is_cooled_down(self, proxy: str) -> bool:
        """Check if proxy completed cooldown."""
        for usage in self.usage.values():
            if usage.proxy == proxy:
                return time.time() - usage.last_used > self.cooldown_period
        return True

    def _last_use_time(self, proxy: str) -> float:
        """Get last use time for proxy."""
        for usage in self.usage.values():
            if usage.proxy == proxy:
                return usage.last_used
        return 0.0

    def blacklist_proxy(self, proxy: str) -> None:
        """Add proxy to blacklist."""
        self.blacklist.add(proxy)

    def release_session(self, session_id: str) -> None:
        """Release sticky session."""
        if session_id in self.usage:
            del self.usage[session_id]
```
## 8. Configuration Management

### 8.1 Configuration Schema

```python
# faea/core/config.py

from dataclasses import dataclass, field
from typing import Optional
import os

@dataclass
class RedisConfig:
    """Redis connection configuration."""
    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None

    @property
    def url(self) -> str:
        auth = f":{self.password}@" if self.password else ""
        return f"redis://{auth}{self.host}:{self.port}/{self.db}"

@dataclass
class ProxyConfig:
    """Proxy pool configuration."""
    provider: str = "oxylabs"  # oxylabs, smartproxy, etc.
    api_key: str = ""
    country_code: str = "us"
    pool_size: int = 10
    cooldown_seconds: int = 300

@dataclass
class BrowserConfig:
    """Browser pool configuration."""
    pool_size: int = 5
    headless: bool = True
    timeout_seconds: int = 60
    shm_size: str = "2gb"

@dataclass
class SchedulerConfig:
    """Scheduler configuration."""
    base_interval: float = 30.0
    drift_sigma: float = 5.0
    min_interval: float = 5.0
    max_interval: float = 120.0

@dataclass
class Config:
    """Root configuration."""
    redis: RedisConfig = field(default_factory=RedisConfig)
    proxy: ProxyConfig = field(default_factory=ProxyConfig)
    browser: BrowserConfig = field(default_factory=BrowserConfig)
    scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)

    encryption_key: str = ""  # Fernet key
    log_level: str = "INFO"

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables."""
        return cls(
            redis=RedisConfig(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                password=os.getenv("REDIS_PASSWORD"),
            ),
            proxy=ProxyConfig(
                api_key=os.getenv("PROXY_API_KEY", ""),
                country_code=os.getenv("PROXY_COUNTRY", "us"),
            ),
            encryption_key=os.getenv("ENCRYPTION_KEY", ""),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
        )
```

### 8.2 Environment Variables

| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `REDIS_HOST` | Redis server hostname | No | `localhost` |
| `REDIS_PORT` | Redis server port | No | `6379` |
| `REDIS_PASSWORD` | Redis password | No | None |
| `PROXY_API_KEY` | Mobile proxy API key | Yes | - |
| `PROXY_COUNTRY` | Proxy country code | No | `us` |
| `ENCRYPTION_KEY` | Fernet encryption key | Yes | - |
| `BROWSERFORGE_SEED` | Profile generation seed | No | Random |
| `LOG_LEVEL` | Logging verbosity | No | `INFO` |

## 9. Error Handling Strategy

### 9.1 Retry Policies

```python
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, Tuple, Type

from faea.exceptions.errors import NetworkError, SessionInvalidatedError

@dataclass
class RetryPolicy:
    """Retry configuration."""
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    retryable_exceptions: Tuple[Type[Exception], ...] = (
        NetworkError,
        SessionInvalidatedError,
    )

async def with_retry(
    func: Callable,
    policy: RetryPolicy,
    *args,
    **kwargs
):
    """Execute function with exponential backoff retry."""
    last_exception = None

    for attempt in range(policy.max_attempts):
        try:
            return await func(*args, **kwargs)
        except policy.retryable_exceptions as e:
            last_exception = e

            if attempt == policy.max_attempts - 1:
                raise

            delay = min(
                policy.base_delay * (policy.exponential_base ** attempt),
                policy.max_delay
            )
            # Add jitter to avoid synchronized retries
            delay += random.uniform(0, delay * 0.1)

            await asyncio.sleep(delay)

    raise last_exception  # Unreachable in practice; the last attempt re-raises above
```
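
Ignoring the jitter term, the default policy yields a short, capped delay schedule. A minimal sketch of the computation — the helper name `backoff_delays` is illustrative, not part of the codebase:

```python
def backoff_delays(max_attempts: int = 3, base: float = 1.0,
                   cap: float = 60.0, exp: float = 2.0) -> list[float]:
    """Delays slept between attempts (one fewer sleep than attempts)."""
    return [min(base * exp ** attempt, cap) for attempt in range(max_attempts - 1)]

print(backoff_delays())
# → [1.0, 2.0]  (default RetryPolicy: sleep 1 s, then 2 s, then the final attempt raises)
```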

### 9.2 Circuit Breaker

```python
from enum import Enum, auto
import time

class CircuitState(Enum):
    CLOSED = auto()     # Normal operation
    OPEN = auto()       # Failing, reject requests
    HALF_OPEN = auto()  # Testing recovery

class CircuitBreaker:
    """
    Circuit breaker for external service protection.

    Prevents cascading failures by cutting off failing services.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_requests: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.half_open_successes = 0

    def can_execute(self) -> bool:
        """Check if request can proceed."""
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout elapsed
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                return True
            return False

        # HALF_OPEN: allow limited requests
        return True

    def record_success(self) -> None:
        """Record successful execution."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def record_failure(self) -> None:
        """Record failed execution."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.half_open_successes = 0
```
## 10. Testing Strategy

### 10.1 Test Categories

| Category | Scope | Tools | Coverage Target |
|----------|-------|-------|-----------------|
| Unit | Individual classes/functions | pytest, unittest.mock | 80% |
| Integration | Module interactions | pytest, testcontainers | 70% |
| Contract | API interfaces | pytest, hypothesis | 90% |
| E2E | Full workflow | pytest, Playwright | Critical paths |

### 10.2 Unit Test Examples

```python
# tests/unit/test_ghost_cursor.py

import pytest
from unittest.mock import AsyncMock, MagicMock
from faea.browser.ghost_cursor import GhostCursorEngine, Point

class TestGhostCursorEngine:
    """Unit tests for Ghost Cursor."""

    def test_point_distance(self):
        """Test distance calculation."""
        p1 = Point(0, 0)
        p2 = Point(3, 4)
        assert p1.distance_to(p2) == 5.0

    def test_waypoint_generation(self):
        """Test waypoint generation includes start and end."""
        engine = GhostCursorEngine(seed=42)
        start = Point(0, 0)
        end = Point(100, 100)

        waypoints = engine._generate_waypoints(start, end, 3)

        assert len(waypoints) == 4  # start + 2 intermediate + end
        assert waypoints[0] == start
        assert waypoints[-1] == end

    def test_bezier_curve_endpoints(self):
        """Test Bezier curve passes through endpoints."""
        engine = GhostCursorEngine(seed=42)
        p0 = Point(0, 0)
        p1 = Point(25, 50)
        p2 = Point(75, 50)
        p3 = Point(100, 100)

        # t=0 should give p0
        result_0 = engine._bezier_point(0, p0, p1, p2, p3)
        assert abs(result_0.x - p0.x) < 0.001
        assert abs(result_0.y - p0.y) < 0.001

        # t=1 should give p3
        result_1 = engine._bezier_point(1, p0, p1, p2, p3)
        assert abs(result_1.x - p3.x) < 0.001
        assert abs(result_1.y - p3.y) < 0.001

    @pytest.mark.asyncio
    async def test_move_to_generates_submovements(self):
        """Test move_to generates appropriate submovements."""
        engine = GhostCursorEngine(seed=42)
        page = AsyncMock()
        page.evaluate = AsyncMock(return_value={"x": 0, "y": 0})
        page.mouse.move = AsyncMock()

        await engine.move_to(page, Point(500, 500))

        # Should have multiple move calls
        assert page.mouse.move.call_count > 10
```

### 10.3 Integration Test Examples

```python
# tests/integration/test_session_store.py

import pytest
from testcontainers.redis import RedisContainer
from faea.network.session_store import SessionStore, SessionState
from cryptography.fernet import Fernet

@pytest.fixture(scope="module")
def redis_container():
    """Start Redis container for testing."""
    with RedisContainer() as redis:
        yield redis

@pytest.fixture
def session_store(redis_container):
    """Create session store with test Redis."""
    import redis as r
    client = r.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379)
    )
    key = Fernet.generate_key()
    return SessionStore(client, key)

class TestSessionStore:
    """Integration tests for SessionStore."""

    @pytest.mark.asyncio
    async def test_store_and_retrieve(self, session_store):
        """Test round-trip store and retrieve."""
        state = SessionState(
            cookies=[{"name": "test", "value": "123"}],
            local_storage={"key": "value"},
            session_storage={},
            cf_clearance=None,
            user_agent="Test/1.0",
            tls_fingerprint="chrome120",
            timestamp=1234567890.0
        )

        await session_store.store("test-session", state)
        retrieved = await session_store.retrieve("test-session")

        assert retrieved is not None
        assert retrieved.cookies == state.cookies
        assert retrieved.user_agent == state.user_agent

    @pytest.mark.asyncio
    async def test_retrieve_nonexistent(self, session_store):
        """Test retrieving nonexistent session."""
        result = await session_store.retrieve("nonexistent")
        assert result is None
```

### 10.4 Test Execution Commands

```bash
# Run all unit tests
pytest tests/unit -v --cov=faea --cov-report=html

# Run integration tests (requires Docker)
pytest tests/integration -v --tb=short

# Run specific test file
pytest tests/unit/test_ghost_cursor.py -v

# Enforce the coverage threshold
pytest tests/unit --cov=faea --cov-fail-under=80
```

## 11. Deployment Specifications

### 11.1 Docker Images

**Camoufox Worker Image:**

```dockerfile
# docker/camoufox.Dockerfile
FROM python:3.11-slim

# Install browser dependencies
RUN apt-get update && apt-get install -y \
    wget gnupg ca-certificates \
    fonts-liberation libasound2 libatk-bridge2.0-0 \
    libatk1.0-0 libcups2 libdbus-1-3 libgdk-pixbuf2.0-0 \
    libnspr4 libnss3 libx11-xcb1 libxcomposite1 \
    libxdamage1 libxrandr2 xdg-utils tini \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements-camoufox.txt .
RUN pip install --no-cache-dir -r requirements-camoufox.txt

# Install Camoufox and fetch its Firefox-based browser binary
# (Camoufox is Firefox-based, so Playwright's Chromium is not needed here)
RUN pip install camoufox \
    && python -m camoufox fetch

COPY faea/ ./faea/
COPY workers/camoufox_worker.py .

ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["python", "camoufox_worker.py"]
```

**curl Client Image:**

```dockerfile
# docker/curl.Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements-curl.txt .
RUN pip install --no-cache-dir -r requirements-curl.txt

COPY faea/ ./faea/
COPY workers/curl_worker.py .

CMD ["python", "curl_worker.py"]
```

### 11.2 Docker Compose

```yaml
# docker-compose.yml
version: "3.8"

services:
  orchestrator:
    build:
      context: .
      dockerfile: docker/orchestrator.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
      - PROXY_API_KEY=${PROXY_API_KEY}
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
    depends_on:
      - redis
    deploy:
      replicas: 1

  camoufox-pool:
    build:
      context: .
      dockerfile: docker/camoufox.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
    shm_size: 2gb
    deploy:
      replicas: 5
      resources:
        limits:
          cpus: "2"
          memory: 2G
    volumes:
      - /dev/shm:/dev/shm

  curl-pool:
    build:
      context: .
      dockerfile: docker/curl.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 20
      resources:
        limits:
          cpus: "0.5"
          memory: 512M

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"

volumes:
  redis-data:
```

## 12. Performance Requirements

### 12.1 SLAs

| Metric | Target | Critical Threshold |
|--------|--------|--------------------|
| Authentication time | < 15 s | < 30 s |
| Session TTL | 25-35 min | > 10 min |
| Extraction RPS (per session) | 2-5 RPS | > 1 RPS |
| Challenge rate | < 10% | < 30% |
| Auth success rate | > 90% | > 70% |

### 12.2 Resource Budgets

| Component | Memory | CPU | Network |
|-----------|--------|-----|---------|
| Camoufox instance | 2 GB | 2 cores | 10 Mbps |
| curl client | 512 MB | 0.5 cores | 10 Mbps |
| Redis | 4 GB | 2 cores | - |
| Orchestrator | 1 GB | 1 core | - |

## 13. Security Implementation

### 13.1 Encryption Standards

- **At Rest:** Fernet (AES-128-CBC with HMAC-SHA256)
- **In Transit:** TLS 1.3 (Redis, external APIs)
- **Key Management:** Environment variables in development; a secrets manager in production
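
Fernet provides an authenticated, symmetric round-trip: decryption fails loudly on tampering rather than returning garbage. A minimal sketch using the `cryptography` package (the payload is illustrative):

```python
from cryptography.fernet import Fernet

key = Fernet.generate_key()   # 32-byte urlsafe-base64 key; store via secrets manager
f = Fernet(key)

token = f.encrypt(b'{"cookies": []}')  # authenticated ciphertext (includes timestamp)
plaintext = f.decrypt(token)           # raises InvalidToken if the token was tampered with
```

Session payloads stored in Redis (see the key schema in Section 6.1) go through exactly this encrypt-before-write / decrypt-after-read cycle.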

### 13.2 Access Control

```python
# Rate limiting per domain
RATE_LIMITS = {
    "default": 60,         # 60 req/min
    "api.target.com": 30,  # More conservative for the primary target
}

# Resource limits per container
RESOURCE_LIMITS = {
    "max_concurrent_browsers": 5,
    "max_concurrent_extractors": 20,
}
```
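
The `RATE_LIMITS` table pairs with the `ratelimit:{domain}:{minute}` Redis counters from Section 6.1; the same fixed-window logic can be sketched in-process (the class name `FixedWindowLimiter` is illustrative, not part of the codebase):

```python
import time
from collections import defaultdict
from typing import Dict, Optional, Tuple

class FixedWindowLimiter:
    """In-process fixed-window limiter mirroring the ratelimit:{domain}:{minute} keys."""

    def __init__(self, limits: Dict[str, int]):
        self.limits = limits
        # One counter per (domain, minute-bucket) pair
        self.counters: Dict[Tuple[str, int], int] = defaultdict(int)

    def allow(self, domain: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        limit = self.limits.get(domain, self.limits["default"])
        key = (domain, int(now // 60))  # bucket the timestamp by minute
        if self.counters[key] >= limit:
            return False
        self.counters[key] += 1
        return True

limiter = FixedWindowLimiter({"default": 2})
results = [limiter.allow("api.target.com", now=0.0) for _ in range(3)]
# → [True, True, False] within the same minute window
```

In production the counter lives in Redis (INCR plus the 120 s TTL) so all workers share one window per domain.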

## 14. Appendices

### 14.1 Glossary

| Term | Definition |
|------|------------|
| cf_clearance | Cloudflare session cookie indicating a passed challenge |
| JA3 | TLS fingerprinting method derived from ClientHello parameters |
| Turnstile | Cloudflare's CAPTCHA alternative |
| CGNAT | Carrier-grade NAT (shared mobile IPs) |
| Fernet | Symmetric authenticated encryption scheme |

### 14.2 References

1. ADD v0.1 - Architecture Definition Document
2. Camoufox Documentation - https://camoufox.com
3. curl_cffi Documentation - https://curl-cffi.readthedocs.io
4. BrowserForge - Browser fingerprint generation

### 14.3 Document History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 0.1 | 2025-12-22 | System Architect | Initial draft from ADD v0.1 |

End of Technical Design Document