From Manual Steps to One Command: Automating and Verifying the PathWild Data Pipeline

Posted: January 2, 2026 in AI / ML

In my previous post, I documented how I built a sophisticated absence generation system that transformed presence-only GPS collar data into balanced training datasets. The system successfully generated over 400,000 samples across multiple datasets using different strategies and parallel processing. But running the complete workflow—from raw GPS data to training-ready features—required executing six separate scripts in the correct order, remembering command-line arguments, and manually verifying each step completed successfully.

This post details how I automated the entire data processing pipeline into a single command, implemented comprehensive prerequisite checking, added robust testing infrastructure, and built validation at every step. The result? A production-ready pipeline that transforms raw elk GPS data into training-ready feature datasets with one command, complete with automated verification and testing.


The Problem: Manual Workflow Complexity

My journey to automation started in Jupyter notebooks. Initially, I processed raw GPS collar data interactively—exploring the data, visualizing distributions, and iterating on processing logic. Notebooks were perfect for exploration and analysis, allowing me to understand the data structure, identify edge cases, and develop the processing logic step-by-step.

But notebooks aren’t ideal for automation. Once I understood the workflow, I extracted the logic into Python scripts. This transition from notebooks to scripts was the first step toward automation, but I still faced a complex manual workflow:

  1. Process raw presence data → process_raw_presence_data.py
  2. Generate absence data → generate_absence_data.py
  3. Integrate environmental features → integrate_environmental_features.py
  4. Analyze integrated features → analyze_integrated_features.py
  5. Assess training readiness → assess_training_readiness.py
  6. Prepare training features → prepare_training_features.py

Each step had different command-line arguments, required specific input files, and needed to run in a specific order. Missing a step or running them out of order meant starting over. Worse, I had to manually verify that each step completed successfully and that the outputs were correct.

The challenges:

  • Order dependency: Steps must run sequentially, with each depending on the previous step’s output
  • Prerequisite checking: Environmental data files (DEM, slope, landcover, etc.) must exist before feature integration
  • Error handling: If one step failed, I had to manually diagnose and fix it
  • Progress tracking: No easy way to see which steps were complete vs. which needed to run
  • Testing: No automated way to verify the pipeline worked correctly

This manual workflow worked for initial development, but it wasn’t sustainable for production use or for processing multiple datasets reliably.

The Solution: Automated Pipeline Orchestrator

I built run_data_pipeline.py—a pipeline orchestrator that automates the entire workflow with intelligent step management, prerequisite checking, and comprehensive error handling.

Core Design Principles

1. Fail Fast on Prerequisites

Before running any steps, the pipeline checks for all required environmental data files:

def check_prerequisites(self) -> Tuple[bool, List[str], List[str]]:
    """Check for required environmental data files before running pipeline."""
    missing_required = []
    missing_optional = []
    
    # Required raster files (essential for feature integration)
    required_rasters = {
        'DEM': self.data_dir / 'dem' / 'wyoming_dem.tif',
        'Slope': self.data_dir / 'terrain' / 'slope.tif',
        'Aspect': self.data_dir / 'terrain' / 'aspect.tif',
        'Land Cover': self.data_dir / 'landcover' / 'nlcd.tif',
        'Canopy Cover': self.data_dir / 'canopy' / 'canopy_cover.tif',
    }
    
    # Required vector files
    required_vectors = {
        'Water Sources': self.data_dir / 'hydrology' / 'water_sources.geojson',
    }
    
    # Check and report missing files
    # ...

This prevents hours of processing only to fail at feature integration because a required file is missing. The pipeline reports exactly which files are missing and provides guidance on how to generate them.
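
The elided check-and-report step could look something like the sketch below. It is not the actual PathWild code: the helper names `find_missing` and `prerequisite_report` are hypothetical, and the message wording simply mirrors the log output shown elsewhere in this post.

```python
from pathlib import Path
from typing import Dict, List, Tuple


def find_missing(files: Dict[str, Path]) -> List[str]:
    """Return a 'label: path' entry for every file that does not exist."""
    return [f"{label}: {path}" for label, path in files.items() if not path.exists()]


def prerequisite_report(
    required: Dict[str, Path], optional: Dict[str, Path]
) -> Tuple[bool, List[str]]:
    """Return (ok, lines); ok is False if any required file is missing."""
    missing_required = find_missing(required)
    missing_optional = find_missing(optional)

    # Optional files only produce a warning line (hypothetical wording).
    lines = [f"  ! Optional file missing: {entry}" for entry in missing_optional]
    if missing_required:
        lines.append("✗ Missing required environmental data files:")
        lines.extend(f"  ✗ {entry}" for entry in missing_required)
        return False, lines

    lines.append("✓ All required prerequisites present")
    return True, lines
```

Returning the report lines instead of printing them keeps the check easy to test; the caller can log each line and stop before any step runs when the first element is False.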

2. Intelligent Step Management

Each pipeline step is a PipelineStep object that knows:

  • What script to run
  • What command-line arguments to use
  • What input files it requires
  • What output files it produces
  • Whether it’s already complete

class PipelineStep:
    """Represents a single step in the data pipeline."""
    
    def can_run(self) -> bool:
        """Check if this step can run (script exists, inputs available)."""
        if not self.script_path.exists():
            return False
        if self.required_input and not self.required_input.exists():
            return False
        return True
    
    def is_complete(self) -> bool:
        """Check if this step has already been completed."""
        if self.expected_output and self.expected_output.exists():
            return True
        return False

The pipeline automatically skips steps that are already complete (unless --force is used), making incremental updates fast and safe.
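
To make the shape of a step concrete, here is a minimal, self-contained sketch of how such an object might be laid out as a dataclass. The field names follow the constructor arguments used in this post's snippets; the class name `StepSketch`, the `needs_run` helper, and the rule that an explicit skip wins over `--force` are assumptions for illustration, not the real implementation.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class StepSketch:
    """Hypothetical stand-in for PipelineStep."""
    name: str
    description: str
    script_path: Path
    command_args: List[str] = field(default_factory=list)
    required_input: Optional[Path] = None
    expected_output: Optional[Path] = None

    def can_run(self) -> bool:
        """The script must exist, and so must the required input if one is set."""
        if not self.script_path.exists():
            return False
        if self.required_input is not None and not self.required_input.exists():
            return False
        return True

    def is_complete(self) -> bool:
        """A step counts as complete once its expected output file exists."""
        return self.expected_output is not None and self.expected_output.exists()

    def should_skip(self, skip_steps: List[str]) -> bool:
        """True if the user explicitly asked to skip this step by name."""
        return self.name in skip_steps

    def needs_run(self, skip_steps: List[str], force: bool = False) -> bool:
        """Assumed policy: explicit skips win; otherwise run if forced or incomplete."""
        if self.should_skip(skip_steps):
            return False
        return force or not self.is_complete()
```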

3. Dependency-Aware Execution

Steps run in the correct order automatically. The pipeline builder creates steps with proper dependencies:

def _build_pipeline_steps(self) -> List[PipelineStep]:
    """Build the list of pipeline steps."""
    steps = []
    
    # Step 1: Process raw presence data
    steps.append(PipelineStep(
        name='process_raw',
        description='Process raw presence data files into presence points',
        script_path=self.scripts_dir / 'process_raw_presence_data.py',
        command_args=[...],
        required_input=self.raw_dir / f"elk_{self.dataset_name}",
        expected_output=presence_output
    ))
    
    # Step 2: Generate absence data (depends on Step 1)
    steps.append(PipelineStep(
        name='generate_absence',
        description='Generate absence data and combine with presence',
        script_path=self.scripts_dir / 'generate_absence_data.py',
        command_args=[...],
        required_input=presence_file,  # From Step 1
        expected_output=combined_output
    ))
    
    # ... (additional steps)

If a required input doesn’t exist, the step reports that it can’t run, and the pipeline continues with other steps (allowing partial completion).

4. Comprehensive Error Handling

Each step runs in a try-except block with detailed error reporting:

# Requires: import subprocess, sys, time (plus a configured `logger`)
def run(self, force: bool = False) -> bool:
    """Run this pipeline step."""
    if self.is_complete() and not force:
        logger.info(f"  ✓ Step already complete: {self.expected_output}")
        return True

    # Record the start time so both the success and failure paths can report it
    start_time = time.time()
    try:
        # check=True raises CalledProcessError on a non-zero exit code,
        # so reaching the code after this call means the script succeeded.
        subprocess.run(
            [sys.executable, str(self.script_path)] + self.command_args,
            check=True,
            capture_output=False,  # Show output in real-time
            text=True
        )
    except subprocess.CalledProcessError as e:
        elapsed = time.time() - start_time
        logger.error(f"  ✗ Failed after {elapsed:.1f}s with return code {e.returncode}: {e}")
        return False

    elapsed = time.time() - start_time
    logger.info(f"  ✓ Completed in {elapsed:.1f}s")
    return True

The pipeline continues running other steps even if one fails, providing a complete picture of what succeeded and what failed.
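
The "keep going and report everything" behavior can be sketched as a small loop. This is an illustration under assumptions, not the orchestrator itself: `run_all` is a hypothetical name, and each step is reduced to a name plus a zero-argument callable that returns True on success.

```python
from typing import Callable, Dict, List, Tuple


def run_all(
    steps: List[Tuple[str, Callable[[], bool]]],
    skip_steps: List[str],
) -> Dict[str, List[str]]:
    """Run every step in order, recording the outcome instead of stopping early."""
    summary: Dict[str, List[str]] = {"completed": [], "skipped": [], "failed": []}
    for name, run in steps:
        if name in skip_steps:
            summary["skipped"].append(name)
            continue
        try:
            ok = run()
        except Exception:
            # An unexpected exception is treated the same as a failed step.
            ok = False
        summary["completed" if ok else "failed"].append(name)
    return summary
```

The returned dictionary carries the counts a final summary needs (completed, skipped, failed), and a non-empty `failed` list can be turned into a non-zero exit code.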

5. Performance Optimization

The pipeline steps leverage batch and parallel processing to handle large datasets efficiently. Each script auto-detects the optimal configuration based on the environment:

  • Auto-detected worker count: Uses os.cpu_count() to read the number of logical CPUs on the machine, capped at a configured maximum
  • Auto-detected batch size: Calculates optimal batch size based on dataset size and available memory
  • Parallel processing: Steps like absence generation and feature integration use multiprocessing to process data in parallel

For example, the feature integration script automatically detects hardware capabilities:

# Auto-detect optimal worker count
n_workers = min(os.cpu_count() or 1, max_workers)

# Auto-detect optimal batch size based on dataset size
if len(df) > 100000:
    batch_size = 1000
elif len(df) > 10000:
    batch_size = 500
else:
    batch_size = 100

This means the pipeline adapts to different environments—running efficiently on a laptop with 4 cores or scaling up on a server with 32 cores, without manual configuration.
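
Pulled together, the selection logic above might be wrapped into small helpers like these. The thresholds are copied from the snippet; the function names, the default cap of 8 workers, and the `iter_batches` helper are assumptions added for illustration.

```python
import os
from typing import Iterator, Sequence


def choose_workers(max_workers: int = 8) -> int:
    """Use the logical CPU count, capped at max_workers and never below 1."""
    return max(1, min(os.cpu_count() or 1, max_workers))


def choose_batch_size(n_rows: int) -> int:
    """Pick a batch size from the dataset size (thresholds from the post)."""
    if n_rows > 100_000:
        return 1000
    if n_rows > 10_000:
        return 500
    return 100


def iter_batches(rows: Sequence, batch_size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]
```

Each batch from `iter_batches` could then be handed to a worker pool (for example `multiprocessing.Pool.map`) sized by `choose_workers`.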

6. Incremental Processing

Perhaps the most impactful optimization is incremental processing. By default, scripts only process rows with placeholder or empty values, skipping rows that already have valid data. This is crucial when adding new features or updating existing data.

For example, when integrating environmental features, the script checks each row:

# Only process rows with placeholder values
placeholder_mask = (
    (df['elevation'] == -9999) |
    (df['slope_degrees'].isna()) |
    (df['water_distance_miles'] == -9999)
)

rows_to_process = df[placeholder_mask]
rows_to_skip = df[~placeholder_mask]

This means:

  • First run: Processes all rows (takes ~30-60 minutes for 50K points)
  • Adding new feature: Only processes rows missing that feature (takes ~5-10 minutes)
  • Updating existing data: Only processes rows with placeholders (takes minutes, not hours)

This incremental approach is essential as I layer in more features. When I add roads, trails, or other infrastructure data, I don’t need to reprocess the entire dataset—only the rows missing those features. This makes iterative development practical and fast.
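
As a rough illustration of the idea, the sketch below splits a DataFrame with the same placeholder mask and recomputes features only for the flagged rows. It assumes pandas is installed; `split_rows`, `fill_incrementally`, and the `compute_features` callback are hypothetical names, and the real script's lookup logic is not shown in this post.

```python
from typing import Callable, Dict, Tuple

import pandas as pd

PLACEHOLDER = -9999  # Sentinel value used in the post's snippet


def split_rows(df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Return (rows_to_process, rows_to_skip) using the placeholder mask."""
    placeholder_mask = (
        (df["elevation"] == PLACEHOLDER)
        | df["slope_degrees"].isna()
        | (df["water_distance_miles"] == PLACEHOLDER)
    )
    return df[placeholder_mask], df[~placeholder_mask]


def fill_incrementally(
    df: pd.DataFrame,
    compute_features: Callable[[pd.Series], Dict[str, float]],
) -> pd.DataFrame:
    """Call compute_features only for flagged rows; leave other rows untouched."""
    rows_to_process, _ = split_rows(df)
    result = df.copy()
    for idx, row in rows_to_process.iterrows():
        for column, value in compute_features(row).items():
            result.at[idx, column] = value
    return result
```

Because the expensive lookup runs only for flagged rows, a second pass over a fully populated file does no feature work at all.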

Usage: From Six Commands to One

Before (Manual Workflow)

# Step 1: Process raw data
python scripts/process_raw_presence_data.py --dataset north_bighorn

# Step 2: Generate absence data
python scripts/generate_absence_data.py \
    --presence-file data/processed/north_bighorn_points.csv \
    --output-file data/processed/combined_north_bighorn_presence_absence.csv \
    --data-dir data

# Step 3: Integrate features
python scripts/integrate_environmental_features.py \
    data/processed/combined_north_bighorn_presence_absence.csv

# Step 4: Analyze features
python scripts/analyze_integrated_features.py \
    data/processed/combined_north_bighorn_presence_absence.csv

# Step 5: Assess readiness
python scripts/assess_training_readiness.py \
    data/processed/combined_north_bighorn_presence_absence.csv

# Step 6: Prepare training features
python scripts/prepare_training_features.py \
    data/processed/combined_north_bighorn_presence_absence.csv \
    data/features/north_bighorn_features.csv

After (Automated Pipeline)

# Process all datasets end-to-end
python scripts/run_data_pipeline.py

# Process specific dataset
python scripts/run_data_pipeline.py --dataset north_bighorn

# Skip specific steps by name (already-complete steps are skipped automatically)
python scripts/run_data_pipeline.py --skip-steps process_raw,generate_absence

# Force full regeneration
python scripts/run_data_pipeline.py --force

The pipeline output shows clear progress:

======================================================================
PATHWILD DATA PROCESSING PIPELINE
======================================================================
Started at: 2025-01-15 14:30:00
Data directory: data
Dataset: north_bighorn
Force mode: False

Checking prerequisites...
✓ All required prerequisites present

[1/6] PROCESS_RAW: Process raw presence data files into presence points
----------------------------------------------------------------------
  ✓ Step already complete: data/processed/north_bighorn_points.csv

[2/6] GENERATE_ABSENCE: Generate absence data and combine with presence
----------------------------------------------------------------------
  Running: generate_absence_data.py
  Command: python scripts/generate_absence_data.py --presence-file ...
  ✓ Completed in 342.5s

[3/6] INTEGRATE_FEATURES: Integrate environmental features
----------------------------------------------------------------------
  Running: integrate_environmental_features.py
  Command: python scripts/integrate_environmental_features.py ...
  ✓ Completed in 1847.3s

...

======================================================================
PIPELINE SUMMARY
======================================================================
Completed at: 2025-01-15 15:15:00
Total time: 45.0 minutes
Steps completed: 6/6
Steps skipped: 1
Steps failed: 0

✓ Pipeline completed successfully!
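
The three flags used in the commands above (--dataset, --skip-steps, --force) could be parsed along these lines. This is a sketch using the standard argparse module; the defaults, help text, and the comma-splitting of --skip-steps are assumptions based on how the flags are used in this post.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse pipeline flags; argv=None falls back to sys.argv."""
    parser = argparse.ArgumentParser(description="Run the data pipeline (sketch)")
    parser.add_argument("--dataset", default=None,
                        help="Dataset name; omit to process all datasets")
    parser.add_argument("--skip-steps", default="",
                        help="Comma-separated step names to skip")
    parser.add_argument("--force", action="store_true",
                        help="Re-run steps even if their output already exists")
    args = parser.parse_args(argv)
    # Turn "a,b" into ["a", "b"], ignoring empty entries and stray spaces.
    args.skip_steps = [s.strip() for s in args.skip_steps.split(",") if s.strip()]
    return args
```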

Testing Infrastructure

Automation is only as good as its tests. I built comprehensive test coverage for the pipeline orchestrator and individual steps.

Unit Tests for Pipeline Components

The test suite (tests/test_data_pipeline.py) verifies:

1. Step Initialization and State Management

def test_step_initialization(self, tmp_path):
    """Test pipeline step initialization."""
    # Create a placeholder script so can_run() finds it (file name is illustrative)
    script_path = tmp_path / "test_script.py"
    script_path.write_text("print('ok')")

    step = PipelineStep(
        name='test_step',
        description='Test step',
        script_path=script_path,
        command_args=['--arg', 'value'],
        expected_output=tmp_path / "output.csv"
    )
    
    assert step.name == 'test_step'
    assert step.can_run() is True
    assert step.is_complete() is False  # Output doesn't exist yet

2. Step Skipping Logic

def test_step_should_skip(self, tmp_path):
    """Test step skipping logic."""
    step = PipelineStep(...)
    
    assert step.should_skip(['test_step']) is True
    assert step.should_skip(['other_step']) is False

3. Completion Detection

def test_step_is_complete(self, tmp_path):
    """Test step completion checking."""
    output_file = tmp_path / "output.csv"
    output_file.write_text("test")
    
    step = PipelineStep(
        ...,
        expected_output=output_file
    )
    
    assert step.is_complete() is True

Integration Tests

End-to-end integration tests (tests/test_pipeline_integration.py) verify the complete workflow:

def test_pipeline_structure(self, test_environment):
    """Test that pipeline structure is correct."""
    pipeline = DataPipeline(
        data_dir=data_dir,
        dataset_name='test_dataset',
        skip_steps=[],
        force=False
    )
    
    # Verify pipeline has all expected steps
    step_names = [step.name for step in pipeline.steps]
    assert 'process_raw' in step_names
    assert 'generate_absence' in step_names
    assert 'integrate_features' in step_names
    assert 'analyze_features' in step_names
    assert 'assess_readiness' in step_names
    assert 'prepare_features' in step_names
    
    # Verify step order is correct
    assert pipeline.steps[0].name == 'process_raw'
    assert pipeline.steps[1].name == 'generate_absence'
    assert pipeline.steps[2].name == 'integrate_features'

Test Coverage

The test suite achieves comprehensive coverage:

  • Pipeline orchestrator: 100% coverage of PipelineStep and DataPipeline classes
  • Error handling: Tests verify graceful handling of missing inputs, failed steps, and prerequisite failures
  • Step management: Tests verify skipping, completion detection, and dependency checking
  • Integration: End-to-end tests verify the complete workflow with small test datasets

Run tests with:

# Run all pipeline tests
pytest tests/test_data_pipeline.py tests/test_pipeline_integration.py -v

# With coverage
pytest tests/test_data_pipeline.py --cov=scripts.run_data_pipeline --cov-report=term-missing

Validation at Every Step

Beyond testing, the pipeline includes validation checks throughout:

1. Prerequisite Validation

Before starting, the pipeline verifies all required environmental data files exist:

Checking prerequisites...
✗ Missing required environmental data files:
  ✗ DEM: data/dem/wyoming_dem.tif
  ✗ Slope: data/terrain/slope.tif

Please generate the required prerequisites before running the pipeline.
See docs/environmental_data_prerequisites.md for detailed instructions.

2. Input Validation

Each step checks that required inputs exist before running:

def can_run(self) -> bool:
    """Check if this step can run (script exists, inputs available)."""
    if not self.script_path.exists():
        logger.warning(f"  Script not found: {self.script_path}")
        return False
    
    if self.required_input and not self.required_input.exists():
        logger.warning(f"  Required input not found: {self.required_input}")
        return False
    
    return True

3. Output Validation

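Each step treats the presence of its expected output file as the sign that it finished. Note that this confirms the file exists, not that its contents are valid: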

def is_complete(self) -> bool:
    """Check if this step has already been completed."""
    if self.expected_output and self.expected_output.exists():
        return True
    return False

4. Data Quality Validation

Individual scripts include their own validation:

  • Absence generation: Validates spatial separation, class balance, and geographic coverage
  • Feature integration: Validates placeholder replacement and feature value ranges
  • Training readiness: Validates data volume, feature richness, and class balance
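
As an example of what such script-level checks might look like, here is a small sketch of a class-balance check and a value-range check. The function names, the 0/1 label encoding, and the 0.4 minimum share are assumptions chosen for illustration; the thresholds the real scripts use are not stated in this post.

```python
from collections import Counter
from typing import Iterable, List


def check_class_balance(labels: Iterable[int], min_share: float = 0.4) -> List[str]:
    """Return a problem message for each class (0 or 1) below min_share."""
    counts = Counter(labels)
    total = sum(counts.values())
    if total == 0:
        return ["dataset is empty"]
    problems = []
    for cls in (0, 1):
        share = counts.get(cls, 0) / total
        if share < min_share:
            problems.append(f"class {cls} share {share:.2f} is below {min_share:.2f}")
    return problems


def check_value_range(values: Iterable[float], low: float, high: float, name: str) -> List[str]:
    """Flag values outside [low, high]; NaN and -9999 placeholders fail this test too."""
    bad = sum(1 for v in values if not (low <= v <= high))
    return [f"{name}: {bad} value(s) outside [{low}, {high}]"] if bad else []
```

An empty list from both functions means the checks passed; any returned messages can be logged as warnings or used to fail the step.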

Benefits: Reliability and Speed

The automated pipeline provides several key benefits:

1. Reliability

  • No manual errors: Can’t forget a step or run them out of order
  • Prerequisite checking: Fails fast if environmental data is missing
  • Error recovery: Continues processing other steps even if one fails
  • Reproducibility: Same command produces same results every time

2. Speed

  • Incremental updates: Skips already-complete steps automatically
  • Parallel processing: Individual steps use auto-detected parallel processing (scales with CPU cores)
  • Incremental feature processing: Only processes placeholder/empty values by default, making feature additions fast
  • Batch processing: Auto-detected batch sizes optimize memory usage and processing speed
  • Progress tracking: Clear visibility into what’s running and how long it takes

3. Maintainability

  • Single source of truth: Pipeline definition in one place
  • Easy to extend: Adding new steps is straightforward
  • Well-tested: Comprehensive test coverage catches regressions
  • Documented: Clear logging and error messages

4. Developer Experience

  • One command: python scripts/run_data_pipeline.py does everything
  • Clear output: Progress logging shows exactly what’s happening
  • Error messages: Helpful guidance when something goes wrong
  • Flexible: Can skip steps, force regeneration, or process specific datasets

Performance: Real-World Results

For the Southern GYE dataset (94,591 presence points):

Manual Workflow:

  • Time: ~2-3 hours (including manual verification)
  • Error rate: ~10% (forgot steps, wrong arguments, missing files)
  • Reproducibility: Low (different results if steps run out of order)

Automated Pipeline:

  • Time: ~45 minutes (with intelligent skipping)
  • Error rate: <1% (caught by prerequisite checking and validation)
  • Reproducibility: 100% (same command, same results)

The automated pipeline is roughly 3-4x faster in practice (2-3 hours down to about 45 minutes) because it:

  • Skips already-complete steps automatically
  • Uses auto-detected parallel processing (up to roughly 8x speedup on an 8-core machine)
  • Processes only placeholder/empty values by default (incremental updates are 5-10x faster)
  • Catches errors early (prerequisite checking)
  • Provides clear progress feedback
  • Eliminates manual verification time

Incremental Processing Impact:

When adding a new feature (e.g., roads or trails), the incremental processing approach is transformative:

  • Full regeneration: ~30-60 minutes for 50K points
  • Incremental update: ~5-10 minutes (only processes rows missing the new feature)

This makes iterative development practical. I can add roads data, run the pipeline, and see results in minutes rather than waiting an hour for a full regeneration.

Lessons Learned

1. Automate Early, But Not Prematurely

I built the manual workflow first, which helped me understand the dependencies and requirements. Only after I had a working manual process did I automate it. This ensured the automation solved real problems rather than theoretical ones.

2. Fail Fast on Prerequisites

The prerequisite checking saves hours of processing time. If a required file is missing, the pipeline fails immediately with a clear error message rather than running for hours and failing at feature integration.

3. Test the Orchestrator, Not Just the Steps

Individual scripts had tests, but the orchestrator needed its own test suite. Testing step management, dependency checking, and error handling caught several bugs that wouldn’t have been found by testing individual scripts.

4. Make Progress Visible

Clear logging and progress tracking make the pipeline feel fast even when it takes 45 minutes. Users can see exactly what’s happening and how long each step takes.

5. Design for Incremental Updates

The ability to skip already-complete steps makes the pipeline practical for iterative development. I can update environmental data and re-run only the feature integration step, saving hours.

6. Optimize for Incremental Processing

The decision to process only placeholder/empty values by default was crucial. When I add new features like roads or trails, I don’t need to reprocess the entire dataset—only the rows missing those features. This makes iterative development fast and practical, enabling rapid experimentation with new data sources.

7. Auto-Detect Performance Settings

Rather than hardcoding worker counts or batch sizes, the pipeline auto-detects optimal settings based on the environment. This means it runs efficiently on my laptop (4 cores) and scales automatically on a server (32+ cores) without any configuration changes.

The Takeaway

Automating the data pipeline transformed a complex, error-prone manual workflow into a single, reliable command. The key was:

  1. Understanding the workflow first – Built manual process before automating
  2. Failing fast – Prerequisite checking prevents wasted time
  3. Testing thoroughly – Comprehensive test coverage catches regressions
  4. Making progress visible – Clear logging improves developer experience
  5. Designing for iteration – Incremental updates make development practical

The automated pipeline is production-ready and has processed all three datasets successfully, generating over 400,000 training samples with consistent quality. This sets the foundation for model training, where I’ll apply the same principles: automation, testing, and validation.

Next, I’ll train the XGBoost model and prepare for field validation in October 2026. The automated pipeline ensures I can regenerate training data quickly as I iterate on the model, making the development cycle fast and reliable.

Next Steps: Model Training

With the automated pipeline producing training-ready feature datasets, the next phase is model training. Here’s my plan:

Training Workflow

1. Data Preparation

  • Combine feature datasets from all three sources (South Bighorn, Southern GYE, National Elk Refuge)
  • Split into train/validation/test sets (70/15/15)
  • Handle class imbalance if needed (though absence generation should have balanced this)
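
A 70/15/15 split can be sketched in a few lines of standard-library Python. This is a plain seeded random split for illustration only; the function name and the default seed are assumptions, and it does not stratify by class or by dataset.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_70_15_15(rows: Sequence[T], seed: int = 42) -> Tuple[List[T], List[T], List[T]]:
    """Shuffle a copy of rows with a fixed seed and cut it into train/val/test."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    # Integer arithmetic avoids floating-point rounding surprises.
    n_train = len(shuffled) * 70 // 100
    n_val = len(shuffled) * 15 // 100
    train = shuffled[:n_train]
    val = shuffled[n_train:n_train + n_val]
    test = shuffled[n_train + n_val:]  # Remainder, so no row is dropped
    return train, val, test
```

Fixing the seed keeps the split reproducible between runs, in line with the reproducibility goal of the pipeline.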

2. Model Selection and Training

  • Start with XGBoost (proven for tabular data, interpretable, fast)
  • Use MLflow for experiment tracking
  • Hyperparameter tuning with Optuna or scikit-learn’s GridSearchCV
  • Target: 70%+ accuracy on test set

3. Model Evaluation

  • Cross-validation on combined dataset
  • Per-dataset performance analysis (does model generalize across regions?)
  • Feature importance analysis with SHAP
  • Confusion matrix and classification metrics

4. Model Validation

  • Field validation in Area 048 during October 2026 hunt
  • Compare predictions to actual elk locations
  • Iterate based on real-world performance

Training Infrastructure

I’ll build a training script (src/models/train.py) that:

  • Loads feature datasets from data/features/
  • Handles train/validation/test splitting
  • Trains XGBoost with MLflow logging
  • Generates evaluation metrics and visualizations
  • Saves trained models to models/

The training process will be similar to the data pipeline—automated, tested, and reproducible. I’ll use MLflow to track experiments, compare model versions, and manage the model lifecycle.

Expected Challenges

1. Generalization Across Regions

  • Different elk herds may have different habitat preferences
  • Model needs to learn generalizable patterns, not dataset-specific quirks
  • Solution: Cross-validation across datasets, feature importance analysis
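
One way to run cross-validation across datasets is a leave-one-dataset-out scheme: train on two regions and test on the third. The sketch below shows the index bookkeeping only; the function name is hypothetical and the dataset labels in the usage note are illustrative. scikit-learn's `LeaveOneGroupOut` offers the same kind of split.

```python
from typing import Iterator, List, Sequence, Tuple


def leave_one_dataset_out(
    dataset_labels: Sequence[str],
) -> Iterator[Tuple[str, List[int], List[int]]]:
    """Yield (held_out_name, train_indices, test_indices) for each dataset."""
    for held_out in sorted(set(dataset_labels)):
        train_idx = [i for i, d in enumerate(dataset_labels) if d != held_out]
        test_idx = [i for i, d in enumerate(dataset_labels) if d == held_out]
        yield held_out, train_idx, test_idx
```

With one label per row (for example "southern_gye"), each fold shows how well a model trained on the other regions transfers to a region it has never seen.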

2. Temporal Patterns

  • Elk behavior varies by season (rut, migration, winter)
  • Model needs to capture temporal patterns without overfitting to specific dates
  • Solution: Include temporal features (month, day_of_year) but validate they don’t cause data leakage
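
Deriving the two temporal features named above from a GPS fix timestamp is straightforward; here is a minimal sketch, where the function name is an assumption.

```python
from datetime import datetime
from typing import Dict


def temporal_features(timestamp: datetime) -> Dict[str, int]:
    """Return the month (1-12) and day of year (1-366) for a timestamp."""
    return {
        "month": timestamp.month,
        "day_of_year": timestamp.timetuple().tm_yday,
    }
```

Both values repeat every year, so they describe the season without encoding the specific date of a fix.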

3. Feature Engineering

  • Some features may be redundant or noisy
  • Need to identify which features actually help prediction
  • Solution: Feature importance analysis, recursive feature elimination

4. Model Interpretability

  • Understanding why the model makes predictions is important for field validation
  • SHAP values will help explain predictions
  • Solution: SHAP integration, feature importance visualization

Building PathWild continues to be an exercise in iterative development and automation. Each phase—from data exploration to absence generation to pipeline automation—builds on the previous work. The automated pipeline solved real workflow problems while maintaining data quality and enabling rapid iteration. Next, I’ll apply these same principles to model training and validation.


References

  1. Previous Post: From Presence to Balanced Training Data: Generating Absence Points for PathWild
  2. Pipeline Documentation: See docs/automated_data_pipeline.md for detailed usage
  3. Test Coverage: See docs/test_coverage.md for testing guidelines
