Validate data in Xorq pipelines

Catch schema errors at compile time and filter bad data at runtime

This guide shows you how to validate data quality before it breaks your pipeline. You’ll use schema validation to catch errors at compile time and runtime validation to catch issues during execution.

Prerequisites

Note

If you encounter ModuleNotFoundError: No module named 'xorq_datafusion' when importing xorq.api, install the xorq-datafusion package separately: pip install xorq-datafusion. The embedded backend requires xorq-datafusion (a Rust extension), which is separate from the Python datafusion package. If the error persists, reinstall Xorq: pip install --upgrade --force-reinstall xorq xorq-datafusion.

Validate schemas at compile time

Schema validation catches mismatches before execution starts. The validation happens when you create the expression, so errors fail fast without processing any data.

Check required columns and types before processing. Create schema_validation.py:

# schema_validation.py
import xorq.api as xo
from datetime import datetime, timedelta

# Define expected schema
expected_schema = {
    "customer_id": "int64",
    "email": "string",
    "created_at": "timestamp",
    "status": "string"
}

def validate_schema(expr, expected_schema):
    """Validate expression schema matches expected schema."""
    actual_schema = expr.schema()
    
    # Check all expected columns exist
    missing = [col for col in expected_schema.keys() if col not in actual_schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Check types match
    for col, expected_type in expected_schema.items():
        actual_type = str(actual_schema[col])
        if actual_type != expected_type:
            raise ValueError(
                f"Type mismatch for column '{col}': "
                f"expected {expected_type}, got {actual_type}"
            )
    
    return expr

# Create data
con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, 3],
    "email": ["a@b.com", "c@d.com", "e@f.com"],
    "created_at": [datetime.now() - timedelta(days=i) for i in range(3)],
    "status": ["active", "inactive", "pending"]
}, name="customers")

# Schema validation happens here (before execution)
# Raises ValueError immediately if schema doesn't match
validated = validate_schema(data, expected_schema)
validated = validated.mutate(validated=True)

df = validated.execute()
print(f"Validated schema for {len(df)} rows")

Run the script:

python schema_validation.py

You should see a result like this:

Validated schema for 3 rows

To verify schema validation works, test these scenarios:

  • Data matching the expected schema executes successfully
  • Data missing customer_id column raises ValueError: Missing required columns: ['customer_id'] at the validate_schema() call before execution
  • Data with wrong type (for example, customer_id as string) raises ValueError: Type mismatch for column 'customer_id': expected int64, got string
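The missing-column failure case can be exercised without a backend by running the same membership check against plain dicts; this sketch mirrors the first check in validate_schema (the data here is illustrative, not part of the Xorq API):

```python
# Mirrors the missing-column check from validate_schema using plain dicts,
# so it runs without a Xorq backend. The data here is illustrative.
expected_schema = {
    "customer_id": "int64",
    "email": "string",
    "created_at": "timestamp",
    "status": "string",
}

# Simulate an actual schema that is missing 'customer_id'
actual_names = ["email", "created_at", "status"]

missing = [col for col in expected_schema if col not in actual_names]
try:
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
except ValueError as err:
    print(err)  # Missing required columns: ['customer_id']
```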

Validate data at runtime

Runtime validation catches data quality issues during execution. These checks run when you call .execute(), so bad rows are filtered out, or errors raised, before downstream processing continues.

Check for null values

Remove rows with missing values in critical columns. Create check_nulls.py:

# check_nulls.py
import xorq.api as xo

con = xo.connect()

data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"],
    "status": ["active", "inactive", None, "pending"]
}, name="customers")

# Filter out rows where critical columns are null
valid_data = data.filter(
    xo._.customer_id.notnull(),
    xo._.email.notnull(),
    xo._.status.notnull()
)

original = data.execute()
df = valid_data.execute()
print(f"Original rows: {len(original)}")
print(f"Valid rows (no nulls): {len(df)}")
print(f"Removed rows: {len(original) - len(df)}")

Run the script:

python check_nulls.py

You should see a result like this:

Original rows: 4
Valid rows (no nulls): 2
Removed rows: 2

Stop processing on validation failure

Raise exceptions when validation fails to prevent processing invalid data. Create fail_fast_validation.py:

# fail_fast_validation.py
import xorq.api as xo

def validate_and_process(data_expr, required_columns):
    """Validate data and fail fast if invalid."""
    # Check that required columns exist in schema
    schema = data_expr.schema()
    missing = [col for col in required_columns if col not in schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Apply validation filters
    validated = data_expr.filter(
        *[data_expr[col].notnull() for col in required_columns]
    )
    
    # Execute and check if any rows remain
    result = validated.execute()
    if len(result) == 0:
        raise ValueError("No valid rows after validation")
    
    return result

con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"],
    "status": ["active", "inactive", None, "pending"]
}, name="customers")

df = validate_and_process(data, ["customer_id", "email", "status"])
print(f"Processed {len(df)} valid rows")

Run the script:

python fail_fast_validation.py

You should see a result like this:

Processed 2 valid rows

To verify fail-fast validation, test these scenarios:

  • Data with all required columns processes successfully
  • Data missing required columns raises ValueError with the column names at the validate_and_process() call, before execution
  • Data where every row has nulls raises ValueError: No valid rows after validation during execution

Remove null rows

Drop rows with null values using .drop_null(). Create drop_nulls.py:

# drop_nulls.py
import xorq.api as xo

con = xo.connect()

data = xo.memtable({
    "customer_id": [1, 2, None, 4],
    "email": ["a@b.com", None, "c@d.com", "e@f.com"]
}, name="customers")

# Remove rows where any column is null
clean_data = data.drop_null(how="any")

df = clean_data.execute()
print(f"Clean rows: {len(df)}")

Run the script:

python drop_nulls.py

You should see a result like this:

Clean rows: 2

The how parameter controls which rows to remove:

  • how="any" removes rows with a null in any column
  • how="all" removes only rows where every column is null
  • subset=["col1", "col2"] restricts the null check to the listed columns
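The three options can be sketched in plain Python, with None standing in for null; this illustrates the semantics described above and is not Xorq's implementation:

```python
# Pure-Python sketch of drop_null semantics (None stands in for null).
rows = [
    {"customer_id": 1, "email": "a@b.com"},
    {"customer_id": 2, "email": None},
    {"customer_id": None, "email": None},
]

def drop_null(rows, how="any", subset=None):
    cols = subset or list(rows[0])
    if how == "any":
        # Keep only rows with no nulls among the checked columns
        return [r for r in rows if all(r[c] is not None for c in cols)]
    # how="all": drop only rows where every checked column is null
    return [r for r in rows if any(r[c] is not None for c in cols)]

print(len(drop_null(rows, how="any")))                          # 1
print(len(drop_null(rows, how="all")))                          # 2
print(len(drop_null(rows, how="any", subset=["customer_id"])))  # 2
```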

Validate data ranges

Filter data to ensure values fall within expected ranges. Create validate_ranges.py:

# validate_ranges.py
import xorq.api as xo

con = xo.connect()

data = xo.memtable({
    "age": [25, 150, 18, 10, 120],
    "revenue": [100.0, -10.0, 50.0, 200.0, 0.0]
}, name="customers")

# Ensure values are within expected bounds
valid_data = data.filter(
    (xo._.age >= 18) & (xo._.age <= 120),
    xo._.revenue > 0
)

df = valid_data.execute()
print(f"Rows within valid range: {len(df)}")

Run the script:

python validate_ranges.py

You should see a result like this:

Rows within valid range: 2

Range validation removes rows that don’t meet the criteria. To verify, check that the output contains only values within the specified ranges, and test boundary values like age=18 and age=120, which this filter includes.

Validate string patterns

Check string formats using string methods. Create validate_strings.py:

# validate_strings.py
import xorq.api as xo

con = xo.connect()

data = xo.memtable({
    "email": ["valid@test.com", "invalid", "missing.dot@com", "bad@", "good@test.com"]
}, name="customers")

# Check for valid email pattern
valid_emails = data.filter(
    xo._.email.contains("@"),
    xo._.email.contains(".")
)

df = valid_emails.execute()
print(f"Rows with valid email format: {len(df)}")

Run the script:

python validate_strings.py

You should see a result like this:

Rows with valid email format: 3

This filters out emails missing “@” or “.” characters. Note that “missing.dot@com” passes the filter because .contains(".") matches a dot anywhere in the string, including before the “@”. For production, use more comprehensive email validation rules.
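As a sketch of stricter validation, the regular expression below requires a dot in the domain part, so an address like “missing.dot@com” is rejected; the pattern is illustrative, not RFC-complete. Expression APIs in the Ibis family typically expose a regex match (for example .re_search()) that could replace the .contains() filters, if your backend supports it.

```python
import re

# Stricter (still not RFC-complete) email pattern: one "@", no whitespace,
# and at least one dot in the domain part. Illustrative pattern only.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

emails = ["valid@test.com", "invalid", "missing.dot@com", "bad@", "good@test.com"]
valid = [e for e in emails if EMAIL_RE.match(e)]
print(valid)  # ['valid@test.com', 'good@test.com']
```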

Create custom validation functions

Combine schema validation with business rule validation in a reusable function. The schema check catches errors before execution, and the business rules filter data during execution.

Create custom_validation.py:

# custom_validation.py
import xorq.api as xo

def validate_customer_data(expr):
    """Validate customer data schema and business rules."""
    # Define expected schema
    expected_schema = {
        "customer_id": "int64",
        "email": "string",
        "status": "string"
    }
    
    # Check schema
    actual_schema = expr.schema()
    missing = [col for col in expected_schema.keys() if col not in actual_schema.names]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Apply business rules
    valid_statuses = ["active", "inactive", "pending"]
    validated = expr.filter(expr.status.isin(valid_statuses))
    
    return validated

# Apply validation
con = xo.connect()
data = xo.memtable({
    "customer_id": [1, 2, 3, 4],
    "email": ["a@b.com", "c@d.com", "e@f.com", "g@h.com"],
    "status": ["active", "inactive", "invalid", "pending"]
}, name="customers")

# Schema validation happens at function call (before execution)
# Business rule validation happens at execute (during execution)
validated_data = validate_customer_data(data)
df = validated_data.execute()
print(f"Validated {len(df)} rows with custom rules")

Run the script:

python custom_validation.py

You should see a result like this:

Validated 3 rows with custom rules

To verify custom validation, test these scenarios:

  • Valid data matching all business rules processes successfully
  • Rows with an “invalid” status are filtered out during execution
  • Data missing required columns raises ValueError: Missing required columns at the function call
  • Output contains only rows with status in [“active”, “inactive”, “pending”]

Production considerations

Choose the right validation strategy based on when you need to catch errors and how much data you’re processing.

Choose validation timing

Compile-time validation (schema checks): Use for critical schema mismatches that would break your pipeline immediately. These catch errors before any data processing.

Runtime validation (filters and assertions): Use for null checks, range validation, and business rules that filter rows. These catch data quality issues during execution.

For large datasets: combine both. Schema checks add no per-row cost, and filter-based runtime checks run inside the query engine rather than materializing data for Python-side validation.

Performance

Schema validation adds minimal overhead because it checks types once at expression creation. Runtime validation processes every row, so filter early in your pipeline to reduce data volume for downstream operations.

Monitoring

Track validation metrics in production:

  • Validation failure rate: percentage of rows filtered out
  • Schema mismatch frequency: how often schema validation fails
  • Null value percentage: percentage of nulls in critical columns

Log validation failures to identify data quality trends and upstream issues.
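One way to derive these metrics is from row counts taken before and after validation; the counts below are made-up figures for illustration:

```python
# Compute validation metrics from row counts. In a pipeline these counts
# would come from len(...) of the executed original and validated frames;
# the numbers here are made up for illustration.
original_rows = 1000   # rows before validation
valid_rows = 940       # rows remaining after filters
null_emails = 25       # nulls observed in a critical column

failure_rate = (original_rows - valid_rows) / original_rows * 100
null_pct = null_emails / original_rows * 100
print(f"Validation failure rate: {failure_rate:.1f}%")  # 6.0%
print(f"Null email percentage: {null_pct:.1f}%")        # 2.5%
```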

You now have compile-time schema validation to catch errors before execution and runtime validation to filter bad data during execution. For troubleshooting validation errors, see Troubleshoot data quality issues.

Next steps