import warnings
from functools import wraps
import numpy as np
import matplotlib.pyplot as plt
import casadi
import do_mpc
from .util import FixedKeysDict, SetDict
[docs]
class Simulator(object):
def __init__(self, f, h, dt=0.01,
discrete=False,
n=None, m=None,
state_names=None, input_names=None, measurement_names=None,
params_simulator=None, mpc_horizon=10):
""" Simulator.
:param callable f: dynamics function f(X, U, t)
:param callable h: measurement function h(X, U, t)
:param float dt: sampling time in seconds
:param int n: number of states, optional but cannot be set if state_names is set
:param int m: number of inputs, optional but cannot be set if input_names is set
:param list state_names: names of states, optional but cannot be set if n is set
:param list input_names: names of inputs, optional but cannot be set if m is set
:param list measurement_names: names of measurements, optional
:param dict params_simulator: simulation parameters, optional
"""
self.f = f
self.h = ensure_float_output(h)
self.dt = dt
# Set state names
if state_names is None: # default state names
if n is None:
raise ValueError('must set state_names or n')
else:
self.n = int(n)
self.state_names = ['x_' + str(n) for n in range(self.n)]
else: # state names given
if n is not None:
raise ValueError('cannot set state_names and n')
self.state_names = list(state_names)
self.n = len(self.state_names)
if len(self.state_names) != self.n:
raise ValueError('state_names must have length equal to x0')
# Set input names
if input_names is None: # default input names
if m is None:
raise ValueError('must set input_names or m')
else:
self.m = int(m)
self.input_names = ['u_' + str(m) for m in range(self.m)]
else: # input names given
if m is not None:
raise ValueError('cannot set input_names and m')
self.input_names = list(input_names)
self.m = len(self.input_names)
if len(self.input_names) != self.m:
raise ValueError('input_names must have length equal to u0')
# Run measurement function to get measurement size
x0 = np.ones(self.n)
u0 = np.ones(self.m)
y = self.h(np.ravel(x0), np.ravel(u0))
self.p = len(y) # number of measurements
# Set measurement names
if measurement_names is None: # default measurement names
self.measurement_names = ['y_' + str(p) for p in range(self.p)]
else:
self.measurement_names = measurement_names
if len(self.measurement_names) != self.p:
raise ValueError('measurement_names must have length equal to y')
# Initialize time vector
self.w = 11 # initialize for w time-steps, but this can change later
self.time = np.arange(0, self.w * self.dt, step=self.dt) # time vector
# Define initial states & initialize state time-series
self.x0 = {}
self.x = {}
for n, state_name in enumerate(self.state_names):
self.x0[state_name] = x0[n]
self.x[state_name] = x0[n] * np.ones(self.w)
self.x0 = FixedKeysDict(self.x0)
self.x = FixedKeysDict(self.x)
# Initialize input time-series
self.u = {}
for m, input_name in enumerate(self.input_names):
self.u[input_name] = u0[m] * np.ones(self.w)
self.u = FixedKeysDict(self.u)
# Initialize measurement time-series
self.y = {}
for p, measurement_name in enumerate(self.measurement_names):
self.y[measurement_name] = 0.0 * np.ones(self.w)
self.y = FixedKeysDict(self.y)
# Initialize state set-points
self.setpoint = {}
for n, state_name in enumerate(self.state_names):
self.setpoint[state_name] = 0.0 * np.ones(self.w)
self.setpoint = FixedKeysDict(self.setpoint)
# Define MPC model
if discrete:
model_type = 'discrete'
else:
model_type = 'continuous'
self.model = do_mpc.model.Model(model_type)
# Define state variables
X = []
for n, state_name in enumerate(self.state_names):
x = self.model.set_variable(var_type='_x', var_name=state_name, shape=(1, 1))
X.append(x)
# Define input variables
U = []
for m, input_name in enumerate(self.input_names):
u = self.model.set_variable(var_type='_u', var_name=input_name, shape=(1, 1))
U.append(u)
# Define dynamics
Xdot = self.f(X, U)
for n, state_name in enumerate(self.state_names):
self.model.set_rhs(state_name, casadi.SX(Xdot[n]))
# Add time-varying set-point variables for later use with MPC
for n, state_name in enumerate(self.state_names):
x = self.model.set_variable(var_type='_tvp', var_name=state_name + str('_set'), shape=(1, 1))
# Build model
self.model.setup()
# Define simulator & simulator parameters
self.simulator = do_mpc.simulator.Simulator(self.model)
# Set simulation parameters
if params_simulator is None:
if self.model.model_type == 'continuous':
self.params_simulator = {
'integration_tool': 'idas', # cvodes, idas
'abstol': 1e-8,
'reltol': 1e-8,
't_step': self.dt
}
else:
self.params_simulator = {
't_step': self.dt
}
else:
self.params_simulator = params_simulator
self.simulator.set_param(**self.params_simulator)
# Setup MPC
self.mpc = do_mpc.controller.MPC(self.model)
self.mpc_horizon = mpc_horizon
setup_mpc = {
'n_horizon': self.mpc_horizon,
'n_robust': 0,
'open_loop': 0,
't_step': self.dt,
'state_discretization': 'collocation',
'collocation_type': 'radau',
'collocation_deg': 2,
'collocation_ni': 1,
'store_full_solution': False,
# Use MA27 linear solver in ipopt for faster calculations:
'nlpsol_opts': {'ipopt.linear_solver': 'mumps', # mumps, MA27
'ipopt.print_level': 0,
'ipopt.sb': 'yes',
'print_time': 0,
}
}
self.mpc.set_param(**setup_mpc)
# Get template's for MPC time-varying parameters
self.mpc_tvp_template = self.mpc.get_tvp_template()
self.simulator_tvp_template = self.simulator.get_tvp_template()
# Set time-varying set-point functions
self.mpc.set_tvp_fun(self.mpc_tvp_function)
self.simulator.set_tvp_fun(self.simulator_tvp_function)
# Setup simulator
self.simulator.setup()
[docs]
def simulator_tvp_function(self, t):
""" Set the set-point function for MPC simulator.
:param t: current time
"""
mpc_horizon = self.mpc._settings.n_horizon
# Set current step index
t_s = float(np.asarray(t).squeeze())
dt_s = float(np.asarray(self.dt).squeeze())
k_step = int(np.rint(t_s / dt_s))
if k_step >= mpc_horizon: # point is beyond end of input data
k_step = mpc_horizon - 1 # set point beyond input data to last point
# Update current set-point
for n, state_name in enumerate(self.state_names):
self.simulator_tvp_template[state_name + '_set'] = self.setpoint[state_name][k_step]
return self.simulator_tvp_template
[docs]
def mpc_tvp_function(self, t):
""" Set the set-point function for MPC optimizer.
"""
mpc_horizon = self.mpc._settings.n_horizon
# Set current step index
t_s = float(np.asarray(t).squeeze())
dt_s = float(np.asarray(self.dt).squeeze())
k_step = int(np.rint(t_s / dt_s))
# Update set-point time horizon
for k in range(mpc_horizon + 1):
k_set = k_step + k
if k_set >= self.w: # horizon is beyond end of input data
k_set = self.w - 1 # set part of horizon beyond input data to last point
# Update each set-point over time horizon
for n, state_name in enumerate(self.state_names):
self.mpc_tvp_template['_tvp', k, state_name + '_set'] = self.setpoint[state_name][k_set]
return self.mpc_tvp_template
[docs]
def set_initial_state(self, x0):
""" Update the initial state.
"""
if x0 is not None: # initial state given
if isinstance(x0, dict): # in dict format
SetDict().set_dict_with_overwrite(self.x0, x0) # update only the states in the dict given
elif isinstance(x0, list) or isinstance(x0, tuple) or (
x0, np.ndarray): # list, tuple, or numpy array format
x0 = np.array(x0).squeeze()
for n, key in enumerate(self.x0.keys()): # each state
self.x0[key] = x0[n]
else:
raise ValueError('x0 must be either a dict, tuple, list, or numpy array')
[docs]
def update_dict(self, data=None, name=None):
""" Update.
"""
update = getattr(self, name)
if data is not None: # data given
if isinstance(data, dict): # in dict format
SetDict().set_dict_with_overwrite(update, data) # update only the inputs in the dict given
# Normalize unset keys to be the length of the set keys be repeating the 1st element
unset_key = set(update.keys()) - set(data.keys()) # find keys that were not set
set_key = set(data.keys()) # find keys that were set
if unset_key != set_key:
w = data[list(set_key)[0]].squeeze().shape[0] # size of 1st set key
for k in unset_key: # update each unset key
update[k] = update[k][0] * np.ones(w)
elif isinstance(data, list) or isinstance(data, tuple): # list or tuple format, each input vector in each element
for n, k in enumerate(update.keys()): # each state
update[k] = data[n]
elif isinstance(data, np.ndarray): # numpy array format given as matrix where columns are the different inputs
if len(data.shape) <= 1: # given as 1d array, so convert to column vector
data = np.atleast_2d(data).T
for n, key in enumerate(update.keys()): # each input
update[key] = data[:, n]
else:
raise ValueError(f'{name} must be either a dict, tuple, list, or numpy array')
# Make sure inputs are the same size
points = np.array([update[key].shape[0] for key in update.keys()])
points_check = points == points[0]
if not np.all(points_check):
raise ValueError(f'{name} inputs are not all the same length')
[docs]
def simulate(self, x0=None, u=None, aux=None, mpc=False, return_full_output=False):
"""
Simulate the system.
:param x0: initial state dict or array
:param u: input dict or array, if True then mpc must be None
:param aux: auxiliary input
:param mpc: boolean to run MPC, if True then u must be None
:param return_full_output: boolean to run (time, x, u, y) instead of y
"""
if (mpc is True) and (u is not None):
raise Exception('u must be None if running MPC')
if (mpc is False) and (u is None):
warnings.warn('not running MPC or setting u directly')
# Update the initial state
if x0 is None:
if mpc: # set the initial state to start at set-point if running MPC
x0 = {}
for state_name in self.state_names:
x0[state_name] = self.setpoint[state_name][0]
self.set_initial_state(x0=x0)
else:
self.set_initial_state(x0=x0)
# Update the inputs
self.update_dict(u, name='u')
# Concatenate the inputs, where rows are individual inputs and columns are time-steps
if mpc:
self.w = np.vstack(list(self.setpoint.values())).shape[1]
u_sim = np.zeros((self.w, self.m)) # preallocate input array
else:
self.w = np.vstack(list(self.u.values())).shape[1]
u_sim = np.vstack(list(self.u.values())).T
# Update time vector
T = (self.w - 1) * self.dt
self.time = np.linspace(0, T, num=self.w)
# Set array to store simulated states, where rows are individual states and columns are time-steps
x_step = np.array(list(self.x0.values())) # initialize state
x = np.nan * np.zeros((self.w, self.n))
x[0, :] = x_step.copy()
# Set array to store simulated measurements
y = np.nan * np.zeros((self.w, self.p))
# Initialize the simulator
# self.simulator = do_mpc.simulator.Simulator(self.model)
# self.simulator.set_param(**self.params_simulator)
# self.simulator.set_tvp_fun(self.simulator_tvp_function)
# self.simulator.setup()
self.simulator.reset_history() # reset simulator history (super important for speed)
self.simulator.t0 = self.time[0]
self.simulator.x0 = x_step.copy()
self.simulator.set_initial_guess()
# Initialize MPC
if mpc:
self.mpc.setup()
self.mpc.t0 = self.time[0]
self.mpc.x0 = x_step.copy()
self.mpc.u0 = np.zeros((self.m, 1))
self.mpc.set_initial_guess()
# Run simulation
for k in range(1, self.w):
# Set input
if mpc: # run MPC step
u_step = self.mpc.make_step(x_step)
else: # use inputs directly
u_step = u_sim[k - 1:k, :].T
# Calculate current measurements
y_step = self.h(np.ravel(x_step), np.ravel(u_step))
# Simulate one time step given current inputs
x_step = self.simulator.make_step(u_step)
# Store inputs
u_sim[k - 1, :] = u_step.squeeze()
# Store state
x[k, :] = x_step.squeeze()
# Store measurements
y[k - 1, :] = y_step.squeeze()
# Last input has no effect, so keep it the same as previous time-step
if mpc:
u_sim[-1, :] = u_sim[-2, :]
# Last measurement
y[-1, :] = self.h(np.ravel(x[-1, :]), np.ravel(u_sim[-1, :]))
# Update the inputs
self.update_dict(u_sim, name='u')
# Update state trajectory
self.update_dict(x, name='x')
# Update measurements
self.update_dict(y, name='y')
# Return the measurements in array format
y_array = np.vstack(list(self.y.values())).T
if return_full_output:
return self.time.copy(), self.x.copy(), self.u.copy(), self.y.copy()
else:
return y_array
def get_time_states_inputs_measurements(self):
return self.time.copy(), self.x.copy(), self.u.copy(), self.y.copy()
[docs]
def plot(self, name='x', dpi=150, plot_kwargs=None):
""" Plot states, inputs.
"""
if plot_kwargs is None:
plot_kwargs = {
'color': 'black',
'linewidth': 2.0,
'linestyle': '-',
'marker': '.',
'markersize': 0
}
if name == 'x':
plot_kwargs['color'] = 'firebrick'
elif name == 'u':
plot_kwargs['color'] = 'royalblue'
elif name == 'y':
plot_kwargs['color'] = 'seagreen'
elif name == 'setpoint':
plot_kwargs['color'] = 'gray'
plot_dict = getattr(self, name)
plot_data = np.array(list(plot_dict.values()))
n = plot_data.shape[0]
fig, ax = plt.subplots(n, 1, figsize=(4, n * 1.5), dpi=dpi, sharex=True)
ax = np.atleast_1d(ax)
for n, key in enumerate(plot_dict.keys()):
ax[n].plot(self.time, plot_dict[key], label=name, **plot_kwargs)
ax[n].set_ylabel(key, fontsize=7)
# Also plot the states if plotting setpoint
if name == 'setpoint':
ax[n].plot(self.time, self.x[key], label=key, color='firebrick', linestyle='-', linewidth=0.5)
ax[n].legend(fontsize=6)
y = self.x[key]
else:
y = plot_dict[key]
# Set y-axis limits
y_min = np.min(y)
y_max = np.max(y)
delta = y_max - y_min
if np.abs(delta) < 0.01:
margin = 0.1
ax[n].set_ylim(y_min - margin, y_max + margin)
ax[-1].set_xlabel('time', fontsize=7)
ax[0].set_title(name, fontsize=8, fontweight='bold')
for a in ax.flat:
a.tick_params(axis='both', labelsize=6)
def ensure_float_output(func):
@wraps(func)
def wrapper(*args, **kwargs):
output = func(*args, **kwargs)
return np.array([float(e) for e in output])
return wrapper