corvix.storage
Local persistence for polled notifications.
The JSON cache uses fcntl advisory locks and is therefore supported on
Linux/POSIX platforms only.
Attributes
Exceptions
StorageConfigError | Raised when required storage configuration (a database URL) is missing.
Classes
StorageBackend | Protocol for notification persistence backends.
NotificationCache | Read/write notification snapshots to a JSON file.
PostgresStorage | PostgreSQL-backed notification persistence implementing StorageBackend.
Functions
create_storage(config) | Return the configured PostgreSQL storage backend.
_fsync_directory(path) | Best-effort fsync of a directory after atomic replacement.
_parse_account_errors(value) | Parse the account_errors JSONB value into a tuple of AccountError.
Module Contents
- exception corvix.storage.StorageConfigError
Bases: RuntimeError
Raised when required storage configuration (a database URL) is missing.
- class corvix.storage.StorageBackend
Bases: Protocol
Protocol for notification persistence backends.
- save_records(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None
- load_records() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]]
- save_status(status: corvix.domain.PollerStatus) -> None
- load_status() -> corvix.domain.PollerStatus
- __enter__() -> StorageBackend
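As an illustration of what satisfying this protocol involves, the following is a minimal sketch of an in-memory backend. `InMemoryStorage` is hypothetical and not part of corvix, the two dataclasses are simplified stand-ins for `corvix.domain.NotificationRecord` and `corvix.domain.PollerStatus` (their real fields are not shown on this page), and `__exit__` is added on the assumption that backends are used in `with` statements.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Protocol, runtime_checkable


# Hypothetical stand-ins for corvix.domain.NotificationRecord and
# corvix.domain.PollerStatus; the real classes define their own fields.
@dataclass
class NotificationRecord:
    thread_id: str
    account_id: str = "primary"


@dataclass
class PollerStatus:
    state: str = "unknown"


@runtime_checkable
class StorageBackend(Protocol):
    """Mirror of the five methods listed for the protocol."""

    def save_records(self, records: list[NotificationRecord], generated_at: datetime) -> None: ...
    def load_records(self) -> tuple[datetime | None, list[NotificationRecord]]: ...
    def save_status(self, status: PollerStatus) -> None: ...
    def load_status(self) -> PollerStatus: ...
    def __enter__(self) -> StorageBackend: ...


class InMemoryStorage:
    """Hypothetical backend that keeps everything in process memory."""

    def __init__(self) -> None:
        self._generated_at: datetime | None = None
        self._records: list[NotificationRecord] = []
        self._status = PollerStatus()

    def save_records(self, records, generated_at):
        # Copy the list so later caller-side mutation does not leak in.
        self._records = list(records)
        self._generated_at = generated_at

    def load_records(self):
        return self._generated_at, list(self._records)

    def save_status(self, status):
        self._status = status

    def load_status(self):
        return self._status

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Assumption: nothing to release for an in-memory store.
        return None
```

Because the protocol is structural, `InMemoryStorage` does not need to inherit from `StorageBackend`; it only needs matching method names, which is what makes such a class convenient as a test double.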
- corvix.storage.create_storage(config: corvix.config.AppConfig) -> PostgresStorage
Return the configured PostgreSQL storage backend.
PostgreSQL is required in all deployments; the JSON cache is no longer used as the shared store between the poller and the web service. Raises StorageConfigError when no database URL is configured.
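The fail-fast behaviour described for `create_storage` can be sketched as follows. This is not the corvix implementation: the `database_url` field on `AppConfig` is an assumed name, and `FakePostgresStorage` is a hypothetical stand-in so the example runs without a database. Only the `connection_string` keyword is taken from this page.

```python
from __future__ import annotations

from dataclasses import dataclass


class StorageConfigError(RuntimeError):
    """Raised when required storage configuration (a database URL) is missing."""


@dataclass
class AppConfig:
    # Hypothetical field; the real corvix.config.AppConfig may keep the URL elsewhere.
    database_url: str | None = None


@dataclass
class FakePostgresStorage:
    # Stand-in for PostgresStorage so the sketch runs without PostgreSQL.
    connection_string: str


def create_storage(config: AppConfig) -> FakePostgresStorage:
    """Return a storage backend, or raise if no database URL is configured."""
    url = (config.database_url or "").strip()
    if not url:
        # Treat None, empty, and whitespace-only values as "not configured".
        raise StorageConfigError("no database URL configured")
    return FakePostgresStorage(connection_string=url)
```

Raising a dedicated exception at construction time lets the CLI and the web service report a configuration problem once at startup instead of failing on the first query.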
- class corvix.storage.NotificationCache
Read/write notification snapshots to a JSON file.
Implements StorageBackend using a single-user JSON file. The user_id parameter in protocol methods is ignored — the file is shared.
- save(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime | None = None, *, poller_status: corvix.domain.PollerStatus | None = None) -> None
Persist records to disk.
- load() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]]
Load snapshot from disk if available.
- save_status(status: corvix.domain.PollerStatus) -> None
Persist only the poller status without touching notifications.
- load_status() -> corvix.domain.PollerStatus
Load the poller status from the cache file.
- _load_status_unlocked() -> corvix.domain.PollerStatus
- _load_unlocked() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]]
Load snapshot from disk without acquiring a file lock.
- _save_unlocked(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime, *, poller_status: corvix.domain.PollerStatus | None = None) -> None
Acquire a shared read lock when possible; fall through unlocked otherwise.
When the lock file cannot be created (read-only filesystem, missing permissions), no concurrent writer can be holding the exclusive lock from this process either, so reads proceed without it. Torn reads remain prevented under normal operation where both reader and writer can open the lock file.
- save_records(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None
Save records to the JSON cache.
- load_records() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]]
Load records from the JSON cache.
- dismiss_record(thread_id: str, account_id: str = 'primary') -> None
Mark a record as dismissed by account/thread id in the JSON file.
- mark_record_read(thread_id: str, account_id: str = 'primary') -> None
Mark a record as read by account/thread id in the JSON file.
- get_dismissed_notification_keys() -> list[str]
Return account-scoped keys of dismissed records.
- __enter__() -> NotificationCache
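The "shared read lock when possible, unlocked otherwise" behaviour described for NotificationCache can be sketched as a small context manager. This is an illustration only: `shared_read_lock` is a hypothetical name, and the use of `fcntl.flock` (rather than `fcntl.lockf`) is an assumption, since this page only says that fcntl advisory locks are used. Like the cache itself, it runs on POSIX platforms only.

```python
import contextlib
import fcntl
import os
from pathlib import Path
from typing import Iterator


@contextlib.contextmanager
def shared_read_lock(lock_path: Path) -> Iterator[bool]:
    """Yield True while a shared lock is held, or False when reading unlocked."""
    try:
        fd = os.open(lock_path, os.O_RDWR | os.O_CREAT, 0o644)
    except OSError:
        # The lock file cannot be created or opened (read-only filesystem,
        # missing permissions, missing directory): proceed without a lock.
        yield False
        return
    try:
        # Blocks while a writer holds LOCK_EX; readers can share LOCK_SH.
        fcntl.flock(fd, fcntl.LOCK_SH)
        try:
            yield True
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)
    finally:
        os.close(fd)
```

A reader would wrap its file access in `with shared_read_lock(path): ...`, while a writer would take `fcntl.LOCK_EX` on the same lock file before replacing the cache.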
- class corvix.storage.PostgresStorage
PostgreSQL-backed notification persistence implementing StorageBackend.
Uses psycopg (sync) so it is safe to use from CLI commands and the synchronous Litestar route handlers (sync_to_thread=False is not used with this backend — callers should run in a thread pool if needed).
A psycopg_pool.ConnectionPool is created at construction time so that TCP connections are reused across method calls rather than being opened and torn down per operation. Call close() when the storage is no longer needed, or use it as a context manager:

    with PostgresStorage(connection_string=url) as storage:
        storage.save_records(...)
- _pool: psycopg_pool.ConnectionPool[psycopg.Connection[tuple[object, ...]]] | None = None
- __enter__() -> PostgresStorage
- _connect() -> contextlib.AbstractContextManager[psycopg.Connection[tuple[object, ...]]]
Return a pooled connection context-manager.
Usage is identical to the previous psycopg.connect() call:

    with self._connect() as conn:
        ...
- save_records(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) -> None
Upsert records. Preserves dismissed flag on conflict.
- load_records() -> tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]]
Load all records ordered by snapshot_at descending.
- save_status(status: corvix.domain.PollerStatus) -> None
Upsert the poller status row.
- load_status() -> corvix.domain.PollerStatus
Load the poller status, defaulting to unknown.
- dismiss_record(thread_id: str, account_id: str = 'primary') -> None
Set dismissed=true for a specific account/thread id.
- mark_record_read(thread_id: str, account_id: str = 'primary') -> None
Set unread=false for a specific account/thread id.
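The "upsert that preserves the dismissed flag" behaviour of PostgresStorage.save_records can be illustrated with an `INSERT ... ON CONFLICT ... DO UPDATE` statement that leaves `dismissed` out of the update list. The sketch below uses the standard-library `sqlite3` module as a stand-in so it runs without a PostgreSQL server; the table and column names are hypothetical, and the real backend uses psycopg with its own schema.

```python
import sqlite3

# Hypothetical schema; the real PostgreSQL table and columns may differ.
SCHEMA = """
CREATE TABLE notifications (
    account_id TEXT NOT NULL,
    thread_id  TEXT NOT NULL,
    title      TEXT NOT NULL,
    unread     INTEGER NOT NULL DEFAULT 1,
    dismissed  INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (account_id, thread_id)
)
"""

# dismissed is absent from the SET list, so an existing value survives a re-poll.
UPSERT = """
INSERT INTO notifications (account_id, thread_id, title, unread)
VALUES (?, ?, ?, ?)
ON CONFLICT (account_id, thread_id) DO UPDATE SET
    title  = excluded.title,
    unread = excluded.unread
"""


def demo() -> tuple:
    """Insert a row, dismiss it, upsert it again, and return (title, dismissed)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.execute(UPSERT, ("primary", "t1", "first poll", 1))
    conn.execute(
        "UPDATE notifications SET dismissed = 1 WHERE account_id = ? AND thread_id = ?",
        ("primary", "t1"),
    )
    # A later poll sees the same thread again; the upsert must not undo the dismissal.
    conn.execute(UPSERT, ("primary", "t1", "second poll", 1))
    row = conn.execute("SELECT title, dismissed FROM notifications").fetchone()
    conn.close()
    return row
```

The `ON CONFLICT` clause has the same shape in PostgreSQL; with psycopg the placeholders would be `%s` instead of `?`.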
- corvix.storage._fsync_directory(path: pathlib.Path) -> None
Best-effort fsync of a directory after atomic replacement.
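To show where a directory fsync fits, here is a minimal sketch of an atomic file replacement followed by a best-effort directory sync. The function names `fsync_directory` and `atomic_write_json` are hypothetical and the error handling is an assumption; the corvix helper may differ in detail.

```python
import contextlib
import json
import os
import tempfile
from pathlib import Path


def fsync_directory(path: Path) -> None:
    """Best-effort fsync of a directory; any OSError is ignored."""
    try:
        fd = os.open(path, os.O_RDONLY)
    except OSError:
        return
    try:
        os.fsync(fd)
    except OSError:
        # Some filesystems do not support fsync on directories.
        pass
    finally:
        os.close(fd)


def atomic_write_json(target: Path, payload: dict) -> None:
    """Write payload to a temp file in the same directory, then rename over target."""
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=target.name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())  # make the file contents durable first
        os.replace(tmp_name, target)  # atomic on POSIX within one filesystem
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise
    # Persist the rename itself by syncing the containing directory.
    fsync_directory(target.parent)
```

Readers then see either the old file or the new one, never a partially written file. The directory sync is best-effort because a failure there affects durability after a crash, not the correctness of the data already written.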
- corvix.storage._parse_account_errors(value: object) -> tuple[corvix.domain.AccountError, ...]
Parse the account_errors JSONB value into a tuple of AccountError.
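A defensive parser of this kind might look like the sketch below. The `AccountError` fields (`account_id`, `message`) are hypothetical, because the real `corvix.domain.AccountError` is not documented on this page. Returning an empty tuple for malformed input is also an assumption.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountError:
    # Hypothetical fields; the real corvix.domain.AccountError may differ.
    account_id: str
    message: str


def parse_account_errors(value: object) -> tuple[AccountError, ...]:
    """Turn a JSONB column value into a tuple of AccountError, skipping bad items."""
    # A driver may hand back already-decoded JSON or the raw text, so accept both.
    if isinstance(value, (str, bytes)):
        try:
            value = json.loads(value)
        except ValueError:
            return ()
    if not isinstance(value, list):
        return ()
    errors = []
    for item in value:
        if isinstance(item, dict) and "account_id" in item and "message" in item:
            errors.append(AccountError(str(item["account_id"]), str(item["message"])))
    return tuple(errors)
```

Returning a tuple rather than a list keeps the result immutable, which matches the `tuple[AccountError, ...]` return type in the signature.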