from contextlib import suppress
from typing import Callable
import torch
import comfy.model_management
from comfy.comfy_types.node_typing import IO, ComfyNodeABC, InputTypeDict
from comfy.ldm.modules.attention import BasicTransformerBlock, CrossAttention, optimized_attention
from comfy.model_base import BaseModel
from comfy.model_patcher import ModelPatcher
from .guidance_utils import parse_unet_blocks

COND = 0
UNCOND = 1


def nag_attn2_replace_wrapper(
nag_scale: float,
tau: float,
alpha: float,
sigma_start: float,
sigma_end: float,
k_neg: torch.Tensor,
v_neg: torch.Tensor,
prev_attn2_replace: Callable | None = None,
):
    # Modified Algorithm 1 from arXiv:2505.21179, "Normalized Attention Guidance: Universal Negative Guidance for Diffusion Models"
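    # NAG, in brief:
    #   z_tilde = z_pos + nag_scale * (z_pos - z_neg)   extrapolate away from the negative attention result
    #   z_hat   = min(r, tau) / r * z_tilde,  with  r = ||z_tilde||_1 / ||z_pos||_1   (clip the norm growth at tau)
    #   z_nag   = alpha * z_hat + (1 - alpha) * z_pos   blend back towards the original attention output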
def nag_attn2_replace(q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, extra_options: dict):
heads = extra_options["n_heads"]
attn_precision = extra_options.get("attn_precision")
sigma = extra_options["sigmas"]
cond_or_uncond: list[int] = extra_options.get("cond_or_uncond") # type: ignore
# Perform batched CA
z = (
optimized_attention(q, k, v, heads, attn_precision)
if prev_attn2_replace is None
else prev_attn2_replace(q, k, v, extra_options)
)
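        # Skip NAG when it's disabled, when sigma falls outside (sigma_end, sigma_start], or when the batch has no conditional chunk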
if nag_scale == 0 or not (sigma_end < sigma[0] <= sigma_start) or COND not in cond_or_uncond:
return z
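        # Total batch size of the conditional chunks; negative K/V are tiled to match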
bs = q.shape[0] // len(cond_or_uncond) * cond_or_uncond.count(COND)
k_neg_, v_neg_ = k_neg.repeat_interleave(bs, dim=0), v_neg.repeat_interleave(bs, dim=0)
# Get conditional queries for NAG
# Assume that cond_or_uncond has a layout [1, 1..., 0, 0...]
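        # e.g. [1, 0] == [UNCOND, COND] under CFG, or [0] == [COND] when only the conditional pass runs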
q_chunked = q.chunk(len(cond_or_uncond))
q_pos = torch.cat(q_chunked[cond_or_uncond.index(COND) :])
# Apply NAG only to conditional parts of batched CA
z_chunked = z.chunk(len(cond_or_uncond))
z_pos = torch.cat(z_chunked[cond_or_uncond.index(COND) :])
z_neg = optimized_attention(q_pos, k_neg_, v_neg_, heads, attn_precision)
z_tilde = z_pos + nag_scale * (z_pos - z_neg)
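        # Rescale z_tilde so that the ratio of its L1 norm to that of z_pos is clipped at tau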
norm_pos = torch.norm(z_pos, p=1, dim=-1, keepdim=True)
norm_tilde = torch.norm(z_tilde, p=1, dim=-1, keepdim=True)
ratio = norm_tilde / norm_pos
z_hat = torch.where(ratio > tau, tau, ratio) / ratio * z_tilde
z_nag = alpha * z_hat + (1 - alpha) * z_pos
# Prepend unconditional CA result to NAG result
if UNCOND in cond_or_uncond:
z_nag = torch.cat(z_chunked[cond_or_uncond.index(UNCOND) : cond_or_uncond.index(COND)] + (z_nag,))
return z_nag
    return nag_attn2_replace


class NormalizedAttentionGuidance(ComfyNodeABC):
@classmethod
def INPUT_TYPES(cls) -> InputTypeDict:
return {
"required": {
"model": (
IO.MODEL,
{
"tooltip": (
"The diffusion model.\n"
"If you are using any other attn2 replacer (such as `IPAdapter`), you should place this node after it."
)
},
),
"negative": (
IO.CONDITIONING,
{"tooltip": "Negative conditioning: either the one you use for CFG or a completely different one."},
),
"scale": (
IO.FLOAT,
{
"default": 2.0,
"min": 0.0,
"max": 100.0,
"step": 0.1,
"round": 0.01,
"tooltip": "Scale of NAG, does nothing when `tau=0`.",
},
),
"tau": (
IO.FLOAT,
{
"default": 2.5,
"min": 0.0,
"max": 100.0,
"step": 0.1,
"round": 0.01,
"tooltip": "Normalization threshold, larger value should increase `scale` impact.",
},
),
"alpha": (
IO.FLOAT,
{
"default": 0.5,
"min": 0.0,
"max": 1.0,
"step": 0.001,
"round": 0.001,
"tooltip": "Linear interpolation between original (at `alpha=0`) and NAG (at `alpha=1`) results.",
},
),
"sigma_start": (IO.FLOAT, {"default": -1.0, "min": -1.0, "max": 10000.0, "step": 0.01, "round": False}),
"sigma_end": (IO.FLOAT, {"default": -1.0, "min": -1.0, "max": 10000.0, "step": 0.01, "round": False}),
},
"optional": {
"unet_block_list": (
IO.STRING,
{
"default": "",
"tooltip": (
"Comma-separated blocks to which NAG is being applied to. When the list is empty, NAG is being applied to all block.\n"
"Read README from sd-perturbed-attention for more details."
),
},
),
},
}
RETURN_TYPES = (IO.MODEL,)
FUNCTION = "patch"
DESCRIPTION = (
"An additional way to apply negative prompts to the image.\n"
"It's compatible with CFG, PAG, and other guidances, and can be used with guidance- and step-distilled models as well.\n"
"It's also compatible with other attn2 replacers (such as `IPAdapter`) - but make sure to place NAG node **after** other model patches!"
)
    CATEGORY = "model_patches/unet"

def patch(
self,
model: ModelPatcher,
negative,
scale=2.0,
tau=2.5,
alpha=0.5,
sigma_start: float = -1.0,
sigma_end: float = -1.0,
unet_block_list="",
):
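        """Clone the model and install a NAG attn2 replacement on the selected cross-attention blocks."""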
m = model.clone()
inner_model: BaseModel = m.model
dtype = inner_model.get_dtype()
if inner_model.manual_cast_dtype is not None:
dtype = inner_model.manual_cast_dtype
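        # Negative K/V are projected on the model's load device, then moved to the inference device for attention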
device_model = inner_model.device
device_infer = comfy.model_management.get_torch_device()
sigma_start = float("inf") if sigma_start < 0 else sigma_start
negative_cond = negative[0][0].to(device_model, dtype=dtype)
blocks, block_names = parse_unet_blocks(m, unet_block_list, "attn2") if unet_block_list else (None, None)
# Apply NAG only to transformer blocks with cross-attention (attn2)
        for name, module in inner_model.diffusion_model.named_modules():
            if not isinstance(module, BasicTransformerBlock) or not getattr(module, "attn2", None):
                continue
attn2: CrossAttention = module.attn2 # type: ignore
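            # Map the module path to (block_name, block_id, transformer_index),
            # e.g. "input_blocks.4.1.transformer_blocks.0" -> ("input", 4, 0)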
parts: list[str] = name.split(".")
block_name: str = parts[0].split("_")[0]
block_id = int(parts[1])
if block_name == "middle":
block_id = block_id - 1
t_idx = None
if "transformer_blocks" in parts:
t_pos = parts.index("transformer_blocks") + 1
t_idx = int(parts[t_pos])
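            # Patch when no filter is set, or when this block is listed with or without a transformer index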
if not blocks or (block_name, block_id, t_idx) in blocks or (block_name, block_id, None) in blocks:
k_neg, v_neg = attn2.to_k(negative_cond), attn2.to_v(negative_cond)
                # Compatibility with other attn2 replacers (such as IPAdapter)
prev_attn2_replace = None
with suppress(KeyError):
block = (block_name, block_id, t_idx)
block_full = (block_name, block_id)
attn2_patches = m.model_options["transformer_options"]["patches_replace"]["attn2"]
if block_full in attn2_patches:
prev_attn2_replace = attn2_patches[block_full]
elif block in attn2_patches:
prev_attn2_replace = attn2_patches[block]
nag_attn2_replace = nag_attn2_replace_wrapper(
scale,
tau,
alpha,
sigma_start,
sigma_end,
k_neg.to(device_infer, dtype=dtype),
v_neg.to(device_infer, dtype=dtype),
prev_attn2_replace,
)
m.set_model_attn2_replace(nag_attn2_replace, block_name, block_id, t_idx)
        return (m,)


NODE_CLASS_MAPPINGS = {
"NormalizedAttentionGuidance": NormalizedAttentionGuidance,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"NormalizedAttentionGuidance": "Normalized Attention Guidance",
}
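# ComfyUI picks up these mappings to register the node (assuming the package's __init__ re-exports them).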