# dws-city-res-solve/solve.py
#
# 2170 lines
# 77 KiB
# Python

"""
City Resource Optimization -> CP-SAT (Google OR-Tools) [ACTION REGISTRY VERSION]
Maximise Electrum * Brass * Steel after the gains of step 5.
The objective is a product of three variables, so this is a constraint-
programming / nonlinear problem, hence CP-SAT (cp_model) rather than the
LP/MIP solver. Read the comments at:
- PARAMETERS (initial pools + arrival schedule + bonus mode)
- "OBJECTIVE IS SET HERE" (the product being maximised -- tweak freely)
"""
from ortools.sat.python import cp_model
import printer
# ======================================================================
# PARAMETERS -- edit these
# ======================================================================
# Starting resource pools at the start of step 1, in this fixed order:
# (Electrum, Brass, Steel, Capital, Renown, Luxuries, Express tickets)
# Note: all resource values are scaled by 10 internally so that fractional
# trade-good splits (Baron/Fence et al, phase B/C) can be modeled with integers.
# Display in _report divides by 10.
# NOTE(review): it is not visible here whether these numbers are already
# x10 scaled (30 == 3.0 units) or are scaled later by solve() -- confirm.
INITIAL = (30, 30, 30, 30, 0, 0, 0)
# Optional pre-committed decisions. Expected shape:
# fixed_choices = {
# "actions": {
# (city_idx, step): "action_name", # e.g., (0, 2): "collect"
# # ...
# },
# "governors": {
# (city_idx, step, agent_name): bool, # e.g., (1, 1, "provisioner"): True
# # ...
# }
# }
#
# Set FIXED_CHOICES to None to fix nothing at all:
# FIXED_CHOICES = None
# An empty "actions" mapping (as below) likewise pins no action.
FIXED_CHOICES = {"actions": {}}
# Resource constraints: list of callables that receive a dict with keys
# E, B, S, C, R (Renown), L (Luxuries), X (Express tickets)
# mapping to resource variables indexed by step. Each callable returns a constraint
# (or None to skip) to be passed to m.Add(). Example:
# lambda res: res["E"][3] >= 50 # Ensure at least 50 E at step 3
# NOTE(review): resource pools are x10 scaled internally, so the threshold in
# the example presumably has to be given in scaled units -- confirm in solve().
RESOURCE_CONSTRAINTS = []
# Maximize criteria for solve(). Factor = exponent in "product" mode,
# weight in "sum" mode. Keys: E, B, S, C, R, L, X; missing keys = 0 (resource
# excluded from the objective — it is NOT forced to zero). Negative
# factors are allowed only in "sum" mode (a negative exponent would mean
# division, which integer CP-SAT cannot express).
# Factors must be plain ints (bools are rejected) and at least one must be
# nonzero; _validate_objective() enforces these rules.
OBJECTIVE_FACTORS = {"E": 2, "B": 1, "S": 2}
OBJECTIVE_MODE = "product"  # "product" or "sum"
def _validate_objective(factors, mode):
if mode not in ("product", "sum"):
raise ValueError(f"objective_mode must be 'product' or 'sum', got {mode!r}")
unknown = set(factors) - {"E", "B", "S", "C", "R", "L", "X"}
if unknown:
raise ValueError(f"unknown objective_factors keys: {sorted(unknown)}")
for k, v in factors.items():
if not isinstance(v, int) or isinstance(v, bool):
raise ValueError(f"objective factor {k}={v!r} must be an int")
if mode == "product" and v < 0:
raise ValueError(
f"objective factor {k}={v} is negative; negative exponents "
"are not expressible in product mode"
)
if not any(factors.values()):
raise ValueError("objective_factors needs at least one nonzero factor")
# Arrival schedule. Key = step (1..5), value = list of city types that arrive
# at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument.
# Arriving cities act that same step.
# Each entry may be either a string shorthand (e.g. "H") or a dict, e.g.
# {"type": "F", "adjacent_to": [city_idx, ...], "vats": {"E": 1, "B": 1, "S": 1},
# "base": False, "departure_step": NUM_STEPS + 1}
# `adjacent_to` lists city indices considered adjacent to this city; it is
# used by the Industrialist agent (Network grants flow from the governed
# city to the cities listed in ITS adjacent_to).
# Optional `base` marks the city as having a Base (used by Provisioner);
# defaults to False.
# Optional `vats` overrides initial vat values at arrival (each defaults to 1).
# Optional `departure_step` is the first step at which the city is ABSENT;
# default NUM_STEPS+1 means the city stays through the whole simulation.
# Entries are brought into the full dict form by normalize_city().
ARRIVALS = {
    1: [
        {"type": "F", "adjacent_to": []},
        {"type": "F", "adjacent_to": []},
        {"type": "F", "adjacent_to": []},
        {"type": "H", "adjacent_to": []},
    ],
    2: [],
    3: [],
    4: [],
    5: [],
}
def normalize_city(c):
    """Expand an arrival entry into the canonical city dict.

    Args:
        c: either a type string shorthand (e.g. "H") or a dict with a
            mandatory "type" key and optional "adjacent_to", "base",
            "vats" and "departure_step" keys.

    Returns:
        A new dict with keys "type", "adjacent_to", "base", "vats"
        (always holding E, B and S, each defaulting to 1) and
        "departure_step" (defaulting to NUM_STEPS + 1).

    Raises:
        KeyError: if a dict entry has no "type".
    """
    spec = {"type": c} if isinstance(c, str) else c
    # A missing, None or empty "vats" entry all mean "use the defaults".
    vat_overrides = spec.get("vats") or {}
    stays_until = NUM_STEPS + 1
    normalized = {"type": spec["type"]}
    normalized["adjacent_to"] = spec.get("adjacent_to", [])
    normalized["base"] = spec.get("base", False)
    normalized["vats"] = {res: vat_overrides.get(res, 1) for res in ("E", "B", "S")}
    normalized["departure_step"] = spec.get("departure_step", stays_until)
    return normalized
# Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is
# +1 of the resource collected (i.e. the chosen vat's value + 1). This is the
# same uniform rule as Hub (+1 Capital) and Metropolis (+1 resource pick).
# Number of simulated steps; steps are numbered 1..NUM_STEPS.
NUM_STEPS = 5
MAX_RES = (
    2000  # upper bound on any resource pool (scaled x10; raise if you expect more)
)
MAX_VAT = 15  # upper bound on a foundry vat value (1 + 2*nsteps is plenty); unscaled
# Bastion counts indexed by t-1; bastions on map at step t. Read by the Baron
# agent, which deposits BASTION_COUNTS[t-1] * 10 scaled trade goods when it
# governs a Collect, so the list needs an entry for every step Baron can act in.
BASTION_COUNTS = [3, 3, 3, 3, 3]
# Agent name -> list of steps where the agent is available.
# Empty/missing means the agent is disabled.
# "planner" is special: OverworkAction only lets a city overwork while the
# planner governs it, so disabling the planner disables overwork as well.
AGENT_AVAILABILITY = {
    "capitalist": [],
    "baron": [],
    "prodigy": [],
    "provisioner": [],
    "metallurgist": [],
    "builder": [],
    "courier": [],
    "planner": [1, 2, 3, 4, 5],
    "fence": [],
    "foreman": [],
    "industrialist": [],
    "vinter": [],
    "artificer": [],
    "connoisseur": [],
    "economist": [],
    "influencer": [],
    "cosmopolitan": [],
}
# ======================================================================
# ACTION REGISTRY CONFIGURATION
# ======================================================================
# Action name -> whether the action is switched on. The names match the
# `name` given to the Action subclasses below ("collect", "overwork", ...).
# NOTE(review): the code consuming this mapping is outside this section;
# presumably a False entry keeps the action out of the registry -- confirm.
ENABLED_ACTIONS = {
    "collect": True,
    "upgrade_a": True,
    "upgrade_b": True,
    "upgrade_d": True,
    "overwork": True,
    "renovate_h": True,
    "renovate_f": True,
    "noop": True,
    # "renovate_m": False, # currently always disabled in main.py
}
# ======================================================================
# ACTION REGISTRY BASE CLASS
# ======================================================================
class Action:
    """Common interface of every registry action.

    Subclasses override the hooks they need; each default hook does nothing
    and returns None. `ctx` is the shared dict of model variables and
    accumulators that is handed to every hook.
    """

    def __init__(self, name):
        # Registry name; also the key under ctx["action_vars"].
        self.name = name

    def declare_vars(self, model, i, t, ctx):
        """Create the decision variable(s) of this action for city i, step t.

        Implementations are expected to store the selector variable in
        ctx['action_vars'][self.name][(i, t)].
        """
        return None

    def add_constraints(self, model, i, t, ctx):
        """Post the preconditions / legality rules of this action."""
        return None

    def add_global_constraints(self, model, t, ctx):
        """Post constraints that span all cities at step t."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append this action's terms to the gain_* / cost_* accumulators."""
        return None

    def transition_contrib(self, model, i, t, ctx):
        """Apply state changes (type, upgrade, vat) caused by this action."""
        return None

    def selector_var(self, i, t, ctx):
        """Return the variable that is 1 iff this action is taken at (i, t).

        Returns None when the action has no variable registered for (i, t);
        the value feeds the "exactly one action" constraint.
        """
        registry = ctx["action_vars"]
        if self.name not in registry:
            return None
        return registry[self.name].get((i, t))

    def describe(self, solver, i, t, ctx):
        """Return the report text for this action at (i, t).

        The base implementation is simply the action name.
        """
        return "{}".format(self.name)
# (VatsMixin removed — vat transitions are handled centrally in solve())
# ======================================================================
# CONCRETE ACTIONS
# ======================================================================
class NoOpAction(Action):
    """Rest action: the city does nothing this step.

    Resting is only legal on the step right after the city overworked; it is
    never legal on the city's arrival step.
    """

    def __init__(self):
        super().__init__("noop")

    def declare_vars(self, model, i, t, ctx):
        """Create the noop selector for (i, t) and register it in ctx."""
        noop = model.NewBoolVar(f"noop_{i}_{t}")
        ctx["action_vars"]["noop"][(i, t)] = noop
        ctx["noop"][i, t] = noop

    # only allowed after overwork to avoid more things needing to be checked
    def add_constraints(self, model, i, t, ctx):
        """Allow noop at (i, t) only if the city overworked at t - 1.

        When there is no previous-step overwork variable (arrival step, or
        overwork has no variable for this city), noop is forced to 0 instead
        of failing with a KeyError.
        """
        noop = ctx["noop"][i, t]
        prev_overwork = None
        if t != ctx["arrival_step"][i]:
            prev_overwork = ctx.get("ow", {}).get((i, t - 1))
        if prev_overwork is None:
            # Nothing to rest from: arrival step or overwork not modeled.
            model.Add(noop == 0)
        else:
            # noop <= ow[i, t-1]: noop can be 1 only if overwork was 1 last step
            model.Add(noop <= prev_overwork)

    def add_global_constraints(self, model, t, ctx):
        """No cross-city rule applies to resting."""
        pass

    def contributes_gains_costs(self, model, i, t, ctx):
        """Resting neither gains nor costs anything."""
        pass

    def transition_contrib(self, model, i, t, ctx):
        """Resting does not change city state."""
        pass

    def describe(self, solver, i, t, ctx):
        """Return the fixed report label for a rest step."""
        return "Rest (no action)"
class CollectAction(Action):
    """Collect action: harvest the city's type-specific yield.

    * Hub: +2 Capital (+1 more with collect-bonus b).
    * Foundry: pays 1 Capital and gains the value of one chosen vat
      (+1 with collect-bonus b).
    * Metropolis: pays 1 Capital and picks 2 resources (3 with b) among
      Electrum, Brass, Steel and Capital.
    * Monument: cannot collect.

    Resource amounts are x10 scaled; vat values are unscaled.
    """

    def __init__(self):
        super().__init__("collect")
        # Memo of AND products keyed by operand identity (see _AND).
        self.AND_cache = {}
        self._and_model = None
        self._and_operands = []

    def declare_vars(self, model, i, t, ctx):
        """Create the collect selector, vat choices and Metropolis amounts."""
        m = model
        col = m.NewBoolVar(f"col_{i}_{t}")
        cvE = m.NewBoolVar(f"cvE_{i}_{t}")
        cvB = m.NewBoolVar(f"cvB_{i}_{t}")
        cvS = m.NewBoolVar(f"cvS_{i}_{t}")
        mE = m.NewIntVar(0, 30, f"mE_{i}_{t}")
        mB = m.NewIntVar(0, 30, f"mB_{i}_{t}")
        mS = m.NewIntVar(0, 30, f"mS_{i}_{t}")
        mC = m.NewIntVar(0, 30, f"mC_{i}_{t}")
        ctx["action_vars"]["collect"][(i, t)] = col
        ctx["col"][i, t] = col
        ctx["cvE"][i, t] = cvE
        ctx["cvB"][i, t] = cvB
        ctx["cvS"][i, t] = cvS
        ctx["mE"][i, t] = mE
        ctx["mB"][i, t] = mB
        ctx["mS"][i, t] = mS
        ctx["mC"][i, t] = mC

    def add_constraints(self, model, i, t, ctx):
        """Post the legality rules of Collect for city i at step t."""
        m = model
        isF = ctx["isF"]
        isM = ctx["isM"]
        isMon = ctx["isMon"]
        hasB = ctx["hasB"]
        col = ctx["col"][i, t]
        cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t]
        amounts = (
            ctx["mE"][i, t],
            ctx["mB"][i, t],
            ctx["mS"][i, t],
            ctx["mC"][i, t],
        )
        # Monument can't collect
        m.Add(col == 0).OnlyEnforceIf(isMon[i, t])
        # Foundry: exactly one vat chosen iff foundry collects
        fcol = self._AND(model, isF[i, t], col, ctx)
        m.Add(cvE + cvB + cvS == fcol)
        # Metropolis: pick (2 + hasB) resources
        mcol = self._AND(model, isM[i, t], col, ctx)
        npick = m.NewIntVar(0, 3, "")
        m.Add(npick == 2 + hasB[i, t]).OnlyEnforceIf(mcol)
        m.Add(npick == 0).OnlyEnforceIf(mcol.Not())
        # Each "pick" is 10 units of the chosen resource (x10 scaling).
        m.Add(sum(amounts) == npick * 10)
        # A pick is indivisible: every amount must be a whole number of
        # 10-unit picks. Without this the solver could split one pick
        # across several resources (e.g. 15 E + 5 C).
        for amount in amounts:
            whole_picks = m.NewIntVar(0, 3, "")
            m.Add(amount == 10 * whole_picks)

    def add_global_constraints(self, model, t, ctx):
        """Collect has no cross-city rule."""
        pass

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append Collect's gains and Capital costs to the step accumulators."""
        m = model
        has_bonus = ctx["hasB"][i, t]
        col = ctx["col"][i, t]
        max_vat = ctx["max_vat"]
        gains = {
            "E": ctx["gain_E"][t],
            "B": ctx["gain_B"][t],
            "S": ctx["gain_S"][t],
            "C": ctx["gain_C"][t],
        }
        cost_C = ctx["cost_C"][t]
        capital_spent = ctx["capital_spent"][t]
        # Collect sub-choices
        fcol = self._AND(model, ctx["isF"][i, t], col, ctx)
        mcol = self._AND(model, ctx["isM"][i, t], col, ctx)
        hcol = self._AND(model, ctx["isH"][i, t], col, ctx)
        # Costs: foundry & metropolis collect each cost 1 Capital (x10 scaled).
        # The same term also feeds the capital_spent tracker.
        for paying in (fcol, mcol):
            fee = m.NewIntVar(0, 10, "")
            m.Add(fee == 10 * paying)
            cost_C.append(fee)
            capital_spent.append(fee)
        # Hub collect: +2 Capital (+1 more if collect-bonus b) -> x10
        hub_gain = m.NewIntVar(0, 30, "")
        m.Add(hub_gain == 10 * (2 + has_bonus)).OnlyEnforceIf(hcol)
        m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not())
        gains["C"].append(hub_gain)
        # Metropolis collect: each "pick" is already 10 units (mE/mB/mS/mC).
        for res in ("E", "B", "S", "C"):
            gains[res].append(ctx["m" + res][i, t])
        # Foundry collect: gain chosen vat's value as that resource (vat
        # unscaled), then scale x10 for the resource gain.
        for res in ("E", "B", "S"):
            raw = m.NewIntVar(0, max_vat, "")
            m.AddMultiplicationEquality(
                raw, [ctx["cv" + res][i, t], ctx["v" + res][i, t]]
            )
            scaled = m.NewIntVar(0, 10 * max_vat, "")
            m.Add(scaled == 10 * raw)
            gains[res].append(scaled)
        # Collect Bonus (b): adds +1 to the amount a Collect gives -> +10 scaled
        for res in ("E", "B", "S"):
            with_bonus = self._AND(model, ctx["cv" + res][i, t], has_bonus, ctx)
            extra = m.NewIntVar(0, 10, "")
            m.Add(extra == 10 * with_bonus)
            gains[res].append(extra)

    def transition_contrib(self, model, i, t, ctx):
        """No-op here."""
        pass  # Vat transitions handled centrally in solve()

    def describe(self, solver, i, t, ctx):
        """Return the report text for a Collect at (i, t) from solved values."""
        typ_name = ctx["typ_name"].get((i, t), "-")
        if typ_name == "Foundry":
            if solver.Value(ctx["cvE"].get((i, t))):
                vat = "E"
            elif solver.Value(ctx["cvB"].get((i, t))):
                vat = "B"
            else:
                vat = "S"
            return f"Collect vat {vat}"
        if typ_name == "Metro":
            picks = []
            for nm in ("E", "B", "S", "C"):
                var = ctx["m" + nm].get((i, t))
                picks += [nm] * (solver.Value(var) // 10)
            return "Collect {" + ",".join(picks) + "}"
        return "Collect (+Capital)"

    def _AND(self, model, a, b, ctx):
        """Return a BoolVar equal to a AND b, creating it at most once.

        Products are memoised by operand identity. The memo is dropped as
        soon as a different model is seen, and operands are kept referenced,
        so a recycled id() can never hand back a variable that belongs to
        another model or to other operands.
        """
        if getattr(self, "_and_model", None) is not model:
            self.AND_cache = {}
            self._and_operands = []
            self._and_model = model
        key = (id(a), id(b))
        product = self.AND_cache.get(key)
        if product is None:
            product = model.NewBoolVar("")
            model.AddMultiplicationEquality(product, [a, b])
            self.AND_cache[key] = product
            # Keep operands alive for as long as their ids are used as keys.
            self._and_operands.append((a, b))
        return product
class UpgradeAction(Action):
    """Shared implementation of the upgrade actions (a, b and d).

    `upgrade_key` is 'A', 'B' or 'D'; it selects the ctx tables that are
    read ("ua"/"ub"/"ud" and "hasA"/"hasB"/"hasD").
    """

    _LABELS = {
        "A": "Upgrade a (cost-reduction)",
        "B": "Upgrade b (collect-bonus)",
        "D": "Upgrade d (vat-increment)",
    }

    def __init__(self, name, upgrade_key):
        super().__init__(name)
        self.upgrade_key = upgrade_key  # 'A', 'B', or 'D'

    def _table(self):
        """Return the ctx key of this upgrade's selector table."""
        return "u" + self.upgrade_key.lower()

    def declare_vars(self, model, i, t, ctx):
        """Create the upgrade selector for (i, t) and register it in ctx."""
        selector = model.NewBoolVar(f"{self.upgrade_key.lower()}_{i}_{t}")
        ctx["action_vars"][self.name][(i, t)] = selector
        ctx[self._table()][i, t] = selector

    def add_constraints(self, model, i, t, ctx):
        """Post the legality rules for taking this upgrade at (i, t)."""
        selector = ctx[self._table()][i, t]
        already_held = ctx[f"has{self.upgrade_key}"]
        is_monument = ctx["isMon"]
        is_foundry = ctx["isF"]
        # An upgrade that is already held cannot be taken again.
        model.Add(already_held[i, t] == 0).OnlyEnforceIf(selector)
        # Monuments never upgrade.
        model.Add(selector == 0).OnlyEnforceIf(is_monument[i, t])
        # Upgrade d exists for foundries only.
        if self.upgrade_key == "D":
            model.Add(is_foundry[i, t] == 1).OnlyEnforceIf(selector)

    def add_global_constraints(self, model, t, ctx):
        """Upgrades have no cross-city rule."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append the Steel price of upgrades b and d (a adds no term).

        The price is 20 scaled Steel, or 10 when cost-reduction (a) is
        already held, and 0 when the upgrade is not taken.
        """
        selector = ctx[self._table()][i, t]
        has_a = ctx["hasA"]
        steel_costs = ctx["cost_S"][t]
        if self.upgrade_key not in ("B", "D"):
            return
        price = model.NewIntVar(0, 20, "")
        discounted = model.NewBoolVar("")
        model.AddMultiplicationEquality(discounted, [selector, has_a[i, t]])
        model.Add(price == 20).OnlyEnforceIf(selector, has_a[i, t].Not())
        model.Add(price == 10).OnlyEnforceIf(discounted)
        model.Add(price == 0).OnlyEnforceIf(selector.Not())
        steel_costs.append(price)

    def transition_contrib(self, model, i, t, ctx):
        """No transition is posted from here."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return the report label; falls back to the action name."""
        return self._LABELS.get(self.upgrade_key, self.name)
class OverworkAction(Action):
    """Overwork action: a doubled Collect that puts the city on cooldown.

    * Hub: 2 * (2 + hasB) Capital.
    * Foundry: 2 * the chosen vat's value (2 * (+1) more with b).
    * Metropolis: 2 * (2 + hasB) resource picks.
    * Monument: cannot overwork.

    A city that overworked last step can neither collect nor overwork now.
    Overwork is only allowed while the planner agent governs the city.
    Resource amounts are x10 scaled; vat values are unscaled.
    """

    def __init__(self):
        super().__init__("overwork")
        # Memo of AND products keyed by operand identity (see _AND).
        self.AND_cache = {}
        self._and_model = None
        self._and_operands = []

    def declare_vars(self, model, i, t, ctx):
        """Create the overwork selector, vat choices and Metropolis amounts."""
        m = model
        ow = m.NewBoolVar(f"ow_{i}_{t}")
        owcvE = m.NewBoolVar(f"owcvE_{i}_{t}")
        owcvB = m.NewBoolVar(f"owcvB_{i}_{t}")
        owcvS = m.NewBoolVar(f"owcvS_{i}_{t}")
        owmE = m.NewIntVar(0, 60, f"owmE_{i}_{t}")
        owmB = m.NewIntVar(0, 60, f"owmB_{i}_{t}")
        owmS = m.NewIntVar(0, 60, f"owmS_{i}_{t}")
        owmC = m.NewIntVar(0, 60, f"owmC_{i}_{t}")
        ctx["action_vars"]["overwork"][(i, t)] = ow
        ctx["ow"][i, t] = ow
        ctx["owcvE"][i, t] = owcvE
        ctx["owcvB"][i, t] = owcvB
        ctx["owcvS"][i, t] = owcvS
        ctx["owmE"][i, t] = owmE
        ctx["owmB"][i, t] = owmB
        ctx["owmS"][i, t] = owmS
        ctx["owmC"][i, t] = owmC

    def add_constraints(self, model, i, t, ctx):
        """Post the legality rules of Overwork for city i at step t."""
        m = model
        isF = ctx["isF"]
        isM = ctx["isM"]
        isMon = ctx["isMon"]
        hasB = ctx["hasB"]
        ow = ctx["ow"][i, t]
        owcvE = ctx["owcvE"][i, t]
        owcvB = ctx["owcvB"][i, t]
        owcvS = ctx["owcvS"][i, t]
        amounts = (
            ctx["owmE"][i, t],
            ctx["owmB"][i, t],
            ctx["owmS"][i, t],
            ctx["owmC"][i, t],
        )
        # Monument can't overwork
        m.Add(ow == 0).OnlyEnforceIf(isMon[i, t])
        # Foundry overwork: exactly one vat chosen iff foundry overworks
        fow = self._AND(model, isF[i, t], ow, ctx)
        m.Add(owcvE + owcvB + owcvS == fow)
        # Metropolis overwork: pick 2*(2 + hasB) resources
        mow = self._AND(model, isM[i, t], ow, ctx)
        ow_npick = m.NewIntVar(0, 6, "")
        m.Add(ow_npick == 2 * (2 + hasB[i, t])).OnlyEnforceIf(mow)
        m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not())
        # Each "pick" is 10 units (x10 scaling).
        m.Add(sum(amounts) == ow_npick * 10)
        # A pick is indivisible: every amount must be a whole number of
        # 10-unit picks, otherwise one pick could be split across resources.
        for amount in amounts:
            whole_picks = m.NewIntVar(0, 6, "")
            m.Add(amount == 10 * whole_picks)
        # Overwork cooldown: city that overworked last step can't collect or overwork
        if t > ctx["arrival_step"][i]:
            prev_ow = ctx["ow"][i, t - 1]
            col = ctx["col"].get((i, t))
            if col is not None:
                m.Add(col == 0).OnlyEnforceIf(prev_ow)
            m.Add(ow == 0).OnlyEnforceIf(prev_ow)
        # Planner gating: a city may overwork only while the planner agent
        # governs it. With no planner governor variable for (i, t) -- planner
        # not configured, or not available at this step -- overwork is
        # forbidden outright.
        planner_g = None
        if "planner" in AGENT_AVAILABILITY:
            planner_g = ctx["governor"].get((i, t, "planner"))
        if planner_g is not None:
            m.Add(ow == 0).OnlyEnforceIf(planner_g.Not())
        else:
            m.Add(ow == 0)

    def add_global_constraints(self, model, t, ctx):
        """Forbid every overwork at step t when no planner is configured.

        Only cities that have an overwork variable at step t are summed, so
        cities without one at that step do not raise a KeyError.
        """
        m = model
        ow = ctx["ow"]
        N = ctx["N"]
        # no planner = no overworks
        if "planner" not in AGENT_AVAILABILITY:
            present = [ow[i, t] for i in range(N) if (i, t) in ow]
            if present:
                m.Add(sum(present) <= 0)
        # else: gating handled per-city by Planner

    def contributes_gains_costs(self, model, i, t, ctx):
        """Append Overwork's gains to the step accumulators."""
        m = model
        has_bonus = ctx["hasB"][i, t]
        ow = ctx["ow"][i, t]
        max_vat = ctx["max_vat"]
        gains = {
            "E": ctx["gain_E"][t],
            "B": ctx["gain_B"][t],
            "S": ctx["gain_S"][t],
            "C": ctx["gain_C"][t],
        }
        # Hub overwork: 2*(2 + hasB) Capital -> x10
        how = self._AND(model, ctx["isH"][i, t], ow, ctx)
        hub_ow_gain = m.NewIntVar(0, 60, "")
        m.Add(hub_ow_gain == 20 * (2 + has_bonus)).OnlyEnforceIf(how)
        m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not())
        gains["C"].append(hub_ow_gain)
        # Metropolis overwork: each "pick" is already 10 units (owm* vars).
        for res in ("E", "B", "S", "C"):
            gains[res].append(ctx["owm" + res][i, t])
        # Foundry overwork: 2 * chosen vat's value, then x10 scaling.
        for res in ("E", "B", "S"):
            raw = m.NewIntVar(0, max_vat, "")
            m.AddMultiplicationEquality(
                raw, [ctx["owcv" + res][i, t], ctx["v" + res][i, t]]
            )
            doubled = m.NewIntVar(0, 20 * max_vat, "")
            m.Add(doubled == 20 * raw)
            gains[res].append(doubled)
        # Collect Bonus (b) for foundry overwork: 2 * (+1) of that resource -> x10 = +20
        for res in ("E", "B", "S"):
            with_bonus = self._AND(model, ctx["owcv" + res][i, t], has_bonus, ctx)
            extra = m.NewIntVar(0, 20, "")
            m.Add(extra == 20 * with_bonus)
            gains[res].append(extra)

    def transition_contrib(self, model, i, t, ctx):
        """No-op here."""
        pass  # Vat transitions handled centrally in solve()

    def describe(self, solver, i, t, ctx):
        """Return the report text for an Overwork at (i, t) from solved values."""
        typ_name = ctx["typ_name"].get((i, t), "-")
        if typ_name == "Foundry":
            if solver.Value(ctx["owcvE"].get((i, t))):
                vat = "E"
            elif solver.Value(ctx["owcvB"].get((i, t))):
                vat = "B"
            else:
                vat = "S"
            return f"Overwork vat {vat} (2x)"
        if typ_name == "Metro":
            picks = []
            for nm in ("E", "B", "S", "C"):
                var = ctx["owm" + nm].get((i, t))
                picks += [nm] * (solver.Value(var) // 10)
            return "Overwork {" + ",".join(picks) + "} (2x)"
        return "Overwork (+Capital 2x)"

    def _AND(self, model, a, b, ctx):
        """Return a BoolVar equal to a AND b, creating it at most once.

        Products are memoised by operand identity. The memo is dropped as
        soon as a different model is seen, and operands are kept referenced,
        so a recycled id() can never hand back a variable that belongs to
        another model or to other operands.
        """
        if getattr(self, "_and_model", None) is not model:
            self.AND_cache = {}
            self._and_operands = []
            self._and_model = model
        key = (id(a), id(b))
        product = self.AND_cache.get(key)
        if product is None:
            product = model.NewBoolVar("")
            model.AddMultiplicationEquality(product, [a, b])
            self.AND_cache[key] = product
            # Keep operands alive for as long as their ids are used as keys.
            self._and_operands.append((a, b))
        return product
class RenovateAction(Action):
    """Shared implementation of the renovate-to-type actions.

    `target_type` is the type letter the city is renovated into ('H', 'F',
    ...); it selects the ctx tables "r<letter>" and "is<letter>".
    """

    _TYPE_LABELS = {"H": "Hub", "F": "Foundry", "M": "Metro"}

    def __init__(self, name, target_type):
        super().__init__(name)
        self.target_type = target_type  # 'H', 'F', etc.

    def declare_vars(self, model, i, t, ctx):
        """Create the renovate selector for (i, t) and register it in ctx."""
        selector = model.NewBoolVar(f"r{self.target_type}_{i}_{t}")
        ctx["action_vars"][self.name][(i, t)] = selector
        ctx[f"r{self.target_type}"][i, t] = selector

    def add_constraints(self, model, i, t, ctx):
        """Forbid renovating into the type the city already has."""
        selector = ctx[f"r{self.target_type}"][i, t]
        is_target = ctx[f"is{self.target_type}"]
        model.Add(is_target[i, t] == 0).OnlyEnforceIf(selector)

    def add_global_constraints(self, model, t, ctx):
        """Renovation has no cross-city rule."""
        return None

    def contributes_gains_costs(self, model, i, t, ctx):
        """Renovation adds no gain or cost term here."""
        return None

    def transition_contrib(self, model, i, t, ctx):
        """No transition is posted from here."""
        return None

    def describe(self, solver, i, t, ctx):
        """Return "Renovate -> <type>", using the raw letter if unnamed."""
        label = self._TYPE_LABELS.get(self.target_type, self.target_type)
        return f"Renovate -> {label}"
# ======================================================================
# AGENT REGISTRY BASE CLASSES
# ======================================================================
class Agent:
    """Agent that governs one city; its effect is gated on governor[i,t,name]."""

    name: str
    available_steps: list  # steps where this agent may be assigned

    def __init__(self, name, available_steps):
        self.name = name
        # Copy, so later changes to the caller's sequence do not leak in.
        self.available_steps = [step for step in available_steps]

    def declare_vars(self, model, ctx):
        """Hook for one-off variables (e.g. one-shot flags); default: nothing."""
        return None

    def apply(self, model, i, t, ctx):
        """Hook for the per-(city, step) effect; default: nothing.

        Implementations gate their logic on ctx['governor'][i, t, self.name].
        """
        return None
class EventAgent:
    """Agent without a city target; its event hook takes only the step.

    Implementations read/write ctx (typically tg_pool or gain_*).
    """

    name: str
    available_steps: list

    def __init__(self, name, available_steps):
        self.name = name
        # Copy, so later changes to the caller's sequence do not leak in.
        self.available_steps = [step for step in available_steps]

    def declare_vars(self, model, ctx):
        """Hook for one-off variables; default: nothing."""
        return None

    def apply_event(self, model, t, ctx):
        """Hook for the per-step effect; default: nothing."""
        return None
# ======================================================================
# PHASE B AGENTS
# ======================================================================
class Capitalist(Agent):
    """Adds +20 scaled Capital (= +2 Capital) to a governed Collect that
    yields Capital: any Hub collect, or a Metropolis collect whose picks
    include Capital. Foundry collects yield no Capital, hence no bonus
    (agents.txt: "increases Capital Collection by +2")."""

    def apply(self, model, i, t, ctx):
        """Append the bonus term for city i at step t to gain_C.

        Does nothing when there is no governor or collect variable for
        (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        collects = ctx["col"].get((i, t))
        if collects is None:
            return
        hub_flags = ctx["isH"]
        metro_capital = ctx["mC"].get((i, t))
        if metro_capital is None:
            # No Metropolis pick variable: only a Hub can yield Capital.
            yields_capital = hub_flags[i, t]
        else:
            # picked_capital <=> the Metropolis picks hold at least one Capital.
            picked_capital = model.NewBoolVar("")
            model.Add(metro_capital >= 10).OnlyEnforceIf(picked_capital)
            model.Add(metro_capital == 0).OnlyEnforceIf(picked_capital.Not())
            yields_capital = model.NewBoolVar("")
            model.AddMaxEquality(yields_capital, [hub_flags[i, t], picked_capital])
        governed_collect = model.NewBoolVar("")
        model.AddMultiplicationEquality(governed_collect, [governs, collects])
        pays_out = model.NewBoolVar("")
        model.AddMultiplicationEquality(pays_out, [governed_collect, yields_capital])
        bonus = model.NewIntVar(0, 20, "")
        model.Add(bonus == 20).OnlyEnforceIf(pays_out)
        model.Add(bonus == 0).OnlyEnforceIf(pays_out.Not())
        ctx["gain_C"][t].append(bonus)
class Baron(Agent):
    """Deposits BASTION_COUNTS[t-1]*10 trade-goods when governing a Collect."""

    def apply(self, model, i, t, ctx):
        """Append the deposit term for city i at step t to tg_pool.

        Does nothing when there is no governor or collect variable for
        (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        collects = ctx["col"].get((i, t))
        if collects is None:
            return
        payout = 10 * BASTION_COUNTS[t - 1]
        governed_collect = model.NewBoolVar("")
        model.AddMultiplicationEquality(governed_collect, [governs, collects])
        # The upper bound is clamped at 0 so the domain stays valid for a
        # non-positive payout.
        deposit = model.NewIntVar(0, max(payout, 0), "")
        model.Add(deposit == payout).OnlyEnforceIf(governed_collect)
        model.Add(deposit == 0).OnlyEnforceIf(governed_collect.Not())
        ctx["tg_pool"][t].append(deposit)
        # NOTE(review): keyed by step only, so each city processed for the
        # same step overwrites the previous entry -- confirm the reader of
        # "baron_deposits" expects just the last city's variable.
        ctx.setdefault("baron_deposits", {})[t] = deposit
class Prodigy(Agent):
    """Refunds "up to 2 Steel" when governing a city that takes upgrade b or d.

    The refund equals the Steel actually spent: 20 scaled Steel normally,
    10 when the city already holds cost-reduction (a). Airship launches are
    not modeled, so that half of the ability is out of scope."""

    def apply(self, model, i, t, ctx):
        """Append one refund term per available upgrade selector to gain_S.

        Does nothing when there is no governor variable for (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        has_a = ctx["hasA"]
        selectors = [ctx[table].get((i, t)) for table in ("ub", "ud")]
        for upgrade in selectors:
            if upgrade is None:
                continue
            governed_upgrade = model.NewBoolVar("")
            model.AddMultiplicationEquality(governed_upgrade, [governs, upgrade])
            refund = model.NewIntVar(0, 20, "")
            model.Add(refund == 20).OnlyEnforceIf(governed_upgrade, has_a[i, t].Not())
            model.Add(refund == 10).OnlyEnforceIf(governed_upgrade, has_a[i, t])
            model.Add(refund == 0).OnlyEnforceIf(governed_upgrade.Not())
            ctx["gain_S"][t].append(refund)
class Provisioner(Agent):
    """+15 scaled Electrum (= 1.5 Electrum) when APPOINTED Governor of a City
    with a Base. An appointment is governing at t without having governed
    the same city at t-1; governing at the first step counts as one. A city
    declares a Base via the `base` flag of its arrival entry (default False)."""

    def apply(self, model, i, t, ctx):
        """Append the appointment bonus for city i at step t to gain_E.

        Does nothing when there is no governor variable for (i, t), when
        ctx has no "cities", or when the city has no Base.
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        cities = ctx.get("cities")
        if cities is None:
            return
        if not cities[i][1].get("base", False):
            return
        if t > 1:
            governed_before = ctx["governor"].get((i, t - 1, self.name), 0)
        else:
            governed_before = 0
        # appointed = governs AND NOT governed_before, written linearly
        # because governed_before may be the constant 0.
        appointed = model.NewBoolVar("")
        model.Add(appointed <= governs)
        model.Add(appointed <= 1 - governed_before)
        model.Add(appointed >= governs - governed_before)
        bonus = model.NewIntVar(0, 15, "")
        model.Add(bonus == 15).OnlyEnforceIf(appointed)
        model.Add(bonus == 0).OnlyEnforceIf(appointed.Not())
        ctx["gain_E"][t].append(bonus)
class Metallurgist(Agent):
    """Governing a Foundry grants the effect of the Overflow Vats Upgrade
    (upgrade d, +1 vat increment). It does NOT stack with the real Upgrade:
    the increment is 1 + max(hasD, metallurgist-governs-foundry).
    The effect lives in solve()'s central vat-transition block; this class
    only keeps the registry uniform."""

    def apply(self, model, i, t, ctx):
        """Intentionally empty: the vat-level effect is applied in solve()."""
        return None
class Builder(Agent):
    """Per appointment: grants the governed City its type-specific Upgrade.

    Only the Foundry type-specific Upgrade (Overflow Vats, hasD) is modeled,
    so the grant fires only when governing a Foundry (hasD[i,t+1]=1 via the
    central transition). Hub/Metropolis type-specific Upgrades are not
    represented in this model; Builder is a no-op on those types."""

    def declare_vars(self, model, ctx):
        """Make sure the "builder_fire" table exists in ctx."""
        if "builder_fire" not in ctx:
            ctx["builder_fire"] = {}

    def apply(self, model, i, t, ctx):
        """Record fire = governs AND isF for (i, t) in ctx["builder_fire"].

        Does nothing when there is no governor variable for (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        is_foundry = ctx["isF"]
        fire = model.NewBoolVar(f"builder_fire_{i}_{t}")
        model.AddMultiplicationEquality(fire, [governs, is_foundry[i, t]])
        fire_table = ctx.setdefault("builder_fire", {})
        fire_table[(i, t)] = fire
class Courier(Agent):
    """One-shot globally: +30 to gain_C, gain_S, gain_B on the first step
    where Courier governs anywhere."""

    def declare_vars(self, model, ctx):
        """Build the per-step one-shot chain and append the bonus terms.

        For every step 1..NUM_STEPS this creates
          any   = Courier governs some city at this step,
          fired = any AND NOT (fired at an earlier step),
          ever  = fired at this or an earlier step,
        and appends a 30-or-0 bonus gated on `fired` to gain_C, gain_S and
        gain_B. The any/fired variables are published as
        ctx["courier_any"] and ctx["courier_fired"].
        """
        city_count = ctx["N"]
        governor = ctx["governor"]
        boosted = (ctx["gain_C"], ctx["gain_S"], ctx["gain_B"])
        any_by_step = {}
        fired_by_step = {}
        seen_before = None
        for step in range(1, NUM_STEPS + 1):
            first_step = seen_before is None
            candidates = (
                governor.get((city, step, self.name)) for city in range(city_count)
            )
            governing = [g for g in candidates if g is not None]

            governs_somewhere = model.NewBoolVar(f"courier_any_{step}")
            if governing:
                # OR over all governor variables of this step.
                model.AddMaxEquality(governs_somewhere, governing)
            else:
                model.Add(governs_somewhere == 0)
            any_by_step[step] = governs_somewhere

            fires = model.NewBoolVar(f"courier_fired_{step}")
            if first_step:
                # No history yet: firing is just governing.
                model.Add(fires == governs_somewhere)
            else:
                # fires = governs_somewhere AND NOT seen_before
                model.Add(fires <= governs_somewhere)
                model.Add(fires <= 1 - seen_before)
                model.Add(fires >= governs_somewhere - seen_before)
            fired_by_step[step] = fires

            seen_now = model.NewBoolVar(f"courier_ever_{step}")
            if first_step:
                model.Add(seen_now == fires)
            else:
                model.AddMaxEquality(seen_now, [seen_before, fires])
            seen_before = seen_now

            # +30 to each boosted pool on the step the one-shot fires.
            for bucket in [pool[step] for pool in boosted]:
                bonus = model.NewIntVar(0, 30, "")
                model.Add(bonus == 30).OnlyEnforceIf(fires)
                model.Add(bonus == 0).OnlyEnforceIf(fires.Not())
                bucket.append(bonus)
        ctx["courier_any"] = any_by_step
        ctx["courier_fired"] = fired_by_step

    def apply(self, model, i, t, ctx):
        """Intentionally empty: everything is set up in declare_vars()."""
        return None
# ======================================================================
# PHASE C AGENTS
# ======================================================================
class Planner(Agent):
    """Enables overwork for the governed city.

    The effect is realized by OverworkAction's per-city gating on the
    planner governor variable; nothing is posted from this class."""

    def apply(self, model, i, t, ctx):
        """Intentionally empty: see OverworkAction.add_constraints()."""
        return None
class Foreman(Agent):
    """Restructure: as Governor, Renovating the city does not prevent other
    Industry Actions, i.e. the governed city may take one additional action
    in the same step as a Renovate. The rule is wired centrally into the
    per-city action count constraint in solve()
    (action_sum == present + foreman_extra); this class only keeps the
    registry uniform."""

    def apply(self, model, i, t, ctx):
        """Intentionally empty: the effect is wired in solve()."""
        return None
class Industrialist(Agent):
    """Network: when appointed Governor, the governed City and all cities
    listed in its `adjacent_to` gain the Infrastructure Upgrade.

    ASSUMPTION: "Infrastructure Upgrade" is modeled as upgrade a
    (cost-reduction / hasA). The grant is wired centrally in solve() as an
    extra source of the hasA max-equality transition (indus_grant_map), so
    it cannot conflict with the transition constraint, and grants aimed at
    absent cities (not yet arrived / departed) are naturally dropped.
    This class only keeps the registry uniform."""

    def apply(self, model, i, t, ctx):
        """Intentionally empty: the grant is wired in solve()."""
        return None
class Fence(EventAgent):
    """When hired (= first available step), one-time bonus: 1 Express Ticket
    (+10 scaled into gain_X) and 2 Trade Goods (+20 scaled into tg_pool)."""

    def apply_event(self, model, t, ctx):
        """Deposit the hiring bonus if t is the first available step.

        With an empty availability list the agent is never hired, so the
        call is a no-op instead of raising ValueError from min().
        """
        hired_at = min(self.available_steps, default=None)
        if t != hired_at:
            return
        deposit = model.NewConstant(20)
        ctx["tg_pool"][t].append(deposit)
        # Plain scaled amount, recorded per step.
        ctx.setdefault("fence_deposits", {})[t] = 20
        ticket = model.NewConstant(10)
        ctx["gain_X"][t].append(ticket)
# ======================================================================
# PHASE D AGENTS
# ======================================================================
class Vinter(Agent):
    """As Governor, increases Luxury Collection by +2 (= +20 scaled L).

    NOTE: no city type in this model yields Luxuries from a Collect
    (Hub -> Capital, Foundry -> vat resource, Metropolis picks E/B/S/C),
    so this agent currently has NO effect. It is registered for parity with
    agents.txt and needs wiring if a Luxury-yielding collection is ever
    modeled (same idea as the Capitalist rule: the bonus applies only to
    collections that actually yield the resource)."""

    def apply(self, model, i, t, ctx):
        """Intentionally empty: nothing in the model collects Luxuries."""
        return None
class Artificer(Agent):
    """As Governor, Collects an extra Trade Good: +10 scaled into tg_pool
    when governing a city that takes Collect (analogue of Baron)."""

    def apply(self, model, i, t, ctx):
        """Append the extra trade-good term for city i at step t to tg_pool.

        Does nothing when there is no governor or collect variable for
        (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        collects = ctx["col"].get((i, t))
        if collects is None:
            return
        governed_collect = model.NewBoolVar("")
        model.AddMultiplicationEquality(governed_collect, [governs, collects])
        deposit = model.NewIntVar(0, 10, "")
        model.Add(deposit == 10).OnlyEnforceIf(governed_collect)
        model.Add(deposit == 0).OnlyEnforceIf(governed_collect.Not())
        ctx["tg_pool"][t].append(deposit)
class Cosmopolitan(Agent):
    """When appointed Governor, gains +1 Renown (+10 scaled R).

    An appointment is governing at t without having governed the same city
    at t-1; governing at the first step counts as one. The Forum-invitation
    half of the ability is diplomacy-only and out of scope for this model."""

    def apply(self, model, i, t, ctx):
        """Append the appointment bonus for city i at step t to gain_R.

        Does nothing when there is no governor variable for (i, t).
        """
        governs = ctx["governor"].get((i, t, self.name))
        if governs is None:
            return
        if t > 1:
            governed_before = ctx["governor"].get((i, t - 1, self.name), 0)
        else:
            governed_before = 0
        # appointed = governs AND NOT governed_before, written linearly
        # because governed_before may be the constant 0.
        appointed = model.NewBoolVar("")
        model.Add(appointed <= governs)
        model.Add(appointed <= 1 - governed_before)
        model.Add(appointed >= governs - governed_before)
        bonus = model.NewIntVar(0, 10, "")
        model.Add(bonus == 10).OnlyEnforceIf(appointed)
        model.Add(bonus == 0).OnlyEnforceIf(appointed.Not())
        ctx["gain_R"][t].append(bonus)
class Connoisseur(EventAgent):
    """Event agent: one-time gift of 5 Luxuries (+50 scaled L) when hired.

    "Hired" is modeled as the smallest step in ``available_steps``.
    """

    def apply_event(self, model, t, ctx):
        """Append the constant Luxuries bonus on the hiring step only."""
        first_step = min(self.available_steps)
        if t == first_step:
            ctx["gain_L"][t].append(model.NewConstant(50))
class Influencer(EventAgent):
    """Event agent: +1 Renown (+10 scaled R) at the start of each Turn while
    hired, i.e. once for every step on which this hook is invoked."""

    def apply_event(self, model, t, ctx):
        """Append a constant +10 to the Renown gains of step ``t``."""
        renown_gains = ctx["gain_R"][t]
        renown_gains.append(model.NewConstant(10))
class Economist(EventAgent):
    """Event agent: 1 Renown (=10 scaled R) per 10 Capital (=100 scaled C)
    spent on Collection while hired.

    Reads the per-step ``capital_spent`` tracker and accumulates it over
    the steps on which ``apply_event`` is invoked. The payout uses integer
    division on the running total, so at step t the gain is
    ``10 * (floor(cum_t / 100) - floor(cum_{t-1} / 100))``.
    """

    def declare_vars(self, model, ctx):
        """Reset the carried state before any per-step hook runs."""
        # _cum: cumulative scaled-Capital collection spend so far.
        # _paid: renown units (x1, unscaled) already paid out.
        self._cum = self._paid = None

    def apply_event(self, model, t, ctx):
        """Extend the cumulative spend to step ``t`` and pay newly earned Renown."""
        ceiling = MAX_RES * NUM_STEPS
        payout_cap = ceiling // 100

        # Capital spent on collection during this step.
        terms = ctx["capital_spent"][t]
        step_spend = model.NewIntVar(0, MAX_RES, f"econ_spent_{t}")
        model.Add(step_spend == (sum(terms) if terms else 0))

        # Running total: previous total (if any) plus this step's spend.
        running = model.NewIntVar(0, ceiling, f"econ_cum_{t}")
        model.Add(
            running == (step_spend if self._cum is None else self._cum + step_spend)
        )

        # Whole renown units earned so far: floor(running / 100).
        earned = model.NewIntVar(0, payout_cap, f"econ_paid_{t}")
        model.AddDivisionEquality(earned, running, 100)

        # Pay only the units not already paid on earlier steps.
        payout = model.NewIntVar(0, 10 * payout_cap, f"econ_gainR_{t}")
        model.Add(
            payout == (10 * earned if self._paid is None else 10 * (earned - self._paid))
        )
        ctx["gain_R"][t].append(payout)

        self._cum, self._paid = running, earned
# ======================================================================
# MODEL
# ======================================================================
def solve(
    *,
    initial=INITIAL,
    arrivals=ARRIVALS,
    max_res=MAX_RES,
    max_vat=MAX_VAT,
    time_limit=60.0,
    num_workers=8,
    verbose=True,
    fixed_choices=FIXED_CHOICES,
    resource_constraints=None,
    objective_factors=None,
    objective_mode=None,
):
    """Build the city-resource CP-SAT model, solve it and optionally report.

    The model is solved in two phases: Phase 1 maximises each objective
    resource on its own (up to 20 s each) to obtain an upper bound, Phase 2
    maximises the actual objective (weighted sum or product of the final
    pools) within ``time_limit``.

    Args:
        initial: Starting pools (E, B, S, C, R, L, X); shorter tuples are
            padded with zeros.
        arrivals: Mapping step -> list of city entries (each is passed
            through ``normalize_city``).
        max_res: Upper bound of every resource pool variable.
        max_vat: Upper bound of every foundry vat variable.
        time_limit: Phase-2 time limit in seconds.
        num_workers: CP-SAT search workers (both phases).
        verbose: Print intermediate solutions, the ceilings used and the
            final report.
        fixed_choices: Optional dict with "actions" ``{(city, step): name}``
            and/or "governors" ``{(city, step, agent): bool}`` to pin.
        resource_constraints: Callables receiving the resource dict
            (keys E, B, S, C, R, L, X -> {step: var}) and returning a
            constraint or None; defaults to ``RESOURCE_CONSTRAINTS``.
        objective_factors: Per-resource exponent (product mode) or weight
            (sum mode); defaults to ``OBJECTIVE_FACTORS``.
        objective_mode: "sum" or product; defaults to ``OBJECTIVE_MODE``.

    Returns:
        Tuple ``(solver, status)`` of the Phase-2 ``CpSolver`` and its status.
    """
    if objective_factors is None:
        objective_factors = OBJECTIVE_FACTORS
    if objective_mode is None:
        objective_mode = OBJECTIVE_MODE
    _validate_objective(objective_factors, objective_mode)
    # Normalized copy: every key present, missing keys = 0.
    obj_factors = {k: objective_factors.get(k, 0) for k in "EBSCRLX"}

    # Pad short initial tuples (legacy 4-tuples) with 0 Renown/Luxuries/Express tickets.
    initial = tuple(initial)
    if len(initial) < 7:
        initial = initial + (0,) * (7 - len(initial))

    # ---- build the city list -----------------------------------------
    # cities: list[tuple[int, dict]] where the dict has keys "type",
    # "adjacent_to", "base", "vats", "departure_step" (see normalize_city).
    # Adjacency is consumed by the Industrialist grant wiring below.
    cities = []
    for s in range(1, NUM_STEPS + 1):
        for entry in arrivals.get(s, []):
            cities.append((s, normalize_city(entry)))
    N = len(cities)

    m = cp_model.CpModel()

    def AND(a, b):
        """Boolean AND of two 0/1 vars, returned as a new 0/1 var."""
        c = m.NewBoolVar("")
        m.AddMultiplicationEquality(c, [a, b])
        return c

    # ---- state variables ---------------------------------------------
    isH, isF, isM, isMon, present = {}, {}, {}, {}, {}
    hasA, hasB, hasD = {}, {}, {}
    vE, vB, vS = {}, {}, {}  # foundry vats at start of step t
    # action variables (sparse, only created for enabled actions)
    col, ua, ub, ud = {}, {}, {}, {}
    ow = {}
    noop = {}
    rH, rF, rM = {}, {}, {}
    cvE, cvB, cvS = {}, {}, {}
    owcvE, owcvB, owcvS = {}, {}, {}
    mE, mB, mS, mC = {}, {}, {}, {}
    owmE, owmB, owmS, owmC = {}, {}, {}, {}

    for i in range(N):
        a_step, a_city = cities[i]
        d_step = a_city["departure_step"]
        for t in range(1, NUM_STEPS + 2):
            isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}")
            isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}")
            isM[i, t] = m.NewBoolVar(f"isM_{i}_{t}")
            isMon[i, t] = m.NewBoolVar(f"isMon_{i}_{t}")
            present[i, t] = m.NewBoolVar(f"present_{i}_{t}")
            hasA[i, t] = m.NewBoolVar(f"hasA_{i}_{t}")
            hasB[i, t] = m.NewBoolVar(f"hasB_{i}_{t}")
            hasD[i, t] = m.NewBoolVar(f"hasD_{i}_{t}")
            vE[i, t] = m.NewIntVar(0, max_vat, f"vE_{i}_{t}")
            vB[i, t] = m.NewIntVar(0, max_vat, f"vB_{i}_{t}")
            vS[i, t] = m.NewIntVar(0, max_vat, f"vS_{i}_{t}")
            # presence: present from arrival step until departure step (exclusive)
            m.Add(present[i, t] == (1 if a_step <= t < d_step else 0))
            # exactly one type iff present
            m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t])

    # ---- per-step gain/cost accumulators (linear expressions) ---------
    gain_E = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_B = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_S = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_C = {t: [] for t in range(1, NUM_STEPS + 1)}
    # Renown (R), Luxuries (L) and Express tickets (X) are fed by agents
    # (e.g. Cosmopolitan/Influencer/Economist -> R, Connoisseur -> L,
    # Fence -> X); Luxuries additionally receive their share of the
    # trade-goods pool split below.
    gain_R = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_L = {t: [] for t in range(1, NUM_STEPS + 1)}
    gain_X = {t: [] for t in range(1, NUM_STEPS + 1)}
    cost_C = {t: [] for t in range(1, NUM_STEPS + 1)}
    cost_S = {t: [] for t in range(1, NUM_STEPS + 1)}
    # Trade-goods pool (per step): new generic-trade-good sources (Baron, Fence,
    # phase B/C) deposit IntVars here; centralized split logic routes the pool
    # into gain_E/B/S/C/L below. Metro picks are NOT routed through here.
    tg_pool = {t: [] for t in range(1, NUM_STEPS + 1)}
    # Per-step capital spent tracker (mirrors entries appended to cost_C).
    capital_spent = {t: [] for t in range(1, NUM_STEPS + 1)}

    # ---- Agent Registry (populated by phase B/C/D; empty in phase A) ----
    agent_registry = []
    event_agent_registry = []

    # ---- Action Registry ----
    # (enable-flag key, factory) in registration order; a missing flag
    # means "enabled".
    action_factories = (
        ("collect", CollectAction),
        ("upgrade_a", lambda: UpgradeAction("upgrade_a", "A")),
        ("upgrade_b", lambda: UpgradeAction("upgrade_b", "B")),
        ("upgrade_d", lambda: UpgradeAction("upgrade_d", "D")),
        ("overwork", OverworkAction),
        ("renovate_h", lambda: RenovateAction("renovate_h", "H")),
        ("renovate_f", lambda: RenovateAction("renovate_f", "F")),
        ("noop", NoOpAction),
    )
    action_registry = [
        make() for key, make in action_factories if ENABLED_ACTIONS.get(key, True)
    ]

    # ---- Governor variable matrix ----
    # governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name`
    # governs city i at step t. A real BoolVar is created only when the agent
    # is available at step t AND the city is present (arrived and not yet
    # departed); otherwise NewConstant(0). Without the departure check an
    # agent could "govern" a departed city, which e.g. would still let the
    # Industrialist hand out grants to that city's neighbours.
    governor = {}
    arrival_step_map = {i: cities[i][0] for i in range(N)}
    departure_step_map = {i: cities[i][1]["departure_step"] for i in range(N)}
    for name, avail_steps in AGENT_AVAILABILITY.items():
        avail_set = set(avail_steps)
        for i in range(N):
            for t in range(1, NUM_STEPS + 1):
                if t in avail_set and arrival_step_map[i] <= t < departure_step_map[i]:
                    governor[i, t, name] = m.NewBoolVar(f"gov_{name}_{i}_{t}")
                else:
                    governor[i, t, name] = m.NewConstant(0)
    # At each (i, t): at most one agent governs city i.
    for i in range(N):
        for t in range(1, NUM_STEPS + 1):
            terms = [governor[i, t, n] for n in AGENT_AVAILABILITY]
            if terms:
                m.Add(sum(terms) <= 1)
    # At each (name, t): at most one city has agent `name` assigned.
    for name in AGENT_AVAILABILITY:
        for t in range(1, NUM_STEPS + 1):
            m.Add(sum(governor[i, t, name] for i in range(N)) <= 1)

    # Phase B agent registration
    agent_classes = {
        "capitalist": Capitalist,
        "baron": Baron,
        "prodigy": Prodigy,
        "provisioner": Provisioner,
        "metallurgist": Metallurgist,
        "builder": Builder,
        "courier": Courier,
        "planner": Planner,
        "foreman": Foreman,
        "industrialist": Industrialist,
        "vinter": Vinter,
        "artificer": Artificer,
        "cosmopolitan": Cosmopolitan,
    }
    for name, cls in agent_classes.items():
        if name in AGENT_AVAILABILITY:
            agent_registry.append(cls(name, AGENT_AVAILABILITY[name]))
    event_agent_classes = {
        "fence": Fence,
        "connoisseur": Connoisseur,
        "economist": Economist,
        "influencer": Influencer,
    }
    for name, cls in event_agent_classes.items():
        if name in AGENT_AVAILABILITY:
            event_agent_registry.append(cls(name, AGENT_AVAILABILITY[name]))

    # Shared state populated by agent declare_vars (e.g., Builder's fire map).
    builder_fire_map = {}
    # One-shot agent declarations.
    fence_deposits = {}
    baron_deposits = {}
    _agent_decl_ctx = {
        "model": m,
        "N": N,
        "governor": governor,
        "tg_pool": tg_pool,
        "capital_spent": capital_spent,
        "gain_E": gain_E,
        "gain_B": gain_B,
        "gain_S": gain_S,
        "gain_C": gain_C,
        "gain_R": gain_R,
        "gain_L": gain_L,
        "gain_X": gain_X,
        "cost_C": cost_C,
        "cost_S": cost_S,
        "builder_fire": builder_fire_map,
        "fence_deposits": fence_deposits,
        "baron_deposits": baron_deposits,
    }
    for agent in agent_registry:
        agent.declare_vars(m, _agent_decl_ctx)
    for agent in event_agent_registry:
        agent.declare_vars(m, _agent_decl_ctx)

    if fixed_choices and "governors" in fixed_choices:
        for (city, step, agent_name), value in fixed_choices["governors"].items():
            if governor.get((city, step, agent_name)) is not None:
                m.Add(governor[city, step, agent_name] == (1 if value else 0))

    # ---- Industrialist (Network) grant map -----------------------------
    # When the Industrialist governs city i at step t, city i AND every city
    # listed in cities[i]'s `adjacent_to` gain the Infrastructure Upgrade
    # (ASSUMPTION: modeled as upgrade a / hasA). The grant vars are fed into
    # the hasA max-equality transition sources below (same pattern as
    # builder_fire -> hasD), so the grant can never contradict the
    # transition. Grants aimed at cities that are absent at t+1 (not yet
    # arrived or departed) are dropped automatically because no transition
    # constraint is built for those steps.
    indus_grant_map = {}  # (target_city, t) -> [governor BoolVars]
    if "industrialist" in AGENT_AVAILABILITY:
        _indus_steps = set(AGENT_AVAILABILITY["industrialist"])
        for i in range(N):
            for t in range(1, NUM_STEPS + 1):
                # Skip steps where the var is a hardwired 0 constant.
                if t not in _indus_steps:
                    continue
                if not (arrival_step_map[i] <= t < departure_step_map[i]):
                    continue
                g = governor.get((i, t, "industrialist"))
                if g is None:
                    continue
                targets = [i] + list(cities[i][1].get("adjacent_to", []))
                for j in targets:
                    if 0 <= j < N:
                        indus_grant_map.setdefault((j, t), []).append(g)

    # ---- per-city logic -----------------------------------------------
    for i in range(N):
        a_step, a_city = cities[i]
        a_type = a_city["type"]
        # initial type at arrival step
        init = {"H": isH, "F": isF, "M": isM, "N": isMon}
        m.Add(init[a_type][i, a_step] == 1)
        # no upgrades at arrival
        m.Add(hasA[i, a_step] == 0)
        m.Add(hasB[i, a_step] == 0)
        m.Add(hasD[i, a_step] == 0)
        # vats at arrival
        m.Add(vE[i, a_step] == a_city["vats"]["E"])
        m.Add(vB[i, a_step] == a_city["vats"]["B"])
        m.Add(vS[i, a_step] == a_city["vats"]["S"])
        # before arrival: everything zero
        for t in range(1, a_step):
            for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
                m.Add(v[i, t] == 0)
        # after departure: everything zero from d_step onward
        d_step = a_city["departure_step"]
        for t in range(d_step, NUM_STEPS + 2):
            for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS):
                m.Add(v[i, t] == 0)

        # action + transition logic for active steps
        for t in range(a_step, min(d_step, NUM_STEPS + 1)):
            P = present[i, t]
            # Build context for action methods
            ctx = {
                "noop": noop,
                "model": m,
                "i": i,
                "t": t,
                "N": N,
                "max_vat": max_vat,
                "action_vars": {act.name: {} for act in action_registry},
                # Real per-city arrival steps (previously every city was
                # mapped to the current city's arrival step).
                "arrival_step": arrival_step_map,
                "isH": isH,
                "isF": isF,
                "isM": isM,
                "isMon": isMon,
                "present": present,
                "hasA": hasA,
                "hasB": hasB,
                "hasD": hasD,
                "vE": vE,
                "vB": vB,
                "vS": vS,
                "col": col,
                "ua": ua,
                "ub": ub,
                "ud": ud,
                "ow": ow,
                "rH": rH,
                "rF": rF,
                "rM": rM,
                "cvE": cvE,
                "cvB": cvB,
                "cvS": cvS,
                "owcvE": owcvE,
                "owcvB": owcvB,
                "owcvS": owcvS,
                "mE": mE,
                "mB": mB,
                "mS": mS,
                "mC": mC,
                "owmE": owmE,
                "owmB": owmB,
                "owmS": owmS,
                "owmC": owmC,
                "gain_E": gain_E,
                "gain_B": gain_B,
                "gain_S": gain_S,
                "gain_C": gain_C,
                "gain_R": gain_R,
                "gain_L": gain_L,
                "gain_X": gain_X,
                "cost_C": cost_C,
                "cost_S": cost_S,
                "tg_pool": tg_pool,
                "capital_spent": capital_spent,
                "governor": governor,
                "renovations": {},
                "vat_next": {},
                "builder_fire": builder_fire_map,
                "baron_deposits": baron_deposits,
                "cities": cities,
                "NUM_STEPS": NUM_STEPS,
            }

            # Declare all action variables
            for action in action_registry:
                action.declare_vars(m, i, t, ctx)

            # After declaring all action variables for (i,t)
            if fixed_choices and "actions" in fixed_choices:
                action_key = (i, t)
                if action_key in fixed_choices["actions"]:
                    fixed_action = fixed_choices["actions"][action_key]
                    # Set the chosen action to 1, others to 0
                    for action in action_registry:
                        var = action.selector_var(i, t, ctx)
                        if var is not None:
                            if action.name == fixed_action:
                                m.Add(var == 1)
                            else:
                                m.Add(var == 0)

            # renovations helper (used by multiple actions)
            ren = m.NewBoolVar("")
            rH_act = rH.get((i, t))
            rF_act = rF.get((i, t))
            rM_act = rM.get((i, t))
            ren_sum = sum(x for x in [rH_act, rF_act, rM_act] if x is not None)
            m.Add(ren == ren_sum)
            ctx["renovations"][i, t] = ren
            no_ren = m.NewBoolVar("")
            m.Add(no_ren == 1 - ren)

            # Add all constraints
            for action in action_registry:
                action.add_constraints(m, i, t, ctx)

            # LLM allowed renovation into metropolis, need to prevent that now
            if rM.get((i, t)) is not None:
                m.Add(rM[i, t] == 0)

            # Renovation must change type (renovate-to-X requires not-X now)
            for r, target in [
                (rH.get((i, t)), isH),
                (rF.get((i, t)), isF),
                (rM.get((i, t)), isM),
            ]:
                if r is not None:
                    m.Add(target[i, t] == 0).OnlyEnforceIf(r)

            # Exactly one action while present (sum all action selectors).
            # Foreman (Restructure): when the Foreman governs this city and
            # it renovates, the renovation does not consume the action — the
            # city MAY take one additional (non-renovate) action this step.
            # `extra` is optional (<= foreman AND renovate) so a lone
            # renovate is still legal under the Foreman.
            selectors = [action.selector_var(i, t, ctx) for action in action_registry]
            action_sum = sum(sel for sel in selectors if sel is not None)
            foreman_g = governor.get((i, t, "foreman"))
            if foreman_g is not None and "foreman" in AGENT_AVAILABILITY:
                fg_ren = AND(foreman_g, ren)
                extra = m.NewBoolVar("")
                m.Add(extra <= fg_ren)
                m.Add(extra <= P)
                m.Add(action_sum == P + extra)
            else:
                m.Add(action_sum == P)

            # Agent per-(city, step) hooks - run BEFORE upgrade transitions so
            # Builder can register its fire var that feeds into d_keep.
            for agent in agent_registry:
                agent.apply(m, i, t, ctx)

            # Upgrade transitions (skip propagation past last present step)
            if t + 1 < d_step:
                # Industrialist grants are extra OR-sources for hasA, exactly
                # like builder_fire is for hasD below.
                a_sources = [hasA[i, t], ua.get((i, t), m.NewConstant(0))]
                a_sources += indus_grant_map.get((i, t), [])
                m.AddMaxEquality(hasA[i, t + 1], a_sources)
                m.AddMaxEquality(
                    hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))]
                )
                d_keep = m.NewBoolVar("")
                d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))]
                _bfire = builder_fire_map.get((i, t))
                if _bfire is not None:
                    d_sources.append(_bfire)
                m.AddMaxEquality(d_keep, d_sources)
                # A renovation wipes upgrade d; otherwise it is kept/gained.
                m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren)
                m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren)

            # Gain/cost contributions
            for action in action_registry:
                action.contributes_gains_costs(m, i, t, ctx)

            if t + 1 < d_step:
                # Transition contributions (including vats)
                for action in action_registry:
                    action.transition_contrib(m, i, t, ctx)

                # Type transition t -> t+1
                m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren)
                m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren)
                m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren)
                for r, target in [
                    (rH.get((i, t)), isH),
                    (rF.get((i, t)), isF),
                    (rM.get((i, t)), isM),
                ]:
                    if r is not None:
                        m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r)
                for r1, t1, t2 in [
                    (rH.get((i, t)), isF, isM),
                    (rF.get((i, t)), isH, isM),
                    (rM.get((i, t)), isH, isF),
                ]:
                    if r1 is not None:
                        m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1)
                        m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1)

                # ---- Centralized vat transitions (mirrors main.py exactly) ----
                # Collect/overwork vat selectors (None if action disabled)
                _cvE = cvE.get((i, t))
                _cvB = cvB.get((i, t))
                _cvS = cvS.get((i, t))
                _owcvE = owcvE.get((i, t))
                _owcvB = owcvB.get((i, t))
                _owcvS = owcvS.get((i, t))

                # Vat increment: 1, +1 with Overflow Vats (hasD) OR the
                # Metallurgist governing this Foundry. Non-stacking per
                # agents.txt ("Does not stack with actual Upgrade"), hence
                # max() rather than a sum.
                inc = m.NewIntVar(1, 3, "")
                metallurgist_g = governor.get((i, t, "metallurgist"))
                if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY:
                    metal_active = m.NewBoolVar("")
                    m.AddMultiplicationEquality(
                        metal_active, [metallurgist_g, isF[i, t]]
                    )
                    d_or_metal = m.NewBoolVar("")
                    m.AddMaxEquality(d_or_metal, [hasD[i, t], metal_active])
                    m.Add(inc == 1 + d_or_metal)
                else:
                    m.Add(inc == 1 + hasD[i, t])

                vEn = m.NewIntVar(0, max_vat, "")
                vBn = m.NewIntVar(0, max_vat, "")
                vSn = m.NewIntVar(0, max_vat, "")

                # For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S)
                # apply the same reset/increment pattern from main.py
                for sel_E, sel_B, sel_S in [
                    (_cvE, _cvB, _cvS),
                    (_owcvE, _owcvB, _owcvS),
                ]:
                    if sel_E is not None:
                        m.Add(vEn == 0).OnlyEnforceIf(sel_E)
                        m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E)
                        m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E)
                    if sel_B is not None:
                        m.Add(vBn == 0).OnlyEnforceIf(sel_B)
                        m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B)
                        m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B)
                    if sel_S is not None:
                        m.Add(vSn == 0).OnlyEnforceIf(sel_S)
                        m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S)
                        m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S)

                # foundry but neither collecting nor overworking: vats unchanged
                # f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow
                _fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None)
                _fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None)
                f_noncollect_noow = m.NewBoolVar("")
                m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count)
                m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow)
                m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow)
                m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow)

                # assign vat[i, t+1]:
                # renovate-to-foundry -> reset to 1
                # continuing foundry -> vat_next
                # otherwise (not foundry next) -> 0
                cont_F = AND(isF[i, t], no_ren)
                for vnext, vn in (
                    (vE[i, t + 1], vEn),
                    (vB[i, t + 1], vBn),
                    (vS[i, t + 1], vSn),
                ):
                    if rF.get((i, t)) is not None:
                        m.Add(vnext == 1).OnlyEnforceIf(rF[i, t])
                    m.Add(vnext == vn).OnlyEnforceIf(cont_F)
                not_F_next = isF[i, t + 1].Not()
                m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
                m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next)
                m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next)

    # ---- global constraints (per action type, per step) ----
    for t in range(1, NUM_STEPS + 1):
        ctx_global = {"N": N, "ow": ow, "t": t}
        for action in action_registry:
            action.add_global_constraints(m, t, ctx_global)

    # ---- event-agent per-step hooks (registry empty in phase A) ----
    _event_ctx = {
        "model": m,
        "N": N,
        "governor": governor,
        "tg_pool": tg_pool,
        "capital_spent": capital_spent,
        "gain_E": gain_E,
        "gain_B": gain_B,
        "gain_S": gain_S,
        "gain_C": gain_C,
        "gain_R": gain_R,
        "gain_L": gain_L,
        "gain_X": gain_X,
        "cost_C": cost_C,
        "cost_S": cost_S,
        "fence_deposits": fence_deposits,
        "baron_deposits": baron_deposits,
    }
    for t in range(1, NUM_STEPS + 1):
        for agent in event_agent_registry:
            if t in agent.available_steps:
                agent.apply_event(m, t, _event_ctx)

    # ---- trade-goods pool split (routes pool[t] -> gain_E/B/S/C/L) ----
    # Generic-trade-good sources (Baron, Fence, Artificer, ...) deposit
    # IntVars into tg_pool[t]; here we declare a 5-way split into the
    # resource gain accumulators.
    # If the pool is empty for step t, the split vars are constrained to 0.
    for t in range(1, NUM_STEPS + 1):
        pool_total = m.NewIntVar(0, max_res, f"tg_total_{t}")
        if tg_pool[t]:
            m.Add(pool_total == sum(tg_pool[t]))
        else:
            m.Add(pool_total == 0)
        tg_E = m.NewIntVar(0, max_res, f"tg_E_{t}")
        tg_B = m.NewIntVar(0, max_res, f"tg_B_{t}")
        tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}")
        tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}")
        tg_L = m.NewIntVar(0, max_res, f"tg_L_{t}")
        m.Add(tg_E + tg_B + tg_S + tg_C + tg_L == pool_total)
        # Each trade good (10 scaled units) must be allocated entirely to one
        # resource: four shares are forced to multiples of 10 (tg_C is the
        # remainder of the split and carries no modulo constraint of its own).
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_E, 10)
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_B, 10)
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_S, 10)
        m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_L, 10)
        gain_E[t].append(tg_E)
        gain_B[t].append(tg_B)
        gain_S[t].append(tg_S)
        gain_C[t].append(tg_C)
        gain_L[t].append(tg_L)

    # ---- resource pool recursion --------------------------------------
    E = {1: m.NewIntVar(initial[0], initial[0], "E1")}
    B = {1: m.NewIntVar(initial[1], initial[1], "B1")}
    S = {1: m.NewIntVar(initial[2], initial[2], "S1")}
    C = {1: m.NewIntVar(initial[3], initial[3], "C1")}
    R = {1: m.NewIntVar(initial[4], initial[4], "R1")}
    L = {1: m.NewIntVar(initial[5], initial[5], "L1")}
    X = {1: m.NewIntVar(initial[6], initial[6], "X1")}
    for t in range(1, NUM_STEPS + 1):
        E[t + 1] = m.NewIntVar(0, max_res, f"E_{t + 1}")
        B[t + 1] = m.NewIntVar(0, max_res, f"B_{t + 1}")
        S[t + 1] = m.NewIntVar(0, max_res, f"S_{t + 1}")
        C[t + 1] = m.NewIntVar(0, max_res, f"C_{t + 1}")
        R[t + 1] = m.NewIntVar(0, max_res, f"R_{t + 1}")
        L[t + 1] = m.NewIntVar(0, max_res, f"L_{t + 1}")
        X[t + 1] = m.NewIntVar(0, max_res, f"X_{t + 1}")
        # Costs must be payable from the pool at the start of the step,
        # i.e. gains of the same step cannot fund them.
        m.Add(C[t] - sum(cost_C[t]) >= 0)
        m.Add(S[t] - sum(cost_S[t]) >= 0)
        m.Add(E[t + 1] == E[t] + sum(gain_E[t]))
        m.Add(B[t + 1] == B[t] + sum(gain_B[t]))
        m.Add(S[t + 1] == S[t] - sum(cost_S[t]) + sum(gain_S[t]))
        m.Add(C[t + 1] == C[t] - sum(cost_C[t]) + sum(gain_C[t]))
        m.Add(R[t + 1] == R[t] + sum(gain_R[t]))
        m.Add(L[t + 1] == L[t] + sum(gain_L[t]))
        m.Add(X[t + 1] == X[t] + sum(gain_X[t]))
    finalE, finalB, finalS = E[NUM_STEPS + 1], B[NUM_STEPS + 1], S[NUM_STEPS + 1]

    # ---- Apply resource constraints ----
    if resource_constraints is None:
        resource_constraints = RESOURCE_CONSTRAINTS
    if verbose and resource_constraints:
        print(f"Adding {len(resource_constraints)} resource constraint(s)")
    res_dict = {
        "E": E,
        "B": B,
        "S": S,
        "C": C,
        "R": R,
        "L": L,
        "X": X,
    }
    for idx, constraint_fn in enumerate(resource_constraints):
        constraint = constraint_fn(res_dict)
        if verbose:
            print(f" Constraint {idx + 1}: {constraint}")
            print(f" Type: {type(constraint)}")
            print(f" Constraint object: {constraint}")
        if constraint is not None:
            m.Add(constraint)

    # ---- Phase 1: real per-resource ceilings ----
    def _ceiling(var):
        """Return a proven upper bound on `var` over the current model.

        Uses the solver's best objective *bound*, not the best objective
        *value*: if the 20 s solve stops at FEASIBLE the incumbent is only
        a lower bound on the true maximum, and capping the variable at it
        would cut the real optimum out of Phase 2. Falls back to max_res
        when no solution was found.
        """
        s = cp_model.CpSolver()
        s.parameters.max_time_in_seconds = 20.0
        s.parameters.num_search_workers = num_workers
        m.Maximize(var)
        st = s.Solve(m)
        if st in (cp_model.OPTIMAL, cp_model.FEASIBLE):
            return min(max_res, int(round(s.BestObjectiveBound())))
        return max_res

    finals = {
        "E": finalE,
        "B": finalB,
        "S": finalS,
        "C": C[NUM_STEPS + 1],
        "R": R[NUM_STEPS + 1],
        "L": L[NUM_STEPS + 1],
        "X": X[NUM_STEPS + 1],
    }
    # Ceilings only for resources that appear in the objective: each
    # ceiling solve costs up to 20s, and only objective resources need
    # bounds (product mode) / benefit from the redundant cap constraint.
    caps = {}
    for k, var in finals.items():
        if obj_factors[k] != 0:
            caps[k] = _ceiling(var)
            m.Add(var <= caps[k])

    # ======================================================================
    # OBJECTIVE IS SET HERE
    # ======================================================================
    if objective_mode == "sum":
        # Linear: CP-SAT takes weighted sums (negative weights included)
        # directly, no auxiliary variables needed.
        m.Maximize(sum(f * finals[k] for k, f in obj_factors.items() if f))
    else:
        # Product: maximize prod(finals[k] ** obj_factors[k]). Expand the
        # exponents into a flat factor list and fold pairwise, carrying a
        # running upper bound from the Phase-1 caps.
        factor_keys = [k for k, f in obj_factors.items() for _ in range(f)]
        obj = finals[factor_keys[0]]
        bound = caps[factor_keys[0]]
        for k in factor_keys[1:]:
            bound *= caps[k]
            nxt = m.NewIntVar(0, bound, "")
            m.AddMultiplicationEquality(nxt, [obj, finals[k]])
            obj = nxt
        m.Maximize(obj)

    # ---- Phase 2: solve the actual objective ----
    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = time_limit
    solver.parameters.num_search_workers = num_workers
    if verbose:
        status = solver.Solve(
            m,
            printer.IntermediateSolutionPrinter(
                {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.1
            ),
        )
    else:
        status = solver.Solve(m)

    if verbose:
        caps_str = " ".join(f"{k}<={v}" for k, v in caps.items())
        print(f"(resource ceilings used: {caps_str})")
        _report(
            solver,
            status,
            cities,
            N,
            isH,
            isF,
            isM,
            isMon,
            col,
            ua,
            ub,
            ud,
            ow,
            rH,
            rF,
            rM,
            cvE,
            cvB,
            cvS,
            owcvE,
            owcvB,
            owcvS,
            mE,
            mB,
            mS,
            mC,
            owmE,
            owmB,
            owmS,
            owmC,
            hasA,
            hasB,
            hasD,
            vE,
            vB,
            vS,
            E,
            B,
            S,
            C,
            R,
            L,
            X,
            finalE,
            finalB,
            finalS,
            action_registry,
            governor,
            agent_registry,
            capital_spent,
            fence_deposits,
            baron_deposits,
            obj_factors=obj_factors,
            obj_mode=objective_mode,
        )
    return solver, status
def _report(
    solver,
    status,
    cities,
    N,
    isH,
    isF,
    isM,
    isMon,
    col,
    ua,
    ub,
    ud,
    ow,
    rH,
    rF,
    rM,
    cvE,
    cvB,
    cvS,
    owcvE,
    owcvB,
    owcvS,
    mE,
    mB,
    mS,
    mC,
    owmE,
    owmB,
    owmS,
    owmC,
    hasA,
    hasB,
    hasD,
    vE,
    vB,
    vS,
    E,
    B,
    S,
    C,
    R,
    L,
    X,
    finalE,
    finalB,
    finalS,
    action_registry,
    governor=None,
    agent_registry=None,
    capital_spent=None,
    fence_deposits=None,
    baron_deposits=None,
    obj_factors=None,
    obj_mode=None,
):
    """Print the solve status and, if a solution exists, a full report.

    The report covers per-step resource pools, optional capital-spent and
    Fence/Baron deposit tables, each city's per-step type, action(s),
    upgrades, governor and vats, and the final objective value. Resource
    values are divided by 10 for display.

    Args:
        solver, status: The solved ``CpSolver`` and its status code.
        cities: List of ``(arrival_step, city)`` tuples.
        N: Number of cities.
        isH ... vS: Model variable dicts keyed by ``(city, step)``; the
            action dicts (``col``, ``ua``, ...) are sparse.
        E ... X: Resource pool variables keyed by step.
        finalE, finalB, finalS: Final-pool variables for E, B and S.
        action_registry: Accepted for call compatibility; not read here.
        governor: ``{(city, step, agent_name): var}`` or None.
        agent_registry: Agents whose names are looked up in ``governor``.
        capital_spent: ``{step: [vars]}`` or None to skip that table.
        fence_deposits: ``{step: int}`` of fixed deposits, or falsy.
        baron_deposits: ``{step: var}`` of deposits, or falsy.
        obj_factors: Normalized objective factors, or None for the legacy
            E*B*S display.
        obj_mode: "sum" for the weighted-sum display, else product.
    """
    print("status:", solver.StatusName(status))
    if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        return

    # City type name per (city, step); "-" when no type flag is set.
    typ_name = {}
    for i in range(N):
        for t in range(1, NUM_STEPS + 1):
            if solver.Value(isH[i, t]):
                typ_name[i, t] = "Hub"
            elif solver.Value(isF[i, t]):
                typ_name[i, t] = "Foundry"
            elif solver.Value(isM[i, t]):
                typ_name[i, t] = "Metro"
            elif solver.Value(isMon[i, t]):
                typ_name[i, t] = "Monument"
            else:
                typ_name[i, t] = "-"

    def action_str(i, t):
        """Describe every action city `i` takes at step `t`.

        Usually exactly one action is active. More than one selector can be
        set at once (the Foreman allows a renovation plus one additional
        action), in which case the descriptions are joined with " + ".
        """
        kind = typ_name[i, t]

        def vat_pick(sel_e, sel_b):
            # The vat is E or B if its selector is set, otherwise S.
            if solver.Value(sel_e[i, t]):
                return "E"
            return "B" if solver.Value(sel_b[i, t]) else "S"

        def metro_picks(pick_vars):
            # One letter per picked unit (pick vars are x10-scaled).
            picks = []
            for nm, var in zip("EBSC", pick_vars):
                picks += [nm] * (solver.Value(var[i, t]) // 10)
            return ",".join(picks)

        parts = []
        if solver.Value(col.get((i, t), 0)):
            if kind == "Foundry":
                parts.append(f"Collect vat {vat_pick(cvE, cvB)}")
            elif kind == "Metro":
                parts.append("Collect {" + metro_picks((mE, mB, mS, mC)) + "}")
            else:
                parts.append("Collect (+Capital)")
        if solver.Value(ow.get((i, t), 0)):
            if kind == "Foundry":
                parts.append(f"Overwork vat {vat_pick(owcvE, owcvB)} (2x)")
            elif kind == "Metro":
                parts.append(
                    "Overwork {" + metro_picks((owmE, owmB, owmS, owmC)) + "} (2x)"
                )
            else:
                parts.append("Overwork (+Capital 2x)")
        for selector, text in (
            (ua, "Upgrade a (cost-reduction)"),
            (ub, "Upgrade b (collect-bonus)"),
            (ud, "Upgrade d (vat-increment)"),
            (rH, "Renovate -> Hub"),
            (rF, "Renovate -> Foundry"),
            (rM, "Renovate -> Metro"),
        ):
            if solver.Value(selector.get((i, t), 0)):
                parts.append(text)
        return " + ".join(parts) if parts else "(inactive)"

    print("\nPer-step pools (start of step):")
    print(" step: E B S C R L X")
    for t in range(1, NUM_STEPS + 2):
        # The last row is the pool after the final step's gains.
        label = f"after{NUM_STEPS}" if t == NUM_STEPS + 1 else f"start {t}"
        c_val = solver.Value(C[t])
        c_str = f"{c_val / 10:6.1f}"
        print(
            f" {label:>8}: {solver.Value(E[t]) / 10:6.1f} {solver.Value(B[t]) / 10:6.1f} "
            f"{solver.Value(S[t]) / 10:6.1f} {c_str} {solver.Value(R[t]) / 10:6.1f} "
            f"{solver.Value(L[t]) / 10:6.1f} {solver.Value(X[t]) / 10:6.1f}"
        )

    if capital_spent is not None:
        print("\n capital spent per step:")
        for t in range(1, NUM_STEPS + 1):
            spent = sum(solver.Value(x) for x in capital_spent[t]) / 10
            print(f" step {t}: {spent:6.1f}")

    if fence_deposits:
        print("\n Fence deposits (trade goods):")
        # Values are plain ints, so no solver lookup is needed.
        for t in sorted(fence_deposits):
            print(
                f" step {t}: {fence_deposits[t] / 10:.1f} (scaled: {fence_deposits[t]})"
            )

    if baron_deposits:
        print("\n Baron deposits (trade goods):")
        for t in sorted(baron_deposits.keys()):
            amt = solver.Value(baron_deposits[t])
            print(f" step {t}: {amt / 10:.1f} (scaled: {amt})")

    # Resolve governor assignments for display
    def gov_for(i, t):
        """Return the name of the agent governing city `i` at `t`, or "-"."""
        if not governor or not agent_registry:
            return "-"
        names = [a.name for a in agent_registry]
        for name in names:
            v = governor.get((i, t, name))
            if v is None:
                continue
            try:
                if solver.Value(v):
                    return name
            except Exception:
                # Deliberate best-effort: a value that cannot be read must
                # not abort the report, so the entry is skipped.
                continue
        return "-"

    print("\nActions:")
    for i in range(N):
        a_step, a_city = cities[i]
        a_type = a_city["type"] if isinstance(a_city, dict) else a_city
        print(f" City {i} (arrives step {a_step} as {a_type}):")
        for t in range(a_step, NUM_STEPS + 1):
            ups = "".join(
                n
                for n, h in (("a", hasA), ("b", hasB), ("d", hasD))
                if solver.Value(h[i, t])
            )
            extra = (
                f" vats(E{solver.Value(vE[i, t])},B{solver.Value(vB[i, t])},S{solver.Value(vS[i, t])})"
                if typ_name[i, t] == "Foundry"
                else ""
            )
            gov = gov_for(i, t)
            print(
                f" step {t}: [{typ_name[i, t]:7}] {action_str(i, t):28}"
                f" upg[{ups}] gov[{gov}]{extra}"
            )

    fe, fb, fs = solver.Value(finalE), solver.Value(finalB), solver.Value(finalS)
    vals = {
        "E": fe,
        "B": fb,
        "S": fs,
        "C": solver.Value(C[NUM_STEPS + 1]),
        "R": solver.Value(R[NUM_STEPS + 1]),
        "L": solver.Value(L[NUM_STEPS + 1]),
        "X": solver.Value(X[NUM_STEPS + 1]),
    }
    if obj_factors is None:
        # Legacy fallback: old hardcoded E*B*S display.
        obj_str = f"product(scaled) = {fe * fb * fs / 1000}"
    elif obj_mode == "sum":
        terms = [
            f"{f}{k}" if abs(f) != 1 else (k if f > 0 else f"-{k}")
            for k, f in obj_factors.items()
            if f
        ]
        expr = " + ".join(terms).replace("+ -", "- ")
        raw = sum(f * vals[k] for k, f in obj_factors.items())
        obj_str = f"objective {expr} = {raw / 10:.1f}"
    else:
        expr = "*".join(
            k if f == 1 else f"{k}^{f}" for k, f in obj_factors.items() if f
        )
        raw, n = 1, 0
        for k, f in obj_factors.items():
            raw *= vals[k] ** f
            n += f
        # Resource values are x10-scaled, so descale by 10^(sum of exponents).
        obj_str = f"objective {expr} = {raw / 10**n}"
    print(
        f"\nFINAL E={fe / 10:.1f} B={fb / 10:.1f} S={fs / 10:.1f} "
        f"{obj_str} sum = {(fe + fb + fs) / 10:.1f}"
    )
# Script entry point: build and solve the model with the module-level
# defaults when the file is executed directly (not on import).
if __name__ == "__main__":
    solve()