Hi,
I’m trying to fit a local linear trend + daily seasonality state space model using GaussianHMM, that is:
Y_t = F @ theta_t + v_t; v_t ~ N(0,sigma_v)
theta_t = G @ theta_t-1 + w_t; w_t ~ MVN(0, Sigma_w), diag(Sigma_w)(sigma_w1,sigma_w2,sigma_w3, 0,...)
with Y_t \in R^{1}
and theta_{t} \in R^{8}
.
It took me a while but I’m able to fit the model.
Unfortunately when sampling(forecasting) I receive the following error message ValueError: duration, event_shape mismatch: 147 vs (13,1)
.
I suspect that the problem arises as a result of the way in which I construct the covariance matrix of the transition distribution W.
Any suggestions on how I can fix that problem? The code can be found below.
Since the tutorials were not really helpful (e.g. GaussianHMM expects the observation_matrix to have shape ==(…,hidden_dim, obs_dim)) I would be happy to contribute a tutorial myself as soon as I manage to fix the problem.
Kind Regards
beta21
import numpy as np
import pyro
import pyro.distributions as dist
import torch
from pyro.contrib.forecast import Forecaster, ForecastingModel
class DLM1(ForecastingModel):
def model(self, zero_data, covariates):
"""
Y_t = F @ theta_t + v_t; v_t ~ N(0,sigma_v)
theta_t = G @ theta_t-1 + w_t; w_t ~ MVN(0, Sigma_w), diag(Sigma_w)=(sigma_w1,sigma_w2,sigma_w3, 0,...)
"""
T = zero_data.size(-2)
obs_dim = 1
hidden_dim = 8
obs_matrix = torch.as_tensor(
np.array([[1, 0, 1, 0, 0, 0, 0, 0]]), dtype=torch.float32
)
# assert obs_matrix.size() == (obs_dim, hidden_dim)
# assert obs_matrix.dtype == torch.float32
trans_matrix = torch.as_tensor(
np.array(
[
[1, 1, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, -1, -1, -1, -1, -1, -1],
[0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 0, 1, 0],
]
),
dtype=torch.float32,
)
# assert trans_matrix.dtype == torch.float32
# assert trans_matrix.size() == (hidden_dim,hidden_dim)
# Initial distriburion
init_dist = dist.Normal(0, 10).expand([8]).to_event(1)
# assert init_dist.event_shape == (hidden_dim,)
# observation noise
sigma_v = pyro.sample("sigma_v", dist.HalfNormal(scale=1.0))
# assert sigma_v.size() == ()
obs_noise_dist = dist.Normal(0, sigma_v.unsqueeze(-1)).to_event(1)
# assert obs_noise_dist.event_shape == (obs_dim,)
# transition noise
sigma_w1 = pyro.sample("sigma_w1", dist.HalfNormal(scale=1.0).expand([1]))
# assert sigma_w1.size() == (1,)
sigma_w2 = pyro.sample("sigma_w2", dist.HalfNormal(scale=1.0).expand([1]))
# assert sigma_w2.size() == (1,)
sigma_w3 = pyro.sample("sigma_w3", dist.HalfNormal(scale=1.0).expand([1]))
# assert sigma_w3.size() == (1,)
k = sigma_w1.size()[:-1]
W = torch.zeros(k + (hidden_dim, hidden_dim))
W[..., 0, 0] = sigma_w1.squeeze(-1)
W[..., 1, 1] = sigma_w2.squeeze(-1)
W[..., 2, 2] = sigma_w3.squeeze(-1)
W = W + torch.eye(hidden_dim) * 1e-4
assert W.size()[-2:] == (hidden_dim, hidden_dim)
trans_noise_dist = dist.MultivariateNormal(
loc=torch.zeros(k + (hidden_dim,)), covariance_matrix=W
)
# assert trans_noise_dist.event_shape == (hidden_dim,)
noise_dist = dist.GaussianHMM(
initial_dist=init_dist,
transition_matrix=trans_matrix.T,
transition_dist=trans_noise_dist,
observation_matrix=obs_matrix.T,
observation_dist=obs_noise_dist,
duration=T,
)
# assert noise_dist.event_shape == (T, 1)
self.predict(noise_dist, torch.zeros((T, 1)))
if __name__ == "__main__":
obs = np.hstack([np.array([-1, 2, 3, 5, -1, -3, 0])] * 21)
T = len(obs)
T1 = T - 14
obs = obs + np.arange(T) * 0.4
obs = obs + np.random.normal(size=T)
obs = torch.as_tensor(obs[:, np.newaxis], dtype=torch.float32)
pyro.set_rng_seed(1)
pyro.clear_param_store()
assert obs.size() == (T, 1)
assert obs.dtype == torch.float32
covariates = torch.zeros(len(obs), 0) # empty
forecaster = Forecaster(DLM1(), obs[:T1], covariates=covariates[:T1], learning_rate=0.1,num_steps=400
)
samples = forecaster(obs[:T1], covariates, num_samples=13)