corvix.storage ============== .. py:module:: corvix.storage .. autoapi-nested-parse:: Local persistence for polled notifications. The JSON cache uses ``fcntl`` advisory locks and is therefore supported on Linux/POSIX platforms only. Attributes ---------- .. autoapisummary:: corvix.storage._NOTIFICATION_RECORD_COLUMNS corvix.storage._DISMISSED_ROW_COLUMNS Classes ------- .. autoapisummary:: corvix.storage.StorageBackend corvix.storage.NotificationCache corvix.storage.PostgresStorage Functions --------- .. autoapisummary:: corvix.storage._fsync_directory corvix.storage._coerce_context corvix.storage._require_str corvix.storage._optional_str corvix.storage._require_bool corvix.storage._require_float corvix.storage._require_datetime corvix.storage._coerce_str_list Module Contents --------------- .. py:data:: _NOTIFICATION_RECORD_COLUMNS :value: 18 .. py:data:: _DISMISSED_ROW_COLUMNS :value: 2 .. py:class:: StorageBackend Bases: :py:obj:`Protocol` Protocol for notification persistence backends. .. py:method:: save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None .. py:method:: load_records(user_id: corvix.types.UserId) -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]] .. py:method:: dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None .. py:method:: mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None .. py:method:: get_dismissed_notification_keys(user_id: corvix.types.UserId) -> list[str] .. py:method:: get_dismissed_thread_ids(user_id: corvix.types.UserId) -> list[str] .. py:class:: NotificationCache Read/write notification snapshots to a JSON file. Implements StorageBackend using a single-user JSON file. The user_id parameter in protocol methods is ignored — the file is shared. .. py:attribute:: path :type: pathlib.Path .. py:method:: save(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime | None = None) -> None Persist records to disk. .. py:method:: load() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]] Load snapshot from disk if available. .. py:method:: _load_unlocked() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]] Load snapshot from disk without acquiring a file lock. .. py:method:: _save_unlocked(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None .. py:method:: _exclusive_lock() -> collections.abc.Iterator[None] .. py:method:: save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None Save records; user_id ignored in single-user mode. .. py:method:: load_records(user_id: corvix.types.UserId) -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]] Load records; user_id ignored in single-user mode. .. py:method:: dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None Mark a record as dismissed by account/thread id in the JSON file. .. py:method:: mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None Mark a record as read by account/thread id in the JSON file. .. py:method:: get_dismissed_notification_keys(user_id: corvix.types.UserId) -> list[str] Return account-scoped keys of dismissed records. .. py:method:: get_dismissed_thread_ids(user_id: corvix.types.UserId) -> list[str] Backward-compatible API returning only thread IDs. .. py:class:: PostgresStorage PostgreSQL-backed notification persistence implementing StorageBackend. Uses psycopg (sync) so it is safe to use from CLI commands and the synchronous Litestar route handlers (sync_to_thread=False is not used with this backend — callers should run in a thread pool if needed). .. py:attribute:: connection_string :type: str .. py:method:: _connect() -> psycopg.Connection[tuple[object, Ellipsis]] .. py:method:: save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None Upsert records for user_id. Preserves dismissed flag on conflict. .. py:method:: load_records(user_id: corvix.types.UserId) -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]] Load all records for user_id ordered by snapshot_at descending. .. py:method:: dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None Set dismissed=true for a specific account/thread id. .. py:method:: mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') -> None Set unread=false for a specific account/thread id. .. py:method:: get_dismissed_notification_keys(user_id: corvix.types.UserId) -> list[str] Return account-scoped keys where dismissed=true for user_id. .. py:method:: get_dismissed_thread_ids(user_id: corvix.types.UserId) -> list[str] Backward-compatible API returning only thread IDs. .. py:function:: _fsync_directory(path: pathlib.Path) -> None Best-effort fsync of a directory after atomic replacement. .. py:function:: _coerce_context(value: object) -> dict[str, object] .. py:function:: _require_str(value: object, field: str) -> str .. py:function:: _optional_str(value: object, field: str) -> str | None .. py:function:: _require_bool(value: object, field: str) -> bool .. py:function:: _require_float(value: object, field: str) -> float .. py:function:: _require_datetime(value: object, field: str) -> datetime.datetime .. py:function:: _coerce_str_list(value: object, field: str) -> list[str]