
Chapter 14: Streams and Scale

Genomics data is large. A single whole-genome sequencing run produces hundreds of gigabytes of FASTQ. A population-scale VCF can hold billions of variant records. Loading everything into memory is not an option. BioLang addresses this with streams: lazy, single-pass iterators that process data record by record without materializing the entire dataset.

What Is a Stream?

A stream is a StreamValue – a lazy sequence of items produced on demand. File readers in BioLang return streams by default. Streams are consumed once: after you iterate through a stream, it is exhausted.
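Because streams are single-pass, a second traversal of the same stream yields nothing; if you need the data twice, materialize it with collect() first. A minimal sketch (this assumes that consuming an exhausted stream yields zero items rather than raising an error):

# Streams are consumed once
let reads = read_fastq("sample.fastq.gz")
let n = reads |> count()   # consumes the stream
let m = reads |> count()   # stream is exhausted: m is 0, not n

# To traverse the data more than once, collect into a list first
let read_list = read_fastq("sample.fastq.gz") |> collect()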

# read_fastq returns a stream, not a list
let reads = read_fastq("whole_genome_R1.fastq.gz")

# This processes one record at a time -- constant memory
let high_quality = reads
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> map(|r| {id: r.id, length: r.length, gc: gc_content(r.seq)})
  |> write_tsv("read_stats.tsv")

No matter how large the FASTQ file is, this script uses the same amount of memory. Each record flows through filter, then map, then is written out before the next record is read.

Stream Operations

Streams support the same functional operations as lists, but they execute lazily.

map

Transform each element.

read_vcf("variants.vcf.gz")
  |> map(|v| {chrom: v.chrom, pos: v.pos, qual: v.qual, af: v.info.AF})
  |> write_tsv("variant_summary.tsv")

filter

Keep only elements matching a predicate.

read_bam("aligned.bam")
  |> filter(|r| r.mapping_quality >= 30 and not r.is_duplicate)
  |> count()
  |> |n| print(str(n) + " high-quality unique alignments")

take_while

Consume from a stream while a condition holds, then stop.

# Read the first million reads from a FASTQ for quick QC
let sample = read_fastq("deep_sequencing.fastq.gz")
  |> take_while(|r, i| i < 1000000)

let quals = sample |> map(|r| mean_phred(r.quality))
print("Sampled stats: " + str(mean(quals)) + " mean Q")

each

Execute a side effect for every element (the stream analogue of a for loop).

let bad_count = 0
read_fastq("sample.fastq.gz")
  |> each(|r| {
       if mean_phred(r.quality) < 20 then
         bad_count = bad_count + 1
     })
print(str(bad_count) + " low-quality reads")

fold / reduce

Accumulate a value across the entire stream.

let total_bases = read_fastq("sample.fastq.gz")
  |> map(|r| r.length)
  |> sum()
print("Total bases: " + str(total_bases))
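The same total can also be written as an explicit fold, assuming fold takes an initial accumulator and a two-argument combining lambda (accumulator first):

let total_bases = read_fastq("sample.fastq.gz")
  |> fold(0, |acc, r| acc + r.length)
print("Total bases: " + str(total_bases))

Prefer sum() when it exists; fold is the general form for accumulations that sum() and count() cannot express.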

Chunked Processing

When you need batch operations on a stream, chunk groups elements into fixed-size lists. This is useful for writing output in blocks or batching API calls.

# Process a FASTQ in chunks of 10,000 reads
read_fastq("large_sample.fastq.gz")
  |> chunk(10000)
  |> each(|batch| {
       let mean_q = batch |> map(|r| mean_phred(r.quality)) |> mean()
       print("Batch: " + str(len(batch)) + " reads, "
             + "mean Q=" + str(mean_q))
     })

Chunking is also useful for writing to multiple output files.

# Split a FASTQ into files of 1 million reads each
let file_num = 0
read_fastq("huge_sample.fastq.gz")
  |> chunk(1000000)
  |> each(|batch| {
       let path = "split/chunk_" + str(file_num) + ".fastq.gz"
       batch |> write_fastq(path)
       file_num = file_num + 1
     })
print("Wrote " + str(file_num) + " chunk files")

Window Operations

window creates a sliding window over a list (or materialized portion of a stream). This is essential for computing positional statistics across a genome.

# Sliding window GC content
let sequence = read_fasta("chr1.fa") |> first() |> |r| r.seq
let win_size = 1000
let step = 500

let gc_track = window(sequence, win_size, step)
  |> enumerate()
  |> map(|pair| {
       pos: pair[0] * step,
       gc: gc_content(pair[1]),
     })

gc_track |> write_tsv("chr1_gc_content.tsv")

For windowed operations on streams (where you cannot look back), use sliding_window, which maintains an internal buffer.

# Compute rolling average base quality across a BAM
read_bam("sample.bam")
  |> filter(|r| r.chrom == "chr17")
  |> sort_by(|r| r.pos)
  |> sliding_window(100)
  |> map(|win| {
       pos: win[50].pos,
       mean_mapq: mean(win |> map(|r| r.mapping_quality)),
     })
  |> write_tsv("chr17_mapq_rolling.tsv")

Parallel Operations

par_map

Apply a function to each element of a list in parallel, distributing work across available CPU cores.

let chromosomes = ["chr1", "chr2", "chr3", "chr4", "chr5", "chr6",
                   "chr7", "chr8", "chr9", "chr10", "chr11", "chr12",
                   "chr13", "chr14", "chr15", "chr16", "chr17", "chr18",
                   "chr19", "chr20", "chr21", "chr22", "chrX", "chrY"]

let per_chrom_stats = par_map(chromosomes, |chrom| {
  let reads = read_bam("sample.bam") |> filter(|r| r.chrom == chrom)
  let count = reads |> count()
  let depth = tool("samtools", "depth -r " + chrom + " sample.bam") |> col("depth") |> mean()
  {chrom: chrom, reads: count, mean_depth: depth}
})

per_chrom_stats |> sort_by(|s| s.chrom) |> write_tsv("chrom_stats.tsv")

par_filter

Filter elements in parallel. Useful when the predicate itself is expensive.

let variants = tsv("all_variants.tsv")

let pathogenic = par_filter(variants, |v| {
  let vep = ensembl_vep(v.chrom + ":" + str(v.pos) + ":"
                        + v.ref + ":" + v.alt)
  let worst = vep.consequences |> first()
  worst.impact == "HIGH"
})

print(str(len(pathogenic)) + " high-impact variants")

Unit Helpers

BioLang provides genomic unit helpers that make size comparisons readable.

let region_size = mb(3)          # 3,000,000
let read_length = bp(150)        # 150
let win_size = kb(50)            # 50,000
let genome_size = gb(3)          # 3,000,000,000

# Use in calculations
let coverage = 1800000000 / genome_size  # ~0.6x
print("Coverage: " + str(coverage) + "x")

# Filter regions by size
let large_svs = read_vcf("structural_variants.vcf.gz")
  |> filter(|v| v.info.SVLEN != None and abs(v.info.SVLEN) > mb(1))

print(str(large_svs |> count()) + " SVs larger than 1 Mb")

These helpers are simple multipliers, but they prevent the off-by-three-zeros bugs that plague genomics scripts.
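To make "simple multipliers" concrete, here is a sketch of equivalent definitions (assuming a fn keyword for user-defined functions; the built-ins may differ in detail):

# Illustrative equivalents of the built-in unit helpers
fn bp(n) { n }                   # base pairs: identity
fn kb(n) { n * 1000 }            # kilobases
fn mb(n) { n * 1000000 }         # megabases
fn gb(n) { n * 1000000000 }      # gigabases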

Example: Process a 100GB FASTQ

Stream 4 billion reads, filter by quality, and compute statistics without loading the file into memory.

let input = "novaseq_R1.fastq.gz"  # 100 GB compressed

let stats = {
  total: 0,
  passed: 0,
  total_bases: 0,
  gc_bases: 0,
  qual_sum: 0,
}

read_fastq(input)
  |> each(|r| {
       stats.total = stats.total + 1
       let q = mean_phred(r.quality)
       if q >= 30 then {
         stats.passed = stats.passed + 1
         stats.total_bases = stats.total_bases + r.length
         let gc = gc_content(dna(r.seq)) * r.length
         stats.gc_bases = stats.gc_bases + gc
         stats.qual_sum = stats.qual_sum + q
       }
     })

let pct_pass = stats.passed / stats.total * 100
let gc = stats.gc_bases / stats.total_bases * 100
let mean_q = stats.qual_sum / stats.passed

print("Total reads:  " + str(stats.total))
print("Passed Q>=30: " + str(stats.passed) + " (" + str(pct_pass) + "%)")
print("Mean quality: " + str(mean_q))
print("GC content:   " + str(gc) + "%")

This script processes the entire file in a single pass. Memory usage stays constant regardless of file size.

Example: Parallel Variant Annotation Across Chromosomes

Split variant annotation work by chromosome to use all cores.

let vcf_path = "cohort.vcf.gz"
let chromosomes = range(1, 23) |> map(|n| "chr" + str(n))
let chromosomes = concat(chromosomes, ["chrX", "chrY"])

let annotated = par_map(chromosomes, |chrom| {
  let variants = read_vcf(vcf_path)
    |> filter(|v| v.chrom == chrom)
    |> collect()

  let results = variants |> map(|v| {
    let vep = ensembl_vep(v.chrom + ":" + str(v.pos) + ":" + v.ref + ":" + v.alt)
    let worst = vep.consequences
      |> sort_by(|c| c.impact_rank)
      |> first()
    {
      ...v,
      consequence: worst.consequence,
      impact: worst.impact,
      gene: worst.gene_symbol,
    }
  })

  print(chrom + ": " + str(len(results)) + " variants annotated")
  results
})
  |> flatten()

annotated |> write_tsv("annotated_variants.tsv")
print("Annotated " + str(len(annotated)) + " variants across "
      + str(len(chromosomes)) + " chromosomes")

Example: Sliding Window GC Content

Compute GC content in 1 kb windows across an entire chromosome.

let chr_seq = read_fasta("GRCh38_chr22.fa") |> first() |> |r| r.seq
let chr_len = len(chr_seq)
let win = kb(1)
let step = bp(500)

print("Chromosome 22: " + str(chr_len) + " bp")
print("Computing GC in " + str(win) + " bp windows, step " + str(step))

let gc_profile = window(chr_seq, win, step)
  |> enumerate()
  |> map(|pair| {
       start: pair[0] * step,
       end: pair[0] * step + win,
       gc: gc_content(pair[1]),
     })

# Identify GC-rich and GC-poor regions
let gc_rich = gc_profile |> filter(|w| w.gc > 0.60)
let gc_poor = gc_profile |> filter(|w| w.gc < 0.35)

print("GC-rich windows (>60%): " + str(len(gc_rich)))
print("GC-poor windows (<35%): " + str(len(gc_poor)))

gc_profile |> write_tsv("chr22_gc_profile.tsv")

Example: Batch Processing Thousands of Samples

Process a large sample cohort using chunked parallelism to avoid overwhelming system resources.

let manifest = tsv("sample_manifest.tsv")  # 2,000 samples
print("Processing " + str(len(manifest)) + " samples")

let batch_size = 16  # process 16 samples at a time

let all_results = manifest
  |> chunk(batch_size)
  |> enumerate()
  |> map(|pair| {
       let batch_idx = pair[0]
       let batch = pair[1]
       print("Batch " + str(batch_idx + 1) + "/"
             + str(ceil(len(manifest) / batch_size)))

       par_map(batch, |sample| {
         # Align
         let sam = tool("bwa-mem2", "mem -t 2 GRCh38.fa " + sample.r1 + " " + sample.r2)
         let bam = tool("samtools", "sort -@ 2 -o " + sample.sample_id + ".sorted.bam " + sam)
         tool("samtools", "index " + bam)

         # QC metrics
         let depth_vals = tool("samtools", "depth " + bam)
         let mean_depth = depth_vals |> col("depth") |> mean()

         {
           id: sample.sample_id,
           bam: bam,
           mean_depth: mean_depth,
           status: if mean_depth > 20
                   then "PASS" else "FAIL",
         }
       })
     })
  |> flatten()

let passed = all_results |> filter(|r| r.status == "PASS")
let failed = all_results |> filter(|r| r.status == "FAIL")

print("Results: " + str(len(passed)) + " PASS, "
      + str(len(failed)) + " FAIL")

if len(failed) > 0 then {
  print("Failed samples:")
  failed |> each(|r| print("  " + r.id + " depth=" + str(r.mean_depth)))
}

all_results |> write_tsv("cohort_qc_results.tsv")

Memory Patterns to Know

Pattern                                      Memory   Use When
read_fastq(f) |> filter(...) |> count()      O(1)     Counting, simple stats
read_vcf(f) |> collect()                     O(n)     Need random access
read_bam(f) |> chunk(k) |> each(...)         O(k)     Batch writing, API calls
par_map(list, f)                             O(n)     List already in memory
stream |> fold(init, f)                      O(1)     Aggregation

The rule of thumb: stay in stream mode as long as possible. Call collect() only when you genuinely need the full dataset in memory (for sorting, random access, or passing to a function that requires a list).
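For example, counting stays in stream mode, while sorting forces materialization (the cohort.vcf.gz path here is just a placeholder):

# Streaming: constant memory -- counting never needs the full dataset
let n = read_vcf("cohort.vcf.gz")
  |> filter(|v| v.qual >= 30)
  |> count()

# Sorting needs every record at once, so collect first: O(n) memory
let by_qual = read_vcf("cohort.vcf.gz")
  |> collect()
  |> sort_by(|v| v.qual)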

Summary

Streams let BioLang handle files of any size in constant memory. Combine them with chunk for batch processing, window for positional analysis, and par_map/par_filter for multi-core parallelism. The unit helpers bp(), kb(), mb(), and gb() keep genomic arithmetic readable. Together, these tools let you write scripts that scale from a test FASTQ to a population-scale dataset without changing the code.