import os
import threading
import pathlib
import contextlib
import logging
from typing import Optional, Protocol, runtime_checkable, cast
from collections.abc import Callable, Generator
from dataclasses import dataclass, field
from watchdog.events import LoggingEventHandler, FileSystemEvent, DirModifiedEvent, FileModifiedEvent
from watchdog.observers import ObserverType, Observer
from watchdog.observers.api import BaseObserver
logger = logging.getLogger(__name__)
type Parser[T, S] = Callable[[S], T]
type ModifiedEvent = DirModifiedEvent | FileModifiedEvent
[docs]
@runtime_checkable
class Proxy[T](Protocol):
__subject__: T
[docs]
class BaseProxy[T](Proxy[T]):
[docs]
def __getattr__(self, name):
return getattr(self.__subject__, name)
[docs]
def __getitem__(self, key):
assert hasattr(self.__subject__, "__getitem__")
return self.__subject__[key]
[docs]
@classmethod
def create[**P](cls, *args: P.args, **kwargs: P.kwargs) -> T:
return cast(T, cls(*args, **kwargs))
[docs]
@dataclass
class FileProxy[T](BaseProxy[T], LoggingEventHandler):
path: str
parser: Parser[T, str]
mutex: threading.Lock = field(default_factory=threading.Lock)
__subject__: T = field(init=False)
[docs]
def __post_init__(self):
LoggingEventHandler.__init__(self, logger=logging.getLogger(type(self).__name__))
self.update()
[docs]
def __hash__(self):
return hash((self.path, self.parser))
@property
def event_filter(self) -> list[type[FileSystemEvent]]:
return [FileModifiedEvent]
@property
def content(self) -> str:
return pathlib.Path(self.path).read_text()
[docs]
def update(self):
with self.mutex:
self.__subject__ = self.parser(self.content)
[docs]
def on_modified(self, event: ModifiedEvent):
super().on_modified(event)
match event:
case FileModifiedEvent() if os.fsdecode(event.src_path) == os.path.realpath(self.path):
self.update()
case _: # pragma: not covered
pass
[docs]
@dataclass
class LiteralProxy[T, S](BaseProxy[T]):
content: S
parser: Parser[T, S] = lambda content: content # type: ignore[assignment,return-value]
__subject__: T = field(init=False)
[docs]
def __post_init__(self):
self.__subject__ = self.parser(self.content)
[docs]
@contextlib.contextmanager
def observer[T](
*proxies: Proxy[T],
cls: ObserverType = Observer,
) -> Generator[BaseObserver]:
observer = cls()
for proxy in proxies:
if isinstance(proxy, FileProxy):
observer.schedule(proxy, os.path.dirname(proxy.path), recursive=True, event_filter=proxy.event_filter)
try:
observer.start()
yield observer
finally:
observer.stop()
observer.join()
[docs]
def create_proxy[T](content: Optional[str], path: os.PathLike, parser: Parser[T, str], default: str) -> T:
if content is not None:
return LiteralProxy.create(content, parser=parser)
if (file := pathlib.Path(path)).is_file():
return FileProxy.create(f"{file}", parser=parser)
return LiteralProxy.create(default, parser=parser)