"""Web UI for the Days Without Strife planner (see solve.py). A dependency-free (stdlib only) HTTP server that exposes every input the solver accepts: turns, starting resources, cities, agents, scoring terms (linear or log), resource constraints and the misc Problem knobs. For log-scored terms the user supplies a JavaScript expression that evals into a single-argument function (e.g. ``(x) => Math.log2(x)``). The browser evals it once, then calls it over the amounts it needs (0..max_resource) to build the ``log_mapping`` lookup table, which is sent to the server as a plain array. """ from __future__ import annotations import json import os import select import socket import threading from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import parse_qs, urlsplit from solve import problem_from_dict, solve, solution_to_dict # The UI is a single static page served from index.html next to this module. INDEX_HTML = (Path(__file__).resolve().parent / "index.html").read_text(encoding="utf-8") # Only one solve may run at a time across all viewers. While it's held, other # clients get an immediate 429 from /solve and disable their button (they learn # the busy state by polling /status). _solve_lock = threading.Lock() # Tracks the in-flight solve so /cancel can stop it. When a tab closes, the # browser fires navigator.sendBeacon("/cancel?token=...") — a normal small # request that survives the close and passes cleanly through reverse proxies, # unlike a dropped upstream socket (which the proxy keeps alive). Guarded by # _active_lock; only one solve runs at a time, but the lock keeps the token / # solver handoff race-free against a concurrent /cancel. _active_lock = threading.Lock() _active = {"token": None, "solver": None} def _stop_search(solver): # StopSearch() exists in OR-Tools 9.x+; degrade gracefully on older builds # (the solve then just runs to max_time_seconds). stop = getattr(solver, "StopSearch", None) if callable(stop): stop() class Handler(BaseHTTPRequestHandler): def _send(self, code, body, content_type="application/json", no_cache=False): if isinstance(body, str): body = body.encode("utf-8") self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) if no_cache: self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(body) def do_GET(self): if self.path in ("/", "/index.html"): self._send(200, INDEX_HTML, "text/html; charset=utf-8") elif self.path == "/status": self._send(200, json.dumps({"solving": _solve_lock.locked()}), no_cache=True) else: self._send(404, json.dumps({"error": "not found"})) def _client_gone(self): # True once the peer has closed its end. We've already consumed the # request body, so any readable data here is EOF (b"") for a closed tab. sock = self.connection try: r, _, _ = select.select([sock], [], [], 0) if r: return sock.recv(1, socket.MSG_PEEK) == b"" except OSError: return True return False def do_POST(self): path = urlsplit(self.path) if path.path == "/cancel": self._handle_cancel(parse_qs(path.query)) return if path.path != "/solve": self._send(404, json.dumps({"error": "not found"})) return # Reject immediately if another viewer is already solving. if not _solve_lock.acquire(blocking=False): self._send(429, json.dumps({"error": "Another solve is already in progress"})) return released = False token = None try: length = int(self.headers.get("Content-Length", 0)) payload = json.loads(self.rfile.read(length) or b"{}") problem = problem_from_dict(payload.get("problem", {})) max_time = float(payload.get("max_time_seconds", 30.0)) token = str(payload.get("token") or "") # Mark this token as the in-flight solve so a matching /cancel beacon # (sent when the requesting tab closes) can stop it. with _active_lock: _active["token"] = token _active["solver"] = None # Run the (blocking) solve in a worker thread so this thread is free # to watch the socket. solver_sink hands us the CpSolver so both the # local disconnect watcher and /cancel can abort the search. result = {} def register(s): with _active_lock: if _active["token"] == token: _active["solver"] = s def run(): try: result["sol"] = solve( problem, max_time_seconds=max_time, solver_sink=register, ) except Exception as exc: result["err"] = exc worker = threading.Thread(target=run, daemon=True) worker.start() while worker.is_alive(): worker.join(0.25) # Direct connections (no proxy) still get fast release via socket # EOF; behind a proxy the /cancel beacon is what stops the solve. if worker.is_alive() and self._client_gone(): with _active_lock: if _active["solver"] is not None: _stop_search(_active["solver"]) _solve_lock.release() released = True return if "err" in result: raise result["err"] self._send(200, json.dumps(solution_to_dict(result["sol"]))) except Exception as exc: # surface errors to the browser self._send(400, json.dumps({"error": f"{type(exc).__name__}: {exc}"})) finally: with _active_lock: if _active["token"] == token: _active["token"] = None _active["solver"] = None if not released: _solve_lock.release() def _handle_cancel(self, query): # Stop the in-flight solve iff the beacon's token matches it, so a closing # tab can't cancel a *different* viewer's solve. The running /solve handler # then returns normally and releases the lock. token = (query.get("token") or [""])[0] stopped = False with _active_lock: if token and token == _active["token"] and _active["solver"] is not None: _stop_search(_active["solver"]) stopped = True self._send(200, json.dumps({"cancelled": stopped}), no_cache=True) def log_message(self, fmt, *args): # quieter console pass def main(): host = os.environ.get("HOST", "127.0.0.1") port = int(os.environ.get("PORT", "8000")) server = ThreadingHTTPServer((host, port), Handler) print(f"Days Without Strife planner UI: http://{host}:{port}") try: server.serve_forever() except KeyboardInterrupt: print("\nshutting down") server.shutdown() if __name__ == "__main__": main()