dws-city-res-solve/main.py
Pagwin f52871318e Solves are Queued
This has some nice improvements in the case I want to batch a bunch of
stuff overnight.
2026-06-18 23:16:18 -04:00

362 lines
15 KiB
Python

"""Web UI for the Days Without Strife planner (see solve.py).
A dependency-free (stdlib only) HTTP server that exposes every input the
solver accepts: turns, starting resources, cities, agents, scoring terms
(linear or log), resource constraints and the misc Problem knobs.
For log-scored terms the user supplies a JavaScript expression that evaluates
to a single-argument function (e.g. ``(x) => Math.log2(x)``). The browser
evaluates it once, then calls it over the amounts it needs (0..max_resource) to
build the ``log_mapping`` lookup table, which is sent to the server as a plain
array.
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import parse_qs, unquote, urlsplit
from solve import problem_from_dict, solve, solution_to_dict
# The entire UI is one static page, index.html, kept beside this module and
# read once when the module is imported.
INDEX_HTML = (
    Path(__file__).resolve().parent.joinpath("index.html")
    .read_text(encoding="utf-8")
)
# Finished solves are written to SQLite so they can be fetched again by UUID
# (GET /solve/<uuid>) and shared as /?solve=<uuid> links. Point DB_PATH at a
# mounted volume to keep that history across container restarts/redeploys.
# store_solve() trims the table to the newest MAX_SOLVES rows.
DB_PATH = os.getenv(
    "DB_PATH",
    str(Path(__file__).resolve().parent.joinpath("data", "solves.db")),
)
MAX_SOLVES = int(os.getenv("MAX_SOLVES", "500"))
def _db():
    """Open a new SQLite connection to DB_PATH in WAL mode.

    The connection waits up to 5 s for locks (both via ``timeout`` and the
    ``busy_timeout`` pragma). The caller owns the returned connection and must
    close it. If configuring the connection fails, it is closed here before
    the error propagates, so no handle is leaked.
    """
    conn = sqlite3.connect(DB_PATH, timeout=5.0)
    try:
        # WAL lets the concurrent /solve/<uuid> readers run alongside a writer.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=5000")
    except BaseException:
        # e.g. "database is locked" while switching journal mode: don't leave
        # the half-configured connection open.
        conn.close()
        raise
    return conn
def init_db():
    """Create the ``solves`` table and its ``ts`` index if they are missing.

    DB_PATH's parent directory is created first, so a fresh (empty) volume
    works without any manual setup.
    """
    db_dir = Path(DB_PATH).parent
    db_dir.mkdir(parents=True, exist_ok=True)
    ddl = (
        "CREATE TABLE IF NOT EXISTS solves ("
        "token TEXT PRIMARY KEY, ts REAL NOT NULL, status TEXT, "
        "problem TEXT NOT NULL, solution TEXT NOT NULL)",
        "CREATE INDEX IF NOT EXISTS idx_solves_ts ON solves(ts)",
    )
    conn = _db()
    try:
        # Both statements run inside one `with conn` block, which commits on
        # success.
        with conn:
            for statement in ddl:
                conn.execute(statement)
    finally:
        conn.close()
def store_solve(token, problem, solution):
    """Save a finished solve under *token*, then evict rows beyond MAX_SOLVES.

    *problem* and *solution* are stored as JSON text; the ``status`` column is
    copied from ``solution.get("status")``. A blank token is ignored.
    """
    # The token comes from the client, so blanks are skipped. INSERT OR REPLACE
    # lets a reused token overwrite the existing row (a client could overwrite
    # its own or someone else's row — tolerated for this app; switch to a
    # server-issued id if that ever matters).
    if not token:
        return
    upsert_sql = (
        "INSERT OR REPLACE INTO solves (token, ts, status, problem, solution) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    # Keep only the newest MAX_SOLVES rows (by ts).
    trim_sql = (
        "DELETE FROM solves WHERE token NOT IN "
        "(SELECT token FROM solves ORDER BY ts DESC LIMIT ?)"
    )
    conn = _db()
    try:
        # Insert and trim are committed together by the connection context.
        with conn:
            row = (token, time.time(), solution.get("status"),
                   json.dumps(problem), json.dumps(solution))
            conn.execute(upsert_sql, row)
            conn.execute(trim_sql, (MAX_SOLVES,))
    finally:
        conn.close()
def fetch_solve(token):
    """Return the stored solve for *token* as a dict, or None if absent.

    The dict has keys token, ts, status, problem and solution; problem and
    solution are decoded back from their JSON columns.
    """
    conn = _db()
    try:
        cursor = conn.execute(
            "SELECT token, ts, status, problem, solution FROM solves WHERE token = ?",
            (token,))
        row = cursor.fetchone()
    finally:
        conn.close()
    if row is None:
        return None
    stored_token, ts, status, problem_json, solution_json = row
    return {
        "token": stored_token,
        "ts": ts,
        "status": status,
        "problem": json.loads(problem_json),
        "solution": json.loads(solution_json),
    }
# Solve queue. A single background worker (_solve_worker) takes tokens off
# _queue and runs them one at a time, so any number of viewers can submit
# without being turned away: POST /solve returns at once with a queue position
# and the client then follows progress via /job/<token> (or the /job_status
# stream). _cond's lock guards _jobs, _queue and _active; the worker and the
# SSE streams wait on it for state changes.
_cond = threading.Condition()

# token -> {"status", "problem", "raw_problem", "max_time", "error"}, where
# status is one of: queued, running, done, error, cancelled.
_jobs: dict[str, dict] = {}

# Tokens still waiting to run, oldest first.
_queue: list[str] = []

# The solve currently running, so /cancel can stop it. /cancel arrives either
# from a card's Cancel button (an ordinary fetch; the tab stays open) or from a
# closing tab's navigator.sendBeacon("/cancel?token=..."); both are small
# requests that pass cleanly through reverse proxies. A token still in the
# queue is simply removed; the running token has its search stopped.
_active = dict(token=None, solver=None)

# Upper bound on terminal (done/error/cancelled) entries kept in _jobs.
# Finished solves also live in SQLite, so when a "done" job is pruned, /job
# falls back to the DB and still reports it as done.
MAX_TERMINAL_JOBS = 100
def _prune_jobs():
    """Evict the oldest terminal jobs so at most MAX_TERMINAL_JOBS remain.

    Caller must hold _cond. Terminal means done/error/cancelled; "oldest" is
    _jobs' insertion order (dicts preserve it). Queued and running jobs are
    never touched.
    """
    finished = [tok for tok, job in _jobs.items()
                if job["status"] in ("done", "error", "cancelled")]
    excess = len(finished) - MAX_TERMINAL_JOBS
    if excess <= 0:
        return
    for tok in finished[:excess]:
        _jobs.pop(tok, None)
def _solve_worker():
    """Run queued solves one at a time, forever.

    Started as a daemon thread by main(). For each token popped from _queue it
    marks the job "running", publishes the token in _active (so /cancel can
    reach it), calls solve(), persists the result with store_solve(), and then
    flips the job to "done" — or to "error" if anything in that sequence
    raised. Each status change is followed by _cond.notify_all() so waiting
    /job_status streams wake up.
    """
    # Run queued solves one at a time, forever. Started as a daemon in main().
    while True:
        with _cond:
            while not _queue:
                _cond.wait()  # sleep until something is queued
            token = _queue.pop(0)
            job = _jobs.get(token)
            if job is None or job["status"] != "queued":
                continue  # cancelled (or vanished) before it ran
            job["status"] = "running"
            # Copy what the solve needs while still holding the lock.
            problem = job["problem"]
            raw_problem = job["raw_problem"]
            max_time = job["max_time"]
            _active["token"] = token
            _active["solver"] = None
            _cond.notify_all() # wake /job_status streams watching this token

        # Passed to solve() as solver_sink. NOTE(review): presumably solve()
        # calls this with its underlying solver once it exists — confirm in
        # solve.py. The token check presumably guards against a call that
        # lands after this job has already been cleared from _active.
        def register(s):
            with _cond:
                if _active["token"] == token:
                    _active["solver"] = s

        # NOTE(review): until register() runs, _active["solver"] is None, and
        # /cancel only stops the running token when a solver is registered,
        # so a cancel arriving in that window is ignored and the solve runs
        # until max_time.
        try:
            sol = solve(problem, max_time_seconds=max_time, solver_sink=register)
            sol_dict = solution_to_dict(sol)
            # Persist before flipping to "done" so a client that sees "done"
            # can always fetch the solution. A cancelled-midway solve returns
            # the best plan found so far and is stored like any other.
            store_solve(token, raw_problem, sol_dict)
            with _cond:
                job["status"] = "done"
                _cond.notify_all()
        except Exception as exc:
            # Also reached when store_solve() fails, so a solve that finished
            # but could not be saved is reported as an error.
            with _cond:
                job["status"] = "error"
                job["error"] = f"{type(exc).__name__}: {exc}"
                _cond.notify_all()
        finally:
            # Clear the in-flight slot so /cancel no longer targets this token.
            with _cond:
                _active["token"] = None
                _active["solver"] = None
def _stop_search(solver):
    """Ask an in-flight solver to stop early, if it supports that.

    Newer OR-Tools solvers expose StopSearch(); on builds without it (or when
    the attribute isn't callable) this is a no-op and the solve simply runs
    until its max_time_seconds limit.
    """
    try:
        stop = solver.StopSearch
    except AttributeError:
        return
    if callable(stop):
        stop()
class Handler(BaseHTTPRequestHandler):
    """Request handler for the planner UI and its JSON / SSE API.

    GET  /, /index.html        the static UI page (INDEX_HTML)
    GET  /job_status?tokens=   Server-Sent Events progress stream
    GET  /job/<token>          one-shot job state (polling fallback)
    GET  /solve/<token>        stored solve record from SQLite, or 404
    POST /solve                validate + enqueue a solve (202 + position)
    POST /cancel?token=        dequeue or stop a solve
    """

    # Seconds between SSE keep-alive comments while nothing changes.
    _HEARTBEAT_SECONDS = 15

    # Statuses a job can't move on from — once a tracked token reaches one we
    # send it a final time and stop watching it.
    _TERMINAL = ("done", "error", "cancelled", "unknown")

    def _send(self, code, body, content_type="application/json", no_cache=False):
        """Send a complete response; str bodies are UTF-8 encoded.

        no_cache adds ``Cache-Control: no-store`` for live/changing data.
        """
        if isinstance(body, str):
            body = body.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        if no_cache:
            self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Route GET requests (see the class docstring); unknown paths 404."""
        path = urlsplit(self.path).path
        if path in ("/", "/index.html"):
            self._send(200, INDEX_HTML, "text/html; charset=utf-8")
        elif path == "/job_status":
            tokens = parse_qs(urlsplit(self.path).query).get("tokens", [""])[0]
            self._handle_job_stream(tokens)
        elif path.startswith("/job/"):
            self._send(200, json.dumps(self._job_state(unquote(path[len("/job/"):]))),
                       no_cache=True)
        elif path.startswith("/solve/"):
            token = unquote(path[len("/solve/"):])
            rec = fetch_solve(token)
            if rec is None:
                self._send(404, json.dumps({"error": "not found"}), no_cache=True)
            else:
                self._send(200, json.dumps(rec), no_cache=True)
        else:
            self._send(404, json.dumps({"error": "not found"}))

    def _job_state(self, token):
        """Return the client-facing state dict for *token*.

        Report a queued/running solve's live state, falling back to SQLite for
        a finished (or evicted-from-memory) one. The client polls this to drive
        each pending card: queued -> running -> done (render) / error /
        cancelled. Returns {"status": "unknown"} when nothing is known.
        """
        with _cond:
            job = _jobs.get(token)
            if job is not None:
                status = job["status"]
                if status == "queued":
                    pos = _queue.index(token) if token in _queue else 0
                    return {"status": "queued", "position": pos}
                if status == "running":
                    return {"status": "running"}
                if status == "error":
                    return {"status": "error", "error": job.get("error")}
                if status == "cancelled":
                    return {"status": "cancelled"}
                # status == "done": fall through to load the stored solution.
        # SQLite is read outside the lock so a slow query can't stall others.
        rec = fetch_solve(token)
        if rec is not None:
            return {"status": "done", "solution": rec["solution"]}
        return {"status": "unknown"}

    @staticmethod
    def _live_key(token):
        """Cheap in-memory fingerprint of *token*'s state; caller holds _cond.

        Mirrors what _job_state reports from memory (status and, for queued
        jobs, queue position) so the SSE loop can tell whether what it last
        sent may be stale without touching SQLite. None means "not in _jobs".
        """
        job = _jobs.get(token)
        if job is None:
            return None
        status = job["status"]
        if status == "queued":
            return (status, _queue.index(token) if token in _queue else 0)
        return (status, None)

    def _handle_job_stream(self, tokens_csv):
        """Server-Sent Events stream for ``GET /job_status?tokens=t1,t2,…``.

        Emits a ``data:`` event per token whenever its state changes
        (queued/position -> running -> done/…), blocking on _cond between
        changes rather than busy-polling, and drops each token once it is
        terminal; the stream ends when all are done. A ``: ping`` comment is
        sent every _HEARTBEAT_SECONDS of silence as a keep-alive.
        """
        # Deduplicate while preserving the client's order; ignore blanks.
        remaining, seen = [], set()
        for t in (s.strip() for s in tokens_csv.split(",")):
            if t and t not in seen:
                seen.add(t)
                remaining.append(t)
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Connection", "close")
        # Tell nginx & friends not to buffer the stream (see reverse-proxy notes).
        self.send_header("X-Accel-Buffering", "no")
        self.end_headers()
        last = {}  # token -> payload last sent
        keys = {}  # token -> _live_key snapshot taken just before that read
        try:
            while remaining:
                # Send any token whose state changed since we last reported it,
                # and stop tracking the ones that have reached a terminal state.
                for token in list(remaining):
                    # Snapshot the live key *before* reading the state: any
                    # change landing after this point makes the key differ when
                    # we next decide whether to sleep, so it can't be missed.
                    with _cond:
                        keys[token] = self._live_key(token)
                    state = self._job_state(token)
                    payload = json.dumps({"token": token, **state})
                    if last.get(token) != payload:
                        last[token] = payload
                        self.wfile.write(b"data: " + payload.encode("utf-8") + b"\n\n")
                        self.wfile.flush()
                    if state["status"] in self._TERMINAL:
                        remaining.remove(token)
                if not remaining:
                    break
                # Sleep until some tracked token's state differs from what we
                # read. wait_for checks the predicate before blocking, so a
                # notify_all that fired while we were writing isn't lost, and
                # notifications about unrelated jobs don't reset the heartbeat.
                with _cond:
                    changed = _cond.wait_for(
                        lambda: any(self._live_key(t) != keys[t] for t in remaining),
                        timeout=self._HEARTBEAT_SECONDS)
                if not changed:
                    self.wfile.write(b": ping\n\n")
                    self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            return # client closed the EventSource; let the thread end

    def do_POST(self):
        """Handle POST /cancel and POST /solve; other paths 404.

        /solve validates the payload and enqueues it, then returns 202 with a
        queue position immediately. The single background worker runs queued
        solves one at a time; the client follows progress over /job_status
        (SSE) or by polling /job/<token>. Malformed input yields 400.
        """
        path = urlsplit(self.path)
        if path.path == "/cancel":
            self._handle_cancel(parse_qs(path.query))
            return
        if path.path != "/solve":
            self._send(404, json.dumps({"error": "not found"}))
            return
        # Only parsing/validation is guarded, so a failure while writing the
        # 202 below can't trigger a second (400) response on the same socket.
        try:
            length = int(self.headers.get("Content-Length", 0))
            if length < 0:
                # rfile.read(-1) would block until the client closes.
                raise ValueError(f"invalid Content-Length: {length}")
            payload = json.loads(self.rfile.read(length) or b"{}")
            raw_problem = payload.get("problem", {})
            problem = problem_from_dict(raw_problem) # surfaces bad input now
            max_time = float(payload.get("max_time_seconds", 30.0))
            # Rejects <= 0, inf and NaN (NaN fails every comparison).
            if not 0 < max_time < float("inf"):
                raise ValueError("max_time_seconds must be a positive, finite number")
            token = str(payload.get("token") or "")
        except Exception as exc: # surface errors to the browser
            self._send(400, json.dumps({"error": f"{type(exc).__name__}: {exc}"}))
            return
        if not token:
            self._send(400, json.dumps({"error": "missing token"}))
            return
        with _cond:
            _prune_jobs()
            _jobs[token] = {
                "status": "queued", "problem": problem,
                "raw_problem": raw_problem, "max_time": max_time, "error": None,
            }
            _queue.append(token)
            position = len(_queue) - 1
            _cond.notify_all()
        self._send(202, json.dumps({"status": "queued", "position": position}),
                   no_cache=True)

    def _handle_cancel(self, query):
        """Cancel the queued/running solve matching the request's token.

        Only the token's own solve is affected, so a Cancel click (or closing
        tab) can't cancel a *different* viewer's solve. A still-queued token is
        simply dropped from the queue; the running token has its search stopped
        (the worker then stores the best plan found so far and the job flips to
        "done"). Responds {"cancelled": "dequeued" | "stopped" | "none"}.

        NOTE(review): if the token is running but the worker has not registered
        its solver yet (_active["solver"] is None), this reports "none" and the
        solve keeps running until its time limit.
        """
        token = (query.get("token") or [""])[0]
        result = "none"
        with _cond:
            if token and token in _queue:
                _queue.remove(token)
                job = _jobs.get(token)
                if job is not None:
                    job["status"] = "cancelled"
                result = "dequeued"
                _cond.notify_all()
            elif token and token == _active["token"] and _active["solver"] is not None:
                _stop_search(_active["solver"])
                result = "stopped"
        self._send(200, json.dumps({"cancelled": result}), no_cache=True)

    def log_message(self, fmt, *args): # quieter console
        pass
def main():
    """Initialise the DB, start the solve worker and serve HTTP until Ctrl-C.

    HOST and PORT come from the environment (default 127.0.0.1:8000).
    """
    init_db()
    # Daemon thread: the worker loops forever waiting on _cond and must not
    # keep the process alive once the server has stopped.
    threading.Thread(target=_solve_worker, daemon=True).start()
    host = os.environ.get("HOST", "127.0.0.1")
    port = int(os.environ.get("PORT", "8000"))
    # The context manager calls server_close(), releasing the listening socket.
    with ThreadingHTTPServer((host, port), Handler) as server:
        print(f"Days Without Strife planner UI: http://{host}:{port}")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # serve_forever() has already returned by the time we get here,
            # so shutdown() would have nothing left to stop.
            print("\nshutting down")
# Start the server only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()