Source code for corvix.enrichment.engine

"""Enrichment engine for attaching contextual data to notifications."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TypeIs

from corvix.config import EnrichmentConfig
from corvix.domain import Notification
from corvix.enrichment.base import EnrichmentContext, EnrichmentProvider, JsonFetchClient


[docs] def _is_str_object_map(value: object) -> TypeIs[dict[str, object]]: return isinstance(value, dict) and all(isinstance(key, str) for key in value)
@dataclass(slots=True)
[docs] class EnrichmentRunResult: """Result of enriching one poll cycle."""
[docs] contexts_by_notification_key: dict[str, dict[str, object]] = field(default_factory=dict)
[docs] errors: list[str] = field(default_factory=list)
@property
[docs] def contexts_by_thread_id(self) -> dict[str, dict[str, object]]: """Backward-compatible thread-keyed view of contexts.""" output: dict[str, dict[str, object]] = {} for key, value in self.contexts_by_notification_key.items(): _, _, thread_id = key.partition(":") output[thread_id] = value return output
@dataclass(slots=True)
[docs] class EnrichmentEngine: """Runs configured enrichment providers over notifications."""
[docs] config: EnrichmentConfig
[docs] providers: list[EnrichmentProvider]
[docs] def run( self, notifications: list[Notification], client: JsonFetchClient, clients_by_account: dict[str, JsonFetchClient] | None = None, ) -> EnrichmentRunResult: """Run enabled providers for all notifications in one cycle.""" if not self.config.enabled or not self.providers: return EnrichmentRunResult( contexts_by_notification_key={ f"{notification.account_id}:{notification.thread_id}": {} for notification in notifications }, errors=[], ) context = EnrichmentContext(max_requests_per_cycle=self.config.max_requests_per_cycle) contexts_by_notification_key: dict[str, dict[str, object]] = { f"{notification.account_id}:{notification.thread_id}": {} for notification in notifications } errors: list[str] = [] for notification in notifications: key = f"{notification.account_id}:{notification.thread_id}" record_context = contexts_by_notification_key[key] notification_client = ( clients_by_account.get(notification.account_id, client) if clients_by_account else client ) for provider in self.providers: try: payload = provider.enrich(notification=notification, client=notification_client, ctx=context) except Exception as error: # pragma: no cover - defensive fail-open contract errors.append(f"provider={provider.name} thread={notification.thread_id}: {error}") continue if payload: _set_nested_namespace(record_context, provider.name, payload) return EnrichmentRunResult(contexts_by_notification_key=contexts_by_notification_key, errors=errors)
[docs] def _set_nested_namespace(root: dict[str, object], namespace: str, payload: dict[str, object]) -> None: """Merge payload under a dot-delimited namespace.""" segments = [segment for segment in namespace.split(".") if segment] if not segments: return node: dict[str, object] = root for segment in segments[:-1]: raw_child = node.get(segment) if not _is_str_object_map(raw_child): child = {} node[segment] = child node = child continue node = raw_child leaf = segments[-1] existing = node.get(leaf) if _is_str_object_map(existing): existing.update(payload) return node[leaf] = dict(payload)