diff --git a/docs/src/content/docs/configuration/invokeai-yaml.mdx b/docs/src/content/docs/configuration/invokeai-yaml.mdx
index 987c8eb98a2..c43b76d82ed 100644
--- a/docs/src/content/docs/configuration/invokeai-yaml.mdx
+++ b/docs/src/content/docs/configuration/invokeai-yaml.mdx
@@ -131,7 +131,7 @@ Available strategies:
| `type` | `outputs/images/general/abc123.png` | Organize images by image category. |
| `hash` | `outputs/images/ab/abc123.png` | Use the first two characters of the image UUID for filesystem performance with large collections. |
-Changing this setting only affects newly-created images. Existing images remain in their current locations.
+Changing this setting only affects newly-created images. Existing images remain in their current locations unless you run [Image Storage Maintenance](/features/image-storage-maintenance/).
#### Logging
diff --git a/docs/src/content/docs/features/gallery.mdx b/docs/src/content/docs/features/gallery.mdx
index 1d490b33d76..989ddf12710 100644
--- a/docs/src/content/docs/features/gallery.mdx
+++ b/docs/src/content/docs/features/gallery.mdx
@@ -57,6 +57,10 @@ Each board has a context menu accessible via right-click (or Ctrl+click).
Deleting a board will **permanently delete all images** contained within it. Proceed with caution!
:::
+### Image Storage Maintenance
+
+Administrators can use [Image Storage Maintenance](/features/image-storage-maintenance/) to move existing image files and thumbnails into the current image subfolder strategy. This is separate from board organization and does not change image names, boards, generation metadata, or gallery records.
+
### Board Contents
Every board is organized into two distinct tabs:
diff --git a/docs/src/content/docs/features/image-storage-maintenance.mdx b/docs/src/content/docs/features/image-storage-maintenance.mdx
new file mode 100644
index 00000000000..180ac6dcc35
--- /dev/null
+++ b/docs/src/content/docs/features/image-storage-maintenance.mdx
@@ -0,0 +1,34 @@
+---
+title: Image Storage Maintenance
+description: Move existing images into the current image subfolder strategy with crash recovery.
+---
+
+InvokeAI can move existing images into the folder layout selected by [`image_subfolder_strategy`](/configuration/invokeai-yaml/#image-subfolder-strategy). This is useful after changing the image subfolder strategy from `flat` to `date`, `type`, or `hash`, or when reorganizing an older image library.
+
+This operation changes where image files and thumbnails are stored on disk. It does not change image names, boards, generation metadata, or gallery records.
+
+## Access
+
+Image storage maintenance is available from the in-application Settings panel.
+
+Administrators can start image moves, force recovery, and inspect the current move status. If multi-user mode is disabled, the single local user has the same access. Non-admin users in multi-user mode cannot run or inspect image move status.
+
+## Maintenance Mode
+
+Image moves run as a maintenance operation. Before a move starts, InvokeAI checks that no queue work is active. While the move is running, InvokeAI prevents image reads, uploads, deletes, generation jobs, and gallery mutations from racing with the filesystem move.
+
+The UI shows the move or recovery state until the job is complete or requires manual attention. Gallery images and thumbnails may be unavailable while maintenance is active.
+
+## Crash Recovery
+
+The move process is crash-recoverable. InvokeAI records each move job in its database before moving files, moves full-size images and thumbnails on disk, and updates `images.image_subfolder` only after the filesystem move succeeds.
+
+If InvokeAI stops during a move, recovery resumes incomplete jobs. If recovery finds an ambiguous filesystem state, such as both the old and new full-size image files existing or neither file existing, it halts the job for manual repair instead of blindly updating the database.
+
+Recovery finalizes jobs that were already recorded before the stop. Images that had not yet been planned into a move job are not moved by recovery itself. The Settings panel reports how many images still need moving; run **Start Move** again to continue.
+
+Missing intermediate image files are treated as already cleaned up and do not halt the move. Missing non-intermediate image files still require operator attention.
+
+For the `date` strategy, existing images are moved according to their original image creation timestamp stored in the database, not according to the time the maintenance job is run.
+
+Empty source directories left behind by successful moves are removed when safe.
diff --git a/invokeai/app/api/dependencies.py b/invokeai/app/api/dependencies.py
index e7468c1bca4..3092f5ab71a 100644
--- a/invokeai/app/api/dependencies.py
+++ b/invokeai/app/api/dependencies.py
@@ -25,6 +25,7 @@
)
from invokeai.app.services.external_generation.startup import sync_configured_external_starter_models
from invokeai.app.services.image_files.image_files_disk import DiskImageFileStorage
+from invokeai.app.services.image_moves.image_moves_default import ImageMoveService
from invokeai.app.services.image_records.image_records_sqlite import SqliteImageRecordStorage
from invokeai.app.services.images.images_default import ImageService
from invokeai.app.services.invocation_cache.invocation_cache_memory import MemoryInvocationCache
@@ -130,6 +131,7 @@ def initialize(
events = FastAPIEventService(event_handler_id, loop=loop)
bulk_download = BulkDownloadService()
image_records = SqliteImageRecordStorage(db=db)
+ image_moves = ImageMoveService(db=db, image_files=image_files, config=configuration, logger=logger)
images = ImageService()
invocation_cache = MemoryInvocationCache(max_cache_size=config.node_cache_size)
tensors = ObjectSerializerForwardCache(
@@ -198,6 +200,7 @@ def initialize(
configuration=configuration,
events=events,
image_files=image_files,
+ image_moves=image_moves,
image_records=image_records,
images=images,
invocation_cache=invocation_cache,
diff --git a/invokeai/app/api/routers/board_images.py b/invokeai/app/api/routers/board_images.py
index f94e4f2437c..ea0273f02d6 100644
--- a/invokeai/app/api/routers/board_images.py
+++ b/invokeai/app/api/routers/board_images.py
@@ -3,6 +3,7 @@
from invokeai.app.api.auth_dependencies import CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.app.services.images.images_common import AddImagesToBoardResult, RemoveImagesFromBoardResult
board_images_router = APIRouter(prefix="/v1/board_images", tags=["boards"])
@@ -65,6 +66,7 @@ async def add_image_to_board(
"""Creates a board_image"""
_assert_board_write_access(board_id, current_user)
_assert_image_direct_owner(image_name, current_user)
+ assert_image_move_maintenance_inactive()
try:
added_images: set[str] = set()
affected_boards: set[str] = set()
@@ -100,6 +102,7 @@ async def remove_image_from_board(
old_board_id = ApiDependencies.invoker.services.images.get_dto(image_name).board_id or "none"
if old_board_id != "none":
_assert_board_write_access(old_board_id, current_user)
+ assert_image_move_maintenance_inactive()
removed_images: set[str] = set()
affected_boards: set[str] = set()
ApiDependencies.invoker.services.board_images.remove_image_from_board(image_name=image_name)
@@ -133,6 +136,13 @@ async def add_images_to_board(
) -> AddImagesToBoardResult:
"""Adds a list of images to a board"""
_assert_board_write_access(board_id, current_user)
+ try:
+ assert_image_move_maintenance_inactive()
+ except HTTPException:
+ for image_name in image_names:
+ _assert_image_direct_owner(image_name, current_user)
+ raise
+
try:
added_images: set[str] = set()
affected_boards: set[str] = set()
@@ -178,6 +188,15 @@ async def remove_images_from_board(
image_names: list[str] = Body(description="The names of the images to remove", embed=True),
) -> RemoveImagesFromBoardResult:
"""Removes a list of images from their board, if they had one"""
+ try:
+ assert_image_move_maintenance_inactive()
+ except HTTPException:
+ for image_name in image_names:
+ old_board_id = ApiDependencies.invoker.services.images.get_dto(image_name).board_id or "none"
+ if old_board_id != "none":
+ _assert_board_write_access(old_board_id, current_user)
+ raise
+
try:
removed_images: set[str] = set()
affected_boards: set[str] = set()
diff --git a/invokeai/app/api/routers/boards.py b/invokeai/app/api/routers/boards.py
index 6897e90aff4..067936834ee 100644
--- a/invokeai/app/api/routers/boards.py
+++ b/invokeai/app/api/routers/boards.py
@@ -6,6 +6,7 @@
from invokeai.app.api.auth_dependencies import CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.app.services.board_records.board_records_common import BoardChanges, BoardRecordOrderBy, BoardVisibility
from invokeai.app.services.boards.boards_common import BoardDTO
from invokeai.app.services.image_records.image_records_common import ImageCategory
@@ -118,6 +119,7 @@ async def delete_board(
try:
if include_images is True:
+ assert_image_move_maintenance_inactive()
deleted_images = ApiDependencies.invoker.services.board_images.get_all_board_image_names_for_board(
board_id=board_id,
categories=None,
@@ -142,6 +144,8 @@ async def delete_board(
deleted_board_images=deleted_board_images,
deleted_images=[],
)
+ except HTTPException:
+ raise
except Exception:
raise HTTPException(status_code=500, detail="Failed to delete board")
diff --git a/invokeai/app/api/routers/image_move_maintenance.py b/invokeai/app/api/routers/image_move_maintenance.py
new file mode 100644
index 00000000000..2f05492671b
--- /dev/null
+++ b/invokeai/app/api/routers/image_move_maintenance.py
@@ -0,0 +1,17 @@
+from fastapi import HTTPException, status
+
+from invokeai.app.api.dependencies import ApiDependencies
+
+IMAGE_MOVE_MAINTENANCE_ACTIVE_DETAIL = "Image storage maintenance is active"
+
+
+def assert_image_move_maintenance_inactive() -> None:
+ invoker = getattr(ApiDependencies, "invoker", None)
+ if invoker is None:
+ return
+ image_moves = getattr(invoker.services, "image_moves", None)
+ if image_moves is not None and image_moves.is_maintenance_active():
+ raise HTTPException(
+ status_code=status.HTTP_409_CONFLICT,
+ detail=IMAGE_MOVE_MAINTENANCE_ACTIVE_DETAIL,
+ )
diff --git a/invokeai/app/api/routers/image_moves.py b/invokeai/app/api/routers/image_moves.py
new file mode 100644
index 00000000000..0fe328ea9ea
--- /dev/null
+++ b/invokeai/app/api/routers/image_moves.py
@@ -0,0 +1,94 @@
+from fastapi import HTTPException, status
+from fastapi.routing import APIRouter
+from pydantic import BaseModel, Field
+
+from invokeai.app.api.auth_dependencies import AdminUserOrDefault
+from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.services.image_moves.image_moves_default import (
+ ImageMoveBackgroundOperation,
+ ImageMoveBackgroundStatus,
+ ImageMoveJob,
+ ImageMoveJobAlreadyRunning,
+ ImageMoveQueueActive,
+ MoveJobState,
+)
+
+image_moves_router = APIRouter(prefix="/v1/image_moves", tags=["image_moves"])
+
+
+class ImageMoveJobResponse(BaseModel):
+ id: int = Field(description="The image move job id.")
+ state: MoveJobState = Field(description="The image move job state.")
+ error_message: str | None = Field(default=None, description="The last error recorded for the job, if any.")
+
+
+class ImageMoveStatusResponse(BaseModel):
+ is_running: bool = Field(description="Whether an image move background operation is currently running.")
+ operation: ImageMoveBackgroundOperation | None = Field(
+ default=None, description="The active background operation, if any."
+ )
+ active_job_id: int | None = Field(default=None, description="The active journal job id, if any.")
+ latest_job: ImageMoveJobResponse | None = Field(default=None, description="The latest journal job, if any.")
+ last_error: str | None = Field(default=None, description="The last background worker error, if any.")
+ needs_move_count: int = Field(description="The number of images that do not match the current subfolder strategy.")
+
+
+def _get_image_move_service():
+ image_moves = getattr(ApiDependencies.invoker.services, "image_moves", None)
+ if image_moves is None:
+ raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Image move service unavailable")
+ return image_moves
+
+
+def _job_to_response(job: ImageMoveJob | None) -> ImageMoveJobResponse | None:
+ if job is None:
+ return None
+ return ImageMoveJobResponse(id=job.id, state=job.state, error_message=job.error_message)
+
+
+def _status_to_response(service_status: ImageMoveBackgroundStatus | dict) -> ImageMoveStatusResponse:
+ if isinstance(service_status, dict):
+ return ImageMoveStatusResponse(**service_status)
+ return ImageMoveStatusResponse(
+ is_running=service_status.is_running,
+ operation=service_status.operation,
+ active_job_id=service_status.active_job_id,
+ latest_job=_job_to_response(service_status.latest_job),
+ last_error=service_status.last_error,
+ needs_move_count=service_status.needs_move_count,
+ )
+
+
+@image_moves_router.post(
+ "/start",
+ operation_id="start_image_move",
+ response_model=ImageMoveStatusResponse,
+ status_code=status.HTTP_202_ACCEPTED,
+)
+async def start_image_move(_: AdminUserOrDefault) -> ImageMoveStatusResponse:
+ try:
+ return _status_to_response(_get_image_move_service().start_background_move_all())
+ except (ImageMoveJobAlreadyRunning, ImageMoveQueueActive) as e:
+ raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
+
+
+@image_moves_router.post(
+ "/recover",
+ operation_id="start_image_move_recovery",
+ response_model=ImageMoveStatusResponse,
+ status_code=status.HTTP_202_ACCEPTED,
+)
+async def start_image_move_recovery(_: AdminUserOrDefault) -> ImageMoveStatusResponse:
+ try:
+ return _status_to_response(_get_image_move_service().start_background_recovery())
+ except ImageMoveJobAlreadyRunning as e:
+ raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) from e
+
+
+@image_moves_router.get(
+ "/status",
+ operation_id="get_image_move_status",
+ response_model=ImageMoveStatusResponse,
+)
+async def get_image_move_status(_: AdminUserOrDefault) -> ImageMoveStatusResponse:
+ return _status_to_response(_get_image_move_service().get_background_status())
diff --git a/invokeai/app/api/routers/images.py b/invokeai/app/api/routers/images.py
index 976434c68f2..fb756ab8812 100644
--- a/invokeai/app/api/routers/images.py
+++ b/invokeai/app/api/routers/images.py
@@ -21,6 +21,7 @@
from invokeai.app.api.routers._access import (
assert_image_read_access as _assert_image_read_access,
)
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.app.invocations.fields import MetadataField
from invokeai.app.services.image_records.image_records_common import (
ImageCategory,
@@ -108,6 +109,8 @@ async def upload_image(
):
raise HTTPException(status_code=403, detail="Not authorized to upload to this board")
+ assert_image_move_maintenance_inactive()
+
if not file.content_type or not file.content_type.startswith("image"):
raise HTTPException(status_code=415, detail="Not an image")
@@ -195,6 +198,7 @@ async def delete_image(
) -> DeleteImagesResult:
"""Deletes an image"""
_assert_image_owner(image_name, current_user)
+ assert_image_move_maintenance_inactive()
deleted_images: set[str] = set()
affected_boards: set[str] = set()
@@ -222,6 +226,7 @@ async def clear_intermediates(
"""Clears all intermediates. Requires admin."""
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Only admins can clear all intermediates")
+ assert_image_move_maintenance_inactive()
try:
count_deleted = ApiDependencies.invoker.services.images.delete_intermediates()
@@ -255,6 +260,7 @@ async def update_image(
) -> ImageDTO:
"""Updates an image"""
_assert_image_owner(image_name, current_user)
+ assert_image_move_maintenance_inactive()
try:
return ApiDependencies.invoker.services.images.update(image_name, image_changes)
@@ -311,6 +317,7 @@ async def get_image_workflow(
image_name: str = Path(description="The name of image whose workflow to get"),
) -> WorkflowAndGraphResponse:
_assert_image_read_access(image_name, current_user)
+ assert_image_move_maintenance_inactive()
try:
workflow = ApiDependencies.invoker.services.images.get_workflow(image_name)
@@ -351,8 +358,11 @@ async def get_image_full(
This endpoint is intentionally unauthenticated because browsers load images
via
tags which cannot send Bearer tokens. Image names are UUIDs,
- providing security through unguessability.
+ providing security through unguessability. Returns 409 while image storage
+ maintenance is active.
"""
+ assert_image_move_maintenance_inactive()
+
try:
path = ApiDependencies.invoker.services.images.get_path(image_name)
with open(path, "rb") as f:
@@ -384,8 +394,11 @@ async def get_image_thumbnail(
This endpoint is intentionally unauthenticated because browsers load images
via
tags which cannot send Bearer tokens. Image names are UUIDs,
- providing security through unguessability.
+ providing security through unguessability. Returns 409 while image storage
+ maintenance is active.
"""
+ assert_image_move_maintenance_inactive()
+
try:
path = ApiDependencies.invoker.services.images.get_path(image_name, thumbnail=True)
with open(path, "rb") as f:
@@ -469,6 +482,13 @@ async def delete_images_from_list(
current_user: CurrentUserOrDefault,
image_names: list[str] = Body(description="The list of names of images to delete", embed=True),
) -> DeleteImagesResult:
+ try:
+ assert_image_move_maintenance_inactive()
+ except HTTPException:
+ for image_name in image_names:
+ _assert_image_owner(image_name, current_user)
+ raise
+
try:
deleted_images: set[str] = set()
affected_boards: set[str] = set()
@@ -499,6 +519,7 @@ async def delete_uncategorized_images(
current_user: CurrentUserOrDefault,
) -> DeleteImagesResult:
"""Deletes all uncategorized images owned by the current user (or all if admin)"""
+ assert_image_move_maintenance_inactive()
image_names = ApiDependencies.invoker.services.board_images.get_all_board_image_names_for_board(
board_id="none", categories=None, is_intermediate=None
@@ -535,6 +556,13 @@ async def star_images_in_list(
current_user: CurrentUserOrDefault,
image_names: list[str] = Body(description="The list of names of images to star", embed=True),
) -> StarredImagesResult:
+ try:
+ assert_image_move_maintenance_inactive()
+ except HTTPException:
+ for image_name in image_names:
+ _assert_image_owner(image_name, current_user)
+ raise
+
try:
starred_images: set[str] = set()
affected_boards: set[str] = set()
@@ -565,6 +593,13 @@ async def unstar_images_in_list(
current_user: CurrentUserOrDefault,
image_names: list[str] = Body(description="The list of names of images to unstar", embed=True),
) -> UnstarredImagesResult:
+ try:
+ assert_image_move_maintenance_inactive()
+ except HTTPException:
+ for image_name in image_names:
+ _assert_image_owner(image_name, current_user)
+ raise
+
try:
unstarred_images: set[str] = set()
affected_boards: set[str] = set()
@@ -624,6 +659,8 @@ async def download_images_from_list(
for name in image_names:
_assert_image_read_access(name, current_user)
+ assert_image_move_maintenance_inactive()
+
bulk_download_item_id: str = ApiDependencies.invoker.services.bulk_download.generate_item_id(board_id)
background_tasks.add_task(
diff --git a/invokeai/app/api/routers/recall_parameters.py b/invokeai/app/api/routers/recall_parameters.py
index 31120d59a02..ec3c93e1ade 100644
--- a/invokeai/app/api/routers/recall_parameters.py
+++ b/invokeai/app/api/routers/recall_parameters.py
@@ -9,6 +9,7 @@
from invokeai.app.api.auth_dependencies import CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.backend.image_util.controlnet_processor import process_controlnet_image
from invokeai.backend.model_manager.taxonomy import ModelType
@@ -437,9 +438,10 @@ async def update_recall_parameters(
"""
logger = ApiDependencies.invoker.services.logger
- # Validate image access before processing — prevents information leakage
+ # Validate image access before processing - prevents information leakage
# (dimensions) and derived-image minting via ControlNet preprocessors.
_assert_recall_image_access(parameters, current_user)
+ assert_image_move_maintenance_inactive()
try:
# In strict mode, include all parameters so the frontend clears anything
diff --git a/invokeai/app/api/routers/session_queue.py b/invokeai/app/api/routers/session_queue.py
index 41a5a411c7a..841ec2b9ffb 100644
--- a/invokeai/app/api/routers/session_queue.py
+++ b/invokeai/app/api/routers/session_queue.py
@@ -6,6 +6,7 @@
from invokeai.app.api.auth_dependencies import AdminUserOrDefault, CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import (
Batch,
@@ -97,6 +98,8 @@ async def enqueue_batch(
prepend: bool = Body(default=False, description="Whether or not to prepend this batch in the queue"),
) -> EnqueueBatchResult:
"""Processes a batch and enqueues the output graphs for execution for the current user."""
+ assert_image_move_maintenance_inactive()
+
try:
return await ApiDependencies.invoker.services.session_queue.enqueue_batch(
queue_id=queue_id, batch=batch, prepend=prepend, user_id=current_user.user_id
diff --git a/invokeai/app/api/routers/utilities.py b/invokeai/app/api/routers/utilities.py
index 568546603ab..b86db5fd31c 100644
--- a/invokeai/app/api/routers/utilities.py
+++ b/invokeai/app/api/routers/utilities.py
@@ -15,6 +15,7 @@
from invokeai.app.api.auth_dependencies import CurrentUserOrDefault
from invokeai.app.api.dependencies import ApiDependencies
from invokeai.app.api.routers._access import assert_image_read_access
+from invokeai.app.api.routers.image_move_maintenance import assert_image_move_maintenance_inactive
from invokeai.app.services.image_files.image_files_common import ImageFileNotFoundException
from invokeai.app.services.model_records.model_records_base import UnknownModelException
from invokeai.backend.llava_onevision_pipeline import LlavaOnevisionPipeline
@@ -204,6 +205,8 @@ def _run_image_to_prompt(image_name: str, model_key: str, instruction: str) -> s
)
async def image_to_prompt(current_user: CurrentUserOrDefault, body: ImageToPromptRequest) -> ImageToPromptResponse:
"""Generate a descriptive prompt from an image using a vision-language model."""
+ assert_image_move_maintenance_inactive()
+
# Reuse the image-read access check so non-owners can't probe stored images
# via this endpoint (mirrors the policy in routers/images.py).
assert_image_read_access(body.image_name, current_user)
diff --git a/invokeai/app/api_app.py b/invokeai/app/api_app.py
index 4b79e1eeb0c..ed02d75eae3 100644
--- a/invokeai/app/api_app.py
+++ b/invokeai/app/api_app.py
@@ -23,6 +23,7 @@
client_state,
custom_nodes,
download_queue,
+ image_moves,
images,
model_manager,
model_relationships,
@@ -176,6 +177,7 @@ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint):
app.include_router(utilities.utilities_router, prefix="/api")
app.include_router(model_manager.model_manager_router, prefix="/api")
app.include_router(download_queue.download_queue_router, prefix="/api")
+app.include_router(image_moves.image_moves_router, prefix="/api")
app.include_router(images.images_router, prefix="/api")
app.include_router(boards.boards_router, prefix="/api")
app.include_router(board_images.board_images_router, prefix="/api")
diff --git a/invokeai/app/services/image_files/image_files_base.py b/invokeai/app/services/image_files/image_files_base.py
index 7464cd7941d..764efd8833c 100644
--- a/invokeai/app/services/image_files/image_files_base.py
+++ b/invokeai/app/services/image_files/image_files_base.py
@@ -18,6 +18,23 @@ def get_path(self, image_name: str, thumbnail: bool = False, image_subfolder: st
"""Gets the internal path to an image or thumbnail."""
pass
+ @property
+ @abstractmethod
+ def image_root(self) -> Path:
+ """Gets the root directory for full-size images."""
+ pass
+
+ @property
+ @abstractmethod
+ def thumbnail_root(self) -> Path:
+ """Gets the root directory for thumbnails."""
+ pass
+
+ @abstractmethod
+ def evict_cache_paths(self, paths: list[Path]) -> None:
+ """Evicts any cached image objects for the provided paths."""
+ pass
+
# TODO: We need to validate paths before starlette makes the FileResponse, else we get a
# 500 internal server error. I don't like having this method on the service.
@abstractmethod
diff --git a/invokeai/app/services/image_files/image_files_disk.py b/invokeai/app/services/image_files/image_files_disk.py
index 12b737a7cf1..c54f767b164 100644
--- a/invokeai/app/services/image_files/image_files_disk.py
+++ b/invokeai/app/services/image_files/image_files_disk.py
@@ -32,6 +32,18 @@ def __init__(self, output_folder: Union[str, Path]):
def start(self, invoker: Invoker) -> None:
self.__invoker = invoker
+ @property
+ def image_root(self) -> Path:
+ return self.__output_folder.resolve()
+
+ @property
+ def thumbnail_root(self) -> Path:
+ return self.__thumbnails_folder.resolve()
+
+ def evict_cache_paths(self, paths: list[Path]) -> None:
+ for path in paths:
+ self.__cache.pop(path.resolve(), None)
+
def get(self, image_name: str, image_subfolder: str = "") -> PILImageType:
try:
image_path = self.get_path(image_name, image_subfolder=image_subfolder)
@@ -85,8 +97,7 @@ def save(
compress_level=self.__invoker.services.configuration.pil_compress_level,
)
- thumbnail_name = get_thumbnail_name(image_name)
- thumbnail_path = self.get_path(thumbnail_name, thumbnail=True, image_subfolder=image_subfolder)
+ thumbnail_path = self.get_path(image_name, thumbnail=True, image_subfolder=image_subfolder)
# Ensure thumbnail subfolder directories exist
thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
@@ -108,8 +119,7 @@ def delete(self, image_name: str, image_subfolder: str = "") -> None:
if image_path in self.__cache:
del self.__cache[image_path]
- thumbnail_name = get_thumbnail_name(image_name)
- thumbnail_path = self.get_path(thumbnail_name, True, image_subfolder=image_subfolder)
+ thumbnail_path = self.get_path(image_name, True, image_subfolder=image_subfolder)
if thumbnail_path.exists():
thumbnail_path.unlink()
diff --git a/invokeai/app/services/image_moves/__init__.py b/invokeai/app/services/image_moves/__init__.py
new file mode 100644
index 00000000000..69defbd891e
--- /dev/null
+++ b/invokeai/app/services/image_moves/__init__.py
@@ -0,0 +1,3 @@
+from invokeai.app.services.image_moves.image_moves_default import ImageMoveService
+
+__all__ = ["ImageMoveService"]
diff --git a/invokeai/app/services/image_moves/image_moves_default.py b/invokeai/app/services/image_moves/image_moves_default.py
new file mode 100644
index 00000000000..1ced4ef1b4e
--- /dev/null
+++ b/invokeai/app/services/image_moves/image_moves_default.py
@@ -0,0 +1,816 @@
+import os
+import tempfile
+import threading
+from concurrent.futures import Future, ThreadPoolExecutor
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Literal, Sequence, cast
+
+from PIL import Image
+
+from invokeai.app.services.config import InvokeAIAppConfig
+from invokeai.app.services.image_files.image_files_base import ImageFileStorageBase
+from invokeai.app.services.image_records.image_records_common import ImageCategory
+from invokeai.app.services.session_queue.session_queue_common import DEFAULT_QUEUE_ID
+from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
+from invokeai.app.util.thumbnails import make_thumbnail
+
+MoveJobState = Literal["planned", "moving", "moved", "committed", "error"]
+ImageMoveBackgroundOperation = Literal["move_all", "recovery"]
+
+
+@dataclass(frozen=True)
+class PlannedImageMove:
+ image_name: str
+ old_subfolder: str
+ new_subfolder: str
+ is_intermediate: bool
+ old_path: Path
+ new_path: Path
+ old_thumbnail_path: Path
+ new_thumbnail_path: Path
+
+
+@dataclass(frozen=True)
+class ImageMoveJob:
+ id: int
+ state: MoveJobState
+ error_message: str | None
+
+
+@dataclass(frozen=True)
+class ImageMoveResult:
+ planned: int = 0
+ committed: int = 0
+ errors: int = 0
+
+
+@dataclass(frozen=True)
+class ImageMoveBackgroundStatus:
+ is_running: bool
+ operation: ImageMoveBackgroundOperation | None
+ active_job_id: int | None
+ latest_job: ImageMoveJob | None
+ last_error: str | None
+ needs_move_count: int
+
+
+class ImageMoveJobAlreadyRunning(Exception):
+ pass
+
+
+class ImageMoveQueueActive(Exception):
+ pass
+
+
+class ImageMoveService:
+ def __init__(
+ self,
+ db: SqliteDatabase,
+ image_files: ImageFileStorageBase,
+ config: InvokeAIAppConfig,
+ logger,
+ ) -> None:
+ self._db = db
+ self.image_files = image_files
+ self._config = config
+ self._logger = logger
+ self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="image-move")
+ self._future_lock = threading.Lock()
+ self._future: Future | None = None
+ self._future_operation: ImageMoveBackgroundOperation | None = None
+ self._last_background_error: str | None = None
+ self._invoker = None
+ self._session_queue = None
+
+ def start(self, invoker) -> None:
+ self._invoker = invoker
+ self._session_queue = getattr(invoker.services, "session_queue", None)
+ result = self.startup_recovery()
+ if result.committed > 0 or result.errors > 0:
+ self._logger.info(
+ "Image move startup recovery completed: committed=%s, errors=%s",
+ result.committed,
+ result.errors,
+ )
+
+ def set_session_queue(self, session_queue) -> None:
+ self._session_queue = session_queue
+
+ def stop(self, *args, **kwargs) -> None:
+ self._executor.shutdown(wait=True, cancel_futures=False)
+
+ def start_background_move_all(self) -> ImageMoveBackgroundStatus:
+ return self._start_background_operation("move_all", self.move_all_images, require_idle_queue=True)
+
+ def start_background_recovery(self) -> ImageMoveBackgroundStatus:
+ return self._start_background_operation("recovery", self.startup_recovery)
+
+ def get_background_status(self) -> ImageMoveBackgroundStatus:
+ with self._future_lock:
+ self._refresh_finished_future_locked()
+ return self._build_background_status_locked()
+
+ def is_maintenance_active(self) -> bool:
+ with self._future_lock:
+ self._refresh_finished_future_locked()
+ is_running = self._future is not None and not self._future.done()
+ operation_reserved = self._future_operation is not None
+ return operation_reserved or is_running or self._get_active_job_id() is not None
+
+ def _assert_no_active_queue_work(self) -> None:
+ session_queue = self._session_queue
+ if session_queue is None and self._invoker is not None:
+ session_queue = getattr(self._invoker.services, "session_queue", None)
+ if session_queue is None:
+ return
+ queue_status = session_queue.get_queue_status(DEFAULT_QUEUE_ID)
+ if queue_status.pending > 0 or queue_status.in_progress > 0:
+ raise ImageMoveQueueActive("Cannot start image move while queue work is active")
+
+ def _start_background_operation(
+ self,
+ operation: ImageMoveBackgroundOperation,
+ target,
+ require_idle_queue: bool = False,
+ ) -> ImageMoveBackgroundStatus:
+ with self._future_lock:
+ self._refresh_finished_future_locked()
+ if self._future_operation is not None or (self._future is not None and not self._future.done()):
+ raise ImageMoveJobAlreadyRunning("An image move job is already running")
+ active_job_id = self._get_active_job_id()
+ if operation != "recovery" and active_job_id is not None:
+ raise ImageMoveJobAlreadyRunning("An image move job is already active")
+ self._last_background_error = None
+ self._future_operation = operation
+
+ try:
+ if require_idle_queue:
+ self._assert_no_active_queue_work()
+ future = self._executor.submit(self._run_background_operation, operation, target)
+ except Exception:
+ with self._future_lock:
+ self._future_operation = None
+ raise
+
+ with self._future_lock:
+ self._future = future
+ return self._build_background_status_locked()
+
+ def _run_background_operation(self, operation: ImageMoveBackgroundOperation, target) -> None:
+ try:
+ target()
+ except Exception as e:
+ self._record_background_error(str(e))
+ self._logger.exception("Image move background operation failed: %s", operation)
+
+ def _record_background_error(self, message: str) -> None:
+ with self._future_lock:
+ self._last_background_error = message
+ active_job_id = self._get_active_job_id()
+ if active_job_id is not None:
+ try:
+ self.record_job_error_message(active_job_id, message)
+ except Exception:
+ self._logger.exception("Failed to record image move background error on active job")
+
+ def _refresh_finished_future_locked(self) -> None:
+ if self._future is None or not self._future.done():
+ return
+ self._future = None
+ self._future_operation = None
+
+ def _build_background_status_locked(self) -> ImageMoveBackgroundStatus:
+ latest_job = self.get_latest_job()
+ return ImageMoveBackgroundStatus(
+ is_running=self._future is not None and not self._future.done(),
+ operation=self._future_operation,
+ active_job_id=self._get_active_job_id(),
+ latest_job=latest_job,
+ last_error=self._last_background_error,
+ needs_move_count=self.count_images_needing_move(),
+ )
+
+ def move_all_images(self) -> ImageMoveResult:
+ recovered = self.startup_recovery()
+ last_image_name = ""
+ planned = 0
+ committed = recovered.committed
+ errors = recovered.errors
+
+ while True:
+ moves, plan_errors = self._plan_batch(
+ last_image_name=last_image_name, limit=100, record_missing_errors=True
+ )
+ errors += plan_errors
+ if not moves:
+ next_name = self._next_image_name(last_image_name)
+ if next_name is None:
+ break
+ last_image_name = next_name
+ continue
+
+ job_id = self.create_move_job(moves)
+ planned += len(moves)
+ try:
+ self.perform_filesystem_moves(job_id)
+ self.commit_database_updates(job_id)
+ committed += len(moves)
+ except Exception as e:
+ errors += 1
+ self.record_job_error_message(job_id, str(e))
+ raise
+ last_image_name = moves[-1].image_name
+
+ return ImageMoveResult(planned=planned, committed=committed, errors=errors)
+
+ def startup_recovery(self) -> ImageMoveResult:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT id FROM image_subfolder_move_jobs
+ WHERE state IN ('planned', 'moving', 'moved')
+ ORDER BY id;
+ """
+ )
+ job_ids = [cast(int, row[0]) for row in cursor.fetchall()]
+
+ committed = 0
+ errors = 0
+ for job_id in job_ids:
+ try:
+ self.complete_partial_filesystem_moves(job_id)
+ self.cleanup_empty_source_dirs(job_id)
+ self.commit_database_updates(job_id)
+ committed += len(self._get_items(job_id))
+ except Exception as e:
+ errors += 1
+ if self._is_unrecoverable_error(e):
+ self.mark_job_unrecoverable(job_id, str(e))
+ else:
+ self.record_job_error_message(job_id, str(e))
+ return ImageMoveResult(committed=committed, errors=errors)
+
+ def plan_batch(self, last_image_name: str, limit: int) -> list[PlannedImageMove]:
+ moves, _errors = self._plan_batch(last_image_name=last_image_name, limit=limit, record_missing_errors=False)
+ return moves
+
+ def count_images_needing_move(self) -> int:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT image_name, image_subfolder, image_category, is_intermediate, created_at
+ FROM images
+ WHERE deleted_at IS NULL;
+ """
+ )
+ rows = cursor.fetchall()
+
+ count = 0
+ for row in rows:
+ old_subfolder = cast(str, row["image_subfolder"] or "")
+ new_subfolder = self._get_new_subfolder(
+ image_name=cast(str, row["image_name"]),
+ image_category=ImageCategory(row["image_category"]),
+ is_intermediate=bool(row["is_intermediate"]),
+ created_at=row["created_at"],
+ )
+ if new_subfolder != old_subfolder:
+ count += 1
+ return count
+
+ def _plan_batch(
+ self, last_image_name: str, limit: int, record_missing_errors: bool
+ ) -> tuple[list[PlannedImageMove], int]:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT image_name, image_subfolder, image_category, is_intermediate, created_at
+ FROM images
+ WHERE image_name > ?
+ AND deleted_at IS NULL
+ ORDER BY image_name
+ LIMIT ?;
+ """,
+ (last_image_name, limit),
+ )
+ rows = cursor.fetchall()
+
+ moves: list[PlannedImageMove] = []
+ for row in rows:
+ image_name = cast(str, row["image_name"])
+ old_subfolder = cast(str, row["image_subfolder"] or "")
+ new_subfolder = self._get_new_subfolder(
+ image_name=image_name,
+ image_category=ImageCategory(row["image_category"]),
+ is_intermediate=bool(row["is_intermediate"]),
+ created_at=row["created_at"],
+ )
+ if new_subfolder == old_subfolder:
+ continue
+ moves.append(
+ PlannedImageMove(
+ image_name=image_name,
+ old_subfolder=old_subfolder,
+ new_subfolder=new_subfolder,
+ is_intermediate=bool(row["is_intermediate"]),
+ old_path=self.image_files.get_path(image_name, image_subfolder=old_subfolder),
+ new_path=self.image_files.get_path(image_name, image_subfolder=new_subfolder),
+ old_thumbnail_path=self.image_files.get_path(
+ image_name, thumbnail=True, image_subfolder=old_subfolder
+ ),
+ new_thumbnail_path=self.image_files.get_path(
+ image_name, thumbnail=True, image_subfolder=new_subfolder
+ ),
+ )
+ )
+ errors = 0
+ if record_missing_errors:
+ moves, errors = self._record_missing_source_errors(moves)
+ self.preflight_moves(moves)
+ return moves, errors
+
+ def create_move_job(self, moves: Sequence[PlannedImageMove]) -> int:
+ if not moves:
+ raise ValueError("Cannot create an image move job with no items")
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT 1
+ FROM image_subfolder_move_jobs
+ WHERE state NOT IN ('committed', 'error')
+ LIMIT 1;
+ """
+ )
+ if cursor.fetchone() is not None:
+ raise ValueError("Cannot create image move job while another active image move job exists")
+ cursor.execute("INSERT INTO image_subfolder_move_jobs (state) VALUES ('planned');")
+ job_id = cast(int, cursor.lastrowid)
+ cursor.executemany(
+ """--sql
+ INSERT INTO image_subfolder_move_items (
+ job_id,
+ image_name,
+ old_subfolder,
+ new_subfolder,
+ is_intermediate,
+ old_path,
+ new_path,
+ old_thumbnail_path,
+ new_thumbnail_path,
+ state
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'planned');
+ """,
+ [
+ (
+ job_id,
+ move.image_name,
+ move.old_subfolder,
+ move.new_subfolder,
+ int(move.is_intermediate),
+ str(move.old_path),
+ str(move.new_path),
+ str(move.old_thumbnail_path),
+ str(move.new_thumbnail_path),
+ )
+ for move in moves
+ ],
+ )
+ return job_id
+
+ def create_error_move_job(self, move: PlannedImageMove, message: str) -> int:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ "INSERT INTO image_subfolder_move_jobs (state, error_message) VALUES ('error', ?);",
+ (message,),
+ )
+ job_id = cast(int, cursor.lastrowid)
+ cursor.execute(
+ """--sql
+ INSERT INTO image_subfolder_move_items (
+ job_id,
+ image_name,
+ old_subfolder,
+ new_subfolder,
+ is_intermediate,
+ old_path,
+ new_path,
+ old_thumbnail_path,
+ new_thumbnail_path,
+ state,
+ error_message
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'error', ?);
+ """,
+ (
+ job_id,
+ move.image_name,
+ move.old_subfolder,
+ move.new_subfolder,
+ int(move.is_intermediate),
+ str(move.old_path),
+ str(move.new_path),
+ str(move.old_thumbnail_path),
+ str(move.new_thumbnail_path),
+ message,
+ ),
+ )
+ return job_id
+
+ def preflight_moves(self, moves: Sequence[PlannedImageMove]) -> None:
+ destinations: set[Path] = set()
+ thumbnail_destinations: set[Path] = set()
+ for move in moves:
+ if not move.old_path.exists():
+ if not move.is_intermediate:
+ raise FileNotFoundError(f"Source image does not exist: {move.old_path}")
+ continue
+ if move.new_path.exists():
+ raise FileExistsError(f"Destination image already exists: {move.new_path}")
+ if move.old_path == move.new_path:
+ raise ValueError(f"Old and new paths are identical for {move.image_name}")
+ if move.new_path in destinations:
+ raise ValueError(f"Duplicate destination path: {move.new_path}")
+ destinations.add(move.new_path)
+ if move.new_thumbnail_path in thumbnail_destinations:
+ raise ValueError(f"Duplicate destination thumbnail path: {move.new_thumbnail_path}")
+ thumbnail_destinations.add(move.new_thumbnail_path)
+ if self._has_active_job_for_image(move.image_name):
+ raise ValueError(f"Image {move.image_name} already has an active image move job")
+ self._assert_same_filesystem(move.old_path, move.new_path)
+ if move.old_thumbnail_path.exists():
+ if move.new_thumbnail_path.exists():
+ raise FileExistsError(f"Destination thumbnail already exists: {move.new_thumbnail_path}")
+ self._assert_same_filesystem(move.old_thumbnail_path, move.new_thumbnail_path)
+
+ def _record_missing_source_errors(self, moves: Sequence[PlannedImageMove]) -> tuple[list[PlannedImageMove], int]:
+ remaining_moves: list[PlannedImageMove] = []
+ errors = 0
+ for move in moves:
+ if move.old_path.exists() or move.is_intermediate:
+ remaining_moves.append(move)
+ continue
+ message = f"Source image does not exist: {move.old_path}"
+ self.create_error_move_job(move, message)
+ self._logger.error(message)
+ errors += 1
+ return remaining_moves, errors
+
+ def perform_filesystem_moves(self, job_id: int) -> None:
+ self._set_job_state(job_id, "moving")
+ self.complete_partial_filesystem_moves(job_id)
+ self.cleanup_empty_source_dirs(job_id)
+ self._set_job_state(job_id, "moved")
+
+ def complete_partial_filesystem_moves(self, job_id: int) -> None:
+ items = self._get_items(job_id)
+ if not items:
+ raise ValueError(f"Image move job {job_id} has no items")
+ for item in items:
+ old_path = self.image_files.get_path(item.image_name, image_subfolder=item.old_subfolder)
+ new_path = self.image_files.get_path(item.image_name, image_subfolder=item.new_subfolder)
+ old_thumbnail_path = self.image_files.get_path(
+ item.image_name, thumbnail=True, image_subfolder=item.old_subfolder
+ )
+ new_thumbnail_path = self.image_files.get_path(
+ item.image_name, thumbnail=True, image_subfolder=item.new_subfolder
+ )
+ old_exists = old_path.exists()
+ new_exists = new_path.exists()
+ if old_exists and new_exists:
+ raise RuntimeError(f"Both old and new image files exist for {item.image_name}")
+ if not old_exists and not new_exists:
+ if item.is_intermediate:
+ self._mark_missing_intermediate_moved(
+ job_id=job_id,
+ image_name=item.image_name,
+ old_path=old_path,
+ new_path=new_path,
+ old_thumbnail_path=old_thumbnail_path,
+ new_thumbnail_path=new_thumbnail_path,
+ )
+ continue
+ raise RuntimeError(f"Neither old nor new image file exists for {item.image_name}")
+ if old_exists:
+ new_path.parent.mkdir(parents=True, exist_ok=True)
+ os.replace(old_path, new_path)
+ self._fsync_file(new_path)
+ self._fsync_dir(new_path.parent)
+ self._fsync_dir(old_path.parent)
+
+ old_thumbnail_exists = old_thumbnail_path.exists()
+ new_thumbnail_exists = new_thumbnail_path.exists()
+ if old_thumbnail_exists and new_thumbnail_exists:
+ self._regenerate_thumbnail(new_path, new_thumbnail_path)
+ old_thumbnail_path.unlink()
+ self._fsync_dir(old_thumbnail_path.parent)
+ elif old_thumbnail_exists and not new_thumbnail_exists:
+ new_thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
+ os.replace(old_thumbnail_path, new_thumbnail_path)
+ self._fsync_file(new_thumbnail_path)
+ self._fsync_dir(new_thumbnail_path.parent)
+ self._fsync_dir(old_thumbnail_path.parent)
+ elif not new_thumbnail_exists:
+ self._regenerate_thumbnail(new_path, new_thumbnail_path)
+
+ self.image_files.evict_cache_paths([old_path, new_path, old_thumbnail_path, new_thumbnail_path])
+ self.mark_item_moved(job_id, item.image_name)
+
+ def cleanup_empty_source_dirs(self, job_id: int) -> None:
+ for item in self._get_items(job_id):
+ self._remove_empty_parents(
+ self.image_files.get_path(item.image_name, image_subfolder=item.old_subfolder).parent,
+ self.image_files.image_root,
+ )
+ self._remove_empty_parents(
+ self.image_files.get_path(item.image_name, thumbnail=True, image_subfolder=item.old_subfolder).parent,
+ self.image_files.thumbnail_root,
+ )
+
+ def commit_database_updates(self, job_id: int) -> None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ UPDATE images
+ SET image_subfolder = (
+ SELECT item.new_subfolder
+ FROM image_subfolder_move_items AS item
+ WHERE item.job_id = ?
+ AND item.image_name = images.image_name
+ )
+ WHERE image_name IN (
+ SELECT image_name
+ FROM image_subfolder_move_items
+ WHERE job_id = ?
+ AND state = 'moved'
+ )
+ AND image_subfolder = (
+ SELECT item.old_subfolder
+ FROM image_subfolder_move_items AS item
+ WHERE item.job_id = ?
+ AND item.image_name = images.image_name
+ );
+ """,
+ (job_id, job_id, job_id),
+ )
+ cursor.execute(
+ """--sql
+ SELECT COUNT(*)
+ FROM image_subfolder_move_items AS item
+ LEFT JOIN images ON images.image_name = item.image_name
+ WHERE item.job_id = ?
+ AND (
+ images.image_name IS NULL
+ OR images.deleted_at IS NOT NULL
+ OR images.image_subfolder != item.new_subfolder
+ );
+ """,
+ (job_id,),
+ )
+ invalid_count = cast(int, cursor.fetchone()[0])
+ if invalid_count:
+ raise RuntimeError(f"Image move job {job_id} failed commit validation")
+ cursor.execute(
+ "UPDATE image_subfolder_move_items SET state = 'committed' WHERE job_id = ?;",
+ (job_id,),
+ )
+ cursor.execute(
+ "UPDATE image_subfolder_move_jobs SET state = 'committed', error_message = NULL WHERE id = ?;",
+ (job_id,),
+ )
+
+ def mark_item_moved(self, job_id: int, image_name: str) -> None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ "UPDATE image_subfolder_move_items SET state = 'moved' WHERE job_id = ? AND image_name = ?;",
+ (job_id, image_name),
+ )
+
+ def record_job_error_message(self, job_id: int, message: str) -> None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ "UPDATE image_subfolder_move_jobs SET error_message = ? WHERE id = ?;",
+ (message, job_id),
+ )
+
+ def mark_job_unrecoverable(self, job_id: int, message: str) -> None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ "UPDATE image_subfolder_move_jobs SET state = 'error', error_message = ? WHERE id = ?;",
+ (message, job_id),
+ )
+ cursor.execute(
+ "UPDATE image_subfolder_move_items SET state = 'error', error_message = ? WHERE job_id = ?;",
+ (message, job_id),
+ )
+
+ def get_job(self, job_id: int) -> ImageMoveJob:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ "SELECT id, state, error_message FROM image_subfolder_move_jobs WHERE id = ?;",
+ (job_id,),
+ )
+ row = cursor.fetchone()
+ if row is None:
+ raise ValueError(f"Image move job not found: {job_id}")
+ return ImageMoveJob(
+ id=cast(int, row["id"]), state=cast(MoveJobState, row["state"]), error_message=row["error_message"]
+ )
+
+ def get_latest_job(self) -> ImageMoveJob | None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT id, state, error_message
+ FROM image_subfolder_move_jobs
+ ORDER BY id DESC
+ LIMIT 1;
+ """
+ )
+ row = cursor.fetchone()
+ if row is None:
+ return None
+ return ImageMoveJob(
+ id=cast(int, row["id"]), state=cast(MoveJobState, row["state"]), error_message=row["error_message"]
+ )
+
+ def _get_new_subfolder(
+ self, image_name: str, image_category: ImageCategory, is_intermediate: bool, created_at: str | datetime
+ ) -> str:
+ strategy = self._config.image_subfolder_strategy
+ if strategy == "flat":
+ return ""
+ if strategy == "type":
+ return "intermediate" if is_intermediate else image_category.value
+ if strategy == "hash":
+ return image_name[:2]
+ if strategy == "date":
+ timestamp = created_at if isinstance(created_at, datetime) else datetime.fromisoformat(created_at)
+ return f"{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}"
+ raise ValueError(f"Unknown image subfolder strategy: {strategy}")
+
+ def _get_items(self, job_id: int) -> list[PlannedImageMove]:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT image_name, old_subfolder, new_subfolder, is_intermediate
+ FROM image_subfolder_move_items
+ WHERE job_id = ?
+ ORDER BY image_name;
+ """,
+ (job_id,),
+ )
+ rows = cursor.fetchall()
+ return [
+ PlannedImageMove(
+ image_name=row["image_name"],
+ old_subfolder=row["old_subfolder"],
+ new_subfolder=row["new_subfolder"],
+ is_intermediate=bool(row["is_intermediate"]),
+ old_path=self.image_files.get_path(row["image_name"], image_subfolder=row["old_subfolder"]),
+ new_path=self.image_files.get_path(row["image_name"], image_subfolder=row["new_subfolder"]),
+ old_thumbnail_path=self.image_files.get_path(
+ row["image_name"], thumbnail=True, image_subfolder=row["old_subfolder"]
+ ),
+ new_thumbnail_path=self.image_files.get_path(
+ row["image_name"], thumbnail=True, image_subfolder=row["new_subfolder"]
+ ),
+ )
+ for row in rows
+ ]
+
+ def _set_job_state(self, job_id: int, state: MoveJobState) -> None:
+ with self._db.transaction() as cursor:
+ cursor.execute("UPDATE image_subfolder_move_jobs SET state = ? WHERE id = ?;", (state, job_id))
+
+ def _has_active_job_for_image(self, image_name: str) -> bool:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT 1
+ FROM image_subfolder_move_items AS item
+ JOIN image_subfolder_move_jobs AS job ON job.id = item.job_id
+ WHERE item.image_name = ?
+ AND job.state NOT IN ('committed', 'error')
+ LIMIT 1;
+ """,
+ (image_name,),
+ )
+ return cursor.fetchone() is not None
+
+ def _get_active_job_id(self) -> int | None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT id
+ FROM image_subfolder_move_jobs
+ WHERE state NOT IN ('committed', 'error')
+ ORDER BY id
+ LIMIT 1;
+ """
+ )
+ row = cursor.fetchone()
+ return None if row is None else cast(int, row["id"])
+
+ def _next_image_name(self, last_image_name: str) -> str | None:
+ with self._db.transaction() as cursor:
+ cursor.execute(
+ """--sql
+ SELECT image_name FROM images
+ WHERE image_name > ?
+ AND deleted_at IS NULL
+ ORDER BY image_name
+ LIMIT 1;
+ """,
+ (last_image_name,),
+ )
+ row = cursor.fetchone()
+ return None if row is None else cast(str, row[0])
+
+ def _regenerate_thumbnail(self, image_path: Path, thumbnail_path: Path) -> None:
+ thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
+ with Image.open(image_path) as image:
+ thumbnail = make_thumbnail(image)
+ with tempfile.NamedTemporaryFile(
+ dir=thumbnail_path.parent, prefix=f".{thumbnail_path.name}.", suffix=".tmp", delete=False
+ ) as temp_file:
+ temp_path = Path(temp_file.name)
+ try:
+ thumbnail.save(temp_path, format="WEBP")
+ self._fsync_file(temp_path)
+ os.replace(temp_path, thumbnail_path)
+ self._fsync_file(thumbnail_path)
+ self._fsync_dir(thumbnail_path.parent)
+ finally:
+ temp_path.unlink(missing_ok=True)
+
+ def _mark_missing_intermediate_moved(
+ self,
+ job_id: int,
+ image_name: str,
+ old_path: Path,
+ new_path: Path,
+ old_thumbnail_path: Path,
+ new_thumbnail_path: Path,
+ ) -> None:
+ for path in (old_thumbnail_path, new_thumbnail_path):
+ if path.exists():
+ path.unlink()
+ self._fsync_dir(path.parent)
+ self.image_files.evict_cache_paths([old_path, new_path, old_thumbnail_path, new_thumbnail_path])
+ self.mark_item_moved(job_id, image_name)
+
+ def _remove_empty_parents(self, start: Path, root: Path) -> None:
+ root = root.resolve()
+ current = start.resolve()
+ while current != root and current.is_relative_to(root):
+ try:
+ current.rmdir()
+ except OSError:
+ return
+ current = current.parent
+
+ def _assert_same_filesystem(self, source: Path, destination: Path) -> None:
+ source_parent = source.parent
+ destination_parent = self._nearest_existing_parent(destination.parent)
+ if source_parent.stat().st_dev != destination_parent.stat().st_dev:
+ raise ValueError(f"Cross-filesystem image move is not supported: {source} -> {destination}")
+
+ def _nearest_existing_parent(self, path: Path) -> Path:
+ current = path
+ while not current.exists():
+ if current.parent == current:
+ raise FileNotFoundError(f"No existing parent found for {path}")
+ current = current.parent
+ return current
+
+ def _fsync_file(self, path: Path) -> None:
+ try:
+ with path.open("rb") as file:
+ os.fsync(file.fileno())
+ except OSError as e:
+ self._logger.debug("Unable to fsync file: %s: %s", path, e)
+
+ def _fsync_dir(self, path: Path) -> None:
+ try:
+ dir_fd = os.open(path, os.O_RDONLY)
+ except OSError as e:
+ self._logger.debug("Unable to open directory for fsync: %s: %s", path, e)
+ return
+ try:
+ os.fsync(dir_fd)
+ except OSError as e:
+ self._logger.debug("Unable to fsync directory: %s: %s", path, e)
+ finally:
+ try:
+ os.close(dir_fd)
+ except OSError as e:
+ self._logger.debug("Unable to close directory fsync handle: %s: %s", path, e)
+
+ def _is_unrecoverable_error(self, error: Exception) -> bool:
+ return isinstance(error, RuntimeError) and (
+ str(error).startswith("Both old and new image files exist")
+ or str(error).startswith("Neither old nor new image file exists")
+ or str(error).startswith("Image move job")
+ and "has no items" in str(error)
+ )
diff --git a/invokeai/app/services/invocation_services.py b/invokeai/app/services/invocation_services.py
index 2c95f87b41d..54f9d82b786 100644
--- a/invokeai/app/services/invocation_services.py
+++ b/invokeai/app/services/invocation_services.py
@@ -23,6 +23,7 @@
from invokeai.app.services.events.events_base import EventServiceBase
from invokeai.app.services.external_generation.external_generation_base import ExternalGenerationServiceBase
from invokeai.app.services.image_files.image_files_base import ImageFileStorageBase
+ from invokeai.app.services.image_moves.image_moves_default import ImageMoveService
from invokeai.app.services.image_records.image_records_base import ImageRecordStorageBase
from invokeai.app.services.images.images_base import ImageServiceABC
from invokeai.app.services.invocation_cache.invocation_cache_base import InvocationCacheBase
@@ -79,6 +80,7 @@ def __init__(
workflow_thumbnails: "WorkflowThumbnailServiceBase",
client_state_persistence: "ClientStatePersistenceABC",
users: "UserServiceBase",
+ image_moves: "ImageMoveService | None" = None,
):
self.board_images = board_images
self.board_image_records = board_image_records
@@ -99,6 +101,7 @@ def __init__(
self.external_generation = external_generation
self.performance_statistics = performance_statistics
self.session_queue = session_queue
+ self.image_moves = image_moves
self.session_processor = session_processor
self.invocation_cache = invocation_cache
self.names = names
diff --git a/invokeai/app/services/session_processor/session_processor_default.py b/invokeai/app/services/session_processor/session_processor_default.py
index 7159c19e746..b10aa6bbb6e 100644
--- a/invokeai/app/services/session_processor/session_processor_default.py
+++ b/invokeai/app/services/session_processor/session_processor_default.py
@@ -416,6 +416,10 @@ def get_status(self) -> SessionProcessorStatus:
is_processing=self._queue_item is not None,
)
+ def _is_image_move_maintenance_active(self) -> bool:
+ image_moves = getattr(self._invoker.services, "image_moves", None)
+ return image_moves is not None and image_moves.is_maintenance_active()
+
def _process(
self,
stop_event: ThreadEvent,
@@ -437,6 +441,11 @@ def _process(
# If we are paused, wait for resume event
resume_event.wait()
+ if self._is_image_move_maintenance_active():
+ self._invoker.services.logger.debug("Image storage maintenance is active")
+ poll_now_event.wait(self._polling_interval)
+ continue
+
# Get the next session to process
self._queue_item = self._invoker.services.session_queue.dequeue()
diff --git a/invokeai/app/services/shared/sqlite/sqlite_util.py b/invokeai/app/services/shared/sqlite/sqlite_util.py
index 12642610c8c..3e1d5c53f3e 100644
--- a/invokeai/app/services/shared/sqlite/sqlite_util.py
+++ b/invokeai/app/services/shared/sqlite/sqlite_util.py
@@ -34,6 +34,7 @@
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_29 import build_migration_29
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_30 import build_migration_30
from invokeai.app.services.shared.sqlite_migrator.migrations.migration_31 import build_migration_31
+from invokeai.app.services.shared.sqlite_migrator.migrations.migration_32 import build_migration_32
from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_impl import SqliteMigrator
@@ -85,6 +86,7 @@ def init_db(config: InvokeAIAppConfig, logger: Logger, image_files: ImageFileSto
migrator.register_migration(build_migration_29())
migrator.register_migration(build_migration_30())
migrator.register_migration(build_migration_31())
+ migrator.register_migration(build_migration_32())
migrator.run_migrations()
return db
diff --git a/invokeai/app/services/shared/sqlite_migrator/migrations/migration_32.py b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_32.py
new file mode 100644
index 00000000000..d15413b4ac7
--- /dev/null
+++ b/invokeai/app/services/shared/sqlite_migrator/migrations/migration_32.py
@@ -0,0 +1,72 @@
+import sqlite3
+
+from invokeai.app.services.shared.sqlite_migrator.sqlite_migrator_common import Migration
+
+
+class Migration32Callback:
+ def __call__(self, cursor: sqlite3.Cursor) -> None:
+ cursor.execute(
+ """--sql
+ CREATE TABLE IF NOT EXISTS image_subfolder_move_jobs (
+ id INTEGER PRIMARY KEY,
+ state TEXT NOT NULL CHECK (
+ state IN ('planned', 'moving', 'moved', 'committed', 'error')
+ ),
+ created_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
+ updated_at DATETIME NOT NULL DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')),
+ error_message TEXT
+ );
+ """
+ )
+ cursor.execute(
+ """--sql
+ CREATE TABLE IF NOT EXISTS image_subfolder_move_items (
+ job_id INTEGER NOT NULL REFERENCES image_subfolder_move_jobs(id),
+ image_name TEXT NOT NULL REFERENCES images(image_name),
+ old_subfolder TEXT NOT NULL,
+ new_subfolder TEXT NOT NULL,
+ is_intermediate BOOLEAN NOT NULL DEFAULT FALSE,
+ old_path TEXT,
+ new_path TEXT,
+ old_thumbnail_path TEXT,
+ new_thumbnail_path TEXT,
+ state TEXT NOT NULL CHECK (
+ state IN ('planned', 'moved', 'committed', 'error')
+ ),
+ error_message TEXT,
+ PRIMARY KEY (job_id, image_name)
+ );
+ """
+ )
+ cursor.execute(
+ """--sql
+ CREATE INDEX IF NOT EXISTS idx_image_subfolder_move_items_job_state
+ ON image_subfolder_move_items(job_id, state);
+ """
+ )
+ cursor.execute(
+ """--sql
+ CREATE INDEX IF NOT EXISTS idx_image_subfolder_move_items_image_name
+ ON image_subfolder_move_items(image_name);
+ """
+ )
+ cursor.execute(
+ """--sql
+ CREATE TRIGGER IF NOT EXISTS tg_image_subfolder_move_jobs_updated_at
+ AFTER UPDATE
+ ON image_subfolder_move_jobs FOR EACH ROW
+ BEGIN
+ UPDATE image_subfolder_move_jobs
+ SET updated_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')
+ WHERE id = old.id;
+ END;
+ """
+ )
+
+
+def build_migration_32() -> Migration:
+ return Migration(
+ from_version=31,
+ to_version=32,
+ callback=Migration32Callback(),
+ )
diff --git a/invokeai/frontend/web/openapi.json b/invokeai/frontend/web/openapi.json
index e13946511e2..b75d46e3196 100644
--- a/invokeai/frontend/web/openapi.json
+++ b/invokeai/frontend/web/openapi.json
@@ -4154,6 +4154,78 @@
]
}
},
+ "/api/v1/image_moves/start": {
+ "post": {
+ "tags": ["image_moves"],
+ "summary": "Start Image Move",
+ "operationId": "start_image_move",
+ "responses": {
+ "202": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/ImageMoveStatusResponse"
+ }
+ }
+ }
+ }
+ },
+ "security": [
+ {
+ "HTTPBearer": []
+ }
+ ]
+ }
+ },
+ "/api/v1/image_moves/recover": {
+ "post": {
+ "tags": ["image_moves"],
+ "summary": "Start Image Move Recovery",
+ "operationId": "start_image_move_recovery",
+ "responses": {
+ "202": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/ImageMoveStatusResponse"
+ }
+ }
+ }
+ }
+ },
+ "security": [
+ {
+ "HTTPBearer": []
+ }
+ ]
+ }
+ },
+ "/api/v1/image_moves/status": {
+ "get": {
+ "tags": ["image_moves"],
+ "summary": "Get Image Move Status",
+ "operationId": "get_image_move_status",
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/ImageMoveStatusResponse"
+ }
+ }
+ }
+ }
+ },
+ "security": [
+ {
+ "HTTPBearer": []
+ }
+ ]
+ }
+ },
"/api/v1/images/upload": {
"post": {
"tags": ["images"],
@@ -4803,7 +4875,7 @@
"head": {
"tags": ["images"],
"summary": "Get Image Full",
- "description": "Gets a full-resolution image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability.",
+ "description": "Gets a full-resolution image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability. Returns 409 while image storage\nmaintenance is active.",
"operationId": "get_image_full_head",
"parameters": [
{
@@ -4843,7 +4915,7 @@
"get": {
"tags": ["images"],
"summary": "Get Image Full",
- "description": "Gets a full-resolution image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability.",
+ "description": "Gets a full-resolution image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability. Returns 409 while image storage\nmaintenance is active.",
"operationId": "get_image_full",
"parameters": [
{
@@ -4885,7 +4957,7 @@
"get": {
"tags": ["images"],
"summary": "Get Image Thumbnail",
- "description": "Gets a thumbnail image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability.",
+ "description": "Gets a thumbnail image file.\n\nThis endpoint is intentionally unauthenticated because browsers load images\nvia
tags which cannot send Bearer tokens. Image names are UUIDs,\nproviding security through unguessability. Returns 409 while image storage\nmaintenance is active.",
"operationId": "get_image_thumbnail",
"parameters": [
{
@@ -33493,6 +33565,101 @@
"$ref": "#/components/schemas/MaskOutput"
}
},
+ "ImageMoveJobResponse": {
+ "properties": {
+ "id": {
+ "type": "integer",
+ "title": "Id",
+ "description": "The image move job id."
+ },
+ "state": {
+ "type": "string",
+ "enum": ["planned", "moving", "moved", "committed", "error"],
+ "title": "State",
+ "description": "The image move job state."
+ },
+ "error_message": {
+ "anyOf": [
+ {
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "title": "Error Message",
+ "description": "The last error recorded for the job, if any."
+ }
+ },
+ "type": "object",
+ "required": ["id", "state"],
+ "title": "ImageMoveJobResponse"
+ },
+ "ImageMoveStatusResponse": {
+ "properties": {
+ "is_running": {
+ "type": "boolean",
+ "title": "Is Running",
+ "description": "Whether an image move background operation is currently running."
+ },
+ "operation": {
+ "anyOf": [
+ {
+ "type": "string",
+ "enum": ["move_all", "recovery"]
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "title": "Operation",
+ "description": "The active background operation, if any."
+ },
+ "active_job_id": {
+ "anyOf": [
+ {
+ "type": "integer"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "title": "Active Job Id",
+ "description": "The active journal job id, if any."
+ },
+ "latest_job": {
+ "anyOf": [
+ {
+ "$ref": "#/components/schemas/ImageMoveJobResponse"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "description": "The latest journal job, if any."
+ },
+ "last_error": {
+ "anyOf": [
+ {
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "title": "Last Error",
+ "description": "The last background worker error, if any."
+ },
+ "needs_move_count": {
+ "type": "integer",
+ "title": "Needs Move Count",
+ "description": "The number of images that do not match the current subfolder strategy."
+ }
+ },
+ "type": "object",
+ "required": ["is_running", "needs_move_count"],
+ "title": "ImageMoveStatusResponse"
+ },
"ImageMultiplyInvocation": {
"category": "image",
"class": "invocation",
diff --git a/invokeai/frontend/web/public/locales/en.json b/invokeai/frontend/web/public/locales/en.json
index 3e88d460e55..24baa7cf1ac 100644
--- a/invokeai/frontend/web/public/locales/en.json
+++ b/invokeai/frontend/web/public/locales/en.json
@@ -1846,6 +1846,26 @@
"imageSubfolderStrategySaveFailed": "Failed to save Image Subfolder Strategy",
"imageSubfolderStrategyType": "Type",
"imageSubfolderStrategyUnknown": "Unknown ({{strategy}})",
+ "imageStorageMaintenance": "Image Storage Maintenance",
+ "imageStorageMaintenanceOperationMove": "Move Images",
+ "imageStorageMaintenanceOperationNone": "None",
+ "imageStorageMaintenanceOperationRecovery": "Recovery",
+ "imageStorageMaintenanceRecover": "Recover",
+ "imageStorageMaintenanceRecoveryFailed": "Failed to start Image Storage Maintenance recovery",
+ "imageStorageMaintenanceStart": "Start Move",
+ "imageStorageMaintenanceStartFailed": "Failed to start Image Storage Maintenance",
+ "imageStorageMaintenanceStateCommitted": "Committed",
+ "imageStorageMaintenanceStateError": "Error",
+ "imageStorageMaintenanceStateMoved": "Moved",
+ "imageStorageMaintenanceStateMoving": "Moving",
+ "imageStorageMaintenanceStateNone": "No jobs",
+ "imageStorageMaintenanceStatePlanned": "Planned",
+ "imageStorageMaintenanceStatusIncomplete_one": "{{count}} image still needs moving",
+ "imageStorageMaintenanceStatusIncomplete_other": "{{count}} images still need moving",
+ "imageStorageMaintenanceStatusIdle": "Status: {{state}}",
+ "imageStorageMaintenanceStatusNeedsRecovery": "Recovery needed for job {{jobId}}",
+ "imageStorageMaintenanceStatusRunning": "Running: {{operation}}",
+ "imageStorageMaintenanceStatusUnavailable": "Status unavailable",
"maxQueueHistory": "Max Queue History",
"maxQueueHistorySaveFailed": "Failed to save Max Queue History",
"models": "Models",
diff --git a/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsImageStorageMaintenance.tsx b/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsImageStorageMaintenance.tsx
new file mode 100644
index 00000000000..49018e657aa
--- /dev/null
+++ b/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsImageStorageMaintenance.tsx
@@ -0,0 +1,169 @@
+import { Button, Flex, FormControl, FormLabel, Text } from '@invoke-ai/ui-library';
+import { useAppDispatch, useAppSelector } from 'app/store/storeHooks';
+import { selectCurrentUser } from 'features/auth/store/authSlice';
+import { toast } from 'features/toast/toast';
+import { memo, useCallback, useEffect, useMemo, useState } from 'react';
+import { useTranslation } from 'react-i18next';
+import { api, LIST_TAG } from 'services/api';
+import { useGetRuntimeConfigQuery } from 'services/api/endpoints/appInfo';
+import {
+ useGetImageMoveStatusQuery,
+ useStartImageMoveMutation,
+ useStartImageMoveRecoveryMutation,
+} from 'services/api/endpoints/imageMoves';
+import type { S } from 'services/api/types';
+
+const getOperationKey = (operation: S['ImageMoveStatusResponse']['operation']) => {
+ if (operation === 'move_all') {
+ return 'settings.imageStorageMaintenanceOperationMove';
+ }
+ if (operation === 'recovery') {
+ return 'settings.imageStorageMaintenanceOperationRecovery';
+ }
+ return 'settings.imageStorageMaintenanceOperationNone';
+};
+
+const getJobStateKey = (state: S['ImageMoveJobResponse']['state'] | undefined) => {
+ if (state === 'planned') {
+ return 'settings.imageStorageMaintenanceStatePlanned';
+ }
+ if (state === 'moving') {
+ return 'settings.imageStorageMaintenanceStateMoving';
+ }
+ if (state === 'moved') {
+ return 'settings.imageStorageMaintenanceStateMoved';
+ }
+ if (state === 'committed') {
+ return 'settings.imageStorageMaintenanceStateCommitted';
+ }
+ if (state === 'error') {
+ return 'settings.imageStorageMaintenanceStateError';
+ }
+ return 'settings.imageStorageMaintenanceStateNone';
+};
+
+const invalidatedImageMoveJobIds = new Set();
+
+export const SettingsImageStorageMaintenance = memo(() => {
+ const { t } = useTranslation();
+ const dispatch = useAppDispatch();
+ const currentUser = useAppSelector(selectCurrentUser);
+ const { data: runtimeConfig } = useGetRuntimeConfigQuery();
+ const canAccess = runtimeConfig ? !runtimeConfig.config.multiuser || Boolean(currentUser?.is_admin) : false;
+ const [startImageMove, startImageMoveState] = useStartImageMoveMutation();
+ const [startImageMoveRecovery, startImageMoveRecoveryState] = useStartImageMoveRecoveryMutation();
+ const [shouldPollStatus, setShouldPollStatus] = useState(false);
+ const { data: status, isFetching } = useGetImageMoveStatusQuery(undefined, {
+ skip: !canAccess,
+ pollingInterval: shouldPollStatus ? 2000 : 0,
+ });
+
+ const isRunning = status?.is_running ?? false;
+ const latestJob = status?.latest_job;
+ const needsMoveCount = status?.needs_move_count ?? 0;
+ const hasActiveJob = status?.active_job_id !== null && status?.active_job_id !== undefined;
+ const isBusy = isRunning || startImageMoveState.isLoading || startImageMoveRecoveryState.isLoading;
+
+ useEffect(() => {
+ setShouldPollStatus(canAccess && isRunning);
+ }, [canAccess, isRunning]);
+
+ useEffect(() => {
+ if (!latestJob || latestJob.state !== 'committed' || isRunning) {
+ return;
+ }
+ if (invalidatedImageMoveJobIds.has(latestJob.id)) {
+ return;
+ }
+ invalidatedImageMoveJobIds.add(latestJob.id);
+ dispatch(
+ api.util.invalidateTags([
+ 'Image',
+ 'ImageList',
+ 'ImageMetadata',
+ 'ImageWorkflow',
+ 'ImageNameList',
+ 'ImageCollectionCounts',
+ 'ImageMoveStatus',
+ { type: 'ImageCollection', id: LIST_TAG },
+ ])
+ );
+ }, [dispatch, isRunning, latestJob]);
+
+ const statusText = useMemo(() => {
+ if (!status) {
+ return t('settings.imageStorageMaintenanceStatusUnavailable');
+ }
+ if (hasActiveJob && !isRunning) {
+ return t('settings.imageStorageMaintenanceStatusNeedsRecovery', { jobId: status.active_job_id });
+ }
+ if (isRunning) {
+ return t('settings.imageStorageMaintenanceStatusRunning', { operation: t(getOperationKey(status.operation)) });
+ }
+ if (needsMoveCount > 0) {
+ return t('settings.imageStorageMaintenanceStatusIncomplete', { count: needsMoveCount });
+ }
+ return t('settings.imageStorageMaintenanceStatusIdle', { state: t(getJobStateKey(latestJob?.state)) });
+ }, [hasActiveJob, isRunning, latestJob?.state, needsMoveCount, status, t]);
+
+ const onStart = useCallback(async () => {
+ try {
+ setShouldPollStatus(true);
+ await startImageMove().unwrap();
+ } catch {
+ setShouldPollStatus(false);
+ toast({
+ id: 'IMAGE_STORAGE_MAINTENANCE_START_FAILED',
+ title: t('settings.imageStorageMaintenanceStartFailed'),
+ status: 'error',
+ });
+ }
+ }, [startImageMove, t]);
+
+ const onRecover = useCallback(async () => {
+ try {
+ setShouldPollStatus(true);
+ await startImageMoveRecovery().unwrap();
+ } catch {
+ setShouldPollStatus(false);
+ toast({
+ id: 'IMAGE_STORAGE_MAINTENANCE_RECOVERY_FAILED',
+ title: t('settings.imageStorageMaintenanceRecoveryFailed'),
+ status: 'error',
+ });
+ }
+ }, [startImageMoveRecovery, t]);
+
+ if (!canAccess) {
+ return null;
+ }
+
+ return (
+
+ {t('settings.imageStorageMaintenance')}
+
+
+
+
+ {statusText}
+ {latestJob?.error_message || status?.last_error ? (
+ {latestJob?.error_message || status?.last_error}
+ ) : null}
+
+ );
+});
+
+SettingsImageStorageMaintenance.displayName = 'SettingsImageStorageMaintenance';
diff --git a/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsModal.tsx b/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsModal.tsx
index 64478953a37..2fa8d6fde29 100644
--- a/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsModal.tsx
+++ b/invokeai/frontend/web/src/features/system/components/SettingsModal/SettingsModal.tsx
@@ -28,6 +28,7 @@ import { useRefreshAfterResetModal } from 'features/system/components/SettingsMo
import { SettingsDeveloperLogIsEnabled } from 'features/system/components/SettingsModal/SettingsDeveloperLogIsEnabled';
import { SettingsDeveloperLogLevel } from 'features/system/components/SettingsModal/SettingsDeveloperLogLevel';
import { SettingsDeveloperLogNamespaces } from 'features/system/components/SettingsModal/SettingsDeveloperLogNamespaces';
+import { SettingsImageStorageMaintenance } from 'features/system/components/SettingsModal/SettingsImageStorageMaintenance';
import { SettingsImageSubfolderStrategySelect } from 'features/system/components/SettingsModal/SettingsImageSubfolderStrategySelect';
import { useClearIntermediates } from 'features/system/components/SettingsModal/useClearIntermediates';
import { StickyScrollable } from 'features/system/components/StickyScrollable';
@@ -321,6 +322,7 @@ const SettingsModal = (props: { children: ReactElement }) => {
+
diff --git a/invokeai/frontend/web/src/services/api/endpoints/imageMoves.ts b/invokeai/frontend/web/src/services/api/endpoints/imageMoves.ts
new file mode 100644
index 00000000000..444ecf2379e
--- /dev/null
+++ b/invokeai/frontend/web/src/services/api/endpoints/imageMoves.ts
@@ -0,0 +1,42 @@
+import type { paths } from 'services/api/schema';
+
+import { api, buildV1Url, LIST_TAG } from '..';
+
+const buildImageMovesUrl = (path: string = '') => buildV1Url(`image_moves/${path}`);
+
+type ImageMoveStatusResponse =
+ paths['/api/v1/image_moves/status']['get']['responses']['200']['content']['application/json'];
+
+const imageMovesApi = api.injectEndpoints({
+ endpoints: (build) => ({
+ getImageMoveStatus: build.query({
+ query: () => ({
+ url: buildImageMovesUrl('status'),
+ method: 'GET',
+ }),
+ providesTags: ['ImageMoveStatus', 'FetchOnReconnect'],
+ }),
+ startImageMove: build.mutation({
+ query: () => ({
+ url: buildImageMovesUrl('start'),
+ method: 'POST',
+ }),
+ invalidatesTags: ['ImageMoveStatus'],
+ }),
+ startImageMoveRecovery: build.mutation({
+ query: () => ({
+ url: buildImageMovesUrl('recover'),
+ method: 'POST',
+ }),
+ invalidatesTags: [
+ 'ImageMoveStatus',
+ 'ImageNameList',
+ 'ImageCollectionCounts',
+ { type: 'ImageCollection', id: LIST_TAG },
+ ],
+ }),
+ }),
+});
+
+export const { useGetImageMoveStatusQuery, useStartImageMoveMutation, useStartImageMoveRecoveryMutation } =
+ imageMovesApi;
diff --git a/invokeai/frontend/web/src/services/api/index.ts b/invokeai/frontend/web/src/services/api/index.ts
index a586273f3a7..8df82602e8c 100644
--- a/invokeai/frontend/web/src/services/api/index.ts
+++ b/invokeai/frontend/web/src/services/api/index.ts
@@ -23,6 +23,7 @@ const tagTypes = [
'ImageList',
'ImageMetadata',
'ImageWorkflow',
+ 'ImageMoveStatus',
'ImageCollectionCounts',
'ImageCollection',
'ImageMetadataFromFile',
diff --git a/invokeai/frontend/web/src/services/api/schema.ts b/invokeai/frontend/web/src/services/api/schema.ts
index 7ca0f26fe9f..05185092379 100644
--- a/invokeai/frontend/web/src/services/api/schema.ts
+++ b/invokeai/frontend/web/src/services/api/schema.ts
@@ -1001,6 +1001,57 @@ export type paths = {
patch?: never;
trace?: never;
};
+ "/api/v1/image_moves/start": {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ get?: never;
+ put?: never;
+ /** Start Image Move */
+ post: operations["start_image_move"];
+ delete?: never;
+ options?: never;
+ head?: never;
+ patch?: never;
+ trace?: never;
+ };
+ "/api/v1/image_moves/recover": {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ get?: never;
+ put?: never;
+ /** Start Image Move Recovery */
+ post: operations["start_image_move_recovery"];
+ delete?: never;
+ options?: never;
+ head?: never;
+ patch?: never;
+ trace?: never;
+ };
+ "/api/v1/image_moves/status": {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ /** Get Image Move Status */
+ get: operations["get_image_move_status"];
+ put?: never;
+ post?: never;
+ delete?: never;
+ options?: never;
+ head?: never;
+ patch?: never;
+ trace?: never;
+ };
"/api/v1/images/upload": {
parameters: {
query?: never;
@@ -1147,7 +1198,8 @@ export type paths = {
*
* This endpoint is intentionally unauthenticated because browsers load images
* via
tags which cannot send Bearer tokens. Image names are UUIDs,
- * providing security through unguessability.
+ * providing security through unguessability. Returns 409 while image storage
+ * maintenance is active.
*/
get: operations["get_image_full"];
put?: never;
@@ -1160,7 +1212,8 @@ export type paths = {
*
* This endpoint is intentionally unauthenticated because browsers load images
* via
tags which cannot send Bearer tokens. Image names are UUIDs,
- * providing security through unguessability.
+ * providing security through unguessability. Returns 409 while image storage
+ * maintenance is active.
*/
head: operations["get_image_full_head"];
patch?: never;
@@ -1179,7 +1232,8 @@ export type paths = {
*
* This endpoint is intentionally unauthenticated because browsers load images
* via
tags which cannot send Bearer tokens. Image names are UUIDs,
- * providing security through unguessability.
+ * providing security through unguessability. Returns 409 while image storage
+ * maintenance is active.
*/
get: operations["get_image_thumbnail"];
put?: never;
@@ -14304,6 +14358,55 @@ export type components = {
*/
type: "image_mask_to_tensor";
};
+ /** ImageMoveJobResponse */
+ ImageMoveJobResponse: {
+ /**
+ * Id
+ * @description The image move job id.
+ */
+ id: number;
+ /**
+ * State
+ * @description The image move job state.
+ * @enum {string}
+ */
+ state: "planned" | "moving" | "moved" | "committed" | "error";
+ /**
+ * Error Message
+ * @description The last error recorded for the job, if any.
+ */
+ error_message?: string | null;
+ };
+ /** ImageMoveStatusResponse */
+ ImageMoveStatusResponse: {
+ /**
+ * Is Running
+ * @description Whether an image move background operation is currently running.
+ */
+ is_running: boolean;
+ /**
+ * Operation
+ * @description The active background operation, if any.
+ */
+ operation?: ("move_all" | "recovery") | null;
+ /**
+ * Active Job Id
+ * @description The active journal job id, if any.
+ */
+ active_job_id?: number | null;
+ /** @description The latest journal job, if any. */
+ latest_job?: components["schemas"]["ImageMoveJobResponse"] | null;
+ /**
+ * Last Error
+ * @description The last background worker error, if any.
+ */
+ last_error?: string | null;
+ /**
+ * Needs Move Count
+ * @description The number of images that do not match the current subfolder strategy.
+ */
+ needs_move_count: number;
+ };
/**
* Multiply Images
* @description Multiplies two images together using `PIL.ImageChops.multiply()`.
@@ -34819,6 +34922,66 @@ export interface operations {
};
};
};
+ start_image_move: {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ requestBody?: never;
+ responses: {
+ /** @description Successful Response */
+ 202: {
+ headers: {
+ [name: string]: unknown;
+ };
+ content: {
+ "application/json": components["schemas"]["ImageMoveStatusResponse"];
+ };
+ };
+ };
+ };
+ start_image_move_recovery: {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ requestBody?: never;
+ responses: {
+ /** @description Successful Response */
+ 202: {
+ headers: {
+ [name: string]: unknown;
+ };
+ content: {
+ "application/json": components["schemas"]["ImageMoveStatusResponse"];
+ };
+ };
+ };
+ };
+ get_image_move_status: {
+ parameters: {
+ query?: never;
+ header?: never;
+ path?: never;
+ cookie?: never;
+ };
+ requestBody?: never;
+ responses: {
+ /** @description Successful Response */
+ 200: {
+ headers: {
+ [name: string]: unknown;
+ };
+ content: {
+ "application/json": components["schemas"]["ImageMoveStatusResponse"];
+ };
+ };
+ };
+ };
upload_image: {
parameters: {
query: {
diff --git a/tests/app/routers/test_board_images_maintenance.py b/tests/app/routers/test_board_images_maintenance.py
new file mode 100644
index 00000000000..90c7df5aa47
--- /dev/null
+++ b/tests/app/routers/test_board_images_maintenance.py
@@ -0,0 +1,154 @@
+from unittest.mock import MagicMock
+
+import pytest
+from fastapi.testclient import TestClient
+
+from invokeai.app.api.auth_dependencies import get_current_user_or_default
+from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api_app import app
+from invokeai.app.services.auth.token_service import TokenData
+from invokeai.app.services.board_records.board_records_common import BoardVisibility
+from invokeai.app.services.boards.boards_common import BoardDTO
+from invokeai.app.services.invoker import Invoker
+
+
+class MockApiDependencies(ApiDependencies):
+ invoker: Invoker
+
+ def __init__(self, invoker) -> None:
+ self.invoker = invoker
+
+
+@pytest.fixture
+def client() -> TestClient:
+ return TestClient(app)
+
+
+@pytest.mark.parametrize(
+ ("method", "path", "json_body"),
+ [
+ ("post", "/api/v1/board_images/", {"board_id": "board-id", "image_name": "image.png"}),
+ ("delete", "/api/v1/board_images/", {"image_name": "image.png"}),
+ ("post", "/api/v1/board_images/batch", {"board_id": "board-id", "image_names": ["image.png"]}),
+ ("post", "/api/v1/board_images/batch/delete", {"image_names": ["image.png"]}),
+ ],
+)
+def test_board_image_mutations_are_blocked_during_image_move_maintenance(
+ monkeypatch: pytest.MonkeyPatch,
+ mock_invoker: Invoker,
+ client: TestClient,
+ method: str,
+ path: str,
+ json_body: dict,
+) -> None:
+ mock_deps = MockApiDependencies(mock_invoker)
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ monkeypatch.setattr(mock_invoker.services.image_records, "get_user_id", MagicMock(return_value="system"))
+ monkeypatch.setattr(mock_invoker.services.images, "get_dto", MagicMock(return_value=MagicMock(board_id=None)))
+ monkeypatch.setattr(
+ mock_invoker.services.boards,
+ "get_dto",
+ MagicMock(
+ return_value=BoardDTO(
+ board_id="board-id",
+ board_name="Board",
+ user_id="system",
+ created_at="2024-01-01 00:00:00.000",
+ updated_at="2024-01-01 00:00:00.000",
+ archived=False,
+ board_visibility=BoardVisibility.Private,
+ cover_image_name=None,
+ image_count=0,
+ asset_count=0,
+ )
+ ),
+ )
+ monkeypatch.setattr("invokeai.app.api.routers.board_images.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers._access.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.auth_dependencies.ApiDependencies", mock_deps)
+
+ response = client.request(method, path, json=json_body)
+
+ assert response.status_code == 409
+ assert response.json()["detail"] == "Image storage maintenance is active"
+
+
+def test_board_image_mutation_checks_access_before_image_move_maintenance(
+ monkeypatch: pytest.MonkeyPatch,
+ mock_invoker: Invoker,
+ client: TestClient,
+) -> None:
+ mock_deps = MockApiDependencies(mock_invoker)
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ monkeypatch.setattr(mock_invoker.services.image_records, "get_user_id", MagicMock(return_value="other-user"))
+ monkeypatch.setattr(
+ mock_invoker.services.boards,
+ "get_dto",
+ MagicMock(
+ return_value=BoardDTO(
+ board_id="board-id",
+ board_name="Board",
+ user_id="system",
+ created_at="2024-01-01 00:00:00.000",
+ updated_at="2024-01-01 00:00:00.000",
+ archived=False,
+ board_visibility=BoardVisibility.Private,
+ cover_image_name=None,
+ image_count=0,
+ asset_count=0,
+ )
+ ),
+ )
+ monkeypatch.setattr("invokeai.app.api.routers.board_images.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers._access.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.auth_dependencies.ApiDependencies", mock_deps)
+
+ async def current_user_override() -> TokenData:
+ return TokenData(user_id="request-user", email="request-user@example.com", is_admin=False)
+
+ app.dependency_overrides[get_current_user_or_default] = current_user_override
+ try:
+ response = client.post("/api/v1/board_images/", json={"board_id": "board-id", "image_name": "image.png"})
+
+ assert response.status_code == 403
+ mock_invoker.services.image_moves.is_maintenance_active.assert_not_called()
+ finally:
+ app.dependency_overrides.pop(get_current_user_or_default, None)
+
+
+def test_delete_board_with_images_is_blocked_during_image_move_maintenance(
+ monkeypatch: pytest.MonkeyPatch,
+ mock_invoker: Invoker,
+ client: TestClient,
+) -> None:
+ mock_deps = MockApiDependencies(mock_invoker)
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ mock_invoker.services.images.delete_images_on_board = MagicMock()
+ mock_invoker.services.boards.get_dto = MagicMock(
+ return_value=BoardDTO(
+ board_id="board-id",
+ board_name="Board",
+ user_id="system",
+ created_at="2024-01-01 00:00:00.000",
+ updated_at="2024-01-01 00:00:00.000",
+ archived=False,
+ board_visibility=BoardVisibility.Private,
+ cover_image_name=None,
+ image_count=0,
+ asset_count=0,
+ )
+ )
+ monkeypatch.setattr("invokeai.app.api.routers.boards.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.auth_dependencies.ApiDependencies", mock_deps)
+
+ response = client.delete("/api/v1/boards/board-id?include_images=true")
+
+ assert response.status_code == 409
+ assert response.json()["detail"] == "Image storage maintenance is active"
+ mock_invoker.services.images.delete_images_on_board.assert_not_called()
diff --git a/tests/app/routers/test_image_moves.py b/tests/app/routers/test_image_moves.py
new file mode 100644
index 00000000000..697447269c3
--- /dev/null
+++ b/tests/app/routers/test_image_moves.py
@@ -0,0 +1,199 @@
+import logging
+from unittest.mock import MagicMock
+
+import pytest
+from fastapi import status
+from fastapi.testclient import TestClient
+
+from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api_app import app
+from invokeai.app.services.auth.token_service import set_jwt_secret
+from invokeai.app.services.board_image_records.board_image_records_sqlite import SqliteBoardImageRecordStorage
+from invokeai.app.services.board_records.board_records_sqlite import SqliteBoardRecordStorage
+from invokeai.app.services.boards.boards_default import BoardService
+from invokeai.app.services.bulk_download.bulk_download_default import BulkDownloadService
+from invokeai.app.services.client_state_persistence.client_state_persistence_sqlite import ClientStatePersistenceSqlite
+from invokeai.app.services.config.config_default import InvokeAIAppConfig
+from invokeai.app.services.image_moves.image_moves_default import ImageMoveJobAlreadyRunning, ImageMoveQueueActive
+from invokeai.app.services.image_records.image_records_sqlite import SqliteImageRecordStorage
+from invokeai.app.services.images.images_default import ImageService
+from invokeai.app.services.invocation_cache.invocation_cache_memory import MemoryInvocationCache
+from invokeai.app.services.invocation_services import InvocationServices
+from invokeai.app.services.invocation_stats.invocation_stats_default import InvocationStatsService
+from invokeai.app.services.invoker import Invoker
+from invokeai.app.services.users.users_common import UserCreateRequest
+from invokeai.app.services.users.users_default import UserService
+from invokeai.app.services.workflow_records.workflow_records_sqlite import SqliteWorkflowRecordsStorage
+from invokeai.backend.util.logging import InvokeAILogger
+from tests.fixtures.sqlite_database import create_mock_sqlite_database
+from tests.test_nodes import TestEventService
+
+
+class MockApiDependencies(ApiDependencies):
+ invoker: Invoker
+
+ def __init__(self, invoker: Invoker) -> None:
+ self.invoker = invoker
+
+
+@pytest.fixture
+def client() -> TestClient:
+ return TestClient(app)
+
+
+@pytest.fixture
+def mock_services() -> InvocationServices:
+ configuration = InvokeAIAppConfig(use_memory_db=True, node_cache_size=0)
+ logger = InvokeAILogger.get_logger()
+ db = create_mock_sqlite_database(configuration, logger)
+ image_moves = MagicMock()
+ return InvocationServices(
+ board_image_records=SqliteBoardImageRecordStorage(db=db),
+ board_images=None, # type: ignore
+ board_records=SqliteBoardRecordStorage(db=db),
+ boards=BoardService(),
+ bulk_download=BulkDownloadService(),
+ configuration=configuration,
+ events=TestEventService(),
+ image_files=None, # type: ignore
+ image_records=SqliteImageRecordStorage(db=db),
+ images=ImageService(),
+ invocation_cache=MemoryInvocationCache(max_cache_size=0),
+ logger=logging, # type: ignore
+ model_images=None, # type: ignore
+ model_manager=None, # type: ignore
+ download_queue=None, # type: ignore
+ external_generation=None, # type: ignore
+ names=None, # type: ignore
+ performance_statistics=InvocationStatsService(),
+ session_processor=None, # type: ignore
+ session_queue=None, # type: ignore
+ urls=None, # type: ignore
+ workflow_records=SqliteWorkflowRecordsStorage(db=db),
+ tensors=None, # type: ignore
+ conditioning=None, # type: ignore
+ style_preset_records=None, # type: ignore
+ style_preset_image_files=None, # type: ignore
+ workflow_thumbnails=None, # type: ignore
+ model_relationship_records=None, # type: ignore
+ model_relationships=None, # type: ignore
+ client_state_persistence=ClientStatePersistenceSqlite(db=db),
+ users=UserService(db),
+ image_moves=image_moves,
+ )
+
+
+@pytest.fixture
+def mock_invoker(mock_services: InvocationServices, monkeypatch: pytest.MonkeyPatch) -> Invoker:
+ invoker = Invoker(services=mock_services)
+ mock_deps = MockApiDependencies(invoker)
+ monkeypatch.setattr("invokeai.app.api.routers.image_moves.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.auth_dependencies.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers.auth.ApiDependencies", mock_deps)
+ return invoker
+
+
+def _status_payload(is_running: bool = True, operation: str = "move_all") -> dict:
+ return {
+ "is_running": is_running,
+ "operation": operation,
+ "active_job_id": None,
+ "latest_job": None,
+ "last_error": None,
+ "needs_move_count": 0,
+ }
+
+
+def _create_user(invoker: Invoker, email: str, is_admin: bool) -> None:
+ invoker.services.users.create(
+ UserCreateRequest(
+ email=email,
+ display_name=email,
+ password="TestPass123",
+ is_admin=is_admin,
+ )
+ )
+
+
+def _login(client: TestClient, email: str) -> str:
+ response = client.post("/api/v1/auth/login", json={"email": email, "password": "TestPass123"})
+ assert response.status_code == status.HTTP_200_OK
+ return response.json()["token"]
+
+
+def test_start_image_move_returns_accepted_without_running_job_inline(
+ client: TestClient, mock_invoker: Invoker
+) -> None:
+ image_moves = mock_invoker.services.image_moves
+ image_moves.start_background_move_all.return_value = _status_payload()
+
+ response = client.post("/api/v1/image_moves/start")
+
+ assert response.status_code == status.HTTP_202_ACCEPTED
+ assert response.json()["is_running"] is True
+ image_moves.start_background_move_all.assert_called_once_with()
+ image_moves.move_all_images.assert_not_called()
+
+
+def test_start_image_move_rejects_overlapping_background_job(client: TestClient, mock_invoker: Invoker) -> None:
+ image_moves = mock_invoker.services.image_moves
+ image_moves.start_background_move_all.side_effect = ImageMoveJobAlreadyRunning("already running")
+
+ response = client.post("/api/v1/image_moves/start")
+
+ assert response.status_code == status.HTTP_409_CONFLICT
+ assert response.json()["detail"] == "already running"
+
+
+def test_start_image_move_rejects_active_queue_work(client: TestClient, mock_invoker: Invoker) -> None:
+ image_moves = mock_invoker.services.image_moves
+ image_moves.start_background_move_all.side_effect = ImageMoveQueueActive("queue work is active")
+
+ response = client.post("/api/v1/image_moves/start")
+
+ assert response.status_code == status.HTTP_409_CONFLICT
+ assert response.json()["detail"] == "queue work is active"
+
+
+def test_force_recovery_returns_accepted(client: TestClient, mock_invoker: Invoker) -> None:
+ image_moves = mock_invoker.services.image_moves
+ image_moves.start_background_recovery.return_value = _status_payload(operation="recovery")
+
+ response = client.post("/api/v1/image_moves/recover")
+
+ assert response.status_code == status.HTTP_202_ACCEPTED
+ assert response.json()["operation"] == "recovery"
+ image_moves.start_background_recovery.assert_called_once_with()
+
+
+def test_image_move_status_uses_service_status(client: TestClient, mock_invoker: Invoker) -> None:
+ image_moves = mock_invoker.services.image_moves
+ image_moves.get_background_status.return_value = _status_payload(is_running=False)
+
+ response = client.get("/api/v1/image_moves/status")
+
+ assert response.status_code == status.HTTP_200_OK
+ assert response.json()["is_running"] is False
+ assert response.json()["needs_move_count"] == 0
+ image_moves.get_background_status.assert_called_once_with()
+
+
+@pytest.mark.parametrize(
+ ("method", "path"),
+ [
+ ("post", "/api/v1/image_moves/start"),
+ ("post", "/api/v1/image_moves/recover"),
+ ("get", "/api/v1/image_moves/status"),
+ ],
+)
+def test_image_move_endpoints_require_admin_in_multiuser_mode(
+ client: TestClient, mock_invoker: Invoker, method: str, path: str
+) -> None:
+ set_jwt_secret("test-secret")
+ mock_invoker.services.configuration.multiuser = True
+ _create_user(mock_invoker, "user@test.com", is_admin=False)
+ token = _login(client, "user@test.com")
+
+ response = getattr(client, method)(path, headers={"Authorization": f"Bearer {token}"})
+
+ assert response.status_code == status.HTTP_403_FORBIDDEN
diff --git a/tests/app/routers/test_images.py b/tests/app/routers/test_images.py
index 619ecb78c4f..1e4270abff7 100644
--- a/tests/app/routers/test_images.py
+++ b/tests/app/routers/test_images.py
@@ -1,13 +1,16 @@
import os
from pathlib import Path
from typing import Any
+from unittest.mock import MagicMock
import pytest
from fastapi import BackgroundTasks
from fastapi.testclient import TestClient
+from invokeai.app.api.auth_dependencies import get_current_user_or_default
from invokeai.app.api.dependencies import ApiDependencies
from invokeai.app.api_app import app
+from invokeai.app.services.auth.token_service import TokenData
from invokeai.app.services.board_records.board_records_common import BoardRecord
from invokeai.app.services.invoker import Invoker
@@ -66,6 +69,98 @@ def mock_add_task(*args, **kwargs):
monkeypatch.setattr(BackgroundTasks, "add_task", mock_add_task)
+def prepare_image_maintenance_test(monkeypatch: Any, mock_invoker: Invoker) -> None:
+ mock_deps = MockApiDependencies(mock_invoker)
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ monkeypatch.setattr(mock_invoker.services.image_records, "get_user_id", MagicMock(return_value="system"))
+ monkeypatch.setattr(mock_invoker.services.board_image_records, "get_board_for_image", MagicMock(return_value=None))
+ monkeypatch.setattr("invokeai.app.api.routers.images.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers._access.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", mock_deps)
+ monkeypatch.setattr("invokeai.app.api.auth_dependencies.ApiDependencies", mock_deps)
+
+
+@pytest.mark.parametrize(
+ ("method", "path", "json_body"),
+ [
+ ("get", "/api/v1/images/i/test.png/full", None),
+ ("head", "/api/v1/images/i/test.png/full", None),
+ ("get", "/api/v1/images/i/test.png/thumbnail", None),
+ ("get", "/api/v1/images/i/test.png/workflow", None),
+ ("delete", "/api/v1/images/i/test.png", None),
+ ("delete", "/api/v1/images/intermediates", None),
+ ("delete", "/api/v1/images/uncategorized", None),
+ ("patch", "/api/v1/images/i/test.png", {"starred": True}),
+ ("post", "/api/v1/images/delete", {"image_names": ["test.png"]}),
+ ("post", "/api/v1/images/star", {"image_names": ["test.png"]}),
+ ("post", "/api/v1/images/unstar", {"image_names": ["test.png"]}),
+ ("post", "/api/v1/images/download", {"image_names": ["test.png"]}),
+ ],
+)
+def test_image_operations_are_blocked_during_image_move_maintenance(
+ monkeypatch: Any, mock_invoker: Invoker, client: TestClient, method: str, path: str, json_body: dict | None
+) -> None:
+ prepare_image_maintenance_test(monkeypatch, mock_invoker)
+
+ if json_body is not None:
+ response = getattr(client, method)(path, json=json_body)
+ else:
+ response = getattr(client, method)(path)
+
+ assert response.status_code == 409
+ if method != "head":
+ assert response.json()["detail"] == "Image storage maintenance is active"
+
+
+def test_image_mutation_checks_access_before_image_move_maintenance(
+ monkeypatch: Any, mock_invoker: Invoker, client: TestClient
+) -> None:
+ prepare_image_maintenance_test(monkeypatch, mock_invoker)
+ monkeypatch.setattr(mock_invoker.services.image_records, "get_user_id", MagicMock(return_value="other-user"))
+
+ async def current_user_override() -> TokenData:
+ return TokenData(user_id="request-user", email="request-user@example.com", is_admin=False)
+
+ app.dependency_overrides[get_current_user_or_default] = current_user_override
+ try:
+ response = client.delete("/api/v1/images/i/test.png")
+
+ assert response.status_code == 403
+ mock_invoker.services.image_moves.is_maintenance_active.assert_not_called()
+ finally:
+ app.dependency_overrides.pop(get_current_user_or_default, None)
+
+
+def test_image_upload_is_blocked_during_image_move_maintenance(
+ monkeypatch: Any, mock_invoker: Invoker, client: TestClient
+) -> None:
+ prepare_image_maintenance_test(monkeypatch, mock_invoker)
+
+ response = client.post(
+ "/api/v1/images/upload",
+ params={"image_category": "general", "is_intermediate": False},
+ files={"file": ("test.png", b"not-read-during-maintenance", "image/png")},
+ )
+
+ assert response.status_code == 409
+ assert response.json()["detail"] == "Image storage maintenance is active"
+
+
+def test_image_to_prompt_is_blocked_during_image_move_maintenance(
+ monkeypatch: Any, mock_invoker: Invoker, client: TestClient
+) -> None:
+ prepare_image_maintenance_test(monkeypatch, mock_invoker)
+
+ response = client.post(
+ "/api/v1/utilities/image-to-prompt",
+ json={"image_name": "test.png", "model_key": "model-key", "instruction": "describe"},
+ )
+
+ assert response.status_code == 409
+ assert response.json()["detail"] == "Image storage maintenance is active"
+
+
def test_download_images_with_empty_image_list_and_no_board_id(
monkeypatch: Any, mock_invoker: Invoker, client: TestClient
) -> None:
diff --git a/tests/app/routers/test_recall_parameters.py b/tests/app/routers/test_recall_parameters.py
index 9dddf497ec6..777de45974d 100644
--- a/tests/app/routers/test_recall_parameters.py
+++ b/tests/app/routers/test_recall_parameters.py
@@ -89,6 +89,19 @@ def _load(image_name: str) -> Optional[dict[str, Any]]:
return _load
+def test_recall_parameters_is_blocked_during_image_move_maintenance(
+ monkeypatch: Any, patched_dependencies: MockApiDependencies, mock_invoker: Invoker, client: TestClient
+) -> None:
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", patched_dependencies)
+
+ response = client.post("/api/v1/recall/default", json={"positive_prompt": "hello"})
+
+ assert response.status_code == 409
+ assert response.json()["detail"] == "Image storage maintenance is active"
+
+
class TestReferenceImagesRecall:
def test_reference_images_forwarded_when_image_exists(
self, monkeypatch: Any, patched_dependencies: MockApiDependencies, client: TestClient
diff --git a/tests/app/routers/test_session_queue_image_move_maintenance.py b/tests/app/routers/test_session_queue_image_move_maintenance.py
new file mode 100644
index 00000000000..3ccb65dc3d9
--- /dev/null
+++ b/tests/app/routers/test_session_queue_image_move_maintenance.py
@@ -0,0 +1,35 @@
+from unittest.mock import MagicMock
+
+import pytest
+from fastapi import HTTPException
+
+from invokeai.app.api.dependencies import ApiDependencies
+from invokeai.app.api.routers.session_queue import enqueue_batch
+from invokeai.app.services.session_queue.session_queue_common import DEFAULT_QUEUE_ID, Batch
+from invokeai.app.services.shared.graph import Graph
+
+
+class MockApiDependencies(ApiDependencies):
+ def __init__(self, invoker) -> None:
+ self.invoker = invoker
+
+
+@pytest.mark.anyio
+async def test_enqueue_batch_is_blocked_during_image_move_maintenance(
+ monkeypatch: pytest.MonkeyPatch, mock_invoker
+) -> None:
+ mock_deps = MockApiDependencies(mock_invoker)
+ mock_invoker.services.image_moves = MagicMock()
+ mock_invoker.services.image_moves.is_maintenance_active.return_value = True
+ monkeypatch.setattr("invokeai.app.api.routers.image_move_maintenance.ApiDependencies", mock_deps)
+
+ with pytest.raises(HTTPException) as exc:
+ await enqueue_batch(
+ current_user=MagicMock(user_id="user-id"),
+ queue_id=DEFAULT_QUEUE_ID,
+ batch=Batch(graph=Graph()),
+ prepend=False,
+ )
+
+ assert exc.value.status_code == 409
+ assert exc.value.detail == "Image storage maintenance is active"
diff --git a/tests/app/services/image_files/test_image_files_disk.py b/tests/app/services/image_files/test_image_files_disk.py
index 9db9f7ba5d0..58c1a78635f 100644
--- a/tests/app/services/image_files/test_image_files_disk.py
+++ b/tests/app/services/image_files/test_image_files_disk.py
@@ -134,7 +134,9 @@ def test_save_and_delete_with_subfolder(self, disk_storage: DiskImageFileStorage
# Thumbnail file exists in mirrored subfolder
thumbnail_name = get_thumbnail_name(image_name)
- thumb_path = disk_storage.get_path(thumbnail_name, thumbnail=True, image_subfolder=subfolder)
+ thumb_path = disk_storage.get_path(image_name, thumbnail=True, image_subfolder=subfolder)
+ assert thumb_path.name == thumbnail_name
+ assert not thumb_path.name.startswith("thumbnail_thumbnail_")
assert thumb_path.exists()
# Round-trip read
diff --git a/tests/app/services/image_moves/test_image_moves_default.py b/tests/app/services/image_moves/test_image_moves_default.py
new file mode 100644
index 00000000000..0d4bb1ca996
--- /dev/null
+++ b/tests/app/services/image_moves/test_image_moves_default.py
@@ -0,0 +1,725 @@
+import os
+import threading
+from pathlib import Path
+from shutil import copy2
+from unittest.mock import MagicMock, patch
+
+import pytest
+from PIL import Image
+
+from invokeai.app.services.config.config_default import InvokeAIAppConfig
+from invokeai.app.services.image_files.image_files_disk import DiskImageFileStorage
+from invokeai.app.services.image_moves.image_moves_default import (
+ ImageMoveQueueActive,
+ ImageMoveService,
+)
+from invokeai.app.services.image_records.image_records_common import ImageCategory, ResourceOrigin
+from invokeai.app.services.image_records.image_records_sqlite import SqliteImageRecordStorage
+from invokeai.app.services.session_queue.session_queue_common import DEFAULT_QUEUE_ID, SessionQueueStatus
+from invokeai.app.services.shared.sqlite.sqlite_database import SqliteDatabase
+from invokeai.app.services.shared.sqlite.sqlite_util import init_db
+from invokeai.backend.util.logging import InvokeAILogger
+
+
+def _build_db(tmp_path: Path) -> SqliteDatabase:
+ logger = InvokeAILogger.get_logger()
+ config = InvokeAIAppConfig(use_memory_db=False)
+ config._root = tmp_path
+ image_files = DiskImageFileStorage(tmp_path / "images")
+ return init_db(config=config, logger=logger, image_files=image_files)
+
+
+def _save_record(
+ records: SqliteImageRecordStorage,
+ image_name: str,
+ subfolder: str,
+ created_at: str,
+ is_intermediate: bool = False,
+) -> None:
+ records.save(
+ image_name=image_name,
+ image_origin=ResourceOrigin.INTERNAL,
+ image_category=ImageCategory.GENERAL,
+ width=16,
+ height=16,
+ has_workflow=False,
+ is_intermediate=is_intermediate,
+ image_subfolder=subfolder,
+ )
+ with records._db.transaction() as cursor:
+ cursor.execute("UPDATE images SET created_at = ? WHERE image_name = ?;", (created_at, image_name))
+
+
+def _save_image(
+ service: ImageMoveService,
+ records: SqliteImageRecordStorage,
+ image_name: str,
+ subfolder: str,
+ created_at: str,
+ color: str,
+ is_intermediate: bool = False,
+) -> None:
+ _save_record(
+ records,
+ image_name=image_name,
+ subfolder=subfolder,
+ created_at=created_at,
+ is_intermediate=is_intermediate,
+ )
+ service.image_files.save(Image.new("RGB", (16, 16), color), image_name=image_name, image_subfolder=subfolder)
+
+
+def _service(tmp_path: Path, strategy: str = "date") -> tuple[ImageMoveService, SqliteImageRecordStorage]:
+ db = _build_db(tmp_path)
+ records = SqliteImageRecordStorage(db=db)
+ storage = DiskImageFileStorage(tmp_path / "images")
+ invoker = MagicMock()
+ invoker.services.configuration.pil_compress_level = 6
+ storage.start(invoker)
+ config = InvokeAIAppConfig(use_memory_db=True, image_subfolder_strategy=strategy)
+ config._root = tmp_path
+ service = ImageMoveService(db=db, image_files=storage, config=config, logger=InvokeAILogger.get_logger())
+ return service, records
+
+
+def _job_item_states(service: ImageMoveService, job_id: int) -> dict[str, str]:
+ with service._db.transaction() as cursor:
+ cursor.execute(
+ "SELECT image_name, state FROM image_subfolder_move_items WHERE job_id = ? ORDER BY image_name;",
+ (job_id,),
+ )
+ return {row["image_name"]: row["state"] for row in cursor.fetchall()}
+
+
+def _job_states(service: ImageMoveService) -> dict[int, str]:
+ with service._db.transaction() as cursor:
+ cursor.execute("SELECT id, state FROM image_subfolder_move_jobs ORDER BY id;")
+ return {row["id"]: row["state"] for row in cursor.fetchall()}
+
+
+def test_move_all_images_uses_created_at_for_date_strategy(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-a.png"
+ _save_record(records, image_name=image_name, subfolder="", created_at="2024-02-03 04:05:06.000")
+ service.image_files.save(Image.new("RGB", (16, 16), "red"), image_name=image_name)
+
+ result = service.move_all_images()
+
+ assert result.planned == 1
+ assert result.committed == 1
+ record = records.get(image_name)
+ assert record.image_subfolder == "2024/02/03"
+ assert service.image_files.get_path(image_name, image_subfolder="2024/02/03").exists()
+ assert not service.image_files.get_path(image_name, image_subfolder="").exists()
+
+
+def test_missing_intermediate_source_file_is_treated_as_success(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "missing-intermediate.png"
+ _save_record(
+ records,
+ image_name=image_name,
+ subfolder="",
+ created_at="2024-02-04 04:05:06.000",
+ is_intermediate=True,
+ )
+
+ result = service.move_all_images()
+
+ assert result.planned == 1
+ assert result.committed == 1
+ assert result.errors == 0
+ record = records.get(image_name)
+ assert record.image_subfolder == "2024/02/04"
+ assert service.get_latest_job().state == "committed"
+
+
+def test_missing_intermediate_source_file_removes_orphaned_thumbnail(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "missing-intermediate-with-thumbnail.png"
+ old_subfolder = "old/intermediate"
+ _save_image(
+ service,
+ records,
+ image_name=image_name,
+ subfolder=old_subfolder,
+ created_at="2024-02-04 04:05:06.000",
+ color="red",
+ is_intermediate=True,
+ )
+ old_path = service.image_files.get_path(image_name, image_subfolder=old_subfolder)
+ old_thumbnail_path = service.image_files.get_path(image_name, thumbnail=True, image_subfolder=old_subfolder)
+ assert old_thumbnail_path.exists()
+ old_path.unlink()
+
+ result = service.move_all_images()
+
+ assert result.committed == 1
+ assert not old_thumbnail_path.exists()
+ assert records.get(image_name).image_subfolder == "2024/02/04"
+
+
+def test_missing_non_intermediate_source_file_still_fails(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "missing-general.png"
+ _save_record(
+ records,
+ image_name=image_name,
+ subfolder="",
+ created_at="2024-02-04 04:05:06.000",
+ is_intermediate=False,
+ )
+
+ with pytest.raises(FileNotFoundError, match="Source image does not exist"):
+ service.plan_batch(last_image_name="", limit=100)
+
+
+def test_move_all_images_continues_after_missing_non_intermediate_source_file(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ missing_image_name = "missing-general.png"
+ valid_image_name = "valid-general.png"
+ _save_record(
+ records,
+ image_name=missing_image_name,
+ subfolder="",
+ created_at="2024-02-04 04:05:06.000",
+ is_intermediate=False,
+ )
+ _save_image(service, records, valid_image_name, "", "2024-02-05 04:05:06.000", "blue")
+
+ result = service.move_all_images()
+
+ assert result.errors == 1
+ assert result.committed == 1
+ assert records.get(missing_image_name).image_subfolder == ""
+ assert records.get(valid_image_name).image_subfolder == "2024/02/05"
+ assert "error" in _job_states(service).values()
+ assert "committed" in _job_states(service).values()
+
+
+def test_recovery_treats_missing_intermediate_source_file_as_success(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "missing-intermediate-recovery.png"
+ _save_record(
+ records,
+ image_name=image_name,
+ subfolder="",
+ created_at="2024-02-05 04:05:06.000",
+ is_intermediate=True,
+ )
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 1
+ assert recovered.errors == 0
+ assert records.get(image_name).image_subfolder == "2024/02/05"
+ assert service.get_job(job_id).state == "committed"
+ assert _job_item_states(service, job_id) == {image_name: "committed"}
+
+
+def test_startup_recovery_commits_after_files_moved_but_db_not_updated(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-b.png"
+ _save_record(records, image_name=image_name, subfolder="", created_at="2025-06-07 08:09:10.000")
+ service.image_files.save(Image.new("RGB", (16, 16), "blue"), image_name=image_name)
+
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ service.perform_filesystem_moves(job_id)
+
+ assert records.get(image_name).image_subfolder == ""
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 1
+ assert records.get(image_name).image_subfolder == "2025/06/07"
+ assert service.get_job(job_id).state == "committed"
+
+
+def test_status_reports_unplanned_images_after_recovery(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ _save_image(service, records, "image-recovered.png", "", "2024-02-03 04:05:06.000", "red")
+ _save_image(service, records, "image-unplanned.png", "", "2024-02-04 04:05:06.000", "blue")
+ moves = service.plan_batch(last_image_name="", limit=1)
+ job_id = service.create_move_job(moves)
+ service.perform_filesystem_moves(job_id)
+
+ recovered = service.startup_recovery()
+ status = service.get_background_status()
+
+ assert recovered.committed == 1
+ assert records.get("image-recovered.png").image_subfolder == "2024/02/03"
+ assert records.get("image-unplanned.png").image_subfolder == ""
+ assert status.active_job_id is None
+ assert status.latest_job is not None
+ assert status.latest_job.state == "committed"
+ assert status.needs_move_count == 1
+
+
+def test_cleanup_empty_source_directories_after_move(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-c.png"
+ old_subfolder = "old/nested"
+ _save_record(records, image_name=image_name, subfolder=old_subfolder, created_at="2024-11-12 01:02:03.000")
+ service.image_files.save(Image.new("RGB", (16, 16), "green"), image_name=image_name, image_subfolder=old_subfolder)
+ old_parent = service.image_files.get_path(image_name, image_subfolder=old_subfolder).parent
+ old_thumb_parent = service.image_files.get_path(image_name, thumbnail=True, image_subfolder=old_subfolder).parent
+
+ service.move_all_images()
+
+ assert not old_parent.exists()
+ assert not old_thumb_parent.exists()
+ assert service.image_files.image_root.exists()
+ assert service.image_files.thumbnail_root.exists()
+
+
+@pytest.mark.skipif(not hasattr(os, "symlink"), reason="symlinks are not supported on this platform")
+def test_cleanup_empty_source_directories_stays_within_symlinked_root(tmp_path: Path) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+ real_root = tmp_path / "real-root"
+ linked_root = tmp_path / "linked-root"
+ sibling = tmp_path / "sibling"
+ real_root.mkdir()
+ sibling.mkdir()
+ try:
+ linked_root.symlink_to(real_root, target_is_directory=True)
+ except OSError as e:
+ pytest.skip(f"symlink creation is not available: {e}")
+ nested = linked_root / "old" / "nested"
+ nested.mkdir(parents=True)
+
+ service._remove_empty_parents(nested, linked_root)
+
+ assert real_root.exists()
+ assert linked_root.exists()
+ assert sibling.exists()
+ assert not (real_root / "old").exists()
+
+
+def test_startup_recovery_cleans_empty_source_directories(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-recovery-cleanup.png"
+ old_subfolder = "old/recovery"
+ _save_image(service, records, image_name, old_subfolder, "2024-11-13 01:02:03.000", "green")
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ move = moves[0]
+ old_parent = service.image_files.get_path(image_name, image_subfolder=old_subfolder).parent
+ old_thumb_parent = service.image_files.get_path(image_name, thumbnail=True, image_subfolder=old_subfolder).parent
+ move.new_path.parent.mkdir(parents=True, exist_ok=True)
+ move.new_thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
+ move.old_path.replace(move.new_path)
+ move.old_thumbnail_path.replace(move.new_thumbnail_path)
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 1
+ assert recovered.errors == 0
+ assert not old_parent.exists()
+ assert not old_thumb_parent.exists()
+ assert service.get_job(job_id).state == "committed"
+
+
+def test_preflight_rejects_active_uncommitted_job_for_same_image(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-d.png"
+ _save_record(records, image_name=image_name, subfolder="", created_at="2024-01-02 03:04:05.000")
+ service.image_files.save(Image.new("RGB", (16, 16), "yellow"), image_name=image_name)
+
+ moves = service.plan_batch(last_image_name="", limit=100)
+ service.create_move_job(moves)
+
+ with pytest.raises(ValueError, match="active image move job"):
+ service.plan_batch(last_image_name="", limit=100)
+
+
+def test_create_move_job_rejects_second_active_job_from_stale_plan(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-active-race.png"
+ _save_image(service, records, image_name, "", "2024-01-03 03:04:05.000", "yellow")
+
+ stale_plan_a = service.plan_batch(last_image_name="", limit=100)
+ stale_plan_b = service.plan_batch(last_image_name="", limit=100)
+ service.create_move_job(stale_plan_a)
+
+ with pytest.raises(ValueError, match="active image move job"):
+ service.create_move_job(stale_plan_b)
+
+
+def test_startup_recovery_completes_planned_job_before_any_file_move(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-e.png"
+ _save_image(service, records, image_name, "", "2024-03-04 05:06:07.000", "purple")
+
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+
+ recovered_once = service.startup_recovery()
+ recovered_twice = service.startup_recovery()
+
+ assert recovered_once.committed == 1
+ assert recovered_once.errors == 0
+ assert recovered_twice.committed == 0
+ assert recovered_twice.errors == 0
+ assert records.get(image_name).image_subfolder == "2024/03/04"
+ assert service.get_job(job_id).state == "committed"
+ assert _job_item_states(service, job_id) == {image_name: "committed"}
+
+
+def test_background_recovery_can_start_when_journal_job_is_active(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-background-recovery.png"
+ _save_image(service, records, image_name, "", "2024-03-05 05:06:07.000", "purple")
+ job_id = service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+
+ status = service.start_background_recovery()
+ assert status.is_running is True
+ assert status.operation == "recovery"
+
+ assert service._future is not None
+ service._future.result(timeout=5)
+
+ assert records.get(image_name).image_subfolder == "2024/03/05"
+ assert service.get_job(job_id).state == "committed"
+
+
+def test_start_runs_recovery_before_normal_operation(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-startup-recovery.png"
+ _save_image(service, records, image_name, "", "2024-03-05 05:06:07.000", "purple")
+ job_id = service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+ service.perform_filesystem_moves(job_id)
+
+ service.start(MagicMock())
+
+ assert records.get(image_name).image_subfolder == "2024/03/05"
+ assert service.get_job(job_id).state == "committed"
+ assert service.is_maintenance_active() is False
+
+
+def test_start_leaves_maintenance_active_when_recovery_remains_incomplete(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-startup-recovery-retry.png"
+ _save_image(service, records, image_name, "", "2024-03-05 05:06:07.000", "purple")
+ service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+
+ with patch.object(service, "complete_partial_filesystem_moves", side_effect=OSError("temporary failure")):
+ service.start(MagicMock())
+
+ assert records.get(image_name).image_subfolder == ""
+ assert service.is_maintenance_active() is True
+
+
+@pytest.mark.parametrize(("pending", "in_progress"), [(1, 0), (0, 1)])
+def test_background_move_rejects_active_queue_work(tmp_path: Path, pending: int, in_progress: int) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+ invoker = MagicMock()
+ invoker.services.session_queue.get_queue_status.return_value = SessionQueueStatus(
+ queue_id=DEFAULT_QUEUE_ID,
+ item_id=None,
+ batch_id=None,
+ session_id=None,
+ pending=pending,
+ in_progress=in_progress,
+ completed=0,
+ failed=0,
+ canceled=0,
+ total=1,
+ )
+ service.start(invoker)
+
+ with pytest.raises(ImageMoveQueueActive, match="queue work is active"):
+ service.start_background_move_all()
+
+
+def test_background_move_is_reserved_before_queue_check(tmp_path: Path) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+ invoker = MagicMock()
+
+ def get_queue_status(queue_id: str) -> SessionQueueStatus:
+ assert queue_id == DEFAULT_QUEUE_ID
+ assert service.is_maintenance_active() is True
+ return SessionQueueStatus(
+ queue_id=DEFAULT_QUEUE_ID,
+ item_id=None,
+ batch_id=None,
+ session_id=None,
+ pending=1,
+ in_progress=0,
+ completed=0,
+ failed=0,
+ canceled=0,
+ total=1,
+ )
+
+ invoker.services.session_queue.get_queue_status.side_effect = get_queue_status
+ service.start(invoker)
+
+ with pytest.raises(ImageMoveQueueActive, match="queue work is active"):
+ service.start_background_move_all()
+
+ assert service.is_maintenance_active() is False
+
+
+def test_maintenance_is_active_while_background_job_or_uncommitted_journal_exists(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-maintenance-active.png"
+ _save_image(service, records, image_name, "", "2024-03-05 05:06:07.000", "purple")
+ service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+
+ assert service.is_maintenance_active() is True
+
+ release_worker = threading.Event()
+
+ def wait_for_release() -> None:
+ release_worker.wait(timeout=5)
+
+ service._start_background_operation("recovery", wait_for_release)
+ try:
+ assert service.is_maintenance_active() is True
+ finally:
+ release_worker.set()
+ assert service._future is not None
+ service._future.result(timeout=5)
+
+
+def test_background_worker_error_is_exposed_in_status(tmp_path: Path) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+ started_worker = threading.Event()
+ release_worker = threading.Event()
+
+ def raise_error() -> None:
+ started_worker.set()
+ release_worker.wait(timeout=5)
+ raise RuntimeError("background failed")
+
+ status = service._start_background_operation("move_all", raise_error)
+ assert started_worker.wait(timeout=5) is True
+ assert status.is_running is True
+
+ assert service._future is not None
+ release_worker.set()
+ service._future.result(timeout=5)
+
+ status = service.get_background_status()
+ assert status.is_running is False
+ assert status.operation is None
+ assert status.last_error == "background failed"
+
+
+def test_stop_waits_for_active_background_job_without_recording_error(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-background-stop.png"
+ _save_image(service, records, image_name, "", "2024-03-05 05:06:07.000", "purple")
+ job_id = service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+ release_worker = threading.Event()
+
+ def wait_for_shutdown() -> None:
+ release_worker.wait(timeout=5)
+
+ service._start_background_operation("recovery", wait_for_shutdown)
+
+ stop_thread = threading.Thread(target=service.stop)
+ stop_thread.start()
+ assert stop_thread.is_alive()
+
+ release_worker.set()
+ stop_thread.join(timeout=5)
+
+ assert not stop_thread.is_alive()
+ assert service.get_job(job_id).error_message is None
+ assert service.get_background_status().last_error is None
+
+
+def test_startup_recovery_completes_partial_multi_image_move(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ _save_image(service, records, "image-f.png", "", "2024-04-05 06:07:08.000", "orange")
+ _save_image(service, records, "image-g.png", "", "2024-04-06 06:07:08.000", "cyan")
+
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ first_move = moves[0]
+ first_move.new_path.parent.mkdir(parents=True, exist_ok=True)
+ first_move.new_thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
+ first_move.old_path.replace(first_move.new_path)
+ first_move.old_thumbnail_path.replace(first_move.new_thumbnail_path)
+
+ recovered_once = service.startup_recovery()
+ recovered_twice = service.startup_recovery()
+
+ assert recovered_once.committed == 2
+ assert recovered_once.errors == 0
+ assert recovered_twice.committed == 0
+ assert recovered_twice.errors == 0
+ assert records.get("image-f.png").image_subfolder == "2024/04/05"
+ assert records.get("image-g.png").image_subfolder == "2024/04/06"
+ assert service.get_job(job_id).state == "committed"
+ assert _job_item_states(service, job_id) == {"image-f.png": "committed", "image-g.png": "committed"}
+
+
+def test_startup_recovery_marks_committed_after_db_update_but_before_journal_commit(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-h.png"
+ _save_image(service, records, image_name, "", "2024-05-06 07:08:09.000", "pink")
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ service.perform_filesystem_moves(job_id)
+
+ with service._db.transaction() as cursor:
+ cursor.execute(
+ "UPDATE images SET image_subfolder = ? WHERE image_name = ?;",
+ ("2024/05/06", image_name),
+ )
+
+ recovered_once = service.startup_recovery()
+ recovered_twice = service.startup_recovery()
+
+ assert recovered_once.committed == 1
+ assert recovered_once.errors == 0
+ assert recovered_twice.committed == 0
+ assert recovered_twice.errors == 0
+ assert records.get(image_name).image_subfolder == "2024/05/06"
+ assert service.get_job(job_id).state == "committed"
+ assert _job_item_states(service, job_id) == {image_name: "committed"}
+
+
+def test_startup_recovery_marks_error_when_both_old_and_new_full_size_files_exist(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-i.png"
+ _save_image(service, records, image_name, "", "2024-07-08 09:10:11.000", "red")
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ move = moves[0]
+ move.new_path.parent.mkdir(parents=True, exist_ok=True)
+ copy2(move.old_path, move.new_path)
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 0
+ assert recovered.errors == 1
+ assert records.get(image_name).image_subfolder == ""
+ assert service.get_job(job_id).state == "error"
+ assert _job_item_states(service, job_id) == {image_name: "error"}
+
+
+def test_startup_recovery_marks_error_when_neither_old_nor_new_full_size_file_exists(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-j.png"
+ _save_image(service, records, image_name, "", "2024-08-09 10:11:12.000", "blue")
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ moves[0].old_path.unlink()
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 0
+ assert recovered.errors == 1
+ assert records.get(image_name).image_subfolder == ""
+ assert service.get_job(job_id).state == "error"
+ assert _job_item_states(service, job_id) == {image_name: "error"}
+
+
+def test_startup_recovery_keeps_job_recoverable_after_ordinary_exception(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-k.png"
+ _save_image(service, records, image_name, "", "2024-09-10 11:12:13.000", "white")
+ job_id = service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+
+ with patch.object(service, "complete_partial_filesystem_moves", side_effect=OSError("temporary failure")):
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 0
+ assert recovered.errors == 1
+ job = service.get_job(job_id)
+ assert job.state == "planned"
+ assert job.error_message == "temporary failure"
+
+ recovered_retry = service.startup_recovery()
+
+ assert recovered_retry.committed == 1
+ assert recovered_retry.errors == 0
+ assert records.get(image_name).image_subfolder == "2024/09/10"
+ assert service.get_job(job_id).state == "committed"
+
+
+def test_startup_recovery_regenerates_thumbnail_when_old_and_new_thumbnails_exist(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-l.png"
+ _save_image(service, records, image_name, "", "2024-10-11 12:13:14.000", "black")
+ moves = service.plan_batch(last_image_name="", limit=100)
+ job_id = service.create_move_job(moves)
+ move = moves[0]
+ move.new_path.parent.mkdir(parents=True, exist_ok=True)
+ move.new_thumbnail_path.parent.mkdir(parents=True, exist_ok=True)
+ move.old_path.replace(move.new_path)
+ copy2(move.old_thumbnail_path, move.new_thumbnail_path)
+
+ recovered = service.startup_recovery()
+
+ assert recovered.committed == 1
+ assert recovered.errors == 0
+ assert records.get(image_name).image_subfolder == "2024/10/11"
+ assert move.new_thumbnail_path.exists()
+ assert not move.old_thumbnail_path.exists()
+ assert service.get_job(job_id).state == "committed"
+
+
+def test_preflight_rejects_duplicate_thumbnail_destination_paths(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ _save_image(service, records, "same-name.jpg", "", "2024-12-13 14:15:16.000", "red")
+ _save_image(service, records, "same-name.png", "", "2024-12-13 14:15:16.000", "green")
+
+ with pytest.raises(ValueError, match="Duplicate destination thumbnail path"):
+ service.plan_batch(last_image_name="", limit=100)
+
+
+def test_successful_filesystem_move_fsyncs_files_and_directories(tmp_path: Path) -> None:
+ service, records = _service(tmp_path, strategy="date")
+ image_name = "image-m.png"
+ _save_image(service, records, image_name, "", "2025-01-02 03:04:05.000", "blue")
+ job_id = service.create_move_job(service.plan_batch(last_image_name="", limit=100))
+
+ with (
+ patch.object(service, "_fsync_file") as fsync_file,
+ patch.object(service, "_fsync_dir") as fsync_dir,
+ ):
+ service.perform_filesystem_moves(job_id)
+
+ moved = service._get_items(job_id)[0]
+ fsync_file.assert_any_call(moved.new_path)
+ fsync_file.assert_any_call(moved.new_thumbnail_path)
+ fsync_dir.assert_any_call(moved.new_path.parent)
+ fsync_dir.assert_any_call(moved.old_path.parent)
+ fsync_dir.assert_any_call(moved.new_thumbnail_path.parent)
+ fsync_dir.assert_any_call(moved.old_thumbnail_path.parent)
+
+
+def test_fsync_dir_ignores_platform_close_failures(tmp_path: Path) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+
+ with (
+ patch("invokeai.app.services.image_moves.image_moves_default.os.open", return_value=123),
+ patch(
+ "invokeai.app.services.image_moves.image_moves_default.os.fsync",
+ side_effect=OSError(9, "Bad file descriptor"),
+ ),
+ patch(
+ "invokeai.app.services.image_moves.image_moves_default.os.close",
+ side_effect=OSError(9, "Bad file descriptor"),
+ ),
+ ):
+ service._fsync_dir(tmp_path)
+
+
+def test_fsync_file_ignores_platform_fsync_failures(tmp_path: Path) -> None:
+ service, _records = _service(tmp_path, strategy="date")
+ path = tmp_path / "image.png"
+ path.write_bytes(b"test")
+
+ with patch(
+ "invokeai.app.services.image_moves.image_moves_default.os.fsync",
+ side_effect=OSError(9, "Bad file descriptor"),
+ ):
+ service._fsync_file(path)
diff --git a/tests/app/services/test_image_move_startup_safety.py b/tests/app/services/test_image_move_startup_safety.py
new file mode 100644
index 00000000000..f7f673322ea
--- /dev/null
+++ b/tests/app/services/test_image_move_startup_safety.py
@@ -0,0 +1,76 @@
+from unittest.mock import MagicMock
+
+from invokeai.app.services.invocation_services import InvocationServices
+from invokeai.app.services.invoker import Invoker
+from invokeai.app.services.session_processor.session_processor_default import DefaultSessionProcessor
+
+
+def _services(**overrides):
+ services = {
+ "board_image_records": object(),
+ "board_images": object(),
+ "board_records": object(),
+ "boards": object(),
+ "bulk_download": object(),
+ "configuration": object(),
+ "events": object(),
+ "images": object(),
+ "image_files": object(),
+ "image_records": object(),
+ "logger": object(),
+ "model_images": object(),
+ "model_manager": object(),
+ "model_relationships": object(),
+ "model_relationship_records": object(),
+ "download_queue": object(),
+ "external_generation": object(),
+ "performance_statistics": object(),
+ "session_queue": object(),
+ "session_processor": object(),
+ "invocation_cache": object(),
+ "names": object(),
+ "urls": object(),
+ "workflow_records": object(),
+ "tensors": object(),
+ "conditioning": object(),
+ "style_preset_records": object(),
+ "style_preset_image_files": object(),
+ "workflow_thumbnails": object(),
+ "client_state_persistence": object(),
+ "users": object(),
+ "image_moves": None,
+ }
+ services.update(overrides)
+ return InvocationServices(**services)
+
+
+def test_image_moves_start_before_session_processor() -> None:
+ started: list[str] = []
+ image_moves = MagicMock()
+ image_moves.start.side_effect = lambda _invoker: started.append("image_moves")
+ session_processor = MagicMock()
+ session_processor.start.side_effect = lambda _invoker: started.append("session_processor")
+
+ Invoker(_services(image_moves=image_moves, session_processor=session_processor))
+
+ assert started == ["image_moves", "session_processor"]
+
+
+def test_session_processor_detects_active_image_move_maintenance() -> None:
+ image_moves = MagicMock()
+ image_moves.is_maintenance_active.return_value = True
+ processor = DefaultSessionProcessor()
+ processor._invoker = MagicMock()
+ processor._invoker.services.image_moves = image_moves
+
+ assert processor._is_image_move_maintenance_active() is True
+
+
+def test_session_processor_allows_processing_without_image_move_maintenance() -> None:
+ image_moves = MagicMock()
+ image_moves.is_maintenance_active.return_value = False
+ processor = DefaultSessionProcessor()
+ processor._invoker = MagicMock()
+ processor._invoker.services.image_moves = image_moves
+
+ assert processor._is_image_move_maintenance_active() is False