pynenc_redis.state_backend.redis_state_backend¶
Module Contents¶
Classes¶
A Redis-based implementation of the state backend. |
Functions¶
Reconstruct WorkflowIdentity from JSON data. |
API¶
- pynenc_redis.state_backend.redis_state_backend._workflow_identity_from_json(data: dict[str, Any]) pynenc.workflow.WorkflowIdentity[source]¶
Reconstruct WorkflowIdentity from JSON data.
- class pynenc_redis.state_backend.redis_state_backend.RedisStateBackend(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.state_backend.base_state_backend.BaseStateBackendA Redis-based implementation of the state backend.
This backend uses Redis to store and retrieve the state of invocations, including their data, history, results, and exceptions. It’s suitable for distributed systems where shared state management is required.
Initialization
- property client: redis.Redis¶
Lazy initialization of Redis client
- _upsert_invocations(entries: list[tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO]]) None[source]¶
Updates or inserts multiple invocations.
- _get_invocation(invocation_id: str) tuple[pynenc.invocation.dist_invocation.InvocationDTO, pynenc.models.call_dto.CallDTO] | None[source]¶
Retrieves an invocation from Redis by its ID.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to retrieve.
- Returns:
Tuple of InvocationDTO and CallDTO
- _add_histories(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], invocation_history: pynenc.state_backend.base_state_backend.InvocationHistory) None[source]¶
Adds a histories record for a list of invocations.
- _get_history(invocation_id: pynenc.identifiers.invocation_id.InvocationId) list[pynenc.state_backend.base_state_backend.InvocationHistory][source]¶
Retrieves the history of an invocation ordered by timestamp.
- Parameters:
invocation_id (str) – The ID of the invocation to get the history from
- Returns:
List of InvocationHistory records
- _set_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_result: str) None[source]¶
Sets the result of an invocation.
- _get_result(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]¶
Retrieves the result of an invocation.
- Parameters:
invocation_id (str) – The ID of the invocation to get the result from
- Returns:
The serialized result string
- _set_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId, serialized_exception: str) None[source]¶
Sets the raised exception by an invocation ran.
- _get_exception(invocation_id: pynenc.identifiers.invocation_id.InvocationId) str[source]¶
Retrieves the exception of an invocation.
- Parameters:
invocation_id (InvocationId) – The ID of the invocation to get the exception from
- Returns:
The serialized exception string
- get_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, default: Any = None) Any[source]¶
Get a value from workflow data.
- Parameters:
workflow_identity – Workflow identity
key – Data key to retrieve
default – Default value if key doesn’t exist
- Returns:
Stored value or default
- set_workflow_data(workflow_identity: pynenc.workflow.WorkflowIdentity, key: str, value: Any) None[source]¶
Set a value in workflow data.
- Parameters:
workflow_identity – Workflow identity
key – Data key to set
value – Value to store
- store_app_info(app_info: pynenc.app.AppInfo) None[source]¶
Register this app’s information in the state backend for discovery.
- Parameters:
app_info – The app information to store
- get_app_info() pynenc.app.AppInfo[source]¶
Retrieve information of the current app.
- Returns:
The app information
- Raises:
ValueError – If app info is not found
- static discover_app_infos() dict[str, pynenc.app.AppInfo][source]¶
Retrieve all app information registered in this state backend.
- Returns:
Dictionary mapping app_id to app information
- store_workflow_run(workflow_identity: pynenc.workflow.WorkflowIdentity) None[source]¶
Store a workflow run for tracking and monitoring.
Maintains workflow type registry and specific workflow run instances. This enables monitoring of workflow types and their execution history.
- Parameters:
workflow_identity – The workflow identity to store
- get_all_workflow_types() collections.abc.Iterator[pynenc.identifiers.task_id.TaskId][source]¶
Retrieve all workflow types (workflow_task_ids) stored in this Redis state backend.
- Returns:
Iterator of workflow task IDs representing different workflow types (task_ids)
- get_all_workflow_runs() collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]¶
Retrieve workflow run identities from this Redis state backend.
- Returns:
Iterator of workflow identities for runs
- get_workflow_runs(workflow_type: pynenc.identifiers.task_id.TaskId) collections.abc.Iterator[pynenc.workflow.WorkflowIdentity][source]¶
Retrieve workflow run identities from this Redis state backend with pagination.
Uses configurable batch size to efficiently handle large datasets without overwhelming memory usage by processing data in manageable chunks.
- Parameters:
workflow_type – Filter for specific workflow type
- Returns:
Iterator of workflow identities for runs
- store_workflow_sub_invocation(parent_workflow_id: str, sub_invocation_id: pynenc.identifiers.invocation_id.InvocationId) None[source]¶
Store a sub-invocation ID that runs inside a parent workflow.
- Parameters:
parent_workflow_id – The workflow ID that contains the sub-invocation
sub_invocation_id – The invocation ID of the task/sub-workflow running inside
- get_workflow_sub_invocations(workflow_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve all sub-invocation IDs that run inside a specific workflow.
- Parameters:
workflow_id – The workflow ID to get sub-invocations for
- Returns:
Iterator of invocation IDs that run inside the workflow
- iter_invocations_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.identifiers.invocation_id.InvocationId]][source]¶
Iterate over invocation IDs that have history within time range.
Uses Redis sorted set with timestamp scores for efficient range queries.
- Parameters:
start_time – Start of time range
end_time – End of time range
batch_size – Number of invocation IDs per batch
- Returns:
Iterator yielding batches of invocation IDs
- iter_history_in_timerange(start_time: datetime.datetime, end_time: datetime.datetime, batch_size: int = 100) collections.abc.Iterator[list[pynenc.state_backend.base_state_backend.InvocationHistory]][source]¶
Iterate over history entries within time range.
Uses Redis sorted set with timestamp scores for efficient range queries. Results are ordered by timestamp ascending.
- Parameters:
start_time – Start of time range
end_time – End of time range
batch_size – Number of history entries per batch
- Returns:
Iterator yielding batches of InvocationHistory objects
- _store_runner_context(runner_context: pynenc.runner.runner_context.RunnerContext) None[source]¶
Store a runner context in Redis.
- Parameters:
runner_context (RunnerContext) – The context to store
- _get_runner_context(runner_id: str) RunnerContext | None[source]¶
Retrieve a runner context by runner_id from Redis.
- Parameters:
runner_id (str) – The runner’s unique identifier
- Returns:
The stored RunnerContext or None if not found
- _get_runner_contexts(runner_ids: list[str]) list[pynenc.runner.runner_context.RunnerContext][source]¶
Retrieve multiple runner contexts by their IDs using Redis mget.
- get_child_invocations(parent_invocation_id: pynenc.identifiers.invocation_id.InvocationId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Return IDs of all invocations directly spawned by the given parent.
Used for family-tree traversal: given a parent invocation ID, find all invocations that recorded it as their
parent_invocation_id.- Parameters:
parent_invocation_id – The invocation ID to find children for.
- Returns:
Iterator of child invocation IDs (may be empty).
- get_matching_runner_contexts(partial_id: str) collections.abc.Iterator[pynenc.runner.runner_context.RunnerContext][source]¶
Search runner contexts by partial ID match.
Uses Redis SCAN pattern matching to find all runner contexts whose ID contains the given partial string.
- Parameters:
partial_id – Partial string to match in runner context IDs
- Returns:
Iterator of matching RunnerContext objects
- get_invocation_ids_by_workflow(workflow_id: str | None = None, workflow_type_key: str | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs filtered by workflow criteria.
Returns invocations matching the provided workflow_id and/or workflow_type_key. If both are provided, returns the intersection. If neither is provided, returns an empty iterator.
- Parameters:
workflow_id – Optional workflow ID to filter by
workflow_type_key – Optional workflow type key to filter by
- Returns:
Iterator of matching invocation IDs