Lazy Streams

Streams are lazy iterators that process data one element at a time without loading everything into memory. In bioinformatics, where datasets routinely exceed available RAM, streams are essential for processing FASTQ files with billions of reads, BAM files spanning hundreds of gigabytes, and VCF files with millions of variants.

Streams from File Readers

BioLang's file reader builtins return streams. No data is read from disk until the stream is consumed:

# Each of these returns a Stream -- nothing is read yet
let reads = read_fastq("data/reads.fastq")
let sequences = read_fasta("data/sequences.fasta")
let variants = read_vcf("data/variants.vcf")
let intervals = read_bed("data/regions.bed")
let features = read_gff("data/annotations.gff")

Lazy Evaluation

Stream operations are lazy — they describe a pipeline of transformations but do not execute until a terminal operation (like collect()) is called:

# Build a pipeline -- nothing is read from disk yet
let pipeline = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 20)
  |> map(|r| r.sequence)
  |> filter(|s| len(s) >= 50)
  |> take(1000)

# Only now are reads actually processed:
let results = pipeline |> collect()
println(f"Got {len(results)} sequences")
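A pipeline does not have to end in collect(); any terminal operation drives it, so a large file can be summarized without ever holding all reads in memory. As a sketch using only the operations shown in this section, counting the reads that pass a quality filter:

# Count passing reads without materializing them
let n = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 20)
  |> map(|r| 1)
  |> reduce(|a, b| a + b)
println(f"Passing reads: {n}")

Because the reduction consumes one element at a time, memory use stays constant regardless of file size.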

Stream Operations

filter

Keep elements that match a predicate:

let hq = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> collect()
println(f"High quality reads: {len(hq)}")

map

Transform each element:

let lengths = read_fastq("data/reads.fastq")
  |> map(|r| len(r.sequence))
  |> collect()
println(f"Read lengths: {lengths}")

take and skip

Limit or skip elements in the stream:

# Take the first 100 reads
let first_100 = read_fastq("data/reads.fastq")
  |> take(100)
  |> collect()
println(f"First 100: {len(first_100)} reads")

# Skip the first 50, then take 10
let sample = read_fastq("data/reads.fastq")
  |> skip(50)
  |> take(10)
  |> collect()
println(f"Sample: {len(sample)} reads")

Consuming Streams

Terminal operations consume the stream and produce a final value:

collect

collect() materializes all elements into a list:

let all_reads = read_fastq("data/reads.fastq")
  |> collect()
println(f"Total reads: {len(all_reads)}")

each

each() calls a function on every element for its side effects, such as printing:

read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> take(5)
  |> each(|r| println(f"Read: {r.name}, len={len(r.sequence)}"))

reduce

reduce() folds all elements into a single value:

let total_bases = read_fastq("data/reads.fastq")
  |> map(|r| len(r.sequence))
  |> reduce(|a, b| a + b)
println(f"Total bases: {total_bases}")
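The combining function passed to reduce() is not limited to addition. As a hypothetical sketch, assuming max() also accepts two scalar arguments (below it is only shown applied to a list), the longest read length can be found in a single pass:

# Longest read length in one pass
# (assumes max(a, b) works on two numbers -- an assumption)
let longest = read_fastq("data/reads.fastq")
  |> map(|r| len(r.sequence))
  |> reduce(|a, b| max(a, b))
println(f"Longest read: {longest}")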

One-Time Use

Streams can only be consumed once. After a terminal operation, the stream is exhausted and cannot be reused:

let s = read_fastq("data/reads.fastq")

# First consumption works
let data = s |> collect()
println(f"Got {len(data)} reads")

# s is now exhausted -- to process again, create a new stream
let s2 = read_fastq("data/reads.fastq")
let count = s2 |> map(|r| 1) |> reduce(|a, b| a + b)
println(f"Count: {count}")

Combining Multiple Streams

Results from several files can be collected separately and then combined:

# Process multiple files and combine results
let lane1 = read_fastq("data/lane1.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> collect()

let lane2 = read_fastq("data/lane2.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> collect()

# Concatenate the materialized lists
let all_hq = lane1 ++ lane2
println(f"Total HQ reads: {len(all_hq)}")

Real-World Example

A complete quality-filtering pipeline using streams:

# Filter FASTQ reads by quality and length
let hq = read_fastq("data/reads.fastq")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> filter(|r| len(r.sequence) >= 50)
  |> collect()

println(f"High quality reads: {len(hq)}")

# Compute statistics on the filtered reads
let lengths = hq |> map(|r| len(r.sequence))
let total_bases = lengths |> reduce(|a, b| a + b)
println(f"Total bases: {total_bases}")
println(f"Mean read length: {total_bases / len(hq)}")

Stream Statistics

To compute summary statistics, collect the values of interest into a list and apply the statistics builtins:

# Collect quality scores from a stream, then compute stats
let qualities = read_fastq("data/reads.fastq")
  |> map(|r| mean_phred(r.quality))
  |> collect()

println(f"Reads: {len(qualities)}")
println(f"Mean quality: {mean(qualities)}")
println(f"Min quality: {min(qualities)}")
println(f"Max quality: {max(qualities)}")