User-defined exchange functions

Understand how UDXFs enable distributed processing with Apache Arrow Flight

Calling OpenAI’s API to analyze one million customer reviews using regular UDFs means blocking the query engine for seconds per API call. A single API timeout or network failure crashes your entire pipeline because the UDF and query engine share the same process. UDXFs solve this challenge by running your Python logic in separate processes that communicate via Apache Arrow Flight. This provides process isolation that protects the query engine from failures and enables safe external service calls.

What are user-defined exchange functions?

User-defined exchange functions are distributed UDFs that execute in separate processes or remote services rather than running in the same process as your query engine. Unlike regular UDFs that run through direct function calls, UDXFs communicate over Apache Arrow Flight protocol to exchange data between processes.

UDXFs offer three critical capabilities that regular UDFs cannot. First, process isolation lets you run untrusted code safely without risking the query engine. Second, resource management lets you dedicate specific CPU and memory allocations to particular operations. Third, remote execution enables calling external services or deploying processing logic as microservices.

import xorq.api as xo
from xorq.expr.relations import flight_udxf
import xorq.expr.datatypes as dt

# Define processing function
def sentiment_analysis(df):
    # analyze_sentiment is a placeholder for an external API call or heavy model
    df['sentiment'] = analyze_sentiment(df['text'])
    return df

# Define schemas
input_schema = xo.schema({"text": dt.string})
output_schema = xo.schema({"text": dt.string, "sentiment": dt.string})

# Create UDXF (curried, can be used with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="sentiment_analysis"
)

# Apply to data using pipe pattern
data = xo.memtable({"text": ["Great product", "Poor quality"]})
result = data.pipe(sentiment_udxf).execute()

Why process isolation matters

Regular UDFs run in the same process as your query engine. This creates significant problems for workloads that involve heavy computation, external service calls, or unreliable operations.

Memory-intensive UDFs compete directly with the query engine for memory when they run in the same process. Loading a 5GB machine learning model for inference means the query engine loses 5GB of working memory for query processing. And because the UDF and the engine share one process space, a UDF crash from memory exhaustion or an unhandled exception takes down the entire query engine process.

External service calls block execution when they run in the query engine process. API calls that take seconds to complete stall the entire pipeline: while waiting for an HTTP response from OpenAI or another external service, the query engine sits idle instead of processing other available data. Processing one million rows at one second per API call means roughly 278 hours of sequential execution time.
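The back-of-the-envelope arithmetic behind that figure:

```python
# Cost of sequential per-row API calls, one second each.
rows = 1_000_000
seconds_per_call = 1.0

total_hours = rows * seconds_per_call / 3600
print(f"{total_hours:.1f} hours")  # ≈ 277.8 hours of wall-clock time
```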

Parallelism remains limited with regular UDFs because they run sequentially in a single process. Distributing processing across multiple machines or GPUs requires complex manual orchestration since the UDF is tightly coupled to the query engine process.

UDXFs solve these challenges by running in separate processes that communicate via Arrow Flight protocol. The query engine sends data to the UDXF process through efficient Arrow-based communication. The UDXF process performs the computation independently and sends results back to the query engine. When the UDXF crashes due to external API failures or memory issues, the query engine survives and can retry or handle the error gracefully.
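Because failures stay in the separate process, the caller can retry a failed execution with ordinary error handling. A minimal sketch; the retry helper below is illustrative, not part of the xorq API:

```python
import time

def execute_with_retry(execute_fn, max_attempts=3, backoff_seconds=1.0):
    """Call execute_fn, retrying on failure with linear backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return execute_fn()
        except Exception:
            # A crash in the UDXF process surfaces here; the engine survives.
            if attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)

# Usage (illustrative):
# result = execute_with_retry(lambda: data.pipe(sentiment_udxf).execute())
```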

How UDXFs work

UDXFs operate through five stages that manage the distributed processing lifecycle.

1. You write a Python function containing your custom logic that takes a pandas DataFrame and returns a transformed DataFrame.
2. You wrap the function with flight_udxf and specify input and output schemas, which creates a Flight server configuration.
3. When you execute the expression, Xorq starts a Flight server in a separate process that hosts your processing function.
4. Xorq sends data to the server via Arrow Flight protocol; the server processes it with your function and sends results back over Arrow's efficient transfer mechanism.
5. After execution completes, Xorq automatically shuts down the Flight server and cleans up the separate process.

The UDXF execution sequence looks like this:

sequenceDiagram
    participant Query as Query Engine
    participant Flight as Flight Server
    participant Process as UDXF Process
    
    Query->>Flight: Start server
    Flight->>Process: Spawn process
    Query->>Flight: Send data (Arrow)
    Flight->>Process: Process DataFrame
    Process->>Process: Run custom logic
    Process->>Flight: Return results
    Flight->>Query: Send results (Arrow)
    Query->>Flight: Shutdown
    Flight->>Process: Terminate

Arrow Flight moves data between processes in Arrow's columnar format, avoiding the costly serialization and deserialization of formats like JSON or pickle. For large datasets this keeps distributed processing performance close to in-process execution. The architecture diagram shows the data flow:

graph TB
    A[Query Engine] --> B[Start Flight Server]
    B --> C[Separate Process]
    A --> D[Send Arrow Data]
    D --> C
    C --> E[Process with Custom Logic]
    E --> F[Return Arrow Results]
    F --> A
    A --> G[Shutdown Server]
    G --> C

Tip

UDXFs support microservice architectures for data processing. You deploy a UDXF as a standalone service that multiple queries can call concurrently, supporting model serving and API integration patterns.

UDXFs versus UDFs

| Aspect | UDF | UDXF |
| --- | --- | --- |
| Execution | In-process | Separate process |
| Communication | Direct function call | Arrow Flight protocol |
| Isolation | None | Full process isolation |
| Crash impact | Crashes query engine | Query engine survives |
| Remote execution | Not supported | Supported |
| Parallelism | Limited | Scalable across machines |
| Overhead | Minimal | Flight communication overhead |
| Use case | Simple transformations | Heavy computation, API calls |

Example: LLM sentiment analysis

With UDF

A UDF blocks the query engine while waiting for API responses, creating performance bottlenecks.

def sentiment_udf(df):
    # Blocks query engine for seconds per API call
    df['sentiment'] = df['text'].apply(call_openai_api)
    return df

With UDXF

A UDXF runs in a separate process and doesn’t block the query engine during API calls.

def sentiment_udxf_fn(df):
    # Runs in separate process without blocking
    df['sentiment'] = batch_call_openai_api(df['text'])
    return df

# Create UDXF (curried, use with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_udxf_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to data
result = reviews.pipe(sentiment_udxf).execute()

Creating UDXFs

UDXFs require four components that work together to define the distributed processing behavior.

Processing function

Define a function that transforms pandas DataFrames with your custom logic.

def process_data(df):
    # Your custom logic
    df['new_column'] = df['old_column'] * 2
    return df

Input schema

Define the required columns and their data types for input validation.

input_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64
})

Output schema

Define the result columns and their data types that your function will return.

output_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64,
    "new_column": dt.float64
})

UDXF creation

Combine the function and schemas into a UDXF configuration that Xorq will execute.

# Create UDXF (curried, use with pipe)
my_udxf = flight_udxf(
    process_df=process_data,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="my_transformation"
)

# Apply to data
result = input_data.pipe(my_udxf).execute()

UDXF use cases

UDXFs excel in four scenarios with process isolation and distributed execution advantages.

External API calls

Call external services without blocking the query engine and protect against failures.

def geocoding_udxf(df):
    # Placeholder: geocode_addresses calls an API and returns (latitudes, longitudes)
    df['latitude'], df['longitude'] = geocode_addresses(df['address'])
    return df

# Create UDXF
geocode_udxf = flight_udxf(
    process_df=geocoding_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to addresses
result = addresses.pipe(geocode_udxf).execute()

The query engine remains responsive while API calls happen in the background. Failed API calls in the UDXF process don’t crash the query engine. Multiple UDXF processes can run API calls in parallel across different machines or cores.
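Within one UDXF process, I/O-bound calls can also overlap using a thread pool. A sketch, where `geocode_one` is a stand-in for the real API request:

```python
from concurrent.futures import ThreadPoolExecutor

def geocode_one(address):
    """Stand-in for a single geocoding API request."""
    return (0.0, 0.0)

def geocode_addresses(addresses, max_workers=8):
    # Threads overlap the network waits instead of calling sequentially.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(geocode_one, addresses))
```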

Large ML models

Load and run memory-intensive models safely with dedicated process resources.

def model_inference_udxf(df):
    # Load 5GB model isolated from query engine
    model = load_large_model()
    df['prediction'] = model.predict(df[feature_columns])
    return df

# Create UDXF
inference_udxf = flight_udxf(
    process_df=model_inference_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to features
result = features.pipe(inference_udxf).execute()

Process isolation means the large model’s memory usage doesn’t interfere with query processing. Model crashes don’t affect the query engine. You can dedicate GPU resources to the UDXF process.
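To avoid paying the load cost on every batch, the model can be cached inside the UDXF process, which stays alive for the duration of execution. A sketch with a stand-in loader; `load_large_model` and the feature columns are hypothetical:

```python
from functools import lru_cache

def load_large_model():
    """Stand-in for a real multi-gigabyte model loader."""
    class Model:
        def predict(self, features):
            return [0.0] * len(features)
    return Model()

@lru_cache(maxsize=1)
def get_model():
    # Runs once per UDXF process; later batches reuse the cached model.
    return load_large_model()

def model_inference_udxf(df):
    model = get_model()  # no reload on subsequent batches
    df['prediction'] = model.predict(df[['f1', 'f2']].values)
    return df
```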

Batch LLM processing

Process text with LLMs at scale using batched API calls for efficiency.

def sentiment_analysis_udxf(df):
    # Batch calls to OpenAI API
    df['sentiment'] = batch_call_openai(df['review_text'])
    return df

# Create UDXF
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis_udxf,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema
)

# Apply to reviews
result = reviews.pipe(sentiment_udxf).execute()

Batching API calls in the UDXF process improves throughput. Retries happen without affecting the query engine. Results can be cached within the UDXF process.
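The chunking itself is plain Python. A sketch of the batching pattern, where `api_fn` stands in for a hypothetical LLM endpoint that accepts a list of texts per request:

```python
def batch_call(texts, api_fn, batch_size=50):
    """Split texts into fixed-size chunks and call the API once per chunk."""
    results = []
    for start in range(0, len(texts), batch_size):
        chunk = texts[start:start + batch_size]
        results.extend(api_fn(chunk))  # one request covers the whole chunk
    return results
```

Fifty texts per request turns one million rows into twenty thousand API calls instead of one million.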

Microservice deployment

Deploy processing as standalone services that multiple applications can access independently.

# Build expression containing UDXF
xorq build pipeline.py -e pipeline

# Serve the built expression as a Flight server
xorq serve-unbound pipeline --port 8815

This pattern enables sharing processing logic across multiple applications. It provides independent scaling of processing and query workloads. You can deploy UDXFs on specialized hardware like GPU machines.

When to use UDXFs

Use UDXFs when: You call external APIs or LLMs with unpredictable latency, run heavy ML models that need memory or GPU, run unreliable or untrusted code, or build microservices.

Use regular UDFs when: Transforms are simple and fast with no external dependencies, you need arithmetic or string operations, or operations are tightly coupled to the query engine.

Trade-offs

Benefits: Process isolation, remote execution, resource management, parallel execution, fault tolerance, microservices pattern.

Costs: Communication overhead; complexity from server startup, shutdown, and cross-process error handling; debugging that may require distributed tracing; server management; and explicit schema requirements.

Learning more

UDXFs extend basic UDF concepts with backend-specific data exchange capabilities as explained in User-defined functions. The Apache Arrow Flight protocol supports UDXF communication as covered in How Xorq works.

UDXF deployment patterns are covered in Serving expressions as endpoints.