I was trying out curve fitting using Pyro (instead of using Least Squares methods with χ2 loss functions). I’ve included a minimum working example fitting a sine curve to simulated noise, which works fairly well.
The std estimates on the parameter however are smaller than those of curve_fit
or other methods, and I’m not sure why. The mean values are comparable. I might be doing something wrong in my definition of the model
and guide
.
Additionally I’ve noticed by inspecting the parameter store that the σ
parameters are negative, even though I put positive constraints on them. I’m very new to pyro so it’s very possible that I’m missing something here.
Finally I set sigma
, the parameter that defines the noise
std on top of the data as a pyro parameter. Is there some way to extract what optimal values SVI found for this?
import math
from tqdm import tqdm
import torch
import pyro
import pyro.optim as optim
import pyro.distributions as dist
from torch.distributions import constraints
from pyro.infer import SVI, Trace_ELBO, Predictive
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from scipy.optimize import curve_fit
np.random.seed(1)
def summary(samples):
site_stats = {}
for site_name, values in samples.items():
marginal_site = pd.DataFrame(values)
describe = marginal_site.describe(
percentiles=[.05, 0.25, 0.5, 0.75, 0.95]
).transpose()
site_stats[site_name] = describe[
["mean", "std", "5%", "25%", "50%", "75%", "95%"]
]
return site_stats
def fn(x, ω, ϕ, b):
return np.sin(ω*x + ϕ) + b
σ_noise = 1/7
ω = 2*np.pi*2.2
ϕ = np.random.rand()*2*np.pi
b = np.random.randn()
x = np.linspace(0,2,101)
y = fn(x, ω, ϕ, b)
y += np.random.randn(y.size) * σ_noise
dat = savgol_filter(y-np.mean(y), window_length = 11, polyorder = 3)
ω_guess = (
2*math.pi * len(np.where(np.sign(dat)[1:] != np.sign(dat)[:-1])[0])
/ (2 * (x.max() - x.min()))
)
b_guess = np.mean(y)
bounds = ([0.8*ω_guess,0,-10], [1.2*ω_guess, 2*math.pi, 10])
popt, pcov = curve_fit(
fn,x,y,p0 = [ω_guess, 1., b_guess],
bounds = bounds, sigma = np.ones(len(x))*σ_noise
)
def model(x,y,guess):
ωμ = pyro.param("ωμ", torch.tensor(guess[0]), constraint = constraints.positive)
ωσ = pyro.param("ωσ", torch.tensor(0.2), constraint = constraints.positive)
ω = pyro.sample("ω", dist.Normal(ωμ, ωσ))
ϕμ = pyro.param(
"ϕμ", torch.tensor(guess[1]),
constraint = constraints.interval(0, 2*math.pi)
)
ϕσ = pyro.param("ϕσ", torch.tensor(1.), constraint = constraints.positive)
ϕ = pyro.sample("ϕ", dist.Normal(ϕμ,ϕσ))
bμ = pyro.param(
"bμ", torch.tensor(guess[2]),
constraint = constraints.interval(-10, 10)
)
bσ = pyro.param("bσ", torch.tensor(0.2), constraint = constraints.positive)
b = pyro.sample("b", dist.Normal(bμ, bσ))
values = torch.sin(ω*x+ϕ)+b
sigma = pyro.param(
"sigma", torch.tensor(1.),
constraint = constraints.positive
)
with pyro.plate("data", len(y)):
pyro.sample("obs", dist.Normal(0, sigma), obs = values-y)
def guide(x,y,guess):
ωμ = pyro.param("ωμ", torch.tensor(guess[0]), constraint = constraints.positive)
ωσ = pyro.param("ωσ", torch.tensor(0.2), constraint = constraints.positive)
ω = pyro.sample("ω", dist.Normal(ωμ, ωσ))
ϕμ = pyro.param(
"ϕμ", torch.tensor(guess[1]),
constraint = constraints.interval(0, 2*math.pi)
)
ϕσ = pyro.param("ϕσ", torch.tensor(1.), constraint = constraints.positive)
ϕ = pyro.sample("ϕ", dist.Normal(ϕμ,ϕσ))
bμ = pyro.param(
"bμ", torch.tensor(guess[2]),
constraint = constraints.interval(-10, 10)
)
bσ = pyro.param("bσ", torch.tensor(0.2), constraint = constraints.positive)
b = pyro.sample("b", dist.Normal(bμ,bσ))
sigma = pyro.param(
"sigma", torch.tensor(1.0),
constraint = constraints.positive
)
svi = SVI(model,
guide,
optim.Adam({"lr": .05}),
loss=Trace_ELBO()
)
pyro.clear_param_store()
num_iters = 5_000
for i in tqdm(range(num_iters)):
elbo = svi.step(torch.tensor(x), torch.tensor(y), popt)
if i % 500 == 0:
print("Elbo loss: {}".format(elbo))
pstore = pyro.get_param_store()
for s,v in pstore.named_parameters():
print(s,v)
from pyro.infer import Predictive
num_samples = 1_000
predictive = Predictive(model, guide=guide, num_samples=num_samples)
svi_samples = {k: v.reshape(num_samples).detach().cpu().numpy()
for k, v in predictive(
torch.tensor(x), torch.tensor(y), popt
).items()
if k != "obs"}
for s, v in summary(svi_samples).items():
print(s, v)