# Technical Design Document (TDD)

## High-Fidelity Autonomous Extraction Agent (FAEA)

**Document Version:** 0.1
**Based On:** ADD v0.1
**Classification:** Technical Implementation Blueprint
**Author:** System Architect
**Date:** December 22, 2025

---

## Table of Contents

1. [Overview](#1-overview)
2. [System Architecture](#2-system-architecture)
3. [Module Specifications](#3-module-specifications)
4. [Data Models & Interfaces](#4-data-models--interfaces)
5. [API Contracts](#5-api-contracts)
6. [Database Design](#6-database-design)
7. [Core Algorithms](#7-core-algorithms)
8. [Configuration Management](#8-configuration-management)
9. [Error Handling Strategy](#9-error-handling-strategy)
10. [Testing Strategy](#10-testing-strategy)
11. [Deployment Specifications](#11-deployment-specifications)
12. [Performance Requirements](#12-performance-requirements)
13. [Security Implementation](#13-security-implementation)
14. [Appendices](#14-appendices)

---

## 1. Overview

### 1.1 Purpose

This Technical Design Document translates the architectural vision from ADD v0.1 into implementable specifications. It provides developers with detailed class hierarchies, interface definitions, algorithm implementations, and testing strategies.

### 1.2 Scope

| In Scope | Out of Scope |
|----------|--------------|
| Camoufox browser manager implementation | ML-based behavior generation (future) |
| curl_cffi client pool management | Distributed orchestration (future) |
| Redis session state management | Computer vision CAPTCHA solving |
| BrowserForge profile generation | Third-party CAPTCHA API implementation |
| Ghost Cursor behavioral engine | |
| Entropy scheduler implementation | |

### 1.3 Document Conventions

```
[MUST]   - Mandatory requirement
[SHOULD] - Strongly recommended
[MAY]    - Optional enhancement
```

---

## 2. System Architecture

### 2.1 High-Level Module Decomposition

```mermaid
graph TB
    subgraph "Core Modules"
        A[faea.core.orchestrator]
        B[faea.core.scheduler]
        C[faea.core.config]
    end
    subgraph "Browser Subsystem"
        D[faea.browser.camoufox_manager]
        E[faea.browser.profile_generator]
        F[faea.browser.ghost_cursor]
        G[faea.browser.challenge_handler]
    end
    subgraph "Network Subsystem"
        H[faea.network.curl_client]
        I[faea.network.proxy_rotator]
        J[faea.network.session_store]
    end
    subgraph "Utilities"
        K[faea.utils.entropy]
        L[faea.utils.crypto]
        M[faea.utils.metrics]
    end
    A --> B
    A --> D
    A --> H
    D --> E
    D --> F
    D --> G
    D --> J
    H --> J
    H --> I
    B --> K
```

### 2.2 Package Structure

```
faea/
├── __init__.py
├── core/
│   ├── __init__.py
│   ├── orchestrator.py       # Main coordination logic
│   ├── scheduler.py          # Entropy-based task scheduling
│   └── config.py             # Configuration management
├── browser/
│   ├── __init__.py
│   ├── camoufox_manager.py   # Browser lifecycle management
│   ├── profile_generator.py  # BrowserForge integration
│   ├── ghost_cursor.py       # Human-like mouse movements
│   └── challenge_handler.py  # CAPTCHA/challenge solving
├── network/
│   ├── __init__.py
│   ├── curl_client.py        # curl_cffi wrapper
│   ├── proxy_rotator.py      # Mobile proxy management
│   └── session_store.py      # Redis state management
├── utils/
│   ├── __init__.py
│   ├── entropy.py            # Random number generation
│   ├── crypto.py             # Encryption utilities
│   └── metrics.py            # Prometheus metrics
├── models/
│   ├── __init__.py
│   ├── session.py            # SessionState dataclass
│   ├── profile.py            # BrowserForgeProfile
│   └── task.py               # Task definitions
└── exceptions/
    ├── __init__.py
    └── errors.py             # Custom exceptions
```

---

## 3. Module Specifications

### 3.1 Core Module: Orchestrator

**File:** `faea/core/orchestrator.py`

```python
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional


class TaskType(Enum):
    """Task type enumeration."""
    AUTHENTICATION = auto()
    EXTRACTION = auto()
    RE_AUTHENTICATION = auto()


@dataclass
class Task:
    """Represents a dispatchable task."""
    id: str
    type: TaskType
    target_url: str
    session_id: Optional[str] = None
    priority: int = 5  # 1-10, lower is higher priority
    retry_count: int = 0
    max_retries: int = 3
    created_at: float = field(default_factory=time.time)


class OrchestratorState(Enum):
    """Orchestrator lifecycle states."""
    IDLE = auto()
    RUNNING = auto()
    PAUSED = auto()
    SHUTTING_DOWN = auto()


class Orchestrator:
    """
    Central coordination of authentication and extraction workflows.

    [MUST] Implement graceful shutdown with in-flight task completion.
    [MUST] Enforce rate limits per target domain.
    [SHOULD] Support priority queue for task dispatch.
    """

    def __init__(
        self,
        config: "Config",
        browser_pool: "CamoufoxPool",
        curl_pool: "CurlClientPool",
        session_store: "SessionStore",
        scheduler: "EntropyScheduler"
    ):
        self.config = config
        self.browser_pool = browser_pool
        self.curl_pool = curl_pool
        self.session_store = session_store
        self.scheduler = scheduler
        self.state = OrchestratorState.IDLE
        # Entries are (priority, created_at, task); created_at breaks priority
        # ties so Task objects are never compared directly.
        self._task_queue: asyncio.PriorityQueue[tuple[int, float, Task]] = asyncio.PriorityQueue()
        self._active_tasks: dict[str, asyncio.Task] = {}

    async def start(self) -> None:
        """
        Start the orchestrator main loop.

        Lifecycle:
        1. Initialize resource pools
        2. Start task dispatcher
        3. Start health monitor
        """
        self.state = OrchestratorState.RUNNING
        await asyncio.gather(
            self._dispatch_loop(),
            self._health_monitor_loop()
        )

    async def submit_task(self, task: Task) -> str:
        """
        Submit task to queue.
        Returns:
            Task ID for tracking.
        """
        await self._task_queue.put((task.priority, task.created_at, task))
        return task.id

    async def _dispatch_loop(self) -> None:
        """Main dispatch loop with entropy-based timing."""
        while self.state == OrchestratorState.RUNNING:
            _priority, _created_at, task = await self._task_queue.get()
            if task.type == TaskType.AUTHENTICATION:
                asyncio.create_task(self._handle_auth_task(task))
            elif task.type == TaskType.EXTRACTION:
                asyncio.create_task(self._handle_extraction_task(task))
            elif task.type == TaskType.RE_AUTHENTICATION:
                # Re-authentication follows the same flow as initial auth
                asyncio.create_task(self._handle_auth_task(task))

    async def _handle_auth_task(self, task: Task) -> None:
        """
        Handle authentication task.

        Flow:
        1. Acquire browser from pool
        2. Generate profile if needed
        3. Navigate and authenticate
        4. Extract session state
        5. Store in Redis
        6. Release browser
        """
        browser = await self.browser_pool.acquire()
        try:
            profile = await self._get_or_create_profile(task.session_id)
            await browser.initialize(profile)
            await browser.solve_authentication(task.target_url)
            session_state = await browser.extract_session_state()
            await self.session_store.store(task.session_id, session_state)
        finally:
            await self.browser_pool.release(browser)

    async def _handle_extraction_task(self, task: Task) -> None:
        """
        Handle extraction task.

        Flow:
        1. Retrieve session state
        2. Validate session TTL
        3. Acquire curl client
        4. Execute request
        5.
           Handle response
        """
        session_state = await self.session_store.retrieve(task.session_id)
        if not session_state or session_state.is_expired():
            # Trigger re-authentication
            await self.submit_task(Task(
                id=f"reauth-{task.session_id}",
                type=TaskType.RE_AUTHENTICATION,
                target_url=task.target_url,
                session_id=task.session_id
            ))
            return

        client = await self.curl_pool.acquire(session_state)
        try:
            response = await client.fetch(task.target_url)
            await self._process_response(task, response)
        finally:
            await self.curl_pool.release(client)
```

### 3.2 Browser Module: CamoufoxManager

**File:** `faea/browser/camoufox_manager.py`

```python
import asyncio
import random
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Optional

import camoufox

from faea.browser.challenge_handler import ChallengeHandler
from faea.browser.ghost_cursor import GhostCursorEngine
from faea.exceptions.errors import (
    BrowserInitializationError,
    InvalidStateTransitionError,
)
from faea.models.session import SessionState


class BrowserState(Enum):
    """Browser instance lifecycle states."""
    COLD = auto()           # Not initialized
    WARMING = auto()        # Starting up
    READY = auto()          # Ready for navigation
    AUTHENTICATED = auto()  # Session established
    EXTRACTING = auto()     # Extracting state
    TERMINATED = auto()     # Shutdown complete
    FAILED = auto()         # Error state


@dataclass
class LaunchOptions:
    """Camoufox launch configuration."""
    headless: bool = True
    viewport_width: int = 1920
    viewport_height: int = 1080
    locale: str = "en-US"
    timezone: str = "America/New_York"
    proxy: Optional[str] = None
    args: list[str] = field(default_factory=list)


class CamoufoxManager:
    """
    Manages Camoufox browser instance lifecycle.

    [MUST] Match TLS fingerprint to User-Agent.
    [MUST] Inject BrowserForge profile consistently.
    [MUST] Extract ALL session state (cookies, storage, etc.).
    [SHOULD] Implement 60s timeout for all operations.
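# Example of the transition validation this class enforces: a minimal,
# self-contained sketch using string states (the real table below uses
# BrowserState members; the string names here are illustrative only).
ALLOWED = {
    "COLD": ["WARMING", "FAILED"],
    "WARMING": ["READY", "FAILED"],
}

def can_transition(current: str, target: str) -> bool:
    # A transition is legal only if listed for the current state.
    return target in ALLOWED.get(current, [])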
    """

    # Legal state transitions; TERMINATED is reachable from READY and
    # AUTHENTICATED so idle browsers can be shut down gracefully.
    STATE_TRANSITIONS = {
        BrowserState.COLD: [BrowserState.WARMING, BrowserState.FAILED],
        BrowserState.WARMING: [BrowserState.READY, BrowserState.FAILED],
        BrowserState.READY: [BrowserState.AUTHENTICATED, BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.AUTHENTICATED: [BrowserState.EXTRACTING, BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.EXTRACTING: [BrowserState.TERMINATED, BrowserState.FAILED],
        BrowserState.FAILED: [BrowserState.TERMINATED],
        BrowserState.TERMINATED: [],
    }

    def __init__(self, profile: "BrowserForgeProfile"):
        self.profile = profile
        self.state = BrowserState.COLD
        self.context = None
        self.page = None
        self.ghost_cursor = GhostCursorEngine()
        self._state_lock = asyncio.Lock()

    async def _transition_state(self, new_state: BrowserState) -> None:
        """Atomic state transition with validation."""
        async with self._state_lock:
            if new_state not in self.STATE_TRANSITIONS[self.state]:
                raise InvalidStateTransitionError(
                    f"Cannot transition from {self.state} to {new_state}"
                )
            self.state = new_state

    async def initialize(self) -> None:
        """
        Initialize browser with full fingerprint injection.

        Steps:
        1. Build launch arguments from profile
        2. Launch Playwright context
        3. Inject navigator overrides
        4. Inject canvas/WebGL noise
        5.
           Initialize Ghost Cursor
        """
        await self._transition_state(BrowserState.WARMING)
        try:
            launch_options = self._build_launch_options()

            # Use Camoufox-specific launcher
            self.context = await camoufox.async_launch(**launch_options)
            self.page = await self.context.new_page()

            # Fingerprint injection sequence
            await self._inject_navigator_overrides()
            await self._inject_canvas_noise()
            await self._inject_webgl_overrides()

            await self._transition_state(BrowserState.READY)
        except Exception as e:
            await self._transition_state(BrowserState.FAILED)
            raise BrowserInitializationError(str(e)) from e

    def _build_launch_options(self) -> dict[str, Any]:
        """Build Camoufox-compatible launch options from profile."""
        return {
            "headless": True,
            "fingerprint": {
                "screen": self.profile.screen,
                "navigator": {
                    "userAgent": self.profile.user_agent,
                    "platform": self.profile.platform,
                    "hardwareConcurrency": self.profile.hardware_concurrency,
                    "deviceMemory": self.profile.device_memory,
                },
            },
            "proxy": {"server": self.profile.proxy} if self.profile.proxy else None,
            "viewport": {
                "width": self.profile.viewport["width"],
                "height": self.profile.viewport["height"],
            },
            "locale": self.profile.locale,
            "timezoneId": self.profile.timezone,
        }

    async def _inject_navigator_overrides(self) -> None:
        """Inject navigator property overrides."""
        script = f"""
        Object.defineProperty(navigator, 'hardwareConcurrency', {{
            get: () => {self.profile.hardware_concurrency}
        }});
        Object.defineProperty(navigator, 'deviceMemory', {{
            get: () => {self.profile.device_memory}
        }});
        Object.defineProperty(navigator, 'maxTouchPoints', {{
            get: () => {self.profile.max_touch_points}
        }});
        """
        await self.page.add_init_script(script)

    async def solve_authentication(
        self,
        target_url: str,
        timeout: float = 60.0
    ) -> None:
        """
        Navigate to target with human-like behavior.

        Behavioral Sequence:
        1. Pre-navigation delay (2-7s)
        2. Navigation with networkidle wait
        3. Challenge detection and handling
        4.
           Post-load reading simulation
        """
        await self._transition_state(BrowserState.AUTHENTICATED)

        # Pre-navigation hesitation
        await asyncio.sleep(random.uniform(2.0, 7.0))

        # Navigate
        await self.page.goto(target_url, wait_until="networkidle", timeout=timeout * 1000)

        # Detect and handle challenges
        challenge_handler = ChallengeHandler(self.page, self.ghost_cursor)
        await challenge_handler.detect_and_handle()

        # Simulate reading behavior
        await self._simulate_reading_behavior()

    async def _simulate_reading_behavior(self) -> None:
        """
        Generate F-pattern scroll behavior.

        Based on Nielsen Norman Group eye-tracking research:
        - Horizontal movement across top
        - Shorter horizontal scan lower
        - Vertical scan down left side
        """
        viewport_height = self.profile.viewport["height"]
        page_height = await self.page.evaluate("document.body.scrollHeight")

        # Generate F-pattern scroll points
        scroll_points = self._generate_f_pattern_scroll(page_height, viewport_height)

        for point in scroll_points:
            await self.page.evaluate(f"window.scrollTo(0, {point})")
            await self.ghost_cursor.random_micro_movement(self.page)
            # Log-normal delay simulates reading time
            await asyncio.sleep(random.lognormvariate(0.8, 0.3))

    def _generate_f_pattern_scroll(
        self,
        page_height: int,
        viewport_height: int
    ) -> list[int]:
        """Generate scroll positions for F-pattern simulation."""
        points = [0]  # Start at top

        # Add scroll points with decreasing attention
        current = 0
        while current < page_height - viewport_height:
            # Scroll increment decreases over time (attention decay)
            increment = random.randint(100, 300) * (1 - current / page_height * 0.5)
            current += int(increment)
            points.append(min(current, page_height - viewport_height))

        return points

    async def extract_session_state(self) -> "SessionState":
        """
        Extract all session artifacts.

        [MUST] Capture:
        - All cookies (including HttpOnly)
        - localStorage
        - sessionStorage
        - IndexedDB keys (if applicable)

        [MUST] Verify cf_clearance presence for Cloudflare targets.
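# Illustration of the cf_clearance lookup performed below; the sample
# cookie list is hypothetical.
sample_cookies = [
    {"name": "sid", "value": "abc"},
    {"name": "cf_clearance", "value": "tok"},
]
cf = next((c for c in sample_cookies if c["name"] == "cf_clearance"), None)
# cf is the matching cookie dict, or None when the target set no cf_clearance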
        """
        await self._transition_state(BrowserState.EXTRACTING)

        # Get cookies
        cookies = await self.context.cookies()

        # Get storage
        local_storage = await self.page.evaluate(
            "() => Object.fromEntries(Object.entries(localStorage))"
        )
        session_storage = await self.page.evaluate(
            "() => Object.fromEntries(Object.entries(sessionStorage))"
        )

        # Extract critical tokens
        cf_clearance = next(
            (c for c in cookies if c["name"] == "cf_clearance"),
            None
        )

        return SessionState(
            cookies=cookies,
            local_storage=local_storage,
            session_storage=session_storage,
            cf_clearance=cf_clearance,
            user_agent=self.profile.user_agent,
            tls_fingerprint=self.profile.tls_fingerprint,
            timestamp=time.time()
        )

    async def terminate(self) -> None:
        """Graceful shutdown."""
        await self._transition_state(BrowserState.TERMINATED)
        if self.context:
            await self.context.close()
```

### 3.3 Browser Module: Ghost Cursor Engine

**File:** `faea/browser/ghost_cursor.py`

```python
import asyncio
import math
import random
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Point:
    """2D point representation."""
    x: float
    y: float

    def distance_to(self, other: "Point") -> float:
        return math.sqrt((self.x - other.x) ** 2 + (self.y - other.y) ** 2)


class GhostCursorEngine:
    """
    Human-like mouse movement generator using composite Bezier curves.

    Based on motor control research:
    - Meyer et al. (1988): Submovement composition
    - Fitts's Law: Movement time = a + b * log2(D/W + 1)

    [MUST] Generate non-deterministic paths.
    [MUST] Include micro-movements during "reading".
    [SHOULD] Vary velocity based on distance (Fitts's Law).
    """

    # Fitts's Law constants (empirically derived)
    FITTS_A = 0.1   # Base time
    FITTS_B = 0.15  # Scaling factor

    # Velocity bounds (pixels/second)
    MIN_VELOCITY = 200
    MAX_VELOCITY = 400

    def __init__(self, seed: Optional[int] = None):
        self.rng = random.Random(seed)

    async def move_to(
        self,
        page: "Page",
        target: Point,
        origin: Optional[Point] = None
    ) -> None:
        """
        Move cursor to target with human-like trajectory.

        Algorithm:
        1.
           Calculate distance for submovement count
        2. Generate waypoints with Gaussian noise
        3. Execute submovements with Bezier curves
        4. Apply Fitts's Law timing
        """
        if origin is None:
            origin = await self._get_cursor_position(page)

        distance = origin.distance_to(target)

        # Determine submovement count (1-5 based on distance)
        num_submovements = min(5, max(1, int(distance / 300)))

        # Generate intermediate waypoints
        waypoints = self._generate_waypoints(origin, target, num_submovements)

        # Execute each submovement
        for i in range(len(waypoints) - 1):
            await self._execute_submovement(
                page, waypoints[i], waypoints[i + 1]
            )

    def _generate_waypoints(
        self,
        start: Point,
        end: Point,
        count: int
    ) -> list[Point]:
        """
        Generate waypoints with perpendicular Gaussian noise.

        Simulates motor control imprecision and overshooting.
        """
        waypoints = [start]

        for i in range(1, count):
            t = i / count
            # Linear interpolation
            x = start.x + t * (end.x - start.x)
            y = start.y + t * (end.y - start.y)

            # Add perpendicular noise (overshooting behavior)
            angle = math.atan2(end.y - start.y, end.x - start.x)
            perp_angle = angle + math.pi / 2
            noise_magnitude = self.rng.gauss(0, 10)
            x += noise_magnitude * math.cos(perp_angle)
            y += noise_magnitude * math.sin(perp_angle)

            waypoints.append(Point(x, y))

        waypoints.append(end)
        return waypoints

    async def _execute_submovement(
        self,
        page: "Page",
        start: Point,
        end: Point
    ) -> None:
        """
        Execute single submovement with cubic Bezier curve.

        Timing derived from Fitts's Law.
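# Worked example of the Fitts's Law timing this method applies, using the
# class constants FITTS_A = 0.1 and FITTS_B = 0.15:
import math
mt = 0.1 + 0.15 * math.log2(300 / 10 + 1)  # 300 px move, 10 px target
# mt is about 0.84 s; longer hops add time only logarithmically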
        """
        distance = start.distance_to(end)

        # Generate control points for Bezier curve
        control1, control2 = self._generate_bezier_controls(start, end)

        # Calculate movement time from Fitts's Law
        target_width = 10  # Assumed target width
        movement_time = self.FITTS_A + self.FITTS_B * math.log2(distance / target_width + 1)

        # Sample curve and execute
        steps = max(10, int(distance / 5))
        step_delay = movement_time / steps

        for i in range(steps + 1):
            t = i / steps
            point = self._bezier_point(t, start, control1, control2, end)
            await page.mouse.move(point.x, point.y)
            await asyncio.sleep(step_delay)

    def _generate_bezier_controls(
        self,
        start: Point,
        end: Point
    ) -> Tuple[Point, Point]:
        """Generate control points with randomized curvature."""
        # Midpoint
        mid_x = (start.x + end.x) / 2
        mid_y = (start.y + end.y) / 2

        # Offset perpendicular to line
        dx = end.x - start.x
        dy = end.y - start.y
        length = math.sqrt(dx * dx + dy * dy)
        if length == 0:
            return Point(mid_x, mid_y), Point(mid_x, mid_y)

        # Perpendicular unit vector
        perp_x = -dy / length
        perp_y = dx / length

        # Random offset magnitude (0-30% of distance)
        offset1 = self.rng.uniform(-0.3, 0.3) * length
        offset2 = self.rng.uniform(-0.3, 0.3) * length

        control1 = Point(
            start.x + 0.3 * dx + offset1 * perp_x,
            start.y + 0.3 * dy + offset1 * perp_y
        )
        control2 = Point(
            start.x + 0.7 * dx + offset2 * perp_x,
            start.y + 0.7 * dy + offset2 * perp_y
        )
        return control1, control2

    def _bezier_point(
        self,
        t: float,
        p0: Point,
        p1: Point,
        p2: Point,
        p3: Point
    ) -> Point:
        """Evaluate cubic Bezier curve at parameter t."""
        # B(t) = (1-t)³P₀ + 3(1-t)²tP₁ + 3(1-t)t²P₂ + t³P₃
        u = 1 - t
        tt = t * t
        uu = u * u
        uuu = uu * u
        ttt = tt * t

        x = uuu * p0.x + 3 * uu * t * p1.x + 3 * u * tt * p2.x + ttt * p3.x
        y = uuu * p0.y + 3 * uu * t * p1.y + 3 * u * tt * p2.y + ttt * p3.y
        return Point(x, y)

    async def random_micro_movement(self, page: "Page") -> None:
        """
        Small drift movements simulating hand tremor/fidgeting.

        Used during "reading" pauses.
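# The drift below is Gaussian with sigma = 5 px, so roughly 95% of
# micro-movements land within 10 px of the current position; a seeded
# sketch of the sampling (seed 42 chosen arbitrarily):
import random
rng = random.Random(42)
dx, dy = rng.gauss(0, 5), rng.gauss(0, 5)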
        """
        current = await self._get_cursor_position(page)

        # Small Gaussian drift
        drift = Point(
            current.x + self.rng.gauss(0, 5),
            current.y + self.rng.gauss(0, 5)
        )

        # Slow movement (low velocity = inattention)
        await page.mouse.move(drift.x, drift.y)
        await asyncio.sleep(self.rng.uniform(0.05, 0.15))

    async def _get_cursor_position(self, page: "Page") -> Point:
        """
        Get current cursor position from page.

        Assumes an init script has been tracking the pointer into
        window.mouseX/mouseY; falls back to (0, 0) otherwise.
        """
        pos = await page.evaluate("""
            () => ({x: window.mouseX || 0, y: window.mouseY || 0})
        """)
        return Point(pos["x"], pos["y"])
```

### 3.4 Network Module: Session Store

**File:** `faea/network/session_store.py`

```python
import time
from dataclasses import dataclass
from typing import Optional

import msgpack


@dataclass
class SessionState:
    """
    Complete session state for handover between browser and curl.

    [MUST] Include all cookies (especially cf_clearance).
    [MUST] Include TLS fingerprint for matching.
    """
    cookies: list[dict]
    local_storage: dict[str, str]
    session_storage: dict[str, str]
    cf_clearance: Optional[dict]
    user_agent: str
    tls_fingerprint: str  # e.g., "chrome120"
    timestamp: float

    def is_expired(self, ttl_buffer: int = 300) -> bool:
        """Check if session is expired or near expiration."""
        if not self.cf_clearance:
            return True
        # cf_clearance typically expires in 1800s
        expires = self.cf_clearance.get("expires", 0)
        return time.time() > (expires - ttl_buffer)

    def serialize(self) -> bytes:
        """Serialize with MessagePack (integrity is provided by the Fernet layer in SessionStore)."""
        payload = msgpack.packb({
            "cookies": self.cookies,
            "local_storage": self.local_storage,
            "session_storage": self.session_storage,
            "cf_clearance": self.cf_clearance,
            "user_agent": self.user_agent,
            "tls_fingerprint": self.tls_fingerprint,
            "timestamp": self.timestamp,
        })
        return payload

    @classmethod
    def deserialize(cls, data: bytes) -> "SessionState":
        """Deserialize from MessagePack."""
        obj = msgpack.unpackb(data, raw=False)
        return cls(**obj)


class SessionStore:
    """
    Redis-backed encrypted session storage.
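# TTL arithmetic used by this store: entries live for 25 minutes, a
# 5-minute safety margin before the typical 30-minute cf_clearance
# lifetime, so Redis entries never outlive the cookie they wrap.
CF_CLEARANCE_TTL = 1800   # seconds (typical)
SAFETY_BUFFER = 300       # seconds
REDIS_TTL = CF_CLEARANCE_TTL - SAFETY_BUFFER  # 1500 s, matches setex below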
    [MUST] Encrypt at rest with Fernet.
    [MUST] Set TTL shorter than cookie expiration.
    [SHOULD] Implement connection pooling.
    """

    def __init__(self, redis_client, encryption_key: bytes):
        from cryptography.fernet import Fernet
        self.redis = redis_client
        self.cipher = Fernet(encryption_key)

    async def store(self, session_id: str, state: SessionState) -> None:
        """Encrypt and store session."""
        plaintext = state.serialize()
        ciphertext = self.cipher.encrypt(plaintext)

        # TTL = 25 minutes (before 30-min cookie expiration)
        await self.redis.setex(
            name=f"session:{session_id}",
            time=1500,
            value=ciphertext
        )

    async def retrieve(self, session_id: str) -> Optional[SessionState]:
        """Retrieve and decrypt session."""
        ciphertext = await self.redis.get(f"session:{session_id}")
        if not ciphertext:
            return None
        plaintext = self.cipher.decrypt(ciphertext)
        return SessionState.deserialize(plaintext)

    async def delete(self, session_id: str) -> None:
        """Delete session."""
        await self.redis.delete(f"session:{session_id}")
```

### 3.5 Network Module: curl_cffi Client

**File:** `faea/network/curl_client.py`

```python
from typing import Any, Optional

from curl_cffi.requests import AsyncSession

from faea.exceptions.errors import SessionInvalidatedError


class CurlClient:
    """
    curl_cffi-based HTTP client with TLS fingerprint matching.

    [MUST] Use same TLS fingerprint as browser session.
    [MUST] Include all cookies from session state.
    [MUST] Build consistent sec-ch-ua headers.
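# Example of the sec-ch-ua major-version extraction this client performs;
# the sample User-Agent string is hypothetical, the regex matches the one
# in _derive_sec_ch_ua below.
import re
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.6099.109 Safari/537.36"
match = re.search(r"Chrome/(\d+)", ua)
major = match.group(1)
sec_ch_ua = f'"Not_A Brand";v="8", "Chromium";v="{major}", "Google Chrome";v="{major}"'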
    """

    def __init__(self, session_state: "SessionState"):
        self.session_state = session_state
        self.session: Optional[AsyncSession] = None
        self.headers: dict[str, str] = {}

    async def initialize(self) -> None:
        """Configure client to match browser fingerprint."""
        # Create session with matching TLS impersonation
        self.session = AsyncSession(
            impersonate=self.session_state.tls_fingerprint
        )

        # Inject all cookies
        for cookie in self.session_state.cookies:
            self.session.cookies.set(
                name=cookie["name"],
                value=cookie["value"],
                domain=cookie["domain"],
                path=cookie.get("path", "/"),
                secure=cookie.get("secure", False),
            )

        # Build header profile
        self.headers = self._build_headers()

    def _build_headers(self) -> dict[str, str]:
        """Build headers matching browser profile."""
        return {
            "User-Agent": self.session_state.user_agent,
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "sec-ch-ua": self._derive_sec_ch_ua(),
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": self._extract_platform(),
        }

    def _derive_sec_ch_ua(self) -> str:
        """
        Derive sec-ch-ua from User-Agent.

        Chrome/120.0.6099.109 → "Chromium";v="120", "Google Chrome";v="120"
        """
        import re
        match = re.search(r"Chrome/(\d+)", self.session_state.user_agent)
        if match:
            version = match.group(1)
            return f'"Not_A Brand";v="8", "Chromium";v="{version}", "Google Chrome";v="{version}"'
        return '"Not_A Brand";v="8"'

    def _extract_platform(self) -> str:
        """Extract platform from User-Agent."""
        ua = self.session_state.user_agent
        if "Windows" in ua:
            return '"Windows"'
        elif "Mac" in ua:
            return '"macOS"'
        elif "Linux" in ua:
            return '"Linux"'
        return '"Unknown"'

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        **kwargs
    ) -> "Response":
        """
        Execute request with fingerprint matching.

        Includes pre-request jitter for entropy.
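# The jitter below is log-normal, so delays are always positive and
# right-skewed; the median is exp(mu) = exp(0.2), about 1.22 s. A seeded
# sketch of the distribution (seed 7 chosen arbitrarily):
import random
rng = random.Random(7)
samples = [rng.lognormvariate(0.2, 0.1) for _ in range(1000)]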
        """
        import asyncio
        import random

        # Pre-request jitter
        await asyncio.sleep(random.lognormvariate(0.2, 0.1))

        response = await self.session.request(
            method=method,
            url=url,
            headers=self.headers,
            **kwargs
        )

        # Check for challenge response
        if "cf-mitigated" in response.headers:
            raise SessionInvalidatedError("Cloudflare challenge detected")

        return response

    async def close(self) -> None:
        """Close session."""
        if self.session:
            await self.session.close()
```

---

## 4. Data Models & Interfaces

### 4.1 Core Data Classes

```python
# faea/models/profile.py
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class BrowserForgeProfile:
    """
    Complete browser fingerprint profile.

    All fields must exhibit internal consistency.
    """

    # User-Agent and related
    user_agent: str
    sec_ch_ua: str
    platform: str

    # Screen configuration
    screen: Dict[str, Any] = field(default_factory=lambda: {
        "width": 1920,
        "height": 1080,
        "colorDepth": 24,
        "pixelRatio": 1.0,
    })
    viewport: Dict[str, int] = field(default_factory=lambda: {
        "width": 1920,
        "height": 969,  # Screen height - browser chrome
    })

    # Hardware profile
    hardware_concurrency: int = 8
    device_memory: int = 8
    max_touch_points: int = 0

    # WebGL spoofing
    webgl_vendor: str = "Google Inc. (Intel)"
    webgl_renderer: str = "ANGLE (Intel, Intel(R) UHD Graphics 620)"

    # Canvas noise seed
    canvas_noise_seed: int = 0

    # Network identity
    tls_fingerprint: str = "chrome120"
    proxy: Optional[str] = None

    # Locale/timezone
    locale: str = "en-US"
    timezone: str = "America/New_York"

    def validate(self) -> "ValidationResult":
        """Validate internal consistency."""
        validator = ProfileValidator()
        return validator.validate(self)


@dataclass
class ValidationResult:
    """Profile validation result."""
    passed: bool
    failures: list["ValidationCheck"] = field(default_factory=list)


@dataclass
class ValidationCheck:
    """Single validation check result."""
    name: str
    passed: bool
    details: str
```

### 4.2 Exception Hierarchy

```python
# faea/exceptions/errors.py

class FAEABaseError(Exception):
    """Base exception for all FAEA errors."""
    pass


class BrowserError(FAEABaseError):
    """Browser subsystem errors."""
    pass


class BrowserInitializationError(BrowserError):
    """Failed to initialize browser."""
    pass


class InvalidStateTransitionError(BrowserError):
    """Invalid state machine transition."""
    pass


class ChallengeError(BrowserError):
    """Challenge detection/solving failed."""
    pass


class NetworkError(FAEABaseError):
    """Network subsystem errors."""
    pass


class SessionNotFoundError(NetworkError):
    """Session not found in store."""
    pass


class SessionInvalidatedError(NetworkError):
    """Session was invalidated by target."""
    pass


class ProxyError(NetworkError):
    """Proxy-related errors."""
    pass


class ProxyExhaustionError(ProxyError):
    """All proxies exhausted or in cooldown."""
    pass


class ConfigurationError(FAEABaseError):
    """Configuration errors."""
    pass
```

---

## 5. API Contracts

### 5.1 Internal Module Interfaces

```python
# Type definitions for module boundaries
from typing import Callable, Optional, Protocol


class BrowserPoolProtocol(Protocol):
    """Interface for browser pool management."""

    async def acquire(self) -> "CamoufoxManager":
        """Acquire browser from pool."""
        ...
    async def release(self, browser: "CamoufoxManager") -> None:
        """Release browser back to pool."""
        ...

    @property
    def available(self) -> int:
        """Number of available browsers."""
        ...


class SessionStoreProtocol(Protocol):
    """Interface for session storage."""

    async def store(self, session_id: str, state: "SessionState") -> None:
        ...

    async def retrieve(self, session_id: str) -> Optional["SessionState"]:
        ...

    async def delete(self, session_id: str) -> None:
        ...


class SchedulerProtocol(Protocol):
    """Interface for entropy scheduler."""

    def next_execution_time(self) -> float:
        """Calculate next execution timestamp."""
        ...

    async def dispatch_with_entropy(self, task: Callable) -> None:
        """Dispatch task at entropic time."""
        ...
```

---

## 6. Database Design

### 6.1 Redis Key Schema

```
# Session State
session:{session_id}           → Encrypted SessionState (binary)
                                 TTL: 1500 seconds

# Proxy Usage Tracking
proxy:usage:{proxy_id}         → JSON {session_id, last_used_ts}
                                 TTL: 600 seconds

# Profile Cache
profile:{profile_hash}         → Serialized BrowserForgeProfile
                                 TTL: 86400 seconds (1 day)

# Rate Limiting
ratelimit:{domain}:{minute}    → Counter
                                 TTL: 120 seconds

# Metrics Aggregation
metrics:auth:success           → Counter
metrics:auth:failure:{reason}  → Counter
metrics:extraction:success     → Counter
metrics:extraction:challenge   → Counter
```

### 6.2 Redis Configuration

```yaml
# redis.conf highlights
maxmemory 4gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec

# Cluster mode (production)
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
```

---

## 7. Core Algorithms

### 7.1 Entropy Scheduler

```python
# faea/core/scheduler.py
import asyncio
import random
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SchedulerConfig:
    """Scheduler configuration."""
    base_interval: float = 30.0  # seconds
    drift_sigma: float = 5.0     # Gaussian noise σ
    min_interval: float = 5.0    # Minimum delay
    max_interval: float = 120.0  # Maximum delay


class EntropyScheduler:
    """
    Entropy-based task scheduling to defeat temporal analysis.

    Model: T_actual = T_base + N(0, σ²) + φ(t)

    Where φ(t) is a slowly-drifting phase offset.
    """

    def __init__(self, config: Optional[SchedulerConfig] = None):
        self.config = config or SchedulerConfig()
        self.phase_offset = 0.0

    def next_execution_time(self) -> float:
        """Calculate next execution time with drift."""
        # Gaussian noise
        noise = random.gauss(0, self.config.drift_sigma)

        # Phase shift accumulation (low-frequency drift)
        self.phase_offset += random.uniform(-0.5, 0.5)

        # Calculate interval
        interval = self.config.base_interval + noise + self.phase_offset

        # Clamp to bounds
        interval = max(self.config.min_interval, min(self.config.max_interval, interval))
        return time.time() + interval

    async def dispatch_with_entropy(self, task: Callable) -> None:
        """Dispatch task at entropic time."""
        target_time = self.next_execution_time()
        delay = target_time - time.time()
        if delay > 0:
            await asyncio.sleep(delay)

        # Pre-execution jitter (human hesitation)
        await asyncio.sleep(random.uniform(0.1, 0.8))

        await task()
```

### 7.2 Proxy Rotation Algorithm

```python
# faea/network/proxy_rotator.py
import time
from dataclasses import dataclass
from typing import Dict, List

from faea.exceptions.errors import ProxyExhaustionError


@dataclass
class ProxyUsage:
    """Track proxy usage."""
    proxy: str
    session_id: str
    last_used: float
    request_count: int = 0


class ProxyRotator:
    """
    Sticky session proxy rotation with cooldown.

    Rules:
    1. Same session_id → same proxy (until cooldown)
    2. After cooldown → LRU selection
    3.
       Blacklisted proxies excluded
    """

    def __init__(
        self,
        proxy_pool: List[str],
        cooldown_period: int = 300  # 5 minutes
    ):
        self.proxy_pool = proxy_pool
        self.cooldown_period = cooldown_period
        self.usage: Dict[str, ProxyUsage] = {}
        self.blacklist: set[str] = set()

    def select_proxy(self, session_id: str) -> str:
        """Select proxy with sticky session support."""
        # Check for existing sticky session
        if session_id in self.usage:
            usage = self.usage[session_id]
            if time.time() - usage.last_used < self.cooldown_period:
                usage.last_used = time.time()
                usage.request_count += 1
                return usage.proxy

        # Select least-recently-used available proxy
        proxy = self._select_lru_proxy()
        self.usage[session_id] = ProxyUsage(
            proxy=proxy,
            session_id=session_id,
            last_used=time.time()
        )
        return proxy

    def _select_lru_proxy(self) -> str:
        """Select least-recently-used non-blacklisted proxy."""
        available = [
            p for p in self.proxy_pool
            if p not in self.blacklist and self._is_cooled_down(p)
        ]
        if not available:
            raise ProxyExhaustionError("No proxies available")

        # Sort by last use time (ascending)
        return min(available, key=lambda p: self._last_use_time(p))

    def _is_cooled_down(self, proxy: str) -> bool:
        """Check if proxy completed cooldown."""
        for usage in self.usage.values():
            if usage.proxy == proxy:
                return time.time() - usage.last_used > self.cooldown_period
        return True

    def _last_use_time(self, proxy: str) -> float:
        """Get last use time for proxy."""
        for usage in self.usage.values():
            if usage.proxy == proxy:
                return usage.last_used
        return 0.0

    def blacklist_proxy(self, proxy: str) -> None:
        """Add proxy to blacklist."""
        self.blacklist.add(proxy)

    def release_session(self, session_id: str) -> None:
        """Release sticky session."""
        if session_id in self.usage:
            del self.usage[session_id]
```

---

## 8. Configuration Management

### 8.1 Configuration Schema

```python
# faea/core/config.py
import os
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RedisConfig:
    """Redis connection configuration."""
    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None

    @property
    def url(self) -> str:
        auth = f":{self.password}@" if self.password else ""
        return f"redis://{auth}{self.host}:{self.port}/{self.db}"


@dataclass
class ProxyConfig:
    """Proxy pool configuration."""
    provider: str = "oxylabs"  # oxylabs, smartproxy, etc.
    api_key: str = ""
    country_code: str = "us"
    pool_size: int = 10
    cooldown_seconds: int = 300


@dataclass
class BrowserConfig:
    """Browser pool configuration."""
    pool_size: int = 5
    headless: bool = True
    timeout_seconds: int = 60
    shm_size: str = "2gb"


@dataclass
class SchedulerConfig:
    """Scheduler configuration."""
    base_interval: float = 30.0
    drift_sigma: float = 5.0
    min_interval: float = 5.0
    max_interval: float = 120.0


@dataclass
class Config:
    """Root configuration."""
    redis: RedisConfig = field(default_factory=RedisConfig)
    proxy: ProxyConfig = field(default_factory=ProxyConfig)
    browser: BrowserConfig = field(default_factory=BrowserConfig)
    scheduler: SchedulerConfig = field(default_factory=SchedulerConfig)
    encryption_key: str = ""  # Fernet key
    log_level: str = "INFO"

    @classmethod
    def from_env(cls) -> "Config":
        """Load configuration from environment variables."""
        return cls(
            redis=RedisConfig(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                password=os.getenv("REDIS_PASSWORD"),
            ),
            proxy=ProxyConfig(
                api_key=os.getenv("PROXY_API_KEY", ""),
                country_code=os.getenv("PROXY_COUNTRY", "us"),
            ),
            encryption_key=os.getenv("ENCRYPTION_KEY", ""),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
        )
```

### 8.2 Environment Variables

| Variable | Description | Required | Default |
|----------|-------------|----------|---------|
| `REDIS_HOST` | Redis server hostname | No | localhost |
| `REDIS_PORT` | Redis server port | No | 6379 |
| `REDIS_PASSWORD` | Redis password | No | None |
| `PROXY_API_KEY` | Mobile proxy API key | Yes | - |
| `PROXY_COUNTRY` | Proxy country code | No | us |
| `ENCRYPTION_KEY` | Fernet encryption key | Yes | - |
| `BROWSERFORGE_SEED` | Profile generation seed | No | Random |
| `LOG_LEVEL` | Logging verbosity | No | INFO |

---

## 9. Error Handling Strategy

### 9.1 Retry Policies

```python
import asyncio
import random
from dataclasses import dataclass
from typing import Callable, Tuple, Type

from faea.exceptions.errors import NetworkError, SessionInvalidatedError


@dataclass
class RetryPolicy:
    """Retry configuration."""
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    retryable_exceptions: Tuple[Type[Exception], ...] = (
        NetworkError,
        SessionInvalidatedError,
    )


async def with_retry(
    func: Callable,
    policy: RetryPolicy,
    *args,
    **kwargs
):
    """Execute function with exponential backoff retry."""
    last_exception = None

    for attempt in range(policy.max_attempts):
        try:
            return await func(*args, **kwargs)
        except policy.retryable_exceptions as e:
            last_exception = e
            if attempt == policy.max_attempts - 1:
                raise

            delay = min(
                policy.base_delay * (policy.exponential_base ** attempt),
                policy.max_delay
            )
            # Add jitter
            delay += random.uniform(0, delay * 0.1)
            await asyncio.sleep(delay)

    raise last_exception
```

### 9.2 Circuit Breaker

```python
import time
from enum import Enum, auto


class CircuitState(Enum):
    CLOSED = auto()     # Normal operation
    OPEN = auto()       # Failing, reject requests
    HALF_OPEN = auto()  # Testing recovery


class CircuitBreaker:
    """
    Circuit breaker for external service protection.

    Prevents cascading failures by cutting off failing services.
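
    Illustrative usage sketch (NetworkError here stands in for any
    retryable exception from faea.exceptions.errors; the caller wiring
    is an assumption, not part of this class):

        breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
        if breaker.can_execute():
            try:
                await make_request()      # hypothetical external call
                breaker.record_success()
            except NetworkError:
                breaker.record_failure()  # threshold trips state to OPEN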
""" def __init__( self, failure_threshold: int = 5, recovery_timeout: float = 30.0, half_open_requests: int = 3 ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.half_open_requests = half_open_requests self.state = CircuitState.CLOSED self.failure_count = 0 self.last_failure_time = 0.0 self.half_open_successes = 0 def can_execute(self) -> bool: """Check if request can proceed.""" if self.state == CircuitState.CLOSED: return True if self.state == CircuitState.OPEN: # Check if recovery timeout elapsed if time.time() - self.last_failure_time > self.recovery_timeout: self.state = CircuitState.HALF_OPEN return True return False # HALF_OPEN: allow limited requests return True def record_success(self) -> None: """Record successful execution.""" if self.state == CircuitState.HALF_OPEN: self.half_open_successes += 1 if self.half_open_successes >= self.half_open_requests: self.state = CircuitState.CLOSED self.failure_count = 0 else: self.failure_count = 0 def record_failure(self) -> None: """Record failed execution.""" self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN self.half_open_successes = 0 ``` --- ## 10. 
Testing Strategy ### 10.1 Test Categories | Category | Scope | Tools | Coverage Target | |----------|-------|-------|-----------------| | Unit | Individual classes/functions | pytest, unittest.mock | 80% | | Integration | Module interactions | pytest, testcontainers | 70% | | Contract | API interfaces | pytest, hypothesis | 90% | | E2E | Full workflow | pytest, Playwright | Critical paths | ### 10.2 Unit Test Examples ```python # tests/unit/test_ghost_cursor.py import pytest from unittest.mock import AsyncMock, MagicMock from faea.browser.ghost_cursor import GhostCursorEngine, Point class TestGhostCursorEngine: """Unit tests for Ghost Cursor.""" def test_point_distance(self): """Test distance calculation.""" p1 = Point(0, 0) p2 = Point(3, 4) assert p1.distance_to(p2) == 5.0 def test_waypoint_generation(self): """Test waypoint generation includes start and end.""" engine = GhostCursorEngine(seed=42) start = Point(0, 0) end = Point(100, 100) waypoints = engine._generate_waypoints(start, end, 3) assert len(waypoints) == 4 # start + 2 intermediate + end assert waypoints[0] == start assert waypoints[-1] == end def test_bezier_curve_endpoints(self): """Test Bezier curve passes through endpoints.""" engine = GhostCursorEngine(seed=42) p0 = Point(0, 0) p1 = Point(25, 50) p2 = Point(75, 50) p3 = Point(100, 100) # t=0 should give p0 result_0 = engine._bezier_point(0, p0, p1, p2, p3) assert abs(result_0.x - p0.x) < 0.001 assert abs(result_0.y - p0.y) < 0.001 # t=1 should give p3 result_1 = engine._bezier_point(1, p0, p1, p2, p3) assert abs(result_1.x - p3.x) < 0.001 assert abs(result_1.y - p3.y) < 0.001 @pytest.mark.asyncio async def test_move_to_generates_submovements(self): """Test move_to generates appropriate submovements.""" engine = GhostCursorEngine(seed=42) page = AsyncMock() page.evaluate = AsyncMock(return_value={"x": 0, "y": 0}) page.mouse.move = AsyncMock() await engine.move_to(page, Point(500, 500)) # Should have multiple move calls assert 
            page.mouse.move.call_count > 10
```

### 10.3 Integration Test Examples

```python
# tests/integration/test_session_store.py
import pytest
from cryptography.fernet import Fernet
from testcontainers.redis import RedisContainer

from faea.network.session_store import SessionStore, SessionState


@pytest.fixture(scope="module")
def redis_container():
    """Start Redis container for testing."""
    with RedisContainer() as redis:
        yield redis


@pytest.fixture
def session_store(redis_container):
    """Create session store with test Redis."""
    import redis as r

    client = r.Redis(
        host=redis_container.get_container_host_ip(),
        port=redis_container.get_exposed_port(6379)
    )
    key = Fernet.generate_key()
    return SessionStore(client, key)


class TestSessionStore:
    """Integration tests for SessionStore."""

    @pytest.mark.asyncio
    async def test_store_and_retrieve(self, session_store):
        """Test round-trip store and retrieve."""
        state = SessionState(
            cookies=[{"name": "test", "value": "123"}],
            local_storage={"key": "value"},
            session_storage={},
            cf_clearance=None,
            user_agent="Test/1.0",
            tls_fingerprint="chrome120",
            timestamp=1234567890.0
        )

        await session_store.store("test-session", state)
        retrieved = await session_store.retrieve("test-session")

        assert retrieved is not None
        assert retrieved.cookies == state.cookies
        assert retrieved.user_agent == state.user_agent

    @pytest.mark.asyncio
    async def test_retrieve_nonexistent(self, session_store):
        """Test retrieving nonexistent session."""
        result = await session_store.retrieve("nonexistent")
        assert result is None
```

### 10.4 Test Execution Commands

```bash
# Run all unit tests
pytest tests/unit -v --cov=faea --cov-report=html

# Run integration tests (requires Docker)
pytest tests/integration -v --tb=short

# Run specific test file
pytest tests/unit/test_ghost_cursor.py -v

# Run with coverage threshold
pytest tests/unit --cov=faea --cov-fail-under=80
```

---

## 11. Deployment Specifications

### 11.1 Docker Images

**Camoufox Worker Image:**

```dockerfile
# docker/camoufox.Dockerfile
FROM python:3.11-slim

# Install browser dependencies
RUN apt-get update && apt-get install -y \
    wget gnupg ca-certificates \
    fonts-liberation libasound2 libatk-bridge2.0-0 \
    libatk1.0-0 libcups2 libdbus-1-3 libgdk-pixbuf2.0-0 \
    libnspr4 libnss3 libx11-xcb1 libxcomposite1 \
    libxdamage1 libxrandr2 xdg-utils tini \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements-camoufox.txt .
RUN pip install --no-cache-dir -r requirements-camoufox.txt

# Install the Camoufox browser (Firefox-based; no Chromium needed)
RUN pip install camoufox \
    && python -m camoufox fetch

COPY faea/ ./faea/
COPY workers/camoufox_worker.py .

ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["python", "camoufox_worker.py"]
```

**curl Client Image:**

```dockerfile
# docker/curl.Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements-curl.txt .
RUN pip install --no-cache-dir -r requirements-curl.txt

COPY faea/ ./faea/
COPY workers/curl_worker.py .

CMD ["python", "curl_worker.py"]
```

### 11.2 Docker Compose

```yaml
# docker-compose.yml
version: "3.8"

services:
  orchestrator:
    build:
      context: .
      dockerfile: docker/orchestrator.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
      - PROXY_API_KEY=${PROXY_API_KEY}
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
    depends_on:
      - redis
    deploy:
      replicas: 1

  camoufox-pool:
    build:
      context: .
      dockerfile: docker/camoufox.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
    shm_size: 2gb
    deploy:
      replicas: 5
      resources:
        limits:
          cpus: "2"
          memory: 2G
    volumes:
      - /dev/shm:/dev/shm

  curl-pool:
    build:
      context: .
      dockerfile: docker/curl.Dockerfile
    environment:
      - REDIS_URL=redis://redis:6379
    deploy:
      replicas: 20
      resources:
        limits:
          cpus: "0.5"
          memory: 512M

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 4gb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"

volumes:
  redis-data:
```

---

## 12. Performance Requirements

### 12.1 SLAs

| Metric | Target | Critical Threshold |
|--------|--------|-------------------|
| Authentication time | < 15s | < 30s |
| Session TTL | 25-35 min | > 10 min |
| Extraction RPS (per session) | 2-5 RPS | > 1 RPS |
| Challenge rate | < 10% | < 30% |
| Auth success rate | > 90% | > 70% |

### 12.2 Resource Budgets

| Component | Memory | CPU | Network |
|-----------|--------|-----|---------|
| Camoufox instance | 2 GB | 2 cores | 10 Mbps |
| curl client | 512 MB | 0.5 cores | 10 Mbps |
| Redis | 4 GB | 2 cores | - |
| Orchestrator | 1 GB | 1 core | - |

---

## 13. Security Implementation

### 13.1 Encryption Standards

- **At Rest:** Fernet (AES-128 in CBC mode with HMAC-SHA256)
- **In Transit:** TLS 1.3 (Redis, external APIs)
- **Key Management:** Environment variables; secrets manager in production

### 13.2 Access Control

```python
# Rate limiting per domain (requests per minute)
RATE_LIMITS = {
    "default": 60,          # 60 req/min
    "api.target.com": 30,   # More conservative
}

# Resource limits per container
RESOURCE_LIMITS = {
    "max_concurrent_browsers": 5,
    "max_concurrent_extractors": 20,
}
```

---

## 14. Appendices

### 14.1 Glossary

| Term | Definition |
|------|------------|
| cf_clearance | Cloudflare session cookie indicating a passed challenge |
| JA3 | TLS fingerprinting method based on the Client Hello |
| Turnstile | Cloudflare's CAPTCHA alternative |
| CGNAT | Carrier-grade NAT (shared mobile IPs) |
| Fernet | Symmetric encryption scheme |

### 14.2 References

1. [ADD v0.1](file:///home/kasm-user/workspace/FAEA/docs/ADD_v0.1.md) - Architecture Definition Document
2. Camoufox Documentation - https://camoufox.com
3. curl_cffi Documentation - https://curl-cffi.readthedocs.io
4. BrowserForge - Browser fingerprint generation

### 14.3 Document History

| Version | Date | Author | Changes |
|---------|------|--------|---------|
| 0.1 | 2025-12-22 | System Architect | Initial draft from ADD v0.1 |

---

**End of Technical Design Document**