SQLite

Execute queries and data operations against SQLite databases. Uses a pure Go SQLite implementation with no external dependencies.

Basic Usage

yaml
steps:
  - name: query-data
    type: sqlite
    config:
      dsn: "file:./data.db"
    command: "SELECT * FROM users"
    output: USERS  # Capture results to variable

Output Destination

Query results are written to stdout by default (JSONL format). Use output: VAR_NAME to capture results into an environment variable. For large results, use streaming: true with output_file.
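
For example, a captured result can be fed to a later step (a minimal sketch; the ${USERS} reference assumes the usual output-variable expansion):

yaml
steps:
  - name: query-data
    type: sqlite
    config:
      dsn: "file:./data.db"
    command: "SELECT id, name FROM users"
    output: USERS
  - name: print-results
    command: echo "${USERS}"
    depends:
      - query-data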

Connection String

SQLite supports file-based and in-memory databases:

File Database

yaml
config:
  dsn: "file:./myapp.db"

Or with options:

yaml
config:
  dsn: "file:./myapp.db?mode=rw&cache=shared"

| Parameter | Description |
| --- | --- |
| mode | ro (read-only), rw (read-write), rwc (read-write-create), memory |
| cache | shared (shared cache), private (private cache) |

In-Memory Database

yaml
config:
  dsn: ":memory:"

In-Memory Database Sharing

By default, :memory: databases are ephemeral and not shared between steps. To share an in-memory database across steps within the same DAG run, use shared_memory: true:

yaml
config:
  dsn: ":memory:"
  shared_memory: true  # Enables shared cache mode

This converts the DSN to file::memory:?cache=shared internally. For persistent storage, use file databases.
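
A table created in one step then remains visible to later steps in the same run (a sketch; both steps must opt in to shared_memory):

yaml
steps:
  - name: create-table
    type: sqlite
    config:
      dsn: ":memory:"
      shared_memory: true
    command: "CREATE TABLE temp_data (id INTEGER, value TEXT)"
  - name: query-table
    type: sqlite
    config:
      dsn: ":memory:"
      shared_memory: true
    command: "SELECT count(*) FROM temp_data"
    depends:
      - create-table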

Configuration

yaml
steps:
  - name: query
    type: sqlite
    config:
      dsn: "file:./app.db"
      timeout: 30           # Query timeout in seconds
      shared_memory: false   # Set true for :memory: databases to share across steps

Connection Pooling

SQLite connection pooling is not configurable. Each step always uses 1 connection (optimal for SQLite's locking model). Global pool management does not apply to SQLite, even in distributed worker mode.

Default Pragmas

The SQLite driver automatically configures these pragmas for robustness:

sql
PRAGMA foreign_keys = ON;    -- Enable foreign key enforcement
PRAGMA busy_timeout = 5000;  -- Wait up to 5 seconds if database is locked
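
Additional pragmas can be issued as ordinary statements at the start of a command (a sketch, assuming PRAGMA statements pass through like any other SQL):

yaml
steps:
  - name: tuned-update
    type: sqlite
    config:
      dsn: "file:./app.db"
    command: |
      PRAGMA journal_mode = WAL;
      UPDATE users SET active = 1 WHERE id = 42;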

Parameterized Queries

Named Parameters

Use :name syntax for named parameters:

yaml
steps:
  - name: find-user
    type: sqlite
    config:
      dsn: "file:./app.db"
      params:
        status: active
        role: admin
    command: |
      SELECT * FROM users
      WHERE status = :status AND role = :role

Positional Parameters

SQLite uses ? for positional parameters:

yaml
steps:
  - name: find-user
    type: sqlite
    config:
      dsn: "file:./app.db"
      params:
        - active
        - admin
    command: "SELECT * FROM users WHERE status = ? AND role = ?"

Transactions
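
Set transaction: true to run every statement in the command inside a single transaction, so a failing statement rolls the whole batch back: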

yaml
steps:
  - name: batch-update
    type: sqlite
    config:
      dsn: "file:./app.db"
      transaction: true
    command: |
      UPDATE users SET last_seen = datetime('now') WHERE id = 1;
      UPDATE users SET login_count = login_count + 1 WHERE id = 1;
      INSERT INTO activity_log (user_id, action) VALUES (1, 'login');

File Locking

For exclusive access to the database file, use file locking:

yaml
steps:
  - name: exclusive-operation
    type: sqlite
    config:
      dsn: "file:./shared.db"
      file_lock: true
    command: |
      DELETE FROM cache WHERE expires_at < datetime('now');
      VACUUM;

TIP

File locking creates a .lock file next to the database (e.g., shared.db.lock) and uses OS-level locking to ensure only one process can access the database at a time.

Distributed Workflow Example
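
Combining file_lock and transaction lets multiple workers operate on the same database file without clobbering each other: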

yaml
name: cache-cleanup
steps:
  - name: cleanup-expired
    type: sqlite
    config:
      dsn: "file:/shared/cache.db"
      file_lock: true
      transaction: true
    command: |
      -- Safe to run from multiple workers
      DELETE FROM cache WHERE expires_at < datetime('now');
      DELETE FROM sessions WHERE last_activity < datetime('now', '-1 day');

Data Import

CSV Import

yaml
steps:
  - name: import-products
    type: sqlite
    config:
      dsn: "file:./inventory.db"
      import:
        input_file: /data/products.csv
        table: products
        format: csv
        has_header: true
        batch_size: 500
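
With has_header: true, the header row is expected to map onto the table's column names. A hypothetical /data/products.csv:

csv
id,name,price
1,Widget,9.99
2,Gadget,19.99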

JSONL Import

yaml
steps:
  - name: import-events
    type: sqlite
    config:
      dsn: "file:./events.db"
      import:
        input_file: /data/events.jsonl
        table: events
        format: jsonl
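
Each input line is a JSON object whose keys correspond to column names (a hypothetical /data/events.jsonl):

jsonl
{"event_type":"click","payload":"/home"}
{"event_type":"signup","payload":"/pricing"}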

Import with Conflict Handling

SQLite supports INSERT OR IGNORE and INSERT OR REPLACE:

yaml
steps:
  - name: upsert-data
    type: sqlite
    config:
      dsn: "file:./app.db"
      import:
        input_file: /data/updates.csv
        table: products
        on_conflict: replace  # Uses INSERT OR REPLACE

| on_conflict | SQLite Behavior |
| --- | --- |
| error | Fail on duplicate (default) |
| ignore | INSERT OR IGNORE - skip duplicates |
| replace | INSERT OR REPLACE - update existing rows |

TIP

Unlike PostgreSQL where replace uses ON CONFLICT DO NOTHING, SQLite's replace truly replaces existing rows using INSERT OR REPLACE.

Output Formats

JSONL (Streaming)

yaml
steps:
  - name: export-jsonl
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: jsonl
    command: "SELECT * FROM products"

Output:

{"id":1,"name":"Widget","price":9.99}
{"id":2,"name":"Gadget","price":19.99}

JSON Array

yaml
steps:
  - name: export-json
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: json
    command: "SELECT * FROM products LIMIT 1000"

Memory Usage

The json format buffers ALL rows in memory before writing. For large result sets, use jsonl or csv instead. Always use LIMIT or max_rows with json format.
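
A sketch pairing json output with a row cap (assuming max_rows sits alongside the other config keys):

yaml
steps:
  - name: export-json-capped
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: json
      max_rows: 1000  # Safety limit for the buffered json format
    command: "SELECT * FROM products"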

CSV

yaml
steps:
  - name: export-csv
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: csv
      headers: true
    command: "SELECT id, name, price FROM products"

Streaming Large Results

yaml
steps:
  - name: export-logs
    type: sqlite
    config:
      dsn: "file:./logs.db"
      streaming: true
      output_file: /data/logs-export.jsonl
      output_format: jsonl    # Use jsonl or csv for large results
    command: "SELECT * FROM logs WHERE date >= date('now', '-7 days')"

Best Practices for Large Results

  • Use output_format: jsonl or csv - these formats stream rows immediately
  • Avoid output_format: json - it buffers all rows in memory before writing
  • Set max_rows as a safety limit for unbounded queries

Multiple Statements
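
Semicolon-separated statements execute in order within a single step: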

yaml
steps:
  - name: setup-database
    type: sqlite
    config:
      dsn: "file:./app.db"
    command: |
      CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );

      CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);

      CREATE TABLE IF NOT EXISTS sessions (
        id TEXT PRIMARY KEY,
        user_id INTEGER REFERENCES users(id),
        expires_at DATETIME NOT NULL
      );

SQLite Functions

SQLite provides many built-in functions:

yaml
steps:
  - name: aggregate-data
    type: sqlite
    config:
      dsn: "file:./sales.db"
    command: |
      SELECT
        date(created_at) as sale_date,
        count(*) as order_count,
        sum(total) as revenue,
        avg(total) as avg_order,
        group_concat(product_name, ', ') as products
      FROM orders
      WHERE created_at >= date('now', '-30 days')
      GROUP BY date(created_at)
      ORDER BY sale_date DESC

Error Handling
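
Transient failures (for example, a briefly locked database file) can be retried, and continue_on.failure lets the DAG proceed even if the step ultimately fails: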

yaml
steps:
  - name: safe-query
    type: sqlite
    config:
      dsn: "file:./app.db"
      timeout: 30
    command: "SELECT * FROM large_table"
    retry_policy:
      limit: 3
      interval_sec: 2
    continue_on:
      failure: true

Complete Example

yaml
name: local-data-pipeline
env:
  - DB_PATH: "./data/analytics.db"

steps:
  - name: setup-schema
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
    command: |
      CREATE TABLE IF NOT EXISTS raw_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        payload TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );

      CREATE TABLE IF NOT EXISTS daily_stats (
        date TEXT PRIMARY KEY,
        event_count INTEGER,
        unique_types INTEGER
      );

  - name: import-events
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      import:
        input_file: /data/events-${TODAY}.jsonl
        table: raw_events
        format: jsonl
        batch_size: 1000
    depends:
      - setup-schema

  - name: calculate-stats
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      file_lock: true
      transaction: true
    command: |
      INSERT OR REPLACE INTO daily_stats (date, event_count, unique_types)
      SELECT
        date(created_at) as date,
        count(*) as event_count,
        count(DISTINCT event_type) as unique_types
      FROM raw_events
      WHERE date(created_at) = date('now')
      GROUP BY date(created_at);
    depends:
      - import-events

  - name: export-report
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      streaming: true
      output_file: /reports/daily-stats.json
      output_format: json
    command: |
      SELECT * FROM daily_stats
      ORDER BY date DESC
      LIMIT 30
    depends:
      - calculate-stats
