Skip to content

FATROP and IPOPT Produce Different Results #38

@MohamedElMistiri

Description

@MohamedElMistiri

For difficult NMPC problems, I am getting a mismatch between IPOPT and FATROP. When I attempted to reduce 'insol_lu_fact_tol' I got the following error:
RuntimeError: Error in Opti::solve [OptiNode] at .../casadi/core/optistack.cpp:217: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:1458: Error in Function::call for 'solver' [FatropInterface] at .../casadi/core/function.cpp:362: .../casadi/core/generic_type.cpp:412: Assertion "is_bool()" failed: type mismatch

Here is my formulation:

class MPC:
    def __init__(self, model:CartPoleSystem, Tf:float=2, dt:float=0.025,
                 R:np.ndarray=np.diag([1.0]), R_du:np.ndarray=np.diag([4e-1]), 
                 Q:np.ndarray=np.diag([1.0, 1.0, 1.0, 1.0]), Q_e:np.ndarray=np.diag([1.0, 1.0, 1.0, 1.0]), 
                 W_pe:float=1.0, W_ke:float=1.0, W_pe_e:float=1.0, W_ke_e:float=1.0,
                 x_min:np.ndarray=np.array([-0.7, -np.inf, -np.inf, -np.inf]),
                 x_max:np.ndarray=np.array([0.7, np.inf, np.inf, np.inf]),
                 u_min:np.ndarray=np.array([-5.0]),
                 u_max:np.ndarray=np.array([5.0]),
                 solver_type:str='fatrop',
                 Qsx:np.ndarray=None,
                 Qsx_e:np.ndarray=None,
                 discretization_method:str='rk4',
                jit:bool=False,
                 ):
        self.model = model
        self.Tf = Tf
        self.dt = dt
        self.R = R  # Control input cost matrix
        self.R_du = R_du  # Control input change cost matrix
        self.Q = Q  # State cost matrix
        self.Q_e = Q_e  # Terminal state cost matrix
        self.W_pe = W_pe  # Weight for potential energy in terminal cost
        self.W_ke = W_ke  # Weight for kinetic energy in terminal cost
        self.W_pe_e = W_pe_e  # Weight for potential energy in terminal cost
        self.W_ke_e = W_ke_e  # Weight for kinetic energy in terminal cost
        self.x_min = x_min  # State lower bounds
        self.x_max = x_max  # State upper bounds
        self.u_min = u_min  # Control input lower bounds
        self.u_max = u_max  # Control input upper bounds
        self.solver_type = solver_type  # Type of solver to use
        self.Qsx = Qsx  # Optional slack cost matrix for states
        self.Qsx_e = Qsx_e  # Optional slack cost matrix for terminal states
        self.N_horizon = int(Tf / dt)  # Number of stages in the prediction horizon
        self.K = self.N_horizon + 1  # Total number of stages including the terminal stage
        self.nx = model.nx
        self.nu = model.nu
        self.f = self.model.system_dynamics_ode()  # Get the system dynamics function
        if discretization_method == 'collocation':
            self.discretization_method = discretization_method
            self.deg = 3
            self.tau = np.append(0, ca.collocation_points(self.deg, 'legendre'))  # Collocation points
            self.C = np.zeros((self.deg+1, self.deg+1)) # Coefficients of the collocation equation
            self.D = np.zeros(self.deg+1) # Coefficients of the continuity equation
            self.B = np.zeros(self.deg+1)  # Coefficients of the quadrature function
            # Construct polynomial basis
            for j in range(self.deg+1):
                # Construct Lagrange polynomials to get the polynomial basis at the collocation point
                p = np.poly1d([1])
                for r in range(self.deg+1):
                    if r != j:
                        p *= np.poly1d([1, -self.tau[r]]) / (self.tau[j]-self.tau[r])
                
                # Evaluate the polynomial at the final time to get the coefficients of the continuity equation
                self.D[j] = p(1.0)
                
                # Evaluate the time derivative of the polynomial at all collocation points to get the coefficients of the continuity equation
                pder = np.polyder(p)
                for r in range(self.deg+1):
                    self.C[j, r] = pder(self.tau[r])
                
                # Evaluate the integral of the polynomial to get the coefficients of the quadrature function
                pint = np.polyint(p)
                self.B[j] = pint(1.0)
        else:
            self.discretization_method = 'rk4'  # Default to RK4 method
        # Create Casadi Opti object
        self.opti = ca.Opti()
        
        # Define optimization variables
        self.X = []
        self.U = []
        if discretization_method == 'collocation':
            self.X_col = []  # For collocation variables
        self.eps_x = []
        for k in range(self.K):
            self.X.append(self.opti.variable(self.nx))  # State variables
            if k < self.N_horizon:
                self.U.append(self.opti.variable(self.nu))  # Control input variables
                if discretization_method == 'collocation':
                    self.X_col.append([self.opti.variable(self.nx) for _ in range(self.deg)])  # Collocation variables
            if self.Qsx is not None:
                self.eps_x.append(self.opti.variable(self.nx))  # Slack variables for state constraints

        self.X0 = self.opti.parameter(self.nx)  # Initial state parameter
        self.Xe = self.opti.parameter(self.nx)  # Terminal state parameter
        self.X_ref = self.opti.parameter(self.nx, self.K)  # State reference parameter
        self.U_prev = self.opti.parameter(self.nu)  # Previous control input parameter
        self.trajectory_tracking = False
        self.use_energy = True  # Use energy in the cost function
        self.use_move_suppression = True  # Use move suppression in the cost function
        
        self._set_up_optimization_problem()  # Set up the optimization problem
        
        if self.solver_type == 'fatrop':
            opti_options = {
                    'structure_detection': 'auto',  # Automatically detect the structure of the problem
                    'expand': True,  # Expand the problem for better performance
                    'fatrop.mu_init': 1e-0,  # Initial value for the barrier parameter
                    'jit': jit,  # Just-In-Time compilation
                    'debug': True,  # Enable debugging
                    "fatrop.print_level":-1,  # Set the print level for fatrop
                    "fatrop.tol": 1e-2,  # Tolerance for the solver
                    "fatrop.linsol_lu_fact_tol": 1e-2,  # Tolerance for LU factorization
                    'jit_options': {"flags": "-O3", "verbose": False}  # JIT options for optimization
                }
        elif self.solver_type == 'ipopt':
            opti_options = {
                    "expand": True,
                    # "ipopt.linear_solver": "MA27",  # Use MA57 (fastest HSL solver)
                    "ipopt.max_iter": 1000,
                    "ipopt.tol": 1e-2,
                    "ipopt.print_level": 0,
                    # "print_time": True,
                    # "ipopt.warm_start_init_point": "yes",  # Enable warm starting
                    # "ipopt.hessian_approximation": "limited-memory",  # Use L-BFGS approximation
                    # "ipopt.limited_memory_max_history": 10  # Number of history pairs for L-BFGS
                } # Use MUMPS for linear solver
            
        self.opti.solver(self.solver_type, opti_options)  # Set the solver  
        
    def _discrete_dynamics(self, uk, xk, x_colk, k):
        """Discretize the system dynamics using RK4 method."""
        dt = self.model.dt
        f = self.f  # Get the system dynamics function
        if self.discretization_method == 'rk4':
            # Runge-Kutta 4th order method
            k1 = f(xk, uk)
            k2 = f(xk + dt/2 * k1, uk)
            k3 = f(xk + dt/2 * k2, uk)
            k4 = f(xk + dt * k3, uk)
            return xk + dt/6 * (k1 + 2*k2 + 2*k3 + k4)
        elif self.discretization_method == 'collocation':
            # Collocation method (not implemented here, placeholder)
            # This should return the next state based on collocation points
            # raise NotImplementedError("Collocation method is not implemented yet.")\
            x_next = self.D[0] * xk
            xpk = []
            fjk = []
            for j in range(1, self.deg+1):
                xp = self.C[0, j] * xk
                for r in range(self.deg):
                    xp += self.C[r+1, j] * x_colk[r]
                xpk.append(xp)
                fj = f(x_colk[j-1], uk)
                fjk.append(fj)
                x_next += self.D[j] * x_colk[j-1]
            return x_next, fjk, xpk  # Return the next state, collocation function values, and collocation points

    def _add_constraints(self, fjk, xpk, k, X0, Xe=None):
        """
        Add constraints to the optimization problem.
        """
        cc = []
        xk = self.X[k]  # Current state variable
        uk = self.U[k] if k < self.N_horizon else None  # Current
        
        # Equality constraints for system states
        if k == 0:
            # for i in range(self.nx):
            cc.append(xk - X0 == 0)
        # elif k == self.N_horizon and self.trajectory_tracking is not None:
        #     for i in range(self.nx):
        #         cc.append(xk[i] - Xe[i] == 0)
        
        # Ineuality constraints
        # State constraints
        if k > 0:
            print(k)
            print(f"eps_x: {len(self.eps_x)}")
            eps_x_k = self.eps_x[k] if self.Qsx is not None else np.zeros(self.nx)
            for i in range(self.nx):
                cc.append((self.x_min[i] - eps_x_k[i] <= (xk[i] <= self.x_max[i] + eps_x_k[i])))
                
        # Control input constraints
        if k < self.N_horizon:
            for i in range(self.nu):
                cc.append((self.u_min[i] <= (uk[i] <= self.u_max[i])))
                
            if self.discretization_method == 'collocation':
                # collocation constraints
                for j in range(self.deg):
                    cc.append(self.dt * fjk[j] == xpk[j])
                    eps_x_k = self.eps_x[k] if self.Qsx is not None else np.zeros(self.nx)
                    for i in range(self.nx):
                        cc.append((self.x_min[i] - eps_x_k[i] <= (self.X_col[k][j][i] <= self.x_max[i] + eps_x_k[i])))

        # Slack variable constraints
        if k > 0 and self.Qsx is not None:
            eps_x_k = self.eps_x[k]
            for i in range(self.nx):
                cc.append(np.inf >= (eps_x_k[i] >= 0))

        return cc
    
    def _cost_function(self):
        """Set up the cost function for the optimization problem."""
        cost_val = 0
        for k in range(self.N_horizon):
            x_k = self.X[k]
            u_k = self.U[k]
            if self.Qsx is not None:
                eps_x_k = self.eps_x[k]
            
            if self.trajectory_tracking:
                x_ref_k = self.X_ref[:, k]
                cost_val += (x_k - x_ref_k).T @ self.Q @ (x_k - x_ref_k)
            else:
                cost_val += (x_k-self.Xe).T @ self.Q @ (x_k-self.Xe)  # State cost
                
            cost_val += u_k.T @ self.R @ u_k  # Control input cost
            
            if self.use_move_suppression:
                if k == 0:
                    delta_u_k = u_k - self.U_prev
                else:
                    delta_u_k = u_k - self.U[k-1]
                cost_val += (delta_u_k).T @ self.R_du @ (delta_u_k)  # Control input change cost
                
            if self.use_energy:
                KE_k = self.model.kinetic_energy(x_k)  # Kinetic energy at state x_k
                PE_k = self.model.potential_energy(x_k)  # Potential energy at state x
                cost_val += self.W_ke * KE_k - self.W_pe * PE_k
            
            if self.Qsx is not None:
                cost_val += eps_x_k.T @ self.Qsx @ eps_x_k  # slack cost
        # Terminal cost
        x_e = self.X[self.N_horizon]
        if self.trajectory_tracking:
            x_ref_e = self.X_ref[:, self.N_horizon]
            cost_val += (x_e - x_ref_e).T @ self.Q_e @ (x_e - x_ref_e)
        else:
            cost_val += (x_e - self.Xe).T @ self.Q_e @ (x_e - self.Xe)  # Terminal state cost
        if self.use_energy:
            KE_e = self.model.kinetic_energy(x_e)  # Kinetic energy at terminal state
            PE_e = self.model.potential_energy(x_e)  # Potential energy at terminal state
            cost_val += self.W_ke_e * KE_e - self.W_pe_e * PE_e  # Terminal energy cost
        if self.Qsx_e is not None:
            eps_x_e = self.eps_x[self.N_horizon]
            cost_val += eps_x_e.T @ self.Qsx_e @ eps_x_e  # slack cost at terminal state
        return cost_val
        
    def _set_up_optimization_problem(self):
        """
        Set up the optimization problem for the MPC controller.
        """
        x = self.X  # State variables
        u = self.U  # Control input variables
        if self.discretization_method == 'collocation':
            x_col = self.X_col  # Collocation variables (if using collocation method)
        elif self.discretization_method == 'rk4':
            x_col = None  # No collocation variables for RK4 method
            fjk = None  # No collocation function values for RK4 method
            xpk = None  # No collocation points for RK4 method
        else:
            raise ValueError(f"Unsupported discretization method: {self.discretization_method}")
        
        # Add constraints for the system dynamics following Fatrop's structure
        for k in range(self.K):
            # Add dynamics constraints
            if k < self.N_horizon:
                if self.discretization_method == 'collocation':
                    # For collocation method
                    x_next, fjk, xpk = self._discrete_dynamics(u[k], x[k], x_col[k], k)
                elif self.discretization_method == 'rk4':
                    x_next = self._discrete_dynamics(u[k], x[k], None, k)  # Get the next state using RK4
                    fjk = None  # No collocation function values for RK4
                    xpk = None  # No collocation points for RK4
                else:
                    raise ValueError(f"Unsupported discretization method: {self.discretization_method}")

                self.opti.subject_to(x[k+1] == x_next)  # Add the continuity constraint
                # using direct integration (very slow and sometimes fails)
                # res = self.model.integrator(x0=x[k], p=u[k])  # Call the system dynamics function
                # x_next = res['xf']  # Extract the next state
                # self.opti.subject_to(x[k+1] == res['xf'])  # Add the continuity constraint

                
            # Add Trajectory constraints
            path_constr = self._add_constraints(fjk, xpk, k, self.X0, self.X_ref)
            for constr in path_constr:
                self.opti.subject_to(constr)
        
        # Set the objective function
        J = self._cost_function()
        self.opti.minimize(J)  # Minimize the cost function
        
    def solve(self, x0, xe=None, x_ref=None, u_prev=None):
        """
        Solve the MPC optimization problem.
        """
        # if xe is not None:
        self.opti.set_value(self.Xe, xe)
        # if x_ref is not None and self.trajectory_tracking:
        self.opti.set_value(self.X_ref, x_ref.T)
        # if u_prev is not None and self.use_move_suppression:
        self.opti.set_value(self.U_prev, u_prev)
        self.opti.set_value(self.X0, x0)
        
        for k in range(self.K):
            if self.trajectory_tracking and x_ref is not None:
                for i in range(self.nx):
                    self.opti.set_initial(self.X[k][i], x_ref[k, i])
            elif xe is not None:
                for i in range(self.nx):
                    self.opti.set_initial(self.X[k][i], x0[i] + (xe[i] - x0[i]) * k / self.N_horizon)
            else:
                for i in range(self.nx):
                    self.opti.set_initial(self.X[k][i], x0[i])
            if k < self.N_horizon:
                for i in range(self.nu):
                    if k == 0 and u_prev is not None:
                        self.opti.set_initial(self.U[k][i], u_prev[i])
                    else:
                        self.opti.set_initial(self.U[k][i], 0.0)
        print("=" * 100)
        print(f"Solving MPC Optimization Problem with initial state: {x0}")
        print()
        # try:
        sol = self.opti.solve()  # Solve the optimization problem
        U_opt = sol.value(ca.horzcat(*self.U[:-1]))  # Optimal control inputs
        X_opt = sol.value(ca.horzcat(*self.X))  # Optimal state trajectory
        time_total = sol.stats()['t_wall_total'] * 1000  # Total time taken to solve
        cost = sol.value(self.opti.f)
        print(f"Optimal control inputs: {U_opt[0]}, Cost: {cost}, Time taken: {time_total:.4f} ms")
        print("Solution found:", sol.stats()['success'])
        print("=" * 100)
        return X_opt, U_opt, time_total, cost, True
        
        # except Exception as e:
        #     print(f"Error solving MPC problem: {e}")
        #     try:
        #         # Try to get debug solution
        #         X_opt = np.zeros((self.nx, self.K))
        #         U_opt = np.zeros((self.nu, self.N_horizon))
                
        #         for k in range(self.K):
        #             X_opt[:, k] = self.opti.debug.value(self.X[k])
        #             if k < self.N_horizon:
        #                 U_opt[:, k] = self.opti.debug.value(self.U[k])
                
        #         cost = self.opti.debug.value(self.opti.f)
        #         return X_opt, U_opt, None, cost, False
        #     except:
        #         return None, None, None, None, False

I would really appreciate any help.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions