Add api server2

2
This commit is contained in:
2026-02-08 20:48:59 +01:00
parent 627a730191
commit 5a2fdd65d0
14 changed files with 328 additions and 0 deletions

155
api/scanner.py Normal file
View File

@@ -0,0 +1,155 @@
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from api.config import DEVICE, SCAN_DIR, SCAN_TIMEOUT, SCRIPT_PATH
log = logging.getLogger(__name__)
class ScannerBusyError(Exception):
pass
class ScannerUnavailableError(Exception):
pass
class ScannerTimeoutError(Exception):
pass
class ScannerManager:
def __init__(self) -> None:
self._lock = asyncio.Lock()
self._scanning = False
self._last_result: dict[str, Any] | None = None
self._current_scan_info: dict[str, str] | None = None
@property
def is_scanning(self) -> bool:
return self._scanning
@property
def last_result(self) -> dict[str, Any] | None:
return self._last_result
@property
def current_scan_info(self) -> dict[str, str] | None:
return self._current_scan_info
async def start_scan(
self,
mode: str,
resolution: int,
language: str,
output: str | None = None,
) -> str:
if self._scanning:
raise ScannerBusyError("A scan is already in progress")
SCAN_DIR.mkdir(parents=True, exist_ok=True)
if output is None:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")
output = str(SCAN_DIR / f"scan_{ts}.pdf")
self._scanning = True
self._current_scan_info = {
"output": output,
"mode": mode,
"resolution": str(resolution),
"started_at": datetime.now(timezone.utc).isoformat(),
}
asyncio.create_task(self._run_scan(mode, resolution, language, output))
return output
async def _run_scan(
self,
mode: str,
resolution: int,
language: str,
output: str,
) -> None:
async with self._lock:
try:
proc = await asyncio.create_subprocess_exec(
"bash",
str(SCRIPT_PATH),
"--mode", mode,
"--resolution", str(resolution),
"--language", language,
"--output", output,
"--overwrite-output-file",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=SCAN_TIMEOUT
)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
log.error("Scan timed out after %ds", SCAN_TIMEOUT)
self._last_result = {"status": "timeout", "output": output}
return
if proc.returncode != 0:
log.error("scan.sh failed: %s", stderr.decode())
json_path = Path(output.replace(".pdf", ".json"))
if json_path.exists():
self._last_result = json.loads(json_path.read_text())
self._last_result["output"] = output
else:
self._last_result = {
"status": "failed",
"output": output,
"returncode": proc.returncode,
}
except Exception:
log.exception("Unexpected error during scan")
self._last_result = {"status": "error", "output": output}
finally:
self._scanning = False
self._current_scan_info = None
async def check_paper(self) -> dict[str, bool | str]:
if self._scanning:
raise ScannerBusyError(
"Cannot check paper while a scan is in progress"
)
async with self._lock:
try:
proc = await asyncio.create_subprocess_exec(
"scanimage", "-A", "--device-name", DEVICE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
proc.communicate(), timeout=10
)
except asyncio.TimeoutError:
raise ScannerTimeoutError("scanimage -A timed out")
except FileNotFoundError:
raise ScannerUnavailableError("scanimage not found")
if proc.returncode != 0:
raise ScannerUnavailableError(
f"scanimage failed: {stderr.decode().strip()}"
)
output_text = stdout.decode()
match = re.search(
r"--page-loaded\[=\(yes\|no\)\]\s+\[(yes|no)\]", output_text
)
paper_loaded = match.group(1) == "yes" if match else False
return {"paper_loaded": paper_loaded, "raw": output_text}