Source code for corvix.pipeline.provider
"""Unified provider interfaces and shared context for the pipeline."""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from corvix.domain import Notification
from corvix.pipeline.base import JsonFetchClient, RequestContext
from corvix.types import JsonValue
[docs]
class PipelineContext(RequestContext):
"""Per-cycle request budget and URL cache shared across all pipeline providers.
A single :class:`PipelineContext` is created per :meth:`PipelineEngine.run`
call and passed to every provider, so that URL responses cached by a
field-completion provider are visible to subsequent context providers in the
same cycle.
"""
[docs]
def get_json(self, client: JsonFetchClient, url: str, timeout_seconds: float) -> JsonValue:
"""Fetch and cache a JSON payload; raises if the request budget is exhausted."""
if url not in self.url_cache and self.request_count >= self.max_requests_per_cycle:
msg = "Pipeline request budget exhausted."
raise RuntimeError(msg)
return super().get_json(client=client, url=url, timeout_seconds=timeout_seconds)
@runtime_checkable
[docs]
class FieldProvider(Protocol):
"""Provider that completes missing canonical fields on a :class:`~corvix.domain.Notification`.
Field providers mutate the notification in-place by returning a (possibly
replaced) :class:`~corvix.domain.Notification` with missing required fields
filled in (e.g. ``subject_url``, ``web_url``).
"""
[docs]
def hydrate(
self,
_notification: Notification,
_client: JsonFetchClient,
_ctx: PipelineContext,
/,
) -> Notification:
"""Return a notification with any missing required fields filled in."""
...
@runtime_checkable
[docs]
class ContextProvider(Protocol):
"""Provider that attaches optional contextual data to a :class:`~corvix.domain.Notification`.
Context providers return a mapping that is merged under the provider's
``name`` namespace in :attr:`~corvix.domain.NotificationRecord.context`.
"""
[docs]
def enrich(
self,
_notification: Notification,
_client: JsonFetchClient,
_ctx: PipelineContext,
/,
) -> dict[str, object]:
"""Return a mapping of context data to attach under this provider's namespace."""
...