PyFlink 1.20+vvr.11.7.dev0 documentation
pyflink.dataframe.dataframe.DataFrame.map_batches

DataFrame.map_batches(func, *, return_dtype=None, batch_format: str, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → DataFrame

Apply a function to batches of rows, producing a new DataFrame.

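The function receives one batch at a time as a dict mapping column names to columns: dict[str, pd.Series] in pandas mode, or dict[str, pyarrow.Array] in arrow mode. It must return a dict of the same kind. This is an N-to-N batch transformation: each returned batch must have the same number of rows as the input batch, so functions that drop rows (filters) or collapse them (aggregations) do not fit this contract.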
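Because the row count must be preserved, it can help to call a pandas-mode function on a small hand-built batch before passing it to map_batches. The sketch below assumes only pandas; check_same_row_count, add_one, and drop_negatives are hypothetical names used for illustration and are not part of PyFlink.

```python
import pandas as pd


def check_same_row_count(func, batch):
    """Hypothetical local check: call a pandas-mode batch function and verify
    that every returned column has as many rows as the input batch."""
    n_rows = len(next(iter(batch.values())))
    out = func(batch)
    for name, column in out.items():
        if len(column) != n_rows:
            raise ValueError(
                f"column {name!r} has {len(column)} rows, expected {n_rows}"
            )
    return out


def add_one(batch):
    # Row-preserving: exactly one output value per input row.
    return {"a": batch["a"] + 1}


def drop_negatives(batch):
    # Filtering changes the row count, so it violates the N-to-N contract.
    return {"a": batch["a"][batch["a"] >= 0]}


sample = {"a": pd.Series([-1, 0, 5], dtype="int64")}
result = check_same_row_count(add_one, sample)
print(result["a"].tolist())  # [0, 1, 6]
```

Calling check_same_row_count(drop_negatives, sample) raises ValueError, because the filtered column has 2 rows while the input batch has 3.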

In pandas mode, return_dtype can be omitted if the function has a TypedDict return type hint. In arrow mode, return_dtype is required and must describe the returned Arrow columns.
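A TypedDict return hint carries the output column names and their Python types, which is the information a return schema needs. The sketch below only shows how that information can be read with the standard typing module; describe_output is a hypothetical helper, and how PyFlink maps Python types such as int to its own DataType values is an assumption not shown here.

```python
from typing import TypedDict, get_type_hints


class Output(TypedDict):
    a: int
    label: str


def process(batch) -> Output:
    # pandas mode: batch["a"] is a pd.Series; both outputs keep the row count.
    return {"a": batch["a"] + 1, "label": batch["a"].astype(str)}


def describe_output(func):
    """Hypothetical helper: return the column -> Python type mapping declared
    by a function's TypedDict return hint. PyFlink's own inference may differ."""
    ret = get_type_hints(func).get("return")
    if ret is None:
        raise TypeError("no return type hint; pass return_dtype explicitly")
    # get_type_hints on a TypedDict class resolves its field annotations.
    return get_type_hints(ret)


print(describe_output(process))  # {'a': <class 'int'>, 'label': <class 'str'>}
```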

Parameters:
  • func – A function (dict[str, pd.Series] -> dict[str, pd.Series]) in pandas mode, a function (dict[str, pyarrow.Array] -> dict[str, pyarrow.Array]) in arrow mode, or a DataFrameUDFWrapper.

  • return_dtype – Output schema, given as DataType.struct(…). Required in arrow mode; in pandas mode, it can be omitted if func has a TypedDict return type hint.

  • batch_format – Batch transport format, either “pandas” or “arrow”. This keyword-only argument has no default and must always be given.

  • concurrency – Optional parallelism for the operator that runs this UDF. UDFs with different concurrency values are placed in separate operators.

  • batch_size – Optional maximum number of rows per batch. If not specified, the value of the configuration option ‘python.fn-execution.arrow.batch.size’ (default 1000) is used. When multiple UDFs are fused into one operator, the largest batch_size among them applies.

  • num_gpus – Optional number of GPUs to request for this UDF (e.g., 0.5 or 1.0). Each UDF that requests GPUs runs in its own operator.

  • gpu_type – Optional GPU type to request (e.g., ‘A10’, ‘V100’).
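The concurrency, num_gpus, and batch_size rules above interact when several map_batches calls are chained. The sketch below is a toy model of those documented rules, not PyFlink's planner; UdfSpec and plan_operators are hypothetical names. It assumes that consecutive CPU UDFs with equal concurrency are fused, and that an unset batch_size defers to any explicit value in the same operator, falling back to the default of 1000 only when none is set.

```python
from dataclasses import dataclass
from typing import List, Optional

# Default of 'python.fn-execution.arrow.batch.size', as stated above.
DEFAULT_BATCH_SIZE = 1000


@dataclass
class UdfSpec:
    """Hypothetical description of one map_batches call in a chain."""
    name: str
    concurrency: Optional[int] = None
    batch_size: Optional[int] = None
    num_gpus: Optional[float] = None


def plan_operators(chain: List[UdfSpec]):
    """Toy grouping of chained UDFs into operators (assumption-based model)."""
    groups: List[List[UdfSpec]] = []
    for udf in chain:
        prev = groups[-1] if groups else None
        can_fuse = (
            prev is not None
            and udf.num_gpus is None            # a GPU UDF never shares an operator
            and prev[-1].num_gpus is None
            and prev[-1].concurrency == udf.concurrency  # different concurrency splits
        )
        if can_fuse:
            prev.append(udf)
        else:
            groups.append([udf])

    plan = []
    for group in groups:
        explicit = [u.batch_size for u in group if u.batch_size is not None]
        plan.append({
            "udfs": [u.name for u in group],
            "concurrency": group[0].concurrency,
            # Fused UDFs use the largest explicit batch_size (assumed fallback: default).
            "batch_size": max(explicit) if explicit else DEFAULT_BATCH_SIZE,
        })
    return plan


chain = [
    UdfSpec("parse", concurrency=4, batch_size=500),
    UdfSpec("clean", concurrency=4),
    UdfSpec("embed", concurrency=2, num_gpus=0.5),
    UdfSpec("score", concurrency=2, batch_size=2000),
]
for op in plan_operators(chain):
    print(op)
```

Under these assumptions, parse and clean share one operator with batch size 500, embed gets its own operator because it requests a GPU (batch size 1000 by default), and score starts a new operator with batch size 2000.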

Returns:

A new DataFrame with the transformed rows.

Example:

>>> # pandas mode
>>> def process(batch):
...     return {"a": batch["a"] + 1}
>>> df.map_batches(process, batch_format="pandas",
...                return_dtype=DataType.struct({"a": DataType.int64()}))
>>>
>>> # arrow mode
>>> import pyarrow.compute as pc
>>> def process_arrow(batch):
...     return {"a": pc.add(batch["a"], 1)}
>>> df.map_batches(process_arrow, batch_format="arrow",
...                return_dtype=DataType.struct({"a": DataType.int64()}))
>>>
>>> # With TypedDict return hint (auto-inferred)
>>> from typing import TypedDict
>>> class Output(TypedDict):
...     a: int
>>> def process(batch) -> Output:
...     return {"a": batch["a"] + 1}
>>> df.map_batches(process, batch_format="pandas")
>>>
>>> # With concurrency
>>> df.map_batches(process, batch_format="pandas", concurrency=4)
>>>
>>> # With batch_size
>>> df.map_batches(process, batch_format="pandas", batch_size=500)
>>>
>>> # With GPU resources
>>> df.map_batches(process, batch_format="pandas", num_gpus=0.5, gpu_type='A10')
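Because batch_size is only an upper bound, a batch function should not assume a fixed batch length; the last batch of a stream is typically shorter. The sketch below is a hypothetical local harness (run_locally is not a PyFlink API) that feeds a pandas-mode function slices of at most batch_size rows as dict[str, pd.Series] and concatenates the results. It checks the function contract only and does not reproduce PyFlink's runtime batching.

```python
import pandas as pd


def run_locally(func, pdf: pd.DataFrame, batch_size: int = 1000) -> pd.DataFrame:
    """Hypothetical harness: apply a pandas-mode batch function to slices of
    at most batch_size rows and stitch the outputs back together."""
    outputs = []
    for start in range(0, len(pdf), batch_size):
        chunk = pdf.iloc[start:start + batch_size]
        # Same shape the docs describe for pandas mode: column name -> pd.Series.
        batch = {name: chunk[name] for name in chunk.columns}
        outputs.append(pd.DataFrame(func(batch)))
    if not outputs:
        return pd.DataFrame()
    return pd.concat(outputs, ignore_index=True)


def process(batch):
    return {"a": batch["a"] + 1}


pdf = pd.DataFrame({"a": list(range(2500))}, dtype="int64")
result = run_locally(process, pdf, batch_size=1000)  # slices of 1000, 1000, 500 rows
print(len(result), result["a"].iloc[-1])  # 2500 2500
```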
