diff --git a/implementation_plan.md b/implementation_plan.md
index b6069eb..3e0b7e6 100644
--- a/implementation_plan.md
+++ b/implementation_plan.md
@@ -24,7 +24,8 @@ Transition the system from a functional prototype to a scalable, production-read
 - **TaskWorker** (`src/orchestrator/worker.py`):
   - Implemented persistent Redis consumer loop.
   - Integrated with `EntropyScheduler` and `SessionRecoveryManager`.
-  - Dispatches `auth` and `extract` tasks.
+  - **Persistence Bridge**: Implemented binary Redis support (`redis_raw`) for Headless-Plus state handover.
+  - Dispatches `auth` and `extract` tasks with full session serialization/deserialization.
 - **SessionRecoveryManager** (`src/core/recovery.py`):
   - Implemented logic for handling `cf_clearance_expired`, `rate_limit`, etc.
 
@@ -36,5 +37,5 @@ Transition the system from a functional prototype to a scalable, production-read
 
 ## Verification Status
 - **Infrastructure**: Services definitions validated.
-- **Logic**: Worker loop and recovery logic implemented.
+- **Logic**: Worker loop, recovery logic, and Headless-Plus persistence bridge implemented.
 - **Readiness**: Configured for production deployment.
diff --git a/src/orchestrator/worker.py b/src/orchestrator/worker.py
index aa63279..e0a9317 100644
--- a/src/orchestrator/worker.py
+++ b/src/orchestrator/worker.py
@@ -17,6 +17,8 @@ logger = logging.getLogger("TaskWorker")
 class TaskWorker:
     def __init__(self, redis_url: str):
         self.redis = redis.from_url(redis_url, decode_responses=True)
+        # Binary client for MessagePack payload (SessionState)
+        self.redis_raw = redis.from_url(redis_url, decode_responses=False)
         self.scheduler = EntropyScheduler()
         self.recovery = SessionRecoveryManager(self)
         self.running = True
@@ -74,17 +76,14 @@ class TaskWorker:
             # Extract state
             state = await browser.extract_session_state()
 
-            # Serialization handled by SessionState, we need to save to Redis
-            # But manager.extract_session_state returns an object.
-            # We need to save it.
             # Redis key: session:{id}:state
-            encrypted_payload = state.serialize() # (Not encrypted yet, just msgpack+hmac)
+            encrypted_payload = state.serialize()
+            redis_key = state.to_redis_key(session_id)
 
-            # In a real app we would use redis bytes client for state
-            # Here we mix string/bytes usage.
-            # Ideally use a separate client or decode_responses=False for data.
-            # For MVP simplicity, we might skip actual saving or assume helpers.
-            logger.info(f"Session {session_id} authenticated and captured.")
+            # Save with 30-minute TTL (Headless-Plus requirement)
+            await self.redis_raw.setex(redis_key, 1800, encrypted_payload)
+
+            logger.info(f"Session {session_id} authenticated and captured. Stored in Redis.")
             MetricsCollector.record_auth_success()
 
         except Exception as e:
@@ -94,11 +93,20 @@ class TaskWorker:
 
     async def handle_extract(self, url: str, session_id: str):
         try:
-            # We need to load session state.
-            # Mock loading for now as we don't have the heavy state store wired in this file
-            # In real implementaiton: state = await session_store.load(session_id)
-            # For demonstration of the loop:
-            logger.info(f"Extracting {url} with session {session_id}")
+            # Load Binary Session State
+            redis_key = f"session:{session_id}:state"
+            payload = await self.redis_raw.get(redis_key)
+
+            if not payload:
+                raise ValueError(f"Session {session_id} not found in Redis (expired or never created).")
+
+            state = SessionState.deserialize(payload)
+
+            # Execute Headless-Plus Extraction
+            async with CurlClient(state) as client:
+                html = await client.fetch(url)
+                logger.info(f"Extracted {len(html)} bytes from {url} using Session {session_id}")
+                MetricsCollector.record_extraction("success")
 
         except Exception as e:
             logger.error(f"Extraction failed: {e}")
 
diff --git a/walkthrough.md b/walkthrough.md
index 76830a6..8da4885 100644
--- a/walkthrough.md
+++ b/walkthrough.md
@@ -108,6 +108,7 @@ tests/unit/test_session_core.py .. [100%]
 - switched `camoufox-pool` base image to Microsoft Playwright Jammy for stability.
 - Removed `shm_size` conflict and legacy version tag.
 - **Worker**: Implemented `src/orchestrator/worker.py` for continuous task processing.
+  - **Persistence Bridge**: Integrated binary Redis serialization (MsgPack + HMAC) to bridge Authentication (Browser) and Extraction (Curl).
 - **Monitoring**: Implemented `src/core/monitoring.py` exposing metrics on port 8000.
 - **Hotfix: Runtime Dependencies**: Resolved `ModuleNotFoundError` crashes by adding `prometheus-client`, `redis`, and `msgpack` to `requirements.txt`.
 - **Field Verification**: Confirmed end-to-end success in production environment (v1.0.1 hotfix).
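
For reviewers, a minimal sketch of the payload format the persistence bridge relies on. The real `SessionState` class is defined elsewhere in the codebase and is not part of this diff; the field names (`cookies`, `headers`, `fingerprint`) and the `SESSION_HMAC_KEY` environment variable below are illustrative assumptions. Only the MessagePack body + HMAC layout, the `session:{id}:state` key, and the `serialize()` / `deserialize()` / `to_redis_key()` methods are taken from the changes above.

```python
import hashlib
import hmac
import os

import msgpack


class SessionState:
    """Illustrative sketch only; the real class lives outside this diff."""

    # Assumption: the HMAC signing key comes from the environment.
    _SECRET = os.environ.get("SESSION_HMAC_KEY", "dev-only-key").encode()

    def __init__(self, cookies: dict, headers: dict, fingerprint: dict):
        self.cookies = cookies
        self.headers = headers
        self.fingerprint = fingerprint

    @staticmethod
    def to_redis_key(session_id: str) -> str:
        # Matches the key the worker reads back: session:{id}:state
        return f"session:{session_id}:state"

    def serialize(self) -> bytes:
        # MessagePack body followed by a 32-byte HMAC-SHA256 tag.
        body = msgpack.packb(
            {"cookies": self.cookies, "headers": self.headers, "fingerprint": self.fingerprint}
        )
        tag = hmac.new(self._SECRET, body, hashlib.sha256).digest()
        return body + tag

    @classmethod
    def deserialize(cls, payload: bytes) -> "SessionState":
        # Split body and tag, then verify integrity before trusting the payload.
        body, tag = payload[:-32], payload[-32:]
        expected = hmac.new(cls._SECRET, body, hashlib.sha256).digest()
        if not hmac.compare_digest(tag, expected):
            raise ValueError("SessionState payload failed HMAC verification")
        data = msgpack.unpackb(body)
        return cls(data["cookies"], data["headers"], data["fingerprint"])
```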
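
Continuing the sketch above, a hedged round-trip check of the Auth-to-Extract handover: the auth side stores the serialized state with the worker's 30-minute TTL, and the extract side loads and verifies it. The `redis.asyncio` client mirrors the worker's `redis_raw` (`decode_responses=False`); the connection URL and placeholder values are illustrative, not production settings.

```python
import asyncio

import redis.asyncio as redis

# SessionState: see the sketch above (or the real class from the codebase).


async def roundtrip_demo(redis_url: str = "redis://localhost:6379/0") -> None:
    # Binary client, matching the worker's redis_raw (decode_responses=False).
    redis_raw = redis.from_url(redis_url, decode_responses=False)

    session_id = "demo-session"
    state = SessionState(
        cookies={"cf_clearance": "placeholder"},
        headers={"User-Agent": "placeholder"},
        fingerprint={"ja3": "placeholder"},
    )

    # Auth side: persist with the same 30-minute TTL the worker uses.
    await redis_raw.setex(SessionState.to_redis_key(session_id), 1800, state.serialize())

    # Extract side: load, verify the HMAC, rebuild the state.
    payload = await redis_raw.get(SessionState.to_redis_key(session_id))
    if payload is None:
        raise ValueError(f"Session {session_id} not found in Redis")
    restored = SessionState.deserialize(payload)
    print("cookies restored:", restored.cookies)

    # At this point the worker hands the state to the Curl side, e.g.:
    #   async with CurlClient(restored) as client:
    #       html = await client.fetch(url)

    await redis_raw.aclose()  # redis-py >= 5


if __name__ == "__main__":
    asyncio.run(roundtrip_demo())
```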