Tim-Salzmann/l4casadi

Using a neural network as the terminal cost with l4casadi + acados

Closed this issue · 3 comments

Hey,

Not an issue but a question. I'm hoping to get confirmation that it is possible to use a neural network as the terminal cost for an MPC built through l4casadi + acados. This would be similar to what's been done in the AC4MPC paper, however I couldn't locate their code.

I'd like to do something similar to ocp.model.cost_expr_ext_cost_e = l4c_y_expr.

Thanks in advance!

Update:

I set the external cost to a l4casadi model using the code below, can someone confirm that this is the correct idea?

import casadi as cs
import numpy as np
import torch
import l4casadi as l4c
from acados_template import AcadosOcpSolver, AcadosOcp, AcadosModel
import time

COST = 'NONLINEAR_LS'


class MultiLayerPerceptron(torch.nn.Module):
    def __init__(self):
        super().__init__()

        self.input_layer = torch.nn.Linear(2, 512)

        hidden_layers = []
        for i in range(2):
            hidden_layers.append(torch.nn.Linear(512, 512))

        self.hidden_layer = torch.nn.ModuleList(hidden_layers)
        self.out_layer = torch.nn.Linear(512, 1)

        # Model is not trained -- setting output to zero
        with torch.no_grad():
            self.out_layer.bias.fill_(0.)
            self.out_layer.weight.fill_(0.)

    def forward(self, x):
        x = self.input_layer(x)
        for layer in self.hidden_layer:
            x = torch.tanh(layer(x))
        x = self.out_layer(x)
        return x


class DoubleIntegratorWithLearnedDynamics:
    def __init__(self, learned_dyn):
        self.learned_dyn = learned_dyn

    def model(self):
        s = cs.MX.sym('s', 1)
        s_dot = cs.MX.sym('s_dot', 1)
        s_dot_dot = cs.MX.sym('s_dot_dot', 1)
        u = cs.MX.sym('u', 1)
        x = cs.vertcat(s, s_dot)
        x_dot = cs.vertcat(s_dot, s_dot_dot)

        res_model = self.learned_dyn(x)

        f_expl = cs.vertcat(
            s_dot,
            u
        ) + res_model

        x_start = np.zeros((2, ))

        # store to struct
        model = cs.types.SimpleNamespace()
        model.x = x
        model.xdot = x_dot
        model.u = u
        model.z = cs.vertcat([])
        model.p = cs.vertcat([])
        model.f_expl = f_expl
        model.x_start = x_start
        model.constraints = cs.vertcat([])
        model.name = "wr"

        return model


class MPC:
    def __init__(self, model, N, external_shared_lib_dir, external_shared_lib_name):
        self.N = N
        self.model = model
        self.external_shared_lib_dir = external_shared_lib_dir
        self.external_shared_lib_name = external_shared_lib_name

    @property
    def solver(self):
        return AcadosOcpSolver(self.ocp())

    def ocp(self):
        model = self.model

        t_horizon = 1.
        N = self.N

        # Get model
        model_ac = self.acados_model(model=model)
        model_ac.p = model.p

        # Dimensions
        nx = 2
        nu = 1
        # ny = 1

        # Create OCP object to formulate the optimization
        ocp = AcadosOcp()
        ocp.model = model_ac
        ocp.dims.N = N
        ocp.dims.nx = nx
        ocp.dims.nu = nu
        # ocp.dims.ny = ny
        ocp.solver_options.tf = t_horizon

        if COST == 'LINEAR_LS':
            # Initialize cost function
            ocp.cost.cost_type = 'LINEAR_LS'
            ocp.cost.cost_type_e = 'LINEAR_LS'

            ocp.cost.W = np.array([[1.]])

            ocp.cost.Vx = np.zeros((ny, nx))
            ocp.cost.Vx[0, 0] = 1.
            ocp.cost.Vu = np.zeros((ny, nu))
            o
```cp.cost.Vz = np.array([[]])
            ocp.cost.Vx_e = np.zeros((ny, nx))

            l4c_y_expr = None
        else:
            ocp.cost.cost_type = 'EXTERNAL'
            ocp.cost.cost_type_e = 'EXTERNAL'

            x = ocp.model.x

            # Trivial PyTorch index 0
            l4c_y_expr = l4c.L4CasADi(lambda x: x.reshape((2,1)), name='x_expr')
            ocp.model.cost_expr_ext_cost = l4c_y_expr(x)
            ocp.model.cost_expr_ext_cost_e = l4c_y_expr(x)

        # ocp.cost.W_e = np.array([[0.]])
        # ocp.cost.yref_e = np.array([0.])

        # Initial reference trajectory (will be overwritten)
        # ocp.cost.yref = np.zeros(1)

        # Initial state (will be overwritten)
        ocp.constraints.x0 = model.x_start

        # Set constraints
        a_max = 10
        ocp.constraints.lbu = np.array([-a_max])
        ocp.constraints.ubu = np.array([a_max])
        ocp.constraints.idxbu = np.array([0])

        # Solver options
        ocp.solver_options.qp_solver = 'FULL_CONDENSING_HPIPM'
        ocp.solver_options.hessian_approx = 'GAUSS_NEWTON'
        ocp.solver_options.integrator_type = 'ERK'
        ocp.solver_options.nlp_solver_type = 'SQP_RTI'
        ocp.solver_options.model_external_shared_lib_dir = self.external_shared_lib_dir
        if COST == 'LINEAR_LS':
            ocp.solver_options.model_external_shared_lib_name = self.external_shared_lib_name
        else:
            ocp.solver_options.model_external_shared_lib_name = self.external_shared_lib_name + ' -l' + l4c_y_expr.name

        return ocp

    def acados_model(self, model):
        model_ac = AcadosModel()
        model_ac.f_impl_expr = model.xdot - model.f_expl
        model_ac.f_expl_expr = model.f_expl
        model_ac.x = model.x
        model_ac.xdot = model.xdot
        model_ac.u = model.u
        model_ac.name = model.name
        return model_ac


def run():
    N = 10
    learned_dyn_model = l4c.L4CasADi(MultiLayerPerceptron(), model_expects_batch_dim=True, name='learned_dyn')

    model = DoubleIntegratorWithLearnedDynamics(learned_dyn_model)
    solver = MPC(model=model.model(), N=N,
                 external_shared_lib_dir=learned_dyn_model.shared_lib_dir,
                 external_shared_lib_name=learned_dyn_model.name).solver

    x = []
    x_ref = []
    ts = 1. / N
    xt = np.array([1., 0.])
    opt_times = []

    for i in range(50):
        now = time.time()
        t = np.linspace(i * ts, i * ts + 1., 10)
        yref = np.sin(0.5 * t + np.pi / 2)
        x_ref.append(yref[0])
        for t, ref in enumerate(yref):
            solver.set(t, "yref", ref)
        solver.set(0, "lbx", xt)
        solver.set(0, "ubx", xt)
        solver.solve()
        xt = solver.get(1, "x")
        x.append(xt)

        x_l = []
        for i in range(N):
            x_l.append(solver.get(i, "x"))

        elapsed = time.time() - now
        opt_times.append(elapsed)

    print(f'Mean iteration time: {1000*np.mean(opt_times):.1f}ms -- {1/np.mean(opt_times):.0f}Hz)')


if __name__ == '__main__':
    run()

Hi,

Thanks for reaching out! Could you clarify what your problem is? Is the code giving you an error / are the results not what you expect?

Best
Tim

Hey Tim,

The solver was throwing an error but I wasn't sure if it was because of the additional pytorch model or other parts of my implementation. Turns out it was a bug on my end. Thanks though!