Skip to main content
Ctrl+K
PyFlink 1.20+vvr.11.7.dev0 documentation - Home PyFlink 1.20+vvr.11.7.dev0 documentation - Home
  • API Reference
  • Examples
  • API Reference
  • Examples

Section Navigation

  • PyFlink Table
    • TableEnvironment
    • Table
    • Data Types
    • Window
    • Expressions
    • User Defined Functions
    • Descriptors
    • StatementSet
    • Catalog
  • PyFlink DataStream
  • PyFlink DataFrame
  • PyFlink Common
  • API Reference
  • PyFlink Table
  • User Defined Functions
  • pyflink.table.udf.udtaf

pyflink.table.udf.udtaf#

udtaf(f: Callable | TableAggregateFunction | Type, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, accumulator_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general') → UserDefinedAggregateFunctionWrapper[source]#
udtaf(f: None = None, input_types: List[DataType] | DataType | str | List[str] | None = None, result_type: DataType | str | None = None, accumulator_type: DataType | str | None = None, deterministic: bool | None = None, name: str | None = None, func_type: str = 'general') → Callable[[Callable | TableAggregateFunction | Type], UserDefinedAggregateFunctionWrapper]

Helper method for creating a user-defined table aggregate function.

Example:

>>> # The input_types is optional.
>>> class Top2(TableAggregateFunction):
...     def emit_value(self, accumulator):
...         yield Row(accumulator[0])
...         yield Row(accumulator[1])
...
...     def create_accumulator(self):
...         return [None, None]
...
...     def accumulate(self, accumulator, *args):
...         if args[0] is not None:
...             if accumulator[0] is None or args[0] > accumulator[0]:
...                 accumulator[1] = accumulator[0]
...                 accumulator[0] = args[0]
...             elif accumulator[1] is None or args[0] > accumulator[1]:
...                 accumulator[1] = args[0]
...
...     def retract(self, accumulator, *args):
...         accumulator[0] = accumulator[0] - 1
...
...     def merge(self, accumulator, accumulators):
...         for other_acc in accumulators:
...             self.accumulate(accumulator, other_acc[0])
...             self.accumulate(accumulator, other_acc[1])
...
...     def get_accumulator_type(self):
...         return 'ARRAY<BIGINT>'
...
...     def get_result_type(self):
...         return 'ROW<a BIGINT>'
>>> top2 = udtaf(Top2())
Parameters:
  • f – user-defined table aggregate function.

  • input_types – optional, the input data types.

  • result_type – the result data type.

  • accumulator_type – optional, the accumulator data type.

  • deterministic – the determinism of the function’s results. True if and only if a call to this function is guaranteed to always return the same result given the same parameters. (default True)

  • name – the function name.

  • func_type – the type of the python function, available value: general (default: general)

Returns:

UserDefinedAggregateFunctionWrapper or function.

Added in version 1.13.0.

previous

pyflink.table.udf.TableAggregateFunction

next

pyflink.table.data_view.ListView

On this page
  • udtaf()

This Page

  • Show Source

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.