PyFlink 1.20+vvr.11.7.dev0 documentation
pyflink.table.udf.udf

udf(f: Callable | ScalarFunction | AsyncScalarFunction | Type, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general', udf_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → UserDefinedScalarFunctionWrapper | UserDefinedAsyncScalarFunctionWrapper
udf(f: None = None, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general', udf_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → Callable[[Callable | ScalarFunction | AsyncScalarFunction | Type], UserDefinedScalarFunctionWrapper | UserDefinedAsyncScalarFunctionWrapper]

Helper method for creating a user-defined scalar function.

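udf can be called directly or used as a decorator. It automatically detects whether the function is async, that is, defined with async def or an instance of AsyncScalarFunction, and in that case returns a UserDefinedAsyncScalarFunctionWrapper.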
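The detection rule described above can be sketched in plain Python. This is a minimal, hypothetical sketch: `is_async_udf` and the stand-in `AsyncScalarFunction` base class are defined here for illustration only and are not PyFlink's implementation. It assumes nothing beyond the stated rule: a function defined with async def, or an instance of the async base class, counts as async.

```python
import asyncio
import inspect


class AsyncScalarFunction:
    """Hypothetical stand-in for PyFlink's AsyncScalarFunction base class."""

    async def eval(self, *args):
        raise NotImplementedError


def is_async_udf(f) -> bool:
    """Return True if f would be treated as async under the rule stated above."""
    # Case 1: an instance of the async base class.
    if isinstance(f, AsyncScalarFunction):
        return True
    # Case 2: a plain function defined with `async def`.
    return inspect.iscoroutinefunction(f)


async def async_lookup(key):
    await asyncio.sleep(0)
    return f"value_for_{key}"


class AsyncLookup(AsyncScalarFunction):
    async def eval(self, key):
        await asyncio.sleep(0)
        return f"value_for_{key}"


print(is_async_udf(async_lookup))     # True
print(is_async_udf(AsyncLookup()))    # True
print(is_async_udf(lambda i: i + 1))  # False
```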

Example

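>>> import asyncio
>>> from pyflink.table import DataTypes
>>> from pyflink.table.udf import udf, ScalarFunction, AsyncScalarFunction
>>> add_one = udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())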

>>> # The input_types argument is optional.
>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j

>>> # Specify result_type via string.
>>> @udf(result_type='BIGINT')
... def add(i, j):
...     return i + j

>>> # A function defined with async def is detected automatically.
>>> @udf(result_type=DataTypes.STRING())
... async def async_lookup(key):
...     await asyncio.sleep(0.1)
...     return f"value_for_{key}"

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1
>>> subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())

>>> # An AsyncScalarFunction instance is detected automatically.
>>> class AsyncLookup(AsyncScalarFunction):
...     async def eval(self, key):
...         await asyncio.sleep(0.1)
...         return f"value_for_{key}"
>>> async_lookup = udf(AsyncLookup(), result_type=DataTypes.STRING())
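An async function pays off when each call spends most of its time waiting on I/O, because pending calls can overlap instead of running one after another. The sketch below illustrates that idea in plain asyncio, reusing the body of the async_lookup example; `lookup_all` is a hypothetical helper, and the sketch does not show how PyFlink itself schedules async UDF calls.

```python
import asyncio
import time


async def async_lookup(key):
    # Simulated I/O wait, as in the example above.
    await asyncio.sleep(0.1)
    return f"value_for_{key}"


async def lookup_all(keys):
    # Hypothetical helper: start every lookup, then wait for all of them.
    # asyncio.gather returns results in the same order as its inputs.
    return await asyncio.gather(*(async_lookup(k) for k in keys))


start = time.perf_counter()
results = asyncio.run(lookup_all(["a", "b", "c", "d", "e"]))
elapsed = time.perf_counter() - start

print(results)
# The five 0.1 s waits overlap, so this takes roughly 0.1 s rather than 0.5 s.
print(f"elapsed: {elapsed:.2f} s")
```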
Parameters:
  • f – the function to wrap: a lambda or Python function (defined with def or async def), a ScalarFunction or AsyncScalarFunction instance, or a class. If omitted, udf returns a decorator.

  • input_types – optional, the input data types, given as a DataType, a list of DataTypes, a type string, or a list of type strings.

  • result_type – the result data type, given as a DataType or a type string such as 'BIGINT'.

  • deterministic – whether the function's results are deterministic: True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default: True)

  • name – the function name.

  • func_type – the type of the Python function; one of 'general', 'pandas', or 'arrow'. (default: 'general')

  • concurrency – optional, the concurrency (parallelism) of the operator that runs this UDF. If specified, that operator runs with this parallelism.

  • batch_size – optional, the maximum number of elements to buffer before processing them as a batch. Only valid for batch-wise UDFs.
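The batch_size description can be illustrated with a small buffering sketch: elements are collected until batch_size is reached and then processed together in one call. The `process_in_batches` helper and the flushing of a trailing partial batch at end of input are assumptions made for illustration; this is not PyFlink's runtime code.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def process_in_batches(
    elements: Iterable[T],
    batch_size: int,
    batch_fn: Callable[[List[T]], List[R]],
) -> List[R]:
    """Hypothetical helper: buffer up to batch_size elements, then call batch_fn once per batch."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    results: List[R] = []
    buffer: List[T] = []
    for element in elements:
        buffer.append(element)
        if len(buffer) == batch_size:
            # Buffer is full: process the whole batch in one call.
            results.extend(batch_fn(buffer))
            buffer = []
    if buffer:
        # Assumption: a trailing partial batch is flushed at end of input.
        results.extend(batch_fn(buffer))
    return results


calls = []


def add_one_batch(batch):
    # Record each batch size so the batching is visible.
    calls.append(len(batch))
    return [i + 1 for i in batch]


print(process_in_batches(range(7), 3, add_one_batch))  # [1, 2, 3, 4, 5, 6, 7]
print(calls)  # [3, 3, 1]
```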

Returns:

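A UserDefinedScalarFunctionWrapper, or a UserDefinedAsyncScalarFunctionWrapper for async functions. When f is omitted, as in @udf(result_type=...), a function is returned that takes f and produces one of these wrappers.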
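The two overloads in the signature follow a common Python pattern: passing f returns a wrapper directly, while omitting f returns a decorator that is then applied to the function. The sketch below reproduces only that dispatch; `toy_udf` and `ToyWrapper` are hypothetical names, and the real wrappers carry much more (input types, determinism, function type, and so on).

```python
import functools
from typing import Callable, Optional


class ToyWrapper:
    """Hypothetical stand-in for UserDefinedScalarFunctionWrapper."""

    def __init__(self, func: Callable, result_type: Optional[str]):
        self.func = func
        self.result_type = result_type

    def __call__(self, *args):
        return self.func(*args)


def toy_udf(f: Optional[Callable] = None, result_type: Optional[str] = None):
    # Second overload: f is None, so return a decorator that waits for the function.
    if f is None:
        return functools.partial(toy_udf, result_type=result_type)
    # First overload: f is given, so wrap it right away.
    return ToyWrapper(f, result_type)


# Direct call, like udf(lambda i: i + 1, ...).
add_one = toy_udf(lambda i: i + 1, result_type="BIGINT")


# Decorator with arguments, like @udf(result_type='BIGINT').
@toy_udf(result_type="BIGINT")
def add(i, j):
    return i + j


print(type(add).__name__, add(1, 2))     # ToyWrapper 3
print(add_one(41), add_one.result_type)  # 42 BIGINT
```

Using functools.partial keeps both call styles in one function: the decorator form simply re-enters toy_udf once the function is supplied.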

Added in version 1.10.0.

