For difficult NMPC problems, I am getting a mismatch between IPOPT and FATROP. When I attempted to reduce 'insol_lu_fact_tol' I got the following error:
RuntimeError: Error in Opti::solve [OptiNode] at .../casadi/core/optistack.cpp:217: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:1458: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:362: .../casadi/core/generic_type.cpp:412: Assertion "is_bool()" failed: type mismatch
class MPC:
def __init__(self, model:CartPoleSystem, Tf:float=2, dt:float=0.025,
R:np.ndarray=np.diag([1.0]), R_du:np.ndarray=np.diag([4e-1]),
Q:np.ndarray=np.diag([1.0, 1.0, 1.0, 1.0]), Q_e:np.ndarray=np.diag([1.0, 1.0, 1.0, 1.0]),
W_pe:float=1.0, W_ke:float=1.0, W_pe_e:float=1.0, W_ke_e:float=1.0,
x_min:np.ndarray=np.array([-0.7, -np.inf, -np.inf, -np.inf]),
x_max:np.ndarray=np.array([0.7, np.inf, np.inf, np.inf]),
u_min:np.ndarray=np.array([-5.0]),
u_max:np.ndarray=np.array([5.0]),
solver_type:str='fatrop',
Qsx:np.ndarray=None,
Qsx_e:np.ndarray=None,
discretization_method:str='rk4',
jit:bool=False,
):
self.model = model
self.Tf = Tf
self.dt = dt
self.R = R # Control input cost matrix
self.R_du = R_du # Control input change cost matrix
self.Q = Q # State cost matrix
self.Q_e = Q_e # Terminal state cost matrix
self.W_pe = W_pe # Weight for potential energy in terminal cost
self.W_ke = W_ke # Weight for kinetic energy in terminal cost
self.W_pe_e = W_pe_e # Weight for potential energy in terminal cost
self.W_ke_e = W_ke_e # Weight for kinetic energy in terminal cost
self.x_min = x_min # State lower bounds
self.x_max = x_max # State upper bounds
self.u_min = u_min # Control input lower bounds
self.u_max = u_max # Control input upper bounds
self.solver_type = solver_type # Type of solver to use
self.Qsx = Qsx # Optional slack cost matrix for states
self.Qsx_e = Qsx_e # Optional slack cost matrix for terminal states
self.N_horizon = int(Tf / dt) # Number of stages in the prediction horizon
self.K = self.N_horizon + 1 # Total number of stages including the terminal stage
self.nx = model.nx
self.nu = model.nu
self.f = self.model.system_dynamics_ode() # Get the system dynamics function
if discretization_method == 'collocation':
self.discretization_method = discretization_method
self.deg = 3
self.tau = np.append(0, ca.collocation_points(self.deg, 'legendre')) # Collocation points
self.C = np.zeros((self.deg+1, self.deg+1)) # Coefficients of the collocation equation
self.D = np.zeros(self.deg+1) # Coefficients of the continuity equation
self.B = np.zeros(self.deg+1) # Coefficients of the quadrature function
# Construct polynomial basis
for j in range(self.deg+1):
# Construct Lagrange polynomials to get the polynomial basis at the collocation point
p = np.poly1d([1])
for r in range(self.deg+1):
if r != j:
p *= np.poly1d([1, -self.tau[r]]) / (self.tau[j]-self.tau[r])
# Evaluate the polynomial at the final time to get the coefficients of the continuity equation
self.D[j] = p(1.0)
# Evaluate the time derivative of the polynomial at all collocation points to get the coefficients of the continuity equation
pder = np.polyder(p)
for r in range(self.deg+1):
self.C[j, r] = pder(self.tau[r])
# Evaluate the integral of the polynomial to get the coefficients of the quadrature function
pint = np.polyint(p)
self.B[j] = pint(1.0)
else:
self.discretization_method = 'rk4' # Default to RK4 method
# Create Casadi Opti object
self.opti = ca.Opti()
# Define optimization variables
self.X = []
self.U = []
if discretization_method == 'collocation':
self.X_col = [] # For collocation variables
self.eps_x = []
for k in range(self.K):
self.X.append(self.opti.variable(self.nx)) # State variables
if k < self.N_horizon:
self.U.append(self.opti.variable(self.nu)) # Control input variables
if discretization_method == 'collocation':
self.X_col.append([self.opti.variable(self.nx) for _ in range(self.deg)]) # Collocation variables
if self.Qsx is not None:
self.eps_x.append(self.opti.variable(self.nx)) # Slack variables for state constraints
self.X0 = self.opti.parameter(self.nx) # Initial state parameter
self.Xe = self.opti.parameter(self.nx) # Terminal state parameter
self.X_ref = self.opti.parameter(self.nx, self.K) # State reference parameter
self.U_prev = self.opti.parameter(self.nu) # Previous control input parameter
self.trajectory_tracking = False
self.use_energy = True # Use energy in the cost function
self.use_move_suppression = True # Use move suppression in the cost function
self._set_up_optimization_problem() # Set up the optimization problem
if self.solver_type == 'fatrop':
opti_options = {
'structure_detection': 'auto', # Automatically detect the structure of the problem
'expand': True, # Expand the problem for better performance
'fatrop.mu_init': 1e-0, # Initial value for the barrier parameter
'jit': jit, # Just-In-Time compilation
'debug': True, # Enable debugging
"fatrop.print_level":-1, # Set the print level for fatrop
"fatrop.tol": 1e-2, # Tolerance for the solver
"fatrop.linsol_lu_fact_tol": 1e-2, # Tolerance for LU factorization
'jit_options': {"flags": "-O3", "verbose": False} # JIT options for optimization
}
elif self.solver_type == 'ipopt':
opti_options = {
"expand": True,
# "ipopt.linear_solver": "MA27", # Use MA57 (fastest HSL solver)
"ipopt.max_iter": 1000,
"ipopt.tol": 1e-2,
"ipopt.print_level": 0,
# "print_time": True,
# "ipopt.warm_start_init_point": "yes", # Enable warm starting
# "ipopt.hessian_approximation": "limited-memory", # Use L-BFGS approximation
# "ipopt.limited_memory_max_history": 10 # Number of history pairs for L-BFGS
} # Use MUMPS for linear solver
self.opti.solver(self.solver_type, opti_options) # Set the solver
def _discrete_dynamics(self, uk, xk, x_colk, k):
"""Discretize the system dynamics using RK4 method."""
dt = self.model.dt
f = self.f # Get the system dynamics function
if self.discretization_method == 'rk4':
# Runge-Kutta 4th order method
k1 = f(xk, uk)
k2 = f(xk + dt/2 * k1, uk)
k3 = f(xk + dt/2 * k2, uk)
k4 = f(xk + dt * k3, uk)
return xk + dt/6 * (k1 + 2*k2 + 2*k3 + k4)
elif self.discretization_method == 'collocation':
# Collocation method (not implemented here, placeholder)
# This should return the next state based on collocation points
# raise NotImplementedError("Collocation method is not implemented yet.")\
x_next = self.D[0] * xk
xpk = []
fjk = []
for j in range(1, self.deg+1):
xp = self.C[0, j] * xk
for r in range(self.deg):
xp += self.C[r+1, j] * x_colk[r]
xpk.append(xp)
fj = f(x_colk[j-1], uk)
fjk.append(fj)
x_next += self.D[j] * x_colk[j-1]
return x_next, fjk, xpk # Return the next state, collocation function values, and collocation points
def _add_constraints(self, fjk, xpk, k, X0, Xe=None):
"""
Add constraints to the optimization problem.
"""
cc = []
xk = self.X[k] # Current state variable
uk = self.U[k] if k < self.N_horizon else None # Current
# Equality constraints for system states
if k == 0:
# for i in range(self.nx):
cc.append(xk - X0 == 0)
# elif k == self.N_horizon and self.trajectory_tracking is not None:
# for i in range(self.nx):
# cc.append(xk[i] - Xe[i] == 0)
# Ineuality constraints
# State constraints
if k > 0:
print(k)
print(f"eps_x: {len(self.eps_x)}")
eps_x_k = self.eps_x[k] if self.Qsx is not None else np.zeros(self.nx)
for i in range(self.nx):
cc.append((self.x_min[i] - eps_x_k[i] <= (xk[i] <= self.x_max[i] + eps_x_k[i])))
# Control input constraints
if k < self.N_horizon:
for i in range(self.nu):
cc.append((self.u_min[i] <= (uk[i] <= self.u_max[i])))
if self.discretization_method == 'collocation':
# collocation constraints
for j in range(self.deg):
cc.append(self.dt * fjk[j] == xpk[j])
eps_x_k = self.eps_x[k] if self.Qsx is not None else np.zeros(self.nx)
for i in range(self.nx):
cc.append((self.x_min[i] - eps_x_k[i] <= (self.X_col[k][j][i] <= self.x_max[i] + eps_x_k[i])))
# Slack variable constraints
if k > 0 and self.Qsx is not None:
eps_x_k = self.eps_x[k]
for i in range(self.nx):
cc.append(np.inf >= (eps_x_k[i] >= 0))
return cc
def _cost_function(self):
"""Set up the cost function for the optimization problem."""
cost_val = 0
for k in range(self.N_horizon):
x_k = self.X[k]
u_k = self.U[k]
if self.Qsx is not None:
eps_x_k = self.eps_x[k]
if self.trajectory_tracking:
x_ref_k = self.X_ref[:, k]
cost_val += (x_k - x_ref_k).T @ self.Q @ (x_k - x_ref_k)
else:
cost_val += (x_k-self.Xe).T @ self.Q @ (x_k-self.Xe) # State cost
cost_val += u_k.T @ self.R @ u_k # Control input cost
if self.use_move_suppression:
if k == 0:
delta_u_k = u_k - self.U_prev
else:
delta_u_k = u_k - self.U[k-1]
cost_val += (delta_u_k).T @ self.R_du @ (delta_u_k) # Control input change cost
if self.use_energy:
KE_k = self.model.kinetic_energy(x_k) # Kinetic energy at state x_k
PE_k = self.model.potential_energy(x_k) # Potential energy at state x
cost_val += self.W_ke * KE_k - self.W_pe * PE_k
if self.Qsx is not None:
cost_val += eps_x_k.T @ self.Qsx @ eps_x_k # slack cost
# Terminal cost
x_e = self.X[self.N_horizon]
if self.trajectory_tracking:
x_ref_e = self.X_ref[:, self.N_horizon]
cost_val += (x_e - x_ref_e).T @ self.Q_e @ (x_e - x_ref_e)
else:
cost_val += (x_e - self.Xe).T @ self.Q_e @ (x_e - self.Xe) # Terminal state cost
if self.use_energy:
KE_e = self.model.kinetic_energy(x_e) # Kinetic energy at terminal state
PE_e = self.model.potential_energy(x_e) # Potential energy at terminal state
cost_val += self.W_ke_e * KE_e - self.W_pe_e * PE_e # Terminal energy cost
if self.Qsx_e is not None:
eps_x_e = self.eps_x[self.N_horizon]
cost_val += eps_x_e.T @ self.Qsx_e @ eps_x_e # slack cost at terminal state
return cost_val
def _set_up_optimization_problem(self):
"""
Set up the optimization problem for the MPC controller.
"""
x = self.X # State variables
u = self.U # Control input variables
if self.discretization_method == 'collocation':
x_col = self.X_col # Collocation variables (if using collocation method)
elif self.discretization_method == 'rk4':
x_col = None # No collocation variables for RK4 method
fjk = None # No collocation function values for RK4 method
xpk = None # No collocation points for RK4 method
else:
raise ValueError(f"Unsupported discretization method: {self.discretization_method}")
# Add constraints for the system dynamics following Fatrop's structure
for k in range(self.K):
# Add dynamics constraints
if k < self.N_horizon:
if self.discretization_method == 'collocation':
# For collocation method
x_next, fjk, xpk = self._discrete_dynamics(u[k], x[k], x_col[k], k)
elif self.discretization_method == 'rk4':
x_next = self._discrete_dynamics(u[k], x[k], None, k) # Get the next state using RK4
fjk = None # No collocation function values for RK4
xpk = None # No collocation points for RK4
else:
raise ValueError(f"Unsupported discretization method: {self.discretization_method}")
self.opti.subject_to(x[k+1] == x_next) # Add the continuity constraint
# using direct integration (very slow and sometimes fails)
# res = self.model.integrator(x0=x[k], p=u[k]) # Call the system dynamics function
# x_next = res['xf'] # Extract the next state
# self.opti.subject_to(x[k+1] == res['xf']) # Add the continuity constraint
# Add Trajectory constraints
path_constr = self._add_constraints(fjk, xpk, k, self.X0, self.X_ref)
for constr in path_constr:
self.opti.subject_to(constr)
# Set the objective function
J = self._cost_function()
self.opti.minimize(J) # Minimize the cost function
def solve(self, x0, xe=None, x_ref=None, u_prev=None):
"""
Solve the MPC optimization problem.
"""
# if xe is not None:
self.opti.set_value(self.Xe, xe)
# if x_ref is not None and self.trajectory_tracking:
self.opti.set_value(self.X_ref, x_ref.T)
# if u_prev is not None and self.use_move_suppression:
self.opti.set_value(self.U_prev, u_prev)
self.opti.set_value(self.X0, x0)
for k in range(self.K):
if self.trajectory_tracking and x_ref is not None:
for i in range(self.nx):
self.opti.set_initial(self.X[k][i], x_ref[k, i])
elif xe is not None:
for i in range(self.nx):
self.opti.set_initial(self.X[k][i], x0[i] + (xe[i] - x0[i]) * k / self.N_horizon)
else:
for i in range(self.nx):
self.opti.set_initial(self.X[k][i], x0[i])
if k < self.N_horizon:
for i in range(self.nu):
if k == 0 and u_prev is not None:
self.opti.set_initial(self.U[k][i], u_prev[i])
else:
self.opti.set_initial(self.U[k][i], 0.0)
print("=" * 100)
print(f"Solving MPC Optimization Problem with initial state: {x0}")
print()
# try:
sol = self.opti.solve() # Solve the optimization problem
U_opt = sol.value(ca.horzcat(*self.U[:-1])) # Optimal control inputs
X_opt = sol.value(ca.horzcat(*self.X)) # Optimal state trajectory
time_total = sol.stats()['t_wall_total'] * 1000 # Total time taken to solve
cost = sol.value(self.opti.f)
print(f"Optimal control inputs: {U_opt[0]}, Cost: {cost}, Time taken: {time_total:.4f} ms")
print("Solution found:", sol.stats()['success'])
print("=" * 100)
return X_opt, U_opt, time_total, cost, True
# except Exception as e:
# print(f"Error solving MPC problem: {e}")
# try:
# # Try to get debug solution
# X_opt = np.zeros((self.nx, self.K))
# U_opt = np.zeros((self.nu, self.N_horizon))
# for k in range(self.K):
# X_opt[:, k] = self.opti.debug.value(self.X[k])
# if k < self.N_horizon:
# U_opt[:, k] = self.opti.debug.value(self.U[k])
# cost = self.opti.debug.value(self.opti.f)
# return X_opt, U_opt, None, cost, False
# except:
# return None, None, None, None, False
I would really appreciate any help.
For difficult NMPC problems, I am getting a mismatch between IPOPT and FATROP. When I attempted to reduce 'insol_lu_fact_tol' I got the following error:
RuntimeError: Error in Opti::solve [OptiNode] at .../casadi/core/optistack.cpp:217: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:1458: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:362: .../casadi/core/generic_type.cpp:412: Assertion "is_bool()" failed: type mismatchHere is my formulation:
I would really appreciate any help.