Files
jaidaken f09734b0ee
Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Has been cancelled
Execution Tests / test (macos-latest) (push) Has been cancelled
Execution Tests / test (ubuntu-latest) (push) Has been cancelled
Execution Tests / test (windows-latest) (push) Has been cancelled
Test server launches without errors / test (push) Has been cancelled
Unit Tests / test (macos-latest) (push) Has been cancelled
Unit Tests / test (ubuntu-latest) (push) Has been cancelled
Unit Tests / test (windows-2022) (push) Has been cancelled
Add custom nodes, Civitai loras (LFS), and vast.ai setup script
Includes 30 custom nodes committed directly, 7 Civitai-exclusive
loras stored via Git LFS, and a setup script that installs all
dependencies and downloads HuggingFace-hosted models on vast.ai.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 00:56:42 +00:00

152 lines
6.4 KiB
Python

import hashlib
import os
import requests
from typing import Optional, Any
from collections.abc import Collection, Iterator
from pathlib import Path
from tqdm import tqdm
import folder_paths
import re
def sanitize_filename(filename: str) -> str:
"""Remove characters that are unsafe for filenames."""
# Remove characters that are generally unsafe across file systems
unsafe_chars = r'[<>:"|?*\x00-\x1f]'
sanitized = re.sub(unsafe_chars, '', filename)
# Remove trailing periods and spaces (problematic on Windows)
sanitized = sanitized.rstrip('. ')
return sanitized
def get_sha256(file_path: str) -> str:
"""
Given the file path, finds a matching sha256 file, or creates one
based on the headers in the source file
"""
file_no_ext = os.path.splitext(file_path)[0]
hash_file = file_no_ext + ".sha256"
if os.path.exists(hash_file):
try:
with open(hash_file, "r") as f:
return f.read().strip()
except OSError as e:
print(f"ComfyUI-Image-Saver: Error reading existing hash file: {e}")
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
file_size = os.fstat(f.fileno()).st_size
block_size = 1048576 # 1 MB
print(f"ComfyUI-Image-Saver: Calculating sha256 for '{Path(file_path).stem}'")
with tqdm(None, None, file_size, unit="B", unit_scale=True, unit_divisor=1024) as progress_bar:
for byte_block in iter(lambda: f.read(block_size), b""):
progress_bar.update(len(byte_block))
sha256_hash.update(byte_block)
try:
with open(hash_file, "w") as f:
f.write(sha256_hash.hexdigest())
except OSError as e:
print(f"ComfyUI-Image-Saver: Error writing hash to {hash_file}: {e}")
return sha256_hash.hexdigest()
def full_embedding_path_for(embedding: str) -> Optional[str]:
"""
Based on a embedding name, eg: EasyNegative, finds the path as known in comfy, including extension
"""
matching_embedding = get_file_path_match("embeddings", embedding)
if matching_embedding is None:
print(f'ComfyUI-Image-Saver: could not find full path to embedding "{embedding}"')
return None
return folder_paths.get_full_path("embeddings", matching_embedding)
def full_lora_path_for(lora: str) -> Optional[str]:
"""
Based on a lora name, e.g., 'epi_noise_offset2', finds the path as known in comfy, including extension.
"""
# Find the matching lora path
matching_lora = get_file_path_match("loras", lora)
if matching_lora is None:
print(f'ComfyUI-Image-Saver: could not find full path to lora "{lora}"')
return None
return folder_paths.get_full_path("loras", matching_lora)
def full_checkpoint_path_for(model_name: str) -> str:
if not model_name:
return ''
supported_extensions = set(folder_paths.supported_pt_extensions) | {".gguf"}
matching_checkpoint = get_file_path_match("checkpoints", model_name, supported_extensions)
if matching_checkpoint is not None:
return folder_paths.get_full_path("checkpoints", matching_checkpoint)
matching_model = get_file_path_match("diffusion_models", model_name, supported_extensions)
if matching_model:
return folder_paths.get_full_path("diffusion_models", matching_model)
print(f'Could not find full path to checkpoint "{model_name}"')
return ''
def get_file_path_iterator(folder_name: str, supported_extensions: Optional[Collection[str]] = None) -> Iterator[Path]:
"""
Returns an iterator over valid file paths for the specified model folder.
"""
if supported_extensions is None:
return (Path(x) for x in folder_paths.get_filename_list(folder_name))
else:
return custom_file_path_generator(folder_name, supported_extensions)
def custom_file_path_generator(folder_name: str, supported_extensions: Collection[str]) -> Iterator[Path]:
"""
Generator function for file paths, allowing for a customized extension check.
"""
model_paths = folder_paths.folder_names_and_paths.get(folder_name, [[], set()])[0]
for path in model_paths:
if os.path.exists(path):
base_path = Path(path)
for root, _, files in os.walk(path):
root_path = Path(root).relative_to(base_path)
for file in files:
file_path = root_path / file
if file_path.suffix.lower() in supported_extensions:
yield file_path
def get_file_path_match(folder_name: str, file_name: str, supported_extensions: Optional[Collection[str]] = None) -> Optional[str]:
supported_extensions_fallback = supported_extensions if supported_extensions is not None else folder_paths.supported_pt_extensions
file_path = Path(file_name)
# first try full path match, then fallback to just name match, matching the extension if appropriate
if file_path.suffix.lower() not in supported_extensions_fallback:
matching_file_path = next((p for p in get_file_path_iterator(folder_name, supported_extensions) if p.with_suffix('') == file_path), None)
matching_file_path = (matching_file_path if matching_file_path is not None else
next((p for p in get_file_path_iterator(folder_name, supported_extensions) if p.stem == file_path.name), None))
else:
matching_file_path = next((p for p in get_file_path_iterator(folder_name, supported_extensions) if p == file_path), None)
matching_file_path = (matching_file_path if matching_file_path is not None else
next((p for p in get_file_path_iterator(folder_name, supported_extensions) if p.name == file_path.name), None))
return str(matching_file_path) if matching_file_path is not None else None
def http_get_json(url: str) -> dict[str, Any] | None:
try:
response = requests.get(url, timeout=300)
except requests.exceptions.Timeout:
print(f"ComfyUI-Image-Saver: HTTP GET Request timed out for {url}")
return None
except requests.exceptions.ConnectionError as e:
print(f"ComfyUI-Image-Saver: Warning - Network connection error for {url}: {e}")
return None
if not response.ok:
print(f"ComfyUI-Image-Saver: HTTP GET Request failed with error code: {response.status_code}: {response.reason}")
return None
try:
return response.json()
except ValueError as e:
print(f"ComfyUI-Image-Saver: HTTP Response JSON error: {e}")
return None