import os
from datetime import datetime
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import json
import numpy as np
import re
from PIL import Image
import torch
import folder_paths
from nodes import MAX_RESOLUTION
from .saver.saver import save_image
from .utils import sanitize_filename, get_sha256, full_checkpoint_path_for
from .utils_civitai import get_civitai_sampler_name, get_civitai_metadata, MAX_HASH_LENGTH
from .prompt_metadata_extractor import PromptMetadataExtractor


def parse_checkpoint_name(ckpt_name: str) -> str:
    """Return the basename of a checkpoint path (e.g. 'sd/model.safetensors' -> 'model.safetensors')."""
    return os.path.basename(ckpt_name)


def parse_checkpoint_name_without_extension(ckpt_name: str) -> str:
    """Return the checkpoint basename with a recognized model-file extension stripped.

    Dots inside model names are preserved: the extension is removed only when it is
    one of ComfyUI's supported model extensions (plus '.gguf').
    """
    filename = parse_checkpoint_name(ckpt_name)
    name_without_ext, ext = os.path.splitext(filename)
    supported_extensions = folder_paths.supported_pt_extensions | {".gguf"}
    # Only remove extension if it's a known model file extension
    if ext.lower() in supported_extensions:
        return name_without_ext
    return filename  # Keep full name if extension isn't recognized


def get_timestamp(time_format: str) -> str:
    """Format the current local time with `time_format`.

    Falls back to '%Y-%m-%d-%H%M%S' when the format string is invalid,
    so a bad user-supplied pattern never breaks the save path.
    """
    now = datetime.now()
    try:
        timestamp = now.strftime(time_format)
    except Exception:  # narrowed from bare except; invalid formats raise ValueError/TypeError
        timestamp = now.strftime("%Y-%m-%d-%H%M%S")
    return timestamp


def apply_custom_time_format(filename: str) -> str:
    """
    Replace %time_format<...> patterns with formatted datetime.
    Example: %time_format<%Y-%m-%d> becomes 2026-01-17
    """
    now = datetime.now()
    # Match %time_format<XXX> where XXX is any strftime format string (no '>' allowed inside)
    pattern = r'%time_format<([^>]*)>'

    def replace_format(match):
        format_str = match.group(1)
        try:
            return now.strftime(format_str)
        except Exception:
            # If format is invalid, keep the original token untouched
            return match.group(0)

    return re.sub(pattern, replace_format, filename)


def save_json(image_info: dict[str, Any] | None, filename: str) -> None:
    """Best-effort dump of the 'workflow' entry of `image_info` to `<filename>.json`.

    Never raises: any failure is logged and saving of the image itself continues.
    """
    try:
        workflow = (image_info or {}).get('workflow')
        if workflow is None:
            print('No image info found, skipping saving of JSON')
            # Bug fix: previously fell through and wrote 'null' to disk; now actually skips.
            return
        with open(f'{filename}.json', 'w') as workflow_file:
            json.dump(workflow, workflow_file)
        print(f'Saved workflow to {filename}.json')
    except Exception as e:
        print(f'Failed to save workflow as json due to: {e}, proceeding with the remainder of saving execution')


def make_pathname(filename: str, width: int, height: int, seed: int, modelname: str, counter: int,
                  time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str,
                  denoise: float, clip_skip: int, custom: str) -> str:
    """Expand %-variables in a filename/path pattern and sanitize the basename.

    %time_format<...> tokens are expanded first so that '%time' does not clobber them.
    Replacement order matters and is kept identical to the documented variable list.
    """
    # Process custom time_format patterns first
    filename = apply_custom_time_format(filename)
    # dict preserves insertion order (py3.7+), so replacements happen in this exact order
    replacements = {
        "%date": get_timestamp("%Y-%m-%d"),
        "%time": get_timestamp(time_format),
        "%model": parse_checkpoint_name(modelname),
        "%width": str(width),
        "%height": str(height),
        "%seed": str(seed),
        "%counter": str(counter),
        "%sampler_name": sampler_name,
        "%steps": str(steps),
        "%cfg": str(cfg),
        "%scheduler_name": scheduler_name,
        "%basemodelname": parse_checkpoint_name_without_extension(modelname),
        "%denoise": str(denoise),
        "%clip_skip": str(clip_skip),
        "%custom": custom,
    }
    for token, value in replacements.items():
        filename = filename.replace(token, value)
    # Sanitize only the basename; directory separators in the pattern are intentional
    directory, basename = os.path.split(filename)
    sanitized_basename = sanitize_filename(basename)
    return os.path.join(directory, sanitized_basename)


def make_filename(filename: str, width: int, height: int, seed: int, modelname: str, counter: int,
                  time_format: str, sampler_name: str, steps: int, cfg: float, scheduler_name: str,
                  denoise: float, clip_skip: int, custom: str) -> str:
    """Like make_pathname, but an empty result falls back to a timestamp so files are never nameless."""
    filename = make_pathname(filename, width, height, seed, modelname, counter, time_format,
                             sampler_name, steps, cfg, scheduler_name, denoise, clip_skip, custom)
    return get_timestamp(time_format) if filename == "" else filename


@dataclass
class Metadata:
    """Generation metadata bundle passed between the metadata node and the saver nodes."""
    modelname: str
    positive: str            # positive prompt (possibly cleaned for easy remix)
    negative: str            # negative prompt (possibly cleaned for easy remix)
    width: int
    height: int
    seed: int
    steps: int
    cfg: float
    sampler_name: str
    scheduler_name: str
    denoise: float
    clip_skip: int
    custom: str              # free-form string inserted into the a111 parameter line
    additional_hashes: str   # raw user-supplied extra hashes string
    ckpt_path: str           # resolved checkpoint path ('' if not found)
    a111_params: str         # full A1111-style parameter string written to the image
    final_hashes: str        # comma-separated name:hash[:weight] list for chaining


class ImageSaverMetadata:
    """Node that prepares civitai-compatible metadata for Image Saver Simple."""

    @classmethod
    def INPUT_TYPES(cls) -> dict[str, Any]:
        return {
            "optional": {
                "modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
                "positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
                "negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
                "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
                "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
                "seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
                "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
                "cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
                "sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
                "scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
                "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
                "clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
                "additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
                "download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
                "easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
                "custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
            },
        }

    RETURN_TYPES = ("METADATA", "STRING", "STRING")
    RETURN_NAMES = ("metadata", "hashes", "a1111_params")
    OUTPUT_TOOLTIPS = ("metadata for Image Saver Simple", "Comma-separated list of the hashes to chain with other Image Saver additional_hashes", "Written parameters to the image metadata")
    FUNCTION = "get_metadata"
    CATEGORY = "ImageSaver"
    DESCRIPTION = "Prepare metadata for Image Saver Simple"

    def get_metadata(
        self,
        modelname: str = "",
        positive: str = "unknown",
        negative: str = "unknown",
        width: int = 512,
        height: int = 512,
        seed_value: int = 0,
        steps: int = 20,
        cfg: float = 7.0,
        sampler_name: str = "",
        scheduler_name: str = "normal",
        denoise: float = 1.0,
        clip_skip: int = 0,
        custom: str = "",
        additional_hashes: str = "",
        download_civitai_data: bool = True,
        easy_remix: bool = True,
    ) -> tuple[Metadata, str, str]:
        """Node entry point: build a Metadata object and expose its hash/param strings."""
        metadata = ImageSaverMetadata.make_metadata(modelname, positive, negative, width, height, seed_value,
                                                    steps, cfg, sampler_name, scheduler_name, denoise, clip_skip,
                                                    custom, additional_hashes, download_civitai_data, easy_remix)
        return (metadata, metadata.final_hashes, metadata.a111_params)

    @staticmethod
    def make_metadata(modelname: str, positive: str, negative: str, width: int, height: int, seed_value: int,
                      steps: int, cfg: float, sampler_name: str, scheduler_name: str, denoise: float,
                      clip_skip: int, custom: str, additional_hashes: str, download_civitai_data: bool,
                      easy_remix: bool) -> Metadata:
        """Assemble the full Metadata bundle: resolve hashes, build the A1111 parameter string.

        Gathers resource hashes from the checkpoint, prompt-referenced LoRAs/embeddings,
        and manually supplied hashes, optionally enriched with civitai.com data.
        """
        modelname, additional_hashes = ImageSaver.get_multiple_models(modelname, additional_hashes)
        ckpt_path = full_checkpoint_path_for(modelname)
        # Short (10-char) sha256 prefix, as used by A1111 'Model hash'
        modelhash = get_sha256(ckpt_path)[:10] if ckpt_path else ""

        metadata_extractor = PromptMetadataExtractor([positive, negative])
        embeddings = metadata_extractor.get_embeddings()
        loras = metadata_extractor.get_loras()

        # '_gpu' sampler variants map to the same civitai sampler name
        civitai_sampler_name = get_civitai_sampler_name(sampler_name.replace('_gpu', ''), scheduler_name)
        basemodelname = parse_checkpoint_name_without_extension(modelname)

        # Get existing hashes from model, loras, and embeddings
        existing_hashes = {modelhash.lower()} | {t[2].lower() for t in loras.values()} | {t[2].lower() for t in embeddings.values()}

        # Parse manual hashes
        manual_entries = ImageSaver.parse_manual_hashes(additional_hashes, existing_hashes, download_civitai_data)

        # Get Civitai metadata
        civitai_resources, hashes, add_model_hash = get_civitai_metadata(modelname, ckpt_path, modelhash, loras,
                                                                         embeddings, manual_entries, download_civitai_data)

        if easy_remix:
            positive = ImageSaver.clean_prompt(positive, metadata_extractor)
            negative = ImageSaver.clean_prompt(negative, metadata_extractor)

        positive_a111_params = positive.strip()
        negative_a111_params = f"\nNegative prompt: {negative.strip()}"
        clip_skip_str = f", Clip skip: {abs(clip_skip)}" if clip_skip != 0 else ""
        custom_str = f", {custom}" if custom else ""
        model_hash_str = f", Model hash: {add_model_hash}" if add_model_hash else ""
        hashes_str = f", Hashes: {json.dumps(hashes, separators=(',', ':'))}" if hashes else ""
        a111_params = (
            f"{positive_a111_params}{negative_a111_params}\n"
            f"Steps: {steps}, Sampler: {civitai_sampler_name}, CFG scale: {cfg}, Seed: {seed_value}, "
            f"Size: {width}x{height}{clip_skip_str}{custom_str}{model_hash_str}, Model: {basemodelname}{hashes_str}, Version: ComfyUI"
        )

        # Add Civitai resource listing
        if download_civitai_data and civitai_resources:
            a111_params += f", Civitai resources: {json.dumps(civitai_resources, separators=(',', ':'))}"

        # Combine all resources (model, loras, embeddings, manual entries) for final hash string.
        # dict-union of dicts is always a dict, so iterate .items() directly.
        all_resources = {modelname: (ckpt_path, None, modelhash)} | loras | embeddings | manual_entries
        # Hoisted out of the loop: this set never changes between iterations
        supported_extensions = folder_paths.supported_pt_extensions | {".gguf"}
        hash_parts: list[str] = []
        for name, (_, weight, hash_value) in all_resources.items():
            # Skip entries without a valid hash
            if not hash_value:
                continue
            # Format: "name:hash" or "name:hash:weight" depending on download_civitai_data
            if name:
                # Extract clean name (only remove actual model file extensions, preserve dots in model names)
                file_part = name.split(':')[-1]
                name_without_ext, ext = os.path.splitext(file_part)
                clean_name = name_without_ext if ext.lower() in supported_extensions else file_part
                name_part = f"{clean_name}:"
            else:
                name_part = ""
            weight_part = f":{weight}" if weight is not None and download_civitai_data else ""
            hash_parts.append(f"{name_part}{hash_value}{weight_part}")
        final_hashes = ",".join(hash_parts)

        return Metadata(modelname, positive, negative, width, height, seed_value, steps, cfg, sampler_name,
                        scheduler_name, denoise, clip_skip, custom, additional_hashes, ckpt_path, a111_params,
                        final_hashes)


class ImageSaverSimple:
    """Saver node that takes a prebuilt METADATA input instead of individual parameters."""

    @classmethod
    def INPUT_TYPES(cls) -> dict[str, Any]:
        return {
            "required": {
                "images": ("IMAGE", {"tooltip": "image(s) to save"}),
                "filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
                "path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
                "extension": (['png', 'jpeg', 'jpg', 'webp'], {"tooltip": "file extension/type to save image as"}),
                "lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
                "quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
                "optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
                "embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
                "save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
            },
            "optional": {
                "metadata": ("METADATA", {"default": None, "tooltip": "metadata to embed in the image"}),
                "counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
                "time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
                "show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
            },
        }

    RETURN_TYPES = ("STRING", "STRING")
    RETURN_NAMES = ("hashes", "a1111_params")
    OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes", "Written parameters to the image metadata")
    FUNCTION = "save_images"
    OUTPUT_NODE = True
    CATEGORY = "ImageSaver"
    DESCRIPTION = "Save images with civitai-compatible generation metadata"

    def save_images(
        self,
        images: list[torch.Tensor],
        filename: str,
        path: str,
        extension: str,
        lossless_webp: bool,
        quality_jpeg_or_webp: int,
        optimize_png: bool,
        embed_workflow: bool = True,
        save_workflow_as_json: bool = False,
        show_preview: bool = True,
        metadata: Metadata | None = None,
        counter: int = 0,
        time_format: str = "%Y-%m-%d-%H%M%S",
        prompt: dict[str, Any] | None = None,
        extra_pnginfo: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Node entry point: save `images` using a prebuilt Metadata (or neutral defaults)."""
        if metadata is None:
            # Neutral metadata so saving still works when the METADATA input is unconnected
            metadata = Metadata('', '', '', 512, 512, 0, 20, 7.0, '', 'normal', 1.0, 0, '', '', '', '', '')
        path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter,
                             time_format, metadata.sampler_name, metadata.steps, metadata.cfg,
                             metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
        filenames = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp,
                                           optimize_png, prompt, extra_pnginfo, save_workflow_as_json,
                                           embed_workflow, counter, time_format, metadata)
        subfolder = os.path.normpath(path)
        result: dict[str, Any] = {
            "result": (metadata.final_hashes, metadata.a111_params),
        }
        if show_preview:
            result["ui"] = {"images": [{"filename": filename, "subfolder": subfolder if subfolder != '.' else '', "type": 'output'} for filename in filenames]}
        return result


class ImageSaver:
    """All-in-one saver node: builds metadata from individual inputs and saves images."""

    @classmethod
    def INPUT_TYPES(cls) -> dict[str, Any]:
        return {
            "required": {
                "images": ("IMAGE", {"tooltip": "image(s) to save"}),
                "filename": ("STRING", {"default": '%time_%basemodelname_%seed', "multiline": False, "tooltip": "filename (available variables: %date, %time, %time_format, %model, %width, %height, %seed, %counter, %sampler_name, %steps, %cfg, %scheduler_name, %basemodelname, %denoise, %clip_skip)"}),
                "path": ("STRING", {"default": '', "multiline": False, "tooltip": "path to save the images (under Comfy's save directory)"}),
                "extension": (['png', 'jpeg', 'jpg', 'webp'], {"tooltip": "file extension/type to save image as"}),
            },
            "optional": {
                "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "number of steps"}),
                "cfg": ("FLOAT", {"default": 7.0, "min": 0.0, "max": 100.0, "tooltip": "CFG value"}),
                "modelname": ("STRING", {"default": '', "multiline": False, "tooltip": "model name (can be multiple, separated by commas)"}),
                "sampler_name": ("STRING", {"default": '', "multiline": False, "tooltip": "sampler name (as string)"}),
                "scheduler_name": ("STRING", {"default": 'normal', "multiline": False, "tooltip": "scheduler name (as string)"}),
                "positive": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "positive prompt"}),
                "negative": ("STRING", {"default": 'unknown', "multiline": True, "tooltip": "negative prompt"}),
                "seed_value": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "seed"}),
                "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image width"}),
                "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8, "tooltip": "image height"}),
                "lossless_webp": ("BOOLEAN", {"default": True, "tooltip": "if True, saved WEBP files will be lossless"}),
                "quality_jpeg_or_webp": ("INT", {"default": 100, "min": 1, "max": 100, "tooltip": "quality setting of JPEG/WEBP"}),
                "optimize_png": ("BOOLEAN", {"default": False, "tooltip": "if True, saved PNG files will be optimized (can reduce file size but is slower)"}),
                "counter": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "tooltip": "counter"}),
                "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "tooltip": "denoise value"}),
                "clip_skip": ("INT", {"default": 0, "min": -24, "max": 24, "tooltip": "skip last CLIP layers (positive or negative value, 0 for no skip)"}),
                "time_format": ("STRING", {"default": "%Y-%m-%d-%H%M%S", "multiline": False, "tooltip": "timestamp format"}),
                "save_workflow_as_json": ("BOOLEAN", {"default": False, "tooltip": "if True, also saves the workflow as a separate JSON file"}),
                "embed_workflow": ("BOOLEAN", {"default": True, "tooltip": "if True, embeds the workflow in the saved image files.\nStable for PNG, experimental for WEBP.\nJPEG experimental and only if metadata size is below 65535 bytes"}),
                "additional_hashes": ("STRING", {"default": "", "multiline": False, "tooltip": "hashes separated by commas, optionally with names. 'Name:HASH' (e.g., 'MyLoRA:FF735FF83F98')\nWith download_civitai_data set to true, weights can be added as well. (e.g., 'HASH:Weight', 'Name:HASH:Weight')"}),
                "download_civitai_data": ("BOOLEAN", {"default": True, "tooltip": "Download and cache data from civitai.com to save correct metadata. Allows LoRA weights to be saved to the metadata."}),
                "easy_remix": ("BOOLEAN", {"default": True, "tooltip": "Strip LoRAs and simplify 'embedding:path' from the prompt to make the Remix option on civitai.com more seamless."}),
                "show_preview": ("BOOLEAN", {"default": True, "tooltip": "if True, displays saved images in the UI preview"}),
                "custom": ("STRING", {"default": "", "multiline": False, "tooltip": "custom string to add to the metadata, inserted into the a111 string between clip skip and model hash"}),
            },
            "hidden": {
                "prompt": "PROMPT",
                "extra_pnginfo": "EXTRA_PNGINFO",
            },
        }

    RETURN_TYPES = ("STRING", "STRING")
    RETURN_NAMES = ("hashes", "a1111_params")
    OUTPUT_TOOLTIPS = ("Comma-separated list of the hashes to chain with other Image Saver additional_hashes", "Written parameters to the image metadata")
    FUNCTION = "save_files"
    OUTPUT_NODE = True
    CATEGORY = "ImageSaver"
    DESCRIPTION = "Save images with civitai-compatible generation metadata"

    def save_files(
        self,
        images: list[torch.Tensor],
        filename: str,
        path: str,
        extension: str,
        steps: int = 20,
        cfg: float = 7.0,
        modelname: str = "",
        sampler_name: str = "",
        scheduler_name: str = "normal",
        positive: str = "unknown",
        negative: str = "unknown",
        seed_value: int = 0,
        width: int = 512,
        height: int = 512,
        lossless_webp: bool = True,
        quality_jpeg_or_webp: int = 100,
        optimize_png: bool = False,
        counter: int = 0,
        denoise: float = 1.0,
        clip_skip: int = 0,
        time_format: str = "%Y-%m-%d-%H%M%S",
        save_workflow_as_json: bool = False,
        embed_workflow: bool = True,
        additional_hashes: str = "",
        download_civitai_data: bool = True,
        easy_remix: bool = True,
        show_preview: bool = True,
        custom: str = "",
        prompt: dict[str, Any] | None = None,
        extra_pnginfo: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Node entry point: build metadata from the individual inputs, then save `images`."""
        metadata = ImageSaverMetadata.make_metadata(modelname, positive, negative, width, height, seed_value,
                                                    steps, cfg, sampler_name, scheduler_name, denoise, clip_skip,
                                                    custom, additional_hashes, download_civitai_data, easy_remix)
        path = make_pathname(path, metadata.width, metadata.height, metadata.seed, metadata.modelname, counter,
                             time_format, metadata.sampler_name, metadata.steps, metadata.cfg,
                             metadata.scheduler_name, metadata.denoise, metadata.clip_skip, metadata.custom)
        filenames = ImageSaver.save_images(images, filename, extension, path, quality_jpeg_or_webp, lossless_webp,
                                           optimize_png, prompt, extra_pnginfo, save_workflow_as_json,
                                           embed_workflow, counter, time_format, metadata)
        subfolder = os.path.normpath(path)
        result: dict[str, Any] = {
            "result": (metadata.final_hashes, metadata.a111_params),
        }
        if show_preview:
            result["ui"] = {"images": [{"filename": filename, "subfolder": subfolder if subfolder != '.' else '', "type": 'output'} for filename in filenames]}
        return result

    @staticmethod
    def save_images(
        images: list[torch.Tensor],
        filename_pattern: str,
        extension: str,
        path: str,
        quality_jpeg_or_webp: int,
        lossless_webp: bool,
        optimize_png: bool,
        prompt: dict[str, Any] | None,
        extra_pnginfo: dict[str, Any] | None,
        save_workflow_as_json: bool,
        embed_workflow: bool,
        counter: int,
        time_format: str,
        metadata: Metadata,
    ) -> list[str]:
        """Write each image tensor to disk under Comfy's output directory; return saved filenames."""
        filename_prefix = make_filename(filename_pattern, metadata.width, metadata.height, metadata.seed,
                                        metadata.modelname, counter, time_format, metadata.sampler_name,
                                        metadata.steps, metadata.cfg, metadata.scheduler_name, metadata.denoise,
                                        metadata.clip_skip, metadata.custom)
        output_path = os.path.join(folder_paths.output_directory, path)
        if output_path.strip() != '':
            if not os.path.exists(output_path.strip()):
                print(f'The path `{output_path.strip()}` specified doesn\'t exist! Creating directory.')
                os.makedirs(output_path, exist_ok=True)
        result_paths: list[str] = []
        num_images = len(images)
        for idx, image in enumerate(images):
            # Tensor values are assumed in [0, 1] — TODO confirm; scale to 8-bit and clamp
            i = 255. * image.cpu().numpy()
            img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8))
            current_filename_prefix = ImageSaver.get_unique_filename(output_path, filename_prefix, extension,
                                                                     batch_size=num_images, batch_index=idx)
            final_filename = f"{current_filename_prefix}.{extension}"
            filepath = os.path.join(output_path, final_filename)
            save_image(img, filepath, extension, quality_jpeg_or_webp, lossless_webp, optimize_png,
                       metadata.a111_params, prompt, extra_pnginfo, embed_workflow)
            if save_workflow_as_json:
                save_json(extra_pnginfo, os.path.join(output_path, current_filename_prefix))
            result_paths.append(final_filename)
        return result_paths

    # Match 'anything' or 'anything:anything' with trimmed white space
    re_manual_hash = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?\s*$')
    # Match 'anything', 'anything:anything' or 'anything:anything:number' with trimmed white space
    re_manual_hash_weights = re.compile(r'^\s*([^:]+?)(?:\s*:\s*([^\s:][^:]*?))?(?:\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)))?\s*$')

    @staticmethod
    def get_multiple_models(modelname: str, additional_hashes: str) -> tuple[str, str]:
        """Split a comma-separated model list: first becomes primary, the rest become extra hashes."""
        model_names = [m.strip() for m in modelname.split(',')]
        modelname = model_names[0]  # Use the first model as the primary one
        # Process additional model names and add to additional_hashes
        for additional_model in model_names[1:]:
            additional_ckpt_path = full_checkpoint_path_for(additional_model)
            if additional_ckpt_path:
                additional_modelhash = get_sha256(additional_ckpt_path)[:10]
                # Add to additional_hashes in "name:HASH" format
                if additional_hashes:
                    additional_hashes += ","
                additional_hashes += f"{additional_model}:{additional_modelhash}"
        return modelname, additional_hashes

    @staticmethod
    def parse_manual_hashes(additional_hashes: str, existing_hashes: set[str], download_civitai_data: bool) -> dict[str, tuple[str | None, float | None, str]]:
        """Process additional_hashes input (a string) by normalizing, removing extra spaces/newlines, and splitting by comma"""
        manual_entries: dict[str, tuple[str | None, float | None, str]] = {}
        unnamed_count = 0
        additional_hash_split = additional_hashes.replace("\n", ",").split(",") if additional_hashes else []
        for entry in additional_hash_split:
            match = (ImageSaver.re_manual_hash_weights if download_civitai_data else ImageSaver.re_manual_hash).search(entry)
            if match is None:
                print(f"ComfyUI-Image-Saver: Invalid additional hash string: '{entry}'")
                continue
            groups = tuple(group for group in match.groups() if group)
            # Read weight and remove from groups, if needed
            weight = None
            if download_civitai_data and len(groups) > 1:
                try:
                    weight = float(groups[-1])
                    groups = groups[:-1]
                except (ValueError, TypeError):
                    pass
            # Read hash, optionally preceded by name (renamed from 'hash' to avoid shadowing the builtin)
            name, hash_value = groups if len(groups) > 1 else (None, groups[0])
            if len(hash_value) > MAX_HASH_LENGTH:
                print(f"ComfyUI-Image-Saver: Skipping hash. Length exceeds maximum of {MAX_HASH_LENGTH} characters: {hash_value}")
                continue
            if any(hash_value.lower() == existing_hash.lower() for _, _, existing_hash in manual_entries.values()):
                print(f"ComfyUI-Image-Saver: Skipping duplicate hash: {hash_value}")
                continue
            # Skip duplicates
            if hash_value.lower() in existing_hashes:
                print(f"ComfyUI-Image-Saver: Skipping manual hash already present in resources: {hash_value}")
                continue
            if name is None:
                unnamed_count += 1
                name = f"manual{unnamed_count}"
            elif name in manual_entries:
                print(f"ComfyUI-Image-Saver: Duplicate manual hash name '{name}' is being overwritten.")
            manual_entries[name] = (None, weight, hash_value)
            if len(manual_entries) > 29:
                print("ComfyUI-Image-Saver: Reached maximum limit of 30 manual hashes. Skipping the rest.")
                break
        return manual_entries

    @staticmethod
    def clean_prompt(prompt: str, metadata_extractor: PromptMetadataExtractor) -> str:
        """Clean prompts for easier remixing by removing LoRAs and simplifying embeddings."""
        # Strip loras
        prompt = re.sub(metadata_extractor.LORA, "", prompt)
        # Shorten 'embedding:path/to/my_embedding' -> 'my_embedding'
        # Note: Possible inaccurate embedding name if the filename has been renamed from the default
        prompt = re.sub(metadata_extractor.EMBEDDING, lambda match: Path(match.group(1)).stem, prompt)
        # Remove prompt control edits. e.g., 'STYLE(A1111, mean)', 'SHIFT(1)`, etc.`
        prompt = re.sub(r'\b[A-Z]+\([^)]*\)', "", prompt)
        return prompt

    @staticmethod
    def get_unique_filename(output_path: str, filename_prefix: str, extension: str, batch_size: int = 1, batch_index: int = 0) -> str:
        """Return a collision-free filename (without extension) for one image of a batch.

        Single images with no conflicting files get the plain prefix; otherwise a
        zero-padded numeric suffix continues after the highest existing one.
        """
        existing_files = [f for f in os.listdir(output_path) if f.startswith(filename_prefix) and f.endswith(extension)]
        # For single images with no existing files, return plain filename
        if batch_size == 1 and not existing_files:
            return filename_prefix
        # For batches or when files exist, always use numbered suffix
        suffixes: list[int] = []
        for f in existing_files:
            name, _ = os.path.splitext(f)
            parts = name.split('_')
            if parts[-1].isdigit():
                suffixes.append(int(parts[-1]))
        # Continue after the highest numbered file; otherwise start at 1
        # (a plain un-numbered file, if present, is effectively index 0)
        base_suffix = max(suffixes) + 1 if suffixes else 1
        return f"{filename_prefix}_{base_suffix + batch_index:02d}"