Streams

Streams are lazy iterators that process data one element at a time without loading everything into memory. In bioinformatics, where datasets routinely exceed available RAM, streams are essential for processing FASTQ files with billions of reads, BAM files spanning hundreds of gigabytes, and VCF files with millions of variants.

Streams vs. Lists

Lists are eager — all elements exist in memory simultaneously. Streams are lazy — elements are computed on demand and discarded after use:

# Eager: loads entire file into a Table (dangerous for large files)
let all_reads = read_fastq("huge.fq.gz")  # Table — may OOM

# Lazy: processes one read at a time (constant memory)
let read_stream = fastq("huge.fq.gz")  # Stream — safe
let filtered = read_stream
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> take(1_000_000)
  |> collect()   # Only now are reads materialized
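The eager/lazy contrast above mirrors a distinction many host languages share. As an analogy (Python, not BioLang), a list comprehension is eager while a generator expression is lazy:

```python
# Eager: the list comprehension computes every element up front.
squares_list = [n * n for n in range(5)]   # all 5 values exist in memory

# Lazy: the generator expression computes nothing yet.
squares_gen = (n * n for n in range(5))

# Values are produced only as the consumer pulls them.
first_two = [next(squares_gen), next(squares_gen)]
```

The remaining squares are never computed unless something asks for them, which is exactly why lazy pipelines run in constant memory.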

Creating Streams

From Files

# Stream biological file formats
let reads = fastq("sample.fq.gz")
let alignments = bam("aligned.bam")
let variants = vcf("calls.vcf.gz")
let sequences = fasta("reference.fa")
let intervals = bed("regions.bed")
let features = gff("annotations.gff3")

From Lists

# Convert a list to a lazy stream
let lazy = [1, 2, 3, 4, 5] |> to_stream()

# Useful for testing pipelines with small data
let test_stream = ["ACGT", "GGCC", "TTAA"] |> to_stream()
  |> map(|s| len(s))
  |> collect()   # [4, 4, 4]

Streaming Large Files

# The main use of streams is processing large bio files lazily.
# Each file builtin returns a Stream that reads one record at a time:
let hq_reads = fastq("reads.fq.gz")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> head(100)

# Streams compose — chain multiple file operations:
let read_lengths = fastq("sample.fq.gz")
  |> map(|r| len(r.sequence))
  |> collect()   # List of all read lengths

Stream Operations

Streams support the same functional operations as lists, but intermediate operations are lazy — no work happens until a terminal operation consumes the stream:

Lazy (Intermediate) Operations

let pipeline = fastq("reads.fq.gz")
  |> filter(|r| mean_phred(r.quality) >= 20)  # Lazy
  |> map(|r| r.sequence)                      # Lazy
  |> filter(|s| len(s) >= 50)                 # Lazy
  |> take(1_000_000)                          # Lazy — limits total
  |> skip(100)                                # Lazy — skips first 100

# Nothing has been read from disk yet!
# The pipeline is just a description of operations.
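The "description of operations" idea can be demonstrated with Python generators (an analogy, not BioLang): building the pipeline runs nothing, and the source is only driven when the pipeline is consumed. The `noisy_source` helper below is hypothetical, added just to make the reads observable:

```python
def noisy_source(log):
    """Yield records, logging each read so we can see when work happens."""
    for i in range(3):
        log.append(f"read {i}")
        yield i

log = []
# Building a lazy pipeline: filter evens, then scale by 10.
pipeline = (x * 10 for x in noisy_source(log) if x % 2 == 0)

log_before = list(log)   # empty — nothing has been read yet

# Consumption is what drives the source.
result = list(pipeline)
```

After `list(pipeline)`, the log shows every record was read exactly once, on demand.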

Terminal (Consuming) Operations

# Each example below assumes a freshly built pipeline — a stream can be
# consumed by only one terminal operation (see Stream Consumption).

# collect — materialize into a List
let reads = pipeline |> collect()

# count — count elements
let n = pipeline |> count()

# reduce — fold all elements
let total_bases = pipeline |> map(|r| len(r)) |> reduce(|a, b| a + b)

# first / last — get single elements
let first_read = pipeline |> first()
let last_read = pipeline |> last()

# any / all — short-circuit boolean tests
let has_long = pipeline |> any(|r| len(r) > 10_000)
let all_hq = pipeline |> all(|r| mean_phred(r.quality) >= 30)

# to_table — collect into a Table
let summary = pipeline |> to_table()

# Process each element with a for loop
for read in fastq("reads.fq.gz") {
  print(read.name)
}

Stream Consumption

Streams are consumed exactly once. After a terminal operation, the stream is exhausted and cannot be reused:

let s = fastq("data.fq")

let n = s |> count()          # Consumes the stream
# let data = s |> collect()   # Error: stream already consumed

# To process a stream multiple times, create it again:
let s1 = fastq("data.fq")
let n = s1 |> count()

let s2 = fastq("data.fq")
let data = s2 |> collect()
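Single consumption is the standard behavior of one-shot iterators. A Python generator sketch of the same rule (an analogy, not BioLang):

```python
stream = (n for n in [1, 2, 3])

consumed = sum(1 for _ in stream)   # a "count": drains the generator
leftover = list(stream)             # exhausted — yields nothing more

# To traverse the data again, rebuild the stream from its source:
stream2 = (n for n in [1, 2, 3])
data = list(stream2)
```

Rebuilding from the source is cheap because no elements were ever stored; only the description of how to produce them is recreated.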

Backpressure

BioLang streams implement natural backpressure. The producer only generates the next element when the consumer is ready. This prevents memory buildup in multi-stage pipelines:

# This pipeline uses constant memory regardless of input size.
# Each stage processes one element at a time:
let hq_count = bam("whole_genome.bam")  # Read one record
  |> filter(|r| r.mapq >= 30)           # Check quality
  |> filter(|r| r.flags.is_mapped)      # Check mapped flag
  |> count()

# The BAM file is never fully loaded — one record flows through at a time
print(f"High-quality mapped reads: {hq_count}")
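The pull-based flow that gives this backpressure can be made visible with chained Python generators (an analogy, not BioLang): each element travels through the whole pipeline before the next one is produced, so nothing accumulates between stages. The `trace` list here is a hypothetical instrument for observing the order of events:

```python
def source(trace):
    """Produce three records, logging each production."""
    for i in range(3):
        trace.append(f"produce {i}")
        yield i

def doubler(items, trace):
    """Consume records one at a time, logging each consumption."""
    for x in items:
        trace.append(f"consume {x}")
        yield x * 2

trace = []
out = list(doubler(source(trace), trace))
```

The trace interleaves strictly — produce, consume, produce, consume — confirming that the producer only advances when the consumer asks for the next element.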

File Streaming

Streaming I/O is the primary use case for streams in bioinformatics. BioLang automatically handles compressed formats:

# Stream compressed FASTQ, filter, and collect results
let hq_reads = fastq("input.fq.gz")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> collect()

# Stream BAM and count high-quality alignments
let mapped = bam("aligned.bam")
  |> filter(|r| r.mapq >= 20)
  |> count()

# Stream VCF and filter variants
let rare_variants = vcf("calls.vcf.gz")
  |> filter(|v| v.qual >= 30.0)
  |> filter(|v| v.filter == "PASS")
  |> collect()

Chunked Processing

Process streams in batches when an operation benefits from seeing many records at once, such as bulk writes or vectorized computation:

# Process FASTQ reads in chunks of 10,000
let chunks = fastq("reads.fq.gz")
  |> chunk(10_000)
  |> collect()

for batch in chunks {
  let total_len = batch |> map(|r| len(r.sequence)) |> reduce(|a, b| a + b)
  let n = len(batch)
  print(f"Batch: {n} reads, avg length {total_len / n}")
}

# Chunk FASTA sequences for batch processing
let batches = fasta("proteins.fa")
  |> chunk(1000)
  |> collect()

let total = batches |> map(|b| len(b)) |> reduce(|a, b| a + b)
print(f"Processed {total} sequences in {len(batches)} batches")
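A `chunk`-style operation is straightforward to express over any lazy iterator. A minimal Python sketch using `itertools.islice` (an analogy, not BioLang's implementation):

```python
from itertools import islice

def chunk(stream, size):
    """Yield lists of up to `size` items from a lazy stream."""
    it = iter(stream)
    while True:
        batch = list(islice(it, size))
        if not batch:       # source exhausted
            return
        yield batch

# A stream of 10 items in chunks of 4 yields batches of 4, 4, and 2.
batches = list(chunk(range(10), 4))
```

Note that only one batch is ever held in memory at a time; chunking trades the per-element constant-memory guarantee for a per-batch one.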

Combining Results from Multiple Streams

# Process multiple lanes and combine results
let lane1 = fastq("lane1.fq.gz") |> filter(|r| mean_phred(r.quality) >= 30) |> collect()
let lane2 = fastq("lane2.fq.gz") |> filter(|r| mean_phred(r.quality) >= 30) |> collect()
let lane3 = fastq("lane3.fq.gz") |> filter(|r| mean_phred(r.quality) >= 30) |> collect()

# Concatenate the collected lists
let all_reads = lane1 + lane2 + lane3
print(f"Total HQ reads across lanes: {len(all_reads)}")

# Or count each stream separately
let counts = [
  fastq("lane1.fq.gz") |> count(),
  fastq("lane2.fq.gz") |> count(),
  fastq("lane3.fq.gz") |> count()
]
print(f"Reads per lane: {counts}")
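Collecting each lane before concatenating is simple but materializes everything. In generator terms, sources can also be concatenated lazily, as this Python sketch with `itertools.chain` shows (an analogy, not BioLang):

```python
from itertools import chain

lane1 = (r for r in ["ACGT", "GGCC"])
lane2 = (r for r in ["TTAA"])

# chain pulls from lane1 until it is exhausted, then from lane2 —
# no intermediate lists are built.
all_reads = chain(lane1, lane2)
total = sum(1 for _ in all_reads)
```

The combined stream is itself lazy, so downstream filters and counts still run in constant memory.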

Stream Statistics

# Collect per-read mean qualities, then summarize with reduce
let qualities = fastq("sample.fq.gz")
  |> map(|r| mean_phred(r.quality))
  |> collect()

let n = len(qualities)
let total = qualities |> reduce(|a, b| a + b)
let avg = total / n
print(f"Count: {n}, Mean quality: {avg}")

# Use first/last on sorted data
let lengths = fastq("sample.fq.gz")
  |> map(|r| len(r.sequence))
  |> collect()
  |> sort()

print(f"Min length: {first(lengths)}, Max length: {last(lengths)}")
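Sorting requires materializing every element first. When only the extremes are needed, a single lazy pass suffices; a Python sketch of that alternative (an analogy, not BioLang):

```python
# A lazy stream of read lengths (hypothetical sample data).
lengths = (len(s) for s in ["ACGT", "GG", "TTTTTT", "ACG"])

# Track the running min and max in one pass, constant memory.
lo, hi = None, None
for n in lengths:
    lo = n if lo is None or n < lo else lo
    hi = n if hi is None or n > hi else hi
```

The stream is consumed exactly once and nothing beyond two numbers is ever stored.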

Counting with Streams

# Count is the simplest terminal operation — it consumes the
# stream without materializing all elements in memory:
let total = fastq("large.fq.gz") |> count()
print(f"Total reads: {total}")

let hq = fastq("large.fq.gz")
  |> filter(|r| mean_phred(r.quality) >= 30)
  |> count()
print(f"High-quality reads: {hq} / {total}")