Intermediate ~25 minutes

Streaming Large Files

Real bioinformatics datasets can be tens or hundreds of gigabytes. BioLang's streaming API lets you process files of any size in constant memory, using lazy evaluation and parallel execution.

What you will learn

  • The difference between eager (read_fastq) and lazy (fastq) APIs
  • Chaining stream operations: map, filter, take, drop
  • Parallel processing with par_map
  • Aggregation with reduce for single-pass statistics
  • Streaming BAM, VCF, BED, and FASTA files
Run this tutorial: Download streaming.bl and run it with bl run examples/tutorials/streaming.bl

Step 1 — Eager vs Lazy Loading

The read_fastq() function loads an entire file into a table in memory. For small files this is fine, but for a 50 GB FASTQ you would run out of RAM. The fastq() function returns a lazy Stream that yields records one at a time in constant memory.

# Table: loads everything into memory (reusable)
# let reads = read_fastq("huge.fastq.gz")  # BAD: 50 GB in RAM

# Stream: constant memory regardless of file size
let stream = fastq("huge.fastq.gz")

print(typeof(stream))  # Stream

# Nothing has been read yet. The file is opened, but records are
# only read when you consume the stream.

Step 2 — Consuming a Stream

Each FASTQ stream record is a Record with fields: id, description, seq (DNA), length (Int), and quality (String).

# Count reads without loading them all
let n = fastq("huge.fastq.gz")
  |> count()

print(f"Total reads: {n}")  # counted one by one, constant memory

# Take the first N records
let first_100 = fastq("huge.fastq.gz")
  |> take(100)
  |> collect()  # materializes into a List

print(f"First 100 reads loaded, {len(first_100)} records")

# Drop the first 1000, then take 100
let slice = fastq("huge.fastq.gz")
  |> drop(1000)
  |> take(100)
  |> collect()

# Iterate manually with for
let total_bases = 0
for read in fastq("data/sample.fastq.gz") {
  total_bases = total_bases + read.length
}
print(f"Total bases: {total_bases}")

Step 3 — Stream Transformations

Streams support the same map, filter, and flat_map operations as lists, but they execute lazily.

# Filter and transform in a single pass through the file
let gc_values = fastq("huge.fastq.gz")
  |> filter(|r| r.length >= 50)
  |> map(|r| gc_content(r.seq))
  |> collect()

print(f"GC values for long reads: {len(gc_values)}")
print(f"Mean GC: {round(mean(gc_values), 4)}")

# Chain multiple filters
let clean = fastq("huge.fastq.gz")
  |> filter(|r| r.length >= 50)             # minimum length
  |> filter(|r| gc_content(r.seq) > 0.2)    # GC content check
  |> filter(|r| gc_content(r.seq) < 0.8)    # not too GC-rich
  |> collect()

print(f"Clean reads: {len(clean)}")

# For full quality trimming and filtering, use the file-level builtins:
# trim_quality(input_path, output_path, threshold)
# filter_reads(input_path, output_path, {min_length: 50, min_quality: 25})
# trim_reads(input_path, output_path, {leading: 5, trailing: 5})

Step 4 — Aggregation on Streams

Use reduce with an initial value to compute statistics in a single pass. This is equivalent to a fold — the accumulator is threaded through each record.

# Compute statistics in a single pass using reduce with initial value
let stats = fastq("huge.fastq.gz")
  |> reduce(|acc, read| {
    {
      n:           acc.n + 1,
      total_bases: acc.total_bases + read.length,
      total_gc:    acc.total_gc + gc_content(read.seq),
    }
  }, { n: 0, total_bases: 0, total_gc: 0.0 })

print(f"Reads:       {stats.n}")
print(f"Total bases: {stats.total_bases}")
print(f"Mean GC:     {round(stats.total_gc / stats.n, 4)}")

# Collect GC values and summarize
let gc_list = fastq("huge.fastq.gz")
  |> map(|r| gc_content(r.seq))
  |> collect()

print(f"Mean GC: {round(mean(gc_list), 4)}")
print(f"GC range: {min(gc_list)} - {max(gc_list)}")

Step 5 — Parallel Processing with par_map

For CPU-intensive operations, par_map distributes work across multiple threads while keeping the stream interface.

# Process reads in parallel (default: number of CPU cores)
let results = fastq("huge.fastq.gz")
  |> par_map(|read| {
    # Each read is processed independently on a worker thread
    let gc = gc_content(read.seq)
    let k = kmers(read.seq, 5)
    { id: read.id, gc: gc, len: read.length, kmer_count: len(k) }
  })
  |> filter(|r| r.gc > 0.3)
  |> collect()

print(f"Processed {len(results)} reads in parallel")

# Parallel BAM processing
# BAM stream records are AlignedRead values with computed properties
let bam_stats = bam("huge.bam")
  |> par_map(|record| {
    {
      mapped:  record.is_mapped,
      qual:    record.mapq,
      paired:  record.is_paired,
    }
  })
  |> reduce(|acc, r| {
    {
      mapped:   acc.mapped + if r.mapped { 1 } else { 0 },
      unmapped: acc.unmapped + if not r.mapped { 1 } else { 0 },
      paired:   acc.paired + if r.paired { 1 } else { 0 },
    }
  }, { mapped: 0, unmapped: 0, paired: 0 })

print(f"Mapped: {bam_stats.mapped}, Unmapped: {bam_stats.unmapped}")

Step 6 — Chunked Processing

Use stream_chunks to break a stream into batches for periodic reporting or batch operations.

# Process in chunks of 10,000 records
let chunks = fastq("huge.fastq.gz")
  |> stream_chunks(10_000)
  |> collect()

for chunk in chunks {
  let gc_vals = chunk |> map(|r| gc_content(r.seq))
  print(f"Chunk: {len(chunk)} reads, mean GC {round(mean(gc_vals), 3)}")
}

# Count with periodic progress
let total = 0
let chunks = fastq("huge.fastq.gz")
  |> stream_chunks(50_000)
  |> collect()

for chunk in chunks {
  total = total + len(chunk)
  print(f"Processed {total} reads...")
}
print(f"Done. Total: {total} reads")

Step 7 — Side Effects with tee

The tee function passes each element through a side-effect function while forwarding the original value unchanged.

# Use tee to log while processing
let results = fastq("input.fastq.gz")
  |> filter(|r| r.length >= 100)
  |> tee(|r| print(f"Processing read {r.id}"))
  |> map(|r| gc_content(r.seq))
  |> collect()

print(f"Computed GC for {len(results)} reads")

Step 8 — Streaming Other Formats

The streaming readers — fastq(), bam(), vcf(), fasta(), and bed() — all return lazy streams (unlike the eager read_* functions). Use the same map, filter, reduce, collect pipeline on any format.

# requires: alignments.bam, variants.vcf.gz, genome.fa, regions.bed in working directory
# Stream BAM files
# BAM records have fields: qname, flag, rname, pos, mapq, cigar, seq, qual
# Plus computed properties: is_mapped, is_paired, is_primary, aligned_length, etc.
let mapped_count = bam("alignments.bam")
  |> filter(|r| r.is_mapped and r.mapq >= 20)
  |> count()

print(f"High-quality mapped reads: {mapped_count}")

# Stream VCF files
# VCF records have fields: chrom, pos, id, ref, alt, qual, filter, info
let pass_count = vcf("variants.vcf.gz")
  |> filter(|v| v.filter == "PASS")
  |> count()

print(f"PASS variants: {pass_count}")

# Stream FASTA files (for large reference genomes)
let gc_per_chrom = fasta("genome.fa")
  |> map(|record| {
    { name: record.id, gc: gc_content(record.seq) }
  })
  |> collect()

for entry in gc_per_chrom {
  print(f"{entry.name}: GC = {round(entry.gc, 4)}")
}

# Stream a BED file
# BED records have fields: chrom, start, end, name, score, strand
let total_bases_covered = bed("regions.bed")
  |> map(|r| r.end - r.start)
  |> sum()

print(f"Total bases in BED regions: {total_bases_covered}")

Step 9 — Complete Streaming Pipeline

# streaming_qc.bl — production FASTQ QC with streaming

fn stream_qc(input, output, report_path) {
  print(f"Starting streaming QC on {input}...")

  # Pass 1: compute stats (single pass, constant memory)
  let raw_stats = fastq(input)
    |> reduce(|acc, r| {
      let gc = gc_content(r.seq)
      {
        n:        acc.n + 1,
        bases:    acc.bases + r.length,
        gc_sum:   acc.gc_sum + gc,
        min_len:  min(acc.min_len, r.length),
        max_len:  max(acc.max_len, r.length),
      }
    }, { n: 0, bases: 0, gc_sum: 0.0, min_len: 999999, max_len: 0 })

  print(f"Raw: {raw_stats.n} reads, {raw_stats.bases} bases")

  # Pass 2: trim, then filter, using file-level functions.
  # Trim into an intermediate file first and filter that into the final
  # output — running both steps from the original input would overwrite
  # the trimmed file and silently discard the trimming.
  let trimmed = f"{output}.tmp"
  trim_quality(input, trimmed, 20)
  filter_reads(trimmed, output, { min_length: 50, min_quality: 25 })

  # Count clean reads
  let clean_count = fastq(output) |> count()
  # Multiply before dividing so the ratio of two counts is not truncated
  let pass_rate = round(clean_count * 100.0 / raw_stats.n, 1)
  print(f"Clean: {clean_count} reads ({pass_rate}%)")

  # Write report
  let report = {
    input:       input,
    output:      output,
    raw_reads:   raw_stats.n,
    raw_bases:   raw_stats.bases,
    clean_reads: clean_count,
    pass_rate:   pass_rate,
    mean_gc:     round(raw_stats.gc_sum / raw_stats.n, 4),
    min_length:  raw_stats.min_len,
    max_length:  raw_stats.max_len,
  }

  write_json(report_path, report)
  print(f"Report saved to {report_path}")
}

stream_qc("huge.fastq.gz", "huge_clean.fastq.gz", "qc_report.json")

Key Takeaways

  • Use fastq(), bam(), vcf(), bed() for streaming — they return lazy streams in constant memory.
  • Stream operations are lazy — nothing executes until you consume with collect(), count(), reduce(), or a for loop.
  • Use par_map for CPU-bound transformations across multiple threads.
  • Use reduce with an initial value for single-pass aggregation (fold pattern).
  • Use stream_chunks for batch processing with periodic output.

Next Steps

Learn how to query biological databases in the Querying Databases tutorial.