Batch Processing
Processing multiple samples is the norm in bioinformatics. BioLang provides glob patterns, sample sheet parsing, parallel execution, and progress tracking to handle batch workflows elegantly.
Finding Input Files
Glob patterns
# Find all FASTQ files
let fastqs = glob("data/*.fq.gz")
print("Found #{len(fastqs)} FASTQ files")
# Find paired-end files; sorting both lists keeps R1/R2 aligned positionally
let r1_files = glob("data/*_R1.fq.gz") |> sort
let r2_files = glob("data/*_R2.fq.gz") |> sort
# Pair them up by position. NOTE(review): assumes every R1 file has a
# matching R2 (equal list lengths) — confirm zip's behavior on unequal lists.
let pairs = zip(r1_files, r2_files) |> map(|pair| {
# Sample name = R1 path with the suffix stripped (directory prefix is kept)
let sample = replace(pair[0], "_R1.fq.gz", "")
{ sample: sample, r1: pair[0], r2: pair[1] }
})
for p in pairs {
print("#{p.sample}: #{p.r1}, #{p.r2}")
}
Sample sheet driven
# Load samples from a sample sheet (one record per row)
let samples = read_csv("data/sample_sheet.csv")
# Validate all files exist before starting — fail fast rather than
# discovering a missing file halfway through a long batch
let missing = []
for s in samples {
if !file_exists(s["fastq_r1"]) { push(missing, s["fastq_r1"]) }
if !file_exists(s["fastq_r2"]) { push(missing, s["fastq_r2"]) }
}
if len(missing) > 0 {
print("Missing files:")
for f in missing { print(" #{f}") }
exit(1)
}
# Reaching this point means every R1/R2 path on the sheet exists
print("All #{len(samples)} samples validated, #{len(samples) * 2} files found")
Sequential Batch Processing
Processing samples one at a time
# Process each sample sequentially with progress tracking
let samples = read_csv("data/sample_sheet.csv") |> collect
let total = len(samples)
for i, sample in enumerate(samples) {
let name = sample["sample_id"]
let out_dir = "results/#{name}"
mkdir(out_dir)
print("[#{i + 1}/#{total}] Processing #{name}...")
# Quality filtering. The input is read twice (once to count, once to
# filter) — presumably read_fastq streams lazily, so the file is never
# held in memory whole; TODO confirm.
let in_count = read_fastq(sample["fastq_r1"]) |> count
read_fastq(sample["fastq_r1"])
|> filter(|r| mean_phred(r.quality) >= 30.0 && len(r.seq) >= 50)
|> write_read_fastq("#{out_dir}/filtered_R1.fq.gz")
# Re-read the written output to count survivors and report the pass rate
let out_count = read_fastq("#{out_dir}/filtered_R1.fq.gz") |> count
let pct_pass = round(float(out_count) / float(in_count) * 100.0, 1)
print(" #{name}: #{in_count} -> #{out_count} reads (#{pct_pass}% passed)")
}
print("Batch complete: #{total} samples processed")
Parallel Batch Processing
Processing samples in parallel
# Process multiple samples concurrently
let samples = read_csv("data/sample_sheet.csv") |> collect
# Per-sample worker: computes read statistics, writes the quality-filtered
# output, and returns a summary record for the batch report.
fn process_sample(sample) {
let name = sample["sample_id"]
let out_dir = "results/#{name}"
mkdir(out_dir)
# Count total reads and compute mean length
let reads = read_fastq(sample["fastq_r1"])
let total = len(reads)
let lengths = map(reads, |r| r.length)
let avg_len = round(mean(lengths), 1)
# Filter on mean base quality and write the surviving reads
read_fastq(sample["fastq_r1"])
|> filter(|r| mean_phred(r.quality) >= 30.0)
|> write_read_fastq("#{out_dir}/filtered.fq.gz")
# Re-read the written output to count how many reads passed the filter
let filtered_count = read_fastq("#{out_dir}/filtered.fq.gz") |> count
{
sample: name,
total_reads: total,
mean_length: avg_len,
passed_reads: filtered_count,
pass_rate: round(float(filtered_count) / float(total) * 100.0, 1)
}
}
# Run every sample in parallel via par_map
let results = par_map(samples, process_sample)
# Summary report
write_tsv(from_records(results), "batch_summary.tsv")
print("\nBatch Summary:")
print(" Samples processed: #{len(results)}")
print(" Total reads: #{sum(map(results, |r| r.total_reads))}")
print(" Mean pass rate: #{round(mean(map(results, |r| r.pass_rate)), 1)}%")
Parallel with error handling
# Handle failures gracefully in parallel processing: each worker catches
# its own errors and returns a status record instead of aborting the batch
let samples = read_csv("data/sample_sheet.csv") |> collect
let results = par_map(samples, |sample| {
try {
let name = sample["sample_id"]
let n_reads = read_fastq(sample["fastq_r1"]) |> count
{ sample: name, status: "ok", reads: n_reads, error: nil }
} catch e {
# A failed sample still yields a record, so results stays 1:1 with samples
{ sample: sample["sample_id"], status: "failed", reads: 0, error: to_string(e) }
}
})
# Partition by status for the summary
let succeeded = filter(results, |r| r.status == "ok")
let failed = filter(results, |r| r.status == "failed")
print("Succeeded: #{len(succeeded)}")
print("Failed: #{len(failed)}")
if len(failed) > 0 {
print("\nFailed samples:")
for f in failed {
print(" #{f.sample}: #{f.error}")
}
}
Progress Tracking
Progress bars
# Process files and track progress manually
let files = glob("data/*.fq.gz")
let total = len(files)
let results = []
for i, f in enumerate(files) {
let reads = read_fastq(f)
let lengths = map(reads, |r| r.length)
# One progress line per file: [current/total] path: read count
print("[#{i + 1}/#{total}] #{f}: #{len(reads)} reads")
push(results, { file: f, reads: len(reads), mean_len: round(mean(lengths), 1) })
}
# Persist per-file stats for downstream reporting
write_tsv(from_records(results), "file_stats.tsv")
Logging batch progress
# Log progress for monitoring long batches
let log_file = "batch_progress.log"
# Timestamped logger that writes to the log file and echoes to stdout.
# NOTE(review): write_text looks like a whole-file write — confirm it
# appends rather than truncating the log on every call.
fn log(msg) {
let line = "[#{now()}] #{msg}"
write_text(line, log_file)
print(line)
}
let samples = read_csv("data/sample_sheet.csv") |> collect
log("Starting batch: #{len(samples)} samples")
for i, sample in enumerate(samples) {
let name = sample["sample_id"]
log("Processing #{name} (#{i + 1}/#{len(samples)})")
# Time each sample so slow ones stand out in the log
let t0 = now()
# ... process sample ...
let elapsed = now() - t0
log("Completed #{name} in #{elapsed}s")
}
log("Batch complete")
Checkpointing
Resume-safe batch processing
# Skip already-processed samples on resume
let samples = read_csv("data/sample_sheet.csv") |> collect
let checkpoint_file = "batch_checkpoint.json"
# Load checkpoint: a JSON list of sample_ids completed in earlier runs
let completed = if file_exists(checkpoint_file) {
read_json(checkpoint_file)
} else {
[]
}
let pending = filter(samples, |s| !contains(completed, s["sample_id"]))
print("Total: #{len(samples)}, Completed: #{len(completed)}, Pending: #{len(pending)}")
for sample in pending {
let name = sample["sample_id"]
print("Processing #{name}...")
# Ensure the per-sample output directory exists before writing into it
mkdir("results/#{name}")
# ... do work ...
read_fastq(sample["fastq_r1"])
|> filter(|r| mean_phred(r.quality) >= 30.0)
|> write_read_fastq("results/#{name}/filtered.fq.gz")
# Mark as completed; the checkpoint is rewritten after every sample so a
# crash loses at most the sample that was in flight
push(completed, name)
write_json(completed, checkpoint_file)
}
print("Batch complete")
Aggregating Results
MultiQC-style summary
# Aggregate statistics across all samples
# NOTE(review): assumes every entry in results/ is a per-sample directory —
# stray files would produce "missing" rows; verify list_dir semantics.
let result_dirs = list_dir("results")
let summary = map(result_dirs, |dir| {
# The directory name doubles as the sample name
let sample = dir
let stats_file = "results/#{dir}/qc_stats.json"
if file_exists(stats_file) {
let stats = read_json(stats_file)
{
sample: sample,
total_reads: stats["total_reads"],
pass_reads: stats["pass_reads"],
mean_qual: stats["mean_quality"],
gc_content: stats["gc_content"],
status: "ok"
}
} else {
# No stats file yet — flag the sample instead of failing the report
{ sample: sample, status: "missing" }
}
})
# Only samples with a stats file contribute to the aggregate numbers
let valid = filter(summary, |s| s.status == "ok")
print("Batch QC Summary (#{len(valid)} samples):")
print(" Total reads: #{sum(map(valid, |s| s.total_reads))}")
print(" Mean quality: #{round(mean(map(valid, |s| s.mean_qual)), 1)}")
print(" Mean GC: #{round(mean(map(valid, |s| s.gc_content)), 3)}")
write_tsv(from_records(valid), "batch_qc_summary.tsv")