Source code for corvix.enrichment.engine
"""Enrichment engine — thin wrapper around :class:`~corvix.pipeline.engine.PipelineEngine`.
Kept for backward compatibility; new code should use
:class:`corvix.pipeline.engine.PipelineEngine` directly.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from corvix.config import EnrichmentConfig
from corvix.domain import Notification
from corvix.enrichment.base import EnrichmentProvider
from corvix.pipeline.base import JsonFetchClient
from corvix.pipeline.engine import PipelineEngine
@dataclass(slots=True)
[docs]
class EnrichmentRunResult:
"""Result of enriching one poll cycle."""
[docs]
contexts_by_notification_key: dict[str, dict[str, object]] = field(default_factory=dict)
[docs]
errors: list[str] = field(default_factory=list)
@property
[docs]
def contexts_by_thread_id(self) -> dict[str, dict[str, object]]:
"""Backward-compatible thread-keyed view of contexts."""
return {key.rpartition(":")[2]: value for key, value in self.contexts_by_notification_key.items()}
@dataclass(slots=True)
[docs]
class EnrichmentEngine:
"""Runs configured enrichment providers over notifications.
.. deprecated::
Prefer :class:`corvix.pipeline.engine.PipelineEngine` directly.
This class is a thin wrapper maintained for backward compatibility.
"""
[docs]
config: EnrichmentConfig
[docs]
providers: list[EnrichmentProvider]
[docs]
def run(
self,
notifications: list[Notification],
client: JsonFetchClient,
clients_by_account: dict[str, JsonFetchClient] | None = None,
) -> EnrichmentRunResult:
"""Run enabled providers for all notifications in one cycle."""
if not self.config.enabled or not self.providers:
return EnrichmentRunResult(
contexts_by_notification_key={
f"{notification.account_id}:{notification.thread_id}": {} for notification in notifications
},
errors=[],
)
engine = PipelineEngine(
providers=list(self.providers),
max_requests_per_cycle=self.config.max_requests_per_cycle,
)
result = engine.run(
notifications=notifications,
client=client,
clients_by_account=clients_by_account,
)
return EnrichmentRunResult(
contexts_by_notification_key=result.contexts_by_notification_key,
errors=result.errors,
)