Batch Processing

Processing multiple samples is the norm in bioinformatics. BioLang provides glob patterns, sample sheet parsing, parallel execution, and progress tracking to handle batch workflows elegantly.

Finding Input Files

Glob patterns

# Find all FASTQ files
let fastqs = glob("data/*.fq.gz")
print("Found #{len(fastqs)} FASTQ files")

# Find paired-end files
# Sorting both lists ensures R1/R2 entries line up positionally before zipping.
let r1_files = glob("data/*_R1.fq.gz") |> sort
let r2_files = glob("data/*_R2.fq.gz") |> sort

# Pair them up
# NOTE(review): assumes every R1 file has a matching R2 file — an unpaired
# file would silently mispair everything after it; consider checking
# len(r1_files) == len(r2_files) first.
let pairs = zip(r1_files, r2_files) |> map(|pair| {
  # Derive the sample name by stripping the read suffix from the R1 path
  let sample = replace(pair[0], "_R1.fq.gz", "")
  { sample: sample, r1: pair[0], r2: pair[1] }
})

for p in pairs {
  print("#{p.sample}: #{p.r1}, #{p.r2}")
}

Sample sheet driven

# Load samples from a sample sheet
let samples = read_csv("data/sample_sheet.csv")

# Validate all files exist before starting
# Collect every missing path (rather than exiting on the first) so the user
# sees the complete list in one run.
let missing = []
for s in samples {
  if !file_exists(s["fastq_r1"]) { push(missing, s["fastq_r1"]) }
  if !file_exists(s["fastq_r2"]) { push(missing, s["fastq_r2"]) }
}

# Fail fast before any processing starts if anything is missing
if len(missing) > 0 {
  print("Missing files:")
  for f in missing { print("  #{f}") }
  exit(1)
}

print("All #{len(samples)} samples validated, #{len(samples) * 2} files found")

Sequential Batch Processing

Processing samples one at a time

# Process each sample sequentially with progress tracking
let samples = read_csv("data/sample_sheet.csv") |> collect
let total = len(samples)

for i, sample in enumerate(samples) {
  let name = sample["sample_id"]
  let out_dir = "results/#{name}"
  mkdir(out_dir)

  # Simple [current/total] counter keeps long batches observable
  print("[#{i + 1}/#{total}] Processing #{name}...")

  # Quality filtering
  # NOTE(review): the input FASTQ is read twice (once to count, once to
  # filter) and the output read once more to count survivors — fine for a
  # tutorial, but costly on large files.
  let in_count = read_fastq(sample["fastq_r1"]) |> count
  read_fastq(sample["fastq_r1"])
    |> filter(|r| mean_phred(r.quality) >= 30.0 && len(r.seq) >= 50)
    |> write_read_fastq("#{out_dir}/filtered_R1.fq.gz")

  let out_count = read_fastq("#{out_dir}/filtered_R1.fq.gz") |> count
  let pct_pass = round(float(out_count) / float(in_count) * 100.0, 1)
  print("  #{name}: #{in_count} -> #{out_count} reads (#{pct_pass}% passed)")
}

print("Batch complete: #{total} samples processed")

Parallel Batch Processing

Processing samples in parallel

# Process multiple samples concurrently
let samples = read_csv("data/sample_sheet.csv") |> collect

# Per-sample worker: computes input read stats, writes a quality-filtered
# FASTQ, and returns a summary record for the batch report.
fn process_sample(sample) {
  let name = sample["sample_id"]
  let out_dir = "results/#{name}"
  mkdir(out_dir)

  # Count total reads and compute mean length
  let reads = read_fastq(sample["fastq_r1"])
  let total = len(reads)
  let lengths = map(reads, |r| r.length)
  let avg_len = round(mean(lengths), 1)

  # Filter by mean base quality and write the surviving reads
  read_fastq(sample["fastq_r1"])
    |> filter(|r| mean_phred(r.quality) >= 30.0)
    |> write_read_fastq("#{out_dir}/filtered.fq.gz")

  # Re-read the output to count how many reads passed the filter
  let filtered_count = read_fastq("#{out_dir}/filtered.fq.gz") |> count

  {
    sample: name,
    total_reads: total,
    mean_length: avg_len,
    passed_reads: filtered_count,
    pass_rate: round(float(filtered_count) / float(total) * 100.0, 1)
  }
}

# Run samples in parallel (no explicit concurrency limit is passed;
# par_map uses its default worker count)
let results = par_map(samples, process_sample)

# Summary report
write_tsv(from_records(results), "batch_summary.tsv")
print("\nBatch Summary:")
print("  Samples processed: #{len(results)}")
print("  Total reads:       #{sum(map(results, |r| r.total_reads))}")
print("  Mean pass rate:    #{round(mean(map(results, |r| r.pass_rate)), 1)}%")

Parallel with error handling

# Handle failures gracefully in parallel processing
let samples = read_csv("data/sample_sheet.csv") |> collect

# Each worker returns a status record instead of throwing, so one bad
# sample cannot abort the whole batch.
let results = par_map(samples, |sample| {
  try {
    let name = sample["sample_id"]
    let n_reads = read_fastq(sample["fastq_r1"]) |> count
    { sample: name, status: "ok", reads: n_reads, error: nil }
  } catch e {
    # Record the failure; the sheet row itself is assumed readable even
    # when the FASTQ is not.
    { sample: sample["sample_id"], status: "failed", reads: 0, error: to_string(e) }
  }
})

# Partition results by outcome for the report
let succeeded = filter(results, |r| r.status == "ok")
let failed = filter(results, |r| r.status == "failed")

print("Succeeded: #{len(succeeded)}")
print("Failed: #{len(failed)}")

if len(failed) > 0 {
  print("\nFailed samples:")
  for f in failed {
    print("  #{f.sample}: #{f.error}")
  }
}

Progress Tracking

Manual progress counters

# Process files and track progress manually
let files = glob("data/*.fq.gz")
let total = len(files)

let results = []
for i, f in enumerate(files) {
  let reads = read_fastq(f)
  let lengths = map(reads, |r| r.length)
  # [current/total] counter gives a simple progress indicator on stdout
  print("[#{i + 1}/#{total}] #{f}: #{len(reads)} reads")
  push(results, { file: f, reads: len(reads), mean_len: round(mean(lengths), 1) })
}

write_tsv(from_records(results), "file_stats.tsv")

Logging batch progress

# Log progress for monitoring long batches
let log_file = "batch_progress.log"

# Timestamped logger that mirrors every message to the log file and stdout.
# NOTE(review): if write_text truncates rather than appends, each call would
# overwrite the previous log entry — confirm its append semantics, or use an
# append-mode helper if the language provides one.
fn log(msg) {
  let line = "[#{now()}] #{msg}"
  write_text(line, log_file)
  print(line)
}

let samples = read_csv("data/sample_sheet.csv") |> collect
log("Starting batch: #{len(samples)} samples")

for i, sample in enumerate(samples) {
  let name = sample["sample_id"]
  log("Processing #{name} (#{i + 1}/#{len(samples)})")

  # Wall-clock timing per sample; the "s" suffix below assumes now()
  # differences are in seconds — TODO confirm the unit.
  let t0 = now()
  # ... process sample ...
  let elapsed = now() - t0

  log("Completed #{name} in #{elapsed}s")
}

log("Batch complete")

Checkpointing

Resume-safe batch processing

# Skip already-processed samples on resume
let samples = read_csv("data/sample_sheet.csv") |> collect
let checkpoint_file = "batch_checkpoint.json"

# Load checkpoint: a JSON list of sample_ids completed by a previous run
let completed = if file_exists(checkpoint_file) {
  read_json(checkpoint_file)
} else {
  []
}

# Only samples not yet recorded in the checkpoint need processing
let pending = filter(samples, |s| !contains(completed, s["sample_id"]))
print("Total: #{len(samples)}, Completed: #{len(completed)}, Pending: #{len(pending)}")

for sample in pending {
  let name = sample["sample_id"]
  let out_dir = "results/#{name}"
  # Ensure the output directory exists before writing into it
  mkdir(out_dir)
  print("Processing #{name}...")

  # ... do work ...
  read_fastq(sample["fastq_r1"])
    |> filter(|r| mean_phred(r.quality) >= 30.0)
    |> write_read_fastq("#{out_dir}/filtered.fq.gz")

  # Mark as completed — the checkpoint is rewritten after every sample,
  # so a crash loses at most the in-flight sample.
  push(completed, name)
  write_json(completed, checkpoint_file)
}

print("Batch complete")

Aggregating Results

MultiQC-style summary

# Aggregate statistics across all samples
# Assumes each per-sample pipeline wrote results/<sample>/qc_stats.json.
let result_dirs = list_dir("results")

let summary = map(result_dirs, |dir| {
  # The directory name doubles as the sample name
  let sample = dir
  let stats_file = "results/#{dir}/qc_stats.json"

  if file_exists(stats_file) {
    let stats = read_json(stats_file)
    {
      sample: sample,
      total_reads: stats["total_reads"],
      pass_reads: stats["pass_reads"],
      mean_qual: stats["mean_quality"],
      gc_content: stats["gc_content"],
      status: "ok"
    }
  } else {
    # Keep a stub record so missing samples are visible, not silently dropped
    { sample: sample, status: "missing" }
  }
})

# Only samples with stats contribute to the report; "missing" stubs are excluded
let valid = filter(summary, |s| s.status == "ok")

print("Batch QC Summary (#{len(valid)} samples):")
print("  Total reads:   #{sum(map(valid, |s| s.total_reads))}")
print("  Mean quality:  #{round(mean(map(valid, |s| s.mean_qual)), 1)}")
print("  Mean GC:       #{round(mean(map(valid, |s| s.gc_content)), 3)}")

write_tsv(from_records(valid), "batch_qc_summary.tsv")