ETL & SQL

Execute SQL queries and data operations directly in your workflows.

Supported Databases

Database     Executor Type   Description
PostgreSQL   postgres        Full-featured PostgreSQL support with advisory locks
SQLite       sqlite          Lightweight embedded database with file locking
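
Both executors share the same step shape; only the type and the dsn differ. A minimal SQLite step might look like the sketch below (the database path and query are illustrative):

yaml
steps:
  - name: count-users
    type: sqlite
    config:
      dsn: "file:./app.db"    # local SQLite database file
    command: "SELECT count(*) AS user_count FROM users"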

Basic Usage

yaml
secrets:
  - name: DB_PASSWORD
    provider: env           # Read from environment variable
    key: POSTGRES_PASSWORD  # Source variable name

steps:
  - name: query-users
    type: postgres
    config:
      dsn: "postgres://user:${DB_PASSWORD}@localhost:5432/mydb"
    command: "SELECT id, name, email FROM users WHERE active = true"
    output: USERS  # Capture results to variable

Output Destination

Query results are written to stdout by default. Use output: VAR_NAME to capture results into an environment variable for use in subsequent steps. For large results, use streaming: true with output_file to write directly to a file.
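
As a minimal sketch, a later step can consume the captured variable; the plain echo step and the ${USERS} expansion below are assumptions about how your workflow reads captured output, while the executor fields follow the reference later on this page:

yaml
steps:
  - name: query-users
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
    command: "SELECT id, name FROM users WHERE active = true"
    output: USERS               # capture query results into USERS
  - name: report-users
    command: echo "${USERS}"    # assumed: downstream step expands the captured variable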

Secrets

Secrets are automatically masked in logs. Use provider: file for Kubernetes/Docker secrets. See Secrets for details.
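
A rough sketch of a file-based secret, assuming the file provider reuses the key field to point at the mounted secret file; see the Secrets page for the authoritative schema:

yaml
secrets:
  - name: DB_PASSWORD
    provider: file                          # read secret material from a file
    key: /run/secrets/postgres_password     # assumed: key holds the file path for the file provider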

Key Features

  • Parameterized queries - Prevent SQL injection with named or positional parameters
  • Transactions - Wrap operations in transactions with configurable isolation levels
  • Data import - Import CSV, TSV, or JSONL files into database tables
  • Output formats - Export results as JSONL, JSON, or CSV
  • Streaming - Handle large result sets by streaming to files
  • Locking - Advisory locks (PostgreSQL) and file locks (SQLite) for distributed workflows

Configuration Reference

Connection

Field   Type     Default    Description
dsn     string   required   Database connection string

Connection Pooling

Connection pooling is not configurable per-step:

  • Non-worker mode: Uses fixed defaults (1 connection per step)
  • Worker mode (shared-nothing): Managed by global pool configuration at the worker level

For distributed workers running multiple concurrent DAGs, configure PostgreSQL connection pooling via worker.postgres_pool to prevent connection exhaustion.
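
A hedged sketch of a worker-level pool setting follows; the nested field names are assumptions made for illustration, so check the worker configuration reference for the exact keys:

yaml
worker:
  postgres_pool:
    max_open_conns: 20            # assumed key: upper bound on open connections
    max_idle_conns: 5             # assumed key: idle connections kept warm
    conn_max_lifetime_sec: 300    # assumed key: recycle connections after 5 minutes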

Execution

Field             Type        Default   Description
timeout           int         60        Query timeout in seconds
transaction       bool        false     Wrap in transaction
isolation_level   string      -         default, read_committed, repeatable_read, serializable
params            map/array   -         Query parameters

Output

Field           Type     Default   Description
output_format   string   jsonl     jsonl, json, csv
headers         bool     false     Include headers in CSV
null_string     string   null      NULL representation
max_rows        int      0         Limit rows (0 = unlimited)
streaming       bool     false     Stream to file
output_file     string   -         Output file path
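
For example, a CSV export that caps the row count and spells out NULLs; the values are illustrative:

yaml
steps:
  - name: sample-orders
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      output_format: csv
      headers: true
      null_string: "\\N"    # render NULL columns as \N
      max_rows: 1000        # safety cap on result size
    command: "SELECT * FROM orders"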

Locking

Field           Type     Description
advisory_lock   string   Named lock (PostgreSQL only)
file_lock       bool     File locking (SQLite only)
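
As a sketch, a PostgreSQL step can take a named advisory lock so only one workflow instance runs the statement at a time; the lock name and query below are illustrative:

yaml
steps:
  - name: nightly-rollup
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      advisory_lock: nightly_rollup    # serializes this step across workflow instances
    command: "REFRESH MATERIALIZED VIEW daily_sales"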

Data Import

Import data from files into database tables:

yaml
secrets:
  - name: DB_PASSWORD
    provider: env
    key: POSTGRES_PASSWORD

steps:
  - name: import-csv
    type: postgres
    config:
      dsn: "postgres://user:${DB_PASSWORD}@localhost:5432/mydb"
      import:
        input_file: /data/users.csv
        table: users
        format: csv
        has_header: true
        batch_size: 1000

Import Configuration

Field             Type       Default                       Description
input_file        string     required                      Source file path
table             string     required                      Target table name
format            string     auto-detect                   csv, tsv, jsonl (detected from file extension)
has_header        bool       true                          First row is header
delimiter         string     ,                             Field delimiter
columns           []string   -                             Explicit column names
null_values       []string   ["", "NULL", "null", "\\N"]   Values treated as NULL
batch_size        int        1000                          Rows per INSERT
on_conflict       string     error                         error, ignore, replace
conflict_target   string     -                             Column(s) for conflict detection (PostgreSQL UPSERT)
update_columns    []string   -                             Columns to update on conflict
skip_rows         int        0                             Skip N data rows
max_rows          int        0                             Limit rows (0 = unlimited)
dry_run           bool       false                         Validate without importing
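
Putting the conflict fields together, a PostgreSQL UPSERT-style import might look like this sketch; the table and column names are illustrative:

yaml
steps:
  - name: upsert-users
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      import:
        input_file: /data/users.csv
        table: users
        format: csv
        has_header: true
        on_conflict: replace              # update existing rows instead of failing
        conflict_target: id               # match rows on the primary key
        update_columns: [name, email]     # only these columns are updated on conflict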

Parameterized Queries

Use named parameters to prevent SQL injection:

yaml
steps:
  - name: safe-query
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      params:
        status: active
        min_age: 18
    command: |
      SELECT * FROM users
      WHERE status = :status AND age >= :min_age

Or positional parameters:

yaml
steps:
  - name: safe-query
    type: sqlite
    config:
      dsn: "file:./app.db"
      params:
        - active
        - 18
    command: "SELECT * FROM users WHERE status = ? AND age >= ?"

Transactions

Wrap multiple statements in a transaction:

yaml
steps:
  - name: transfer-funds
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      transaction: true
      isolation_level: serializable
    command: |
      UPDATE accounts SET balance = balance - 100 WHERE id = 1;
      UPDATE accounts SET balance = balance + 100 WHERE id = 2;

Output Formats

JSONL (default)

One JSON object per line, ideal for streaming:

yaml
steps:
  - name: export-jsonl
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      output_format: jsonl
    command: "SELECT * FROM orders"

Output:

{"id":1,"product":"Widget","price":9.99}
{"id":2,"product":"Gadget","price":19.99}

JSON

Array of objects:

yaml
steps:
  - name: export-json
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      output_format: json
    command: "SELECT * FROM orders"

Memory Usage

The json format buffers ALL rows in memory before writing. For large result sets, use jsonl or csv instead to stream rows one at a time. Using json with millions of rows can cause out-of-memory errors.

CSV

Tabular format with optional headers:

yaml
steps:
  - name: export-csv
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      output_format: csv
      headers: true
    command: "SELECT * FROM orders"

Streaming Large Results

For large result sets, stream directly to a file:

yaml
steps:
  - name: export-large-table
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      streaming: true
      output_file: /data/export.jsonl
      output_format: jsonl    # Use jsonl or csv for streaming
    command: "SELECT * FROM large_table"

Best Practices for Large Results

  • Use output_format: jsonl or csv - these formats stream rows immediately
  • Avoid output_format: json - it buffers all rows in memory before writing
  • Set max_rows as a safety limit for unbounded queries
  • Use streaming: true with output_file to write directly to disk

Error Handling

Combine a query timeout with a retry policy for transient failures, and use continue_on to keep the workflow running if the step still fails:

yaml
steps:
  - name: query-with-retry
    type: postgres
    config:
      dsn: "${DATABASE_URL}"
      timeout: 30
    command: "SELECT * FROM orders"
    retry_policy:
      limit: 3
      interval_sec: 5
    continue_on:
      failure: true
