import torch

from src.diffusion.base.guidance import *
from src.diffusion.base.scheduling import *
from src.diffusion.base.sampling import *

from typing import Callable

def shift_respace_fn(t, shift=3.0):
    # Respace timesteps toward 0; shift=1.0 is the identity map,
    # e.g. shift_respace_fn(0.5, shift=3.0) = 0.5 / (0.5 + 0.5 * 3.0) = 0.25.
    return t / (t + (1 - t) * shift)

|
def ode_step_fn(x, v, dt, s, w):
    # Deterministic Euler step for the probability-flow ODE dx = v dt (s and w are unused).
    return x + v * dt

|
def sde_mean_step_fn(x, v, dt, s, w):
    # Mean of the SDE update: velocity drift plus the score term w * s, without the noise term.
    return x + v * dt + s * w * dt

|
def sde_step_fn(x, v, dt, s, w):
    # Euler-Maruyama step for dx = (v + w * s) dt + sqrt(2 * w) dW.
    return x + v * dt + s * w * dt + torch.sqrt(2 * w * dt) * torch.randn_like(x)

|
def sde_preserve_step_fn(x, v, dt, s, w):
    # Variant with half the score drift and sqrt(w)-scaled noise.
    return x + v * dt + 0.5 * s * w * dt + torch.sqrt(w * dt) * torch.randn_like(x)

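# The four step functions above share one signature, (x, v, dt, s, w), so the samplers
# below can swap integration rules without changing their loops. A minimal illustration
# of that call shape (the values here are arbitrary and are not used by the samplers):
#
#   x = torch.zeros(2, 4)          # current state
#   v = torch.ones(2, 4)           # predicted velocity
#   w = torch.tensor(1.0)          # diffusion weight, normally from a w_scheduler
#   ode_step_fn(x, v, dt=0.1, s=torch.zeros_like(x), w=w)   # deterministic update
#   sde_step_fn(x, v, dt=0.1, s=torch.zeros_like(x), w=w)   # same drift plus Gaussian noise
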
import logging
logger = logging.getLogger(__name__)


class EulerSampler(BaseSampler):
    def __init__(
            self,
            w_scheduler: BaseScheduler = None,
            timeshift=1.0,
            step_fn: Callable = ode_step_fn,
            last_step=None,
            last_step_fn: Callable = ode_step_fn,
            *args,
            **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.step_fn = step_fn
        self.last_step = last_step
        self.last_step_fn = last_step_fn
        self.w_scheduler = w_scheduler
        self.timeshift = timeshift

        # Default the final step size to one uniform step when it is unspecified.
        if self.last_step is None or self.num_steps == 1:
            self.last_step = 1.0 / self.num_steps

        # Uniform grid on [0, 1 - last_step], closed with t = 1, then time-shifted.
        timesteps = torch.linspace(0.0, 1 - self.last_step, self.num_steps)
        timesteps = torch.cat([timesteps, torch.tensor([1.0])], dim=0)
        self.timesteps = shift_respace_fn(timesteps, self.timeshift)

        assert self.last_step > 0.0
        assert self.scheduler is not None
        # A w_scheduler is required for every stochastic step function.
        assert self.w_scheduler is not None or self.step_fn in [ode_step_fn, ]
        if self.w_scheduler is not None:
            if self.step_fn == ode_step_fn:
                logger.warning("current sampler is an ODE sampler, but w_scheduler is enabled")

    def _impl_sampling(self, net, noise, condition, uncondition):
        """
        Euler sampling loop: march x from the initial noise along the respaced
        timesteps, applying classifier-free guidance at every network evaluation.
        """
        batch_size = noise.shape[0]
        steps = self.timesteps.to(noise.device)
        # Stack unconditional and conditional batches for a single guided forward pass.
        cfg_condition = torch.cat([uncondition, condition], dim=0)
        x = noise
        for i, (t_cur, t_next) in enumerate(zip(steps[:-1], steps[1:])):
            dt = t_next - t_cur
            t_cur = t_cur.repeat(batch_size)
            sigma = self.scheduler.sigma(t_cur)
            dalpha_over_alpha = self.scheduler.dalpha_over_alpha(t_cur)
            dsigma_mul_sigma = self.scheduler.dsigma_mul_sigma(t_cur)
            if self.w_scheduler:
                w = self.w_scheduler.w(t_cur)
            else:
                w = 0.0

            cfg_x = torch.cat([x, x], dim=0)
            cfg_t = t_cur.repeat(2)
            out = net(cfg_x, cfg_t, cfg_condition)
            out = self.guidance_fn(out, self.guidance)
            v = out
            # Convert the guided velocity to a score estimate:
            # s = ((alpha/dalpha) * v - x) / (sigma^2 - (alpha/dalpha) * sigma * dsigma).
            s = ((1 / dalpha_over_alpha) * v - x) / (sigma ** 2 - (1 / dalpha_over_alpha) * dsigma_mul_sigma)
            if i < self.num_steps - 1:
                x = self.step_fn(x, v, dt, s=s, w=w)
            else:
                x = self.last_step_fn(x, v, dt, s=s, w=w)
        return x
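
# A minimal usage sketch. The num_steps / guidance keyword arguments and the callable
# interface are assumptions about BaseSampler, and "MyScheduler" is a hypothetical
# stand-in for a concrete BaseScheduler; none of these are defined in this file:
#
#   sampler = EulerSampler(
#       scheduler=MyScheduler(),
#       num_steps=50,
#       timeshift=3.0,
#       step_fn=ode_step_fn,
#   )
#   samples = sampler(net, noise, condition, uncondition)
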
class HeunSampler(BaseSampler):
    def __init__(
            self,
            scheduler: BaseScheduler = None,
            w_scheduler: BaseScheduler = None,
            exact_henu=False,
            timeshift=1.0,
            step_fn: Callable = ode_step_fn,
            last_step=None,
            last_step_fn: Callable = ode_step_fn,
            *args,
            **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.scheduler = scheduler
        # exact_henu=True re-evaluates the network at t_cur every step; False reuses
        # the previous correction as the next predictor.
        self.exact_henu = exact_henu
        self.step_fn = step_fn
        self.last_step = last_step
        self.last_step_fn = last_step_fn
        self.w_scheduler = w_scheduler
        self.timeshift = timeshift

        # Default the final step size before it is used to build the timestep grid.
        if self.last_step is None or self.num_steps == 1:
            self.last_step = 1.0 / self.num_steps

        # Uniform grid on [0, 1 - last_step], closed with t = 1, then time-shifted.
        timesteps = torch.linspace(0.0, 1 - self.last_step, self.num_steps)
        timesteps = torch.cat([timesteps, torch.tensor([1.0])], dim=0)
        self.timesteps = shift_respace_fn(timesteps, self.timeshift)

        assert self.last_step > 0.0
        assert self.scheduler is not None
        # A w_scheduler is required for every stochastic step function.
        assert self.w_scheduler is not None or self.step_fn in [ode_step_fn, ]
        if self.w_scheduler is not None:
            if self.step_fn == ode_step_fn:
                logger.warning("current sampler is an ODE sampler, but w_scheduler is enabled")

    def _impl_sampling(self, net, noise, condition, uncondition):
        """
        Heun (predictor-corrector) sampling loop with classifier-free guidance.
        """
        batch_size = noise.shape[0]
        steps = self.timesteps.to(noise.device)
        cfg_condition = torch.cat([uncondition, condition], dim=0)
        x = noise
        v_hat, s_hat = 0.0, 0.0
        for i, (t_cur, t_next) in enumerate(zip(steps[:-1], steps[1:])):
            dt = t_next - t_cur
            t_cur = t_cur.repeat(batch_size)
            sigma = self.scheduler.sigma(t_cur)
            alpha_over_dalpha = 1 / self.scheduler.dalpha_over_alpha(t_cur)
            dsigma_mul_sigma = self.scheduler.dsigma_mul_sigma(t_cur)
            t_hat = t_next
            t_hat = t_hat.repeat(batch_size)
            sigma_hat = self.scheduler.sigma(t_hat)
            alpha_over_dalpha_hat = 1 / self.scheduler.dalpha_over_alpha(t_hat)
            dsigma_mul_sigma_hat = self.scheduler.dsigma_mul_sigma(t_hat)

            if self.w_scheduler:
                w = self.w_scheduler.w(t_cur)
            else:
                w = 0.0
            # Predictor: evaluate at t_cur, or reuse the previous correction (which was
            # already taken at this timestep) unless exact_henu is set.
            if i == 0 or self.exact_henu:
                cfg_x = torch.cat([x, x], dim=0)
                cfg_t_cur = t_cur.repeat(2)
                out = net(cfg_x, cfg_t_cur, cfg_condition)
                out = self.guidance_fn(out, self.guidance)
                v = out
                s = (alpha_over_dalpha * v - x) / (sigma ** 2 - alpha_over_dalpha * dsigma_mul_sigma)
            else:
                v = v_hat
                s = s_hat
            x_hat = self.step_fn(x, v, dt, s=s, w=w)
            # Heun correction: re-evaluate at t_next and average the two slopes.
            if i < self.num_steps - 1:
                cfg_x_hat = torch.cat([x_hat, x_hat], dim=0)
                cfg_t_hat = t_hat.repeat(2)
                out = net(cfg_x_hat, cfg_t_hat, cfg_condition)
                out = self.guidance_fn(out, self.guidance)
                v_hat = out
                s_hat = (alpha_over_dalpha_hat * v_hat - x_hat) / (sigma_hat ** 2 - alpha_over_dalpha_hat * dsigma_mul_sigma_hat)
                v = (v + v_hat) / 2
                s = (s + s_hat) / 2
                x = self.step_fn(x, v, dt, s=s, w=w)
            else:
                x = self.last_step_fn(x, v, dt, s=s, w=w)
        return x
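
# Cost note (a sketch of the evaluation count implied by the loops above, where one
# guided forward pass means one call to net on the CFG-doubled batch): with
# exact_henu=True, HeunSampler runs a predictor and a corrector pass on every step
# except the last, roughly twice the passes of EulerSampler; with exact_henu=False,
# the corrector result is reused as the next step's predictor, bringing the cost back
# close to one pass per step.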