pynenc_redis.orchestrator.redis_orchestrator¶
Module Contents¶
Classes¶
A Redis-based implementation of blocking control for task invocations. |
|
Orchestrator implementation using Redis for distributed invocation management. |
API¶
- exception pynenc_redis.orchestrator.redis_orchestrator.StatusNotFound[source]¶
Bases:
ExceptionRaised when a status is not found in Redis
Initialization
Initialize self. See help(type(self)) for accurate signature.
- class pynenc_redis.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseBlockingControlA Redis-based implementation of blocking control for task invocations.
Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.
- Parameters:
app (Pynenc) – The Pynenc application instance.
client (redis.Redis) – The Redis client instance.
Initialization
- waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) None[source]¶
Notifies the system that an invocation is waiting for the results of other invocations.
- Parameters:
caller_invocation_id – The ID of the invocation that is waiting.
result_invocation_ids – The IDs of the invocations being waited on.
- release_waiters(waited_invocation_id: str) None[source]¶
Releases any invocations that are waiting on the specified invocation.
- Parameters:
waited_invocation_id – The ID of the invocation that has finished and can release its waiters.
- get_blocking_invocations(max_num_invocations: int) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves invocation IDs that are blocking others but are not blocked themselves.
- Parameters:
max_num_invocations – The maximum number of blocking invocation IDs to retrieve.
- Returns:
An iterator over unblocked, blocking invocation IDs, ordered by age (oldest first).
- class pynenc_redis.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]¶
Bases:
pynenc.orchestrator.base_orchestrator.BaseOrchestratorOrchestrator implementation using Redis for distributed invocation management.
Stores status records with ownership tracking by invocation_id, using atomic transactions for status changes.
Initialization
- property client: redis.Redis¶
- property blocking_control: pynenc_redis.orchestrator.redis_orchestrator.RedisBlockingControl¶
- get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves existing invocation IDs based on task, arguments, and status.
- Parameters:
task – The task for which to retrieve invocations.
key_serialized_arguments – Serialized arguments to filter invocations.
statuses – The statuses to filter invocations.
- Returns:
An iterator over the matching invocation IDs.
- get_task_invocation_ids(task_id: pynenc.task.TaskId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves all invocation IDs associated with a specific task ID.
- Parameters:
task_id – The task ID to filter invocations.
- Returns:
Iterator of invocation IDs for the specified task.
- get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) list[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves invocation IDs with pagination support.
Uses Redis sorted sets indexed by registration time for efficient pagination. Results are ordered by registration time (newest first).
- Parameters:
task_id – Optional task ID to filter by.
statuses – Optional statuses to filter by.
limit – Maximum number of results to return.
offset – Number of results to skip.
- Returns:
List of matching invocation IDs.
- count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) int[source]¶
Counts invocations matching the given filters.
- Parameters:
task_id – Optional task ID to filter by.
statuses – Optional statuses to filter by.
- Returns:
The total count of matching invocations.
- get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieves all invocation IDs associated with a specific call ID.
- Parameters:
call_id – The call ID to filter invocations.
- Returns:
Iterator of invocation IDs for the specified call.
- any_non_final_invocations(call_id: pynenc.identifiers.call_id.CallId) bool[source]¶
Checks if there are any non-final invocations for a specific call ID.
- Parameters:
call_id – The call ID to check for non-final invocations.
- Returns:
True if there are non-final invocations, False otherwise.
- _register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]¶
Register new invocations with status REGISTERED if they don’t exist yet.
Initializes the necessary Redis data structures for task-to-invocation, call-to-invocation mappings, status, and retry tracking.
- _set_status_record(invocation_id: str, status_record: pynenc.invocation.status.InvocationStatusRecord) None[source]¶
Store a status record in Redis.
- _atomic_status_transition(invocation_id: str, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]¶
Perform atomic status transition with validation.
Uses Redis transactions to ensure atomic updates with ownership validation.
- get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.status.InvocationStatusRecord[source]¶
Retrieves the status record of a specific invocation.
- Parameters:
invocation_id – The id of the invocation whose status is to be retrieved.
- Returns:
The current status record of the invocation.
- Raises:
KeyError – If invocation not found.
- index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) None[source]¶
Caches the required data to implement concurrency control.
- Parameters:
invocation – The invocation to be cached.
- set_up_invocation_auto_purge(invocation_id: str) None[source]¶
Sets up automatic purging for an invocation after a defined period.
- Parameters:
invocation_id – The invocation to set up for auto purge.
- auto_purge() None[source]¶
Automatically purges invocations that have been in their final state beyond a specified duration.
- increment_invocation_retries(invocation_id: str) None[source]¶
Increments the retry count of a specific invocation.
- Parameters:
invocation_id – The id of the invocation for which to increment retries.
- get_invocation_retries(invocation_id: str) int[source]¶
Retrieves the number of retries for a specific invocation.
- Parameters:
invocation_id – The id of the invocation whose retry count is to be retrieved.
- Returns:
The number of retries for the invocation.
- filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) list[pynenc.identifiers.invocation_id.InvocationId][source]¶
Filters a list of invocation ids by their status in an optimized way.
- Parameters:
invocation_ids – The invocation ids to filter
status_filter – The statuses to filter by.
- Returns:
List of invocation ids matching the status filter
- register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) None[source]¶
Register or update runners’ heartbeat timestamp and atomic service eligibility.
- Parameters:
runner_ids – List of runner IDs
can_run_atomic_service – Whether runners are eligible for atomic service
- record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) None[source]¶
Record the latest atomic service execution window for a runner.
Replaces any previous execution record for this runner with the current one. Used for diagnostics and detecting potential collisions.
- Parameters:
runner_id (str) – The runner that executed the service
start_time (datetime) – When execution started (UTC timezone-aware)
end_time (datetime) – When execution ended (UTC timezone-aware)
- _get_runner_heartbeat_data() list[tuple[str, dict[bytes, bytes]]][source]¶
Fetch all runner IDs with their heartbeat hash data.
- Returns:
List of (runner_id, hash_data) tuples for all registered runners.
- _is_runner_active(runner_data: dict[bytes, bytes], cutoff: float) bool[source]¶
Check if a runner is active based on its last heartbeat.
- _get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo][source]¶
Retrieve runners that are considered active based on heartbeat activity.
A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.
- Parameters:
- Returns:
List of active runners ordered by creation time (oldest first)
- Return type:
list[“ActiveRunnerInfo”]
- get_pending_invocations_for_recovery() collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs stuck in PENDING status beyond the allowed time.
- _get_running_invocations_for_recovery(timeout_seconds: float) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]¶
Retrieve invocation IDs in RUNNING status owned by inactive runners.
An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.