pyflink.table.udf.udf
- udf(f: Callable | ScalarFunction | AsyncScalarFunction | Type, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general', udf_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) -> UserDefinedScalarFunctionWrapper | UserDefinedAsyncScalarFunctionWrapper
- udf(f: None = None, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general', udf_type: str | None = None, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) -> Callable[[Callable | ScalarFunction | AsyncScalarFunction | Type], UserDefinedScalarFunctionWrapper | UserDefinedAsyncScalarFunctionWrapper]
Helper method for creating a user-defined scalar function.
The decorator automatically detects whether the function is asynchronous, that is, whether it is defined with async def or is an instance of AsyncScalarFunction.
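The two call forms in the signatures above (passing f directly, or omitting f and using the result as a decorator) and the async detection can be illustrated with a minimal, standard-library-only sketch. It is not PyFlink's implementation: sketch_udf and HypotheticalWrapper are hypothetical names introduced here, and the check covers only async def functions, not AsyncScalarFunction instances.

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


class HypotheticalWrapper:
    """Hypothetical stand-in for a UDF wrapper; records what was detected."""

    def __init__(self, func: Callable, result_type: Optional[str], is_async: bool):
        self.func = func
        self.result_type = result_type
        self.is_async = is_async


def sketch_udf(f: Optional[Callable] = None, result_type: Optional[str] = None) -> Any:
    """Accept a callable directly, or only keyword options (decorator form)."""
    if f is None:
        # Used as @sketch_udf(result_type=...): return a decorator that
        # receives the function afterwards.
        return lambda func: sketch_udf(func, result_type=result_type)
    # True only for functions defined with `async def`.
    is_async = inspect.iscoroutinefunction(f)
    return HypotheticalWrapper(f, result_type, is_async)


# Direct form: the callable is passed as the first argument.
add_one = sketch_udf(lambda i: i + 1, result_type="BIGINT")


# Decorator form: f is omitted, so a decorator is returned and applied.
@sketch_udf(result_type="STRING")
async def async_lookup(key):
    await asyncio.sleep(0)
    return f"value_for_{key}"
```

In this sketch add_one.is_async is False and async_lookup.is_async is True, mirroring the sync/async distinction between the two wrapper types named in the signature.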
Example
>>> add_one = udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT())

>>> # The input_types is optional.
>>> @udf(result_type=DataTypes.BIGINT())
... def add(i, j):
...     return i + j

>>> # Specify result_type via string.
>>> @udf(result_type='BIGINT')
... def add(i, j):
...     return i + j

>>> # Async function will be automatically detected
>>> @udf(result_type=DataTypes.STRING())
... async def async_lookup(key):
...     await asyncio.sleep(0.1)
...     return f"value_for_{key}"

>>> class SubtractOne(ScalarFunction):
...     def eval(self, i):
...         return i - 1

>>> subtract_one = udf(SubtractOne(), DataTypes.BIGINT(), DataTypes.BIGINT())

>>> # AsyncScalarFunction will be automatically detected
>>> class AsyncLookup(AsyncScalarFunction):
...     async def eval(self, key):
...         await asyncio.sleep(0.1)
...         return f"value_for_{key}"

>>> async_lookup = udf(AsyncLookup(), result_type=DataTypes.STRING())
- Parameters:
f – lambda function, user-defined function, async function, or an instance of ScalarFunction or AsyncScalarFunction.
input_types – optional, the input data types.
result_type – the result data type.
deterministic – the determinism of the function’s results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True)
name – the function name.
func_type – the type of the Python function. Available values: general, pandas, arrow (default: general).
concurrency – optional, the concurrency (parallelism) for the UDF operator. If specified, the operator running this UDF will use this parallelism.
batch_size – optional, the maximum number of elements to buffer before processing them in a batch. Only valid for batch-wise UDF.
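The buffering that batch_size describes can be pictured with a small standard-library sketch. It is only an illustration of "buffer up to N elements, then process them together", not how PyFlink's runtime is implemented; buffer_in_batches and add_one_batch are hypothetical names introduced here.

```python
from typing import Iterable, Iterator, List


def buffer_in_batches(elements: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield lists of at most `batch_size` elements, flushing any remainder."""
    buffer: List[int] = []
    for element in elements:
        buffer.append(element)
        if len(buffer) >= batch_size:
            # Buffer is full: hand the batch over and start a new one.
            yield buffer
            buffer = []
    if buffer:
        # Flush the final, possibly smaller, batch.
        yield buffer


def add_one_batch(batch: List[int]) -> List[int]:
    """Hypothetical batch-wise function: one call handles a whole batch."""
    return [value + 1 for value in batch]


# Five inputs with batch_size=2 give two full batches and one remainder.
results = [add_one_batch(batch) for batch in buffer_in_batches(range(5), batch_size=2)]
print(results)
```

A larger batch size means fewer calls to the batch-wise function at the cost of holding more elements in the buffer at once.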
- Returns:
UserDefinedScalarFunctionWrapper or UserDefinedAsyncScalarFunctionWrapper when f is given; otherwise a decorator function that accepts f and returns one of these wrappers.
Added in version 1.10.0.