From 35b55ddd607faba9745a7783d40abe544be3a5a4 Mon Sep 17 00:00:00 2001 From: Pieter Pas Date: Thu, 19 Oct 2023 15:36:15 +0200 Subject: [PATCH] [Py] update MPC examples --- .../mpc/hanging-chain/hanging-chain-mpc.py | 118 +++++++++--------- examples/Python/mpc/inverted-pendulum-mpc.py | 99 +++++++-------- 2 files changed, 112 insertions(+), 105 deletions(-) diff --git a/examples/Python/mpc/hanging-chain/hanging-chain-mpc.py b/examples/Python/mpc/hanging-chain/hanging-chain-mpc.py index 3ca514c0ec..e19c200764 100644 --- a/examples/Python/mpc/hanging-chain/hanging-chain-mpc.py +++ b/examples/Python/mpc/hanging-chain/hanging-chain-mpc.py @@ -38,7 +38,8 @@ y_init = cs.MX.sym("y_init", *y_null.shape) # Initial state model_params = cs.MX.sym("params", *model.params.shape) # Parameters -U = cs.MX.sym("U", dim * N_horiz) # Control signals over horizon +num_var = dim * N_horiz +U = cs.MX.sym("U", num_var) # Control signals over horizon U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix constr_param = cs.MX.sym("c", 3) # Coefficients of cubic constraint function mpc_param = cs.vertcat(y_init, model_params, constr_param) # All parameters @@ -52,7 +53,7 @@ # Accumulate the cost of the outputs and inputs mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim)) mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat)) -mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_y_cost + mpc_u_cost]) +mpc_cost = mpc_y_cost + mpc_u_cost # Constraints @@ -68,51 +69,49 @@ constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])] # Ball N+1 constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)]) # Constraint function for all stages in the horizon -mpc_constr = constr_fun.map(N_horiz)(mpc_sim, constr_param) -mpc_constr_fun = cs.Function("g_mpc", [U, mpc_param], [cs.vec(mpc_constr)]) +mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param)) +num_constr = (N + 1) * N_horiz # Fill in the constraint coefficients c(x-a)³ + d(x-a) + b a, b, c, d = 0.6, -1.4, 5, 2.2 constr_coeff = [c, -3 * a * c, 3 * a * a * c + d] constr_lb = b - c * a**3 - d * a +# Box constraints on actuator: +C = -1 * np.ones(num_var), +1 * np.ones(num_var) # lower bound, upper bound +# Constant term of the cubic state constraints as a one-sided box: +D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr) # %% NLP formulation -import alpaqa.casadi_loader as cl +import alpaqa -# Generate C code for the cost and constraint function, compile them, and load +# Generate C code for the cost and constraint functions, compile them, and load # them as an alpaqa problem description: -problem = cl.generate_and_compile_casadi_problem( - f=mpc_cost_fun, - g=mpc_constr_fun, - sym=cs.MX.sym, -) -# Box constraints on actuator: -problem.C.lowerbound = -1 * np.ones((dim * N_horiz, )) -problem.C.upperbound = +1 * np.ones((dim * N_horiz, )) -# Constant term of the cubic state constraints: -problem.D.lowerbound = constr_lb * np.ones((problem.m, )) +problem = ( + alpaqa.minimize(mpc_cost, U) # objective and variables f(x; p) + .subject_to_box(C) # box constraints on variables x ∊ C + .subject_to(mpc_constr, D) # general constraints g(x; p) ∊ D + .with_param(mpc_param) # parameter to be changed later p +).compile(sym=cs.MX.sym) # %% NLP solver -import alpaqa as pa from datetime import timedelta # Configure an alpaqa solver: -solver = pa.ALMSolver( +solver = alpaqa.ALMSolver( alm_params={ - 'tolerance': 1e-4, - 'dual_tolerance': 1e-4, - 'initial_penalty': 1e4, - 'max_iter': 100, - 'max_time': timedelta(seconds=0.2), - 'max_total_num_retries': 0, + "tolerance": 1e-3, + "dual_tolerance": 1e-3, + "initial_penalty": 1e4, + "max_iter": 100, + "max_time": timedelta(seconds=0.2), }, - inner_solver=pa.PANOCSolver( + inner_solver=alpaqa.PANOCSolver( panoc_params={ - 'stop_crit': pa.ProjGradNorm2, - 'max_time': timedelta(seconds=0.02), + "stop_crit": alpaqa.FPRNorm, + "max_time": timedelta(seconds=0.02), }, - lbfgs_params={'memory': N_horiz}, + lbfgs_params={"memory": N_horiz}, ), ) @@ -122,35 +121,36 @@ # Wrap the solver in a class that solves the optimal control problem at each # time step, implementing warm starting: class MPCController: - - def __init__(self, model: HangingChain, problem: pa.CasADiProblem): + def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem): self.model = model self.problem = problem self.tot_it = 0 self.tot_time = timedelta() self.max_time = timedelta() self.failures = 0 - self.U = np.zeros((N_horiz * dim, )) - self.λ = np.zeros(((N + 1) * N_horiz, )) + self.U = np.zeros(problem.n) + self.λ = np.zeros(problem.m) def __call__(self, y_n): y_n = np.array(y_n).ravel() # Set the current state as the initial state - self.problem.param[:y_n.shape[0]] = y_n + self.problem.param[: y_n.shape[0]] = y_n # Shift over the previous solution and Lagrange multipliers self.U = np.concatenate((self.U[dim:], self.U[-dim:])) - self.λ = np.concatenate((self.λ[N + 1:], self.λ[-N - 1:])) + self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :])) # Solve the optimal control problem # (warm start using the shifted previous solution and multipliers) self.U, self.λ, stats = solver(self.problem, self.U, self.λ) # Print some solver statistics - print(stats['status'], stats['outer_iterations'], - stats['inner']['iterations'], stats['elapsed_time'], - stats['inner_convergence_failures']) - self.tot_it += stats['inner']['iterations'] - self.failures += stats['status'] != pa.SolverStatus.Converged - self.tot_time += stats['elapsed_time'] - self.max_time = max(self.max_time, stats['elapsed_time']) + print( + f'{stats["status"]} outer={stats["outer_iterations"]} ' + f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} ' + f'failures={stats["inner_convergence_failures"]}' + ) + self.tot_it += stats["inner"]["iterations"] + self.failures += stats["status"] != alpaqa.SolverStatus.Converged + self.tot_time += stats["elapsed_time"] + self.max_time = max(self.max_time, stats["elapsed_time"]) # Print the Lagrange multipliers, shows that constraints are active print(np.linalg.norm(self.λ)) # Return the optimal control signal for the first time step @@ -174,9 +174,11 @@ def __call__(self, y_n): y_mpc[:, n] = y_n y_mpc = np.hstack((y_dist, y_mpc)) -print(f"{controller.tot_it} iterations, {controller.failures} failures") -print(f"time: {controller.tot_time} (total), {controller.max_time} (max), " - f"{controller.tot_time / N_sim} (avg)") +print(f"{controller.tot_it} inner iterations, {controller.failures} failures") +print( + f"time: {controller.tot_time} (total), {controller.max_time} (max), " + f"{controller.tot_time / N_sim} (avg)" +) # %% Visualize the results @@ -184,13 +186,13 @@ def __call__(self, y_n): import matplotlib as mpl from matplotlib import animation, patheffects -mpl.rcParams['animation.frame_format'] = 'svg' +mpl.rcParams["animation.frame_format"] = "svg" # Plot the chains fig, ax = plt.subplots() x, y, z = model.state_to_pos(y_null) -line, = ax.plot(x, y, '-o', label='Without MPC') -line_ctrl, = ax.plot(x, y, '-o', label='With MPC') +(line,) = ax.plot(x, y, "-o", label="Without MPC") +(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC") plt.legend() plt.ylim([-2.5, 1]) plt.xlim([-0.25, 1.25]) @@ -201,7 +203,7 @@ def __call__(self, y_n): X, Y = np.meshgrid(x, y) Z = g_constr(constr_coeff, X) + constr_lb - Y fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)] -cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8) +cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8) plt.setp(cgc.collections, path_effects=fx) @@ -210,6 +212,7 @@ class Animation: def __call__(self, i): x, y, z = model.state_to_pos(y_sim[:, i]) + y = z if dim == 3 else y for p in self.points: p.remove() self.points = [] @@ -217,22 +220,25 @@ def __call__(self, i): line.set_ydata(y) viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb if np.sum(viol): - self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12) + self.points += ax.plot(x[viol], y[viol], "rx", markersize=12) x, y, z = model.state_to_pos(y_mpc[:, i]) + y = z if dim == 3 else y line_ctrl.set_xdata(x) line_ctrl.set_ydata(y) viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb if np.sum(viol): - self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12) + self.points += ax.plot(x[viol], y[viol], "rx", markersize=12) return [line, line_ctrl] + self.points -ani = animation.FuncAnimation(fig, - Animation(), - interval=1000 * Ts, - blit=True, - repeat=True, - frames=1 + N_dist + N_sim) +ani = animation.FuncAnimation( + fig, + Animation(), + interval=1000 * Ts, + blit=True, + repeat=True, + frames=1 + N_dist + N_sim, +) # Show the animation plt.show() diff --git a/examples/Python/mpc/inverted-pendulum-mpc.py b/examples/Python/mpc/inverted-pendulum-mpc.py index 3575896ab0..4915a478bb 100644 --- a/examples/Python/mpc/inverted-pendulum-mpc.py +++ b/examples/Python/mpc/inverted-pendulum-mpc.py @@ -27,7 +27,7 @@ N_sim = 300 # Simulation length [time steps] # Continous-time dynamics -a_pend = (l_pend * cs.sin(θ) * ω**2 - g_gravity * cs.cos(θ) * cs.sin(θ)) +a_pend = l_pend * cs.sin(θ) * ω**2 - g_gravity * cs.cos(θ) * cs.sin(θ) a = (F - b_cart * v + m_pend * a_pend) / (m_cart + m_pend) α = (g_gravity * cs.sin(θ) - a * cs.cos(θ)) / l_pend f_c = cs.Function("f_c", [state, F], [cs.vertcat(ω, α, v, a)]) @@ -64,43 +64,42 @@ mpc_y_cost = cs.sum2(stage_cost_x.map(N_horiz - 1)(mpc_x[:, :-1], Q)) mpc_u_cost = cs.sum2(stage_cost_u.map(N_horiz)(mpc_u, R)) mpc_terminal_cost = terminal_cost_x(mpc_x[:, -1], Qf) -mpc_cost_fun = cs.Function('f_mpc', [cs.vec(mpc_u), mpc_param], - [mpc_y_cost + mpc_u_cost + mpc_terminal_cost]) +mpc_cost = mpc_y_cost + mpc_u_cost + mpc_terminal_cost + +# Box constraints on the actuator force: +C = -F_max * np.ones(N_horiz), +F_max * np.ones(N_horiz) # Compile into an alpaqa problem -from alpaqa import casadi_loader as cl +import alpaqa # Generate C code for the cost function, compile it, and load it as an # alpaqa problem description: -problem = cl.generate_and_compile_casadi_problem( - f=mpc_cost_fun, - g=None, - sym=cs.MX.sym, -) -# Box constraints on the actuator force: -problem.C.lowerbound = -F_max * np.ones((N_horiz, )) -problem.C.upperbound = +F_max * np.ones((N_horiz, )) +problem = ( + alpaqa.minimize(mpc_cost, cs.vec(mpc_u)) # objective and variables + .subject_to_box(C) # box constraints on the variables + .with_param(mpc_param) # parameters to be changed later +).compile(sym=cs.MX.sym) -import alpaqa as pa from datetime import timedelta # Configure an alpaqa solver: -Solver = pa.PANOCSolver +Solver = alpaqa.PANOCSolver inner_solver = Solver( panoc_params={ - 'max_time': timedelta(seconds=Ts), - 'max_iter': 200, - 'stop_crit': pa.PANOCStopCrit.ProjGradUnitNorm2, + "max_time": timedelta(seconds=Ts), + "max_iter": 200, + "stop_crit": alpaqa.PANOCStopCrit.ProjGradUnitNorm2, }, - direction=pa.StructuredLBFGSDirection( - lbfgs_params={'memory': 15}, - direction_params={'hessian_vec_factor': 0}, + direction=alpaqa.StructuredLBFGSDirection( + lbfgs_params={"memory": 15}, + direction_params={"hessian_vec_factor": 0}, ), ) -alm_params={ - 'tolerance': 1e-4,'initial_tolerance': 1e-4, +alm_params = { + "tolerance": 1e-4, + "initial_tolerance": 1e-4, } -solver = pa.ALMSolver( +solver = alpaqa.ALMSolver( alm_params=alm_params, inner_solver=inner_solver, ) @@ -108,17 +107,17 @@ # %% Controller class + # Wrap the solver in a class that solves the optimal control problem at each # time step, implementing warm starting: class MPCController: - - def __init__(self, problem: pa.CasADiProblem): + def __init__(self, problem: alpaqa.CasADiProblem): self.problem = problem self.tot_it = 0 self.tot_time = timedelta() self.max_time = timedelta() self.failures = 0 - self.u = np.ones((nu * N_horiz, )) + self.u = np.ones(problem.n) def __call__(self, state_n): state_n = np.array(state_n).ravel() @@ -130,13 +129,16 @@ def __call__(self, state_n): # (warm start using the shifted previous solution) self.u, _, stats = solver(self.problem, self.u) # Print some solver statistics - print(f"{stats['status']!s:24} {stats['inner']['iterations']:4} " - f"{stats['elapsed_time']} {stats['inner_convergence_failures']} " - f"{stats['ε']:1.3e} {stats['inner']['final_γ']:1.3e}") - self.tot_it += stats['inner']['iterations'] - self.failures += stats['status'] != pa.SolverStatus.Converged - self.tot_time += stats['elapsed_time'] - self.max_time = max(self.max_time, stats['elapsed_time']) + print( + f"{stats['status']!s:24} iter={stats['inner']['iterations']:4} " + f"time={stats['elapsed_time']} " + f"failures={stats['inner_convergence_failures']} " + f"tol={stats['ε']:1.3e} stepsize={stats['inner']['final_γ']:1.3e}" + ) + self.tot_it += stats["inner"]["iterations"] + self.failures += stats["status"] != alpaqa.SolverStatus.Converged + self.tot_time += stats["elapsed_time"] + self.max_time = max(self.max_time, stats["elapsed_time"]) # Return the optimal control signal for the first time step return self.u[:nu] @@ -164,8 +166,10 @@ def __call__(self, state_n): mpc_states[:, n + 1] = f_d(mpc_states[:, n], mpc_inputs[:, n]).T print(f"{controller.tot_it} iterations, {controller.failures} failures") -print(f"time: {controller.tot_time} (total), {controller.max_time} (max), " - f"{controller.tot_time / N_sim} (avg)") +print( + f"time: {controller.tot_time} (total), {controller.max_time} (max), " + f"{controller.tot_time / N_sim} (avg)" +) # %% Visualize the results @@ -174,30 +178,29 @@ def __call__(self, state_n): import matplotlib as mpl from matplotlib import animation -mpl.rcParams['animation.frame_format'] = 'svg' +mpl.rcParams["animation.frame_format"] = "svg" # Plot the cart and pendulum fig, ax = plt.subplots() h = 0.04 x = [state_0[2], state_0[2] + l_pend * np.sin(state_0[0])] y = [h, h + l_pend * np.cos(state_0[0])] -target_pend, = ax.plot(x, y, '--', color='tab:blue', linewidth=1, label='Initial state') +(target_pend,) = ax.plot(x, y, "--", color="tab:blue", label="Initial state") state_N = [0, 0, 0, 0] x = [state_N[2], state_N[2] + l_pend * np.sin(state_N[0])] y = [h, h + l_pend * np.cos(state_N[0])] -target_pend, = ax.plot(x, y, '--', color='tab:green', linewidth=1, label='Target state') -pend, = ax.plot(x, y, '-o', color='tab:orange', alpha=0.9, label='MPC') -cart = plt.Rectangle((-2 * h, 0), 4 * h, h, color='tab:orange') +(target_pend,) = ax.plot(x, y, "--", color="tab:green", label="Target state") +(pend,) = ax.plot(x, y, "-o", color="tab:orange", alpha=0.9, label="MPC") +cart = plt.Rectangle((-2 * h, 0), 4 * h, h, color="tab:orange") ax.add_patch(cart) plt.legend() plt.ylim([-l_pend + h, l_pend + 2 * h]) plt.xlim([-1.5 * l_pend, +1.5 * l_pend]) -plt.gca().set_aspect('equal', 'box') +plt.gca().set_aspect("equal", "box") plt.tight_layout() class Animation: - def __call__(self, n): state_n = mpc_states[:, n] x = [state_n[2], state_n[2] + l_pend * np.sin(state_n[0])] @@ -208,19 +211,17 @@ def __call__(self, n): return [pend, cart] -ani = animation.FuncAnimation(fig, - Animation(), - interval=1000 * Ts, - blit=True, - repeat=True, - frames=N_sim) +ani = animation.FuncAnimation( + fig, Animation(), interval=1000 * Ts, blit=True, repeat=True, frames=N_sim +) fig, axs = plt.subplots(5, sharex=True, figsize=(6, 9)) ts = np.arange(N_sim + 1) * Ts labels = [ "Angle $\\theta$ [rad]", "Angular velocity $\\omega$ [rad/s]", - "Position $x$ [m]", "Velocity $v$ [m/s]", + "Position $x$ [m]", + "Velocity $v$ [m/s]", ] for i, (ax, lbl) in enumerate(zip(axs[:-1], labels)): ax.plot(ts, mpc_states[i, :])