corvix.pipeline.engine ====================== .. py:module:: corvix.pipeline.engine .. autoapi-nested-parse:: Unified pipeline engine for field completion and context enrichment. Classes ------- .. autoapisummary:: corvix.pipeline.engine.PipelineRunResult corvix.pipeline.engine.PipelineEngine Functions --------- .. autoapisummary:: corvix.pipeline.engine._is_str_object_map corvix.pipeline.engine._set_nested_namespace corvix.pipeline.engine._apply_provider Module Contents --------------- .. py:function:: _is_str_object_map(value: object) -> TypeIs[dict[str, object]] .. py:function:: _set_nested_namespace(root: dict[str, object], namespace: str, payload: dict[str, object]) -> None Merge *payload* under a dot-delimited *namespace* in *root*. .. py:class:: PipelineRunResult Result of one :class:`PipelineEngine` run. Combines the outputs of field-completion providers (updated notifications) and context providers (per-notification context maps) into a single result object. .. py:attribute:: notifications :type: list[corvix.domain.Notification] :value: [] .. py:attribute:: contexts_by_notification_key :type: dict[str, dict[str, object]] .. py:attribute:: errors :type: list[str] :value: [] .. py:property:: contexts_by_thread_id :type: dict[str, dict[str, object]] Backward-compatible thread-keyed view of :attr:`contexts_by_notification_key`. .. py:function:: _apply_provider(provider: corvix.pipeline.provider.FieldProvider | corvix.pipeline.provider.ContextProvider, current: corvix.domain.Notification, notification_client: corvix.pipeline.base.JsonFetchClient, context: corvix.pipeline.provider.PipelineContext, notification_context: dict[str, object]) -> corvix.domain.Notification Dispatch one provider and return the (possibly updated) notification. For :class:`~corvix.pipeline.provider.FieldProvider` the returned notification may differ from *current*. For :class:`~corvix.pipeline.provider.ContextProvider` the notification is returned unchanged; any payload is merged into *notification_context*. .. py:class:: PipelineEngine Runs field-completion and context-enrichment providers in a single unified pass. Providers are dispatched by structural type: * :class:`~corvix.pipeline.provider.FieldProvider` — the ``hydrate()`` method is called; its return value replaces the current notification so that subsequent providers see the updated state. * :class:`~corvix.pipeline.provider.ContextProvider` — the ``enrich()`` method is called; non-empty payloads are merged under the provider's dot-delimited ``name`` namespace in the notification's context map. A single :class:`~corvix.pipeline.provider.PipelineContext` is shared across all providers and all notifications in one :meth:`run` call, so URL responses cached by an early provider are available to later providers without an additional HTTP round-trip. .. py:attribute:: providers :type: list[corvix.pipeline.provider.FieldProvider | corvix.pipeline.provider.ContextProvider] .. py:attribute:: max_requests_per_cycle :type: int :value: 25 .. py:method:: run(notifications: list[corvix.domain.Notification], client: corvix.pipeline.base.JsonFetchClient, clients_by_account: dict[str, corvix.pipeline.base.JsonFetchClient] | None = None) -> PipelineRunResult Run all providers over every notification in one cycle. Field-completion and context-enrichment providers are interleaved in declaration order: each provider sees the notification state produced by the preceding providers in the same pass.