import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from api.config import DEVICE, SCAN_DIR, SCAN_TIMEOUT, SCRIPT_PATH

log = logging.getLogger(__name__)


class ScannerBusyError(Exception):
    """Raised when an operation is requested while a scan is in progress."""


class ScannerUnavailableError(Exception):
    """Raised when ``scanimage`` is missing or exits with a non-zero status."""


class ScannerTimeoutError(Exception):
    """Raised when ``scanimage -A`` does not finish within its time limit."""


class ScannerManager:
    """Run scans and paper checks one at a time and track their state.

    A scan is started with :meth:`start_scan`, which returns immediately and
    runs the scan script in a background task.  Progress is exposed through
    the ``is_scanning``, ``current_scan_info`` and ``last_result`` properties.
    """

    def __init__(self) -> None:
        # Serialises every subprocess that talks to the scanner.
        self._lock = asyncio.Lock()
        self._scanning = False
        self._last_result: dict[str, Any] | None = None
        self._current_scan_info: dict[str, str] | None = None
        # Strong reference to the background scan task: the event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._scan_task: asyncio.Task[None] | None = None

    @property
    def is_scanning(self) -> bool:
        """Whether a scan has been started and has not finished yet."""
        return self._scanning

    @property
    def last_result(self) -> dict[str, Any] | None:
        """Result dict of the most recently finished scan, or ``None``."""
        return self._last_result

    @property
    def current_scan_info(self) -> dict[str, str] | None:
        """Parameters of the scan in progress, or ``None`` when idle."""
        return self._current_scan_info

    async def start_scan(
        self,
        mode: str,
        resolution: int,
        language: str,
        output: str | None = None,
    ) -> str:
        """Start a scan in the background and return its output path.

        Args:
            mode: Value passed to the scan script as ``--mode``.
            resolution: Value passed to the scan script as ``--resolution``.
            language: Value passed to the scan script as ``--language``.
            output: Destination file.  When ``None`` a timestamped
                ``scan_<UTC time>.pdf`` inside ``SCAN_DIR`` is used.

        Returns:
            The path the scan will be written to.

        Raises:
            ScannerBusyError: If a scan is already in progress.
        """
        if self._scanning:
            raise ScannerBusyError("A scan is already in progress")

        SCAN_DIR.mkdir(parents=True, exist_ok=True)
        if output is None:
            ts = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")
            output = str(SCAN_DIR / f"scan_{ts}.pdf")

        # No await between the busy check above and this assignment, so two
        # concurrent callers cannot both pass the check.
        self._scanning = True
        self._current_scan_info = {
            "output": output,
            "mode": mode,
            "resolution": str(resolution),
            "started_at": datetime.now(timezone.utc).isoformat(),
        }
        # Keep the task referenced for as long as the manager lives.
        self._scan_task = asyncio.create_task(
            self._run_scan(mode, resolution, language, output)
        )
        return output

    @staticmethod
    async def _kill_and_reap(proc: asyncio.subprocess.Process) -> None:
        """Kill *proc* and wait for it to exit so no child is left behind."""
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited on its own between the timeout and the kill.
            pass
        await proc.communicate()

    async def _run_scan(
        self,
        mode: str,
        resolution: int,
        language: str,
        output: str,
    ) -> None:
        """Run the scan script and record the outcome in ``last_result``.

        Never raises for ordinary failures: timeouts, script failures and
        unexpected errors are stored as a ``status`` entry in the result dict.
        The scanning state is always cleared when this coroutine ends.
        """
        # The try/finally wraps the lock acquisition as well, so the scanning
        # flag is reset even if the task is cancelled while waiting for it.
        try:
            async with self._lock:
                proc = await asyncio.create_subprocess_exec(
                    "bash",
                    str(SCRIPT_PATH),
                    "--mode",
                    mode,
                    "--resolution",
                    str(resolution),
                    "--language",
                    language,
                    "--output",
                    output,
                    "--overwrite-output-file",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                try:
                    _stdout, stderr = await asyncio.wait_for(
                        proc.communicate(), timeout=SCAN_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    await self._kill_and_reap(proc)
                    log.error("Scan timed out after %ds", SCAN_TIMEOUT)
                    self._last_result = {"status": "timeout", "output": output}
                    return

                if proc.returncode != 0:
                    # errors="replace": undecodable bytes in the script's
                    # stderr must not turn a reported failure into a crash.
                    log.error(
                        "scan.sh failed: %s", stderr.decode(errors="replace")
                    )

                # Sidecar file next to the output.  with_suffix only swaps the
                # final extension, so a ".pdf" elsewhere in the path is left
                # alone and the output file itself is never read as JSON.
                json_path = Path(output).with_suffix(".json")
                if json_path.exists():
                    result = json.loads(json_path.read_text())
                    result["output"] = output
                    self._last_result = result
                else:
                    self._last_result = {
                        "status": "failed",
                        "output": output,
                        "returncode": proc.returncode,
                    }
        except Exception:
            log.exception("Unexpected error during scan")
            self._last_result = {"status": "error", "output": output}
        finally:
            self._scanning = False
            self._current_scan_info = None

    async def check_paper(self) -> dict[str, bool | str]:
        """Ask ``scanimage -A`` whether paper is loaded in the device.

        Returns:
            A dict with ``paper_loaded`` (``False`` when the ``--page-loaded``
            option is not found in the output) and ``raw``, the full
            ``scanimage`` output.

        Raises:
            ScannerBusyError: If a scan is in progress.
            ScannerTimeoutError: If ``scanimage`` does not finish in 10 s.
            ScannerUnavailableError: If ``scanimage`` is not installed or
                exits with a non-zero status.
        """
        if self._scanning:
            raise ScannerBusyError(
                "Cannot check paper while a scan is in progress"
            )

        async with self._lock:
            proc: asyncio.subprocess.Process | None = None
            try:
                proc = await asyncio.create_subprocess_exec(
                    "scanimage",
                    "-A",
                    "--device-name",
                    DEVICE,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                stdout, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=10
                )
            except asyncio.TimeoutError as err:
                # Do not leave a hung scanimage holding the device.
                if proc is not None:
                    await self._kill_and_reap(proc)
                raise ScannerTimeoutError("scanimage -A timed out") from err
            except FileNotFoundError as err:
                raise ScannerUnavailableError("scanimage not found") from err

        if proc.returncode != 0:
            raise ScannerUnavailableError(
                f"scanimage failed: {stderr.decode(errors='replace').strip()}"
            )

        output_text = stdout.decode(errors="replace")
        # Matches the option line "--page-loaded[=(yes|no)] [yes]" and
        # captures the current value shown in the trailing brackets.
        match = re.search(
            r"--page-loaded\[=\(yes\|no\)\]\s+\[(yes|no)\]", output_text
        )
        paper_loaded = match.group(1) == "yes" if match else False
        return {"paper_loaded": paper_loaded, "raw": output_text}