pynenc_redis.trigger.redis_trigger
Redis-backed implementation of the trigger system.
This module provides a distributed trigger system implementation using Redis for persistence and coordination across multiple application instances.
Module Contents
Classes
RedisTrigger: Redis-backed implementation of the trigger system.
Functions
_trigger_definition_dto_from_json: Reconstruct TriggerDefinitionDTO from JSON data.
API
- pynenc_redis.trigger.redis_trigger._trigger_definition_dto_from_json(data: dict) → pynenc.models.trigger_definition_dto.TriggerDefinitionDTO
Reconstruct TriggerDefinitionDTO from JSON data.
- class pynenc_redis.trigger.redis_trigger.RedisTrigger(app: pynenc.app.Pynenc)
Bases: pynenc.trigger.base_trigger.BaseTrigger
Redis-backed implementation of the trigger system.
This implementation uses Redis to store trigger conditions and definitions, making it suitable for distributed systems where multiple application instances need coordinated trigger behavior with persistence and reliability.
Initialization
Initialize the Redis-based trigger component.
- Parameters:
app – The Pynenc application instance
- conf() → pynenc_redis.conf.config_trigger.ConfigTriggerRedis
Get the Redis trigger configuration.
- Returns:
Configuration for Redis trigger
- property client: redis.Redis
Lazy initialization of Redis client.
- Returns:
Redis client instance
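Lazy initialization here means the client is only built the first time the property is read, and the same instance is reused afterwards. The following is a minimal sketch of that pattern, not the actual RedisTrigger code: the `LazyClientHolder` class and its `factory` argument are hypothetical names introduced for illustration, and the factory stands in for whatever constructs the real `redis.Redis` instance.

```python
from typing import Any, Callable, List, Optional


class LazyClientHolder:
    """Hypothetical holder that builds its client on first access only."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory          # assumed: a callable that creates the client
        self._client: Optional[Any] = None

    @property
    def client(self) -> Any:
        # Create the client the first time it is requested, then reuse it.
        if self._client is None:
            self._client = self._factory()
        return self._client


# Demonstration with a stand-in factory that records each "connection".
calls: List[str] = []


def fake_factory() -> object:
    calls.append("connect")
    return object()


holder = LazyClientHolder(fake_factory)
first = holder.client
second = holder.client
```

No connection is attempted when the holder is constructed, so creating the component stays cheap even if the client is never used.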
- _register_condition(condition: pynenc.trigger.conditions.TriggerCondition) → None
Register a condition in Redis.
- Parameters:
condition – The condition to register
- get_condition(condition_id: str) → pynenc.trigger.conditions.TriggerCondition | None
Get a condition by its ID from Redis.
- Parameters:
condition_id – ID of the condition to retrieve
- Returns:
The condition if found, None otherwise
- register_trigger(trigger: pynenc.models.trigger_definition_dto.TriggerDefinitionDTO) → None
Register a trigger definition in Redis.
- Parameters:
trigger – The trigger definition to register
- _get_trigger(trigger_id: str) → Optional[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
Get a trigger definition by ID from Redis.
- Parameters:
trigger_id – ID of the trigger to retrieve
- Returns:
The trigger definition if found, None otherwise
- get_triggers_for_condition(condition_id: str) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
Get all triggers that depend on a specific condition from Redis.
- Parameters:
condition_id – ID of the condition
- Returns:
List of trigger definitions using this condition
- get_triggers_for_task(task_id: pynenc.identifiers.task_id.TaskId) → list[pynenc.models.trigger_definition_dto.TriggerDefinitionDTO]
Get all triggers associated with a specific task from Redis.
- Parameters:
task_id – ID of the task to find triggers for
- Returns:
List of trigger definitions for this task
- record_valid_condition(valid_condition: pynenc.trigger.conditions.ValidCondition) → None
Record that a condition has been satisfied with a specific context in Redis.
- Parameters:
valid_condition – The valid condition to record
- record_valid_conditions(valid_conditions: list[pynenc.trigger.conditions.ValidCondition]) → None
Record that multiple conditions have been satisfied with their respective contexts in Redis.
- Parameters:
valid_conditions – The list of valid conditions to record
- get_valid_conditions() → dict[str, pynenc.trigger.conditions.ValidCondition]
Get all currently valid conditions and their contexts from Redis.
- Returns:
Dictionary mapping condition IDs to their valid conditions
- clear_valid_conditions(conditions: collections.abc.Iterable[pynenc.trigger.conditions.ValidCondition]) → None
Clear valid conditions from Redis after they have been processed.
- Parameters:
conditions – Iterable of valid conditions to clear
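The three methods record_valid_conditions, get_valid_conditions, and clear_valid_conditions suggest a record, read, process, clear cycle. The sketch below illustrates that cycle against an in-memory stand-in rather than a real RedisTrigger. `FakeValidCondition`, its `valid_condition_id` attribute, `InMemoryTrigger`, and `process_pending` are all hypothetical names used for illustration; only the three method names and their documented shapes come from this page.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass(frozen=True)
class FakeValidCondition:
    """Stand-in for ValidCondition; the attribute name is an assumption."""
    valid_condition_id: str


class InMemoryTrigger:
    """Hypothetical in-memory stand-in exposing the three documented methods."""

    def __init__(self) -> None:
        self._valid: Dict[str, FakeValidCondition] = {}

    def record_valid_conditions(self, valid_conditions: List[FakeValidCondition]) -> None:
        for vc in valid_conditions:
            self._valid[vc.valid_condition_id] = vc

    def get_valid_conditions(self) -> Dict[str, FakeValidCondition]:
        return dict(self._valid)  # copy, so callers cannot mutate the store

    def clear_valid_conditions(self, conditions: Iterable[FakeValidCondition]) -> None:
        for vc in conditions:
            self._valid.pop(vc.valid_condition_id, None)


def process_pending(trigger: InMemoryTrigger,
                    handle: Callable[[FakeValidCondition], None]) -> int:
    """Handle every pending valid condition, then clear exactly those."""
    pending = trigger.get_valid_conditions()
    for vc in pending.values():
        handle(vc)
    trigger.clear_valid_conditions(pending.values())
    return len(pending)


trigger = InMemoryTrigger()
trigger.record_valid_conditions([FakeValidCondition("a"), FakeValidCondition("b")])
handled: List[str] = []
count = process_pending(trigger, lambda vc: handled.append(vc.valid_condition_id))
remaining = trigger.get_valid_conditions()
```

Clearing only the conditions that were actually read means anything recorded while processing was under way is left in place for the next pass.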
- _get_all_conditions() → list[pynenc.trigger.conditions.TriggerCondition]
Get all registered conditions from Redis.
- Returns:
List of all conditions
- _purge() → None
Purge all trigger-related data from Redis.
Removes all conditions, triggers, and valid conditions for this application.
- get_last_cron_execution(condition_id: pynenc.trigger.types.ConditionId) → datetime.datetime | None
Get the timestamp of the last execution of a cron condition from Redis.
- Parameters:
condition_id – ID of the cron condition
- Returns:
Timestamp of last execution, or None if never executed
- store_last_cron_execution(condition_id: str, execution_time: datetime.datetime, expected_last_execution: datetime.datetime | None = None) → bool
Store the last execution time for a cron condition with optimistic locking.
Uses Redis atomic operations so that concurrent updates from multiple workers stay safe. For new records (no expected_last_execution), it uses SETNX for an atomic create-if-not-exists. For existing records, it uses optimistic locking with the WATCH/MULTI/EXEC pattern.
- Parameters:
condition_id – ID of the cron condition
execution_time – Time of execution to store
expected_last_execution – Expected current value (for optimistic locking)
- Returns:
True if update successful, False if another process updated first
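From the caller's side, the contract is a compare-and-set: read the last execution, pass it back as expected_last_execution, and run the cron work only if the store returns True. The sketch below models that contract with an in-memory stand-in, not with Redis. `InMemoryCronStore` and `try_run_cron` are hypothetical names, and the stand-in reproduces only the documented return values, not the SETNX or WATCH/MULTI/EXEC mechanics.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List, Optional


class InMemoryCronStore:
    """Hypothetical stand-in that mimics the documented compare-and-set contract."""

    def __init__(self) -> None:
        self._last: Dict[str, datetime] = {}

    def get_last_cron_execution(self, condition_id: str) -> Optional[datetime]:
        return self._last.get(condition_id)

    def store_last_cron_execution(
        self,
        condition_id: str,
        execution_time: datetime,
        expected_last_execution: Optional[datetime] = None,
    ) -> bool:
        # Succeed only if the stored value still matches what the caller saw.
        # With expected_last_execution=None this is create-if-not-exists.
        if self._last.get(condition_id) != expected_last_execution:
            return False
        self._last[condition_id] = execution_time
        return True


def try_run_cron(store: InMemoryCronStore, condition_id: str,
                 now: datetime, run: Callable[[], None]) -> bool:
    """Run the cron work only if this caller wins the update."""
    last = store.get_last_cron_execution(condition_id)
    if not store.store_last_cron_execution(condition_id, now, last):
        return False  # another process updated first
    run()
    return True


store = InMemoryCronStore()
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t1 = t0 + timedelta(minutes=1)
ran: List[str] = []

first = try_run_cron(store, "cron-1", t0, lambda: ran.append("worker-a"))
# A second worker that read "never executed" before worker-a wrote now loses.
stale = store.store_last_cron_execution("cron-1", t1, expected_last_execution=None)
# A worker holding the current value wins.
fresh = store.store_last_cron_execution("cron-1", t1, expected_last_execution=t0)
```

A caller that receives False should skip the run instead of retrying with the same timestamp, since another worker has already claimed that execution window.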
- _register_source_task_condition(task_id: pynenc.identifiers.task_id.TaskId, condition_id: str) → None
Register in Redis a condition that is sourced from a task.
This method stores a mapping from source task IDs to the condition IDs that monitor them, enabling efficient lookup when task status changes.
- Parameters:
task_id – ID of the source task
condition_id – ID of the condition sourced from the task
- get_conditions_sourced_from_task(task_id: pynenc.identifiers.task_id.TaskId, context_type: type[pynenc.trigger.conditions.ConditionContext] | None = None) → list[pynenc.trigger.conditions.TriggerCondition]
Get all conditions that are sourced from a specific task.
These are conditions that monitor the task and might be satisfied by its status or results.
- Parameters:
task_id – ID of the source task
context_type – Optional context type to filter conditions by
- Returns:
List of conditions monitoring this task
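One plausible way to hold a mapping from source task IDs to condition IDs is a Redis set per task, written with SADD and read with SMEMBERS. The sketch below assumes that layout; the page does not state how RedisTrigger actually stores it. The key format in `source_key`, the helper names, and `FakeSetClient` are hypothetical. The fake mirrors the `sadd` and `smembers` methods of a redis-py client so the example runs without a server.

```python
from collections import defaultdict
from typing import DefaultDict, List, Set


class FakeSetClient:
    """In-memory stand-in for the two redis-py set commands used below."""

    def __init__(self) -> None:
        self._sets: DefaultDict[str, Set[str]] = defaultdict(set)

    def sadd(self, name: str, *values: str) -> int:
        before = len(self._sets[name])
        self._sets[name].update(values)
        return len(self._sets[name]) - before  # number of newly added members

    def smembers(self, name: str) -> Set[str]:
        return set(self._sets.get(name, set()))


def source_key(task_id: str) -> str:
    # Hypothetical key layout, chosen for illustration only.
    return f"trigger:source_task:{task_id}"


def register_source(client: FakeSetClient, task_id: str, condition_id: str) -> None:
    """Record that condition_id monitors task_id (idempotent, as with SADD)."""
    client.sadd(source_key(task_id), condition_id)


def condition_ids_for(client: FakeSetClient, task_id: str) -> List[str]:
    """Return the condition IDs monitoring task_id with a single set read."""
    # A real redis-py client returns bytes unless decode_responses=True is set.
    return sorted(client.smembers(source_key(task_id)))


client = FakeSetClient()
register_source(client, "module.task_a", "cond-1")
register_source(client, "module.task_a", "cond-2")
register_source(client, "module.task_a", "cond-1")  # duplicate, ignored by the set
found = condition_ids_for(client, "module.task_a")
missing = condition_ids_for(client, "module.task_b")
```

With this layout, a task status change needs one set read to find the affected conditions instead of a scan over every registered condition.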
- claim_trigger_execution(trigger_id: str, valid_condition_id: str, expiration_seconds: int = 60) → bool
Atomically claim the right to execute a trigger for a specific valid condition.
Uses Redis’s SETNX (SET if Not eXists) for atomic claim operations across multiple workers. The claim automatically expires after the specified seconds to prevent stale locks.
- Parameters:
trigger_id – ID of the trigger being executed
valid_condition_id – ID of the valid condition being processed
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
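The claim described above can be sketched as a single set-if-absent write with a time-to-live. In redis-py this is commonly written as `client.set(key, value, nx=True, ex=seconds)`, which returns True on success and None when the key already exists. Whether RedisTrigger uses that form or SETNX followed by an expiry is not stated here, so treat this as an assumption. The key format, the `claim` helper, and `FakeClient` are hypothetical; the fake imitates only the `set` behavior needed to run the example without a server.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


class FakeClient:
    """In-memory stand-in for redis-py's set(name, value, ex=..., nx=...)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[str, Optional[float]]] = {}

    def set(self, name: str, value: str,
            ex: Optional[int] = None, nx: bool = False) -> Optional[bool]:
        now = self._clock()
        entry = self._data.get(name)
        if entry is not None and entry[1] is not None and entry[1] <= now:
            entry = None  # treat an expired key as absent
        if nx and entry is not None:
            return None  # key exists, so the NX write is refused
        self._data[name] = (value, None if ex is None else now + ex)
        return True


def claim(client: FakeClient, trigger_id: str, valid_condition_id: str,
          expiration_seconds: int = 60) -> bool:
    """Try to claim one trigger execution; only the first caller gets True."""
    key = f"trigger:claim:{trigger_id}:{valid_condition_id}"  # hypothetical layout
    return bool(client.set(key, "1", nx=True, ex=expiration_seconds))


# Demonstration with a controllable clock so the expiry can be observed.
now: List[float] = [0.0]
client = FakeClient(clock=lambda: now[0])

first = claim(client, "trigger-1", "vc-1")    # first worker wins the claim
second = claim(client, "trigger-1", "vc-1")   # second worker is refused
now[0] = 61.0                                 # move past the 60-second expiry
third = claim(client, "trigger-1", "vc-1")    # the stale claim has expired
```

Setting the value and the expiry in one command avoids the window where a worker crashes after creating the key but before attaching a time-to-live, which would otherwise leave a permanent lock.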
- claim_trigger_run(trigger_run_id: str, expiration_seconds: int = 60) → bool
Atomically claim the right to execute a trigger run.
Uses Redis’s SETNX (SET if Not eXists) for atomic claim operations across multiple workers. The claim automatically expires after the specified seconds to prevent stale locks.
- Parameters:
trigger_run_id – Unique ID for this trigger run
expiration_seconds – Number of seconds after which the claim expires
- Returns:
True if the claim was successful, False if another worker has claimed it
- clean_task_trigger_definitions(task_id: pynenc.identifiers.task_id.TaskId) → None
Remove all trigger definitions for a specific task from Redis.
This method removes all trigger definitions associated with the given task and their references in the index keys. It’s safe to use in a distributed environment as it uses Redis atomic operations.
- Parameters:
task_id – ID of the task to clean triggers for