from typing import TYPE_CHECKING
import redis
if TYPE_CHECKING:
from pynenc.invocation.status import InvocationStatus
# Root namespace segment placed at the start of every Redis key built by Key.
PYNENC_KEY_PREFIX = "__pynenc__"
[docs]
def sanitize_for_redis(s: str) -> str:
    """
    Make a string safe to embed in a Redis key.

    The characters ``[``, ``]`` and ``*`` are swapped for readable
    placeholder tokens; every other character is kept unchanged.

    :param str s: The string to sanitize. ``None`` is tolerated.
    :return: The sanitized string, or ``""`` when ``s`` is ``None``.
    """
    if s is None:
        return ""
    # No placeholder contains a character that is itself replaced, so a single
    # translation pass gives the same result as replacing one by one.
    placeholder_table = str.maketrans(
        {
            "[": "__OPEN_BRACKET__",
            "]": "__CLOSE_BRACKET__",
            "*": "__ASTERISK__",
        }
    )
    return s.translate(placeholder_table)
[docs]
class Key:
    """
    Helper class to manage Redis key formats for various components.

    Every instance key has the shape
    ``<PYNENC_KEY_PREFIX>:<app_id>:<prefix>:<suffix>``; the methods of this
    class only differ in the suffix they append.

    :param str app_id: The application ID.
    :param str prefix: The prefix for the keys.
    """

    # Backslash-escapes for the characters Redis treats as glob metacharacters.
    _GLOB_ESCAPES = str.maketrans({ch: "\\" + ch for ch in "\\*?[]"})
    # Upper bound on the number of keys sent in a single DEL while purging.
    _PURGE_BATCH_SIZE = 1000

    def __init__(self, app_id: str, prefix: str) -> None:
        """
        Validate the inputs and build the common key prefix.

        Both values go through ``sanitize_for_redis`` first. A trailing ``:``
        is appended to ``prefix`` when it does not already end with one.

        :param str app_id: The application ID.
        :param str prefix: The prefix for the keys.
        :raises ValueError: If ``prefix`` or ``app_id`` is empty or ``None``,
            or if ``app_id`` contains ``:``.
        """
        prefix = sanitize_for_redis(prefix)
        if not prefix:
            raise ValueError("Prefix cannot be an empty string or None")
        app_id = sanitize_for_redis(app_id)
        if not app_id:
            raise ValueError("App ID cannot be an empty string or None")
        if ":" in app_id:
            raise ValueError("App ID cannot contain ':'")
        # prefix is known to be non-empty here, so only the separator is checked.
        if not prefix.endswith(":"):
            prefix += ":"
        self._class_prefix = prefix
        self._app_id = app_id
        self._prefix = f"{PYNENC_KEY_PREFIX}:{app_id}:{prefix}"

    @property
    def prefix(self) -> str:
        """Read-only property for the Redis key prefix."""
        return self._prefix

    def invocation(self, invocation_id: str) -> str:
        """Get the ``invocation`` key for the given invocation ID."""
        return f"{self.prefix}invocation:{invocation_id}"

    def task(self, task_id: str) -> str:
        """Get the ``task`` key for the given task ID."""
        return f"{self.prefix}task:{task_id}"

    def args(self, task_id: str, arg: str, val: str) -> str:
        """Get the key for a task ID combined with an argument name and value."""
        return f"{self.prefix}task:{task_id}:arg:{arg}:val:{val}"

    def status_to_invocations(self, status: "InvocationStatus") -> str:
        """Get the Redis key for the set of invocation IDs with a specific status."""
        return f"{self.prefix}status:{status}"

    def invocation_to_status(self, invocation_id: str) -> str:
        """Get the ``invocation_status`` key for the given invocation ID."""
        return f"{self.prefix}invocation_status:{invocation_id}"

    def pending_timer(self, invocation_id: str) -> str:
        """Get the ``pending_timer`` key for the given invocation ID."""
        return f"{self.prefix}pending_timer:{invocation_id}"

    def previous_status(self, invocation_id: str) -> str:
        """Get the ``invocation_previous_status`` key for the given invocation ID."""
        return f"{self.prefix}invocation_previous_status:{invocation_id}"

    def invocation_retries(self, invocation_id: str) -> str:
        """Get the ``invocation_retries`` key for the given invocation ID."""
        return f"{self.prefix}invocation_retries:{invocation_id}"

    def call(self, call_id: str) -> str:
        """Get the ``call`` key for the given call ID."""
        return f"{self.prefix}call:{call_id}"

    def call_to_invocation(self, call_id: str) -> str:
        """Get the ``call_to_invocation`` key for the given call ID."""
        return f"{self.prefix}call_to_invocation:{call_id}"

    def invocation_to_call(self, invocation_id: str) -> str:
        """Get the Redis key for mapping an invocation_id to its call_id."""
        return f"{self.prefix}invocation_to_call:{invocation_id}"

    def edge(self, call_id: str) -> str:
        """Get the ``edge`` key for the given call ID."""
        return f"{self.prefix}edge:{call_id}"

    def reverse_edge(self, callee_call_id: str) -> str:
        """Get the ``reverse_edge`` key for the given callee call ID."""
        return f"{self.prefix}reverse_edge:{callee_call_id}"

    def waiting_for(self, invocation_id: str) -> str:
        """Get the ``waiting_for`` key for the given invocation ID."""
        return f"{self.prefix}waiting_for:{invocation_id}"

    def waited_by(self, invocation_id: str) -> str:
        """Get the ``waited_by`` key for the given invocation ID."""
        return f"{self.prefix}waited_by:{invocation_id}"

    def all_waited(self) -> str:
        """Get the ``all_waited`` key."""
        return f"{self.prefix}all_waited"

    def not_waiting(self) -> str:
        """Get the ``not_waiting`` key."""
        return f"{self.prefix}not_waiting"

    def runner_heartbeat(self, runner_id: str) -> str:
        """Get the ``runner_heartbeat`` key for the given runner ID."""
        return f"{self.prefix}runner_heartbeat:{runner_id}"

    def runner_heartbeats(self) -> str:
        """Get the ``runner_heartbeats`` key."""
        return f"{self.prefix}runner_heartbeats"

    def history(self, invocation_id: str) -> str:
        """Get the ``history`` key for the given invocation ID."""
        return f"{self.prefix}history:{invocation_id}"

    def history_by_timestamp(self) -> str:
        """Get key for sorted set of all history entries indexed by timestamp."""
        return f"{self.prefix}history_by_timestamp"

    def result(self, invocation_id: str) -> str:
        """Get the ``result`` key for the given invocation ID."""
        return f"{self.prefix}result:{invocation_id}"

    def exception(self, invocation_id: str) -> str:
        """Get the ``exception`` key for the given invocation ID."""
        return f"{self.prefix}exception:{invocation_id}"

    def invocation_auto_purge(self) -> str:
        """Get the ``invocation_auto_purge`` key."""
        return f"{self.prefix}invocation_auto_purge"

    def all_invocations_by_time(self) -> str:
        """Get key for sorted set of all invocation IDs indexed by registration time."""
        return f"{self.prefix}all_invocations_by_time"

    def task_invocations_by_time(self, task_id: str) -> str:
        """Get key for sorted set of invocation IDs for a task indexed by registration time."""
        return f"{self.prefix}task_invocations_by_time:{task_id}"

    def default_queue(self) -> str:
        """Get the ``default_queue`` key."""
        return f"{self.prefix}default_queue"

    def client_data_store(self, key: str) -> str:
        """Get the ``client_data_store`` key for the given data key."""
        return f"{self.prefix}client_data_store:{key}"

    def purge(self, client: "redis.Redis") -> None:
        """
        Purges all keys with the given prefix in Redis.

        Glob metacharacters in the prefix are backslash-escaped so that only
        keys starting with the literal prefix are matched. Keys are deleted
        in batches while the scan is running, so the full key list is never
        held in memory.

        :param redis.Redis client: The Redis client.
        """
        # '?' and '\' are not removed by sanitize_for_redis; unescaped they
        # would act as wildcards and could match keys outside this prefix.
        pattern = f"{self.prefix.translate(self._GLOB_ESCAPES)}*"
        batch = []
        for key in client.scan_iter(pattern, count=1000):
            batch.append(key)
            if len(batch) >= self._PURGE_BATCH_SIZE:
                client.delete(*batch)
                batch = []
        if batch:
            client.delete(*batch)

    def condition(self, condition_id: str) -> str:
        """Get key for storing a trigger condition."""
        return f"{self.prefix}condition:{condition_id}"

    def trigger(self, trigger_id: str) -> str:
        """Get key for storing a trigger definition."""
        return f"{self.prefix}trigger:{trigger_id}"

    def valid_condition(self, condition_id: str) -> str:
        """Get key for storing a valid condition."""
        return f"{self.prefix}valid_condition:{condition_id}"

    def task_triggers(self, task_id: str) -> str:
        """Get key for storing triggers associated with a task."""
        return f"{self.prefix}task_triggers:{task_id}"

    def condition_triggers(self, condition_id: str) -> str:
        """Get key for storing triggers that use a condition."""
        return f"{self.prefix}condition_triggers:{condition_id}"

    def event_channel(self) -> str:
        """Get channel name for publishing trigger events."""
        return f"{self.prefix}events"

    def cron_last_execution(self, condition_id: str) -> str:
        """
        Generate a key for storing the last execution time of a cron condition.

        :param condition_id: ID of the cron condition
        :return: Redis key string
        """
        return f"{self.prefix}cron_last_execution:{condition_id}"

    def source_task_conditions(self, task_id: str) -> str:
        """
        Generate key for source task to condition mapping.

        This key stores conditions that are sourced from a specific task.

        :param task_id: ID of the source task
        :return: Redis key for task's source conditions
        """
        return f"{self.prefix}source_task_conditions:{task_id}"

    def trigger_execution_claim(self, trigger_id: str, valid_condition_id: str) -> str:
        """
        Generate a key for a trigger execution claim.

        This key is used to atomically claim the right to execute a trigger
        for a specific valid condition across multiple workers.

        :param trigger_id: ID of the trigger definition
        :param valid_condition_id: ID of the valid condition
        :return: Redis key for the trigger execution claim
        """
        # NOTE(review): self.prefix already ends with ':', so this key contains
        # '::'. Left unchanged on purpose, since fixing it would rename the key.
        return (
            f"{self.prefix}:trigger:execution_claim:{trigger_id}:{valid_condition_id}"
        )

    def trigger_run_claim(self, trigger_run_id: str) -> str:
        """
        Generate a key for a trigger run claim.

        This key is used to atomically claim the right to execute a specific
        trigger run across multiple workers. A trigger run is a unique
        execution attempt for a trigger and its satisfied conditions.

        :param trigger_run_id: Unique ID for this trigger run
        :return: Redis key for the trigger run claim
        """
        # NOTE(review): self.prefix already ends with ':', so this key contains
        # '::'. Left unchanged on purpose, since fixing it would rename the key.
        return f"{self.prefix}:trigger:run_claim:{trigger_run_id}"

    def workflow_run_by_id(self, workflow_id: str) -> str:
        """
        Get key for storing a workflow run by its unique workflow_id.

        :param workflow_id: The unique workflow ID
        :return: Redis key for the workflow run
        """
        return f"{self.prefix}workflow:run:{workflow_id}"

    def workflow_type_index(self, workflow_type: str) -> str:
        """
        Get key for storing the set of workflow_ids for a workflow_type.

        :param workflow_type: The workflow type (task_id)
        :return: Redis key for the workflow type index set
        """
        return f"{self.prefix}workflow:type_index:{workflow_type}"

    def workflow_types(self) -> str:
        """
        Get key for storing workflow types set.

        Because the key starts with the app-scoped prefix, it is matched by
        the pattern that ``purge`` deletes.

        :return: Redis key for workflow types set
        """
        return f"{self.prefix}workflow:types"

    def workflow_data_value(self, workflow_id: str, key: str) -> str:
        """
        Get the ``data`` key for a workflow ID and a data key.

        :param workflow_id: ID of the workflow
        :param key: Identifier for the data value
        :return: Redis key for the workflow data value
        """
        return f"{self.prefix}workflow:{workflow_id}:data:{key}"

    def workflow_deterministic_value(self, workflow_id: str, key: str) -> str:
        """
        Get key for storing a deterministic value for workflow operations.

        :param workflow_id: ID of the workflow
        :param key: Identifier for the deterministic value
        :return: Redis key for the deterministic value
        """
        return f"{self.prefix}workflow:{workflow_id}:det:{key}"

    def runner_context(self, runner_id: str) -> str:
        """
        Get key for storing a runner context.

        :param runner_id: The runner's unique identifier
        :return: Redis key for the runner context
        """
        return f"{self.prefix}runner_context:{runner_id}"

    def workflow_sub_invocations(self, workflow_id: str) -> str:
        """
        Get key for storing sub-invocation IDs that run inside a workflow.

        Because the key starts with the app-scoped prefix, it is matched by
        the pattern that ``purge`` deletes.

        :param workflow_id: ID of the workflow
        :return: Redis key for workflow sub-invocations set
        """
        return f"{self.prefix}workflow:{workflow_id}:sub_invocations"

    def parent_invocation_children(self, parent_invocation_id: str) -> str:
        """
        Get key for storing child invocation IDs spawned by a parent invocation.

        :param parent_invocation_id: The parent invocation ID
        :return: Redis key for parent's child invocations set
        """
        return f"{self.prefix}parent_invocation_children:{parent_invocation_id}"

    def workflow_invocations(self, workflow_id: str) -> str:
        """
        Get key for storing invocation IDs that belong to a specific workflow.

        :param workflow_id: ID of the workflow
        :return: Redis key for workflow invocations set
        """
        return f"{self.prefix}workflow:invocations:{workflow_id}"

    def workflow_type_invocations(self, workflow_type_key: str) -> str:
        """
        Get key for storing invocation IDs grouped by workflow type.

        :param workflow_type_key: The workflow type key (task_id key)
        :return: Redis key for workflow type invocations set
        """
        return f"{self.prefix}workflow:type_invocations:{workflow_type_key}"

    @staticmethod
    def all_apps_info_key(app_id: str) -> str:
        """
        Get key for storing app information in the central registry.

        This uses a special prefix outside the normal app namespace
        to make discovery possible across all apps.

        :param app_id: The ID of the app
        :return: Redis key for app information
        """
        return f"{PYNENC_KEY_PREFIX}:{PYNENC_KEY_PREFIX}:apps_info:{app_id}"