################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Modern DataFrame API for PyFlink.
"""
from typing import (
Iterator,
Literal,
Union,
Callable,
Dict,
List,
Any,
Optional,
Tuple,
TypeVar,
TYPE_CHECKING,
overload,
)
from pyflink.table.table import Table
from pyflink.table.expression import Expression
from pyflink.table.expressions import (
col as table_col,
lit as table_lit,
and_,
call_sql,
)
from pyflink.table.types import DataType, FloatType, DoubleType
if TYPE_CHECKING:
import pandas as pd
from pyflink.table import TableSchema
# Type variable used by DataFrame.pipe: the return type of the piped function
# is propagated as pipe()'s own return type.
T = TypeVar("T")
# Names exported by ``from <this module> import *``.
__all__ = ["DataFrame", "col", "lit"]
[docs]
def col(name: str) -> Expression:
    """
    Build an expression that refers to a column by name.
    Args:
        name: Name of the column to reference.
    Returns:
        An Expression pointing at that column; it can be combined with
        other expressions using the usual operators.
    Example::
        >>> import pyflink.dataframe as pf
        >>> pf.col("a")
        >>> pf.col("a") + pf.col("b")
    """
    reference = table_col(name)
    return reference
[docs]
def lit(value: Any, data_type: Optional[DataType] = None) -> Expression:
    """
    Build a literal (constant) expression.
    Args:
        value: The constant value.
        data_type: Optional explicit data type for the literal. When omitted,
                   the type is left to the underlying ``lit`` to derive.
    Returns:
        An Expression representing the constant.
    Example::
        >>> import pyflink.dataframe as pf
        >>> from pyflink.table import DataTypes
        >>> pf.lit(1)
        >>> pf.lit("hello")
        >>> pf.lit(1.5, DataTypes.DOUBLE())
    """
    # Only forward data_type when the caller actually supplied one.
    call_args = (value,) if data_type is None else (value, data_type)
    return table_lit(*call_args)
class DataFrame:
"""
A modern DataFrame API for PyFlink.
DataFrame provides a Pythonic interface for data transformations,
built on top of the PyFlink Table API. It supports fluent chaining
of operations and provides a familiar DataFrame-style API.
Example::
>>> import pyflink.dataframe as pf
>>> df = pf.from_dict({"id": [1, 2], "name": ["a", "b"]})
>>> result = df.select("id", "name") \\
... .with_column("id_doubled", pf.col("id") * 2) \\
... .filter(pf.col("id") > 0) \\
... .rename({"id": "identifier"})
"""
def __init__(self, table: Table):
    """
    Wrap an existing PyFlink Table.
    Args:
        table: The PyFlink Table that backs this DataFrame. Every DataFrame
               operation builds a new Table from it and wraps that result.
    """
    self._table: Table = table
@property
def llm(self):
    """
    Entry point for the LLM / AI helper functions.
    The accessor module is imported lazily, on first access of this property.
    Returns:
        An LLMAccessor bound to this DataFrame.
    Example::
        >>> df.llm.predict("text", model="qwen-plus")
        >>> df.llm.ai_classify("text", ["pos", "neg"], model="qwen-plus")
    """
    from pyflink.dataframe.ai.llm import LLMAccessor as _Accessor
    accessor = _Accessor(self)
    return accessor
# ======================== Core Operations ========================
[docs]
def select(self, *columns, **projections) -> "DataFrame":
    """
    Project the DataFrame onto a set of columns / expressions.
    Accepted inputs:
    - column names as strings
    - Expression objects
    - lists or tuples containing names and/or Expressions
    - keyword arguments ``new_name=expression``; non-Expression values are
      wrapped in a literal
    Args:
        *columns: Columns to select (strings, Expressions, or lists/tuples).
        **projections: Extra projections; each keyword becomes the output
                       column name of its expression.
    Returns:
        A new DataFrame containing only the projected columns.
    Raises:
        TypeError: If a column spec (top-level or inside a list) has an
                   unsupported type.
    Example::
        >>> df.select("a", "b")                # Select columns by name
        >>> df.select(col("a"), col("b"))      # Select with expressions
        >>> df.select("a", c=col("a") + 1)     # Mix strings and kwargs
        >>> df.select(["a", "b"])              # List of column names
        >>> df.select(x=col("a"), y=col("b"))  # Rename with kwargs
    """
    def as_expression(spec, error_prefix):
        # Names become column references; Expressions pass through untouched.
        if isinstance(spec, str):
            return col(spec)
        if isinstance(spec, Expression):
            return spec
        raise TypeError(f"{error_prefix}{type(spec)}")

    selected = []
    for spec in columns:
        if isinstance(spec, (list, tuple)):
            for item in spec:
                selected.append(
                    as_expression(item, "Unsupported column type in list: ")
                )
        else:
            selected.append(as_expression(spec, "Unsupported column type: "))
    for alias_name, value in projections.items():
        source = value if isinstance(value, Expression) else lit(value)
        selected.append(source.alias(alias_name))
    return DataFrame(self._table.select(*selected))
[docs]
def with_column(
    self,
    name: str,
    expr: Union[Expression, Callable[["DataFrame"], Expression]],
) -> "DataFrame":
    """
    Add a column, or overwrite it if a column with that name already exists.
    Args:
        name: Output column name.
        expr: Either an Expression, or a callable that receives this
              DataFrame and returns an Expression.
    Returns:
        A new DataFrame with the column added or replaced.
    Raises:
        TypeError: If expr (after calling it, when callable) is not an
                   Expression.
    Example::
        >>> df.with_column("c", col("a") + col("b"))
        >>> df.with_column("c", lambda df: df["a"] + df["b"])
    """
    resolved = expr(self) if callable(expr) else expr
    if not isinstance(resolved, Expression):
        raise TypeError(f"Expected Expression, got {type(resolved)}")
    aliased = resolved.alias(name)
    return DataFrame(self._table.add_or_replace_columns(aliased))
[docs]
def with_columns(self, *exprs, **named_exprs) -> "DataFrame":
    """
    Add several columns at once, overwriting any that already exist.
    Args:
        *exprs: Expressions that already carry an ``.alias(...)``; they are
                passed through unchanged.
        **named_exprs: ``name=expression`` pairs. Non-Expression values are
                       wrapped in a literal before aliasing.
    Returns:
        A new DataFrame with the columns added or replaced.
    Example::
        >>> df.with_columns(
        ...     c=col("a") + col("b"),
        ...     d=col("a") * 2
        ... )
        >>> df.with_columns(
        ...     (col("a") + col("b")).alias("c"),
        ...     (col("a") * 2).alias("d")
        ... )
    """
    keyword_columns = [
        (value if isinstance(value, Expression) else lit(value)).alias(label)
        for label, value in named_exprs.items()
    ]
    # Positional expressions first, then keyword ones, in the given order.
    return DataFrame(
        self._table.add_or_replace_columns(*exprs, *keyword_columns)
    )
[docs]
def drop_columns(
    self, *columns: Union[str, Expression], strict: bool = True
) -> "DataFrame":
    """
    Remove columns from the DataFrame.
    Args:
        *columns: Column names or column expressions to remove.
        strict: When True, a string name missing from the schema raises
                ValueError. When False, such names are skipped. Expressions
                are never checked against the schema.
    Returns:
        A new DataFrame without the given columns, or this DataFrame itself
        when nothing is left to drop.
    Raises:
        ValueError: In strict mode, for a name not present in the schema.
        TypeError: For a column spec that is neither str nor Expression.
    Example::
        >>> df.drop_columns("a", "b")
        >>> df.drop_columns(col("a"), col("b"))
        >>> df.drop_columns("nonexistent", strict=False)
    """
    present = set(self._table.get_schema().get_field_names())
    targets = []
    for spec in columns:
        if isinstance(spec, Expression):
            targets.append(spec)
            continue
        if not isinstance(spec, str):
            raise TypeError(f"Unsupported column type: {type(spec)}")
        if spec in present:
            targets.append(col(spec))
        elif strict:
            raise ValueError(f"Column '{spec}' not found in schema")
        # Non-strict mode silently ignores unknown names.
    if not targets:
        return self
    return DataFrame(self._table.drop_columns(*targets))
# Shorter alias of drop_columns
drop = drop_columns
[docs]
def filter(
    self,
    predicate: Optional[
        Union[Expression, str, Callable[["DataFrame"], Expression]]
    ] = None,
    **constraints,
) -> "DataFrame":
    """
    Filter rows by a predicate and/or column equality constraints.
    Args:
        predicate: Filter condition. Can be:
                   - An Expression
                   - A SQL string
                   - A callable that takes the DataFrame and returns an
                     Expression
                   - None (the default), when only ``constraints`` are given
        **constraints: Additional equality constraints as column=value pairs.
                       A value of None selects rows where the column IS NULL.
    Returns:
        A new DataFrame with filtered rows. All conditions are combined
        with AND.
    Raises:
        TypeError: If predicate has an unsupported type, or if neither a
                   predicate nor any constraint is given.
    Example::
        >>> df.filter(col("a") > 10)
        >>> df.filter("a > 10")                  # SQL string
        >>> df.filter(col("a") > 10, b="hello")  # Mix predicate and constraints
        >>> df.filter(lambda df: df["a"] > 10)
        >>> df.filter(b="hello")                 # Constraints only
        >>> df.filter(b=None)                    # b IS NULL
    """
    conditions = []
    if predicate is None:
        if not constraints:
            raise TypeError(
                "filter() requires a predicate or at least one "
                "column=value constraint"
            )
    elif isinstance(predicate, str):
        conditions.append(call_sql(predicate))
    elif isinstance(predicate, Expression):
        # Checked before callable() so an Expression is never invoked as a
        # function.
        conditions.append(predicate)
    elif callable(predicate):
        conditions.append(predicate(self))
    else:
        raise TypeError(f"Unsupported predicate type: {type(predicate)}")
    for name, value in constraints.items():
        column = col(name)
        # SQL equality against NULL is never true, so None means IS NULL.
        if value is None:
            conditions.append(column.is_null)
        else:
            conditions.append(column == lit(value))
    # and_ requires at least two operands.
    if len(conditions) == 1:
        final_predicate = conditions[0]
    else:
        final_predicate = and_(*conditions)
    return DataFrame(self._table.filter(final_predicate))
# Alias for filter
where = filter
[docs]
def rename_columns(
    self,
    *args,
    mapping: Union[Dict[str, str], Callable[[str], str]] = None,
) -> "DataFrame":
    """
    Rename one or more columns.
    Accepted call forms:
    - a dict ``{"old": "new", ...}`` (positional or ``mapping=``)
    - a callable ``name -> new_name`` applied to every column
    - ``rename_columns(old, new)`` for a single column
    - ``rename_columns(old1, new1, old2, new2, ...)`` for several columns
    Positional arguments, when given, take precedence over ``mapping``.
    Dict entries whose old name is not in the schema are ignored. Callable
    results equal to the old name are treated as "no rename".
    Args:
        *args: A single dict/callable, or an even number of strings forming
               (old, new) pairs.
        mapping: Dict or callable mapping, used when no positional arguments
                 are given.
    Returns:
        A new DataFrame with renamed columns, or this DataFrame itself when
        nothing needs renaming.
    Raises:
        ValueError: For an odd number (>= 3) of positional arguments.
        TypeError: For non-string names in pair form, or an unsupported
                   mapping type.
    Example::
        >>> df.rename_columns({"a": "id", "b": "value"})
        >>> df.rename_columns("a", "id")                 # Single column
        >>> df.rename_columns("a", "id", "b", "value")   # Multiple columns
        >>> df.rename_columns(lambda name: name.upper()) # Transform all names
    """
    if len(args) == 1:
        mapping = args[0]
    elif args:
        if len(args) % 2:
            raise ValueError(
                "rename_columns() accepts either a dict/callable mapping, "
                "or pairs of string arguments (old1, new1, old2, new2, ...)"
            )
        olds, news = args[0::2], args[1::2]
        for old_name, new_name in zip(olds, news):
            if not (isinstance(old_name, str) and isinstance(new_name, str)):
                raise TypeError(
                    f"Column names must be strings, got {type(old_name)} and {type(new_name)}"
                )
        mapping = dict(zip(olds, news))

    current_columns = self._table.get_schema().get_field_names()
    if isinstance(mapping, dict):
        renames = [
            col(old_name).alias(new_name)
            for old_name, new_name in mapping.items()
            if old_name in current_columns
        ]
    elif callable(mapping):
        candidates = ((name, mapping(name)) for name in current_columns)
        renames = [
            col(old_name).alias(new_name)
            for old_name, new_name in candidates
            if new_name != old_name
        ]
    else:
        raise TypeError(
            f"Unsupported mapping type: {type(mapping)}. "
            "Expected dict, callable, or two string arguments."
        )
    if not renames:
        return self
    return DataFrame(self._table.rename_columns(*renames))
# Alias for rename_columns
rename = rename_columns
# ======================== Aggregation ========================
[docs]
def group_by(self, *columns: Union[str, Expression]) -> "GroupedDataFrame":
    """
    Start a grouped aggregation over the given key columns.
    Args:
        *columns: Grouping keys, as column names or Expressions.
    Returns:
        A GroupedDataFrame on which aggregations can be applied.
    Raises:
        TypeError: If a key is neither a string nor an Expression.
    Example::
        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records(
        ...     [("A", 1), ("A", 2), ("B", 3)],
        ...     schema=["category", "value"],
        ... )
        >>> result = df.group_by("category").agg(
        ...     pf.col("category"),
        ...     pf.col("value").sum.alias("total")
        ... )
    """
    keys = []
    for spec in columns:
        if isinstance(spec, Expression):
            keys.append(spec)
        elif isinstance(spec, str):
            keys.append(col(spec))
        else:
            raise TypeError(f"Unsupported column type: {type(spec)}")
    return GroupedDataFrame(self, keys)
[docs]
def agg(self, *aggs: Expression) -> "DataFrame":
    """
    Apply aggregation expressions to the entire DataFrame (global aggregation).
    Args:
        *aggs: Aggregation expressions, e.g. ``col("value").sum.alias("total")``.
    Returns:
        A new DataFrame with the aggregation results.
    Raises:
        ValueError: If no aggregation expression is given.
    Example::
        >>> import pyflink.dataframe as pf
        >>> df.agg(pf.col("value").sum.alias("total"),
        ...        pf.col("value").max.alias("max_value"))
    """
    if not aggs:
        raise ValueError("agg() requires at least one aggregation expression")
    # PyFlink's Table.aggregate() takes a single aggregate call and returns an
    # AggregatedTable that must still be closed with select(). A select() over
    # aggregate expressions performs the global aggregation and returns a
    # Table directly, so it works for any number of aggregations.
    return DataFrame(self._table.select(*aggs))
# ======================== Join ========================
[docs]
def join(
    self,
    other: "DataFrame",
    on: Union[str, Expression, List[str]] = None,
    how: str = "inner",
    left_on: Union[str, Expression, List[str]] = None,
    right_on: Union[str, Expression, List[str]] = None,
) -> "DataFrame":
    """
    Join with another DataFrame.
    Args:
        other: Right DataFrame to join.
        on: Join key(s) shared by both DataFrames.
            - Each string names a column present on both sides. It becomes
              the equality condition ``left.key == right.key``, and the key
              appears once in the result.
            - Expressions (e.g. ``col("a") == col("b")``) are used as join
              conditions as-is.
            All conditions are combined with AND.
        how: Join type. One of "inner", "left", "right", "full", "outer".
        left_on: Left join key(s).
        right_on: Right join key(s), paired positionally with left_on.
    Returns:
        A new DataFrame with the join result. For "right"/"full"/"outer" joins
        using string keys in ``on``, the key column takes the right-side value
        wherever the left-side value is null.
    Raises:
        ValueError: If how is unsupported, on is empty, neither on nor both
                    left_on/right_on are given, or left_on and right_on have
                    different lengths.
    Example::
        >>> df1.join(df2, on="id")
        >>> df1.join(df2, left_on="id", right_on="user_id")
        >>> df1.join(df2, on=["id", "name"], how="left")
    NOTE(review): non-key columns that share a name on both sides are passed
    through unchanged; the underlying Table join may reject them.
    """
    join_methods = {
        "inner": self._table.join,
        "left": self._table.left_outer_join,
        "right": self._table.right_outer_join,
        "full": self._table.full_outer_join,
        "outer": self._table.full_outer_join,
    }
    if how not in join_methods:
        raise ValueError(
            f"Unsupported join type: {how}. "
            f"Supported types: {list(join_methods.keys())}"
        )
    right_table = other.to_table()

    def to_exprs(keys):
        # Normalize None / str / Expression / sequence into a list of
        # Expressions, or None when no keys were given.
        if keys is None:
            return None
        if isinstance(keys, (str, Expression)):
            keys = [keys]
        return [col(k) if isinstance(k, str) else k for k in keys]

    def conjunction(conditions):
        # and_ needs at least two operands; a single condition is used as-is.
        return conditions[0] if len(conditions) == 1 else and_(*conditions)

    if on is not None:
        keys = [on] if isinstance(on, (str, Expression)) else list(on)
        if not keys:
            raise ValueError("'on' must contain at least one join key")
        # Deduplicate while preserving order.
        shared_keys = list(dict.fromkeys(k for k in keys if isinstance(k, str)))
        conditions = [k for k in keys if not isinstance(k, str)]
        renamed = {}
        if shared_keys:
            # Both sides carry the same key names, so a bare col(key) is not a
            # join condition and is ambiguous. Give the right-side copies
            # unique temporary names and join on left.key == right.tmp.
            taken = set(self._table.get_schema().get_field_names())
            taken.update(right_table.get_schema().get_field_names())
            for key in shared_keys:
                candidate = f"__right_{key}"
                suffix = 0
                while candidate in taken:
                    suffix += 1
                    candidate = f"__right_{key}_{suffix}"
                taken.add(candidate)
                renamed[key] = candidate
            right_table = right_table.rename_columns(
                *[col(key).alias(tmp) for key, tmp in renamed.items()]
            )
            conditions = [
                col(key) == col(tmp) for key, tmp in renamed.items()
            ] + conditions
        joined = join_methods[how](right_table, conjunction(conditions))
        if renamed:
            if how in ("right", "full", "outer"):
                # Unmatched right rows have a null left key; fill it from the
                # right-side copy so the single key column stays populated.
                from pyflink.table.expressions import if_then_else
                joined = joined.add_or_replace_columns(
                    *[
                        if_then_else(col(key).is_null, col(tmp), col(key))
                        .alias(key)
                        for key, tmp in renamed.items()
                    ]
                )
            joined = joined.drop_columns(*[col(tmp) for tmp in renamed.values()])
        return DataFrame(joined)

    left_exprs = to_exprs(left_on)
    right_exprs = to_exprs(right_on)
    if not left_exprs or not right_exprs:
        raise ValueError(
            "Must specify either 'on' or both 'left_on' and 'right_on'"
        )
    if len(left_exprs) != len(right_exprs):
        raise ValueError(
            f"'left_on' and 'right_on' must have the same length, "
            f"got {len(left_exprs)} and {len(right_exprs)}"
        )
    join_conditions = [
        left == right for left, right in zip(left_exprs, right_exprs)
    ]
    return DataFrame(join_methods[how](right_table, conjunction(join_conditions)))
# ======================== Map ========================
[docs]
def map(self, func, *, return_dtype=None, concurrency: int = None,
        num_gpus: float = None, gpu_type: str = None) -> "DataFrame":
    """
    Transform every row with a Python function (1-to-1).
    The function receives one row as ``dict[str, value]`` and must return a
    ``dict[str, value]``. When func carries a TypedDict return annotation,
    return_dtype may be omitted and is inferred from it.
    Args:
        func: A function (dict -> dict), or a DataFrameUDFWrapper.
        return_dtype: Output schema as DataType.struct(...). Optional when
                      func has a TypedDict return hint.
        concurrency: Optional parallelism for the operator running this UDF.
                     UDFs with different concurrency values end up in
                     separate operators.
        num_gpus: Optional positive number of GPUs requested for this UDF
                  (e.g. 0.5, 1.0). Each GPU UDF runs in its own operator.
        gpu_type: GPU type (e.g. 'A10', 'V100'). Required when num_gpus is set.
    Returns:
        A new DataFrame with the transformed rows.
    Raises:
        ValueError: If num_gpus is not a positive number, or num_gpus is set
                    without gpu_type.
    Example::
        >>> # With explicit return_dtype
        >>> df.map(lambda row: {"a": row["a"] + 1, "b": row["b"].upper()},
        ...        return_dtype=DataType.struct({"a": DataType.int64(),
        ...                                      "b": DataType.string()}))
        >>>
        >>> # With TypedDict return hint (auto-inferred)
        >>> class Output(TypedDict):
        ...     a: int
        ...     b: str
        >>> def process(row) -> Output:
        ...     return {"a": row["a"] + 1, "b": row["b"].upper()}
        >>> df.map(process)
        >>>
        >>> # With concurrency
        >>> df.map(process, concurrency=4)
        >>>
        >>> # With GPU resources
        >>> df.map(process, num_gpus=0.5, gpu_type='A10')
    """
    if num_gpus is not None:
        is_number = isinstance(num_gpus, (int, float))
        if not is_number or num_gpus <= 0:
            raise ValueError(
                "num_gpus must be a positive number, got: {}".format(num_gpus))
        if gpu_type is None:
            raise ValueError("gpu_type must be specified when num_gpus is set")
    from pyflink.dataframe.udf import _resolve_map_udf
    wrapper = _resolve_map_udf(
        func,
        return_dtype,
        func_type="general",
        input_columns=self.columns,
        concurrency=concurrency,
        num_gpus=num_gpus,
        gpu_type=gpu_type,
    )
    mapped = self._table.map(wrapper)
    return DataFrame(mapped)
[docs]
def map_batches(self, func, *, return_dtype=None, batch_format: str,
                concurrency: int = None,
                batch_size: int = None, num_gpus: float = None,
                gpu_type: str = None) -> "DataFrame":
    """
    Transform the DataFrame batch by batch with a Python function (N-to-N).
    - "pandas" mode: func receives ``dict[str, pd.Series]`` and returns the
      same kind of object.
    - "arrow" mode: func receives ``dict[str, pyarrow.Array]`` and returns the
      same kind of object.
    Each returned batch must have as many rows as the input batch. In pandas
    mode, return_dtype may be omitted when func has a TypedDict return hint.
    In arrow mode it must describe the returned Arrow columns.
    Args:
        func: The batch function, or a DataFrameUDFWrapper.
        return_dtype: Output schema as DataType.struct(...).
        batch_format: Batch transport format, "pandas" or "arrow".
        concurrency: Optional parallelism for the operator running this UDF.
                     UDFs with different concurrency values end up in
                     separate operators.
        batch_size: Optional positive maximum number of elements per batch.
                    Otherwise the global 'python.fn-execution.arrow.batch.size'
                    setting applies (default 1000). When several UDFs are
                    fused, the largest batch_size among them is used.
        num_gpus: Optional positive number of GPUs requested for this UDF
                  (e.g. 0.5, 1.0). Each GPU UDF runs in its own operator.
        gpu_type: GPU type (e.g. 'A10', 'V100'). Required when num_gpus is set.
    Returns:
        A new DataFrame with the transformed rows.
    Raises:
        ValueError: For an unknown batch_format, a non-positive or
                    non-integer batch_size, a non-positive num_gpus, or
                    num_gpus without gpu_type.
    Example::
        >>> # pandas mode
        >>> def process(batch):
        ...     return {"a": batch["a"] + 1}
        >>> df.map_batches(process, batch_format="pandas",
        ...                return_dtype=DataType.struct({"a": DataType.int64()}))
        >>>
        >>> # arrow mode
        >>> def process_arrow(batch):
        ...     return {"a": batch["a"]}
        >>> df.map_batches(process_arrow, batch_format="arrow",
        ...                return_dtype=DataType.struct({"a": DataType.int64()}))
        >>>
        >>> # With TypedDict return hint (auto-inferred)
        >>> class Output(TypedDict):
        ...     a: int
        >>> def process(batch) -> Output:
        ...     return {"a": batch["a"] + 1}
        >>> df.map_batches(process, batch_format="pandas")
        >>>
        >>> # With concurrency / batch_size / GPU resources
        >>> df.map_batches(process, batch_format="pandas", concurrency=4)
        >>> df.map_batches(process, batch_format="pandas", batch_size=500)
        >>> df.map_batches(process, batch_format="pandas", num_gpus=0.5, gpu_type='A10')
    """
    if batch_format not in ("pandas", "arrow"):
        raise ValueError(
            "batch_format must be 'pandas' or 'arrow', got: %r" % batch_format)
    if batch_size is not None:
        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError(
                "batch_size must be a positive integer, got: {}".format(batch_size))
    if num_gpus is not None:
        if not isinstance(num_gpus, (int, float)) or num_gpus <= 0:
            raise ValueError(
                "num_gpus must be a positive number, got: {}".format(num_gpus))
        if gpu_type is None:
            raise ValueError("gpu_type must be specified when num_gpus is set")
    from pyflink.dataframe.udf import _resolve_map_udf
    wrapper = _resolve_map_udf(
        func,
        return_dtype,
        func_type=batch_format,
        input_columns=self.columns,
        concurrency=concurrency,
        batch_size=batch_size,
        num_gpus=num_gpus,
        gpu_type=gpu_type,
    )
    mapped = self._table.map(wrapper)
    return DataFrame(mapped)
# ======================== Composition ========================
[docs]
def pipe(
    self,
    func: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Feed this DataFrame into a user function, enabling fluent chaining.
    ``df.pipe(f, *args, **kwargs)`` is exactly ``f(df, *args, **kwargs)``.
    Args:
        func: Callable whose first positional parameter is the DataFrame.
        *args: Extra positional arguments forwarded after the DataFrame.
        **kwargs: Keyword arguments forwarded unchanged.
    Returns:
        Whatever func returns.
    Example::
        >>> import pyflink.dataframe as pf
        >>> def add_total(df):
        ...     return df.with_column("total", pf.col("a") + pf.col("b"))
        ...
        >>> def filter_positive(df, col_name):
        ...     return df.filter(pf.col(col_name) > 0)
        ...
        >>> result = (
        ...     df.pipe(add_total)
        ...       .pipe(filter_positive, "total")
        ... )
    """
    call_args = (self, *args)
    return func(*call_args, **kwargs)
# ======================== Conversion ========================
[docs]
def to_table(self) -> Table:
    """
    Expose the PyFlink Table that backs this DataFrame.
    Returns:
        The wrapped Table instance itself (not a copy).
    """
    backing_table = self._table
    return backing_table
[docs]
def to_pandas(self):
    """
    Materialize the DataFrame as a pandas DataFrame.
    Delegates to the underlying Table's ``to_pandas()``.
    Returns:
        A pandas DataFrame.
    """
    backing_table = self._table
    return backing_table.to_pandas()
[docs]
def collect(self) -> List:
    """
    Execute the DataFrame and collect all result rows into a list.
    The collect iterator is always closed, including when materialization
    fails part-way (consistent with ``iter_rows``).
    Returns:
        A list of rows.
    """
    results = self._table.execute().collect()
    try:
        return list(results)
    finally:
        results.close()
[docs]
def limit(self, n: int) -> "DataFrame":
    """
    Limit the DataFrame to the first n rows.
    This is a lazy transformation that returns a new DataFrame.
    Args:
        n: The maximum number of rows to return. Must be a non-negative
           integer; int-like values (anything supporting ``__index__``) are
           converted to a plain ``int``.
    Returns:
        A new DataFrame limited to at most n rows.
    Raises:
        TypeError: If n is not an integer (e.g. a float).
        ValueError: If n is negative.
    Example::
        >>> df = pf.from_records([(1,), (2,), (3,)], schema=["a"])
        >>> df.limit(2).collect()
        [Row(a=1), Row(a=2)]
    """
    import operator
    try:
        # Rejects floats/strings; normalizes int-like values to a plain int.
        count = operator.index(n)
    except TypeError:
        raise TypeError(f"n must be an integer, got {type(n).__name__}") from None
    if count < 0:
        raise ValueError("n must be non-negative")
    return DataFrame(self._table.fetch(count))
[docs]
def offset(self, n: int) -> "DataFrame":
    """
    Skip the first n rows of the DataFrame.
    This is a lazy transformation that returns a new DataFrame.
    Args:
        n: The number of rows to skip. Must be a non-negative integer;
           int-like values (anything supporting ``__index__``) are converted
           to a plain ``int``.
    Returns:
        A new DataFrame with the first n rows skipped.
    Raises:
        TypeError: If n is not an integer (e.g. a float).
        ValueError: If n is negative.
    Example::
        >>> df = pf.from_records([(1,), (2,), (3,)], schema=["a"])
        >>> df.offset(1).limit(2).collect()
        [Row(a=2), Row(a=3)]
    """
    import operator
    try:
        # Rejects floats/strings; normalizes int-like values to a plain int.
        count = operator.index(n)
    except TypeError:
        raise TypeError(f"n must be an integer, got {type(n).__name__}") from None
    if count < 0:
        raise ValueError("n must be non-negative")
    return DataFrame(self._table.offset(count))
[docs]
def head(self, n: int) -> "DataFrame":
    """
    Lazily take the first n rows; a thin alias of ``limit(n)``.
    Args:
        n: Number of rows to keep.
    Returns:
        A new DataFrame with at most n rows.
    Example::
        >>> df = pf.from_records([(i,) for i in range(10)], schema=["a"])
        >>> df.head(3).collect()
        [Row(a=0), Row(a=1), Row(a=2)]
    """
    limited = self.limit(n)
    return limited
@overload
def iter_rows(self, *, named: Literal[False] = ...) -> Iterator[Tuple[Any, ...]]:
    """Typing overload: the default ``named=False`` yields rows as tuples."""
@overload
def iter_rows(self, *, named: Literal[True]) -> Iterator[Dict[str, Any]]:
    """Typing overload: ``named=True`` yields rows as ``{column: value}`` dicts."""
[docs]
def iter_rows(
    self, *, named: bool = False
) -> Union[Iterator[Tuple[Any, ...]], Iterator[Dict[str, Any]]]:
    """
    Lazily iterate over the DataFrame's rows.
    This is a terminal operation: execution starts when iteration begins.
    The result iterator is closed when iteration finishes or the generator is
    closed early.
    Args:
        named: False (default) yields each row as a tuple of values. True
               yields each row as a ``{column_name: value}`` dict.
    Returns:
        A generator over the rows.
    Example::
        >>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
        >>> for row in df.iter_rows():
        ...     print(row)
        (1, 'a')
        (2, 'b')
        >>> for row in df.iter_rows(named=True):
        ...     print(row)
        {'id': 1, 'name': 'a'}
        {'id': 2, 'name': 'b'}
    """
    column_names = self.columns

    def as_dict(record):
        return dict(zip(column_names, record))

    convert = as_dict if named else tuple
    results = self._table.execute().collect()
    try:
        for record in results:
            yield convert(record)
    finally:
        results.close()
[docs]
def iter_batches(self, *, batch_size: int = 1000) -> Iterator["pd.DataFrame"]:
    """
    Return an iterator of pandas DataFrames, each containing up to batch_size rows.
    This is a terminal operation. Execution starts when iteration begins, not
    when this method is called. It uses Arrow-based serialization for
    efficient data transfer.
    Args:
        batch_size: The maximum number of rows per batch. Must be a positive
                    integer. Defaults to 1000.
    Returns:
        An iterator yielding pandas DataFrames. Every batch except possibly the
        last has exactly batch_size rows; nothing is yielded for an empty result.
    Raises:
        ValueError: If batch_size is not a positive integer. This is raised
                    immediately at call time.
    Example::
        >>> df = pf.from_records([(i,) for i in range(2500)], schema=["a"])
        >>> for batch in df.iter_batches(batch_size=1000):
        ...     print(len(batch))
        1000
        1000
        500
    """
    # A non-positive batch_size would make the re-batching loop below spin
    # forever yielding empty frames, so reject it up front (same check and
    # message style as map_batches).
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError(
            "batch_size must be a positive integer, got: {}".format(batch_size))

    def _generate():
        import pyarrow as pa
        from pyflink.table.serializers import ArrowSerializer
        from pyflink.table.types import create_arrow_schema
        from pyflink.table.utils import tz_convert_from_internal
        from pyflink.java_gateway import get_gateway

        self._table._t_env._before_execute()
        gateway = get_gateway()
        j_config = self._table._j_table.getTableEnvironment().getConfig()
        max_arrow_batch_size = j_config.get(
            gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE
        )
        batches_iterator = (
            gateway.jvm.org.apache.flink.table.runtime.arrow.ArrowUtils
            .collectAsPandasDataFrame(self._table._j_table, max_arrow_batch_size)
        )
        if not batches_iterator.hasNext():
            return
        import pytz
        timezone = pytz.timezone(j_config.getLocalTimeZone().getId())
        schema = self._table.get_schema()
        field_names = schema.get_field_names()
        serializer = ArrowSerializer(
            create_arrow_schema(field_names, schema.get_field_data_types()),
            schema.to_row_data_type(),
            timezone,
        )

        def to_local_pandas(arrow_table):
            # Convert to pandas and map timestamp columns from the internal
            # representation into the session's local time zone.
            pdf = arrow_table.to_pandas()
            for field_name in field_names:
                pdf[field_name] = tz_convert_from_internal(
                    pdf[field_name],
                    schema.get_field_data_type(field_name),
                    timezone,
                )
            return pdf

        pending = []
        pending_rows = 0
        for record_batch in serializer.load_from_iterator(batches_iterator):
            pending.append(record_batch)
            pending_rows += record_batch.num_rows
            # Emit full batches; the leftover rows are carried forward as
            # (zero-copy) slices of the combined table.
            while pending_rows >= batch_size:
                combined = pa.Table.from_batches(pending)
                head_rows = combined.slice(0, batch_size)
                rest = combined.slice(batch_size)
                pending = rest.to_batches() if rest.num_rows > 0 else []
                pending_rows = rest.num_rows
                yield to_local_pandas(head_rows)
        # Final partial batch, if any.
        if pending_rows > 0:
            yield to_local_pandas(pa.Table.from_batches(pending))

    return _generate()
# ======================== I/O ========================
[docs]
def write_parquet(
self,
path: str,
*,
mode: str = "overwrite",
# Parquet format options
compression: str = "SNAPPY",
block_size: int = 128 * 1024 * 1024,
page_size: int = 1024 * 1024,
dictionary_page_size: int = 1024 * 1024,
enable_dictionary: bool = True,
max_padding_size: int = 8 * 1024 * 1024,
validation: bool = False,
writer_version: str = "v1",
utc_timezone: bool = False,
timestamp_time_unit: str = "micros",
write_int64_timestamp: bool = False,
# FileSystem sink rolling policy options
rolling_policy_file_size: str = "128mb",
rolling_policy_rollover_interval: str = "30min",
rolling_policy_inactivity_interval: str = "30min",
rolling_policy_check_interval: str = "1min",
# FileSystem sink partition commit options
partition_commit_trigger: str = "process-time",
partition_commit_delay: str = "0s",
partition_commit_watermark_time_zone: str = "UTC",
partition_commit_policy_kind: Optional[str] = None,
partition_commit_policy_class: Optional[str] = None,
partition_commit_policy_class_parameters: Optional[str] = None,
partition_commit_success_file_name: str = "_SUCCESS",
# FileSystem sink partition time extractor options
partition_time_extractor_kind: str = "default",
partition_time_extractor_class: Optional[str] = None,
partition_time_extractor_timestamp_formatter: Optional[str] = None,
partition_time_extractor_timestamp_pattern: Optional[str] = None,
# FileSystem sink other options
shuffle_by_partition_enable: bool = False,
auto_compaction: bool = False,
compaction_file_size: Optional[str] = None,
compaction_parallelism: Optional[int] = None,
sink_parallelism: Optional[int] = None,
partition_default_name: str = "__DEFAULT_PARTITION__",
) -> None:
"""
Write the DataFrame to Parquet file(s) at the given path.
Uses Flink's FileSystem Connector with Parquet Format under the hood.
Args:
path: Output path for the Parquet file(s). Supports local paths
and any Flink-supported file system (HDFS, OSS, S3, etc.).
mode: [Batch] Write mode. 'overwrite' to overwrite existing data
(default), 'append' to append to existing data.
In streaming mode, this is always forced to 'append'
since streaming does not support overwrite.
compression: [Batch/Streaming] Parquet compression codec. One
of 'SNAPPY' (default), 'GZIP', 'LZO', 'BROTLI',
'LZ4', 'ZSTD', 'LZ4_RAW', 'UNCOMPRESSED'.
block_size: [Batch/Streaming] Row group size in bytes. Default
is 134217728 (128MB).
page_size: [Batch/Streaming] Data page size in bytes. Default
is 1048576 (1MB).
dictionary_page_size: [Batch/Streaming] Dictionary page size in
bytes. Default is 1048576 (1MB).
enable_dictionary: [Batch/Streaming] Whether to enable dictionary
encoding. Default is True.
max_padding_size: [Batch/Streaming] The maximum padding size in
bytes for row group alignment. Default is
8388608 (8MB).
validation: [Batch/Streaming] Whether to enable schema validation
on write. Default is False.
writer_version: [Batch/Streaming] Parquet writer version. One of
'v1' (default), 'v2'.
utc_timezone: [Batch/Streaming] Use UTC timezone or local timezone
to the conversion between epoch time and
LocalDateTime. Hive 0.x/1.x/2.x use local timezone.
But Hive 3.x use UTC timezone. Default is False.
timestamp_time_unit: [Batch/Streaming] Store parquet
int64/LogicalTypes timestamps in this time
unit. One of 'nanos', 'micros' (default),
'millis'.
write_int64_timestamp: [Batch/Streaming] Write parquet timestamp
as int64/LogicalTypes instead of
int96/OriginalTypes. Note: Timestamp will
be time zone agnostic (NEVER converted to
a different time zone). Default is False.
rolling_policy_file_size: [Streaming] The maximum part file size
before rolling (e.g., '128mb'). Default
is 128MB.
rolling_policy_rollover_interval: [Streaming] The maximum time
duration a part file can stay
open before rolling (by default
long enough to avoid too many
small files). The frequency at
which this is checked is
controlled by
rolling_policy_check_interval.
Default is 30 minutes.
rolling_policy_inactivity_interval: [Streaming] The maximum time
duration a part file can stay
inactive before rolling (by
default long enough to avoid
too many small files). The
frequency at which this is
checked is controlled by
rolling_policy_check_interval.
Default is 30 minutes.
rolling_policy_check_interval: [Streaming] The interval for
checking time based rolling
policies. This controls the
frequency to check whether a part
file should rollover based on
rolling_policy_rollover_interval.
Default is 1 minute.
partition_commit_trigger: [Streaming] Trigger type for partition
commit. One of 'process-time' (default),
'partition-time'.
partition_commit_delay: [Streaming] The partition will not commit
until the delay time. The value should be
'1d' for day partitions and '1h' for hour
partitions. Default is 0.
partition_commit_watermark_time_zone: [Streaming] The time zone to
parse the long watermark
value to TIMESTAMP value,
the parsed watermark
timestamp is used to compare
with partition time to
decide the partition should
commit or not. The default
value is 'UTC', which means
the watermark is defined on
TIMESTAMP column or not
defined. If the watermark is
defined on TIMESTAMP_LTZ
column, the time zone of
watermark is user configured
time zone. The option value
is either a full name such
as 'America/Los_Angeles', or
a custom timezone id such as
'GMT-08:00'.
partition_commit_policy_kind: [Streaming] Policy to commit a
partition is to notify the downstream
application that the partition has
finished writing, the partition is
ready to be read. 'metastore': add
partition to metastore. Only hive
table supports metastore policy, file
system manages partitions through
directory structure. 'success-file':
add '_success' file to directory.
Both can be configured at the same
time: 'metastore,success-file'.
'custom': use policy class to create
a commit policy.
partition_commit_policy_class: [Streaming] The partition commit
policy class for implementing
PartitionCommitPolicy interface.
Only works with 'custom' commit
policy.
partition_commit_policy_class_parameters: [Streaming] The
parameters passed to the
constructor of the custom
commit policy, with
multiple parameters
separated by semicolons,
such as 'param1;param2'.
partition_commit_success_file_name: [Streaming] The file name for
success-file partition commit
policy. Default is '_SUCCESS'.
partition_time_extractor_kind: [Streaming] Time extractor to
extract time from partition values.
This can either be 'default' or a
custom extractor class. For
'default', you can configure a
timestamp pattern. Default is
'default'.
partition_time_extractor_class: [Streaming] The extractor class
for implementing
PartitionTimeExtractor interface.
Only used when
partition_time_extractor_kind is
not 'default'.
partition_time_extractor_timestamp_formatter: [Streaming] The
formatter to format timestamp from string. Used with
partition_time_extractor_timestamp_pattern.
Supports multiple partition fields like
'$year-$month-$day $hour:00:00'. Compatible
with Java's DateTimeFormatter.
partition_time_extractor_timestamp_pattern: [Streaming] Pattern
to get a timestamp from partitions when
partition_time_extractor_kind is 'default'.
E.g., '$dt' for a single partition field, or
'$year-$month-$day $hour:00:00' for multiple
fields.
shuffle_by_partition_enable: [Batch/Streaming] Enable shuffle data
by dynamic partition fields in sink
phase, this can greatly reduce the
number of files for filesystem sink
but may lead to data skew. Default
is False.
auto_compaction: [Streaming] Whether to enable automatic compaction
in streaming sink. The data will be written to
temporary files. After the checkpoint is
completed, the temporary files generated by a
checkpoint will be compacted. The temporary files
are invisible before compaction. Default is False.
compaction_file_size: [Streaming] The compaction target file size.
Defaults to the rolling file size.
compaction_parallelism: [Batch] Custom parallelism for the
compaction operator in batch mode. By
default, the planner will use the
parallelism of the sink.
sink_parallelism: [Batch/Streaming] Custom parallelism for the
sink.
partition_default_name: [Batch/Streaming] The default partition
name in case the dynamic partition column
value is null/empty string. Default is
'__DEFAULT_PARTITION__'.
Example::
>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
>>> df.write_parquet("/path/to/output")
>>>
>>> # Append with GZIP compression
>>> df.write_parquet("/path/to/output", mode="append",
... compression="GZIP")
>>>
>>> # Custom rolling policy and block size
>>> df.write_parquet("/path/to/output",
... block_size=256 * 1024 * 1024,
... rolling_policy_file_size="256mb")
"""
from pyflink.dataframe.util.artifacts import (
add_built_in_connector,
add_built_in_format,
)
from pyflink.table import TableDescriptor, FormatDescriptor
if mode not in ("overwrite", "append"):
raise ValueError(
f"Invalid mode '{mode}'. Must be 'overwrite' or 'append'."
)
# Streaming mode does not support overwrite, force append
t_env = self._table._t_env
add_built_in_connector(t_env, "filesystem")
add_built_in_format(t_env, "parquet")
runtime_mode = t_env.get_config().get(
"execution.runtime-mode", "STREAMING"
)
if runtime_mode.upper() == "STREAMING":
mode = "append"
# Build Parquet format descriptor
format_builder = FormatDescriptor.for_format("parquet")
parquet_options = {
"compression": compression,
"block.size": str(block_size),
"page.size": str(page_size),
"dictionary.page.size": str(dictionary_page_size),
"writer.max-padding": str(max_padding_size),
"timestamp.time.unit": timestamp_time_unit,
"enable.dictionary": str(enable_dictionary).lower(),
"validation": str(validation).lower(),
"writer.version": writer_version,
"utc-timezone": str(utc_timezone).lower(),
"write.int64.timestamp":
str(write_int64_timestamp).lower(),
}
for key, value in parquet_options.items():
format_builder = format_builder.option(key, value)
# Build schema from the DataFrame's existing schema
resolved_schema = self._table.get_resolved_schema()
schema_builder = _build_physical_schema_from_resolved_schema(
resolved_schema
)
# Build connector descriptor
connector_builder = TableDescriptor.for_connector("filesystem") \
.option("path", path) \
.format(format_builder.build()) \
.schema(schema_builder.build())
sink_options = {
"sink.rolling-policy.file-size": rolling_policy_file_size,
"sink.rolling-policy.rollover-interval":
rolling_policy_rollover_interval,
"sink.rolling-policy.inactivity-interval":
rolling_policy_inactivity_interval,
"sink.rolling-policy.check-interval":
rolling_policy_check_interval,
"sink.partition-commit.trigger":
partition_commit_trigger,
"sink.partition-commit.delay":
partition_commit_delay,
"sink.partition-commit.watermark-time-zone":
partition_commit_watermark_time_zone,
"sink.partition-commit.policy.kind":
partition_commit_policy_kind,
"sink.partition-commit.policy.class":
partition_commit_policy_class,
"sink.partition-commit.policy.class.parameters":
partition_commit_policy_class_parameters,
"sink.partition-commit.success-file.name":
partition_commit_success_file_name,
"partition.time-extractor.kind":
partition_time_extractor_kind,
"partition.time-extractor.class":
partition_time_extractor_class,
"partition.time-extractor.timestamp-formatter":
partition_time_extractor_timestamp_formatter,
"partition.time-extractor.timestamp-pattern":
partition_time_extractor_timestamp_pattern,
"partition.default-name": partition_default_name,
"compaction.file-size": compaction_file_size,
"sink.shuffle-by-partition.enable":
str(shuffle_by_partition_enable).lower(),
"auto-compaction": str(auto_compaction).lower(),
}
if compaction_parallelism is not None:
sink_options["compaction.parallelism"] = \
str(compaction_parallelism)
if sink_parallelism is not None:
sink_options["sink.parallelism"] = \
str(sink_parallelism)
for key, value in sink_options.items():
if value is not None:
connector_builder = connector_builder.option(
key, value
)
result = self._table.execute_insert(
connector_builder.build(),
overwrite=(mode == "overwrite")
)
# Auto-detect local execution and wait for job to finish
target = t_env.get_config().get("execution.target", None)
if target in ("local", "minicluster"):
result.wait()
[docs]
def write_kafka(
    self,
    bootstrap_servers: str,
    *,
    properties: Optional[Dict[str, str]] = None,
    # General options
    format: str = "json",
    format_options: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_format_options: Optional[Dict[str, str]] = None,
    key_fields: Optional[List[str]] = None,
    key_fields_prefix: Optional[str] = None,
    value_format: Optional[str] = None,
    value_format_options: Optional[Dict[str, str]] = None,
    value_fields_include: Literal["ALL", "EXCEPT_KEY"] = "ALL",
    # Sink-specific options
    topic: str,
    partitioner: str = "default",
    delivery_guarantee: Literal["none", "at-least-once",
                                "exactly-once"] = "at-least-once",
    transactional_id_prefix: Optional[str] = None,
    parallelism: Optional[int] = None,
) -> None:
    """
    Write the DataFrame to a Kafka topic.

    Uses Flink's Kafka SQL Connector under the hood. The DataFrame rows
    are serialized using the specified format and written to the given
    Kafka topic. All arguments are validated before anything is registered
    on the table environment.

    Args:
        bootstrap_servers: Comma-separated list of Kafka broker addresses
            (e.g., ``"localhost:9092"`` or
            ``"broker1:9092,broker2:9092"``).
        properties: Extra Kafka producer properties. Each key in this dict
            is passed to Kafka with a ``properties.`` prefix.
            Example: ``{"batch.size": "16384"}``
            Note:
            - Property names should be valid Kafka producer config keys.
        format: Format for serializing message values. Supported options:
            ``"csv"``, ``"json"``, ``"avro"``, ``"debezium-json"``,
            ``"canal-json"``, ``"maxwell-json"``, ``"avro-confluent"``,
            ``"raw"``. Ignored when ``value_format`` is given.
        format_options: Additional options for ``format``, as a dict. Keys
            are format-specific options without the format prefix (e.g.,
            ``{"timestamp-format.standard": "ISO-8601"}`` for JSON
            format). Must not be combined with ``value_format``; use
            ``value_format_options`` instead.
        key_format: Format for serializing message keys (e.g., ``"json"``,
            ``"csv"``, ``"avro"``). If specified, a non-empty
            ``key_fields`` must also be provided.
        key_format_options: Additional options for the key format, as a
            dict. Same structure as ``format_options``.
        key_fields: List of column names that form the message key.
            Required (and must be non-empty) when ``key_format`` is
            specified; only allowed together with ``key_format``.
        key_fields_prefix: Prefix for key field column names to avoid name
            clashes with value fields. Requires
            ``value_fields_include="EXCEPT_KEY"``.
        value_format: The format used to serialize the value part of Kafka
            messages. Only one of ``format`` or ``value_format`` is
            required. If ``value_format`` is given, it takes precedence
            and ``format`` (including its default) is not forwarded to the
            connector.
        value_format_options: Extra options for ``value_format``. Ignored
            when ``value_format`` is not given.
        value_fields_include: Whether key fields are included in value
            fields during value serialization.
            - ``"ALL"`` (default): all columns are treated as value fields.
            - ``"EXCEPT_KEY"``: fields in ``key_fields`` are excluded from
              value fields.
        topic: Target Kafka topic name to write to.
        partitioner: How to distribute records to Kafka partitions.
            One of:
            - ``"default"``: Use Kafka's default partitioner (sticky
              round-robin for null keys, murmur2 hash for keys).
            - ``"fixed"``: Each Flink partition maps to at most one Kafka
              partition.
            - ``"round-robin"``: Records are distributed round-robin
              across partitions (only for records without keys).
            - A fully qualified class name of a custom
              ``FlinkKafkaPartitioner`` subclass.
        delivery_guarantee: Delivery semantic for the Kafka sink. One of:
            - ``"at-least-once"`` (default): Messages are never lost but
              may be duplicated on failure.
            - ``"exactly-once"``: Kafka transactions ensure no duplicates.
              Requires ``transactional_id_prefix`` to be set and Flink
              checkpointing to be enabled.
            - ``"none"``: No guarantees — messages may be lost or
              duplicated.
        transactional_id_prefix: Prefix for Kafka transaction IDs.
            Required when ``delivery_guarantee="exactly-once"`` and
            rejected otherwise. Should be stable across restarts.
            Different applications must use different prefixes.
        parallelism: Custom parallelism for the Kafka sink operator.
            By default, inherits parallelism from the upstream operator.

    Raises:
        ValueError: If ``key_format`` is set without a non-empty
            ``key_fields``, or a non-empty ``key_fields`` is given without
            ``key_format``. Also raised if ``key_fields_prefix`` is set
            while ``value_fields_include`` is not ``"EXCEPT_KEY"``, or if
            ``format_options`` is combined with ``value_format``. Finally,
            raised if ``transactional_id_prefix`` is missing with
            ``delivery_guarantee="exactly-once"``, or present with any
            other guarantee.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> df = pf.from_records(
        ...     [(1, "alice"), (2, "bob")],
        ...     schema=["user_id", "name"],
        ... )
        >>>
        >>> # Simple write with JSON format
        >>> df.write_kafka("localhost:9092", topic="user_events")
        >>>
        >>> # Write with Avro format
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     format="avro",
        ... )
        >>>
        >>> # Write with key fields
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     key_format="json",
        ...     key_fields=["user_id"],
        ... )
        >>>
        >>> # Write with exactly-once delivery guarantee
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     delivery_guarantee="exactly-once",
        ...     transactional_id_prefix="my-app-sink",
        ... )
        >>>
        >>> # Write with fixed partitioning and parallelism
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     partitioner="fixed",
        ...     parallelism=4,
        ... )
        >>>
        >>> # Write with JSON format options
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     format="json",
        ...     format_options={
        ...         "timestamp-format.standard": "ISO-8601",
        ...     },
        ... )
        >>>
        >>> # Write with Kafka security (SASL/SSL)
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="secure_events",
        ...     properties={
        ...         "security.protocol": "SASL_SSL",
        ...         "sasl.mechanism": "PLAIN",
        ...         "sasl.jaas.config": (
        ...             "org.apache.kafka.common.security.plain"
        ...             ".PlainLoginModule required"
        ...             ' username="user" password="pass";'
        ...         ),
        ...     },
        ... )
    """
    # Validate every argument before touching the table environment, so an
    # invalid call never registers connector/format artifacts.
    if key_format is not None and not key_fields:
        raise ValueError(
            "'key_fields' must be specified when 'key_format' is defined."
        )
    if key_fields and key_format is None:
        raise ValueError(
            "'key_format' must be specified when 'key_fields' is defined."
        )
    if (
        key_fields_prefix is not None
        and value_fields_include != "EXCEPT_KEY"
    ):
        raise ValueError(
            "'value_fields_include' must be 'EXCEPT_KEY' when "
            "'key_fields_prefix' is set."
        )
    if value_format is not None and format_options:
        raise ValueError(
            "'format_options' only applies to 'format'; use "
            "'value_format_options' when 'value_format' is set."
        )
    if delivery_guarantee == "exactly-once" and \
            transactional_id_prefix is None:
        raise ValueError(
            "transactional_id_prefix must be specified when "
            "delivery_guarantee is 'exactly-once'."
        )
    if delivery_guarantee != "exactly-once" and \
            transactional_id_prefix is not None:
        raise ValueError(
            "transactional_id_prefix must not be specified unless "
            "delivery_guarantee is 'exactly-once'."
        )

    from pyflink.common.config_options import ConfigOptions
    from pyflink.dataframe.io import _to_str
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.table import TableDescriptor, FormatDescriptor

    def _build_format(identifier, options):
        # Build a FormatDescriptor for `identifier` with stringified options.
        fmt_builder = FormatDescriptor.for_format(identifier)
        for opt_key, opt_value in (options or {}).items():
            fmt_builder = fmt_builder.option(opt_key, _to_str(opt_value))
        return fmt_builder.build()

    t_env = self._table._t_env
    add_built_in_connector(t_env, "kafka")
    # Only the value format actually sent to the connector needs its
    # artifacts; `value_format` wins over `format` as documented.
    add_built_in_format(
        t_env, format if value_format is None else value_format
    )
    if key_format is not None:
        add_built_in_format(t_env, key_format)

    # Build schema from the DataFrame's existing schema
    resolved_schema = self._table.get_resolved_schema()
    schema_builder = _build_physical_schema_from_resolved_schema(
        resolved_schema
    )
    builder = TableDescriptor.for_connector("kafka") \
        .option("properties.bootstrap.servers", bootstrap_servers) \
        .option("topic", topic) \
        .schema(schema_builder.build())

    # Emit exactly one value format. Sending both 'format' and
    # 'value.format' would leave the choice to the connector's own
    # resolution rules, which need not honour `value_format`.
    if value_format is None:
        builder = builder.format(_build_format(format, format_options))
    else:
        builder = builder.format(
            _build_format(value_format, value_format_options),
            format_option=ConfigOptions.key("value.format")
            .string_type().no_default_value(),
        )

    # Additional Kafka producer properties
    for prop_key, prop_value in (properties or {}).items():
        builder = builder.option(
            f"properties.{prop_key}", _to_str(prop_value)
        )

    # Key format and key field layout
    if key_format is not None:
        builder = builder.format(
            _build_format(key_format, key_format_options),
            format_option=ConfigOptions.key("key.format")
            .string_type().no_default_value(),
        )
    if key_fields:
        builder = builder.option("key.fields", ";".join(key_fields))
    if key_fields_prefix is not None:
        builder = builder.option("key.fields-prefix", key_fields_prefix)
    if value_fields_include is not None:
        builder = builder.option(
            "value.fields-include", value_fields_include
        )

    # Sink-specific options; entries left as None are not forwarded.
    sink_options = {
        "sink.delivery-guarantee": delivery_guarantee,
        "sink.partitioner": partitioner,
        "sink.transactional-id-prefix": transactional_id_prefix,
        "sink.parallelism":
            None if parallelism is None else str(parallelism),
    }
    for opt_key, opt_value in sink_options.items():
        if opt_value is not None:
            builder = builder.option(opt_key, _to_str(opt_value))

    result = self._table.execute_insert(builder.build())
    # Auto-detect local execution and wait for job to finish
    target = t_env.get_config().get("execution.target", None)
    if target in ("local", "minicluster"):
        result.wait()
[docs]
def write_sls(
    self,
    endpoint: str,
    *,
    project: str,
    logstore: str,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
    topic_field: Optional[str] = None,
    time_field: Optional[str] = None,
    source_field: Optional[str] = None,
    partition_field: Optional[str] = None,
    buckets: int = 64,
    flush_interval_ms: int = 2000,
    write_null_properties: bool = True,
    extra_options: Optional[Dict[str, str]] = None,
) -> None:
    """Write the DataFrame into an SLS (Aliyun Log Service) logstore.

    This is a wrapper around the Ververica SLS sink connector. The
    connector receives the DataFrame's resolved schema automatically.

    Args:
        endpoint: Private-network endpoint of the SLS service.
        project: SLS project name.
        logstore: Name of the SLS LogStore (or MetricStore).
        access_id: AccessKey ID of the Alibaba Cloud account. When both
            ``access_id`` and ``access_key`` stay ``None`` (the default),
            the SLS connector uses the VVP STS token instead.
        access_key: AccessKey Secret of the Alibaba Cloud account. See
            ``access_id`` for how the STS fallback works.
        topic_field: Column whose value replaces the SLS ``__topic__``
            attribute of each record.
        time_field: INT column whose value replaces the SLS
            ``__timestamp__`` attribute. When unset, the time of writing
            is used.
        source_field: Column whose value replaces the SLS ``__source__``
            attribute.
        partition_field: Column hashed for shard routing, so records with
            the same hash end up on the same shard.
        buckets: Number of hash buckets used with ``partition_field``.
            Must be a power of 2 within [1, 256] and ought to be no
            smaller than the logstore's shard count.
        flush_interval_ms: Interval (milliseconds) after which pending
            writes are sent to SLS.
        write_null_properties: With ``True`` (the default), null field
            values become empty strings; with ``False`` they are left
            out altogether.
        extra_options: Extra options passed straight through to the SLS
            connector, for settings that have no named parameter in
            ``write_sls`` (e.g. ``IOThreadNum``,
            ``baseRetryBackOffTimeMs``, ``maxRetryBackOffTimeMs``). When
            a key matches an option produced by a named parameter, the
            ``extra_options`` value wins. ``"connector"`` is reserved and
            must not be included.

    Raises:
        ValueError: When ``buckets`` is not a power of 2 within [1, 256],
            or when ``extra_options`` has a ``"connector"`` key.

    Examples:
        Minimal sink (default partitioning and write_null behavior)::

            import pyflink.dataframe as pf

            pf.from_records(
                [("INFO", "started"), ("WARN", "slow")],
                schema=["level", "message"],
            ).write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET
            )

        Partitioned writes (route by ``user_id`` into 128 buckets)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                partition_field="user_id",
                buckets=128
            )

        Field overrides (columns used as SLS metadata attributes)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                topic_field="category",
                source_field="host",
                time_field="event_time_seconds"
            )

        Escape hatch through ``extra_options`` (sets an unnamed option
        and overrides ``buckets``)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                partition_field="user_id",
                extra_options={
                    "IOThreadNum": "16",
                    "buckets": "256",  # overrides default 64
                }
            )
    """
    from pyflink.dataframe.io import _add_connector_options, _add_extra_options
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    _validate_sls_buckets(buckets)

    # Settings forwarded only when the caller actually provided them; the
    # tuple order fixes the order in which they follow the required ones.
    optional_settings = (
        ("accessId", access_id),
        ("accessKey", access_key),
        ("topicField", topic_field),
        ("timeField", time_field),
        ("sourceField", source_field),
        ("partitionField", partition_field),
    )
    connector_options = [
        ("endpoint", endpoint),
        ("project", project),
        ("logstore", logstore),
        ("buckets", str(buckets)),
        ("flushIntervalMs", str(flush_interval_ms)),
        ("writeNullProperties", str(write_null_properties)),
    ] + [(name, val) for name, val in optional_settings if val is not None]

    table_env = self._table._t_env
    add_built_in_connector(table_env, "sls")

    physical_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema()
    )
    descriptor = TableDescriptor.for_connector("sls").schema(
        physical_schema.build()
    )
    descriptor = _add_connector_options(descriptor, connector_options)
    descriptor = _add_extra_options(descriptor, extra_options)

    job_result = self._table.execute_insert(descriptor.build())
    # Block until completion when running on a local/minicluster target.
    execution_target = table_env.get_config().get("execution.target", None)
    if execution_target in ("local", "minicluster"):
        job_result.wait()
[docs]
def write_hologres(
    self,
    endpoint: str,
    *,
    db_name: str,
    table_name: str,
    username: str,
    password: str,
    primary_key: Optional[Union[str, List[str]]] = None,
    connection_pool_size: int = 5,
    connection_pool_name: str = "default",
    connection_fixed: Optional[bool] = None,
    connection_max_idle_ms: int = 60000,
    connection_ssl_mode: Literal[
        "disable", "require", "verify-ca", "verify-full"
    ] = "disable",
    connection_ssl_root_cert_location: Optional[str] = None,
    retry_count: int = 10,
    retry_sleep_step_ms: int = 5000,
    meta_cache_ttl_ms: int = 600000,
    serverless_computing: bool = False,
    write_mode: Literal[
        "AUTO",
        "INSERT",
        "COPY_STREAM",
        "COPY_BULK_LOAD",
        "COPY_BULK_LOAD_ON_CONFLICT",
    ] = "AUTO",
    on_conflict_action: Literal[
        "INSERT_OR_IGNORE", "INSERT_OR_UPDATE", "INSERT_OR_REPLACE"
    ] = "INSERT_OR_UPDATE",
    create_missing_partition: bool = True,
    delete_strategy: Literal[
        "IGNORE_DELETE",
        "NON_PK_FIELD_TO_NULL",
        "DELETE_ROW_ON_PK",
        "CHANGELOG_STANDARD",
    ] = "CHANGELOG_STANDARD",
    ignore_null_when_update: bool = False,
    ignore_null_when_update_by_expr: bool = False,
    default_for_not_null_column: bool = True,
    remove_u0000_in_text: bool = True,
    partial_insert: bool = False,
    deduplication: bool = True,
    aggressive_flush: bool = False,
    check_and_put_column: Optional[str] = None,
    check_and_put_operator: str = "GREATER",
    check_and_put_null_as: Optional[str] = None,
    insert_batch_size: int = 512,
    insert_batch_byte_size: int = 2 * 1024 * 1024,
    insert_flush_interval_ms: int = 10000,
    copy_format: Literal["binary", "text", "binaryrow"] = "binary",
    insert_conflict_update_set: Optional[str] = None,
    insert_conflict_where: Optional[str] = None,
    parallelism: Optional[int] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Write the DataFrame into a Hologres table.

    This wraps the Ververica Hologres sink connector. The connector receives
    the DataFrame's resolved schema automatically; declare a primary key
    constraint through ``primary_key``.

    Args:
        endpoint: Hologres service endpoint (on VVR, use the VPC endpoint).
        db_name: Hologres database name. A compute-group suffix may be
            appended (e.g. ``"my_db/my_group"``).
        table_name: Name of the target Hologres table; use
            ``"schema.tableName"`` for tables outside the public schema.
        username: Hologres account username (or AccessKey ID).
        password: Hologres account password (or AccessKey Secret).
        primary_key: One primary key column or a list of columns. Needed
            for update/upsert semantics and expected to match the primary
            key of the Hologres table.
        connection_pool_size: Size of the sink's JDBC connection pool.
        connection_pool_name: Name of the connection pool; sinks in one
            Flink TaskManager that use the same name share connections.
        connection_fixed: ``True`` turns on Hologres lightweight (fixed)
            connection mode. Leaving it ``None`` (the default) lets the
            connector decide from the Hologres engine version.
        connection_max_idle_ms: How long (milliseconds) a pooled JDBC
            connection may stay idle before it is released.
        connection_ssl_mode: SSL transport mode: ``"disable"``,
            ``"require"``, ``"verify-ca"`` or ``"verify-full"``.
        connection_ssl_root_cert_location: Location of the CA certificate
            file (``/flink/usrlib/<name>`` on VVP).
        retry_count: How many times to retry after a connection failure.
        retry_sleep_step_ms: Step (milliseconds) of the linear back-off.
        meta_cache_ttl_ms: Time-to-live (milliseconds) of the local cache
            of Hologres ``TableSchema`` objects.
        serverless_computing: ``True`` sends bulk imports through Hologres
            serverless compute.
        write_mode: Write protocol. With ``"AUTO"`` (default) the Hologres
            connector chooses based on the Flink runtime mode and the
            Hologres engine version: batch jobs against Hologres 3.1+ get
            a ``COPY_*`` path, everything else falls back to
            ``"INSERT"``. ``"INSERT"`` always issues JDBC INSERTs,
            ``"COPY_STREAM"`` uses streaming COPY, and the two
            ``COPY_BULK_LOAD*`` modes perform bulk loads (with
            ``COPY_BULK_LOAD``, a PK conflict throws).
        on_conflict_action: Behaviour on a PK conflict:
            ``"INSERT_OR_UPDATE"`` (default) updates the existing row,
            ``"INSERT_OR_IGNORE"`` skips the incoming row, and
            ``"INSERT_OR_REPLACE"`` deletes and then re-inserts.
        create_missing_partition: With ``True`` (the default) a missing
            partition child table is created automatically on the first
            write into it. With ``False`` partitions must be created
            beforehand, and writes into a missing one fail. Only valid in
            ``INSERT`` mode.
        delete_strategy: Handling of retraction (-D, -U) records.
            ``"CHANGELOG_STANDARD"`` (default) follows Flink's regular
            change-log semantics; the other values target partial-update
            scenarios. ``"NON_PK_FIELD_TO_NULL"`` and
            ``"DELETE_ROW_ON_PK"`` need ``primary_key``.
        ignore_null_when_update: ``True`` skips null values when updating
            (only effective in INSERT mode).
        ignore_null_when_update_by_expr: Like ``ignore_null_when_update``
            but for expression-based updates (needs Hologres 4.0+).
        default_for_not_null_column: ``True`` (the default) fills in
            defaults automatically when null is written into a NOT NULL
            column.
        remove_u0000_in_text: ``True`` (the default) removes the illegal
            ``\\u0000`` character from string columns.
        partial_insert: ``True`` writes only the columns declared in the
            Flink schema.
        deduplication: ``True`` (the default) deduplicates rows by primary
            key within each batch.
        aggressive_flush: ``True`` flushes batches even when only partly
            filled, once the sink is idle.
        check_and_put_column: Column that drives conditional updates: a row
            is updated only if ``new.<column> <operator> old.<column>``
            holds. Cannot be combined with an explicit ``COPY_*``
            ``write_mode``. ``"AUTO"`` is allowed since it normally
            resolves to ``"INSERT"``; be aware, though, that batch jobs on
            Hologres 3.1+ may resolve ``"AUTO"`` to ``"COPY_BULK_LOAD*"``,
            and the Hologres connector then rejects the combination at
            submit time. Pass ``write_mode="INSERT"`` explicitly when a
            batch job needs check-and-put.
        check_and_put_operator: Comparison operator for conditional
            updates. Defaults to ``"GREATER"``.
        check_and_put_null_as: Value substituted for NULL in that
            comparison.
        insert_batch_size: Maximum number of rows per INSERT batch.
        insert_batch_byte_size: Maximum number of bytes per INSERT batch
            (default: 2 MB).
        insert_flush_interval_ms: Longest time (milliseconds) an INSERT
            batch may wait before it is flushed.
        copy_format: Wire format in COPY modes. ``"binary"`` (the default)
            is the only value accepted together with
            ``write_mode='AUTO'`` or ``write_mode='INSERT'`` (AUTO may
            resolve to INSERT at runtime). The ``COPY_*`` modes also
            accept ``"text"`` (usually paired with ``COPY_BULK_LOAD*``)
            and ``"binaryrow"`` (a ``COPY_STREAM`` optimization, Hologres
            4.1+); the Hologres connector may add further restrictions
            depending on the engine version.
        insert_conflict_update_set: SET clause run on a PK conflict, given
            as a Hologres expression (e.g. ``"col=excluded.col + 1"``).
            Conflicts with the ``check_and_put_*`` options.
        insert_conflict_where: WHERE clause restricting which conflicting
            rows are updated. Conflicts with the ``check_and_put_*``
            options.
        parallelism: Parallelism of the Hologres sink operator. ``None``
            (the default) leaves the option unset, so Flink inherits the
            upstream operator's parallelism.
        extra_options: Further connector options passed straight through to
            the Hologres connector, for settings that have no named
            parameter in ``write_hologres`` (e.g.
            ``connection.direct.enabled``, ``connection.max-alive-ms``).
            When a key matches an option produced by a named parameter,
            the ``extra_options`` value wins. ``"connector"`` is reserved
            and must not be included.

    Raises:
        ValueError: When any enum value is invalid, or when
            ``copy_format`` is not binary while ``write_mode`` is
            ``'AUTO'`` or ``'INSERT'``. Also raised when
            ``connection_ssl_mode`` is ``"verify-ca"``/``"verify-full"``
            without ``connection_ssl_root_cert_location``, or when the
            ``check_and_put_*`` options meet an incompatible
            ``on_conflict_action`` or ``insert_conflict_where``. It is
            likewise raised when ``check_and_put_column`` is paired with
            an explicit ``COPY_*`` ``write_mode``. Further triggers:
            ``delete_strategy`` is ``"NON_PK_FIELD_TO_NULL"`` or
            ``"DELETE_ROW_ON_PK"`` without ``primary_key``, or
            ``extra_options`` carries a ``"connector"`` key.

    Examples:
        Streaming upsert sink (default AUTO write_mode — the connector
        picks INSERT for streaming jobs and COPY paths for batch on
        Hologres 3.1+)::

            import pyflink.dataframe as pf

            events = pf.read_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                schema={
                    "event_id": pf.DataType.bigint(),
                    "user": pf.DataType.string(),
                },
                primary_key="event_id",
            )
            events.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events_processed",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="event_id",
            )

        High-throughput bulk load (COPY_BULK_LOAD on append-only data)::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.daily_summary",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                write_mode="COPY_BULK_LOAD",
                copy_format="text",
                insert_batch_size=4096,
                insert_batch_byte_size=8 * 1024 * 1024,
            )

        Conditional update via check-and-put (overwrite only when the new
        ``version`` is greater)::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.profiles",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="user_id",
                on_conflict_action="INSERT_OR_UPDATE",
                check_and_put_column="version",
                check_and_put_operator="GREATER",
                check_and_put_null_as="0",
            )

        Partial update (write only the listed columns) with a tuned batch
        size::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.user_state",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="user_id",
                partial_insert=True,
                insert_batch_size=1024,
                insert_flush_interval_ms=2000,
                ignore_null_when_update=True,
            )

        Forward compatibility: set an unnamed option (and override a named
        one) through ``extra_options``::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                extra_options={
                    "hologres.server.version": "3.1",
                    "sink.insert.batch-size": "2048",  # overrides default
                },
            )
    """
    _validate_write_hologres_options(
        write_mode=write_mode,
        on_conflict_action=on_conflict_action,
        delete_strategy=delete_strategy,
        copy_format=copy_format,
        connection_ssl_mode=connection_ssl_mode,
        connection_ssl_root_cert_location=connection_ssl_root_cert_location,
        check_and_put_column=check_and_put_column,
        insert_conflict_where=insert_conflict_where,
        primary_key=primary_key,
    )

    from pyflink.dataframe.io import _add_connector_options, _add_extra_options
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    # Options that are always forwarded, in the connector's canonical order.
    always_sent = [
        ("endpoint", endpoint),
        ("dbname", db_name),
        ("tablename", table_name),
        ("username", username),
        ("password", password),
        ("connection.pool.size", str(connection_pool_size)),
        ("connection.pool.name", connection_pool_name),
        ("connection.max-idle-ms", str(connection_max_idle_ms)),
        ("connection.ssl.mode", connection_ssl_mode),
        ("retry-count", str(retry_count)),
        ("retry-sleep-step-ms", str(retry_sleep_step_ms)),
        ("meta-cache-ttl-ms", str(meta_cache_ttl_ms)),
        ("serverless-computing.enabled", str(serverless_computing)),
        ("sink.write-mode", write_mode),
        ("sink.on-conflict-action", on_conflict_action),
        ("sink.create-missing-partition", str(create_missing_partition)),
        ("sink.delete-strategy", delete_strategy),
        ("sink.ignore-null-when-update.enabled",
         str(ignore_null_when_update)),
        ("sink.ignore-null-when-update-by-expr.enabled",
         str(ignore_null_when_update_by_expr)),
        ("sink.default-for-not-null-column.enabled",
         str(default_for_not_null_column)),
        ("sink.remove-u0000-in-text.enabled", str(remove_u0000_in_text)),
        ("sink.partial-insert.enabled", str(partial_insert)),
        ("sink.deduplication.enabled", str(deduplication)),
        ("sink.aggressive-flush.enabled", str(aggressive_flush)),
        ("sink.insert.check-and-put.operator", check_and_put_operator),
        ("sink.insert.batch-size", str(insert_batch_size)),
        ("sink.insert.batch-byte-size", str(insert_batch_byte_size)),
        ("sink.insert.flush-interval-ms", str(insert_flush_interval_ms)),
        ("sink.copy.format", copy_format),
    ]
    # Options forwarded only when the caller supplied a value; a None
    # entry means "leave the connector default in place".
    sent_if_given = (
        ("connection.fixed.enabled",
         None if connection_fixed is None else str(connection_fixed)),
        ("connection.ssl.root-cert.location",
         connection_ssl_root_cert_location),
        ("sink.insert.check-and-put.column", check_and_put_column),
        ("sink.insert.check-and-put.null-as", check_and_put_null_as),
        ("sink.insert.conflict-update-set", insert_conflict_update_set),
        ("sink.insert.conflict-where", insert_conflict_where),
        ("sink.parallelism",
         None if parallelism is None else str(parallelism)),
    )
    connector_options = always_sent + [
        entry for entry in sent_if_given if entry[1] is not None
    ]

    table_env = self._table._t_env
    add_built_in_connector(table_env, "hologres")

    physical_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema(),
        primary_key=primary_key,
    )
    descriptor = TableDescriptor.for_connector("hologres").schema(
        physical_schema.build()
    )
    descriptor = _add_connector_options(descriptor, connector_options)
    descriptor = _add_extra_options(descriptor, extra_options)

    job_result = self._table.execute_insert(descriptor.build())
    # Block until completion when running on a local/minicluster target.
    execution_target = table_env.get_config().get("execution.target", None)
    if execution_target in ("local", "minicluster"):
        job_result.wait()
[docs]
def write_paimon(
    self,
    path: Optional[str] = None,
    *,
    primary_key: Optional[Union[str, List[str]]] = None,
    auto_create: bool = False,
    file_format: Literal["orc", "parquet", "avro", "lance"] = "parquet",
    bucket: int = 1,
    bucket_key: Optional[Union[str, List[str]]] = None,
    changelog_producer: Literal[
        "none", "input", "full-compaction", "lookup"
    ] = "none",
    full_compaction_delta_commits: Optional[int] = None,
    lookup_cache_max_memory_size: str = "256 MB",
    merge_engine: Literal[
        "deduplicate", "partial-update", "aggregation"
    ] = "deduplicate",
    partial_update_ignore_delete: bool = False,
    ignore_delete: bool = False,
    partition_default_name: str = "__DEFAULT_PARTITION__",
    partition_expiration_check_interval: str = "1h",
    partition_expiration_time: Optional[str] = None,
    partition_timestamp_formatter: Optional[str] = None,
    partition_timestamp_pattern: Optional[str] = None,
    snapshot_num_retained_max: int = 2147483647,
    snapshot_num_retained_min: int = 10,
    snapshot_time_retained: str = "1h",
    write_mode: Literal["change-log", "append-only"] = "change-log",
    parallelism: Optional[int] = None,
    clustering_by_columns: Optional[Union[str, List[str]]] = None,
    delete_strategy: Literal[
        "NONE",
        "IGNORE_DELETE",
        "NON_PK_FIELD_TO_NULL",
        "DELETE_ROW_ON_PK",
        "CHANGELOG_STANDARD",
    ] = "NONE",
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Write the DataFrame into a Paimon table.

    The write goes through the Paimon SQL connector: the sink schema is
    derived from the DataFrame's physical columns, and each connector
    option is surfaced as a snake_case keyword argument.

    Args:
        path: File-system location of the Paimon table.
        primary_key: Column name, or list of column names, forming the
            sink's primary key constraint. Leave unset for a sink table
            without a primary key.
        auto_create: If True, the underlying storage is created while
            writing the table. Defaults to False.
        file_format: Data file format: ``"orc"``, ``"parquet"``,
            ``"avro"`` or ``"lance"``. Defaults to ``"parquet"``.
        bucket: Number of buckets in the file store.
        bucket_key: Column name, or list of column names, that drives
            Paimon's distribution: rows are assigned to buckets by the
            hash of these fields. If unset, Paimon falls back to the
            primary key, or to the whole row when there is no primary
            key.
        changelog_producer: Controls whether (and how) changes are
            double-written to changelog files, which record data-change
            details and can be consumed directly by streaming reads.
            ``"none"`` writes no changelog file; ``"input"`` writes the
            input changes when the memory table is flushed;
            ``"full-compaction"`` emits changelog files during full
            compaction; ``"lookup"`` derives the changelog via lookup
            before snapshots are committed.
        full_compaction_delta_commits: Upper bound on the number of
            committed snapshots between two full compactions.
        lookup_cache_max_memory_size: Memory budget shared by the lookup
            cache and the lookup changelog-producer cache. Defaults to
            ``"256 MB"``.
        merge_engine: Merge engine for primary-key tables.
            ``"deduplicate"`` keeps only the last row per key,
            ``"partial-update"`` updates only non-null fields, and
            ``"aggregation"`` aggregates rows sharing a primary key.
        partial_update_ignore_delete: Ignore delete records when running
            in partial-update mode.
        ignore_delete: Ignore delete records.
        partition_default_name: Partition name used when a dynamic
            partition column value is null or an empty string.
        partition_expiration_check_interval: How often expired
            partitions are checked for.
        partition_expiration_time: How long partitions are retained.
            Partitions never expire when this is unset.
        partition_timestamp_formatter: Formatter that turns partition
            time strings into timestamps.
        partition_timestamp_pattern: Pattern that extracts a time string
            from partition values.
        snapshot_num_retained_max: Upper bound on the number of completed
            snapshots kept.
        snapshot_num_retained_min: Lower bound on the number of completed
            snapshots kept.
        snapshot_time_retained: Longest time a completed snapshot is
            kept.
        write_mode: Table write mode. ``"append-only"`` accepts only
            append inserts, with no deduplication and no primary key
            constraint; ``"change-log"`` accepts inserts, deletes and
            updates.
        parallelism: Sink parallelism override. When unset, the planner
            derives the statement parallelism, taking global
            configuration into account.
        clustering_by_columns: Column name, or list of column names, used
            to cluster append-only table writes in batch jobs.
        delete_strategy: How sink retraction messages are validated:
            ``"NONE"``, ``"IGNORE_DELETE"``, ``"NON_PK_FIELD_TO_NULL"``,
            ``"DELETE_ROW_ON_PK"`` or ``"CHANGELOG_STANDARD"``.
        extra_options: Further Paimon connector options for settings
            that have no named parameter on ``write_paimon``. They are
            applied after the options generated from the named
            parameters, so on a key clash the ``extra_options`` value
            wins. ``"connector"`` is reserved and must not be given.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> events = pf.from_records(
        ...     [("2026-05-27", 1, "login"), ("2026-05-27", 2, "pay")],
        ...     schema=["dt", "user_id", "payload"],
        ... )
        >>>
        >>> # Append-only table with explicit bucket distribution
        >>> events.write_paimon(
        ...     "oss://bucket/warehouse/db.db/events",
        ...     auto_create=True,
        ...     write_mode="append-only",
        ...     file_format="orc",
        ...     bucket=16,
        ...     bucket_key=["dt", "user_id"],
        ...     parallelism=4,
        ... )
        >>>
        >>> # Batch append-only write with clustering
        >>> events.write_paimon(
        ...     "oss://bucket/warehouse/db.db/events_clustered",
        ...     auto_create=True,
        ...     write_mode="append-only",
        ...     bucket_key=["dt", "user_id"],
        ...     clustering_by_columns=["dt", "user_id"],
        ... )
        >>>
        >>> order_schema = {
        ...     "order_id": pf.DataType.int64(),
        ...     "amount": pf.DataType.float64(),
        ...     "status": pf.DataType.string(),
        ...     "event_ts": pf.DataType.timestamp(3),
        ... }
        >>> orders = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     schema=order_schema,
        ...     primary_key="order_id",
        ... )
        >>>
        >>> # Change-log sink with lookup changelog production
        >>> orders.write_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     primary_key="order_id",
        ...     auto_create=True,
        ...     bucket_key=["order_id"],
        ...     changelog_producer="lookup",
        ...     merge_engine="deduplicate",
        ...     delete_strategy="CHANGELOG_STANDARD",
        ...     snapshot_num_retained_min=5,
        ...     snapshot_time_retained="12h",
        ... )
        >>>
        >>> # Partial-update sink that ignores delete records
        >>> orders.write_paimon(
        ...     "oss://bucket/warehouse/db.db/orders_partial",
        ...     primary_key="order_id",
        ...     auto_create=True,
        ...     merge_engine="partial-update",
        ...     ignore_delete=True,
        ...     lookup_cache_max_memory_size="1 GB",
        ... )
        >>>
        >>> metrics_schema = {
        ...     "product_id": pf.DataType.int64(),
        ...     "price": pf.DataType.float64(),
        ...     "sales": pf.DataType.int64(),
        ... }
        >>> metrics = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/product_metrics",
        ...     schema=metrics_schema,
        ...     primary_key="product_id",
        ... )
        >>>
        >>> # Aggregation merge engine with original Paimon option keys
        >>> metrics.write_paimon(
        ...     "oss://bucket/warehouse/db.db/product_metrics_agg",
        ...     primary_key="product_id",
        ...     auto_create=True,
        ...     merge_engine="aggregation",
        ...     extra_options={
        ...         "fields.price.aggregate-function": "max",
        ...         "fields.sales.aggregate-function": "sum",
        ...     },
        ... )
    """
    from pyflink.dataframe.io import (
        _PAIMON_DELETE_STRATEGIES,
        _add_connector_options,
        _add_extra_options,
        _format_paimon_column_list,
        _paimon_common_options,
        _validate_allowed_value,
        _validate_paimon_common_options,
    )
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    table = self._table
    t_env = table._t_env
    # Make sure the Paimon connector jar is available to this environment.
    add_built_in_connector(t_env, "paimon")

    # Reject unsupported enum-like values up front.
    _validate_paimon_common_options(
        file_format=file_format,
        changelog_producer=changelog_producer,
        merge_engine=merge_engine,
        write_mode=write_mode,
    )
    _validate_allowed_value(
        delete_strategy,
        "delete_strategy",
        _PAIMON_DELETE_STRATEGIES,
    )

    schema_builder = _build_physical_schema_from_resolved_schema(
        table.get_resolved_schema(),
        primary_key=primary_key,
    )

    connector_options = _paimon_common_options(
        path=path,
        auto_create=auto_create,
        file_format=file_format,
        bucket=bucket,
        bucket_key=bucket_key,
        changelog_producer=changelog_producer,
        full_compaction_delta_commits=full_compaction_delta_commits,
        lookup_cache_max_memory_size=lookup_cache_max_memory_size,
        merge_engine=merge_engine,
        partial_update_ignore_delete=partial_update_ignore_delete,
        ignore_delete=ignore_delete,
        partition_default_name=partition_default_name,
        partition_expiration_check_interval=partition_expiration_check_interval,
        partition_expiration_time=partition_expiration_time,
        partition_timestamp_formatter=partition_timestamp_formatter,
        partition_timestamp_pattern=partition_timestamp_pattern,
        snapshot_num_retained_max=snapshot_num_retained_max,
        snapshot_num_retained_min=snapshot_num_retained_min,
        snapshot_time_retained=snapshot_time_retained,
        write_mode=write_mode,
    )
    # Sink-only options that are not part of the shared Paimon option set.
    connector_options += [
        ("sink.parallelism", parallelism),
        (
            "sink.clustering.by-columns",
            _format_paimon_column_list(
                clustering_by_columns, "clustering_by_columns"
            ),
        ),
        ("sink.delete-strategy", delete_strategy),
    ]

    descriptor = TableDescriptor.for_connector("paimon").schema(
        schema_builder.build()
    )
    descriptor = _add_connector_options(descriptor, connector_options)
    # Applied last so user-supplied keys override generated ones.
    descriptor = _add_extra_options(descriptor, extra_options)

    insert_result = table.execute_insert(descriptor.build())
    # On local / mini-cluster targets, block until the insert job finishes.
    execution_target = t_env.get_config().get("execution.target", None)
    if execution_target in ("local", "minicluster"):
        insert_result.wait()
[docs]
def write_odps(
    self,
    endpoint: str,
    *,
    tunnel_endpoint: Optional[str] = None,
    project: str,
    schema_name: Optional[str] = None,
    table_name: str,
    access_id: str,
    access_key: str,
    partition: Optional[
        Union[str, Tuple[str, Any], List[Tuple[str, Any]], List[str]]
    ] = None,
    compress_algorithm: Literal["RAW", "ZLIB", "SNAPPY"] = "SNAPPY",
    quota_name: Optional[str] = None,
    # Sink options
    use_stream_tunnel: bool = False,
    flush_interval_ms: int = 30000,
    batch_size: int = 64 * 1024 * 1024,
    num_flush_threads: int = 1,
    slot_num: int = 0,
    dynamic_partition_limit: int = 100,
    retry_times: int = 3,
    sleep_millis: int = 1000,
    enable_upsert: bool = False,
    upsert_async_commit: bool = False,
    upsert_commit_timeout_ms: int = 120000,
    operation: Literal["insert", "upsert"] = "insert",
    parallelism: Optional[int] = None,
    file_cached_enable: bool = False,
    file_cached_writer_num: int = 16,
    bucket_check_interval: int = 60000,
    file_cached_rolling_max_size: str = "16mb",
    file_cached_memory: str = "64mb",
    file_cached_memory_segment_size: str = "128kb",
    file_cached_flush_always: bool = True,
    file_cached_write_max_retries: int = 3,
    upsert_writer_max_retries: int = 3,
    upsert_writer_buffer_size: str = "64mb",
    upsert_writer_bucket_buffer_size: str = "1mb",
    upsert_write_bucket_num: Optional[int] = None,
    upsert_write_slot_num: int = 1,
    upsert_commit_max_retries: int = 3,
    upsert_commit_thread_num: int = 16,
    upsert_commit_timeout: int = 600,
    upsert_flush_concurrent: int = 2,
    insert_commit_thread_num: int = 16,
    insert_arrow_writer_enable: bool = False,
    insert_arrow_writer_batch_size: int = 512,
    insert_arrow_writer_flush_interval: int = 100000,
    insert_writer_buffer_size: str = "64mb",
    upsert_partial_column_enable: bool = False,
    primary_key: Optional[Union[str, List[str]]] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Write the DataFrame to a MaxCompute (ODPS) table.

    Uses the ODPS SQL connector under the hood. The sink schema is built
    from the DataFrame's physical columns, and connector options are
    exposed with Pythonic snake_case names in the same order as the ODPS
    documentation. Options without a named parameter can be passed
    through ``extra_options``.

    Args:
        endpoint: MaxCompute endpoint.
        tunnel_endpoint: MaxCompute Tunnel endpoint. If unset,
            MaxCompute allocates tunnel connections through Server Load
            Balancer (SLB).
        project: MaxCompute project name.
        schema_name: Required only when the MaxCompute schema feature is
            enabled. Set this to the table's schema name.
        table_name: MaxCompute table name.
        access_id: AccessKey ID used to access MaxCompute.
        access_key: AccessKey secret used to access MaxCompute.
        partition: Partition name in the MaxCompute table.
            Examples:

            - To write to fixed partition column ``dt`` with value
              ``20220901``, pass ``("dt", "20220901")``.
            - For a three-level partition table ``dt``/``hh``/``mm``,
              to write to ``dt=20220901,hh=08,mm=10``, pass
              ``[("dt", "20220901"), ("hh", "08"), ("mm", "10")]``.
            - To write dynamic partitions from a single partition column
              ``partition_name``, pass ``"partition_name"``.
            - For multi-level dynamic partition columns
              ``dt``/``hh``/``mm``, pass ``["dt", "hh", "mm"]``.
        compress_algorithm: Compression algorithm for MaxCompute Tunnel.
            Valid values are ``"RAW"`` (no compression), ``"ZLIB"``, and
            ``"SNAPPY"``. ``"RAW"`` cannot be used when
            ``enable_upsert`` is true.
        quota_name: Quota name for exclusive MaxCompute Tunnel resource
            groups. If specified, remove ``tunnel_endpoint``; otherwise
            the tunnel specified by ``tunnel_endpoint`` takes precedence.
        use_stream_tunnel: Use MaxCompute Streaming Tunnel instead of
            Batch Tunnel. ``True`` selects Streaming Tunnel and ``False``
            selects Batch Tunnel.
        flush_interval_ms: Flush interval for the tunnel writer buffer,
            in milliseconds. For Streaming Tunnel, flushed data is
            immediately available. For Batch Tunnel, data becomes
            available only after checkpointing; set to ``0`` to disable
            scheduled flushing. Triggered when either
            ``flush_interval_ms`` or ``batch_size`` is reached.
        batch_size: Buffer size in bytes. Data is flushed when the buffer
            reaches this size. Triggered when either ``batch_size`` or
            ``flush_interval_ms`` is reached.
        num_flush_threads: Number of threads used to flush the tunnel
            writer buffer. Values greater than 1 allow concurrent
            flushing across partitions.
        slot_num: Number of Tunnel slots for receiving data from Flink.
        dynamic_partition_limit: Maximum number of dynamic partitions
            written between two checkpoints. Writing to many partitions
            increases load on MaxCompute and slows checkpointing.
            Increase this only when your workload requires it.
        retry_times: Maximum retries for MaxCompute server requests,
            including session creation, submission, and flush failures.
        sleep_millis: Retry interval in milliseconds.
        enable_upsert: Use MaxCompute Upsert Tunnel. ``True`` processes
            INSERT, UPDATE_AFTER, and DELETE records. ``False`` uses the
            tunnel specified by ``use_stream_tunnel``. If session commits
            in upsert mode encounter errors or long-running faults, set
            sink operator parallelism to 10 or fewer.
        upsert_async_commit: Use asynchronous mode when committing upsert
            sessions. Async mode reduces commit time, but committed data
            is not immediately queryable.
        upsert_commit_timeout_ms: Timeout for upsert session commits, in
            milliseconds.
        operation: Write mode for a Delta table. ``"insert"`` is append
            mode; ``"upsert"`` is update mode and requires
            ``upsert_write_bucket_num``.
        parallelism: Write parallelism for a Delta table. Defaults to
            upstream parallelism. ``write.bucket.num`` must be an integral
            multiple of this value for optimal write performance and
            memory efficiency.
        file_cached_enable: Enable file cache mode when writing to dynamic
            partitions of a Delta table. This reduces small files written
            to the server but increases write latency. Enable when the
            sink has high parallelism.
        file_cached_writer_num: Concurrent upload threads per task in
            file cache mode. Avoid setting this too high because writing
            to many partitions simultaneously can cause OOM errors.
            Effective only when ``file_cached_enable`` is true.
        bucket_check_interval: File size check interval in file cache
            mode, in milliseconds. Effective only when
            ``file_cached_enable`` is true.
        file_cached_rolling_max_size: Maximum size of a single cached
            file. When exceeded, data is uploaded to the server. Effective
            only when ``file_cached_enable`` is true.
        file_cached_memory: Maximum off-heap memory for file writes in
            file cache mode. Effective only when ``file_cached_enable`` is
            true.
        file_cached_memory_segment_size: Buffer segment size for file
            writes in file cache mode. Effective only when
            ``file_cached_enable`` is true.
        file_cached_flush_always: Whether to use the cache when writing
            files in file cache mode. Effective only when
            ``file_cached_enable`` is true.
        file_cached_write_max_retries: Retry count for data uploads in
            file cache mode. Effective only when ``file_cached_enable`` is
            true.
        upsert_writer_max_retries: Maximum retries for writing to a bucket
            in an Upsert Writer session.
        upsert_writer_buffer_size: Total buffer size across all buckets in
            an Upsert Writer session. Data is flushed when the total
            reaches this threshold. Increase for better write efficiency;
            decrease if writing to many partitions causes OOM errors.
        upsert_writer_bucket_buffer_size: Per-bucket buffer size in an
            Upsert Writer session. Decrease if Flink server memory is
            insufficient.
        upsert_write_bucket_num: Number of buckets for the target Delta
            table. Must match ``write.bucket.num`` configured on the Delta
            table.
        upsert_write_slot_num: Tunnel slots per upsert session.
        upsert_commit_max_retries: Maximum retries for upsert session
            commits.
        upsert_commit_thread_num: Parallelism for upsert session commits.
            Avoid large values because excessive concurrent commits
            increase resource consumption and can cause performance
            issues.
        upsert_commit_timeout: Upsert session commit timeout in seconds.
        upsert_flush_concurrent: Maximum concurrent bucket flushes per
            partition. Each bucket flush occupies a Tunnel slot.
        insert_commit_thread_num: Parallelism for insert session commits.
        insert_arrow_writer_enable: Use the Arrow format for inserts.
        insert_arrow_writer_batch_size: Maximum rows per Arrow-format
            batch.
        insert_arrow_writer_flush_interval: Writer flush interval in
            milliseconds.
        insert_writer_buffer_size: Cache size for the buffered writer.
        upsert_partial_column_enable: Update only specified columns
            (partial column update). Applies only to Delta table sinks.
            When true, if a record with the same primary key exists,
            specified non-null fields are overwritten. If no matching
            record exists, a new record is inserted with new values for
            specified columns and null for all unspecified columns.
        primary_key: Optional primary key column name or list of column
            names. Primary keys are not inherited from upstream tables;
            set this when the sink table schema should include a primary
            key constraint.
        extra_options: Additional connector options forwarded to the
            underlying ODPS connector, for options not exposed as named
            parameters of ``write_odps``. If a key matches an option
            generated by a named parameter, the value in
            ``extra_options`` takes precedence. ``"connector"`` is
            reserved and must not be supplied.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records([(1, "a"), (2, "b")],
        ...                      schema=["id", "name"])
        >>> df.write_odps(
        ...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
        ...     project="my_project",
        ...     table_name="events",
        ...     access_id="${secret_values.ak_id}",
        ...     access_key="${secret_values.ak_secret}",
        ...     partition=("ds", "20260428"),
        ... )
    """
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.dataframe.io import (
        _ODPS_COMPRESS_ALGORITHMS,
        _ODPS_SINK_OPERATIONS,
        _add_connector_options,
        _add_extra_options,
        _format_odps_partition,
        _validate_allowed_value,
        _validate_odps_common_options,
    )
    from pyflink.table import TableDescriptor
    t_env = self._table._t_env
    add_built_in_connector(t_env, "odps")
    _validate_allowed_value(
        compress_algorithm,
        "compress_algorithm",
        _ODPS_COMPRESS_ALGORITHMS,
    )
    _validate_allowed_value(
        operation,
        "operation",
        _ODPS_SINK_OPERATIONS,
    )
    _validate_odps_common_options(
        endpoint=endpoint,
        project=project,
        table_name=table_name,
        access_id=access_id,
        access_key=access_key,
        tunnel_endpoint=tunnel_endpoint,
        quota_name=quota_name,
    )
    # Normalize the tuple / list / str partition forms into the connector's
    # option value; dynamic (column-name only) partitions are allowed for
    # sinks.
    partition = _format_odps_partition(
        partition, allow_dynamic=True
    )
    options = [
        ("endpoint", endpoint),
        ("tunnelEndpoint", tunnel_endpoint),
        ("project", project),
        ("schemaName", schema_name),
        ("tableName", table_name),
        ("accessId", access_id),
        ("accessKey", access_key),
        ("partition", partition),
        ("compressAlgorithm", compress_algorithm),
        ("quotaName", quota_name),
        ("useStreamTunnel", use_stream_tunnel),
        ("flushIntervalMs", flush_interval_ms),
        ("batchSize", batch_size),
        ("numFlushThreads", num_flush_threads),
        ("slotNum", slot_num),
        ("dynamicPartitionLimit", dynamic_partition_limit),
        ("retryTimes", retry_times),
        ("sleepMillis", sleep_millis),
        ("enableUpsert", enable_upsert),
        ("upsertAsyncCommit", upsert_async_commit),
        ("upsertCommitTimeoutMs", upsert_commit_timeout_ms),
        ("sink.operation", operation),
        ("sink.parallelism", parallelism),
        ("sink.file-cached.enable", file_cached_enable),
        ("sink.file-cached.writer.num", file_cached_writer_num),
        ("sink.bucket.check-interval", bucket_check_interval),
        ("sink.file-cached.rolling.max-size",
         file_cached_rolling_max_size),
        ("sink.file-cached.memory", file_cached_memory),
        ("sink.file-cached.memory.segment-size",
         file_cached_memory_segment_size),
        ("sink.file-cached.flush.always",
         file_cached_flush_always),
        ("sink.file-cached.write.max-retries",
         file_cached_write_max_retries),
        ("upsert.writer.max-retries", upsert_writer_max_retries),
        ("upsert.writer.buffer-size", upsert_writer_buffer_size),
        ("upsert.writer.bucket.buffer-size",
         upsert_writer_bucket_buffer_size),
        ("upsert.write.bucket.num", upsert_write_bucket_num),
        ("upsert.write.slot-num", upsert_write_slot_num),
        ("upsert.commit.max-retries", upsert_commit_max_retries),
        ("upsert.commit.thread-num", upsert_commit_thread_num),
        ("upsert.commit.timeout", upsert_commit_timeout),
        ("upsert.flush.concurrent", upsert_flush_concurrent),
        ("insert.commit.thread-num", insert_commit_thread_num),
        ("insert.arrow-writer.enable", insert_arrow_writer_enable),
        ("insert.arrow-writer.batch-size",
         insert_arrow_writer_batch_size),
        ("insert.arrow-writer.flush-interval",
         insert_arrow_writer_flush_interval),
        ("insert.writer.buffer-size", insert_writer_buffer_size),
        ("upsert.partial-column.enable",
         upsert_partial_column_enable),
    ]
    resolved_schema = self._table.get_resolved_schema()
    # primary_key=None keeps the previous behavior (no PK constraint).
    schema_builder = _build_physical_schema_from_resolved_schema(
        resolved_schema,
        primary_key=primary_key,
    )
    builder = TableDescriptor.for_connector("odps").schema(
        schema_builder.build()
    )
    builder = _add_connector_options(builder, options)
    # Applied after the generated options so user-supplied keys win.
    builder = _add_extra_options(builder, extra_options)
    result = self._table.execute_insert(builder.build())
    # On local / mini-cluster targets, block until the insert job finishes.
    target = t_env.get_config().get("execution.target", None)
    if target in ("local", "minicluster"):
        result.wait()
[docs]
def write_custom(
    self,
    connector: str,
    *,
    primary_key: Optional[Union[str, List[str]]] = None,
    options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Write the DataFrame through an arbitrary SQL connector.

    Use this for connectors that have no dedicated ``write_*`` helper
    (such as ``write_parquet``, ``write_kafka`` or ``write_odps``). Any
    SQL connector that Flink can discover through its standard factory
    mechanism can be targeted.

    Only the DataFrame's physical columns reach the sink. Upstream
    primary key constraints are not carried over; supply ``primary_key``
    if the sink schema needs one.

    Args:
        connector: The connector's factory identifier, i.e. the value
            written as ``WITH ('connector' = '...')`` in SQL DDL.
        primary_key: Column name, or list of column names, declared as
            the sink schema's primary key constraint.
        options: Connector and table options. Keys are passed through
            unchanged, exactly as they would appear inside ``WITH (...)``
            in SQL DDL, and every value is converted with ``str``. The
            ``"connector"`` key is reserved; use the ``connector``
            argument instead.

    Raises:
        ValueError: If ``connector`` is not a non-empty string.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> df = pf.from_records([(1, "a")], schema=["id", "name"])
        >>>
        >>> # Write through a custom connector identified by ``"my-custom"``.
        >>> df.write_custom(
        ...     "my-custom",
        ...     options={
        ...         "host": "example.com",
        ...         "port": "9000",
        ...     },
        ... )
        >>>
        >>> # Use a format and additional sink options
        >>> df.write_custom(
        ...     "my-custom",
        ...     primary_key="id",
        ...     options={
        ...         "endpoint": "example.com:9000",
        ...         "format": "json",
        ...         "json.map-null-key.mode": "FAIL",
        ...         "sink.parallelism": "4",
        ...     },
        ... )
    """
    from pyflink.dataframe.io import _collect_custom_options
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    has_identifier = isinstance(connector, str) and bool(connector)
    if not has_identifier:
        raise ValueError(
            "'connector' must be a non-empty string identifying the "
            "connector (the same value used as 'connector' in SQL "
            "DDL)."
        )

    table = self._table
    t_env = table._t_env
    # Auto-load the jar when `connector` names a built-in connector that
    # has no dedicated write_* helper; per the helper's contract, unknown
    # or user-defined names are silently ignored.
    add_built_in_connector(t_env, connector)

    user_options = _collect_custom_options(options)
    descriptor = TableDescriptor.for_connector(connector)
    physical_schema = _build_physical_schema_from_resolved_schema(
        table.get_resolved_schema(),
        primary_key=primary_key,
    )
    descriptor = descriptor.schema(physical_schema.build())
    for option_key, option_value in user_options.items():
        descriptor = descriptor.option(option_key, str(option_value))

    insert_result = table.execute_insert(descriptor.build())
    # On local / mini-cluster targets, block until the insert job finishes.
    execution_target = t_env.get_config().get("execution.target", None)
    if execution_target in ("local", "minicluster"):
        insert_result.wait()
# ======================== Explain ========================
[docs]
def explain(
    self,
    *,
    show_estimated_cost: bool = False,
    show_physical_execution_plan: bool = False,
) -> None:
    """
    Print the abstract syntax tree and execution plans for this DataFrame.

    Args:
        show_estimated_cost: When True, each physical node is annotated
            with the optimizer's cost estimate (row count, cpu, io, ...).
            Defaults to False.
        show_physical_execution_plan: When True, the physical execution
            plan is appended in JSON form. Defaults to False.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
        >>> df.explain()
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        >>> df.explain(show_estimated_cost=True)
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(...): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, ...}
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        >>> df.explain(show_physical_execution_plan=True)
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Physical Execution Plan ==
        {
          "nodes" : [ {
            "id" : ...,
            "type" : "Source: *anonymous_python-input-format$1*[...]",
            "pact" : "Data Source",
            ...
          } ]
        }
    """
    from pyflink.table import ExplainDetail

    # Each detail is only looked up when its flag is set.
    cost_details = (
        (ExplainDetail.ESTIMATED_COST,) if show_estimated_cost else ()
    )
    plan_details = (
        (ExplainDetail.JSON_EXECUTION_PLAN,)
        if show_physical_execution_plan
        else ()
    )
    plan_text = self._table.explain(*(cost_details + plan_details))
    print(plan_text)
# ======================== Properties ========================
@property
def schema(self) -> "TableSchema":
    """
    The schema of this DataFrame.

    Returns:
        The ``TableSchema`` reported by the underlying Table.
    """
    underlying_table = self._table
    return underlying_table.get_schema()
@property
def columns(self) -> List[str]:
    """
    Names of this DataFrame's columns.

    Returns:
        The field names reported by the underlying table schema.
    """
    table_schema = self._table.get_schema()
    return table_schema.get_field_names()
# ======================== Null/NaN Handling ========================
def _get_float_columns(self) -> List[str]:
    """Collect, in schema order, the names of FloatType/DoubleType columns."""
    schema = self._table.get_schema()
    float_names = []
    for field_name in schema.get_field_names():
        field_type = schema.get_field_data_type(field_name)
        if isinstance(field_type, (FloatType, DoubleType)):
            float_names.append(field_name)
    return float_names
[docs]
def drop_null(
    self, subset: Optional[Union[str, List[str]]] = None
) -> "DataFrame":
    """
    Drop rows containing null values.

    Does NOT drop rows with NaN — use :meth:`drop_nan` for that.

    Args:
        subset: Column name or list of column names to check. If None,
            checks all columns.

    Returns:
        A new DataFrame with rows containing nulls removed, or this
        DataFrame unchanged when ``subset`` is empty.

    Raises:
        ValueError: If a column in ``subset`` is not in the schema.

    Example::

        >>> df.drop_null()                # drop if any column is null
        >>> df.drop_null(subset=["a"])    # drop if column "a" is null
        >>> df.drop_null(subset="a")      # same, single column name
    """
    schema_columns = self._table.get_schema().get_field_names()
    if subset is None:
        columns = list(schema_columns)
    elif isinstance(subset, str):
        # A bare string names one column; iterating it would treat each
        # character as a separate column name.
        columns = [subset]
    else:
        # Materialize so one-shot iterables survive validation below.
        columns = list(subset)
    if not columns:
        return self
    schema_set = set(schema_columns)
    for c in columns:
        if c not in schema_set:
            raise ValueError(f"Column '{c}' not found in schema")
    conditions = [col(c).is_not_null for c in columns]
    # and_() needs at least two operands, so a single predicate is used as-is.
    predicate = conditions[0] if len(conditions) == 1 else and_(*conditions)
    return DataFrame(self._table.filter(predicate))
[docs]
def drop_nan(
    self, subset: Optional[Union[str, List[str]]] = None
) -> "DataFrame":
    """
    Drop rows containing NaN values in float columns.

    Does NOT drop rows with null — use :meth:`drop_null` for that.
    Non-float columns in ``subset`` are silently ignored since NaN
    only applies to float/double types.

    Args:
        subset: Column name or list of column names to check. If None,
            checks all float columns.

    Returns:
        A new DataFrame with rows containing NaN removed, or this
        DataFrame unchanged when no float column is selected.

    Raises:
        ValueError: If a column in ``subset`` is not in the schema.

    Example::

        >>> df.drop_nan()                # drop if any float column is NaN
        >>> df.drop_nan(subset=["a"])    # drop if float column "a" is NaN
        >>> df.drop_nan(subset="a")      # same, single column name
    """
    # Keep the schema-ordered list: iterating a set of strings depends on
    # hash randomization and would make the generated predicate order vary
    # between runs.
    float_column_list = self._get_float_columns()
    float_columns = set(float_column_list)
    schema_columns = set(self._table.get_schema().get_field_names())
    if subset is not None:
        # A bare string names one column rather than a sequence of them;
        # other iterables are materialized so they can be walked twice.
        subset = [subset] if isinstance(subset, str) else list(subset)
        for c in subset:
            if c not in schema_columns:
                raise ValueError(f"Column '{c}' not found in schema")
        columns = [c for c in subset if c in float_columns]
    else:
        columns = float_column_list
    if not columns:
        return self
    # Use isnan() function (VVR built-in) to detect NaN.
    # isnan(NULL) returns NULL, and ~NULL (isNotTrue) returns true,
    # so null rows are correctly preserved.
    conditions = []
    for c in columns:
        # Quote as a SQL identifier; embedded backticks are escaped by
        # doubling so unusual column names cannot break the expression.
        quoted = "`" + c.replace("`", "``") + "`"
        conditions.append(~call_sql(f"isnan({quoted})"))
    # and_() needs at least two operands, so a single predicate is used as-is.
    predicate = conditions[0] if len(conditions) == 1 else and_(*conditions)
    return DataFrame(self._table.filter(predicate))
[docs]
def fill_null(
    self, value: Any, subset: Optional[Union[str, List[str]]] = None
) -> "DataFrame":
    """
    Replace null values with a given value.

    Does NOT replace NaN — use :meth:`fill_nan` for that.

    Args:
        value: The value to replace nulls with.
        subset: Column name or list of column names to fill. If None,
            fills all columns.

    Returns:
        A new DataFrame with null values replaced, or this DataFrame
        unchanged when ``subset`` is empty.

    Raises:
        ValueError: If a column in ``subset`` is not in the schema.

    Example::

        >>> df.fill_null(0)                # fill all columns
        >>> df.fill_null(0, subset=["a"])  # fill only column "a"
        >>> df.fill_null(0, subset="a")    # same, single column name
    """
    schema_columns = self._table.get_schema().get_field_names()
    if subset is None:
        columns = list(schema_columns)
    elif isinstance(subset, str):
        # A bare string names one column; iterating it would treat each
        # character as a separate column name.
        columns = [subset]
    else:
        # Materialize so one-shot iterables survive validation below.
        columns = list(subset)
    if not columns:
        return self
    schema_set = set(schema_columns)
    for c in columns:
        if c not in schema_set:
            raise ValueError(f"Column '{c}' not found in schema")
    # Each replacement keeps the original column name, so
    # add_or_replace_columns overwrites the column in place.
    replacement_exprs = [
        col(c).if_null(lit(value)).alias(c) for c in columns
    ]
    return DataFrame(self._table.add_or_replace_columns(*replacement_exprs))
[docs]
def fill_nan(
    self, value: Any, subset: Optional[Union[str, List[str]]] = None
) -> "DataFrame":
    """
    Replace NaN values with a given value in float columns.

    Does NOT replace null — use :meth:`fill_null` for that.
    Non-float columns in ``subset`` are silently ignored since NaN
    only applies to float/double types.

    Args:
        value: The value to replace NaN with.
        subset: Column name or list of column names to fill. If None,
            fills all float columns.

    Returns:
        A new DataFrame with NaN values replaced, or this DataFrame
        unchanged when no float column is selected.

    Raises:
        ValueError: If a column in ``subset`` is not in the schema.

    Example::

        >>> df.fill_nan(0.0)                # fill all float columns
        >>> df.fill_nan(0.0, subset=["a"])  # fill only float column "a"
        >>> df.fill_nan(0.0, subset="a")    # same, single column name
    """
    # Keep the schema-ordered list: iterating a set of strings depends on
    # hash randomization and would make the generated expression order
    # vary between runs.
    float_column_list = self._get_float_columns()
    float_columns = set(float_column_list)
    schema_columns = set(self._table.get_schema().get_field_names())
    if subset is not None:
        # A bare string names one column rather than a sequence of them;
        # other iterables are materialized so they can be walked twice.
        subset = [subset] if isinstance(subset, str) else list(subset)
        for c in subset:
            if c not in schema_columns:
                raise ValueError(f"Column '{c}' not found in schema")
        columns = [c for c in subset if c in float_columns]
    else:
        columns = float_column_list
    if not columns:
        return self
    # Use isnan() function (VVR built-in) to detect NaN.
    # isnan(NULL) returns NULL (falsy), so then() returns col(c)
    # (the NULL value) — nulls are preserved.
    replacement_exprs = []
    for c in columns:
        # Quote as a SQL identifier; embedded backticks are escaped by
        # doubling so unusual column names cannot break the expression.
        quoted = "`" + c.replace("`", "``") + "`"
        replacement_exprs.append(
            call_sql(f"isnan({quoted})").then(lit(value), col(c)).alias(c)
        )
    return DataFrame(self._table.add_or_replace_columns(*replacement_exprs))
# ======================== Special Methods ========================
def __getitem__(self, key) -> Union["DataFrame", Expression]:
    """
    Index the DataFrame with ``[]``, dispatching on the key's type.

    * ``str``: a column reference expression, e.g. ``df["a"]``.
    * ``list`` / ``tuple``: a projection via :meth:`select`,
      e.g. ``df[["a", "b"]]``.
    * :class:`Expression`: a row filter via :meth:`filter`.

    Raises:
        TypeError: For any other key type.
    """
    if isinstance(key, str):
        return col(key)
    if isinstance(key, (list, tuple)):
        return self.select(*key)
    if isinstance(key, Expression):
        return self.filter(key)
    raise TypeError(f"Unsupported key type: {type(key)}")
def __repr__(self) -> str:
    """Return a debug string of the form ``DataFrame(columns=[...])``."""
    column_names = self.columns
    return "DataFrame(columns={})".format(column_names)
class GroupedDataFrame:
    """
    A DataFrame paired with grouping keys, awaiting aggregation.

    Grouping is not applied when this object is created; :meth:`agg`
    performs the ``group_by`` and the projection together.
    """

    def __init__(self, df: DataFrame, group_keys: List[Expression]):
        # Store references only; nothing is evaluated until agg().
        self._df = df
        self._group_keys = group_keys

    def agg(self, *aggs: Expression) -> DataFrame:
        """
        Group the underlying table by the stored keys and select ``aggs``.

        Args:
            *aggs: Expressions to select per group (aggregates and/or the
                grouping keys themselves).

        Returns:
            A new DataFrame holding one row per group.

        Example::

            >>> import pyflink.dataframe as pf
            >>> df = pf.from_records(
            ...     [("A", 1), ("A", 2), ("B", 3)],
            ...     schema=["category", "value"],
            ... )
            >>> # Group by and aggregate
            >>> result = df.group_by("category").agg(
            ...     pf.col("category"),
            ...     pf.col("value").sum.alias("total")
            ... )
        """
        source_table = self._df.to_table()
        grouped = source_table.group_by(*self._group_keys)
        aggregated = grouped.select(*aggs)
        return DataFrame(aggregated)
def _build_physical_schema_from_resolved_schema(
    resolved_schema,
    primary_key: Optional[Union[str, List[str]]] = None,
):
    """
    Create a ``Schema`` builder from the physical columns of a resolved schema.

    Columns for which ``is_physical()`` is false are skipped. Every column
    named in the (normalized) primary key gets a NOT NULL copy of its data
    type. The primary key constraint is declared only when the normalized
    key is non-empty.

    Args:
        resolved_schema: Object exposing ``get_columns()``; each column
            provides ``is_physical()``, ``get_name()`` and ``get_data_type()``.
        primary_key: A column name or list of names, or None. It is passed
            through ``_normalize_primary_key`` together with the physical
            column names (presumably to validate and listify it; see
            ``pyflink.dataframe.io``).

    Returns:
        The ``Schema`` builder itself; ``build()`` is not called here.
    """
    from pyflink.table import Schema
    from pyflink.dataframe.io import _normalize_primary_key

    physical = [c for c in resolved_schema.get_columns() if c.is_physical()]
    names = [c.get_name() for c in physical]
    pk_columns = _normalize_primary_key(primary_key, names)
    pk_lookup = set(pk_columns)

    builder = Schema.new_builder()
    for column, name in zip(physical, names):
        dtype = column.get_data_type()
        # Primary-key columns must be non-nullable.
        builder.column(name, dtype.not_null() if name in pk_lookup else dtype)
    if pk_columns:
        builder.primary_key(*pk_columns)
    return builder
def _validate_sls_buckets(value: int) -> None:
    """
    Ensure ``value`` is a plain ``int`` power of two between 1 and 256.

    ``bool`` is rejected even though it subclasses ``int``.

    Raises:
        ValueError: If the check fails, including for non-int inputs.
    """
    # bool is an int subclass, so it has to be excluded explicitly.
    is_plain_int = isinstance(value, int) and not isinstance(value, bool)
    # Short-circuiting keeps comparisons away from non-int values. For
    # v >= 1, (v & (v - 1)) == 0 holds exactly when v is a power of two.
    if is_plain_int and 1 <= value <= 256 and (value & (value - 1)) == 0:
        return
    raise ValueError(f"buckets must be a power of 2 in [1, 256], got: {value}")
def _validate_write_hologres_options(
    *,
    write_mode,
    on_conflict_action,
    delete_strategy,
    copy_format,
    connection_ssl_mode,
    connection_ssl_root_cert_location,
    check_and_put_column,
    insert_conflict_where,
    primary_key,
) -> None:
    """
    Check ``write_hologres`` option values and combinations on the Python side.

    All parameters are keyword-only. The checks visible here are, in order:

    * ``write_mode``, ``on_conflict_action``, ``delete_strategy``,
      ``copy_format`` and ``connection_ssl_mode`` are each checked by
      ``_validate_allowed_value`` against the matching ``_HOLOGRES_*``
      allow-list from ``pyflink.dataframe.io``.
    * ``connection_ssl_mode`` and ``connection_ssl_root_cert_location`` are
      checked together by ``_validate_hologres_ssl_pair``.
    * With ``write_mode`` ``'AUTO'`` or ``'INSERT'``, ``copy_format`` must be
      ``'binary'``.
    * When ``check_and_put_column`` is not None:
      ``on_conflict_action`` must be ``'INSERT_OR_UPDATE'`` or
      ``'INSERT_OR_REPLACE'``; ``insert_conflict_where`` must be None; and
      ``write_mode`` must not be an explicit ``COPY_*`` mode.
    * ``delete_strategy`` ``'NON_PK_FIELD_TO_NULL'`` or ``'DELETE_ROW_ON_PK'``
      requires a truthy ``primary_key``.

    Several of these mirror checks that, per the comments below, the Java
    ``HologresDynamicTableSink`` performs. Doing them here reports the
    problem before a job is submitted.

    Raises:
        ValueError: For the combination checks above. The two delegated
            helpers presumably raise ValueError as well; their bodies are
            not visible here (TODO confirm).
    """
    # Imported lazily, presumably to avoid an import cycle with
    # pyflink.dataframe.io (TODO confirm).
    from pyflink.dataframe.io import (
        _HOLOGRES_COPY_FORMATS,
        _HOLOGRES_DELETE_STRATEGIES,
        _HOLOGRES_ON_CONFLICT_ACTIONS,
        _HOLOGRES_SSL_MODES,
        _HOLOGRES_WRITE_MODES,
        _validate_allowed_value,
        _validate_hologres_ssl_pair,
    )
    # Per-option allow-list checks.
    _validate_allowed_value(write_mode, "write_mode", _HOLOGRES_WRITE_MODES)
    _validate_allowed_value(
        on_conflict_action,
        "on_conflict_action",
        _HOLOGRES_ON_CONFLICT_ACTIONS,
    )
    _validate_allowed_value(
        delete_strategy, "delete_strategy", _HOLOGRES_DELETE_STRATEGIES
    )
    _validate_allowed_value(
        copy_format, "copy_format", _HOLOGRES_COPY_FORMATS
    )
    _validate_allowed_value(
        connection_ssl_mode, "connection_ssl_mode", _HOLOGRES_SSL_MODES
    )
    # SSL mode and root certificate location are validated as a pair.
    _validate_hologres_ssl_pair(
        connection_ssl_mode, connection_ssl_root_cert_location
    )
    # A non-binary copy_format only makes sense for the explicit COPY_* write
    # modes. AUTO is rejected too because it may resolve to INSERT at runtime.
    # NOTE(review): if copy_format can be None when the caller leaves it unset,
    # this check rejects AUTO/INSERT with an unset copy_format. Confirm that the
    # caller defaults it to 'binary'.
    if write_mode in ("AUTO", "INSERT") and copy_format != "binary":
        raise ValueError(
            f"copy_format={copy_format!r} requires write_mode to be one of "
            "{'COPY_STREAM', 'COPY_BULK_LOAD', 'COPY_BULK_LOAD_ON_CONFLICT'}; "
            "with write_mode='AUTO' or 'INSERT', copy_format must be 'binary' "
            "(AUTO may resolve to INSERT at runtime)."
        )
    # Only check_and_put_column is inspected to decide whether the
    # check-and-put feature is in use.
    check_and_put_set = check_and_put_column is not None
    if check_and_put_set and on_conflict_action not in (
        "INSERT_OR_UPDATE",
        "INSERT_OR_REPLACE",
    ):
        raise ValueError(
            "check_and_put_* options require on_conflict_action to be "
            "'INSERT_OR_UPDATE' or 'INSERT_OR_REPLACE'."
        )
    # check-and-put and insert_conflict_where are mutually exclusive.
    if check_and_put_set and insert_conflict_where is not None:
        raise ValueError(
            "insert_conflict_where conflicts with check_and_put_* options; "
            "configure only one of them."
        )
    # HologresDynamicTableSink#verifySinkOptions rejects check-and-put
    # when the resolved write mode is any COPY_*. Reject the explicit
    # forms here; AUTO is allowed because it usually resolves to INSERT
    # (it may resolve to COPY_BULK_LOAD* under batch + Hologres 3.1+ —
    # see the write_hologres docstring for that caveat).
    if check_and_put_set and write_mode in (
        "COPY_STREAM",
        "COPY_BULK_LOAD",
        "COPY_BULK_LOAD_ON_CONFLICT",
    ):
        raise ValueError(
            "check_and_put_* options are not supported with "
            f"write_mode={write_mode!r}; use 'AUTO' or 'INSERT'."
        )
    # DeleteStrategy.requiresPrimaryKey is true for NON_PK_FIELD_TO_NULL
    # and DELETE_ROW_ON_PK; HologresDynamicTableSink#verifyDeletionStrategy
    # then refuses to start the job without a Flink primary key.
    # `not primary_key` treats None and empty values (e.g. "" or []) as unset.
    if delete_strategy in (
        "NON_PK_FIELD_TO_NULL",
        "DELETE_ROW_ON_PK",
    ) and not primary_key:
        raise ValueError(
            f"delete_strategy={delete_strategy!r} requires primary_key "
            "to be set."
        )