# SQLite
Execute queries and data operations against SQLite databases. Uses a pure Go SQLite implementation with no external dependencies.
## Basic Usage

```yaml
steps:
  - name: query-data
    type: sqlite
    config:
      dsn: "file:./data.db"
      command: "SELECT * FROM users"
    output: USERS # Capture results to variable
```

## Output Destination
Query results are written to stdout by default (JSONL format). Use `output: VAR_NAME` to capture results into an environment variable. For large results, use `streaming: true` with `output_file`.
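For example, a captured variable can feed a later step. A minimal sketch, assuming downstream steps expand captured variables with `${...}` the same way the Complete Example below expands `${DB_PATH}`:

```yaml
steps:
  - name: count-users
    type: sqlite
    config:
      dsn: "file:./data.db"
      command: "SELECT count(*) AS user_count FROM users"
    output: USER_COUNT

  - name: report
    # Hypothetical shell step; assumes ${USER_COUNT} expands to the captured result
    command: echo "users: ${USER_COUNT}"
    depends:
      - count-users
```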
## Connection String

SQLite supports file-based and in-memory databases.

### File Database

```yaml
config:
  dsn: "file:./myapp.db"
```

Or with options:

```yaml
config:
  dsn: "file:./myapp.db?mode=rw&cache=shared"
```

| Parameter | Description |
|---|---|
| `mode` | `ro` (read-only), `rw` (read-write), `rwc` (read-write-create), `memory` |
| `cache` | `shared` (shared cache), `private` (private cache) |
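The `mode` parameter doubles as a safety net: opening the database read-only makes any accidental write fail instead of modifying data. A sketch using the parameters from the table above:

```yaml
steps:
  - name: read-only-report
    type: sqlite
    config:
      dsn: "file:./myapp.db?mode=ro" # read-only: write statements error out
      command: "SELECT count(*) AS total FROM users"
```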
### In-Memory Database

```yaml
config:
  dsn: ":memory:"
```

#### In-Memory Database Sharing

By default, `:memory:` databases are ephemeral and not shared between steps. To share an in-memory database across steps within the same DAG run, use `shared_memory: true`:

```yaml
config:
  dsn: ":memory:"
  shared_memory: true # Enables shared cache mode
```

This converts the DSN to `file::memory:?cache=shared` internally. For persistent storage, use file databases.
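A sketch of two steps sharing one in-memory database within a single DAG run, as described above:

```yaml
steps:
  - name: load-staging
    type: sqlite
    config:
      dsn: ":memory:"
      shared_memory: true
      command: |
        CREATE TABLE staging (id INTEGER PRIMARY KEY, value TEXT);
        INSERT INTO staging (value) VALUES ('a'), ('b');

  - name: read-staging
    type: sqlite
    config:
      dsn: ":memory:"
      shared_memory: true # Same shared cache, so staging is visible here
      command: "SELECT * FROM staging"
    depends:
      - load-staging
```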
## Configuration

```yaml
steps:
  - name: query
    type: sqlite
    config:
      dsn: "file:./app.db"
      timeout: 30 # Query timeout in seconds
      shared_memory: false # Set true for :memory: databases to share across steps
```

### Connection Pooling
SQLite connection pooling is not configurable. Each step always uses 1 connection (optimal for SQLite's locking model). Global pool management does not apply to SQLite, even in distributed worker mode.
### Default Pragmas

The SQLite driver automatically configures these pragmas for robustness:

```sql
PRAGMA foreign_keys = ON;   -- Enable foreign key enforcement
PRAGMA busy_timeout = 5000; -- Wait up to 5 seconds if database is locked
```
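To confirm what is in effect, you can read a pragma back like a query; in SQLite, value-returning `PRAGMA` statements produce result rows. A minimal sketch (whether the executor surfaces pragma rows is an assumption):

```yaml
steps:
  - name: check-pragmas
    type: sqlite
    config:
      dsn: "file:./app.db"
      command: "PRAGMA foreign_keys" # Returns one row, e.g. {"foreign_keys":1}
```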
## Parameterized Queries

### Named Parameters

Use `:name` syntax for named parameters:

```yaml
steps:
  - name: find-user
    type: sqlite
    config:
      dsn: "file:./app.db"
      params:
        status: active
        role: admin
      command: |
        SELECT * FROM users
        WHERE status = :status AND role = :role
```

### Positional Parameters
SQLite uses `?` for positional parameters:

```yaml
steps:
  - name: find-user
    type: sqlite
    config:
      dsn: "file:./app.db"
      params:
        - active
        - admin
      command: "SELECT * FROM users WHERE status = ? AND role = ?"
```

## Transactions
```yaml
steps:
  - name: batch-update
    type: sqlite
    config:
      dsn: "file:./app.db"
      transaction: true
      command: |
        UPDATE users SET last_seen = datetime('now') WHERE id = 1;
        UPDATE users SET login_count = login_count + 1 WHERE id = 1;
        INSERT INTO activity_log (user_id, action) VALUES (1, 'login');
```

## File Locking
For exclusive access to the database file, use file locking:

```yaml
steps:
  - name: exclusive-operation
    type: sqlite
    config:
      dsn: "file:./shared.db"
      file_lock: true
      command: |
        DELETE FROM cache WHERE expires_at < datetime('now');
        VACUUM;
```

> **TIP**
> File locking creates a `.lock` file next to the database (e.g., `shared.db.lock`) and uses OS-level locking to ensure only one process can access the database at a time.
Distributed Workflow Example
name: cache-cleanup
steps:
- name: cleanup-expired
type: sqlite
config:
dsn: "file:/shared/cache.db"
file_lock: true
transaction: true
command: |
-- Safe to run from multiple workers
DELETE FROM cache WHERE expires_at < datetime('now');
DELETE FROM sessions WHERE last_activity < datetime('now', '-1 day');Data Import
### CSV Import

```yaml
steps:
  - name: import-products
    type: sqlite
    config:
      dsn: "file:./inventory.db"
      import:
        input_file: /data/products.csv
        table: products
        format: csv
        has_header: true
        batch_size: 500
```

### JSONL Import
```yaml
steps:
  - name: import-events
    type: sqlite
    config:
      dsn: "file:./events.db"
      import:
        input_file: /data/events.jsonl
        table: events
        format: jsonl
```

### Import with Conflict Handling
SQLite supports `INSERT OR IGNORE` and `INSERT OR REPLACE`:

```yaml
steps:
  - name: upsert-data
    type: sqlite
    config:
      dsn: "file:./app.db"
      import:
        input_file: /data/updates.csv
        table: products
        on_conflict: replace # Uses INSERT OR REPLACE
```

| `on_conflict` | SQLite Behavior |
|---|---|
| `error` | Fail on duplicate (default) |
| `ignore` | `INSERT OR IGNORE` - skip duplicates |
| `replace` | `INSERT OR REPLACE` - update existing rows |
> **TIP**
> Unlike PostgreSQL, where `replace` uses `ON CONFLICT DO NOTHING`, SQLite's `replace` truly replaces existing rows using `INSERT OR REPLACE`.
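This matters when the incoming row omits columns: `INSERT OR REPLACE` deletes the conflicting row and inserts a fresh one, so omitted columns revert to their defaults rather than keeping their old values. A standalone sketch (hypothetical table) you can run against a scratch database:

```yaml
steps:
  - name: replace-demo
    type: sqlite
    config:
      dsn: ":memory:"
      command: |
        CREATE TABLE products (
          sku   TEXT PRIMARY KEY,
          name  TEXT,
          stock INTEGER DEFAULT 0
        );
        INSERT INTO products VALUES ('W1', 'Widget', 42);
        -- Conflict on sku: the old row is deleted and re-inserted,
        -- so stock falls back to DEFAULT 0 instead of keeping 42.
        INSERT OR REPLACE INTO products (sku, name) VALUES ('W1', 'Widget v2');
        SELECT sku, name, stock FROM products;
```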
## Output Formats

### JSONL (Streaming)

```yaml
steps:
  - name: export-jsonl
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: jsonl
      command: "SELECT * FROM products"
```

Output:

```jsonl
{"id":1,"name":"Widget","price":9.99}
{"id":2,"name":"Gadget","price":19.99}
```

### JSON Array
```yaml
steps:
  - name: export-json
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: json
      command: "SELECT * FROM products LIMIT 1000"
```

> **Memory Usage**
> The `json` format buffers all rows in memory before writing. For large result sets, use `jsonl` or `csv` instead. Always use `LIMIT` or `max_rows` with the `json` format.
### CSV

```yaml
steps:
  - name: export-csv
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: csv
      headers: true
      command: "SELECT id, name, price FROM products"
```

### Streaming Large Results
```yaml
steps:
  - name: export-logs
    type: sqlite
    config:
      dsn: "file:./logs.db"
      streaming: true
      output_file: /data/logs-export.jsonl
      output_format: jsonl # Use jsonl or csv for large results
      command: "SELECT * FROM logs WHERE date >= date('now', '-7 days')"
```

### Best Practices for Large Results

- Use `output_format: jsonl` or `csv` - these formats stream rows immediately
- Avoid `output_format: json` - it buffers all rows in memory before writing
- Set `max_rows` as a safety limit for unbounded queries (see the sketch below)
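When the buffered `json` format is unavoidable, cap the result set. A sketch combining `max_rows` (mentioned above; its exact placement under `config` is an assumption) with an ordered query:

```yaml
steps:
  - name: export-top-products
    type: sqlite
    config:
      dsn: "file:./app.db"
      output_format: json
      max_rows: 1000 # Safety cap so the buffered json output stays bounded
      command: "SELECT * FROM products ORDER BY price DESC"
```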
## Multiple Statements

```yaml
steps:
  - name: setup-database
    type: sqlite
    config:
      dsn: "file:./app.db"
      command: |
        CREATE TABLE IF NOT EXISTS users (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          name TEXT NOT NULL,
          email TEXT UNIQUE,
          created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
        CREATE TABLE IF NOT EXISTS sessions (
          id TEXT PRIMARY KEY,
          user_id INTEGER REFERENCES users(id),
          expires_at DATETIME NOT NULL
        );
```

## SQLite Functions
SQLite provides many built-in functions:

```yaml
steps:
  - name: aggregate-data
    type: sqlite
    config:
      dsn: "file:./sales.db"
      command: |
        SELECT
          date(created_at) as sale_date,
          count(*) as order_count,
          sum(total) as revenue,
          avg(total) as avg_order,
          group_concat(product_name, ', ') as products
        FROM orders
        WHERE created_at >= date('now', '-30 days')
        GROUP BY date(created_at)
        ORDER BY sale_date DESC
```

## Error Handling
```yaml
steps:
  - name: safe-query
    type: sqlite
    config:
      dsn: "file:./app.db"
      timeout: 30
      command: "SELECT * FROM large_table"
    retry_policy:
      limit: 3
      interval_sec: 2
    continue_on:
      failure: true
```

## Complete Example
```yaml
name: local-data-pipeline
env:
  - DB_PATH: "./data/analytics.db"
steps:
  - name: setup-schema
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      command: |
        CREATE TABLE IF NOT EXISTS raw_events (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          event_type TEXT NOT NULL,
          payload TEXT,
          created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        );
        CREATE TABLE IF NOT EXISTS daily_stats (
          date TEXT PRIMARY KEY,
          event_count INTEGER,
          unique_types INTEGER
        );

  - name: import-events
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      import:
        input_file: /data/events-${TODAY}.jsonl
        table: raw_events
        format: jsonl
        batch_size: 1000
    depends:
      - setup-schema

  - name: calculate-stats
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      file_lock: true
      transaction: true
      command: |
        INSERT OR REPLACE INTO daily_stats (date, event_count, unique_types)
        SELECT
          date(created_at) as date,
          count(*) as event_count,
          count(DISTINCT event_type) as unique_types
        FROM raw_events
        WHERE date(created_at) = date('now')
        GROUP BY date(created_at);
    depends:
      - import-events

  - name: export-report
    type: sqlite
    config:
      dsn: "file:${DB_PATH}"
      streaming: true
      output_file: /reports/daily-stats.json
      output_format: json
      command: |
        SELECT * FROM daily_stats
        ORDER BY date DESC
        LIMIT 30
    depends:
      - calculate-stats
```

## See Also
- ETL Overview - Common configuration and features
- PostgreSQL - PostgreSQL-specific documentation
