Shape issue when sampling from GaussianHMM

Hi,

I’m trying to fit a local linear trend + daily seasonality state space model using GaussianHMM, that is:

Y_t = F @ theta_t + v_t; v_t ~ N(0,sigma_v)
theta_t = G @ theta_t-1 + w_t; w_t ~ MVN(0, Sigma_w), diag(Sigma_w)(sigma_w1,sigma_w2,sigma_w3, 0,...)

with Y_t \in R^{1} and theta_{t} \in R^{8}.

It took me a while but I’m able to fit the model.
Unfortunately when sampling(forecasting) I receive the following error message ValueError: duration, event_shape mismatch: 147 vs (13,1).
I suspect that the problem arises as a result of the way in which I construct the covariance matrix of the transition distribution W.

Any suggestions on how I can fix that problem? The code can be found below.

Since the tutorials were not really helpful (e.g. GaussianHMM expects the observation_matrix to have shape ==(…,hidden_dim, obs_dim)) I would be happy to contribute a tutorial myself as soon as I manage to fix the problem.

Kind Regards
beta21

import numpy as np
import pyro
import pyro.distributions as dist
import torch
from pyro.contrib.forecast import Forecaster, ForecastingModel


class DLM1(ForecastingModel):
    def model(self, zero_data, covariates):
        """
        Y_t = F @ theta_t + v_t; v_t ~ N(0,sigma_v)
        theta_t = G @ theta_t-1 + w_t; w_t ~ MVN(0, Sigma_w), diag(Sigma_w)=(sigma_w1,sigma_w2,sigma_w3, 0,...)
        """
        T = zero_data.size(-2)
        obs_dim = 1
        hidden_dim = 8

        obs_matrix = torch.as_tensor(
            np.array([[1, 0, 1, 0, 0, 0, 0, 0]]), dtype=torch.float32
        )
        # assert obs_matrix.size() == (obs_dim, hidden_dim)
        # assert obs_matrix.dtype == torch.float32
        trans_matrix = torch.as_tensor(
            np.array(
                [
                    [1, 1, 0, 0, 0, 0, 0, 0],
                    [0, 1, 0, 0, 0, 0, 0, 0],
                    [0, 0, -1, -1, -1, -1, -1, -1],
                    [0, 0, 1, 0, 0, 0, 0, 0],
                    [0, 0, 0, 1, 0, 0, 0, 0],
                    [0, 0, 0, 0, 1, 0, 0, 0],
                    [0, 0, 0, 0, 0, 1, 0, 0],
                    [0, 0, 0, 0, 0, 0, 1, 0],
                ]
            ),
            dtype=torch.float32,
        )
        # assert trans_matrix.dtype == torch.float32
        # assert trans_matrix.size() == (hidden_dim,hidden_dim)

        # Initial distriburion
        init_dist = dist.Normal(0, 10).expand([8]).to_event(1)
        # assert init_dist.event_shape == (hidden_dim,)

        # observation noise
        sigma_v = pyro.sample("sigma_v", dist.HalfNormal(scale=1.0))
        # assert sigma_v.size() == ()
        obs_noise_dist = dist.Normal(0, sigma_v.unsqueeze(-1)).to_event(1)
        # assert obs_noise_dist.event_shape == (obs_dim,)

        # transition noise
        sigma_w1 = pyro.sample("sigma_w1", dist.HalfNormal(scale=1.0).expand([1]))
        # assert sigma_w1.size() == (1,)
        sigma_w2 = pyro.sample("sigma_w2", dist.HalfNormal(scale=1.0).expand([1]))
        # assert sigma_w2.size() == (1,)
        sigma_w3 = pyro.sample("sigma_w3", dist.HalfNormal(scale=1.0).expand([1]))
        # assert sigma_w3.size() == (1,)

        k = sigma_w1.size()[:-1]
        W = torch.zeros(k + (hidden_dim, hidden_dim))
        W[..., 0, 0] = sigma_w1.squeeze(-1)
        W[..., 1, 1] = sigma_w2.squeeze(-1)
        W[..., 2, 2] = sigma_w3.squeeze(-1)
        W = W + torch.eye(hidden_dim) * 1e-4
        assert W.size()[-2:] == (hidden_dim, hidden_dim)

        trans_noise_dist = dist.MultivariateNormal(
            loc=torch.zeros(k + (hidden_dim,)), covariance_matrix=W
        )
        # assert trans_noise_dist.event_shape == (hidden_dim,)

        noise_dist = dist.GaussianHMM(
            initial_dist=init_dist,
            transition_matrix=trans_matrix.T,
            transition_dist=trans_noise_dist,
            observation_matrix=obs_matrix.T,
            observation_dist=obs_noise_dist,
            duration=T,
        )
        # assert noise_dist.event_shape == (T, 1)
        self.predict(noise_dist, torch.zeros((T, 1)))


if __name__ == "__main__":

    obs = np.hstack([np.array([-1, 2, 3, 5, -1, -3, 0])] * 21)
    T = len(obs)
    T1 = T - 14
    obs = obs + np.arange(T) * 0.4
    obs = obs + np.random.normal(size=T)
    obs = torch.as_tensor(obs[:, np.newaxis], dtype=torch.float32)

    pyro.set_rng_seed(1)
    pyro.clear_param_store()

    assert obs.size() == (T, 1)
    assert obs.dtype == torch.float32
    covariates = torch.zeros(len(obs), 0)  # empty

    forecaster = Forecaster(DLM1(), obs[:T1], covariates=covariates[:T1], learning_rate=0.1,num_steps=400
    )
    samples = forecaster(obs[:T1], covariates, num_samples=13)

Hi @beta21, we’d definitely appreciate tutorials about forecasting!

It looks like you can replace the MultivariateNormal noise_dist with a simple Normal(...).to_event(1) since your covariance is diagonal. This seems to work:



class DLM1(ForecastingModel):
    def model(self, zero_data, covariates):
        ...
        # transition noise
        sigma_w1 = pyro.sample("sigma_w1", dist.HalfNormal(scale=1.0))
        sigma_w2 = pyro.sample("sigma_w2", dist.HalfNormal(scale=1.0))
        sigma_w3 = pyro.sample("sigma_w3", dist.HalfNormal(scale=1.0))

        w = 1e-2 + torch.cat(
            [
                torch.stack([sigma_w1, sigma_w2, sigma_w3], dim=-1),
                torch.zeros(sigma_w1.shape + (hidden_dim - 3,)),
            ],
            dim=-1,
        )
        trans_noise_dist = dist.Normal(0, w).to_event(1)

        ...