# Post-patch state of the image optimizers, reconstructed from the diff.
# Names used below but not visible in this chunk (io, piexif, remove_alpha,
# do_reduce_colors, rebuild_palette, jpeg_dynamic_quality, ensure_matches and
# the Optimize*Options classes) are provided elsewhere in the module.
import os
import pathlib
import subprocess
import tempfile
from dataclasses import dataclass
from typing import overload

from PIL import Image

from zimscraperlib.constants import ALPHA_NOT_SUPPORTED
from zimscraperlib.image.probing import format_for
from zimscraperlib.image.utils import save_image

# Pillow modes that cannot be written as JPEG and must be flattened to RGB
# first. JPEG is the only format handled here that has no transparency.
_JPEG_FLATTEN_MODES = ("RGBA", "LA", "PA", "P")


def _canonical_format(name: str) -> str:
    """Return *name* upper-cased, with the "JPG" alias folded into "JPEG"."""
    upper = name.upper()
    return "JPEG" if upper == "JPG" else upper


def _reserve_temp_gif() -> pathlib.Path:
    """Create an empty ``.gif`` temporary file and return its path.

    The file is created (not merely named) so no other process can claim the
    path in between; the caller is responsible for deleting it.
    """
    with tempfile.NamedTemporaryFile(suffix=".gif", delete=False) as handle:
        return pathlib.Path(handle.name)


@overload
def optimize_png(
    src: pathlib.Path | io.BytesIO,
    dst: None = None,
    options: OptimizePngOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_png(
    src: pathlib.Path | io.BytesIO,
    dst: io.BytesIO,
    options: OptimizePngOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_png(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path,
    options: OptimizePngOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path: ...
def optimize_png(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path | io.BytesIO | None = None,
    options: OptimizePngOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path | io.BytesIO:
    """method to optimize PNG files using a pure python external optimizer

    Arguments:
        src: image to read, as a path or an in-memory buffer
        dst: where to write the PNG; a new io.BytesIO is created when None
        options: optimizer settings; OptimizePngOptions() when None
        convert: accept a non-PNG source and re-encode it as PNG instead of
            rejecting it

    Returns:
        dst; when it is an io.BytesIO it is rewound to position 0
    """
    if options is None:
        options = OptimizePngOptions()

    if not convert:
        ensure_matches(src, "PNG")

    with Image.open(src) as opened:
        # PNG stores alpha, so a converted source keeps its transparency unless
        # options.remove_transparency explicitly asks to drop it.
        img = opened

        if options.remove_transparency:
            img = remove_alpha(img, options.background_color)

        if options.reduce_colors:
            img, _, _ = do_reduce_colors(img, options.max_colors)

        if not options.fast_mode and img.mode == "P":
            img, _ = rebuild_palette(img)

        if dst is None:
            dst = io.BytesIO()

        img.save(dst, optimize=True, format="PNG")

    if isinstance(dst, io.BytesIO):
        dst.seek(0)

    return dst


@overload
def optimize_jpeg(
    src: pathlib.Path | io.BytesIO,
    dst: None = None,
    options: OptimizeJpgOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_jpeg(
    src: pathlib.Path | io.BytesIO,
    dst: io.BytesIO,
    options: OptimizeJpgOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_jpeg(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path,
    options: OptimizeJpgOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path: ...
def optimize_jpeg(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path | io.BytesIO | None = None,
    options: OptimizeJpgOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path | io.BytesIO:
    """method to optimize JPEG files using a pure python external optimizer

    Arguments:
        src: image to read, as a path or an in-memory buffer
        dst: where to write the JPEG; a new io.BytesIO is created when None
        options: optimizer settings; OptimizeJpgOptions() when None
        convert: accept a non-JPEG source and re-encode it as JPEG instead of
            rejecting it. EXIF data is not inspected nor carried over in
            that case.

    Returns:
        dst; when it is an io.BytesIO it is rewound to position 0
    """
    if options is None:
        options = OptimizeJpgOptions()

    if not convert:
        ensure_matches(src, "JPEG")

    if isinstance(src, pathlib.Path):
        orig_size = os.path.getsize(src)
    else:
        orig_size = src.getbuffer().nbytes

    had_exif = False
    if not convert:
        # src is a verified JPEG here, which piexif can parse
        exif_input = str(src) if isinstance(src, pathlib.Path) else src.getvalue()
        had_exif = bool(
            piexif.load(exif_input)[  # pyright: ignore[reportUnknownMemberType]
                "Exif"
            ]
        )

    with Image.open(src) as opened:
        img = opened
        if convert and img.mode in _JPEG_FLATTEN_MODES:
            # JPEG has no alpha channel nor palette: flatten before encoding
            img = img.convert("RGB")

        # only use progressive if file size is bigger
        use_progressive_jpg = orig_size > 10240  # 10KiB # noqa: PLR2004

        if options.fast_mode:
            quality_setting = options.quality
        else:
            quality_setting, _ = jpeg_dynamic_quality(img)

        if dst is None:
            dst = io.BytesIO()

        img.save(
            dst,
            quality=quality_setting,
            optimize=True,
            progressive=use_progressive_jpg,
            format="JPEG",
        )

    if isinstance(dst, io.BytesIO):
        dst.seek(0)

    if options.keep_exif and had_exif:
        piexif.transplant(  # pyright: ignore[reportUnknownMemberType]
            exif_src=(
                str(src.resolve()) if isinstance(src, pathlib.Path) else src.getvalue()
            ),
            image=(
                str(dst.resolve()) if isinstance(dst, pathlib.Path) else dst.getvalue()
            ),
            new_file=dst,
        )

    return dst


@overload
def optimize_webp(
    src: pathlib.Path | io.BytesIO,
    dst: None = None,
    options: OptimizeWebpOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_webp(
    src: pathlib.Path | io.BytesIO,
    dst: io.BytesIO,
    options: OptimizeWebpOptions | None = None,
    *,
    convert: bool = False,
) -> io.BytesIO: ...
@overload
def optimize_webp(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path,
    options: OptimizeWebpOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path: ...
def optimize_webp(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path | io.BytesIO | None = None,
    options: OptimizeWebpOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path | io.BytesIO:
    """method to optimize WebP using Pillow options

    Arguments:
        src: image to read, as a path or an in-memory buffer
        dst: where to write the WebP; a new io.BytesIO is created when None
        options: encoder settings; OptimizeWebpOptions() when None
        convert: accept a non-WebP source and re-encode it as WebP instead of
            rejecting it

    Returns:
        dst; when it is an io.BytesIO it is rewound to position 0
    """
    if options is None:
        options = OptimizeWebpOptions()

    if not convert:
        ensure_matches(src, "WEBP")

    params: dict[str, bool | int | None] = {
        "lossless": options.lossless,
        "quality": options.quality,
        "method": options.method,
    }

    # WebP stores alpha, so a converted source is encoded as-is.
    with Image.open(src) as img:
        if dst is None:
            dst = io.BytesIO()

        if isinstance(dst, io.BytesIO):
            img.save(dst, format="WEBP", **params)
            dst.seek(0)
            return dst

        try:
            img.save(dst, format="WEBP", **params)
        except Exception:
            # drop the partial output, but never when dst is the source file
            in_place = (
                isinstance(src, pathlib.Path) and src.resolve() == dst.resolve()
            )
            if not in_place and dst.exists():
                dst.unlink()
            raise

    return dst


def optimize_gif(
    src: pathlib.Path | io.BytesIO,
    dst: pathlib.Path,
    options: OptimizeGifOptions | None = None,
    *,
    convert: bool = False,
) -> pathlib.Path:
    """method to optimize GIFs using gifsicle >= 1.92

    gifsicle only reads from a file, so an in-memory or non-GIF source is
    first written to a temporary ``.gif`` that is removed before returning.

    Arguments:
        src: image to read, as a path or an in-memory buffer
        dst: path the optimized GIF is written to
        options: gifsicle settings; OptimizeGifOptions() when None
        convert: accept a non-GIF source and re-encode it as GIF (through
            Pillow) before optimizing, instead of rejecting it

    Returns:
        dst

    Raises:
        subprocess.CalledProcessError: gifsicle exited with a non-zero status
    """
    if options is None:
        options = OptimizeGifOptions()

    if convert:
        src_fmt = format_for(src, from_suffix=False)
        needs_conversion = bool(src_fmt) and src_fmt != "GIF"
    else:
        ensure_matches(src, "GIF")
        needs_conversion = False

    scratch: pathlib.Path | None = None
    try:
        if needs_conversion:
            scratch = _reserve_temp_gif()
            # GIF supports transparency: let Pillow map the source mode itself
            with Image.open(src) as img:
                img.save(scratch, format="GIF")
            gif_input = scratch
        elif isinstance(src, io.BytesIO):
            scratch = _reserve_temp_gif()
            # getvalue() returns the whole buffer whatever the stream position
            # left behind by format probing
            scratch.write_bytes(src.getvalue())
            gif_input = scratch
        else:
            gif_input = src

        # use gifsicle
        args = ["/usr/bin/env", "gifsicle"]
        if options.optimize_level:
            args += [f"-O{options.optimize_level}"]
        if options.max_colors:
            args += ["--colors", str(options.max_colors)]
        if options.lossiness:
            args += [f"--lossy={options.lossiness}"]
        if options.no_extensions:
            args += ["--no-extensions"]
        if options.interlace:
            args += ["--interlace"]
        args += [str(gif_input)]
        with open(dst, "wb") as out_file:
            gifsicle = subprocess.run(args, stdout=out_file, check=False)

        # remove dst if gifsicle failed and src is different from dst
        if (
            gifsicle.returncode != 0
            and dst.exists()
            and gif_input.resolve() != dst.resolve()
        ):
            dst.unlink()  # pragma: no cover

        # raise error if unsuccessful
        gifsicle.check_returncode()
    finally:
        if scratch is not None:
            scratch.unlink(missing_ok=True)

    return dst


def optimize_image(
    src: pathlib.Path | io.BytesIO | bytes,
    dst: pathlib.Path,
    options: OptimizeOptions | None = None,
    *,
    fmt: str | None = None,
    delete_src: bool | None = False,
    convert: bool | str | None = False,
):
    """Optimize image, automatically selecting correct optimizer

    Arguments:
        src: image to read, as a path, an in-memory buffer or raw bytes
        dst: path the optimized image is written to
        options: per-format settings; OptimizeOptions.of() when None
        fmt: format of the source image, needed when src is io.BytesIO or bytes
        delete_src: whether to remove src file upon success (boolean)
            values: True | False
        convert: whether/how to convert from source before optimizing (str or boolean)
            values: False: keep the source format
                    True: convert to the format guessed from dst
                    "FMT": convert to format FMT
            Only applies when source and dst formats differ.

    Raises:
        ValueError: delete_src requested for an in-memory source, fmt missing
            for an in-memory source, or a format could not be determined
        NotImplementedError: no optimizer exists for the selected format
    """
    if options is None:
        options = OptimizeOptions.of()

    if isinstance(src, bytes):
        src = io.BytesIO(src)

    if delete_src and isinstance(src, io.BytesIO):
        raise ValueError("delete_src is not applicable when src is io.BytesIO or bytes")

    dst_format = format_for(dst)
    if isinstance(src, io.BytesIO):
        if not fmt:
            raise ValueError("fmt is required when src is io.BytesIO or bytes")
        src_format = fmt.upper()
    else:
        src_format = format_for(src, from_suffix=False)

    # NOTE(review): the next raise and the dst_format test sit between two diff
    # hunks and were not visible; reconstructed from the surrounding lines.
    # Confirm the src message against the real file.
    if src_format is None:  # pragma: no cover
        # never supposed to happen since we get format from suffix, but good for type
        # checking
        raise ValueError("Impossible to guess format from src image")
    if dst_format is None:
        raise ValueError("Impossible to guess format from dst image")

    # if requested, convert src to requested format into dst path
    # ("JPG" and "JPEG" name the same format and must not trigger a conversion)
    needs_convert = bool(convert) and (
        _canonical_format(src_format) != _canonical_format(dst_format)
    )
    if needs_convert:
        output_format = convert if isinstance(convert, str) else dst_format
    else:
        output_format = src_format
    output_format = output_format.lower()

    if output_format in ("jpg", "jpeg"):
        optimize_jpeg(src=src, dst=dst, options=options.jpg, convert=needs_convert)
    elif output_format == "gif":
        optimize_gif(src=src, dst=dst, options=options.gif, convert=needs_convert)
    elif output_format == "png":
        optimize_png(src=src, dst=dst, options=options.png, convert=needs_convert)
    elif output_format == "webp":
        optimize_webp(src=src, dst=dst, options=options.webp, convert=needs_convert)
    else:
        raise NotImplementedError(
            f"Image format '{output_format}' cannot yet be optimized"
        )

    # delete src image if requested
    if (
        delete_src
        and isinstance(src, pathlib.Path)
        and src.exists()
        and not src.samefile(dst)
    ):
        src.unlink()