"""Command line interface for Corvix."""
from __future__ import annotations
from datetime import UTC, datetime
from os import environ
from pathlib import Path
import click
from rich.console import Console
from corvix.config import AppConfig, GitHubAccountConfig, load_config, write_default_config
from corvix.db import get_database_url
from corvix.env import get_env_value
from corvix.ingestion import GitHubNotificationsClient
from corvix.services import (
NotificationsClient,
PollCycleInput,
render_cached_dashboards,
run_poll_cycle,
run_watch_loop,
)
from corvix.storage import NotificationCache, PostgresStorage
from corvix.web.app import run as run_web
@click.group(invoke_without_command=True)
@click.option(
    "--config",
    "config_path",
    type=click.Path(path_type=Path, dir_okay=False, file_okay=True),
    default=Path("corvix.yaml"),
    show_default=True,
)
@click.pass_context
def main(ctx: click.Context, config_path: Path) -> None:
    """Corvix local GitHub notifications dashboard."""
    # NOTE: the docstring above is what Click prints as the group's help text,
    # so implementation notes live in comments instead.
    #
    # Subcommands read the config path back out of the shared context object;
    # make sure that object exists as a dict before storing into it.
    ctx.ensure_object(dict)
    ctx.obj["config_path"] = config_path

    # The group is allowed to run without a subcommand
    # (invoke_without_command=True); in that case just show the help text.
    if ctx.invoked_subcommand is None:
        click.echo(ctx.get_help())


@main.command("init-config")
@click.argument(
    "path",
    type=click.Path(path_type=Path, dir_okay=False, file_okay=True),
    default=Path("corvix.yaml"),
    required=False,
)
@click.option("--force", is_flag=True, default=False, help="Overwrite existing config file.")
def init_config_command(path: Path, force: bool) -> None:
    """Write a starter YAML config."""
    # Refuse to overwrite an existing file unless the caller passed --force.
    # ClickException makes Click print the message and exit non-zero.
    if path.exists() and not force:
        msg = f"Config already exists at '{path}'. Use --force to overwrite."
        raise click.ClickException(msg)

    write_default_config(path)
    click.echo(f"Wrote config to {path}")


@main.command("poll")
@click.option(
    "--apply-actions/--dry-run",
    default=False,
    show_default=True,
    help="Apply mark-read actions to GitHub or only report planned actions.",
)
@click.pass_context
def poll_command(ctx: click.Context, apply_actions: bool) -> None:
    """Run one poll cycle and persist processed notifications to cache."""
    # Load the config chosen on the group-level --config option, then build
    # one API client per configured GitHub account and the cache to write to.
    app_config = _load_app_config(_config_path_from_context(ctx))
    clients = _build_clients(app_config.github.accounts)
    cache = NotificationCache(path=app_config.resolve_cache_file())

    # apply_actions is False unless --apply-actions was given (dry-run default).
    cycle_input = PollCycleInput(
        config=app_config,
        clients=clients,
        cache=cache,
        apply_actions=apply_actions,
    )
    summary = run_poll_cycle(cycle_input)

    # Report the counters from the cycle, then any per-action errors it collected.
    click.echo(f"Fetched: {summary.fetched}")
    click.echo(f"Excluded from dashboards: {summary.excluded}")
    click.echo(f"Actions executed: {summary.actions_taken}")
    click.echo(f"Cache file: {cache.path}")
    for error in summary.errors:
        click.echo(f"Action error: {error}")


@main.command("watch")
@click.option(
    "--apply-actions/--dry-run",
    default=False,
    show_default=True,
    help="Apply mark-read actions to GitHub or only report planned actions.",
)
@click.option(
    "--iterations",
    type=click.IntRange(min=1),
    default=None,
    help="Number of polling iterations to run. Omit to run forever.",
)
@click.pass_context
def watch_command(ctx: click.Context, apply_actions: bool, iterations: int | None) -> None:
    """Run periodic poll cycles, suitable for cron-like local daemon behavior."""
    # Same setup as the one-shot `poll` command: config, clients, cache.
    app_config = _load_app_config(_config_path_from_context(ctx))
    clients = _build_clients(app_config.github.accounts)
    cache = NotificationCache(path=app_config.resolve_cache_file())

    # iterations is None when --iterations is omitted; it is passed through
    # unchanged so the loop itself decides when to stop.
    summaries = run_watch_loop(
        config=app_config,
        clients=clients,
        cache=cache,
        apply_actions=apply_actions,
        iterations=iterations,
    )

    # One line per run (numbered from 1), followed by that run's action errors.
    for index, summary in enumerate(summaries, start=1):
        click.echo(
            f"Run {index}: fetched={summary.fetched}, excluded={summary.excluded}, actions={summary.actions_taken}",
        )
        for error in summary.errors:
            click.echo(f"Action error: {error}")


@main.command("dashboard")
@click.option("--name", "dashboard_name", default=None, help="Render only one named dashboard.")
@click.pass_context
def dashboard_command(ctx: click.Context, dashboard_name: str | None) -> None:
    """Render dashboards from the persisted cache file without polling GitHub."""
    app_config = _load_app_config(_config_path_from_context(ctx))
    cache = NotificationCache(path=app_config.resolve_cache_file())

    # dashboard_name is None unless --name was given; it is forwarded as-is.
    results = render_cached_dashboards(
        config=app_config,
        cache=cache,
        console=Console(),
        dashboard_name=dashboard_name,
    )

    # An empty result gets an explicit message instead of silent success.
    if not results:
        click.echo("No dashboards rendered.")
        return

    for result in results:
        click.echo(f"{result.dashboard_name}: {result.rows} rows")


@main.command("serve")
@click.option("--host", default="0.0.0.0", show_default=True)
@click.option("--port", default=8000, show_default=True, type=click.IntRange(min=1, max=65535))
@click.option("--reload/--no-reload", default=False, show_default=True)
@click.pass_context
def serve_command(ctx: click.Context, host: str, port: int, reload: bool) -> None:
    """Run Litestar dashboard website."""
    config_path = _config_path_from_context(ctx)

    # run_web() is called without arguments, so the CLI options are handed
    # over through process environment variables (all values must be str).
    # NOTE(review): presumably corvix.web.app reads these CORVIX_* variables
    # at startup -- confirm against that module.
    environ["CORVIX_CONFIG"] = str(config_path)
    environ["CORVIX_WEB_HOST"] = host
    environ["CORVIX_WEB_PORT"] = str(port)
    environ["CORVIX_WEB_RELOAD"] = "true" if reload else "false"

    run_web()


@main.command("migrate-cache")
@click.option(
    "--user-id",
    required=True,
    help="UUID of the user to assign imported records to.",
)
@click.pass_context
def migrate_cache_command(ctx: click.Context, user_id: str) -> None:
    """Import JSON cache records into PostgreSQL for a given user.

    Reads the cache file from the config, then upserts all records into the
    PostgreSQL database using the DATABASE_URL (or the env var named in
    config.database.url_env).
    """
    app_config = _load_app_config(_config_path_from_context(ctx))

    # Fail early with a readable message when no connection string is set,
    # before touching the cache file.
    db_url = get_database_url(app_config.database.url_env)
    if not db_url:
        msg = f"Environment variable '{app_config.database.url_env}' is not set."
        raise click.ClickException(msg)

    cache = NotificationCache(path=app_config.resolve_cache_file())
    generated_at, records = cache.load()
    if not records:
        click.echo("Cache is empty or not found — nothing to migrate.")
        return

    # Keep the cache's own timestamp when it has one; otherwise stamp the
    # import with the current time as a timezone-aware UTC datetime.
    snapshot_time = generated_at if generated_at is not None else datetime.now(tz=UTC)

    storage = PostgresStorage(connection_string=db_url)
    storage.save_records(user_id=user_id, records=records, generated_at=snapshot_time)
    click.echo(f"Migrated {len(records)} records for user {user_id}.")


def _load_app_config(config_path: Path) -> AppConfig:
    """Load the application config, turning failures into CLI errors.

    Args:
        config_path: Location of the YAML config file.

    Returns:
        The configuration object produced by ``load_config``.

    Raises:
        click.ClickException: If the file does not exist, or if
            ``load_config`` raises ``ValueError`` for its contents.
    """
    if not config_path.exists():
        msg = f"Config file '{config_path}' does not exist. Run `corvix init-config` first."
        raise click.ClickException(msg)

    try:
        return load_config(config_path)
    except ValueError as error:
        # Chain the original error so the cause stays visible in tracebacks.
        msg = f"Invalid config at '{config_path}': {error}"
        raise click.ClickException(msg) from error


def _resolve_token(token_env: str) -> str:
    """Return the GitHub token configured under ``token_env``.

    Args:
        token_env: Name of the environment variable holding the token.

    Returns:
        The non-empty token value returned by ``get_env_value``.

    Raises:
        click.ClickException: If ``get_env_value`` raises ``ValueError``, or
            if it returns an empty/falsy value.
    """
    try:
        token = get_env_value(token_env)
    except ValueError as error:
        raise click.ClickException(str(error)) from error

    if not token:
        # NOTE(review): the message implies get_env_value also honours a
        # "<name>_FILE" variant -- confirm in corvix.env.
        msg = f"Environment variable '{token_env}' (or '{token_env}_FILE') is required for polling GitHub notifications."
        raise click.ClickException(msg)
    return token


def _build_clients(accounts: list[GitHubAccountConfig]) -> tuple[NotificationsClient, ...]:
    """Create one GitHub notifications client per configured account.

    Args:
        accounts: Account entries from the loaded config. Each entry supplies
            ``token_env``, ``api_base_url``, ``id`` and ``label``.

    Returns:
        A tuple of clients in the same order as ``accounts``; empty when no
        accounts are given.

    Raises:
        click.ClickException: Propagated from ``_resolve_token`` when an
            account's token cannot be resolved.
    """
    # Keyword arguments are evaluated left to right, so each account's token
    # is resolved (and may fail) before its client is constructed.
    return tuple(
        GitHubNotificationsClient(
            token=_resolve_token(account.token_env),
            api_base_url=account.api_base_url,
            account_id=account.id,
            account_label=account.label,
        )
        for account in accounts
    )


def _config_path_from_context(ctx: click.Context) -> Path:
    """Fetch the config path that the ``main`` group stored on the context.

    Args:
        ctx: Click context of the running command.

    Returns:
        The ``Path`` stored under ``ctx.obj["config_path"]``.

    Raises:
        click.ClickException: If ``ctx.obj`` is empty/unset or the stored
            value is not a ``Path``.
    """
    # ctx.obj may be None or empty if the group callback never populated it.
    stored = ctx.obj.get("config_path") if ctx.obj else None
    if not isinstance(stored, Path):
        msg = "Missing config path in CLI context."
        raise click.ClickException(msg)
    return stored