From d17ffbdb45fdd05e5bcd8a26a00bf0b7fbff7edd Mon Sep 17 00:00:00 2001 From: Pagwin Date: Mon, 8 Jun 2026 14:34:45 -0400 Subject: [PATCH] moved main-2 to main --- main-2.py | 1659 ----------------------------------------------------- main.py | 1553 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 1244 insertions(+), 1968 deletions(-) delete mode 100644 main-2.py diff --git a/main-2.py b/main-2.py deleted file mode 100644 index 984d07d..0000000 --- a/main-2.py +++ /dev/null @@ -1,1659 +0,0 @@ -""" -City Resource Optimization -> CP-SAT (Google OR-Tools) [ACTION REGISTRY VERSION] - -Maximise Electrum * Brass * Steel after the gains of step 5. - -The objective is a product of three variables, so this is a constraint- -programming / nonlinear problem, hence CP-SAT (cp_model) rather than the -LP/MIP solver. Read the comments at: - - PARAMETERS (initial pools + arrival schedule + bonus mode) - - "OBJECTIVE IS SET HERE" (the product being maximised -- tweak freely) -""" - -from ortools.sat.python import cp_model -import printer - -# ====================================================================== -# PARAMETERS -- edit these -# ====================================================================== - -# Starting resource pools at the start of step 1: (Electrum, Brass, Steel, Capital) -# Note: all resource values are scaled by 10 internally so that fractional -# trade-good splits (Baron/Fence et al, phase B/C) can be modeled with integers. -# Display in _report divides by 10. -INITIAL = (30, 30, 30, 30) - -# Arrival schedule. Key = step (1..5), value = list of city types that arrive -# at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument. -# Total cities across all steps must be <= 7. Arriving cities act that same step. -# Each entry may be either a string shorthand (e.g. "H") or a dict, e.g. -# {"type": "F", "adjacent_to": [city_idx, ...]} -# `adjacent_to` is currently parsed but unused; see normalize_city(). -ARRIVALS = { - 1: ["H", "F", "H", "H", "N"], - 2: [], - 3: [], - 4: [], - 5: [], -} - - -def normalize_city(c): - if isinstance(c, str): - return {"type": c, "adjacent_to": []} - return {"adjacent_to": [], **c} - - -# Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is -# +1 of the resource collected (i.e. the chosen vat's value + 1). This is the -# same uniform rule as Hub (+1 Capital) and Metropolis (+1 resource pick). - -NUM_STEPS = 5 -MAX_RES = ( - 2000 # upper bound on any resource pool (scaled x10; raise if you expect more) -) -MAX_VAT = 15 # upper bound on a foundry vat value (1 + 2*nsteps is plenty); unscaled - - -# Bastion counts indexed by t-1; bastions on map at step t. Reserved for -# phase B/C/D agents (no effect yet). -BASTION_COUNTS = [3, 3, 3, 3, 3] - -# Agent name -> list of steps where the agent is available. -# Empty/missing means the agent is disabled. Phase B/C/D will populate. -AGENT_AVAILABILITY = { - "capitalist": [], - "baron": [], - "prodigy": [], - "provisioner": [], - "metallurgist": [], - "builder": [], - "courier": [], - "planner": [1, 2, 3, 4, 5], - "fence": [], - "foreman": [], - "industrialist": [], - "economist": [], -} - -# ====================================================================== -# ACTION REGISTRY CONFIGURATION -# ====================================================================== - -ENABLED_ACTIONS = { - "collect": True, - "upgrade_a": True, - "upgrade_b": True, - "upgrade_d": True, - "overwork": True, - "renovate_h": True, - "renovate_f": True, - # "renovate_m": False, # currently always disabled in main.py -} - -# ====================================================================== -# ACTION REGISTRY BASE CLASS -# ====================================================================== - - -class Action: - """Base class for all actions. Subclasses implement the action's behavior.""" - - def __init__(self, name): - self.name = name - - def declare_vars(self, model, i, t, ctx): - """Declare the action's decision variable(s) for city i at step t. - - Should create and store a selector var (selector_var) in ctx['action_vars'][self.name]. - """ - pass - - def add_constraints(self, model, i, t, ctx): - """Add precondition and legality constraints for this action.""" - pass - - def add_global_constraints(self, model, t, ctx): - """Add per-step global constraints (e.g. overwork's 'at most 1 per step').""" - pass - - def contributes_gains_costs(self, model, i, t, ctx): - """Add contributions to gain_E/B/S/C and cost_C/S accumulators.""" - pass - - def transition_contrib(self, model, i, t, ctx): - """Handle state transitions (type, upgrade, vat) caused by this action.""" - pass - - def selector_var(self, i, t, ctx): - """Return the boolean variable that is 1 iff this action is chosen for (i,t). - - Used to form the "exactly one action" constraint. - """ - return ctx["action_vars"].get(self.name, {}).get((i, t), None) - - def describe(self, solver, i, t, ctx): - """Return a human-readable string describing what this action does for (i,t). - - Called during reporting if this action is the chosen one. - """ - return f"{self.name}" - - -# (VatsMixin removed — vat transitions are handled centrally in solve()) - - -# ====================================================================== -# CONCRETE ACTIONS -# ====================================================================== - - -class CollectAction(Action): - def __init__(self): - super().__init__("collect") - self.AND_cache = {} - - def declare_vars(self, model, i, t, ctx): - m = model - col = m.NewBoolVar(f"col_{i}_{t}") - cvE = m.NewBoolVar(f"cvE_{i}_{t}") - cvB = m.NewBoolVar(f"cvB_{i}_{t}") - cvS = m.NewBoolVar(f"cvS_{i}_{t}") - mE = m.NewIntVar(0, 30, f"mE_{i}_{t}") - mB = m.NewIntVar(0, 30, f"mB_{i}_{t}") - mS = m.NewIntVar(0, 30, f"mS_{i}_{t}") - mC = m.NewIntVar(0, 30, f"mC_{i}_{t}") - - ctx["action_vars"]["collect"][(i, t)] = col - ctx["col"][i, t] = col - ctx["cvE"][i, t] = cvE - ctx["cvB"][i, t] = cvB - ctx["cvS"][i, t] = cvS - ctx["mE"][i, t] = mE - ctx["mB"][i, t] = mB - ctx["mS"][i, t] = mS - ctx["mC"][i, t] = mC - - def add_constraints(self, model, i, t, ctx): - m = model - isF = ctx["isF"] - isM = ctx["isM"] - isMon = ctx["isMon"] - hasB = ctx["hasB"] - col = ctx["col"][i, t] - cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] - mE, mB, mS, mC = ( - ctx["mE"][i, t], - ctx["mB"][i, t], - ctx["mS"][i, t], - ctx["mC"][i, t], - ) - - # Monument can't collect - m.Add(col == 0).OnlyEnforceIf(isMon[i, t]) - - # Foundry: exactly one vat chosen iff foundry collects - fcol = self._AND(model, isF[i, t], col, ctx) - m.Add(cvE + cvB + cvS == fcol) - - # Metropolis: pick (2 + hasB) resources - mcol = self._AND(model, isM[i, t], col, ctx) - npick = m.NewIntVar(0, 3, "") - m.Add(npick == 2 + hasB[i, t]).OnlyEnforceIf(mcol) - m.Add(npick == 0).OnlyEnforceIf(mcol.Not()) - # Each "pick" is 10 units of the chosen resource (x10 scaling). - m.Add(mE + mB + mS + mC == npick * 10) - - def add_global_constraints(self, model, t, ctx): - pass - - def contributes_gains_costs(self, model, i, t, ctx): - m = model - isH = ctx["isH"] - isF = ctx["isF"] - isM = ctx["isM"] - hasB = ctx["hasB"] - vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] - col = ctx["col"][i, t] - cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] - mE, mB, mS, mC = ( - ctx["mE"][i, t], - ctx["mB"][i, t], - ctx["mS"][i, t], - ctx["mC"][i, t], - ) - - gain_E = ctx["gain_E"][t] - gain_B = ctx["gain_B"][t] - gain_S = ctx["gain_S"][t] - gain_C = ctx["gain_C"][t] - cost_C = ctx["cost_C"][t] - capital_spent = ctx["capital_spent"][t] - - # Collect sub-choices - fcol = self._AND(model, isF[i, t], col, ctx) - mcol = self._AND(model, isM[i, t], col, ctx) - hcol = self._AND(model, isH[i, t], col, ctx) - - # Costs: foundry & metropolis collect each cost 1 Capital (x10 scaled) - fcol_cost = m.NewIntVar(0, 10, "") - m.Add(fcol_cost == 10 * fcol) - cost_C.append(fcol_cost) - capital_spent.append(fcol_cost) - mcol_cost = m.NewIntVar(0, 10, "") - m.Add(mcol_cost == 10 * mcol) - cost_C.append(mcol_cost) - capital_spent.append(mcol_cost) - - # Hub collect: +2 Capital (+1 more if collect-bonus b) -> x10 - hub_gain = m.NewIntVar(0, 30, "") - m.Add(hub_gain == 10 * (2 + hasB[i, t])).OnlyEnforceIf(hcol) - m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not()) - gain_C.append(hub_gain) - - # Metropolis collect: each "pick" is already 10 units (mE/mB/mS/mC). - gain_E.append(mE) - gain_B.append(mB) - gain_S.append(mS) - gain_C.append(mC) - - # Foundry collect: gain chosen vat's value as that resource (vat unscaled), - # then scale x10 for the resource gain. - gEf = m.NewIntVar(0, ctx["max_vat"], "") - gBf = m.NewIntVar(0, ctx["max_vat"], "") - gSf = m.NewIntVar(0, ctx["max_vat"], "") - m.AddMultiplicationEquality(gEf, [cvE, vE[i, t]]) - m.AddMultiplicationEquality(gBf, [cvB, vB[i, t]]) - m.AddMultiplicationEquality(gSf, [cvS, vS[i, t]]) - gEf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") - gBf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") - gSf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") - m.Add(gEf_scaled == 10 * gEf) - m.Add(gBf_scaled == 10 * gBf) - m.Add(gSf_scaled == 10 * gSf) - gain_E.append(gEf_scaled) - gain_B.append(gBf_scaled) - gain_S.append(gSf_scaled) - - # Collect Bonus (b): adds +1 to the amount a Collect gives -> +10 scaled - be = self._AND(model, cvE, hasB[i, t], ctx) - bb = self._AND(model, cvB, hasB[i, t], ctx) - bs = self._AND(model, cvS, hasB[i, t], ctx) - be10 = m.NewIntVar(0, 10, "") - bb10 = m.NewIntVar(0, 10, "") - bs10 = m.NewIntVar(0, 10, "") - m.Add(be10 == 10 * be) - m.Add(bb10 == 10 * bb) - m.Add(bs10 == 10 * bs) - gain_E.append(be10) - gain_B.append(bb10) - gain_S.append(bs10) - - def transition_contrib(self, model, i, t, ctx): - pass # Vat transitions handled centrally in solve() - - def describe(self, solver, i, t, ctx): - typ_name = ctx["typ_name"].get((i, t), "-") - cvE = ctx["cvE"].get((i, t)) - cvB = ctx["cvB"].get((i, t)) - cvS = ctx["cvS"].get((i, t)) - mE = ctx["mE"].get((i, t)) - mB = ctx["mB"].get((i, t)) - mS = ctx["mS"].get((i, t)) - mC = ctx["mC"].get((i, t)) - - if typ_name == "Foundry": - v = "E" if solver.Value(cvE) else "B" if solver.Value(cvB) else "S" - return f"Collect vat {v}" - if typ_name == "Metro": - picks = [] - for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): - picks += [nm] * (solver.Value(var) // 10) - return "Collect {" + ",".join(picks) + "}" - return "Collect (+Capital)" - - def _AND(self, model, a, b, ctx): - key = (id(a), id(b)) - if key not in self.AND_cache: - c = model.NewBoolVar("") - model.AddMultiplicationEquality(c, [a, b]) - self.AND_cache[key] = c - return self.AND_cache[key] - - -class UpgradeAction(Action): - """Base class for upgrade actions (a, b, d).""" - - def __init__(self, name, upgrade_key): - super().__init__(name) - self.upgrade_key = upgrade_key # 'A', 'B', or 'D' - - def declare_vars(self, model, i, t, ctx): - m = model - u = m.NewBoolVar(f"{self.upgrade_key.lower()}_{i}_{t}") - ctx["action_vars"][self.name][(i, t)] = u - ctx[f"u{self.upgrade_key.lower()}"][i, t] = u - - def add_constraints(self, model, i, t, ctx): - m = model - u = ctx[f"u{self.upgrade_key.lower()}"][i, t] - has_key = ctx[f"has{self.upgrade_key}"] - isMon = ctx["isMon"] - isF = ctx["isF"] - - # Can't re-acquire - m.Add(has_key[i, t] == 0).OnlyEnforceIf(u) - - # Monument can't upgrade - m.Add(u == 0).OnlyEnforceIf(isMon[i, t]) - - # D is foundry-only - if self.upgrade_key == "D": - m.Add(isF[i, t] == 1).OnlyEnforceIf(u) - - def add_global_constraints(self, model, t, ctx): - pass - - def contributes_gains_costs(self, model, i, t, ctx): - m = model - u = ctx[f"u{self.upgrade_key.lower()}"][i, t] - hasA = ctx["hasA"] - cost_S = ctx["cost_S"][t] - - # Upgrades b,d cost 2 Steel, reduced by 1 if cost-reduction (a) already held - # (x10 scaled: 20 / 10). - if self.upgrade_key in ("B", "D"): - c = m.NewIntVar(0, 20, "") - both = m.NewBoolVar("") - m.AddMultiplicationEquality(both, [u, hasA[i, t]]) - m.Add(c == 20).OnlyEnforceIf(u, hasA[i, t].Not()) - m.Add(c == 10).OnlyEnforceIf(both) - m.Add(c == 0).OnlyEnforceIf(u.Not()) - cost_S.append(c) - - def transition_contrib(self, model, i, t, ctx): - pass - - def describe(self, solver, i, t, ctx): - if self.upgrade_key == "A": - return "Upgrade a (cost-reduction)" - elif self.upgrade_key == "B": - return "Upgrade b (collect-bonus)" - elif self.upgrade_key == "D": - return "Upgrade d (vat-increment)" - return self.name - - -class OverworkAction(Action): - def __init__(self): - super().__init__("overwork") - self.AND_cache = {} - - def declare_vars(self, model, i, t, ctx): - m = model - ow = m.NewBoolVar(f"ow_{i}_{t}") - owcvE = m.NewBoolVar(f"owcvE_{i}_{t}") - owcvB = m.NewBoolVar(f"owcvB_{i}_{t}") - owcvS = m.NewBoolVar(f"owcvS_{i}_{t}") - owmE = m.NewIntVar(0, 60, f"owmE_{i}_{t}") - owmB = m.NewIntVar(0, 60, f"owmB_{i}_{t}") - owmS = m.NewIntVar(0, 60, f"owmS_{i}_{t}") - owmC = m.NewIntVar(0, 60, f"owmC_{i}_{t}") - - ctx["action_vars"]["overwork"][(i, t)] = ow - ctx["ow"][i, t] = ow - ctx["owcvE"][i, t] = owcvE - ctx["owcvB"][i, t] = owcvB - ctx["owcvS"][i, t] = owcvS - ctx["owmE"][i, t] = owmE - ctx["owmB"][i, t] = owmB - ctx["owmS"][i, t] = owmS - ctx["owmC"][i, t] = owmC - - def add_constraints(self, model, i, t, ctx): - m = model - isF = ctx["isF"] - isM = ctx["isM"] - isMon = ctx["isMon"] - hasB = ctx["hasB"] - ow = ctx["ow"][i, t] - owcvE = ctx["owcvE"][i, t] - owcvB = ctx["owcvB"][i, t] - owcvS = ctx["owcvS"][i, t] - owmE = ctx["owmE"][i, t] - owmB = ctx["owmB"][i, t] - owmS = ctx["owmS"][i, t] - owmC = ctx["owmC"][i, t] - - # Monument can't overwork - m.Add(ow == 0).OnlyEnforceIf(isMon[i, t]) - - # Foundry overwork: exactly one vat chosen iff foundry overworks - fow = self._AND(model, isF[i, t], ow, ctx) - m.Add(owcvE + owcvB + owcvS == fow) - - # Metropolis overwork: pick 2*(2 + hasB) resources - mow = self._AND(model, isM[i, t], ow, ctx) - ow_npick = m.NewIntVar(0, 6, "") - m.Add(ow_npick == 2 * (2 + hasB[i, t])).OnlyEnforceIf(mow) - m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not()) - # Each "pick" is 10 units (x10 scaling). - m.Add(owmE + owmB + owmS + owmC == ow_npick * 10) - - # Overwork cooldown: city that overworked last step can't collect or overwork - if t > ctx["arrival_step"][i]: - col = ctx["col"].get((i, t)) - if col is not None: - m.Add(col == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) - m.Add(ow == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) - - # Planner gating: when planner is enabled, a city may overwork only if - # the planner agent governs it. Removes the global "at most 1" cap. - if "planner" in AGENT_AVAILABILITY: - planner_g = ctx["governor"].get((i, t, "planner")) - if planner_g is not None: - m.Add(ow == 0).OnlyEnforceIf(planner_g.Not()) - else: - m.Add(ow == 0) - - def add_global_constraints(self, model, t, ctx): - m = model - ow = ctx["ow"] - N = ctx["N"] - if "planner" not in AGENT_AVAILABILITY: - # At most 1 city overworks per step - m.Add(sum(ow[i, t] for i in range(N)) <= 1) - # else: gating handled per-city by Planner - - def contributes_gains_costs(self, model, i, t, ctx): - m = model - isH = ctx["isH"] - isF = ctx["isF"] - isM = ctx["isM"] - hasB = ctx["hasB"] - vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] - ow = ctx["ow"][i, t] - owcvE = ctx["owcvE"][i, t] - owcvB = ctx["owcvB"][i, t] - owcvS = ctx["owcvS"][i, t] - owmE = ctx["owmE"][i, t] - owmB = ctx["owmB"][i, t] - owmS = ctx["owmS"][i, t] - owmC = ctx["owmC"][i, t] - - gain_E = ctx["gain_E"][t] - gain_B = ctx["gain_B"][t] - gain_S = ctx["gain_S"][t] - gain_C = ctx["gain_C"][t] - - # Overwork sub-choices - fow = self._AND(model, isF[i, t], ow, ctx) - mow = self._AND(model, isM[i, t], ow, ctx) - how = self._AND(model, isH[i, t], ow, ctx) - - # Hub overwork: 2*(2 + hasB) Capital -> x10 - hub_ow_gain = m.NewIntVar(0, 60, "") - m.Add(hub_ow_gain == 20 * (2 + hasB[i, t])).OnlyEnforceIf(how) - m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not()) - gain_C.append(hub_ow_gain) - - # Metropolis overwork: each "pick" is already 10 units (owm* vars). - gain_E.append(owmE) - gain_B.append(owmB) - gain_S.append(owmS) - gain_C.append(owmC) - - # Foundry overwork: 2 * chosen vat's value, then x10 scaling. - _owE = m.NewIntVar(0, ctx["max_vat"], "") - _owB = m.NewIntVar(0, ctx["max_vat"], "") - _owS = m.NewIntVar(0, ctx["max_vat"], "") - m.AddMultiplicationEquality(_owE, [owcvE, vE[i, t]]) - m.AddMultiplicationEquality(_owB, [owcvB, vB[i, t]]) - m.AddMultiplicationEquality(_owS, [owcvS, vS[i, t]]) - owgEf = m.NewIntVar(0, 20 * ctx["max_vat"], "") - owgBf = m.NewIntVar(0, 20 * ctx["max_vat"], "") - owgSf = m.NewIntVar(0, 20 * ctx["max_vat"], "") - m.Add(owgEf == 20 * _owE) - m.Add(owgBf == 20 * _owB) - m.Add(owgSf == 20 * _owS) - gain_E.append(owgEf) - gain_B.append(owgBf) - gain_S.append(owgSf) - - # Collect Bonus (b) for foundry overwork: 2 * (+1) of that resource -> x10 = +20 - ow_be = self._AND(model, owcvE, hasB[i, t], ctx) - ow_bb = self._AND(model, owcvB, hasB[i, t], ctx) - ow_bs = self._AND(model, owcvS, hasB[i, t], ctx) - ow_be20 = m.NewIntVar(0, 20, "") - ow_bb20 = m.NewIntVar(0, 20, "") - ow_bs20 = m.NewIntVar(0, 20, "") - m.Add(ow_be20 == 20 * ow_be) - m.Add(ow_bb20 == 20 * ow_bb) - m.Add(ow_bs20 == 20 * ow_bs) - gain_E.append(ow_be20) - gain_B.append(ow_bb20) - gain_S.append(ow_bs20) - - def transition_contrib(self, model, i, t, ctx): - pass # Vat transitions handled centrally in solve() - - def describe(self, solver, i, t, ctx): - typ_name = ctx["typ_name"].get((i, t), "-") - owcvE = ctx["owcvE"].get((i, t)) - owcvB = ctx["owcvB"].get((i, t)) - owcvS = ctx["owcvS"].get((i, t)) - owmE = ctx["owmE"].get((i, t)) - owmB = ctx["owmB"].get((i, t)) - owmS = ctx["owmS"].get((i, t)) - owmC = ctx["owmC"].get((i, t)) - - if typ_name == "Foundry": - v = "E" if solver.Value(owcvE) else "B" if solver.Value(owcvB) else "S" - return f"Overwork vat {v} (2x)" - if typ_name == "Metro": - picks = [] - for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): - picks += [nm] * (solver.Value(var) // 10) - return "Overwork {" + ",".join(picks) + "} (2x)" - return "Overwork (+Capital 2x)" - - def _AND(self, model, a, b, ctx): - key = (id(a), id(b)) - if not hasattr(self, "AND_cache"): - self.AND_cache = {} - if key not in self.AND_cache: - c = model.NewBoolVar("") - model.AddMultiplicationEquality(c, [a, b]) - self.AND_cache[key] = c - return self.AND_cache[key] - - -class RenovateAction(Action): - """Base class for renovation actions.""" - - def __init__(self, name, target_type): - super().__init__(name) - self.target_type = target_type # 'H', 'F', etc. - - def declare_vars(self, model, i, t, ctx): - m = model - r = m.NewBoolVar(f"r{self.target_type}_{i}_{t}") - ctx["action_vars"][self.name][(i, t)] = r - ctx[f"r{self.target_type}"][i, t] = r - - def add_constraints(self, model, i, t, ctx): - m = model - r = ctx[f"r{self.target_type}"][i, t] - target = ctx[f"is{self.target_type}"] - - # Renovate-to-X requires not-X now - m.Add(target[i, t] == 0).OnlyEnforceIf(r) - - def add_global_constraints(self, model, t, ctx): - pass - - def contributes_gains_costs(self, model, i, t, ctx): - pass - - def transition_contrib(self, model, i, t, ctx): - pass - - def describe(self, solver, i, t, ctx): - type_name = {"H": "Hub", "F": "Foundry", "M": "Metro"} - return f"Renovate -> {type_name.get(self.target_type, self.target_type)}" - - -# ====================================================================== -# AGENT REGISTRY BASE CLASSES -# ====================================================================== - - -class Agent: - """Governor-style agent. Assigned to a city per step via governor[i,t,name].""" - - name: str - available_steps: list # steps where this agent may be assigned - - def __init__(self, name, available_steps): - self.name = name - self.available_steps = list(available_steps) - - def declare_vars(self, model, ctx): - """One-time vars (e.g. one-shot flags). Called once before per-(i,t) loop.""" - pass - - def apply(self, model, i, t, ctx): - """Per-(city,step) effect. Gate logic on ctx['governor'][i,t,self.name].""" - pass - - -class EventAgent: - """Triggers automatically each available step. No city target. - Reads/writes ctx (typically tg_pool or gain_*).""" - - name: str - available_steps: list - - def __init__(self, name, available_steps): - self.name = name - self.available_steps = list(available_steps) - - def declare_vars(self, model, ctx): - pass - - def apply_event(self, model, t, ctx): - pass - - -# ====================================================================== -# PHASE B AGENTS -# ====================================================================== - - -class Capitalist(Agent): - """+20 Capital (scaled = +2 Capital) when governing a Collect action.""" - - def apply(self, model, i, t, ctx): - m = model - g = ctx["governor"].get((i, t, self.name)) - if g is None: - return - col = ctx["col"].get((i, t)) - if col is None: - return - both = m.NewBoolVar("") - m.AddMultiplicationEquality(both, [g, col]) - bonus = m.NewIntVar(0, 20, "") - m.Add(bonus == 20).OnlyEnforceIf(both) - m.Add(bonus == 0).OnlyEnforceIf(both.Not()) - ctx["gain_C"][t].append(bonus) - - -class Baron(Agent): - """BASTION_COUNTS[t-1]*10 trade-goods when governing a Collect action.""" - - def apply(self, model, i, t, ctx): - m = model - g = ctx["governor"].get((i, t, self.name)) - if g is None: - return - col = ctx["col"].get((i, t)) - if col is None: - return - amt = BASTION_COUNTS[t - 1] * 10 - both = m.NewBoolVar("") - m.AddMultiplicationEquality(both, [g, col]) - deposit = m.NewIntVar(0, max(amt, 0), "") - m.Add(deposit == amt).OnlyEnforceIf(both) - m.Add(deposit == 0).OnlyEnforceIf(both.Not()) - ctx["tg_pool"][t].append(deposit) - - -class Prodigy(Agent): - """+20 Steel (refund 2 Steel) when governing AND city upgrades B or D.""" - - def apply(self, model, i, t, ctx): - m = model - g = ctx["governor"].get((i, t, self.name)) - if g is None: - return - for u_var in (ctx["ub"].get((i, t)), ctx["ud"].get((i, t))): - if u_var is None: - continue - both = m.NewBoolVar("") - m.AddMultiplicationEquality(both, [g, u_var]) - refund = m.NewIntVar(0, 20, "") - m.Add(refund == 20).OnlyEnforceIf(both) - m.Add(refund == 0).OnlyEnforceIf(both.Not()) - ctx["gain_S"][t].append(refund) - - -class Provisioner(Agent): - """+15 Electrum (+1.5 scaled) when governing, unconditional.""" - - def apply(self, model, i, t, ctx): - m = model - g = ctx["governor"].get((i, t, self.name)) - if g is None: - return - bonus = m.NewIntVar(0, 15, "") - m.Add(bonus == 15).OnlyEnforceIf(g) - m.Add(bonus == 0).OnlyEnforceIf(g.Not()) - ctx["gain_E"][t].append(bonus) - - -class Metallurgist(Agent): - """Foundry-only: adds +1 to vat increment. Implemented centrally in solve(). - This class exists only for registry uniformity.""" - - def apply(self, model, i, t, ctx): - # Vat-level effect handled in solve()'s centralized vat-transition block. - pass - - -class Builder(Agent): - """One-shot: grants hasD[i,t+1]=1 when governing a Foundry city. - Fires at most once across all (i,t).""" - - def declare_vars(self, model, ctx): - ctx.setdefault("builder_fire", {}) - - def apply(self, model, i, t, ctx): - m = model - g = ctx["governor"].get((i, t, self.name)) - if g is None: - return - isF = ctx["isF"] - fire = m.NewBoolVar(f"builder_fire_{i}_{t}") - m.AddMultiplicationEquality(fire, [g, isF[i, t]]) - ctx.setdefault("builder_fire", {})[(i, t)] = fire - - -class Courier(Agent): - """One-shot globally: +30 to gain_C, gain_S, gain_B on the first step - where Courier governs anywhere.""" - - def declare_vars(self, model, ctx): - m = model - N = ctx["N"] - governor = ctx["governor"] - gain_C = ctx["gain_C"] - gain_S = ctx["gain_S"] - gain_B = ctx["gain_B"] - - courier_any = {} - courier_fired = {} - ever_fired = {} - prev_ever = None - for t in range(1, NUM_STEPS + 1): - terms = [] - for i in range(N): - v = governor.get((i, t, self.name)) - if v is not None: - terms.append(v) - any_t = m.NewBoolVar(f"courier_any_{t}") - if terms: - # any_t = OR(terms) - m.AddMaxEquality(any_t, terms) - else: - m.Add(any_t == 0) - courier_any[t] = any_t - - fired_t = m.NewBoolVar(f"courier_fired_{t}") - if prev_ever is None: - # fired_t == any_t (no prior history) - m.Add(fired_t == any_t) - else: - # fired_t = any_t AND NOT prev_ever - m.Add(fired_t <= any_t) - m.Add(fired_t <= 1 - prev_ever) - m.Add(fired_t >= any_t - prev_ever) - courier_fired[t] = fired_t - - ever_t = m.NewBoolVar(f"courier_ever_{t}") - if prev_ever is None: - m.Add(ever_t == fired_t) - else: - m.AddMaxEquality(ever_t, [prev_ever, fired_t]) - ever_fired[t] = ever_t - prev_ever = ever_t - - # Add +30 contribution to gain_C/S/B - for accum in (gain_C[t], gain_S[t], gain_B[t]): - bonus = m.NewIntVar(0, 30, "") - m.Add(bonus == 30).OnlyEnforceIf(fired_t) - m.Add(bonus == 0).OnlyEnforceIf(fired_t.Not()) - accum.append(bonus) - - ctx["courier_any"] = courier_any - ctx["courier_fired"] = courier_fired - - def apply(self, model, i, t, ctx): - pass - - -# ====================================================================== -# PHASE C AGENTS -# ====================================================================== - - -class Planner(Agent): - """Effect realized via OverworkAction (per-city gating). No-op here.""" - - def apply(self, model, i, t, ctx): - pass - - -class Foreman(Agent): - """TODO: Restructure ability — allows other industry actions during renovation. - The current model uses 'exactly one action per city per step', so renovation - already blocks no specific action separately. Foreman becomes meaningful only - once renovation-blocks-action semantics are modeled. - """ - - def apply(self, model, i, t, ctx): - pass - - -class Industrialist(Agent): - """TODO: Network ability — grants Infrastructure to governed city and its - adjacent cities when appointed governor. Requires the city's adjacency list - (city_dict['adjacent_to']) to be populated and an Infrastructure mechanic - in the model. Neither is implemented yet. - """ - - def apply(self, model, i, t, ctx): - pass - - -class Economist(Agent): - """TODO: Velocity of Money — gain 1 Renown per 10 Capital collected and spent - on Collection. Renown is not in the objective (current objective is E²·B·S²). - capital_spent tracking exists (ctx['capital_spent']) but renown accumulator - and objective integration do not. - """ - - def apply(self, model, i, t, ctx): - pass - - -class Fence(EventAgent): - """Each available step, deposits +20 (=2 trade goods x10) into tg_pool[t].""" - - def apply_event(self, model, t, ctx): - ctx["tg_pool"][t].append(model.NewConstant(20)) - - -# ====================================================================== -# MODEL -# ====================================================================== - - -def solve( - initial=INITIAL, - arrivals=ARRIVALS, - max_res=MAX_RES, - max_vat=MAX_VAT, - time_limit=60.0, - num_workers=8, - verbose=True, -): - - # ---- build the city list ----------------------------------------- - # cities: list[tuple[int, dict]] where the dict has keys "type" and - # "adjacent_to". TODO: adjacency unused; for Industrialist agent. - cities = [] - for s in range(1, NUM_STEPS + 1): - for entry in arrivals.get(s, []): - cities.append((s, normalize_city(entry))) - N = len(cities) - - m = cp_model.CpModel() - - def AND(a, b): - """Boolean AND of two 0/1 vars, returned as a new 0/1 var.""" - c = m.NewBoolVar("") - m.AddMultiplicationEquality(c, [a, b]) - return c - - # ---- state variables --------------------------------------------- - isH, isF, isM, isMon, present = {}, {}, {}, {}, {} - hasA, hasB, hasD = {}, {}, {} - vE, vB, vS = {}, {}, {} # foundry vats at start of step t - - # action variables (sparse, only created for enabled actions) - col, ua, ub, ud = {}, {}, {}, {} - ow = {} - rH, rF, rM = {}, {}, {} - cvE, cvB, cvS = {}, {}, {} - owcvE, owcvB, owcvS = {}, {}, {} - mE, mB, mS, mC = {}, {}, {}, {} - owmE, owmB, owmS, owmC = {}, {}, {}, {} - - for i in range(N): - a_step, a_city = cities[i] - a_type = a_city["type"] - for t in range(1, NUM_STEPS + 2): - isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}") - isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}") - isM[i, t] = m.NewBoolVar(f"isM_{i}_{t}") - isMon[i, t] = m.NewBoolVar(f"isMon_{i}_{t}") - present[i, t] = m.NewBoolVar(f"present_{i}_{t}") - hasA[i, t] = m.NewBoolVar(f"hasA_{i}_{t}") - hasB[i, t] = m.NewBoolVar(f"hasB_{i}_{t}") - hasD[i, t] = m.NewBoolVar(f"hasD_{i}_{t}") - vE[i, t] = m.NewIntVar(0, max_vat, f"vE_{i}_{t}") - vB[i, t] = m.NewIntVar(0, max_vat, f"vB_{i}_{t}") - vS[i, t] = m.NewIntVar(0, max_vat, f"vS_{i}_{t}") - - # presence: present from arrival step onward, persists - m.Add(present[i, t] == (1 if t >= a_step else 0)) - # exactly one type iff present - m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t]) - - # ---- per-step gain/cost accumulators (linear expressions) --------- - gain_E = {t: [] for t in range(1, NUM_STEPS + 1)} - gain_B = {t: [] for t in range(1, NUM_STEPS + 1)} - gain_S = {t: [] for t in range(1, NUM_STEPS + 1)} - gain_C = {t: [] for t in range(1, NUM_STEPS + 1)} - cost_C = {t: [] for t in range(1, NUM_STEPS + 1)} - cost_S = {t: [] for t in range(1, NUM_STEPS + 1)} - - # Trade-goods pool (per step): new generic-trade-good sources (Baron, Fence, - # phase B/C) deposit IntVars here; centralized split logic routes the pool - # into gain_E/B/S/C below. Metro picks are NOT routed through here. - tg_pool = {t: [] for t in range(1, NUM_STEPS + 1)} - - # Per-step capital spent tracker (mirrors entries appended to cost_C). - capital_spent = {t: [] for t in range(1, NUM_STEPS + 1)} - - # ---- Agent Registry (populated by phase B/C/D; empty in phase A) ---- - agent_registry = [] - event_agent_registry = [] - - # ---- Action Registry ---- - action_registry = [] - if ENABLED_ACTIONS.get("collect", True): - action_registry.append(CollectAction()) - if ENABLED_ACTIONS.get("upgrade_a", True): - action_registry.append(UpgradeAction("upgrade_a", "A")) - if ENABLED_ACTIONS.get("upgrade_b", True): - action_registry.append(UpgradeAction("upgrade_b", "B")) - if ENABLED_ACTIONS.get("upgrade_d", True): - action_registry.append(UpgradeAction("upgrade_d", "D")) - if ENABLED_ACTIONS.get("overwork", True): - action_registry.append(OverworkAction()) - if ENABLED_ACTIONS.get("renovate_h", True): - action_registry.append(RenovateAction("renovate_h", "H")) - if ENABLED_ACTIONS.get("renovate_f", True): - action_registry.append(RenovateAction("renovate_f", "F")) - - # ---- Governor variable matrix ---- - # governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name` - # governs city i at step t. Created only when agent is available at step t - # AND the city has arrived; otherwise NewConstant(0). - governor = {} - arrival_step_map = {i: cities[i][0] for i in range(N)} - for name, avail_steps in AGENT_AVAILABILITY.items(): - avail_set = set(avail_steps) - for i in range(N): - for t in range(1, NUM_STEPS + 1): - if t in avail_set and t >= arrival_step_map[i]: - governor[i, t, name] = m.NewBoolVar(f"gov_{name}_{i}_{t}") - else: - governor[i, t, name] = m.NewConstant(0) - - # At each (i, t): at most one agent governs city i. - for i in range(N): - for t in range(1, NUM_STEPS + 1): - terms = [governor[i, t, n] for n in AGENT_AVAILABILITY] - if terms: - m.Add(sum(terms) <= 1) - - # At each (name, t): at most one city has agent `name` assigned. - for name in AGENT_AVAILABILITY: - for t in range(1, NUM_STEPS + 1): - m.Add(sum(governor[i, t, name] for i in range(N)) <= 1) - - # Phase B agent registration - agent_classes = { - "capitalist": Capitalist, - "baron": Baron, - "prodigy": Prodigy, - "provisioner": Provisioner, - "metallurgist": Metallurgist, - "builder": Builder, - "courier": Courier, - "planner": Planner, - "foreman": Foreman, - "industrialist": Industrialist, - "economist": Economist, - } - for name, cls in agent_classes.items(): - if name in AGENT_AVAILABILITY: - agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) - - event_agent_classes = { - "fence": Fence, - } - for name, cls in event_agent_classes.items(): - if name in AGENT_AVAILABILITY: - event_agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) - - # Shared state populated by agent declare_vars (e.g., Builder's fire map). - builder_fire_map = {} - - # One-shot agent declarations. - _agent_decl_ctx = { - "model": m, - "N": N, - "governor": governor, - "tg_pool": tg_pool, - "capital_spent": capital_spent, - "gain_E": gain_E, - "gain_B": gain_B, - "gain_S": gain_S, - "gain_C": gain_C, - "cost_C": cost_C, - "cost_S": cost_S, - "builder_fire": builder_fire_map, - } - for agent in agent_registry: - agent.declare_vars(m, _agent_decl_ctx) - for agent in event_agent_registry: - agent.declare_vars(m, _agent_decl_ctx) - - # ---- per-city logic ----------------------------------------------- - for i in range(N): - a_step, a_city = cities[i] - a_type = a_city["type"] - - # initial type at arrival step - init = {"H": isH, "F": isF, "M": isM, "N": isMon} - m.Add(init[a_type][i, a_step] == 1) - # no upgrades at arrival - m.Add(hasA[i, a_step] == 0) - m.Add(hasB[i, a_step] == 0) - m.Add(hasD[i, a_step] == 0) - # vats at arrival - m.Add(vE[i, a_step] == 1) - m.Add(vB[i, a_step] == 1) - m.Add(vS[i, a_step] == 1) - - # before arrival: everything zero - for t in range(1, a_step): - for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS): - m.Add(v[i, t] == 0) - - # action + transition logic for active steps - for t in range(a_step, NUM_STEPS + 1): - P = present[i, t] - - # Build context for action methods - ctx = { - "model": m, - "i": i, - "t": t, - "N": N, - "max_vat": max_vat, - "action_vars": {act.name: {} for act in action_registry}, - "arrival_step": {i: a_step for i in range(N)}, - "isH": isH, - "isF": isF, - "isM": isM, - "isMon": isMon, - "present": present, - "hasA": hasA, - "hasB": hasB, - "hasD": hasD, - "vE": vE, - "vB": vB, - "vS": vS, - "col": col, - "ua": ua, - "ub": ub, - "ud": ud, - "ow": ow, - "rH": rH, - "rF": rF, - "rM": rM, - "cvE": cvE, - "cvB": cvB, - "cvS": cvS, - "owcvE": owcvE, - "owcvB": owcvB, - "owcvS": owcvS, - "mE": mE, - "mB": mB, - "mS": mS, - "mC": mC, - "owmE": owmE, - "owmB": owmB, - "owmS": owmS, - "owmC": owmC, - "gain_E": gain_E, - "gain_B": gain_B, - "gain_S": gain_S, - "gain_C": gain_C, - "cost_C": cost_C, - "cost_S": cost_S, - "tg_pool": tg_pool, - "capital_spent": capital_spent, - "governor": governor, - "renovations": {}, - "vat_next": {}, - "builder_fire": builder_fire_map, - } - - # Declare all action variables - for action in action_registry: - action.declare_vars(m, i, t, ctx) - - # renovations helper (used by multiple actions) - ren = m.NewBoolVar("") - rH_act = rH.get((i, t)) - rF_act = rF.get((i, t)) - rM_act = rM.get((i, t)) - ren_sum = sum(x for x in [rH_act, rF_act, rM_act] if x is not None) - m.Add(ren == ren_sum) - ctx["renovations"][i, t] = ren - no_ren = m.NewBoolVar("") - m.Add(no_ren == 1 - ren) - - # Add all constraints - for action in action_registry: - action.add_constraints(m, i, t, ctx) - - # LLM allowed renovation into metropolis, need to prevent that now - if rM.get((i, t)) is not None: - m.Add(rM[i, t] == 0) - - # Renovation must change type (renovate-to-X requires not-X now) - for r, target in [ - (rH.get((i, t)), isH), - (rF.get((i, t)), isF), - (rM.get((i, t)), isM), - ]: - if r is not None: - m.Add(target[i, t] == 0).OnlyEnforceIf(r) - - # Exactly one action while present (sum all action selectors) - action_sum = sum( - action.selector_var(i, t, ctx) - for action in action_registry - if action.selector_var(i, t, ctx) is not None - ) - m.Add(action_sum == P) - - # Agent per-(city, step) hooks - run BEFORE upgrade transitions so - # Builder can register its fire var that feeds into d_keep. - for agent in agent_registry: - agent.apply(m, i, t, ctx) - - # Upgrade transitions - m.AddMaxEquality( - hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))] - ) - m.AddMaxEquality( - hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))] - ) - d_keep = m.NewBoolVar("") - d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))] - _bfire = builder_fire_map.get((i, t)) - if _bfire is not None: - d_sources.append(_bfire) - m.AddMaxEquality(d_keep, d_sources) - m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren) - m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren) - - # Gain/cost contributions - for action in action_registry: - action.contributes_gains_costs(m, i, t, ctx) - - # Transition contributions (including vats) - for action in action_registry: - action.transition_contrib(m, i, t, ctx) - - # Type transition t -> t+1 - m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren) - m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren) - m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren) - for r, target in [ - (rH.get((i, t)), isH), - (rF.get((i, t)), isF), - (rM.get((i, t)), isM), - ]: - if r is not None: - m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r) - - for r1, t1, t2 in [ - (rH.get((i, t)), isF, isM), - (rF.get((i, t)), isH, isM), - (rM.get((i, t)), isH, isF), - ]: - if r1 is not None: - m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1) - m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1) - - # ---- Centralized vat transitions (mirrors main.py exactly) ---- - # Collect/overwork vat selectors (None if action disabled) - _cvE = cvE.get((i, t)) - _cvB = cvB.get((i, t)) - _cvS = cvS.get((i, t)) - _owcvE = owcvE.get((i, t)) - _owcvB = owcvB.get((i, t)) - _owcvS = owcvS.get((i, t)) - - inc = m.NewIntVar(1, 3, "") - metallurgist_g = governor.get((i, t, "metallurgist")) - if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY: - metal_active = m.NewBoolVar("") - m.AddMultiplicationEquality(metal_active, [metallurgist_g, isF[i, t]]) - m.Add(inc == 1 + hasD[i, t] + metal_active) - else: - m.Add(inc == 1 + hasD[i, t]) - - vEn = m.NewIntVar(0, max_vat, "") - vBn = m.NewIntVar(0, max_vat, "") - vSn = m.NewIntVar(0, max_vat, "") - - # For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S) - # apply the same reset/increment pattern from main.py - for sel_E, sel_B, sel_S in [ - (_cvE, _cvB, _cvS), - (_owcvE, _owcvB, _owcvS), - ]: - if sel_E is not None: - m.Add(vEn == 0).OnlyEnforceIf(sel_E) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E) - if sel_B is not None: - m.Add(vBn == 0).OnlyEnforceIf(sel_B) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B) - if sel_S is not None: - m.Add(vSn == 0).OnlyEnforceIf(sel_S) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S) - - # foundry but neither collecting nor overworking: vats unchanged - # f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow - _fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None) - _fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None) - f_noncollect_noow = m.NewBoolVar("") - m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count) - m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow) - m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow) - m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow) - - # assign vat[i, t+1]: - # renovate-to-foundry -> reset to 1 - # continuing foundry -> vat_next - # otherwise (not foundry next) -> 0 - cont_F = AND(isF[i, t], no_ren) - for vnext, vn in ( - (vE[i, t + 1], vEn), - (vB[i, t + 1], vBn), - (vS[i, t + 1], vSn), - ): - if rF.get((i, t)) is not None: - m.Add(vnext == 1).OnlyEnforceIf(rF[i, t]) - m.Add(vnext == vn).OnlyEnforceIf(cont_F) - not_F_next = isF[i, t + 1].Not() - m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next) - m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next) - m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next) - - # ---- Builder one-shot: at most one fire across all (i, t) ---- - if builder_fire_map: - m.Add(sum(builder_fire_map.values()) <= 1) - - # ---- global constraints (per action type, per step) ---- - for t in range(1, NUM_STEPS + 1): - ctx_global = {"N": N, "ow": ow, "t": t} - for action in action_registry: - action.add_global_constraints(m, t, ctx_global) - - # ---- event-agent per-step hooks (registry empty in phase A) ---- - _event_ctx = { - "model": m, - "N": N, - "governor": governor, - "tg_pool": tg_pool, - "capital_spent": capital_spent, - "gain_E": gain_E, - "gain_B": gain_B, - "gain_S": gain_S, - "gain_C": gain_C, - "cost_C": cost_C, - "cost_S": cost_S, - } - for t in range(1, NUM_STEPS + 1): - for agent in event_agent_registry: - if t in agent.available_steps: - agent.apply_event(m, t, _event_ctx) - - # ---- trade-goods pool split (routes pool[t] -> gain_E/B/S/C) ---- - # Generic-trade-good sources (Baron, Fence in later phases) deposit - # IntVars into tg_pool[t]; here we declare a 4-way split into the - # resource gain accumulators. If the pool is empty for step t, the split - # vars are constrained to 0. - for t in range(1, NUM_STEPS + 1): - pool_total = m.NewIntVar(0, max_res, f"tg_total_{t}") - if tg_pool[t]: - m.Add(pool_total == sum(tg_pool[t])) - else: - m.Add(pool_total == 0) - tg_E = m.NewIntVar(0, max_res, f"tg_E_{t}") - tg_B = m.NewIntVar(0, max_res, f"tg_B_{t}") - tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}") - tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}") - m.Add(tg_E + tg_B + tg_S + tg_C == pool_total) - gain_E[t].append(tg_E) - gain_B[t].append(tg_B) - gain_S[t].append(tg_S) - gain_C[t].append(tg_C) - - # ---- resource pool recursion -------------------------------------- - E = {1: m.NewIntVar(initial[0], initial[0], "E1")} - B = {1: m.NewIntVar(initial[1], initial[1], "B1")} - S = {1: m.NewIntVar(initial[2], initial[2], "S1")} - C = {1: m.NewIntVar(initial[3], initial[3], "C1")} - for t in range(1, NUM_STEPS + 1): - E[t + 1] = m.NewIntVar(0, max_res, f"E_{t + 1}") - B[t + 1] = m.NewIntVar(0, max_res, f"B_{t + 1}") - S[t + 1] = m.NewIntVar(0, max_res, f"S_{t + 1}") - C[t + 1] = m.NewIntVar(0, max_res, f"C_{t + 1}") - m.Add(C[t] - sum(cost_C[t]) >= 0) - m.Add(S[t] - sum(cost_S[t]) >= 0) - m.Add(E[t + 1] == E[t] + sum(gain_E[t])) - m.Add(B[t + 1] == B[t] + sum(gain_B[t])) - m.Add(S[t + 1] == S[t] - sum(cost_S[t]) + sum(gain_S[t])) - m.Add(C[t + 1] == C[t] - sum(cost_C[t]) + sum(gain_C[t])) - - finalE, finalB, finalS = E[NUM_STEPS + 1], B[NUM_STEPS + 1], S[NUM_STEPS + 1] - - # ---- Phase 1: real per-resource ceilings ---- - def _ceiling(var): - s = cp_model.CpSolver() - s.parameters.max_time_in_seconds = 20.0 - s.parameters.num_search_workers = num_workers - m.Maximize(var) - st = s.Solve(m) - return ( - int(s.ObjectiveValue()) - if st in (cp_model.OPTIMAL, cp_model.FEASIBLE) - else max_res - ) - - capE = _ceiling(finalE) - capB = _ceiling(finalB) - capS = _ceiling(finalS) - m.Add(finalE <= capE) - m.Add(finalB <= capB) - m.Add(finalS <= capS) - - # ====================================================================== - # OBJECTIVE IS SET HERE - # ====================================================================== - def Eprod(v): - return v * v - - def Bprod(v): - return v - - def Sprod(v): - return v * v - - prodEE = m.NewIntVar(0, Eprod(capE), "prodEE") - m.AddMultiplicationEquality(prodEE, [finalE]) - prodSS = m.NewIntVar(0, Sprod(capS), "prodSS") - m.AddMultiplicationEquality(prodSS, [finalS]) - prodBB = m.NewIntVar(0, Bprod(capB), "prodBB") - m.AddMultiplicationEquality(prodBB, [finalB]) - prodEB = m.NewIntVar(0, Eprod(capE) * Bprod(capB), "prodEB") - m.AddMultiplicationEquality(prodEB, [prodEE, prodBB]) - obj = m.NewIntVar(0, Eprod(capE) * Bprod(capB) * Sprod(capS), "obj") - m.AddMultiplicationEquality(obj, [prodEB, prodSS]) - m.Maximize(obj) - - # ---- Phase 2: solve the product to optimality ---- - solver = cp_model.CpSolver() - solver.parameters.max_time_in_seconds = time_limit - solver.parameters.num_search_workers = num_workers - status = solver.Solve( - m, - printer.IntermediateSolutionPrinter( - {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.01 - ), - ) - - if verbose: - print(f"(resource ceilings used: E<={capE} B<={capB} S<={capS})") - _report( - solver, - status, - cities, - N, - isH, - isF, - isM, - isMon, - col, - ua, - ub, - ud, - ow, - rH, - rF, - rM, - cvE, - cvB, - cvS, - owcvE, - owcvB, - owcvS, - mE, - mB, - mS, - mC, - owmE, - owmB, - owmS, - owmC, - hasA, - hasB, - hasD, - vE, - vB, - vS, - E, - B, - S, - C, - finalE, - finalB, - finalS, - action_registry, - governor, - agent_registry, - capital_spent, - ) - return solver, status - - -def _report( - solver, - status, - cities, - N, - isH, - isF, - isM, - isMon, - col, - ua, - ub, - ud, - ow, - rH, - rF, - rM, - cvE, - cvB, - cvS, - owcvE, - owcvB, - owcvS, - mE, - mB, - mS, - mC, - owmE, - owmB, - owmS, - owmC, - hasA, - hasB, - hasD, - vE, - vB, - vS, - E, - B, - S, - C, - finalE, - finalB, - finalS, - action_registry, - governor=None, - agent_registry=None, - capital_spent=None, -): - print("status:", solver.StatusName(status)) - if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): - return - - typ_name = {} - for i in range(N): - for t in range(1, NUM_STEPS + 1): - if solver.Value(isH[i, t]): - typ_name[i, t] = "Hub" - elif solver.Value(isF[i, t]): - typ_name[i, t] = "Foundry" - elif solver.Value(isM[i, t]): - typ_name[i, t] = "Metro" - elif solver.Value(isMon[i, t]): - typ_name[i, t] = "Monument" - else: - typ_name[i, t] = "-" - - def action_str(i, t): - # Try each action in order until one is active - ctx = { - "typ_name": typ_name, - "cvE": cvE, - "cvB": cvB, - "cvS": cvS, - "mE": mE, - "mB": mB, - "mS": mS, - "mC": mC, - "owcvE": owcvE, - "owcvB": owcvB, - "owcvS": owcvS, - "owmE": owmE, - "owmB": owmB, - "owmS": owmS, - "owmC": owmC, - } - - if solver.Value(col.get((i, t), 0)): - if typ_name[i, t] == "Foundry": - v = ( - "E" - if solver.Value(cvE[i, t]) - else "B" - if solver.Value(cvB[i, t]) - else "S" - ) - return f"Collect vat {v}" - if typ_name[i, t] == "Metro": - picks = [] - for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): - picks += [nm] * (solver.Value(var[i, t]) // 10) - return "Collect {" + ",".join(picks) + "}" - return "Collect (+Capital)" - if solver.Value(ow.get((i, t), 0)): - if typ_name[i, t] == "Foundry": - v = ( - "E" - if solver.Value(owcvE[i, t]) - else "B" - if solver.Value(owcvB[i, t]) - else "S" - ) - return f"Overwork vat {v} (2x)" - if typ_name[i, t] == "Metro": - picks = [] - for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): - picks += [nm] * (solver.Value(var[i, t]) // 10) - return "Overwork {" + ",".join(picks) + "} (2x)" - return "Overwork (+Capital 2x)" - if solver.Value(ua.get((i, t), 0)): - return "Upgrade a (cost-reduction)" - if solver.Value(ub.get((i, t), 0)): - return "Upgrade b (collect-bonus)" - if solver.Value(ud.get((i, t), 0)): - return "Upgrade d (vat-increment)" - if solver.Value(rH.get((i, t), 0)): - return "Renovate -> Hub" - if solver.Value(rF.get((i, t), 0)): - return "Renovate -> Foundry" - if solver.Value(rM.get((i, t), 0)): - return "Renovate -> Metro" - return "(inactive)" - - print("\nPer-step pools (start of step):") - print(" step: E B S C") - for t in range(1, NUM_STEPS + 2): - label = f"after5" if t == NUM_STEPS + 1 else f"start {t}" - print( - f" {label:>8}: {solver.Value(E[t]) / 10:6.1f} {solver.Value(B[t]) / 10:6.1f} " - f"{solver.Value(S[t]) / 10:6.1f} {solver.Value(C[t]) / 10:6.1f}" - ) - - if capital_spent is not None: - print("\n capital spent per step:") - for t in range(1, NUM_STEPS + 1): - spent = sum(solver.Value(x) for x in capital_spent[t]) / 10 - print(f" step {t}: {spent:6.1f}") - - # Resolve governor assignments for display - def gov_for(i, t): - if not governor or not agent_registry: - return "-" - names = [a.name for a in agent_registry] - for name in names: - v = governor.get((i, t, name)) - if v is None: - continue - try: - if solver.Value(v): - return name - except Exception: - continue - return "-" - - print("\nActions:") - for i in range(N): - a_step, a_city = cities[i] - a_type = a_city["type"] if isinstance(a_city, dict) else a_city - print(f" City {i} (arrives step {a_step} as {a_type}):") - for t in range(a_step, NUM_STEPS + 1): - ups = "".join( - n - for n, h in (("a", hasA), ("b", hasB), ("d", hasD)) - if solver.Value(h[i, t]) - ) - extra = ( - f" vats(E{solver.Value(vE[i, t])},B{solver.Value(vB[i, t])},S{solver.Value(vS[i, t])})" - if typ_name[i, t] == "Foundry" - else "" - ) - gov = gov_for(i, t) - print( - f" step {t}: [{typ_name[i, t]:7}] {action_str(i, t):28}" - f" upg[{ups}] gov[{gov}]{extra}" - ) - - fe, fb, fs = solver.Value(finalE), solver.Value(finalB), solver.Value(finalS) - print( - f"\nFINAL E={fe / 10:.1f} B={fb / 10:.1f} S={fs / 10:.1f} " - f"product(scaled) = {fe * fb * fs / 1000} sum = {(fe + fb + fs) / 10:.1f}" - ) - - -if __name__ == "__main__": - solve() diff --git a/main.py b/main.py index a5ab35d..984d07d 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,5 @@ """ -City Resource Optimization -> CP-SAT (Google OR-Tools) +City Resource Optimization -> CP-SAT (Google OR-Tools) [ACTION REGISTRY VERSION] Maximise Electrum * Brass * Steel after the gains of step 5. @@ -18,11 +18,17 @@ import printer # ====================================================================== # Starting resource pools at the start of step 1: (Electrum, Brass, Steel, Capital) -INITIAL = (3, 3, 3, 3) +# Note: all resource values are scaled by 10 internally so that fractional +# trade-good splits (Baron/Fence et al, phase B/C) can be modeled with integers. +# Display in _report divides by 10. +INITIAL = (30, 30, 30, 30) # Arrival schedule. Key = step (1..5), value = list of city types that arrive # at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument. # Total cities across all steps must be <= 7. Arriving cities act that same step. +# Each entry may be either a string shorthand (e.g. "H") or a dict, e.g. +# {"type": "F", "adjacent_to": [city_idx, ...]} +# `adjacent_to` is currently parsed but unused; see normalize_city(). ARRIVALS = { 1: ["H", "F", "H", "H", "N"], 2: [], @@ -31,15 +37,831 @@ ARRIVALS = { 5: [], } + +def normalize_city(c): + if isinstance(c, str): + return {"type": c, "adjacent_to": []} + return {"adjacent_to": [], **c} + + # Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is # +1 of the resource collected (i.e. the chosen vat's value + 1). This is the # same uniform rule as Hub (+1 Capital) and Metropolis (+1 resource pick). NUM_STEPS = 5 MAX_RES = ( - 200 # upper bound on any resource pool (raise if you expect more; affects speed) + 2000 # upper bound on any resource pool (scaled x10; raise if you expect more) ) -MAX_VAT = 15 # upper bound on a foundry vat value (1 + 2*nsteps is plenty) +MAX_VAT = 15 # upper bound on a foundry vat value (1 + 2*nsteps is plenty); unscaled + + +# Bastion counts indexed by t-1; bastions on map at step t. Reserved for +# phase B/C/D agents (no effect yet). +BASTION_COUNTS = [3, 3, 3, 3, 3] + +# Agent name -> list of steps where the agent is available. +# Empty/missing means the agent is disabled. Phase B/C/D will populate. +AGENT_AVAILABILITY = { + "capitalist": [], + "baron": [], + "prodigy": [], + "provisioner": [], + "metallurgist": [], + "builder": [], + "courier": [], + "planner": [1, 2, 3, 4, 5], + "fence": [], + "foreman": [], + "industrialist": [], + "economist": [], +} + +# ====================================================================== +# ACTION REGISTRY CONFIGURATION +# ====================================================================== + +ENABLED_ACTIONS = { + "collect": True, + "upgrade_a": True, + "upgrade_b": True, + "upgrade_d": True, + "overwork": True, + "renovate_h": True, + "renovate_f": True, + # "renovate_m": False, # currently always disabled in main.py +} + +# ====================================================================== +# ACTION REGISTRY BASE CLASS +# ====================================================================== + + +class Action: + """Base class for all actions. Subclasses implement the action's behavior.""" + + def __init__(self, name): + self.name = name + + def declare_vars(self, model, i, t, ctx): + """Declare the action's decision variable(s) for city i at step t. + + Should create and store a selector var (selector_var) in ctx['action_vars'][self.name]. + """ + pass + + def add_constraints(self, model, i, t, ctx): + """Add precondition and legality constraints for this action.""" + pass + + def add_global_constraints(self, model, t, ctx): + """Add per-step global constraints (e.g. overwork's 'at most 1 per step').""" + pass + + def contributes_gains_costs(self, model, i, t, ctx): + """Add contributions to gain_E/B/S/C and cost_C/S accumulators.""" + pass + + def transition_contrib(self, model, i, t, ctx): + """Handle state transitions (type, upgrade, vat) caused by this action.""" + pass + + def selector_var(self, i, t, ctx): + """Return the boolean variable that is 1 iff this action is chosen for (i,t). + + Used to form the "exactly one action" constraint. + """ + return ctx["action_vars"].get(self.name, {}).get((i, t), None) + + def describe(self, solver, i, t, ctx): + """Return a human-readable string describing what this action does for (i,t). + + Called during reporting if this action is the chosen one. + """ + return f"{self.name}" + + +# (VatsMixin removed — vat transitions are handled centrally in solve()) + + +# ====================================================================== +# CONCRETE ACTIONS +# ====================================================================== + + +class CollectAction(Action): + def __init__(self): + super().__init__("collect") + self.AND_cache = {} + + def declare_vars(self, model, i, t, ctx): + m = model + col = m.NewBoolVar(f"col_{i}_{t}") + cvE = m.NewBoolVar(f"cvE_{i}_{t}") + cvB = m.NewBoolVar(f"cvB_{i}_{t}") + cvS = m.NewBoolVar(f"cvS_{i}_{t}") + mE = m.NewIntVar(0, 30, f"mE_{i}_{t}") + mB = m.NewIntVar(0, 30, f"mB_{i}_{t}") + mS = m.NewIntVar(0, 30, f"mS_{i}_{t}") + mC = m.NewIntVar(0, 30, f"mC_{i}_{t}") + + ctx["action_vars"]["collect"][(i, t)] = col + ctx["col"][i, t] = col + ctx["cvE"][i, t] = cvE + ctx["cvB"][i, t] = cvB + ctx["cvS"][i, t] = cvS + ctx["mE"][i, t] = mE + ctx["mB"][i, t] = mB + ctx["mS"][i, t] = mS + ctx["mC"][i, t] = mC + + def add_constraints(self, model, i, t, ctx): + m = model + isF = ctx["isF"] + isM = ctx["isM"] + isMon = ctx["isMon"] + hasB = ctx["hasB"] + col = ctx["col"][i, t] + cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] + mE, mB, mS, mC = ( + ctx["mE"][i, t], + ctx["mB"][i, t], + ctx["mS"][i, t], + ctx["mC"][i, t], + ) + + # Monument can't collect + m.Add(col == 0).OnlyEnforceIf(isMon[i, t]) + + # Foundry: exactly one vat chosen iff foundry collects + fcol = self._AND(model, isF[i, t], col, ctx) + m.Add(cvE + cvB + cvS == fcol) + + # Metropolis: pick (2 + hasB) resources + mcol = self._AND(model, isM[i, t], col, ctx) + npick = m.NewIntVar(0, 3, "") + m.Add(npick == 2 + hasB[i, t]).OnlyEnforceIf(mcol) + m.Add(npick == 0).OnlyEnforceIf(mcol.Not()) + # Each "pick" is 10 units of the chosen resource (x10 scaling). + m.Add(mE + mB + mS + mC == npick * 10) + + def add_global_constraints(self, model, t, ctx): + pass + + def contributes_gains_costs(self, model, i, t, ctx): + m = model + isH = ctx["isH"] + isF = ctx["isF"] + isM = ctx["isM"] + hasB = ctx["hasB"] + vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] + col = ctx["col"][i, t] + cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] + mE, mB, mS, mC = ( + ctx["mE"][i, t], + ctx["mB"][i, t], + ctx["mS"][i, t], + ctx["mC"][i, t], + ) + + gain_E = ctx["gain_E"][t] + gain_B = ctx["gain_B"][t] + gain_S = ctx["gain_S"][t] + gain_C = ctx["gain_C"][t] + cost_C = ctx["cost_C"][t] + capital_spent = ctx["capital_spent"][t] + + # Collect sub-choices + fcol = self._AND(model, isF[i, t], col, ctx) + mcol = self._AND(model, isM[i, t], col, ctx) + hcol = self._AND(model, isH[i, t], col, ctx) + + # Costs: foundry & metropolis collect each cost 1 Capital (x10 scaled) + fcol_cost = m.NewIntVar(0, 10, "") + m.Add(fcol_cost == 10 * fcol) + cost_C.append(fcol_cost) + capital_spent.append(fcol_cost) + mcol_cost = m.NewIntVar(0, 10, "") + m.Add(mcol_cost == 10 * mcol) + cost_C.append(mcol_cost) + capital_spent.append(mcol_cost) + + # Hub collect: +2 Capital (+1 more if collect-bonus b) -> x10 + hub_gain = m.NewIntVar(0, 30, "") + m.Add(hub_gain == 10 * (2 + hasB[i, t])).OnlyEnforceIf(hcol) + m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not()) + gain_C.append(hub_gain) + + # Metropolis collect: each "pick" is already 10 units (mE/mB/mS/mC). + gain_E.append(mE) + gain_B.append(mB) + gain_S.append(mS) + gain_C.append(mC) + + # Foundry collect: gain chosen vat's value as that resource (vat unscaled), + # then scale x10 for the resource gain. + gEf = m.NewIntVar(0, ctx["max_vat"], "") + gBf = m.NewIntVar(0, ctx["max_vat"], "") + gSf = m.NewIntVar(0, ctx["max_vat"], "") + m.AddMultiplicationEquality(gEf, [cvE, vE[i, t]]) + m.AddMultiplicationEquality(gBf, [cvB, vB[i, t]]) + m.AddMultiplicationEquality(gSf, [cvS, vS[i, t]]) + gEf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") + gBf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") + gSf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") + m.Add(gEf_scaled == 10 * gEf) + m.Add(gBf_scaled == 10 * gBf) + m.Add(gSf_scaled == 10 * gSf) + gain_E.append(gEf_scaled) + gain_B.append(gBf_scaled) + gain_S.append(gSf_scaled) + + # Collect Bonus (b): adds +1 to the amount a Collect gives -> +10 scaled + be = self._AND(model, cvE, hasB[i, t], ctx) + bb = self._AND(model, cvB, hasB[i, t], ctx) + bs = self._AND(model, cvS, hasB[i, t], ctx) + be10 = m.NewIntVar(0, 10, "") + bb10 = m.NewIntVar(0, 10, "") + bs10 = m.NewIntVar(0, 10, "") + m.Add(be10 == 10 * be) + m.Add(bb10 == 10 * bb) + m.Add(bs10 == 10 * bs) + gain_E.append(be10) + gain_B.append(bb10) + gain_S.append(bs10) + + def transition_contrib(self, model, i, t, ctx): + pass # Vat transitions handled centrally in solve() + + def describe(self, solver, i, t, ctx): + typ_name = ctx["typ_name"].get((i, t), "-") + cvE = ctx["cvE"].get((i, t)) + cvB = ctx["cvB"].get((i, t)) + cvS = ctx["cvS"].get((i, t)) + mE = ctx["mE"].get((i, t)) + mB = ctx["mB"].get((i, t)) + mS = ctx["mS"].get((i, t)) + mC = ctx["mC"].get((i, t)) + + if typ_name == "Foundry": + v = "E" if solver.Value(cvE) else "B" if solver.Value(cvB) else "S" + return f"Collect vat {v}" + if typ_name == "Metro": + picks = [] + for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): + picks += [nm] * (solver.Value(var) // 10) + return "Collect {" + ",".join(picks) + "}" + return "Collect (+Capital)" + + def _AND(self, model, a, b, ctx): + key = (id(a), id(b)) + if key not in self.AND_cache: + c = model.NewBoolVar("") + model.AddMultiplicationEquality(c, [a, b]) + self.AND_cache[key] = c + return self.AND_cache[key] + + +class UpgradeAction(Action): + """Base class for upgrade actions (a, b, d).""" + + def __init__(self, name, upgrade_key): + super().__init__(name) + self.upgrade_key = upgrade_key # 'A', 'B', or 'D' + + def declare_vars(self, model, i, t, ctx): + m = model + u = m.NewBoolVar(f"{self.upgrade_key.lower()}_{i}_{t}") + ctx["action_vars"][self.name][(i, t)] = u + ctx[f"u{self.upgrade_key.lower()}"][i, t] = u + + def add_constraints(self, model, i, t, ctx): + m = model + u = ctx[f"u{self.upgrade_key.lower()}"][i, t] + has_key = ctx[f"has{self.upgrade_key}"] + isMon = ctx["isMon"] + isF = ctx["isF"] + + # Can't re-acquire + m.Add(has_key[i, t] == 0).OnlyEnforceIf(u) + + # Monument can't upgrade + m.Add(u == 0).OnlyEnforceIf(isMon[i, t]) + + # D is foundry-only + if self.upgrade_key == "D": + m.Add(isF[i, t] == 1).OnlyEnforceIf(u) + + def add_global_constraints(self, model, t, ctx): + pass + + def contributes_gains_costs(self, model, i, t, ctx): + m = model + u = ctx[f"u{self.upgrade_key.lower()}"][i, t] + hasA = ctx["hasA"] + cost_S = ctx["cost_S"][t] + + # Upgrades b,d cost 2 Steel, reduced by 1 if cost-reduction (a) already held + # (x10 scaled: 20 / 10). + if self.upgrade_key in ("B", "D"): + c = m.NewIntVar(0, 20, "") + both = m.NewBoolVar("") + m.AddMultiplicationEquality(both, [u, hasA[i, t]]) + m.Add(c == 20).OnlyEnforceIf(u, hasA[i, t].Not()) + m.Add(c == 10).OnlyEnforceIf(both) + m.Add(c == 0).OnlyEnforceIf(u.Not()) + cost_S.append(c) + + def transition_contrib(self, model, i, t, ctx): + pass + + def describe(self, solver, i, t, ctx): + if self.upgrade_key == "A": + return "Upgrade a (cost-reduction)" + elif self.upgrade_key == "B": + return "Upgrade b (collect-bonus)" + elif self.upgrade_key == "D": + return "Upgrade d (vat-increment)" + return self.name + + +class OverworkAction(Action): + def __init__(self): + super().__init__("overwork") + self.AND_cache = {} + + def declare_vars(self, model, i, t, ctx): + m = model + ow = m.NewBoolVar(f"ow_{i}_{t}") + owcvE = m.NewBoolVar(f"owcvE_{i}_{t}") + owcvB = m.NewBoolVar(f"owcvB_{i}_{t}") + owcvS = m.NewBoolVar(f"owcvS_{i}_{t}") + owmE = m.NewIntVar(0, 60, f"owmE_{i}_{t}") + owmB = m.NewIntVar(0, 60, f"owmB_{i}_{t}") + owmS = m.NewIntVar(0, 60, f"owmS_{i}_{t}") + owmC = m.NewIntVar(0, 60, f"owmC_{i}_{t}") + + ctx["action_vars"]["overwork"][(i, t)] = ow + ctx["ow"][i, t] = ow + ctx["owcvE"][i, t] = owcvE + ctx["owcvB"][i, t] = owcvB + ctx["owcvS"][i, t] = owcvS + ctx["owmE"][i, t] = owmE + ctx["owmB"][i, t] = owmB + ctx["owmS"][i, t] = owmS + ctx["owmC"][i, t] = owmC + + def add_constraints(self, model, i, t, ctx): + m = model + isF = ctx["isF"] + isM = ctx["isM"] + isMon = ctx["isMon"] + hasB = ctx["hasB"] + ow = ctx["ow"][i, t] + owcvE = ctx["owcvE"][i, t] + owcvB = ctx["owcvB"][i, t] + owcvS = ctx["owcvS"][i, t] + owmE = ctx["owmE"][i, t] + owmB = ctx["owmB"][i, t] + owmS = ctx["owmS"][i, t] + owmC = ctx["owmC"][i, t] + + # Monument can't overwork + m.Add(ow == 0).OnlyEnforceIf(isMon[i, t]) + + # Foundry overwork: exactly one vat chosen iff foundry overworks + fow = self._AND(model, isF[i, t], ow, ctx) + m.Add(owcvE + owcvB + owcvS == fow) + + # Metropolis overwork: pick 2*(2 + hasB) resources + mow = self._AND(model, isM[i, t], ow, ctx) + ow_npick = m.NewIntVar(0, 6, "") + m.Add(ow_npick == 2 * (2 + hasB[i, t])).OnlyEnforceIf(mow) + m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not()) + # Each "pick" is 10 units (x10 scaling). + m.Add(owmE + owmB + owmS + owmC == ow_npick * 10) + + # Overwork cooldown: city that overworked last step can't collect or overwork + if t > ctx["arrival_step"][i]: + col = ctx["col"].get((i, t)) + if col is not None: + m.Add(col == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) + m.Add(ow == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) + + # Planner gating: when planner is enabled, a city may overwork only if + # the planner agent governs it. Removes the global "at most 1" cap. + if "planner" in AGENT_AVAILABILITY: + planner_g = ctx["governor"].get((i, t, "planner")) + if planner_g is not None: + m.Add(ow == 0).OnlyEnforceIf(planner_g.Not()) + else: + m.Add(ow == 0) + + def add_global_constraints(self, model, t, ctx): + m = model + ow = ctx["ow"] + N = ctx["N"] + if "planner" not in AGENT_AVAILABILITY: + # At most 1 city overworks per step + m.Add(sum(ow[i, t] for i in range(N)) <= 1) + # else: gating handled per-city by Planner + + def contributes_gains_costs(self, model, i, t, ctx): + m = model + isH = ctx["isH"] + isF = ctx["isF"] + isM = ctx["isM"] + hasB = ctx["hasB"] + vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] + ow = ctx["ow"][i, t] + owcvE = ctx["owcvE"][i, t] + owcvB = ctx["owcvB"][i, t] + owcvS = ctx["owcvS"][i, t] + owmE = ctx["owmE"][i, t] + owmB = ctx["owmB"][i, t] + owmS = ctx["owmS"][i, t] + owmC = ctx["owmC"][i, t] + + gain_E = ctx["gain_E"][t] + gain_B = ctx["gain_B"][t] + gain_S = ctx["gain_S"][t] + gain_C = ctx["gain_C"][t] + + # Overwork sub-choices + fow = self._AND(model, isF[i, t], ow, ctx) + mow = self._AND(model, isM[i, t], ow, ctx) + how = self._AND(model, isH[i, t], ow, ctx) + + # Hub overwork: 2*(2 + hasB) Capital -> x10 + hub_ow_gain = m.NewIntVar(0, 60, "") + m.Add(hub_ow_gain == 20 * (2 + hasB[i, t])).OnlyEnforceIf(how) + m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not()) + gain_C.append(hub_ow_gain) + + # Metropolis overwork: each "pick" is already 10 units (owm* vars). + gain_E.append(owmE) + gain_B.append(owmB) + gain_S.append(owmS) + gain_C.append(owmC) + + # Foundry overwork: 2 * chosen vat's value, then x10 scaling. + _owE = m.NewIntVar(0, ctx["max_vat"], "") + _owB = m.NewIntVar(0, ctx["max_vat"], "") + _owS = m.NewIntVar(0, ctx["max_vat"], "") + m.AddMultiplicationEquality(_owE, [owcvE, vE[i, t]]) + m.AddMultiplicationEquality(_owB, [owcvB, vB[i, t]]) + m.AddMultiplicationEquality(_owS, [owcvS, vS[i, t]]) + owgEf = m.NewIntVar(0, 20 * ctx["max_vat"], "") + owgBf = m.NewIntVar(0, 20 * ctx["max_vat"], "") + owgSf = m.NewIntVar(0, 20 * ctx["max_vat"], "") + m.Add(owgEf == 20 * _owE) + m.Add(owgBf == 20 * _owB) + m.Add(owgSf == 20 * _owS) + gain_E.append(owgEf) + gain_B.append(owgBf) + gain_S.append(owgSf) + + # Collect Bonus (b) for foundry overwork: 2 * (+1) of that resource -> x10 = +20 + ow_be = self._AND(model, owcvE, hasB[i, t], ctx) + ow_bb = self._AND(model, owcvB, hasB[i, t], ctx) + ow_bs = self._AND(model, owcvS, hasB[i, t], ctx) + ow_be20 = m.NewIntVar(0, 20, "") + ow_bb20 = m.NewIntVar(0, 20, "") + ow_bs20 = m.NewIntVar(0, 20, "") + m.Add(ow_be20 == 20 * ow_be) + m.Add(ow_bb20 == 20 * ow_bb) + m.Add(ow_bs20 == 20 * ow_bs) + gain_E.append(ow_be20) + gain_B.append(ow_bb20) + gain_S.append(ow_bs20) + + def transition_contrib(self, model, i, t, ctx): + pass # Vat transitions handled centrally in solve() + + def describe(self, solver, i, t, ctx): + typ_name = ctx["typ_name"].get((i, t), "-") + owcvE = ctx["owcvE"].get((i, t)) + owcvB = ctx["owcvB"].get((i, t)) + owcvS = ctx["owcvS"].get((i, t)) + owmE = ctx["owmE"].get((i, t)) + owmB = ctx["owmB"].get((i, t)) + owmS = ctx["owmS"].get((i, t)) + owmC = ctx["owmC"].get((i, t)) + + if typ_name == "Foundry": + v = "E" if solver.Value(owcvE) else "B" if solver.Value(owcvB) else "S" + return f"Overwork vat {v} (2x)" + if typ_name == "Metro": + picks = [] + for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): + picks += [nm] * (solver.Value(var) // 10) + return "Overwork {" + ",".join(picks) + "} (2x)" + return "Overwork (+Capital 2x)" + + def _AND(self, model, a, b, ctx): + key = (id(a), id(b)) + if not hasattr(self, "AND_cache"): + self.AND_cache = {} + if key not in self.AND_cache: + c = model.NewBoolVar("") + model.AddMultiplicationEquality(c, [a, b]) + self.AND_cache[key] = c + return self.AND_cache[key] + + +class RenovateAction(Action): + """Base class for renovation actions.""" + + def __init__(self, name, target_type): + super().__init__(name) + self.target_type = target_type # 'H', 'F', etc. + + def declare_vars(self, model, i, t, ctx): + m = model + r = m.NewBoolVar(f"r{self.target_type}_{i}_{t}") + ctx["action_vars"][self.name][(i, t)] = r + ctx[f"r{self.target_type}"][i, t] = r + + def add_constraints(self, model, i, t, ctx): + m = model + r = ctx[f"r{self.target_type}"][i, t] + target = ctx[f"is{self.target_type}"] + + # Renovate-to-X requires not-X now + m.Add(target[i, t] == 0).OnlyEnforceIf(r) + + def add_global_constraints(self, model, t, ctx): + pass + + def contributes_gains_costs(self, model, i, t, ctx): + pass + + def transition_contrib(self, model, i, t, ctx): + pass + + def describe(self, solver, i, t, ctx): + type_name = {"H": "Hub", "F": "Foundry", "M": "Metro"} + return f"Renovate -> {type_name.get(self.target_type, self.target_type)}" + + +# ====================================================================== +# AGENT REGISTRY BASE CLASSES +# ====================================================================== + + +class Agent: + """Governor-style agent. Assigned to a city per step via governor[i,t,name].""" + + name: str + available_steps: list # steps where this agent may be assigned + + def __init__(self, name, available_steps): + self.name = name + self.available_steps = list(available_steps) + + def declare_vars(self, model, ctx): + """One-time vars (e.g. one-shot flags). Called once before per-(i,t) loop.""" + pass + + def apply(self, model, i, t, ctx): + """Per-(city,step) effect. Gate logic on ctx['governor'][i,t,self.name].""" + pass + + +class EventAgent: + """Triggers automatically each available step. No city target. + Reads/writes ctx (typically tg_pool or gain_*).""" + + name: str + available_steps: list + + def __init__(self, name, available_steps): + self.name = name + self.available_steps = list(available_steps) + + def declare_vars(self, model, ctx): + pass + + def apply_event(self, model, t, ctx): + pass + + +# ====================================================================== +# PHASE B AGENTS +# ====================================================================== + + +class Capitalist(Agent): + """+20 Capital (scaled = +2 Capital) when governing a Collect action.""" + + def apply(self, model, i, t, ctx): + m = model + g = ctx["governor"].get((i, t, self.name)) + if g is None: + return + col = ctx["col"].get((i, t)) + if col is None: + return + both = m.NewBoolVar("") + m.AddMultiplicationEquality(both, [g, col]) + bonus = m.NewIntVar(0, 20, "") + m.Add(bonus == 20).OnlyEnforceIf(both) + m.Add(bonus == 0).OnlyEnforceIf(both.Not()) + ctx["gain_C"][t].append(bonus) + + +class Baron(Agent): + """BASTION_COUNTS[t-1]*10 trade-goods when governing a Collect action.""" + + def apply(self, model, i, t, ctx): + m = model + g = ctx["governor"].get((i, t, self.name)) + if g is None: + return + col = ctx["col"].get((i, t)) + if col is None: + return + amt = BASTION_COUNTS[t - 1] * 10 + both = m.NewBoolVar("") + m.AddMultiplicationEquality(both, [g, col]) + deposit = m.NewIntVar(0, max(amt, 0), "") + m.Add(deposit == amt).OnlyEnforceIf(both) + m.Add(deposit == 0).OnlyEnforceIf(both.Not()) + ctx["tg_pool"][t].append(deposit) + + +class Prodigy(Agent): + """+20 Steel (refund 2 Steel) when governing AND city upgrades B or D.""" + + def apply(self, model, i, t, ctx): + m = model + g = ctx["governor"].get((i, t, self.name)) + if g is None: + return + for u_var in (ctx["ub"].get((i, t)), ctx["ud"].get((i, t))): + if u_var is None: + continue + both = m.NewBoolVar("") + m.AddMultiplicationEquality(both, [g, u_var]) + refund = m.NewIntVar(0, 20, "") + m.Add(refund == 20).OnlyEnforceIf(both) + m.Add(refund == 0).OnlyEnforceIf(both.Not()) + ctx["gain_S"][t].append(refund) + + +class Provisioner(Agent): + """+15 Electrum (+1.5 scaled) when governing, unconditional.""" + + def apply(self, model, i, t, ctx): + m = model + g = ctx["governor"].get((i, t, self.name)) + if g is None: + return + bonus = m.NewIntVar(0, 15, "") + m.Add(bonus == 15).OnlyEnforceIf(g) + m.Add(bonus == 0).OnlyEnforceIf(g.Not()) + ctx["gain_E"][t].append(bonus) + + +class Metallurgist(Agent): + """Foundry-only: adds +1 to vat increment. Implemented centrally in solve(). + This class exists only for registry uniformity.""" + + def apply(self, model, i, t, ctx): + # Vat-level effect handled in solve()'s centralized vat-transition block. + pass + + +class Builder(Agent): + """One-shot: grants hasD[i,t+1]=1 when governing a Foundry city. + Fires at most once across all (i,t).""" + + def declare_vars(self, model, ctx): + ctx.setdefault("builder_fire", {}) + + def apply(self, model, i, t, ctx): + m = model + g = ctx["governor"].get((i, t, self.name)) + if g is None: + return + isF = ctx["isF"] + fire = m.NewBoolVar(f"builder_fire_{i}_{t}") + m.AddMultiplicationEquality(fire, [g, isF[i, t]]) + ctx.setdefault("builder_fire", {})[(i, t)] = fire + + +class Courier(Agent): + """One-shot globally: +30 to gain_C, gain_S, gain_B on the first step + where Courier governs anywhere.""" + + def declare_vars(self, model, ctx): + m = model + N = ctx["N"] + governor = ctx["governor"] + gain_C = ctx["gain_C"] + gain_S = ctx["gain_S"] + gain_B = ctx["gain_B"] + + courier_any = {} + courier_fired = {} + ever_fired = {} + prev_ever = None + for t in range(1, NUM_STEPS + 1): + terms = [] + for i in range(N): + v = governor.get((i, t, self.name)) + if v is not None: + terms.append(v) + any_t = m.NewBoolVar(f"courier_any_{t}") + if terms: + # any_t = OR(terms) + m.AddMaxEquality(any_t, terms) + else: + m.Add(any_t == 0) + courier_any[t] = any_t + + fired_t = m.NewBoolVar(f"courier_fired_{t}") + if prev_ever is None: + # fired_t == any_t (no prior history) + m.Add(fired_t == any_t) + else: + # fired_t = any_t AND NOT prev_ever + m.Add(fired_t <= any_t) + m.Add(fired_t <= 1 - prev_ever) + m.Add(fired_t >= any_t - prev_ever) + courier_fired[t] = fired_t + + ever_t = m.NewBoolVar(f"courier_ever_{t}") + if prev_ever is None: + m.Add(ever_t == fired_t) + else: + m.AddMaxEquality(ever_t, [prev_ever, fired_t]) + ever_fired[t] = ever_t + prev_ever = ever_t + + # Add +30 contribution to gain_C/S/B + for accum in (gain_C[t], gain_S[t], gain_B[t]): + bonus = m.NewIntVar(0, 30, "") + m.Add(bonus == 30).OnlyEnforceIf(fired_t) + m.Add(bonus == 0).OnlyEnforceIf(fired_t.Not()) + accum.append(bonus) + + ctx["courier_any"] = courier_any + ctx["courier_fired"] = courier_fired + + def apply(self, model, i, t, ctx): + pass + + +# ====================================================================== +# PHASE C AGENTS +# ====================================================================== + + +class Planner(Agent): + """Effect realized via OverworkAction (per-city gating). No-op here.""" + + def apply(self, model, i, t, ctx): + pass + + +class Foreman(Agent): + """TODO: Restructure ability — allows other industry actions during renovation. + The current model uses 'exactly one action per city per step', so renovation + already blocks no specific action separately. Foreman becomes meaningful only + once renovation-blocks-action semantics are modeled. + """ + + def apply(self, model, i, t, ctx): + pass + + +class Industrialist(Agent): + """TODO: Network ability — grants Infrastructure to governed city and its + adjacent cities when appointed governor. Requires the city's adjacency list + (city_dict['adjacent_to']) to be populated and an Infrastructure mechanic + in the model. Neither is implemented yet. + """ + + def apply(self, model, i, t, ctx): + pass + + +class Economist(Agent): + """TODO: Velocity of Money — gain 1 Renown per 10 Capital collected and spent + on Collection. Renown is not in the objective (current objective is E²·B·S²). + capital_spent tracking exists (ctx['capital_spent']) but renown accumulator + and objective integration do not. + """ + + def apply(self, model, i, t, ctx): + pass + + +class Fence(EventAgent): + """Each available step, deposits +20 (=2 trade goods x10) into tg_pool[t].""" + + def apply_event(self, model, t, ctx): + ctx["tg_pool"][t].append(model.NewConstant(20)) # ====================================================================== @@ -58,10 +880,12 @@ def solve( ): # ---- build the city list ----------------------------------------- - cities = [] # list of (arrival_step, arrival_type) + # cities: list[tuple[int, dict]] where the dict has keys "type" and + # "adjacent_to". TODO: adjacency unused; for Industrialist agent. + cities = [] for s in range(1, NUM_STEPS + 1): - for typ in arrivals.get(s, []): - cities.append((s, typ)) + for entry in arrivals.get(s, []): + cities.append((s, normalize_city(entry))) N = len(cities) m = cp_model.CpModel() @@ -73,22 +897,22 @@ def solve( return c # ---- state variables --------------------------------------------- - # Indexed [i][t]; t in 1..NUM_STEPS+1 for state (t=NUM_STEPS+1 == "after step 5"). isH, isF, isM, isMon, present = {}, {}, {}, {}, {} hasA, hasB, hasD = {}, {}, {} vE, vB, vS = {}, {}, {} # foundry vats at start of step t - # action variables, t in 1..NUM_STEPS - col, ua, ub, ud = {}, {}, {}, {} # collect / upgrade a,b,d - ow = {} # overwork (global limit: 1 per step) - rH, rF, rM = {}, {}, {} # renovate -> Hub/Foundry/Metro - cvE, cvB, cvS = {}, {}, {} # foundry: which vat collected (normal collect) - owcvE, owcvB, owcvS = {}, {}, {} # foundry: which vat collected (overwork) - mE, mB, mS, mC = {}, {}, {}, {} # metropolis: resources picked (+1 each) - owmE, owmB, owmS, owmC = {}, {}, {}, {} # metropolis: resources picked (overwork) + # action variables (sparse, only created for enabled actions) + col, ua, ub, ud = {}, {}, {}, {} + ow = {} + rH, rF, rM = {}, {}, {} + cvE, cvB, cvS = {}, {}, {} + owcvE, owcvB, owcvS = {}, {}, {} + mE, mB, mS, mC = {}, {}, {}, {} + owmE, owmB, owmS, owmC = {}, {}, {}, {} for i in range(N): - a_step, a_type = cities[i] + a_step, a_city = cities[i] + a_type = a_city["type"] for t in range(1, NUM_STEPS + 2): isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}") isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}") @@ -107,41 +931,122 @@ def solve( # exactly one type iff present m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t]) - for t in range(1, NUM_STEPS + 1): - col[i, t] = m.NewBoolVar(f"col_{i}_{t}") - ua[i, t] = m.NewBoolVar(f"ua_{i}_{t}") - ub[i, t] = m.NewBoolVar(f"ub_{i}_{t}") - ud[i, t] = m.NewBoolVar(f"ud_{i}_{t}") - ow[i, t] = m.NewBoolVar(f"ow_{i}_{t}") - rH[i, t] = m.NewBoolVar(f"rH_{i}_{t}") - rF[i, t] = m.NewBoolVar(f"rF_{i}_{t}") - rM[i, t] = m.NewBoolVar(f"rM_{i}_{t}") - cvE[i, t] = m.NewBoolVar(f"cvE_{i}_{t}") - cvB[i, t] = m.NewBoolVar(f"cvB_{i}_{t}") - cvS[i, t] = m.NewBoolVar(f"cvS_{i}_{t}") - owcvE[i, t] = m.NewBoolVar(f"owcvE_{i}_{t}") - owcvB[i, t] = m.NewBoolVar(f"owcvB_{i}_{t}") - owcvS[i, t] = m.NewBoolVar(f"owcvS_{i}_{t}") - mE[i, t] = m.NewIntVar(0, 3, f"mE_{i}_{t}") - mB[i, t] = m.NewIntVar(0, 3, f"mB_{i}_{t}") - mS[i, t] = m.NewIntVar(0, 3, f"mS_{i}_{t}") - mC[i, t] = m.NewIntVar(0, 3, f"mC_{i}_{t}") - owmE[i, t] = m.NewIntVar(0, 6, f"owmE_{i}_{t}") - owmB[i, t] = m.NewIntVar(0, 6, f"owmB_{i}_{t}") - owmS[i, t] = m.NewIntVar(0, 6, f"owmS_{i}_{t}") - owmC[i, t] = m.NewIntVar(0, 6, f"owmC_{i}_{t}") - # ---- per-step gain/cost accumulators (linear expressions) --------- gain_E = {t: [] for t in range(1, NUM_STEPS + 1)} gain_B = {t: [] for t in range(1, NUM_STEPS + 1)} gain_S = {t: [] for t in range(1, NUM_STEPS + 1)} gain_C = {t: [] for t in range(1, NUM_STEPS + 1)} - cost_C = {t: [] for t in range(1, NUM_STEPS + 1)} # capital cost (collects) - cost_S = {t: [] for t in range(1, NUM_STEPS + 1)} # steel cost (upgrades b,d) + cost_C = {t: [] for t in range(1, NUM_STEPS + 1)} + cost_S = {t: [] for t in range(1, NUM_STEPS + 1)} + + # Trade-goods pool (per step): new generic-trade-good sources (Baron, Fence, + # phase B/C) deposit IntVars here; centralized split logic routes the pool + # into gain_E/B/S/C below. Metro picks are NOT routed through here. + tg_pool = {t: [] for t in range(1, NUM_STEPS + 1)} + + # Per-step capital spent tracker (mirrors entries appended to cost_C). + capital_spent = {t: [] for t in range(1, NUM_STEPS + 1)} + + # ---- Agent Registry (populated by phase B/C/D; empty in phase A) ---- + agent_registry = [] + event_agent_registry = [] + + # ---- Action Registry ---- + action_registry = [] + if ENABLED_ACTIONS.get("collect", True): + action_registry.append(CollectAction()) + if ENABLED_ACTIONS.get("upgrade_a", True): + action_registry.append(UpgradeAction("upgrade_a", "A")) + if ENABLED_ACTIONS.get("upgrade_b", True): + action_registry.append(UpgradeAction("upgrade_b", "B")) + if ENABLED_ACTIONS.get("upgrade_d", True): + action_registry.append(UpgradeAction("upgrade_d", "D")) + if ENABLED_ACTIONS.get("overwork", True): + action_registry.append(OverworkAction()) + if ENABLED_ACTIONS.get("renovate_h", True): + action_registry.append(RenovateAction("renovate_h", "H")) + if ENABLED_ACTIONS.get("renovate_f", True): + action_registry.append(RenovateAction("renovate_f", "F")) + + # ---- Governor variable matrix ---- + # governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name` + # governs city i at step t. Created only when agent is available at step t + # AND the city has arrived; otherwise NewConstant(0). + governor = {} + arrival_step_map = {i: cities[i][0] for i in range(N)} + for name, avail_steps in AGENT_AVAILABILITY.items(): + avail_set = set(avail_steps) + for i in range(N): + for t in range(1, NUM_STEPS + 1): + if t in avail_set and t >= arrival_step_map[i]: + governor[i, t, name] = m.NewBoolVar(f"gov_{name}_{i}_{t}") + else: + governor[i, t, name] = m.NewConstant(0) + + # At each (i, t): at most one agent governs city i. + for i in range(N): + for t in range(1, NUM_STEPS + 1): + terms = [governor[i, t, n] for n in AGENT_AVAILABILITY] + if terms: + m.Add(sum(terms) <= 1) + + # At each (name, t): at most one city has agent `name` assigned. + for name in AGENT_AVAILABILITY: + for t in range(1, NUM_STEPS + 1): + m.Add(sum(governor[i, t, name] for i in range(N)) <= 1) + + # Phase B agent registration + agent_classes = { + "capitalist": Capitalist, + "baron": Baron, + "prodigy": Prodigy, + "provisioner": Provisioner, + "metallurgist": Metallurgist, + "builder": Builder, + "courier": Courier, + "planner": Planner, + "foreman": Foreman, + "industrialist": Industrialist, + "economist": Economist, + } + for name, cls in agent_classes.items(): + if name in AGENT_AVAILABILITY: + agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) + + event_agent_classes = { + "fence": Fence, + } + for name, cls in event_agent_classes.items(): + if name in AGENT_AVAILABILITY: + event_agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) + + # Shared state populated by agent declare_vars (e.g., Builder's fire map). + builder_fire_map = {} + + # One-shot agent declarations. + _agent_decl_ctx = { + "model": m, + "N": N, + "governor": governor, + "tg_pool": tg_pool, + "capital_spent": capital_spent, + "gain_E": gain_E, + "gain_B": gain_B, + "gain_S": gain_S, + "gain_C": gain_C, + "cost_C": cost_C, + "cost_S": cost_S, + "builder_fire": builder_fire_map, + } + for agent in agent_registry: + agent.declare_vars(m, _agent_decl_ctx) + for agent in event_agent_registry: + agent.declare_vars(m, _agent_decl_ctx) # ---- per-city logic ----------------------------------------------- for i in range(N): - a_step, a_type = cities[i] + a_step, a_city = cities[i] + a_type = a_city["type"] # initial type at arrival step init = {"H": isH, "F": isF, "M": isM, "N": isMon} @@ -159,250 +1064,204 @@ def solve( for t in range(1, a_step): for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS): m.Add(v[i, t] == 0) - if t <= NUM_STEPS: - for v in ( - col, - ua, - ub, - ud, - ow, - rH, - rF, - rM, - cvE, - cvB, - cvS, - owcvE, - owcvB, - owcvS, - mE, - mB, - mS, - mC, - owmE, - owmB, - owmS, - owmC, - ): - m.Add(v[i, t] == 0) # action + transition logic for active steps for t in range(a_step, NUM_STEPS + 1): P = present[i, t] - ren = m.NewBoolVar("") # any renovation this step - m.Add(ren == rH[i, t] + rF[i, t] + rM[i, t]) + + # Build context for action methods + ctx = { + "model": m, + "i": i, + "t": t, + "N": N, + "max_vat": max_vat, + "action_vars": {act.name: {} for act in action_registry}, + "arrival_step": {i: a_step for i in range(N)}, + "isH": isH, + "isF": isF, + "isM": isM, + "isMon": isMon, + "present": present, + "hasA": hasA, + "hasB": hasB, + "hasD": hasD, + "vE": vE, + "vB": vB, + "vS": vS, + "col": col, + "ua": ua, + "ub": ub, + "ud": ud, + "ow": ow, + "rH": rH, + "rF": rF, + "rM": rM, + "cvE": cvE, + "cvB": cvB, + "cvS": cvS, + "owcvE": owcvE, + "owcvB": owcvB, + "owcvS": owcvS, + "mE": mE, + "mB": mB, + "mS": mS, + "mC": mC, + "owmE": owmE, + "owmB": owmB, + "owmS": owmS, + "owmC": owmC, + "gain_E": gain_E, + "gain_B": gain_B, + "gain_S": gain_S, + "gain_C": gain_C, + "cost_C": cost_C, + "cost_S": cost_S, + "tg_pool": tg_pool, + "capital_spent": capital_spent, + "governor": governor, + "renovations": {}, + "vat_next": {}, + "builder_fire": builder_fire_map, + } + + # Declare all action variables + for action in action_registry: + action.declare_vars(m, i, t, ctx) + + # renovations helper (used by multiple actions) + ren = m.NewBoolVar("") + rH_act = rH.get((i, t)) + rF_act = rF.get((i, t)) + rM_act = rM.get((i, t)) + ren_sum = sum(x for x in [rH_act, rF_act, rM_act] if x is not None) + m.Add(ren == ren_sum) + ctx["renovations"][i, t] = ren no_ren = m.NewBoolVar("") m.Add(no_ren == 1 - ren) - # exactly one action while present - m.Add( - col[i, t] - + ua[i, t] - + ub[i, t] - + ud[i, t] - + ow[i, t] - + rH[i, t] - + rF[i, t] - + rM[i, t] - == P - ) - - # renovation must change type (renovate-to-X requires not-X now) - m.Add(isH[i, t] == 0).OnlyEnforceIf(rH[i, t]) - m.Add(isF[i, t] == 0).OnlyEnforceIf(rF[i, t]) - m.Add(isM[i, t] == 0).OnlyEnforceIf(rM[i, t]) - - # upgrade legality - m.Add(hasA[i, t] == 0).OnlyEnforceIf(ua[i, t]) # can't re-acquire - m.Add(hasB[i, t] == 0).OnlyEnforceIf(ub[i, t]) - m.Add(hasD[i, t] == 0).OnlyEnforceIf(ud[i, t]) - m.Add(isF[i, t] == 1).OnlyEnforceIf(ud[i, t]) # d is foundry-only + # Add all constraints + for action in action_registry: + action.add_constraints(m, i, t, ctx) # LLM allowed renovation into metropolis, need to prevent that now - m.Add(rM[i, t] == 0) + if rM.get((i, t)) is not None: + m.Add(rM[i, t] == 0) - # Monument constraints due to maximizing resources and not wanting enemy to get free upgrades - m.Add(col[i, t] == 0).OnlyEnforceIf(isMon[i, t]) - m.Add(ua[i, t] == 0).OnlyEnforceIf(isMon[i, t]) - m.Add(ub[i, t] == 0).OnlyEnforceIf(isMon[i, t]) - m.Add(ud[i, t] == 0).OnlyEnforceIf(isMon[i, t]) - m.Add(ow[i, t] == 0).OnlyEnforceIf(isMon[i, t]) + # Renovation must change type (renovate-to-X requires not-X now) + for r, target in [ + (rH.get((i, t)), isH), + (rF.get((i, t)), isF), + (rM.get((i, t)), isM), + ]: + if r is not None: + m.Add(target[i, t] == 0).OnlyEnforceIf(r) - # overwork cooldown: city that overworked last step can't collect or overwork - if t > a_step: - m.Add(col[i, t] == 0).OnlyEnforceIf(ow[i, t - 1]) - m.Add(ow[i, t] == 0).OnlyEnforceIf(ow[i, t - 1]) + # Exactly one action while present (sum all action selectors) + action_sum = sum( + action.selector_var(i, t, ctx) + for action in action_registry + if action.selector_var(i, t, ctx) is not None + ) + m.Add(action_sum == P) - # ---- type transition t -> t+1 ---- + # Agent per-(city, step) hooks - run BEFORE upgrade transitions so + # Builder can register its fire var that feeds into d_keep. + for agent in agent_registry: + agent.apply(m, i, t, ctx) + + # Upgrade transitions + m.AddMaxEquality( + hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))] + ) + m.AddMaxEquality( + hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))] + ) + d_keep = m.NewBoolVar("") + d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))] + _bfire = builder_fire_map.get((i, t)) + if _bfire is not None: + d_sources.append(_bfire) + m.AddMaxEquality(d_keep, d_sources) + m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren) + m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren) + + # Gain/cost contributions + for action in action_registry: + action.contributes_gains_costs(m, i, t, ctx) + + # Transition contributions (including vats) + for action in action_registry: + action.transition_contrib(m, i, t, ctx) + + # Type transition t -> t+1 m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren) m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren) m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren) - for r, target in ((rH[i, t], isH), (rF[i, t], isF), (rM[i, t], isM)): - m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r) - m.Add(isF[i, t + 1] == 0).OnlyEnforceIf(rH[i, t]) - m.Add(isM[i, t + 1] == 0).OnlyEnforceIf(rH[i, t]) - m.Add(isH[i, t + 1] == 0).OnlyEnforceIf(rF[i, t]) - m.Add(isM[i, t + 1] == 0).OnlyEnforceIf(rF[i, t]) - m.Add(isH[i, t + 1] == 0).OnlyEnforceIf(rM[i, t]) - m.Add(isF[i, t + 1] == 0).OnlyEnforceIf(rM[i, t]) + for r, target in [ + (rH.get((i, t)), isH), + (rF.get((i, t)), isF), + (rM.get((i, t)), isM), + ]: + if r is not None: + m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r) - # ---- upgrade transition t -> t+1 ---- - # a, b survive renovation and are monotone - m.AddMaxEquality(hasA[i, t + 1], [hasA[i, t], ua[i, t]]) - m.AddMaxEquality(hasB[i, t + 1], [hasB[i, t], ub[i, t]]) - # d is stripped on renovation, else monotone - m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren) - d_keep = m.NewBoolVar("") - m.AddMaxEquality(d_keep, [hasD[i, t], ud[i, t]]) - m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren) + for r1, t1, t2 in [ + (rH.get((i, t)), isF, isM), + (rF.get((i, t)), isH, isM), + (rM.get((i, t)), isH, isF), + ]: + if r1 is not None: + m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1) + m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1) - # ---- collect sub-choices ---- - fcol = AND(isF[i, t], col[i, t]) - mcol = AND(isM[i, t], col[i, t]) - hcol = AND(isH[i, t], col[i, t]) - # foundry: exactly one vat chosen iff foundry collects - m.Add(cvE[i, t] + cvB[i, t] + cvS[i, t] == fcol) - # metropolis: pick (2 + hasB) resources (+1 each); else nothing - npick = m.NewIntVar(0, 3, "") - m.Add(npick == 2 + hasB[i, t]).OnlyEnforceIf(mcol) - m.Add(npick == 0).OnlyEnforceIf(mcol.Not()) - m.Add(mE[i, t] + mB[i, t] + mS[i, t] + mC[i, t] == npick) + # ---- Centralized vat transitions (mirrors main.py exactly) ---- + # Collect/overwork vat selectors (None if action disabled) + _cvE = cvE.get((i, t)) + _cvB = cvB.get((i, t)) + _cvS = cvS.get((i, t)) + _owcvE = owcvE.get((i, t)) + _owcvB = owcvB.get((i, t)) + _owcvS = owcvS.get((i, t)) - # ================= COSTS ================= - # foundry & metropolis collect each cost 1 Capital - cost_C[t].append(fcol) - cost_C[t].append(mcol) - # upgrades b,d cost 2 Steel, reduced by 1 if cost-reduction (a) already held - for upg in (ub[i, t], ud[i, t]): - c = m.NewIntVar(0, 2, "") - both = AND(upg, hasA[i, t]) - m.Add(c == 2).OnlyEnforceIf(upg, hasA[i, t].Not()) - m.Add(c == 1).OnlyEnforceIf(both) - m.Add(c == 0).OnlyEnforceIf(upg.Not()) - cost_S[t].append(c) + inc = m.NewIntVar(1, 3, "") + metallurgist_g = governor.get((i, t, "metallurgist")) + if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY: + metal_active = m.NewBoolVar("") + m.AddMultiplicationEquality(metal_active, [metallurgist_g, isF[i, t]]) + m.Add(inc == 1 + hasD[i, t] + metal_active) + else: + m.Add(inc == 1 + hasD[i, t]) - # ================= GAINS ================= - # Hub collect: +2 Capital (+1 more if collect-bonus b) - hub_gain = m.NewIntVar(0, 3, "") - m.Add(hub_gain == 2 + hasB[i, t]).OnlyEnforceIf(hcol) - m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not()) - gain_C[t].append(hub_gain) - - # Metropolis collect: +1 per pick - gain_E[t].append(mE[i, t]) - gain_B[t].append(mB[i, t]) - gain_S[t].append(mS[i, t]) - gain_C[t].append(mC[i, t]) - - # Foundry collect: gain chosen vat's value as that resource - gEf = m.NewIntVar(0, max_vat, "") - gBf = m.NewIntVar(0, max_vat, "") - gSf = m.NewIntVar(0, max_vat, "") - m.AddMultiplicationEquality(gEf, [cvE[i, t], vE[i, t]]) - m.AddMultiplicationEquality(gBf, [cvB[i, t], vB[i, t]]) - m.AddMultiplicationEquality(gSf, [cvS[i, t], vS[i, t]]) - gain_E[t].append(gEf) - gain_B[t].append(gBf) - gain_S[t].append(gSf) - - # Collect Bonus (b): adds +1 to the amount a Collect gives. For a - # foundry that means +1 of the collected resource (vat value + 1), - # the same uniform "+1 to what Collect gives" rule as Hub/Metro. - gain_E[t].append(AND(cvE[i, t], hasB[i, t])) - gain_B[t].append(AND(cvB[i, t], hasB[i, t])) - gain_S[t].append(AND(cvS[i, t], hasB[i, t])) - - # ---- overwork sub-choices (double-collect; no Capital cost) ---- - fow = AND(isF[i, t], ow[i, t]) - mow = AND(isM[i, t], ow[i, t]) - how = AND(isH[i, t], ow[i, t]) - - # foundry overwork: exactly one vat chosen iff foundry overworks - m.Add(owcvE[i, t] + owcvB[i, t] + owcvS[i, t] == fow) - - # metropolis overwork: pick 2*(2 + hasB) resources; else nothing - ow_npick = m.NewIntVar(0, 6, "") - m.Add(ow_npick == 2 * (2 + hasB[i, t])).OnlyEnforceIf(mow) - m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not()) - m.Add(owmE[i, t] + owmB[i, t] + owmS[i, t] + owmC[i, t] == ow_npick) - - # ================= OVERWORK GAINS (2x normal collect) ================= - # Hub overwork: 2*(2 + hasB) Capital - hub_ow_gain = m.NewIntVar(0, 6, "") - m.Add(hub_ow_gain == 2 * (2 + hasB[i, t])).OnlyEnforceIf(how) - m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not()) - gain_C[t].append(hub_ow_gain) - - # Metropolis overwork: +1 per pick (picks are already doubled via ow_npick) - gain_E[t].append(owmE[i, t]) - gain_B[t].append(owmB[i, t]) - gain_S[t].append(owmS[i, t]) - gain_C[t].append(owmC[i, t]) - - # Foundry overwork: 2 * chosen vat's value - owgEf = m.NewIntVar(0, 2 * max_vat, "") - owgBf = m.NewIntVar(0, 2 * max_vat, "") - owgSf = m.NewIntVar(0, 2 * max_vat, "") - _owE = m.NewIntVar(0, max_vat, "") - _owB = m.NewIntVar(0, max_vat, "") - _owS = m.NewIntVar(0, max_vat, "") - m.AddMultiplicationEquality(_owE, [owcvE[i, t], vE[i, t]]) - m.AddMultiplicationEquality(_owB, [owcvB[i, t], vB[i, t]]) - m.AddMultiplicationEquality(_owS, [owcvS[i, t], vS[i, t]]) - m.Add(owgEf == 2 * _owE) - m.Add(owgBf == 2 * _owB) - m.Add(owgSf == 2 * _owS) - gain_E[t].append(owgEf) - gain_B[t].append(owgBf) - gain_S[t].append(owgSf) - - # Collect Bonus (b) for foundry overwork: 2 * (+1) = +2 of that resource - ow_be = AND(owcvE[i, t], hasB[i, t]) - ow_bb = AND(owcvB[i, t], hasB[i, t]) - ow_bs = AND(owcvS[i, t], hasB[i, t]) - gain_E[t].append(ow_be) - gain_E[t].append(ow_be) # added twice == *2 - gain_B[t].append(ow_bb) - gain_B[t].append(ow_bb) - gain_S[t].append(ow_bs) - gain_S[t].append(ow_bs) - - # ---- vat update producing vat[i, t+1] ---- - # increment added to the two non-collected vats (1, or 2 with upgrade d) - inc = m.NewIntVar(1, 2, "") - m.Add(inc == 1 + hasD[i, t]) - - # vat_next = result of this step's action (only meaningful if foundry) vEn = m.NewIntVar(0, max_vat, "") vBn = m.NewIntVar(0, max_vat, "") vSn = m.NewIntVar(0, max_vat, "") - # collect E: E->0, B,S += inc - m.Add(vEn == 0).OnlyEnforceIf(cvE[i, t]) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(cvE[i, t]) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(cvE[i, t]) - # collect B - m.Add(vBn == 0).OnlyEnforceIf(cvB[i, t]) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(cvB[i, t]) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(cvB[i, t]) - # collect S - m.Add(vSn == 0).OnlyEnforceIf(cvS[i, t]) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(cvS[i, t]) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(cvS[i, t]) - # overwork vat transitions: same reset/increment as normal collect - m.Add(vEn == 0).OnlyEnforceIf(owcvE[i, t]) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(owcvE[i, t]) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(owcvE[i, t]) - m.Add(vBn == 0).OnlyEnforceIf(owcvB[i, t]) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(owcvB[i, t]) - m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(owcvB[i, t]) - m.Add(vSn == 0).OnlyEnforceIf(owcvS[i, t]) - m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(owcvS[i, t]) - m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(owcvS[i, t]) + + # For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S) + # apply the same reset/increment pattern from main.py + for sel_E, sel_B, sel_S in [ + (_cvE, _cvB, _cvS), + (_owcvE, _owcvB, _owcvS), + ]: + if sel_E is not None: + m.Add(vEn == 0).OnlyEnforceIf(sel_E) + m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E) + m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E) + if sel_B is not None: + m.Add(vBn == 0).OnlyEnforceIf(sel_B) + m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B) + m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B) + if sel_S is not None: + m.Add(vSn == 0).OnlyEnforceIf(sel_S) + m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S) + m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S) # foundry but neither collecting nor overworking: vats unchanged # f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow + _fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None) + _fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None) f_noncollect_noow = m.NewBoolVar("") - m.Add(f_noncollect_noow == isF[i, t] - fcol - fow) + m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count) m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow) m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow) m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow) @@ -411,25 +1270,69 @@ def solve( # renovate-to-foundry -> reset to 1 # continuing foundry -> vat_next # otherwise (not foundry next) -> 0 - cont_F = AND(isF[i, t], no_ren) # stays a foundry next step + cont_F = AND(isF[i, t], no_ren) for vnext, vn in ( (vE[i, t + 1], vEn), (vB[i, t + 1], vBn), (vS[i, t + 1], vSn), ): - m.Add(vnext == 1).OnlyEnforceIf(rF[i, t]) + if rF.get((i, t)) is not None: + m.Add(vnext == 1).OnlyEnforceIf(rF[i, t]) m.Add(vnext == vn).OnlyEnforceIf(cont_F) - m.Add(isF[i, t + 1] == 0).OnlyEnforceIf( - rF[i, t].Not(), cont_F.Not() - ) # tautology guard not_F_next = isF[i, t + 1].Not() m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next) m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next) m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next) - # ---- global overwork limit: at most 1 city overworks per step ---- + # ---- Builder one-shot: at most one fire across all (i, t) ---- + if builder_fire_map: + m.Add(sum(builder_fire_map.values()) <= 1) + + # ---- global constraints (per action type, per step) ---- for t in range(1, NUM_STEPS + 1): - m.Add(sum(ow[i, t] for i in range(N)) <= 1) + ctx_global = {"N": N, "ow": ow, "t": t} + for action in action_registry: + action.add_global_constraints(m, t, ctx_global) + + # ---- event-agent per-step hooks (registry empty in phase A) ---- + _event_ctx = { + "model": m, + "N": N, + "governor": governor, + "tg_pool": tg_pool, + "capital_spent": capital_spent, + "gain_E": gain_E, + "gain_B": gain_B, + "gain_S": gain_S, + "gain_C": gain_C, + "cost_C": cost_C, + "cost_S": cost_S, + } + for t in range(1, NUM_STEPS + 1): + for agent in event_agent_registry: + if t in agent.available_steps: + agent.apply_event(m, t, _event_ctx) + + # ---- trade-goods pool split (routes pool[t] -> gain_E/B/S/C) ---- + # Generic-trade-good sources (Baron, Fence in later phases) deposit + # IntVars into tg_pool[t]; here we declare a 4-way split into the + # resource gain accumulators. If the pool is empty for step t, the split + # vars are constrained to 0. + for t in range(1, NUM_STEPS + 1): + pool_total = m.NewIntVar(0, max_res, f"tg_total_{t}") + if tg_pool[t]: + m.Add(pool_total == sum(tg_pool[t])) + else: + m.Add(pool_total == 0) + tg_E = m.NewIntVar(0, max_res, f"tg_E_{t}") + tg_B = m.NewIntVar(0, max_res, f"tg_B_{t}") + tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}") + tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}") + m.Add(tg_E + tg_B + tg_S + tg_C == pool_total) + gain_E[t].append(tg_E) + gain_B[t].append(tg_B) + gain_S[t].append(tg_S) + gain_C[t].append(tg_C) # ---- resource pool recursion -------------------------------------- E = {1: m.NewIntVar(initial[0], initial[0], "E1")} @@ -441,10 +1344,8 @@ def solve( B[t + 1] = m.NewIntVar(0, max_res, f"B_{t + 1}") S[t + 1] = m.NewIntVar(0, max_res, f"S_{t + 1}") C[t + 1] = m.NewIntVar(0, max_res, f"C_{t + 1}") - # costs are paid from the START-of-step pool (must work out before gains) m.Add(C[t] - sum(cost_C[t]) >= 0) m.Add(S[t] - sum(cost_S[t]) >= 0) - # next pool = start - costs + gains m.Add(E[t + 1] == E[t] + sum(gain_E[t])) m.Add(B[t + 1] == B[t] + sum(gain_B[t])) m.Add(S[t + 1] == S[t] - sum(cost_S[t]) + sum(gain_S[t])) @@ -452,13 +1353,7 @@ def solve( finalE, finalB, finalS = E[NUM_STEPS + 1], B[NUM_STEPS + 1], S[NUM_STEPS + 1] - # ---- Phase 1: real per-resource ceilings (fast LINEAR maximisations) ---- - # The triple product E*B*S has a very loose relaxation, so proving optimality - # directly is slow. We first find the true max each resource can reach on its - # own (a linear objective, solved to optimality quickly), then clamp the final - # pools to those ceilings. This tightens the product's propagation enough to - # prove optimality, and is valid because no feasible solution can exceed an - # individual resource's standalone maximum. + # ---- Phase 1: real per-resource ceilings ---- def _ceiling(var): s = cp_model.CpSolver() s.parameters.max_time_in_seconds = 20.0 @@ -479,21 +1374,8 @@ def solve( m.Add(finalS <= capS) # ====================================================================== - # OBJECTIVE IS SET HERE -- maximise Electrum * Brass * Steel (post step 5) - # To change the objective, edit the three "final" pools and/or the product - # below. (finalE/finalB/finalS are the pools after step 5's gains.) + # OBJECTIVE IS SET HERE # ====================================================================== - ## NOTE: product can be changed here - # prodEB = m.NewIntVar(0, capE * capB, "prodEB") - # m.AddMultiplicationEquality(prodEB, [finalE, finalB]) - # obj = m.NewIntVar(0, capE * capB * capS, "obj") - # m.AddMultiplicationEquality(obj, [prodEB, finalS]) - # m.Maximize(obj) - - # Linear objective instead (it sucks) - # m.Maximize(finalE + finalB + finalS) - - # New Product def Eprod(v): return v * v @@ -522,7 +1404,7 @@ def solve( status = solver.Solve( m, printer.IntermediateSolutionPrinter( - {"electrum": finalE, "brass": finalB, "steel": finalS} + {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.01 ), ) @@ -572,6 +1454,10 @@ def solve( finalE, finalB, finalS, + action_registry, + governor, + agent_registry, + capital_spent, ) return solver, status @@ -620,10 +1506,15 @@ def _report( finalE, finalB, finalS, + action_registry, + governor=None, + agent_registry=None, + capital_spent=None, ): print("status:", solver.StatusName(status)) if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): return + typ_name = {} for i in range(N): for t in range(1, NUM_STEPS + 1): @@ -639,7 +1530,26 @@ def _report( typ_name[i, t] = "-" def action_str(i, t): - if solver.Value(col[i, t]): + # Try each action in order until one is active + ctx = { + "typ_name": typ_name, + "cvE": cvE, + "cvB": cvB, + "cvS": cvS, + "mE": mE, + "mB": mB, + "mS": mS, + "mC": mC, + "owcvE": owcvE, + "owcvB": owcvB, + "owcvS": owcvS, + "owmE": owmE, + "owmB": owmB, + "owmS": owmS, + "owmC": owmC, + } + + if solver.Value(col.get((i, t), 0)): if typ_name[i, t] == "Foundry": v = ( "E" @@ -652,10 +1562,10 @@ def _report( if typ_name[i, t] == "Metro": picks = [] for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): - picks += [nm] * solver.Value(var[i, t]) + picks += [nm] * (solver.Value(var[i, t]) // 10) return "Collect {" + ",".join(picks) + "}" return "Collect (+Capital)" - if solver.Value(ow[i, t]): + if solver.Value(ow.get((i, t), 0)): if typ_name[i, t] == "Foundry": v = ( "E" @@ -668,35 +1578,58 @@ def _report( if typ_name[i, t] == "Metro": picks = [] for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): - picks += [nm] * solver.Value(var[i, t]) + picks += [nm] * (solver.Value(var[i, t]) // 10) return "Overwork {" + ",".join(picks) + "} (2x)" return "Overwork (+Capital 2x)" - if solver.Value(ua[i, t]): + if solver.Value(ua.get((i, t), 0)): return "Upgrade a (cost-reduction)" - if solver.Value(ub[i, t]): + if solver.Value(ub.get((i, t), 0)): return "Upgrade b (collect-bonus)" - if solver.Value(ud[i, t]): + if solver.Value(ud.get((i, t), 0)): return "Upgrade d (vat-increment)" - if solver.Value(rH[i, t]): + if solver.Value(rH.get((i, t), 0)): return "Renovate -> Hub" - if solver.Value(rF[i, t]): + if solver.Value(rF.get((i, t), 0)): return "Renovate -> Foundry" - if solver.Value(rM[i, t]): + if solver.Value(rM.get((i, t), 0)): return "Renovate -> Metro" return "(inactive)" print("\nPer-step pools (start of step):") - print(" step: E B S C") + print(" step: E B S C") for t in range(1, NUM_STEPS + 2): label = f"after5" if t == NUM_STEPS + 1 else f"start {t}" print( - f" {label:>8}: {solver.Value(E[t]):3} {solver.Value(B[t]):3} " - f"{solver.Value(S[t]):3} {solver.Value(C[t]):3}" + f" {label:>8}: {solver.Value(E[t]) / 10:6.1f} {solver.Value(B[t]) / 10:6.1f} " + f"{solver.Value(S[t]) / 10:6.1f} {solver.Value(C[t]) / 10:6.1f}" ) + if capital_spent is not None: + print("\n capital spent per step:") + for t in range(1, NUM_STEPS + 1): + spent = sum(solver.Value(x) for x in capital_spent[t]) / 10 + print(f" step {t}: {spent:6.1f}") + + # Resolve governor assignments for display + def gov_for(i, t): + if not governor or not agent_registry: + return "-" + names = [a.name for a in agent_registry] + for name in names: + v = governor.get((i, t, name)) + if v is None: + continue + try: + if solver.Value(v): + return name + except Exception: + continue + return "-" + print("\nActions:") for i in range(N): - a_step, a_type = cities[i] + a_step, a_city = cities[i] + a_type = a_city["type"] if isinstance(a_city, dict) else a_city print(f" City {i} (arrives step {a_step} as {a_type}):") for t in range(a_step, NUM_STEPS + 1): ups = "".join( @@ -709,14 +1642,16 @@ def _report( if typ_name[i, t] == "Foundry" else "" ) + gov = gov_for(i, t) print( f" step {t}: [{typ_name[i, t]:7}] {action_str(i, t):28}" - f" upg[{ups}]{extra}" + f" upg[{ups}] gov[{gov}]{extra}" ) fe, fb, fs = solver.Value(finalE), solver.Value(finalB), solver.Value(finalS) print( - f"\nFINAL E={fe} B={fb} S={fs} product = {fe * fb * fs} sum = {fe + fb + fs}" + f"\nFINAL E={fe / 10:.1f} B={fb / 10:.1f} S={fs / 10:.1f} " + f"product(scaled) = {fe * fb * fs / 1000} sum = {(fe + fb + fs) / 10:.1f}" )