Source code for corvix.config

"""YAML configuration for Corvix dashboards and polling."""

from __future__ import annotations

import re
from dataclasses import dataclass, field
from pathlib import Path

import yaml

# Inclusive bounds enforced on ``polling.per_page`` when the config is parsed.
_POLLING_PER_PAGE_MIN = 1
_POLLING_PER_PAGE_MAX = 50

# Operator names accepted in the ``op`` field of a context predicate.
_CONTEXT_OPERATORS = {
    "contains",
    "equals",
    "exists",
    "in",
    "not_equals",
    "regex",
}
@dataclass(slots=True)
[docs] class ContextPredicate: """Predicate evaluated against enriched notification context."""
[docs] path: str
[docs] op: str
[docs] value: object | None = None
[docs] case_insensitive: bool = False
@dataclass(slots=True)
[docs] class MatchCriteria: """Filter fields for rules and dashboards."""
[docs] repository_in: list[str] = field(default_factory=list)
[docs] repository_glob: list[str] = field(default_factory=list)
[docs] reason_in: list[str] = field(default_factory=list)
[docs] subject_type_in: list[str] = field(default_factory=list)
[docs] title_contains_any: list[str] = field(default_factory=list)
[docs] title_regex: str | None = None
[docs] unread: bool | None = None
[docs] min_score: float | None = None
[docs] max_age_hours: float | None = None
[docs] context: list[ContextPredicate] = field(default_factory=list)
@dataclass(slots=True)
[docs] class RuleAction: """Action emitted when a rule matches."""
[docs] action_type: str
@dataclass(slots=True)
class Rule:
    """An automation rule, usable globally or for a single repository."""

    # Human-readable rule name.
    name: str
    # Criteria a notification must satisfy for the rule to apply.
    match: MatchCriteria = field(default_factory=MatchCriteria)
    # Actions emitted on a match.
    actions: list[RuleAction] = field(default_factory=list)
    # Flag marking matched notifications for exclusion from dashboards.
    exclude_from_dashboards: bool = False
@dataclass(slots=True)
[docs] class RuleSet: """Collection of global and per-repository rules."""
[docs] global_rules: list[Rule] = field(default_factory=list)
[docs] per_repository: dict[str, list[Rule]] = field(default_factory=dict)
@dataclass(slots=True)
class DashboardSpec:
    """Settings describing how one dashboard filters, groups and orders items."""

    # Dashboard name.
    name: str
    # Grouping key; "none" is the default.
    group_by: str = "none"
    # Sort key and direction.
    sort_by: str = "score"
    descending: bool = True
    # Whether read notifications are included.
    include_read: bool = False
    # Upper bound on the number of items.
    max_items: int = 100
    # Criteria selecting the dashboard's notifications.
    match: MatchCriteria = field(default_factory=MatchCriteria)
    # Criteria sets parsed from the dashboard's ``ignore_rules`` list.
    ignore_rules: list[MatchCriteria] = field(default_factory=list)
@dataclass(slots=True)
[docs] class ScoringConfig: """Configurable scoring model for notifications."""
[docs] unread_bonus: float = 15.0
[docs] age_decay_per_hour: float = 0.25
[docs] reason_weights: dict[str, float] = field(default_factory=dict)
[docs] repository_weights: dict[str, float] = field(default_factory=dict)
[docs] subject_type_weights: dict[str, float] = field(default_factory=dict)
[docs] title_keyword_weights: dict[str, float] = field(default_factory=dict)
@dataclass(slots=True)
[docs] class PollingConfig: """Polling behavior for ingestion."""
[docs] interval_seconds: int = 60
[docs] per_page: int = 50
[docs] max_pages: int = 5
[docs] all: bool = False
[docs] participating: bool = False
@dataclass(slots=True)
[docs] class GitHubConfig: """GitHub API configuration."""
[docs] accounts: list[GitHubAccountConfig] = field(default_factory=list)
@property
[docs] def token_env(self) -> str: """Backward-compatible shortcut to first account token env.""" return self.accounts[0].token_env if self.accounts else "GITHUB_TOKEN"
@property
[docs] def api_base_url(self) -> str: """Backward-compatible shortcut to first account API base URL.""" return self.accounts[0].api_base_url if self.accounts else "https://api.github.com"
@dataclass(slots=True)
[docs] class GitHubAccountConfig: """One GitHub account configuration for multi-account polling."""
[docs] id: str
[docs] label: str
[docs] token_env: str
[docs] api_base_url: str = "https://api.github.com"
@dataclass(slots=True)
[docs] class GitHubLatestCommentEnrichmentConfig: """Config for enriching comment notifications with latest-comment metadata."""
[docs] enabled: bool = False
[docs] timeout_seconds: float = 10.0
@dataclass(slots=True)
[docs] class GitHubPRStateEnrichmentConfig: """Config for enriching pull-request notifications with PR state."""
[docs] enabled: bool = False
[docs] timeout_seconds: float = 10.0
@dataclass(slots=True)
class EnrichmentConfig:
    """Root of the enrichment settings."""

    # Master switch for enrichment.
    enabled: bool = False
    # Cap on enrichment requests per cycle.
    max_requests_per_cycle: int = 25
    # Per-enricher settings.
    github_latest_comment: GitHubLatestCommentEnrichmentConfig = field(
        default_factory=GitHubLatestCommentEnrichmentConfig,
    )
    github_pr_state: GitHubPRStateEnrichmentConfig = field(
        default_factory=GitHubPRStateEnrichmentConfig,
    )
@dataclass(slots=True)
[docs] class StateConfig: """State/cache location for persisted notifications."""
[docs] cache_file: Path = Path("~/.cache/corvix/notifications.json")
@dataclass(slots=True)
[docs] class BrowserTabTargetConfig: """Config for in-tab browser notification delivery."""
[docs] enabled: bool = True
[docs] max_per_cycle: int = 5
[docs] cooldown_seconds: int = 10
@dataclass(slots=True)
[docs] class WebPushTargetConfig: """Config for background Web Push notification delivery (phase 2)."""
[docs] enabled: bool = False
[docs] vapid_public_key_env: str = "CORVIX_VAPID_PUBLIC_KEY"
[docs] vapid_private_key_env: str = "CORVIX_VAPID_PRIVATE_KEY"
[docs] subject: str = ""
@dataclass(slots=True)
[docs] class NotificationsDetectConfig: """Controls which records qualify for notification events."""
[docs] include_read: bool = False
[docs] min_score: float = 0.0
@dataclass(slots=True)
class NotificationsConfig:
    """Root of the notification settings."""

    # Master switch for notifications.
    enabled: bool = True
    # Eligibility settings for notification events.
    detect: NotificationsDetectConfig = field(default_factory=NotificationsDetectConfig)
    # Delivery targets.
    browser_tab: BrowserTabTargetConfig = field(default_factory=BrowserTabTargetConfig)
    web_push: WebPushTargetConfig = field(default_factory=WebPushTargetConfig)
@dataclass(slots=True)
[docs] class AuthConfig: """Authentication mode configuration."""
[docs] mode: str = "single_user" # single_user | multi_user
[docs] session_secret: str = ""
@dataclass(slots=True)
[docs] class DatabaseConfig: """PostgreSQL connection configuration."""
[docs] url_env: str = "DATABASE_URL"
@dataclass(slots=True)
class AppConfig:
    """Root configuration object aggregating every config section."""

    # One attribute per top-level YAML section; each defaults to that section's defaults.
    github: GitHubConfig = field(default_factory=GitHubConfig)
    enrichment: EnrichmentConfig = field(default_factory=EnrichmentConfig)
    polling: PollingConfig = field(default_factory=PollingConfig)
    state: StateConfig = field(default_factory=StateConfig)
    scoring: ScoringConfig = field(default_factory=ScoringConfig)
    rules: RuleSet = field(default_factory=RuleSet)
    dashboards: list[DashboardSpec] = field(default_factory=list)
    auth: AuthConfig = field(default_factory=AuthConfig)
    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    notifications: NotificationsConfig = field(default_factory=NotificationsConfig)

    def resolve_cache_file(self) -> Path:
        """Return the cache file path with ``~`` expanded and the result resolved."""
        expanded = self.state.cache_file.expanduser()
        return expanded.resolve()
# Starter YAML document written to disk by ``write_default_config``.
#
# NOTE: the ``title_regex`` value is a single-quoted YAML scalar on purpose.
# Inside a double-quoted YAML scalar ``\[`` is not a valid escape sequence, so
# a double-quoted form would make the generated file unparseable. Single quotes
# keep the backslashes literal and the regex reads ``.*\[bot\].*``.
DEFAULT_CONFIG = """\
github:
  accounts:
    - id: primary
      label: Primary
      token_env: GITHUB_TOKEN
      api_base_url: https://api.github.com
enrichment:
  enabled: false
  max_requests_per_cycle: 25
  github_latest_comment:
    enabled: false
    timeout_seconds: 10
  github_pr_state:
    enabled: false
    timeout_seconds: 10
polling:
  interval_seconds: 60
  per_page: 50
  max_pages: 5
  all: false
  participating: false
state:
  cache_file: ~/.cache/corvix/notifications.json
scoring:
  unread_bonus: 15
  age_decay_per_hour: 0.25
  reason_weights:
    mention: 50
    review_requested: 40
    assign: 30
    author: 10
  repository_weights:
    your-org/critical-repo: 25
  subject_type_weights:
    PullRequest: 10
  title_keyword_weights:
    security: 20
    urgent: 15
rules:
  global:
    - name: mute-bot-noise
      match:
        title_regex: '.*\\[bot\\].*'
      actions:
        - type: mark_read
      exclude_from_dashboards: true
  per_repository:
    your-org/infra:
      - name: mute-chore-prs
        match:
          title_contains_any: ["chore", "deps"]
        actions:
          - type: mark_read
        exclude_from_dashboards: true
dashboards:
  - name: overview
    group_by: reason
    sort_by: updated_at
    descending: true
    include_read: true
    max_items: 200
  - name: triage
    group_by: repository
    sort_by: score
    descending: true
    include_read: false
    max_items: 100
    match:
      reason_in: ["mention", "review_requested", "assign"]
    ignore_rules:
      - reason_in: ["comment"]
        context:
          - path: github.latest_comment.is_ci_only
            op: equals
            value: true
"""
def _ensure_map(value: object, section: str) -> dict[str, object]:
    """Return a shallow copy of ``value`` after checking it is a string-keyed dict.

    Args:
        value: Raw object taken from the parsed YAML.
        section: Section name used in error messages.

    Raises:
        ValueError: If ``value`` is not a dict or has a non-string key.
    """
    if not isinstance(value, dict):
        msg = f"Config section '{section}' must be a map/object."
        raise ValueError(msg)
    if any(not isinstance(name, str) for name in value):
        msg = f"Config section '{section}' must use string keys."
        raise ValueError(msg)
    return dict(value)
def _ensure_list(value: object, section: str) -> list[object]:
    """Return a shallow copy of ``value`` after checking it is a list.

    Args:
        value: Raw object taken from the parsed YAML.
        section: Section name used in the error message.

    Raises:
        ValueError: If ``value`` is not a list.
    """
    if not isinstance(value, list):
        msg = f"Config section '{section}' must be a list."
        raise ValueError(msg)
    return list(value)
def _as_bool(value: object, field: str) -> bool:
    """Return ``value`` unchanged if it is a bool.

    Raises:
        ValueError: If ``value`` is not a bool; the message names ``field``.
    """
    if not isinstance(value, bool):
        msg = f"Config field '{field}' must be a boolean."
        raise ValueError(msg)
    return value
def _as_int(value: object, field: str) -> int:
    """Return ``value`` unchanged if it is an int that is not a bool.

    Raises:
        ValueError: If ``value`` is a bool or not an int; the message names ``field``.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    msg = f"Config field '{field}' must be an integer."
    raise ValueError(msg)
def _as_float(value: object, field: str) -> float:
    """Return ``value`` converted to float if it is an int or float (bools excluded).

    Raises:
        ValueError: If ``value`` is a bool or not a number; the message names ``field``.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if not is_number:
        msg = f"Config field '{field}' must be a number."
        raise ValueError(msg)
    return float(value)
def _as_str(value: object, field: str) -> str:
    """Return ``value`` unchanged if it is a string.

    Raises:
        ValueError: If ``value`` is not a string; the message names ``field``.
    """
    if not isinstance(value, str):
        msg = f"Config field '{field}' must be a string."
        raise ValueError(msg)
    return value
def _get_str(config: dict[str, object], key: str, default: str, field: str) -> str:
    """Return ``config[key]`` validated as a string, or ``default`` when the key is absent.

    A key that is present with a non-string value (including ``None``) is
    rejected by ``_as_str`` with a ValueError naming ``field``.
    """
    return _as_str(config[key], field) if key in config else default
def _get_optional_str(config: dict[str, object], key: str, field: str) -> str | None:
    """Return ``config[key]`` validated as a string, or ``None`` when absent or null."""
    raw = config.get(key)
    if raw is None:
        return None
    return _as_str(raw, field)
def _get_bool(config: dict[str, object], key: str, default: bool, field: str) -> bool:
    """Return ``config[key]`` validated as a bool, or ``default`` when the key is absent.

    A key that is present with a non-bool value (including ``None``) is
    rejected by ``_as_bool`` with a ValueError naming ``field``.
    """
    return _as_bool(config[key], field) if key in config else default
def _get_optional_bool(config: dict[str, object], key: str, field: str) -> bool | None:
    """Return ``config[key]`` validated as a bool, or ``None`` when absent or null."""
    raw = config.get(key)
    if raw is None:
        return None
    return _as_bool(raw, field)
def _get_int(config: dict[str, object], key: str, default: int, field: str) -> int:
    """Return ``config[key]`` validated as an integer, or ``default`` when the key is absent.

    A key that is present with a non-integer value (including ``None``) is
    rejected by ``_as_int`` with a ValueError naming ``field``.
    """
    return _as_int(config[key], field) if key in config else default
def _get_float(config: dict[str, object], key: str, default: float, field: str) -> float:
    """Return ``config[key]`` validated as a number, or ``default`` when the key is absent.

    A key that is present with a non-numeric value (including ``None``) is
    rejected by ``_as_float`` with a ValueError naming ``field``.
    """
    return _as_float(config[key], field) if key in config else default
def _get_optional_float(config: dict[str, object], key: str, field: str) -> float | None:
    """Return ``config[key]`` validated as a number, or ``None`` when absent or null."""
    raw = config.get(key)
    if raw is None:
        return None
    return _as_float(raw, field)
def _to_str_list(value: object, field: str | None = None) -> list[str]:
    """Validate an optional list of strings and return a copy of it.

    Args:
        value: Raw YAML value; ``None`` is treated as an empty list.
        field: Optional dotted field name. When given, it is included in error
            messages so the user can locate the offending entry. When omitted,
            the generic messages are used.

    Returns:
        A new list containing the items of ``value``.

    Raises:
        ValueError: If ``value`` is neither ``None`` nor a list, or if any
            item is not a string.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        msg = "Expected a list." if field is None else f"Config field '{field}' must be a list."
        raise ValueError(msg)
    if not all(isinstance(item, str) for item in value):
        msg = (
            "Expected a list of strings."
            if field is None
            else f"Config field '{field}' must be a list of strings."
        )
        raise ValueError(msg)
    return list(value)


def _parse_match(value: object, *, section: str = "match") -> MatchCriteria:
    """Build a ``MatchCriteria`` from a raw YAML mapping.

    Args:
        value: Raw mapping holding the match fields.
        section: Dotted prefix used in error messages for this mapping.

    Raises:
        ValueError: If ``value`` is not a string-keyed mapping or any field
            has the wrong type.
    """
    match = _ensure_map(value, section)
    return MatchCriteria(
        # List fields pass their dotted name so type errors identify the field.
        repository_in=_to_str_list(match.get("repository_in"), f"{section}.repository_in"),
        repository_glob=_to_str_list(match.get("repository_glob"), f"{section}.repository_glob"),
        reason_in=_to_str_list(match.get("reason_in"), f"{section}.reason_in"),
        subject_type_in=_to_str_list(match.get("subject_type_in"), f"{section}.subject_type_in"),
        title_contains_any=_to_str_list(match.get("title_contains_any"), f"{section}.title_contains_any"),
        title_regex=_get_optional_str(match, "title_regex", f"{section}.title_regex"),
        unread=_get_optional_bool(match, "unread", f"{section}.unread"),
        min_score=_get_optional_float(match, "min_score", f"{section}.min_score"),
        max_age_hours=_get_optional_float(match, "max_age_hours", f"{section}.max_age_hours"),
        context=_parse_context_predicates(match.get("context", []), section=f"{section}.context"),
    )
def _parse_context_predicates(value: object, *, section: str = "match.context") -> list[ContextPredicate]:
    """Parse a raw list into ``ContextPredicate`` objects.

    Args:
        value: Raw YAML value expected to be a list of predicate mappings.
        section: Dotted prefix used in error messages; items are reported
            under ``"<section>[]"``.

    Raises:
        ValueError: If ``value`` is not a list or any item is invalid.
    """
    item_section = f"{section}[]"
    parsed: list[ContextPredicate] = []
    for raw_predicate in _ensure_list(value, section):
        parsed.append(_parse_context_predicate(raw_predicate, section=item_section))
    return parsed
def _parse_context_predicate(value: object, *, section: str = "match.context[]") -> ContextPredicate:
    """Parse and validate one context predicate mapping.

    Args:
        value: Raw mapping with ``path``, ``op`` and optionally ``value`` and
            ``case_insensitive``.
        section: Dotted prefix used in error messages.

    Raises:
        ValueError: If the mapping is malformed, ``path`` is missing or blank,
            ``op`` is not a supported operator, or a ``regex`` operand is not
            a compilable pattern string.
    """
    raw = _ensure_map(value, f"{section} predicate")

    # A non-string path is treated the same as a blank one.
    path_value = raw.get("path")
    path = path_value.strip() if isinstance(path_value, str) else ""
    if not path:
        msg = f"Config field '{section}.path' is required."
        raise ValueError(msg)

    # A non-string op collapses to "", which is never a supported operator.
    op_value = raw.get("op")
    op = op_value.strip() if isinstance(op_value, str) else ""
    if op not in _CONTEXT_OPERATORS:
        supported = ", ".join(sorted(_CONTEXT_OPERATORS))
        msg = f"Config field '{section}.op' must be one of: {supported}."
        raise ValueError(msg)

    operand = raw.get("value")
    if op == "regex":
        if not isinstance(operand, str):
            msg = f"Config field '{section}.value' must be a string when op is 'regex'."
            raise ValueError(msg)
        # Compile once here so a bad pattern is reported at load time.
        try:
            re.compile(operand)
        except re.error as error:
            msg = f"Config field '{section}.value' contains an invalid regex: {error}."
            raise ValueError(msg) from error

    case_insensitive = _get_bool(raw, "case_insensitive", False, f"{section}.case_insensitive")
    return ContextPredicate(
        path=path,
        op=op,
        value=operand,
        case_insensitive=case_insensitive,
    )
def _parse_rules(value: object) -> RuleSet:
    """Parse the ``rules`` section into a ``RuleSet``.

    Reads the optional ``global`` list and the optional ``per_repository``
    mapping of repository name to rule list.

    Raises:
        ValueError: If the section or any nested rule is malformed.
    """
    rules_map = _ensure_map(value, "rules")

    global_entries = _ensure_list(rules_map.get("global", []), "rules.global")
    global_rules = [_parse_rule(entry) for entry in global_entries]

    per_repo_raw = _ensure_map(rules_map.get("per_repository", {}), "rules.per_repository")
    per_repository: dict[str, list[Rule]] = {
        repo: [_parse_rule(entry) for entry in _ensure_list(entries, f"rules.per_repository.{repo}")]
        for repo, entries in per_repo_raw.items()
    }
    return RuleSet(global_rules=global_rules, per_repository=per_repository)
def _parse_rule(value: object) -> Rule:
    """Parse one rule mapping into a ``Rule``.

    The name defaults to ``"unnamed-rule"``. Each action entry must be a
    mapping with a non-blank string ``type``.

    Raises:
        ValueError: If the rule, its actions, or its match block is malformed.
    """
    rule_map = _ensure_map(value, "rule")
    name = _get_str(rule_map, "name", "unnamed-rule", "rule.name")

    actions: list[RuleAction] = []
    for raw_action in _ensure_list(rule_map.get("actions", []), f"rule '{name}' actions"):
        action_map = _ensure_map(raw_action, "rule action")
        action_type = _get_str(action_map, "type", "", "rule action.type").strip()
        if action_type:
            actions.append(RuleAction(action_type=action_type))
            continue
        msg = "Config field 'rule action.type' is required."
        raise ValueError(msg)

    criteria = _parse_match(rule_map.get("match", {}))
    excluded = _get_bool(
        rule_map,
        "exclude_from_dashboards",
        False,
        "rule.exclude_from_dashboards",
    )
    return Rule(name=name, match=criteria, actions=actions, exclude_from_dashboards=excluded)
def _parse_dashboards(value: object) -> list[DashboardSpec]:
    """Parse the ``dashboards`` list into ``DashboardSpec`` objects.

    Missing keys fall back to the same defaults as ``DashboardSpec``, except
    ``name``, which defaults to ``"default"``.

    Raises:
        ValueError: If the list or any dashboard entry is malformed.
    """
    specs: list[DashboardSpec] = []
    for entry in _ensure_list(value, "dashboards"):
        raw = _ensure_map(entry, "dashboard entry")
        # Fields are read in declaration order so the first invalid one is reported.
        name = _get_str(raw, "name", "default", "dashboards[].name")
        group_by = _get_str(raw, "group_by", "none", "dashboards[].group_by")
        sort_by = _get_str(raw, "sort_by", "score", "dashboards[].sort_by")
        descending = _get_bool(raw, "descending", True, "dashboards[].descending")
        include_read = _get_bool(raw, "include_read", False, "dashboards[].include_read")
        max_items = _get_int(raw, "max_items", 100, "dashboards[].max_items")
        criteria = _parse_match(raw.get("match", {}))
        ignore_rules = _parse_dashboard_ignore_rules(raw.get("ignore_rules", []))
        specs.append(
            DashboardSpec(
                name=name,
                group_by=group_by,
                sort_by=sort_by,
                descending=descending,
                include_read=include_read,
                max_items=max_items,
                match=criteria,
                ignore_rules=ignore_rules,
            ),
        )
    return specs
def _parse_dashboard_ignore_rules(value: object) -> list[MatchCriteria]:
    """Parse a dashboard's ``ignore_rules`` list into ``MatchCriteria`` objects.

    Raises:
        ValueError: If ``value`` is not a list or any entry is malformed.
    """
    criteria: list[MatchCriteria] = []
    for entry in _ensure_list(value, "dashboards[].ignore_rules"):
        criteria.append(_parse_match(entry, section="dashboards[].ignore_rules[]"))
    return criteria
def _parse_scoring(value: object) -> ScoringConfig:
    """Parse the ``scoring`` section into a ``ScoringConfig``.

    Raises:
        ValueError: If the section is not a mapping or a value is not numeric.
    """
    scoring = _ensure_map(value, "scoring")

    # Scalars are read before the weight tables, matching declaration order.
    unread_bonus = _get_float(scoring, "unread_bonus", 15.0, "scoring.unread_bonus")
    age_decay = _get_float(scoring, "age_decay_per_hour", 0.25, "scoring.age_decay_per_hour")

    weight_tables = {
        table: _to_float_map(scoring.get(table, {}), f"scoring.{table}")
        for table in (
            "reason_weights",
            "repository_weights",
            "subject_type_weights",
            "title_keyword_weights",
        )
    }
    return ScoringConfig(
        unread_bonus=unread_bonus,
        age_decay_per_hour=age_decay,
        **weight_tables,
    )
def _to_float_map(value: object, section: str) -> dict[str, float]:
    """Validate a mapping of string keys to numbers and return it with float values.

    Raises:
        ValueError: If ``value`` is not a string-keyed mapping or a value is
            not numeric; the message names ``"<section>.<key>"``.
    """
    weights: dict[str, float] = {}
    for name, raw_weight in _ensure_map(value, section).items():
        weights[name] = _as_float(raw_weight, f"{section}.{name}")
    return weights
def _parse_github(value: object) -> GitHubConfig:
    """Parse the ``github`` section into a ``GitHubConfig``.

    Top-level ``token_env`` and ``api_base_url`` act as fallbacks for accounts
    that omit them. When ``accounts`` is absent, a single account labelled
    "Primary" is synthesized from those fallbacks.

    Raises:
        ValueError: If the section is malformed, ``accounts`` is present but
            empty, an id is blank or duplicated, or a ``token_env`` is blank.
    """
    github = _ensure_map(value, "github")
    fallback_token_env = _get_str(github, "token_env", "GITHUB_TOKEN", "github.token_env")
    fallback_api_base_url = _get_str(
        github,
        "api_base_url",
        "https://api.github.com",
        "github.api_base_url",
    )

    raw_accounts: list[object]
    if "accounts" in github:
        raw_accounts = _ensure_list(github["accounts"], "github.accounts")
        if not raw_accounts:
            msg = "Config section 'github.accounts' must contain at least one account."
            raise ValueError(msg)
    else:
        # Legacy single-account layout: build one implicit account.
        raw_accounts = [{"label": "Primary"}]

    accounts: list[GitHubAccountConfig] = []
    used_ids: set[str] = set()
    for index, raw_account in enumerate(raw_accounts):
        prefix = f"github.accounts[{index}]"
        account = _ensure_map(raw_account, prefix)

        account_id = _get_str(account, "id", "primary", f"{prefix}.id").strip()
        if not account_id:
            msg = f"Config field '{prefix}.id' is required."
            raise ValueError(msg)
        if account_id in used_ids:
            msg = f"Config field '{prefix}.id' must be unique ('{account_id}')."
            raise ValueError(msg)
        used_ids.add(account_id)

        # A blank label falls back to the account id.
        label = _get_str(account, "label", account_id, f"{prefix}.label").strip()
        if not label:
            label = account_id

        token_env = _get_str(account, "token_env", fallback_token_env, f"{prefix}.token_env").strip()
        if not token_env:
            msg = f"Config field '{prefix}.token_env' is required."
            raise ValueError(msg)

        api_base_url = _get_str(account, "api_base_url", fallback_api_base_url, f"{prefix}.api_base_url")
        accounts.append(
            GitHubAccountConfig(
                id=account_id,
                label=label,
                token_env=token_env,
                api_base_url=api_base_url,
            ),
        )
    return GitHubConfig(accounts=accounts)
def _parse_polling(value: object) -> PollingConfig:
    """Parse the ``polling`` section into a ``PollingConfig``.

    ``per_page`` is checked against the module's inclusive min/max bounds
    before the remaining fields are read.

    Raises:
        ValueError: If the section is malformed or ``per_page`` is out of range.
    """
    polling = _ensure_map(value, "polling")

    per_page = _get_int(polling, "per_page", _POLLING_PER_PAGE_MAX, "polling.per_page")
    if per_page < _POLLING_PER_PAGE_MIN or per_page > _POLLING_PER_PAGE_MAX:
        msg = f"Config value 'polling.per_page' must be between {_POLLING_PER_PAGE_MIN} and {_POLLING_PER_PAGE_MAX}."
        raise ValueError(msg)

    interval_seconds = _get_int(polling, "interval_seconds", 60, "polling.interval_seconds")
    max_pages = _get_int(polling, "max_pages", 5, "polling.max_pages")
    poll_all = _get_bool(polling, "all", False, "polling.all")
    participating = _get_bool(polling, "participating", False, "polling.participating")
    return PollingConfig(
        interval_seconds=interval_seconds,
        per_page=per_page,
        max_pages=max_pages,
        all=poll_all,
        participating=participating,
    )
def _parse_enrichment(value: object) -> EnrichmentConfig:
    """Parse the ``enrichment`` section into an ``EnrichmentConfig``.

    The nested ``github_latest_comment`` and ``github_pr_state`` mappings are
    optional and default to empty.

    Raises:
        ValueError: If the section or a nested mapping is malformed.
    """
    enrichment = _ensure_map(value, "enrichment")
    # Both nested maps are shape-checked before any individual field is read.
    comment_raw = _ensure_map(
        enrichment.get("github_latest_comment", {}),
        "enrichment.github_latest_comment",
    )
    pr_raw = _ensure_map(
        enrichment.get("github_pr_state", {}),
        "enrichment.github_pr_state",
    )

    enabled = _get_bool(enrichment, "enabled", False, "enrichment.enabled")
    max_requests = _get_int(
        enrichment,
        "max_requests_per_cycle",
        25,
        "enrichment.max_requests_per_cycle",
    )

    latest_comment = GitHubLatestCommentEnrichmentConfig(
        enabled=_get_bool(comment_raw, "enabled", False, "enrichment.github_latest_comment.enabled"),
        timeout_seconds=_get_float(
            comment_raw,
            "timeout_seconds",
            10.0,
            "enrichment.github_latest_comment.timeout_seconds",
        ),
    )
    pr_state = GitHubPRStateEnrichmentConfig(
        enabled=_get_bool(pr_raw, "enabled", False, "enrichment.github_pr_state.enabled"),
        timeout_seconds=_get_float(
            pr_raw,
            "timeout_seconds",
            10.0,
            "enrichment.github_pr_state.timeout_seconds",
        ),
    )
    return EnrichmentConfig(
        enabled=enabled,
        max_requests_per_cycle=max_requests,
        github_latest_comment=latest_comment,
        github_pr_state=pr_state,
    )
def _parse_state(value: object) -> StateConfig:
    """Parse the ``state`` section into a ``StateConfig``.

    The cache path is stored as given; ``~`` is not expanded here.

    Raises:
        ValueError: If the section is not a mapping or ``cache_file`` is not a string.
    """
    state = _ensure_map(value, "state")
    cache_path = _get_str(
        state,
        "cache_file",
        "~/.cache/corvix/notifications.json",
        "state.cache_file",
    )
    return StateConfig(cache_file=Path(cache_path))
def _parse_auth(value: object) -> AuthConfig:
    """Parse the ``auth`` section into an ``AuthConfig``.

    Raises:
        ValueError: If the section is not a mapping or a field is not a string.
    """
    auth = _ensure_map(value, "auth")
    mode = _get_str(auth, "mode", "single_user", "auth.mode")
    session_secret = _get_str(auth, "session_secret", "", "auth.session_secret")
    return AuthConfig(mode=mode, session_secret=session_secret)
def _parse_database(value: object) -> DatabaseConfig:
    """Parse the ``database`` section into a ``DatabaseConfig``.

    Raises:
        ValueError: If the section is not a mapping or ``url_env`` is not a string.
    """
    database = _ensure_map(value, "database")
    url_env = _get_str(database, "url_env", "DATABASE_URL", "database.url_env")
    return DatabaseConfig(url_env=url_env)
def _parse_notifications(value: object) -> NotificationsConfig:
    """Parse the ``notifications`` section into a ``NotificationsConfig``.

    The nested ``detect``, ``browser_tab`` and ``web_push`` mappings are
    optional and default to empty.

    Raises:
        ValueError: If the section or a nested mapping is malformed.
    """
    notif = _ensure_map(value, "notifications")
    # All nested maps are shape-checked before any individual field is read.
    detect_raw = _ensure_map(notif.get("detect", {}), "notifications.detect")
    browser_raw = _ensure_map(notif.get("browser_tab", {}), "notifications.browser_tab")
    push_raw = _ensure_map(notif.get("web_push", {}), "notifications.web_push")

    enabled = _get_bool(notif, "enabled", True, "notifications.enabled")

    detect = NotificationsDetectConfig(
        include_read=_get_bool(detect_raw, "include_read", False, "notifications.detect.include_read"),
        min_score=_get_float(detect_raw, "min_score", 0.0, "notifications.detect.min_score"),
    )
    browser_tab = BrowserTabTargetConfig(
        enabled=_get_bool(browser_raw, "enabled", True, "notifications.browser_tab.enabled"),
        max_per_cycle=_get_int(browser_raw, "max_per_cycle", 5, "notifications.browser_tab.max_per_cycle"),
        cooldown_seconds=_get_int(
            browser_raw,
            "cooldown_seconds",
            10,
            "notifications.browser_tab.cooldown_seconds",
        ),
    )
    web_push = WebPushTargetConfig(
        enabled=_get_bool(push_raw, "enabled", False, "notifications.web_push.enabled"),
        vapid_public_key_env=_get_str(
            push_raw,
            "vapid_public_key_env",
            "CORVIX_VAPID_PUBLIC_KEY",
            "notifications.web_push.vapid_public_key_env",
        ),
        vapid_private_key_env=_get_str(
            push_raw,
            "vapid_private_key_env",
            "CORVIX_VAPID_PRIVATE_KEY",
            "notifications.web_push.vapid_private_key_env",
        ),
        subject=_get_str(push_raw, "subject", "", "notifications.web_push.subject"),
    )
    return NotificationsConfig(
        enabled=enabled,
        detect=detect,
        browser_tab=browser_tab,
        web_push=web_push,
    )
def load_config(path: Path) -> AppConfig:
    """Load and validate YAML config from disk.

    Args:
        path: Location of the YAML file, read as UTF-8.

    Returns:
        The parsed configuration. Sections missing from the file use their defaults.

    Raises:
        ValueError: If the top-level document is not a mapping or any section
            fails validation.

    Errors raised while reading the file or parsing the YAML are not caught
    here and propagate to the caller.
    """
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    # Only an empty document (None) means "use all defaults". Other falsy
    # documents such as `[]`, `0` or `false` must still hit the mapping check
    # below instead of being silently treated as an empty config.
    if data is None:
        data = {}
    if not isinstance(data, dict):
        msg = "Top-level YAML must be a map/object."
        raise ValueError(msg)

    return AppConfig(
        github=_parse_github(data.get("github", {})),
        enrichment=_parse_enrichment(data.get("enrichment", {})),
        polling=_parse_polling(data.get("polling", {})),
        state=_parse_state(data.get("state", {})),
        scoring=_parse_scoring(data.get("scoring", {})),
        rules=_parse_rules(data.get("rules", {})),
        dashboards=_parse_dashboards(data.get("dashboards", [])),
        auth=_parse_auth(data.get("auth", {})),
        database=_parse_database(data.get("database", {})),
        notifications=_parse_notifications(data.get("notifications", {})),
    )
def write_default_config(path: Path) -> None:
    """Write a starter configuration file.

    Args:
        path: Destination file. Missing parent directories are created, and an
            existing file at ``path`` is overwritten.

    Raises:
        OSError: If the directories or the file cannot be created.
    """
    # A fresh install usually has no config directory yet, so create it first
    # rather than failing with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(DEFAULT_CONFIG, encoding="utf-8")