pyflink.dataframe.dataframe.DataFrame.map_batches
- DataFrame.map_batches(func, *, return_dtype=None, batch_format: str, concurrency: int | None = None, batch_size: int | None = None, num_gpus: float | None = None, gpu_type: str | None = None) → DataFrame
Apply a function to batches of rows, producing a new DataFrame.
The function receives a dict[str, pd.Series] in pandas mode, or a dict[str, pyarrow.Array] in arrow mode, and returns the same kind of object. This is an N-to-N batch transformation; each returned batch must have the same row count as the input batch.
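The N-to-N contract can be checked locally before a function is handed to map_batches. The minimal sketch below calls a pandas-mode batch function directly on a hand-built batch and verifies that every returned column has the input's row count. The helper `check_n_to_n` and the sample data are hypothetical illustrations, not part of the pyflink API; in a real job the framework builds the batches.

```python
import pandas as pd


def add_one(batch: dict[str, pd.Series]) -> dict[str, pd.Series]:
    # Element-wise arithmetic keeps the row count unchanged (N rows in, N rows out).
    return {"a": batch["a"] + 1}


def drop_first_row(batch: dict[str, pd.Series]) -> dict[str, pd.Series]:
    # Violates the contract: returns N - 1 rows for an N-row batch.
    return {"a": batch["a"].iloc[1:]}


def check_n_to_n(func, batch: dict[str, pd.Series]) -> dict[str, pd.Series]:
    # Hypothetical local harness: run func on one batch and verify that
    # every output column has exactly as many rows as the input batch.
    n_rows = len(next(iter(batch.values())))
    result = func(batch)
    for name, column in result.items():
        if len(column) != n_rows:
            raise ValueError(
                f"column {name!r} has {len(column)} rows, expected {n_rows}"
            )
    return result


batch = {"a": pd.Series([1, 2, 3])}
print(check_n_to_n(add_one, batch)["a"].tolist())  # [2, 3, 4]
```

Calling `check_n_to_n(drop_first_row, batch)` raises `ValueError`, because a filtering step like this returns fewer rows than it receives and so breaks the N-to-N requirement.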
In pandas mode, return_dtype can be omitted if the function has a TypedDict return type hint. In arrow mode, return_dtype must describe the returned Arrow columns.
- Parameters:
func – A function (dict[str, pd.Series] -> dict[str, pd.Series]) in pandas mode, a function (dict[str, pyarrow.Array] -> dict[str, pyarrow.Array]) in arrow mode, or a DataFrameUDFWrapper.
return_dtype – Output schema as DataType.struct(…). Required in arrow mode; in pandas mode it can be omitted if func has a TypedDict return type hint.
batch_format – Batch transport format, either “pandas” or “arrow”.
concurrency – Optional parallelism for the operator that runs this UDF. If specified, that operator runs with this parallelism. UDFs with different concurrency values are split into separate operators.
batch_size – Optional maximum number of elements per batch. If not specified, the value of the global configuration option ‘python.fn-execution.arrow.batch.size’ is used (default 1000). When multiple UDFs are fused into one operator, the largest batch_size among them is used.
num_gpus – Optional number of GPUs requested for this UDF (e.g., 0.5, 1.0). Each UDF that requests GPUs runs in its own operator.
gpu_type – Optional GPU type to request (e.g., ‘A10’, ‘V100’).
- Returns:
A new DataFrame with the transformed rows.
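The batch_size rules in the parameter list above can be illustrated with plain Python lists. The sketch below splits columnar data into row-aligned chunks of at most batch_size rows and picks the largest explicit value among fused UDFs, falling back to the documented default of 1000. The helpers `effective_batch_size` and `iter_batches` are hypothetical, and treating an unset batch_size as "ignored during fusion" is an assumption, not documented behavior.

```python
from typing import Iterator, Optional, Sequence

DEFAULT_BATCH_SIZE = 1000  # documented default of 'python.fn-execution.arrow.batch.size'


def effective_batch_size(
    fused_batch_sizes: Sequence[Optional[int]],
    configured: int = DEFAULT_BATCH_SIZE,
) -> int:
    # Assumption: UDFs without an explicit batch_size are ignored; if none of
    # the fused UDFs sets one, the global configuration value applies.
    explicit = [size for size in fused_batch_sizes if size is not None]
    return max(explicit) if explicit else configured


def iter_batches(columns: dict[str, list], batch_size: int) -> Iterator[dict[str, list]]:
    # Slice every column at the same offsets so each batch stays row-aligned.
    n_rows = len(next(iter(columns.values())))
    for start in range(0, n_rows, batch_size):
        yield {name: values[start:start + batch_size] for name, values in columns.items()}


print(effective_batch_size([500, None, 200]))  # 500
data = {"a": list(range(1200))}
print([len(b["a"]) for b in iter_batches(data, effective_batch_size([None]))])  # [1000, 200]
```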
Example:
>>> from typing import TypedDict
>>>
>>> # pandas mode
>>> def process(batch):
...     return {"a": batch["a"] + 1}
>>> df.map_batches(process, batch_format="pandas",
...                return_dtype=DataType.struct({"a": DataType.int64()}))
>>>
>>> # arrow mode
>>> def process_arrow(batch):
...     return {"a": batch["a"]}
>>> df.map_batches(process_arrow, batch_format="arrow",
...                return_dtype=DataType.struct({"a": DataType.int64()}))
>>>
>>> # With TypedDict return hint (auto-inferred)
>>> class Output(TypedDict):
...     a: int
>>> def process(batch) -> Output:
...     return {"a": batch["a"] + 1}
>>> df.map_batches(process, batch_format="pandas")
>>>
>>> # With concurrency
>>> df.map_batches(process, batch_format="pandas", concurrency=4)
>>>
>>> # With batch_size
>>> df.map_batches(process, batch_format="pandas", batch_size=500)
>>>
>>> # With GPU resources
>>> df.map_batches(process, batch_format="pandas", num_gpus=0.5, gpu_type="A10")
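The concurrency and num_gpus descriptions in the parameter list imply that a chain of map_batches calls may be split into several operators. The sketch below models one possible grouping rule under stated assumptions: only consecutive UDFs are fused, and a UDF joins the previous operator only when neither requests GPUs and both have the same concurrency. `UdfSpec`, `Operator`, and `plan_operators` are hypothetical names; pyflink's actual planner may apply further rules.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class UdfSpec:
    # Hypothetical description of one map_batches call in a chain.
    name: str
    concurrency: Optional[int] = None
    num_gpus: Optional[float] = None


@dataclass
class Operator:
    udfs: List[UdfSpec] = field(default_factory=list)


def _uses_gpu(udf: UdfSpec) -> bool:
    return bool(udf.num_gpus)


def plan_operators(chain: List[UdfSpec]) -> List[Operator]:
    operators: List[Operator] = []
    for udf in chain:
        current = operators[-1] if operators else None
        can_fuse = (
            current is not None
            # Each GPU UDF runs in its own operator: never fuse with or into one.
            and not _uses_gpu(udf)
            and not _uses_gpu(current.udfs[-1])
            # UDFs with different concurrency values go to separate operators.
            and current.udfs[-1].concurrency == udf.concurrency
        )
        if can_fuse:
            current.udfs.append(udf)
        else:
            operators.append(Operator([udf]))
    return operators


chain = [
    UdfSpec("parse"),
    UdfSpec("clean"),
    UdfSpec("embed", num_gpus=0.5),
    UdfSpec("score", concurrency=4),
    UdfSpec("write", concurrency=4),
]
print([[u.name for u in op.udfs] for op in plan_operators(chain)])
# [['parse', 'clean'], ['embed'], ['score', 'write']]
```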