Sure, I am sorry if it was too vague.
I use an autoregressive NN for the neural spline flow, similar to the example from Pyro:
import numpy as np
import torch
import pyro
import pyro.distributions as dist
import pyro.distributions.transforms as T
from pyro.nn import AutoRegressiveNN

def init_flow(hidden_dims=[10, 10], input_dim=10):
    count_bins = 8
    base_dist = dist.Normal(torch.zeros(input_dim), torch.ones(input_dim), validate_args=False)
    # scale the hidden-layer widths with the input dimension
    hidden_dims = input_dim * np.array(hidden_dims)
    # parameter dims for the spline: widths, heights, derivatives, lambdas
    param_dims = [count_bins, count_bins, count_bins - 1, count_bins]
    hypernet = AutoRegressiveNN(input_dim, hidden_dims, param_dims=param_dims)
    transform = T.SplineAutoregressive(input_dim, hypernet, count_bins=count_bins, bound=10)
    pyro.module("my_transform", transform)
    flow_dist = dist.TransformedDistribution(base_dist, [transform], validate_args=True)
    return flow_dist, transform
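For reference, the returned objects can be exercised like this (just an interface sketch; the small input_dim is an arbitrary choice for illustration):
flow_dist, transform = init_flow(input_dim=4)
samples = flow_dist.sample(torch.Size([5]))   # shape [5, 4]
log_probs = flow_dist.log_prob(samples)       # shape [5]
print(samples.shape, log_probs.shape)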
The function E(x) has many local minima. The idea is that the probabilistic model samples many different regions of the parameter space and eventually concentrates on a good local optimum.
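In other words, the quantity being optimised is the expectation of E(x) under the flow, and its gradient is estimated with the score-function (REINFORCE) trick. A minimal sketch, assuming a hypothetical batched objective E:
def surrogate_loss(flow_dist, E, batch=50):
    # grad of E_q[E(x)] is estimated as mean_i[ grad log q(x_i) * E(x_i) ], with x_i ~ q
    x = flow_dist.sample(torch.Size([batch])).detach()   # samples are treated as constants
    return -(flow_dist.log_prob(x) * E(x).detach()).mean()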
The problem can be reproduced with the following functions:
def random_fct(params):
    # build a random multimodal objective: signed sums of products of sin/cos factors
    fcts = [torch.sin, torch.cos]
    sums = []
    signs = []
    for j in range(len(params)):
        prods = []
        for i in range(len(params)):
            f = np.random.choice(fcts)
            prods.append(f)
        signs.append((-1)**np.random.randint(0, 2))
        sums.append(prods)
    return sums, signs
def test_fct(params, signs, sums):
    S = 0
    for j in range(len(params)):
        prods = 1  # the multiplicative accumulator has to start at 1, not 0
        for i in range(len(params)):
            prods *= sums[j][i](params[i])**2
        S += prods * signs[j]
    return S
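A quick sanity check of the two helpers (hypothetical small example with three parameters):
p = torch.rand(3)
sums, signs = random_fct(p)
print(test_fct(p, signs, sums))   # a scalar tensor: a signed sum of squared sin/cos products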
The optimisation is done with the following loop:
from tqdm import tqdm
import matplotlib.pyplot as plt

learning_rate = 0.2
epochs = 100
batch = 50

input_dim = n_qubits * n_layers  # n_qubits and n_layers are set elsewhere
flow_dist, transform = init_flow(input_dim=input_dim)
optimizer = ClippedSGD(transform.parameters(), lr=learning_rate, clip_norm=0.1)

# one initial parameter sample to fix the random objective
params = flow_dist.sample(torch.Size([1, 1])).detach()
sums, signs = random_fct(params[0][0])
test_fct(params[0][0], signs, sums)

train_progress = []
lp_progress = []
exp_progress = []
norm_dist = dist.Normal(torch.zeros(input_dim), torch.ones(input_dim), validate_args=False)

for epoch in tqdm(range(epochs)):
    # sample from the base distribution and push it through the flow
    norm_samples = norm_dist.sample(torch.Size([batch, 1])).detach()
    params = transform(norm_samples).detach()
    # evaluate the objective on every sample of the batch
    expects = torch.tensor([test_fct(params[i][0], signs, sums) for i in range(len(params))])
    lp = flow_dist.log_prob(params)
    # score-function surrogate: log-probability weighted by the (detached) objective
    log_expect = lp.reshape(expects.shape) * expects.detach()
    loss = -(log_expect.mean())
    optimizer.zero_grad()
    loss.backward(retain_graph=True)
    optimizer.step()
    train_progress.append(loss.item())
    lp_progress.append(lp.mean().item())
    exp_progress.append(expects.mean().item())

plt.plot(train_progress)
plt.show()
plt.plot(lp_progress)
plt.show()
plt.plot(exp_progress)
plt.show()
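As an aside, sampling the base distribution and applying the transform by hand should be equivalent to sampling the flow directly, since TransformedDistribution does exactly that internally; a sketch:
# equivalent to norm_dist.sample(...) followed by transform(...)
params = flow_dist.sample(torch.Size([batch, 1])).detach()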
The clipped SGD is defined as follows:
from typing import Any, Callable, Optional
from torch.optim import Optimizer

class ClippedSGD(Optimizer):
    def __init__(self, params, lr: float = 1e-3,
                 eps: float = 1e-8, clip_norm: float = 1.0):
        defaults = dict(lr=lr, eps=eps, clip_norm=clip_norm)
        super().__init__(params, defaults)

    def step(self, closure: Optional[Callable] = None) -> Optional[Any]:
        loss = None
        if closure is not None:
            loss = closure()
        for group in self.param_groups:
            for p in group['params']:
                if p.grad is None:
                    continue
                grad = p.grad.data
                # element-wise clipping of the gradient to [-clip_norm, clip_norm]
                grad.clamp_(-group['clip_norm'], group['clip_norm'])
                denom = torch.tensor([1.0])
                p.data.addcdiv_(grad, denom, value=-group["lr"])
        return loss
With standard SGD the gradients immediately explode and the optimisation fails.
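For comparison, a similar effect can be obtained with the built-in optimiser plus PyTorch's clipping utility; this is only a sketch of an alternative, not the code used above:
# plain SGD with element-wise gradient clipping applied before each step
optimizer = torch.optim.SGD(transform.parameters(), lr=learning_rate)

optimizer.zero_grad()
loss.backward(retain_graph=True)
# clamps every gradient entry to [-0.1, 0.1], like ClippedSGD above
torch.nn.utils.clip_grad_value_(transform.parameters(), clip_value=0.1)
optimizer.step()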