Back to Blog
March 30, 2026
Daft UDF Patterns: Four Patterns, One Notebook

Daft UDF Patterns: Four Patterns, One Notebook

Row-wise, generator, async, and stateful UDFs — one notebook, one dataset, runnable side by side.

by Daft Team

Daft has four UDF patterns, each designed for a different kind of workload. This notebook puts all four in one place — row-wise transforms, generators, async I/O, and stateful classes — with a single dataset so they're easy to compare and run side by side.

This is Week 5 of the UDF series. Previous posts: stateless UDFs with @daft.func, stateful UDFs with @daft.cls, GPU inference with @daft.cls.

Each section walks through a problem, shows the pattern that solves it, and includes runnable code. The full notebook is available in the Daft examples.


The UDF series so far


Section 1: Row-wise -- clean every row with zero setup

Problem: You need custom logic on every row -- normalize an email, parse a phone number, validate a field -- and the built-in expressions don't cover it.

Pattern: @daft.func (docs) with type hints. One row in, one row out.

import daft
 
@daft.func
def normalize_email(raw: str) -> str:
    local, domain = raw.strip().lower().split("@")
    local = local.split("+")[0]
    return f"{local}@{domain}"
 
df = daft.from_pydict({"email": [
    "Alice+work@Gmail.COM",
    "  bob@example.org  ",
    "CAROL+spam@yahoo.com",
]})
df = df.select(normalize_email(df["email"]).alias("clean"))
df.show()

When to use it: Your function takes a single row and returns a single value. The logic is pure Python -- no external services, no model weights, no variable-length output. This is the workhorse pattern for data cleaning.


Section 2: Generator -- one document becomes many chunks

Problem: Each input row needs to produce a variable number of output rows. Splitting a PDF into pages, tokenizing text into sentences, expanding a nested record into child rows. You don't know ahead of time how many outputs each input will produce.

Pattern: @daft.func returning Iterator[T]. Yield as many rows as the data demands.

from typing import Iterator
import daft
 
@daft.func
def split_into_chunks(text: str, max_len: int = 200) -> Iterator[str]:
    words = text.split()
    chunk = []
    length = 0
    for word in words:
        if length + len(word) + 1 > max_len and chunk:
            yield " ".join(chunk)
            chunk = []
            length = 0
        chunk.append(word)
        length += len(word) + 1
    if chunk:
        yield " ".join(chunk)
 
df = daft.from_pydict({"document": [
    "Daft runs locally and distributed with zero code changes. " * 10,
    "UDFs bridge the gap between built-in expressions and your custom logic. " * 5,
]})
df = df.select(split_into_chunks(df["document"]).alias("chunk"))
df.show()

When to use it: One-to-many transformations. Document chunking for RAG pipelines, audio segmentation, tokenization -- anywhere yield is the natural way to express "this input produces N outputs." No intermediate lists, no explode(), no memory spike from materializing everything at once.


Section 3: Async -- hit APIs without waiting in line

Problem: Your function calls an external service -- a translation API, a geocoder, a model endpoint. Sequential execution means each row waits for the previous one to finish. At 10,000 rows, that's hours of idle waiting.

Pattern: async def with @daft.func. Daft overlaps the I/O automatically.

import daft
 
@daft.func
async def fetch_status(url: str) -> int:
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return resp.status
 
df = daft.from_pydict({"endpoint": [
    "https://httpbin.org/status/200",
    "https://httpbin.org/status/404",
    "https://httpbin.org/status/500",
]})
df = df.select(
    df["endpoint"],
    fetch_status(df["endpoint"]).alias("status"),
)
df.show()

When to use it: Any I/O-bound workload -- API calls, database lookups, webhook triggers, model endpoints behind a REST API. You write async def and await exactly as you would outside Daft. The concurrency is automatic; no thread pools, no executor boilerplate.


Section 4: Stateful -- load a model once, infer on every row

Problem: Loading a 2 GB model for every partition is killing your pipeline. Seven minutes of cold-start time, repeated across every worker, before a single row gets processed.

Pattern: @daft.cls (docs) with __init__ for setup and __call__ for inference. The model loads once per worker and stays in memory for every row that worker handles.

import daft
 
@daft.cls
class SentimentClassifier:
    def __init__(self):
        from transformers import pipeline
        self.pipe = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
 
    def __call__(self, text: str) -> str:
        return self.pipe(text)[0]["label"]
 
classifier = SentimentClassifier()
 
df = daft.from_pydict({"review": [
    "This product is amazing",
    "Worst purchase I've ever made",
    "It's okay, nothing special",
]})
df = df.select(classifier(df["review"]).alias("sentiment"))
df.show()

When to use it: Any workload with expensive initialization -- model loading, database connection pools, API clients with authentication. __init__ runs once per worker; the method runs on every row. You amortize the cold-start cost across all the rows that worker processes. Add gpus=1 to the decorator when you need GPU allocation, max_concurrency to cap parallel instances, use_process=True to escape the GIL.


Pick your pattern

I need to...PatternDecorator
Transform each row with custom logicRow-wise@daft.func
Produce multiple rows from one inputGenerator@daft.func + Iterator[T]
Call external services concurrentlyAsync@daft.func + async def
Reuse expensive resources across rowsStateful@daft.cls + __init__

Every pattern works locally on your laptop. Set daft.set_runner_ray("ray://cluster:10001") and the same code runs distributed across a cluster. No rewrites.


Try it

The full notebook lives in the Daft docs: daft-udf-patterns.ipynb. Open it, run every cell. Fifteen minutes to hands-on experience with all four UDF patterns.

Your Python. Daft's scale.

Suggested Posts