corvix.enrichment.engine

Enrichment engine for attaching contextual data to notifications.

Classes

EnrichmentRunResult

Result of enriching one poll cycle.

EnrichmentEngine

Runs configured enrichment providers over notifications.

Functions

_is_str_object_map(→ TypeIs[dict[str, object]])

_set_nested_namespace(→ None)

Merge payload under a dot-delimited namespace.

Module Contents

corvix.enrichment.engine._is_str_object_map(value: object) TypeIs[dict[str, object]][source][source]
class corvix.enrichment.engine.EnrichmentRunResult[source][source]

Result of enriching one poll cycle.

contexts_by_notification_key: dict[str, dict[str, object]][source][source]
errors: list[str] = [][source][source]
property contexts_by_thread_id: dict[str, dict[str, object]][source][source]

Backward-compatible thread-keyed view of contexts.

class corvix.enrichment.engine.EnrichmentEngine[source][source]

Runs configured enrichment providers over notifications.

config: corvix.config.EnrichmentConfig[source][source]
providers: list[corvix.enrichment.base.EnrichmentProvider][source][source]
run(notifications: list[corvix.domain.Notification], client: corvix.enrichment.base.JsonFetchClient, clients_by_account: dict[str, corvix.enrichment.base.JsonFetchClient] | None = None) EnrichmentRunResult[source][source]

Run enabled providers for all notifications in one cycle.

corvix.enrichment.engine._set_nested_namespace(root: dict[str, object], namespace: str, payload: dict[str, object]) None[source][source]

Merge payload under a dot-delimited namespace.