Source code for corvix.notifications.dispatcher
"""Fan-out dispatcher: delivers notification events to all enabled targets."""
from __future__ import annotations
import logging
from corvix.notifications.models import DeliveryResult, DispatchResult, NotificationEvent
from corvix.notifications.targets.base import NotificationTarget
[docs]
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
[docs]
class NotificationDispatcher:
    """Deliver notification events to every configured :class:`NotificationTarget`.

    Targets are invoked one after another. An exception raised by one
    target's ``deliver`` call is caught, logged and recorded as a failed
    :class:`DeliveryResult`, so the remaining targets are still attempted.
    The per-target outcomes are collected into a single
    :class:`DispatchResult` for the caller (typically ``run_poll_cycle``).

    Usage::

        dispatcher = NotificationDispatcher(targets=[my_target])
        result = dispatcher.dispatch(events)
    """

    def __init__(self, targets: list[NotificationTarget]) -> None:
        """Remember the targets that :meth:`dispatch` will deliver to.

        :param targets: Targets to call on each dispatch. The list object is
            stored as given (not copied).
        """
        self._targets = targets

    def dispatch(self, events: list[NotificationEvent]) -> DispatchResult:
        """Hand *events* to each registered target and collect the outcomes.

        :param events: Events to deliver. When empty, no target is called.
        :returns: A :class:`DispatchResult` built from *events*; for a
            non-empty batch its ``results`` gains one :class:`DeliveryResult`
            per target, in target order.
        """
        outcome = DispatchResult(events=events)
        if not events:
            # Nothing to send: skip the fan-out and return the empty result.
            return outcome

        for sink in self._targets:
            try:
                report = sink.deliver(events)
            except Exception as failure:
                # The target blew up instead of reporting errors itself:
                # log the traceback and synthesize an all-failed report.
                logger.exception("Target '%s' raised an unexpected error", sink.name)
                report = DeliveryResult(
                    target=sink.name,
                    events_attempted=len(events),
                    events_delivered=0,
                    errors=[str(failure)],
                )
            else:
                if not report.errors:
                    logger.debug(
                        "Target '%s' delivered %d/%d events",
                        sink.name,
                        report.events_delivered,
                        report.events_attempted,
                    )
                else:
                    # The target returned normally but reported problems;
                    # surface each one as its own warning.
                    for problem in report.errors:
                        logger.warning("Target '%s' delivery error: %s", sink.name, problem)
            outcome.results.append(report)

        return outcome