Source code for corvix.hydration.providers.github_thread_subject

"""Hydration provider for recovering missing subject URLs from thread payloads."""

from __future__ import annotations

from dataclasses import dataclass, replace
from typing import TypeIs

from corvix.domain import Notification
from corvix.hydration.base import HydrationContext
from corvix.pipeline.base import JsonFetchClient


[docs] def _is_str_object_map(value: object) -> TypeIs[dict[str, object]]: return isinstance(value, dict) and all(isinstance(key, str) for key in value)
@dataclass(slots=True)
[docs] class GitHubThreadSubjectProvider: """Backfills subject_url from a notification thread payload."""
[docs] timeout_seconds: float = 10.0
[docs] name: str = "github.thread_subject"
[docs] def hydrate(self, notification: Notification, client: JsonFetchClient, ctx: HydrationContext) -> Notification: if notification.subject_url is not None or not notification.thread_url: return notification payload = ctx.get_json(client=client, url=notification.thread_url, timeout_seconds=self.timeout_seconds) if not _is_str_object_map(payload): return notification subject = payload.get("subject") if not _is_str_object_map(subject): return notification subject_url = subject.get("url") if isinstance(subject_url, str) and subject_url: return replace(notification, subject_url=subject_url) return notification