Files
jaidaken f09734b0ee
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Has been cancelled
Execution Tests / test (macos-latest) (push) Has been cancelled
Execution Tests / test (ubuntu-latest) (push) Has been cancelled
Execution Tests / test (windows-latest) (push) Has been cancelled
Test server launches without errors / test (push) Has been cancelled
Unit Tests / test (macos-latest) (push) Has been cancelled
Unit Tests / test (ubuntu-latest) (push) Has been cancelled
Unit Tests / test (windows-2022) (push) Has been cancelled
Add custom nodes, Civitai loras (LFS), and vast.ai setup script
Includes 30 custom nodes committed directly, 7 Civitai-exclusive
loras stored via Git LFS, and a setup script that installs all
dependencies and downloads HuggingFace-hosted models on vast.ai.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 00:56:42 +00:00

585 lines
33 KiB
Python

import os
from datetime import datetime
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import json
import numpy as np
import re
from PIL import Image
import torch
import folder_paths
from nodes import MAX_RESOLUTION
from .saver.saver import save_image
from .utils import sanitize_filename, get_sha256, full_checkpoint_path_for
from .utils_civitai import get_civitai_sampler_name, get_civitai_metadata, MAX_HASH_LENGTH
from .prompt_metadata_extractor import PromptMetadataExtractor
def parse_checkpoint_name(ckpt_name: str) -> str:
    """Return the last path component of ``ckpt_name``, dropping any directories."""
    _, tail = os.path.split(ckpt_name)
    return tail
def parse_checkpoint_name_without_extension(ckpt_name: str) -> str:
    """Return the checkpoint's base name with a recognised model extension removed.

    The extension is dropped only when it is one of
    ``folder_paths.supported_pt_extensions`` or ``.gguf`` (compared
    case-insensitively); otherwise the base name is returned unchanged.
    """
    base = parse_checkpoint_name(ckpt_name)
    stem, suffix = os.path.splitext(base)
    known_suffixes = folder_paths.supported_pt_extensions | {".gguf"}
    # An unrecognised suffix is kept because it may be part of the name itself.
    return stem if suffix.lower() in known_suffixes else base
def get_timestamp(time_format: str) -> str:
    """Format the current local time with ``time_format``.

    Args:
        time_format: A ``strftime`` format string.

    Returns:
        The formatted timestamp. If formatting fails for any ordinary reason,
        the timestamp is rendered with ``"%Y-%m-%d-%H%M%S"`` instead.
    """
    now = datetime.now()
    try:
        return now.strftime(time_format)
    except Exception:
        # Best-effort fallback for unusable formats. ``Exception`` (rather than
        # a bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
        return now.strftime("%Y-%m-%d-%H%M%S")
def apply_custom_time_format(filename: str) -> str:
    """
    Replace %time_format<strftime_format> patterns with formatted datetime.
    Example: %time_format<%Y-%m-%d> becomes 2026-01-17

    Args:
        filename: Text that may contain zero or more ``%time_format<...>``
            placeholders.

    Returns:
        ``filename`` with every placeholder replaced. A placeholder whose
        format cannot be rendered is left in the text unchanged.
    """
    # One snapshot of the clock so every placeholder in the string agrees.
    now = datetime.now()

    def replace_format(match: re.Match) -> str:
        try:
            return now.strftime(match.group(1))
        except Exception:
            # Invalid format: keep the original placeholder text. Catching
            # ``Exception`` keeps KeyboardInterrupt/SystemExit propagating.
            return match.group(0)

    # Everything between '<' and the first '>' is taken as the strftime format.
    return re.sub(r'%time_format<([^>]*)>', replace_format, filename)
def save_json(image_info: dict[str, Any] | None, filename: str) -> None:
try:
workflow = (image_info or {}).get('workflow')
if workflow is None:
print('No image info found, skipping saving of JSON')
with open(f'{filename}.json', 'w') as workflow_file:
json.dump(workflow, workflow_file)
print(f'Saved workflow to {filename}.json')
except Exception as e:
print(f'Failed to save workflow as json due to: {e}, proceeding with the remainder of saving execution')
def make_pathname(filename: str, width: int, height: int, seed: int, modelname: str, counter: int, time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str, denoise: float, clip_skip: int, custom: str) -> str:
    """Expand the ``%variable`` placeholders in ``filename`` and sanitize its last component.

    ``%time_format<...>`` placeholders are expanded first, then the plain
    tokens are substituted one after another in the order listed below.
    Only the final path component is passed through ``sanitize_filename``;
    any leading directory part is kept as is.
    """
    # "%time" is a prefix of "%time_format", so the custom form must be handled first.
    expanded = apply_custom_time_format(filename)

    substitutions = (
        ("%date", get_timestamp("%Y-%m-%d")),
        ("%time", get_timestamp(time_format)),
        ("%model", parse_checkpoint_name(modelname)),
        ("%width", str(width)),
        ("%height", str(height)),
        ("%seed", str(seed)),
        ("%counter", str(counter)),
        ("%sampler_name", sampler_name),
        ("%steps", str(steps)),
        ("%cfg", str(cfg)),
        ("%scheduler_name", scheduler_name),
        ("%basemodelname", parse_checkpoint_name_without_extension(modelname)),
        ("%denoise", str(denoise)),
        ("%clip_skip", str(clip_skip)),
        ("%custom", custom),
    )
    for token, value in substitutions:
        expanded = expanded.replace(token, value)

    head, tail = os.path.split(expanded)
    return os.path.join(head, sanitize_filename(tail))
def make_filename(filename: str, width: int, height: int, seed: int, modelname: str, counter: int, time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str, denoise: float, clip_skip: int, custom: str) -> str:
    """Expand ``filename`` like ``make_pathname``; use a timestamp if the result is empty."""
    resolved = make_pathname(filename, width, height, seed, modelname, counter, time_format, sampler_name, steps, cfg, scheduler_name, denoise, clip_skip, custom)
    if resolved == "":
        # An empty pattern would give a nameless file, so use the timestamp instead.
        return get_timestamp(time_format)
    return resolved
@dataclass
class Metadata:
    """Generation settings and derived strings passed between the metadata and saver nodes.

    Field order matters: instances are also constructed positionally.
    """

    # Model and prompts
    modelname: str
    positive: str
    negative: str

    # Image dimensions
    width: int
    height: int

    # Sampling settings
    seed: int
    steps: int
    cfg: float
    sampler_name: str
    scheduler_name: str
    denoise: float
    clip_skip: int

    # Free-form text and hash inputs
    custom: str
    additional_hashes: str

    # Values resolved while building the metadata
    ckpt_path: str
    a111_params: str
    final_hashes: str
class ImageSaverMetadata:
    """Node that assembles a ``Metadata`` object plus its hash list and A1111-style parameter text."""

    @classmethod
    def INPUT_TYPES(cls) -> dict[str, Any]:
        """Describe the node's inputs; all of them are optional."""
        optional_inputs: dict[str, Any] = {
            "modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
            "positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
            "negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
            "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
            "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
            "seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
            "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
            "cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
            "sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
            "scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
            "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
            "clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
            "additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
            "download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
            "easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
            "custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
        }
        return {"optional": optional_inputs}

    RETURN_TYPES = ("METADATA","STRING","STRING")
    RETURN_NAMES = ("metadata","hashes","a1111_params")
    OUTPUT_TOOLTIPS = ("metadata for Image Saver Simple","Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
    FUNCTION = "get_metadata"
    CATEGORY = "ImageSaver"
    DESCRIPTION = "Prepare metadata for Image Saver Simple"

    def get_metadata(
        self,
        modelname: str = "",
        positive: str = "unknown",
        negative: str = "unknown",
        width: int = 512,
        height: int = 512,
        seed_value: int = 0,
        steps: int = 20,
        cfg: float = 7.0,
        sampler_name: str = "",
        scheduler_name: str = "normal",
        denoise: float = 1.0,
        clip_skip: int = 0,
        custom: str = "",
        additional_hashes: str = "",
        download_civitai_data: bool = True,
        easy_remix: bool = True,
    ) -> tuple[Metadata, str, str]:
        """Node entry point: return ``(metadata, final hash string, a1111 parameter text)``."""
        built = ImageSaverMetadata.make_metadata(
            modelname, positive, negative, width, height, seed_value, steps, cfg,
            sampler_name, scheduler_name, denoise, clip_skip, custom,
            additional_hashes, download_civitai_data, easy_remix,
        )
        return (built, built.final_hashes, built.a111_params)

    @staticmethod
    def make_metadata(modelname: str, positive: str, negative: str, width: int, height: int, seed_value: int, steps: int, cfg: float, sampler_name: str, scheduler_name: str, denoise: float, clip_skip: int, custom: str, additional_hashes: str, download_civitai_data: bool, easy_remix: bool) -> Metadata:
        """Resolve hashes and build the parameter text for one generation.

        Steps, in order: split a comma-separated ``modelname`` into the primary
        model plus extra hash entries, hash the primary checkpoint (first 10
        characters of its SHA-256), extract LoRAs/embeddings from both prompts,
        parse the manual hashes, fetch the Civitai metadata, optionally clean
        the prompts (``easy_remix``), then assemble the A1111-style text and
        the comma-separated ``name:hash[:weight]`` list.
        """
        modelname, additional_hashes = ImageSaver.get_multiple_models(modelname, additional_hashes)

        ckpt_path = full_checkpoint_path_for(modelname)
        modelhash = get_sha256(ckpt_path)[:10] if ckpt_path else ""

        extractor = PromptMetadataExtractor([positive, negative])
        embeddings = extractor.get_embeddings()
        loras = extractor.get_loras()

        civitai_sampler_name = get_civitai_sampler_name(sampler_name.replace('_gpu', ''), scheduler_name)
        basemodelname = parse_checkpoint_name_without_extension(modelname)

        # Hashes already covered by the model, LoRAs and embeddings.
        existing_hashes = {modelhash.lower()}
        existing_hashes |= {entry[2].lower() for entry in loras.values()}
        existing_hashes |= {entry[2].lower() for entry in embeddings.values()}

        manual_entries = ImageSaver.parse_manual_hashes(additional_hashes, existing_hashes, download_civitai_data)

        civitai_resources, hashes, add_model_hash = get_civitai_metadata(modelname, ckpt_path, modelhash, loras, embeddings, manual_entries, download_civitai_data)

        if easy_remix:
            positive = ImageSaver.clean_prompt(positive, extractor)
            negative = ImageSaver.clean_prompt(negative, extractor)

        # Assemble the parameter text piece by piece; optional pieces are
        # only appended when they have something to say.
        pieces = [
            f"{positive.strip()}\nNegative prompt: {negative.strip()}\n",
            f"Steps: {steps}, Sampler: {civitai_sampler_name}, CFG scale: {cfg}, Seed: {seed_value}, ",
            f"Size: {width}x{height}",
        ]
        if clip_skip != 0:
            pieces.append(f", Clip skip: {abs(clip_skip)}")
        if custom:
            pieces.append(f", {custom}")
        if add_model_hash:
            pieces.append(f", Model hash: {add_model_hash}")
        pieces.append(f", Model: {basemodelname}")
        if hashes:
            pieces.append(f", Hashes: {json.dumps(hashes, separators=(',', ':'))}")
        pieces.append(", Version: ComfyUI")
        if download_civitai_data and civitai_resources:
            pieces.append(f", Civitai resources: {json.dumps(civitai_resources, separators=(',', ':'))}")
        a111_params = "".join(pieces)

        # Model, LoRAs, embeddings and manual entries feed the final hash string.
        all_resources = {modelname: (ckpt_path, None, modelhash)} | loras | embeddings | manual_entries
        known_suffixes = folder_paths.supported_pt_extensions | {".gguf"}

        hash_parts = []
        for name, (_, weight, hash_value) in all_resources.items():
            if name:
                # Use the text after the last ':' and strip only real model
                # file extensions, so dots inside model names survive.
                candidate = name.split(':')[-1]
                stem, suffix = os.path.splitext(candidate)
                label = stem if suffix.lower() in known_suffixes else candidate
                name_part = f"{label}:"
            else:
                name_part = ""

            # Entries without a hash contribute nothing.
            if not hash_value:
                continue

            include_weight = weight is not None and download_civitai_data
            weight_part = f":{weight}" if include_weight else ""
            hash_parts.append(f"{name_part}{hash_value}{weight_part}")

        final_hashes = ",".join(hash_parts)

        return Metadata(
            modelname=modelname,
            positive=positive,
            negative=negative,
            width=width,
            height=height,
            seed=seed_value,
            steps=steps,
            cfg=cfg,
            sampler_name=sampler_name,
            scheduler_name=scheduler_name,
            denoise=denoise,
            clip_skip=clip_skip,
            custom=custom,
            additional_hashes=additional_hashes,
            ckpt_path=ckpt_path,
            a111_params=a111_params,
            final_hashes=final_hashes,
        )
class ImageSaverSimple:
    """Output node that saves images using a ready-made ``Metadata`` object."""

    @classmethod
    def INPUT_TYPES(cls) -> dict[str, Any]:
        """Describe the node's required, optional and hidden inputs."""
        required_inputs: dict[str, Any] = {
            "images": ("IMAGE", { "tooltip": "image(s) to save"}),
            "filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format<format>, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
            "path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
            "extension": (['png', 'jpeg', 'jpg', 'webp'], { "tooltip": "file extension/type to save image as"}),
            "lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
            "quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
            "optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
            "embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
            "save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
        }
        optional_inputs: dict[str, Any] = {
            "metadata": ("METADATA", {"default": None, "tooltip": "metadata to embed in the image"}),
            "counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
            "time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
            "show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
        }
        hidden_inputs = {
            "prompt": "PROMPT",
            "extra_pnginfo": "EXTRA_PNGINFO",
        }
        return {
            "required": required_inputs,
            "optional": optional_inputs,
            "hidden": hidden_inputs,
        }

    RETURN_TYPES = ("STRING","STRING")
    RETURN_NAMES = ("hashes","a1111_params")
    OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
    FUNCTION = "save_images"
    OUTPUT_NODE = True
    CATEGORY = "ImageSaver"
    DESCRIPTION = "Save images with civitai-compatible generation metadata"

    def save_images(self,
        images: list[torch.Tensor],
        filename: str,
        path: str,
        extension: str,
        lossless_webp: bool,
        quality_jpeg_or_webp: int,
        optimize_png: bool,
        embed_workflow: bool = True,
        save_workflow_as_json: bool = False,
        show_preview: bool = True,
        metadata: Metadata | None = None,
        counter: int = 0,
        time_format: str = "%Y-%m-%d-%H%M%S",
        prompt: dict[str, Any] | None = None,
        extra_pnginfo: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Save ``images`` and return the node result.

        Returns:
            A dict whose ``"result"`` is ``(final_hashes, a111_params)`` from
            the metadata. When ``show_preview`` is true it also carries a
            ``"ui"`` entry listing the saved files.
        """
        if metadata is None:
            # No metadata connected: fall back to neutral placeholder values.
            metadata = Metadata(
                modelname='',
                positive='',
                negative='',
                width=512,
                height=512,
                seed=0,
                steps=20,
                cfg=7.0,
                sampler_name='',
                scheduler_name='normal',
                denoise=1.0,
                clip_skip=0,
                custom='',
                additional_hashes='',
                ckpt_path='',
                a111_params='',
                final_hashes='',
            )

        # The path accepts the same %variables as the filename.
        path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)

        saved_names = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp, optimize_png, prompt, extra_pnginfo, save_workflow_as_json, embed_workflow, counter, time_format, metadata)

        result: dict[str, Any] = {
            "result": (metadata.final_hashes, metadata.a111_params),
        }
        if not show_preview:
            return result

        # normpath turns an empty path into '.', which is reported as no subfolder.
        subfolder = os.path.normpath(path)
        ui_subfolder = '' if subfolder == '.' else subfolder
        result["ui"] = {
            "images": [
                {"filename": saved_name, "subfolder": ui_subfolder, "type": 'output'}
                for saved_name in saved_names
            ]
        }
        return result
class ImageSaver:
@classmethod
def INPUT_TYPES(cls) -> dict[str, Any]:
return {
"required": {
"images": ("IMAGE", { "tooltip": "image(s) to save"}),
"filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format<format>, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
"path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
"extension": (['png', 'jpeg', 'jpg', 'webp'], { "tooltip": "file extension/type to save image as"}),
},
"optional": {
"steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
"cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
"modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
"sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
"scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
"positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
"negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
"seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
"width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
"height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
"lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
"quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
"optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
"counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
"denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
"clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
"time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
"save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
"embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
"additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
"download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
"easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
"show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
"custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
},
"hidden": {
"prompt": "PROMPT",
"extra_pnginfo": "EXTRA_PNGINFO",
},
}
RETURN_TYPES = ("STRING","STRING")
RETURN_NAMES = ("hashes","a1111_params")
OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes","Written parameters to the image metadata")
FUNCTION = "save_files"
OUTPUT_NODE = True
CATEGORY = "ImageSaver"
DESCRIPTION = "Save images with civitai-compatible generation metadata"
def save_files(
self,
images: list[torch.Tensor],
filename: str,
path: str,
extension: str,
steps: int = 20,
cfg: float = 7.0,
modelname: str = "",
sampler_name: str = "",
scheduler_name: str = "normal",
positive: str = "unknown",
negative: str = "unknown",
seed_value: int = 0,
width: int = 512,
height: int = 512,
lossless_webp: bool = True,
quality_jpeg_or_webp: int = 100,
optimize_png: bool = False,
counter: int = 0,
denoise: float = 1.0,
clip_skip: int = 0,
time_format: str = "%Y-%m-%d-%H%M%S",
save_workflow_as_json: bool = False,
embed_workflow: bool = True,
additional_hashes: str = "",
download_civitai_data: bool = True,
easy_remix: bool = True,
show_preview: bool = True,
custom: str = "",
prompt: dict[str, Any] | None = None,
extra_pnginfo: dict[str, Any] | None = None,
) -> dict[str, Any]:
metadata = ImageSaverMetadata.make_metadata(modelname, positive, negative, width, height, seed_value, steps, cfg, sampler_name, scheduler_name, denoise, clip_skip, custom, additional_hashes, download_civitai_data, easy_remix)
path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
filenames = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp, optimize_png, prompt, extra_pnginfo, save_workflow_as_json, embed_workflow, counter, time_format, metadata)
subfolder = os.path.normpath(path)
result: dict[str, Any] = {
"result": (metadata.final_hashes, metadata.a111_params),
}
if show_preview:
result["ui"] = {"images": [{"filename": filename, "subfolder": subfolder if subfolder != '.' else '', "type": 'output'} for filename in filenames]}
return result
@staticmethod
def save_images(
images: list[torch.Tensor],
filename_pattern: str,
extension: str,
path: str,
quality_jpeg_or_webp: int,
lossless_webp: bool,
optimize_png: bool,
prompt: dict[str, Any] | None,
extra_pnginfo: dict[str, Any] | None,
save_workflow_as_json: bool,
embed_workflow: bool,
counter: int,
time_format: str,
metadata: Metadata
) -> list[str]:
filename_prefix = make_filename(filename_pattern, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter, time_format, metadata.sampler_name, metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
output_path = os.path.join(folder_paths.output_directory, path)
if output_path.strip() != '':
if not os.path.exists(output_path.strip()):
print(f'The path `{output_path.strip()}` specified doesn\'t exist! Creating directory.')
os.makedirs(output_path, exist_ok=True)
result_paths: list[str] = list()
num_images = len(images)
for idx, image in enumerate(images):
i = 255. * image.cpu().numpy()
img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
current_filename_prefix = ImageSaver.get_unique_filename(output_path, filename_prefix, extension, batch_size=num_images, batch_index=idx)
final_filename = f"{current_filename_prefix}.{extension}"
filepath = os.path.join(output_path, final_filename)
save_image(img, filepath, extension, quality_jpeg_or_webp, lossless_webp, optimize_png, metadata.a111_params, prompt, extra_pnginfo, embed_workflow)
if save_workflow_as_json:
save_json(extra_pnginfo, os.path.join(output_path, current_filename_prefix))
result_paths.append(final_filename)
return result_paths
# Match 'anything' or 'anything:anything' with trimmed white space
re_manual_hash = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?\s*$')
# Match 'anything', 'anything:anything' or 'anything:anything:number' with trimmed white space
re_manual_hash_weights = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?(?:\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)))?\s*$')
@staticmethod
def get_multiple_models(modelname: str, additional_hashes: str) -> tuple[str, str]:
model_names = [m.strip() for m in modelname.split(',')]
modelname = model_names[0] # Use the first model as the primary one
# Process additional model names and add to additional_hashes
for additional_model in model_names[1:]:
additional_ckpt_path = full_checkpoint_path_for(additional_model)
if additional_ckpt_path:
additional_modelhash = get_sha256(additional_ckpt_path)[:10]
# Add to additional_hashes in "name:HASH" format
if additional_hashes:
additional_hashes += ","
additional_hashes += f"{additional_model}:{additional_modelhash}"
return modelname, additional_hashes
@staticmethod
def parse_manual_hashes(additional_hashes: str, existing_hashes: set[str], download_civitai_data: bool) -> dict[str, tuple[str | None, float | None, str]]:
"""Process additional_hashes input (a string) by normalizing, removing extra spaces/newlines, and splitting by comma"""
manual_entries: dict[str, tuple[str | None, float | None, str]] = {}
unnamed_count = 0
additional_hash_split = additional_hashes.replace("\n", ",").split(",") if additional_hashes else []
for entry in additional_hash_split:
match = (ImageSaver.re_manual_hash_weights if download_civitai_data else ImageSaver.re_manual_hash).search(entry)
if match is None:
print(f"ComfyUI-Image-Saver: Invalid additional hash string: '{entry}'")
continue
groups = tuple(group for group in match.groups() if group)
# Read weight and remove from groups, if needed
weight = None
if download_civitai_data and len(groups) > 1:
try:
weight = float(groups[-1])
groups = groups[:-1]
except (ValueError, TypeError):
pass
# Read hash, optionally preceded by name
name, hash = groups if len(groups) > 1 else (None, groups[0])
if len(hash) > MAX_HASH_LENGTH:
print(f"ComfyUI-Image-Saver: Skipping hash. Length exceeds maximum of {MAX_HASH_LENGTH} characters: {hash}")
continue
if any(hash.lower() == existing_hash.lower() for _, _, existing_hash in manual_entries.values()):
print(f"ComfyUI-Image-Saver: Skipping duplicate hash: {hash}")
continue # Skip duplicates
if hash.lower() in existing_hashes:
print(f"ComfyUI-Image-Saver: Skipping manual hash already present in resources: {hash}")
continue
if name is None:
unnamed_count += 1
name = f"manual{unnamed_count}"
elif name in manual_entries:
print(f"ComfyUI-Image-Saver: Duplicate manual hash name '{name}' is being overwritten.")
manual_entries[name] = (None, weight, hash)
if len(manual_entries) > 29:
print("ComfyUI-Image-Saver: Reached maximum limit of 30 manual hashes. Skipping the rest.")
break
return manual_entries
@staticmethod
def clean_prompt(prompt: str, metadata_extractor: PromptMetadataExtractor) -> str:
"""Clean prompts for easier remixing by removing LoRAs and simplifying embeddings."""
# Strip loras
prompt = re.sub(metadata_extractor.LORA, "", prompt)
# Shorten 'embedding:path/to/my_embedding' -> 'my_embedding'
# Note: Possible inaccurate embedding name if the filename has been renamed from the default
prompt = re.sub(metadata_extractor.EMBEDDING, lambda match: Path(match.group(1)).stem, prompt)
# Remove prompt control edits. e.g., 'STYLE(A1111, mean)', 'SHIFT(1)`, etc.`
prompt = re.sub(r'\b[A-Z]+\([^)]*\)', "", prompt)
return prompt
@staticmethod
def get_unique_filename(output_path: str, filename_prefix: str, extension: str, batch_size: int = 1, batch_index: int = 0) -> str:
existing_files = [f for f in os.listdir(output_path) if f.startswith(filename_prefix) and f.endswith(extension)]
# For single images with no existing files, return plain filename
if batch_size == 1 and not existing_files:
return f"{filename_prefix}"
# For batches or when files exist, always use numbered suffix
suffixes: list[int] = []
for f in existing_files:
name, _ = os.path.splitext(f)
parts = name.split('_')
if parts[-1].isdigit():
suffixes.append(int(parts[-1]))
if suffixes:
# Start numbering after the highest existing suffix
base_suffix = max(suffixes) + 1
else:
# No numbered files exist yet
if existing_files:
# Plain file exists, start at 1 (the plain file is effectively 0)
base_suffix = 1
else:
# No files at all, start at 1
base_suffix = 1
return f"{filename_prefix}_{base_suffix + batch_index:02d}"