Population Genetics
Population genetics analysis in BioLang covers allele frequency computation, Hardy-Weinberg equilibrium testing, population differentiation metrics, and principal component analysis on genotype matrices.
Allele Frequencies
Computing allele frequencies from a VCF
# Calculate allele frequencies across all samples
let vcf = read_vcf("population.vcf.gz")
let freqs = vcf
|> filter(|v| { v.filter == "PASS" && is_snp(v) })
|> map(|v| {
let gt_field = v.info["GT"] ?? ""
let gts = gt_field |> split(",") |> map(|g| { g |> trim })
let n_hom_ref = gts |> filter(|g| { g == "0/0" }) |> len
let n_het = gts |> filter(|g| { g == "0/1" || g == "1/0" }) |> len
let n_hom_alt = gts |> filter(|g| { g == "1/1" }) |> len
let allele_count = (n_hom_ref + n_het + n_hom_alt) * 2
let alt_count = n_het + n_hom_alt * 2
{
chrom: v.chrom,
pos: v.pos,
ref: v.ref,
alt: v.alt,
af: alt_count |> float / allele_count |> float,
an: allele_count,
ac: alt_count
}
})
freqs |> write_tsv("allele_frequencies.tsv")
print(f"Computed frequencies for {freqs |> len} variants")
Minor allele frequency spectrum
# Generate a site frequency spectrum (SFS)
let freqs = tsv("allele_frequencies.tsv")
let maf = freqs
|> map(|f| {
let af = f["af"] |> float
if af > 0.5 { 1.0 - af } else { af }
})
# Bin into MAF categories
let bins = [0.0, 0.01, 0.05, 0.10, 0.20, 0.30, 0.50]
let sfs = maf |> histogram(edges: bins)
print("Minor Allele Frequency Spectrum:")
for bin in sfs {
let bar = "*" |> repeat(bin.count / 100)
print(f" {bin.label}: {bin.count} {bar}")
}
Hardy-Weinberg Equilibrium
HWE exact test
# Test each variant for Hardy-Weinberg equilibrium
let vcf = read_vcf("population.vcf.gz")
let hwe_results = vcf
|> filter(|v| { v.filter == "PASS" && is_snp(v) })
|> map(|v| {
let gt_field = v.info["GT"] ?? ""
let gts = gt_field |> split(",") |> map(|g| { g |> trim })
let n_hom_ref = gts |> filter(|g| { g == "0/0" }) |> len
let n_het = gts |> filter(|g| { g == "0/1" || g == "1/0" }) |> len
let n_hom_alt = gts |> filter(|g| { g == "1/1" }) |> len
let n_total = n_hom_ref + n_het + n_hom_alt
let p = (2.0 * n_hom_ref |> float + n_het |> float) / (2.0 * n_total |> float)
let q = 1.0 - p
let exp_hom_ref = p * p * n_total |> float
let exp_het = 2.0 * p * q * n_total |> float
let exp_hom_alt = q * q * n_total |> float
let obs_het = n_het |> float / n_total |> float
# Chi-square HWE test
let chi2 = chi_square(
[n_hom_ref |> float, n_het |> float, n_hom_alt |> float],
[exp_hom_ref, exp_het, exp_hom_alt]
)
{
chrom: v.chrom,
pos: v.pos,
obs_het: obs_het,
exp_het: exp_het / n_total |> float,
p_value: chi2.p_value,
in_hwe: chi2.p_value > 0.001
}
})
let out_of_hwe = hwe_results |> filter(|r| { !r.in_hwe }) |> len
print(f"Variants out of HWE (p < 0.001): {out_of_hwe}")
hwe_results |> write_tsv("hwe_results.tsv")
Observed vs expected heterozygosity
let hwe = tsv("hwe_results.tsv")
let obs_mean = hwe |> map(|r| { r["obs_het"] |> float }) |> mean
let exp_mean = hwe |> map(|r| { r["exp_het"] |> float }) |> mean
let f_is = 1.0 - (obs_mean / exp_mean)
print(f"Mean observed heterozygosity: {obs_mean |> round(4)}")
print(f"Mean expected heterozygosity: {exp_mean |> round(4)}")
print(f"Inbreeding coefficient (Fis): {f_is |> round(4)}")
Population Differentiation
Weir-Cockerham Fst
# Calculate Fst between two populations
let vcf = read_vcf("merged_populations.vcf.gz")
let pop_map = tsv("population_map.tsv") # sample_id, population
let pop1_samples = pop_map |> filter(|r| { r["population"] == "POP1" }) |> map(|x| x.sample_id) |> collect
let pop2_samples = pop_map |> filter(|r| { r["population"] == "POP2" }) |> map(|x| x.sample_id) |> collect
let fst_values = vcf
|> filter(|v| { v.filter == "PASS" && is_snp(v) })
|> map(|v| {
# Parse genotypes from info field
let gt_field = v.info["GT"] ?? ""
let gts = gt_field |> split(",") |> map(|g| { g |> trim })
# Compute allele frequencies per population
let p1_gts = pop1_samples |> map(|s| { gts[s] ?? "." })
|> filter(|g| { g != "." })
let p2_gts = pop2_samples |> map(|s| { gts[s] ?? "." })
|> filter(|g| { g != "." })
let af1 = if p1_gts |> len > 0 {
let alt1 = p1_gts |> filter(|g| { g == "0/1" || g == "1/0" }) |> len |> float + p1_gts |> filter(|g| { g == "1/1" }) |> len |> float * 2.0
alt1 / (p1_gts |> len |> float * 2.0)
} else { nil }
let af2 = if p2_gts |> len > 0 {
let alt2 = p2_gts |> filter(|g| { g == "0/1" || g == "1/0" }) |> len |> float + p2_gts |> filter(|g| { g == "1/1" }) |> len |> float * 2.0
alt2 / (p2_gts |> len |> float * 2.0)
} else { nil }
# Hudson Fst estimator: (af1 - af2)^2 / (af_mean * (1 - af_mean))
let fst = if af1 != nil && af2 != nil {
let af_mean = (af1 + af2) / 2.0
if af_mean > 0.0 && af_mean < 1.0 {
pow(af1 - af2, 2) / (af_mean * (1.0 - af_mean))
} else { nil }
} else { nil }
{ chrom: v.chrom, pos: v.pos, fst: fst }
})
|> filter(|r| { r.fst != nil })
let genome_fst = fst_values |> map(|x| x.fst) |> mean
print(f"Genome-wide Fst: {genome_fst |> round(4)}")
# Find high-Fst outlier regions
let outliers = fst_values |> filter(|r| { r.fst > 0.5 })
print(f"High-Fst outlier SNPs (>0.5): {outliers |> len}")
outliers |> write_tsv("fst_outliers.tsv")
Windowed Fst
# Fst in sliding windows for selection scans
let fst_data = tsv("fst_per_snp.tsv")
let window_size = 50000
let step = 25000
let windowed = fst_data
|> group_by("chrom")
|> flat_map(|chrom, snps| {
let sorted = snps |> sort_by(|a, b| (a["pos"] |> int) - (b["pos"] |> int))
let max_pos = sorted |> last |> get("pos") |> int
let starts = range(0, max_pos, step)
starts |> map(|start| {
let end = start + window_size
let items = sorted |> filter(|s| {
let p = s["pos"] |> int
p >= start && p < end
})
{ chrom: chrom, mid: start + window_size / 2, n_snps: items |> len,
mean_fst: items |> map(|x| { x["fst"] |> float }) |> mean,
max_fst: items |> map(|x| { x["fst"] |> float }) |> max }
})
})
|> filter(|w| { w.n_snps >= 5 })
windowed |> write_tsv("windowed_fst.tsv")
# Report top windows
windowed
|> sort_by(|a, b| b.mean_fst - a.mean_fst)
|> take(10)
|> each(|w| { print(f"{w.chrom}:{w.mid} Fst={w.mean_fst |> round(3)} (n={w.n_snps})") })
Principal Component Analysis
PCA on genotype matrix
# Build genotype matrix and run PCA
let vcf = read_vcf("population.vcf.gz")
# Build numeric genotype matrix (0, 1, 2 encoding)
# Filter for common variants (MAF > 0.05)
let filtered = vcf
|> filter(|v| { v.filter == "PASS" && is_snp(v) })
|> map(|v| {
let gt_field = v.info["GT"] ?? ""
let gts = gt_field |> split(",") |> map(|g| { g |> trim })
let n_total = gts |> len
let n_alt = gts |> filter(|g| { g == "0/1" || g == "1/0" }) |> len + gts |> filter(|g| { g == "1/1" }) |> len * 2
let af = n_alt |> float / (n_total * 2) |> float
{ v: v, gts: gts, af: af }
})
|> filter(|x| { x.af > 0.05 && x.af < 0.95 })
let geno_rows = filtered |> map(|x| {
x.gts |> map(|g| {
match g {
"0/0" => 0,
"0/1" => 1,
"1/0" => 1,
"1/1" => 2,
_ => 0
}
})
})
let geno_matrix = matrix(geno_rows)
# Run PCA
let pca_result = geno_matrix |> pca(10)
print("Variance explained:")
for i, var_pct in pca_result.variance_ratio |> enumerate {
print(f" PC{i + 1}: {(var_pct * 100.0) |> round(2)}%")
}
# Export PC coordinates for plotting
# Define sample names from metadata or VCF header
let samples = tsv("sample_list.tsv") |> map(|r| r["sample"])
let pc_data = samples |> enumerate |> map(|i, name| { {
sample: name,
pc1: pca_result.components[i][0],
pc2: pca_result.components[i][1],
pc3: pca_result.components[i][2]
}})
pc_data |> write_tsv("pca_coordinates.tsv")
Admixture-style analysis
# Estimate ancestry proportions using graph-based clustering
let pc_data = tsv("pca_coordinates.tsv")
let pop_map = tsv("population_map.tsv")
let labeled = pc_data |> left_join(pop_map, "sample")
# Build distance matrix from PC space, then cluster with Leiden
let coord_vals = labeled |> map(|r| {
[r["pc1"] |> float, r["pc2"] |> float, r["pc3"] |> float]
})
let coord_mat = matrix(coord_vals)
# Build adjacency from pairwise distances (connect k nearest neighbors)
let k = 15
let dm = dist_matrix(coord_mat)
let n = dm |> len
let adj = range(0, n) |> map(|i| {
let dists = dm[i] |> enumerate |> filter(|j, _| { j != i }) |> sort_by(|a, b| a[1] - b[1]) |> take(k)
let neighbors = dists |> map(|pair| pair[0])
range(0, n) |> map(|j| { if neighbors |> contains(j) { 1.0 } else { 0.0 } })
})
let clusters = leiden(matrix(adj), 0.8)
# Compare clusters to known populations
let comparison = labeled
|> mutate("cluster", |i, _| { clusters[i] })
|> group_by(.population, .cluster)
|> summarize(|key, rows| { population: key[0], cluster: key[1], n: len(rows) })
|> sort_by(.population, .cluster)
comparison |> print