import torch import numpy as np import comfy.utils, comfy.sample, comfy.samplers, comfy.controlnet, comfy.model_base, comfy.model_management, comfy.sampler_helpers, comfy.supported_models from comfy.model_patcher import ModelPatcher from nodes import RepeatLatentBatch, CLIPTextEncode, VAEEncodeForInpaint from ..modules.layer_diffuse import LayerMethod from ..config import * from .. import easyCache, sampler # 预采样设置(基础) class samplerSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": {"pipe": ("PIPE_LINE",), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS,), "scheduler": (comfy.samplers.KSampler.SCHEDULERS + NEW_SCHEDULERS,), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional": { "image_to_latent": ("IMAGE",), "latent": ("LATENT",), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE", ) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, steps, cfg, sampler_name, scheduler, denoise, seed, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None): # 图生图转换 vae = pipe["vae"] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 if image_to_latent is not None: _, height, width, _ = image_to_latent.shape if height == 1 and width == 1: samples = pipe["samples"] images = pipe["images"] else: samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = image_to_latent elif latent is not None: samples = latent images = pipe["images"] else: samples = pipe["samples"] images = pipe["images"] new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "add_noise": "enabled" } } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # 预采样设置(高级) class samplerSettingsAdvanced: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": {"pipe": ("PIPE_LINE",), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS,), "scheduler": (comfy.samplers.KSampler.SCHEDULERS + NEW_SCHEDULERS,), "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), "add_noise": (["enable (CPU)", "enable (GPU=A1111)", "disable"], {"default": "enable (CPU)"}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), "return_with_leftover_noise": (["disable", "enable"], ), }, "optional": { "image_to_latent": ("IMAGE",), "latent": ("LATENT",) }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE", ) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, steps, cfg, sampler_name, scheduler, start_at_step, end_at_step, add_noise, seed, return_with_leftover_noise, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None): # 图生图转换 vae = pipe["vae"] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 if image_to_latent is not None: _, height, width, _ = image_to_latent.shape if height == 1 and width == 1: samples = pipe["samples"] images = pipe["images"] else: samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = image_to_latent elif latent is not None: samples = latent images = pipe["images"] else: samples = pipe["samples"] images = pipe["images"] force_full_denoise = True if return_with_leftover_noise == "enable": force_full_denoise = False new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "start_step": start_at_step, "last_step": end_at_step, "denoise": 1.0, "add_noise": add_noise, "force_full_denoise": force_full_denoise } } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # 预采样设置(噪声注入) class samplerSettingsNoiseIn: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": {"pipe": ("PIPE_LINE",), "factor": ("FLOAT", {"default": 0.1, "min": 0.0, "max": 1.0, "step":0.01, "round": 0.01}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS,), "scheduler": (comfy.samplers.KSampler.SCHEDULERS+NEW_SCHEDULERS,), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional": { "optional_noise_seed": ("INT",{"forceInput": True}), "optional_latent": ("LATENT",), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE", ) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def slerp(self, val, low, high): dims = low.shape low = low.reshape(dims[0], -1) high = high.reshape(dims[0], -1) low_norm = low / torch.norm(low, dim=1, keepdim=True) high_norm = high / torch.norm(high, dim=1, keepdim=True) low_norm[low_norm != low_norm] = 0.0 high_norm[high_norm != high_norm] = 0.0 omega = torch.acos((low_norm * high_norm).sum(1)) so = torch.sin(omega) res = (torch.sin((1.0 - val) * omega) / so).unsqueeze(1) * low + (torch.sin(val * omega) / so).unsqueeze( 1) * high return res.reshape(dims) def prepare_mask(self, mask, shape): mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(shape[2], shape[3]), mode="bilinear") mask = mask.expand((-1, shape[1], -1, -1)) if mask.shape[0] < shape[0]: mask = mask.repeat((shape[0] - 1) // mask.shape[0] + 1, 1, 1, 1)[:shape[0]] return mask def expand_mask(self, mask, expand, tapered_corners): try: import scipy c = 0 if tapered_corners else 1 kernel = np.array([[c, 1, c], [1, 1, 1], [c, 1, c]]) mask = mask.reshape((-1, mask.shape[-2], mask.shape[-1])) out = [] for m in mask: output = m.numpy() for _ in range(abs(expand)): if expand < 0: output = scipy.ndimage.grey_erosion(output, footprint=kernel) else: output = scipy.ndimage.grey_dilation(output, footprint=kernel) output = torch.from_numpy(output) out.append(output) return torch.stack(out, dim=0) except: return None def settings(self, pipe, factor, steps, cfg, sampler_name, scheduler, denoise, seed, optional_noise_seed=None, optional_latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None): latent = optional_latent if optional_latent is not None else pipe["samples"] model = pipe["model"] # generate base noise batch_size, _, height, width = latent["samples"].shape generator = torch.manual_seed(seed) base_noise = torch.randn((1, 4, height, width), dtype=torch.float32, device="cpu", generator=generator).repeat(batch_size, 1, 1, 1).cpu() # generate variation noise if optional_noise_seed is None or optional_noise_seed == seed: optional_noise_seed = seed+1 generator = torch.manual_seed(optional_noise_seed) variation_noise = torch.randn((batch_size, 4, height, width), dtype=torch.float32, device="cpu", generator=generator).cpu() slerp_noise = self.slerp(factor, base_noise, variation_noise) end_at_step = steps # min(steps, end_at_step) start_at_step = round(end_at_step - end_at_step * denoise) device = comfy.model_management.get_torch_device() comfy.model_management.load_model_gpu(model) model_patcher = comfy.model_patcher.ModelPatcher(model.model, load_device=device, offload_device=comfy.model_management.unet_offload_device()) sampler = comfy.samplers.KSampler(model_patcher, steps=steps, device=device, sampler=sampler_name, scheduler=scheduler, denoise=1.0, model_options=model.model_options) sigmas = sampler.sigmas sigma = sigmas[start_at_step] - sigmas[end_at_step] sigma /= model.model.latent_format.scale_factor sigma = sigma.cpu().numpy() work_latent = latent.copy() work_latent["samples"] = latent["samples"].clone() + slerp_noise * sigma if "noise_mask" in latent: noise_mask = self.prepare_mask(latent["noise_mask"], latent['samples'].shape) work_latent["samples"] = noise_mask * work_latent["samples"] + (1-noise_mask) * latent["samples"] work_latent['noise_mask'] = self.expand_mask(latent["noise_mask"].clone(), 5, True) if pipe is None: pipe = {} new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": work_latent, "images": pipe['images'], "seed": seed, "loader_settings": { **pipe["loader_settings"], "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "add_noise": "disable" } } return (new_pipe,) # 预采样设置(自定义) class samplerCustomSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": { "pipe": ("PIPE_LINE",), "guider": (['CFG','DualCFG','Basic', 'IP2P+CFG', 'IP2P+DualCFG','IP2P+Basic'],{"default":"Basic"}), "cfg": ("FLOAT", {"default": 3.5, "min": 0.0, "max": 100.0}), "cfg_negative": ("FLOAT", {"default": 1.5, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS + ['inversed_euler'],), "scheduler": (comfy.samplers.KSampler.SCHEDULERS + ['karrasADV','exponentialADV','polyExponential', 'sdturbo', 'vp', 'alignYourSteps', 'gits'],), "coeff": ("FLOAT", {"default": 1.20, "min": 0.80, "max": 1.50, "step": 0.05}), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "sigma_max": ("FLOAT", {"default": 14.614642, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}), "sigma_min": ("FLOAT", {"default": 0.0291675, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}), "rho": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "step": 0.01, "round": False}), "beta_d": ("FLOAT", {"default": 19.9, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}), "beta_min": ("FLOAT", {"default": 0.1, "min": 0.0, "max": 1000.0, "step": 0.01, "round": False}), "eps_s": ("FLOAT", {"default": 0.001, "min": 0.0, "max": 1.0, "step": 0.0001, "round": False}), "flip_sigmas": ("BOOLEAN", {"default": False}), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "add_noise": (["enable (CPU)", "enable (GPU=A1111)", "disable"], {"default": "enable (CPU)"}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional": { "image_to_latent": ("IMAGE",), "latent": ("LATENT",), "optional_sampler":("SAMPLER",), "optional_sigmas":("SIGMAS",), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE", ) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def ip2p(self, positive, negative, vae, pixels, latent=None): if latent is not None: concat_latent = latent else: x = (pixels.shape[1] // 8) * 8 y = (pixels.shape[2] // 8) * 8 if pixels.shape[1] != x or pixels.shape[2] != y: x_offset = (pixels.shape[1] % 8) // 2 y_offset = (pixels.shape[2] % 8) // 2 pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] concat_latent = vae.encode(pixels) out_latent = {} out_latent["samples"] = torch.zeros_like(concat_latent) out = [] for conditioning in [positive, negative]: c = [] for t in conditioning: d = t[1].copy() d["concat_latent_image"] = concat_latent n = [t[0], d] c.append(n) out.append(c) return (out[0], out[1], out_latent) def settings(self, pipe, guider, cfg, cfg_negative, sampler_name, scheduler, coeff, steps, sigma_max, sigma_min, rho, beta_d, beta_min, eps_s, flip_sigmas, denoise, add_noise, seed, image_to_latent=None, latent=None, optional_sampler=None, optional_sigmas=None, prompt=None, extra_pnginfo=None, my_unique_id=None): # 图生图转换 vae = pipe["vae"] model = pipe["model"] positive = pipe['positive'] negative = pipe['negative'] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 if image_to_latent is not None: _, height, width, _ = image_to_latent.shape if height == 1 and width == 1: samples = pipe["samples"] images = pipe["images"] else: if "IP2P" in guider: positive, negative, latent = self.ip2p(pipe['positive'], pipe['negative'], vae, image_to_latent) samples = latent else: samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = image_to_latent elif latent is not None: if "IP2P" in guider: positive, negative, latent = self.ip2p(pipe['positive'], pipe['negative'], latent=latent) samples = latent else: samples = latent images = pipe["images"] else: samples = pipe["samples"] images = pipe["images"] new_pipe = { "model": model, "positive": positive, "negative": negative, "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "middle": pipe['negative'], "steps": steps, "cfg": cfg, "cfg_negative": cfg_negative, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "add_noise": add_noise, "custom": { "guider": guider, "coeff": coeff, "sigma_max": sigma_max, "sigma_min": sigma_min, "rho": rho, "beta_d": beta_d, "beta_min": beta_min, "eps_s": beta_min, "flip_sigmas": flip_sigmas }, "optional_sampler": optional_sampler, "optional_sigmas": optional_sigmas } } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # 预采样设置(SDTurbo) from ..libs.gradual_latent_hires_fix import sample_dpmpp_2s_ancestral, sample_dpmpp_2m_sde, sample_lcm, sample_euler_ancestral class sdTurboSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": { "pipe": ("PIPE_LINE",), "steps": ("INT", {"default": 1, "min": 1, "max": 10}), "cfg": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.SAMPLER_NAMES,), "eta": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}), "s_noise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}), "upscale_ratio": ("FLOAT", {"default": 2.0, "min": 0.0, "max": 16.0, "step": 0.01, "round": False}), "start_step": ("INT", {"default": 5, "min": 0, "max": 1000, "step": 1}), "end_step": ("INT", {"default": 15, "min": 0, "max": 1000, "step": 1}), "upscale_n_step": ("INT", {"default": 3, "min": 0, "max": 1000, "step": 1}), "unsharp_kernel_size": ("INT", {"default": 3, "min": 1, "max": 21, "step": 1}), "unsharp_sigma": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}), "unsharp_strength": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 10.0, "step": 0.01, "round": False}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE",) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, steps, cfg, sampler_name, eta, s_noise, upscale_ratio, start_step, end_step, upscale_n_step, unsharp_kernel_size, unsharp_sigma, unsharp_strength, seed, prompt=None, extra_pnginfo=None, my_unique_id=None): model = pipe['model'] # sigma timesteps = torch.flip(torch.arange(1, 11) * 100 - 1, (0,))[:steps] sigmas = model.model.model_sampling.sigma(timesteps) sigmas = torch.cat([sigmas, sigmas.new_zeros([1])]) #sampler sample_function = None extra_options = { "eta": eta, "s_noise": s_noise, "upscale_ratio": upscale_ratio, "start_step": start_step, "end_step": end_step, "upscale_n_step": upscale_n_step, "unsharp_kernel_size": unsharp_kernel_size, "unsharp_sigma": unsharp_sigma, "unsharp_strength": unsharp_strength, } if sampler_name == "euler_ancestral": sample_function = sample_euler_ancestral elif sampler_name == "dpmpp_2s_ancestral": sample_function = sample_dpmpp_2s_ancestral elif sampler_name == "dpmpp_2m_sde": sample_function = sample_dpmpp_2m_sde elif sampler_name == "lcm": sample_function = sample_lcm if sample_function is not None: unsharp_kernel_size = unsharp_kernel_size if unsharp_kernel_size % 2 == 1 else unsharp_kernel_size + 1 extra_options["unsharp_kernel_size"] = unsharp_kernel_size _sampler = comfy.samplers.KSAMPLER(sample_function, extra_options) else: _sampler = comfy.samplers.sampler_object(sampler_name) extra_options = None new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": pipe["samples"], "images": pipe["images"], "seed": seed, "loader_settings": { **pipe["loader_settings"], "extra_options": extra_options, "sampler": _sampler, "sigmas": sigmas, "steps": steps, "cfg": cfg, "add_noise": "enabled" } } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # cascade预采样参数 class cascadeSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": {"pipe": ("PIPE_LINE",), "encode_vae_name": (["None"] + folder_paths.get_filename_list("vae"),), "decode_vae_name": (["None"] + folder_paths.get_filename_list("vae"),), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 4.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"default":"euler_ancestral"}), "scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"default":"simple"}), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional": { "image_to_latent_c": ("IMAGE",), "latent_c": ("LATENT",), }, "hidden":{"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE",) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, encode_vae_name, decode_vae_name, steps, cfg, sampler_name, scheduler, denoise, seed, model=None, image_to_latent_c=None, latent_c=None, prompt=None, extra_pnginfo=None, my_unique_id=None): images, samples_c = None, None samples = pipe['samples'] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 encode_vae_name = encode_vae_name if encode_vae_name is not None else pipe['loader_settings']['encode_vae_name'] decode_vae_name = decode_vae_name if decode_vae_name is not None else pipe['loader_settings']['decode_vae_name'] if image_to_latent_c is not None: if encode_vae_name != 'None': encode_vae = easyCache.load_vae(encode_vae_name) else: encode_vae = pipe['vae'][0] if "compression" not in pipe["loader_settings"]: raise Exception("compression is not found") compression = pipe["loader_settings"]['compression'] width = image_to_latent_c.shape[-2] height = image_to_latent_c.shape[-3] out_width = (width // compression) * encode_vae.downscale_ratio out_height = (height // compression) * encode_vae.downscale_ratio s = comfy.utils.common_upscale(image_to_latent_c.movedim(-1, 1), out_width, out_height, "bicubic", "center").movedim(1, -1) c_latent = encode_vae.encode(s[:, :, :, :3]) b_latent = torch.zeros([c_latent.shape[0], 4, height // 4, width // 4]) samples_c = {"samples": c_latent} samples_c = RepeatLatentBatch().repeat(samples_c, batch_size)[0] samples_b = {"samples": b_latent} samples_b = RepeatLatentBatch().repeat(samples_b, batch_size)[0] samples = (samples_c, samples_b) images = image_to_latent_c elif latent_c is not None: samples_c = latent_c samples = (samples_c, samples[1]) images = pipe["images"] if samples_c is not None: samples = (samples_c, samples[1]) new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "encode_vae_name": encode_vae_name, "decode_vae_name": decode_vae_name, "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "add_noise": "enabled" } } sampler.update_value_by_id("pipe_line", my_unique_id, new_pipe) del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # layerDiffusion预采样参数 class layerDiffusionSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": { "pipe": ("PIPE_LINE",), "method": ([LayerMethod.FG_ONLY_ATTN.value, LayerMethod.FG_ONLY_CONV.value, LayerMethod.EVERYTHING.value, LayerMethod.FG_TO_BLEND.value, LayerMethod.BG_TO_BLEND.value],), "weight": ("FLOAT",{"default": 1.0, "min": -1, "max": 3, "step": 0.05},), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"default": "euler"}), "scheduler": (comfy.samplers.KSampler.SCHEDULERS+ NEW_SCHEDULERS, {"default": "normal"}), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional": { "image": ("IMAGE",), "blended_image": ("IMAGE",), "mask": ("MASK",), # "latent": ("LATENT",), # "blended_latent": ("LATENT",), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE",) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def get_layer_diffusion_method(self, method, has_blend_latent): method = LayerMethod(method) if has_blend_latent: if method == LayerMethod.BG_TO_BLEND: method = LayerMethod.BG_BLEND_TO_FG elif method == LayerMethod.FG_TO_BLEND: method = LayerMethod.FG_BLEND_TO_BG return method def settings(self, pipe, method, weight, steps, cfg, sampler_name, scheduler, denoise, seed, image=None, blended_image=None, mask=None, prompt=None, extra_pnginfo=None, my_unique_id=None): blend_samples = pipe['blend_samples'] if "blend_samples" in pipe else None vae = pipe["vae"] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 method = self.get_layer_diffusion_method(method, blend_samples is not None or blended_image is not None) if image is not None or "image" in pipe: image = image if image is not None else pipe['image'] if mask is not None: print('inpaint') samples, = VAEEncodeForInpaint().encode(vae, image, mask) else: samples = {"samples": vae.encode(image[:,:,:,:3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = image elif "samp_images" in pipe: samples = {"samples": vae.encode(pipe["samp_images"][:,:,:,:3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = pipe["samp_images"] else: if method not in [LayerMethod.FG_ONLY_ATTN, LayerMethod.FG_ONLY_CONV, LayerMethod.EVERYTHING]: raise Exception("image is missing") samples = pipe["samples"] images = pipe["images"] if method in [LayerMethod.BG_BLEND_TO_FG, LayerMethod.FG_BLEND_TO_BG]: if blended_image is None and blend_samples is None: raise Exception("blended_image is missing") elif blended_image is not None: blend_samples = {"samples": vae.encode(blended_image[:,:,:,:3])} blend_samples = RepeatLatentBatch().repeat(blend_samples, batch_size)[0] new_pipe = { "model": pipe['model'], "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "blend_samples": blend_samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise, "add_noise": "enabled", "layer_diffusion_method": method, "layer_diffusion_weight": weight, } } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # 预采样设置(layerDiffuse附加) class layerDiffusionSettingsADDTL: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": { "pipe": ("PIPE_LINE",), "foreground_prompt": ("STRING", {"default": "", "placeholder": "Foreground Additional Prompt", "multiline": True}), "background_prompt": ("STRING", {"default": "", "placeholder": "Background Additional Prompt", "multiline": True}), "blended_prompt": ("STRING", {"default": "", "placeholder": "Blended Additional Prompt", "multiline": True}), }, "optional": { "optional_fg_cond": ("CONDITIONING",), "optional_bg_cond": ("CONDITIONING",), "optional_blended_cond": ("CONDITIONING",), }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE",) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, foreground_prompt, background_prompt, blended_prompt, optional_fg_cond=None, optional_bg_cond=None, optional_blended_cond=None, prompt=None, extra_pnginfo=None, my_unique_id=None): fg_cond, bg_cond, blended_cond = None, None, None clip = pipe['clip'] if optional_fg_cond is not None: fg_cond = optional_fg_cond elif foreground_prompt != "": fg_cond, = CLIPTextEncode().encode(clip, foreground_prompt) if optional_bg_cond is not None: bg_cond = optional_bg_cond elif background_prompt != "": bg_cond, = CLIPTextEncode().encode(clip, background_prompt) if optional_blended_cond is not None: blended_cond = optional_blended_cond elif blended_prompt != "": blended_cond, = CLIPTextEncode().encode(clip, blended_prompt) new_pipe = { **pipe, "loader_settings": { **pipe["loader_settings"], "layer_diffusion_cond": (fg_cond, bg_cond, blended_cond) } } del pipe return (new_pipe,) # 预采样设置(动态CFG) from ..libs.dynthres_core import DynThresh class dynamicCFGSettings: def __init__(self): pass @classmethod def INPUT_TYPES(cls): return {"required": {"pipe": ("PIPE_LINE",), "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0}), "cfg_mode": (DynThresh.Modes,), "cfg_scale_min": ("FLOAT", {"default": 3.5, "min": 0.0, "max": 100.0, "step": 0.5}), "sampler_name": (comfy.samplers.KSampler.SAMPLERS,), "scheduler": (comfy.samplers.KSampler.SCHEDULERS+NEW_SCHEDULERS,), "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "seed": ("INT", {"default": 0, "min": 0, "max": MAX_SEED_NUM}), }, "optional":{ "image_to_latent": ("IMAGE",), "latent": ("LATENT",) }, "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO", "my_unique_id": "UNIQUE_ID"}, } RETURN_TYPES = ("PIPE_LINE",) RETURN_NAMES = ("pipe",) FUNCTION = "settings" CATEGORY = "EasyUse/PreSampling" def settings(self, pipe, steps, cfg, cfg_mode, cfg_scale_min,sampler_name, scheduler, denoise, seed, image_to_latent=None, latent=None, prompt=None, extra_pnginfo=None, my_unique_id=None): dynamic_thresh = DynThresh(7.0, 1.0,"CONSTANT", 0, cfg_mode, cfg_scale_min, 0, 0, 999, False, "MEAN", "AD", 1) def sampler_dyn_thresh(args): input = args["input"] cond = input - args["cond"] uncond = input - args["uncond"] cond_scale = args["cond_scale"] time_step = args["timestep"] dynamic_thresh.step = 999 - time_step[0] return input - dynamic_thresh.dynthresh(cond, uncond, cond_scale, None) model = pipe['model'] m = model.clone() m.set_model_sampler_cfg_function(sampler_dyn_thresh) # 图生图转换 vae = pipe["vae"] batch_size = pipe["loader_settings"]["batch_size"] if "batch_size" in pipe["loader_settings"] else 1 if image_to_latent is not None: samples = {"samples": vae.encode(image_to_latent[:, :, :, :3])} samples = RepeatLatentBatch().repeat(samples, batch_size)[0] images = image_to_latent elif latent is not None: samples = RepeatLatentBatch().repeat(latent, batch_size)[0] images = pipe["images"] else: samples = pipe["samples"] images = pipe["images"] new_pipe = { "model": m, "positive": pipe['positive'], "negative": pipe['negative'], "vae": pipe['vae'], "clip": pipe['clip'], "samples": samples, "images": images, "seed": seed, "loader_settings": { **pipe["loader_settings"], "steps": steps, "cfg": cfg, "sampler_name": sampler_name, "scheduler": scheduler, "denoise": denoise }, } del pipe return {"ui": {"value": [seed]}, "result": (new_pipe,)} # 动态CFG class dynamicThresholdingFull: @classmethod def INPUT_TYPES(s): return { "required": { "model": ("MODEL",), "mimic_scale": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "step": 0.5}), "threshold_percentile": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), "mimic_mode": (DynThresh.Modes,), "mimic_scale_min": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 100.0, "step": 0.5}), "cfg_mode": (DynThresh.Modes,), "cfg_scale_min": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 100.0, "step": 0.5}), "sched_val": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 100.0, "step": 0.01}), "separate_feature_channels": (["enable", "disable"],), "scaling_startpoint": (DynThresh.Startpoints,), "variability_measure": (DynThresh.Variabilities,), "interpolate_phi": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}), } } RETURN_TYPES = ("MODEL",) FUNCTION = "patch" CATEGORY = "EasyUse/PreSampling" def patch(self, model, mimic_scale, threshold_percentile, mimic_mode, mimic_scale_min, cfg_mode, cfg_scale_min, sched_val, separate_feature_channels, scaling_startpoint, variability_measure, interpolate_phi): dynamic_thresh = DynThresh(mimic_scale, threshold_percentile, mimic_mode, mimic_scale_min, cfg_mode, cfg_scale_min, sched_val, 0, 999, separate_feature_channels == "enable", scaling_startpoint, variability_measure, interpolate_phi) def sampler_dyn_thresh(args): input = args["input"] cond = input - args["cond"] uncond = input - args["uncond"] cond_scale = args["cond_scale"] time_step = args["timestep"] dynamic_thresh.step = 999 - time_step[0] return input - dynamic_thresh.dynthresh(cond, uncond, cond_scale, None) m = model.clone() m.set_model_sampler_cfg_function(sampler_dyn_thresh) return (m,) NODE_CLASS_MAPPINGS = { "easy preSampling": samplerSettings, "easy preSamplingAdvanced": samplerSettingsAdvanced, "easy preSamplingNoiseIn": samplerSettingsNoiseIn, "easy preSamplingCustom": samplerCustomSettings, "easy preSamplingSdTurbo": sdTurboSettings, "easy preSamplingDynamicCFG": dynamicCFGSettings, "easy preSamplingCascade": cascadeSettings, "easy preSamplingLayerDiffusion": layerDiffusionSettings, "easy preSamplingLayerDiffusionADDTL": layerDiffusionSettingsADDTL, "dynamicThresholdingFull": dynamicThresholdingFull, } NODE_DISPLAY_NAME_MAPPINGS = { "easy preSampling": "PreSampling", "easy preSamplingAdvanced": "PreSampling (Advanced)", "easy preSamplingNoiseIn": "PreSampling (NoiseIn)", "easy preSamplingCustom": "PreSampling (Custom)", "easy preSamplingSdTurbo": "PreSampling (SDTurbo)", "easy preSamplingDynamicCFG": "PreSampling (DynamicCFG)", "easy preSamplingCascade": "PreSampling (Cascade)", "easy preSamplingLayerDiffusion": "PreSampling (LayerDiffuse)", "easy preSamplingLayerDiffusionADDTL": "PreSampling (LayerDiffuse ADDTL)", "dynamicThresholdingFull": "DynamicThresholdingFull", }