Distributed Execution
Run workflow tasks across multiple worker nodes.
Architecture
Main Instance (Scheduler + UI + Coordinator)
         │                    │
         │ Service Registry   │ gRPC + Heartbeat
         │ (File-based)       │ (includes DAG definitions)
         │                    │
    ┌────┴──────────┬─────────┴─────────────┐
    │               │                       │
 Worker 1       Worker 2                Worker 3
(gpu=true)  (region=eu-west)         (cpu-optimized)
    │               │                       │
    └───────────────┴───────────────────────┘
                    │
             Shared Storage
        (logs, execution state)
Storage Modes
Boltbase supports two deployment modes for distributed workers:
| Mode | Description |
|---|---|
| Shared Filesystem | Workers share storage with coordinator via NFS/shared volumes |
| Shared Nothing | Workers communicate status and logs via gRPC (no shared storage) |
See Workers for configuration reference and deployment details.
DAG definitions do not need to be shared in either mode — they are transmitted to workers via gRPC when tasks are dispatched.
How Dispatch Decisions Work
Every execution path in Boltbase — API, CLI, scheduler, queue, and sub-DAG steps — uses a single function (ShouldDispatchToCoordinator) to decide whether a DAG runs locally or is dispatched to a worker. This guarantees consistent behavior regardless of how a run is triggered.
Decision Priority
The dispatch decision evaluates these rules in order, stopping at the first match:
| Priority | Condition | Result |
|---|---|---|
| 1 | worker_selector: local is set | Local — always runs on the main instance |
| 2 | No coordinator is configured | Local — distributed execution is unavailable |
| 3 | worker_selector has labels (e.g., gpu: "true") | Dispatch — sent to a matching worker |
| 4 | default_execution_mode: distributed is set | Dispatch — sent to any available worker |
| 5 | None of the above | Local — runs on the main instance |
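As a concrete illustration, the two hypothetical DAG snippets below (placeholder commands) map onto rules 1 and 3: the first always stays on the main instance, while the second is dispatched to a labeled worker even when the server default is local.
```yaml
# Rule 1: the literal value "local" short-circuits everything else,
# so this DAG runs on the main instance in all cases.
worker_selector: local
steps:
  - command: echo "runs on the main instance"
```
```yaml
# Rule 3: a label selector is present, so the run is dispatched to a worker
# advertising gpu=true, regardless of default_execution_mode.
worker_selector:
  gpu: "true"
steps:
  - command: echo "runs on a gpu-labeled worker"
```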
Execution Paths
All of the following entry points evaluate the same decision logic:
| Trigger | Description |
|---|---|
| API start | Immediate execution from the UI or API |
| API retry | Retrying a failed run from the UI or API |
| CLI boltbase start | Running a DAG from the command line |
| Scheduler | Cron-triggered scheduled runs |
| Queue consumer | Dequeuing a previously enqueued run |
| Sub-DAG step | A call step executing a child DAG |
Queue Behavior
When a DAG run is enqueued (via API, webhook, or scheduler), it always enters the local queue first with status queued. The queue processor dequeues items respecting max_concurrency, and only at dequeue time does it evaluate the dispatch decision. This means distributed runs are still subject to queue concurrency limits — the queue acts as a gate before dispatch, not after.
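The commented sketch below traces that lifecycle for a hypothetical GPU-bound DAG (the DAG contents are placeholders):
```yaml
# Hypothetical DAG enqueued via the API, a webhook, or the scheduler.
# 1. The run enters the local queue with status "queued".
# 2. The queue processor dequeues it once max_concurrency allows.
# 3. Only at dequeue time is the dispatch decision evaluated; with the
#    labels below, the run is then sent to a worker advertising gpu=true.
worker_selector:
  gpu: "true"
steps:
  - command: python batch_job.py
```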
Sub-DAG Dispatch
Each sub-DAG independently evaluates the dispatch decision. A DAG running locally can dispatch a child to a worker (if the child has worker_selector labels), and a DAG running on a worker can force a child to run locally (if the child has worker_selector: local). Parent and child dispatch decisions are completely independent.
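A minimal sketch of that independence, assuming a hypothetical child DAG named nightly-training: the parent is pinned to the main instance, while its call step carries labels that send the child to a GPU worker.
```yaml
# Parent DAG: rule 1 ("local") pins the parent run to the main instance.
worker_selector: local
steps:
  - call: nightly-training    # hypothetical child DAG
    worker_selector:
      gpu: "true"             # evaluated independently: the child run is dispatched to a gpu worker
```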
Setting Up Distributed Execution
Step 1: Start the Coordinator
The coordinator service can be started with boltbase start-all (requires --coordinator.host set to a non-localhost address):
# Start all services including coordinator (distributed mode)
boltbase start-all --coordinator.host=0.0.0.0 --port=8080
# Single instance mode (coordinator disabled, default)
boltbase start-all
# Or start coordinator separately
boltbase coordinator --coordinator.host=0.0.0.0 --coordinator.port=50055
The coordinator is only started by start-all when --coordinator.host is set to a non-localhost address (not 127.0.0.1 or localhost). This allows running in single instance mode by default.
For containerized environments (Docker, Kubernetes), configure both the bind address and advertise address:
# Bind to all interfaces and advertise the service name
boltbase coordinator \
--coordinator.host=0.0.0.0 \
--coordinator.advertise=boltbase-server \
--coordinator.port=50055
- `--coordinator.host`: Address to bind the gRPC server (use 0.0.0.0 for containers)
- `--coordinator.advertise`: Address workers use to connect (defaults to hostname if not set)
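For example, a Docker Compose sketch might wire this up as follows; the image name and port mapping are assumptions, while the flags are the ones shown above:
```yaml
# docker-compose.yaml (sketch): the image name is hypothetical,
# the flags match the example above.
services:
  boltbase-server:                      # matches --coordinator.advertise
    image: boltbase/boltbase:latest     # assumed image name
    command: >
      boltbase coordinator
      --coordinator.host=0.0.0.0
      --coordinator.advertise=boltbase-server
      --coordinator.port=50055
    ports:
      - "50055:50055"                   # expose the coordinator gRPC port to workers
```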
Step 2: Deploy Workers
Start workers on your compute nodes with appropriate labels:
# GPU-enabled worker
boltbase worker \
--worker.labels gpu=true,cuda=11.8,memory=64G
# CPU-optimized worker
boltbase worker \
--worker.labels cpu-arch=amd64,cpu-cores=32,region=us-east-1
Step 3: Route Tasks to Workers
Use worker_selector in your DAG definitions to route tasks:
worker_selector:
  gpu: "true"
steps:
  - command: python train_model.py
Or at the step level for sub-DAG steps:
steps:
  - call: train-model
    worker_selector:
      gpu: "true"
See Worker Labels for full details on label matching and step-level worker_selector.
Default Execution Mode
By default, DAGs without a worker_selector run locally on the main instance. You can change this behavior with the default_execution_mode server setting so that all DAGs are dispatched to workers automatically.
# config.yaml
default_execution_mode: distributed  # "local" (default) or "distributed"
Or via environment variable:
export BOLTBASE_DEFAULT_EXECUTION_MODE=distributed
When set to distributed, every DAG is dispatched to a worker through the coordinator, even if it has no worker_selector. DAGs with a worker_selector are always dispatched to a matching worker regardless of this setting.
Force Local Execution
If default_execution_mode is distributed but you need a specific DAG to always run locally (e.g., a lightweight health-check), use worker_selector: local:
# This DAG always runs locally, even when default_execution_mode is "distributed"
worker_selector: local
steps:
  - command: curl -f http://localhost:8080/health
Setting worker_selector to the string "local" overrides both the server default and any label-based routing, forcing the DAG to execute on the main instance.
See How Dispatch Decisions Work for the complete priority order.
