hokusai/bulkgen/providers/image.py
Konstantin Fickel cf73511876
Some checks failed
Continuous Integration / Build Package (push) Successful in 41s
Continuous Integration / Lint, Check & Test (push) Failing after 59s
refactor: replace blackforest package with custom async BFL client
Implement bulkgen/providers/bfl.py with a fully async httpx-based client
that supports all current and future BFL models (including flux-2-*).
Remove the blackforest dependency and simplify the image provider by
eliminating the asyncio.to_thread wrapper.
2026-02-14 16:44:36 +01:00

61 lines
1.8 KiB
Python

"""BlackForestLabs image generation provider."""
from __future__ import annotations
import base64
from pathlib import Path
from typing import override
import httpx
from bulkgen.config import TargetConfig
from bulkgen.providers import Provider
from bulkgen.providers.bfl import BFLClient
def _encode_image_b64(path: Path) -> str:
    """Return the contents of the file at *path* as base64 text.

    The file is read in full as bytes, base64-encoded, and the encoded
    bytes are decoded as ASCII to produce a ``str``.
    """
    raw = path.read_bytes()
    encoded = base64.b64encode(raw)
    return encoded.decode("ascii")
class ImageProvider(Provider):
    """Provider that generates images via the BlackForestLabs API."""

    _client: BFLClient

    def __init__(self, api_key: str) -> None:
        """Create the underlying ``BFLClient`` using *api_key*."""
        self._client = BFLClient(api_key=api_key)

    @override
    async def generate(
        self,
        target_name: str,
        target_config: TargetConfig,
        resolved_prompt: str,
        resolved_model: str,
        project_dir: Path,
    ) -> None:
        """Generate one image and write it to ``project_dir / target_name``.

        A request dictionary is assembled from *resolved_prompt* plus the
        optional ``width``, ``height``, ``reference_image`` and
        ``control_images`` settings of *target_config*.  Image paths from
        the config are resolved relative to *project_dir* and sent
        base64-encoded.  The request is passed to the BFL client together
        with *resolved_model*; the ``sample_url`` of the result is then
        downloaded and its bytes are written to the output path.

        ``raise_for_status`` is called on the download response, so an
        HTTP error status aborts before anything is written.
        """
        destination = project_dir / target_name

        # "prompt" is always sent; the remaining keys only when configured.
        payload: dict[str, object] = {"prompt": resolved_prompt}

        if (width := target_config.width) is not None:
            payload["width"] = width
        if (height := target_config.height) is not None:
            payload["height"] = height

        if (reference := target_config.reference_image) is not None:
            payload["image_prompt"] = _encode_image_b64(project_dir / reference)

        # NOTE(review): every control image is read and encoded, but each
        # one is stored under the same "control_image" key, so only the
        # last entry ends up in the request -- confirm this is intended.
        for control in target_config.control_images:
            payload["control_image"] = _encode_image_b64(project_dir / control)

        generation = await self._client.generate(resolved_model, payload)

        # Fetch the generated image with a short-lived HTTP client.
        async with httpx.AsyncClient() as session:
            download = await session.get(generation.sample_url)
            _ = download.raise_for_status()

        _ = destination.write_bytes(download.content)