pynenc_redis.state_backend.redis_state_backend

Module Contents

Classes

RedisStateBackend

A Redis-based implementation of the state backend.

Functions

_workflow_identity_from_json

Reconstruct WorkflowIdentity from JSON data.

API

pynenc_redis.state_backend.redis_state_backend._workflow_identity_from_json(data: dict[str, Any]) pynenc.workflow.WorkflowIdentity[source]

Reconstruct WorkflowIdentity from JSON data.

class pynenc_redis.state_backend.redis_state_backend.RedisStateBackend(app: pynenc.app.Pynenc)[source]

Bases: pynenc.state_backend.base_state_backend.BaseStateBackend

A Redis-based implementation of the state backend.

This backend uses Redis to store and retrieve the state of invocations, including their data, history, results, and exceptions. It’s suitable for distributed systems where shared state management is required.

Initialization

conf() pynenc_redis.conf.config_state_backend.ConfigStateBackendRedis

property client: redis.Redis

Lazy initialization of Redis client
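
Lazy initialization means the client is only built on first access and then reused. The sketch below shows one common way to get that behavior in Python. It is not taken from the pynenc_redis source: FakeClient and LazyBackend are hypothetical stand-ins, and the real property may be implemented differently.

```python
from functools import cached_property


class FakeClient:
    """Hypothetical stand-in for redis.Redis; counts how often it is built."""

    instances = 0

    def __init__(self) -> None:
        FakeClient.instances += 1


class LazyBackend:
    """Hypothetical backend that defers client creation until first access."""

    @cached_property
    def client(self) -> FakeClient:
        # Runs once; the result is cached on the instance afterwards.
        return FakeClient()


backend = LazyBackend()
created_before_access = FakeClient.instances  # nothing has been built yet
first = backend.client
second = backend.client  # served from the cache, no second construction
```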

purge() None[source]

Clears all data from the Redis backend for the current app.app_id.

_upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]

Updates or inserts multiple invocations.

Parameters:

entries (list[tuple[InvocationDTO, CallDTO]]) – The invocation/call DTO pairs to upsert.

_get_invocation(invocation_id: str) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]

Retrieves an invocation from Redis by its ID.

Parameters:

invocation_id (InvocationId) – The ID of the invocation to retrieve.

Returns:

Tuple of InvocationDTO and CallDTO, or None if the invocation is not found

_add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]

Adds a history record to each invocation in a list.

Parameters:
  • invocation_ids (list[str]) – The IDs of the invocations.

  • invocation_history (InvocationHistory) – The history record to add.

_get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]

Retrieves the history of an invocation ordered by timestamp.

Parameters:

invocation_id (str) – The ID of the invocation to get the history from

Returns:

List of InvocationHistory records

_set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]

Sets the result of an invocation.

Parameters:
  • invocation_id (str) – The ID of the invocation whose result is set

  • serialized_result (str) – The serialized result to set

_get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the result of an invocation.

Parameters:

invocation_id (str) – The ID of the invocation to get the result from

Returns:

The serialized result string

_set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]

Sets the exception raised by an invocation run.

Parameters:
  • invocation_id (str) – The ID of the invocation whose exception is set

  • serialized_exception (str) – The serialized exception to set

_get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]

Retrieves the exception of an invocation.

Parameters:

invocation_id (InvocationId) – The ID of the invocation to get the exception from

Returns:

The serialized exception string

get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]

Get a value from workflow data.

Parameters:
  • workflow_identity – Workflow identity

  • key – Data key to retrieve

  • default – Default value if key doesn’t exist

Returns:

Stored value or default

set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]

Set a value in workflow data.

Parameters:
  • workflow_identity – Workflow identity

  • key – Data key to set

  • value – Value to store
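
Together, get_workflow_data and set_workflow_data behave like a key-value store scoped to one workflow. The following in-memory sketch illustrates only those semantics, including the default fallback. The class name and the plain string used in place of WorkflowIdentity are hypothetical, and nothing here reflects how the data is laid out in Redis.

```python
from typing import Any


class InMemoryWorkflowData:
    """Hypothetical in-memory model of per-workflow key-value data."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], Any] = {}

    def set_workflow_data(self, workflow_id: str, key: str, value: Any) -> None:
        # Values are scoped by workflow, so equal keys never collide across workflows.
        self._data[(workflow_id, key)] = value

    def get_workflow_data(self, workflow_id: str, key: str, default: Any = None) -> Any:
        return self._data.get((workflow_id, key), default)


store = InMemoryWorkflowData()
store.set_workflow_data("wf-1", "attempts", 3)
found = store.get_workflow_data("wf-1", "attempts")
missing = store.get_workflow_data("wf-1", "unknown", default=0)
other_workflow = store.get_workflow_data("wf-2", "attempts")
```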

store_app_info(app_info: pynenc.app.AppInfo) None[source]

Register this app’s information in the state backend for discovery.

Parameters:

app_info – The app information to store

get_app_info() pynenc.app.AppInfo[source]

Retrieve information about the current app.

Returns:

The app information

Raises:

ValueError – If app info is not found

static discover_app_infos() dict[str, pynenc.app.AppInfo][source]

Retrieve all app information registered in this state backend.

Returns:

Dictionary mapping app_id to app information

store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]

Store a workflow run for tracking and monitoring.

Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.

Parameters:

workflow_identity – The workflow identity to store
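
The description above implies two levels of bookkeeping: a registry of workflow types and, per type, the individual runs. A minimal in-memory sketch of that shape follows. WorkflowRegistry and its string identifiers are hypothetical, and the actual Redis key layout is not documented here.

```python
from collections import defaultdict


class WorkflowRegistry:
    """Hypothetical model: a set of workflow types plus runs grouped by type."""

    def __init__(self) -> None:
        self.types: set[str] = set()
        self.runs_by_type: dict[str, set[str]] = defaultdict(set)

    def store_workflow_run(self, workflow_type: str, workflow_id: str) -> None:
        self.types.add(workflow_type)  # type registry
        self.runs_by_type[workflow_type].add(workflow_id)  # run instances


registry = WorkflowRegistry()
registry.store_workflow_run("billing.invoice", "run-1")
registry.store_workflow_run("billing.invoice", "run-2")
registry.store_workflow_run("reports.daily", "run-3")
known_types = sorted(registry.types)
invoice_runs = sorted(registry.runs_by_type["billing.invoice"])
```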

get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]

Retrieve all workflow types (workflow_task_ids) stored in this Redis state backend.

Returns:

Iterator of workflow task IDs, one per workflow type

get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve all workflow run identities from this Redis state backend.

Returns:

Iterator of workflow identities for runs

get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]

Retrieve workflow run identities from this Redis state backend with pagination.

Uses a configurable batch size to process large datasets in manageable chunks, which keeps memory usage bounded.

Parameters:

workflow_type – Filter for specific workflow type

Returns:

Iterator of workflow identities for runs
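
The batching idea can be sketched independently of Redis: pull a fixed number of items at a time and yield them one by one, so that only one batch is in memory at once. The helper name iter_in_batches and the sample IDs are hypothetical, and the backend's real paging mechanism is not shown in this reference.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def iter_in_batches(items: Iterable[str], batch_size: int) -> Iterator[str]:
    """Yield items one by one while fetching them batch_size at a time."""
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        # Only the current batch is held in memory.
        yield from batch


run_ids = (f"run-{i}" for i in range(5))
collected = list(iter_in_batches(run_ids, batch_size=2))
```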

store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]

Store a sub-invocation ID that runs inside a parent workflow.

Parameters:
  • parent_workflow_id – The workflow ID that contains the sub-invocation

  • sub_invocation_id – The invocation ID of the task/sub-workflow running inside

get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve all sub-invocation IDs that run inside a specific workflow.

Parameters:

workflow_id – The workflow ID to get sub-invocations for

Returns:

Iterator of invocation IDs that run inside the workflow

iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]

Iterate over invocation IDs that have history within the given time range.

Uses Redis sorted set with timestamp scores for efficient range queries.

Parameters:
  • start_time – Start of time range

  • end_time – End of time range

  • batch_size – Number of invocation IDs per batch

Returns:

Iterator yielding batches of invocation IDs
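
A sorted set keyed by timestamp makes a range query a matter of locating two boundaries and slicing between them. The pure-Python sketch below imitates that with a sorted list and bisect. It is an analogy, not the backend's code: the function name, the sample data, and the inclusive bounds are assumptions.

```python
from bisect import bisect_left, bisect_right
from collections.abc import Iterator
from datetime import datetime, timezone


def iter_ids_in_timerange(
    scored: list[tuple[float, str]],
    start_time: datetime,
    end_time: datetime,
    batch_size: int = 100,
) -> Iterator[list[str]]:
    """Yield batches of IDs whose score lies in [start_time, end_time]."""
    scores = [score for score, _ in scored]  # scored must be sorted by score
    lo = bisect_left(scores, start_time.timestamp())
    hi = bisect_right(scores, end_time.timestamp())
    for offset in range(lo, hi, batch_size):
        yield [member for _, member in scored[offset : min(offset + batch_size, hi)]]


def ts(second: int) -> datetime:
    return datetime(2024, 1, 1, 0, 0, second, tzinfo=timezone.utc)


# (timestamp score, invocation ID) pairs, sorted by score like a sorted set.
index = sorted((ts(s).timestamp(), f"inv-{s}") for s in (5, 10, 15, 20, 25))
batches = list(iter_ids_in_timerange(index, ts(10), ts(25), batch_size=3))
```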

iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]

Iterate over history entries within the given time range.

Uses Redis sorted set with timestamp scores for efficient range queries. Results are ordered by timestamp ascending.

Parameters:
  • start_time – Start of time range

  • end_time – End of time range

  • batch_size – Number of history entries per batch

Returns:

Iterator yielding batches of InvocationHistory objects
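
Because the method yields lists rather than single entries, callers that want a flat stream need to chain the batches. A small sketch of that consumption pattern follows. HistoryEntry is a hypothetical stand-in for InvocationHistory, and the hand-built batches replace a real call to the backend.

```python
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import chain


@dataclass(frozen=True)
class HistoryEntry:
    """Hypothetical stand-in for InvocationHistory."""

    invocation_id: str
    timestamp: float


def flatten(batches: Iterable[list[HistoryEntry]]) -> Iterator[HistoryEntry]:
    """Turn an iterator of batches into a flat, lazy stream of entries."""
    return chain.from_iterable(batches)


# Stand-in for the batches a backend call would yield, already in ascending order.
batches = [
    [HistoryEntry("inv-1", 1.0), HistoryEntry("inv-2", 2.0)],
    [HistoryEntry("inv-1", 3.0)],
]
entries = list(flatten(batches))
timestamps = [entry.timestamp for entry in entries]
```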

_store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]

Store a runner context in Redis.

Parameters:

runner_context (RunnerContext) – The context to store

_get_runner_context(runner_id: str) RunnerContext | None[source]

Retrieve a runner context by runner_id from Redis.

Parameters:

runner_id (str) – The runner’s unique identifier

Returns:

The stored RunnerContext or None if not found

_get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]

Retrieve multiple runner contexts by their IDs using Redis mget.

Parameters:

runner_ids (list[str]) – List of runner unique identifiers

Returns:

List of the stored RunnerContext objects
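
Redis MGET returns one value per requested key, with a null entry for every key that does not exist. The sketch below imitates that contract with a dictionary and drops the missing entries. The key prefix, the JSON payloads, and the choice to skip missing runners are assumptions, not documented behavior of this method.

```python
import json
from typing import Any, Optional


def fake_mget(storage: dict[str, str], keys: list[str]) -> list[Optional[str]]:
    """Mimic MGET: one result per key, None where the key is absent."""
    return [storage.get(key) for key in keys]


def load_contexts(storage: dict[str, str], runner_ids: list[str]) -> list[dict[str, Any]]:
    # "runner:<id>" is a hypothetical key format used only for this sketch.
    raw_values = fake_mget(storage, [f"runner:{rid}" for rid in runner_ids])
    return [json.loads(raw) for raw in raw_values if raw is not None]


storage = {
    "runner:r1": json.dumps({"runner_id": "r1"}),
    "runner:r3": json.dumps({"runner_id": "r3"}),
}
contexts = load_contexts(storage, ["r1", "r2", "r3"])
```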

get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Return IDs of all invocations directly spawned by the given parent.

Used for family-tree traversal: given a parent invocation ID, find all invocations that recorded it as their parent_invocation_id.

Parameters:

parent_invocation_id – The invocation ID to find children for.

Returns:

Iterator of child invocation IDs (may be empty).
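
Since the method returns only direct children, walking a whole family tree means calling it repeatedly. The sketch below does a breadth-first traversal over any callable with the same shape. The helper iter_descendants and the dictionary-backed tree are hypothetical.

```python
from collections import deque
from collections.abc import Callable, Iterable, Iterator


def iter_descendants(
    root_id: str, get_children: Callable[[str], Iterable[str]]
) -> Iterator[str]:
    """Breadth-first walk over all descendants of root_id."""
    queue = deque([root_id])
    seen = {root_id}
    while queue:
        current = queue.popleft()
        for child_id in get_children(current):
            if child_id not in seen:  # guard against repeated IDs
                seen.add(child_id)
                queue.append(child_id)
                yield child_id


# Dictionary standing in for the parent-to-children lookups.
tree = {"root": ["a", "b"], "a": ["c"], "b": [], "c": []}
descendants = list(iter_descendants("root", lambda inv_id: tree.get(inv_id, [])))
```

With a real backend instance, get_child_invocations could be passed as get_children in place of the lambda.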

get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]

Search runner contexts by partial ID match.

Uses Redis SCAN pattern matching to find all runner contexts whose ID contains the given partial string.

Parameters:

partial_id – Partial string to match in runner context IDs

Returns:

Iterator of matching RunnerContext objects
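
A "contains" search with glob-style patterns amounts to wrapping the partial string in wildcards. The sketch below shows the idea with Python's fnmatch, whose glob syntax resembles the one Redis SCAN uses for MATCH. The helper name and sample IDs are hypothetical, and whether the backend escapes glob metacharacters in partial_id is not stated in this reference.

```python
from fnmatch import fnmatchcase


def matching_ids(runner_ids: list[str], partial_id: str) -> list[str]:
    """Return the IDs that contain partial_id, using a glob pattern."""
    pattern = f"*{partial_id}*"  # wildcards on both sides give "contains"
    return [rid for rid in runner_ids if fnmatchcase(rid, pattern)]


ids = ["worker-a1", "worker-b2", "scheduler-a1"]
matches = matching_ids(ids, "a1")
```

Characters such as *, ? and [ in the partial string act as wildcards in this sketch, so a literal search for them would need escaping.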

get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs filtered by workflow criteria.

Returns invocations matching the provided workflow_id and/or workflow_type_key. If both are provided, returns the intersection. If neither is provided, returns an empty iterator.

Parameters:
  • workflow_id – Optional workflow ID to filter by

  • workflow_type_key – Optional workflow type key to filter by

Returns:

Iterator of matching invocation IDs
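
The filter rules above (one criterion, both as an intersection, or neither as an empty result) can be sketched with plain sets. The function filter_ids, the two index dictionaries, and the sorting used for deterministic output are hypothetical and do not describe how the backend stores its indexes.

```python
from collections.abc import Iterator
from typing import Optional


def filter_ids(
    by_workflow_id: dict[str, set[str]],
    by_type_key: dict[str, set[str]],
    workflow_id: Optional[str] = None,
    workflow_type_key: Optional[str] = None,
) -> Iterator[str]:
    """Apply the documented rules: single filter, intersection, or empty."""
    candidate_sets = []
    if workflow_id is not None:
        candidate_sets.append(by_workflow_id.get(workflow_id, set()))
    if workflow_type_key is not None:
        candidate_sets.append(by_type_key.get(workflow_type_key, set()))
    if not candidate_sets:
        return iter(())  # neither criterion given
    return iter(sorted(set.intersection(*candidate_sets)))


by_workflow_id = {"wf-1": {"inv-1", "inv-2"}}
by_type_key = {"module.task": {"inv-2", "inv-3"}}
both = list(filter_ids(by_workflow_id, by_type_key, "wf-1", "module.task"))
only_id = list(filter_ids(by_workflow_id, by_type_key, workflow_id="wf-1"))
neither = list(filter_ids(by_workflow_id, by_type_key))
```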