# FAEA/src/orchestrator/worker.py
# Luciabrightcode 5105cd7591 feat(core): implement Headless-Plus Persistence Bridge
# - Initialize binary Redis client in TaskWorker for msgpack payloads
# - Implement session state serialization and storage in handle_auth
# - Implement session state retrieval and deserialization in handle_extract
# - Update docs to reflect persistence architecture
# 2025-12-23 16:05:47 +08:00
# (117 lines, 4.6 KiB, Python)

import asyncio
import logging
import json
import redis.asyncio as redis
from src.browser.manager import CamoufoxManager
from src.extractor.client import CurlClient
from src.core.session import SessionState
from src.core.monitoring import MetricsCollector
from src.core.recovery import SessionRecoveryManager
from src.core.scheduler import EntropyScheduler
from src.browser.ghost_cursor import GhostCursorEngine
# Configure root logging for this worker process.  Calling basicConfig at
# import time is intentional here: this module is a process entry point
# (see the __main__ guard at the bottom of the file).
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by all TaskWorker methods.
logger = logging.getLogger("TaskWorker")
class TaskWorker:
    """Consumes auth/extract tasks from a Redis queue and executes them.

    Two Redis clients are kept deliberately:

    * ``self.redis`` — text client (``decode_responses=True``) for the JSON
      task queue.
    * ``self.redis_raw`` — binary client for the MessagePack ``SessionState``
      payloads (Headless-Plus store), which must NOT be UTF-8 decoded.

    Task format popped from the ``"tasks"`` list::

        {"type": "auth" | "extract", "url": "...", "session_id": "..."}
    """

    # 30-minute TTL for captured session state (Headless-Plus requirement).
    SESSION_TTL_SECONDS = 1800

    def __init__(self, redis_url: str):
        """Create the worker and its Redis clients.

        Args:
            redis_url: Connection URL, e.g. ``redis://redis:6379``.
        """
        # Text client for the JSON task queue.
        self.redis = redis.from_url(redis_url, decode_responses=True)
        # Binary client for MessagePack payload (SessionState) - Headless-Plus Store
        self.redis_raw = redis.from_url(redis_url, decode_responses=False)
        self.scheduler = EntropyScheduler()
        self.recovery = SessionRecoveryManager(self)
        # Loop flag; cleared by stop() for graceful shutdown.
        self.running = True

    async def start(self):
        """Run the consume loop until :meth:`stop` is called.

        Starts the metrics HTTP server, then repeatedly pops tasks from the
        ``"tasks"`` queue and dispatches them through the entropy scheduler.
        Loop-level errors are logged and the loop backs off for one second.
        """
        logger.info("Starting TaskWorker...")
        # Start metrics server
        MetricsCollector.start_server(8000)
        while self.running:
            try:
                # Blocking pop with a 5 s timeout so the loop periodically
                # re-checks self.running instead of blocking forever.
                task_data = await self.redis.blpop("tasks", timeout=5)
                if not task_data:
                    continue
                _, payload = task_data
                task = json.loads(payload)
                # Apply entropy scheduling (variable delay) before execution.
                await self.scheduler.dispatch_with_entropy(
                    self.process_task(task)
                )
            except Exception as e:
                # Fix: .error(f"...") discarded the traceback; .exception keeps it.
                logger.exception("Worker loop error: %s", e)
                await asyncio.sleep(1)

    async def stop(self):
        """Signal the consume loop to exit and release both Redis clients.

        Backward-compatible addition: the original set ``self.running = True``
        but offered no way to clear it or close the connections.
        """
        self.running = False
        # close() is supported across redis-py 4.x/5.x (aclose() is 5.x-only).
        await self.redis.close()
        await self.redis_raw.close()

    async def process_task(self, task: dict):
        """Dispatch one task dict to its type-specific handler.

        Args:
            task: Mapping with keys ``type`` ("auth" or "extract"),
                ``url`` and ``session_id``.  Unknown types are logged
                and skipped.
        """
        task_type = task.get("type")
        url = task.get("url")
        session_id = task.get("session_id")
        logger.info("Processing task: %s for %s", task_type, session_id)
        if task_type == "auth":
            await self.handle_auth(url, session_id)
        elif task_type == "extract":
            await self.handle_extract(url, session_id)
        else:
            logger.warning("Unknown task type: %s", task_type)

    async def handle_auth(self, url: str, session_id: str):
        """Authenticate via a real browser and persist the captured session.

        Navigates with CamoufoxManager, serializes the resulting
        ``SessionState`` and stores it in Redis under the state's own key
        with a 30-minute TTL.  On any failure the recovery manager is
        notified with reason ``"auth_failure"``.
        """
        try:
            async with CamoufoxManager() as browser:
                # NOTE(review): GhostCursorEngine is imported but not yet wired
                # into the manager (Phase 2 gap) — navigation currently runs
                # without humanized cursor movement.
                await browser.navigate(url)
                # Capture cookies/fingerprint into a serializable state.
                state = await browser.extract_session_state()
                # Redis key: session:{id}:state — format is owned by
                # SessionState and must stay in sync with handle_extract.
                encrypted_payload = state.serialize()
                redis_key = state.to_redis_key(session_id)
                # Save with 30-minute TTL (Headless-Plus requirement)
                await self.redis_raw.setex(
                    redis_key, self.SESSION_TTL_SECONDS, encrypted_payload
                )
                logger.info(
                    "Session %s authenticated and captured. Stored in Redis.",
                    session_id,
                )
                MetricsCollector.record_auth_success()
        except Exception as e:
            logger.exception("Auth failed: %s", e)
            MetricsCollector.record_auth_failure(str(e))
            await self.recovery.handle_invalidation(session_id, "auth_failure")

    async def handle_extract(self, url: str, session_id: str):
        """Fetch *url* over raw HTTP using a previously captured session.

        Loads the binary ``SessionState`` from Redis, deserializes it and
        performs a browserless (Headless-Plus) fetch via CurlClient.
        Records success/failure metrics; a missing/expired session raises
        ``ValueError`` which is caught and counted as a failure.
        """
        try:
            # Load binary session state; key format mirrors
            # SessionState.to_redis_key used by handle_auth.
            redis_key = f"session:{session_id}:state"
            payload = await self.redis_raw.get(redis_key)
            if not payload:
                raise ValueError(
                    f"Session {session_id} not found in Redis (expired or never created)."
                )
            state = SessionState.deserialize(payload)
            # Execute Headless-Plus Extraction
            async with CurlClient(state) as client:
                html = await client.fetch(url)
                logger.info(
                    "Extracted %d bytes from %s using Session %s",
                    len(html), url, session_id,
                )
                MetricsCollector.record_extraction("success")
        except Exception as e:
            logger.exception("Extraction failed: %s", e)
            MetricsCollector.record_extraction("failure")
if __name__ == "__main__":
    import os

    # Allow the Redis endpoint to be overridden per-deployment; the default
    # matches the original hard-coded docker-compose service name.
    worker = TaskWorker(os.environ.get("REDIS_URL", "redis://redis:6379"))
    try:
        asyncio.run(worker.start())
    except KeyboardInterrupt:
        # Graceful Ctrl-C: a long-running worker should exit quietly
        # instead of dumping a traceback.
        logger.info("TaskWorker interrupted; shutting down.")