Source code for corvix.web.app

"""Litestar app serving Corvix dashboard data and UI."""

from __future__ import annotations

import asyncio
import hashlib
import hmac
import json
import logging
import re
import signal
import threading
from collections.abc import AsyncIterator, Mapping
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from importlib.resources import files
from os import environ
from pathlib import Path
from time import monotonic
from typing import Any, Literal, cast, overload

import uvicorn
from litestar import Litestar, Request, Response, get, post
from litestar.config.compression import CompressionConfig
from litestar.datastructures.cookie import Cookie
from litestar.datastructures.headers import CacheControlHeader
from litestar.exceptions import HTTPException
from litestar.openapi import OpenAPIConfig
from litestar.response import ServerSentEvent, ServerSentEventMessage
from litestar.response.redirect import Redirect
from litestar.serialization import encode_json
from litestar.static_files import create_static_files_router

from corvix.config import AppConfig, DashboardSpec, GitHubAccountConfig, available_dashboards, load_config
from corvix.dashboarding import build_dashboard_data
from corvix.domain import NotificationRecord, PollerStatus, parse_timestamp
from corvix.env import get_env_value
from corvix.ingestion import GitHubNotificationsClient
from corvix.observability import configure_logging, setup_tracing
from corvix.observability import metrics as _metrics
from corvix.observability.middleware import ObservabilityMiddleware
from corvix.storage import StorageBackend, StorageConfigError, create_storage
from corvix.web.middleware import SESSION_MAX_AGE_SECONDS, TokenAuthMiddleware, _get_secret, _make_session_cookie
from corvix.web.schemas import (
    AccountErrorResponse,
    PollerStatusResponse,
    RuleSnippetsResponse,
    SnapshotResponse,
    build_snapshot_response,
)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Built-in colour presets keyed by theme name; returned by the themes endpoints.
THEMES: dict[str, dict[str, str]] = {
    "midnight": {
        "bg": "#07111f",
        "surface": "#0e1a2b",
        "surface_elevated": "#132238",
        "ink": "#edf3ff",
        "muted": "#8fa3c7",
        "accent": "#74c0fc",
        "line": "#223753",
        "ok": "#59d7a4",
        "danger": "#ff7b72",
    },
    "graphite": {
        "bg": "#0d1117",
        "surface": "#161b22",
        "surface_elevated": "#1f2937",
        "ink": "#f5f7fb",
        "muted": "#95a3b8",
        "accent": "#f2cc60",
        "line": "#2d3748",
        "ok": "#56d364",
        "danger": "#ff938a",
    },
}

# Packaged ``static`` resource directory of ``corvix.web`` and its ``assets`` sub-directory.
_STATIC_ROOT = files("corvix.web").joinpath("static")
_STATIC_ASSETS_DIR = str(_STATIC_ROOT.joinpath("assets"))

# Asset files whose names and contents feed the asset version token.
_ASSET_FILENAMES = ("app.js", "index.css", "favicon.svg")

# Cache-Control value: public, immutable, max-age of 365 days expressed in seconds.
_ASSET_CACHE_CONTROL = CacheControlHeader(public=True, max_age=31_536_000, immutable=True)
def _asset_version_token() -> str:
    """Return a 12-character content hash of the bundled assets.

    Each file named in ``_ASSET_FILENAMES`` that exists under the static
    ``assets`` directory contributes its name and its bytes to a SHA-256
    digest. When none of the files exist the literal ``"dev"`` is returned.
    """
    hasher = hashlib.sha256()
    hashed_any = False
    for name in _ASSET_FILENAMES:
        candidate = _STATIC_ROOT.joinpath("assets", name)
        if candidate.is_file():
            hashed_any = True
            # The file name is mixed in so that swapping contents between files changes the token.
            hasher.update(name.encode("utf-8"))
            hasher.update(candidate.read_bytes())
    return hasher.hexdigest()[:12] if hashed_any else "dev"
# Raw text of the packaged ``index.html``, read once at import time as UTF-8.
[docs] _INDEX_HTML_TEMPLATE = _STATIC_ROOT.joinpath("index.html").read_text(encoding="utf-8")
# Page actually served: the template with every ``__ASSET_VERSION__`` occurrence replaced by the asset version token.
[docs] INDEX_HTML = _INDEX_HTML_TEMPLATE.replace("__ASSET_VERSION__", _asset_version_token())
# Media type used for the HTML responses built in this module.
[docs] _MEDIA_TYPE_HTML = "text/html"
# Self-contained login page: a single password field named ``token`` that POSTs to ``/login``.
# NOTE(review): the whitespace inside this literal appears collapsed in the reviewed text — confirm against the repository.
[docs] _LOGIN_HTML = """\ <!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>Corvix — Sign in</title> <style> *, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; } body { font-family: system-ui, sans-serif; display: flex; height: 100vh; align-items: center; justify-content: center; background: #07111f; color: #edf3ff; } form { display: flex; flex-direction: column; gap: 1rem; min-width: 300px; padding: 2rem; background: #0e1a2b; border-radius: 8px; } h2 { font-size: 1.4rem; color: #74c0fc; } input[type=password] { padding: .65rem .9rem; border-radius: 6px; border: 1px solid #223753; background: #132238; color: #edf3ff; font-size: 1rem; } input[type=password]:focus { outline: 2px solid #74c0fc; border-color: transparent; } button { padding: .65rem .9rem; border-radius: 6px; border: none; background: #74c0fc; color: #07111f; font-size: 1rem; font-weight: 600; cursor: pointer; } button:hover { background: #a5d4ff; } </style> </head> <body> <form method="post" action="/login"> <h2>Corvix</h2> <input type="password" name="token" placeholder="Secret token" required autofocus> <button type="submit">Sign in</button> </form> </body> </html>"""
@get("/", sync_to_thread=False)
def index() -> Response[str]:
    """Return the single-page dashboard UI as an HTML response."""
    page = Response(content=INDEX_HTML, media_type=_MEDIA_TYPE_HTML)
    return page
@get("/dashboards/{dashboard_name:str}", sync_to_thread=False)
def dashboard_index(dashboard_name: str) -> Response[str]:
    """Return the SPA shell for a bookmarkable ``/dashboards/<name>`` URL.

    The path parameter is accepted only so the route matches; the same HTML is
    returned for every dashboard name.
    """
    # The name is intentionally unused on the server side.
    del dashboard_name
    page = Response(content=INDEX_HTML, media_type=_MEDIA_TYPE_HTML)
    return page
def _get_auth_secret() -> str:
    """Return the configured auth secret by delegating to ``middleware._get_secret()``.

    Routing every lookup through the shared middleware helper keeps TTL
    caching, memoized misconfiguration logging, and ``_FILE`` support in a
    single implementation.
    """
    secret = _get_secret()
    return secret
@get("/login", sync_to_thread=False)
def login_page() -> Response[Any]:
    """Return the login form, or a redirect to ``/`` when no auth secret is configured."""
    if _get_auth_secret():
        return Response(content=_LOGIN_HTML, media_type=_MEDIA_TYPE_HTML)
    # Without a secret there is nothing to log in to.
    return Redirect("/")
@post("/login")
async def login(request: Request) -> Response[None]:
    """Validate the submitted token and issue a session cookie on success.

    The token is read from the ``token`` form field and compared with the
    configured secret in constant time. Both values are encoded to bytes
    before the comparison because ``hmac.compare_digest`` raises ``TypeError``
    for ``str`` arguments containing non-ASCII characters; without the
    encoding a non-ASCII token would produce a 500 response instead of a 401.

    The ``Secure`` attribute is set when the request arrived over HTTPS. The
    scheme is read from ``request.url.scheme`` which reflects the real
    protocol when uvicorn is started with ``--proxy-headers`` (trusting
    ``X-Forwarded-Proto`` from a reverse proxy). This avoids raw header
    inspection from application code, which can be spoofed by untrusted
    clients not behind a proxy.

    Returns:
        A redirect to ``/`` carrying the ``corvix_session`` cookie.

    Raises:
        HTTPException: 401 when no secret is configured or the token does not match.
    """
    form_data = await request.form()
    token = str(form_data.get("token", ""))
    secret = _get_auth_secret()
    # "surrogatepass" keeps the encoding total and injective for any str value.
    token_matches = bool(secret) and hmac.compare_digest(
        token.encode("utf-8", "surrogatepass"),
        secret.encode("utf-8", "surrogatepass"),
    )
    if not token_matches:
        raise HTTPException(status_code=401, detail="Invalid token")
    session_val = _make_session_cookie(secret)
    redirect: Response[None] = Redirect("/")
    redirect.set_cookie(
        Cookie(
            key="corvix_session",
            value=session_val,
            httponly=True,
            samesite="strict",
            path="/",
            max_age=SESSION_MAX_AGE_SECONDS,
            secure=request.url.scheme == "https",
        )
    )
    return redirect
@get("/logout", sync_to_thread=False)
def logout() -> Response[None]:
    """Delete the ``corvix_session`` cookie and send the browser to ``/login``."""
    response: Response[None] = Redirect("/login")
    response.delete_cookie("corvix_session")
    return response
[docs] def _health_error(poller_status: PollerStatus) -> dict[str, object]: raw_detail: str | None = poller_status.last_error if isinstance(raw_detail, str): raw_detail = raw_detail.split("\n")[-1].strip() or raw_detail return {"status": "unhealthy", "reason": "poller_error", "detail": raw_detail}
def _health_check_staleness(last_poll_str: str) -> dict[str, object]:
    """Classify the poller as healthy or stale from its last poll timestamp.

    Returns ``{"status": "ok"}`` when the last poll is at most five minutes
    old, a ``"stale"`` payload (with the age in whole seconds) when it is
    older, and an ``"invalid_poll_time"`` payload when the timestamp cannot
    be parsed.
    """
    try:
        polled_at = parse_timestamp(last_poll_str)
    except ValueError:
        return {"status": "unhealthy", "reason": "invalid_poll_time"}

    age = datetime.now(tz=UTC) - polled_at
    if age <= timedelta(minutes=5):
        return {"status": "ok"}
    return {
        "status": "unhealthy",
        "reason": "stale",
        "last_poll_seconds_ago": int(age.total_seconds()),
    }
def _health_response(payload: dict[str, object]) -> Response[dict[str, object]]:
    """Wrap a health payload in a JSON response: 200 when its status is ``"ok"``, otherwise 503."""
    is_healthy = payload.get("status") == "ok"
    if is_healthy:
        status_code = HTTPStatus.OK
    else:
        status_code = HTTPStatus.SERVICE_UNAVAILABLE
    return Response(content=payload, status_code=status_code, media_type="application/json")
def _read_health_poller_status() -> PollerStatus | dict[str, object]:
    """Return the stored poller status, or an unhealthy payload naming what failed.

    The checks run in order: runtime config, storage backend, then the status
    read itself. Each failure maps to its own ``reason`` so the health
    endpoint can report which layer is unavailable.
    """
    # Step 1: the runtime config must load.
    try:
        _load_runtime_config()
    except HTTPException:
        return {"status": "unhealthy", "reason": "config_unavailable"}

    # Step 2: a storage backend must be obtainable.
    try:
        backend = _get_storage()
    except HTTPException:
        return {"status": "unhealthy", "reason": "storage_unavailable"}

    # Step 3: the status itself must be readable.
    try:
        status = backend.load_status()
    except (OSError, json.JSONDecodeError):
        return {"status": "unhealthy", "reason": "invalid_cache"}
    except Exception:
        # Anything else is unexpected: record the traceback, report storage as unavailable.
        logger.exception("Failed to read poller status from storage")
        return {"status": "unhealthy", "reason": "storage_unavailable"}
    return status
[docs] _DEPRECATION_HEADER = "Deprecation"
[docs] _DEPRECATION_HEADER_VALUE = "true"
# --------------------------------------------------------------------------- # Implementation helpers # These plain (non-decorated) functions contain the business logic shared # between the versioned /api/v1/ route handlers and the deprecated /api/ # backward-compat wrappers. Never call the decorated Litestar route handlers # directly — use these helpers instead. # ---------------------------------------------------------------------------
def _health_impl(extra_headers: dict[str, str] | None = None) -> Response[dict[str, object]]:
    """Evaluate poller health and return it as a JSON response.

    Args:
        extra_headers: Optional headers added to the response as-is.

    Returns:
        A 200 response when the payload status is ``"ok"``, otherwise a 503
        response whose payload names the reason.
    """
    outcome = _read_health_poller_status()

    payload: dict[str, object]
    if isinstance(outcome, dict):
        # Config or storage could not be read; the failure payload is already built.
        payload = outcome
    elif outcome.status == "error":
        payload = _health_error(outcome)
    elif outcome.status in {"unknown", "starting"}:
        payload = {"status": "unhealthy", "reason": "poller_not_running"}
    elif not outcome.last_poll_time:
        payload = {"status": "unhealthy", "reason": "invalid_poll_time"}
    else:
        payload = _health_check_staleness(outcome.last_poll_time)

    healthy = payload.get("status") == "ok"
    http_status = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
    return Response(
        content=payload,
        status_code=int(http_status),
        media_type="application/json",
        headers=extra_headers,
    )
def _snapshot_impl(dashboard: str | None = None) -> SnapshotResponse:
    """Compute and return the typed snapshot payload.

    Loads records and poller status from storage, builds the data for the
    selected dashboard, and attaches a poller summary. The poller is flagged
    ``stale`` when its last poll time is missing, unparseable, or more than
    five minutes old.

    ``last_error`` is reduced to its last non-empty line. Error text ending in
    a newline (as formatted tracebacks do) previously produced an empty last
    segment, so the fallback exposed the full multi-line text; blank lines are
    now skipped.

    Args:
        dashboard: Name of the dashboard to render, passed to ``_select_dashboard``.

    Returns:
        The snapshot response built by ``build_snapshot_response``.
    """
    config = _load_runtime_config()
    storage = _get_storage()
    generated_at, records = storage.load_records()
    try:
        poller_status = storage.load_status()
    except (OSError, json.JSONDecodeError):
        # An unreadable status file falls back to a default PollerStatus instead of failing the snapshot.
        poller_status = PollerStatus()

    selected_dashboard = _select_dashboard(config.dashboards, dashboard)
    data = build_dashboard_data(
        records=records,
        dashboard=selected_dashboard,
        generated_at=generated_at,
    )

    last_poll_str = poller_status.last_poll_time
    stale = True
    if last_poll_str:
        try:
            last_poll = parse_timestamp(last_poll_str)
            stale = (datetime.now(tz=UTC) - last_poll) > timedelta(minutes=5)
        except ValueError:
            stale = True

    raw_last_error: str | None = poller_status.last_error
    if isinstance(raw_last_error, str):
        meaningful = [line.strip() for line in raw_last_error.splitlines() if line.strip()]
        if meaningful:
            raw_last_error = meaningful[-1]

    poller = PollerStatusResponse(
        status=poller_status.status,
        last_poll_time=last_poll_str,
        last_error=raw_last_error,
        last_error_time=poller_status.last_error_time,
        stale=stale,
        account_errors=[
            AccountErrorResponse(account_id=e.account_id, account_label=e.account_label, error=e.error)
            for e in poller_status.account_errors
        ],
    )
    return build_snapshot_response(
        data=data,
        dashboard_names=_dashboard_names(config.dashboards),
        poller=poller,
        notifications_config=config.notifications,
    )
def _notification_rule_snippets_impl(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> RuleSnippetsResponse:
    """Build prefilled rule snippets for one stored notification.

    Args:
        account_id: Account the notification belongs to; must exist in config.
        thread_id: Thread identifier of the notification.
        dashboard: Optional dashboard name used to pick the target dashboard.

    Returns:
        The snippets without context predicates and, when context predicates
        can be derived, the variants that include them (``None`` otherwise).

    Raises:
        HTTPException: 404 when the notification is not found in storage.
    """
    config = _load_runtime_config()
    target_dashboard = _select_dashboard(config.dashboards, dashboard)
    _require_account(config=config, account_id=account_id)

    _, stored_records = _get_storage().load_records()
    record = _find_record(records=stored_records, account_id=account_id, thread_id=thread_id)
    if record is None:
        raise HTTPException(
            status_code=404,
            detail=f"Notification '{account_id}/{thread_id}' not found in storage.",
        )

    plain_lines = _rule_match_lines(record=record, include_context=False)
    context_lines = _rule_match_lines(record=record, include_context=True)

    dashboard_plain = _dashboard_ignore_rule_snippet(plain_lines)
    global_plain = _global_exclude_rule_snippet(record=record, match_lines=plain_lines)
    dashboard_with_context: str | None = None
    global_with_context: str | None = None
    if context_lines is not None:
        dashboard_with_context = _dashboard_ignore_rule_snippet(context_lines)
        global_with_context = _global_exclude_rule_snippet(record=record, match_lines=context_lines)

    return RuleSnippetsResponse(
        dashboard_name=target_dashboard.name,
        dashboard_ignore_rule_snippet=dashboard_plain,
        global_exclude_rule_snippet=global_plain,
        dashboard_ignore_rule_with_context_snippet=dashboard_with_context,
        global_exclude_rule_with_context_snippet=global_with_context,
        has_context=bool(record.context),
    )
# ---------------------------------------------------------------------------
# /api/v1/ — versioned route handlers (current API)
# ---------------------------------------------------------------------------
@get("/api/v1/health", sync_to_thread=True)
def health() -> Response[dict[str, object]]:
    """Health endpoint for container checks.

    Returns 200 with {"status": "ok"} when config and storage are readable,
    the poller is running, and the poller's last poll time is not stale.

    Returns 503 with {"status": "unhealthy"} and one of these reasons:
    "config_unavailable", "storage_unavailable", "invalid_cache",
    "poller_not_running", "poller_error", "invalid_poll_time", or "stale".

    The handler is synchronous and reads config and storage, so it is run in a
    worker thread (``sync_to_thread=True``) like the other storage-touching
    sync handlers in this module, keeping the event loop free.
    """
    return _health_impl()
@get("/metrics", sync_to_thread=False)
def metrics_endpoint() -> Response[bytes]:
    """Serve the current Prometheus metrics in text exposition format."""
    body, prometheus_content_type = _metrics.render_latest()
    # Litestar adds "; charset=utf-8" to text media types itself, so any charset
    # suffix on the Prometheus content type is dropped to avoid a duplicate.
    bare_media_type, _, _ = prometheus_content_type.partition("; charset=")
    return Response(content=body, media_type=bare_media_type)
@get("/api/v1/themes", sync_to_thread=False)
def api_themes() -> dict[str, object]:
    """Return the built-in theme presets under the ``themes`` key."""
    payload: dict[str, object] = {"themes": THEMES}
    return payload
@get("/api/v1/dashboards", sync_to_thread=True)
def dashboards() -> dict[str, object]:
    """List configured dashboard names.

    The handler is synchronous and loads the runtime config, so it is run in
    a worker thread (``sync_to_thread=True``), matching the other sync
    handlers in this module that set the option explicitly.

    Returns:
        ``{"dashboard_names": [...]}`` with the names from the loaded config.
    """
    config = _load_runtime_config()
    names = _dashboard_names(config.dashboards)
    return {"dashboard_names": names}
@get("/api/v1/snapshot", sync_to_thread=True)
def snapshot(dashboard: str | None = None) -> SnapshotResponse:
    """Return the selected dashboard data from storage.

    The handler is synchronous and reads storage, so it is run in a worker
    thread (``sync_to_thread=True``); the SSE stream in this module likewise
    moves the same work off the event loop.

    Args:
        dashboard: Optional dashboard name taken from the query string.
    """
    return _snapshot_impl(dashboard=dashboard)
# --------------------------------------------------------------------------- # Server-Sent Events: push snapshot updates instead of client-side polling. # --------------------------------------------------------------------------- # The server polls storage on a short interval and pushes a ``snapshot`` event # only when the serialized payload actually changes, so idle connections cost a # periodic comparison (plus an occasional keep-alive) rather than a full # response on every tick. Browsers reconnect automatically via EventSource, and # the frontend falls back to interval polling when SSE is unavailable. # # A short-lived, process-wide cache of the serialized body (keyed by dashboard) # collapses the storage reads of concurrent connections watching the same # dashboard into roughly one read per poll interval, instead of one read per # connection per tick.
[docs] _SSE_DEFAULT_POLL_INTERVAL_SECONDS = 3.0
[docs] _SSE_KEEPALIVE_SECONDS = 20.0
[docs] def _sse_poll_interval() -> float: """Return the server-side SSE poll interval in seconds. Read from ``CORVIX_SSE_POLL_INTERVAL_SECONDS``; falls back to the default when unset, non-numeric, or non-positive. """ raw = environ.get("CORVIX_SSE_POLL_INTERVAL_SECONDS") if raw is None: return _SSE_DEFAULT_POLL_INTERVAL_SECONDS try: value = float(raw) except ValueError: return _SSE_DEFAULT_POLL_INTERVAL_SECONDS return value if value > 0 else _SSE_DEFAULT_POLL_INTERVAL_SECONDS
def _snapshot_event_body(dashboard: str | None) -> str:
    """Return the snapshot for *dashboard* as a JSON string.

    Serialization goes through ``encode_json`` (msgspec, the encoder Litestar
    uses for the equivalent HTTP route) so that the SSE body and the
    ``GET /api/v1/snapshot`` response share one serialization path and stay
    byte-for-byte consistent.
    """
    encoded = encode_json(_snapshot_impl(dashboard=dashboard))
    return encoded.decode("utf-8")
# Process-wide cache of the most recently built SSE body per dashboard, used to # deduplicate the storage reads of concurrent connections. Guarded by a lock so # only one worker thread rebuilds at a time; the set of dashboards is bounded by # config, so the dict does not grow without bound.
[docs] _snapshot_body_cache: dict[str | None, tuple[float, str]] = {}
[docs] _snapshot_body_cache_lock = threading.Lock()
def _cached_snapshot_event_body(dashboard: str | None, ttl: float) -> str:
    """Return the snapshot body for *dashboard*, reusing a build younger than *ttl* seconds.

    Concurrent SSE connections watching the same dashboard share one storage
    read and serialization per ``ttl`` window. The comparison is a strict
    ``age < ttl``, so a lone connection whose ticks are one interval apart
    still rebuilds on every tick and sees no added latency.
    """
    requested_at = monotonic()
    with _snapshot_body_cache_lock:
        entry = _snapshot_body_cache.get(dashboard)
        if entry is not None:
            built_at, cached_body = entry
            if requested_at - built_at < ttl:
                return cached_body
        # Rebuild while holding the lock so only one worker thread rebuilds at a time.
        fresh_body = _snapshot_event_body(dashboard)
        _snapshot_body_cache[dashboard] = (monotonic(), fresh_body)
        return fresh_body
def _snapshot_error_payload(error: Exception) -> str:
    """Return the JSON body of an SSE ``snapshot-error`` event for *error*.

    An ``HTTPException`` contributes its status code and, when it is a string,
    its detail. Every other exception is reported as a generic 500 so that its
    message is not leaked to the client.
    """
    detail = "Internal server error."
    status_code = 500
    if isinstance(error, HTTPException):
        status_code = error.status_code
        detail = error.detail if isinstance(error.detail, str) else "Unable to build snapshot."
    return json.dumps({"detail": detail, "status_code": status_code})
async def _snapshot_event_generator(dashboard: str | None) -> AsyncIterator[ServerSentEventMessage]:
    """Yield SSE messages for *dashboard*, pushing only when something changed.

    Three kinds of message are produced: a ``snapshot`` event when the
    serialized payload differs from the last one sent, a ``snapshot-error``
    event when the payload cannot be built, and a comment-only keep-alive
    after a quiet period so that proxies do not drop an idle connection.

    The blocking storage read runs in a worker thread to keep the event loop
    responsive. A failure is reported to the client and the stream keeps
    running, retrying on the next tick, which avoids tearing down the
    connection (and a client reconnection storm) on a transient storage or
    serialization failure.
    """
    interval = _sse_poll_interval()
    sent_digest: str | None = None
    last_emit = monotonic()
    while True:
        try:
            body = await asyncio.to_thread(_cached_snapshot_event_body, dashboard, interval)
        except Exception as error:
            if not isinstance(error, HTTPException):
                logger.exception("Unexpected error building SSE snapshot")
            # The event is named "snapshot-error" rather than "error" so that it
            # cannot be confused with EventSource's own connection-error event.
            yield ServerSentEventMessage(data=_snapshot_error_payload(error), event="snapshot-error")
            # Forget the last digest so the next successful build is always sent.
            sent_digest = None
            last_emit = monotonic()
        else:
            digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
            now = monotonic()
            if digest != sent_digest:
                sent_digest = digest
                last_emit = now
                yield ServerSentEventMessage(data=body, event="snapshot")
            elif now - last_emit >= _SSE_KEEPALIVE_SECONDS:
                last_emit = now
                yield ServerSentEventMessage(comment="keep-alive")
        await asyncio.sleep(interval)
@get("/api/v1/events")
async def events(dashboard: str | None = None) -> ServerSentEvent:
    """Open a Server-Sent Events stream of snapshots for *dashboard*.

    This replaces fixed-interval client polling: the connection stays open and
    a ``snapshot`` event is pushed only when the data changes, which reduces
    both latency and per-cycle overhead while nothing is happening.
    """
    stream = _snapshot_event_generator(dashboard)
    return ServerSentEvent(stream)
@get("/api/v1/notifications/{account_id:str}/{thread_id:str}/rule-snippets", sync_to_thread=True)
def notification_rule_snippets(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> RuleSnippetsResponse:
    """Return prefilled ignore-rule snippets for a notification.

    The handler is synchronous and reads config and storage, so it is run in
    a worker thread (``sync_to_thread=True``), matching the other
    storage-touching sync handlers in this module.

    Args:
        account_id: Account identifier from the URL path.
        thread_id: Notification thread identifier from the URL path.
        dashboard: Optional dashboard name taken from the query string.
    """
    return _notification_rule_snippets_impl(account_id=account_id, thread_id=thread_id, dashboard=dashboard)
@post("/api/v1/notifications/{account_id:str}/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification(account_id: str, thread_id: str) -> Response[None]:
    """Dismiss a notification thread, removing it from the GitHub inbox.

    GitHub is asked to delete the thread (DELETE /notifications/threads/{id})
    and the record is then marked as dismissed in local storage. A successful
    call answers with 204 No Content.
    """
    outcome = _dismiss_notification_impl(account_id=account_id, thread_id=thread_id)
    return outcome
@post("/api/v1/notifications/{account_id:str}/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read(account_id: str, thread_id: str) -> Response[None]:
    """Mark a notification thread as read, both on GitHub and in local storage."""
    outcome = _mark_notification_read_impl(account_id=account_id, thread_id=thread_id)
    return outcome
# ---------------------------------------------------------------------------
# Deprecated /api/ routes — kept for backward compatibility during transition.
# All routes below mirror their /api/v1/ counterparts but include a
# ``Deprecation: true`` response header (the Deprecation header is specified in
# RFC 9745; RFC 8594 defines the related Sunset header). Clients should migrate
# to the /api/v1/ equivalents. These routes will be removed in a future release.
# ---------------------------------------------------------------------------
# Header mapping attached to every response of the deprecated /api/ routes.
_DEPRECATED_HEADERS: dict[str, str] = {
    _DEPRECATION_HEADER: _DEPRECATION_HEADER_VALUE,
}
@get("/api/health", sync_to_thread=True)
def health_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/health.

    Returns the same health response with the deprecation header added. The
    handler is synchronous and reads config and storage, so it is run in a
    worker thread (``sync_to_thread=True``) instead of on the event loop.
    """
    return _health_impl(extra_headers=_DEPRECATED_HEADERS)
@get("/api/themes", sync_to_thread=False)
def api_themes_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/themes."""
    payload: dict[str, object] = {"themes": THEMES}
    return Response(content=payload, headers=_DEPRECATED_HEADERS)
@get("/api/dashboards", sync_to_thread=True)
def dashboards_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/dashboards.

    Returns the configured dashboard names with the deprecation header added.
    The handler is synchronous and loads the runtime config, so it is run in
    a worker thread (``sync_to_thread=True``) instead of on the event loop.
    """
    config = _load_runtime_config()
    names = _dashboard_names(config.dashboards)
    return Response(content={"dashboard_names": names}, headers=_DEPRECATED_HEADERS)
@get("/api/snapshot", sync_to_thread=True)
def snapshot_deprecated(dashboard: str | None = None) -> Response[SnapshotResponse]:
    """Deprecated: use /api/v1/snapshot.

    Returns the same snapshot payload with the deprecation header added. The
    handler is synchronous and reads storage, so it is run in a worker thread
    (``sync_to_thread=True``) instead of on the event loop.
    """
    return Response(content=_snapshot_impl(dashboard=dashboard), headers=_DEPRECATED_HEADERS)
@get("/api/notifications/{account_id:str}/{thread_id:str}/rule-snippets", sync_to_thread=True)
def notification_rule_snippets_deprecated(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> Response[RuleSnippetsResponse]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/rule-snippets.

    Returns the same rule-snippets payload with the deprecation header added.
    The handler is synchronous and reads config and storage, so it is run in
    a worker thread (``sync_to_thread=True``) instead of on the event loop.
    """
    return Response(
        content=_notification_rule_snippets_impl(
            account_id=account_id,
            thread_id=thread_id,
            dashboard=dashboard,
        ),
        headers=_DEPRECATED_HEADERS,
    )
@post("/api/notifications/{account_id:str}/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification_deprecated(account_id: str, thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/dismiss."""
    # The helper's own response is discarded; a fresh 204 carries the deprecation header.
    _dismiss_notification_impl(account_id=account_id, thread_id=thread_id)
    flagged: Response[None] = Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
    return flagged
@post("/api/notifications/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification_default_account(thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/dismiss."""
    # This legacy URL has no account segment, so the default account is used.
    default_account = _default_account_id(_load_runtime_config())
    _dismiss_notification_impl(account_id=default_account, thread_id=thread_id)
    flagged: Response[None] = Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
    return flagged
@post("/api/notifications/{account_id:str}/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read_deprecated(account_id: str, thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/mark-read."""
    # The helper's own response is discarded; a fresh 204 carries the deprecation header.
    _mark_notification_read_impl(account_id=account_id, thread_id=thread_id)
    flagged: Response[None] = Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
    return flagged
@post("/api/notifications/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read_default_account(thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/mark-read."""
    # This legacy URL has no account segment, so the default account is used.
    default_account = _default_account_id(_load_runtime_config())
    _mark_notification_read_impl(account_id=default_account, thread_id=thread_id)
    flagged: Response[None] = Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
    return flagged
def _dismiss_notification_impl(account_id: str, thread_id: str) -> Response[None]:
    """Dismiss a thread on GitHub, then mark the stored record as dismissed.

    The upstream exception is logged with its traceback but is no longer
    included in the 502 detail sent to the client. This matches
    ``_mark_notification_read_impl`` and keeps upstream error text out of API
    responses.

    Args:
        account_id: Identifier of the configured GitHub account to act as.
        thread_id: Notification thread to dismiss.

    Returns:
        An empty 204 response.

    Raises:
        HTTPException: 404 when the account is not configured, 500 when its
            token cannot be resolved or is empty, 502 when the GitHub call fails.
    """
    config = _load_runtime_config()
    account = _require_account(config=config, account_id=account_id)
    try:
        token = get_env_value(account.token_env)
    except ValueError as error:
        raise HTTPException(status_code=500, detail=str(error)) from error
    if not token:
        msg = f"GitHub token env var '{account.token_env}' (or '{account.token_env}_FILE') is not set."
        raise HTTPException(status_code=500, detail=msg)
    client = _build_github_client(config=config, account=account, token=token)
    try:
        client.dismiss_thread(thread_id)
    except Exception as error:
        logger.exception("Failed to dismiss thread", extra={"thread_id": thread_id})
        msg = f"Failed to dismiss thread {thread_id}."
        raise HTTPException(status_code=502, detail=msg) from error
    # Local storage is updated only after GitHub accepted the dismissal.
    _get_storage().dismiss_record(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204)
def _mark_notification_read_impl(account_id: str, thread_id: str) -> Response[None]:
    """Mark a thread as read on GitHub, then mark the stored record as read.

    Raises:
        HTTPException: 404 when the account is not configured, 500 when its
            token cannot be resolved or is empty, 502 when the GitHub call fails.
    """
    config = _load_runtime_config()
    account = _require_account(config=config, account_id=account_id)
    token_env = account.token_env

    try:
        token = get_env_value(token_env)
    except ValueError as error:
        raise HTTPException(status_code=500, detail=str(error)) from error
    if not token:
        raise HTTPException(
            status_code=500,
            detail=f"GitHub token env var '{token_env}' (or '{token_env}_FILE') is not set.",
        )

    github = _build_github_client(config=config, account=account, token=token)
    try:
        github.mark_thread_read(thread_id)
    except Exception as error:
        logger.exception("Failed to mark thread as read", extra={"thread_id": thread_id})
        raise HTTPException(
            status_code=502,
            detail=f"Failed to mark thread {thread_id} as read.",
        ) from error

    # Local storage is updated only after GitHub accepted the change.
    _get_storage().mark_record_read(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204)
[docs] def _require_account(config: AppConfig, account_id: str) -> GitHubAccountConfig: for account in config.github.accounts: if account.id == account_id: return account msg = f"GitHub account '{account_id}' not found in config." raise HTTPException(status_code=404, detail=msg)
def _build_github_client(config: AppConfig, account: GitHubAccountConfig, token: str) -> GitHubNotificationsClient:
    """Create a GitHub notifications client for *account*.

    The API base URL comes from the account and the request timeout from the
    polling section of the config.
    """
    timeout_seconds = config.polling.request_timeout_seconds
    base_url = account.api_base_url
    return GitHubNotificationsClient(
        token=token,
        api_base_url=base_url,
        request_timeout_seconds=timeout_seconds,
    )
[docs] def _default_account_id(config: AppConfig) -> str: if not config.github.accounts: msg = "No GitHub accounts configured." raise HTTPException(status_code=500, detail=msg) return config.github.accounts[0].id
[docs] def _find_record( *, records: list[NotificationRecord], account_id: str, thread_id: str, ) -> NotificationRecord | None: for record in records: if record.notification.account_id == account_id and record.notification.thread_id == thread_id: return record return None
[docs] def _yaml_quoted(value: str) -> str: escaped = value.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"'
[docs] def _yaml_scalar(value: object) -> str: if isinstance(value, str): return _yaml_quoted(value) if isinstance(value, bool): return "true" if value else "false" return str(value)
[docs] def _slug_token(value: str) -> str: slug = re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-") return slug or "rule"
def _rule_name_for_record(record: NotificationRecord) -> str:
    """Return a rule name of the form ``ignore-<repository>-<reason>-<subject type>``.

    Each of the three parts is slugged with ``_slug_token``.
    """
    subject = record.notification
    slugs = [
        _slug_token(subject.repository),
        _slug_token(subject.reason),
        _slug_token(subject.subject_type),
    ]
    return "ignore-" + "-".join(slugs)
@overload
def _rule_match_lines(*, record: NotificationRecord, include_context: Literal[False]) -> list[str]: ...


@overload
def _rule_match_lines(*, record: NotificationRecord, include_context: Literal[True]) -> list[str] | None: ...


def _rule_match_lines(*, record: NotificationRecord, include_context: bool) -> list[str] | None:
    """Return the YAML ``match`` lines that select exactly this notification.

    The base lines pin the repository, reason, subject type and an anchored
    regex of the title. With ``include_context=True`` a ``context:`` section
    with the record's context predicates is appended; ``None`` is returned in
    that mode when no predicate can be derived.
    """
    subject = record.notification
    anchored_title = _anchored_title_regex(subject.subject_title)
    base_lines = [
        f"repository_in: [{_yaml_quoted(subject.repository)}]",
        f"reason_in: [{_yaml_quoted(subject.reason)}]",
        f"subject_type_in: [{_yaml_quoted(subject.subject_type)}]",
        f"title_regex: {_yaml_quoted(anchored_title)}",
    ]
    if include_context:
        predicates = _context_predicate_lines(record=record)
        return [*base_lines, "context:", *predicates] if predicates else None
    return base_lines
def _context_predicate_lines(*, record: NotificationRecord) -> list[str]:
    """Return YAML list-item lines for the context predicates of *record*.

    A fixed set of context paths is probed. Each path that is present and
    holds a ``bool``, ``int``, ``float`` or ``str`` yields one three-line
    ``equals`` predicate; missing paths and other value types are skipped.

    The ``op`` and ``value`` keys are indented two columns deeper than the
    ``-`` marker so they line up under ``path`` and belong to the same list
    item. Keys placed at the marker's own column are not valid YAML.
    """
    context = record.context
    candidate_paths = (
        "github.latest_comment.is_ci_only",
        "github.pr_state.state",
        "github.pr_state.draft",
    )
    output: list[str] = []
    for path in candidate_paths:
        found, value = _context_path_value(context=context, path=path)
        if not found or not isinstance(value, bool | int | float | str):
            continue
        output.extend(
            [
                f"  - path: {_yaml_quoted(path)}",
                "    op: equals",
                f"    value: {_yaml_scalar(value)}",
            ]
        )
    return output
[docs] def _anchored_title_regex(title: str) -> str: escaped = re.sub(r"([.^$*+?{}\[\]|()\\])", r"\\\1", title) return f"^{escaped}$"
[docs] def _context_path_value(*, context: Mapping[str, object], path: str) -> tuple[bool, object | None]: current: object = context for segment in path.split("."): if not isinstance(current, dict): return False, None current_map = cast(dict[str, object], current) next_value = current_map.get(segment) if next_value is None and segment not in current_map: return False, None current = next_value return True, current
[docs] def _dashboard_ignore_rule_snippet(match_lines: list[str]) -> str: body = "\n".join(f" {line}" for line in match_lines) return f"- {body.lstrip()}"
def _global_exclude_rule_snippet(*, record: NotificationRecord, match_lines: list[str]) -> str:
    """Format a named rule list item with ``exclude_from_dashboards: true``.

    Layout of the emitted YAML::

        - name: <rule name>
          match:
            <match lines>
          exclude_from_dashboards: true

    ``match`` and ``exclude_from_dashboards`` are indented two spaces so
    they align with ``name`` after the ``- `` marker, and the match lines
    four spaces so they nest under ``match:``. Without that alignment the
    item is not valid YAML.
    """
    match_body = "\n".join(f"    {line}" for line in match_lines)
    return f"- name: {_rule_name_for_record(record)}\n  match:\n{match_body}\n  exclude_from_dashboards: true"
# ---------------------------------------------------------------------------
# Runtime-config cache
# ---------------------------------------------------------------------------
# AppConfig is expensive to produce (YAML read + full parse). We cache the
# result at module level and invalidate it only when the config file's mtime
# changes. A plain stat() per request is orders of magnitude cheaper than a
# full YAML re-parse, so we still detect on-disk edits without re-reading
# unconditionally on every HTTP request.
#
# The three cache fields live in a single mutable object so they can be
# updated without ``global`` statements (which ruff PLW0603 flags).
# ---------------------------------------------------------------------------
class _ConfigCache:
    """Mutable container for the module-level AppConfig cache."""

    # Last successfully loaded config; None until the first load and again
    # after _clear_config_cache().
    config: AppConfig | None = None
    # String form of the config path the cached config was loaded from.
    path: str | None = None
    # st_mtime of the config file observed when the cached config was loaded.
    mtime: float | None = None
# Shared cache instance; read and written by _load_runtime_config() and reset
# by _clear_config_cache().
_config_cache = _ConfigCache()
class _StorageState:
    """Mutable container for the module-level storage backend."""

    # Backend handed out by _get_storage(); None until one is injected via
    # set_storage_backend() or built lazily.
    backend: StorageBackend | None = None
    # Held by _get_storage() while it builds the backend.
    lock: threading.Lock = threading.Lock()
# Shared storage holder; written by set_storage_backend() and _get_storage().
_storage_state = _StorageState()
def set_storage_backend(backend: StorageBackend | None) -> None:
    """Install (or remove) the storage backend that route handlers use.

    In production nothing calls this and the backend is created lazily from
    config (PostgreSQL is required). Tests pass a backend in directly and
    restore ``None`` when they are done.
    """
    state = _storage_state
    state.backend = backend
def _get_storage() -> StorageBackend:
    """Return the active storage backend, creating it from config if needed.

    An already-set backend is returned immediately. Otherwise the lock is
    taken, the backend is checked again, and only if it is still unset is a
    new one created and stored, so callers arriving together do not each
    build their own backend (and connection pool).

    Raises:
        HTTPException: 500 when ``create_storage`` raises ``StorageConfigError``.
    """
    existing = _storage_state.backend
    if existing is not None:
        return existing

    with _storage_state.lock:
        # Another caller may have built the backend while we waited.
        existing = _storage_state.backend
        if existing is None:
            runtime_config = _load_runtime_config()
            try:
                existing = create_storage(runtime_config)
            except StorageConfigError as error:
                raise HTTPException(status_code=500, detail=str(error)) from error
            _storage_state.backend = existing
        return existing
def _clear_config_cache() -> None:
    """Reset all cache fields so the next config lookup re-reads the file."""
    _config_cache.config = _config_cache.path = _config_cache.mtime = None
    logger.info("Config cache cleared; config will be reloaded on the next request.")
def _load_runtime_config() -> AppConfig:
    """Return the cached AppConfig, re-parsing from disk only when the file changes.

    The config path is taken from the ``CORVIX_CONFIG`` environment variable
    (default: ``corvix.yaml``). The file is stat()-ed on every call; the
    config is re-parsed only when the path or the mtime differs from the last
    successful load.

    A single ``stat()`` both detects a missing file and supplies the mtime.
    This avoids a second filesystem call per request and the window between
    a separate existence check and the stat.

    Raises:
        HTTPException: 500 when the file is missing, cannot be read, or
            fails to parse (``load_config`` raising ``ValueError``).
    """
    config_path = Path(environ.get("CORVIX_CONFIG", "corvix.yaml"))
    config_path_str = str(config_path)

    try:
        mtime = config_path.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError) as error:
        # Missing file, or a path component that is not a directory.
        msg = f"Config file '{config_path}' does not exist."
        raise HTTPException(status_code=500, detail=msg) from error
    except OSError as error:
        msg = f"Unable to read config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error

    cache_is_current = (
        _config_cache.config is not None
        and _config_cache.path == config_path_str
        and _config_cache.mtime == mtime
    )
    if cache_is_current:
        return _config_cache.config

    try:
        config = load_config(config_path)
    except ValueError as error:
        msg = f"Invalid config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error
    except OSError as error:
        msg = f"Unable to read config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error

    # Only a successful load updates the cache.
    _config_cache.config = config
    _config_cache.path = config_path_str
    _config_cache.mtime = mtime
    logger.debug("Config loaded from '%s' (mtime=%.3f).", config_path, mtime)
    return config
[docs] def _install_sighup_handler() -> None: """Register a SIGHUP handler that clears the config cache. Sending ``SIGHUP`` to the server process forces the config to be reloaded from disk on the next request without restarting the process:: kill -HUP <pid> The handler is a no-op on platforms that do not support SIGHUP (e.g. Windows). """ try: signal.signal(signal.SIGHUP, lambda _sig, _frame: _clear_config_cache()) logger.debug("SIGHUP handler installed for config reload.") except (AttributeError, OSError): pass # SIGHUP is not available on all platforms (e.g. Windows)
# Runs at import time, so the handler is registered as soon as this module loads.
_install_sighup_handler()
def _select_dashboard(
    dashboards: list[DashboardSpec],
    selected_name: str | None,
) -> DashboardSpec:
    """Pick a dashboard from ``available_dashboards(dashboards)``.

    With no *selected_name* the first available dashboard is returned;
    otherwise the first one whose ``name`` matches.

    Raises:
        HTTPException: 404 when *selected_name* matches no available dashboard.
    """
    candidates = available_dashboards(dashboards)
    if selected_name is None:
        return candidates[0]

    chosen = next((spec for spec in candidates if spec.name == selected_name), None)
    if chosen is None:
        raise HTTPException(status_code=404, detail=f"Dashboard '{selected_name}' not found.")
    return chosen
def _dashboard_names(dashboards: list[DashboardSpec]) -> list[str]:
    """Return the names of ``available_dashboards(dashboards)``, in order."""
    names: list[str] = []
    for spec in available_dashboards(dashboards):
        names.append(spec.name)
    return names
def _configure_observability() -> None:
    """Set up logging, then tracing for the ``corvix-web`` service.

    Registered in the app's ``on_startup`` list, so it runs on startup both
    under ``corvix serve`` and under ``uvicorn corvix.web.app:app``.
    """
    service = "corvix-web"
    configure_logging()
    setup_tracing(service_name=service)
# Application object; run() hands it to uvicorn through the import string
# "corvix.web.app:app". The route handlers and the two static-asset constants
# referenced here are defined outside this block.
app = Litestar(
    route_handlers=[
        index,
        dashboard_index,
        login_page,
        login,
        logout,
        metrics_endpoint,
        # /api/v1/ — versioned routes (current)
        health,
        api_themes,
        dashboards,
        snapshot,
        events,
        notification_rule_snippets,
        dismiss_notification,
        mark_notification_read,
        # /api/ — deprecated routes (backward compat; scheduled for removal)
        health_deprecated,
        api_themes_deprecated,
        dashboards_deprecated,
        snapshot_deprecated,
        notification_rule_snippets_deprecated,
        dismiss_notification_deprecated,
        dismiss_notification_default_account,
        mark_notification_read_deprecated,
        mark_notification_read_default_account,
        create_static_files_router(
            path="/assets",
            directories=[_STATIC_ASSETS_DIR],
            cache_control=_ASSET_CACHE_CONTROL,
        ),
    ],
    middleware=[ObservabilityMiddleware(), TokenAuthMiddleware()],
    on_startup=[_configure_observability],
    compression_config=CompressionConfig(backend="gzip", minimum_size=500),
    openapi_config=OpenAPIConfig(
        title="Corvix API",
        # Schema version of the HTTP contract, not the package version: bumped
        # deliberately when the API shape changes so the generated OpenAPI
        # document (and the TypeScript types derived from it) stay stable.
        version="1.0.0",
        description="JSON API backing the Corvix dashboard single-page app.",
    ),
)
def run() -> None:
    """Serve ``corvix.web.app:app`` with uvicorn.

    Settings are read from the environment:

    * ``CORVIX_WEB_HOST`` - bind address (default ``0.0.0.0``)
    * ``CORVIX_WEB_PORT`` - port, parsed with ``int()`` (default ``8000``)
    * ``CORVIX_WEB_RELOAD`` - reload is on for ``1``, ``true`` or ``yes``,
      case-insensitive (default ``false``)
    """
    bind_host = environ.get("CORVIX_WEB_HOST", "0.0.0.0")  # nosec B104 - intentional; Docker/container deployments need all-interfaces
    bind_port = int(environ.get("CORVIX_WEB_PORT", "8000"))
    reload_setting = environ.get("CORVIX_WEB_RELOAD", "false").lower()
    uvicorn.run(
        "corvix.web.app:app",
        host=bind_host,
        port=bind_port,
        reload=reload_setting in {"1", "true", "yes"},
        reload_dirs=["src"],
    )