Source code for lib.mosaic

"""Mosaic gridding: build flight mosaics, incremental sweep processing."""

from __future__ import annotations

import os
from typing import Any

import numpy as np
from pyhdf.SD import SD, SDC

from lib.constants import GRID_RES, VEG_LOSS_THRESHOLD
from lib.io import process_file
from lib.fire import detect_fire_simple
from lib.vegetation import compute_ndvi, has_sunlight


def _file_extent(filepath: str) -> tuple[float, float, float, float]:
    """Get lat/lon bounding box of a single HDF file from corner attributes.

    Returns:
        (lat_min, lat_max, lon_min, lon_max) with 0.005° buffer.
    """
    sd = SD(filepath, SDC.READ)
    attrs = sd.attributes()
    lat_min = min(attrs['lat_LL'], attrs['lat_UL'])
    lat_max = max(attrs['lat_LR'], attrs['lat_UR'])
    lon_min = min(attrs['lon_UL'], attrs['lon_UR'])
    lon_max = max(attrs['lon_LL'], attrs['lon_LR'])
    sd.end()
    buf = 0.005
    return lat_min - buf, lat_max + buf, lon_min - buf, lon_max + buf


def _expand_grid(gs: dict[str, Any],
                 new_lat_min: float, new_lat_max: float,
                 new_lon_min: float, new_lon_max: float) -> None:
    """Expand all grid arrays in-place to cover new bounds.

    Allocates larger arrays and copies old data at the correct offset.
    """
    old_nrows, old_ncols = gs['nrows'], gs['ncols']
    new_nrows = int(np.ceil((new_lat_max - new_lat_min) / GRID_RES))
    new_ncols = int(np.ceil((new_lon_max - new_lon_min) / GRID_RES))

    # Offset of old grid within new grid
    row_off = round((new_lat_max - gs['lat_max']) / GRID_RES)
    col_off = round((gs['lon_min'] - new_lon_min) / GRID_RES)

    # Expand float arrays (NaN-filled)
    for key in ('T4', 'T11', 'SWIR', 'Red', 'NIR'):
        new_arr = np.full((new_nrows, new_ncols), np.nan, dtype=np.float32)
        new_arr[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs[key]
        gs[key] = new_arr

    # Expand int arrays (zero-filled)
    for key in ('fire_count', 'obs_count'):
        new_arr = np.zeros((new_nrows, new_ncols), dtype=np.int32)
        new_arr[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs[key]
        gs[key] = new_arr

    # Expand NDVI_baseline (NaN-filled)
    new_bl = np.full((new_nrows, new_ncols), np.nan, dtype=np.float32)
    new_bl[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs['NDVI_baseline']
    gs['NDVI_baseline'] = new_bl

    # Expand veg_confirmed (False-filled)
    new_vc = np.zeros((new_nrows, new_ncols), dtype=np.bool_)
    new_vc[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs['veg_confirmed']
    gs['veg_confirmed'] = new_vc

    # Expand running accumulators for ML features
    for key, fill, dtype in [
        ('T4_max', -np.inf, np.float32),
        ('dT_max', -np.inf, np.float32),
        ('SWIR_max', -np.inf, np.float32),
        ('NDVI_min', np.inf, np.float32),
    ]:
        new_arr = np.full((new_nrows, new_ncols), fill, dtype=dtype)
        new_arr[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs[key]
        gs[key] = new_arr

    for key in ('T4_sum', 'T11_sum', 'SWIR_sum', 'Red_sum', 'NIR_sum', 'NDVI_sum'):
        new_arr = np.zeros((new_nrows, new_ncols), dtype=np.float64)
        new_arr[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs[key]
        gs[key] = new_arr

    new_nobs = np.zeros((new_nrows, new_ncols), dtype=np.int32)
    new_nobs[row_off:row_off + old_nrows, col_off:col_off + old_ncols] = gs['NDVI_obs']
    gs['NDVI_obs'] = new_nobs

    gs['nrows'] = new_nrows
    gs['ncols'] = new_ncols
    gs['lat_min'] = new_lat_min
    gs['lat_max'] = new_lat_max
    gs['lon_min'] = new_lon_min
    gs['lon_max'] = new_lon_max
    gs['lat_axis'] = np.linspace(new_lat_max, new_lat_min, new_nrows)
    gs['lon_axis'] = np.linspace(new_lon_min, new_lon_max, new_ncols)

    print(f'  Grid expanded: {old_nrows}x{old_ncols} -> {new_nrows}x{new_ncols}')


[docs] def build_mosaic(files: list[str], lat_min: float, lat_max: float, lon_min: float, lon_max: float, day_night: str = 'D') -> dict[str, Any]: """Build a gridded mosaic from a list of flight line files. Files are processed in order (chronological), so later lines overwrite earlier ones in overlapping areas. A multi-pass consistency filter requires fire to be detected in >=2 passes for pixels observed multiple times, filtering out angle-dependent false positives (e.g. solar reflection). Returns dict with keys: T4, T11, SWIR: gridded brightness temp / radiance arrays Red, NIR: gridded VNIR radiance (NaN for nighttime flights) NDVI: computed from gridded Red/NIR (NaN for nighttime flights) fire: boolean fire mask after multi-pass filter lat_axis, lon_axis: coordinate axes fire_count, obs_count: per-cell detection and observation counts day_night: 'D' or 'N' flag """ nrows = int(np.ceil((lat_max - lat_min) / GRID_RES)) ncols = int(np.ceil((lon_max - lon_min) / GRID_RES)) grid_T4 = np.full((nrows, ncols), np.nan, dtype=np.float32) grid_T11 = np.full((nrows, ncols), np.nan, dtype=np.float32) grid_SWIR = np.full((nrows, ncols), np.nan, dtype=np.float32) grid_Red = np.full((nrows, ncols), np.nan, dtype=np.float32) grid_NIR = np.full((nrows, ncols), np.nan, dtype=np.float32) grid_fire_count = np.zeros((nrows, ncols), dtype=np.int32) grid_obs_count = np.zeros((nrows, ncols), dtype=np.int32) T4_thresh = 310.0 if day_night == 'N' else 325.0 for i, filepath in enumerate(files): name = os.path.basename(filepath) print(f' [{i+1}/{len(files)}] {name}') pf = process_file(filepath) T4, T11, SWIR = pf['T4'], pf['T11'], pf['SWIR'] lat, lon = pf['lat'], pf['lon'] fire = detect_fire_simple(T4, T11, T4_thresh=T4_thresh) valid = np.isfinite(lat) & np.isfinite(lon) & np.isfinite(T4) row_idx = ((lat_max - lat) / GRID_RES).astype(np.int32) col_idx = ((lon - lon_min) / GRID_RES).astype(np.int32) in_bounds = ( valid & (row_idx >= 0) & (row_idx < nrows) & (col_idx >= 0) & (col_idx < ncols) ) r = row_idx[in_bounds] c = col_idx[in_bounds] grid_T4[r, c] = T4[in_bounds] grid_T11[r, c] = T11[in_bounds] grid_SWIR[r, c] = SWIR[in_bounds] grid_obs_count[r, c] += 1 grid_fire_count[r, c] += fire[in_bounds].astype(np.int32) if day_night != 'N': grid_Red[r, c] = pf['Red'][in_bounds] grid_NIR[r, c] = pf['NIR'][in_bounds] # Multi-pass consistency filter multi_pass = grid_obs_count >= 2 grid_fire = np.where( multi_pass, grid_fire_count >= 2, grid_fire_count >= 1 ) if day_night != 'N': grid_NDVI = compute_ndvi(grid_Red, grid_NIR) else: grid_NDVI = np.full((nrows, ncols), np.nan, dtype=np.float32) lat_axis = np.linspace(lat_max, lat_min, nrows) lon_axis = np.linspace(lon_min, lon_max, ncols) return { 'T4': grid_T4, 'T11': grid_T11, 'SWIR': grid_SWIR, 'Red': grid_Red, 'NIR': grid_NIR, 'NDVI': grid_NDVI, 'fire': grid_fire, 'lat_axis': lat_axis, 'lon_axis': lon_axis, 'fire_count': grid_fire_count, 'obs_count': grid_obs_count, 'day_night': day_night, }
[docs] def init_grid_state(lat_min: float | None = None, lat_max: float | None = None, lon_min: float | None = None, lon_max: float | None = None) -> dict[str, Any]: """Initialize empty grid state for incremental mosaic building. When called with no arguments, returns an empty sentinel grid (0x0) that will be populated on the first call to process_sweep(). """ if lat_min is None: return { 'T4': np.empty((0, 0), dtype=np.float32), 'T11': np.empty((0, 0), dtype=np.float32), 'SWIR': np.empty((0, 0), dtype=np.float32), 'Red': np.empty((0, 0), dtype=np.float32), 'NIR': np.empty((0, 0), dtype=np.float32), 'fire_count': np.empty((0, 0), dtype=np.int32), 'obs_count': np.empty((0, 0), dtype=np.int32), 'NDVI_baseline': np.empty((0, 0), dtype=np.float32), 'veg_confirmed': np.empty((0, 0), dtype=np.bool_), # Running accumulators for ML aggregate features 'T4_max': np.empty((0, 0), dtype=np.float32), 'T4_sum': np.empty((0, 0), dtype=np.float64), 'T11_sum': np.empty((0, 0), dtype=np.float64), 'dT_max': np.empty((0, 0), dtype=np.float32), 'SWIR_max': np.empty((0, 0), dtype=np.float32), 'SWIR_sum': np.empty((0, 0), dtype=np.float64), 'Red_sum': np.empty((0, 0), dtype=np.float64), 'NIR_sum': np.empty((0, 0), dtype=np.float64), 'NDVI_min': np.empty((0, 0), dtype=np.float32), 'NDVI_sum': np.empty((0, 0), dtype=np.float64), 'NDVI_obs': np.empty((0, 0), dtype=np.int32), 'nrows': 0, 'ncols': 0, 'lat_min': None, 'lat_max': None, 'lon_min': None, 'lon_max': None, 'lat_axis': np.array([]), 'lon_axis': np.array([]), } assert lat_max is not None and lon_min is not None and lon_max is not None nrows = int(np.ceil((lat_max - lat_min) / GRID_RES)) ncols = int(np.ceil((lon_max - lon_min) / GRID_RES)) return { 'T4': np.full((nrows, ncols), np.nan, dtype=np.float32), 'T11': np.full((nrows, ncols), np.nan, dtype=np.float32), 'SWIR': np.full((nrows, ncols), np.nan, dtype=np.float32), 'Red': np.full((nrows, ncols), np.nan, dtype=np.float32), 'NIR': np.full((nrows, ncols), np.nan, dtype=np.float32), 'fire_count': np.zeros((nrows, ncols), dtype=np.int32), 'obs_count': np.zeros((nrows, ncols), dtype=np.int32), 'NDVI_baseline': np.full((nrows, ncols), np.nan, dtype=np.float32), 'veg_confirmed': np.zeros((nrows, ncols), dtype=np.bool_), # Running accumulators for ML aggregate features 'T4_max': np.full((nrows, ncols), -np.inf, dtype=np.float32), 'T4_sum': np.zeros((nrows, ncols), dtype=np.float64), 'T11_sum': np.zeros((nrows, ncols), dtype=np.float64), 'dT_max': np.full((nrows, ncols), -np.inf, dtype=np.float32), 'SWIR_max': np.full((nrows, ncols), -np.inf, dtype=np.float32), 'SWIR_sum': np.zeros((nrows, ncols), dtype=np.float64), 'Red_sum': np.zeros((nrows, ncols), dtype=np.float64), 'NIR_sum': np.zeros((nrows, ncols), dtype=np.float64), 'NDVI_min': np.full((nrows, ncols), np.inf, dtype=np.float32), 'NDVI_sum': np.zeros((nrows, ncols), dtype=np.float64), 'NDVI_obs': np.zeros((nrows, ncols), dtype=np.int32), 'nrows': nrows, 'ncols': ncols, 'lat_min': lat_min, 'lat_max': lat_max, 'lon_min': lon_min, 'lon_max': lon_max, 'lat_axis': np.linspace(lat_max, lat_min, nrows), 'lon_axis': np.linspace(lon_min, lon_max, ncols), }
[docs] def process_sweep(filepath: str, gs: dict[str, Any], pixel_rows: list[dict[str, Any]], day_night: str = 'auto', flight_num: str = '', fire_fn: Any = None) -> tuple[int, str]: """Process one sweep file and update grid state in-place. Also appends per-pixel data to pixel_rows list for DataFrame construction later. Args: filepath: path to HDF file. gs: grid state dict (modified in-place). pixel_rows: list to append per-pixel dicts to (modified in-place). day_night: 'D', 'N', or 'auto'. When 'auto', checks VNIR radiance to determine if the scene has sunlight (robust to cloud). flight_num: flight identifier string. fire_fn: optional callable(T4, T11, NDVI) -> bool mask. When provided, replaces detect_fire_simple for fire detection. Returns: (n_new_fire, day_night): fire pixel count and detected day/night flag. """ # Get this file's extent and init/expand grid as needed file_lat_min, file_lat_max, file_lon_min, file_lon_max = _file_extent(filepath) if gs['nrows'] == 0: # First sweep: initialize grid from this file's extent gs.update(init_grid_state(file_lat_min, file_lat_max, file_lon_min, file_lon_max)) else: need_expand = ( file_lat_min < gs['lat_min'] or file_lat_max > gs['lat_max'] or file_lon_min < gs['lon_min'] or file_lon_max > gs['lon_max'] ) if need_expand: _expand_grid( gs, min(gs['lat_min'], file_lat_min), max(gs['lat_max'], file_lat_max), min(gs['lon_min'], file_lon_min), max(gs['lon_max'], file_lon_max), ) pf = process_file(filepath) T4, T11, SWIR = pf['T4'], pf['T11'], pf['SWIR'] lat, lon = pf['lat'], pf['lon'] # Auto-detect day/night from VNIR radiance if day_night == 'auto': day_night = 'D' if has_sunlight(pf['Red'], pf['NIR']) else 'N' gs['day_night'] = day_night if fire_fn is not None: fire = fire_fn(T4, T11, pf['NDVI']) else: T4_thresh = 310.0 if day_night == 'N' else 325.0 fire = detect_fire_simple(T4, T11, T4_thresh=T4_thresh) valid = np.isfinite(lat) & np.isfinite(lon) & np.isfinite(T4) row_idx = ((gs['lat_max'] - lat) / GRID_RES).astype(np.int32) col_idx = ((lon - gs['lon_min']) / GRID_RES).astype(np.int32) in_bounds = ( valid & (row_idx >= 0) & (row_idx < gs['nrows']) & (col_idx >= 0) & (col_idx < gs['ncols']) ) r = row_idx[in_bounds] c = col_idx[in_bounds] gs['T4'][r, c] = T4[in_bounds] gs['T11'][r, c] = T11[in_bounds] gs['SWIR'][r, c] = SWIR[in_bounds] gs['obs_count'][r, c] += 1 gs['fire_count'][r, c] += fire[in_bounds].astype(np.int32) # ── Running accumulators for ML aggregate features ── T4_ib = T4[in_bounds] T11_ib = T11[in_bounds] SWIR_ib = SWIR[in_bounds] gs['T4_max'][r, c] = np.maximum(gs['T4_max'][r, c], T4_ib) gs['T4_sum'][r, c] += T4_ib.astype(np.float64) gs['T11_sum'][r, c] += T11_ib.astype(np.float64) gs['dT_max'][r, c] = np.maximum(gs['dT_max'][r, c], T4_ib - T11_ib) gs['SWIR_max'][r, c] = np.maximum(gs['SWIR_max'][r, c], SWIR_ib) gs['SWIR_sum'][r, c] += SWIR_ib.astype(np.float64) Red_ib = pf['Red'][in_bounds] NIR_ib = pf['NIR'][in_bounds] gs['Red_sum'][r, c] += np.where(np.isfinite(Red_ib), Red_ib, 0.0).astype(np.float64) gs['NIR_sum'][r, c] += np.where(np.isfinite(NIR_ib), NIR_ib, 0.0).astype(np.float64) # ── Current sweep NDVI ── sweep_ndvi = pf['NDVI'][in_bounds] is_sunlit = np.isfinite(sweep_ndvi) # Daytime NDVI accumulators if np.any(is_sunlit): rs, cs, nv = r[is_sunlit], c[is_sunlit], sweep_ndvi[is_sunlit] gs['NDVI_min'][rs, cs] = np.minimum(gs['NDVI_min'][rs, cs], nv) gs['NDVI_sum'][rs, cs] += nv.astype(np.float64) gs['NDVI_obs'][rs, cs] += 1 # ── Set NDVI baseline (first valid daytime obs, write-once) ── no_baseline = np.isnan(gs['NDVI_baseline'][r, c]) gs['NDVI_baseline'][r, c] = np.where( no_baseline & is_sunlit, sweep_ndvi, gs['NDVI_baseline'][r, c]) # ── Detect vegetation loss at fire pixels ── fire_here = fire[in_bounds] has_baseline = np.isfinite(gs['NDVI_baseline'][r, c]) is_night = ~is_sunlit # Daytime: fire + NDVI drop confirms vegetation loss with np.errstate(invalid='ignore'): veg_drop = gs['NDVI_baseline'][r, c] - sweep_ndvi day_veg_lost = has_baseline & is_sunlit & (veg_drop >= VEG_LOSS_THRESHOLD) # Nighttime: fire alone confirms vegetation loss (fewer FP at night) night_veg_lost = has_baseline & is_night newly_confirmed = fire_here & (day_veg_lost | night_veg_lost) gs['veg_confirmed'][r, c] = gs['veg_confirmed'][r, c] | newly_confirmed # ── VNIR update: two paths ── # Non-fire pixels: keep best-illuminated (highest NIR) # Veg-confirmed pixels: take latest observation (show burn scar) new_nir = pf['NIR'][in_bounds] new_red = pf['Red'][in_bounds] old_nir = gs['NIR'][r, c] old_red = gs['Red'][r, c] best_illuminated = np.isnan(old_nir) | (new_nir > old_nir) take_latest = gs['veg_confirmed'][r, c] use_new = best_illuminated | take_latest gs['Red'][r, c] = np.where(use_new, new_red, old_red) gs['NIR'][r, c] = np.where(use_new, new_nir, old_nir) # Use per-pixel NDVI where sunlight exists; nighttime pixels stay NaN NDVI = pf['NDVI'] pixel_rows.append({ 'flight': flight_num, 'file': os.path.basename(filepath), 'lat': gs['lat_max'] - r * GRID_RES, 'lon': gs['lon_min'] + c * GRID_RES, 'T4': T4[in_bounds], 'T11': T11[in_bounds], 'dT': (T4 - T11)[in_bounds], 'SWIR': SWIR[in_bounds], 'NDVI': NDVI[in_bounds], 'fire': fire[in_bounds], }) # ── Clear accumulators where fire has passed (veg gone, no current fire) ── cleared = gs['veg_confirmed'] & (gs['fire_count'] == 0) if np.any(cleared): gs['T4_max'][cleared] = -np.inf gs['T4_sum'][cleared] = 0 gs['T11_sum'][cleared] = 0 gs['dT_max'][cleared] = -np.inf gs['SWIR_max'][cleared] = -np.inf gs['SWIR_sum'][cleared] = 0 gs['Red_sum'][cleared] = 0 gs['NIR_sum'][cleared] = 0 gs['NDVI_min'][cleared] = np.inf gs['NDVI_sum'][cleared] = 0 gs['NDVI_obs'][cleared] = 0 gs['obs_count'][cleared] = 0 return int(fire[in_bounds].sum()), day_night
[docs] def get_fire_mask(gs: dict[str, Any]) -> np.ndarray: """Apply multi-pass consistency filter with vegetation confirmation. Standard rule: pixels observed >=2 times need fire in >=2 passes. Override: pixels with veg_confirmed=True are treated as fire even with only 1 thermal fire detection (vegetation loss is independent confirmation that the thermal signal was real fire). """ multi_pass = gs['obs_count'] >= 2 standard_fire = np.where( multi_pass, gs['fire_count'] >= 2, gs['fire_count'] >= 1, ) veg_boost = gs['veg_confirmed'] & (gs['fire_count'] >= 1) return standard_fire | veg_boost