Some checks failed
Python Linting / Run Ruff (push) Has been cancelled
Python Linting / Run Pylint (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.10, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.11, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-stable (12.1, , linux, 3.12, [self-hosted Linux], stable) (push) Has been cancelled
Full Comfy CI Workflow Runs / test-unix-nightly (12.1, , linux, 3.11, [self-hosted Linux], nightly) (push) Has been cancelled
Execution Tests / test (macos-latest) (push) Has been cancelled
Execution Tests / test (ubuntu-latest) (push) Has been cancelled
Execution Tests / test (windows-latest) (push) Has been cancelled
Test server launches without errors / test (push) Has been cancelled
Unit Tests / test (macos-latest) (push) Has been cancelled
Unit Tests / test (ubuntu-latest) (push) Has been cancelled
Unit Tests / test (windows-2022) (push) Has been cancelled
Includes 30 custom nodes committed directly, 7 Civitai-exclusive loras stored via Git LFS, and a setup script that installs all dependencies and downloads HuggingFace-hosted models on vast.ai. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
152 lines
6.4 KiB
Python
152 lines
6.4 KiB
Python
import hashlib
|
|
import os
|
|
import requests
|
|
from typing import Optional, Any
|
|
from collections.abc import Collection, Iterator
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
import folder_paths
|
|
import re
|
|
|
|
def sanitize_filename(filename: str) -> str:
    """
    Strip characters from ``filename`` that are unsafe in file names.

    The characters ``< > : " | ? *`` and the control characters
    ``\\x00``-``\\x1f`` are deleted, then any trailing periods and spaces are
    trimmed. Path separators are left untouched.
    """
    # Characters that are generally rejected across file systems.
    unsafe_pattern = re.compile(r'[<>:"|?*\x00-\x1f]')
    without_unsafe = unsafe_pattern.sub('', filename)

    # Windows does not allow names ending in a period or a space.
    return without_unsafe.rstrip('. ')
|
|
|
|
def get_sha256(file_path: str) -> str:
    """
    Return the SHA-256 hex digest of the file at ``file_path``.

    The digest is cached in a sidecar file located next to the source file
    (the same path with its extension replaced by ``.sha256``). If a sidecar
    exists and contains a well-formed digest (64 hexadecimal characters), that
    value is returned without reading the source file. Otherwise the whole
    source file is hashed in 1 MB blocks with a progress bar, and the result
    is written to the sidecar on a best-effort basis.

    Errors while reading or writing the sidecar are logged and do not abort
    the call; errors while opening or reading ``file_path`` itself propagate.
    """
    hash_file = os.path.splitext(file_path)[0] + ".sha256"

    if os.path.exists(hash_file):
        try:
            with open(hash_file, "r") as f:
                cached_hash = f.read().strip()
        except (OSError, UnicodeDecodeError) as e:
            print(f"ComfyUI-Image-Saver: Error reading existing hash file: {e}")
        else:
            # An empty, truncated or foreign-format sidecar must not be
            # mistaken for a digest; fall through and recompute instead.
            if re.fullmatch(r"[0-9a-fA-F]{64}", cached_hash):
                return cached_hash
            print(f"ComfyUI-Image-Saver: Ignoring invalid hash file {hash_file}, recalculating")

    sha256_hash = hashlib.sha256()
    with open(file_path, "rb") as f:
        file_size = os.fstat(f.fileno()).st_size
        block_size = 1048576  # 1 MB

        print(f"ComfyUI-Image-Saver: Calculating sha256 for '{Path(file_path).stem}'")
        with tqdm(None, None, file_size, unit="B", unit_scale=True, unit_divisor=1024) as progress_bar:
            # iter() with a sentinel keeps reading until f.read returns b"" (EOF).
            for byte_block in iter(lambda: f.read(block_size), b""):
                progress_bar.update(len(byte_block))
                sha256_hash.update(byte_block)

    digest = sha256_hash.hexdigest()

    # Caching is best-effort: a read-only model directory must not break hashing.
    try:
        with open(hash_file, "w") as f:
            f.write(digest)
    except OSError as e:
        print(f"ComfyUI-Image-Saver: Error writing hash to {hash_file}: {e}")

    return digest
|
|
|
|
def full_embedding_path_for(embedding: str) -> Optional[str]:
    """
    Resolve an embedding name (e.g. ``EasyNegative``) to its full path.

    The name is matched against the files of the "embeddings" model folder via
    ``get_file_path_match``; the match is then expanded with
    ``folder_paths.get_full_path``. Returns ``None`` (after logging a message)
    when no file matches.
    """
    relative_name = get_file_path_match("embeddings", embedding)
    if relative_name is not None:
        return folder_paths.get_full_path("embeddings", relative_name)

    print(f'ComfyUI-Image-Saver: could not find full path to embedding "{embedding}"')
    return None
|
|
|
|
def full_lora_path_for(lora: str) -> Optional[str]:
    """
    Resolve a lora name (e.g. ``epi_noise_offset2``) to its full path.

    The name is matched against the files of the "loras" model folder via
    ``get_file_path_match``; the match is then expanded with
    ``folder_paths.get_full_path``. Returns ``None`` (after logging a message)
    when no file matches.
    """
    relative_name = get_file_path_match("loras", lora)
    if relative_name is not None:
        return folder_paths.get_full_path("loras", relative_name)

    print(f'ComfyUI-Image-Saver: could not find full path to lora "{lora}"')
    return None
|
|
|
|
def full_checkpoint_path_for(model_name: str) -> str:
    """
    Resolve a model name to its full path.

    The "checkpoints" folder is searched first, then "diffusion_models". Both
    searches accept the extensions in ``folder_paths.supported_pt_extensions``
    plus ``.gguf``. Returns an empty string when ``model_name`` is empty or
    when neither folder has a match (the latter is also logged).
    """
    if not model_name:
        return ''

    # Accept .gguf files in addition to the stock extensions.
    allowed_extensions = {*folder_paths.supported_pt_extensions, ".gguf"}

    checkpoint_match = get_file_path_match("checkpoints", model_name, allowed_extensions)
    if checkpoint_match is not None:
        return folder_paths.get_full_path("checkpoints", checkpoint_match)

    diffusion_match = get_file_path_match("diffusion_models", model_name, allowed_extensions)
    if diffusion_match:
        return folder_paths.get_full_path("diffusion_models", diffusion_match)

    print(f'Could not find full path to checkpoint "{model_name}"')
    return ''
|
|
|
|
def get_file_path_iterator(folder_name: str, supported_extensions: Optional[Collection[str]] = None) -> Iterator[Path]:
    """
    Iterate over the candidate file paths of a model folder.

    When ``supported_extensions`` is given, files are discovered by
    ``custom_file_path_generator`` using that extension set. Otherwise the
    names reported by ``folder_paths.get_filename_list`` are used, each
    wrapped in a ``Path``.
    """
    if supported_extensions is not None:
        return custom_file_path_generator(folder_name, supported_extensions)

    return map(Path, folder_paths.get_filename_list(folder_name))
|
|
|
|
def custom_file_path_generator(folder_name: str, supported_extensions: Collection[str]) -> Iterator[Path]:
    """
    Yield files of a model folder whose extension is in ``supported_extensions``.

    Every existing base directory registered for ``folder_name`` in
    ``folder_paths.folder_names_and_paths`` is walked recursively. Yielded
    paths are relative to the base directory they were found under, and the
    extension comparison is done on the lower-cased suffix.
    """
    search_roots = folder_paths.folder_names_and_paths.get(folder_name, [[], set()])[0]
    for search_root in search_roots:
        if not os.path.exists(search_root):
            continue

        anchor = Path(search_root)
        for current_dir, _subdirs, filenames in os.walk(search_root):
            relative_dir = Path(current_dir).relative_to(anchor)
            for filename in filenames:
                candidate = relative_dir / filename
                if candidate.suffix.lower() not in supported_extensions:
                    continue
                yield candidate
|
|
|
|
def get_file_path_match(folder_name: str, file_name: str, supported_extensions: Optional[Collection[str]] = None) -> Optional[str]:
    """
    Find the file in a model folder that best matches ``file_name``.

    Candidates come from ``get_file_path_iterator``. Two passes are made and
    the first hit wins:

    1. a full relative-path match;
    2. a match on the file name only.

    If ``file_name`` already ends in a supported extension, the comparisons
    include the extension. Otherwise the candidate's extension is ignored.
    "Supported" means ``supported_extensions`` when given, else
    ``folder_paths.supported_pt_extensions``.

    Returns the matching candidate as a string, or ``None`` if nothing matches.
    """
    if supported_extensions is None:
        known_extensions = folder_paths.supported_pt_extensions
    else:
        known_extensions = supported_extensions
    wanted = Path(file_name)

    def first_match(predicate):
        # Each pass rescans the folder from the start.
        for candidate in get_file_path_iterator(folder_name, supported_extensions):
            if predicate(candidate):
                return candidate
        return None

    if wanted.suffix.lower() in known_extensions:
        # The caller supplied an extension: compare names as-is.
        def by_full_path(p):
            return p == wanted

        def by_name_only(p):
            return p.name == wanted.name
    else:
        # No recognised extension: compare with the candidate's suffix dropped.
        def by_full_path(p):
            return p.with_suffix('') == wanted

        def by_name_only(p):
            return p.stem == wanted.name

    found = first_match(by_full_path)
    if found is None:
        found = first_match(by_name_only)

    return None if found is None else str(found)
|
|
|
|
def http_get_json(url: str) -> Optional[dict[str, Any]]:
    """
    Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    The request uses a 300 second timeout. Every failure mode is logged and
    reported as ``None`` rather than raised: a timeout, a connection error,
    any other ``requests`` failure (e.g. an invalid URL or too many
    redirects), a non-success status code, or a body that is not valid JSON.
    """
    try:
        response = requests.get(url, timeout=300)
    except requests.exceptions.Timeout:
        print(f"ComfyUI-Image-Saver: HTTP GET Request timed out for {url}")
        return None
    except requests.exceptions.ConnectionError as e:
        print(f"ComfyUI-Image-Saver: Warning - Network connection error for {url}: {e}")
        return None
    except requests.exceptions.RequestException as e:
        # Catch-all for the remaining requests errors so that this helper
        # keeps its "None on failure" contract instead of raising.
        print(f"ComfyUI-Image-Saver: HTTP GET Request failed for {url}: {e}")
        return None

    if not response.ok:
        print(f"ComfyUI-Image-Saver: HTTP GET Request failed with error code: {response.status_code}: {response.reason}")
        return None

    try:
        return response.json()
    except ValueError as e:
        print(f"ComfyUI-Image-Saver: HTTP Response JSON error: {e}")
        return None
|