Files
scan-adf/api/scanner.py
2026-02-08 20:48:59 +01:00

156 lines
4.9 KiB
Python

import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from api.config import DEVICE, SCAN_DIR, SCAN_TIMEOUT, SCRIPT_PATH
log = logging.getLogger(__name__)
class ScannerBusyError(Exception):
pass
class ScannerUnavailableError(Exception):
pass
class ScannerTimeoutError(Exception):
pass
class ScannerManager:
def __init__(self) -> None:
self._lock = asyncio.Lock()
self._scanning = False
self._last_result: dict[str, Any] | None = None
self._current_scan_info: dict[str, str] | None = None
@property
def is_scanning(self) -> bool:
return self._scanning
@property
def last_result(self) -> dict[str, Any] | None:
return self._last_result
@property
def current_scan_info(self) -> dict[str, str] | None:
return self._current_scan_info
async def start_scan(
self,
mode: str,
resolution: int,
language: str,
output: str | None = None,
) -> str:
if self._scanning:
raise ScannerBusyError("A scan is already in progress")
SCAN_DIR.mkdir(parents=True, exist_ok=True)
if output is None:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")
output = str(SCAN_DIR / f"scan_{ts}.pdf")
self._scanning = True
self._current_scan_info = {
"output": output,
"mode": mode,
"resolution": str(resolution),
"started_at": datetime.now(timezone.utc).isoformat(),
}
asyncio.create_task(self._run_scan(mode, resolution, language, output))
return output
async def _run_scan(
self,
mode: str,
resolution: int,
language: str,
output: str,
) -> None:
async with self._lock:
try:
proc = await asyncio.create_subprocess_exec(
"bash",
str(SCRIPT_PATH),
"--mode", mode,
"--resolution", str(resolution),
"--language", language,
"--output", output,
"--overwrite-output-file",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=SCAN_TIMEOUT
)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
log.error("Scan timed out after %ds", SCAN_TIMEOUT)
self._last_result = {"status": "timeout", "output": output}
return
if proc.returncode != 0:
log.error("scan.sh failed: %s", stderr.decode())
json_path = Path(output.replace(".pdf", ".json"))
if json_path.exists():
self._last_result = json.loads(json_path.read_text())
self._last_result["output"] = output
else:
self._last_result = {
"status": "failed",
"output": output,
"returncode": proc.returncode,
}
except Exception:
log.exception("Unexpected error during scan")
self._last_result = {"status": "error", "output": output}
finally:
self._scanning = False
self._current_scan_info = None
async def check_paper(self) -> dict[str, bool | str]:
if self._scanning:
raise ScannerBusyError(
"Cannot check paper while a scan is in progress"
)
async with self._lock:
try:
proc = await asyncio.create_subprocess_exec(
"scanimage", "-A", "--device-name", DEVICE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=10
)
except asyncio.TimeoutError:
raise ScannerTimeoutError("scanimage -A timed out")
except FileNotFoundError:
raise ScannerUnavailableError("scanimage not found")
if proc.returncode != 0:
raise ScannerUnavailableError(
f"scanimage failed: {stderr.decode().strip()}"
)
output_text = stdout.decode()
match = re.search(
r"--page-loaded\[=\(yes\|no\)\]\s+\[(yes|no)\]", output_text
)
paper_loaded = match.group(1) == "yes" if match else False
return {"paper_loaded": paper_loaded, "raw": output_text}