This commit is contained in:
Pagwin 2026-06-09 12:57:36 -04:00
parent 9711aa5bac
commit 7229124bb5
No known key found for this signature in database
GPG key ID: 81137023740CA260
2 changed files with 217 additions and 122 deletions

336
main.py
View file

@ -27,15 +27,18 @@ INITIAL = (30, 30, 30, 30)
# at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument.
# Total cities across all steps must be <= 7. Arriving cities act that same step.
# Each entry may be either a string shorthand (e.g. "H") or a dict, e.g.
# {"type": "F", "adjacent_to": [city_idx, ...]}
# {"type": "F", "adjacent_to": [city_idx, ...], "vats": {"E": 1, "B": 1, "S": 1},
# "departure_step": NUM_STEPS + 1}
# `adjacent_to` is currently parsed but unused; see normalize_city().
# Optional `vats` overrides initial vat values at arrival (each defaults to 1).
# Optional `departure_step` is the first step at which the city is ABSENT;
# default NUM_STEPS+1 means the city stays through the whole simulation.
ARRIVALS = {
1: [
{"type": "F", "adjacent_to": []},
{"type": "F", "adjacent_to": []},
{"type": "F", "adjacent_to": [], "departure_step": 6},
{"type": "H", "adjacent_to": []},
{"type": "H", "adjacent_to": []},
{"type": "N", "adjacent_to": []},
],
2: [],
3: [],
@ -46,8 +49,19 @@ ARRIVALS = {
def normalize_city(c):
if isinstance(c, str):
return {"type": c, "adjacent_to": []}
return c
c = {"type": c}
vats_in = c.get("vats", {}) or {}
vats = {
"E": vats_in.get("E", 1),
"B": vats_in.get("B", 1),
"S": vats_in.get("S", 1),
}
return {
"type": c["type"],
"adjacent_to": c.get("adjacent_to", []),
"vats": vats,
"departure_step": c.get("departure_step", NUM_STEPS + 1),
}
# Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is
@ -73,7 +87,7 @@ AGENT_AVAILABILITY = {
"prodigy": [],
"provisioner": [],
"metallurgist": [2, 3, 4, 5],
"builder": [],
"builder": [2, 3, 4],
"courier": [],
"planner": [1, 2, 3, 4, 5],
"fence": [],
@ -94,6 +108,7 @@ ENABLED_ACTIONS = {
"overwork": True,
"renovate_h": True,
"renovate_f": True,
"noop": True,
# "renovate_m": False, # currently always disabled in main.py
}
@ -154,6 +169,40 @@ class Action:
# ======================================================================
class NoOpAction(Action):
def __init__(self):
super().__init__("noop")
def declare_vars(self, model, i, t, ctx):
noop = model.NewBoolVar(f"noop_{i}_{t}")
ctx["action_vars"]["noop"][(i, t)] = noop
ctx["noop"][i, t] = noop
# only allowed after overwork to avoid more things needing to be checked
def add_constraints(self, model, i, t, ctx):
noop = ctx["noop"][i, t]
if t == ctx["arrival_step"][i]:
# Can't noop at arrival (no prior step)
model.Add(noop == 0)
else:
# noop ≤ ow[i, t-1]: noop can be 1 only if overwork was 1 last step
model.Add(noop <= ctx["ow"][i, t - 1])
def add_global_constraints(self, model, t, ctx):
pass
def contributes_gains_costs(self, model, i, t, ctx):
# No gains or costs
pass
def transition_contrib(self, model, i, t, ctx):
pass
def describe(self, solver, i, t, ctx):
return "Rest (no action)"
class CollectAction(Action):
def __init__(self):
super().__init__("collect")
@ -466,9 +515,9 @@ class OverworkAction(Action):
m = model
ow = ctx["ow"]
N = ctx["N"]
# no planner = no overworks
if "planner" not in AGENT_AVAILABILITY:
# At most 1 city overworks per step
m.Add(sum(ow[i, t] for i in range(N)) <= 1)
m.Add(sum(ow[i, t] for i in range(N)) <= 0)
# else: gating handled per-city by Planner
def contributes_gains_costs(self, model, i, t, ctx):
@ -693,6 +742,7 @@ class Baron(Agent):
m.Add(deposit == amt).OnlyEnforceIf(both)
m.Add(deposit == 0).OnlyEnforceIf(both.Not())
ctx["tg_pool"][t].append(deposit)
ctx.setdefault("baron_deposits", {})[t] = deposit
class Prodigy(Agent):
@ -856,7 +906,9 @@ class Fence(EventAgent):
"""Each available step, deposits +20 (=2 trade goods x10) into tg_pool[t]."""
def apply_event(self, model, t, ctx):
ctx["tg_pool"][t].append(model.NewConstant(20))
deposit = model.NewConstant(20)
ctx["tg_pool"][t].append(deposit)
ctx.setdefault("fence_deposits", {})[t] = 20
# ======================================================================
@ -899,6 +951,7 @@ def solve(
# action variables (sparse, only created for enabled actions)
col, ua, ub, ud = {}, {}, {}, {}
ow = {}
noop = {}
rH, rF, rM = {}, {}, {}
cvE, cvB, cvS = {}, {}, {}
owcvE, owcvB, owcvS = {}, {}, {}
@ -908,6 +961,7 @@ def solve(
for i in range(N):
a_step, a_city = cities[i]
a_type = a_city["type"]
d_step = a_city["departure_step"]
for t in range(1, NUM_STEPS + 2):
isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}")
isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}")
@ -921,8 +975,8 @@ def solve(
vB[i, t] = m.NewIntVar(0, max_vat, f"vB_{i}_{t}")
vS[i, t] = m.NewIntVar(0, max_vat, f"vS_{i}_{t}")
# presence: present from arrival step onward, persists
m.Add(present[i, t] == (1 if t >= a_step else 0))
# presence: present from arrival step until departure step (exclusive)
m.Add(present[i, t] == (1 if a_step <= t < d_step else 0))
# exactly one type iff present
m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t])
@ -962,6 +1016,8 @@ def solve(
action_registry.append(RenovateAction("renovate_h", "H"))
if ENABLED_ACTIONS.get("renovate_f", True):
action_registry.append(RenovateAction("renovate_f", "F"))
if ENABLED_ACTIONS.get("noop", True):
action_registry.append(NoOpAction())
# ---- Governor variable matrix ----
# governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name`
@ -1002,7 +1058,6 @@ def solve(
"planner": Planner,
"foreman": Foreman,
"industrialist": Industrialist,
"economist": Economist,
}
for name, cls in agent_classes.items():
if name in AGENT_AVAILABILITY:
@ -1019,6 +1074,8 @@ def solve(
builder_fire_map = {}
# One-shot agent declarations.
fence_deposits = {}
baron_deposits = {}
_agent_decl_ctx = {
"model": m,
"N": N,
@ -1032,6 +1089,8 @@ def solve(
"cost_C": cost_C,
"cost_S": cost_S,
"builder_fire": builder_fire_map,
"fence_deposits": fence_deposits,
"baron_deposits": baron_deposits,
}
for agent in agent_registry:
agent.declare_vars(m, _agent_decl_ctx)
@ -1051,21 +1110,28 @@ def solve(
m.Add(hasB[i, a_step] == 0)
m.Add(hasD[i, a_step] == 0)
# vats at arrival
m.Add(vE[i, a_step] == 1)
m.Add(vB[i, a_step] == 1)
m.Add(vS[i, a_step] == 1)
m.Add(vE[i, a_step] == a_city["vats"]["E"])
m.Add(vB[i, a_step] == a_city["vats"]["B"])
m.Add(vS[i, a_step] == a_city["vats"]["S"])
# before arrival: everything zero
for t in range(1, a_step):
for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
m.Add(v[i, t] == 0)
# after departure: everything zero from d_step onward
d_step = a_city["departure_step"]
for t in range(d_step, NUM_STEPS + 2):
for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
m.Add(v[i, t] == 0)
# action + transition logic for active steps
for t in range(a_step, NUM_STEPS + 1):
for t in range(a_step, min(d_step, NUM_STEPS + 1)):
P = present[i, t]
# Build context for action methods
ctx = {
"noop": noop,
"model": m,
"i": i,
"t": t,
@ -1118,6 +1184,7 @@ def solve(
"renovations": {},
"vat_next": {},
"builder_fire": builder_fire_map,
"baron_deposits": baron_deposits,
}
# Declare all action variables
@ -1165,123 +1232,127 @@ def solve(
for agent in agent_registry:
agent.apply(m, i, t, ctx)
# Upgrade transitions
m.AddMaxEquality(
hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))]
)
m.AddMaxEquality(
hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))]
)
d_keep = m.NewBoolVar("")
d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))]
_bfire = builder_fire_map.get((i, t))
if _bfire is not None:
d_sources.append(_bfire)
m.AddMaxEquality(d_keep, d_sources)
m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren)
m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren)
# Upgrade transitions (skip propagation past last present step)
if t + 1 < d_step:
m.AddMaxEquality(
hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))]
)
m.AddMaxEquality(
hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))]
)
d_keep = m.NewBoolVar("")
d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))]
_bfire = builder_fire_map.get((i, t))
if _bfire is not None:
d_sources.append(_bfire)
m.AddMaxEquality(d_keep, d_sources)
m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren)
m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren)
# Gain/cost contributions
for action in action_registry:
action.contributes_gains_costs(m, i, t, ctx)
# Transition contributions (including vats)
for action in action_registry:
action.transition_contrib(m, i, t, ctx)
if t + 1 < d_step:
# Transition contributions (including vats)
for action in action_registry:
action.transition_contrib(m, i, t, ctx)
# Type transition t -> t+1
m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren)
m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren)
m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren)
for r, target in [
(rH.get((i, t)), isH),
(rF.get((i, t)), isF),
(rM.get((i, t)), isM),
]:
if r is not None:
m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r)
# Type transition t -> t+1
m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren)
m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren)
m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren)
for r, target in [
(rH.get((i, t)), isH),
(rF.get((i, t)), isF),
(rM.get((i, t)), isM),
]:
if r is not None:
m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r)
for r1, t1, t2 in [
(rH.get((i, t)), isF, isM),
(rF.get((i, t)), isH, isM),
(rM.get((i, t)), isH, isF),
]:
if r1 is not None:
m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1)
m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1)
for r1, t1, t2 in [
(rH.get((i, t)), isF, isM),
(rF.get((i, t)), isH, isM),
(rM.get((i, t)), isH, isF),
]:
if r1 is not None:
m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1)
m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1)
# ---- Centralized vat transitions (mirrors main.py exactly) ----
# Collect/overwork vat selectors (None if action disabled)
_cvE = cvE.get((i, t))
_cvB = cvB.get((i, t))
_cvS = cvS.get((i, t))
_owcvE = owcvE.get((i, t))
_owcvB = owcvB.get((i, t))
_owcvS = owcvS.get((i, t))
# ---- Centralized vat transitions (mirrors main.py exactly) ----
# Collect/overwork vat selectors (None if action disabled)
_cvE = cvE.get((i, t))
_cvB = cvB.get((i, t))
_cvS = cvS.get((i, t))
_owcvE = owcvE.get((i, t))
_owcvB = owcvB.get((i, t))
_owcvS = owcvS.get((i, t))
inc = m.NewIntVar(1, 3, "")
metallurgist_g = governor.get((i, t, "metallurgist"))
if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY:
metal_active = m.NewBoolVar("")
m.AddMultiplicationEquality(metal_active, [metallurgist_g, isF[i, t]])
m.Add(inc == 1 + hasD[i, t] + metal_active)
else:
m.Add(inc == 1 + hasD[i, t])
inc = m.NewIntVar(1, 3, "")
metallurgist_g = governor.get((i, t, "metallurgist"))
if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY:
metal_active = m.NewBoolVar("")
m.AddMultiplicationEquality(
metal_active, [metallurgist_g, isF[i, t]]
)
m.Add(inc == 1 + metal_active)
else:
m.Add(inc == 1 + hasD[i, t])
vEn = m.NewIntVar(0, max_vat, "")
vBn = m.NewIntVar(0, max_vat, "")
vSn = m.NewIntVar(0, max_vat, "")
vEn = m.NewIntVar(0, max_vat, "")
vBn = m.NewIntVar(0, max_vat, "")
vSn = m.NewIntVar(0, max_vat, "")
# For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S)
# apply the same reset/increment pattern from main.py
for sel_E, sel_B, sel_S in [
(_cvE, _cvB, _cvS),
(_owcvE, _owcvB, _owcvS),
]:
if sel_E is not None:
m.Add(vEn == 0).OnlyEnforceIf(sel_E)
m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E)
m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E)
if sel_B is not None:
m.Add(vBn == 0).OnlyEnforceIf(sel_B)
m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B)
m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B)
if sel_S is not None:
m.Add(vSn == 0).OnlyEnforceIf(sel_S)
m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S)
m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S)
# For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S)
# apply the same reset/increment pattern from main.py
for sel_E, sel_B, sel_S in [
(_cvE, _cvB, _cvS),
(_owcvE, _owcvB, _owcvS),
]:
if sel_E is not None:
m.Add(vEn == 0).OnlyEnforceIf(sel_E)
m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E)
m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E)
if sel_B is not None:
m.Add(vBn == 0).OnlyEnforceIf(sel_B)
m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B)
m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B)
if sel_S is not None:
m.Add(vSn == 0).OnlyEnforceIf(sel_S)
m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S)
m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S)
# foundry but neither collecting nor overworking: vats unchanged
# f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow
_fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None)
_fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None)
f_noncollect_noow = m.NewBoolVar("")
m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count)
m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow)
m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow)
m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow)
# foundry but neither collecting nor overworking: vats unchanged
# f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow
_fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None)
_fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None)
f_noncollect_noow = m.NewBoolVar("")
m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count)
m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow)
m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow)
m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow)
# assign vat[i, t+1]:
# renovate-to-foundry -> reset to 1
# continuing foundry -> vat_next
# otherwise (not foundry next) -> 0
cont_F = AND(isF[i, t], no_ren)
for vnext, vn in (
(vE[i, t + 1], vEn),
(vB[i, t + 1], vBn),
(vS[i, t + 1], vSn),
):
if rF.get((i, t)) is not None:
m.Add(vnext == 1).OnlyEnforceIf(rF[i, t])
m.Add(vnext == vn).OnlyEnforceIf(cont_F)
not_F_next = isF[i, t + 1].Not()
m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
# assign vat[i, t+1]:
# renovate-to-foundry -> reset to 1
# continuing foundry -> vat_next
# otherwise (not foundry next) -> 0
cont_F = AND(isF[i, t], no_ren)
for vnext, vn in (
(vE[i, t + 1], vEn),
(vB[i, t + 1], vBn),
(vS[i, t + 1], vSn),
):
if rF.get((i, t)) is not None:
m.Add(vnext == 1).OnlyEnforceIf(rF[i, t])
m.Add(vnext == vn).OnlyEnforceIf(cont_F)
not_F_next = isF[i, t + 1].Not()
m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
# ---- Builder one-shot: at most one fire across all (i, t) ----
if builder_fire_map:
m.Add(sum(builder_fire_map.values()) <= 1)
# if builder_fire_map:
# m.Add(sum(builder_fire_map.values()) <= 1)
# ---- global constraints (per action type, per step) ----
for t in range(1, NUM_STEPS + 1):
@ -1302,6 +1373,8 @@ def solve(
"gain_C": gain_C,
"cost_C": cost_C,
"cost_S": cost_S,
"fence_deposits": fence_deposits,
"baron_deposits": baron_deposits,
}
for t in range(1, NUM_STEPS + 1):
for agent in event_agent_registry:
@ -1324,6 +1397,10 @@ def solve(
tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}")
tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}")
m.Add(tg_E + tg_B + tg_S + tg_C == pool_total)
# Each trade good (10 scaled units) must be allocated entirely to one resource
m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_E, 10)
m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_B, 10)
m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_S, 10)
gain_E[t].append(tg_E)
gain_B[t].append(tg_B)
gain_S[t].append(tg_S)
@ -1398,9 +1475,9 @@ def solve(
solver.parameters.num_search_workers = num_workers
status = solver.Solve(
m,
printer.IntermediateSolutionPrinter(
{"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.01
),
# printer.IntermediateSolutionPrinter(
# {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.1
# ),
)
if verbose:
@ -1453,6 +1530,8 @@ def solve(
governor,
agent_registry,
capital_spent,
fence_deposits,
baron_deposits,
)
return solver, status
@ -1505,6 +1584,8 @@ def _report(
governor=None,
agent_registry=None,
capital_spent=None,
fence_deposits=None,
baron_deposits=None,
):
print("status:", solver.StatusName(status))
if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
@ -1605,6 +1686,19 @@ def _report(
spent = sum(solver.Value(x) for x in capital_spent[t]) / 10
print(f" step {t}: {spent:6.1f}")
if fence_deposits:
print("\n Fence deposits (trade goods):")
for t in fence_deposits:
print(
f" step {t}: {fence_deposits[t] / 10:.1f} (scaled: {fence_deposits[t]})"
)
if baron_deposits:
print("\n Baron deposits (trade goods):")
for t in sorted(baron_deposits.keys()):
amt = solver.Value(baron_deposits[t])
print(f" step {t}: {amt / 10:.1f} (scaled: {amt})")
# Resolve governor assignments for display
def gov_for(i, t):
if not governor or not agent_registry:

View file

@ -8,12 +8,13 @@ class IntermediateSolutionPrinter(cp_model.CpSolverSolutionCallback):
cp_model.CpSolverSolutionCallback.__init__(self)
self._variables = variables
self._solution_count = 0
self.scale = scale
def on_solution_callback(self):
"""Called each time an improving solution is found."""
print("\n--- Solution ---")
for name, var in self._variables.items():
print(f"{name} = {self.Value(var)}")
print(f"{name} = {self.scale * self.Value(var)}")
@property
def solution_count(self):