# Source code for corvix.enrichment.base
"""Provider interfaces and shared context for enrichment."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
from corvix.domain import Notification
from corvix.types import JsonValue
class JsonFetchClient(Protocol):
    """Client capability required by enrichment providers.

    Any object exposing a compatible ``fetch_json_url`` method satisfies
    this protocol; no inheritance is required.
    """

    def fetch_json_url(self, url: str, timeout_seconds: float = 30.0) -> JsonValue:
        """Fetch JSON from a fully-qualified API URL.

        Args:
            url: Fully-qualified API URL to fetch.
            timeout_seconds: Timeout for the request, in seconds.

        Returns:
            The decoded JSON payload.
        """
        ...
class EnrichmentProvider(Protocol):
    """Protocol implemented by enrichment providers."""

    def enrich(
        self,
        notification: Notification,
        client: JsonFetchClient,
        ctx: EnrichmentContext,
    ) -> dict[str, object]:
        """Produce enrichment data for a single notification.

        Args:
            notification: The notification to enrich.
            client: Client used to fetch JSON from API URLs.
            ctx: Shared context carrying the request budget and URL cache.

        Returns:
            A string-keyed mapping of enrichment values. The key schema is
            provider-specific and not defined by this protocol.
        """
        ...
@dataclass(slots=True)
[docs]
class EnrichmentContext:
"""Per-cycle provider context with request budget and URL cache."""
[docs]
max_requests_per_cycle: int
[docs]
url_cache: dict[str, JsonValue] = field(default_factory=dict)
[docs]
request_count: int = 0
[docs]
def get_json(
self,
client: JsonFetchClient,
url: str,
timeout_seconds: float,
) -> JsonValue:
"""Fetch and cache a JSON payload for this cycle."""
if url in self.url_cache:
return self.url_cache[url]
if self.request_count >= self.max_requests_per_cycle:
msg = "Enrichment request budget exhausted."
raise RuntimeError(msg)
payload = client.fetch_json_url(url=url, timeout_seconds=timeout_seconds)
self.request_count += 1
self.url_cache[url] = payload
return payload