Chapter 11: Pipelines

Bioinformatics workflows are rarely a single step. A typical analysis chains quality control, alignment, variant calling, filtering, and annotation into a multi-stage workflow. BioLang’s pipe-first design with |> makes these workflows natural to write. For larger workflows with named stages, BioLang also provides pipeline blocks with stage declarations.

Pipe Composition: The Primary Pipeline Mechanism

The pipe operator |> is BioLang’s core tool for building pipelines. It inserts the left-hand value as the first argument to the right-hand function, creating readable chains of transformations.
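
Concretely, value |> fn(args) evaluates to fn(value, args). A minimal sketch of the equivalence (the literal sequence here is illustrative):

# These two calls are equivalent
let seq = "ACGTACGT"
let gc_a = gc_content(seq)
let gc_b = seq |> gc_content()
println(str(gc_a == gc_b))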

# Single-step pipe
let reads = read_fastq("data/reads.fastq")

# Multi-step pipeline via pipe chaining
let result = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> map(|r| {seq: r.sequence, gc: gc_content(r.sequence)})
  |> sort_by(|a, b| b.gc - a.gc)

println("Top GC: " + str(result[0].gc))

Each |> feeds its result forward. You can chain as many steps as you need with no special syntax. This is how most BioLang pipelines are written.

Multi-Step Workflows with Variable Bindings

For workflows where intermediate results need names, use sequential let bindings connected by pipes. Each step is explicit and inspectable.

# FASTQ QC pipeline
let raw = read_fastq("data/reads.fastq")

let stats = raw
  |> map(|r| mean_phred(r.quality))
let avg_quality = mean(stats)
let total_reads = len(raw)

println("Reads: " + str(total_reads) + ", Mean Q: " + str(round(avg_quality, 1)))

# Filter and write
let passed = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 25)
  |> collect()

write_fastq(passed, "sample_R1.filtered.fastq.gz")
println("Kept " + str(len(passed)) + " of " + str(total_reads) + " reads")

This pattern gives you full control: name any intermediate, inspect it, branch on it, or feed it into multiple downstream steps.
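
Because every intermediate is an ordinary binding, one value can fan out into several branches. A sketch reusing the functions above (the output filenames are illustrative):

# One intermediate, two downstream branches
let reads = read_fastq("data/reads.fastq")

let high_q = reads |> filter(|r| mean_phred(r.quality) >= 30)
let low_q = reads |> filter(|r| mean_phred(r.quality) < 30)

write_fastq(high_q, "reads.highq.fastq.gz")
write_fastq(low_q, "reads.lowq.fastq.gz")
println("High-Q: " + str(len(high_q)) + ", Low-Q: " + str(len(low_q)))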

Group, Summarize, and Count

BioLang’s table operations work naturally in pipe chains for aggregation workflows.

# Variant summary pipeline
let summary = read_vcf("data/variants.vcf")
  |> filter(|v| v.qual >= 30)
  |> classify_variants()
  |> group_by("type")
  |> count_by("type")

println(summary)

# Per-chromosome depth analysis
let depths = read_bed("data/regions.bed")
  |> group_by("chrom")
  |> summarize(|chrom, rows| {chrom: chrom, mean_depth: mean(col(rows, "score"))})
  |> arrange("chrom")

println(depths)

The group_by, count_by, filter_by, summarize, and arrange builtins all accept pipe input, so they compose seamlessly.

Pipeline Blocks

For larger workflows, BioLang provides pipeline blocks: a pipeline declares a named workflow, and inside the block, stage declarations name intermediate results using the arrow (->) syntax.

pipeline fastq_qc {
  # stage name -> expression
  stage raw_stats -> read_fastq("data/reads.fastq")
    |> map(|r| mean_phred(r.quality))
    |> mean()

  stage filtered -> read_fastq("data/reads.fastq")
    |> filter(|r| mean_phred(r.quality) >= 30)

  stage write_out -> write_fastq(filtered, "sample_R1.filtered.fastq.gz")

  stage report -> {
    mean_quality: raw_stats,
    kept: len(filtered),
  }
}

# fastq_qc is bound to the result of the last stage
println("Mean Q: " + str(fastq_qc.mean_quality))

When a pipeline block has no parameters, it executes immediately and binds its result to the pipeline name. Each stage name -> expr evaluates the expression and makes the result available by name to subsequent stages.

Passing Data Between Stages

Stage names become bindings that later stages can reference. This creates an implicit dependency chain.

pipeline align_and_call {
  stage aligned -> shell("bwa-mem2 mem -t 16 GRCh38.fa sample_R1.fq.gz sample_R2.fq.gz | samtools sort -@ 8 -o aligned.sorted.bam")

  stage indexed -> shell("samtools index " + aligned)

  stage variants -> shell("bcftools mpileup -f GRCh38.fa " + aligned + " | bcftools call -mv -Oz -o variants.vcf.gz")

  stage stats -> read_vcf("variants.vcf.gz")
    |> filter(|v| v.qual >= 30)
    |> classify_variants()
    |> count_by("type")
}

println(align_and_call)

Each stage name resolves to whatever its expression evaluated to. The aligned stage returns a string (the shell output), which indexed and variants reference directly.

Parameterized Pipelines (Templates)

A pipeline can accept parameters, turning it into a reusable template. When you declare pipeline name(params) { ... }, BioLang defines a callable function instead of executing immediately.

# Define a reusable alignment template
pipeline align_sample(sample_id, r1, r2, reference) {
  stage sorted -> shell("bwa-mem2 mem -t 16 " + reference + " " + r1 + " " + r2
        + " | samtools sort -@ 8 -o " + sample_id + ".sorted.bam")

  stage indexed -> shell("samtools index " + sorted)

  # Pipeline returns its last stage value
  sorted
}

# Call with different inputs
let tumor_bam = align_sample("tumor", "tumor_R1.fq.gz", "tumor_R2.fq.gz", "GRCh38.fa")
let normal_bam = align_sample("normal", "normal_R1.fq.gz", "normal_R2.fq.gz", "GRCh38.fa")
println("Tumor BAM: " + tumor_bam)
println("Normal BAM: " + normal_bam)

Parameterized pipelines behave exactly like functions. You can loop over a sample list to process a whole cohort:

let samples = [
  {id: "S1", r1: "S1_R1.fq.gz", r2: "S1_R2.fq.gz"},
  {id: "S2", r1: "S2_R1.fq.gz", r2: "S2_R2.fq.gz"},
  {id: "S3", r1: "S3_R1.fq.gz", r2: "S3_R2.fq.gz"},
]

let bams = samples |> map(|s| align_sample(s.id, s.r1, s.r2, "GRCh38.fa"))
println("Aligned " + str(len(bams)) + " samples")

Parallel Processing

par_map and par_filter

For data-parallel operations on lists, par_map and par_filter distribute work across available cores.

let sequences = read_fasta("data/sequences.fasta")

# Compute GC content in parallel
let gc_values = sequences
  |> par_map(|seq| {name: seq.name, gc: gc_content(seq.sequence)})

# Filter in parallel
let high_gc = gc_values
  |> par_filter(|s| s.gc > 0.6)

println("High GC sequences: " + str(len(high_gc)))

parallel for

For workflows that need to run a block of statements per item, parallel for fans out iterations. In the current tree-walking interpreter these run sequentially; a future bytecode backend will parallelize them.

let samples = [
  {id: "tumor_01", r1: "t01_R1.fq.gz", r2: "t01_R2.fq.gz"},
  {id: "tumor_02", r1: "t02_R1.fq.gz", r2: "t02_R2.fq.gz"},
  {id: "normal_01", r1: "n01_R1.fq.gz", r2: "n01_R2.fq.gz"},
]

parallel for sample in samples {
  let bam = shell("bwa-mem2 mem -t 4 GRCh38.fa " + sample.r1 + " " + sample.r2
                   + " | samtools sort -@ 4 -o " + sample.id + ".sorted.bam")
  shell("samtools index " + sample.id + ".sorted.bam")
  println("Done: " + sample.id)
}

The result of a parallel for is the value of the last iteration’s block.
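
When you need every iteration's result rather than just the last, par_map is the better fit, since it returns one value per item. A sketch reusing the sample list above (filenames are illustrative):

let bams = samples |> par_map(|sample| {
  shell("bwa-mem2 mem -t 4 GRCh38.fa " + sample.r1 + " " + sample.r2
        + " | samtools sort -@ 4 -o " + sample.id + ".sorted.bam")
  # Return the BAM path so the result list has one entry per sample
  sample.id + ".sorted.bam"
})
println("Produced " + str(len(bams)) + " BAMs")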

Shell Integration

The shell() builtin executes external commands and returns their stdout as a string. This is how BioLang integrates with existing bioinformatics tools.

# Run a single command
let flagstat = shell("samtools flagstat aligned.bam")
println(flagstat)

# Chain shell commands in a pipeline
let bam = shell("bwa-mem2 mem -t 16 ref.fa R1.fq.gz R2.fq.gz | samtools sort -@ 8 -o out.bam")

# Use shell output in downstream BioLang processing
let depth_lines = shell("samtools depth aligned.bam")
let depths = depth_lines
  |> split("\n")
  |> filter(|line| len(line) > 0)
  |> map(|line| split(line, "\t"))
  |> map(|parts| float(parts[2]))

println("Mean depth: " + str(mean(depths)))

Defer for Cleanup

defer registers an expression that runs when the enclosing scope exits, whether it succeeds or fails. This keeps intermediate files from piling up.

pipeline trimmed_alignment {
  stage trimmed -> shell("fastp -i sample_R1.fq.gz -I sample_R2.fq.gz -o trimmed_R1.fq.gz -O trimmed_R2.fq.gz")

  defer shell("rm -f trimmed_R1.fq.gz trimmed_R2.fq.gz")

  stage aligned -> shell("bwa-mem2 mem -t 16 GRCh38.fa trimmed_R1.fq.gz trimmed_R2.fq.gz | samtools sort -@ 8 -o aligned.bam")

  stage indexed -> shell("samtools index " + aligned)

  aligned
}
# When the pipeline finishes, the deferred rm command fires

Error Handling in Pipelines

Wrap pipeline steps in try/catch to handle failures gracefully without aborting the entire workflow.

let samples = ["sample_A", "sample_B", "sample_C"]

let results = samples |> map(|name| {
  try {
    let bam = shell("bwa-mem2 mem -t 8 ref.fa " + name + "_R1.fq.gz " + name + "_R2.fq.gz | samtools sort -o " + name + ".bam")
    shell("samtools index " + name + ".bam")
    {id: name, status: "ok", bam: name + ".bam"}
  } catch e {
    println("WARN: " + name + " failed: " + str(e))
    {id: name, status: "failed", bam: nil}
  }
})

let passed = results |> filter(|r| r.status == "ok")
let failed = results |> filter(|r| r.status == "failed")
println(str(len(passed)) + " succeeded, " + str(len(failed)) + " failed")

You can also use try/catch around individual stages within a pipeline block to continue past non-critical failures.
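
A sketch of that per-stage pattern (the annotation command and stage names are illustrative, and this assumes try/catch works as an expression, as in the map example above):

pipeline annotate_variants {
  stage called -> shell("bcftools mpileup -f GRCh38.fa sample.sorted.bam | bcftools call -mv -Oz -o variants.vcf.gz")

  # Non-critical stage: fall back to nil instead of aborting the pipeline
  stage annotated -> try {
    shell("bcftools annotate -a dbsnp.vcf.gz -c ID -Oz -o annotated.vcf.gz variants.vcf.gz")
  } catch e {
    println("WARN: annotation failed: " + str(e))
    nil
  }

  called
}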

Example: FASTQ QC Pipeline

A complete quality-control workflow using only pipe composition.

# Read and analyze
let reads = read_fastq("data/reads.fastq")
let qualities = reads |> map(|r| mean_phred(r.quality))

let qc = {
  total_reads: len(reads),
  mean_quality: round(mean(qualities), 2),
  median_quality: round(median(qualities), 2),
  q30_pct: round(len(filter(qualities, |q| q >= 30)) / len(qualities) * 100, 1),
  gc_mean: round(mean(reads |> map(|r| gc_content(r.sequence))), 4),
}

println("=== FASTQ QC Report ===")
println("Total reads:    " + str(qc.total_reads))
println("Mean quality:   " + str(qc.mean_quality))
println("Median quality: " + str(qc.median_quality))
println("Q30%:           " + str(qc.q30_pct) + "%")
println("Mean GC:        " + str(qc.gc_mean))

# Filter and write passing reads
let passed = reads |> filter(|r| mean_phred(r.quality) >= 25)
write_fastq(passed, "sample_R1.qc_passed.fastq.gz")
println("Wrote " + str(len(passed)) + " passing reads")

# Notify on completion
notify("FASTQ QC complete: " + str(qc.total_reads) + " reads, Q30=" + str(qc.q30_pct) + "%")

Example: Variant Calling Pipeline

Germline variant calling from FASTQ to filtered VCF using pipeline blocks.

pipeline germline_variants {
  stage aligned -> shell(
    "bwa-mem2 mem -t 16 -R '@RG\\tID:S1\\tSM:sample' GRCh38.fa "
    + "sample_R1.fq.gz sample_R2.fq.gz "
    + "| samtools sort -@ 8 -o sample.sorted.bam"
  )

  stage indexed -> shell("samtools index " + aligned)

  stage called -> shell(
    "gatk HaplotypeCaller -I " + aligned
    + " -R GRCh38.fa -L exome_targets.bed -O sample.g.vcf.gz -ERC GVCF"
  )

  stage genotyped -> shell(
    "gatk GenotypeGVCFs -V " + called + " -R GRCh38.fa -O sample.vcf.gz"
  )

  stage filtered -> shell(
    "gatk VariantFiltration -V " + genotyped
    + " --filter-name 'LowQD' --filter-expression 'QD < 2.0'"
    + " --filter-name 'HighFS' --filter-expression 'FS > 60.0'"
    + " -O sample.filtered.vcf.gz"
  )

  # Analyze the results in BioLang
  stage summary -> read_vcf("sample.filtered.vcf.gz")
    |> filter(|v| v.filter == "PASS")
    |> classify_variants()
    |> count_by("type")
}

println(germline_variants)

Example: Multi-Sample Analysis

Processing a cohort with parameterized pipelines and aggregation.

# Reusable per-sample pipeline
pipeline process_sample(sample_id, r1, r2) {
  stage bam -> shell(
    "bwa-mem2 mem -t 8 -R '@RG\\tID:" + sample_id + "\\tSM:" + sample_id
    + "' GRCh38.fa " + r1 + " " + r2
    + " | samtools sort -@ 4 -o " + sample_id + ".sorted.bam"
  )

  stage idx -> shell("samtools index " + bam)

  stage depth_info -> shell("samtools depth " + bam)
    |> split("\n")
    |> filter(|line| len(line) > 0)
    |> map(|line| float(split(line, "\t")[2]))
    |> mean()

  {id: sample_id, bam: bam, mean_depth: depth_info}
}

# Load sample manifest
let manifest = read_csv("data/sample_sheet.csv")

# Process all samples
let results = manifest
  |> map(|row| {
    try {
      process_sample(row.sample_id, row.fastq_r1, row.fastq_r2)
    } catch e {
      println("WARN: " + row.sample_id + " failed: " + str(e))
      {id: row.sample_id, bam: nil, mean_depth: 0.0}
    }
  })

# QC gate: check depth thresholds
let passed = results |> filter(|s| s.mean_depth >= 20.0)
let failed = results |> filter(|s| s.mean_depth < 20.0)

if len(failed) > 0 then
  println("WARNING: " + str(len(failed)) + " samples below 20x depth:")
each(failed, |s| println("  " + s.id + ": " + str(round(s.mean_depth, 1)) + "x"))

println(str(len(passed)) + " of " + str(len(results)) + " samples passed QC")

# Write summary table
let summary = results
  |> map(|s| {sample: s.id, depth: round(s.mean_depth, 1), pass: s.mean_depth >= 20.0})
write_tsv(summary, "cohort_qc_summary.tsv")

# Notify the team
slack("Cohort processing complete: " + str(len(passed)) + "/" + str(len(results)) + " passed QC")

Pipeline Composition

Pipelines are values. Parameterized pipelines are functions. You can call one from another to build layered workflows.

pipeline align_one(sample) {
  stage sorted -> shell(
    "bwa-mem2 mem -t 8 GRCh38.fa " + sample.r1 + " " + sample.r2
    + " | samtools sort -@ 4 -o " + sample.id + ".sorted.bam"
  )
  stage idx -> shell("samtools index " + sorted)
  sorted
}

pipeline somatic_pair(tumor, normal) {
  stage tumor_bam -> align_one(tumor)
  stage normal_bam -> align_one(normal)

  stage called -> shell(
    "gatk Mutect2 -I " + tumor_bam + " -I " + normal_bam
    + " -R GRCh38.fa -O somatic.vcf.gz"
  )

  stage filtered -> shell(
    "gatk FilterMutectCalls -V " + called + " -R GRCh38.fa -O somatic.filtered.vcf.gz"
  )

  let variants = read_vcf("somatic.filtered.vcf.gz")
    |> filter(|v| v.filter == "PASS")
  println("Found " + str(len(variants)) + " somatic variants")
  filtered
}

let result = somatic_pair(
  {id: "tumor", r1: "tumor_R1.fq.gz", r2: "tumor_R2.fq.gz"},
  {id: "normal", r1: "normal_R1.fq.gz", r2: "normal_R2.fq.gz"}
)

Summary

BioLang pipelines come in two flavors:

  • Pipe chains (|>): The everyday tool. Chain any sequence of transformations into a readable, left-to-right flow. No special syntax needed.

  • Pipeline blocks (pipeline name { stage x -> expr }): For larger workflows where you want named stages, parameterized templates, and stage-to-stage data passing.

Both approaches compose freely. Use par_map and par_filter for data-parallel operations, parallel for for per-item workflows, shell() to call external tools, try/catch for error resilience, and defer for cleanup. The result is a language where analysis pipelines are just ordinary code, with no framework overhead.