Streaming Large Files
Real bioinformatics datasets can be tens or hundreds of gigabytes. BioLang's streaming API lets you process files of any size in constant memory, using lazy evaluation and parallel execution.
What you will learn
- The difference between eager (read_fastq) and lazy (fastq) APIs
- Chaining stream operations: map, filter, take, drop
- Parallel processing with par_map
- Aggregation with reduce for single-pass statistics
- Streaming BAM, VCF, BED, and FASTA files
bl run examples/tutorials/streaming.bl
Step 1 — Eager vs Lazy Loading
The read_fastq() function loads an entire file into a table in memory. For small
files this is fine, but for a 50 GB FASTQ you would run out of RAM. The
fastq() function returns a lazy Stream that yields
records one at a time in constant memory.
# Table: loads everything into memory (reusable)
# let reads = read_fastq("huge.fastq.gz") # BAD: 50 GB in RAM
# Stream: constant memory regardless of file size
let stream = fastq("huge.fastq.gz")
print(typeof(stream)) # Stream
# Nothing has been read yet. The file is opened, but records are
# only read when you consume the stream.
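The eager/lazy split can be sketched in plain Python: an eager reader builds a full list, while a lazy reader is a generator that yields records only as they are consumed. The function names here are illustrative stand-ins, not BioLang builtins.

```python
def read_eager(lines):
    # Eager: materializes every record in memory at once.
    return [line.strip() for line in lines]

def read_lazy(lines):
    # Lazy: yields one record at a time; memory use stays constant.
    for line in lines:
        yield line.strip()

data = ["read1\n", "read2\n", "read3\n"]
eager = read_eager(data)   # whole list built immediately
lazy = read_lazy(data)     # nothing has been read yet
first = next(lazy)         # records are pulled on demand
```

The key point is that constructing `lazy` does no work; each record is produced only when something downstream asks for it.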
Step 2 — Consuming a Stream
Each FASTQ stream record is a Record with fields:
id, description, seq (DNA),
length (Int), and quality (String).
# Count reads without loading them all
let n = fastq("huge.fastq.gz")
|> count()
print(f"Total reads: {n}") # counted one by one, constant memory
# Take the first N records
let first_100 = fastq("huge.fastq.gz")
|> take(100)
|> collect() # materializes into a List
print(f"First 100 reads loaded, {len(first_100)} records")
# Drop the first 1000, then take 100
let slice = fastq("huge.fastq.gz")
|> drop(1000)
|> take(100)
|> collect()
# Iterate manually with for
let total_bases = 0
for read in fastq("data/sample.fastq.gz") {
total_bases = total_bases + read.length
}
print(f"Total bases: {total_bases}")
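The drop/take slicing above has a direct analogue in Python's `itertools.islice`, which also consumes a lazy stream without materializing the skipped prefix. This is a sketch of the pattern, not BioLang's implementation.

```python
from itertools import islice

def records():
    # Stand-in for a FASTQ stream: yields record indices lazily.
    for i in range(10_000):
        yield i

# drop(1000) |> take(100) corresponds to islice(stream, 1000, 1100)
window = list(islice(records(), 1000, 1100))
```

Only 1,100 records are ever pulled from the stream; the remaining 8,900 are never read.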
Step 3 — Stream Transformations
Streams support the same map, filter, and flat_map
operations as lists, but they execute lazily.
# Filter and transform in a single pass through the file
let gc_values = fastq("huge.fastq.gz")
|> filter(|r| r.length >= 50)
|> map(|r| gc_content(r.seq))
|> collect()
print(f"GC values for long reads: {len(gc_values)}")
print(f"Mean GC: {round(mean(gc_values), 4)}")
# Chain multiple filters
let clean = fastq("huge.fastq.gz")
|> filter(|r| r.length >= 50) # minimum length
|> filter(|r| gc_content(r.seq) > 0.2) # GC content check
|> filter(|r| gc_content(r.seq) < 0.8) # not too GC-rich
|> collect()
print(f"Clean reads: {len(clean)}")
# For full quality trimming and filtering, use the file-level builtins:
# trim_quality(input_path, output_path, threshold)
# filter_reads(input_path, output_path, {min_length: 50, min_quality: 25})
# trim_reads(input_path, output_path, {leading: 5, trailing: 5})
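Chained filter/map stages stay a single pass because each stage wraps the previous one lazily. A minimal Python sketch of the same idea, with a simplified `gc_content` stand-in:

```python
def gc_content(seq):
    # Simplified stand-in: fraction of G and C bases.
    return (seq.count("G") + seq.count("C")) / len(seq)

reads = ["ATGC", "GGGCCC", "AT", "ATATATGC"]
long_reads = (r for r in reads if len(r) >= 4)    # filter stage, lazy
gc_values = [gc_content(r) for r in long_reads]   # map stage + collect
```

Each read flows through filter and map before the next read is touched; no intermediate list of filtered reads is ever built.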
Step 4 — Aggregation on Streams
Use reduce with an initial value to compute statistics in a single pass.
This is equivalent to a fold — the accumulator is threaded through each record.
# Compute statistics in a single pass using reduce with initial value
let stats = fastq("huge.fastq.gz")
|> reduce(|acc, read| {
{
n: acc.n + 1,
total_bases: acc.total_bases + read.length,
total_gc: acc.total_gc + gc_content(read.seq),
}
}, { n: 0, total_bases: 0, total_gc: 0.0 })
print(f"Reads: {stats.n}")
print(f"Total bases: {stats.total_bases}")
print(f"Mean GC: {round(stats.total_gc / stats.n, 4)}")
# Collect GC values and summarize
let gc_list = fastq("huge.fastq.gz")
|> map(|r| gc_content(r.seq))
|> collect()
print(f"Mean GC: {round(mean(gc_list), 4)}")
print(f"GC range: {min(gc_list)} - {max(gc_list)}")
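The fold pattern used above maps directly onto `functools.reduce`: the accumulator record is threaded through every element, so all statistics come out of one pass. A sketch with toy read records:

```python
from functools import reduce

reads = [{"length": 100, "gc": 0.41},
         {"length": 150, "gc": 0.55},
         {"length": 120, "gc": 0.48}]

def step(acc, read):
    # Return a new accumulator; never mutate the old one.
    return {
        "n": acc["n"] + 1,
        "total_bases": acc["total_bases"] + read["length"],
        "total_gc": acc["total_gc"] + read["gc"],
    }

stats = reduce(step, reads, {"n": 0, "total_bases": 0, "total_gc": 0.0})
mean_gc = stats["total_gc"] / stats["n"]
```

Supplying the initial value explicitly is what makes this safe on an empty stream: without it, `reduce` on zero records raises an error.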
Step 5 — Parallel Processing with par_map
For CPU-intensive operations, par_map distributes work across
multiple threads while keeping the stream interface.
# Process reads in parallel (default: number of CPU cores)
let results = fastq("huge.fastq.gz")
|> par_map(|read| {
# Each read is processed independently on a worker thread
let gc = gc_content(read.seq)
let k = kmers(read.seq, 5)
{ id: read.id, gc: gc, len: read.length, kmer_count: len(k) }
})
|> filter(|r| r.gc > 0.3)
|> collect()
print(f"Processed {len(results)} reads in parallel")
# Parallel BAM processing
# BAM stream records are AlignedRead values with computed properties
let bam_stats = bam("huge.bam")
|> par_map(|record| {
{
mapped: record.is_mapped,
qual: record.mapq,
paired: record.is_paired,
}
})
|> reduce(|acc, r| {
{
mapped: acc.mapped + if r.mapped { 1 } else { 0 },
unmapped: acc.unmapped + if not r.mapped { 1 } else { 0 },
paired: acc.paired + if r.paired { 1 } else { 0 },
}
}, { mapped: 0, unmapped: 0, paired: 0 })
print(f"Mapped: {bam_stats.mapped}, Unmapped: {bam_stats.unmapped}")
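The par_map contract — work fanned out to workers, results returned in input order — is the same contract as `Executor.map` in Python's `concurrent.futures`. A sketch with a toy per-read function:

```python
from concurrent.futures import ThreadPoolExecutor

def gc_content(seq):
    # CPU work done independently per read on a worker.
    return (seq.count("G") + seq.count("C")) / len(seq)

reads = ["ATGC", "GGCC", "ATAT"]
with ThreadPoolExecutor(max_workers=4) as pool:
    gc_values = list(pool.map(gc_content, reads))
```

Because results come back in input order, downstream filter/collect stages behave exactly as they would on a sequential map.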
Step 6 — Chunked Processing
Use stream_chunks to break a stream into batches for
periodic reporting or batch operations.
# Process in chunks of 10,000 records
let chunks = fastq("huge.fastq.gz")
|> stream_chunks(10_000)
|> collect()
for chunk in chunks {
let gc_vals = chunk |> map(|r| gc_content(r.seq))
print(f"Chunk: {len(chunk)} reads, mean GC {round(mean(gc_vals), 3)}")
}
# Count with periodic progress
let total = 0
let chunks = fastq("huge.fastq.gz")
|> stream_chunks(50_000)
|> collect()
for chunk in chunks {
total = total + len(chunk)
print(f"Processed {total} reads...")
}
print(f"Done. Total: {total} reads")
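Chunking a lazy stream can itself be lazy: each batch is materialized only when requested. A Python sketch of a stream_chunks-style helper (an illustrative implementation, not BioLang's):

```python
from itertools import islice

def stream_chunks(stream, size):
    # Yield fixed-size lists from a lazy stream; last chunk may be short.
    it = iter(stream)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

chunks = list(stream_chunks(range(10), 4))
```

Only one chunk lives in memory at a time when the chunks are consumed lazily instead of collected.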
Step 7 — Side Effects with tee
The tee function passes each element through a side-effect function
while forwarding the original value unchanged.
# Use tee to log while processing
let results = fastq("input.fastq.gz")
|> filter(|r| r.length >= 100)
|> tee(|r| print(f"Processing read {r.id}"))
|> map(|r| gc_content(r.seq))
|> collect()
print(f"Computed GC for {len(results)} reads")
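A tee stage is just a pass-through generator: it runs the side-effect function on each element and forwards the element unchanged. A minimal Python sketch:

```python
def tee(stream, effect):
    # Apply a side effect, then forward the original value untouched.
    for item in stream:
        effect(item)
        yield item

seen = []
doubled = [x * 2 for x in tee([1, 2, 3], seen.append)]
```

The downstream stage sees the original values, while the side effect observes every element as it flows past.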
Step 8 — Streaming Other Formats
All BioLang format readers return streams by default. Use the same
map, filter, reduce, collect
pipeline on any format.
# requires: alignments.bam, variants.vcf.gz, genome.fa, regions.bed in working directory
# Stream BAM files
# BAM records have fields: qname, flag, rname, pos, mapq, cigar, seq, qual
# Plus computed properties: is_mapped, is_paired, is_primary, aligned_length, etc.
let mapped_count = bam("alignments.bam")
|> filter(|r| r.is_mapped and r.mapq >= 20)
|> count()
print(f"High-quality mapped reads: {mapped_count}")
# Stream VCF files
# VCF records have fields: chrom, pos, id, ref, alt, qual, filter, info
let pass_count = vcf("variants.vcf.gz")
|> filter(|v| v.filter == "PASS")
|> count()
print(f"PASS variants: {pass_count}")
# Stream FASTA files (for large reference genomes)
let gc_per_chrom = fasta("genome.fa")
|> map(|record| {
{ name: record.id, gc: gc_content(record.seq) }
})
|> collect()
for entry in gc_per_chrom {
print(f"{entry.name}: GC = {round(entry.gc, 4)}")
}
# Stream a BED file
# BED records have fields: chrom, start, end, name, score, strand
let total_bases_covered = bed("regions.bed")
|> map(|r| r.end - r.start)
|> sum()
print(f"Total bases in BED regions: {total_bases_covered}")
Step 9 — Complete Streaming Pipeline
# streaming_qc.bio — production FASTQ QC with streaming
fn stream_qc(input, output, report_path) {
print(f"Starting streaming QC on {input}...")
# Pass 1: compute stats (single pass, constant memory)
let raw_stats = fastq(input)
|> reduce(|acc, r| {
let gc = gc_content(r.seq)
{
n: acc.n + 1,
bases: acc.bases + r.length,
gc_sum: acc.gc_sum + gc,
min_len: min(acc.min_len, r.length),
max_len: max(acc.max_len, r.length),
}
}, { n: 0, bases: 0, gc_sum: 0.0, min_len: 999999, max_len: 0 })
print(f"Raw: {raw_stats.n} reads, {raw_stats.bases} bases")
# Pass 2: trim, then filter, using the file-level functions.
# An intermediate file carries the trimmed reads into the filter step;
# writing both steps to the same output would discard the trimming.
trim_quality(input, "trimmed.tmp.fastq.gz", 20)
filter_reads("trimmed.tmp.fastq.gz", output, { min_length: 50, min_quality: 25 })
# Count clean reads
let clean_count = fastq(output) |> count()
let pass_rate = round(clean_count / raw_stats.n * 100.0, 1)
print(f"Clean: {clean_count} reads ({pass_rate}%)")
# Write report
let report = {
input: input,
output: output,
raw_reads: raw_stats.n,
raw_bases: raw_stats.bases,
clean_reads: clean_count,
pass_rate: pass_rate,
mean_gc: round(raw_stats.gc_sum / raw_stats.n, 4),
min_length: raw_stats.min_len,
max_length: raw_stats.max_len,
}
write_json(report_path, report)
print(f"Report saved to {report_path}")
}
stream_qc("huge.fastq.gz", "huge_clean.fastq.gz", "qc_report.json")
Key Takeaways
- Use fastq(), bam(), vcf(), bed() for streaming — they return lazy streams in constant memory.
- Stream operations are lazy — nothing executes until you consume with collect(), count(), reduce(), or a for loop.
- Use par_map for CPU-bound transformations across multiple threads.
- Use reduce with an initial value for single-pass aggregation (fold pattern).
- Use stream_chunks for batch processing with periodic output.
Next Steps
Learn how to query biological databases in the Querying Databases tutorial.