# metadata/parser.py
"""Extract and interpret ComfyUI generation metadata embedded in image files."""
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional, Set, Tuple

# Pillow and piexif are only needed to read metadata out of image files.
# Prompt-graph parsing is pure JSON, so the module stays importable (and the
# parser usable) when either package is missing; the extractors then report
# the problem through their normal error path and return (None, None).
try:
    from PIL import Image
except ImportError:
    Image = None

try:
    import piexif
    import piexif.helper
except ImportError:
    piexif = None

logger = logging.getLogger("ComfyGallery.MetadataParser")


def _require(module: Any, package: str) -> Any:
    """Return *module*, raising ImportError if the optional package is absent."""
    if module is None:
        raise ImportError(f"{package} is not installed")
    return module


class MetadataParser:
    """Read raw prompt/workflow metadata from images and parse prompt graphs."""

    @staticmethod
    def extract_raw_metadata(filepath: str) -> Tuple[Optional[str], Optional[str]]:
        """Return the raw ``(prompt, workflow)`` metadata stored in an image.

        Dispatches on the file suffix: ``.png``/``.webp`` are read from the
        image info dictionary, ``.jpg``/``.jpeg`` from the EXIF UserComment.
        A missing file or any other suffix yields ``(None, None)``.
        """
        path = Path(filepath)
        if not path.exists():
            return None, None

        suffix = path.suffix.lower()
        if suffix in ('.png', '.webp'):
            return MetadataParser._extract_from_png_webp(path)
        if suffix in ('.jpg', '.jpeg'):
            return MetadataParser._extract_from_jpeg(path)
        return None, None

    @staticmethod
    def _extract_from_png_webp(path: Path) -> Tuple[Optional[str], Optional[str]]:
        """Read ``prompt``/``workflow`` from the image info dictionary.

        When no ``prompt`` entry is present, the ``comment`` entry is used as
        the prompt instead. Any failure is logged and yields ``(None, None)``.
        """
        try:
            image_module = _require(Image, "Pillow")
            with image_module.open(path) as img:
                info = img.info
                prompt = info.get("prompt")
                workflow = info.get("workflow")
                if not prompt and "comment" in info:
                    prompt = info.get("comment")
                return prompt, workflow
        except Exception as e:
            logger.error(f"Ошибка чтения PNG/WebP метаданных {path.name}: {e}")
            return None, None

    @staticmethod
    def _extract_from_jpeg(path: Path) -> Tuple[Optional[str], Optional[str]]:
        """Read ``prompt``/``workflow`` from the JPEG EXIF UserComment.

        The comment is expected to hold a JSON object with ``prompt`` and
        ``workflow`` keys; dict values are re-serialised to JSON text. A
        comment that starts with ``{`` but is not valid JSON is returned
        verbatim as the prompt. Any failure is logged and yields
        ``(None, None)``.
        """
        try:
            image_module = _require(Image, "Pillow")
            exif_module = _require(piexif, "piexif")

            with image_module.open(path) as img:
                if "exif" not in img.info:
                    return None, None
                exif_dict = exif_module.load(img.info["exif"])

            user_comment_bytes = exif_dict.get("Exif", {}).get(
                exif_module.ExifIFD.UserComment, b""
            )
            if not user_comment_bytes:
                return None, None

            try:
                comment_str = exif_module.helper.UserComment.load(user_comment_bytes)
            except ValueError:
                # The helper rejected the payload; fall back to a lenient
                # UTF-8 decode of the raw bytes.
                comment_str = user_comment_bytes.decode('utf-8', errors='ignore')

            if not comment_str.startswith("{"):
                return None, None

            try:
                data = json.loads(comment_str)
            except json.JSONDecodeError:
                return comment_str, None

            prompt = data.get("prompt")
            workflow = data.get("workflow")
            if isinstance(prompt, dict):
                prompt = json.dumps(prompt)
            if isinstance(workflow, dict):
                workflow = json.dumps(workflow)
            return prompt, workflow
        except Exception as e:
            logger.error(f"Ошибка чтения JPEG EXIF {path.name}: {e}")
            return None, None

    @classmethod
    def parse_comfy_parameters(cls, prompt_json_str: Optional[str]) -> Dict[str, Any]:
        """Parse a prompt graph (JSON text) into flat generation parameters.

        The first node whose ``class_type`` contains ``KSampler`` supplies
        seed, steps, cfg and sampler name; its ``positive``/``negative``/
        ``model`` links are traced to prompt texts and a checkpoint name.
        Without a sampler node, the texts of all ``CLIPTextEncode`` nodes are
        joined with ``"\\n---\\n"`` as the positive prompt.

        Returns a dict with the keys ``positive_prompt``, ``negative_prompt``,
        ``seed``, ``model_name``, ``sampler``, ``steps`` and ``cfg``. Every
        value is ``None`` when the input is empty, is not valid JSON, or is
        not a JSON object; malformed nodes inside the graph are skipped.
        """
        result: Dict[str, Any] = {
            "positive_prompt": None,
            "negative_prompt": None,
            "seed": None,
            "model_name": None,
            "sampler": None,
            "steps": None,
            "cfg": None
        }

        if not prompt_json_str:
            return result

        try:
            prompt_graph = json.loads(prompt_json_str)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError; TypeError covers a non-text
            # value handed over by a metadata extractor.
            return result
        if not isinstance(prompt_graph, dict):
            return result

        # Only dict-shaped entries can be nodes; anything else is ignored.
        nodes = [node for node in prompt_graph.values() if isinstance(node, dict)]
        sampler_node = next(
            (node for node in nodes if "KSampler" in cls._node_class(node)),
            None,
        )

        if sampler_node is not None:
            inputs = cls._node_inputs(sampler_node)
            # Explicit None check: a seed of 0 is valid and must not fall
            # through to "noise_seed".
            seed = inputs.get("seed")
            if seed is None:
                seed = inputs.get("noise_seed")
            result["seed"] = seed
            result["steps"] = inputs.get("steps")
            result["cfg"] = inputs.get("cfg")
            result["sampler"] = inputs.get("sampler_name")
            result["positive_prompt"] = cls._trace_conditioning(inputs.get("positive"), prompt_graph)
            result["negative_prompt"] = cls._trace_conditioning(inputs.get("negative"), prompt_graph)
            result["model_name"] = cls._trace_model(inputs.get("model"), prompt_graph)
        else:
            positives = []
            for node in nodes:
                if node.get("class_type") != "CLIPTextEncode":
                    continue
                text = cls._node_inputs(node).get("text", "")
                # "text" can be a link to another node instead of a literal
                # string; only literal text is collected here.
                if isinstance(text, str) and text.strip():
                    positives.append(text.strip())
            if positives:
                result["positive_prompt"] = "\n---\n".join(positives)

        for key in ("positive_prompt", "negative_prompt", "model_name", "sampler"):
            result[key] = cls._clean_string(result[key])
        for key in ("seed", "steps"):
            result[key] = cls._clean_int(result[key])
        result["cfg"] = cls._clean_float(result["cfg"])

        return result

    @staticmethod
    def _node_class(node: dict) -> str:
        """Return the node's ``class_type`` if it is a string, else ``""``."""
        class_type = node.get("class_type")
        return class_type if isinstance(class_type, str) else ""

    @staticmethod
    def _node_inputs(node: dict) -> dict:
        """Return the node's ``inputs`` if it is a dict, else an empty dict."""
        inputs = node.get("inputs")
        return inputs if isinstance(inputs, dict) else {}

    @staticmethod
    def _clean_string(val: Any) -> Optional[str]:
        """Normalise a value to text: string lists are newline-joined, other
        lists and dicts are JSON-encoded, ``None`` stays ``None``."""
        if val is None:
            return None
        if isinstance(val, list):
            if all(isinstance(x, str) for x in val):
                return "\n".join(val)
            return json.dumps(val)
        if isinstance(val, dict):
            return json.dumps(val)
        return str(val)

    @staticmethod
    def _clean_int(val: Any) -> Optional[int]:
        """Convert to ``int``; ``None`` for containers or unconvertible values."""
        if val is None or isinstance(val, (list, dict)):
            return None
        try:
            return int(val)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int() of an infinite float.
            return None

    @staticmethod
    def _clean_float(val: Any) -> Optional[float]:
        """Convert to ``float``; ``None`` for containers or unconvertible values."""
        if val is None or isinstance(val, (list, dict)):
            return None
        try:
            return float(val)
        except (ValueError, TypeError, OverflowError):
            return None

    @classmethod
    def _resolve_link(cls, link: Any, graph: dict, seen: Set[str]) -> Optional[dict]:
        """Return the not-yet-visited node a link points at, or ``None``.

        A link is a non-empty list whose first element is the target node id.
        The id is recorded in *seen* so cyclic graphs terminate.
        """
        if not isinstance(link, list) or not link:
            return None
        node_id = str(link[0])
        if node_id in seen:
            return None
        seen.add(node_id)
        node = graph.get(node_id)
        return node if isinstance(node, dict) else None

    @classmethod
    def _trace_conditioning(
        cls,
        link: Optional[list],
        graph: dict,
        _seen: Optional[Set[str]] = None,
    ) -> Optional[str]:
        """Follow a conditioning link back to the prompt text feeding it.

        Text-encoder nodes return their ``text`` (or ``text_g``) input as
        stored in the graph; nodes whose class name contains ``Conditioning``
        are searched through each of their linked inputs in turn. Returns
        ``None`` when no text is found. ``_seen`` is internal bookkeeping of
        visited node ids and guards against cyclic graphs.
        """
        seen = set() if _seen is None else _seen
        node = cls._resolve_link(link, graph, seen)
        if node is None:
            return None

        class_type = cls._node_class(node)
        inputs = cls._node_inputs(node)

        if class_type in ("CLIPTextEncode", "CLIPTextEncodeSDXL", "CLIPTextEncodeSequence"):
            return inputs.get("text") or inputs.get("text_g")

        if "Conditioning" in class_type:
            for val in inputs.values():
                if isinstance(val, list) and val:
                    text = cls._trace_conditioning(val, graph, seen)
                    if text:
                        return text
        return None

    @classmethod
    def _trace_model(
        cls,
        link: Optional[list],
        graph: dict,
        _seen: Optional[Set[str]] = None,
    ) -> Optional[str]:
        """Follow a model link back to the checkpoint name it originates from.

        Checkpoint loader nodes return their ``ckpt_name``; LoRA loader and
        model-merge nodes are traced further upstream. Returns ``None`` when
        no checkpoint loader is reached. ``_seen`` is internal bookkeeping of
        visited node ids and guards against cyclic graphs.
        """
        seen = set() if _seen is None else _seen
        node = cls._resolve_link(link, graph, seen)
        if node is None:
            return None

        class_type = cls._node_class(node)
        inputs = cls._node_inputs(node)

        if "CheckpointLoader" in class_type:
            return inputs.get("ckpt_name")
        if "LoraLoader" in class_type or "ModelMerge" in class_type:
            # LoRA loaders take a single "model" input; merge nodes take
            # "model1"/"model2". The first branch that reaches a checkpoint wins.
            for key in ("model", "model1", "model2"):
                name = cls._trace_model(inputs.get(key), graph, seen)
                if name:
                    return name
        return None