I previously posted this question at Proper shape for Predictive with parallel=True - Pyro Discussion Forum, but it was difficult to follow because it was tied to my specific use case. I have now created an easily reproducible example that raises the same question. I use Spark here, but it can easily be removed.
import pandas as pd
from pyspark.sql.functions import *
import numpy as np
import pyro
from pyro.nn import PyroModule, PyroSample
from pyro.contrib.autoname import *
import torch
import torch.nn as nn
from torch.distributions.utils import broadcast_all
from torch.distributions import constraints
import pyro.distributions as dist
from pyro.infer.autoguide import AutoDiagonalNormal, AutoMultivariateNormal, AutoLowRankMultivariateNormal, AutoLaplaceApproximation, init_to_mean
from pyro.infer import SVI, Trace_ELBO,TraceEnum_ELBO, Predictive
import pyro.poutine as poutine
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from collections import OrderedDict
# --- synthetic data + device setup ---------------------------------------
n_row = 1000
rng = np.random.default_rng()
# Heteroscedastic regression data: y ~ Normal(mu, sigma) with
# mu = 2*x1 + x2 + 1000 and sigma = x2.
df = pd.DataFrame({'x1': rng.standard_normal(n_row),
                   'x2': np.random.uniform(1, 10, n_row)})
df['sigma'] = df['x2']
df['mu'] = 2*df['x1'] + df['x2'] + 1000
df['y'] = rng.standard_normal(n_row) * df['sigma'] + df['mu']
df = spark.createDataFrame(df)
if torch.cuda.is_available():
    device = torch.device('cuda')
    # BUG FIX: only force CUDA default tensors when CUDA actually exists.
    # The original call ran unconditionally and would raise on a CPU-only
    # machine, defeating the device check above.
    torch.set_default_tensor_type("torch.cuda.FloatTensor")
else:
    device = torch.device('cpu')
x_feat = ['x1', 'x2']  # model input columns
y_name = 'y'           # target column
df.sort('x1', 'x2').display()
# petastorm needs a cache directory for the parquet-materialized dataset
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")
df_conv = make_spark_converter(df)
class myLinear(nn.Module):
    """Minimal linear layer: output = input @ weight.T + bias.

    Parameters are initialized from a standard normal (unlike nn.Linear's
    fan-in scaling). Set ``_print=True`` to dump shapes on every forward.
    """

    def __init__(self, in_features, out_features, bias=True, _print=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self._print = _print
        self.weight = torch.nn.Parameter(torch.randn(out_features, in_features))
        # BUG FIX: the original stored the `bias` flag in self.bias and then
        # immediately overwrote it with the bias Parameter, so bias=False was
        # silently ignored. Honor the flag the way nn.Linear does.
        if bias:
            self.bias = torch.nn.Parameter(torch.randn(out_features))
        else:
            self.register_parameter('bias', None)

    def forward(self, input):
        if self._print:
            print('myLin weight', self.weight.shape, self.weight)
            print('myLin input', input.shape)
            print('myLin bias', None if self.bias is None else self.bias.shape, self.bias)
            print('myLin input.dim', input.dim())
        # BUG FIX: the original unpacked `x, y = input.shape`, which crashes
        # for inputs with more than 2 dims even though the matmul broadcasts;
        # it also called sys.exit() without importing sys. Validate the last
        # dim only and raise the idiomatic exception.
        if input.dim() > 1 and input.shape[-1] != self.in_features:
            raise ValueError(f'Wrong Input Features. Please use tensor with {self.in_features} Input Features')
        # transpose(-1, -2) rather than .t() so batched/sampled weight
        # tensors (extra leading dims) still broadcast correctly.
        output = input @ self.weight.transpose(-1, -2)
        if self.bias is not None:
            output = output + self.bias
        if self._print:
            print('myLin output', output.shape)
        return output
class net(PyroModule):
    """Two-headed MLP for heteroscedastic regression.

    The `mu` sub-network predicts the Normal mean and the `shape` sub-network
    its scale. Only the FINAL linear layer of each head is made Bayesian:
    its weight/bias Parameters are replaced by PyroSample sites with
    Normal(0, 1) priors; all earlier layers stay point-estimated Parameters.
    """
    @name_count
    def __init__(self, mu_l, sh_l):
        # mu_l / sh_l: layer widths per head, e.g. [2, 3, 4, 1] means
        # 2 inputs -> hidden 3 -> hidden 4 -> 1 output.
        super().__init__()
        # parameter names list (collected for both heads, for bookkeeping)
        self.parameter_names = []
        # --- mean head -----------------------------------------------------
        layers = []
        for i in range(len(mu_l)-1):
            layers.append(('mu_fc' + str(i), myLinear(mu_l[i], mu_l[i+1], _print=False)))
            # ReLU between layers, but none after the final linear layer
            if i != (len(mu_l)-2): layers.append(('mu_ReLU' + str(i), nn.ReLU()))
        mu = OrderedDict(layers)
        self.mu = nn.Sequential(mu)
        for name, param in self.mu.named_parameters():
            self.parameter_names.append(name)
        # Convert the plain nn.Sequential into a PyroModule tree so its
        # Parameters can be swapped for PyroSample sites below.
        pyro.nn.module.to_pyro_module_(self.mu)
        for m in self.mu.modules():
            # Only the last fully-connected layer of this head gets priors.
            if m._pyro_name == 'mu.mu_fc' + str(len(mu_l)-2):
                for name, value in list(m.named_parameters(recurse=False)):
                    setattr(m, name, PyroSample(prior=dist.Normal(0., 1.).expand(value.shape).to_event(value.dim())))
        # --- scale ("sigma") head, built identically -----------------------
        layers = []
        for i in range(len(sh_l)-1):
            layers.append(('sh_fc' + str(i), myLinear(sh_l[i], sh_l[i+1])))
            if i != (len(sh_l)-2): layers.append(('sh_ReLU' + str(i), nn.ReLU()))
        shape = OrderedDict(layers)
        self.shape = nn.Sequential(shape)
        for name, param in self.shape.named_parameters():
            self.parameter_names.append(name)
        pyro.nn.module.to_pyro_module_(self.shape)
        for m in self.shape.modules():
            if m._pyro_name == 'shape.sh_fc' + str(len(sh_l)-2):
                for name, value in list(m.named_parameters(recurse=False)):
                    setattr(m, name, PyroSample(prior=dist.Normal(0., 1.).expand(value.shape).to_event(value.dim())))

    def forward(self, x, y=None):
        # exp() keeps both outputs positive; clamp guards against overflow.
        # NOTE(review): .squeeze() removes ALL size-1 dims — under vectorized
        # particles or Predictive(parallel=True) the extra leading sample dim
        # changes which dims are squeezed; .squeeze(-1) would be safer, and
        # this is a likely contributor to the reported shape error — confirm.
        mu = self.mu(x).exp().clamp(min = .000001, max = 1e6).squeeze()
        shape = self.shape(x).exp().clamp(min = 0.000001, max = 1e1).squeeze()
        # Rightmost plate over the batch; each row is conditionally
        # independent given mu/shape.
        with pyro.plate("data", x.shape[0], device = device, dim=-1):
            obs = pyro.sample("obs", dist.Normal(mu, shape), obs=y)
        # Returned only for inspection via Predictive's '_RETURN' site.
        return torch.cat((mu, shape), 0)
def train_and_evaluate_SVI(svi, model, guide, bs, ne):
    """Train `model` with SVI for `ne` epochs at batch size `bs`.

    Streams batches from the module-level petastorm converter `df_conv` and
    returns the mean per-example loss of the last epoch. `guide` is accepted
    for signature compatibility; the SVI object already holds it.
    """
    model = model.to(device)
    with df_conv.make_torch_dataloader(batch_size=bs) as loader:
        batches = iter(loader)
        n_steps = len(df_conv) // bs
        epoch_idx = 0
        while epoch_idx < ne:
            # Progress banner every 10th epoch.
            if (epoch_idx + 1) % 10 == 0:
                print('-' * 10)
                print('Epoch {}/{}'.format(epoch_idx + 1, ne))
            train_loss = train_one_epoch_SVI(svi, batches, n_steps, epoch_idx, device)
            epoch_idx += 1
    return train_loss
def train_one_epoch_SVI(svi, train_dataloader_iter, steps_per_epoch, epoch, device):
    """Run one epoch of SVI steps; return loss averaged over examples seen.

    NOTE(review): reads the module-level globals `model`, `x_feat` and
    `y_name` instead of receiving them as arguments — confirm intentional.
    """
    model.train()
    loss_sum = 0.0
    examples_seen = 0
    for _ in range(steps_per_epoch):
        batch = next(train_dataloader_iter)
        # Stack the feature columns into a (batch, n_features) tensor.
        batch['features'] = torch.stack([batch[col] for col in x_feat]).transpose(0, 1)
        inputs = batch['features'].to(device)
        labels = batch[y_name].to(device)
        loss_sum += svi.step(inputs, labels)
        examples_seen += inputs.shape[0]
    epoch_loss = loss_sum / examples_seen
    print('Train Loss: {:.4f}'.format(epoch_loss))
    return epoch_loss
# train
# Both heads sized [2, 3, 4, 1]: 2 features in, scalar out.
model = net(mu_l = [2, 3, 4, 1], sh_l = [2, 3, 4, 1])
pyro.clear_param_store()
pyro.set_rng_seed(123456789)
# Vectorized particles add a leading sample dim during training too.
loss_fnc = Trace_ELBO(num_particles=2, vectorize_particles=True)
model = model.to(device)
guide = AutoDiagonalNormal(model)
adam = pyro.optim.ClippedAdam({"lr": 0.001, 'betas': (.95, .999), 'weight_decay' : .25, 'clip_norm' : 10.})
svi = SVI(model, guide, adam, loss=loss_fnc)
train_and_evaluate_SVI(svi=svi, model = model, guide = guide, bs = 10, ne = 20)
# posterior
# parallel=True vectorizes the 3 posterior samples into one batched forward
# pass (adds a leading sample dim); this is the call that triggers the
# reported shape error, while parallel=False (a Python loop) works.
predictive_obs = Predictive(model, guide=guide, num_samples=int(3), return_sites = ['obs', '_RETURN'], parallel = True)
with df_conv.make_torch_dataloader(batch_size=2) as dl:
    dl_iter = iter(dl)
    steps = 1
    # Iterate over all the validation data.
    for step in range(steps):
        pd_batch = next(dl_iter)
        # Same feature-stacking as in training: (batch, n_features).
        pd_batch['features'] = torch.transpose(torch.stack([pd_batch[x] for x in x_feat]), 0, 1)#.double()
        inputs = pd_batch['features'].to(device)
        samples_obs = predictive_obs(inputs)
The above trains successfully with both the default `Trace_ELBO` and `Trace_ELBO(num_particles=2, vectorize_particles=True)`. However, the posterior-prediction step fails when `parallel=True` but works when `parallel=False`. With `parallel=True` I get the following error:

RuntimeError: The size of tensor a (2) must match the size of tensor b (3) at non-singleton dimension 1
Trace Shapes:
Param Sites:
mu.mu_fc0.weight 3 2
mu.mu_fc0.bias 3
mu.mu_fc1.weight 4 3
mu.mu_fc1.bias 4
Sample Sites:
mu.mu_fc2.weight dist 3 1 | 1 4
value 3 | 1 4
mu.mu_fc2.bias dist 3 1 | 1
value 3 | 1
I’ve tried different solutions such as adding plates, but I can’t seem to get both training and Predictive to work.