pynenc_redis.orchestrator.redis_orchestrator

Module Contents

Classes

RedisBlockingControl

A Redis-based implementation of blocking control for task invocations.

RedisOrchestrator

Orchestrator implementation using Redis for distributed invocation management.

API

exception pynenc_redis.orchestrator.redis_orchestrator.StatusNotFound[source]

Bases: Exception

Raised when a status is not found in Redis

Initialization

Initialize self. See help(type(self)) for accurate signature.

class pynenc_redis.orchestrator.redis_orchestrator.RedisBlockingControl(app: pynenc.app.Pynenc, client: redis.Redis)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseBlockingControl

A Redis-based implementation of blocking control for task invocations.

Manages invocation dependencies and blocking states in a Redis-backed environment, ensuring that invocations waiting for others are properly tracked and released.

Parameters:
  • app (Pynenc) – The Pynenc application instance.

  • client (redis.Redis) – The Redis client instance.

Initialization

purge() None[source]

Purges all data related to blocking control from Redis.

waiting_for_results(caller_invocation_id: pynenc.identifiers.invocation_id.InvocationId, result_invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId]) None[source]

Notifies the system that an invocation is waiting for the results of other invocations.

Parameters:
  • caller_invocation_id – The ID of the invocation that is waiting.

  • result_invocation_ids – The IDs of the invocations being waited on.

release_waiters(waited_invocation_id: str) None[source]

Releases any invocations that are waiting on the specified invocation.

Parameters:

waited_invocation_id – The ID of the invocation that has finished and can release its waiters.

get_blocking_invocations(max_num_invocations: int) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocation IDs that are blocking others but are not blocked themselves.

Parameters:

max_num_invocations – The maximum number of blocking invocation IDs to retrieve.

Returns:

An iterator over unblocked, blocking invocation IDs, ordered by age (oldest first).

class pynenc_redis.orchestrator.redis_orchestrator.RedisOrchestrator(app: pynenc.app.Pynenc)[source]

Bases: pynenc.orchestrator.base_orchestrator.BaseOrchestrator

Orchestrator implementation using Redis for distributed invocation management.

Stores status records with ownership tracking by invocation_id, using atomic transactions for status changes.

Initialization

conf() pynenc_redis.conf.config_orchestrator.ConfigOrchestratorRedis
property client: redis.Redis
property blocking_control: pynenc_redis.orchestrator.redis_orchestrator.RedisBlockingControl
get_existing_invocations(task: Task[Params, Result], key_serialized_arguments: dict[str, str] | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves existing invocation IDs based on task, arguments, and status.

Parameters:
  • task – The task for which to retrieve invocations.

  • key_serialized_arguments – Serialized arguments to filter invocations.

  • statuses – The statuses to filter invocations.

Returns:

An iterator over the matching invocation IDs.

get_task_invocation_ids(task_id: pynenc.task.TaskId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation IDs associated with a specific task ID.

Parameters:

task_id – The task ID to filter invocations.

Returns:

Iterator of invocation IDs for the specified task.

get_invocation_ids_paginated(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None, limit: int = 100, offset: int = 0) list[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves invocation IDs with pagination support.

Uses Redis sorted sets indexed by registration time for efficient pagination. Results are ordered by registration time (newest first).

Parameters:
  • task_id – Optional task ID to filter by.

  • statuses – Optional statuses to filter by.

  • limit – Maximum number of results to return.

  • offset – Number of results to skip.

Returns:

List of matching invocation IDs.

count_invocations(task_id: TaskId | None = None, statuses: list[pynenc.invocation.status.InvocationStatus] | None = None) int[source]

Counts invocations matching the given filters.

Parameters:
  • task_id – Optional task ID to filter by.

  • statuses – Optional statuses to filter by.

Returns:

The total count of matching invocations.

get_call_invocation_ids(call_id: pynenc.identifiers.call_id.CallId) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieves all invocation IDs associated with a specific call ID.

Parameters:

call_id – The call ID to filter invocations.

Returns:

Iterator of invocation IDs for the specified call.

any_non_final_invocations(call_id: pynenc.identifiers.call_id.CallId) bool[source]

Checks if there are any non-final invocations for a specific call ID.

Parameters:

call_id – The call ID to check for non-final invocations.

Returns:

True if there are non-final invocations, False otherwise.

_register_new_invocations(invocations: list[DistributedInvocation[Params, Result]], runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Register new invocations with status REGISTERED if they don’t exist yet.

Initializes the necessary Redis data structures for task-to-invocation, call-to-invocation mappings, status, and retry tracking.

_set_status_record(invocation_id: str, status_record: pynenc.invocation.status.InvocationStatusRecord) None[source]

Store a status record in Redis.

_atomic_status_transition(invocation_id: str, status: pynenc.invocation.status.InvocationStatus, runner_id: str | None = None) pynenc.invocation.status.InvocationStatusRecord[source]

Perform atomic status transition with validation.

Uses Redis transactions to ensure atomic updates with ownership validation.

get_invocation_status_record(invocation_id: pynenc.identifiers.invocation_id.InvocationId) pynenc.invocation.status.InvocationStatusRecord[source]

Retrieves the status record of a specific invocation.

Parameters:

invocation_id – The id of the invocation whose status is to be retrieved.

Returns:

The current status record of the invocation.

Raises:

KeyError – If invocation not found.

index_arguments_for_concurrency_control(invocation: DistributedInvocation[Params, Result]) None[source]

Caches the required data to implement concurrency control.

Parameters:

invocation – The invocation to be cached.

set_up_invocation_auto_purge(invocation_id: str) None[source]

Sets up automatic purging for an invocation after a defined period.

Parameters:

invocation_id – The invocation to set up for auto purge.

auto_purge() None[source]

Automatically purges invocations that have been in their final state beyond a specified duration.

increment_invocation_retries(invocation_id: str) None[source]

Increments the retry count of a specific invocation.

Parameters:

invocation_id – The id of the invocation for which to increment retries.

get_invocation_retries(invocation_id: str) int[source]

Retrieves the number of retries for a specific invocation.

Parameters:

invocation_id – The id of the invocation whose retry count is to be retrieved.

Returns:

The number of retries for the invocation.

filter_by_status(invocation_ids: list[pynenc.identifiers.invocation_id.InvocationId], status_filter: frozenset[pynenc.invocation.status.InvocationStatus]) list[pynenc.identifiers.invocation_id.InvocationId][source]

Filters a list of invocation ids by their status in an optimized way.

Parameters:
  • invocation_ids – The invocation ids to filter

  • status_filter – The statuses to filter by.

Returns:

List of invocation ids matching the status filter

register_runner_heartbeats(runner_ids: list[str], can_run_atomic_service: bool = False) None[source]

Register or update runners’ heartbeat timestamp and atomic service eligibility.

Parameters:
  • runner_ids – List of runner IDs

  • can_run_atomic_service – Whether runners are eligible for atomic service

record_atomic_service_execution(runner_id: str, start_time: datetime.datetime, end_time: datetime.datetime) None[source]

Record the latest atomic service execution window for a runner.

Replaces any previous execution record for this runner with the current one. Used for diagnostics and detecting potential collisions.

Parameters:
  • runner_id (str) – The runner that executed the service

  • start_time (datetime) – When execution started (UTC timezone-aware)

  • end_time (datetime) – When execution ended (UTC timezone-aware)

_get_runner_heartbeat_data() list[tuple[str, dict[bytes, bytes]]][source]

Fetch all runner IDs with their heartbeat hash data.

Returns:

List of (runner_id, hash_data) tuples for all registered runners.

_is_runner_active(runner_data: dict[bytes, bytes], cutoff: float) bool[source]

Check if a runner is active based on its last heartbeat.

_get_active_runners(timeout_seconds: float, can_run_atomic_service: bool | None) list[pynenc.orchestrator.atomic_service.ActiveRunnerInfo][source]

Retrieve runners that are considered active based on heartbeat activity.

A runner is considered “active” if it has sent a heartbeat within the timeout period. This is used for atomic service scheduling to determine which runners are eligible to participate in time slot distribution.

Parameters:
  • timeout_seconds (float) – Heartbeat timeout in seconds (typically from atomic_service_runner_considered_dead_after_minutes config)

  • can_run_atomic_service (bool | None) – If specified, filters runners based on their eligibility to run atomic services

Returns:

List of active runners ordered by creation time (oldest first)

Return type:

list[“ActiveRunnerInfo”]

get_pending_invocations_for_recovery() collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs stuck in PENDING status beyond the allowed time.

_get_running_invocations_for_recovery(timeout_seconds: float) collections.abc.Iterator[pynenc.identifiers.invocation_id.InvocationId][source]

Retrieve invocation IDs in RUNNING status owned by inactive runners.

An inactive runner is one that hasn’t sent a heartbeat within the configured timeout period. Invocations owned by such runners are considered stuck and need recovery.

Parameters:

timeout_seconds (float) – Heartbeat timeout in seconds

Returns:

Iterator of invocation IDs that need recovery.

Return type:

Iterator[str]

purge() None[source]

Remove all invocations from the orchestrator