Source code for corvix.actions
"""Action execution for matched rules."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Protocol, runtime_checkable
from corvix.config import RuleAction
from corvix.domain import Notification, NotificationRecord
if TYPE_CHECKING:
pass
@runtime_checkable
[docs]
class MarkReadGateway(Protocol):
"""Gateway interface for marking notification threads as read."""
[docs]
def mark_thread_read(self, thread_id: str) -> None:
"""Mark a thread as read."""
@runtime_checkable
[docs]
class DismissGateway(Protocol):
"""Gateway interface for dismissing (deleting) notification threads."""
[docs]
def dismiss_thread(self, thread_id: str) -> None:
"""Dismiss a thread from the inbox permanently."""
@dataclass(slots=True)
[docs]
class ActionExecutionResult:
"""Summary of actions taken on one notification."""
[docs]
actions_taken: list[str] = field(default_factory=list)
[docs]
errors: list[str] = field(default_factory=list)
@dataclass(slots=True)
[docs]
class ActionExecutionContext:
"""Bundles all execution dependencies for :func:`execute_actions`.
Attributes:
gateway: Must implement :class:`MarkReadGateway`.
apply_actions: If ``False`` actions are recorded as dry-run only.
dismiss_gateway: Must implement :class:`DismissGateway`; required for dismiss actions.
record: The associated :class:`~corvix.domain.NotificationRecord` used for dismiss state tracking.
"""
[docs]
gateway: MarkReadGateway
[docs]
apply_actions: bool = False
[docs]
dismiss_gateway: DismissGateway | None = None
[docs]
record: NotificationRecord | None = None
# ---------------------------------------------------------------------------
# Internal action handlers (Strategy Pattern)
# ---------------------------------------------------------------------------
[docs]
class _ActionHandler(Protocol):
"""Strategy interface for a single action type."""
[docs]
def execute(
self,
notification: Notification,
result: ActionExecutionResult,
) -> None:
"""Execute the action, mutating *result* in place."""
[docs]
class _MarkReadHandler:
"""Handles the ``mark_read`` action."""
def __init__(self, gateway: MarkReadGateway, apply_actions: bool) -> None:
[docs]
self._gateway = gateway
[docs]
self._apply_actions = apply_actions
[docs]
def execute(self, notification: Notification, result: ActionExecutionResult) -> None:
if not notification.unread:
return
if not self._apply_actions:
result.actions_taken.append("dry-run:mark_read")
return
try:
self._gateway.mark_thread_read(notification.thread_id)
notification.unread = False
result.actions_taken.append("mark_read")
except Exception as error:
result.errors.append(f"mark_read failed for {notification.thread_id}: {error}")
[docs]
class _DismissHandler:
"""Handles the ``dismiss`` action."""
def __init__(
self,
gateway: DismissGateway | None,
apply_actions: bool,
record: NotificationRecord | None,
) -> None:
[docs]
self._gateway = gateway
[docs]
self._apply_actions = apply_actions
[docs]
def execute(self, notification: Notification, result: ActionExecutionResult) -> None:
if self._record is not None and self._record.dismissed:
return
if not self._apply_actions:
result.actions_taken.append("dry-run:dismiss")
return
if self._gateway is None:
result.errors.append(f"dismiss action for {notification.thread_id}: no dismiss_gateway provided.")
return
try:
self._gateway.dismiss_thread(notification.thread_id)
if self._record is not None:
self._record.dismissed = True
result.actions_taken.append("dismiss")
except Exception as error:
result.errors.append(f"dismiss failed for {notification.thread_id}: {error}")
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def execute_actions(
notification: Notification,
actions: list[RuleAction],
context: ActionExecutionContext,
) -> ActionExecutionResult:
"""Execute configured actions against a notification.
Args:
notification: The notification to act on.
actions: The list of rule actions to execute.
context: Execution context carrying gateways and flags.
"""
handlers: dict[str, _MarkReadHandler | _DismissHandler] = {
"mark_read": _MarkReadHandler(context.gateway, context.apply_actions),
"dismiss": _DismissHandler(context.dismiss_gateway, context.apply_actions, context.record),
}
result = ActionExecutionResult()
seen_actions: set[str] = set()
for action in actions:
action_type = action.action_type.strip().lower()
if not action_type or action_type in seen_actions:
continue
seen_actions.add(action_type)
handler = handlers.get(action_type)
if handler is None:
result.errors.append(f"Unsupported action '{action.action_type}'.")
continue
handler.execute(notification, result)
return result