Pipeline Blocks

BioLang has a dedicated pipeline keyword for organizing multi-step workflows. A pipeline block groups named stage steps, supports parameters, and integrates defer for cleanup. Pipelines complement the |> pipe operator — pipes compose data transformations, while pipeline blocks organize entire workflows.

Basic Pipeline

A pipeline block executes its body and binds the result to the pipeline's name. Inside the block, stage name -> expression labels each step:

# A simple pipeline with named stages
pipeline analysis {
  stage data -> [10, 25, 30, 5, 18, 42, 7]
  stage filtered -> data |> filter(|n| n >= 10)
  stage total -> filtered |> sum()
  println(f"Sum of values >= 10: {total}")
}

# The pipeline name 'analysis' is now bound to the result
println(f"Pipeline result: {analysis}")

Stages

Each stage name -> expr evaluates the expression and binds the result to name within the pipeline scope. Later stages can reference earlier ones:

pipeline stats {
  stage scores -> [85, 92, 78, 95, 88, 72, 91, 64]
  stage n -> len(scores)
  stage avg -> sum(scores) / n
  stage above_avg -> scores |> filter(|s| s > avg)
  println(f"Average: {avg}, Above average: {len(above_avg)}/{n}")
}

Parameterized Pipelines

Add parameters to make a pipeline reusable. A parameterized pipeline is defined as a function — call it later with arguments:

# Define a parameterized pipeline (becomes a function)
pipeline classify(values, threshold) {
  stage high -> values |> filter(|v| v >= threshold)
  stage low -> values |> filter(|v| v < threshold)
  println(f"  High ({">="}{threshold}): {high}")
  println(f"  Low  (<{threshold}): {low}")
  {high: high, low: low}
}

# Call it with different arguments
println("Classify by 50:")
let r1 = classify([20, 55, 80, 10, 90, 45], 50)

println("Classify by 75:")
let r2 = classify([20, 55, 80, 10, 90, 45], 75)

defer for Cleanup

The defer statement schedules an expression to run when the enclosing scope exits, regardless of whether an error occurred. This is useful for cleanup tasks like removing temporary files:

# defer runs the expression at scope exit
fn process_with_cleanup() {
  defer println("cleanup: step 3 (runs last)")
  defer println("cleanup: step 2")
  defer println("cleanup: step 1 (defers run in reverse order)")
  println("doing work...")
  let result = 42
  println(f"work done, result = {result}")
  result
}

let r = process_with_cleanup()
println(f"final result: {r}")

Shell Integration

CLI only: The shell() function executes external commands and is only available when running scripts with bl run. It is not available in the browser playground.

Inside pipeline blocks, shell("command") calls external tools. This is how BioLang integrates with BWA, samtools, GATK, and other CLI tools:

# CLI-only: external tool integration
# pipeline align_and_call(sample_id, ref) {
#   stage align -> shell(f"bwa-mem2 mem -t 8 {ref} {sample_id}_R1.fq.gz | samtools sort -o {sample_id}.bam")
#   stage index -> shell(f"samtools index {sample_id}.bam")
#   stage call  -> shell(f"gatk HaplotypeCaller -R {ref} -I {sample_id}.bam -O {sample_id}.vcf.gz")
#   defer shell(f"rm -f {sample_id}.bam.tmp*")
#   println(f"Pipeline complete for {sample_id}")
# }

Pipeline with Error Handling

Use try/catch inside pipeline stages for fault tolerance:

# Pipeline with per-item error handling
pipeline safe_processing {
  stage items -> [10, 0, 5, 0, 20]

  stage results -> items |> map(|x| {
    try {
      {value: 100 / x, ok: true}
    } catch e {
      {value: 0, ok: false}
    }
  })

  stage good -> results |> filter(|r| r.ok) |> map(|r| r.value)
  stage bad_count -> results |> filter(|r| !r.ok) |> len()

  println(f"Succeeded: {len(good)}, Failed: {bad_count}")
  println(f"Good values: {good}")
}

Example: Pure BioLang Data Pipeline

A complete runnable pipeline that processes structured data:

# Runnable data analysis pipeline
pipeline student_analysis {
  stage students -> [
    {name: "Alice", math: 92, science: 88},
    {name: "Bob", math: 78, science: 85},
    {name: "Carol", math: 95, science: 91},
    {name: "Dan", math: 65, science: 72},
    {name: "Eve", math: 88, science: 94},
  ]

  stage with_avg -> students |> map(|s| {
    name: s.name,
    math: s.math,
    science: s.science,
    average: (s.math + s.science) / 2
  })

  stage honor_roll -> with_avg |> filter(|s| s.average >= 85)

  stage class_avg -> with_avg
    |> map(|s| s.average)
    |> sum()
  let class_avg = class_avg / len(students)

  println("=== Student Report ===")
  with_avg |> each(|s| println(f"  {s.name}: avg={s.average}"))
  println(f"\nClass average: {class_avg}")
  println(f"Honor roll: {honor_roll |> map(|s| s.name)}")
}

Summary

FeatureSyntaxDescription
Pipeline blockpipeline name { ... }Named workflow block, result bound to name
Parameterizedpipeline name(params) { ... }Reusable pipeline, becomes a callable function
Stagestage name -> exprNamed step, binds result for later stages
Deferdefer exprCleanup on scope exit (reverse order)
Shellshell("command")Run external tools (CLI only)
Pipe chaina |> f() |> g()Data transformation within stages
Error handlingtry { ... } catch e { ... }Recover from errors within stages