"""Litestar app serving Corvix dashboard data and UI."""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
import logging
import re
import signal
import threading
from collections.abc import AsyncIterator, Mapping
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
from importlib.resources import files
from os import environ
from pathlib import Path
from time import monotonic
from typing import Any, Literal, cast, overload
import uvicorn
from litestar import Litestar, Request, Response, get, post
from litestar.config.compression import CompressionConfig
from litestar.datastructures.cookie import Cookie
from litestar.datastructures.headers import CacheControlHeader
from litestar.exceptions import HTTPException
from litestar.openapi import OpenAPIConfig
from litestar.response import ServerSentEvent, ServerSentEventMessage
from litestar.response.redirect import Redirect
from litestar.serialization import encode_json
from litestar.static_files import create_static_files_router
from corvix.config import AppConfig, DashboardSpec, GitHubAccountConfig, available_dashboards, load_config
from corvix.dashboarding import build_dashboard_data
from corvix.domain import NotificationRecord, PollerStatus, parse_timestamp
from corvix.env import get_env_value
from corvix.ingestion import GitHubNotificationsClient
from corvix.observability import configure_logging, setup_tracing
from corvix.observability import metrics as _metrics
from corvix.observability.middleware import ObservabilityMiddleware
from corvix.storage import StorageBackend, StorageConfigError, create_storage
from corvix.web.middleware import SESSION_MAX_AGE_SECONDS, TokenAuthMiddleware, _get_secret, _make_session_cookie
from corvix.web.schemas import (
AccountErrorResponse,
PollerStatusResponse,
RuleSnippetsResponse,
SnapshotResponse,
build_snapshot_response,
)
[docs]
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
[docs]
# Built-in colour presets served by the themes endpoints. Each preset maps a
# colour role (background, surfaces, text, accent, status colours) to a hex value.
THEMES: dict[str, dict[str, str]] = {
    "midnight": {
        "bg": "#07111f",
        "surface": "#0e1a2b",
        "surface_elevated": "#132238",
        "ink": "#edf3ff",
        "muted": "#8fa3c7",
        "accent": "#74c0fc",
        "line": "#223753",
        "ok": "#59d7a4",
        "danger": "#ff7b72",
    },
    "graphite": {
        "bg": "#0d1117",
        "surface": "#161b22",
        "surface_elevated": "#1f2937",
        "ink": "#f5f7fb",
        "muted": "#95a3b8",
        "accent": "#f2cc60",
        "line": "#2d3748",
        "ok": "#56d364",
        "danger": "#ff938a",
    },
}
[docs]
# Root of the packaged static resources, resolved through importlib.resources.
_STATIC_ROOT = files("corvix.web").joinpath("static")

# String form of the ``assets`` directory below the static root.
_STATIC_ASSETS_DIR = str(_STATIC_ROOT.joinpath("assets"))

# Asset files whose names and contents feed the asset version token.
_ASSET_FILENAMES = ("app.js", "index.css", "favicon.svg")

# Cache policy for assets: public, immutable, max-age of 31536000 seconds (365 days).
_ASSET_CACHE_CONTROL = CacheControlHeader(
    public=True,
    max_age=31536000,
    immutable=True,
)
[docs]
def _asset_version_token() -> str:
    """Return a short fingerprint of the bundled assets.

    Every file listed in ``_ASSET_FILENAMES`` that exists under the static
    ``assets`` directory contributes its name and bytes to a SHA-256 digest.
    The first 12 hex characters are returned, or ``"dev"`` when none of the
    listed files exists.
    """
    hasher = hashlib.sha256()
    assets_dir = _STATIC_ROOT.joinpath("assets")
    hashed_any = False
    for filename in _ASSET_FILENAMES:
        candidate = assets_dir.joinpath(filename)
        if candidate.is_file():
            hasher.update(filename.encode("utf-8"))
            hasher.update(candidate.read_bytes())
            hashed_any = True
    return hasher.hexdigest()[:12] if hashed_any else "dev"
[docs]
# Raw ``index.html`` as shipped with the package, read once at import time.
_INDEX_HTML_TEMPLATE = _STATIC_ROOT.joinpath("index.html").read_text(encoding="utf-8")

# Page actually served: every ``__ASSET_VERSION__`` marker in the template is
# replaced with the asset version token computed at import time.
INDEX_HTML = _INDEX_HTML_TEMPLATE.replace(
    "__ASSET_VERSION__",
    _asset_version_token(),
)
[docs]
# Self-contained login page (inline CSS, no external assets). The form POSTs a
# single ``token`` password field to ``/login``.
_LOGIN_HTML = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Corvix — Sign in</title>
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: system-ui, sans-serif;
display: flex; height: 100vh;
align-items: center; justify-content: center;
background: #07111f; color: #edf3ff;
}
form {
display: flex; flex-direction: column; gap: 1rem;
min-width: 300px; padding: 2rem;
background: #0e1a2b; border-radius: 8px;
}
h2 { font-size: 1.4rem; color: #74c0fc; }
input[type=password] {
padding: .65rem .9rem; border-radius: 6px;
border: 1px solid #223753; background: #132238;
color: #edf3ff; font-size: 1rem;
}
input[type=password]:focus { outline: 2px solid #74c0fc; border-color: transparent; }
button {
padding: .65rem .9rem; border-radius: 6px; border: none;
background: #74c0fc; color: #07111f;
font-size: 1rem; font-weight: 600; cursor: pointer;
}
button:hover { background: #a5d4ff; }
</style>
</head>
<body>
<form method="post" action="/login">
<h2>Corvix</h2>
<input type="password" name="token" placeholder="Secret token" required autofocus>
<button type="submit">Sign in</button>
</form>
</body>
</html>"""
@get("/", sync_to_thread=False)
def index() -> Response[str]:
    """Return the pre-rendered dashboard page as an HTML response."""
    page: Response[str] = Response(content=INDEX_HTML, media_type=_MEDIA_TYPE_HTML)
    return page
@get("/dashboards/{dashboard_name:str}", sync_to_thread=False)
def dashboard_index(dashboard_name: str) -> Response[str]:
    """Return the same single-page UI for any ``/dashboards/<name>`` URL.

    The path parameter is accepted so such URLs can be bookmarked, but it is
    not used here: the same ``INDEX_HTML`` page is returned for every name.
    """
    del dashboard_name  # intentionally unused on the server side
    page: Response[str] = Response(content=INDEX_HTML, media_type=_MEDIA_TYPE_HTML)
    return page
[docs]
def _get_auth_secret() -> str:
    """Return the configured auth secret via the middleware's ``_get_secret()``.

    Delegating keeps a single implementation of secret resolution shared by
    the middleware and the login routes.
    """
    secret = _get_secret()
    return secret
@get("/login", sync_to_thread=False)
def login_page() -> Response[Any]:
    """Return the login form; redirect to ``/`` when no auth secret is configured."""
    auth_enabled = bool(_get_auth_secret())
    if auth_enabled:
        return Response(content=_LOGIN_HTML, media_type=_MEDIA_TYPE_HTML)
    return Redirect("/")
@post("/login")
async def login(request: Request) -> Response[None]:
    """Validate the submitted token and issue a session cookie on success.

    The ``token`` form field is compared against the configured secret in
    constant time. On a match the client is redirected to ``/`` with a
    ``corvix_session`` cookie.

    The ``Secure`` attribute is set when the request arrived over HTTPS.
    The scheme is read from ``request.url.scheme`` which reflects the real
    protocol when uvicorn is started with ``--proxy-headers`` (trusting
    ``X-Forwarded-Proto`` from a reverse proxy). This avoids raw header
    inspection from application code, which can be spoofed by untrusted
    clients not behind a proxy.

    Raises:
        HTTPException: 401 when no secret is configured or the token does not
            match.
    """
    form_data = await request.form()
    token = str(form_data.get("token", ""))
    secret = _get_auth_secret()
    # hmac.compare_digest() raises TypeError for ``str`` arguments containing
    # non-ASCII characters, which would turn a bad (non-ASCII) token into a 500.
    # Comparing the encoded bytes keeps the constant-time comparison and yields
    # a clean 401 instead. ``surrogatepass`` keeps the encoding total.
    token_bytes = token.encode("utf-8", "surrogatepass")
    secret_bytes = secret.encode("utf-8", "surrogatepass") if secret else b""
    if not secret or not hmac.compare_digest(token_bytes, secret_bytes):
        raise HTTPException(status_code=401, detail="Invalid token")
    session_val = _make_session_cookie(secret)
    redirect: Response[None] = Redirect("/")
    redirect.set_cookie(
        Cookie(
            key="corvix_session",
            value=session_val,
            httponly=True,
            samesite="strict",
            path="/",
            max_age=SESSION_MAX_AGE_SECONDS,
            secure=request.url.scheme == "https",
        )
    )
    return redirect
@get("/logout", sync_to_thread=False)
def logout() -> Response[None]:
    """Drop the ``corvix_session`` cookie and send the client to ``/login``."""
    response: Response[None] = Redirect("/login")
    response.delete_cookie("corvix_session")
    return response
[docs]
def _health_error(poller_status: PollerStatus) -> dict[str, object]:
    """Build the ``poller_error`` health payload from *poller_status*.

    Only the last non-empty line of ``last_error`` is exposed as ``detail``,
    so a multi-line error (such as a traceback) is reduced to its final
    message line. Trailing newlines are ignored when picking that line. When
    the error text contains no non-blank line it is passed through unchanged,
    and a non-string ``last_error`` (e.g. ``None``) is reported as-is.
    """
    raw_detail: str | None = poller_status.last_error
    if isinstance(raw_detail, str):
        # Skip blank lines so an error that ends with "\n" does not fall back
        # to the full multi-line text.
        meaningful = [line.strip() for line in raw_detail.splitlines() if line.strip()]
        if meaningful:
            raw_detail = meaningful[-1]
    return {"status": "unhealthy", "reason": "poller_error", "detail": raw_detail}
[docs]
def _health_check_staleness(last_poll_str: str) -> dict[str, object]:
    """Classify the poller's last poll timestamp as fresh, stale, or unparseable.

    Returns ``{"status": "ok"}`` when the last poll is at most five minutes
    old, a ``stale`` payload including the age in whole seconds when it is
    older, and an ``invalid_poll_time`` payload when the timestamp cannot be
    parsed.
    """
    try:
        polled_at = parse_timestamp(last_poll_str)
    except ValueError:
        return {"status": "unhealthy", "reason": "invalid_poll_time"}
    age = datetime.now(tz=UTC) - polled_at
    if age <= timedelta(minutes=5):
        return {"status": "ok"}
    return {
        "status": "unhealthy",
        "reason": "stale",
        "last_poll_seconds_ago": int(age.total_seconds()),
    }
[docs]
def _health_response(payload: dict[str, object]) -> Response[dict[str, object]]:
    """Wrap a health *payload* in a JSON response.

    The status is 200 when ``payload["status"]`` equals ``"ok"`` and 503
    otherwise.
    """
    is_healthy = payload.get("status") == "ok"
    code = HTTPStatus.OK if is_healthy else HTTPStatus.SERVICE_UNAVAILABLE
    return Response(content=payload, status_code=code, media_type="application/json")
[docs]
def _read_health_poller_status() -> PollerStatus | dict[str, object]:
    """Return the stored poller status, or an ``unhealthy`` payload on failure.

    Config loading, storage resolution and the status read are attempted in
    that order; the first step that fails determines the ``reason``.
    """

    def unhealthy(reason: str) -> dict[str, object]:
        return {"status": "unhealthy", "reason": reason}

    try:
        _load_runtime_config()
    except HTTPException:
        return unhealthy("config_unavailable")

    try:
        backend = _get_storage()
    except HTTPException:
        return unhealthy("storage_unavailable")

    try:
        status = backend.load_status()
    except (OSError, json.JSONDecodeError):
        return unhealthy("invalid_cache")
    except Exception:
        # Any other backend failure is logged with its traceback and reported
        # to the caller as the storage being unavailable.
        logger.exception("Failed to read poller status from storage")
        return unhealthy("storage_unavailable")
    return status
# ---------------------------------------------------------------------------
# Implementation helpers
# These plain (non-decorated) functions contain the business logic shared
# between the versioned /api/v1/ route handlers and the deprecated /api/
# backward-compat wrappers. Never call the decorated Litestar route handlers
# directly — use these helpers instead.
# ---------------------------------------------------------------------------
[docs]
def _health_impl(extra_headers: dict[str, str] | None = None) -> Response[dict[str, object]]:
    """Build the health-check response shared by the health routes.

    Args:
        extra_headers: Optional headers attached to the response.

    Returns:
        A JSON response with status 200 when the payload's ``status`` is
        ``"ok"`` and 503 otherwise.
    """
    status = _read_health_poller_status()
    payload: dict[str, object]
    if isinstance(status, dict):
        # Already a failure payload produced while resolving the status.
        payload = status
    elif status.status == "error":
        payload = _health_error(status)
    elif status.status in {"unknown", "starting"}:
        payload = {"status": "unhealthy", "reason": "poller_not_running"}
    elif not status.last_poll_time:
        payload = {"status": "unhealthy", "reason": "invalid_poll_time"}
    else:
        payload = _health_check_staleness(status.last_poll_time)

    healthy = payload.get("status") == "ok"
    code = HTTPStatus.OK if healthy else HTTPStatus.SERVICE_UNAVAILABLE
    return Response(
        content=payload,
        status_code=int(code),
        media_type="application/json",
        headers=extra_headers,
    )
[docs]
def _snapshot_impl(dashboard: str | None = None) -> SnapshotResponse:
    """Compute and return the typed snapshot payload.

    Loads the stored records and poller status, builds the data for the
    selected dashboard, and attaches a poller summary.

    Args:
        dashboard: Name of the dashboard to render; passed to
            ``_select_dashboard`` together with the configured dashboards.

    Returns:
        The snapshot response for the selected dashboard.
    """
    config = _load_runtime_config()
    storage = _get_storage()
    generated_at, records = storage.load_records()
    try:
        poller_status = storage.load_status()
    except (OSError, json.JSONDecodeError):
        # An unreadable status must not prevent serving records; fall back to
        # a default-constructed status.
        poller_status = PollerStatus()
    selected_dashboard = _select_dashboard(config.dashboards, dashboard)
    data = build_dashboard_data(
        records=records,
        dashboard=selected_dashboard,
        generated_at=generated_at,
    )

    # Stale unless a parseable last-poll time at most five minutes old exists.
    last_poll_str = poller_status.last_poll_time
    stale = True
    if last_poll_str:
        try:
            last_poll = parse_timestamp(last_poll_str)
            stale = (datetime.now(tz=UTC) - last_poll) > timedelta(minutes=5)
        except ValueError:
            stale = True

    # Expose only the last non-empty line of the error. Blank lines are skipped
    # so an error ending in "\n" is not returned in full.
    raw_last_error: str | None = poller_status.last_error
    if isinstance(raw_last_error, str):
        meaningful = [line.strip() for line in raw_last_error.splitlines() if line.strip()]
        if meaningful:
            raw_last_error = meaningful[-1]

    poller = PollerStatusResponse(
        status=poller_status.status,
        last_poll_time=last_poll_str,
        last_error=raw_last_error,
        last_error_time=poller_status.last_error_time,
        stale=stale,
        account_errors=[
            AccountErrorResponse(account_id=e.account_id, account_label=e.account_label, error=e.error)
            for e in poller_status.account_errors
        ],
    )
    return build_snapshot_response(
        data=data,
        dashboard_names=_dashboard_names(config.dashboards),
        poller=poller,
        notifications_config=config.notifications,
    )
[docs]
def _notification_rule_snippets_impl(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> RuleSnippetsResponse:
    """Build the rule-snippets payload for one stored notification.

    Raises:
        HTTPException: 404 when the notification is not in storage; errors
            raised while resolving the dashboard or account propagate
            unchanged.
    """
    config = _load_runtime_config()
    selected_dashboard = _select_dashboard(config.dashboards, dashboard)
    _require_account(config=config, account_id=account_id)
    _generated_at, records = _get_storage().load_records()

    record = _find_record(records=records, account_id=account_id, thread_id=thread_id)
    if record is None:
        msg = f"Notification '{account_id}/{thread_id}' not found in storage."
        raise HTTPException(status_code=404, detail=msg)

    plain_lines = _rule_match_lines(record=record, include_context=False)
    context_lines = _rule_match_lines(record=record, include_context=True)

    # The context variants exist only when context predicates were produced.
    dashboard_with_context: str | None = None
    global_with_context: str | None = None
    if context_lines is not None:
        dashboard_with_context = _dashboard_ignore_rule_snippet(context_lines)
        global_with_context = _global_exclude_rule_snippet(record=record, match_lines=context_lines)

    return RuleSnippetsResponse(
        dashboard_name=selected_dashboard.name,
        dashboard_ignore_rule_snippet=_dashboard_ignore_rule_snippet(plain_lines),
        global_exclude_rule_snippet=_global_exclude_rule_snippet(record=record, match_lines=plain_lines),
        dashboard_ignore_rule_with_context_snippet=dashboard_with_context,
        global_exclude_rule_with_context_snippet=global_with_context,
        has_context=bool(record.context),
    )
# ---------------------------------------------------------------------------
# /api/v1/ — versioned route handlers (current API)
# ---------------------------------------------------------------------------
# sync_to_thread=True: the health check reads config and storage (blocking I/O),
# so it runs in a worker thread instead of on the event loop.
@get("/api/v1/health", sync_to_thread=True)
def health() -> Response[dict[str, object]]:
    """Health endpoint for container checks.

    Returns 200 with {"status": "ok"} when config and storage are readable,
    the poller is running, and the poller's last poll time is not stale.
    Returns 503 with {"status": "unhealthy"} and one of these reasons:
    "config_unavailable", "storage_unavailable", "invalid_cache",
    "poller_not_running", "poller_error", "invalid_poll_time", or "stale".
    """
    return _health_impl()
@get("/metrics", sync_to_thread=False)
def metrics_endpoint() -> Response[bytes]:
    """Serve the current Prometheus metrics in text exposition format."""
    body, content_type = _metrics.render_latest()
    # Litestar appends "; charset=utf-8" to text media types; dropping any
    # charset already present in the Prometheus content type avoids a duplicate.
    media_type, _separator, _charset = content_type.partition("; charset=")
    return Response(content=body, media_type=media_type)
@get("/api/v1/themes", sync_to_thread=False)
def api_themes() -> dict[str, object]:
    """Return the built-in theme presets under the ``themes`` key."""
    response_body: dict[str, object] = {"themes": THEMES}
    return response_body
# sync_to_thread=True: loading the runtime config touches the filesystem, so the
# handler runs in a worker thread instead of on the event loop.
@get("/api/v1/dashboards", sync_to_thread=True)
def dashboards() -> dict[str, object]:
    """List configured dashboard names under the ``dashboard_names`` key."""
    config = _load_runtime_config()
    names = _dashboard_names(config.dashboards)
    return {"dashboard_names": names}
# sync_to_thread=True: building a snapshot performs blocking storage reads (the
# SSE path already offloads the same work with asyncio.to_thread), so the
# handler runs in a worker thread instead of on the event loop.
@get("/api/v1/snapshot", sync_to_thread=True)
def snapshot(dashboard: str | None = None) -> SnapshotResponse:
    """Return the selected dashboard data from storage.

    Args:
        dashboard: Optional dashboard name taken from the query string.
    """
    return _snapshot_impl(dashboard=dashboard)
# ---------------------------------------------------------------------------
# Server-Sent Events: push snapshot updates instead of client-side polling.
# ---------------------------------------------------------------------------
# The server polls storage on a short interval and pushes a ``snapshot`` event
# only when the serialized payload actually changes, so idle connections cost a
# periodic comparison (plus an occasional keep-alive) rather than a full
# response on every tick. Browsers reconnect automatically via EventSource, and
# the frontend falls back to interval polling when SSE is unavailable.
#
# A short-lived, process-wide cache of the serialized body (keyed by dashboard)
# collapses the storage reads of concurrent connections watching the same
# dashboard into roughly one read per poll interval, instead of one read per
# connection per tick.
[docs]
# Poll interval used when CORVIX_SSE_POLL_INTERVAL_SECONDS is unset or invalid.
_SSE_DEFAULT_POLL_INTERVAL_SECONDS = 3.0

# Idle time after which the SSE generator emits a keep-alive comment.
_SSE_KEEPALIVE_SECONDS = 20.0


def _sse_poll_interval() -> float:
    """Return the server-side SSE poll interval in seconds.

    Read from ``CORVIX_SSE_POLL_INTERVAL_SECONDS``; falls back to the default
    when unset, non-numeric, non-positive, or non-finite (``inf``/``nan``).
    An infinite interval is rejected because it would make the stream sleep
    forever after its first tick.
    """
    raw = environ.get("CORVIX_SSE_POLL_INTERVAL_SECONDS")
    if raw is None:
        return _SSE_DEFAULT_POLL_INTERVAL_SECONDS
    try:
        value = float(raw)
    except ValueError:
        return _SSE_DEFAULT_POLL_INTERVAL_SECONDS
    # The chained comparison is False for NaN, zero, negatives and +inf alike.
    if 0 < value < float("inf"):
        return value
    return _SSE_DEFAULT_POLL_INTERVAL_SECONDS
[docs]
def _snapshot_event_body(dashboard: str | None) -> str:
    """Return the snapshot for *dashboard* as a JSON string.

    The payload is encoded with Litestar's ``encode_json`` so the SSE body and
    the ``GET /api/v1/snapshot`` response share one serialization path.
    """
    encoded: bytes = encode_json(_snapshot_impl(dashboard=dashboard))
    return encoded.decode("utf-8")
# Process-wide cache of the most recently built SSE body per dashboard, used to
# deduplicate the storage reads of concurrent connections. Guarded by a lock so
# only one worker thread rebuilds at a time; the set of dashboards is bounded by
# config, so the dict does not grow without bound.
[docs]
# dashboard name -> (monotonic build time, serialized body)
_snapshot_body_cache: dict[str | None, tuple[float, str]] = {}

# Held for the lookup and any rebuild, so only one thread rebuilds at a time.
_snapshot_body_cache_lock = threading.Lock()


def _cached_snapshot_event_body(dashboard: str | None, ttl: float) -> str:
    """Return the serialized snapshot for *dashboard*, reusing a recent build.

    A cached body is reused only while it is strictly younger than *ttl*
    seconds, measured from the moment this call started; otherwise the body is
    rebuilt under the lock and stored with a fresh timestamp. Because the
    comparison is strict, a single connection ticking once per interval still
    rebuilds on every tick, while concurrent connections on the same dashboard
    share one build.
    """
    called_at = monotonic()
    with _snapshot_body_cache_lock:
        entry = _snapshot_body_cache.get(dashboard)
        if entry is not None:
            built_at, cached_body = entry
            if called_at - built_at < ttl:
                return cached_body
        fresh_body = _snapshot_event_body(dashboard)
        _snapshot_body_cache[dashboard] = (monotonic(), fresh_body)
    return fresh_body
[docs]
def _snapshot_error_payload(error: Exception) -> str:
    """Return the JSON body of an SSE ``snapshot-error`` event for *error*.

    For an ``HTTPException`` the status code is forwarded, together with its
    detail when that is a string (a fixed fallback text otherwise). Every
    other exception is reported as a generic 500 so its message is not
    exposed to the client.
    """
    if not isinstance(error, HTTPException):
        return json.dumps({"detail": "Internal server error.", "status_code": 500})
    detail = error.detail if isinstance(error.detail, str) else "Unable to build snapshot."
    return json.dumps({"detail": detail, "status_code": error.status_code})
[docs]
async def _snapshot_event_generator(dashboard: str | None) -> AsyncIterator[ServerSentEventMessage]:
    """Yield SSE messages for *dashboard*, sending a snapshot only when it changes.

    Each tick builds (or reuses) the serialized snapshot in a worker thread
    and compares its SHA-256 digest with the last one sent:

    * a different digest produces a ``snapshot`` event;
    * an unchanged digest produces a keep-alive comment once
      ``_SSE_KEEPALIVE_SECONDS`` have passed since the last message;
    * a failure produces a ``snapshot-error`` event and the loop carries on,
      so a transient storage or serialization problem does not tear down the
      connection.

    The generator sleeps for one poll interval between ticks and never
    finishes on its own.
    """
    interval = _sse_poll_interval()
    previous_digest: str | None = None
    last_sent_at = monotonic()
    while True:
        try:
            body = await asyncio.to_thread(_cached_snapshot_event_body, dashboard, interval)
        except Exception as error:
            if not isinstance(error, HTTPException):
                logger.exception("Unexpected error building SSE snapshot")
            # A distinct event name (not "error") so it does not collide with
            # the EventSource connection-error event on the client.
            yield ServerSentEventMessage(data=_snapshot_error_payload(error), event="snapshot-error")
            # Forget the digest so the next successful build is always pushed.
            previous_digest = None
            last_sent_at = monotonic()
        else:
            digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
            tick = monotonic()
            if digest != previous_digest:
                previous_digest = digest
                last_sent_at = tick
                yield ServerSentEventMessage(data=body, event="snapshot")
            elif tick - last_sent_at >= _SSE_KEEPALIVE_SECONDS:
                last_sent_at = tick
                yield ServerSentEventMessage(comment="keep-alive")
        await asyncio.sleep(interval)
@get("/api/v1/events")
async def events(dashboard: str | None = None) -> ServerSentEvent:
    """Open a Server-Sent Events stream of snapshots for *dashboard*.

    The connection stays open and messages come from
    ``_snapshot_event_generator``, which pushes a ``snapshot`` event only when
    the serialized data changes.
    """
    stream = _snapshot_event_generator(dashboard)
    return ServerSentEvent(stream)
# sync_to_thread=True: the handler loads records from storage (blocking I/O), so
# it runs in a worker thread instead of on the event loop.
@get("/api/v1/notifications/{account_id:str}/{thread_id:str}/rule-snippets", sync_to_thread=True)
def notification_rule_snippets(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> RuleSnippetsResponse:
    """Return prefilled ignore-rule snippets for a notification.

    Args:
        account_id: Configured GitHub account the notification belongs to.
        thread_id: Notification thread identifier.
        dashboard: Optional dashboard name taken from the query string.
    """
    return _notification_rule_snippets_impl(account_id=account_id, thread_id=thread_id, dashboard=dashboard)
@post("/api/v1/notifications/{account_id:str}/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification(account_id: str, thread_id: str) -> Response[None]:
    """Dismiss a notification thread for the given account.

    All work is done by ``_dismiss_notification_impl``, which dismisses the
    thread through the GitHub client, marks the record as dismissed in local
    storage, and returns a 204 response.
    """
    response = _dismiss_notification_impl(thread_id=thread_id, account_id=account_id)
    return response
@post("/api/v1/notifications/{account_id:str}/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read(account_id: str, thread_id: str) -> Response[None]:
    """Mark a notification thread as read, both on GitHub and in local storage."""
    response = _mark_notification_read_impl(thread_id=thread_id, account_id=account_id)
    return response
# ---------------------------------------------------------------------------
# Deprecated /api/ routes — kept for backward compatibility during transition.
# All routes below mirror their /api/v1/ counterparts but include a
# ``Deprecation: true`` response header (the ``Deprecation`` field is defined in
# RFC 9745; RFC 8594 defines the related ``Sunset`` field). Clients should migrate to
# the /api/v1/ equivalents. These routes will be removed in a future release.
# ---------------------------------------------------------------------------
# sync_to_thread=True: the health check reads config and storage (blocking I/O),
# so it runs in a worker thread instead of on the event loop.
@get("/api/health", sync_to_thread=True)
def health_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/health.

    Same payload and status codes as the versioned route, plus the
    deprecation headers.
    """
    return _health_impl(extra_headers=_DEPRECATED_HEADERS)
@get("/api/themes", sync_to_thread=False)
def api_themes_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/themes."""
    body: dict[str, object] = {"themes": THEMES}
    return Response(content=body, headers=_DEPRECATED_HEADERS)
# sync_to_thread=True: loading the runtime config touches the filesystem, so the
# handler runs in a worker thread instead of on the event loop.
@get("/api/dashboards", sync_to_thread=True)
def dashboards_deprecated() -> Response[dict[str, object]]:
    """Deprecated: use /api/v1/dashboards."""
    config = _load_runtime_config()
    names = _dashboard_names(config.dashboards)
    return Response(content={"dashboard_names": names}, headers=_DEPRECATED_HEADERS)
# sync_to_thread=True: building a snapshot performs blocking storage reads, so
# the handler runs in a worker thread instead of on the event loop.
@get("/api/snapshot", sync_to_thread=True)
def snapshot_deprecated(dashboard: str | None = None) -> Response[SnapshotResponse]:
    """Deprecated: use /api/v1/snapshot."""
    return Response(content=_snapshot_impl(dashboard=dashboard), headers=_DEPRECATED_HEADERS)
# sync_to_thread=True: the handler loads records from storage (blocking I/O), so
# it runs in a worker thread instead of on the event loop.
@get("/api/notifications/{account_id:str}/{thread_id:str}/rule-snippets", sync_to_thread=True)
def notification_rule_snippets_deprecated(
    account_id: str,
    thread_id: str,
    dashboard: str | None = None,
) -> Response[RuleSnippetsResponse]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/rule-snippets."""
    return Response(
        content=_notification_rule_snippets_impl(
            account_id=account_id,
            thread_id=thread_id,
            dashboard=dashboard,
        ),
        headers=_DEPRECATED_HEADERS,
    )
@post("/api/notifications/{account_id:str}/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification_deprecated(account_id: str, thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/dismiss."""
    # The helper's own response is discarded; a new 204 carries the deprecation headers.
    _dismiss_notification_impl(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
@post("/api/notifications/{thread_id:str}/dismiss", sync_to_thread=True)
def dismiss_notification_default_account(thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/dismiss.

    Targets the first configured GitHub account, since this legacy route
    carries no account id.
    """
    default_account = _default_account_id(_load_runtime_config())
    _dismiss_notification_impl(account_id=default_account, thread_id=thread_id)
    return Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
@post("/api/notifications/{account_id:str}/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read_deprecated(account_id: str, thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/mark-read."""
    # The helper's own response is discarded; a new 204 carries the deprecation headers.
    _mark_notification_read_impl(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
@post("/api/notifications/{thread_id:str}/mark-read", sync_to_thread=True)
def mark_notification_read_default_account(thread_id: str) -> Response[None]:
    """Deprecated: use /api/v1/notifications/{account_id}/{thread_id}/mark-read.

    Targets the first configured GitHub account, since this legacy route
    carries no account id.
    """
    default_account = _default_account_id(_load_runtime_config())
    _mark_notification_read_impl(account_id=default_account, thread_id=thread_id)
    return Response(content=None, status_code=204, headers=_DEPRECATED_HEADERS)
[docs]
def _dismiss_notification_impl(account_id: str, thread_id: str) -> Response[None]:
    """Dismiss *thread_id* on GitHub for *account_id*, then in local storage.

    Returns:
        An empty 204 response on success.

    Raises:
        HTTPException: 404 when the account is not configured (raised by
            ``_require_account``), 500 when the account's token cannot be
            resolved or is empty, and 502 when the GitHub call fails.
    """
    config = _load_runtime_config()
    account = _require_account(config=config, account_id=account_id)
    try:
        token = get_env_value(account.token_env)
    except ValueError as error:
        raise HTTPException(status_code=500, detail=str(error)) from error
    if not token:
        msg = f"GitHub token env var '{account.token_env}' (or '{account.token_env}_FILE') is not set."
        raise HTTPException(status_code=500, detail=msg)
    client = _build_github_client(config=config, account=account, token=token)
    try:
        client.dismiss_thread(thread_id)
    except Exception as error:
        # The full error (with traceback) goes to the log only. The client gets
        # a generic message, matching the mark-read path, so upstream error
        # text is not exposed in the response.
        logger.exception("Failed to dismiss thread", extra={"thread_id": thread_id})
        msg = f"Failed to dismiss thread {thread_id}."
        raise HTTPException(status_code=502, detail=msg) from error
    # Local storage is updated only after GitHub accepted the dismissal.
    _get_storage().dismiss_record(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204)
[docs]
def _mark_notification_read_impl(account_id: str, thread_id: str) -> Response[None]:
    """Mark *thread_id* as read on GitHub for *account_id*, then in local storage.

    Returns:
        An empty 204 response on success.

    Raises:
        HTTPException: 404 when the account is not configured (raised by
            ``_require_account``), 500 when the account's token cannot be
            resolved or is empty, and 502 when the GitHub call fails.
    """
    runtime_config = _load_runtime_config()
    github_account = _require_account(config=runtime_config, account_id=account_id)

    try:
        github_token = get_env_value(github_account.token_env)
    except ValueError as error:
        raise HTTPException(status_code=500, detail=str(error)) from error
    if not github_token:
        msg = f"GitHub token env var '{github_account.token_env}' (or '{github_account.token_env}_FILE') is not set."
        raise HTTPException(status_code=500, detail=msg)

    github = _build_github_client(config=runtime_config, account=github_account, token=github_token)
    try:
        github.mark_thread_read(thread_id)
    except Exception as error:
        logger.exception("Failed to mark thread as read", extra={"thread_id": thread_id})
        msg = f"Failed to mark thread {thread_id} as read."
        raise HTTPException(status_code=502, detail=msg) from error

    # Local storage is updated only after GitHub accepted the change.
    _get_storage().mark_record_read(thread_id=thread_id, account_id=account_id)
    return Response(content=None, status_code=204)
[docs]
def _require_account(config: AppConfig, account_id: str) -> GitHubAccountConfig:
    """Return the configured GitHub account whose ``id`` equals *account_id*.

    Raises:
        HTTPException: 404 when no configured account has that id.
    """
    match = next((candidate for candidate in config.github.accounts if candidate.id == account_id), None)
    if match is None:
        msg = f"GitHub account '{account_id}' not found in config."
        raise HTTPException(status_code=404, detail=msg)
    return match
[docs]
def _build_github_client(config: AppConfig, account: GitHubAccountConfig, token: str) -> GitHubNotificationsClient:
    """Create a GitHub notifications client for *account*.

    The API base URL comes from the account and the request timeout from the
    polling section of *config*.
    """
    timeout_seconds = config.polling.request_timeout_seconds
    return GitHubNotificationsClient(
        token=token,
        api_base_url=account.api_base_url,
        request_timeout_seconds=timeout_seconds,
    )
[docs]
def _default_account_id(config: AppConfig) -> str:
    """Return the id of the first configured GitHub account.

    Raises:
        HTTPException: 500 when no accounts are configured.
    """
    accounts = config.github.accounts
    if accounts:
        return accounts[0].id
    msg = "No GitHub accounts configured."
    raise HTTPException(status_code=500, detail=msg)
[docs]
def _find_record(
    *,
    records: list[NotificationRecord],
    account_id: str,
    thread_id: str,
) -> NotificationRecord | None:
    """Return the first record matching both ids, or ``None`` when none does."""
    wanted = (account_id, thread_id)
    matches = (
        candidate
        for candidate in records
        if (candidate.notification.account_id, candidate.notification.thread_id) == wanted
    )
    return next(matches, None)
[docs]
def _yaml_quoted(value: str) -> str:
    """Return *value* as a double-quoted YAML scalar.

    Backslashes and double quotes are escaped, and so are control characters
    such as newlines and tabs, which would otherwise be folded or rejected
    inside a double-quoted YAML scalar. JSON string escaping is used because
    its escapes (``\\"``, ``\\\\``, ``\\n``, ``\\t``, ``\\uXXXX``) are also
    valid in YAML double-quoted scalars. Non-ASCII text is kept verbatim.
    """
    return json.dumps(value, ensure_ascii=False)
[docs]
def _yaml_scalar(value: object) -> str:
if isinstance(value, str):
return _yaml_quoted(value)
if isinstance(value, bool):
return "true" if value else "false"
return str(value)
[docs]
def _slug_token(value: str) -> str:
    """Return a lowercase, hyphen-separated slug of *value*.

    Runs of ``a-z``/``0-9`` characters in the lowercased value are joined with
    single hyphens; ``"rule"`` is returned when no such run exists.
    """
    words = re.findall(r"[a-z0-9]+", value.lower())
    if not words:
        return "rule"
    return "-".join(words)
[docs]
def _rule_name_for_record(record: NotificationRecord) -> str:
    """Return a rule name of the form ``ignore-<repository>-<reason>-<subject type>``.

    Each component is the slug of the corresponding notification field.
    """
    info = record.notification
    components = (info.repository, info.reason, info.subject_type)
    return "ignore-" + "-".join(_slug_token(component) for component in components)
@overload
def _rule_match_lines(*, record: NotificationRecord, include_context: Literal[False]) -> list[str]: ...
@overload
def _rule_match_lines(*, record: NotificationRecord, include_context: Literal[True]) -> list[str] | None: ...
def _rule_match_lines(*, record: NotificationRecord, include_context: bool) -> list[str] | None:
    """Return the YAML ``match`` lines describing *record*.

    The base lines pin the repository, reason, subject type and an anchored
    regex for the exact title. With ``include_context=True`` a ``context:``
    section with the record's context predicates is appended; ``None`` is
    returned in that mode when the record yields no predicates.
    """
    info = record.notification
    base_lines = [
        f"repository_in: [{_yaml_quoted(info.repository)}]",
        f"reason_in: [{_yaml_quoted(info.reason)}]",
        f"subject_type_in: [{_yaml_quoted(info.subject_type)}]",
        f"title_regex: {_yaml_quoted(_anchored_title_regex(info.subject_title))}",
    ]
    if include_context:
        predicates = _context_predicate_lines(record=record)
        return [*base_lines, "context:", *predicates] if predicates else None
    return base_lines
[docs]
def _context_predicate_lines(*, record: NotificationRecord) -> list[str]:
    """Return YAML lines for the context predicates that can be derived from *record*.

    A fixed set of context paths is probed. Each path that is present and
    holds a scalar (``bool``, ``int``, ``float`` or ``str``) yields one
    three-line ``equals`` predicate as a YAML list item. The ``op`` and
    ``value`` keys are indented two columns deeper than the ``-`` marker so
    they line up with ``path`` and the item parses as a single mapping.
    Missing paths and non-scalar values (including ``None``) are skipped.
    """
    context = record.context
    candidate_paths = (
        "github.latest_comment.is_ci_only",
        "github.pr_state.state",
        "github.pr_state.draft",
    )
    output: list[str] = []
    for path in candidate_paths:
        found, value = _context_path_value(context=context, path=path)
        if not found:
            continue
        if isinstance(value, bool | int | float | str):
            output.extend(
                [
                    f"  - path: {_yaml_quoted(path)}",
                    "    op: equals",
                    f"    value: {_yaml_scalar(value)}",
                ]
            )
    return output
[docs]
def _anchored_title_regex(title: str) -> str:
    """Return a regex that matches exactly *title*.

    The characters ``. ^ $ * + ? { } [ ] | ( )`` and the backslash are
    prefixed with a backslash, and the result is wrapped in ``^``...``$``.
    """
    metacharacters = frozenset(".^$*+?{}[]|()\\")
    escaped = "".join(f"\\{char}" if char in metacharacters else char for char in title)
    return "^" + escaped + "$"
[docs]
def _context_path_value(*, context: Mapping[str, object], path: str) -> tuple[bool, object | None]:
    """Look up a dot-separated *path* inside nested mappings.

    Args:
        context: Root mapping to descend into.
        path: Dot-separated keys, e.g. ``"github.pr_state.state"``.

    Returns:
        ``(True, value)`` when every segment exists (the value may be
        ``None``), otherwise ``(False, None)``. Any ``Mapping`` is traversed,
        not only ``dict``, in line with the parameter's annotation.
    """
    current: object = context
    for segment in path.split("."):
        if not isinstance(current, Mapping):
            return False, None
        current_map = cast(Mapping[str, object], current)
        # A membership test tells a missing key apart from a stored None.
        if segment not in current_map:
            return False, None
        current = current_map[segment]
    return True, current
[docs]
def _dashboard_ignore_rule_snippet(match_lines: list[str]) -> str:
    """Render *match_lines* as a single YAML list item.

    The first line follows the ``- `` marker. Every later line is indented by
    two spaces so its keys line up with the first key and the item parses as
    one mapping.
    """
    body = "\n".join(f"  {line}" for line in match_lines)
    return f"- {body.lstrip()}"
[docs]
def _global_exclude_rule_snippet(*, record: NotificationRecord, match_lines: list[str]) -> str:
    """Render a global exclude rule for *record* as a YAML list item.

    The item holds ``name``, ``match`` and ``exclude_from_dashboards``. The
    latter two are indented two spaces so they line up with ``name`` after the
    ``- `` marker, and the match lines are nested two further spaces under
    ``match:``, so the item parses as one mapping.
    """
    match_body = "\n".join(f"    {line}" for line in match_lines)
    return (
        f"- name: {_rule_name_for_record(record)}\n"
        f"  match:\n"
        f"{match_body}\n"
        f"  exclude_from_dashboards: true"
    )
# ---------------------------------------------------------------------------
# Runtime-config cache
# ---------------------------------------------------------------------------
# AppConfig is expensive to produce (YAML read + full parse). We cache the
# result at module level and invalidate it only when the config file's mtime
# changes. A plain stat() per request is orders of magnitude cheaper than a
# full YAML re-parse, so we still detect on-disk edits without re-reading
# unconditionally on every HTTP request.
#
# The three cache fields live in a single mutable object so they can be
# updated without ``global`` statements (which ruff PLW0603 flags).
# ---------------------------------------------------------------------------
[docs]
class _ConfigCache:
    """Holder for the module-level AppConfig cache fields.

    Grouping the fields on one object lets them be reassigned without
    ``global`` statements.
    """

    # Config path string recorded with the cached config; None while empty.
    path: str | None = None
    # Config file mtime recorded with the cached config; None while empty.
    mtime: float | None = None
    # The cached AppConfig itself; None while nothing is cached.
    config: AppConfig | None = None


# Single shared cache instance used by the config helpers.
_config_cache = _ConfigCache()
[docs]
class _StorageState:
    """Holder for the module-level storage backend and its construction lock."""

    # Lock taken by _get_storage() while it lazily builds the backend.
    lock: threading.Lock = threading.Lock()
    # Backend that was injected or lazily built; None until one is available.
    backend: StorageBackend | None = None


# Single shared state instance used by the storage helpers.
_storage_state = _StorageState()
[docs]
def set_storage_backend(backend: StorageBackend | None) -> None:
    """Install ``backend`` as the storage used by route handlers.

    Production wiring never sets this, so the backend is created lazily from
    config on first use (PostgreSQL is required). Tests inject a backend
    directly and pass ``None`` afterwards to reset the state.
    """
    _storage_state.backend = backend
[docs]
def _get_storage() -> StorageBackend:
    """Return the active storage backend, building it from config on first use.

    An injected or previously built backend is returned as-is, so its
    connection pool is reused across requests. Otherwise the backend is built
    while holding the state lock, after re-checking that no other caller built
    it in the meantime, so concurrent first requests do not each create (and
    leak) a connection pool.

    Raises:
        HTTPException: Status 500 when ``create_storage`` rejects the config
            with ``StorageConfigError`` (or when the config cannot be loaded).
    """
    existing = _storage_state.backend
    if existing is not None:
        return existing

    with _storage_state.lock:
        # Re-check under the lock: another caller may have finished building
        # the backend while this one was waiting.
        existing = _storage_state.backend
        if existing is None:
            runtime_config = _load_runtime_config()
            try:
                existing = create_storage(runtime_config)
            except StorageConfigError as error:
                raise HTTPException(status_code=500, detail=str(error)) from error
            _storage_state.backend = existing
        return existing
[docs]
def _clear_config_cache() -> None:
    """Reset every cache field so the next config lookup re-reads the file."""
    _config_cache.config = _config_cache.path = _config_cache.mtime = None
    logger.info("Config cache cleared; config will be reloaded on the next request.")
[docs]
def _load_runtime_config() -> AppConfig:
    """Return the cached AppConfig, re-parsing from disk only when the file changes.

    Config is read from the path in the ``CORVIX_CONFIG`` environment variable
    (default: ``corvix.yaml``). The file is stat()-ed once per call; the YAML
    is only re-parsed when either the path or the mtime differs from the last
    successful load.

    Returns:
        The parsed (possibly cached) application config.

    Raises:
        HTTPException: Status 500 when the config file does not exist, cannot
            be read, or fails validation.
    """
    config_path = Path(environ.get("CORVIX_CONFIG", "corvix.yaml"))
    config_path_str = str(config_path)

    # A single stat() both proves the file exists and yields its mtime. A
    # separate exists() check would double the syscalls on every request and
    # leave a window in which the file can disappear between the two calls.
    try:
        mtime = config_path.stat().st_mtime
    except (FileNotFoundError, NotADirectoryError) as error:
        msg = f"Config file '{config_path}' does not exist."
        raise HTTPException(status_code=500, detail=msg) from error
    except OSError as error:
        msg = f"Unable to read config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error

    cache_is_fresh = (
        _config_cache.config is not None
        and _config_cache.path == config_path_str
        and _config_cache.mtime == mtime
    )
    if cache_is_fresh:
        return _config_cache.config

    try:
        config = load_config(config_path)
    except ValueError as error:
        msg = f"Invalid config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error
    except OSError as error:
        msg = f"Unable to read config at '{config_path}': {error}"
        raise HTTPException(status_code=500, detail=msg) from error

    # Record the cache only after a successful parse, so a broken file never
    # replaces the previously cached config.
    _config_cache.config = config
    _config_cache.path = config_path_str
    _config_cache.mtime = mtime
    logger.debug("Config loaded from '%s' (mtime=%.3f).", config_path, mtime)
    return config
[docs]
def _install_sighup_handler() -> None:
    """Register a SIGHUP handler that clears the config cache.

    Sending ``SIGHUP`` to the server process forces the config to be reloaded
    from disk on the next request without restarting the process::

        kill -HUP <pid>

    Installation is best-effort: nothing is registered, and nothing is raised,
    when the platform has no SIGHUP (e.g. Windows) or when this runs outside
    the main thread, where ``signal.signal`` refuses to install handlers.
    """
    try:
        signal.signal(signal.SIGHUP, lambda _sig, _frame: _clear_config_cache())
    except (AttributeError, OSError, ValueError) as error:
        # AttributeError / OSError: SIGHUP is unavailable on this platform.
        # ValueError: signal.signal() was called from a non-main thread, which
        # would otherwise abort the import of this module.
        logger.debug("SIGHUP handler not installed: %s", error)
    else:
        logger.debug("SIGHUP handler installed for config reload.")


# Installed once at import time so the running server responds to SIGHUP.
_install_sighup_handler()
[docs]
def _select_dashboard(
    dashboards: list[DashboardSpec],
    selected_name: str | None,
) -> DashboardSpec:
    """Pick the dashboard to serve from the available ones.

    With no ``selected_name`` the first available dashboard is returned;
    otherwise the first one whose ``name`` equals ``selected_name``.

    Raises:
        HTTPException: Status 404 when ``selected_name`` matches no available
            dashboard.
    """
    candidates = available_dashboards(dashboards)
    if selected_name is None:
        # NOTE(review): raises IndexError if no dashboard is available;
        # confirm available_dashboards() never returns an empty list.
        return candidates[0]

    chosen = next((spec for spec in candidates if spec.name == selected_name), None)
    if chosen is None:
        msg = f"Dashboard '{selected_name}' not found."
        raise HTTPException(status_code=404, detail=msg)
    return chosen
[docs]
def _dashboard_names(dashboards: list[DashboardSpec]) -> list[str]:
    """Return the names of the available dashboards, in the order they are listed."""
    return [spec.name for spec in available_dashboards(dashboards)]
[docs]
# The Litestar application object. ``run()`` hands uvicorn the import string
# "corvix.web.app:app", which refers to this name. It wires together the page
# and API route handlers, the static-assets router, middleware, the startup
# hook, response compression, and the OpenAPI metadata.
app = Litestar(
    route_handlers=[
        # Page, login/logout and metrics handlers.
        index,
        dashboard_index,
        login_page,
        login,
        logout,
        metrics_endpoint,
        # /api/v1/ — versioned routes (current)
        health,
        api_themes,
        dashboards,
        snapshot,
        events,
        notification_rule_snippets,
        dismiss_notification,
        mark_notification_read,
        # /api/ — deprecated routes (backward compat; scheduled for removal)
        health_deprecated,
        api_themes_deprecated,
        dashboards_deprecated,
        snapshot_deprecated,
        notification_rule_snippets_deprecated,
        dismiss_notification_deprecated,
        dismiss_notification_default_account,
        mark_notification_read_deprecated,
        mark_notification_read_default_account,
        # Static files served under /assets with the shared cache-control header.
        create_static_files_router(
            path="/assets",
            directories=[_STATIC_ASSETS_DIR],
            cache_control=_ASSET_CACHE_CONTROL,
        ),
    ],
    middleware=[ObservabilityMiddleware(), TokenAuthMiddleware()],
    on_startup=[_configure_observability],
    # gzip responses; minimum_size is presumably a byte threshold below which
    # responses are sent uncompressed — confirm against Litestar's docs.
    compression_config=CompressionConfig(backend="gzip", minimum_size=500),
    openapi_config=OpenAPIConfig(
        title="Corvix API",
        # Schema version of the HTTP contract, not the package version: bumped
        # deliberately when the API shape changes so the generated OpenAPI
        # document (and the TypeScript types derived from it) stay stable.
        version="1.0.0",
        description="JSON API backing the Corvix dashboard single-page app.",
    ),
)
[docs]
def run() -> None:
    """Run the app with uvicorn, configured from environment variables.

    Environment:
        CORVIX_WEB_HOST: Interface to bind (default ``0.0.0.0``).
        CORVIX_WEB_PORT: TCP port (default ``8000``).
        CORVIX_WEB_RELOAD: Enables auto-reload when set to ``1``, ``true`` or
            ``yes`` (case-insensitive); any other value disables it.

    Raises:
        ValueError: If ``CORVIX_WEB_PORT`` is not an integer.
    """
    host = environ.get("CORVIX_WEB_HOST", "0.0.0.0")  # nosec B104 - intentional; Docker/container deployments need all-interfaces
    port = int(environ.get("CORVIX_WEB_PORT", "8000"))
    reload_enabled = environ.get("CORVIX_WEB_RELOAD", "false").lower() in {"1", "true", "yes"}
    uvicorn.run(
        "corvix.web.app:app",
        host=host,
        port=port,
        reload=reload_enabled,
        # Pass reload directories only when reloading is on: handing uvicorn
        # reload options while reload is disabled makes it log a
        # "will not reload" warning on every normal startup.
        reload_dirs=["src"] if reload_enabled else None,
    )