corvix.services

Application services orchestrating ingestion, actions, and rendering.

Attributes

Classes

NotificationsClient

Client capabilities required by the poll cycle orchestration.

PollingSummary

Result of one polling cycle.

PollCycleInput

All inputs required by run_poll_cycle().

Functions

run_poll_cycle(→ PollingSummary)

Fetch notifications, score/evaluate, optionally execute actions, and persist cache.

_run_poll_cycle(→ PollingSummary)

_resolve_active_clients(→ tuple[NotificationsClient, ...)

_load_previous_records(...)

_fetch_notifications(...)

_process_notifications(...)

_dispatch_notification_events(...)

_handle_cycle_error(→ None)

Record a poll-cycle failure and persist the error status.

run_watch_loop(→ list[PollingSummary])

Run a polling loop suitable for local daemon usage.

render_cached_dashboards(...)

Load persisted records and render dashboards.

_select_dashboards(→ list[corvix.config.DashboardSpec])

_build_pipeline_providers(...)

Return the ordered list of pipeline providers for one poll cycle.

Module Contents

corvix.services.logger
class corvix.services.NotificationsClient

Bases: corvix.actions.MarkReadGateway, corvix.pipeline.base.JsonFetchClient, Protocol

Client capabilities required by the poll cycle orchestration.

account_id: str
account_label: str
fetch_notifications(polling: corvix.config.PollingConfig) → list[corvix.domain.Notification]

Fetch notifications with configured polling options.
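Because NotificationsClient is a Protocol, any object with matching members can stand in for it, which is convenient for tests. The following is a minimal sketch, not the real class: NotificationsClientSketch, Notification, and FakeClient are hypothetical stand-ins, and the methods inherited from MarkReadGateway and JsonFetchClient are omitted because this page does not document them.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class Notification:
    """Hypothetical stand-in for corvix.domain.Notification."""

    thread_id: str
    title: str


@runtime_checkable
class NotificationsClientSketch(Protocol):
    """Mirrors only the members documented above; not the real protocol."""

    account_id: str
    account_label: str

    def fetch_notifications(self, polling: object) -> list[Notification]: ...


class FakeClient:
    """In-memory client that satisfies the protocol structurally."""

    def __init__(self, account_id: str, items: list[Notification]) -> None:
        self.account_id = account_id
        self.account_label = f"fake:{account_id}"
        self._items = items

    def fetch_notifications(self, polling: object) -> list[Notification]:
        # Return a copy so callers cannot mutate the fixture data.
        return list(self._items)


client = FakeClient("acct-1", [Notification("t1", "Review requested")])
fetched = client.fetch_notifications(polling=None)
```

FakeClient never inherits from the protocol; matching attribute and method names are enough for a static type checker, and runtime_checkable additionally allows an isinstance check on the member names.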

class corvix.services.PollingSummary

Result of one polling cycle.

fetched: int
excluded: int
actions_taken: int
errors: list[str] = []
dispatch: corvix.notifications.models.DispatchResult | None = None
class corvix.services.PollCycleInput

All inputs required by run_poll_cycle().

Attributes:

config: Application configuration.
clients: GitHub notifications clients.
cache: Persistent notification cache.
apply_actions: If False, actions are recorded as dry-run only.
now: Override the current time (useful for testing).
notification_targets: Optional notification dispatch targets.

config: corvix.config.AppConfig
cache: corvix.storage.StorageBackend
clients: tuple[NotificationsClient, ...] = ()
client: NotificationsClient | None = None
apply_actions: bool = False
now: datetime.datetime | None = None
notification_targets: list[corvix.notifications.targets.base.NotificationTarget] | None = None
corvix.services.run_poll_cycle(cycle_input: PollCycleInput) → PollingSummary

Fetch notifications, score/evaluate, optionally execute actions, and persist cache.

If cycle_input.notification_targets is provided and cycle_input.config.notifications.enabled is True, the poll cycle detects newly arrived unread notifications and fans out delivery to each target after saving the snapshot.

Wraps the cycle in a trace span and records poll-cycle metrics (count, duration, notifications fetched, actions taken, errors).
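The detection and fan-out step described above can be illustrated roughly as follows. This is a sketch under stated assumptions, not the corvix implementation: Record, its key and unread fields, and the callable-based targets are all hypothetical, and "newly arrived" is assumed to mean "key absent from the previous snapshot".

```python
from collections.abc import Callable
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for a persisted notification record."""

    key: str
    unread: bool


def newly_arrived_unread(previous: list[Record], current: list[Record]) -> list[Record]:
    """Return unread records whose key was absent from the previous snapshot."""
    seen = {record.key for record in previous}
    return [record for record in current if record.unread and record.key not in seen]


def fan_out(events: list[Record], targets: list[Callable[[Record], None]]) -> int:
    """Deliver every event to every target; return the number of deliveries."""
    delivered = 0
    for target in targets:
        for event in events:
            target(event)
            delivered += 1
    return delivered


previous = [Record("a", unread=True)]
current = [Record("a", unread=True), Record("b", unread=True), Record("c", unread=False)]

desktop_inbox: list[Record] = []
email_inbox: list[Record] = []

events = newly_arrived_unread(previous, current)
deliveries = fan_out(events, [desktop_inbox.append, email_inbox.append])
```

Only "b" qualifies here: "a" was already present in the previous snapshot and "c" is not unread. Each target receives the same event list, so the delivery count is events times targets.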

corvix.services._run_poll_cycle(cycle_input: PollCycleInput) → PollingSummary
corvix.services._resolve_active_clients(cycle_input: PollCycleInput) → tuple[NotificationsClient, ...]
corvix.services._load_previous_records(cycle_input: PollCycleInput) → list[corvix.domain.NotificationRecord]
corvix.services._fetch_notifications(polling: corvix.config.PollingConfig, active_clients: tuple[NotificationsClient, ...]) → tuple[list[corvix.domain.Notification], dict[str, NotificationsClient], list[corvix.domain.AccountError]]
corvix.services._process_notifications(notifications: list[corvix.domain.Notification], cycle_input: PollCycleInput, current_time: datetime.datetime, clients_by_account: dict[str, NotificationsClient], contexts_by_notification_key: dict[str, dict[str, object]]) → tuple[list[corvix.domain.NotificationRecord], int, int, list[str]]
corvix.services._dispatch_notification_events(cycle_input: PollCycleInput, previous_records: list[corvix.domain.NotificationRecord], records: list[corvix.domain.NotificationRecord]) → corvix.notifications.models.DispatchResult | None
corvix.services._handle_cycle_error(iteration: int, cache: corvix.storage.StorageBackend, runs: list[PollingSummary]) → None

Record a poll-cycle failure and persist the error status.

corvix.services.run_watch_loop(cycle_input: PollCycleInput, iterations: int | None = None) → list[PollingSummary]

Run a polling loop suitable for local daemon usage.
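The iterations: int | None parameter suggests a loop that is either bounded or runs until interrupted. A minimal sketch of that shape, assuming None means "loop indefinitely"; watch_loop, run_cycle, interval_seconds, and the injectable sleep are hypothetical names, and error handling such as _handle_cycle_error is left out.

```python
import itertools
from collections.abc import Callable
from typing import Optional


def watch_loop(
    run_cycle: Callable[[], dict],
    iterations: Optional[int] = None,
    interval_seconds: float = 0.0,
    sleep: Callable[[float], None] = lambda seconds: None,
) -> list[dict]:
    """Call run_cycle repeatedly and collect each result.

    iterations=None loops until interrupted; an integer bounds the loop.
    The default sleep is a no-op so the sketch runs instantly; a real
    daemon would pass time.sleep.
    """
    runs: list[dict] = []
    indices = itertools.count() if iterations is None else range(iterations)
    for index in indices:
        if index > 0:
            sleep(interval_seconds)  # wait between cycles, not before the first
        runs.append(run_cycle())
    return runs


calls = itertools.count(1)
summaries = watch_loop(lambda: {"cycle": next(calls)}, iterations=3)
```

Injecting sleep keeps the loop testable without real delays, and a bounded iterations value lets a test exercise the same code path a daemon would run.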

corvix.services.render_cached_dashboards(config: corvix.config.AppConfig, cache: corvix.storage.StorageBackend, console: rich.console.Console, dashboard_name: str | None = None) → list[corvix.presentation.DashboardRenderResult]

Load persisted records and render dashboards.

corvix.services._select_dashboards(config: corvix.config.AppConfig, dashboard_name: str | None) → list[corvix.config.DashboardSpec]
corvix.services._build_pipeline_providers(config: corvix.config.AppConfig) → list[corvix.pipeline.provider.FieldProvider | corvix.pipeline.provider.ContextProvider]

Return the ordered list of pipeline providers for one poll cycle.

Field-completion providers run first so that context providers see fully hydrated notifications (e.g. web_url is already populated when enrichment starts). Context providers are only included when enrichment is enabled in config.
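The ordering rule above can be sketched with plain callables. This is an illustration only: the dict-based notification, the provider functions, and the enrichment_enabled flag are hypothetical stand-ins for FieldProvider, ContextProvider, and the real config lookup.

```python
from collections.abc import Callable

Notification = dict[str, object]
Provider = Callable[[Notification], None]


def fill_web_url(notification: Notification) -> None:
    """Field-completion step: populate web_url if it is missing."""
    notification.setdefault("web_url", f"https://example.invalid/{notification['id']}")


def add_context(notification: Notification) -> None:
    """Context step: relies on web_url already being populated."""
    host = str(notification["web_url"]).split("/")[2]
    notification["context"] = {"host": host}


def build_providers(
    field_providers: list[Provider],
    context_providers: list[Provider],
    enrichment_enabled: bool,
) -> list[Provider]:
    """Field providers first; context providers only when enrichment is on."""
    ordered = list(field_providers)
    if enrichment_enabled:
        ordered.extend(context_providers)
    return ordered


enriched: Notification = {"id": "42"}
for provider in build_providers([fill_web_url], [add_context], enrichment_enabled=True):
    provider(enriched)

plain: Notification = {"id": "43"}
for provider in build_providers([fill_web_url], [add_context], enrichment_enabled=False):
    provider(plain)
```

If add_context ran before fill_web_url, the lookup of web_url would fail with a KeyError, which is the kind of dependency the documented ordering avoids.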