Files
logimage-generator/docs/superpowers/plans/2026-05-21-pipeline-conversion-ameliore.md
T
Vincent Bourdon 5a03f8a38d import initial
2026-06-10 10:21:18 +02:00

18 KiB

Pipeline de conversion amélioré — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Remplacer le pipeline Canny+blend+Otsu par un pipeline bilateral+posterize+Otsu+cleanup morphologique qui garantit ≤ 6 blocs par ligne/colonne.

Architecture: Le PillowImageConverter reçoit des paramètres configurables (max_clue_blocks, min_component_size, max_cleanup_iterations) et applique une boucle de nettoyage morphologique jusqu'à ce que la complexité des indices soit acceptable ou que max_cleanup_iterations passes aient été effectuées (le seuil est donc visé au mieux, pas garanti pour toute image). Une LocalFileImageSource permet de tester avec des images locales sans appel API.

Tech Stack: Python 3.12, OpenCV (cv2), NumPy, Pillow


Fichiers

  • Modifier: src/logimage/infrastructure/image/pillow_converter.py
  • Créer: src/logimage/infrastructure/image/local_file_source.py
  • Modifier: src/logimage/cli/main.py
  • Modifier: tests/infrastructure/test_pillow_converter.py
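  • Créer: tests/fixtures/simple_icon.png (généré dans les tests)
  • Créer: tests/infrastructure/test_local_file_source.py
  • Modifier: tests/cli/test_main_routing.py
  • Créer: tests/infrastructure/test_pipeline_integration.py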

Task 1 : Nouveau pipeline PillowImageConverter

Files:

  • Modify: src/logimage/infrastructure/image/pillow_converter.py

  • Modify: tests/infrastructure/test_pillow_converter.py

  • Step 1: Écrire les tests qui couvrent le nouveau comportement

Remplacer le contenu de tests/infrastructure/test_pillow_converter.py par :

import io
import numpy as np
from PIL import Image, ImageDraw
import pytest
from logimage.infrastructure.image.pillow_converter import PillowImageConverter
from logimage.domain.value_objects.grid import Grid


def _make_jpeg_bytes(width: int = 100, height: int = 100, color: tuple[int, int, int] = (128, 64, 192)) -> bytes:
    """Return the JPEG encoding of a solid-colour RGB image of the given size."""
    with io.BytesIO() as stream:
        Image.new("RGB", (width, height), color=color).save(stream, format="JPEG")
        return stream.getvalue()


def _make_silhouette_bytes(width: int = 100, height: int = 100) -> bytes:
    """Return PNG bytes of a white canvas with a centred black rectangle.

    The border left around the rectangle is a quarter of the image width on
    every side.
    """
    inset = width // 4
    canvas = Image.new("RGB", (width, height), color=(255, 255, 255))
    ImageDraw.Draw(canvas).rectangle(
        [inset, inset, width - inset, height - inset], fill=(0, 0, 0)
    )
    stream = io.BytesIO()
    canvas.save(stream, format="PNG")
    return stream.getvalue()


def test_output_grid_has_correct_dimensions() -> None:
    """The grid returned by to_grid has the requested width and height."""
    grid = PillowImageConverter().to_grid(_make_jpeg_bytes(), width=10, height=10)
    assert grid.width == 10
    assert grid.height == 10


def test_output_is_grid_instance() -> None:
    """to_grid returns a Grid value object."""
    jpeg = _make_jpeg_bytes()
    assert isinstance(PillowImageConverter().to_grid(jpeg, width=15, height=15), Grid)


def test_output_cells_are_booleans() -> None:
    """Every cell exposed through Grid.row is a real bool."""
    grid = PillowImageConverter().to_grid(_make_jpeg_bytes(), width=10, height=10)
    rows = (grid.row(row_idx) for row_idx in range(grid.height))
    assert all(isinstance(cell, bool) for row in rows for cell in row)


def test_black_image_produces_all_true_cells() -> None:
    """A solid black image must fill every cell of the grid.

    The assertion checks every cell (not merely "at least one") so that it
    matches what the test name promises.
    """
    buf = io.BytesIO()
    Image.new("RGB", (100, 100), color=(0, 0, 0)).save(buf, format="JPEG")
    grid = PillowImageConverter().to_grid(buf.getvalue(), width=10, height=10)
    assert all(cell for row in range(grid.height) for cell in grid.row(row))


def test_white_image_produces_mostly_false_cells() -> None:
    """A solid white image must not fill every cell of the grid."""
    stream = io.BytesIO()
    Image.new("RGB", (100, 100), color=(255, 255, 255)).save(stream, format="JPEG")
    grid = PillowImageConverter().to_grid(stream.getvalue(), width=10, height=10)
    filled = 0
    for row_idx in range(grid.height):
        filled += sum(grid.row(row_idx))
    assert filled < grid.width * grid.height


def test_max_clue_blocks_respected_on_silhouette() -> None:
    """A simple silhouette must yield at most max_clue_blocks blocks per row/column."""
    grid = PillowImageConverter(max_clue_blocks=6).to_grid(
        _make_silhouette_bytes(), width=20, height=20
    )
    row_counts = (_count_blocks(grid.row(r)) for r in range(grid.height))
    for i, blocks in enumerate(row_counts):
        assert blocks <= 6, f"Ligne {i} a {blocks} blocs (max=6)"
    col_counts = (_count_blocks(grid.col(c)) for c in range(grid.width))
    for j, blocks in enumerate(col_counts):
        assert blocks <= 6, f"Colonne {j} a {blocks} blocs (max=6)"


def test_custom_max_clue_blocks() -> None:
    """With max_clue_blocks=3 no row or column may exceed three blocks."""
    grid = PillowImageConverter(max_clue_blocks=3).to_grid(
        _make_silhouette_bytes(), width=15, height=15
    )
    rows = (grid.row(i) for i in range(grid.height))
    assert all(_count_blocks(line) <= 3 for line in rows)
    cols = (grid.col(j) for j in range(grid.width))
    assert all(_count_blocks(line) <= 3 for line in cols)


def test_accepts_png_bytes() -> None:
    """PNG input is accepted and produces a grid of the requested width."""
    png = _make_silhouette_bytes()
    assert PillowImageConverter().to_grid(png, width=10, height=10).width == 10


def _count_blocks(line: tuple[bool, ...]) -> int:
    """Return the number of maximal runs of truthy cells in ``line``."""
    filled = [bool(cell) for cell in line]
    # A block starts at every filled cell whose left neighbour is empty
    # (the virtual cell before the line counts as empty).
    return sum(cur and not prev for prev, cur in zip([False] + filled, filled))
  • Step 2: Vérifier que les nouveaux tests échouent
pytest tests/infrastructure/test_pillow_converter.py -v

Attendu : test_max_clue_blocks_respected_on_silhouette et test_custom_max_clue_blocks FAIL (le pipeline actuel ne garantit pas le seuil).

  • Step 3: Implémenter le nouveau pipeline

Remplacer le contenu de src/logimage/infrastructure/image/pillow_converter.py par :

import io
import cv2
import numpy as np
from PIL import Image
from logimage.domain.ports.image_converter import ImageConverter
from logimage.domain.value_objects.grid import Grid


class PillowImageConverter(ImageConverter):
    """Convert raw image bytes into a boolean nonogram grid.

    Pipeline: grayscale + resize, bilateral smoothing, Otsu binarisation
    (dark pixels become filled cells), then morphological closing and
    small-component removal, repeated with a growing kernel while any row or
    column contains more than ``max_clue_blocks`` runs of filled cells.

    The cleanup is best effort: it stops after ``max_cleanup_iterations``
    extra passes even if the block limit is still exceeded.
    """

    # Gray level separating "dark" from "light" when the image has no
    # contrast for Otsu to work with.
    _MID_GRAY = 128

    def __init__(
        self,
        max_clue_blocks: int = 6,
        min_component_size: int = 2,
        max_cleanup_iterations: int = 3,
    ) -> None:
        """Store the tuning parameters.

        Args:
            max_clue_blocks: Target maximum number of filled runs per row/column.
            min_component_size: Connected components with fewer pixels are erased.
            max_cleanup_iterations: Maximum number of extra closing passes.
        """
        self.max_clue_blocks = max_clue_blocks
        self.min_component_size = min_component_size
        self.max_cleanup_iterations = max_cleanup_iterations

    def to_grid(self, image_bytes: bytes, width: int, height: int) -> Grid:
        """Decode ``image_bytes`` and return a ``width`` x ``height`` grid.

        A cell is True where the (smoothed, resized) image is dark.
        """
        # Decode inside a context manager so the image handle is released.
        with Image.open(io.BytesIO(image_bytes)) as source:
            gray = source.convert("L").resize((width, height), Image.LANCZOS)
        pixels = np.array(gray, dtype=np.uint8)

        smoothed = cv2.bilateralFilter(pixels, d=9, sigmaColor=75, sigmaSpace=75)
        binary = self._binarize(smoothed)

        kernel_size = 3
        binary = self._close_and_prune(binary, kernel_size)
        for _ in range(self.max_cleanup_iterations):
            if self._max_blocks(binary) <= self.max_clue_blocks:
                break
            # Still too fragmented: merge neighbouring runs with a larger kernel.
            kernel_size += 1
            binary = self._close_and_prune(binary, kernel_size)

        return Grid.from_list((binary > 0).tolist())

    def _binarize(self, gray: np.ndarray) -> np.ndarray:
        """Return a 0/255 mask in which dark pixels are 255.

        Otsu is applied directly to the smoothed image. A preliminary
        ``value < median`` split would send every pixel equal to the median
        to white, which blanks uniform images and images that are more than
        half dark.
        """
        darkest, brightest = int(gray.min()), int(gray.max())
        if darkest == brightest:
            # No contrast: Otsu has nothing to separate, so classify the
            # single gray level against the middle of the range.
            fill = 255 if darkest < self._MID_GRAY else 0
            return np.full_like(gray, fill)
        _, binary = cv2.threshold(
            gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        return binary

    def _close_and_prune(self, binary: np.ndarray, kernel_size: int) -> np.ndarray:
        """Apply a square morphological closing, then drop small components."""
        kernel = cv2.getStructuringElement(
            cv2.MORPH_RECT, (kernel_size, kernel_size)
        )
        closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        return self._remove_small_components(closed)

    def _remove_small_components(self, binary: np.ndarray) -> np.ndarray:
        """Erase 8-connected components smaller than ``min_component_size``."""
        _, labels, stats, _ = cv2.connectedComponentsWithStats(
            binary, connectivity=8
        )
        keep = stats[:, cv2.CC_STAT_AREA] >= self.min_component_size
        keep[0] = False  # label 0 is the background and is never kept
        return np.where(keep[labels], 255, 0).astype(binary.dtype)

    def _max_blocks(self, binary: np.ndarray) -> int:
        """Return the largest block count found in any row or column."""
        lines = [*binary, *binary.T]
        return max((self._count_blocks(line) for line in lines), default=0)

    def _count_blocks(self, line: np.ndarray) -> int:
        """Return the number of runs of non-zero pixels in a 1-D line."""
        filled = np.asarray(line) > 0
        if filled.size == 0:
            return 0
        # A run starts at a filled pixel whose predecessor is empty; the
        # first pixel starts a run on its own when it is filled.
        starts = filled[1:] & ~filled[:-1]
        return int(filled[0]) + int(np.count_nonzero(starts))
  • Step 4: Vérifier que tous les tests passent
pytest tests/infrastructure/test_pillow_converter.py -v

Attendu : tous PASS.

  • Step 5: Lancer la suite complète
pytest -v

Attendu : tous PASS.

  • Step 6: Commit
git add src/logimage/infrastructure/image/pillow_converter.py tests/infrastructure/test_pillow_converter.py
git commit -m "feat: improve conversion pipeline with bilateral filter, posterize and morphological cleanup"

Task 2 : LocalFileImageSource

Files:

  • Create: src/logimage/infrastructure/image/local_file_source.py

  • Create: tests/infrastructure/test_local_file_source.py

  • Step 1: Écrire le test

Créer tests/infrastructure/test_local_file_source.py :

import io
import pytest
from pathlib import Path
from PIL import Image
from logimage.infrastructure.image.local_file_source import LocalFileImageSource
from logimage.domain.value_objects.image_data import ImageData


def test_fetch_returns_image_data(tmp_path: Path) -> None:
    """fetch() returns an ImageData with the file bytes and the file stem as title."""
    icon_path = tmp_path / "test_icon.png"
    Image.new("RGB", (50, 50), color=(100, 150, 200)).save(icon_path, format="PNG")

    fetched = LocalFileImageSource(icon_path).fetch()

    assert isinstance(fetched, ImageData)
    assert fetched.content == icon_path.read_bytes()
    assert fetched.title == "test_icon"


def test_fetch_title_is_stem(tmp_path: Path) -> None:
    """The title is the file name without its extension."""
    jpeg_path = tmp_path / "my_cool_image.jpg"
    Image.new("RGB", (10, 10)).save(jpeg_path, format="JPEG")

    fetched = LocalFileImageSource(jpeg_path).fetch()

    assert fetched.title == "my_cool_image"


def test_fetch_raises_if_file_not_found(tmp_path: Path) -> None:
    """fetch() raises FileNotFoundError when the file does not exist.

    The missing path lives under pytest's tmp_path so the test does not
    depend on the layout of the host filesystem.
    """
    source = LocalFileImageSource(tmp_path / "missing.png")
    with pytest.raises(FileNotFoundError):
        source.fetch()
  • Step 2: Vérifier que les tests échouent
pytest tests/infrastructure/test_local_file_source.py -v

Attendu : FAIL (module inexistant).

  • Step 3: Implémenter LocalFileImageSource

Créer src/logimage/infrastructure/image/local_file_source.py :

from pathlib import Path
from logimage.domain.ports.image_source import ImageSource
from logimage.domain.value_objects.image_data import ImageData


class LocalFileImageSource(ImageSource):
    """Image source that reads one image file from the local filesystem."""

    def __init__(self, path: Path) -> None:
        """Remember the path of the file to read; nothing is opened yet."""
        self._path = path

    def fetch(self, theme: str | None = None) -> ImageData:
        """Read the file and return its bytes, titled with the file stem.

        Args:
            theme: Not used by this source (presumably part of the
                ImageSource contract; confirm against the port).

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        # Read first and translate the error, rather than checking exists()
        # beforehand: this avoids a second filesystem hit and the window in
        # which the file could vanish between the check and the read.
        try:
            content = self._path.read_bytes()
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Image not found: {self._path}") from err
        return ImageData(content=content, title=self._path.stem)
  • Step 4: Vérifier que les tests passent
pytest tests/infrastructure/test_local_file_source.py -v

Attendu : tous PASS.

  • Step 5: Lancer la suite complète
pytest -v

Attendu : tous PASS.

  • Step 6: Commit
git add src/logimage/infrastructure/image/local_file_source.py tests/infrastructure/test_local_file_source.py
git commit -m "feat: add LocalFileImageSource for local image input"

Task 3 : Option --local-image dans le CLI

Files:

  • Modify: src/logimage/cli/main.py

  • Modify: tests/cli/test_main_routing.py

  • Step 1: Lire les tests CLI existants pour comprendre le pattern

cat -n tests/cli/test_main_routing.py
  • Step 2: Écrire le test pour --local-image

Ajouter à la fin de tests/cli/test_main_routing.py le helper et le test ci-dessous, en complétant si nécessaire les imports en tête de fichier :

# Ajouter en haut du fichier si pas déjà présent :
# from pathlib import Path
# import pytest
# from PIL import Image
# from logimage.application.use_cases.generate_puzzles import GeneratePuzzlesUseCase

def _make_png_file(tmp_path: Path) -> Path:
    """Write a 50x50 solid-colour PNG named icon.png into tmp_path and return its path."""
    target = tmp_path / "icon.png"
    Image.new("RGB", (50, 50), color=(200, 100, 50)).save(target, format="PNG")
    return target


def test_local_image_flag_uses_local_source(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that --local-image makes main() build the use case with a LocalFileImageSource."""
    icon = _make_png_file(tmp_path)
    output = tmp_path / "out.pdf"

    # Filled by patched_init with the image source handed to the use case.
    captured_source = {}

    original_init = GeneratePuzzlesUseCase.__init__

    # Records the image source, then delegates to the real constructor.
    # NOTE(review): assumes main() passes these three arguments positionally
    # or under exactly these names - confirm against main.py.
    def patched_init(self, image_source, image_converter, pdf_exporter):
        captured_source["source"] = image_source
        original_init(self, image_source, image_converter, pdf_exporter)

    monkeypatch.setattr(GeneratePuzzlesUseCase, "__init__", patched_init)
    # execute is replaced by a no-op so only the wiring done by main() is exercised.
    monkeypatch.setattr("logimage.application.use_cases.generate_puzzles.GeneratePuzzlesUseCase.execute", lambda *a, **kw: None)

    from logimage.cli.main import main
    # Simulate the command line seen by the argument parser.
    monkeypatch.setattr("sys.argv", ["logimage", "--local-image", str(icon), "--output", str(output)])
    main()

    from logimage.infrastructure.image.local_file_source import LocalFileImageSource
    assert isinstance(captured_source["source"], LocalFileImageSource)

Note: si le pattern de test existant est différent (mocks directs, fixtures spécifiques), adapte ce test au style du fichier.

  • Step 3: Vérifier que le test échoue
pytest tests/cli/test_main_routing.py -v -k "test_local_image"

Attendu : FAIL.

  • Step 4: Ajouter l'option --local-image dans main.py

Dans src/logimage/cli/main.py, après les imports existants ajouter :

from logimage.infrastructure.image.local_file_source import LocalFileImageSource

Dans la fonction main(), après la définition de --output, ajouter l'argument :

    # Optional path to a local image file, parsed into a pathlib.Path.
    parser.add_argument(
        "--local-image",
        type=Path,
        metavar="PATH",
        help="Use a local image file instead of fetching from an API",
    )

Remplacer le bloc qui crée image_source (lignes avec pexels_key, unsplash_key) par :

    # Source selection order: explicit local file, then Pexels, then Unsplash.
    # The local file is checked first so it works without any API key.
    image_source: ImageSource
    if args.local_image is not None:
        image_source = LocalFileImageSource(args.local_image)
    elif pexels_key:
        image_source = PexelsImageSource(api_key=pexels_key)
    elif unsplash_key:
        image_source = UnsplashImageSource(api_key=unsplash_key)
    else:
        # Mention --local-image as well: it is a valid way out of this error.
        print(
            "Error: no image API key found.\n"
            "Set PEXELS_API_KEY or UNSPLASH_ACCESS_KEY in your .env file.\n"
            "Alternatively, pass --local-image PATH to use a local file.",
            file=sys.stderr,
        )
        sys.exit(1)
  • Step 5: Vérifier que les tests passent
pytest tests/cli/ -v

Attendu : tous PASS.

  • Step 6: Lancer la suite complète
pytest -v

Attendu : tous PASS.

  • Step 7: Commit
git add src/logimage/cli/main.py tests/cli/test_main_routing.py
git commit -m "feat: add --local-image CLI option for local file input"

Task 4 : Test d'intégration avec image locale

Files:

  • Create: tests/infrastructure/test_pipeline_integration.py

  • Step 1: Écrire le test d'intégration

Créer tests/infrastructure/test_pipeline_integration.py :

import io
from pathlib import Path
from PIL import Image, ImageDraw
from logimage.infrastructure.image.pillow_converter import PillowImageConverter
from logimage.infrastructure.image.local_file_source import LocalFileImageSource


def _make_icon_bytes() -> bytes:
    """Return PNG bytes of a simple house: a rectangle under a triangle, on white."""
    black = (0, 0, 0)
    canvas = Image.new("RGB", (100, 100), color=(255, 255, 255))
    pen = ImageDraw.Draw(canvas)
    pen.rectangle([25, 50, 75, 90], fill=black)  # body of the house
    pen.polygon([(50, 10), (20, 50), (80, 50)], fill=black)  # roof (rough triangle)
    stream = io.BytesIO()
    canvas.save(stream, format="PNG")
    return stream.getvalue()


def _count_blocks(line: tuple[bool, ...]) -> int:
    """Return the number of maximal runs of truthy cells in ``line``."""
    filled = [bool(cell) for cell in line]
    # A block starts at every filled cell whose left neighbour is empty
    # (the virtual cell before the line counts as empty).
    return sum(cur and not prev for prev, cur in zip([False] + filled, filled))


def test_house_icon_produces_playable_grid() -> None:
    """A simple house icon must yield a playable grid (at most 6 blocks per line)."""
    grid = PillowImageConverter(max_clue_blocks=6).to_grid(
        _make_icon_bytes(), width=20, height=20
    )

    row_counts = (_count_blocks(grid.row(r)) for r in range(grid.height))
    for i, blocks in enumerate(row_counts):
        assert blocks <= 6, f"Ligne {i} a {blocks} blocs"

    col_counts = (_count_blocks(grid.col(c)) for c in range(grid.width))
    for j, blocks in enumerate(col_counts):
        assert blocks <= 6, f"Colonne {j} a {blocks} blocs"


def test_local_source_feeds_converter(tmp_path: Path) -> None:
    """LocalFileImageSource and PillowImageConverter together produce a valid grid."""
    house = tmp_path / "house.png"
    house.write_bytes(_make_icon_bytes())

    fetched = LocalFileImageSource(house).fetch()
    grid = PillowImageConverter(max_clue_blocks=6).to_grid(
        fetched.content, width=15, height=15
    )

    assert grid.width == 15
    assert grid.height == 15
  • Step 2: Vérifier que les tests passent
pytest tests/infrastructure/test_pipeline_integration.py -v

Attendu : tous PASS.

  • Step 3: Lancer la suite complète
pytest -v

Attendu : tous PASS.

  • Step 4: Commit
git add tests/infrastructure/test_pipeline_integration.py
git commit -m "test: add integration tests for local image pipeline"