""" City Resource Optimization -> CP-SAT (Google OR-Tools) [ACTION REGISTRY VERSION] Maximise Electrum * Brass * Steel after the gains of step 5. The objective is a product of three variables, so this is a constraint- programming / nonlinear problem, hence CP-SAT (cp_model) rather than the LP/MIP solver. Read the comments at: - PARAMETERS (initial pools + arrival schedule + bonus mode) - "OBJECTIVE IS SET HERE" (the product being maximised -- tweak freely) """ from ortools.sat.python import cp_model import printer # ====================================================================== # PARAMETERS -- edit these # ====================================================================== # Starting resource pools at the start of step 1: (Electrum, Brass, Steel, Capital) # Note: all resource values are scaled by 10 internally so that fractional # trade-good splits (Baron/Fence et al, phase B/C) can be modeled with integers. # Display in _report divides by 10. INITIAL = (30, 30, 30, 30) # fixed_choices = { # "actions": { # (city_idx, step): "action_name", # e.g., (0, 2): "collect" # # ... # }, # "governors": { # (city_idx, step, agent_name): bool, # e.g., (1, 1, "provisioner"): True # # ... # } # } # # None for nothing # FIXED_CHOICES = None FIXED_CHOICES = { "actions": { (0, 1): "renovate_h", (1, 1): "renovate_h", (2, 1): "renovate_h", (3, 1): "overwork", (0, 2): "collect", (1, 2): "collect", (2, 2): "overwork", (3, 2): "upgrade_a", (0, 3): "collect", (1, 3): "collect", (2, 3): "upgrade_a", (3, 3): "overwork", (0, 4): "collect", (1, 4): "collect", (2, 4): "overwork", (3, 4): "upgrade_b", (0, 5): "collect", (1, 5): "collect", (2, 5): "upgrade_b", (3, 5): "overwork", } } # Resource constraints: list of callables that receive a dict with keys E, B, S, C # mapping to resource variables indexed by step. Each callable returns a constraint # (or None to skip) to be passed to m.Add(). Example: # lambda res: res["E"][3] >= 50 # Ensure at least 50 E at step 3 RESOURCE_CONSTRAINTS = [] # Maximize criteria for solve(). Factor = exponent in "product" mode, # weight in "sum" mode. Keys: E, B, S, C; missing keys = 0 (resource # excluded from the objective — it is NOT forced to zero). Negative # factors are allowed only in "sum" mode (a negative exponent would mean # division, which integer CP-SAT cannot express). OBJECTIVE_FACTORS = {"E": 2, "B": 1, "S": 2} OBJECTIVE_MODE = "product" # "product" or "sum" def _validate_objective(factors, mode): if mode not in ("product", "sum"): raise ValueError(f"objective_mode must be 'product' or 'sum', got {mode!r}") unknown = set(factors) - {"E", "B", "S", "C"} if unknown: raise ValueError(f"unknown objective_factors keys: {sorted(unknown)}") for k, v in factors.items(): if not isinstance(v, int) or isinstance(v, bool): raise ValueError(f"objective factor {k}={v!r} must be an int") if mode == "product" and v < 0: raise ValueError( f"objective factor {k}={v} is negative; negative exponents " "are not expressible in product mode" ) if not any(factors.values()): raise ValueError("objective_factors needs at least one nonzero factor") # Arrival schedule. Key = step (1..5), value = list of city types that arrive # at the START of that step. Types: 'H' Hub, 'F' Foundry, 'M' Metropolis, 'N' Monument. # Arriving cities act that same step. # Each entry may be either a string shorthand (e.g. "H") or a dict, e.g. # {"type": "F", "adjacent_to": [city_idx, ...], "vats": {"E": 1, "B": 1, "S": 1}, # "departure_step": NUM_STEPS + 1} # `adjacent_to` is currently parsed but unused; see normalize_city(). # Optional `vats` overrides initial vat values at arrival (each defaults to 1). # Optional `departure_step` is the first step at which the city is ABSENT; # default NUM_STEPS+1 means the city stays through the whole simulation. ARRIVALS = { 1: [ {"type": "F", "adjacent_to": []}, {"type": "F", "adjacent_to": []}, {"type": "F", "adjacent_to": []}, {"type": "H", "adjacent_to": []}, ], 2: [], 3: [], 4: [], 5: [], } def normalize_city(c): if isinstance(c, str): c = {"type": c} vats_in = c.get("vats", {}) or {} vats = { "E": vats_in.get("E", 1), "B": vats_in.get("B", 1), "S": vats_in.get("S", 1), } return { "type": c["type"], "adjacent_to": c.get("adjacent_to", []), "vats": vats, "departure_step": c.get("departure_step", NUM_STEPS + 1), } # Collect Bonus (b) adds +1 to whatever a Collect gives. On a foundry that is # +1 of the resource collected (i.e. the chosen vat's value + 1). This is the # same uniform rule as Hub (+1 Capital) and Metropolis (+1 resource pick). NUM_STEPS = 5 MAX_RES = ( 2000 # upper bound on any resource pool (scaled x10; raise if you expect more) ) MAX_VAT = 15 # upper bound on a foundry vat value (1 + 2*nsteps is plenty); unscaled # Bastion counts indexed by t-1; bastions on map at step t. Reserved for # phase B/C/D agents (no effect yet). BASTION_COUNTS = [3, 3, 3, 3, 3] # Agent name -> list of steps where the agent is available. # Empty/missing means the agent is disabled. Phase B/C/D will populate. AGENT_AVAILABILITY = { "capitalist": [], "baron": [], "prodigy": [], "provisioner": [], "metallurgist": [], "builder": [], "courier": [], "planner": [], "fence": [], "foreman": [], "industrialist": [], } # ====================================================================== # ACTION REGISTRY CONFIGURATION # ====================================================================== ENABLED_ACTIONS = { "collect": True, "upgrade_a": True, "upgrade_b": True, "upgrade_d": True, "overwork": True, "renovate_h": True, "renovate_f": True, "noop": True, # "renovate_m": False, # currently always disabled in main.py } # ====================================================================== # ACTION REGISTRY BASE CLASS # ====================================================================== class Action: """Base class for all actions. Subclasses implement the action's behavior.""" def __init__(self, name): self.name = name def declare_vars(self, model, i, t, ctx): """Declare the action's decision variable(s) for city i at step t. Should create and store a selector var (selector_var) in ctx['action_vars'][self.name]. """ pass def add_constraints(self, model, i, t, ctx): """Add precondition and legality constraints for this action.""" pass def add_global_constraints(self, model, t, ctx): """Add per-step global constraints (e.g. overwork's 'at most 1 per step').""" pass def contributes_gains_costs(self, model, i, t, ctx): """Add contributions to gain_E/B/S/C and cost_C/S accumulators.""" pass def transition_contrib(self, model, i, t, ctx): """Handle state transitions (type, upgrade, vat) caused by this action.""" pass def selector_var(self, i, t, ctx): """Return the boolean variable that is 1 iff this action is chosen for (i,t). Used to form the "exactly one action" constraint. """ return ctx["action_vars"].get(self.name, {}).get((i, t), None) def describe(self, solver, i, t, ctx): """Return a human-readable string describing what this action does for (i,t). Called during reporting if this action is the chosen one. """ return f"{self.name}" # (VatsMixin removed — vat transitions are handled centrally in solve()) # ====================================================================== # CONCRETE ACTIONS # ====================================================================== class NoOpAction(Action): def __init__(self): super().__init__("noop") def declare_vars(self, model, i, t, ctx): noop = model.NewBoolVar(f"noop_{i}_{t}") ctx["action_vars"]["noop"][(i, t)] = noop ctx["noop"][i, t] = noop # only allowed after overwork to avoid more things needing to be checked def add_constraints(self, model, i, t, ctx): noop = ctx["noop"][i, t] if t == ctx["arrival_step"][i]: # Can't noop at arrival (no prior step) model.Add(noop == 0) else: # noop ≤ ow[i, t-1]: noop can be 1 only if overwork was 1 last step model.Add(noop <= ctx["ow"][i, t - 1]) def add_global_constraints(self, model, t, ctx): pass def contributes_gains_costs(self, model, i, t, ctx): # No gains or costs pass def transition_contrib(self, model, i, t, ctx): pass def describe(self, solver, i, t, ctx): return "Rest (no action)" class CollectAction(Action): def __init__(self): super().__init__("collect") self.AND_cache = {} def declare_vars(self, model, i, t, ctx): m = model col = m.NewBoolVar(f"col_{i}_{t}") cvE = m.NewBoolVar(f"cvE_{i}_{t}") cvB = m.NewBoolVar(f"cvB_{i}_{t}") cvS = m.NewBoolVar(f"cvS_{i}_{t}") mE = m.NewIntVar(0, 30, f"mE_{i}_{t}") mB = m.NewIntVar(0, 30, f"mB_{i}_{t}") mS = m.NewIntVar(0, 30, f"mS_{i}_{t}") mC = m.NewIntVar(0, 30, f"mC_{i}_{t}") ctx["action_vars"]["collect"][(i, t)] = col ctx["col"][i, t] = col ctx["cvE"][i, t] = cvE ctx["cvB"][i, t] = cvB ctx["cvS"][i, t] = cvS ctx["mE"][i, t] = mE ctx["mB"][i, t] = mB ctx["mS"][i, t] = mS ctx["mC"][i, t] = mC def add_constraints(self, model, i, t, ctx): m = model isF = ctx["isF"] isM = ctx["isM"] isMon = ctx["isMon"] hasB = ctx["hasB"] col = ctx["col"][i, t] cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] mE, mB, mS, mC = ( ctx["mE"][i, t], ctx["mB"][i, t], ctx["mS"][i, t], ctx["mC"][i, t], ) # Monument can't collect m.Add(col == 0).OnlyEnforceIf(isMon[i, t]) # Foundry: exactly one vat chosen iff foundry collects fcol = self._AND(model, isF[i, t], col, ctx) m.Add(cvE + cvB + cvS == fcol) # Metropolis: pick (2 + hasB) resources mcol = self._AND(model, isM[i, t], col, ctx) npick = m.NewIntVar(0, 3, "") m.Add(npick == 2 + hasB[i, t]).OnlyEnforceIf(mcol) m.Add(npick == 0).OnlyEnforceIf(mcol.Not()) # Each "pick" is 10 units of the chosen resource (x10 scaling). m.Add(mE + mB + mS + mC == npick * 10) def add_global_constraints(self, model, t, ctx): pass def contributes_gains_costs(self, model, i, t, ctx): m = model isH = ctx["isH"] isF = ctx["isF"] isM = ctx["isM"] hasB = ctx["hasB"] vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] col = ctx["col"][i, t] cvE, cvB, cvS = ctx["cvE"][i, t], ctx["cvB"][i, t], ctx["cvS"][i, t] mE, mB, mS, mC = ( ctx["mE"][i, t], ctx["mB"][i, t], ctx["mS"][i, t], ctx["mC"][i, t], ) gain_E = ctx["gain_E"][t] gain_B = ctx["gain_B"][t] gain_S = ctx["gain_S"][t] gain_C = ctx["gain_C"][t] cost_C = ctx["cost_C"][t] capital_spent = ctx["capital_spent"][t] # Collect sub-choices fcol = self._AND(model, isF[i, t], col, ctx) mcol = self._AND(model, isM[i, t], col, ctx) hcol = self._AND(model, isH[i, t], col, ctx) # Costs: foundry & metropolis collect each cost 1 Capital (x10 scaled) fcol_cost = m.NewIntVar(0, 10, "") m.Add(fcol_cost == 10 * fcol) cost_C.append(fcol_cost) capital_spent.append(fcol_cost) mcol_cost = m.NewIntVar(0, 10, "") m.Add(mcol_cost == 10 * mcol) cost_C.append(mcol_cost) capital_spent.append(mcol_cost) # Hub collect: +2 Capital (+1 more if collect-bonus b) -> x10 hub_gain = m.NewIntVar(0, 30, "") m.Add(hub_gain == 10 * (2 + hasB[i, t])).OnlyEnforceIf(hcol) m.Add(hub_gain == 0).OnlyEnforceIf(hcol.Not()) gain_C.append(hub_gain) # Metropolis collect: each "pick" is already 10 units (mE/mB/mS/mC). gain_E.append(mE) gain_B.append(mB) gain_S.append(mS) gain_C.append(mC) # Foundry collect: gain chosen vat's value as that resource (vat unscaled), # then scale x10 for the resource gain. gEf = m.NewIntVar(0, ctx["max_vat"], "") gBf = m.NewIntVar(0, ctx["max_vat"], "") gSf = m.NewIntVar(0, ctx["max_vat"], "") m.AddMultiplicationEquality(gEf, [cvE, vE[i, t]]) m.AddMultiplicationEquality(gBf, [cvB, vB[i, t]]) m.AddMultiplicationEquality(gSf, [cvS, vS[i, t]]) gEf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") gBf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") gSf_scaled = m.NewIntVar(0, 10 * ctx["max_vat"], "") m.Add(gEf_scaled == 10 * gEf) m.Add(gBf_scaled == 10 * gBf) m.Add(gSf_scaled == 10 * gSf) gain_E.append(gEf_scaled) gain_B.append(gBf_scaled) gain_S.append(gSf_scaled) # Collect Bonus (b): adds +1 to the amount a Collect gives -> +10 scaled be = self._AND(model, cvE, hasB[i, t], ctx) bb = self._AND(model, cvB, hasB[i, t], ctx) bs = self._AND(model, cvS, hasB[i, t], ctx) be10 = m.NewIntVar(0, 10, "") bb10 = m.NewIntVar(0, 10, "") bs10 = m.NewIntVar(0, 10, "") m.Add(be10 == 10 * be) m.Add(bb10 == 10 * bb) m.Add(bs10 == 10 * bs) gain_E.append(be10) gain_B.append(bb10) gain_S.append(bs10) def transition_contrib(self, model, i, t, ctx): pass # Vat transitions handled centrally in solve() def describe(self, solver, i, t, ctx): typ_name = ctx["typ_name"].get((i, t), "-") cvE = ctx["cvE"].get((i, t)) cvB = ctx["cvB"].get((i, t)) cvS = ctx["cvS"].get((i, t)) mE = ctx["mE"].get((i, t)) mB = ctx["mB"].get((i, t)) mS = ctx["mS"].get((i, t)) mC = ctx["mC"].get((i, t)) if typ_name == "Foundry": v = "E" if solver.Value(cvE) else "B" if solver.Value(cvB) else "S" return f"Collect vat {v}" if typ_name == "Metro": picks = [] for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): picks += [nm] * (solver.Value(var) // 10) return "Collect {" + ",".join(picks) + "}" return "Collect (+Capital)" def _AND(self, model, a, b, ctx): key = (id(a), id(b)) if key not in self.AND_cache: c = model.NewBoolVar("") model.AddMultiplicationEquality(c, [a, b]) self.AND_cache[key] = c return self.AND_cache[key] class UpgradeAction(Action): """Base class for upgrade actions (a, b, d).""" def __init__(self, name, upgrade_key): super().__init__(name) self.upgrade_key = upgrade_key # 'A', 'B', or 'D' def declare_vars(self, model, i, t, ctx): m = model u = m.NewBoolVar(f"{self.upgrade_key.lower()}_{i}_{t}") ctx["action_vars"][self.name][(i, t)] = u ctx[f"u{self.upgrade_key.lower()}"][i, t] = u def add_constraints(self, model, i, t, ctx): m = model u = ctx[f"u{self.upgrade_key.lower()}"][i, t] has_key = ctx[f"has{self.upgrade_key}"] isMon = ctx["isMon"] isF = ctx["isF"] # Can't re-acquire m.Add(has_key[i, t] == 0).OnlyEnforceIf(u) # Monument can't upgrade m.Add(u == 0).OnlyEnforceIf(isMon[i, t]) # D is foundry-only if self.upgrade_key == "D": m.Add(isF[i, t] == 1).OnlyEnforceIf(u) def add_global_constraints(self, model, t, ctx): pass def contributes_gains_costs(self, model, i, t, ctx): m = model u = ctx[f"u{self.upgrade_key.lower()}"][i, t] hasA = ctx["hasA"] cost_S = ctx["cost_S"][t] # Upgrades b,d cost 2 Steel, reduced by 1 if cost-reduction (a) already held # (x10 scaled: 20 / 10). if self.upgrade_key in ("B", "D"): c = m.NewIntVar(0, 20, "") both = m.NewBoolVar("") m.AddMultiplicationEquality(both, [u, hasA[i, t]]) m.Add(c == 20).OnlyEnforceIf(u, hasA[i, t].Not()) m.Add(c == 10).OnlyEnforceIf(both) m.Add(c == 0).OnlyEnforceIf(u.Not()) cost_S.append(c) def transition_contrib(self, model, i, t, ctx): pass def describe(self, solver, i, t, ctx): if self.upgrade_key == "A": return "Upgrade a (cost-reduction)" elif self.upgrade_key == "B": return "Upgrade b (collect-bonus)" elif self.upgrade_key == "D": return "Upgrade d (vat-increment)" return self.name class OverworkAction(Action): def __init__(self): super().__init__("overwork") self.AND_cache = {} def declare_vars(self, model, i, t, ctx): m = model ow = m.NewBoolVar(f"ow_{i}_{t}") owcvE = m.NewBoolVar(f"owcvE_{i}_{t}") owcvB = m.NewBoolVar(f"owcvB_{i}_{t}") owcvS = m.NewBoolVar(f"owcvS_{i}_{t}") owmE = m.NewIntVar(0, 60, f"owmE_{i}_{t}") owmB = m.NewIntVar(0, 60, f"owmB_{i}_{t}") owmS = m.NewIntVar(0, 60, f"owmS_{i}_{t}") owmC = m.NewIntVar(0, 60, f"owmC_{i}_{t}") ctx["action_vars"]["overwork"][(i, t)] = ow ctx["ow"][i, t] = ow ctx["owcvE"][i, t] = owcvE ctx["owcvB"][i, t] = owcvB ctx["owcvS"][i, t] = owcvS ctx["owmE"][i, t] = owmE ctx["owmB"][i, t] = owmB ctx["owmS"][i, t] = owmS ctx["owmC"][i, t] = owmC def add_constraints(self, model, i, t, ctx): m = model isF = ctx["isF"] isM = ctx["isM"] isMon = ctx["isMon"] hasB = ctx["hasB"] ow = ctx["ow"][i, t] owcvE = ctx["owcvE"][i, t] owcvB = ctx["owcvB"][i, t] owcvS = ctx["owcvS"][i, t] owmE = ctx["owmE"][i, t] owmB = ctx["owmB"][i, t] owmS = ctx["owmS"][i, t] owmC = ctx["owmC"][i, t] # Monument can't overwork m.Add(ow == 0).OnlyEnforceIf(isMon[i, t]) # Foundry overwork: exactly one vat chosen iff foundry overworks fow = self._AND(model, isF[i, t], ow, ctx) m.Add(owcvE + owcvB + owcvS == fow) # Metropolis overwork: pick 2*(2 + hasB) resources mow = self._AND(model, isM[i, t], ow, ctx) ow_npick = m.NewIntVar(0, 6, "") m.Add(ow_npick == 2 * (2 + hasB[i, t])).OnlyEnforceIf(mow) m.Add(ow_npick == 0).OnlyEnforceIf(mow.Not()) # Each "pick" is 10 units (x10 scaling). m.Add(owmE + owmB + owmS + owmC == ow_npick * 10) # Overwork cooldown: city that overworked last step can't collect or overwork if t > ctx["arrival_step"][i]: col = ctx["col"].get((i, t)) if col is not None: m.Add(col == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) m.Add(ow == 0).OnlyEnforceIf(ctx["ow"][i, t - 1]) # Planner gating: when planner is enabled, a city may overwork only if # the planner agent governs it. Removes the global "at most 1" cap. if "planner" in AGENT_AVAILABILITY: planner_g = ctx["governor"].get((i, t, "planner")) if planner_g is not None: m.Add(ow == 0).OnlyEnforceIf(planner_g.Not()) else: m.Add(ow == 0) def add_global_constraints(self, model, t, ctx): m = model ow = ctx["ow"] N = ctx["N"] # no planner = no overworks if "planner" not in AGENT_AVAILABILITY: m.Add(sum(ow[i, t] for i in range(N)) <= 0) # else: gating handled per-city by Planner def contributes_gains_costs(self, model, i, t, ctx): m = model isH = ctx["isH"] isF = ctx["isF"] isM = ctx["isM"] hasB = ctx["hasB"] vE, vB, vS = ctx["vE"], ctx["vB"], ctx["vS"] ow = ctx["ow"][i, t] owcvE = ctx["owcvE"][i, t] owcvB = ctx["owcvB"][i, t] owcvS = ctx["owcvS"][i, t] owmE = ctx["owmE"][i, t] owmB = ctx["owmB"][i, t] owmS = ctx["owmS"][i, t] owmC = ctx["owmC"][i, t] gain_E = ctx["gain_E"][t] gain_B = ctx["gain_B"][t] gain_S = ctx["gain_S"][t] gain_C = ctx["gain_C"][t] # Overwork sub-choices fow = self._AND(model, isF[i, t], ow, ctx) mow = self._AND(model, isM[i, t], ow, ctx) how = self._AND(model, isH[i, t], ow, ctx) # Hub overwork: 2*(2 + hasB) Capital -> x10 hub_ow_gain = m.NewIntVar(0, 60, "") m.Add(hub_ow_gain == 20 * (2 + hasB[i, t])).OnlyEnforceIf(how) m.Add(hub_ow_gain == 0).OnlyEnforceIf(how.Not()) gain_C.append(hub_ow_gain) # Metropolis overwork: each "pick" is already 10 units (owm* vars). gain_E.append(owmE) gain_B.append(owmB) gain_S.append(owmS) gain_C.append(owmC) # Foundry overwork: 2 * chosen vat's value, then x10 scaling. _owE = m.NewIntVar(0, ctx["max_vat"], "") _owB = m.NewIntVar(0, ctx["max_vat"], "") _owS = m.NewIntVar(0, ctx["max_vat"], "") m.AddMultiplicationEquality(_owE, [owcvE, vE[i, t]]) m.AddMultiplicationEquality(_owB, [owcvB, vB[i, t]]) m.AddMultiplicationEquality(_owS, [owcvS, vS[i, t]]) owgEf = m.NewIntVar(0, 20 * ctx["max_vat"], "") owgBf = m.NewIntVar(0, 20 * ctx["max_vat"], "") owgSf = m.NewIntVar(0, 20 * ctx["max_vat"], "") m.Add(owgEf == 20 * _owE) m.Add(owgBf == 20 * _owB) m.Add(owgSf == 20 * _owS) gain_E.append(owgEf) gain_B.append(owgBf) gain_S.append(owgSf) # Collect Bonus (b) for foundry overwork: 2 * (+1) of that resource -> x10 = +20 ow_be = self._AND(model, owcvE, hasB[i, t], ctx) ow_bb = self._AND(model, owcvB, hasB[i, t], ctx) ow_bs = self._AND(model, owcvS, hasB[i, t], ctx) ow_be20 = m.NewIntVar(0, 20, "") ow_bb20 = m.NewIntVar(0, 20, "") ow_bs20 = m.NewIntVar(0, 20, "") m.Add(ow_be20 == 20 * ow_be) m.Add(ow_bb20 == 20 * ow_bb) m.Add(ow_bs20 == 20 * ow_bs) gain_E.append(ow_be20) gain_B.append(ow_bb20) gain_S.append(ow_bs20) def transition_contrib(self, model, i, t, ctx): pass # Vat transitions handled centrally in solve() def describe(self, solver, i, t, ctx): typ_name = ctx["typ_name"].get((i, t), "-") owcvE = ctx["owcvE"].get((i, t)) owcvB = ctx["owcvB"].get((i, t)) owcvS = ctx["owcvS"].get((i, t)) owmE = ctx["owmE"].get((i, t)) owmB = ctx["owmB"].get((i, t)) owmS = ctx["owmS"].get((i, t)) owmC = ctx["owmC"].get((i, t)) if typ_name == "Foundry": v = "E" if solver.Value(owcvE) else "B" if solver.Value(owcvB) else "S" return f"Overwork vat {v} (2x)" if typ_name == "Metro": picks = [] for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): picks += [nm] * (solver.Value(var) // 10) return "Overwork {" + ",".join(picks) + "} (2x)" return "Overwork (+Capital 2x)" def _AND(self, model, a, b, ctx): key = (id(a), id(b)) if not hasattr(self, "AND_cache"): self.AND_cache = {} if key not in self.AND_cache: c = model.NewBoolVar("") model.AddMultiplicationEquality(c, [a, b]) self.AND_cache[key] = c return self.AND_cache[key] class RenovateAction(Action): """Base class for renovation actions.""" def __init__(self, name, target_type): super().__init__(name) self.target_type = target_type # 'H', 'F', etc. def declare_vars(self, model, i, t, ctx): m = model r = m.NewBoolVar(f"r{self.target_type}_{i}_{t}") ctx["action_vars"][self.name][(i, t)] = r ctx[f"r{self.target_type}"][i, t] = r def add_constraints(self, model, i, t, ctx): m = model r = ctx[f"r{self.target_type}"][i, t] target = ctx[f"is{self.target_type}"] # Renovate-to-X requires not-X now m.Add(target[i, t] == 0).OnlyEnforceIf(r) def add_global_constraints(self, model, t, ctx): pass def contributes_gains_costs(self, model, i, t, ctx): pass def transition_contrib(self, model, i, t, ctx): pass def describe(self, solver, i, t, ctx): type_name = {"H": "Hub", "F": "Foundry", "M": "Metro"} return f"Renovate -> {type_name.get(self.target_type, self.target_type)}" # ====================================================================== # AGENT REGISTRY BASE CLASSES # ====================================================================== class Agent: """Governor-style agent. Assigned to a city per step via governor[i,t,name].""" name: str available_steps: list # steps where this agent may be assigned def __init__(self, name, available_steps): self.name = name self.available_steps = list(available_steps) def declare_vars(self, model, ctx): """One-time vars (e.g. one-shot flags). Called once before per-(i,t) loop.""" pass def apply(self, model, i, t, ctx): """Per-(city,step) effect. Gate logic on ctx['governor'][i,t,self.name].""" pass class EventAgent: """Triggers automatically each available step. No city target. Reads/writes ctx (typically tg_pool or gain_*).""" name: str available_steps: list def __init__(self, name, available_steps): self.name = name self.available_steps = list(available_steps) def declare_vars(self, model, ctx): pass def apply_event(self, model, t, ctx): pass # ====================================================================== # PHASE B AGENTS # ====================================================================== class Capitalist(Agent): """+20 Capital (scaled = +2 Capital) when governing a Collect action.""" def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return col = ctx["col"].get((i, t)) if col is None: return both = m.NewBoolVar("") m.AddMultiplicationEquality(both, [g, col]) bonus = m.NewIntVar(0, 20, "") m.Add(bonus == 20).OnlyEnforceIf(both) m.Add(bonus == 0).OnlyEnforceIf(both.Not()) ctx["gain_C"][t].append(bonus) class Baron(Agent): """BASTION_COUNTS[t-1]*10 trade-goods when governing a Collect action.""" def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return col = ctx["col"].get((i, t)) if col is None: return amt = BASTION_COUNTS[t - 1] * 10 both = m.NewBoolVar("") m.AddMultiplicationEquality(both, [g, col]) deposit = m.NewIntVar(0, max(amt, 0), "") m.Add(deposit == amt).OnlyEnforceIf(both) m.Add(deposit == 0).OnlyEnforceIf(both.Not()) ctx["tg_pool"][t].append(deposit) ctx.setdefault("baron_deposits", {})[t] = deposit class Prodigy(Agent): """+20 Steel (refund 2 Steel) when governing AND city upgrades B or D.""" def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return for u_var in (ctx["ub"].get((i, t)), ctx["ud"].get((i, t))): if u_var is None: continue both = m.NewBoolVar("") m.AddMultiplicationEquality(both, [g, u_var]) refund = m.NewIntVar(0, 20, "") m.Add(refund == 20).OnlyEnforceIf(both) m.Add(refund == 0).OnlyEnforceIf(both.Not()) ctx["gain_S"][t].append(refund) class Provisioner(Agent): """+15 Electrum (+1.5 scaled) when governing, unconditional.""" def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return bonus = m.NewIntVar(0, 15, "") m.Add(bonus == 15).OnlyEnforceIf(g) m.Add(bonus == 0).OnlyEnforceIf(g.Not()) ctx["gain_E"][t].append(bonus) class Metallurgist(Agent): """Foundry-only: adds +1 to vat increment. Implemented centrally in solve(). This class exists only for registry uniformity.""" def apply(self, model, i, t, ctx): # Vat-level effect handled in solve()'s centralized vat-transition block. pass class Builder(Agent): """One-shot: grants hasD[i,t+1]=1 when governing a Foundry city. Fires at most once across all (i,t).""" def declare_vars(self, model, ctx): ctx.setdefault("builder_fire", {}) def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return isF = ctx["isF"] fire = m.NewBoolVar(f"builder_fire_{i}_{t}") m.AddMultiplicationEquality(fire, [g, isF[i, t]]) ctx.setdefault("builder_fire", {})[(i, t)] = fire class Courier(Agent): """One-shot globally: +30 to gain_C, gain_S, gain_B on the first step where Courier governs anywhere.""" def declare_vars(self, model, ctx): m = model N = ctx["N"] governor = ctx["governor"] gain_C = ctx["gain_C"] gain_S = ctx["gain_S"] gain_B = ctx["gain_B"] courier_any = {} courier_fired = {} ever_fired = {} prev_ever = None for t in range(1, NUM_STEPS + 1): terms = [] for i in range(N): v = governor.get((i, t, self.name)) if v is not None: terms.append(v) any_t = m.NewBoolVar(f"courier_any_{t}") if terms: # any_t = OR(terms) m.AddMaxEquality(any_t, terms) else: m.Add(any_t == 0) courier_any[t] = any_t fired_t = m.NewBoolVar(f"courier_fired_{t}") if prev_ever is None: # fired_t == any_t (no prior history) m.Add(fired_t == any_t) else: # fired_t = any_t AND NOT prev_ever m.Add(fired_t <= any_t) m.Add(fired_t <= 1 - prev_ever) m.Add(fired_t >= any_t - prev_ever) courier_fired[t] = fired_t ever_t = m.NewBoolVar(f"courier_ever_{t}") if prev_ever is None: m.Add(ever_t == fired_t) else: m.AddMaxEquality(ever_t, [prev_ever, fired_t]) ever_fired[t] = ever_t prev_ever = ever_t # Add +30 contribution to gain_C/S/B for accum in (gain_C[t], gain_S[t], gain_B[t]): bonus = m.NewIntVar(0, 30, "") m.Add(bonus == 30).OnlyEnforceIf(fired_t) m.Add(bonus == 0).OnlyEnforceIf(fired_t.Not()) accum.append(bonus) ctx["courier_any"] = courier_any ctx["courier_fired"] = courier_fired def apply(self, model, i, t, ctx): pass # ====================================================================== # PHASE C AGENTS # ====================================================================== class Planner(Agent): """Effect realized via OverworkAction (per-city gating). No-op here.""" def apply(self, model, i, t, ctx): pass class Foreman(Agent): """TODO: Restructure ability — allows other industry actions during renovation. The current model uses 'exactly one action per city per step', so renovation already blocks no specific action separately. Foreman becomes meaningful only once renovation-blocks-action semantics are modeled. """ def apply(self, model, i, t, ctx): pass class Industrialist(Agent): """Network ability — grants Infrastructure (hasA) to governed city and its adjacent cities when appointed governor.""" def apply(self, model, i, t, ctx): m = model g = ctx["governor"].get((i, t, self.name)) if g is None: return cities = ctx.get("cities") hasA = ctx.get("hasA") NUM_STEPS = ctx.get("NUM_STEPS", 5) if cities is None or hasA is None: return a_step, a_city = cities[i] adjacent_indices = a_city.get("adjacent_to", []) # When Industrialist governs city i at step t, grant hasA to city i and adjacent cities at t+1 if t + 1 <= NUM_STEPS: # Grant to governed city m.Add(hasA[i, t + 1] == 1).OnlyEnforceIf(g) # Grant to adjacent cities for adj_i in adjacent_indices: m.Add(hasA[adj_i, t + 1] == 1).OnlyEnforceIf(g) class Fence(EventAgent): """Each available step, deposits +20 (=2 trade goods x10) into tg_pool[t].""" def apply_event(self, model, t, ctx): deposit = model.NewConstant(20) ctx["tg_pool"][t].append(deposit) ctx.setdefault("fence_deposits", {})[t] = 20 # ====================================================================== # MODEL # ====================================================================== def solve( *, initial=INITIAL, arrivals=ARRIVALS, max_res=MAX_RES, max_vat=MAX_VAT, time_limit=60.0, num_workers=8, verbose=True, fixed_choices=FIXED_CHOICES, resource_constraints=None, objective_factors=None, objective_mode=None, ): if objective_factors is None: objective_factors = OBJECTIVE_FACTORS if objective_mode is None: objective_mode = OBJECTIVE_MODE _validate_objective(objective_factors, objective_mode) # Normalized copy: every key present, missing keys = 0. obj_factors = {k: objective_factors.get(k, 0) for k in "EBSC"} # ---- build the city list ----------------------------------------- # cities: list[tuple[int, dict]] where the dict has keys "type" and # "adjacent_to". TODO: adjacency unused; for Industrialist agent. cities = [] for s in range(1, NUM_STEPS + 1): for entry in arrivals.get(s, []): cities.append((s, normalize_city(entry))) N = len(cities) m = cp_model.CpModel() def AND(a, b): """Boolean AND of two 0/1 vars, returned as a new 0/1 var.""" c = m.NewBoolVar("") m.AddMultiplicationEquality(c, [a, b]) return c # ---- state variables --------------------------------------------- isH, isF, isM, isMon, present = {}, {}, {}, {}, {} hasA, hasB, hasD = {}, {}, {} vE, vB, vS = {}, {}, {} # foundry vats at start of step t # action variables (sparse, only created for enabled actions) col, ua, ub, ud = {}, {}, {}, {} ow = {} noop = {} rH, rF, rM = {}, {}, {} cvE, cvB, cvS = {}, {}, {} owcvE, owcvB, owcvS = {}, {}, {} mE, mB, mS, mC = {}, {}, {}, {} owmE, owmB, owmS, owmC = {}, {}, {}, {} for i in range(N): a_step, a_city = cities[i] a_type = a_city["type"] d_step = a_city["departure_step"] for t in range(1, NUM_STEPS + 2): isH[i, t] = m.NewBoolVar(f"isH_{i}_{t}") isF[i, t] = m.NewBoolVar(f"isF_{i}_{t}") isM[i, t] = m.NewBoolVar(f"isM_{i}_{t}") isMon[i, t] = m.NewBoolVar(f"isMon_{i}_{t}") present[i, t] = m.NewBoolVar(f"present_{i}_{t}") hasA[i, t] = m.NewBoolVar(f"hasA_{i}_{t}") hasB[i, t] = m.NewBoolVar(f"hasB_{i}_{t}") hasD[i, t] = m.NewBoolVar(f"hasD_{i}_{t}") vE[i, t] = m.NewIntVar(0, max_vat, f"vE_{i}_{t}") vB[i, t] = m.NewIntVar(0, max_vat, f"vB_{i}_{t}") vS[i, t] = m.NewIntVar(0, max_vat, f"vS_{i}_{t}") # presence: present from arrival step until departure step (exclusive) m.Add(present[i, t] == (1 if a_step <= t < d_step else 0)) # exactly one type iff present m.Add(isH[i, t] + isF[i, t] + isM[i, t] + isMon[i, t] == present[i, t]) # ---- per-step gain/cost accumulators (linear expressions) --------- gain_E = {t: [] for t in range(1, NUM_STEPS + 1)} gain_B = {t: [] for t in range(1, NUM_STEPS + 1)} gain_S = {t: [] for t in range(1, NUM_STEPS + 1)} gain_C = {t: [] for t in range(1, NUM_STEPS + 1)} cost_C = {t: [] for t in range(1, NUM_STEPS + 1)} cost_S = {t: [] for t in range(1, NUM_STEPS + 1)} # Trade-goods pool (per step): new generic-trade-good sources (Baron, Fence, # phase B/C) deposit IntVars here; centralized split logic routes the pool # into gain_E/B/S/C below. Metro picks are NOT routed through here. tg_pool = {t: [] for t in range(1, NUM_STEPS + 1)} # Per-step capital spent tracker (mirrors entries appended to cost_C). capital_spent = {t: [] for t in range(1, NUM_STEPS + 1)} # ---- Agent Registry (populated by phase B/C/D; empty in phase A) ---- agent_registry = [] event_agent_registry = [] # ---- Action Registry ---- action_registry = [] if ENABLED_ACTIONS.get("collect", True): action_registry.append(CollectAction()) if ENABLED_ACTIONS.get("upgrade_a", True): action_registry.append(UpgradeAction("upgrade_a", "A")) if ENABLED_ACTIONS.get("upgrade_b", True): action_registry.append(UpgradeAction("upgrade_b", "B")) if ENABLED_ACTIONS.get("upgrade_d", True): action_registry.append(UpgradeAction("upgrade_d", "D")) if ENABLED_ACTIONS.get("overwork", True): action_registry.append(OverworkAction()) if ENABLED_ACTIONS.get("renovate_h", True): action_registry.append(RenovateAction("renovate_h", "H")) if ENABLED_ACTIONS.get("renovate_f", True): action_registry.append(RenovateAction("renovate_f", "F")) if ENABLED_ACTIONS.get("noop", True): action_registry.append(NoOpAction()) # ---- Governor variable matrix ---- # governor[(i, t, name)] = BoolVar (or constant 0) for whether agent `name` # governs city i at step t. Created only when agent is available at step t # AND the city has arrived; otherwise NewConstant(0). governor = {} arrival_step_map = {i: cities[i][0] for i in range(N)} for name, avail_steps in AGENT_AVAILABILITY.items(): avail_set = set(avail_steps) for i in range(N): for t in range(1, NUM_STEPS + 1): if t in avail_set and t >= arrival_step_map[i]: governor[i, t, name] = m.NewBoolVar(f"gov_{name}_{i}_{t}") else: governor[i, t, name] = m.NewConstant(0) # At each (i, t): at most one agent governs city i. for i in range(N): for t in range(1, NUM_STEPS + 1): terms = [governor[i, t, n] for n in AGENT_AVAILABILITY] if terms: m.Add(sum(terms) <= 1) # At each (name, t): at most one city has agent `name` assigned. for name in AGENT_AVAILABILITY: for t in range(1, NUM_STEPS + 1): m.Add(sum(governor[i, t, name] for i in range(N)) <= 1) # Phase B agent registration agent_classes = { "capitalist": Capitalist, "baron": Baron, "prodigy": Prodigy, "provisioner": Provisioner, "metallurgist": Metallurgist, "builder": Builder, "courier": Courier, "planner": Planner, "foreman": Foreman, "industrialist": Industrialist, } for name, cls in agent_classes.items(): if name in AGENT_AVAILABILITY: agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) event_agent_classes = { "fence": Fence, } for name, cls in event_agent_classes.items(): if name in AGENT_AVAILABILITY: event_agent_registry.append(cls(name, AGENT_AVAILABILITY[name])) # Shared state populated by agent declare_vars (e.g., Builder's fire map). builder_fire_map = {} # One-shot agent declarations. fence_deposits = {} baron_deposits = {} _agent_decl_ctx = { "model": m, "N": N, "governor": governor, "tg_pool": tg_pool, "capital_spent": capital_spent, "gain_E": gain_E, "gain_B": gain_B, "gain_S": gain_S, "gain_C": gain_C, "cost_C": cost_C, "cost_S": cost_S, "builder_fire": builder_fire_map, "fence_deposits": fence_deposits, "baron_deposits": baron_deposits, } for agent in agent_registry: agent.declare_vars(m, _agent_decl_ctx) for agent in event_agent_registry: agent.declare_vars(m, _agent_decl_ctx) if fixed_choices and "governors" in fixed_choices: for (city, step, agent_name), value in fixed_choices["governors"].items(): if governor.get((city, step, agent_name)) is not None: m.Add(governor[city, step, agent_name] == (1 if value else 0)) # ---- per-city logic ----------------------------------------------- for i in range(N): a_step, a_city = cities[i] a_type = a_city["type"] # initial type at arrival step init = {"H": isH, "F": isF, "M": isM, "N": isMon} m.Add(init[a_type][i, a_step] == 1) # no upgrades at arrival m.Add(hasA[i, a_step] == 0) m.Add(hasB[i, a_step] == 0) m.Add(hasD[i, a_step] == 0) # vats at arrival m.Add(vE[i, a_step] == a_city["vats"]["E"]) m.Add(vB[i, a_step] == a_city["vats"]["B"]) m.Add(vS[i, a_step] == a_city["vats"]["S"]) # before arrival: everything zero for t in range(1, a_step): for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS): m.Add(v[i, t] == 0) # after departure: everything zero from d_step onward d_step = a_city["departure_step"] for t in range(d_step, NUM_STEPS + 2): for v in (isH, isF, isM, isMon, hasA, hasB, hasD, vE, vB, vS): m.Add(v[i, t] == 0) # action + transition logic for active steps for t in range(a_step, min(d_step, NUM_STEPS + 1)): P = present[i, t] # Build context for action methods ctx = { "noop": noop, "model": m, "i": i, "t": t, "N": N, "max_vat": max_vat, "action_vars": {act.name: {} for act in action_registry}, "arrival_step": {i: a_step for i in range(N)}, "isH": isH, "isF": isF, "isM": isM, "isMon": isMon, "present": present, "hasA": hasA, "hasB": hasB, "hasD": hasD, "vE": vE, "vB": vB, "vS": vS, "col": col, "ua": ua, "ub": ub, "ud": ud, "ow": ow, "rH": rH, "rF": rF, "rM": rM, "cvE": cvE, "cvB": cvB, "cvS": cvS, "owcvE": owcvE, "owcvB": owcvB, "owcvS": owcvS, "mE": mE, "mB": mB, "mS": mS, "mC": mC, "owmE": owmE, "owmB": owmB, "owmS": owmS, "owmC": owmC, "gain_E": gain_E, "gain_B": gain_B, "gain_S": gain_S, "gain_C": gain_C, "cost_C": cost_C, "cost_S": cost_S, "tg_pool": tg_pool, "capital_spent": capital_spent, "governor": governor, "renovations": {}, "vat_next": {}, "builder_fire": builder_fire_map, "baron_deposits": baron_deposits, "cities": cities, "NUM_STEPS": NUM_STEPS, } # Declare all action variables for action in action_registry: action.declare_vars(m, i, t, ctx) # After declaring all action variables for (i,t) if fixed_choices and "actions" in fixed_choices: action_key = (i, t) if action_key in fixed_choices["actions"]: fixed_action = fixed_choices["actions"][action_key] # Set the chosen action to 1, others to 0 for action in action_registry: var = action.selector_var(i, t, ctx) if var is not None: if action.name == fixed_action: m.Add(var == 1) else: m.Add(var == 0) # renovations helper (used by multiple actions) ren = m.NewBoolVar("") rH_act = rH.get((i, t)) rF_act = rF.get((i, t)) rM_act = rM.get((i, t)) ren_sum = sum(x for x in [rH_act, rF_act, rM_act] if x is not None) m.Add(ren == ren_sum) ctx["renovations"][i, t] = ren no_ren = m.NewBoolVar("") m.Add(no_ren == 1 - ren) # Add all constraints for action in action_registry: action.add_constraints(m, i, t, ctx) # LLM allowed renovation into metropolis, need to prevent that now if rM.get((i, t)) is not None: m.Add(rM[i, t] == 0) # Renovation must change type (renovate-to-X requires not-X now) for r, target in [ (rH.get((i, t)), isH), (rF.get((i, t)), isF), (rM.get((i, t)), isM), ]: if r is not None: m.Add(target[i, t] == 0).OnlyEnforceIf(r) # Exactly one action while present (sum all action selectors) action_sum = sum( action.selector_var(i, t, ctx) for action in action_registry if action.selector_var(i, t, ctx) is not None ) m.Add(action_sum == P) # Agent per-(city, step) hooks - run BEFORE upgrade transitions so # Builder can register its fire var that feeds into d_keep. for agent in agent_registry: agent.apply(m, i, t, ctx) # Upgrade transitions (skip propagation past last present step) if t + 1 < d_step: m.AddMaxEquality( hasA[i, t + 1], [hasA[i, t], ua.get((i, t), m.NewConstant(0))] ) m.AddMaxEquality( hasB[i, t + 1], [hasB[i, t], ub.get((i, t), m.NewConstant(0))] ) d_keep = m.NewBoolVar("") d_sources = [hasD[i, t], ud.get((i, t), m.NewConstant(0))] _bfire = builder_fire_map.get((i, t)) if _bfire is not None: d_sources.append(_bfire) m.AddMaxEquality(d_keep, d_sources) m.Add(hasD[i, t + 1] == 0).OnlyEnforceIf(ren) m.Add(hasD[i, t + 1] == d_keep).OnlyEnforceIf(no_ren) # Gain/cost contributions for action in action_registry: action.contributes_gains_costs(m, i, t, ctx) if t + 1 < d_step: # Transition contributions (including vats) for action in action_registry: action.transition_contrib(m, i, t, ctx) # Type transition t -> t+1 m.Add(isH[i, t + 1] == isH[i, t]).OnlyEnforceIf(no_ren) m.Add(isF[i, t + 1] == isF[i, t]).OnlyEnforceIf(no_ren) m.Add(isM[i, t + 1] == isM[i, t]).OnlyEnforceIf(no_ren) for r, target in [ (rH.get((i, t)), isH), (rF.get((i, t)), isF), (rM.get((i, t)), isM), ]: if r is not None: m.Add(target[i, t + 1] == 1).OnlyEnforceIf(r) for r1, t1, t2 in [ (rH.get((i, t)), isF, isM), (rF.get((i, t)), isH, isM), (rM.get((i, t)), isH, isF), ]: if r1 is not None: m.Add(t1[i, t + 1] == 0).OnlyEnforceIf(r1) m.Add(t2[i, t + 1] == 0).OnlyEnforceIf(r1) # ---- Centralized vat transitions (mirrors main.py exactly) ---- # Collect/overwork vat selectors (None if action disabled) _cvE = cvE.get((i, t)) _cvB = cvB.get((i, t)) _cvS = cvS.get((i, t)) _owcvE = owcvE.get((i, t)) _owcvB = owcvB.get((i, t)) _owcvS = owcvS.get((i, t)) inc = m.NewIntVar(1, 3, "") metallurgist_g = governor.get((i, t, "metallurgist")) if metallurgist_g is not None and "metallurgist" in AGENT_AVAILABILITY: metal_active = m.NewBoolVar("") m.AddMultiplicationEquality( metal_active, [metallurgist_g, isF[i, t]] ) m.Add(inc == 1 + metal_active) else: m.Add(inc == 1 + hasD[i, t]) vEn = m.NewIntVar(0, max_vat, "") vBn = m.NewIntVar(0, max_vat, "") vSn = m.NewIntVar(0, max_vat, "") # For each vat selector (collect E, collect B, collect S, overwork E, overwork B, overwork S) # apply the same reset/increment pattern from main.py for sel_E, sel_B, sel_S in [ (_cvE, _cvB, _cvS), (_owcvE, _owcvB, _owcvS), ]: if sel_E is not None: m.Add(vEn == 0).OnlyEnforceIf(sel_E) m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_E) m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_E) if sel_B is not None: m.Add(vBn == 0).OnlyEnforceIf(sel_B) m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_B) m.Add(vSn == vS[i, t] + inc).OnlyEnforceIf(sel_B) if sel_S is not None: m.Add(vSn == 0).OnlyEnforceIf(sel_S) m.Add(vEn == vE[i, t] + inc).OnlyEnforceIf(sel_S) m.Add(vBn == vB[i, t] + inc).OnlyEnforceIf(sel_S) # foundry but neither collecting nor overworking: vats unchanged # f_noncollect_noow = isF AND NOT fcol AND NOT fow = isF - fcol - fow _fcol_count = sum(s for s in [_cvE, _cvB, _cvS] if s is not None) _fow_count = sum(s for s in [_owcvE, _owcvB, _owcvS] if s is not None) f_noncollect_noow = m.NewBoolVar("") m.Add(f_noncollect_noow == isF[i, t] - _fcol_count - _fow_count) m.Add(vEn == vE[i, t]).OnlyEnforceIf(f_noncollect_noow) m.Add(vBn == vB[i, t]).OnlyEnforceIf(f_noncollect_noow) m.Add(vSn == vS[i, t]).OnlyEnforceIf(f_noncollect_noow) # assign vat[i, t+1]: # renovate-to-foundry -> reset to 1 # continuing foundry -> vat_next # otherwise (not foundry next) -> 0 cont_F = AND(isF[i, t], no_ren) for vnext, vn in ( (vE[i, t + 1], vEn), (vB[i, t + 1], vBn), (vS[i, t + 1], vSn), ): if rF.get((i, t)) is not None: m.Add(vnext == 1).OnlyEnforceIf(rF[i, t]) m.Add(vnext == vn).OnlyEnforceIf(cont_F) not_F_next = isF[i, t + 1].Not() m.Add(vE[i, t + 1] == 0).OnlyEnforceIf(not_F_next) m.Add(vB[i, t + 1] == 0).OnlyEnforceIf(not_F_next) m.Add(vS[i, t + 1] == 0).OnlyEnforceIf(not_F_next) # ---- Builder one-shot: at most one fire across all (i, t) ---- # if builder_fire_map: # m.Add(sum(builder_fire_map.values()) <= 1) # ---- global constraints (per action type, per step) ---- for t in range(1, NUM_STEPS + 1): ctx_global = {"N": N, "ow": ow, "t": t} for action in action_registry: action.add_global_constraints(m, t, ctx_global) # ---- event-agent per-step hooks (registry empty in phase A) ---- _event_ctx = { "model": m, "N": N, "governor": governor, "tg_pool": tg_pool, "capital_spent": capital_spent, "gain_E": gain_E, "gain_B": gain_B, "gain_S": gain_S, "gain_C": gain_C, "cost_C": cost_C, "cost_S": cost_S, "fence_deposits": fence_deposits, "baron_deposits": baron_deposits, } for t in range(1, NUM_STEPS + 1): for agent in event_agent_registry: if t in agent.available_steps: agent.apply_event(m, t, _event_ctx) # ---- trade-goods pool split (routes pool[t] -> gain_E/B/S/C) ---- # Generic-trade-good sources (Baron, Fence in later phases) deposit # IntVars into tg_pool[t]; here we declare a 4-way split into the # resource gain accumulators. If the pool is empty for step t, the split # vars are constrained to 0. for t in range(1, NUM_STEPS + 1): pool_total = m.NewIntVar(0, max_res, f"tg_total_{t}") if tg_pool[t]: m.Add(pool_total == sum(tg_pool[t])) else: m.Add(pool_total == 0) tg_E = m.NewIntVar(0, max_res, f"tg_E_{t}") tg_B = m.NewIntVar(0, max_res, f"tg_B_{t}") tg_S = m.NewIntVar(0, max_res, f"tg_S_{t}") tg_C = m.NewIntVar(0, max_res, f"tg_C_{t}") m.Add(tg_E + tg_B + tg_S + tg_C == pool_total) # Each trade good (10 scaled units) must be allocated entirely to one resource m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_E, 10) m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_B, 10) m.AddModuloEquality(m.NewIntVar(0, 0, ""), tg_S, 10) gain_E[t].append(tg_E) gain_B[t].append(tg_B) gain_S[t].append(tg_S) gain_C[t].append(tg_C) # ---- resource pool recursion -------------------------------------- E = {1: m.NewIntVar(initial[0], initial[0], "E1")} B = {1: m.NewIntVar(initial[1], initial[1], "B1")} S = {1: m.NewIntVar(initial[2], initial[2], "S1")} C = {1: m.NewIntVar(initial[3], initial[3], "C1")} for t in range(1, NUM_STEPS + 1): E[t + 1] = m.NewIntVar(0, max_res, f"E_{t + 1}") B[t + 1] = m.NewIntVar(0, max_res, f"B_{t + 1}") S[t + 1] = m.NewIntVar(0, max_res, f"S_{t + 1}") C[t + 1] = m.NewIntVar(0, max_res, f"C_{t + 1}") m.Add(C[t] - sum(cost_C[t]) >= 0) m.Add(S[t] - sum(cost_S[t]) >= 0) m.Add(E[t + 1] == E[t] + sum(gain_E[t])) m.Add(B[t + 1] == B[t] + sum(gain_B[t])) m.Add(S[t + 1] == S[t] - sum(cost_S[t]) + sum(gain_S[t])) m.Add(C[t + 1] == C[t] - sum(cost_C[t]) + sum(gain_C[t])) finalE, finalB, finalS = E[NUM_STEPS + 1], B[NUM_STEPS + 1], S[NUM_STEPS + 1] # ---- Apply resource constraints ---- if resource_constraints is None: resource_constraints = RESOURCE_CONSTRAINTS if verbose and resource_constraints: print(f"Adding {len(resource_constraints)} resource constraint(s)") res_dict = { "E": E, "B": B, "S": S, "C": C, } for i, constraint_fn in enumerate(resource_constraints): constraint = constraint_fn(res_dict) if verbose: print(f" Constraint {i + 1}: {constraint}") print(f" Type: {type(constraint)}") print(f" Constraint object: {constraint}") if constraint is not None: m.Add(constraint) # ---- Phase 1: real per-resource ceilings ---- def _ceiling(var): s = cp_model.CpSolver() s.parameters.max_time_in_seconds = 20.0 s.parameters.num_search_workers = num_workers m.Maximize(var) st = s.Solve(m) return ( int(s.ObjectiveValue()) if st in (cp_model.OPTIMAL, cp_model.FEASIBLE) else max_res ) finals = {"E": finalE, "B": finalB, "S": finalS, "C": C[NUM_STEPS + 1]} # Ceilings only for resources that appear in the objective: each # ceiling solve costs up to 20s, and only objective resources need # bounds (product mode) / benefit from the redundant cap constraint. caps = {} for k, var in finals.items(): if obj_factors[k] != 0: caps[k] = _ceiling(var) m.Add(var <= caps[k]) # ====================================================================== # OBJECTIVE IS SET HERE # ====================================================================== if objective_mode == "sum": # Linear: CP-SAT takes weighted sums (negative weights included) # directly, no auxiliary variables needed. m.Maximize(sum(f * finals[k] for k, f in obj_factors.items() if f)) else: # Product: maximize prod(finals[k] ** obj_factors[k]). Expand the # exponents into a flat factor list and fold pairwise, carrying a # running upper bound from the Phase-1 caps. factor_keys = [k for k, f in obj_factors.items() for _ in range(f)] obj = finals[factor_keys[0]] bound = caps[factor_keys[0]] for k in factor_keys[1:]: bound *= caps[k] nxt = m.NewIntVar(0, bound, "") m.AddMultiplicationEquality(nxt, [obj, finals[k]]) obj = nxt m.Maximize(obj) # ---- Phase 2: solve the product to optimality ---- solver = cp_model.CpSolver() solver.parameters.max_time_in_seconds = time_limit solver.parameters.num_search_workers = num_workers status = None if verbose: status = solver.Solve( m, printer.IntermediateSolutionPrinter( {"electrum": finalE, "brass": finalB, "steel": finalS}, scale=0.1 ), ) else: status = solver.Solve(m) if verbose: caps_str = " ".join(f"{k}<={v}" for k, v in caps.items()) print(f"(resource ceilings used: {caps_str})") _report( solver, status, cities, N, isH, isF, isM, isMon, col, ua, ub, ud, ow, rH, rF, rM, cvE, cvB, cvS, owcvE, owcvB, owcvS, mE, mB, mS, mC, owmE, owmB, owmS, owmC, hasA, hasB, hasD, vE, vB, vS, E, B, S, C, finalE, finalB, finalS, action_registry, governor, agent_registry, capital_spent, fence_deposits, baron_deposits, obj_factors=obj_factors, obj_mode=objective_mode, ) return solver, status def _report( solver, status, cities, N, isH, isF, isM, isMon, col, ua, ub, ud, ow, rH, rF, rM, cvE, cvB, cvS, owcvE, owcvB, owcvS, mE, mB, mS, mC, owmE, owmB, owmS, owmC, hasA, hasB, hasD, vE, vB, vS, E, B, S, C, finalE, finalB, finalS, action_registry, governor=None, agent_registry=None, capital_spent=None, fence_deposits=None, baron_deposits=None, obj_factors=None, obj_mode=None, ): print("status:", solver.StatusName(status)) if status not in (cp_model.OPTIMAL, cp_model.FEASIBLE): return typ_name = {} for i in range(N): for t in range(1, NUM_STEPS + 1): if solver.Value(isH[i, t]): typ_name[i, t] = "Hub" elif solver.Value(isF[i, t]): typ_name[i, t] = "Foundry" elif solver.Value(isM[i, t]): typ_name[i, t] = "Metro" elif solver.Value(isMon[i, t]): typ_name[i, t] = "Monument" else: typ_name[i, t] = "-" def action_str(i, t): # Try each action in order until one is active ctx = { "typ_name": typ_name, "cvE": cvE, "cvB": cvB, "cvS": cvS, "mE": mE, "mB": mB, "mS": mS, "mC": mC, "owcvE": owcvE, "owcvB": owcvB, "owcvS": owcvS, "owmE": owmE, "owmB": owmB, "owmS": owmS, "owmC": owmC, } if solver.Value(col.get((i, t), 0)): if typ_name[i, t] == "Foundry": v = ( "E" if solver.Value(cvE[i, t]) else "B" if solver.Value(cvB[i, t]) else "S" ) return f"Collect vat {v}" if typ_name[i, t] == "Metro": picks = [] for nm, var in (("E", mE), ("B", mB), ("S", mS), ("C", mC)): picks += [nm] * (solver.Value(var[i, t]) // 10) return "Collect {" + ",".join(picks) + "}" return "Collect (+Capital)" if solver.Value(ow.get((i, t), 0)): if typ_name[i, t] == "Foundry": v = ( "E" if solver.Value(owcvE[i, t]) else "B" if solver.Value(owcvB[i, t]) else "S" ) return f"Overwork vat {v} (2x)" if typ_name[i, t] == "Metro": picks = [] for nm, var in (("E", owmE), ("B", owmB), ("S", owmS), ("C", owmC)): picks += [nm] * (solver.Value(var[i, t]) // 10) return "Overwork {" + ",".join(picks) + "} (2x)" return "Overwork (+Capital 2x)" if solver.Value(ua.get((i, t), 0)): return "Upgrade a (cost-reduction)" if solver.Value(ub.get((i, t), 0)): return "Upgrade b (collect-bonus)" if solver.Value(ud.get((i, t), 0)): return "Upgrade d (vat-increment)" if solver.Value(rH.get((i, t), 0)): return "Renovate -> Hub" if solver.Value(rF.get((i, t), 0)): return "Renovate -> Foundry" if solver.Value(rM.get((i, t), 0)): return "Renovate -> Metro" return "(inactive)" print("\nPer-step pools (start of step):") print(" step: E B S C") for t in range(1, NUM_STEPS + 2): label = f"after5" if t == NUM_STEPS + 1 else f"start {t}" c_val = solver.Value(C[t]) c_str = f"{c_val / 10:6.1f}" print( f" {label:>8}: {solver.Value(E[t]) / 10:6.1f} {solver.Value(B[t]) / 10:6.1f} " f"{solver.Value(S[t]) / 10:6.1f} {c_str}" ) if capital_spent is not None: print("\n capital spent per step:") for t in range(1, NUM_STEPS + 1): spent = sum(solver.Value(x) for x in capital_spent[t]) / 10 print(f" step {t}: {spent:6.1f}") if fence_deposits: print("\n Fence deposits (trade goods):") for t in fence_deposits: print( f" step {t}: {fence_deposits[t] / 10:.1f} (scaled: {fence_deposits[t]})" ) if baron_deposits: print("\n Baron deposits (trade goods):") for t in sorted(baron_deposits.keys()): amt = solver.Value(baron_deposits[t]) print(f" step {t}: {amt / 10:.1f} (scaled: {amt})") # Resolve governor assignments for display def gov_for(i, t): if not governor or not agent_registry: return "-" names = [a.name for a in agent_registry] for name in names: v = governor.get((i, t, name)) if v is None: continue try: if solver.Value(v): return name except Exception: continue return "-" print("\nActions:") for i in range(N): a_step, a_city = cities[i] a_type = a_city["type"] if isinstance(a_city, dict) else a_city print(f" City {i} (arrives step {a_step} as {a_type}):") for t in range(a_step, NUM_STEPS + 1): ups = "".join( n for n, h in (("a", hasA), ("b", hasB), ("d", hasD)) if solver.Value(h[i, t]) ) extra = ( f" vats(E{solver.Value(vE[i, t])},B{solver.Value(vB[i, t])},S{solver.Value(vS[i, t])})" if typ_name[i, t] == "Foundry" else "" ) gov = gov_for(i, t) print( f" step {t}: [{typ_name[i, t]:7}] {action_str(i, t):28}" f" upg[{ups}] gov[{gov}]{extra}" ) fe, fb, fs = solver.Value(finalE), solver.Value(finalB), solver.Value(finalS) vals = {"E": fe, "B": fb, "S": fs, "C": solver.Value(C[NUM_STEPS + 1])} if obj_factors is None: # Legacy fallback: old hardcoded E*B*S display. obj_str = f"product(scaled) = {fe * fb * fs / 1000}" elif obj_mode == "sum": terms = [ f"{f}{k}" if abs(f) != 1 else (k if f > 0 else f"-{k}") for k, f in obj_factors.items() if f ] expr = " + ".join(terms).replace("+ -", "- ") raw = sum(f * vals[k] for k, f in obj_factors.items()) obj_str = f"objective {expr} = {raw / 10:.1f}" else: expr = "*".join( k if f == 1 else f"{k}^{f}" for k, f in obj_factors.items() if f ) raw, n = 1, 0 for k, f in obj_factors.items(): raw *= vals[k] ** f n += f # Resource values are x10-scaled, so descale by 10^(sum of exponents). obj_str = f"objective {expr} = {raw / 10**n}" print( f"\nFINAL E={fe / 10:.1f} B={fb / 10:.1f} S={fs / 10:.1f} " f"{obj_str} sum = {(fe + fb + fs) / 10:.1f}" ) if __name__ == "__main__": solve()