Source code for betty.cache.file

"""
Provide caching that persists cache items to files.
"""

from __future__ import annotations

import asyncio
import shutil
from abc import abstractmethod
from contextlib import suppress
from os import utime
from pickle import dumps, loads
from typing import Generic, Self, TYPE_CHECKING, TypeVar, final

import aiofiles
from aiofiles.ospath import getmtime
from typing_extensions import override

from betty.cache import CacheItem
from betty.cache._base import _CommonCacheBase
from betty.hashid import hashid

if TYPE_CHECKING:
    from betty.locale import Localizer
    from pathlib import Path
    from collections.abc import Sequence


_CacheItemValueCoT = TypeVar("_CacheItemValueCoT", covariant=True)
_CacheItemValueContraT = TypeVar("_CacheItemValueContraT", contravariant=True)


class _FileCacheItem(CacheItem[_CacheItemValueCoT], Generic[_CacheItemValueCoT]):
    __slots__ = "_modified", "_path"

    def __init__(
        self,
        modified: int | float,
        path: Path,
    ):
        self._modified = modified
        self._path = path

    @override
    @property
    def modified(self) -> int | float:
        return self._modified

    @override
    async def value(self) -> _CacheItemValueCoT:
        async with aiofiles.open(self._path, "rb") as f:
            value_bytes = await f.read()
        return await self._load_value(value_bytes)

    @abstractmethod
    async def _load_value(self, value_bytes: bytes) -> _CacheItemValueCoT:
        pass


@final
class _PickledFileCacheItem(
    _FileCacheItem[_CacheItemValueCoT], Generic[_CacheItemValueCoT]
):
    @override
    async def _load_value(self, value_bytes: bytes) -> _CacheItemValueCoT:
        return loads(value_bytes)  # type: ignore[no-any-return]


@final
class _BinaryFileCacheItem(_FileCacheItem[bytes]):
    @override
    async def _load_value(self, value_bytes: bytes) -> bytes:
        return value_bytes


class _FileCache(
    _CommonCacheBase[_CacheItemValueContraT], Generic[_CacheItemValueContraT]
):
    """
    Provide a cache that persists cache items on a file system.
    """

    _cache_item_cls: type[_FileCacheItem[_CacheItemValueContraT]]

    def __init__(
        self,
        localizer: Localizer,
        cache_directory_path: Path,
        *,
        scopes: Sequence[str] | None = None,
    ):
        super().__init__(localizer, scopes=scopes)
        self._root_path = cache_directory_path

    @override
    def _with_scope(self, scope: str) -> Self:
        return type(self)(
            self._localizer, self._root_path, scopes=(*self._scopes, scope)
        )

    def _cache_item_file_path(self, cache_item_id: str) -> Path:
        return self._path / hashid(cache_item_id)

    @abstractmethod
    def _dump_value(self, value: _CacheItemValueContraT) -> bytes:
        pass

    @override
    async def _get(
        self, cache_item_id: str
    ) -> CacheItem[_CacheItemValueContraT] | None:
        try:
            cache_item_file_path = self._cache_item_file_path(cache_item_id)
            return self._cache_item_cls(
                await getmtime(cache_item_file_path),
                cache_item_file_path,
            )
        except OSError:
            return None

    @override
    async def _set(
        self,
        cache_item_id: str,
        value: _CacheItemValueContraT,
        *,
        modified: int | float | None = None,
    ) -> None:
        value = self._dump_value(value)
        cache_item_file_path = self._cache_item_file_path(cache_item_id)
        try:
            await self._write(cache_item_file_path, value, modified)
        except FileNotFoundError:
            await aiofiles.os.makedirs(cache_item_file_path.parent, exist_ok=True)
            await self._write(cache_item_file_path, value, modified)

    async def _write(
        self,
        cache_item_file_path: Path,
        value: bytes,
        modified: int | float | None = None,
    ) -> None:
        async with aiofiles.open(cache_item_file_path, "wb") as f:
            await f.write(value)
        if modified is not None:
            await asyncio.to_thread(utime, cache_item_file_path, (modified, modified))

    @override
    async def _delete(self, cache_item_id: str) -> None:
        with suppress(FileNotFoundError):
            await aiofiles.os.remove(self._cache_item_file_path(cache_item_id))

    @override
    async def _clear(self) -> None:
        with suppress(FileNotFoundError):
            await asyncio.to_thread(shutil.rmtree, self._path)

    @property
    def _path(self) -> Path:
        return self._root_path.joinpath(*self._scopes)


[docs] @final class PickledFileCache( _FileCache[_CacheItemValueContraT], Generic[_CacheItemValueContraT] ): """ Provide a cache that pickles values and persists them to files. """ _cache_item_cls = _PickledFileCacheItem @override def _dump_value(self, value: _CacheItemValueContraT) -> bytes: return dumps(value)
[docs] @final class BinaryFileCache(_FileCache[bytes]): """ Provide a cache that persists bytes values to binary files. """ _cache_item_cls = _BinaryFileCacheItem @override def _dump_value(self, value: bytes) -> bytes: return value @property def path(self) -> Path: """ The path to the cache's root directory. """ return self._path
[docs] def cache_item_file_path(self, cache_item_id: str) -> Path: """ Get the file path for a cache item with the given ID. The cache item itself may or may not exist. """ return self._cache_item_file_path(cache_item_id)