"""Utility functions for Noisemaker."""
from __future__ import annotations
import json
import os
import subprocess
from enum import Enum
from typing import Any
import tensorflow as tf
from loguru import logger as default_logger
from PIL import Image
from noisemaker.constants import ColorSpace
[docs]
def save(tensor: tf.Tensor, name: str = "noise.png") -> None:
"""
Save an image Tensor to a file.
Args:
tensor: Image tensor to save
name: Filename, ending with .png or .jpg
Returns:
None
"""
tensor = tf.image.convert_image_dtype(tensor, tf.uint8, saturate=True)
if name.lower().endswith(".png"):
data = tf.image.encode_png(tensor).numpy()
elif name.lower().endswith((".jpg", ".jpeg")):
data = tf.image.encode_jpeg(tensor).numpy()
else:
raise ValueError("Filename should end with .png or .jpg")
with open(name, "wb") as fh:
fh.write(data)
[docs]
def load(filename: str, channels: int | None = None) -> tf.Tensor:
"""
Load a .png or .jpg by filename.
Args:
filename: Path to the image file
channels: Optional number of channels to force
Returns:
Loaded image tensor
"""
with open(filename, "rb") as fh:
if filename.lower().endswith(".png"):
return tf.image.decode_png(fh.read(), channels=channels)
elif filename.lower().endswith((".jpg", ".jpeg")):
return tf.image.decode_jpeg(fh.read(), channels=channels)
[docs]
def magick(pattern: str, name: str) -> Any:
"""
Create a GIF from frames using ``ffmpeg``.
Args:
pattern: Frame filename pattern (e.g., ``/tmp/dir/*png`` or ``/tmp/dir/%04d.png``)
name: Output filename
Returns:
Result of subprocess call
"""
# Convert glob pattern to directory and ffmpeg pattern
# If pattern is like "/tmp/dir/*png", convert to "/tmp/dir/%04d.png"
import os
import tempfile
if "*" in pattern:
directory = os.path.dirname(pattern)
input_pattern = os.path.join(directory, "%04d.png")
else:
input_pattern = pattern
# Use ffmpeg to create GIF with good quality palette
# First generate a palette
palette_file = os.path.join(tempfile.gettempdir(), "noisemaker_palette.png")
palette_cmd = [
"ffmpeg",
"-y",
"-framerate",
"20",
"-i",
input_pattern,
"-vf",
"scale=flags=lanczos,palettegen",
palette_file,
]
check_call(palette_cmd, quiet=True)
# Then use the palette to create the GIF
gif_cmd = [
"ffmpeg",
"-y",
"-framerate",
"20",
"-i",
input_pattern,
"-i",
palette_file,
"-lavfi",
"scale=flags=lanczos[x];[x][1:v]paletteuse",
name,
]
return check_call(gif_cmd)
[docs]
def watermark(text: str, filename: str) -> Any:
"""
Annotate an image.
Args:
text: Text to add to the image
filename: Image filename to annotate
Returns:
Result of subprocess call
"""
return check_call(
["mood", "--filename", filename, "--text", text, "--font", "Nunito-VariableFont_wght", "--font-size", "16", "--no-rect", "--bottom", "--right"]
)
[docs]
def check_call(command: list[str], quiet: bool = False) -> Any:
"""Execute a subprocess command."""
try:
subprocess.run(command, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, check=True)
except Exception as e:
if not quiet:
log_subprocess_error(command, e)
raise
[docs]
def log_subprocess_error(command: str | list[str], e: Exception) -> None:
"""Log subprocess execution errors with appropriate detail.
Args:
command: The command that failed.
e: The exception that was raised.
"""
if isinstance(e, subprocess.CalledProcessError):
logger.error(f"{e}: {e.output.strip()}")
else:
logger.error(f"Command '{command}' failed to execute: {e}")
[docs]
def get_noisemaker_dir() -> str:
"""Get the Noisemaker configuration directory path.
Returns:
Path to ~/.noisemaker or NOISEMAKER_DIR environment variable.
"""
return os.environ.get("NOISEMAKER_DIR", os.path.join(os.path.expanduser("~"), ".noisemaker"))
[docs]
def dumps(kwargs: dict[str, Any]) -> str:
"""Serialize kwargs to JSON, converting Enums to strings.
Args:
kwargs: Dictionary to serialize.
Returns:
Pretty-printed JSON string with sorted keys.
"""
out = {}
for k, v in kwargs.items():
if isinstance(v, Enum):
out[k] = str(v)
else:
out[k] = v
return json.dumps(out, indent=4, sort_keys=True)
[docs]
def shape_from_params(width: int, height: int, color_space: ColorSpace, with_alpha: bool) -> list[int]:
"""Construct a shape array from image parameters.
Args:
width: Image width in pixels.
height: Image height in pixels.
color_space: Color space (grayscale or color).
with_alpha: Include alpha channel.
Returns:
Shape list [height, width, channels].
"""
if color_space == ColorSpace.grayscale:
shape = [height, width, 1]
else:
shape = [height, width, 3]
if with_alpha:
shape[2] += 1
return shape
[docs]
def shape_from_file(filename: str) -> list[int]:
"""Extract image dimensions from a file using PIL.
Uses PIL to avoid adding operations to the TensorFlow computation graph.
Args:
filename: Path to image file.
Returns:
Shape list [height, width, channels].
"""
image = Image.open(filename)
input_width, input_height = image.size
return [input_height, input_width, len(image.getbands())]
[docs]
def from_srgb(srgb: tf.Tensor) -> tf.Tensor:
"""Convert an sRGB image tensor to Linear RGB color space.
Args:
srgb: Image tensor in sRGB color space.
Returns:
Image tensor in Linear RGB color space.
"""
condition = tf.less(srgb, 0.04045)
linear_rgb = tf.where(condition, srgb / 12.92, tf.pow((srgb + 0.055) / 1.055, 2.4))
return linear_rgb
[docs]
def from_linear_rgb(linear_rgb: tf.Tensor) -> tf.Tensor:
"""Convert a Linear RGB image tensor to sRGB color space.
Args:
linear_rgb: Image tensor in Linear RGB color space.
Returns:
Image tensor in sRGB color space.
"""
condition = tf.less(linear_rgb, 0.0031308)
srgb = tf.where(condition, linear_rgb * 12.92, 1.055 * tf.pow(linear_rgb, 1 / 2.4) - 0.055)
return srgb
_LOGS_DIR = os.path.join(get_noisemaker_dir(), "logs")
os.makedirs(_LOGS_DIR, exist_ok=True)
logger = default_logger
# logger.remove(0) # Remove loguru's default STDERR log handler
logger.add(os.path.join(_LOGS_DIR, "noisemaker.log"), retention="7 days")