Skip to main content
Ctrl+K
PyFlink 1.20+vvr.11.7.dev0 documentation - Home PyFlink 1.20+vvr.11.7.dev0 documentation - Home
  • API Reference
  • Examples
  • API Reference
  • Examples

Section Navigation

  • PyFlink Table
  • PyFlink DataStream
  • PyFlink DataFrame
    • DataFrame
    • DataFrame Creation
    • Input/Output
    • SQL
    • DataType
    • User Defined Functions
    • Configuration
    • GPU Support
    • AI / LLM
  • PyFlink Common
  • API Reference
  • PyFlink DataFrame
  • User Defined Functions
  • pyflink.dataframe.udf.udf

pyflink.dataframe.udf.udf#

udf(func: Callable | ScalarFunction | AsyncScalarFunction | Type, *, return_dtype: DataType | type | str | None = None, deterministic: bool = True, name: str | None = None, func_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → DataFrameUDFWrapper[source]#
udf(func: None = None, *, return_dtype: DataType | type | str | None = None, deterministic: bool = True, name: str | None = None, func_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → Callable[[Callable | ScalarFunction | AsyncScalarFunction | Type], DataFrameUDFWrapper]

Create a user-defined function for DataFrame operations.

This decorator supports: - Plain Python functions and lambdas - Async functions defined with async def (executed asynchronously) - ScalarFunction subclasses (class type or instance), using eval method - AsyncScalarFunction subclasses (class type or instance), using async eval method - Plain callable classes with __call__ method (class type or instance) - Automatic type inference from Python type hints

Parameters:
  • func – The Python function, ScalarFunction instance/subclass, or callable class instance/type to wrap as a UDF.

  • return_dtype – Optional return type. Can be: - DataType instance (e.g., DataType.int64()) - Python type (e.g., int, str, float) - String (e.g., ‘INT’, ‘BIGINT’) If not specified, inferred from function’s return type hint (eval for ScalarFunction, __call__ for plain classes).

  • deterministic – Whether the function is deterministic (default: True).

  • name – Optional name for the UDF.

  • func_type – Optional execution format. Supported values are "general", "pandas", and "arrow". If omitted, pandas and arrow formats are detected from pandas/pyarrow type hints when possible, otherwise "general" is used.

  • concurrency – Optional concurrency (parallelism) for the UDF operator. If specified, the operator running this UDF will use this parallelism. UDFs with different concurrency values will be split into separate operators.

  • batch_size – Optional maximum number of elements per batch. Only applies to batch-wise UDFs. If not specified, the global configuration ‘python.fn-execution.arrow.batch.size’ is used (default 1000). When multiple UDFs are fused, the maximum batch_size among them is used.

  • num_gpus – Optional number of GPUs requested for this UDF (e.g., 0.5, 1.0). Each GPU UDF will run in its own operator and will not be fused with other UDFs (including other GPU UDFs).

  • gpu_type – Optional GPU type (e.g., ‘A10’, ‘V100’).

Returns:

A DataFrameUDFWrapper that can be called with Expressions.

Example::
>>> from pyflink.dataframe import udf, DataType, col
>>> from pyflink.table.udf import ScalarFunction
>>>
>>> # 1. Plain function with explicit return_dtype
>>> @udf(return_dtype=DataType.string())
... def to_string(x):
...     return str(x)
>>>
>>> df.with_columns(s=to_string(col("a")))
>>>
>>> # 2. Plain function with type hint inference
>>> @udf
... def add(x: int, y: int) -> int:
...     return x + y
>>>
>>> df.with_columns(sum=add(col("a"), col("b")))
>>>
>>> # 3. ScalarFunction with explicit return_dtype (instance or class)
>>> class AddOne(ScalarFunction):
...     def eval(self, x):
...         return x + 1
>>>
>>> add_one = udf(AddOne(), return_dtype=DataType.int64())
>>> add_one = udf(AddOne, return_dtype=DataType.int64())  # auto-instantiated
>>> df.with_columns(b=add_one(col("a")))
>>>
>>> # 4. ScalarFunction instance with type hint inference
>>> class Double(ScalarFunction):
...     def eval(self, x: int) -> int:
...         return x * 2
>>>
>>> double = udf(Double())
>>> df.with_columns(doubled=double(col("a")))
>>>
>>> # 5. @udf decorator on ScalarFunction class (with return_dtype or type hints)
>>> @udf(return_dtype=DataType.int64())
... class AddTwo(ScalarFunction):
...     def eval(self, x):
...         return x + 2
>>>
>>> df.with_columns(b=AddTwo(col("a")))
>>>
>>> @udf
... class Triple(ScalarFunction):
...     def eval(self, x: int) -> int:
...         return x * 3
>>>
>>> df.with_columns(tripled=Triple(col("a")))
>>>
>>> # 6. Plain callable class instance
>>> class MultiplyBy:
...     def __init__(self, factor):
...         self._factor = factor
...
...     def __call__(self, x: int) -> int:
...         return x * self._factor
>>>
>>> times_three = udf(MultiplyBy(3))
>>> df.with_columns(result=times_three(col("a")))
>>>
>>> # 7. @udf decorator on plain callable class
>>> @udf(return_dtype=DataType.int64())
... class AddOne:
...     def __call__(self, x):
...         return x + 1
>>>
>>> df.with_columns(b=AddOne(col("a")))
>>>
>>> @udf
... class DoubleIt:
...     def __call__(self, x: int) -> int:
...         return x * 2
>>>
>>> df.with_columns(doubled=DoubleIt(col("a")))
>>>
>>> # 8. Async function (executed asynchronously, e.g. for I/O-bound work)
>>> import asyncio
>>> @udf
... async def async_lookup(key: str) -> str:
...     await asyncio.sleep(0.01)
...     return f"value_for_{key}"
>>>
>>> df.with_columns(v=async_lookup(col("a")))
>>>
>>> # 9. AsyncScalarFunction subclass
>>> from pyflink.table.udf import AsyncScalarFunction
>>> class AsyncLookup(AsyncScalarFunction):
...     async def eval(self, key: str) -> str:
...         await asyncio.sleep(0.01)
...         return f"value_for_{key}"
>>>
>>> async_lookup = udf(AsyncLookup())
>>> df.with_columns(v=async_lookup(col("a")))
>>>
>>> # 10. Pandas scalar function (executed on pandas.Series batches)
>>> import pandas as pd
>>> @udf(return_dtype=DataType.int64(), func_type="pandas")
... def add_one_pandas(x: pd.Series) -> pd.Series:
...     return x + 1
>>>
>>> df.with_columns(v=add_one_pandas(col("a")))
>>>
>>> # 11. Pandas async scalar function (executed on pandas.Series batches)
>>> import pandas as pd
>>> @udf(return_dtype=DataType.string(), batch_size=128)
... async def async_batch_lookup(keys: pd.Series) -> pd.Series:
...     await asyncio.sleep(0.01)
...     return keys.map(lambda key: f"value_for_{key}")
>>>
>>> df.with_columns(v=async_batch_lookup(col("a")))
>>>
>>> # 12. Arrow scalar function (executed on pyarrow.Array batches)
>>> import pyarrow as pa
>>> @udf(return_dtype=DataType.int64(), func_type="arrow")
... def add_one_arrow(x):
...     return pa.array([v + 1 for v in x.to_pylist()])
>>>
>>> df.with_columns(v=add_one_arrow(col("a")))
>>>
>>> # 13. Arrow async scalar function (executed on pyarrow.Array batches)
>>> import pyarrow as pa
>>> @udf(return_dtype=DataType.string(), func_type="arrow", batch_size=128)
... async def async_arrow_lookup(keys):
...     await asyncio.sleep(0.01)
...     return pa.array([f"value_for_{k}" for k in keys.to_pylist()])
>>>
>>> df.with_columns(v=async_arrow_lookup(col("a")))

previous

User Defined Functions

next

Configuration

On this page
  • udf()

This Page

  • Show Source

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.