"""GitHub notifications ingestion client."""
from __future__ import annotations
import json
import logging
import re
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from urllib import error as url_error
from urllib import parse, request
from corvix.config import PollingConfig
from corvix.domain import Notification
from corvix.observability import metrics, span
from corvix.types import JsonObject, JsonValue
[docs]
logger = logging.getLogger(__name__)
[docs]
REQUEST_FAILED_DETAIL = "request failed"
# GitHub notification thread IDs are positive integers.
[docs]
_THREAD_ID_RE = re.compile(r"^[1-9]\d*$")
[docs]
def _as_json_object(value: JsonValue) -> JsonObject | None:
"""Return value as a JSON object when it is a dict."""
if not isinstance(value, dict):
return None
return value
[docs]
def _validate_thread_id(thread_id: str) -> None:
"""Raise ValueError if *thread_id* is not a valid GitHub notification thread ID.
GitHub thread IDs are positive decimal integers. Rejecting anything that
does not match prevents path-traversal sequences such as ``../`` from being
embedded in the URL path constructed by callers.
"""
if not isinstance(thread_id, str):
msg = f"Invalid thread_id {thread_id!r}: must be a string."
raise TypeError(msg)
if not _THREAD_ID_RE.fullmatch(thread_id):
msg = f"Invalid thread_id {thread_id!r}: must be a positive integer string."
raise ValueError(msg)
[docs]
def _coerce_json_value(value: object) -> JsonValue:
if value is None or isinstance(value, str | int | float | bool):
return value
if isinstance(value, list):
return [_coerce_json_value(item) for item in value]
if isinstance(value, dict):
output: JsonObject = {}
for key, item in value.items():
if not isinstance(key, str):
msg = "JSON object contains a non-string key."
raise ValueError(msg)
output[key] = _coerce_json_value(item)
return output
msg = "Unsupported JSON value type."
raise ValueError(msg)
@dataclass(slots=True)
[docs]
class GitHubNotificationsClient:
"""Client for GitHub notifications API."""
[docs]
api_base_url: str = "https://api.github.com"
[docs]
account_id: str = "primary"
[docs]
account_label: str = "Primary"
[docs]
request_timeout_seconds: float = 30.0
[docs]
def fetch_notifications(self, polling: PollingConfig) -> list[Notification]:
"""Fetch notifications with pagination."""
notifications: list[Notification] = []
page = 1
while page <= polling.max_pages:
raw = self._fetch_page(
polling=polling,
page=page,
timeout_seconds=polling.request_timeout_seconds,
)
if not raw:
break
notifications.extend(
Notification.from_api_payload(
payload,
account_id=self.account_id,
account_label=self.account_label,
)
for payload in raw
)
page += 1
return notifications
[docs]
def _fetch_page(self, polling: PollingConfig, page: int, timeout_seconds: float) -> list[JsonObject]:
query = {
"all": str(polling.all).lower(),
"participating": str(polling.participating).lower(),
"per_page": str(polling.per_page),
"page": str(page),
}
url = self._build_url("/notifications", query)
payload = self._request_json(url, method="GET", timeout_seconds=timeout_seconds)
if not isinstance(payload, list):
msg = "GitHub API returned unexpected notifications payload."
raise ValueError(msg)
output: list[JsonObject] = []
for item in payload:
item_object = _as_json_object(item)
if item_object is not None:
output.append(item_object)
return output
[docs]
def mark_thread_read(self, thread_id: str) -> None:
"""Mark a notification thread as read."""
_validate_thread_id(thread_id)
url = self._build_url(f"/notifications/threads/{thread_id}", {})
self._request_no_content(url, method="PATCH")
[docs]
def dismiss_thread(self, thread_id: str) -> None:
"""Dismiss a notification thread (removes it from inbox permanently)."""
_validate_thread_id(thread_id)
url = self._build_url(f"/notifications/threads/{thread_id}", {})
self._request_no_content_with_backoff(url, method="DELETE")
[docs]
def fetch_json_url(self, url: str, timeout_seconds: float = 30.0) -> JsonValue:
"""Fetch JSON from a fully-qualified API URL with host validation."""
safe_url = self._sanitize_api_url(url)
return self._request_json(safe_url, method="GET", timeout_seconds=timeout_seconds)
[docs]
def _build_url(self, path: str, query: dict[str, str]) -> str:
base = self.api_base_url.rstrip("/")
encoded_query = parse.urlencode(query)
return f"{base}{path}?{encoded_query}" if encoded_query else f"{base}{path}"
@contextmanager
[docs]
def _instrument_request(self, method: str) -> Iterator[None]:
"""Record GitHub API request count/latency metrics and a trace span.
Labels successful calls ``"success"`` and failed calls by HTTP status
code (for ``HTTPError``) or ``"error"`` (for other failures).
"""
start = time.perf_counter()
status = "success"
with span("github.api.request", {"http.request.method": method}):
try:
yield
except url_error.HTTPError as error:
status = str(error.code)
raise
except Exception:
status = "error"
raise
finally:
metrics.github_api_requests_total.labels(method, status).inc()
metrics.github_api_request_duration_seconds.labels(method).observe(time.perf_counter() - start)
[docs]
def _request_json(self, url: str, method: str, timeout_seconds: float | None = None) -> JsonValue:
req = request.Request(url=url, method=method, headers=self._headers()) # NOSONAR python:S5144
effective_timeout = self.request_timeout_seconds if timeout_seconds is None else timeout_seconds
# nosec B310 - url is always constructed from self.api_base_url (trusted config) or
# sanitised by _sanitize_api_url which enforces matching host and trusted scheme.
with self._instrument_request(method), request.urlopen(req, timeout=effective_timeout) as response: # nosec B310 # NOSONAR python:S5144
raw = response.read().decode("utf-8")
return _coerce_json_value(json.loads(raw))
[docs]
def _request_no_content(self, url: str, method: str, timeout_seconds: float | None = None) -> None:
req = request.Request(url=url, method=method, headers=self._headers(), data=b"") # NOSONAR python:S5144
effective_timeout = self.request_timeout_seconds if timeout_seconds is None else timeout_seconds
# nosec B310 - url always originates from _build_url (self.api_base_url) after thread-id
# validation; no external data can reach this call without passing _validate_thread_id.
with self._instrument_request(method), request.urlopen(req, timeout=effective_timeout): # nosec B310 # NOSONAR python:S5144
return
[docs]
def _request_no_content_with_backoff(self, url: str, method: str, max_attempts: int = 4) -> None:
"""Perform no-content request with retries for GitHub throttling responses."""
attempt = 1
while attempt <= max_attempts:
try:
self._request_no_content(url, method)
return
except url_error.HTTPError as error:
retryable = error.code in {403, 429}
if not retryable or attempt >= max_attempts:
detail = _http_error_detail(error)
msg = f"GitHub API request failed with status {error.code}: {detail}"
raise RuntimeError(msg) from error
delay_seconds = _retry_delay_seconds(error=error, attempt=attempt)
logger.warning(
"GitHub API throttled dismiss request; retrying",
extra={"attempt": attempt, "max_attempts": max_attempts, "delay_seconds": delay_seconds},
)
time.sleep(delay_seconds)
attempt += 1
[docs]
def _sanitize_api_url(self, url: str) -> str:
"""Validate ``url`` and return a safe reconstruction using the trusted base host.
Reconstructs the URL with the scheme and netloc from ``self.api_base_url``
so that only the path and query from the input survive into the request.
This neutralises SSRF via scheme injection or host-header manipulation
while still allowing the caller to supply the full API path.
"""
parsed = parse.urlparse(url)
base = parse.urlparse(self.api_base_url)
expected_host = base.hostname
actual_host = parsed.hostname
if not expected_host or not actual_host or actual_host.casefold() != expected_host.casefold():
msg = "URL host must match configured GitHub API base host."
raise ValueError(msg)
# Reconstruct with trusted scheme + netloc; keep only path and query from input.
return parse.urlunparse((base.scheme, base.netloc, parsed.path, "", parsed.query, ""))
[docs]
def _http_error_detail(error: url_error.HTTPError) -> str:
try:
payload = json.loads(error.read().decode("utf-8"))
except Exception:
return error.reason if isinstance(error.reason, str) else REQUEST_FAILED_DETAIL
if not isinstance(payload, dict):
return error.reason if isinstance(error.reason, str) else REQUEST_FAILED_DETAIL
message = payload.get("message")
if not isinstance(message, str) or not message:
return error.reason if isinstance(error.reason, str) else REQUEST_FAILED_DETAIL
return message
[docs]
def _retry_delay_seconds(error: url_error.HTTPError, attempt: int) -> float:
retry_after_raw = error.headers.get("Retry-After") if error.headers is not None else None
if isinstance(retry_after_raw, str):
try:
retry_after_seconds = float(retry_after_raw)
except ValueError:
retry_after_seconds = 0.0
if retry_after_seconds > 0:
return min(retry_after_seconds, 10.0)
return min(0.5 * (2 ** (attempt - 1)), 5.0)