corvix.pipeline.engine

Unified pipeline engine for field completion and context enrichment.

Classes

PipelineRunResult

Result of one PipelineEngine run.

PipelineEngine

Runs field-completion and context-enrichment providers in a single unified pass.

Functions

_is_str_object_map(→ TypeIs[dict[str, object]])

_set_nested_namespace(→ None)

Merge payload under a dot-delimited namespace in root.

_apply_provider(→ corvix.domain.Notification)

Dispatch one provider and return the (possibly updated) notification.

Module Contents

corvix.pipeline.engine._is_str_object_map(value: object) → TypeIs[dict[str, object]]

corvix.pipeline.engine._set_nested_namespace(root: dict[str, object], namespace: str, payload: dict[str, object]) → None

Merge payload under a dot-delimited namespace in root.
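
As an illustration of what merging under a dot-delimited namespace can look like, here is a minimal sketch. The function below is a hypothetical stand-in, not the module's implementation: it assumes that intermediate dictionaries are created on demand and that existing keys at the target level are overwritten by the payload.

```python
def set_nested_namespace(
    root: dict[str, object], namespace: str, payload: dict[str, object]
) -> None:
    """Hypothetical stand-in for _set_nested_namespace (merge rules are assumed)."""
    node = root
    for part in namespace.split("."):
        child = node.get(part)
        if not isinstance(child, dict):
            # Assumption: a missing or non-dict entry is replaced by a fresh dict.
            child = {}
            node[part] = child
        node = child
    # Assumption: a shallow merge, where payload keys win over existing keys.
    node.update(payload)


root: dict[str, object] = {"github": {"pr": {"state": "open"}}}
set_nested_namespace(root, "github.pr", {"draft": False})
set_nested_namespace(root, "ci.status", {"passing": True})
```

Under these assumptions, the existing "github.pr" entry keeps its "state" key and gains "draft", while "ci.status" is created from scratch.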

class corvix.pipeline.engine.PipelineRunResult

Result of one PipelineEngine run.

Combines the outputs of field-completion providers (updated notifications) and context providers (per-notification context maps) into a single result object.

notifications: list[corvix.domain.Notification] = []
contexts_by_notification_key: dict[str, dict[str, object]]
errors: list[str] = []
property contexts_by_thread_id: dict[str, dict[str, object]]

Backward-compatible thread-keyed view of contexts_by_notification_key.

corvix.pipeline.engine._apply_provider(provider: corvix.pipeline.provider.FieldProvider | corvix.pipeline.provider.ContextProvider, current: corvix.domain.Notification, notification_client: corvix.pipeline.base.JsonFetchClient, context: corvix.pipeline.provider.PipelineContext, notification_context: dict[str, object]) → corvix.domain.Notification

Dispatch one provider and return the (possibly updated) notification.

For FieldProvider the returned notification may differ from current. For ContextProvider the notification is returned unchanged; any payload is merged into notification_context.
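
The dispatch described above can be sketched with runtime-checkable protocols. Everything in this block is an assumption made for illustration: the Notification stand-in, the simplified hydrate() and enrich() signatures (the real ones presumably also receive the client and the PipelineContext), the flat storage of the payload under the provider's name, and the TypeError for unknown providers.

```python
from dataclasses import dataclass, replace
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class Notification:
    """Stand-in for corvix.domain.Notification (fields are assumed)."""
    key: str
    title: str = ""


@runtime_checkable
class FieldProvider(Protocol):
    def hydrate(self, notification: Notification) -> Notification: ...


@runtime_checkable
class ContextProvider(Protocol):
    name: str

    def enrich(self, notification: Notification) -> dict[str, object]: ...


def apply_provider(
    provider: object, current: Notification, notification_context: dict[str, object]
) -> Notification:
    """Simplified sketch of the dispatch; not the module's implementation."""
    if isinstance(provider, FieldProvider):
        # The returned notification may differ from `current`.
        return provider.hydrate(current)
    if isinstance(provider, ContextProvider):
        payload = provider.enrich(current)
        if payload:
            # Simplification: stored flat here rather than nested by namespace.
            notification_context[provider.name] = payload
        return current
    raise TypeError(f"unsupported provider: {provider!r}")  # sketch-only choice


class TitleFiller:
    def hydrate(self, notification: Notification) -> Notification:
        return replace(notification, title="Fix bug")


class Labels:
    name = "labels"

    def enrich(self, notification: Notification) -> dict[str, object]:
        return {"count": 2}


ctx: dict[str, object] = {}
start = Notification(key="a")
after_field = apply_provider(TitleFiller(), start, ctx)
after_context = apply_provider(Labels(), after_field, ctx)
```

In this sketch the field provider yields a new notification object, while the context provider hands back the same object it received and only touches ctx.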

class corvix.pipeline.engine.PipelineEngine

Runs field-completion and context-enrichment providers in a single unified pass.

Providers are dispatched by structural type:

  • FieldProvider — the hydrate() method is called; its return value replaces the current notification so that subsequent providers see the updated state.

  • ContextProvider — the enrich() method is called; non-empty payloads are merged under the provider’s dot-delimited name namespace in the notification’s context map.

A single PipelineContext is shared across all providers and all notifications in one run() call, so URL responses cached by an early provider are available to later providers without an additional HTTP round-trip.
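
To show why a shared context saves round-trips, here is a minimal sketch of a URL-keyed response cache. SharedContext, its get() method, CountingClient, and fetch_json() are hypothetical names invented for this example; the actual PipelineContext and JsonFetchClient interfaces are not documented in this section and may differ.

```python
from dataclasses import dataclass, field


class CountingClient:
    """Hypothetical stand-in for a JSON fetch client; counts round-trips."""

    def __init__(self) -> None:
        self.calls = 0

    def fetch_json(self, url: str) -> dict[str, object]:
        self.calls += 1
        return {"url": url}


@dataclass
class SharedContext:
    """Hypothetical stand-in for a per-run context that caches by URL."""

    cache: dict[str, dict[str, object]] = field(default_factory=dict)

    def get(self, url: str, client: CountingClient) -> dict[str, object]:
        # Only the first request for a given URL reaches the client.
        if url not in self.cache:
            self.cache[url] = client.fetch_json(url)
        return self.cache[url]


client = CountingClient()
shared = SharedContext()
url = "https://api.example.test/pulls/1"  # placeholder URL
first = shared.get(url, client)   # as if requested by an early provider
second = shared.get(url, client)  # as if requested by a later provider
```

Because both lookups go through the same context object, the second one is served from the cache and the client sees a single request.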

providers: list[corvix.pipeline.provider.FieldProvider | corvix.pipeline.provider.ContextProvider]
max_requests_per_cycle: int = 25
run(notifications: list[corvix.domain.Notification], client: corvix.pipeline.base.JsonFetchClient, clients_by_account: dict[str, corvix.pipeline.base.JsonFetchClient] | None = None) → PipelineRunResult

Run all providers over every notification in one cycle.

Field-completion and context-enrichment providers are interleaved in declaration order: each provider sees the notification state produced by the preceding providers in the same pass.
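
The effect of declaration order can be seen in a small sketch. The run_pass() helper, the two toy providers, and the Notification stand-in are all hypothetical; the loop only mimics the interleaving described above and omits clients, error collection, and request limits.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Notification:
    """Stand-in for corvix.domain.Notification (fields are assumed)."""
    key: str
    title: str = ""


class FillTitle:
    """Toy field provider: completes the title."""

    def hydrate(self, notification: Notification) -> Notification:
        return replace(notification, title="Fix flaky test")


class TitleLength:
    """Toy context provider: reports the length of the title it sees."""

    name = "title_length"

    def enrich(self, notification: Notification) -> dict[str, object]:
        return {"chars": len(notification.title)}


def run_pass(providers: list[object], notifications: list[Notification]):
    """Apply providers in declaration order to each notification (sketch only)."""
    updated: list[Notification] = []
    contexts: dict[str, dict[str, object]] = {}
    for notification in notifications:
        current = notification
        context: dict[str, object] = {}
        for provider in providers:
            if hasattr(provider, "hydrate"):
                current = provider.hydrate(current)
            else:
                payload = provider.enrich(current)
                if payload:
                    context[provider.name] = payload
        updated.append(current)
        contexts[current.key] = context
    return updated, contexts


inbox = [Notification(key="n1")]
_, field_first = run_pass([FillTitle(), TitleLength()], inbox)
_, context_first = run_pass([TitleLength(), FillTitle()], inbox)
```

With the field provider declared first, the context provider measures the completed title (14 characters); with the order reversed, it sees the original empty title and reports 0.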