Source code for corvix.enrichment.base

"""Provider interfaces and shared context for enrichment."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol

from corvix.domain import Notification
from corvix.types import JsonValue


[docs] class JsonFetchClient(Protocol): """Client capability required by enrichment providers."""
[docs] def fetch_json_url(self, url: str, timeout_seconds: float = 30.0) -> JsonValue: """Fetch JSON from a fully-qualified API URL.""" ...
[docs] class EnrichmentProvider(Protocol): """Protocol implemented by enrichment providers."""
[docs] name: str
[docs] def enrich( self, notification: Notification, client: JsonFetchClient, ctx: EnrichmentContext, ) -> dict[str, object]: ...
@dataclass(slots=True)
[docs] class EnrichmentContext: """Per-cycle provider context with request budget and URL cache."""
[docs] max_requests_per_cycle: int
[docs] url_cache: dict[str, JsonValue] = field(default_factory=dict)
[docs] request_count: int = 0
[docs] def get_json( self, client: JsonFetchClient, url: str, timeout_seconds: float, ) -> JsonValue: """Fetch and cache a JSON payload for this cycle.""" if url in self.url_cache: return self.url_cache[url] if self.request_count >= self.max_requests_per_cycle: msg = "Enrichment request budget exhausted." raise RuntimeError(msg) payload = client.fetch_json_url(url=url, timeout_seconds=timeout_seconds) self.request_count += 1 self.url_cache[url] = payload return payload