Solves are Queued
This has some nice improvements in the case I want to batch a bunch of stuff overnight.
This commit is contained in:
parent
ae7fef5d7b
commit
f52871318e
2 changed files with 419 additions and 233 deletions
370
index.html
370
index.html
|
|
@ -70,26 +70,6 @@
|
|||
font-weight: 600;
|
||||
}
|
||||
|
||||
#solveBtn:disabled {
|
||||
opacity: 50%;
|
||||
background: #888;
|
||||
}
|
||||
|
||||
#busy {
|
||||
font-size: 1rem;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
/* This tab is the one solving. */
|
||||
#busy.busy-self {
|
||||
color: #16a34a;
|
||||
}
|
||||
|
||||
/* Another viewer holds the solve lock. */
|
||||
#busy.busy-other {
|
||||
color: #d97706;
|
||||
}
|
||||
|
||||
.mini {
|
||||
padding: .1rem .45rem;
|
||||
font-size: .85rem;
|
||||
|
|
@ -285,10 +265,41 @@
|
|||
padding: .3rem 0;
|
||||
}
|
||||
|
||||
.pending {
|
||||
opacity: .75;
|
||||
/* A reserved slot for a queued/running solve: dashed to read as "not done
|
||||
yet", holding a spinner, its share link, and a Cancel button. */
|
||||
.solution-pending {
|
||||
border: 1px dashed #8886;
|
||||
border-radius: 8px;
|
||||
padding: .4rem .8rem;
|
||||
margin-top: .8rem;
|
||||
}
|
||||
|
||||
.solution-pending.err {
|
||||
border-color: #dc2626;
|
||||
}
|
||||
|
||||
.pending-head {
|
||||
font-weight: 600;
|
||||
padding: .3rem 0;
|
||||
}
|
||||
|
||||
.spinner {
|
||||
display: inline-block;
|
||||
width: .9em;
|
||||
height: .9em;
|
||||
border: 2px solid #8886;
|
||||
border-top-color: #2563eb;
|
||||
border-radius: 50%;
|
||||
animation: spin .8s linear infinite;
|
||||
vertical-align: -.1em;
|
||||
margin-right: .45rem;
|
||||
}
|
||||
|
||||
@keyframes spin {
|
||||
to {
|
||||
transform: rotate(360deg);
|
||||
}
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
|
||||
|
|
@ -386,8 +397,8 @@
|
|||
|
||||
<p>
|
||||
<button id="solveBtn" class="primary" type="button" onclick="run()">Solve</button>
|
||||
<button id="cancelBtn" type="button" onclick="cancelSolve()" style="display:none;margin-left:.4rem">Cancel</button>
|
||||
<span id="busy" class="help" style="margin-left:.6rem"></span>
|
||||
<button id="shareAllBtn" type="button" onclick="shareVisible()" style="margin-left:.4rem">Share
|
||||
visible solutions</button>
|
||||
</p>
|
||||
|
||||
<div id="error" class="err"></div>
|
||||
|
|
@ -856,127 +867,208 @@
|
|||
// them in request order.
|
||||
let solutionCount = 0;
|
||||
|
||||
// Whether *this* tab is the one currently solving. The /status poll only
|
||||
// disables the button for OTHER viewers, so the solver keeps its button
|
||||
// state under run()'s own control.
|
||||
let solvingHere = false;
|
||||
// Live tracking of this tab's queued/running solves.
|
||||
// token -> {token, card, n, confirmed}. Each gets a reserved "pending" card
|
||||
// on the page right away; a shared /job_status event stream drives it through
|
||||
// queued -> running -> done (rendered) / error / cancelled.
|
||||
const pending = new Map();
|
||||
|
||||
// Token identifying this tab's in-flight solve. When the tab closes mid
|
||||
// solve we beacon /cancel?token=... so the server stops it and frees the
|
||||
// lock immediately — reliable even behind a reverse proxy, which keeps
|
||||
// the upstream socket open and so hides the disconnect from the server.
|
||||
let activeToken = null;
|
||||
function uuid() {
|
||||
return crypto.randomUUID ? crypto.randomUUID()
|
||||
: String(Date.now()) + Math.random();
|
||||
}
|
||||
|
||||
function cancelActiveSolve() {
|
||||
if (activeToken && navigator.sendBeacon) {
|
||||
navigator.sendBeacon("/cancel?token=" + encodeURIComponent(activeToken));
|
||||
// A share URL for a set of solve tokens. The schema is
|
||||
// ?solves=<id1>,<id2>,… so anyone who knows the ids can compose an
|
||||
// arbitrary set; a single ?solve=<id> link is also still understood.
|
||||
function shareUrl(tokens) {
|
||||
return location.origin + location.pathname +
|
||||
"?solves=" + tokens.map(encodeURIComponent).join(",");
|
||||
}
|
||||
|
||||
// A "copy link" button that briefly confirms after writing to the clipboard.
|
||||
function copyLinkButton(label, getUrl) {
|
||||
return el("button", {
|
||||
class: "mini", type: "button",
|
||||
onclick: (ev) => {
|
||||
navigator.clipboard?.writeText(getUrl());
|
||||
const orig = ev.target.textContent;
|
||||
ev.target.textContent = "Copied!";
|
||||
setTimeout(() => {ev.target.textContent = orig;}, 1500);
|
||||
},
|
||||
}, label);
|
||||
}
|
||||
|
||||
// Copy a single link sharing every solution currently on the page (pending
|
||||
// ones included — their links resolve once the solve finishes).
|
||||
function shareVisible() {
|
||||
const tokens = [...document.querySelectorAll("#output [data-token]")]
|
||||
.map(e => e.dataset.token);
|
||||
const btn = document.getElementById("shareAllBtn");
|
||||
const reset = () => setTimeout(() => {btn.textContent = "Share visible solutions";}, 1500);
|
||||
if (!tokens.length) {btn.textContent = "Nothing to share"; reset(); return;}
|
||||
navigator.clipboard?.writeText(shareUrl(tokens));
|
||||
btn.textContent = "Link copied!";
|
||||
reset();
|
||||
}
|
||||
|
||||
// Build the reserved card for a queued/running solve: a spinner + status,
|
||||
// the (already valid) share link, and a Cancel button.
|
||||
function makePendingCard(token, n) {
|
||||
const card = el("div", {class: "solution-pending"});
|
||||
card.dataset.token = token;
|
||||
const status = el("span", {}, "Queued…");
|
||||
card._status = status;
|
||||
const actions = el("p", {}, [
|
||||
copyLinkButton("Copy share link", () => shareUrl([token])), " ",
|
||||
el("button", {
|
||||
class: "mini", type: "button",
|
||||
onclick: () => cancelPending(token),
|
||||
}, "Cancel"),
|
||||
]);
|
||||
card._actions = actions;
|
||||
card.append(
|
||||
el("div", {class: "pending-head"},
|
||||
[el("span", {class: "spinner"}), `Solution ${n} — `, status]),
|
||||
actions);
|
||||
return card;
|
||||
}
|
||||
|
||||
// Cancel a pending solve: drop it from the queue or stop the running search.
|
||||
// The stream then reports "cancelled" (card removed) or, for a stopped
|
||||
// running solve, "done" with the best plan found so far (card rendered).
|
||||
function cancelPending(token) {
|
||||
const entry = pending.get(token);
|
||||
if (entry) entry.card._status.textContent = "Cancelling…";
|
||||
fetch("/cancel?token=" + encodeURIComponent(token), {method: "POST"})
|
||||
.catch(() => {/* the stream reflects the outcome regardless */});
|
||||
}
|
||||
|
||||
// Stop tracking a pending solve and run a final action on its card. Once
|
||||
// nothing is pending, close the event stream so it doesn't reconnect.
|
||||
function finishPending(token, action) {
|
||||
const entry = pending.get(token);
|
||||
if (!entry) return;
|
||||
pending.delete(token);
|
||||
action(entry);
|
||||
if (pending.size === 0 && stream) {stream.close(); stream = null;}
|
||||
}
|
||||
|
||||
// Put a pending card into a terminal state: show the message, drop the
|
||||
// spinner & cancel, and offer a Dismiss button. `isError` tints it red.
|
||||
function finalizeCard(token, msg, isError = false) {
|
||||
finishPending(token, (entry) => {
|
||||
entry.card.classList.toggle("err", isError);
|
||||
entry.card.querySelector(".spinner")?.remove(); // it's no longer loading
|
||||
entry.card._status.textContent = msg;
|
||||
entry.card._actions.replaceChildren(el("button", {
|
||||
class: "mini", type: "button",
|
||||
onclick: () => entry.card.remove(),
|
||||
}, "Dismiss"));
|
||||
});
|
||||
}
|
||||
function pendingError(token, msg) {finalizeCard(token, msg, true);}
|
||||
|
||||
// A single Server-Sent Events stream carries progress for every pending
|
||||
// solve at once: /job_status?tokens=t1,t2,… It's rebuilt whenever the set
|
||||
// of confirmed (enqueued) solves changes, and closed when none remain.
|
||||
let stream = null;
|
||||
function syncStream() {
|
||||
if (stream) {stream.close(); stream = null;}
|
||||
const tokens = [...pending.values()].filter(e => e.confirmed).map(e => e.token);
|
||||
if (!tokens.length) return;
|
||||
stream = new EventSource(
|
||||
"/job_status?tokens=" + tokens.map(encodeURIComponent).join(","));
|
||||
stream.onmessage = (ev) => handleJobEvent(JSON.parse(ev.data));
|
||||
// On a transient network error EventSource auto-reconnects to the same
|
||||
// URL; we explicitly close it (in finishPending) once all solves finish.
|
||||
stream.onerror = () => {/* let the browser retry */};
|
||||
}
|
||||
|
||||
// Advance one pending solve's card from a streamed state update.
|
||||
function handleJobEvent(j) {
|
||||
const entry = pending.get(j.token);
|
||||
if (!entry) return; // already finished/dismissed
|
||||
switch (j.status) {
|
||||
case "queued":
|
||||
entry.card._status.textContent =
|
||||
j.position > 0 ? `Queued (${j.position} ahead)…` : "Queued (next up)…";
|
||||
break;
|
||||
case "running":
|
||||
entry.card._status.textContent = "Solving…";
|
||||
break;
|
||||
case "done":
|
||||
finishPending(j.token, (e) =>
|
||||
renderSolution(j.solution, e.card, j.token, e.n));
|
||||
break;
|
||||
case "cancelled":
|
||||
// Leave a visible "cancelled" card (whether this tab cancelled
|
||||
// it or it was cancelled elsewhere) rather than silently vanishing.
|
||||
finalizeCard(j.token, "This solve was cancelled.");
|
||||
break;
|
||||
case "error":
|
||||
pendingError(j.token, "Error: " + (j.error || "solve failed"));
|
||||
break;
|
||||
default: // "unknown": no such job/solve (evicted, or server restarted)
|
||||
pendingError(j.token,
|
||||
`Solve ${j.token} not found (it may have been evicted or the server restarted).`);
|
||||
}
|
||||
}
|
||||
// pagehide fires on tab close / navigation away (not on mere tab switch,
|
||||
// so we don't cancel a solve the user backgrounds and comes back to).
|
||||
window.addEventListener("pagehide", cancelActiveSolve);
|
||||
|
||||
// Cancel this tab's in-flight solve without closing the tab: POST /cancel
|
||||
// so the server stops the search. The running /solve then returns with the
|
||||
// best plan found so far (or an empty/infeasible one), which run() renders.
|
||||
function cancelSolve() {
|
||||
if (!activeToken) return;
|
||||
const btn = document.getElementById("cancelBtn");
|
||||
btn.disabled = true;
|
||||
btn.textContent = "Cancelling…";
|
||||
fetch("/cancel?token=" + encodeURIComponent(activeToken), {method: "POST"})
|
||||
.catch(() => {/* the solve still returns; nothing to do here */});
|
||||
}
|
||||
// Toggle the Cancel button's visibility, resetting its label/disabled state.
|
||||
function showCancel(visible) {
|
||||
const btn = document.getElementById("cancelBtn");
|
||||
btn.style.display = visible ? "" : "none";
|
||||
btn.disabled = false;
|
||||
btn.textContent = "Cancel";
|
||||
}
|
||||
// Closing or leaving a tab no longer cancels its solves: with the queue,
|
||||
// a left-behind solve just runs (or waits its turn) to completion and is
|
||||
// stored, so its share link still resolves — and a tab that only *views* a
|
||||
// shared solve can't cancel the author's work just by being closed. Cancel
|
||||
// is now an explicit per-card button only (see cancelPending).
|
||||
|
||||
async function run() {
|
||||
// Queue a solve: reserve its card immediately, POST it, then subscribe to
|
||||
// the progress stream once it's enqueued (confirmed). Multiple solves can
|
||||
// be queued at once; the server runs them one at a time.
|
||||
function run() {
|
||||
const errBox = document.getElementById("error");
|
||||
const out = document.getElementById("output");
|
||||
errBox.textContent = "";
|
||||
// A placeholder card for this request, prepended so the newest is on top.
|
||||
// It's replaced by the solution on success, or removed on error.
|
||||
const pending = el("p", {class: "pending"}, "Solving…");
|
||||
out.prepend(pending);
|
||||
let problem, time;
|
||||
try {
|
||||
problem = buildProblem();
|
||||
time = +document.getElementById("time").value;
|
||||
} catch (e) {errBox.textContent = e.message; pending.remove(); return;}
|
||||
solvingHere = true;
|
||||
setSolveDisabled(true, "Solving…");
|
||||
const token = (crypto.randomUUID ? crypto.randomUUID()
|
||||
: String(Date.now()) + Math.random());
|
||||
activeToken = token;
|
||||
showCancel(true);
|
||||
try {
|
||||
const resp = await fetch("/solve", {
|
||||
method: "POST",
|
||||
headers: {"Content-Type": "application/json"},
|
||||
body: JSON.stringify({problem, max_time_seconds: time, token}),
|
||||
});
|
||||
if (resp.status === 429) {
|
||||
pending.remove();
|
||||
errBox.textContent = "Another viewer is already solving — try again once they finish.";
|
||||
} catch (e) {errBox.textContent = e.message; return;}
|
||||
const token = uuid();
|
||||
const n = ++solutionCount;
|
||||
const card = makePendingCard(token, n);
|
||||
document.getElementById("output").prepend(card);
|
||||
pending.set(token, {token, card, n, confirmed: false});
|
||||
fetch("/solve", {
|
||||
method: "POST",
|
||||
headers: {"Content-Type": "application/json"},
|
||||
body: JSON.stringify({problem, max_time_seconds: time, token}),
|
||||
}).then(async (resp) => {
|
||||
if (!resp.ok) {
|
||||
const data = await resp.json().catch(() => ({}));
|
||||
pendingError(token, data.error || "Server error");
|
||||
return;
|
||||
}
|
||||
const data = await resp.json();
|
||||
if (!resp.ok) {pending.remove(); errBox.textContent = data.error || "Server error"; return;}
|
||||
renderSolution(data, pending, token);
|
||||
} catch (e) {pending.remove(); errBox.textContent = e.message;}
|
||||
finally {solvingHere = false; activeToken = null; showCancel(false); setSolveDisabled(false);}
|
||||
// Now the server knows the token; (re)open the stream to include it.
|
||||
const entry = pending.get(token);
|
||||
if (entry) {entry.confirmed = true; syncStream();}
|
||||
}).catch((e) => pendingError(token, e.message));
|
||||
}
|
||||
|
||||
// Enable/disable the Solve button with an optional note beside it. `kind`
|
||||
// ("self" or "other") picks the note's color so this tab's "Solving…" and
|
||||
// another viewer's "Another viewer is solving…" stand out distinctly.
|
||||
function setSolveDisabled(disabled, note = "", kind = "self") {
|
||||
document.getElementById("solveBtn").disabled = disabled;
|
||||
const busy = document.getElementById("busy");
|
||||
busy.textContent = note;
|
||||
busy.classList.toggle("busy-self", kind === "self");
|
||||
busy.classList.toggle("busy-other", kind === "other");
|
||||
}
|
||||
|
||||
// Poll the server's solve state so that while one viewer is solving the
|
||||
// others see their Solve button disabled (and re-enabled when it frees up).
|
||||
async function pollStatus() {
|
||||
try {
|
||||
const resp = await fetch("/status", {cache: "no-store"});
|
||||
const {solving} = await resp.json();
|
||||
if (!solvingHere) setSolveDisabled(solving,
|
||||
solving ? "Another viewer is solving…" : "", "other");
|
||||
} catch (e) {/* leave button as-is on a transient error */}
|
||||
}
|
||||
setInterval(pollStatus, 1000);
|
||||
pollStatus();
|
||||
|
||||
function renderSolution(s, placeholder, token) {
|
||||
function renderSolution(s, placeholder, token, n) {
|
||||
// Collapse any previously-shown solutions so the new one is the focus.
|
||||
for (const d of document.querySelectorAll("#output details.solution")) d.open = false;
|
||||
|
||||
const n = ++solutionCount;
|
||||
n = n ?? ++solutionCount;
|
||||
const details = el("details", {class: "solution", open: ""});
|
||||
// Tag the card with its token so "Share visible solutions" can collect it.
|
||||
if (token) details.dataset.token = token;
|
||||
details.append(el("summary", {},
|
||||
`Solution ${n} — ${s.status}, objective ${s.objective_value ?? "—"}`));
|
||||
const out = details;
|
||||
|
||||
// A shareable permalink to this stored solve, looked up by its UUID.
|
||||
if (token) {
|
||||
const url = location.origin + location.pathname + "?solve=" + encodeURIComponent(token);
|
||||
out.append(el("p", {}, el("button", {
|
||||
class: "mini", type: "button",
|
||||
onclick: (ev) => {
|
||||
navigator.clipboard?.writeText(url);
|
||||
ev.target.textContent = "Link copied!";
|
||||
setTimeout(() => {ev.target.textContent = "Copy share link";}, 1500);
|
||||
},
|
||||
}, "Copy share link")));
|
||||
out.append(el("p", {}, copyLinkButton("Copy share link",
|
||||
() => shareUrl([token]))));
|
||||
}
|
||||
out.append(el("p", {
|
||||
html:
|
||||
|
|
@ -1197,26 +1289,28 @@
|
|||
"electrum": 2
|
||||
})
|
||||
|
||||
// --- deep link: /?solve=<uuid> loads a previously stored solve ---
|
||||
async function loadSharedSolve(token) {
|
||||
const out = document.getElementById("output");
|
||||
const errBox = document.getElementById("error");
|
||||
const pending = el("p", {class: "pending"}, "Loading shared solve…");
|
||||
out.prepend(pending);
|
||||
try {
|
||||
const resp = await fetch("/solve/" + encodeURIComponent(token), {cache: "no-store"});
|
||||
if (resp.status === 404) {
|
||||
pending.remove();
|
||||
errBox.textContent = "No stored solve found for that link (it may have been evicted).";
|
||||
return;
|
||||
}
|
||||
const rec = await resp.json();
|
||||
if (!resp.ok) {pending.remove(); errBox.textContent = rec.error || "Server error"; return;}
|
||||
renderSolution(rec.solution, pending, rec.token);
|
||||
} catch (e) {pending.remove(); errBox.textContent = e.message;}
|
||||
// --- deep links ---
|
||||
// /?solve=<id> loads one solve; /?solves=<id1>,<id2>,… loads an arbitrary
|
||||
// set (the schema "Share visible solutions" produces). Each id is attached
|
||||
// to its live job state exactly like a locally-queued solve: a card is
|
||||
// reserved synchronously (so the set keeps its given order) and the shared
|
||||
// /job_status stream resolves it — rendering a finished solve, or tracking
|
||||
// one that's still queued/running until it completes (jobs are server-wide).
|
||||
// Only truly-unknown ids (evicted / never existed) fall to the "not found"
|
||||
// message, instead of every in-flight solve 404ing as "not stored yet".
|
||||
function loadSharedSolve(token) {
|
||||
const n = ++solutionCount;
|
||||
const card = makePendingCard(token, n);
|
||||
card._status.textContent = "Loading…";
|
||||
document.getElementById("output").prepend(card);
|
||||
pending.set(token, {token, card, n, confirmed: true});
|
||||
}
|
||||
const sharedToken = new URLSearchParams(location.search).get("solve");
|
||||
if (sharedToken) loadSharedSolve(sharedToken);
|
||||
const params = new URLSearchParams(location.search);
|
||||
const sharedTokens = (params.get("solves") || params.get("solve") || "")
|
||||
.split(",").map(s => s.trim()).filter(Boolean);
|
||||
// Load in reverse so the first id ends up on top (each load prepends).
|
||||
for (const token of sharedTokens.slice().reverse()) loadSharedSolve(token);
|
||||
if (sharedTokens.length) syncStream();
|
||||
</script>
|
||||
</body>
|
||||
|
||||
|
|
|
|||
282
main.py
282
main.py
|
|
@ -14,8 +14,6 @@ from __future__ import annotations
|
|||
|
||||
import json
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
|
|
@ -95,21 +93,83 @@ def fetch_solve(token):
|
|||
return {"token": row[0], "ts": row[1], "status": row[2],
|
||||
"problem": json.loads(row[3]), "solution": json.loads(row[4])}
|
||||
|
||||
# Only one solve may run at a time across all viewers. While it's held, other
|
||||
# clients get an immediate 429 from /solve and disable their button (they learn
|
||||
# the busy state by polling /status).
|
||||
_solve_lock = threading.Lock()
|
||||
# Solves are queued and run one at a time by a single background worker, so any
|
||||
# number of viewers can pile requests on without being rejected — each request
|
||||
# returns immediately with a queue position and the client polls /job/<token>
|
||||
# for progress. Everything below is guarded by _cond (a Condition whose lock
|
||||
# also protects _jobs/_queue/_active); the worker waits on it for new work.
|
||||
_cond = threading.Condition()
|
||||
|
||||
# Tracks the in-flight solve so /cancel can stop it. /cancel is hit two ways:
|
||||
# the user clicking Cancel (a normal fetch, tab stays open) and a closing tab
|
||||
# firing navigator.sendBeacon("/cancel?token=...") — both small requests that
|
||||
# pass cleanly through reverse proxies, unlike a dropped upstream socket (which
|
||||
# the proxy keeps alive). Guarded by _active_lock; only one solve runs at a
|
||||
# time, but the lock keeps the token / solver handoff race-free against a
|
||||
# concurrent /cancel.
|
||||
_active_lock = threading.Lock()
|
||||
# token -> {"status", "problem", "raw_problem", "max_time", "error"}.
|
||||
# status is one of: queued, running, done, error, cancelled.
|
||||
_jobs = {}
|
||||
# Tokens waiting to run, in order.
|
||||
_queue = []
|
||||
|
||||
# The in-flight solve so /cancel can stop it. /cancel is hit two ways: the user
|
||||
# clicking a card's Cancel button (a normal fetch, tab stays open) and a closing
|
||||
# tab firing navigator.sendBeacon("/cancel?token=...") — both small requests
|
||||
# that pass cleanly through reverse proxies. For a still-queued token /cancel
|
||||
# just drops it from the queue; for the running token it stops the search.
|
||||
_active = {"token": None, "solver": None}
|
||||
|
||||
# Bound the in-memory job map: keep at most this many terminal (done/error/
|
||||
# cancelled) entries. Completed solves still live in SQLite, so a pruned "done"
|
||||
# job degrades gracefully — /job falls back to the DB and still reports done.
|
||||
MAX_TERMINAL_JOBS = 100
|
||||
|
||||
|
||||
def _prune_jobs():
|
||||
# Caller holds _cond. Drop the oldest terminal jobs beyond the cap.
|
||||
terminal = [t for t, j in _jobs.items()
|
||||
if j["status"] in ("done", "error", "cancelled")]
|
||||
for t in terminal[:max(0, len(terminal) - MAX_TERMINAL_JOBS)]:
|
||||
_jobs.pop(t, None)
|
||||
|
||||
|
||||
def _solve_worker():
|
||||
# Run queued solves one at a time, forever. Started as a daemon in main().
|
||||
while True:
|
||||
with _cond:
|
||||
while not _queue:
|
||||
_cond.wait()
|
||||
token = _queue.pop(0)
|
||||
job = _jobs.get(token)
|
||||
if job is None or job["status"] != "queued":
|
||||
continue # cancelled (or vanished) before it ran
|
||||
job["status"] = "running"
|
||||
problem = job["problem"]
|
||||
raw_problem = job["raw_problem"]
|
||||
max_time = job["max_time"]
|
||||
_active["token"] = token
|
||||
_active["solver"] = None
|
||||
_cond.notify_all() # wake /job_status streams watching this token
|
||||
|
||||
def register(s):
|
||||
with _cond:
|
||||
if _active["token"] == token:
|
||||
_active["solver"] = s
|
||||
|
||||
try:
|
||||
sol = solve(problem, max_time_seconds=max_time, solver_sink=register)
|
||||
sol_dict = solution_to_dict(sol)
|
||||
# Persist before flipping to "done" so a client that sees "done"
|
||||
# can always fetch the solution. A cancelled-midway solve returns
|
||||
# the best plan found so far and is stored like any other.
|
||||
store_solve(token, raw_problem, sol_dict)
|
||||
with _cond:
|
||||
job["status"] = "done"
|
||||
_cond.notify_all()
|
||||
except Exception as exc:
|
||||
with _cond:
|
||||
job["status"] = "error"
|
||||
job["error"] = f"{type(exc).__name__}: {exc}"
|
||||
_cond.notify_all()
|
||||
finally:
|
||||
with _cond:
|
||||
_active["token"] = None
|
||||
_active["solver"] = None
|
||||
|
||||
|
||||
def _stop_search(solver):
|
||||
# StopSearch() exists in OR-Tools 9.x+; degrade gracefully on older builds
|
||||
|
|
@ -135,8 +195,12 @@ class Handler(BaseHTTPRequestHandler):
|
|||
path = urlsplit(self.path).path
|
||||
if path in ("/", "/index.html"):
|
||||
self._send(200, INDEX_HTML, "text/html; charset=utf-8")
|
||||
elif path == "/status":
|
||||
self._send(200, json.dumps({"solving": _solve_lock.locked()}), no_cache=True)
|
||||
elif path == "/job_status":
|
||||
tokens = parse_qs(urlsplit(self.path).query).get("tokens", [""])[0]
|
||||
self._handle_job_stream(tokens)
|
||||
elif path.startswith("/job/"):
|
||||
self._send(200, json.dumps(self._job_state(unquote(path[len("/job/"):]))),
|
||||
no_cache=True)
|
||||
elif path.startswith("/solve/"):
|
||||
token = unquote(path[len("/solve/"):])
|
||||
rec = fetch_solve(token)
|
||||
|
|
@ -147,17 +211,78 @@ class Handler(BaseHTTPRequestHandler):
|
|||
else:
|
||||
self._send(404, json.dumps({"error": "not found"}))
|
||||
|
||||
def _client_gone(self):
|
||||
# True once the peer has closed its end. We've already consumed the
|
||||
# request body, so any readable data here is EOF (b"") for a closed tab.
|
||||
sock = self.connection
|
||||
def _job_state(self, token):
|
||||
# Report a queued/running solve's live state, falling back to SQLite for
|
||||
# a finished (or evicted-from-memory) one. The client polls this to drive
|
||||
# each pending card: queued -> running -> done (render) / error / cancelled.
|
||||
with _cond:
|
||||
job = _jobs.get(token)
|
||||
if job is not None:
|
||||
status = job["status"]
|
||||
if status == "queued":
|
||||
pos = _queue.index(token) if token in _queue else 0
|
||||
return {"status": "queued", "position": pos}
|
||||
if status == "running":
|
||||
return {"status": "running"}
|
||||
if status == "error":
|
||||
return {"status": "error", "error": job.get("error")}
|
||||
if status == "cancelled":
|
||||
return {"status": "cancelled"}
|
||||
# status == "done": fall through to load the stored solution.
|
||||
rec = fetch_solve(token)
|
||||
if rec is not None:
|
||||
return {"status": "done", "solution": rec["solution"]}
|
||||
return {"status": "unknown"}
|
||||
|
||||
# Statuses a job can't move on from — once a tracked token reaches one we
|
||||
# send it a final time and stop watching it.
|
||||
_TERMINAL = ("done", "error", "cancelled", "unknown")
|
||||
|
||||
def _handle_job_stream(self, tokens_csv):
|
||||
# Server-Sent Events stream for one or more tokens
|
||||
# (GET /job_status?tokens=t1,t2,…). Emits a `data:` event per token
|
||||
# whenever its state changes (queued/position -> running -> done/…),
|
||||
# blocking on _cond between changes rather than busy-polling, and drops
|
||||
# each token once it's terminal; the stream closes when all are done.
|
||||
remaining, seen = [], set()
|
||||
for t in (s.strip() for s in tokens_csv.split(",")):
|
||||
if t and t not in seen:
|
||||
seen.add(t)
|
||||
remaining.append(t)
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/event-stream")
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.send_header("Connection", "close")
|
||||
# Tell nginx & friends not to buffer the stream (see reverse-proxy notes).
|
||||
self.send_header("X-Accel-Buffering", "no")
|
||||
self.end_headers()
|
||||
|
||||
last = {}
|
||||
try:
|
||||
r, _, _ = select.select([sock], [], [], 0)
|
||||
if r:
|
||||
return sock.recv(1, socket.MSG_PEEK) == b""
|
||||
except OSError:
|
||||
return True
|
||||
return False
|
||||
while remaining:
|
||||
# Send any token whose state changed since we last reported it,
|
||||
# and stop tracking the ones that have reached a terminal state.
|
||||
for token in list(remaining):
|
||||
state = self._job_state(token)
|
||||
payload = json.dumps({"token": token, **state})
|
||||
if last.get(token) != payload:
|
||||
last[token] = payload
|
||||
self.wfile.write(b"data: " + payload.encode("utf-8") + b"\n\n")
|
||||
self.wfile.flush()
|
||||
if state["status"] in self._TERMINAL:
|
||||
remaining.remove(token)
|
||||
if not remaining:
|
||||
break
|
||||
# Block until a job changes state (worker/cancel call notify_all);
|
||||
# on the periodic timeout send a comment as a keep-alive heartbeat.
|
||||
with _cond:
|
||||
notified = _cond.wait(timeout=15)
|
||||
if not notified:
|
||||
self.wfile.write(b": ping\n\n")
|
||||
self.wfile.flush()
|
||||
except (BrokenPipeError, ConnectionResetError, OSError):
|
||||
return # client closed the EventSource; let the thread end
|
||||
|
||||
def do_POST(self):
|
||||
path = urlsplit(self.path)
|
||||
|
|
@ -167,87 +292,53 @@ class Handler(BaseHTTPRequestHandler):
|
|||
if path.path != "/solve":
|
||||
self._send(404, json.dumps({"error": "not found"}))
|
||||
return
|
||||
# Reject immediately if another viewer is already solving.
|
||||
if not _solve_lock.acquire(blocking=False):
|
||||
self._send(429, json.dumps({"error": "Another solve is already in progress"}))
|
||||
return
|
||||
released = False
|
||||
token = None
|
||||
# Validate and enqueue, then return immediately with a queue position.
|
||||
# The single background worker runs queued solves one at a time; the
|
||||
# client follows progress over /job_status (SSE) or by polling /job/<token>.
|
||||
try:
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
payload = json.loads(self.rfile.read(length) or b"{}")
|
||||
raw_problem = payload.get("problem", {})
|
||||
problem = problem_from_dict(raw_problem)
|
||||
problem = problem_from_dict(raw_problem) # surfaces bad input now
|
||||
max_time = float(payload.get("max_time_seconds", 30.0))
|
||||
token = str(payload.get("token") or "")
|
||||
|
||||
# Mark this token as the in-flight solve so a matching /cancel beacon
|
||||
# (sent when the requesting tab closes) can stop it.
|
||||
with _active_lock:
|
||||
_active["token"] = token
|
||||
_active["solver"] = None
|
||||
|
||||
# Run the (blocking) solve in a worker thread so this thread is free
|
||||
# to watch the socket. solver_sink hands us the CpSolver so both the
|
||||
# local disconnect watcher and /cancel can abort the search.
|
||||
result = {}
|
||||
|
||||
def register(s):
|
||||
with _active_lock:
|
||||
if _active["token"] == token:
|
||||
_active["solver"] = s
|
||||
|
||||
def run():
|
||||
try:
|
||||
result["sol"] = solve(
|
||||
problem, max_time_seconds=max_time, solver_sink=register,
|
||||
)
|
||||
except Exception as exc:
|
||||
result["err"] = exc
|
||||
|
||||
worker = threading.Thread(target=run, daemon=True)
|
||||
worker.start()
|
||||
while worker.is_alive():
|
||||
worker.join(0.25)
|
||||
# Direct connections (no proxy) still get fast release via socket
|
||||
# EOF; behind a proxy the /cancel beacon is what stops the solve.
|
||||
if worker.is_alive() and self._client_gone():
|
||||
with _active_lock:
|
||||
if _active["solver"] is not None:
|
||||
_stop_search(_active["solver"])
|
||||
_solve_lock.release()
|
||||
released = True
|
||||
return
|
||||
|
||||
if "err" in result:
|
||||
raise result["err"]
|
||||
sol_dict = solution_to_dict(result["sol"])
|
||||
# Persist the completed solve so it can be looked up by its UUID.
|
||||
# (Cancelled solves take the early return above and aren't stored.)
|
||||
store_solve(token, raw_problem, sol_dict)
|
||||
self._send(200, json.dumps(sol_dict))
|
||||
if not token:
|
||||
self._send(400, json.dumps({"error": "missing token"}))
|
||||
return
|
||||
with _cond:
|
||||
_prune_jobs()
|
||||
_jobs[token] = {
|
||||
"status": "queued", "problem": problem,
|
||||
"raw_problem": raw_problem, "max_time": max_time, "error": None,
|
||||
}
|
||||
_queue.append(token)
|
||||
position = len(_queue) - 1
|
||||
_cond.notify_all()
|
||||
self._send(202, json.dumps({"status": "queued", "position": position}),
|
||||
no_cache=True)
|
||||
except Exception as exc: # surface errors to the browser
|
||||
self._send(400, json.dumps({"error": f"{type(exc).__name__}: {exc}"}))
|
||||
finally:
|
||||
with _active_lock:
|
||||
if _active["token"] == token:
|
||||
_active["token"] = None
|
||||
_active["solver"] = None
|
||||
if not released:
|
||||
_solve_lock.release()
|
||||
|
||||
def _handle_cancel(self, query):
|
||||
# Stop the in-flight solve iff the request's token matches it, so a Cancel
|
||||
# click (or closing tab) can't cancel a *different* viewer's solve. The
|
||||
# running /solve handler then returns normally — with the best plan found
|
||||
# so far — and releases the lock.
|
||||
# Cancel iff the request's token matches a queued/running solve, so a
|
||||
# Cancel click (or closing tab) can't cancel a *different* viewer's
|
||||
# solve. A still-queued token is simply dropped from the queue; the
|
||||
# running token has its search stopped (the worker then stores the best
|
||||
# plan found so far and the job flips to "done").
|
||||
token = (query.get("token") or [""])[0]
|
||||
stopped = False
|
||||
with _active_lock:
|
||||
if token and token == _active["token"] and _active["solver"] is not None:
|
||||
result = "none"
|
||||
with _cond:
|
||||
if token and token in _queue:
|
||||
_queue.remove(token)
|
||||
job = _jobs.get(token)
|
||||
if job is not None:
|
||||
job["status"] = "cancelled"
|
||||
result = "dequeued"
|
||||
_cond.notify_all()
|
||||
elif token and token == _active["token"] and _active["solver"] is not None:
|
||||
_stop_search(_active["solver"])
|
||||
stopped = True
|
||||
self._send(200, json.dumps({"cancelled": stopped}), no_cache=True)
|
||||
result = "stopped"
|
||||
self._send(200, json.dumps({"cancelled": result}), no_cache=True)
|
||||
|
||||
def log_message(self, fmt, *args): # quieter console
|
||||
pass
|
||||
|
|
@ -255,6 +346,7 @@ class Handler(BaseHTTPRequestHandler):
|
|||
|
||||
def main():
|
||||
init_db()
|
||||
threading.Thread(target=_solve_worker, daemon=True).start()
|
||||
host = os.environ.get("HOST", "127.0.0.1")
|
||||
port = int(os.environ.get("PORT", "8000"))
|
||||
server = ThreadingHTTPServer((host, port), Handler)
|
||||
|
|
|
|||
Loading…
Reference in a new issue