The distributed locking system ensures atomic, race-condition-free operations on task runs across multiple processes in a distributed deployment. This is critical for preventing concurrent modifications to run state (e.g., during dequeue, checkpoint creation, or state transitions).
For information about the broader task execution engine, see Run Engine Architecture. For concurrency management at the queue level, see Concurrency Management.
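The RunLocker class provides distributed locking using the Redlock algorithm implemented over Redis. It prevents race conditions when multiple workers or processes attempt to modify the same task run simultaneously. The capabilities covered on this page are retries with exponential backoff and jitter, automatic lock extension, nested-lock detection through AsyncLocalStorage, conditional and manual locking, and OpenTelemetry metrics and tracing.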
Sources: internal-packages/run-engine/src/engine/locking.ts1-700
Diagram: Distributed Locking Architecture
The locking system sits between RunEngine subsystems and Redis, ensuring atomic operations on task runs. Systems call lock() with a lock type identifier and resource list (typically run IDs).
Sources: internal-packages/run-engine/src/engine/locking.ts70-128 internal-packages/run-engine/src/engine/index.ts77-140
Diagram: RunLocker Class Structure
Sources: internal-packages/run-engine/src/engine/locking.ts70-128
The RunEngine constructor initializes the RunLocker with Redis configuration and retry settings:
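As a rough illustration of the settings involved, the sketch below collects the values quoted on this page into one options object. The interface and field names are hypothetical and are not taken from the RunLocker source, the Redis connection settings are omitted, and the 10% jitter factor is inferred from the retry table further down rather than stated anywhere explicitly.

```typescript
// Hypothetical option names; only the numeric values and the key prefix come from this page.
interface LockRetryOptions {
  baseDelayMs: number; // delay before the first retry
  backoffMultiplier: number; // growth factor applied per attempt
  maxDelayMs: number; // cap applied before jitter
  jitterFactor: number; // fraction of the delay used as jitter (assumed 10%)
}

interface RunLockerOptions {
  keyPrefix: string;
  durationMs: number;
  automaticExtensionThresholdMs: number;
  retry: LockRetryOptions;
}

const runLockerOptions: RunLockerOptions = {
  keyPrefix: "engine:runlock:",
  durationMs: 5000,
  automaticExtensionThresholdMs: 1000,
  retry: { baseDelayMs: 200, backoffMultiplier: 1.8, maxDelayMs: 3000, jitterFactor: 0.1 },
};

// Returns a list of problems; an empty list means the options are internally consistent.
function validateRunLockerOptions(o: RunLockerOptions): string[] {
  const problems: string[] = [];
  if (o.durationMs <= 0) problems.push("durationMs must be positive");
  if (o.automaticExtensionThresholdMs >= o.durationMs) {
    problems.push("extension threshold must be shorter than the lock duration");
  }
  if (o.retry.backoffMultiplier < 1) problems.push("backoffMultiplier must be at least 1");
  if (o.retry.maxDelayMs < o.retry.baseDelayMs) problems.push("maxDelayMs must not be below baseDelayMs");
  if (o.retry.jitterFactor < 0 || o.retry.jitterFactor > 1) problems.push("jitterFactor must be within [0, 1]");
  return problems;
}
```

The check on the extension threshold matters most: a threshold at or above the lock duration would trigger an extension immediately after every acquisition.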
Sources: internal-packages/run-engine/src/engine/index.ts110-140 apps/webapp/app/v3/runEngine.server.ts84-104
Diagram: Lock Acquisition Flow with Exponential Backoff
The retry logic implements exponential backoff with jitter to reduce contention:
| Attempt | Base Delay | With Multiplier (1.8) | Max After Jitter |
|---|---|---|---|
| 1 | 200ms | 200ms | 220ms |
| 2 | 200ms | 360ms | 396ms |
| 3 | 200ms | 648ms | 713ms |
| 4 | 200ms | 1,166ms | 1,283ms |
| 5 | 200ms | 2,099ms | 2,309ms |
| 6+ | 200ms | Capped at 3,000ms | 3,300ms |
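The table can be reproduced with a small delay function. This is a minimal sketch, not the RunLocker implementation: the function and field names are hypothetical, jitter is assumed to be symmetric (plus or minus 10% of the capped delay), and the cap is assumed to apply before jitter, which is what the 3,300ms maximum in the last row suggests.

```typescript
// Hypothetical config shape; values match the table above.
interface BackoffConfig {
  baseDelayMs: number;
  backoffMultiplier: number;
  maxDelayMs: number;
  jitterFactor: number;
}

const tableConfig: BackoffConfig = {
  baseDelayMs: 200,
  backoffMultiplier: 1.8,
  maxDelayMs: 3000,
  jitterFactor: 0.1,
};

// attempt is 1-based. `random` is injectable so the result can be made deterministic.
function backoffDelayMs(
  attempt: number,
  cfg: BackoffConfig,
  random: () => number = Math.random
): number {
  const exponential = cfg.baseDelayMs * Math.pow(cfg.backoffMultiplier, attempt - 1);
  const capped = Math.min(exponential, cfg.maxDelayMs);
  // Assumed symmetric jitter in [-jitterFactor, +jitterFactor] of the capped delay.
  const jitter = capped * cfg.jitterFactor * (random() * 2 - 1);
  return Math.max(0, Math.round(capped + jitter));
}
```

Passing `() => 0.5` as `random` removes the jitter and yields the "With Multiplier" column; a value close to 1 approaches the "Max After Jitter" column.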
Sources: internal-packages/run-engine/src/engine/locking.ts143-241
The retry configuration is exposed at the webapp level:
Sources: apps/webapp/app/v3/runEngine.server.ts96-103
Locks have a default duration of 5 seconds but are automatically extended during long-running operations to prevent expiration. The extension occurs when the remaining time falls below the automaticExtensionThreshold (default: 1 second).
Diagram: Automatic Lock Extension
The extension is implemented using a recursive timer:
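A minimal sketch of such a recursive timer follows. The `ExtendableLock` interface and the function names are hypothetical stand-ins for whatever lock object the Redlock client returns; cancellation through an `AbortSignal` is likewise an assumption made for the example.

```typescript
// Hypothetical lock shape: an expiration timestamp plus a way to extend it.
interface ExtendableLock {
  expiration: number; // epoch milliseconds at which the lock expires
  extend(durationMs: number): Promise<ExtendableLock>;
}

// How long to wait before extending so that `thresholdMs` of lifetime still remains.
function msUntilExtension(expiration: number, now: number, thresholdMs: number): number {
  return Math.max(0, expiration - now - thresholdMs);
}

function scheduleExtension(
  lock: ExtendableLock,
  durationMs: number,
  thresholdMs: number,
  signal: AbortSignal,
  onError: (err: unknown) => void
): void {
  if (signal.aborted) return;
  const delay = msUntilExtension(lock.expiration, Date.now(), thresholdMs);
  const onAbort = () => clearTimeout(timer);
  const timer = setTimeout(() => {
    signal.removeEventListener("abort", onAbort);
    if (signal.aborted) return;
    lock.extend(durationMs).then(
      // Recursion: each successful extension schedules the next one.
      (extended) => scheduleExtension(extended, durationMs, thresholdMs, signal, onError),
      onError
    );
  }, delay);
  signal.addEventListener("abort", onAbort, { once: true });
}
```

With the defaults on this page (5000ms duration, 1000ms threshold), a freshly acquired lock would be extended roughly 4 seconds after acquisition, and the caller would abort the signal once the protected routine finishes.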
Sources: internal-packages/run-engine/src/engine/locking.ts327-365
| Parameter | Default | Purpose |
|---|---|---|
| duration | 5000ms | Initial lock duration |
| automaticExtensionThreshold | 1000ms | When to trigger extension |
Sources: internal-packages/run-engine/src/engine/index.ts129-130
The RunLocker uses Node.js AsyncLocalStorage to track lock context across async boundaries. This enables nested lock optimization: if a system already holds a lock on a resource, subsequent lock attempts for the same resource are no-ops.
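The idea can be sketched with `AsyncLocalStorage` from `node:async_hooks`. Everything else here is an assumption for illustration: the context shape, the function names, and the counter that stands in for a real Redis acquisition.

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical context shape: the resources locked by the current async call chain.
interface LockContext {
  resources: Set<string>;
}

const lockContext = new AsyncLocalStorage<LockContext>();

// Counts simulated Redis acquisitions so the effect of the optimization is visible.
let redisAcquisitions = 0;

// True when every requested resource is already locked higher up the call chain.
function alreadyHolds(resources: string[]): boolean {
  const held = lockContext.getStore();
  return held !== undefined && resources.every((r) => held.resources.has(r));
}

async function withLock<T>(resources: string[], routine: () => Promise<T>): Promise<T> {
  if (alreadyHolds(resources)) {
    return routine(); // nested call: no Redis round trip
  }
  redisAcquisitions++; // stand-in for acquiring the lock in Redis
  const held = lockContext.getStore();
  const merged = new Set<string>(held ? Array.from(held.resources) : []);
  resources.forEach((r) => merged.add(r));
  try {
    // Everything awaited inside `routine` observes `merged` through getStore().
    return await lockContext.run({ resources: merged }, routine);
  } finally {
    // stand-in for releasing the lock in Redis
  }
}
```

In this sketch, `withLock(["run_1"], () => withLock(["run_1"], async () => "done"))` would increment `redisAcquisitions` only once, because the inner call finds `run_1` in the inherited context.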
Diagram: Nested Lock Optimization with AsyncLocalStorage
This optimization is crucial for performance because many operations call each other:
Sources: internal-packages/run-engine/src/engine/locking.ts143-241 internal-packages/run-engine/src/engine/systems/checkpointSystem.ts175-193
All RunEngine systems follow this pattern:
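A hedged sketch of the calling pattern: wrap the whole state transition in `lock()` with a lock type and the run ID, then read, validate, and mutate inside the callback. The `Locker` interface, the `RecordingLocker` test double, the `Run` type, and the in-memory map are all hypothetical; the real systems work against the database and the actual RunLocker.

```typescript
type RunStatus = "PENDING" | "EXECUTING";

interface Run {
  id: string;
  status: RunStatus;
}

// Assumed shape of the lock() call: lock type, resource list, routine.
interface Locker {
  lock<T>(name: string, resources: string[], routine: () => Promise<T>): Promise<T>;
}

// Test double that records calls and runs the routine immediately, without Redis.
class RecordingLocker implements Locker {
  calls: { name: string; resources: string[] }[] = [];

  async lock<T>(name: string, resources: string[], routine: () => Promise<T>): Promise<T> {
    this.calls.push({ name, resources });
    return routine();
  }
}

async function startRunAttempt(locker: Locker, runs: Map<string, Run>, runId: string): Promise<Run> {
  return locker.lock("startRunAttempt", [runId], async () => {
    // 1. Read the current state inside the lock; earlier reads may be stale.
    const run = runs.get(runId);
    if (!run) throw new Error(`Run ${runId} not found`);
    // 2. Validate the transition; another worker may already have started the run.
    if (run.status !== "PENDING") return run;
    // 3. Mutate while no other holder can interleave.
    run.status = "EXECUTING";
    return run;
  });
}
```

Reading state only after the lock is held is the important part of the sketch: a snapshot taken before acquisition could describe a run that another process has since changed.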
Sources: internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts315-631
| Lock Type | Resources | System | Purpose |
|---|---|---|---|
| dequeueFromWorkerQueue | [runId] | DequeueSystem | Prevent concurrent dequeue |
| startRunAttempt | [runId] | RunAttemptSystem | Atomic attempt creation |
| attemptSucceeded | [runId] | RunAttemptSystem | Atomic completion |
| attemptFailed | [runId] | RunAttemptSystem | Atomic retry/failure |
| createCheckpoint | [runId] | CheckpointSystem | Atomic checkpoint creation |
| continueRunExecution | [runId] | CheckpointSystem | Atomic resume |
| blockRunWithWaitpoint | [runId] | WaitpointSystem | Atomic waitpoint linking |
| continueRunIfUnblocked | [runId] | WaitpointSystem | Atomic continuation check |
| enqueueRun | [runId] | EnqueueSystem | Atomic queue insertion |
| rescheduleDelayedRun | [runId] | DelayedRunSystem | Atomic delay reschedule |
| enqueueDelayedRun | [runId] | DelayedRunSystem | Atomic delayed enqueue |
Sources: internal-packages/run-engine/src/engine/systems/dequeueSystem.ts166-169 internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts319 internal-packages/run-engine/src/engine/systems/checkpointSystem.ts53 internal-packages/run-engine/src/engine/systems/waitpointSystem.ts397
Some operations support conditional locking with lockIf:
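One way to read `lockIf` is as a thin wrapper: when the condition is false the routine runs directly, otherwise it goes through the normal lock path. The sketch below assumes that behavior and a `(condition, name, resources, routine)` parameter order; both are assumptions, and `makeLockIf` is a hypothetical helper written only for this example.

```typescript
type Routine<T> = () => Promise<T>;

// Assumed shape of the underlying lock() call.
type LockFn = <T>(name: string, resources: string[], routine: Routine<T>) => Promise<T>;

// Builds a lockIf function on top of any lock implementation.
function makeLockIf(lock: LockFn) {
  return async function lockIf<T>(
    condition: boolean,
    name: string,
    resources: string[],
    routine: Routine<T>
  ): Promise<T> {
    if (!condition) {
      return routine(); // caller indicated that locking is unnecessary here
    }
    return lock(name, resources, routine);
  };
}
```

A plausible use is an operation that is sometimes invoked from code that already coordinates access itself, so the caller passes a flag to skip the lock in that case.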
Sources: internal-packages/run-engine/src/engine/systems/enqueueSystem.ts59
For operations that need explicit lock control (e.g., across multiple async operations), the manual API is available:
Sources: internal-packages/run-engine/src/engine/locking.ts367-483
The RunLocker exposes several OpenTelemetry metrics:
| Metric | Type | Description |
|---|---|---|
| run_engine.locks.active | ObservableGauge | Current number of active locks by type |
| run_engine.lock.duration | Histogram | Duration of lock operations in milliseconds |
| run_engine.lock.retries | Histogram | Number of retry attempts per lock acquisition |
Example metric attributes:
Sources: internal-packages/run-engine/src/engine/locking.ts113-127
Lock operations are instrumented with OpenTelemetry spans:
Diagram: Lock Operation Tracing
Span attributes include:
Sources: internal-packages/run-engine/src/engine/locking.ts143-153
Lock acquisition failures result in LockAcquisitionTimeoutError:
This error includes:
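For illustration, an error of this kind could carry the contended resources, the total wait time, and the number of attempts. The class below is a stand-in that reuses the name from this page; its field names and message format are assumptions and may not match the real class, and `classifyLockFailure` is a hypothetical caller-side policy.

```typescript
// Stand-in for the real error class; the fields are assumed for this sketch.
class LockAcquisitionTimeoutError extends Error {
  readonly resources: string[];
  readonly totalWaitTimeMs: number;
  readonly attempts: number;

  constructor(resources: string[], totalWaitTimeMs: number, attempts: number) {
    super(
      `Failed to acquire lock on [${resources.join(", ")}] after ${attempts} attempts (${totalWaitTimeMs}ms)`
    );
    this.name = "LockAcquisitionTimeoutError";
    this.resources = resources;
    this.totalWaitTimeMs = totalWaitTimeMs;
    this.attempts = attempts;
    // Keeps instanceof working when compiled to older JavaScript targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical policy: treat lock contention as transient, anything else as fatal.
function classifyLockFailure(err: unknown): "retry-later" | "fatal" {
  return err instanceof LockAcquisitionTimeoutError ? "retry-later" : "fatal";
}
```

Distinguishing this error from other failures lets a caller back off and retry the operation instead of treating contention as a hard failure.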
Sources: internal-packages/run-engine/src/engine/locking.ts26-41
Sources: apps/webapp/app/v3/runEngine.server.ts84-103
All lock keys use the Redis key prefix engine:runlock:, so a lock on a given resource is stored under engine:runlock:{resource}.
This isolates locking keys from other Redis data (queues, cache, etc.).
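A small sketch of the key scheme, assuming the prefix is simply prepended to the resource identifier; the helper names are hypothetical.

```typescript
const LOCK_KEY_PREFIX = "engine:runlock:";

// Builds the Redis key under which a resource's lock would be stored.
function lockKey(resource: string): string {
  return LOCK_KEY_PREFIX + resource;
}

// Tells lock keys apart from other keys in the same Redis instance.
function isLockKey(redisKey: string): boolean {
  return redisKey.startsWith(LOCK_KEY_PREFIX);
}
```

A shared prefix also makes it straightforward to inspect outstanding locks during debugging, for example by scanning for keys that match `engine:runlock:*`.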
Sources: internal-packages/run-engine/src/engine/index.ts110-113