corvix.storage

Local persistence for polled notifications.

The JSON cache uses fcntl advisory locks and is therefore supported on Linux/POSIX platforms only.

Attributes

Classes

StorageBackend

Protocol for notification persistence backends.

NotificationCache

Read/write notification snapshots to a JSON file.

PostgresStorage

PostgreSQL-backed notification persistence implementing StorageBackend.

Functions

_fsync_directory(→ None)

Best-effort fsync of a directory after atomic replacement.

_coerce_context(→ dict[str, object])

_require_str(→ str)

_optional_str(→ str | None)

_require_bool(→ bool)

_require_float(→ float)

_require_datetime(→ datetime.datetime)

_coerce_str_list(→ list[str])

Module Contents

corvix.storage._NOTIFICATION_RECORD_COLUMNS = 18[source][source]
corvix.storage._DISMISSED_ROW_COLUMNS = 2[source][source]
class corvix.storage.StorageBackend[source][source]

Bases: Protocol

Protocol for notification persistence backends.

save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) None[source][source]
load_records(user_id: corvix.types.UserId) tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]][source][source]
dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]
mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]
get_dismissed_notification_keys(user_id: corvix.types.UserId) list[str][source][source]
get_dismissed_thread_ids(user_id: corvix.types.UserId) list[str][source][source]
class corvix.storage.NotificationCache[source][source]

Read/write notification snapshots to a JSON file.

Implements StorageBackend using a single-user JSON file. The user_id parameter in protocol methods is ignored — the file is shared.

path: pathlib.Path[source][source]
save(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime | None = None) None[source][source]

Persist records to disk.

load() tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]][source][source]

Load snapshot from disk if available.

_load_unlocked() tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]][source][source]

Load snapshot from disk without acquiring a file lock.

_save_unlocked(records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) None[source][source]
_exclusive_lock() collections.abc.Iterator[None][source][source]
save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) None[source][source]

Save records; user_id ignored in single-user mode.

load_records(user_id: corvix.types.UserId) tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]][source][source]

Load records; user_id ignored in single-user mode.

dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]

Mark a record as dismissed by account/thread id in the JSON file.

mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]

Mark a record as read by account/thread id in the JSON file.

get_dismissed_notification_keys(user_id: corvix.types.UserId) list[str][source][source]

Return account-scoped keys of dismissed records.

get_dismissed_thread_ids(user_id: corvix.types.UserId) list[str][source][source]

Backward-compatible API returning only thread IDs.

class corvix.storage.PostgresStorage[source][source]

PostgreSQL-backed notification persistence implementing StorageBackend.

Uses psycopg (sync) so it is safe to use from CLI commands and the synchronous Litestar route handlers (sync_to_thread=False is not used with this backend — callers should run in a thread pool if needed).

connection_string: str[source][source]
_connect() psycopg.Connection[tuple[object, Ellipsis]][source][source]
save_records(user_id: corvix.types.UserId, records: list[corvix.domain.NotificationRecord], generated_at: datetime.datetime) None[source][source]

Upsert records for user_id. Preserves dismissed flag on conflict.

load_records(user_id: corvix.types.UserId) tuple[datetime.datetime | None, list[corvix.domain.NotificationRecord]][source][source]

Load all records for user_id ordered by snapshot_at descending.

dismiss_record(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]

Set dismissed=true for a specific account/thread id.

mark_record_read(user_id: corvix.types.UserId, thread_id: str, account_id: str = 'primary') None[source][source]

Set unread=false for a specific account/thread id.

get_dismissed_notification_keys(user_id: corvix.types.UserId) list[str][source][source]

Return account-scoped keys where dismissed=true for user_id.

get_dismissed_thread_ids(user_id: corvix.types.UserId) list[str][source][source]

Backward-compatible API returning only thread IDs.

corvix.storage._fsync_directory(path: pathlib.Path) None[source][source]

Best-effort fsync of a directory after atomic replacement.

corvix.storage._coerce_context(value: object) dict[str, object][source][source]
corvix.storage._require_str(value: object, field: str) str[source][source]
corvix.storage._optional_str(value: object, field: str) str | None[source][source]
corvix.storage._require_bool(value: object, field: str) bool[source][source]
corvix.storage._require_float(value: object, field: str) float[source][source]
corvix.storage._require_datetime(value: object, field: str) datetime.datetime[source][source]
corvix.storage._coerce_str_list(value: object, field: str) list[str][source][source]