# dws-city-res-solve/solve.py
# 2026-06-10 13:00:06 -04:00
#
# 1862 lines
# 63 KiB
# Python

"""
City Resource Optimization -> CP-SAT (Google OR-Tools) [ACTION REGISTRY VERSION]
Maximise Electrum * Brass * Steel after the gains of step 5.
The objective is a product of three variables, so this is a constraint-
programming / nonlinear problem, hence CP-SAT (cp_model) rather than the
LP/MIP solver. Read the comments at:
- PARAMETERS (initial pools + arrival schedule + bonus mode)
- "OBJECTIVE IS SET HERE" (the product being maximised -- tweak freely)
"""
from ortools.sat.python import cp_model
import printer
# ======================================================================
# PARAMETERS -- edit these
# ======================================================================
# Starting resource pools at the start of step 1, in the order
# (Electrum, Brass, Steel, Capital).
# Note: all resource values are scaled by 10 internally so that fractional
# trade-good splits (Baron/Fence et al, phase B/C) can be modeled with integers.
# Display in _report divides by 10.
# NOTE(review): presumably INITIAL is already expressed in those x10 units
# (30 == 3.0 of each resource) -- confirm against how solve() seeds the pools.
INITIAL = (30, 30, 30, 30)
# FIXED_CHOICES pins decisions before solving. Expected shape:
#
# fixed_choices = {
#     "actions": {
#         (city_idx, step): "action_name",  # e.g., (0, 2): "collect"
#         # ...
#     },
#     "governors": {
#         (city_idx, step, agent_name): bool,  # e.g., (1, 1, "provisioner"): True
#         # ...
#     }
# }
#
# Action names are the keys of ENABLED_ACTIONS. Use None to pin nothing:
# FIXED_CHOICES = None
#
# NOTE(review): the schedule below pins "overwork" in every step, while
# OverworkAction forbids overwork for any (city, step) that has no planner
# governor variable. With AGENT_AVAILABILITY["planner"] left empty this looks
# infeasible -- confirm against the part of solve() that applies fixed choices.
FIXED_CHOICES = {
    "actions": {
        (0, 1): "renovate_h",
        (1, 1): "renovate_h",
        (2, 1): "renovate_h",
        (3, 1): "overwork",
        (0, 2): "collect",
        (1, 2): "collect",
        (2, 2): "overwork",
        (3, 2): "upgrade_a",
        (0, 3): "collect",
        (1, 3): "collect",
        (2, 3): "upgrade_a",
        (3, 3): "overwork",
        (0, 4): "collect",
        (1, 4): "collect",
        (2, 4): "overwork",
        (3, 4): "upgrade_b",
        (0, 5): "collect",
        (1, 5): "collect",
        (2, 5): "upgrade_b",
        (3, 5): "overwork",
    }
}
# Resource constraints: list of callables that receive a dict with keys E, B, S, C
# mapping to resource variables indexed by step. Each callable returns a constraint
# (or None to skip) to be passed to m.Add(). Example:
#     lambda res: res["E"][3] >= 50  # Ensure at least 50 E at step 3
RESOURCE_CONSTRAINTS = []
# Arrival schedule. Key = step (1..5), value = list of city types that arrive
# at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument.
# Arriving cities act that same step.
# Each entry may be either a string shorthand (e.g. "H") or a dict, e.g.
#     {"type": "F", "adjacent_to": [city_idx, ...], "vats": {"E": 1, "B": 1, "S": 1},
#      "departure_step": NUM_STEPS + 1}
# `adjacent_to` is passed through unchanged by normalize_city() and is read by
# the Industrialist agent, which also grants its upgrade to the listed city
# indices.
# Optional `vats` overrides initial vat values at arrival (each defaults to 1).
# Optional `departure_step` is the first step at which the city is ABSENT;
# default NUM_STEPS+1 means the city stays through the whole simulation.
ARRIVALS = {
    1: [
        {"type": "F", "adjacent_to": []},
        {"type": "F", "adjacent_to": []},
        {"type": "F", "adjacent_to": []},
        {"type": "H", "adjacent_to": []},
    ],
    2: [],
    3: [],
    4: [],
    5: [],
}
def normalize_city(c):
    """Expand one ARRIVALS entry into the canonical city dict.

    `c` is either a type shorthand string (e.g. "H") or a dict carrying at
    least "type". The result always has the keys "type", "adjacent_to",
    "vats" (with E/B/S, each defaulting to 1) and "departure_step"
    (defaulting to NUM_STEPS + 1).
    """
    entry = {"type": c} if isinstance(c, str) else c
    # A missing, None or empty "vats" value all mean "no overrides".
    overrides = entry.get("vats", {}) or {}
    default_departure = NUM_STEPS + 1
    return {
        "type": entry["type"],
        "adjacent_to": entry.get("adjacent_to", []),
        "vats": {res: overrides.get(res, 1) for res in ("E", "B", "S")},
        "departure_step": entry.get("departure_step", default_departure),
    }
# Rule note for CollectAction / OverworkAction:
# Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is
# +1 of the resource collected (i.e. the chosen vat's value + 1). This is the
# same uniform rule as Hub (+1 Capital) and Metropolis (+1 resource pick).

# Number of simulated steps; steps are numbered 1..NUM_STEPS.
NUM_STEPS = 5
MAX_RES = (
    2000  # upper bound on any resource pool (scaled x10; raise if you expect more)
)
MAX_VAT = 15  # upper bound on a foundry vat value (1 + 2*nsteps is plenty); unscaled
# Bastion counts indexed by t-1; bastions on map at step t.
# Read by the Baron agent as BASTION_COUNTS[t - 1], so the list needs an entry
# for every step in which Baron can govern.
BASTION_COUNTS = [3, 3, 3, 3, 3]
# Agent name -> list of steps where the agent is available.
# Empty/missing means the agent is disabled. Phase B/C/D will populate.
# OverworkAction checks for the "planner" key here to decide between per-city
# planner gating and a blanket ban on overwork.
AGENT_AVAILABILITY = {
    "capitalist": [],
    "baron": [],
    "prodigy": [],
    "provisioner": [],
    "metallurgist": [],
    "builder": [],
    "courier": [],
    "planner": [],
    "fence": [],
    "foreman": [],
    "industrialist": [],
}
# ======================================================================
# ACTION REGISTRY CONFIGURATION
# ======================================================================
# Action name -> whether the action is offered to the solver.
# NOTE(review): the code consuming this mapping is outside this part of the
# file -- confirm how a False entry is handled before relying on it.
ENABLED_ACTIONS = {
    "collect": True,
    "upgrade_a": True,
    "upgrade_b": True,
    "upgrade_d": True,
    "overwork": True,
    "renovate_h": True,
    "renovate_f": True,
    "noop": True,
    # "renovate_m": False, # currently always disabled in main.py
}
# ======================================================================
# ACTION REGISTRY BASE CLASS
# ======================================================================
class Action:
    """Interface every registered action implements.

    Each hook receives the CP-SAT model, the city index `i`, the step `t`
    and the shared `ctx` dict. The defaults here do nothing, so a subclass
    overrides only the hooks it needs.
    """

    def __init__(self, name):
        # Registry key; also the key under ctx["action_vars"].
        self.name = name

    def declare_vars(self, model, i, t, ctx):
        """Create this action's decision variables for city `i` at step `t`.

        Implementations are expected to store the selector variable under
        ctx['action_vars'][self.name][(i, t)].
        """
        return None

    def add_constraints(self, model, i, t, ctx):
        """Post the preconditions / legality rules of this action."""
        return None

    def add_global_constraints(self, model, t, ctx):
        """Post constraints that span all cities for step `t`."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append this action's terms to the per-step gain and cost accumulators."""
        return None

    def transition_contrib(self, model, i, t, ctx):
        """Post state transitions (type, upgrade, vat) caused by this action."""
        return None

    def selector_var(self, i, t, ctx):
        """Return the selector variable of this action for (i, t), or None.

        None is returned when no variable was declared for that pair or for
        this action at all.
        """
        declared = ctx["action_vars"].get(self.name, {})
        return declared.get((i, t), None)

    def describe(self, solver, i, t, ctx):
        """Return the report text for this action at (i, t); defaults to the name."""
        return "{}".format(self.name)
# (VatsMixin removed — vat transitions are handled centrally in solve())
# ======================================================================
# CONCRETE ACTIONS
# ======================================================================
class NoOpAction(Action):
    """Rest action: the city does nothing and gains or pays nothing.

    Resting is only legal in the step right after the same city overworked;
    restricting it this way keeps the number of cases to check small.
    The remaining hooks are inherited no-ops from Action.
    """

    def __init__(self):
        super().__init__("noop")

    def declare_vars(self, model, i, t, ctx):
        """Create the noop selector for (i, t) and register it in ctx."""
        noop = model.NewBoolVar(f"noop_{i}_{t}")
        ctx["action_vars"]["noop"][(i, t)] = noop
        ctx["noop"][i, t] = noop

    def add_constraints(self, model, i, t, ctx):
        """Allow noop only when city `i` overworked at step t - 1."""
        noop = ctx["noop"][i, t]
        prev_ow = None
        if t != ctx["arrival_step"][i]:
            # Action variables are sparse: the overwork selector may not
            # exist for (i, t - 1), e.g. when overwork is not registered.
            prev_ow = ctx["ow"].get((i, t - 1))
        if prev_ow is None:
            # Arrival step (no prior step) or no overwork variable to follow:
            # resting can never be justified, so forbid it outright instead
            # of failing with a KeyError.
            model.Add(noop == 0)
        else:
            # noop <= ow[i, t-1]: noop can be 1 only if overwork was 1 last step.
            model.Add(noop <= prev_ow)

    def describe(self, solver, i, t, ctx):
        """Return the report text for a resting city."""
        return "Rest (no action)"
class CollectAction(Action):
    """Collect: the city harvests its type-specific yield for the step.

    Per city type (every amount below is multiplied by 10 in the model):
      * Hub        -- gains 2 Capital.
      * Foundry    -- pays 1 Capital and gains the value of one chosen vat.
      * Metropolis -- pays 1 Capital and gains 2 picks among E/B/S/C.
      * Monument   -- may not collect.
    A city holding the collect-bonus upgrade (hasB) gets +1 on top: one more
    Capital, one more unit of the collected vat resource, or one more pick.
    """

    def __init__(self):
        super().__init__("collect")
        # (id(a), id(b)) -> (a, b, var) where var == a AND b. See _AND().
        self.AND_cache = {}

    def declare_vars(self, model, i, t, ctx):
        """Create the selector, the vat-choice flags and the pick amounts."""
        col = model.NewBoolVar(f"col_{i}_{t}")
        vat_flags = [model.NewBoolVar(f"cv{res}_{i}_{t}") for res in "EBS"]
        amounts = [model.NewIntVar(0, 30, f"m{res}_{i}_{t}") for res in "EBSC"]
        ctx["action_vars"]["collect"][(i, t)] = col
        ctx["col"][i, t] = col
        for res, flag in zip("EBS", vat_flags):
            ctx[f"cv{res}"][i, t] = flag
        for res, amount in zip("EBSC", amounts):
            ctx[f"m{res}"][i, t] = amount

    def add_constraints(self, model, i, t, ctx):
        """Post the legality rules of collect for city `i` at step `t`."""
        m = model
        bonus = ctx["hasB"][i, t]
        col = ctx["col"][i, t]
        # Monument can't collect
        m.Add(col == 0).OnlyEnforceIf(ctx["isMon"][i, t])
        # Foundry: exactly one vat chosen iff foundry collects
        fcol = self._AND(model, ctx["isF"][i, t], col, ctx)
        m.Add(ctx["cvE"][i, t] + ctx["cvB"][i, t] + ctx["cvS"][i, t] == fcol)
        # Metropolis: pick (2 + hasB) resources, none otherwise
        mcol = self._AND(model, ctx["isM"][i, t], col, ctx)
        npick = m.NewIntVar(0, 3, "")
        m.Add(npick == 2 + bonus).OnlyEnforceIf(mcol)
        m.Add(npick == 0).OnlyEnforceIf(mcol.Not())
        # Each "pick" is 10 units of the chosen resource (x10 scaling). Tie
        # every amount to a whole number of picks so the solver cannot split
        # one pick across several resources (describe() also assumes whole
        # multiples of 10).
        counts = []
        for res in "EBSC":
            count = m.NewIntVar(0, 3, "")
            m.Add(ctx[f"m{res}"][i, t] == 10 * count)
            counts.append(count)
        m.Add(sum(counts) == npick)

    def add_global_constraints(self, model, t, ctx):
        """Collect has no cross-city constraint."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append collect's gains and Capital costs to the step accumulators."""
        m = model
        bonus = ctx["hasB"][i, t]
        col = ctx["col"][i, t]
        max_vat = ctx["max_vat"]
        # Collect sub-choices by city type
        fcol = self._AND(model, ctx["isF"][i, t], col, ctx)
        mcol = self._AND(model, ctx["isM"][i, t], col, ctx)
        hcol = self._AND(model, ctx["isH"][i, t], col, ctx)
        # Costs: foundry & metropolis collect each cost 1 Capital (x10 scaled)
        for payer in (fcol, mcol):
            fee = m.NewIntVar(0, 10, "")
            m.Add(fee == 10 * payer)
            ctx["cost_C"][t].append(fee)
            ctx["capital_spent"][t].append(fee)
        # Hub collect: +2 Capital (+1 more if collect-bonus b) -> x10
        hub_gain = m.NewIntVar(0, 30, "")
        m.Add(hub_gain == 10 * (2 + bonus)).OnlyEnforceIf(hcol)
        m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not())
        ctx["gain_C"][t].append(hub_gain)
        # Metropolis collect: the pick amounts are already in x10 units.
        for res in "EBSC":
            ctx[f"gain_{res}"][t].append(ctx[f"m{res}"][i, t])
        for res in "EBS":
            chosen = ctx[f"cv{res}"][i, t]
            gains = ctx[f"gain_{res}"][t]
            # Foundry collect: gain the chosen vat's value (vat is unscaled),
            # then scale x10 for the resource gain.
            raw = m.NewIntVar(0, max_vat, "")
            m.AddMultiplicationEquality(raw, [chosen, ctx[f"v{res}"][i, t]])
            scaled = m.NewIntVar(0, 10 * max_vat, "")
            m.Add(scaled == 10 * raw)
            gains.append(scaled)
            # Collect Bonus (b): +1 of the collected resource -> +10 scaled
            with_bonus = self._AND(model, chosen, bonus, ctx)
            extra = m.NewIntVar(0, 10, "")
            m.Add(extra == 10 * with_bonus)
            gains.append(extra)

    def transition_contrib(self, model, i, t, ctx):
        """Nothing to do: vat transitions are handled centrally in solve()."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return the report text for a collect, based on the city's type name."""
        typ_name = ctx["typ_name"].get((i, t), "-")
        if typ_name == "Foundry":
            if solver.Value(ctx["cvE"].get((i, t))):
                vat = "E"
            elif solver.Value(ctx["cvB"].get((i, t))):
                vat = "B"
            else:
                vat = "S"
            return f"Collect vat {vat}"
        if typ_name == "Metro":
            picks = []
            for res in "EBSC":
                amount = solver.Value(ctx[f"m{res}"].get((i, t)))
                picks += [res] * (amount // 10)
            return "Collect {" + ",".join(picks) + "}"
        return "Collect (+Capital)"

    def _AND(self, model, a, b, ctx):
        """Return a cached 0/1 variable equal to `a AND b`.

        The cache is keyed by object identity. Each entry also holds
        references to both operands: id() values are only unique among live
        objects, so without the references a key could later match an
        unrelated variable (for instance one from another model, if this
        action instance is reused) and hand back the wrong BoolVar.
        """
        key = (id(a), id(b))
        entry = self.AND_cache.get(key)
        if entry is None:
            both = model.NewBoolVar("")
            model.AddMultiplicationEquality(both, [a, b])
            entry = self.AND_cache[key] = (a, b, both)
        return entry[2]
class UpgradeAction(Action):
    """Shared implementation of the upgrade actions.

    `upgrade_key` is 'A', 'B' or 'D'. It selects the ctx dicts used:
    the selector lives in ctx["u<key lower-case>"] and the "already held"
    flag in ctx["has<key>"].
    """

    def __init__(self, name, upgrade_key):
        super().__init__(name)
        self.upgrade_key = upgrade_key

    def declare_vars(self, model, i, t, ctx):
        """Create the upgrade selector for (i, t) and register it in ctx."""
        chosen = model.NewBoolVar(f"{self.upgrade_key.lower()}_{i}_{t}")
        ctx["action_vars"][self.name][(i, t)] = chosen
        ctx[f"u{self.upgrade_key.lower()}"][i, t] = chosen

    def add_constraints(self, model, i, t, ctx):
        """Post the legality rules of this upgrade."""
        chosen = ctx[f"u{self.upgrade_key.lower()}"][i, t]
        held = ctx[f"has{self.upgrade_key}"]
        is_monument = ctx["isMon"]
        is_foundry = ctx["isF"]
        # An upgrade that is already held cannot be bought again.
        model.Add(held[i, t] == 0).OnlyEnforceIf(chosen)
        # Monuments never upgrade.
        model.Add(chosen == 0).OnlyEnforceIf(is_monument[i, t])
        # Upgrade d exists for foundries only.
        if self.upgrade_key == "D":
            model.Add(is_foundry[i, t] == 1).OnlyEnforceIf(chosen)

    def add_global_constraints(self, model, t, ctx):
        """Upgrades have no cross-city constraint."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append the Steel price of upgrades b and d to cost_S.

        The price is 2 Steel, or 1 Steel when the city already holds the
        cost-reduction upgrade a (20 / 10 in x10 units). Upgrade a itself
        adds no cost term here.
        """
        chosen = ctx[f"u{self.upgrade_key.lower()}"][i, t]
        has_discount = ctx["hasA"]
        steel_costs = ctx["cost_S"][t]
        if self.upgrade_key not in ("B", "D"):
            return
        price = model.NewIntVar(0, 20, "")
        discounted = model.NewBoolVar("")
        model.AddMultiplicationEquality(discounted, [chosen, has_discount[i, t]])
        model.Add(price == 20).OnlyEnforceIf(chosen, has_discount[i, t].Not())
        model.Add(price == 10).OnlyEnforceIf(discounted)
        model.Add(price == 0).OnlyEnforceIf(chosen.Not())
        steel_costs.append(price)

    def transition_contrib(self, model, i, t, ctx):
        """No transition is posted by this class."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return the report text for this upgrade (the name for unknown keys)."""
        labels = {
            "A": "Upgrade a (cost-reduction)",
            "B": "Upgrade b (collect-bonus)",
            "D": "Upgrade d (vat-increment)",
        }
        return labels.get(self.upgrade_key, self.name)
class OverworkAction(Action):
    """Overwork: a double-yield collect followed by a one-step cooldown.

    Yields (every amount is multiplied by 10 in the model):
      * Hub        -- 2 * (2 + hasB) Capital.
      * Foundry    -- 2 * (chosen vat value + hasB) of that vat's resource.
      * Metropolis -- 2 * (2 + hasB) picks among E/B/S/C.
      * Monument   -- may not overwork.
    A city that overworked in the previous step may neither collect nor
    overwork. Overwork is additionally gated by the planner agent.
    """

    def __init__(self):
        super().__init__("overwork")
        # (id(a), id(b)) -> (a, b, var) where var == a AND b. See _AND().
        self.AND_cache = {}

    def declare_vars(self, model, i, t, ctx):
        """Create the selector, the vat-choice flags and the pick amounts."""
        ow = model.NewBoolVar(f"ow_{i}_{t}")
        vat_flags = [model.NewBoolVar(f"owcv{res}_{i}_{t}") for res in "EBS"]
        amounts = [model.NewIntVar(0, 60, f"owm{res}_{i}_{t}") for res in "EBSC"]
        ctx["action_vars"]["overwork"][(i, t)] = ow
        ctx["ow"][i, t] = ow
        for res, flag in zip("EBS", vat_flags):
            ctx[f"owcv{res}"][i, t] = flag
        for res, amount in zip("EBSC", amounts):
            ctx[f"owm{res}"][i, t] = amount

    def add_constraints(self, model, i, t, ctx):
        """Post the legality, cooldown and planner rules for (i, t)."""
        m = model
        bonus = ctx["hasB"][i, t]
        ow = ctx["ow"][i, t]
        # Monument can't overwork
        m.Add(ow == 0).OnlyEnforceIf(ctx["isMon"][i, t])
        # Foundry overwork: exactly one vat chosen iff foundry overworks
        fow = self._AND(model, ctx["isF"][i, t], ow, ctx)
        m.Add(
            ctx["owcvE"][i, t] + ctx["owcvB"][i, t] + ctx["owcvS"][i, t] == fow
        )
        # Metropolis overwork: pick 2*(2 + hasB) resources, none otherwise
        mow = self._AND(model, ctx["isM"][i, t], ow, ctx)
        ow_npick = m.NewIntVar(0, 6, "")
        m.Add(ow_npick == 2 * (2 + bonus)).OnlyEnforceIf(mow)
        m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not())
        # Each "pick" is 10 units (x10 scaling). Tie every amount to a whole
        # number of picks so one pick cannot be split across resources
        # (describe() also assumes whole multiples of 10).
        counts = []
        for res in "EBSC":
            count = m.NewIntVar(0, 6, "")
            m.Add(ctx[f"owm{res}"][i, t] == 10 * count)
            counts.append(count)
        m.Add(sum(counts) == ow_npick)
        # Overwork cooldown: city that overworked last step can't collect or overwork
        if t > ctx["arrival_step"][i]:
            prev_ow = ctx["ow"][i, t - 1]
            col = ctx["col"].get((i, t))
            if col is not None:
                m.Add(col == 0).OnlyEnforceIf(prev_ow)
            m.Add(ow == 0).OnlyEnforceIf(prev_ow)
        # Planner gating: a city may overwork only if the planner agent
        # governs it. Without a planner governor variable for (i, t) the
        # city cannot overwork at all.
        if "planner" in AGENT_AVAILABILITY:
            planner_g = ctx["governor"].get((i, t, "planner"))
            if planner_g is not None:
                m.Add(ow == 0).OnlyEnforceIf(planner_g.Not())
            else:
                m.Add(ow == 0)

    def add_global_constraints(self, model, t, ctx):
        """Forbid every overwork at step `t` when no planner is configured."""
        ow = ctx["ow"]
        # no planner = no overworks
        if "planner" not in AGENT_AVAILABILITY:
            # Action variables are sparse, so only sum the selectors that
            # were actually declared for this step.
            declared = [ow[i, t] for i in range(ctx["N"]) if (i, t) in ow]
            if declared:
                model.Add(sum(declared) <= 0)
        # else: gating handled per-city in add_constraints()

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append overwork's gains to the step accumulators (it has no cost)."""
        m = model
        bonus = ctx["hasB"][i, t]
        ow = ctx["ow"][i, t]
        max_vat = ctx["max_vat"]
        # Hub overwork: 2*(2 + hasB) Capital -> x10
        how = self._AND(model, ctx["isH"][i, t], ow, ctx)
        hub_ow_gain = m.NewIntVar(0, 60, "")
        m.Add(hub_ow_gain == 20 * (2 + bonus)).OnlyEnforceIf(how)
        m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not())
        ctx["gain_C"][t].append(hub_ow_gain)
        # Metropolis overwork: the pick amounts are already in x10 units.
        for res in "EBSC":
            ctx[f"gain_{res}"][t].append(ctx[f"owm{res}"][i, t])
        for res in "EBS":
            chosen = ctx[f"owcv{res}"][i, t]
            gains = ctx[f"gain_{res}"][t]
            # Foundry overwork: 2 * chosen vat's value, then x10 scaling.
            raw = m.NewIntVar(0, max_vat, "")
            m.AddMultiplicationEquality(raw, [chosen, ctx[f"v{res}"][i, t]])
            scaled = m.NewIntVar(0, 20 * max_vat, "")
            m.Add(scaled == 20 * raw)
            gains.append(scaled)
            # Collect Bonus (b): 2 * (+1) of that resource -> x10 = +20
            with_bonus = self._AND(model, chosen, bonus, ctx)
            extra = m.NewIntVar(0, 20, "")
            m.Add(extra == 20 * with_bonus)
            gains.append(extra)

    def transition_contrib(self, model, i, t, ctx):
        """Nothing to do: vat transitions are handled centrally in solve()."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return the report text for an overwork, based on the city's type name."""
        typ_name = ctx["typ_name"].get((i, t), "-")
        if typ_name == "Foundry":
            if solver.Value(ctx["owcvE"].get((i, t))):
                vat = "E"
            elif solver.Value(ctx["owcvB"].get((i, t))):
                vat = "B"
            else:
                vat = "S"
            return f"Overwork vat {vat} (2x)"
        if typ_name == "Metro":
            picks = []
            for res in "EBSC":
                amount = solver.Value(ctx[f"owm{res}"].get((i, t)))
                picks += [res] * (amount // 10)
            return "Overwork {" + ",".join(picks) + "} (2x)"
        return "Overwork (+Capital 2x)"

    def _AND(self, model, a, b, ctx):
        """Return a cached 0/1 variable equal to `a AND b`.

        The cache is keyed by object identity. Each entry also holds
        references to both operands: id() values are only unique among live
        objects, so without the references a key could later match an
        unrelated variable (for instance one from another model, if this
        action instance is reused) and hand back the wrong BoolVar.
        """
        # Tolerate instances created without running __init__.
        cache = self.__dict__.setdefault("AND_cache", {})
        key = (id(a), id(b))
        entry = cache.get(key)
        if entry is None:
            both = model.NewBoolVar("")
            model.AddMultiplicationEquality(both, [a, b])
            entry = cache[key] = (a, b, both)
        return entry[2]
class RenovateAction(Action):
    """Shared implementation of the renovate actions.

    `target_type` is the type letter the city is renovated into (e.g. 'H'
    or 'F'); it selects ctx["r<letter>"] for the selector and
    ctx["is<letter>"] for the current-type flag.
    """

    def __init__(self, name, target_type):
        super().__init__(name)
        self.target_type = target_type

    def declare_vars(self, model, i, t, ctx):
        """Create the renovate selector for (i, t) and register it in ctx."""
        chosen = model.NewBoolVar(f"r{self.target_type}_{i}_{t}")
        ctx["action_vars"][self.name][(i, t)] = chosen
        ctx[f"r{self.target_type}"][i, t] = chosen

    def add_constraints(self, model, i, t, ctx):
        """Forbid renovating into the type the city already has."""
        chosen = ctx[f"r{self.target_type}"][i, t]
        already_target = ctx[f"is{self.target_type}"]
        model.Add(already_target[i, t] == 0).OnlyEnforceIf(chosen)

    def add_global_constraints(self, model, t, ctx):
        """Renovation has no cross-city constraint."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Renovation adds no gain or cost term."""
        return None

    def transition_contrib(self, model, i, t, ctx):
        """No transition is posted by this class."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return the report text; unknown type letters are shown as-is."""
        readable = {"H": "Hub", "F": "Foundry", "M": "Metro"}
        label = readable.get(self.target_type, self.target_type)
        return f"Renovate -> {label}"
# ======================================================================
# AGENT REGISTRY BASE CLASSES
# ======================================================================
class Agent:
    """Base class of governor-style agents.

    Such an agent is assigned to a city per step through
    ctx['governor'][i, t, name]. Both hooks default to doing nothing.
    """

    name: str
    available_steps: list  # steps where this agent may be assigned

    def __init__(self, name, available_steps):
        self.name = name
        # Copy, so later edits to the caller's sequence do not leak in.
        self.available_steps = [*available_steps]

    def declare_vars(self, model, ctx):
        """Create one-time variables; meant to run once before the per-(i, t) loop."""
        return None

    def apply(self, model, i, t, ctx):
        """Post the agent's effect for city `i` at step `t`.

        Implementations gate their logic on ctx['governor'][i, t, self.name].
        """
        return None
class EventAgent:
    """Base class of agents that trigger on their own each available step.

    An event agent has no city target; implementations typically read or
    write ctx entries such as tg_pool or the gain_* accumulators. Both hooks
    default to doing nothing.
    """

    name: str
    available_steps: list

    def __init__(self, name, available_steps):
        self.name = name
        # Copy, so later edits to the caller's sequence do not leak in.
        self.available_steps = [*available_steps]

    def declare_vars(self, model, ctx):
        """Create one-time variables for this agent."""
        return None

    def apply_event(self, model, t, ctx):
        """Post the agent's effect for step `t`."""
        return None
# ======================================================================
# PHASE B AGENTS
# ======================================================================
class Capitalist(Agent):
    """Adds 20 to gain_C (2 Capital in x10 units) when governing a collecting city."""

    def apply(self, model, i, t, ctx):
        """Append the bonus term for (i, t); skipped if either variable is missing."""
        governs = ctx["governor"].get((i, t, self.name))
        if governs is not None:
            collects = ctx["col"].get((i, t))
            if collects is not None:
                # active == governs AND collects
                active = model.NewBoolVar("")
                model.AddMultiplicationEquality(active, [governs, collects])
                reward = model.NewIntVar(0, 20, "")
                model.Add(reward == 20).OnlyEnforceIf(active)
                model.Add(reward == 0).OnlyEnforceIf(active.Not())
                ctx["gain_C"][t].append(reward)
class Baron(Agent):
    """Deposits BASTION_COUNTS[t-1]*10 trade-goods when governing a Collect action.

    Every (city, step) pair contributes its own deposit variable to
    ctx["tg_pool"][t]. ctx["baron_deposits"][t] holds a single variable equal
    to the step's total Baron deposit over all cities.
    """

    def apply(self, model, i, t, ctx):
        """Post the deposit for city `i` at step `t`.

        Does nothing when the governor or the collect variable for (i, t)
        does not exist.
        """
        m = model
        g = ctx["governor"].get((i, t, self.name))
        if g is None:
            return
        col = ctx["col"].get((i, t))
        if col is None:
            return
        amt = BASTION_COUNTS[t - 1] * 10
        cap = max(amt, 0)
        both = m.NewBoolVar("")
        m.AddMultiplicationEquality(both, [g, col])
        deposit = m.NewIntVar(0, cap, "")
        m.Add(deposit == amt).OnlyEnforceIf(both)
        m.Add(deposit == 0).OnlyEnforceIf(both.Not())
        ctx["tg_pool"][t].append(deposit)
        # apply() runs once per city, so plainly assigning `deposit` here
        # would keep only the last city's variable and lose a deposit made
        # through any other city. Chain a running total instead.
        per_step = ctx.setdefault("baron_deposits", {})
        so_far = per_step.get(t)
        if so_far is None:
            per_step[t] = deposit
        else:
            total = m.NewIntVar(0, cap * ctx["N"], "")
            m.Add(total == so_far + deposit)
            per_step[t] = total
class Prodigy(Agent):
    """Adds 20 to gain_S (2 Steel in x10 units) when governing a city that
    takes upgrade b or upgrade d."""

    def apply(self, model, i, t, ctx):
        """Append one refund term per declared upgrade selector of (i, t)."""
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        candidates = (ctx["ub"].get((i, t)), ctx["ud"].get((i, t)))
        for upgrade in [var for var in candidates if var is not None]:
            # active == governs AND this upgrade is chosen
            active = model.NewBoolVar("")
            model.AddMultiplicationEquality(active, [governs, upgrade])
            refund = model.NewIntVar(0, 20, "")
            model.Add(refund == 20).OnlyEnforceIf(active)
            model.Add(refund == 0).OnlyEnforceIf(active.Not())
            ctx["gain_S"][t].append(refund)
class Provisioner(Agent):
    """Adds 15 to gain_E (1.5 Electrum in x10 units) whenever it governs a
    city, whatever that city does."""

    def apply(self, model, i, t, ctx):
        """Append the bonus term for (i, t); skipped without a governor variable."""
        governs = ctx["governor"].get((i, t, self.name))
        if governs is not None:
            reward = model.NewIntVar(0, 15, "")
            model.Add(reward == 15).OnlyEnforceIf(governs)
            model.Add(reward == 0).OnlyEnforceIf(governs.Not())
            ctx["gain_E"][t].append(reward)
class Metallurgist(Agent):
    """Foundry-only agent that adds +1 to the vat increment.

    The effect is implemented centrally in solve(); this class exists only
    so the agent registry stays uniform.
    """

    def apply(self, model, i, t, ctx):
        """Do nothing: the vat-level effect lives in solve()'s vat-transition block."""
        return None
class Builder(Agent):
    """One-shot agent meant to grant hasD[i, t+1] = 1 when governing a Foundry.

    This class only records the trigger variables in ctx["builder_fire"].
    NOTE(review): the grant itself and the "at most once across all (i, t)"
    limit are not posted here -- presumably solve() consumes builder_fire;
    confirm.
    """

    def declare_vars(self, model, ctx):
        """Make sure ctx["builder_fire"] exists."""
        if "builder_fire" not in ctx:
            ctx["builder_fire"] = {}

    def apply(self, model, i, t, ctx):
        """Record fire == (Builder governs city i at t) AND (city i is a Foundry)."""
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        is_foundry = ctx["isF"]
        trigger = model.NewBoolVar(f"builder_fire_{i}_{t}")
        model.AddMultiplicationEquality(trigger, [governs, is_foundry[i, t]])
        if "builder_fire" not in ctx:
            ctx["builder_fire"] = {}
        ctx["builder_fire"][(i, t)] = trigger
class Courier(Agent):
    """One-shot global bonus: +30 to gain_C, gain_S and gain_B on the first
    step in which Courier governs any city."""

    def declare_vars(self, model, ctx):
        """Build the per-step "fires now" flags and attach the bonus terms.

        Stores the per-step flags in ctx["courier_any"] (Courier governs
        some city at that step) and ctx["courier_fired"] (the bonus is paid
        at that step).
        """
        city_count = ctx["N"]
        governor = ctx["governor"]
        capital, steel, brass = ctx["gain_C"], ctx["gain_S"], ctx["gain_B"]
        governs_by_step = {}
        fires_by_step = {}
        fired_earlier = None  # "already fired in some previous step" flag
        for step in range(1, NUM_STEPS + 1):
            lookups = (
                governor.get((city, step, self.name)) for city in range(city_count)
            )
            assigned = [var for var in lookups if var is not None]
            governs = model.NewBoolVar(f"courier_any_{step}")
            if assigned:
                # governs == OR(assigned)
                model.AddMaxEquality(governs, assigned)
            else:
                model.Add(governs == 0)
            governs_by_step[step] = governs

            fires = model.NewBoolVar(f"courier_fired_{step}")
            if fired_earlier is None:
                # First step: nothing can have fired before.
                model.Add(fires == governs)
                fired_so_far = model.NewBoolVar(f"courier_ever_{step}")
                model.Add(fired_so_far == fires)
            else:
                # fires == governs AND NOT fired_earlier
                model.Add(fires <= governs)
                model.Add(fires <= 1 - fired_earlier)
                model.Add(fires >= governs - fired_earlier)
                fired_so_far = model.NewBoolVar(f"courier_ever_{step}")
                model.AddMaxEquality(fired_so_far, [fired_earlier, fires])
            fires_by_step[step] = fires
            fired_earlier = fired_so_far

            # +30 to each of gain_C / gain_S / gain_B in the firing step.
            for gains in (capital[step], steel[step], brass[step]):
                reward = model.NewIntVar(0, 30, "")
                model.Add(reward == 30).OnlyEnforceIf(fires)
                model.Add(reward == 0).OnlyEnforceIf(fires.Not())
                gains.append(reward)
        ctx["courier_any"] = governs_by_step
        ctx["courier_fired"] = fires_by_step

    def apply(self, model, i, t, ctx):
        """Do nothing: the whole effect is set up in declare_vars()."""
        return None
# ======================================================================
# PHASE C AGENTS
# ======================================================================
class Planner(Agent):
    """Agent whose effect is realized inside OverworkAction (per-city gating)."""

    def apply(self, model, i, t, ctx):
        """Do nothing here; OverworkAction reads the planner governor variable."""
        return None
class Foreman(Agent):
    """Placeholder for the Restructure ability.

    TODO: Restructure allows other industry actions during renovation. The
    current model uses 'exactly one action per city per step', so renovation
    does not block any specific action separately. Foreman becomes meaningful
    only once renovation-blocks-action semantics are modeled.
    """

    def apply(self, model, i, t, ctx):
        """Do nothing until the semantics described on the class are modeled."""
        return None
class Industrialist(Agent):
    """Network ability: grants Infrastructure (hasA) to the governed city and
    to the cities listed in its `adjacent_to` when appointed governor."""

    def apply(self, model, i, t, ctx):
        """Force hasA at step t + 1 for city `i` and its adjacent cities.

        The grant is conditional on Industrialist governing city `i` at step
        `t`. Nothing is posted when the governor variable, ctx["cities"] or
        ctx["hasA"] is missing, or when t + 1 lies beyond the last step.
        """
        g = ctx["governor"].get((i, t, self.name))
        if g is None:
            return
        cities = ctx.get("cities")
        hasA = ctx.get("hasA")
        if cities is None or hasA is None:
            return
        # Fall back to the module-level horizon rather than a literal 5, so
        # the agent keeps working when NUM_STEPS is edited and ctx does not
        # carry its own "NUM_STEPS".
        last_step = ctx.get("NUM_STEPS", NUM_STEPS)
        if t + 1 > last_step:
            return
        _, city = cities[i]
        # Governed city first, then its neighbours; dict.fromkeys drops
        # repeated indices while keeping that order.
        targets = dict.fromkeys([i, *city.get("adjacent_to", [])])
        for target in targets:
            model.Add(hasA[target, t + 1] == 1).OnlyEnforceIf(g)
class Fence(EventAgent):
    """Event agent that deposits 20 (2 trade goods in x10 units) into
    tg_pool[t] each time it is applied."""

    def apply_event(self, model, t, ctx):
        """Append the constant deposit for step `t` and record its amount."""
        amount = 20
        constant = model.NewConstant(amount)
        ctx["tg_pool"][t].append(constant)
        if "fence_deposits" not in ctx:
            ctx["fence_deposits"] = {}
        ctx["fence_deposits"][t] = amount
# ======================================================================
# MODEL
# ======================================================================
def solve(
    *,
    initial=INITIAL,
    arrivals=ARRIVALS,
    max_res=MAX_RES,
    max_vat=MAX_VAT,
    time_limit=60.0,
    num_workers=8,
    verbose=True,
    fixed_choices=FIXED_CHOICES,
    resource_constraints=None,
):
    """Build the CP-SAT model, bound the final pools, and maximise their product.

    Args:
        initial: 4-tuple (E, B, S, C) used as the fixed step-1 pool values.
        arrivals: mapping step -> list of city entries; each entry is passed
            through normalize_city.
        max_res: upper bound of every pool and trade-good split variable.
        max_vat: upper bound of every foundry vat variable.
        time_limit: wall-clock limit (seconds) of the final product solve.
        num_workers: number of search workers for every solve.
        verbose: print progress, stream intermediate solutions and print the
            final report.
        fixed_choices: optional dict with "actions" {(city_idx, step): name}
            and/or "governors" {(city_idx, step, agent_name): bool} to pin.
        resource_constraints: list of callables taking {"E", "B", "S", "C"}
            (dicts of pool variables by step) and returning a constraint or
            None; defaults to RESOURCE_CONSTRAINTS when None.

    Returns:
        (solver, status) of the final product solve.
    """
    # ---- build the city list -----------------------------------------
    # cities: list[tuple[int, dict]]; the dict carries "type", "vats",
    # "departure_step" and optionally "adjacent_to" (read by Industrialist).
    cities = []
    for s in range(1, NUM_STEPS + 1):
        for entry in arrivals.get(s, []):
            cities.append((s, normalize_city(entry)))
    N = len(cities)
    m = cp_model.CpModel()

    def AND(a, b):
        """Boolean AND of two 0/1 vars, returned as a new 0/1 var."""
        c = m.NewBoolVar("")
        m.AddMultiplicationEquality(c, [a, b])
        return c

    # ---- state variables ---------------------------------------------
    isH, isF, isM, isMon, present = {}, {}, {}, {}, {}
    hasA, hasB, hasD = {}, {}, {}
    vE, vB, vS = {}, {}, {}  # foundry vats at start of step t
    # action variables (sparse, only created for enabled actions)
    col, ua, ub, ud = {}, {}, {}, {}
    ow = {}
    noop = {}
    rH, rF, rM = {}, {}, {}
    cvE, cvB, cvS = {}, {}, {}
    owcvE, owcvB, owcvS = {}, {}, {}
    mE, mB, mS, mC = {}, {}, {}, {}
    owmE, owmB, owmS, owmC = {}, {}, {}, {}
    for i in range(N):
        a_step, a_city = cities[i]
        d_step = a_city["departure_step"]
        for t in range(1, NUM_STEPS + 2):
            isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}")
            isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}")
            isM[i, t] = m.NewBoolVar(f"isM_{i}_{t}")
            isMon[i, t] = m.NewBoolVar(f"isMon_{i}_{t}")
            present[i, t] = m.NewBoolVar(f"present_{i}_{t}")
            hasA[i, t] = m.NewBoolVar(f"hasA_{i}_{t}")
            hasB[i, t] = m.NewBoolVar(f"hasB_{i}_{t}")
            hasD[i, t] = m.NewBoolVar(f"hasD_{i}_{t}")
            vE[i, t] = m.NewIntVar(0, max_vat, f"vE_{i}_{t}")
            vB[i, t] = m.NewIntVar(0, max_vat, f"vB_{i}_{t}")
            vS[i, t] = m.NewIntVar(0, max_vat, f"vS_{i}_{t}")
            # presence: present from arrival step until departure step (exclusive)
            m.Add(present[i, t] == (1 if a_step <= t < d_step else 0))
            # exactly one type iff present
            m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t])

    # ---- per-step gain/cost accumulators (linear expressions) ---------
    gain_E = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_B = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_S = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_C = {t: [] for t in range(1, NUM_STEPS + 1)}
    cost_C = {t: [] for t in range(1, NUM_STEPS + 1)}
    cost_S = {t: [] for t in range(1, NUM_STEPS + 1)}
    # Trade-goods pool (per step): new generic-trade-good sources (Baron, Fence,
    # phase B/C) deposit IntVars here; centralized split logic routes the pool
    # into gain_E/B/S/C below. Metro picks are NOT routed through here.
    tg_pool = {t: [] for t in range(1, NUM_STEPS + 1)}
    # Per-step capital spent tracker (mirrors entries appended to cost_C).
    capital_spent = {t: [] for t in range(1, NUM_STEPS + 1)}

    # ---- Agent Registry (populated by phase B/C/D; empty in phase A) ----
    agent_registry = []
    event_agent_registry = []

    # ---- Action Registry ----
    action_registry = []
    if ENABLED_ACTIONS.get("collect", True):
        action_registry.append(CollectAction())
    if ENABLED_ACTIONS.get("upgrade_a", True):
        action_registry.append(UpgradeAction("upgrade_a", "A"))
    if ENABLED_ACTIONS.get("upgrade_b", True):
        action_registry.append(UpgradeAction("upgrade_b", "B"))
    if ENABLED_ACTIONS.get("upgrade_d", True):
        action_registry.append(UpgradeAction("upgrade_d", "D"))
    if ENABLED_ACTIONS.get("overwork", True):
        action_registry.append(OverworkAction())
    if ENABLED_ACTIONS.get("renovate_h", True):
        action_registry.append(RenovateAction("renovate_h", "H"))
    if ENABLED_ACTIONS.get("renovate_f", True):
        action_registry.append(RenovateAction("renovate_f", "F"))
    if ENABLED_ACTIONS.get("noop", True):
        action_registry.append(NoOpAction())

    # ---- Governor variable matrix ----
    # governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name`
    # governs city i at step t. Created only when agent is available at step t
    # AND the city has arrived; otherwise NewConstant(0).
    governor = {}
    arrival_step_map = {i: cities[i][0] for i in range(N)}
    for name, avail_steps in AGENT_AVAILABILITY.items():
        avail_set = set(avail_steps)
        for i in range(N):
            for t in range(1, NUM_STEPS + 1):
                if t in avail_set and t >= arrival_step_map[i]:
                    governor[i, t, name] = m.NewBoolVar(f"gov_{name}_{i}_{t}")
                else:
                    governor[i, t, name] = m.NewConstant(0)
    # At each (i, t): at most one agent governs city i.
    for i in range(N):
        for t in range(1, NUM_STEPS + 1):
            terms = [governor[i, t, n] for n in AGENT_AVAILABILITY]
            if terms:
                m.Add(sum(terms) <= 1)
    # At each (name, t): at most one city has agent `name` assigned.
    for name in AGENT_AVAILABILITY:
        for t in range(1, NUM_STEPS + 1):
            m.Add(sum(governor[i, t, name] for i in range(N)) <= 1)

    # Phase B agent registration
    agent_classes = {
        "capitalist": Capitalist,
        "baron": Baron,
        "prodigy": Prodigy,
        "provisioner": Provisioner,
        "metallurgist": Metallurgist,
        "builder": Builder,
        "courier": Courier,
        "planner": Planner,
        "foreman": Foreman,
        "industrialist": Industrialist,
    }
    for name, cls in agent_classes.items():
        if name in AGENT_AVAILABILITY:
            agent_registry.append(cls(name, AGENT_AVAILABILITY[name]))
    event_agent_classes = {
        "fence": Fence,
    }
    for name, cls in event_agent_classes.items():
        if name in AGENT_AVAILABILITY:
            event_agent_registry.append(cls(name, AGENT_AVAILABILITY[name]))

    # Shared state populated by agent declare_vars (e.g., Builder's fire map).
    builder_fire_map = {}
    # One-shot agent declarations.
    fence_deposits = {}
    baron_deposits = {}
    _agent_decl_ctx = {
        "model": m,
        "N": N,
        "governor": governor,
        "tg_pool": tg_pool,
        "capital_spent": capital_spent,
        "gain_E": gain_E,
        "gain_B": gain_B,
        "gain_S": gain_S,
        "gain_C": gain_C,
        "cost_C": cost_C,
        "cost_S": cost_S,
        "builder_fire": builder_fire_map,
        "fence_deposits": fence_deposits,
        "baron_deposits": baron_deposits,
    }
    for agent in agent_registry:
        agent.declare_vars(m, _agent_decl_ctx)
    for agent in event_agent_registry:
        agent.declare_vars(m, _agent_decl_ctx)

    # Pin governor assignments requested by the caller (unknown keys ignored).
    if fixed_choices and "governors" in fixed_choices:
        for (city, step, agent_name), value in fixed_choices["governors"].items():
            if governor.get((city, step, agent_name)) is not None:
                m.Add(governor[city, step, agent_name] == (1 if value else 0))

    # ---- per-city logic -----------------------------------------------
    for i in range(N):
        a_step, a_city = cities[i]
        a_type = a_city["type"]
        # initial type at arrival step
        init = {"H": isH, "F": isF, "M": isM, "N": isMon}
        m.Add(init[a_type][i, a_step] == 1)
        # no upgrades at arrival
        m.Add(hasA[i, a_step] == 0)
        m.Add(hasB[i, a_step] == 0)
        m.Add(hasD[i, a_step] == 0)
        # vats at arrival
        m.Add(vE[i, a_step] == a_city["vats"]["E"])
        m.Add(vB[i, a_step] == a_city["vats"]["B"])
        m.Add(vS[i, a_step] == a_city["vats"]["S"])
        # before arrival: everything zero
        for t in range(1, a_step):
            for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
                m.Add(v[i, t] == 0)
        # after departure: everything zero from d_step onward
        d_step = a_city["departure_step"]
        for t in range(d_step, NUM_STEPS + 2):
            for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
                m.Add(v[i, t] == 0)

        # action + transition logic for active steps
        for t in range(a_step, min(d_step, NUM_STEPS + 1)):
            P = present[i, t]
            # Build context for action methods
            ctx = {
                "noop": noop,
                "model": m,
                "i": i,
                "t": t,
                "N": N,
                "max_vat": max_vat,
                "action_vars": {act.name: {} for act in action_registry},
                # Real per-city arrival steps. The previous comprehension
                # mapped every city index to the *current* city's arrival
                # step, which was wrong for every other city.
                "arrival_step": dict(arrival_step_map),
                "isH": isH, "isF": isF, "isM": isM, "isMon": isMon,
                "present": present,
                "hasA": hasA, "hasB": hasB, "hasD": hasD,
                "vE": vE, "vB": vB, "vS": vS,
                "col": col, "ua": ua, "ub": ub, "ud": ud, "ow": ow,
                "rH": rH, "rF": rF, "rM": rM,
                "cvE": cvE, "cvB": cvB, "cvS": cvS,
                "owcvE": owcvE, "owcvB": owcvB, "owcvS": owcvS,
                "mE": mE, "mB": mB, "mS": mS, "mC": mC,
                "owmE": owmE, "owmB": owmB, "owmS": owmS, "owmC": owmC,
                "gain_E": gain_E, "gain_B": gain_B,
                "gain_S": gain_S, "gain_C": gain_C,
                "cost_C": cost_C, "cost_S": cost_S,
                "tg_pool": tg_pool,
                "capital_spent": capital_spent,
                "governor": governor,
                "renovations": {},
                "vat_next": {},
                "builder_fire": builder_fire_map,
                "baron_deposits": baron_deposits,
                "cities": cities,
                "NUM_STEPS": NUM_STEPS,
            }
            # Declare all action variables
            for action in action_registry:
                action.declare_vars(m, i, t, ctx)

            # After declaring all action variables for (i,t)
            if fixed_choices and "actions" in fixed_choices:
                action_key = (i, t)
                if action_key in fixed_choices["actions"]:
                    fixed_action = fixed_choices["actions"][action_key]
                    # Set the chosen action to 1, others to 0
                    for action in action_registry:
                        var = action.selector_var(i, t, ctx)
                        if var is not None:
                            if action.name == fixed_action:
                                m.Add(var == 1)
                            else:
                                m.Add(var == 0)

            # renovations helper (used by multiple actions)
            ren = m.NewBoolVar("")
            rH_act = rH.get((i, t))
            rF_act = rF.get((i, t))
            rM_act = rM.get((i, t))
            ren_sum = sum(x for x in [rH_act, rF_act, rM_act] if x is not None)
            m.Add(ren == ren_sum)
            ctx["renovations"][i, t] = ren
            no_ren = m.NewBoolVar("")
            m.Add(no_ren == 1 - ren)

            # Add all constraints
            for action in action_registry:
                action.add_constraints(m, i, t, ctx)

            # LLM allowed renovation into metropolis, need to prevent that now
            if rM.get((i, t)) is not None:
                m.Add(rM[i, t] == 0)

            # Renovation must change type (renovate-to-X requires not-X now)
            for r, target in [
                (rH.get((i, t)), isH),
                (rF.get((i, t)), isF),
                (rM.get((i, t)), isM),
            ]:
                if r is not None:
                    m.Add(target[i, t] == 0).OnlyEnforceIf(r)

            # Exactly one action while present (sum all action selectors)
            action_sum = sum(
                action.selector_var(i, t, ctx)
                for action in action_registry
                if action.selector_var(i, t, ctx) is not None
            )
            m.Add(action_sum == P)

            # Agent per-(city, step) hooks - run BEFORE upgrade transitions so
            # Builder can register its fire var that feeds into d_keep.
            for agent in agent_registry:
                agent.apply(m, i, t, ctx)

            # Upgrade transitions (skip propagation past last present step)
            if t + 1 < d_step:
                # NOTE(review): Industrialist.apply also posts
                # hasA[., t + 1] == 1 under its governor literal. Because
                # this equality pins hasA[i, t + 1] to max(hasA[i, t], ua),
                # that literal can only be true when the max is already 1 -
                # confirm this is intended.
                m.AddMaxEquality(
                    hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))]
                )
                m.AddMaxEquality(
                    hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))]
                )
                d_keep = m.NewBoolVar("")
                d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))]
                _bfire = builder_fire_map.get((i, t))
                if _bfire is not None:
                    d_sources.append(_bfire)
                m.AddMaxEquality(d_keep, d_sources)
                # Upgrade d is dropped by a renovation, otherwise carried over.
                m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren)
                m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren)

            # Gain/cost contributions
            for action in action_registry:
                action.contributes_gains_costs(m, i, t, ctx)

            if t + 1 < d_step:
                # Transition contributions (including vats)
                for action in action_registry:
                    action.transition_contrib(m, i, t, ctx)

                # Type transition t -> t+1
                m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren)
                m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren)
                m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren)
                for r, target in [
                    (rH.get((i, t)), isH),
                    (rF.get((i, t)), isF),
                    (rM.get((i, t)), isM),
                ]:
                    if r is not None:
                        m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r)
                for r1, t1, t2 in [
                    (rH.get((i, t)), isF, isM),
                    (rF.get((i, t)), isH, isM),
                    (rM.get((i, t)), isH, isF),
                ]:
                    if r1 is not None:
                        m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1)
                        m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1)

                # ---- Centralized vat transitions (mirrors main.py exactly) ----
                # Collect/overwork vat selectors (None if action disabled)
                _cvE = cvE.get((i, t))
                _cvB = cvB.get((i, t))
                _cvS = cvS.get((i, t))
                _owcvE = owcvE.get((i, t))
                _owcvB = owcvB.get((i, t))
                _owcvS = owcvS.get((i, t))
                inc = m.NewIntVar(1, 3, "")
                metallurgist_g = governor.get((i, t, "metallurgist"))
                # NOTE(review): whenever "metallurgist" is in
                # AGENT_AVAILABILITY this branch replaces the hasD bonus
                # rather than adding to it (inc's domain is 1..3, so
                # 1 + hasD + metal_active may have been intended) - confirm.
                if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY:
                    metal_active = m.NewBoolVar("")
                    m.AddMultiplicationEquality(
                        metal_active, [metallurgist_g, isF[i, t]]
                    )
                    m.Add(inc == 1 + metal_active)
                else:
                    m.Add(inc == 1 + hasD[i, t])
                vEn = m.NewIntVar(0, max_vat, "")
                vBn = m.NewIntVar(0, max_vat, "")
                vSn = m.NewIntVar(0, max_vat, "")
                # For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S)
                # apply the same reset/increment pattern from main.py
                for sel_E, sel_B, sel_S in [
                    (_cvE, _cvB, _cvS),
                    (_owcvE, _owcvB, _owcvS),
                ]:
                    if sel_E is not None:
                        m.Add(vEn == 0).OnlyEnforceIf(sel_E)
                        m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E)
                        m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E)
                    if sel_B is not None:
                        m.Add(vBn == 0).OnlyEnforceIf(sel_B)
                        m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B)
                        m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B)
                    if sel_S is not None:
                        m.Add(vSn == 0).OnlyEnforceIf(sel_S)
                        m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S)
                        m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S)
                # foundry but neither collecting nor overworking: vats unchanged
                # f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow
                _fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None)
                _fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None)
                f_noncollect_noow = m.NewBoolVar("")
                m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count)
                m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow)
                m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow)
                m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow)
                # assign vat[i, t+1]:
                #   renovate-to-foundry -> reset to 1
                #   continuing foundry  -> vat_next
                #   otherwise (not foundry next) -> 0
                cont_F = AND(isF[i, t], no_ren)
                for vnext, vn in (
                    (vE[i, t + 1], vEn),
                    (vB[i, t + 1], vBn),
                    (vS[i, t + 1], vSn),
                ):
                    if rF.get((i, t)) is not None:
                        m.Add(vnext == 1).OnlyEnforceIf(rF[i, t])
                    m.Add(vnext == vn).OnlyEnforceIf(cont_F)
                not_F_next = isF[i, t + 1].Not()
                m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
                m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
                m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next)

    # ---- Builder one-shot: at most one fire across all (i, t) ----
    # (currently disabled)
    # if builder_fire_map:
    #     m.Add(sum(builder_fire_map.values()) <= 1)

    # ---- global constraints (per action type, per step) ----
    for t in range(1, NUM_STEPS + 1):
        ctx_global = {"N": N, "ow": ow, "t": t}
        for action in action_registry:
            action.add_global_constraints(m, t, ctx_global)

    # ---- event-agent per-step hooks (registry empty in phase A) ----
    _event_ctx = {
        "model": m,
        "N": N,
        "governor": governor,
        "tg_pool": tg_pool,
        "capital_spent": capital_spent,
        "gain_E": gain_E,
        "gain_B": gain_B,
        "gain_S": gain_S,
        "gain_C": gain_C,
        "cost_C": cost_C,
        "cost_S": cost_S,
        "fence_deposits": fence_deposits,
        "baron_deposits": baron_deposits,
    }
    for t in range(1, NUM_STEPS + 1):
        for agent in event_agent_registry:
            if t in agent.available_steps:
                agent.apply_event(m, t, _event_ctx)

    # ---- trade-goods pool split (routes pool[t] -> gain_E/B/S/C) ----
    # Generic-trade-good sources (Baron, Fence in later phases) deposit
    # IntVars into tg_pool[t]; here we declare a 4-way split into the
    # resource gain accumulators. If the pool is empty for step t, the split
    # vars are constrained to 0.
    for t in range(1, NUM_STEPS + 1):
        pool_total = m.NewIntVar(0, max_res, f"tg_total_{t}")
        if tg_pool[t]:
            m.Add(pool_total == sum(tg_pool[t]))
        else:
            m.Add(pool_total == 0)
        tg_E = m.NewIntVar(0, max_res, f"tg_E_{t}")
        tg_B = m.NewIntVar(0, max_res, f"tg_B_{t}")
        tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}")
        tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}")
        m.Add(tg_E + tg_B + tg_S + tg_C == pool_total)
        # Each trade good (10 scaled units) must be allocated entirely to one
        # resource. tg_C carries no modulo constraint of its own; it takes
        # whatever remainder the sum equality leaves.
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_E, 10)
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_B, 10)
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_S, 10)
        gain_E[t].append(tg_E)
        gain_B[t].append(tg_B)
        gain_S[t].append(tg_S)
        gain_C[t].append(tg_C)

    # ---- resource pool recursion --------------------------------------
    E = {1: m.NewIntVar(initial[0], initial[0], "E1")}
    B = {1: m.NewIntVar(initial[1], initial[1], "B1")}
    S = {1: m.NewIntVar(initial[2], initial[2], "S1")}
    C = {1: m.NewIntVar(initial[3], initial[3], "C1")}
    for t in range(1, NUM_STEPS + 1):
        E[t + 1] = m.NewIntVar(0, max_res, f"E_{t + 1}")
        B[t + 1] = m.NewIntVar(0, max_res, f"B_{t + 1}")
        S[t + 1] = m.NewIntVar(0, max_res, f"S_{t + 1}")
        C[t + 1] = m.NewIntVar(0, max_res, f"C_{t + 1}")
        # Costs must be payable from the pool at the start of the step,
        # before this step's gains are added.
        m.Add(C[t] - sum(cost_C[t]) >= 0)
        m.Add(S[t] - sum(cost_S[t]) >= 0)
        m.Add(E[t + 1] == E[t] + sum(gain_E[t]))
        m.Add(B[t + 1] == B[t] + sum(gain_B[t]))
        m.Add(S[t + 1] == S[t] - sum(cost_S[t]) + sum(gain_S[t]))
        m.Add(C[t + 1] == C[t] - sum(cost_C[t]) + sum(gain_C[t]))
    finalE, finalB, finalS = E[NUM_STEPS + 1], B[NUM_STEPS + 1], S[NUM_STEPS + 1]

    # ---- Apply resource constraints ----
    if resource_constraints is None:
        resource_constraints = RESOURCE_CONSTRAINTS
    if verbose and resource_constraints:
        print(f"Adding {len(resource_constraints)} resource constraint(s)")
    res_dict = {
        "E": E,
        "B": B,
        "S": S,
        "C": C,
    }
    for i, constraint_fn in enumerate(resource_constraints):
        constraint = constraint_fn(res_dict)
        if verbose:
            print(f"  Constraint {i + 1}: {constraint}")
            print(f"  Type: {type(constraint)}")
            print(f"  Constraint object: {constraint}")
        if constraint is not None:
            m.Add(constraint)

    # ---- Phase 1: real per-resource ceilings ----
    def _ceiling(var):
        """Return an upper bound on `var` that is safe to post as `var <= cap`.

        The model is maximised for `var` alone under a 20 s limit. The
        solver's best objective bound is used rather than the best value
        found: when the limit is hit with status FEASIBLE, the best value is
        only a lower bound, and capping at it could cut off better solutions
        of the product objective. With status OPTIMAL both coincide. Falls
        back to max_res when no solution was found.
        """
        s = cp_model.CpSolver()
        s.parameters.max_time_in_seconds = 20.0
        s.parameters.num_search_workers = num_workers
        m.Maximize(var)
        st = s.Solve(m)
        if st not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
            return max_res
        found = int(s.ObjectiveValue())
        # The bound is reported as a float; the small epsilon guards against
        # representation noise before truncating.
        proven = min(max_res, int(s.BestObjectiveBound() + 1e-6))
        return max(found, proven)

    capE = _ceiling(finalE)
    capB = _ceiling(finalB)
    capS = _ceiling(finalS)
    m.Add(finalE <= capE)
    m.Add(finalB <= capB)
    m.Add(finalS <= capS)

    # ======================================================================
    # OBJECTIVE IS SET HERE
    # ======================================================================
    # NOTE(review): Eprod/Bprod/Sprod only size the domains of the product
    # variables. Each prod* variable is posted as a one-factor product, so it
    # presumably tracks the corresponding final pool itself and obj is
    # finalE * finalB * finalS with loose upper bounds - confirm if a squared
    # weighting was intended.
    def Eprod(v):
        return v * v

    def Bprod(v):
        return v

    def Sprod(v):
        return v * v

    prodEE = m.NewIntVar(0, Eprod(capE), "prodEE")
    m.AddMultiplicationEquality(prodEE, [finalE])
    prodSS = m.NewIntVar(0, Sprod(capS), "prodSS")
    m.AddMultiplicationEquality(prodSS, [finalS])
    prodBB = m.NewIntVar(0, Bprod(capB), "prodBB")
    m.AddMultiplicationEquality(prodBB, [finalB])
    prodEB = m.NewIntVar(0, Eprod(capE) * Bprod(capB), "prodEB")
    m.AddMultiplicationEquality(prodEB, [prodEE, prodBB])
    obj = m.NewIntVar(0, Eprod(capE) * Bprod(capB) * Sprod(capS), "obj")
    m.AddMultiplicationEquality(obj, [prodEB, prodSS])
    m.Maximize(obj)

    # ---- Phase 2: solve the product to optimality ----
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = time_limit
    solver.parameters.num_search_workers = num_workers
    status = None
    if verbose:
        status = solver.Solve(
            m,
            printer.IntermediateSolutionPrinter(
                {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.1
            ),
        )
    else:
        status = solver.Solve(m)
    if verbose:
        print(f"(resource ceilings used: E<={capE}  B<={capB}  S<={capS})")
        _report(
            solver, status, cities, N,
            isH, isF, isM, isMon,
            col, ua, ub, ud, ow,
            rH, rF, rM,
            cvE, cvB, cvS,
            owcvE, owcvB, owcvS,
            mE, mB, mS, mC,
            owmE, owmB, owmS, owmC,
            hasA, hasB, hasD,
            vE, vB, vS,
            E, B, S, C,
            finalE, finalB, finalS,
            action_registry,
            governor,
            agent_registry,
            capital_spent,
            fence_deposits,
            baron_deposits,
        )
    return solver, status
def _report(
    solver,
    status,
    cities,
    N,
    isH,
    isF,
    isM,
    isMon,
    col,
    ua,
    ub,
    ud,
    ow,
    rH,
    rF,
    rM,
    cvE,
    cvB,
    cvS,
    owcvE,
    owcvB,
    owcvS,
    mE,
    mB,
    mS,
    mC,
    owmE,
    owmB,
    owmS,
    owmC,
    hasA,
    hasB,
    hasD,
    vE,
    vB,
    vS,
    E,
    B,
    S,
    C,
    finalE,
    finalB,
    finalS,
    action_registry,
    governor=None,
    agent_registry=None,
    capital_spent=None,
    fence_deposits=None,
    baron_deposits=None,
):
    """Print the solved plan: pools per step, deposits, and per-city actions.

    Prints the solver status first and returns immediately unless it is
    OPTIMAL or FEASIBLE. Pool, deposit and final values are divided by 10 for
    display; vat values are printed as stored. The variable dicts are keyed
    by (city_idx, step) and the pool dicts (E, B, S, C) by step;
    `action_registry` is accepted but not read here.
    """
    print("status:", solver.StatusName(status))
    if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        return

    # Resolve the display name of each city's type at each step.
    typ_name = {}
    for i in range(N):
        for t in range(1, NUM_STEPS + 1):
            if solver.Value(isH[i, t]):
                typ_name[i, t] = "Hub"
            elif solver.Value(isF[i, t]):
                typ_name[i, t] = "Foundry"
            elif solver.Value(isM[i, t]):
                typ_name[i, t] = "Metro"
            elif solver.Value(isMon[i, t]):
                typ_name[i, t] = "Monument"
            else:
                typ_name[i, t] = "-"

    def action_str(i, t):
        """Describe the action chosen for city i at step t."""
        # Try each action in order until one is active; selectors that were
        # never created fall back to the literal 0.
        if solver.Value(col.get((i, t), 0)):
            if typ_name[i, t] == "Foundry":
                v = (
                    "E"
                    if solver.Value(cvE[i, t])
                    else "B"
                    if solver.Value(cvB[i, t])
                    else "S"
                )
                return f"Collect vat {v}"
            if typ_name[i, t] == "Metro":
                picks = []
                for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)):
                    # One label per whole trade good (10 scaled units).
                    picks += [nm] * (solver.Value(var[i, t]) // 10)
                return "Collect {" + ",".join(picks) + "}"
            return "Collect (+Capital)"
        if solver.Value(ow.get((i, t), 0)):
            if typ_name[i, t] == "Foundry":
                v = (
                    "E"
                    if solver.Value(owcvE[i, t])
                    else "B"
                    if solver.Value(owcvB[i, t])
                    else "S"
                )
                return f"Overwork vat {v} (2x)"
            if typ_name[i, t] == "Metro":
                picks = []
                for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)):
                    picks += [nm] * (solver.Value(var[i, t]) // 10)
                return "Overwork {" + ",".join(picks) + "} (2x)"
            return "Overwork (+Capital 2x)"
        if solver.Value(ua.get((i, t), 0)):
            return "Upgrade a (cost-reduction)"
        if solver.Value(ub.get((i, t), 0)):
            return "Upgrade b (collect-bonus)"
        if solver.Value(ud.get((i, t), 0)):
            return "Upgrade d (vat-increment)"
        if solver.Value(rH.get((i, t), 0)):
            return "Renovate -> Hub"
        if solver.Value(rF.get((i, t), 0)):
            return "Renovate -> Foundry"
        if solver.Value(rM.get((i, t), 0)):
            return "Renovate -> Metro"
        # No selector passed to this function is active.
        return "(inactive)"

    print("\nPer-step pools (start of step):")
    print("   step:      E      B      S      C")
    for t in range(1, NUM_STEPS + 2):
        # The last row is the state after the final step; label it with the
        # actual step count instead of a hard-coded 5.
        label = f"after{NUM_STEPS}" if t == NUM_STEPS + 1 else f"start {t}"
        c_val = solver.Value(C[t])
        c_str = f"{c_val / 10:6.1f}"
        print(
            f"  {label:>8}: {solver.Value(E[t]) / 10:6.1f} {solver.Value(B[t]) / 10:6.1f} "
            f"{solver.Value(S[t]) / 10:6.1f} {c_str}"
        )

    if capital_spent is not None:
        print("\n  capital spent per step:")
        for t in range(1, NUM_STEPS + 1):
            spent = sum(solver.Value(x) for x in capital_spent[t]) / 10
            print(f"    step {t}: {spent:6.1f}")
    if fence_deposits:
        # Fence deposits are plain ints, not solver variables.
        print("\n  Fence deposits (trade goods):")
        for t in fence_deposits:
            print(
                f"    step {t}: {fence_deposits[t] / 10:.1f} (scaled: {fence_deposits[t]})"
            )
    if baron_deposits:
        print("\n  Baron deposits (trade goods):")
        for t in sorted(baron_deposits.keys()):
            amt = solver.Value(baron_deposits[t])
            print(f"    step {t}: {amt / 10:.1f} (scaled: {amt})")

    # Agent names are the same for every (city, step); collect them once.
    gov_names = (
        [a.name for a in agent_registry] if governor and agent_registry else []
    )

    # Resolve governor assignments for display
    def gov_for(i, t):
        """Return the name of the agent governing city i at step t, or "-"."""
        for name in gov_names:
            v = governor.get((i, t, name))
            if v is None:
                continue
            try:
                if solver.Value(v):
                    return name
            except Exception:
                # Best-effort display only: an entry the solver cannot
                # evaluate is skipped rather than aborting the report.
                continue
        return "-"

    print("\nActions:")
    for i in range(N):
        a_step, a_city = cities[i]
        a_type = a_city["type"] if isinstance(a_city, dict) else a_city
        print(f"  City {i} (arrives step {a_step} as {a_type}):")
        for t in range(a_step, NUM_STEPS + 1):
            ups = "".join(
                n
                for n, h in (("a", hasA), ("b", hasB), ("d", hasD))
                if solver.Value(h[i, t])
            )
            extra = (
                f" vats(E{solver.Value(vE[i, t])},B{solver.Value(vB[i, t])},S{solver.Value(vS[i, t])})"
                if typ_name[i, t] == "Foundry"
                else ""
            )
            gov = gov_for(i, t)
            print(
                f"    step {t}: [{typ_name[i, t]:7}] {action_str(i, t):28}"
                f" upg[{ups}] gov[{gov}]{extra}"
            )

    fe, fb, fs = solver.Value(finalE), solver.Value(finalB), solver.Value(finalS)
    print(
        f"\nFINAL  E={fe / 10:.1f}  B={fb / 10:.1f}  S={fs / 10:.1f}  "
        f"product(scaled) = {fe * fb * fs / 1000}  sum = {(fe + fb + fs) / 10:.1f}"
    )
# Script entry point: run the solver with the module-level defaults.
if __name__ == "__main__":
    solve()