Source code for corvix.pipeline.base
"""Shared request primitives for hydration and enrichment pipelines."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
from corvix.types import JsonValue
[docs]
class JsonFetchClient(Protocol):
    """Structural interface for clients that data-enrichment pipelines fetch through.

    Any object providing a compatible ``fetch_json_url`` method satisfies
    this protocol; explicit inheritance is not required.
    """

    # NOTE(review): the string below reads like an attribute docstring, but no
    # attribute declaration is visible next to it in this view -- confirm which
    # member it is meant to document.
    """Trusted base URL of the upstream API (e.g. ``https://api.github.com``).
    Providers use this to construct upstream API URLs from trusted config rather
    than from data received in API responses, which prevents SSRF taint flows.
    """

    def fetch_json_url(self, url: str, timeout_seconds: float = 30.0) -> JsonValue:
        """Fetch JSON from a fully-qualified API URL.

        Args:
            url: Fully-qualified URL to request.
            timeout_seconds: Timeout for the request, in seconds.

        Returns:
            The JSON value produced by the implementation.
        """
        ...
@dataclass(slots=True)
[docs]
class RequestContext:
"""Per-cycle request budget and URL cache."""
[docs]
max_requests_per_cycle: int
[docs]
url_cache: dict[str, JsonValue] = field(default_factory=dict)
[docs]
request_count: int = 0
[docs]
def get_json(self, client: JsonFetchClient, url: str, timeout_seconds: float) -> JsonValue:
"""Fetch and cache a JSON payload for this cycle."""
if url in self.url_cache:
return self.url_cache[url]
if self.request_count >= self.max_requests_per_cycle:
msg = "Request budget exhausted."
raise RuntimeError(msg)
payload = client.fetch_json_url(url=url, timeout_seconds=timeout_seconds)
self.request_count += 1
self.url_cache[url] = payload
return payload