User-defined exchange functions

```mermaid
sequenceDiagram
    participant Query as Query Engine
    participant Flight as Flight Server
    participant Process as UDXF Process
    Query->>Flight: Start server
    Flight->>Process: Spawn process
    Query->>Flight: Send data (Arrow)
    Flight->>Process: Process DataFrame
    Process->>Process: Run custom logic
    Process->>Flight: Return results
    Flight->>Query: Send results (Arrow)
    Query->>Flight: Shutdown
    Flight->>Process: Terminate
```
Calling OpenAI’s API to analyze one million customer reviews using regular UDFs means blocking the query engine for seconds per API call. A single API timeout or network failure crashes your entire pipeline because the UDF and query engine share the same process. UDXFs solve this challenge by running your Python logic in separate processes that communicate via Apache Arrow Flight. This provides process isolation that protects the query engine from failures and enables safe external service calls.
What are user-defined exchange functions?
User-defined exchange functions are distributed UDFs that execute in separate processes or remote services rather than running in the same process as your query engine. Unlike regular UDFs that run through direct function calls, UDXFs communicate over Apache Arrow Flight protocol to exchange data between processes.
UDXFs provide three critical capabilities that regular UDFs lack. First, process isolation lets you run untrusted code safely without risking the query engine. Second, resource management lets you dedicate specific CPU and memory allocations to particular operations. Third, remote execution enables calling external services or deploying processing logic as microservices.
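Process isolation, the first of these, can be demonstrated with nothing but the standard library: the sketch below runs deliberately crashing code in a child process (a stand-in for a UDXF worker, not Xorq's actual mechanism), and the parent process survives to handle the failure.

```python
import subprocess
import sys

# Run failure-prone logic in a child process (a stand-in for a UDXF worker)
# so an unhandled exception cannot take down this parent process.
proc = subprocess.run(
    [sys.executable, "-c", "raise RuntimeError('worker crashed')"],
    capture_output=True,
)

# The child died with a traceback, but the parent survives and can retry
# or handle the error gracefully.
assert proc.returncode != 0
```

An in-process UDF offers no equivalent boundary: the same unhandled exception would propagate into the query engine itself.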
```python
import xorq.api as xo
from xorq.expr.relations import flight_udxf
import xorq.expr.datatypes as dt

# Define the processing function
def sentiment_analysis(df):
    # Calls an external API or runs a heavy model
    df['sentiment'] = analyze_sentiment(df['text'])
    return df

# Define schemas
input_schema = xo.schema({"text": dt.string})
output_schema = xo.schema({"text": dt.string, "sentiment": dt.string})

# Create the UDXF (curried, can be used with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="sentiment_analysis",
)

# Apply to data using the pipe pattern
data = xo.memtable({"text": ["Great product", "Poor quality"]})
result = data.pipe(sentiment_udxf).execute()
```

Why process isolation matters
Regular UDFs run in the same process as your query engine. This creates significant problems for workloads that involve heavy computation, external service calls, or unreliable operations.
Memory-intensive UDFs compete directly with the query engine for memory resources when they run in the same process. Loading a 5GB machine learning model into memory for inference means the query engine loses 5GB of working memory for query processing. When the UDF crashes due to memory exhaustion or an unhandled exception, it takes down the entire query engine process. This happens because they share the same process space, so failure in one component affects all components.
External service calls block execution when running in the query engine process. API calls that take seconds to complete stall the entire pipeline. While waiting for an HTTP response from OpenAI or another external service, the query engine sits idle instead of processing other available data. Processing one million rows with one-second API calls per row means 277 hours of sequential execution time.
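The arithmetic behind that figure is easy to verify, along with the speedup that concurrent workers could recover (the 100-worker count is illustrative):

```python
# Back-of-envelope check of the sequential-cost claim above.
rows = 1_000_000
seconds_per_call = 1.0

sequential_hours = rows * seconds_per_call / 3600
assert int(sequential_hours) == 277  # ~277 hours of sequential execution

# With, say, 100 concurrent workers, the same workload drops to under 3 hours.
concurrent_hours = sequential_hours / 100
assert concurrent_hours < 3
```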
Parallelism remains limited with regular UDFs because they run sequentially in a single process. Distributing processing across multiple machines or GPUs requires complex manual orchestration since the UDF is tightly coupled to the query engine process.
UDXFs solve these challenges by running in separate processes that communicate via Arrow Flight protocol. The query engine sends data to the UDXF process through efficient Arrow-based communication. The UDXF process performs the computation independently and sends results back to the query engine. When the UDXF crashes due to external API failures or memory issues, the query engine survives and can retry or handle the error gracefully.
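The round trip described above can be sketched with a plain child process and JSON over pipes — a simplified stand-in for Arrow Flight, not the real protocol: spawn a worker, send a batch, receive results, let the worker exit.

```python
import json
import subprocess
import sys

# Worker stand-in for a UDXF process: read a batch, run custom logic,
# write results back. JSON over pipes substitutes for Arrow Flight here.
worker_code = (
    "import sys, json;"
    "batch = json.load(sys.stdin);"                   # receive data
    "json.dump([x * 2 for x in batch], sys.stdout)"   # process and return
)

# Spawn the worker, send a batch, collect results; the process then exits.
proc = subprocess.run(
    [sys.executable, "-c", worker_code],
    input=json.dumps([1, 2, 3]),
    capture_output=True,
    text=True,
)
result = json.loads(proc.stdout)
assert result == [2, 4, 6]
```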
How UDXFs work
UDXFs operate through five stages that manage the distributed processing lifecycle.
1. You write a Python function that takes a pandas DataFrame and returns a transformed DataFrame containing your custom logic.
2. You wrap the function with flight_udxf and specify input and output schemas, which creates a Flight server configuration.
3. When you execute the expression, Xorq starts a Flight server in a separate process that hosts your processing function.
4. Xorq sends data to the server via the Arrow Flight protocol; the server processes it with your function and sends results back using Arrow's zero-copy transfer mechanism.
5. After execution completes, Xorq automatically shuts down the Flight server and cleans up the separate process.

The sequence diagram at the top of this page shows this execution sequence.
Arrow Flight transfers record batches in Arrow's columnar format, avoiding the serialization and deserialization overhead of moving data between processes. For large datasets, this keeps distributed processing performance close to in-process execution. The following architecture diagram shows the data flow:
```mermaid
graph TB
    A[Query Engine] --> B[Start Flight Server]
    B --> C[Separate Process]
    A --> D[Send Arrow Data]
    D --> C
    C --> E[Process with Custom Logic]
    E --> F[Return Arrow Results]
    F --> A
    A --> G[Shutdown Server]
    G --> C
```
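The zero-copy idea — two processes viewing one buffer rather than serializing bytes between them — can be illustrated with the standard library's shared memory. This is an analogy only; Arrow Flight's transport is its own protocol:

```python
from multiprocessing import shared_memory

# Writer side: place bytes into a shared segment.
shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"arrow"

# Reader side (normally a different process): attach by name and view the
# same memory directly, with no pickling and no copy of the payload.
reader = shared_memory.SharedMemory(name=shm.name)
payload = bytes(reader.buf[:5])

reader.close()
shm.close()
shm.unlink()
assert payload == b"arrow"
```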
UDXFs support microservice architectures for data processing. You deploy a UDXF as a standalone service that multiple queries can call concurrently, supporting model serving and API integration patterns.
UDXFs versus UDFs
| Aspect | UDF | UDXF |
|---|---|---|
| Execution | In-process | Separate process |
| Communication | Direct function call | Arrow Flight protocol |
| Isolation | None | Full process isolation |
| Crash impact | Crashes query engine | Query engine survives |
| Remote execution | Not supported | Supported |
| Parallelism | Limited | Scalable across machines |
| Overhead | Minimal | Flight communication overhead |
| Use case | Simple transformations | Heavy computation, API calls |
Example: LLM sentiment analysis
With UDF
A UDF blocks the query engine while waiting for API responses, creating performance bottlenecks.
```python
def sentiment_udf(df):
    # Blocks the query engine for seconds per API call
    df['sentiment'] = df['text'].apply(call_openai_api)
    return df
```

With UDXF
A UDXF runs in a separate process and doesn’t block the query engine during API calls.
```python
def sentiment_udxf_fn(df):
    # Runs in a separate process without blocking
    df['sentiment'] = batch_call_openai_api(df['text'])
    return df

# Create the UDXF (curried, use with pipe)
sentiment_udxf = flight_udxf(
    process_df=sentiment_udxf_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
)

# Apply to data
result = reviews.pipe(sentiment_udxf).execute()
```

Creating UDXFs
UDXFs require four components that work together to define the distributed processing behavior.
Processing function
Define a function that transforms pandas DataFrames with your custom logic.
```python
def process_data(df):
    # Your custom logic
    df['new_column'] = df['old_column'] * 2
    return df
```

Input schema
Define the required columns and their data types for input validation.
```python
input_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64,
})
```

Output schema
Define the result columns and their data types that your function will return.
```python
output_schema = xo.schema({
    "old_column": dt.float64,
    "id": dt.int64,
    "new_column": dt.float64,
})
```

UDXF creation
Combine the function and schemas into a UDXF configuration that Xorq will execute.
```python
# Create the UDXF (curried, use with pipe)
my_udxf = flight_udxf(
    process_df=process_data,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
    name="my_transformation",
)

# Apply to data
result = input_data.pipe(my_udxf).execute()
```

UDXF use cases
UDXFs excel in four scenarios that benefit from process isolation and distributed execution.
External API calls
Call external services without blocking the query engine and protect against failures.
```python
def geocoding_fn(df):
    # Calls a geocoding API for each address
    df['latitude'], df['longitude'] = geocode_addresses(df['address'])
    return df

# Create the UDXF
geocode_udxf = flight_udxf(
    process_df=geocoding_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
)

# Apply to addresses
result = addresses.pipe(geocode_udxf).execute()
```

The query engine remains responsive while API calls happen in the background. Failed API calls in the UDXF process don't crash the query engine, and multiple UDXF processes can run API calls in parallel across different machines or cores.
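Inside the UDXF process you are free to parallelize external calls however you like. The sketch below uses a thread pool with `fake_geocode`, a hypothetical stand-in for a real geocoding client:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a geocoding client: pretend each lookup is a
# slow network call that returns (latitude, longitude).
def fake_geocode(address):
    return (hash(address) % 90, hash(address) % 180)

addresses = [f"{n} Main St" for n in range(8)]

# Threads overlap the waiting time of the simulated network calls; the
# query engine only ever sees one batch in and one batch out.
with ThreadPoolExecutor(max_workers=4) as pool:
    coords = list(pool.map(fake_geocode, addresses))

assert len(coords) == len(addresses)
```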
Large ML models
Load and run memory-intensive models safely with dedicated process resources.
```python
def model_inference_fn(df):
    # Load the 5GB model, isolated from the query engine
    model = load_large_model()
    df['prediction'] = model.predict(df[feature_columns])
    return df

# Create the UDXF
inference_udxf = flight_udxf(
    process_df=model_inference_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
)

# Apply to features
result = features.pipe(inference_udxf).execute()
```

Process isolation means the large model's memory usage doesn't interfere with query processing, model crashes don't affect the query engine, and you can dedicate GPU resources to the UDXF process.
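One practical refinement: because the worker process stays alive across batches, you can memoize the expensive load so each batch reuses the same in-memory model. A stdlib sketch, with a hypothetical loader and a counter to verify the single load:

```python
from functools import lru_cache

load_count = 0

# Hypothetical loader: cache the expensive load so every batch handled by
# this process reuses one in-memory copy of the model.
@lru_cache(maxsize=1)
def load_large_model():
    global load_count
    load_count += 1
    return {"weights": "..."}  # stand-in for a multi-gigabyte model

# Three batches arrive at the worker; the model loads only once.
for _ in range(3):
    model = load_large_model()

assert load_count == 1
```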
Batch LLM processing
Process text with LLMs at scale using batched API calls for efficiency.
```python
def sentiment_analysis_fn(df):
    # Batch calls to the OpenAI API
    df['sentiment'] = batch_call_openai(df['review_text'])
    return df

# Create the UDXF
sentiment_udxf = flight_udxf(
    process_df=sentiment_analysis_fn,
    maybe_schema_in=input_schema,
    maybe_schema_out=output_schema,
)

# Apply to reviews
result = reviews.pipe(sentiment_udxf).execute()
```

Batching API calls in the UDXF process improves throughput, retries happen without affecting the query engine, and results can be cached within the UDXF process.
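A sketch of that retry behavior, using a hypothetical flaky batch API that succeeds on the third attempt — the failures stay inside the worker and never reach the query engine:

```python
import time

# Hypothetical flaky API: fails twice with a transient error, then succeeds.
attempts = {"n": 0}
def flaky_batch_api(texts):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return ["positive"] * len(texts)

def call_with_retries(texts, retries=5, backoff=0.01):
    for attempt in range(retries):
        try:
            return flaky_batch_api(texts)
        except ConnectionError:
            time.sleep(backoff * 2 ** attempt)  # exponential backoff
    raise RuntimeError("API kept failing")

labels = call_with_retries(["great", "terrible"])
assert labels == ["positive", "positive"] and attempts["n"] == 3
```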
Microservice deployment
Deploy processing as standalone services that multiple applications can access independently.
```shell
# Build the expression containing the UDXF
xorq build pipeline.py -e pipeline

# Serve the built expression as a Flight server
xorq serve-unbound pipeline --port 8815
```

This pattern enables sharing processing logic across multiple applications, provides independent scaling of processing and query workloads, and lets you deploy UDXFs on specialized hardware such as GPU machines.
When to use UDXFs
Use UDXFs when you call external APIs or LLMs with unpredictable latency, run heavy ML models that need dedicated memory or a GPU, execute unreliable or untrusted code, or build microservices.

Use regular UDFs when transforms are simple and fast with no external dependencies, you only need arithmetic or string operations, or operations are tightly coupled to the query engine.
Trade-offs
Benefits: Process isolation, remote execution, resource management, parallel execution, fault tolerance, microservices pattern.
Costs: Communication overhead, complexity from server startup and shutdown and cross-process errors, debugging that may require distributed tracing, server management, and explicit schemas.
Learning more
UDXFs extend basic UDF concepts with backend-specific data exchange capabilities as explained in User-defined functions. The Apache Arrow Flight protocol supports UDXF communication as covered in How Xorq works.
UDXF deployment patterns are covered in Serving expressions as endpoints.