pynenc_redis.trigger.redis_trigger

Redis-backed implementation of the trigger system.

This module provides a distributed trigger system implementation using Redis for persistence and coordination across multiple application instances.

Module Contents

Classes

RedisTrigger

Redis-backed implementation of the trigger system.

Functions

_trigger_definition_dto_from_json

Reconstruct TriggerDefinitionDTO from JSON data.

API

pynenc_redis.trigger.redis_trigger._trigger_definition_dto_from_json(data: dict) pynenc.models.trigger_definition_dto.TriggerDefinitionDTO[source]

Reconstruct TriggerDefinitionDTO from JSON data.

class pynenc_redis.trigger.redis_trigger.RedisTrigger(app: pynenc.app.Pynenc)[source]

Bases: pynenc.trigger.base_trigger.BaseTrigger

Redis-backed implementation of the trigger system.

This implementation uses Redis to store trigger conditions and definitions, making it suitable for distributed systems where multiple application instances need coordinated trigger behavior with persistence and reliability.

Initialization

Initialize the Redis-based trigger component.

Parameters:

app – The Pynenc application instance

conf() pynenc_redis.conf.config_trigger.ConfigTriggerRedis

Get the Redis trigger configuration.

Returns:

Configuration for Redis trigger

property client: redis.Redis

Lazy initialization of Redis client.

Returns:

Redis client instance

_register_condition(condition: pynenc.trigger.conditions.TriggerCondition) None[source]

Register a condition in Redis.

Parameters:

condition – The condition to register

get_condition(condition_id: str) pynenc.trigger.conditions.TriggerCondition | None[source]

Get a condition by its ID from Redis.

Parameters:

condition_id – ID of the condition to retrieve

Returns:

The condition if found, None otherwise

register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) None[source]

Register a trigger definition in Redis.

Parameters:

trigger – The trigger definition to register

_get_trigger(trigger_id: str) Optional[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get a trigger definition by ID from Redis.

Parameters:

trigger_id – ID of the trigger to retrieve

Returns:

The trigger definition if found, None otherwise

get_triggers_for_condition(condition_id: str) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get all triggers that depend on a specific condition from Redis.

Parameters:

condition_id – ID of the condition

Returns:

List of trigger definitions using this condition

get_triggers_for_task(task_id: pynenc.identifiers.task_id.TaskId) list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO][source]

Get all triggers associated with a specific task from Redis.

Parameters:

task_id – ID of the task to find triggers for

Returns:

List of trigger definitions for this task

record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) None[source]

Record that a condition has been satisfied with a specific context in Redis.

Parameters:

valid_condition – The valid condition to record

record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) None[source]

Record that multiple conditions have been satisfied with their respective contexts in Redis.

Parameters:

valid_conditions – The list of valid conditions to record

get_valid_conditions() dict[str, pynenc.trigger.conditions.ValidCondition][source]

Get all currently valid conditions and their contexts from Redis.

Returns:

Dictionary mapping condition IDs to their valid conditions

clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) None[source]

Clear valid conditions after they have been processed from Redis.

Parameters:

conditions – List of valid conditions to clear

_get_all_conditions() list[pynenc.trigger.conditions.TriggerCondition][source]

Get all registered conditions from Redis.

Returns:

List of all conditions

_purge() None[source]

Purge all trigger-related data from Redis.

Removes all conditions, triggers, and valid conditions for this application.

get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) datetime.datetime | None[source]

Get the timestamp of the last execution of a cron condition from Redis.

Parameters:

condition_id – ID of the cron condition

Returns:

Timestamp of last execution, or None if never executed

store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) bool[source]

Store the last execution time for a cron condition with optimistic locking.

Uses Redis atomic operations to ensure thread safety:

  1. For new records (no expected_last_execution): Uses SETNX for atomic create-if-not-exists

  2. For updating existing records: Uses optimistic locking with WATCH/MULTI/EXEC pattern

Parameters:
  • condition_id – ID of the cron condition

  • execution_time – Time of execution to store

  • expected_last_execution – Expected current value (for optimistic locking)

Returns:

True if update successful, False if another process updated first

_register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) None[source]

Register the conditions that are sourced from a task in Redis.

This method stores a mapping from source task IDs to the condition IDs that monitor them, enabling efficient lookup when task status changes.

Parameters:
  • task_id – ID of the source task

  • condition_id – ID of the condition sourced from the task

get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) list[pynenc.trigger.conditions.TriggerCondition][source]

Get all conditions that are sourced from a specific task.

These are conditions that monitor the task and might be satisfied by its status or results.

Parameters:
  • task_id – ID of the source task

  • context_type – Optional context type to filter conditions by

Returns:

List of conditions monitoring this task

claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger for a specific valid condition.

Uses Redis’s SETNX (SET if Not eXists) for atomic claim operations across multiple workers. The claim automatically expires after the specified seconds to prevent stale locks.

Parameters:
  • trigger_id – ID of the trigger being executed

  • valid_condition_id – ID of the valid condition being processed

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it

claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) bool[source]

Atomically claim the right to execute a trigger run.

Uses Redis’s SETNX (SET if Not eXists) for atomic claim operations across multiple workers. The claim automatically expires after the specified seconds to prevent stale locks.

Parameters:
  • trigger_run_id – Unique ID for this trigger run

  • expiration_seconds – Number of seconds after which the claim expires

Returns:

True if the claim was successful, False if another worker has claimed it

clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) None[source]

Remove all trigger definitions for a specific task from Redis.

This method removes all trigger definitions associated with the given task and their references in the index keys. It’s safe to use in a distributed environment as it uses Redis atomic operations.

Parameters:

task_id – ID of the task to clean triggers for