Skip to content

Commit

Permalink
[Py] update MPC examples
Browse files Browse the repository at this point in the history
  • Loading branch information
tttapa committed Oct 19, 2023
1 parent a1c4c9d commit 35b55dd
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 105 deletions.
118 changes: 62 additions & 56 deletions examples/Python/mpc/hanging-chain/hanging-chain-mpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@

y_init = cs.MX.sym("y_init", *y_null.shape) # Initial state
model_params = cs.MX.sym("params", *model.params.shape) # Parameters
U = cs.MX.sym("U", dim * N_horiz) # Control signals over horizon
num_var = dim * N_horiz
U = cs.MX.sym("U", num_var) # Control signals over horizon
U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
constr_param = cs.MX.sym("c", 3) # Coefficients of cubic constraint function
mpc_param = cs.vertcat(y_init, model_params, constr_param) # All parameters
Expand All @@ -52,7 +53,7 @@
# Accumulate the cost of the outputs and inputs
mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim))
mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat))
mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_y_cost + mpc_u_cost])
mpc_cost = mpc_y_cost + mpc_u_cost

# Constraints

Expand All @@ -68,51 +69,49 @@
constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])] # Ball N+1
constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)])
# Constraint function for all stages in the horizon
mpc_constr = constr_fun.map(N_horiz)(mpc_sim, constr_param)
mpc_constr_fun = cs.Function("g_mpc", [U, mpc_param], [cs.vec(mpc_constr)])
mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param))
num_constr = (N + 1) * N_horiz
# Fill in the constraint coefficients c(x-a)³ + d(x-a) + b
a, b, c, d = 0.6, -1.4, 5, 2.2
constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
constr_lb = b - c * a**3 - d * a
# Box constraints on actuator:
C = -1 * np.ones(num_var), +1 * np.ones(num_var) # lower bound, upper bound
# Constant term of the cubic state constraints as a one-sided box:
D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr)

# %% NLP formulation

import alpaqa.casadi_loader as cl
import alpaqa

# Generate C code for the cost and constraint function, compile them, and load
# Generate C code for the cost and constraint functions, compile them, and load
# them as an alpaqa problem description:
problem = cl.generate_and_compile_casadi_problem(
f=mpc_cost_fun,
g=mpc_constr_fun,
sym=cs.MX.sym,
)
# Box constraints on actuator:
problem.C.lowerbound = -1 * np.ones((dim * N_horiz, ))
problem.C.upperbound = +1 * np.ones((dim * N_horiz, ))
# Constant term of the cubic state constraints:
problem.D.lowerbound = constr_lb * np.ones((problem.m, ))
problem = (
alpaqa.minimize(mpc_cost, U) # objective and variables f(x; p)
.subject_to_box(C) # box constraints on variables x ∊ C
.subject_to(mpc_constr, D) # general constraints g(x; p) ∊ D
.with_param(mpc_param) # parameter to be changed later p
).compile(sym=cs.MX.sym)

# %% NLP solver

import alpaqa as pa
from datetime import timedelta

# Configure an alpaqa solver:
solver = pa.ALMSolver(
solver = alpaqa.ALMSolver(
alm_params={
'tolerance': 1e-4,
'dual_tolerance': 1e-4,
'initial_penalty': 1e4,
'max_iter': 100,
'max_time': timedelta(seconds=0.2),
'max_total_num_retries': 0,
"tolerance": 1e-3,
"dual_tolerance": 1e-3,
"initial_penalty": 1e4,
"max_iter": 100,
"max_time": timedelta(seconds=0.2),
},
inner_solver=pa.PANOCSolver(
inner_solver=alpaqa.PANOCSolver(
panoc_params={
'stop_crit': pa.ProjGradNorm2,
'max_time': timedelta(seconds=0.02),
"stop_crit": alpaqa.FPRNorm,
"max_time": timedelta(seconds=0.02),
},
lbfgs_params={'memory': N_horiz},
lbfgs_params={"memory": N_horiz},
),
)

Expand All @@ -122,35 +121,36 @@
# Wrap the solver in a class that solves the optimal control problem at each
# time step, implementing warm starting:
class MPCController:

def __init__(self, model: HangingChain, problem: pa.CasADiProblem):
def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem):
self.model = model
self.problem = problem
self.tot_it = 0
self.tot_time = timedelta()
self.max_time = timedelta()
self.failures = 0
self.U = np.zeros((N_horiz * dim, ))
self.λ = np.zeros(((N + 1) * N_horiz, ))
self.U = np.zeros(problem.n)
self.λ = np.zeros(problem.m)

def __call__(self, y_n):
y_n = np.array(y_n).ravel()
# Set the current state as the initial state
self.problem.param[:y_n.shape[0]] = y_n
self.problem.param[: y_n.shape[0]] = y_n
# Shift over the previous solution and Lagrange multipliers
self.U = np.concatenate((self.U[dim:], self.U[-dim:]))
self.λ = np.concatenate((self.λ[N + 1:], self.λ[-N - 1:]))
self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :]))
# Solve the optimal control problem
# (warm start using the shifted previous solution and multipliers)
self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
# Print some solver statistics
print(stats['status'], stats['outer_iterations'],
stats['inner']['iterations'], stats['elapsed_time'],
stats['inner_convergence_failures'])
self.tot_it += stats['inner']['iterations']
self.failures += stats['status'] != pa.SolverStatus.Converged
self.tot_time += stats['elapsed_time']
self.max_time = max(self.max_time, stats['elapsed_time'])
print(
f'{stats["status"]} outer={stats["outer_iterations"]} '
f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} '
f'failures={stats["inner_convergence_failures"]}'
)
self.tot_it += stats["inner"]["iterations"]
self.failures += stats["status"] != alpaqa.SolverStatus.Converged
self.tot_time += stats["elapsed_time"]
self.max_time = max(self.max_time, stats["elapsed_time"])
# Print the Lagrange multipliers, shows that constraints are active
print(np.linalg.norm(self.λ))
# Return the optimal control signal for the first time step
Expand All @@ -174,23 +174,25 @@ def __call__(self, y_n):
y_mpc[:, n] = y_n
y_mpc = np.hstack((y_dist, y_mpc))

print(f"{controller.tot_it} iterations, {controller.failures} failures")
print(f"time: {controller.tot_time} (total), {controller.max_time} (max), "
f"{controller.tot_time / N_sim} (avg)")
print(f"{controller.tot_it} inner iterations, {controller.failures} failures")
print(
f"time: {controller.tot_time} (total), {controller.max_time} (max), "
f"{controller.tot_time / N_sim} (avg)"
)

# %% Visualize the results

import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib import animation, patheffects

mpl.rcParams['animation.frame_format'] = 'svg'
mpl.rcParams["animation.frame_format"] = "svg"

# Plot the chains
fig, ax = plt.subplots()
x, y, z = model.state_to_pos(y_null)
line, = ax.plot(x, y, '-o', label='Without MPC')
line_ctrl, = ax.plot(x, y, '-o', label='With MPC')
(line,) = ax.plot(x, y, "-o", label="Without MPC")
(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC")
plt.legend()
plt.ylim([-2.5, 1])
plt.xlim([-0.25, 1.25])
Expand All @@ -201,7 +203,7 @@ def __call__(self, y_n):
X, Y = np.meshgrid(x, y)
Z = g_constr(constr_coeff, X) + constr_lb - Y
fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8)
cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8)
plt.setp(cgc.collections, path_effects=fx)


Expand All @@ -210,29 +212,33 @@ class Animation:

def __call__(self, i):
x, y, z = model.state_to_pos(y_sim[:, i])
y = z if dim == 3 else y
for p in self.points:
p.remove()
self.points = []
line.set_xdata(x)
line.set_ydata(y)
viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
if np.sum(viol):
self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
x, y, z = model.state_to_pos(y_mpc[:, i])
y = z if dim == 3 else y
line_ctrl.set_xdata(x)
line_ctrl.set_ydata(y)
viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
if np.sum(viol):
self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
return [line, line_ctrl] + self.points


ani = animation.FuncAnimation(fig,
Animation(),
interval=1000 * Ts,
blit=True,
repeat=True,
frames=1 + N_dist + N_sim)
ani = animation.FuncAnimation(
fig,
Animation(),
interval=1000 * Ts,
blit=True,
repeat=True,
frames=1 + N_dist + N_sim,
)

# Show the animation
plt.show()
Loading

0 comments on commit 35b55dd

Please sign in to comment.