Source code for pyflink.dataframe.dataframe

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""
Modern DataFrame API for PyFlink.
"""

from typing import (
    Iterator,
    Literal,
    Union,
    Callable,
    Dict,
    List,
    Any,
    Optional,
    Tuple,
    TypeVar,
    TYPE_CHECKING,
    overload,
)

from pyflink.table.table import Table
from pyflink.table.expression import Expression
from pyflink.table.expressions import (
    col as table_col,
    lit as table_lit,
    and_,
    call_sql,
)
from pyflink.table.types import DataType, FloatType, DoubleType

if TYPE_CHECKING:
    import pandas as pd
    from pyflink.table import TableSchema

T = TypeVar("T")


__all__ = ["DataFrame", "col", "lit"]


def col(name: str) -> Expression:
    """
    Build an expression that refers to a column by name.

    Thin wrapper that forwards to the Table API ``col`` helper, which this
    module imports as ``table_col``.

    Args:
        name: Name of the column to reference.

    Returns:
        The Expression produced by ``table_col(name)``.

    Example::

        >>> import pyflink.dataframe as pf
        >>> pf.col("a")
        >>> pf.col("a") + pf.col("b")
    """
    column_ref = table_col(name)
    return column_ref
def lit(value: Any, data_type: Optional[DataType] = None) -> Expression:
    """
    Build a literal (constant) expression.

    Forwards to the Table API ``lit`` helper, which this module imports as
    ``table_lit``. The data type is passed along only when one is supplied.

    Args:
        value: The constant value to wrap.
        data_type: Optional explicit data type for the literal.

    Returns:
        The Expression produced by ``table_lit``.

    Example::

        >>> import pyflink.dataframe as pf
        >>> pf.lit(1)
        >>> pf.lit("hello")
        >>> pf.lit(1.5, DataTypes.DOUBLE())
    """
    # Only hand over the type argument when the caller actually gave one.
    literal_args = (value,) if data_type is None else (value, data_type)
    return table_lit(*literal_args)
class DataFrame:
    """
    DataFrame-style facade over a PyFlink ``Table``.

    Every transformation wraps the result of a Table API call in a new
    DataFrame, so operations can be chained fluently.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_dict({"id": [1, 2], "name": ["a", "b"]})
        >>> result = df.select("id", "name") \\
        ...     .with_column("id_doubled", pf.col("id") * 2) \\
        ...     .filter(pf.col("id") > 0) \\
        ...     .rename({"id": "identifier"})
    """

    def __init__(self, table: Table):
        """
        Wrap an existing Table.

        Args:
            table: The underlying PyFlink Table; stored as ``self._table``.
        """
        self._table = table

    @property
    def llm(self):
        """
        Accessor for the LLM / AI helper functions.

        Returns:
            A new ``LLMAccessor`` bound to this DataFrame.

        Example::

            >>> df.llm.predict("text", model="qwen-plus")
            >>> df.llm.ai_classify("text", ["pos", "neg"], model="qwen-plus")
        """
        # Imported at call time rather than at module import.
        from pyflink.dataframe.ai.llm import LLMAccessor

        accessor = LLMAccessor(self)
        return accessor

    # ======================== Core Operations ========================
def select(self, *columns, **projections) -> "DataFrame":
    """
    Project the DataFrame onto a set of columns.

    Accepted inputs:

    - column names given as strings
    - Expression objects
    - lists/tuples holding strings and/or Expressions
    - keyword arguments ``new_name=value``; the value is aliased to
      ``new_name`` (non-Expression values are wrapped with ``lit`` first)

    Args:
        *columns: Columns to keep, as strings, Expressions, or lists/tuples
            of those.
        **projections: Extra output columns as ``name=expression``.

    Returns:
        A new DataFrame containing the selected columns.

    Raises:
        TypeError: If a positional item (or a member of a list/tuple) is
            neither a string nor an Expression.

    Example::

        >>> df.select("a", "b")                  # by name
        >>> df.select(col("a"), col("b"))        # by expression
        >>> df.select("a", c=col("a") + 1)       # names plus kwargs
        >>> df.select(["a", "b"])                # list of names
        >>> df.select(x=col("a"), y=col("b"))    # rename via kwargs
    """

    def _as_expr(item, inside_list):
        # Strings become column references; Expressions pass through as-is.
        if isinstance(item, str):
            return col(item)
        if isinstance(item, Expression):
            return item
        if inside_list:
            raise TypeError(f"Unsupported column type in list: {type(item)}")
        raise TypeError(f"Unsupported column type: {type(item)}")

    selected = []
    for spec in columns:
        if isinstance(spec, (list, tuple)):
            # Only one level of nesting is flattened.
            selected.extend(_as_expr(member, True) for member in spec)
        else:
            selected.append(_as_expr(spec, False))

    for alias_name, value in projections.items():
        projected = value if isinstance(value, Expression) else lit(value)
        selected.append(projected.alias(alias_name))

    return DataFrame(self._table.select(*selected))
def with_column(
    self,
    name: str,
    expr: Union[Expression, Callable[["DataFrame"], Expression]],
) -> "DataFrame":
    """
    Add one column, or replace the column that already has this name.

    Args:
        name: Name of the column to add or replace.
        expr: The column's Expression, or a callable that receives this
            DataFrame and returns the Expression.

    Returns:
        A new DataFrame with the column added or replaced.

    Raises:
        TypeError: If the (possibly computed) value is not an Expression.

    Example::

        >>> df.with_column("c", col("a") + col("b"))
        >>> df.with_column("c", lambda df: df["a"] + df["b"])
    """
    # A callable is evaluated against this DataFrame to obtain the expression.
    resolved = expr(self) if callable(expr) else expr
    if isinstance(resolved, Expression):
        return DataFrame(self._table.add_or_replace_columns(resolved.alias(name)))
    raise TypeError(f"Expected Expression, got {type(resolved)}")
def with_columns(self, *exprs, **named_exprs) -> "DataFrame":
    """
    Add several columns, or replace existing ones, in a single call.

    Args:
        *exprs: Expressions that already carry their output name via
            ``.alias()``; they are forwarded unchanged.
        **named_exprs: ``column_name=value`` pairs. Expression values are
            aliased to the key; any other value is wrapped with ``lit``
            first.

    Returns:
        A new DataFrame with the columns added or replaced.

    Example::

        >>> df.with_columns(
        ...     c=col("a") + col("b"),
        ...     d=col("a") * 2
        ... )
        >>> df.with_columns(
        ...     (col("a") + col("b")).alias("c"),
        ...     (col("a") * 2).alias("d")
        ... )
    """
    aliased = [
        (value if isinstance(value, Expression) else lit(value)).alias(label)
        for label, value in named_exprs.items()
    ]
    # Positional expressions come first, keyword-named ones after them.
    return DataFrame(self._table.add_or_replace_columns(*exprs, *aliased))
[docs] def drop_columns( self, *columns: Union[str, Expression], strict: bool = True ) -> "DataFrame": """ Drop columns from the DataFrame. Args: *columns: Column names or expressions to drop. strict: If True, raise an error if a column doesn't exist. If False, silently ignore non-existent columns. Returns: A new DataFrame without the dropped columns. Example:: >>> df.drop_columns("a", "b") >>> df.drop_columns(col("a"), col("b")) >>> df.drop_columns("nonexistent", strict=False) """ exprs = [] schema_columns = set(self._table.get_schema().get_field_names()) for col_spec in columns: col_name = None if isinstance(col_spec, str): col_name = col_spec if col_name not in schema_columns: if strict: raise ValueError(f"Column '{col_name}' not found in schema") else: # Skip non-existent columns in non-strict mode continue exprs.append(col(col_name)) elif isinstance(col_spec, Expression): exprs.append(col_spec) else: raise TypeError(f"Unsupported column type: {type(col_spec)}") if not exprs: # Nothing to drop, return self return self return DataFrame(self._table.drop_columns(*exprs))
# Alias for drop_columns drop = drop_columns
def filter(
    self,
    predicate: Union[Expression, str, Callable[["DataFrame"], Expression]],
    **constraints,
) -> "DataFrame":
    """
    Keep only the rows that satisfy a predicate.

    Args:
        predicate: The filter condition. One of:

            - an Expression
            - a SQL string (converted with ``call_sql``)
            - a callable that receives this DataFrame and returns an
              Expression
        **constraints: Additional equality constraints as ``column=value``.
            A value may be a plain Python value (wrapped with ``lit``) or an
            Expression (used as-is), matching how ``select`` and
            ``with_columns`` treat keyword values.

    Returns:
        A new DataFrame with the filtered rows.

    Raises:
        TypeError: If ``predicate`` has an unsupported type, or a callable
            predicate returns something that is not an Expression.

    Example::

        >>> df.filter(col("a") > 10)
        >>> df.filter("a > 10")                  # SQL string
        >>> df.filter(col("a") > 10, b="hello")  # predicate plus constraints
        >>> df.filter(lambda df: df["a"] > 10)
    """
    conditions = []

    # Test for Expression before callable, so an Expression is never treated
    # as a factory function.
    if isinstance(predicate, str):
        conditions.append(call_sql(predicate))
    elif isinstance(predicate, Expression):
        conditions.append(predicate)
    elif callable(predicate):
        produced = predicate(self)
        # Same validation as with_column: fail early with a clear message
        # instead of deep inside the Table API call.
        if not isinstance(produced, Expression):
            raise TypeError(f"Expected Expression, got {type(produced)}")
        conditions.append(produced)
    else:
        raise TypeError(f"Unsupported predicate type: {type(predicate)}")

    for name, value in constraints.items():
        # Only wrap plain values; an Expression is compared against directly.
        right_side = value if isinstance(value, Expression) else lit(value)
        conditions.append(col(name) == right_side)

    # and_() is only needed when there is more than one condition.
    if len(conditions) == 1:
        final_predicate = conditions[0]
    else:
        final_predicate = and_(*conditions)

    return DataFrame(self._table.filter(final_predicate))


# Alias for filter
where = filter
def rename_columns(
    self,
    *args,
    mapping: Union[Dict[str, str], Callable[[str], str]] = None,
) -> "DataFrame":
    """
    Rename columns.

    Supported call shapes:

    - a dict mapping old names to new names
    - a callable that maps each current name to its new name
    - one ``(old_name, new_name)`` pair of strings
    - several pairs laid out flat: ``(old1, new1, old2, new2, ...)``

    Positional arguments, when present, take precedence over ``mapping``.
    Dict entries whose old name is not a current column are ignored, and a
    callable only renames columns whose name it actually changes.

    Args:
        *args: Either a single dict/callable, or an even number of strings
            forming (old, new) pairs.
        mapping: The rename mapping as a dict or a callable.

    Returns:
        A new DataFrame with the columns renamed, or this same DataFrame
        when no rename applies.

    Raises:
        TypeError: If a pair contains a non-string, or the mapping is
            neither a dict nor a callable.
        ValueError: If an odd number (three or more) of positional
            arguments is given.

    Example::

        >>> df.rename_columns({"a": "id", "b": "value"})
        >>> df.rename_columns("a", "id")                  # single column
        >>> df.rename_columns("a", "id", "b", "value")    # multiple columns
        >>> df.rename_columns(lambda name: name.upper())  # transform all
    """
    arg_count = len(args)
    if arg_count == 1:
        # A lone positional argument is the dict or callable itself.
        mapping = args[0]
    elif arg_count >= 2 and arg_count % 2 == 0:
        # Flat pairs: even positions are old names, odd positions new names.
        mapping = {}
        for old_name, new_name in zip(args[0::2], args[1::2]):
            if not (isinstance(old_name, str) and isinstance(new_name, str)):
                raise TypeError(
                    f"Column names must be strings, got {type(old_name)} and {type(new_name)}"
                )
            mapping[old_name] = new_name
    elif arg_count > 0:
        raise ValueError(
            "rename_columns() accepts either a dict/callable mapping, "
            "or pairs of string arguments (old1, new1, old2, new2, ...)"
        )

    current_columns = self._table.get_schema().get_field_names()

    if isinstance(mapping, dict):
        rename_exprs = [
            col(source).alias(target)
            for source, target in mapping.items()
            if source in current_columns
        ]
    elif callable(mapping):
        rename_exprs = []
        for existing in current_columns:
            renamed = mapping(existing)
            if renamed != existing:
                rename_exprs.append(col(existing).alias(renamed))
    else:
        raise TypeError(
            f"Unsupported mapping type: {type(mapping)}. "
            "Expected dict, callable, or two string arguments."
        )

    if not rename_exprs:
        return self
    return DataFrame(self._table.rename_columns(*rename_exprs))


# Alias for rename_columns
rename = rename_columns

# ======================== Aggregation ========================
def group_by(self, *columns: Union[str, Expression]) -> "GroupedDataFrame":
    """
    Start a grouped aggregation over the given key columns.

    Args:
        *columns: Grouping keys, as column names or Expressions.

    Returns:
        A GroupedDataFrame built from this DataFrame and the key
        expressions.

    Raises:
        TypeError: If a key is neither a string nor an Expression.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records(
        ...     [("A", 1), ("A", 2), ("B", 3)],
        ...     schema=["category", "value"],
        ... )
        >>> result = df.group_by("category").agg(
        ...     pf.col("category"),
        ...     pf.col("value").sum.alias("total")
        ... )
    """
    group_keys = []
    for spec in columns:
        if isinstance(spec, Expression):
            group_keys.append(spec)
        elif isinstance(spec, str):
            # Plain names are turned into column references.
            group_keys.append(col(spec))
        else:
            raise TypeError(f"Unsupported column type: {type(spec)}")
    return GroupedDataFrame(self, group_keys)
def agg(self, *aggs: Expression) -> "DataFrame":
    """
    Aggregate over the whole DataFrame (no grouping).

    Args:
        *aggs: Aggregation expressions, forwarded positionally to the
            underlying table's ``aggregate`` method.

    Returns:
        A new DataFrame wrapping whatever ``aggregate`` returns.
    """
    # NOTE(review): all expressions are passed straight to Table.aggregate;
    # confirm that method accepts several expressions and yields a plain
    # Table in this code base.
    aggregated = self._table.aggregate(*aggs)
    return DataFrame(aggregated)
# ======================== Join ========================
[docs] def join( self, other: "DataFrame", on: Union[str, Expression, List[str]] = None, how: str = "inner", left_on: Union[str, Expression, List[str]] = None, right_on: Union[str, Expression, List[str]] = None, ) -> "DataFrame": """ Join with another DataFrame. Args: other: Right DataFrame to join. on: Join key(s) when both DataFrames have the same column names. how: Join type. One of "inner", "left", "right", "full", "outer". left_on: Left join key(s). right_on: Right join key(s). Returns: A new DataFrame with the join result. Example:: >>> df1.join(df2, on="id") >>> df1.join(df2, left_on="id", right_on="user_id") >>> df1.join(df2, on=["id", "name"], how="left") """ join_methods = { "inner": self._table.join, "left": self._table.left_outer_join, "right": self._table.right_outer_join, "full": self._table.full_outer_join, "outer": self._table.full_outer_join, } if how not in join_methods: raise ValueError( f"Unsupported join type: {how}. " f"Supported types: {list(join_methods.keys())}" ) right_table = other.to_table() # Convert join keys to expressions def to_exprs(keys): if keys is None: return None if isinstance(keys, str): return [col(keys)] if isinstance(keys, Expression): return [keys] return [col(k) if isinstance(k, str) else k for k in keys] if on is not None: join_exprs = to_exprs(on) predicate = and_(*join_exprs) if len(join_exprs) > 1 else join_exprs[0] return DataFrame(join_methods[how](right_table, predicate)) else: left_exprs = to_exprs(left_on) right_exprs = to_exprs(right_on) if not left_exprs or not right_exprs: raise ValueError( "Must specify either 'on' or both 'left_on' and 'right_on'" ) if len(left_exprs) != len(right_exprs): raise ValueError( f"'left_on' and 'right_on' must have the same length, " f"got {len(left_exprs)} and {len(right_exprs)}" ) join_conditions = [l == r for l, r in zip(left_exprs, right_exprs)] predicate = and_(*join_conditions) if len(join_conditions) > 1 else join_conditions[0] return 
DataFrame(join_methods[how](right_table, predicate))
# ======================== Map ========================
def map(self, func, *, return_dtype=None, concurrency: int = None,
        num_gpus: float = None, gpu_type: str = None) -> "DataFrame":
    """
    Transform the DataFrame row by row with a user function.

    The function receives one row as ``dict[str, value]`` and returns a
    ``dict[str, value]``; it is a 1-to-1 row transformation. When the
    function carries a TypedDict return hint, ``return_dtype`` may be left
    out.

    Args:
        func: A function (dict -> dict), or a DataFrameUDFWrapper.
        return_dtype: Output schema as ``DataType.struct(...)``. Optional
            when ``func`` has a TypedDict return hint.
        concurrency: Optional parallelism for the operator running this
            UDF. UDFs with different concurrency values are split into
            separate operators.
        num_gpus: Optional number of GPUs requested for this UDF (e.g. 0.5,
            1.0). Each GPU UDF runs in its own operator.
        gpu_type: GPU type (e.g. 'A10', 'V100'). Required whenever
            ``num_gpus`` is set.

    Returns:
        A new DataFrame with the transformed rows.

    Raises:
        ValueError: If ``num_gpus`` is not a positive number, or is set
            without ``gpu_type``.

    Example::

        >>> # With explicit return_dtype
        >>> df.map(lambda row: {"a": row["a"] + 1, "b": row["b"].upper()},
        ...        return_dtype=DataType.struct({"a": DataType.int64(),
        ...                                      "b": DataType.string()}))
        >>>
        >>> # With TypedDict return hint (auto-inferred)
        >>> class Output(TypedDict):
        ...     a: int
        ...     b: str
        >>> def process(row) -> Output:
        ...     return {"a": row["a"] + 1, "b": row["b"].upper()}
        >>> df.map(process)
        >>>
        >>> # With concurrency
        >>> df.map(process, concurrency=4)
        >>>
        >>> # With GPU resources
        >>> df.map(process, num_gpus=0.5, gpu_type='A10')
    """
    if num_gpus is not None:
        is_number = isinstance(num_gpus, (int, float))
        if not is_number or num_gpus <= 0:
            raise ValueError("num_gpus must be a positive number, got: {}".format(num_gpus))
        if gpu_type is None:
            raise ValueError("gpu_type must be specified when num_gpus is set")

    # Imported at call time rather than at module import.
    from pyflink.dataframe.udf import _resolve_map_udf

    udf_options = dict(
        func_type="general",
        input_columns=self.columns,
        concurrency=concurrency,
        num_gpus=num_gpus,
        gpu_type=gpu_type,
    )
    table_wrapper = _resolve_map_udf(func, return_dtype, **udf_options)
    return DataFrame(self._table.map(table_wrapper))
def map_batches(self, func, *, return_dtype=None, batch_format: str,
                concurrency: int = None, batch_size: int = None,
                num_gpus: float = None, gpu_type: str = None) -> "DataFrame":
    """
    Transform the DataFrame batch by batch with a user function.

    In pandas mode the function receives a ``dict[str, pd.Series]``; in
    arrow mode a ``dict[str, pyarrow.Array]``. It returns the same kind of
    object. This is an N-to-N batch transformation: each returned batch
    must have the same row count as its input batch.

    In pandas mode ``return_dtype`` may be omitted when the function has a
    TypedDict return hint. In arrow mode ``return_dtype`` must describe the
    returned Arrow columns.

    Args:
        func: A function over a batch (see above), or a
            DataFrameUDFWrapper.
        return_dtype: Output schema as ``DataType.struct(...)``.
        batch_format: Batch transport format, either "pandas" or "arrow".
        concurrency: Optional parallelism for the operator running this
            UDF. UDFs with different concurrency values are split into
            separate operators.
        batch_size: Optional maximum number of elements per batch. When not
            given, the global configuration
            'python.fn-execution.arrow.batch.size' is used (default 1000).
            When multiple UDFs are fused, the maximum batch_size among them
            is used.
        num_gpus: Optional number of GPUs requested for this UDF (e.g. 0.5,
            1.0). Each GPU UDF runs in its own operator.
        gpu_type: GPU type (e.g. 'A10', 'V100'). Required whenever
            ``num_gpus`` is set.

    Returns:
        A new DataFrame with the transformed rows.

    Raises:
        ValueError: If ``batch_format`` is not "pandas" or "arrow", if
            ``batch_size`` is not a positive integer, if ``num_gpus`` is
            not a positive number, or if ``num_gpus`` is set without
            ``gpu_type``.

    Example::

        >>> # pandas mode
        >>> def process(batch):
        ...     return {"a": batch["a"] + 1}
        >>> df.map_batches(process, batch_format="pandas",
        ...                return_dtype=DataType.struct({"a": DataType.int64()}))
        >>>
        >>> # arrow mode
        >>> def process_arrow(batch):
        ...     return {"a": batch["a"]}
        >>> df.map_batches(process_arrow, batch_format="arrow",
        ...                return_dtype=DataType.struct({"a": DataType.int64()}))
        >>>
        >>> # With TypedDict return hint (auto-inferred)
        >>> class Output(TypedDict):
        ...     a: int
        >>> def process(batch) -> Output:
        ...     return {"a": batch["a"] + 1}
        >>> df.map_batches(process, batch_format="pandas")
        >>>
        >>> # With concurrency, batch_size, or GPU resources
        >>> df.map_batches(process, batch_format="pandas", concurrency=4)
        >>> df.map_batches(process, batch_format="pandas", batch_size=500)
        >>> df.map_batches(process, batch_format="pandas", num_gpus=0.5, gpu_type='A10')
    """
    if batch_format not in ("pandas", "arrow"):
        # Use !r inside an f-string rather than "%r" % value: the %-operator
        # unpacks a tuple argument, which raised TypeError (or printed a
        # misleading value) when a tuple was passed by mistake.
        raise ValueError(
            f"batch_format must be 'pandas' or 'arrow', got: {batch_format!r}")
    if batch_size is not None and (not isinstance(batch_size, int) or batch_size <= 0):
        raise ValueError("batch_size must be a positive integer, got: {}".format(batch_size))
    if num_gpus is not None and (not isinstance(num_gpus, (int, float)) or num_gpus <= 0):
        raise ValueError("num_gpus must be a positive number, got: {}".format(num_gpus))
    if num_gpus is not None and gpu_type is None:
        raise ValueError("gpu_type must be specified when num_gpus is set")

    # Imported at call time rather than at module import.
    from pyflink.dataframe.udf import _resolve_map_udf

    table_wrapper = _resolve_map_udf(
        func, return_dtype, func_type=batch_format,
        input_columns=self.columns,
        concurrency=concurrency, batch_size=batch_size,
        num_gpus=num_gpus, gpu_type=gpu_type)
    return DataFrame(self._table.map(table_wrapper))
# ======================== Composition ========================
def pipe(
    self,
    func: Callable[..., T],
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Call ``func`` with this DataFrame as its first argument.

    This lets user-defined functions take part in a fluent method chain.

    Args:
        func: A function whose first parameter receives this DataFrame.
        *args: Extra positional arguments forwarded to ``func``.
        **kwargs: Extra keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns.

    Example::

        >>> import pyflink.dataframe as pf
        >>> def add_total(df):
        ...     return df.with_column("total", pf.col("a") + pf.col("b"))
        ...
        >>> def filter_positive(df, col_name):
        ...     return df.filter(pf.col(col_name) > 0)
        ...
        >>> result = (
        ...     df.pipe(add_total)
        ...       .pipe(filter_positive, "total")
        ... )
    """
    outcome = func(self, *args, **kwargs)
    return outcome
# ======================== Conversion ========================
def to_table(self) -> Table:
    """
    Expose the wrapped PyFlink Table.

    Returns:
        The Table object this DataFrame was built around (the same object,
        not a copy).
    """
    underlying = self._table
    return underlying
def to_pandas(self):
    """
    Materialise the DataFrame as a pandas DataFrame.

    Delegates to the underlying table's ``to_pandas()``.

    Returns:
        The object returned by ``Table.to_pandas()``.
    """
    pandas_frame = self._table.to_pandas()
    return pandas_frame
def collect(self) -> List:
    """
    Execute the pipeline and gather every result row into a list.

    The iterator returned by ``execute().collect()`` is always closed once
    the rows have been read, including when reading fails part-way. This
    matches how ``iter_rows`` handles the same iterator.

    Returns:
        A list of rows.
    """
    closeable_iter = self._table.execute().collect()
    try:
        return list(closeable_iter)
    finally:
        # Release the result iterator; previously it was never closed here.
        closeable_iter.close()
def limit(self, n: int) -> "DataFrame":
    """
    Keep at most the first ``n`` rows.

    This is a lazy transformation; it builds a new DataFrame from the
    underlying table's ``fetch(n)``.

    Args:
        n: Maximum number of rows to keep. Must be non-negative.

    Returns:
        A new DataFrame limited to at most ``n`` rows.

    Raises:
        ValueError: If ``n`` is negative.

    Example::

        >>> df = pf.from_records([(1,), (2,), (3,)], schema=["a"])
        >>> df.limit(2).collect()
        [Row(a=1), Row(a=2)]
    """
    is_negative = n < 0
    if is_negative:
        raise ValueError("n must be non-negative")
    limited_table = self._table.fetch(n)
    return DataFrame(limited_table)
def offset(self, n: int) -> "DataFrame":
    """
    Skip the first ``n`` rows.

    This is a lazy transformation; it builds a new DataFrame from the
    underlying table's ``offset(n)``.

    Args:
        n: Number of rows to skip. Must be non-negative.

    Returns:
        A new DataFrame without the first ``n`` rows.

    Raises:
        ValueError: If ``n`` is negative.

    Example::

        >>> df = pf.from_records([(1,), (2,), (3,)], schema=["a"])
        >>> df.offset(1).limit(2).collect()
        [Row(a=2), Row(a=3)]
    """
    is_negative = n < 0
    if is_negative:
        raise ValueError("n must be non-negative")
    shifted_table = self._table.offset(n)
    return DataFrame(shifted_table)
def head(self, n: int) -> "DataFrame":
    """
    Return the first ``n`` rows as a new DataFrame.

    Lazy; simply delegates to ``limit(n)``.

    Args:
        n: Number of rows to return.

    Returns:
        A new DataFrame containing at most ``n`` rows.

    Example::

        >>> df = pf.from_records([(i,) for i in range(10)], schema=["a"])
        >>> df.head(3).collect()
        [Row(a=0), Row(a=1), Row(a=2)]
    """
    first_rows = self.limit(n)
    return first_rows
@overload
def iter_rows(
    self, *, named: Literal[False] = ...
) -> Iterator[Tuple[Any, ...]]: ...

@overload
def iter_rows(
    self, *, named: Literal[True]
) -> Iterator[Dict[str, Any]]: ...

def iter_rows(
    self, *, named: bool = False
) -> Union[Iterator[Tuple[Any, ...]], Iterator[Dict[str, Any]]]:
    """
    Iterate over the rows of the DataFrame.

    This is a terminal operation that triggers execution. Because the
    method is a generator, execution starts when the first row is
    requested. The result iterator is closed when iteration ends, whether
    normally, through an exception, or because the generator is closed.

    Args:
        named: If False (default), each row is yielded as a tuple of
            values. If True, each row is yielded as a dictionary
            ``{column_name: value}``.

    Returns:
        A generator yielding rows as tuples or dicts.

    Example::

        >>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
        >>> for row in df.iter_rows():
        ...     print(row)
        (1, 'a')
        (2, 'b')
        >>> for row in df.iter_rows(named=True):
        ...     print(row)
        {'id': 1, 'name': 'a'}
        {'id': 2, 'name': 'b'}
    """
    column_names = self.columns

    # Pick the row converter once instead of branching for every row.
    if named:
        def convert(record):
            return dict(zip(column_names, record))
    else:
        convert = tuple

    results = self._table.execute().collect()
    try:
        for record in results:
            yield convert(record)
    finally:
        results.close()
def iter_batches(self, *, batch_size: int = 1000) -> Iterator["pd.DataFrame"]:
    """
    Iterate over the result as pandas DataFrames of up to ``batch_size`` rows.

    This is a terminal operation that triggers execution. It uses
    Arrow-based serialization for data transfer. Incoming Arrow record
    batches are re-chunked so that every yielded frame except possibly the
    last has exactly ``batch_size`` rows.

    Args:
        batch_size: The maximum number of rows per batch. Must be
            positive. Defaults to 1000.

    Returns:
        A generator that yields pandas DataFrames.

    Raises:
        ValueError: If ``batch_size`` is not positive. Because this method
            is a generator, the error is raised when iteration starts.

    Example::

        >>> df = pf.from_records([(i,) for i in range(2500)], schema=["a"])
        >>> for batch in df.iter_batches(batch_size=1000):
        ...     print(len(batch))
        1000
        1000
        500
    """
    # A non-positive batch size would keep the re-chunking loop below
    # spinning on empty slices forever, so reject it before doing any work.
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    import pyarrow as pa
    from pyflink.table.serializers import ArrowSerializer
    from pyflink.table.types import create_arrow_schema
    from pyflink.java_gateway import get_gateway

    self._table._t_env._before_execute()

    gateway = get_gateway()
    j_table = self._table._j_table
    max_arrow_batch_size = (
        j_table.getTableEnvironment()
        .getConfig()
        .get(
            gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE
        )
    )
    batches_iterator = (
        gateway.jvm.org.apache.flink.table.runtime.arrow.ArrowUtils
        .collectAsPandasDataFrame(j_table, max_arrow_batch_size)
    )
    if not batches_iterator.hasNext():
        return

    import pytz
    from pyflink.table.utils import tz_convert_from_internal

    timezone = pytz.timezone(
        j_table.getTableEnvironment()
        .getConfig()
        .getLocalTimeZone()
        .getId()
    )

    schema = self._table.get_schema()
    field_names = schema.get_field_names()
    serializer = ArrowSerializer(
        create_arrow_schema(field_names, schema.get_field_data_types()),
        schema.to_row_data_type(),
        timezone,
    )

    def _to_local_pandas(arrow_table):
        # Convert to pandas and run every column through the table
        # utility's timezone conversion for the configured local zone.
        pdf = arrow_table.to_pandas()
        for field_name in field_names:
            pdf[field_name] = tz_convert_from_internal(
                pdf[field_name],
                schema.get_field_data_type(field_name),
                timezone,
            )
        return pdf

    accumulated = []
    accumulated_rows = 0
    for record_batch in serializer.load_from_iterator(batches_iterator):
        accumulated.append(record_batch)
        accumulated_rows += record_batch.num_rows

        while accumulated_rows >= batch_size:
            table = pa.Table.from_batches(accumulated)
            accumulated = []
            accumulated_rows = 0

            remainder_rows = table.num_rows - batch_size
            if remainder_rows > 0:
                # Carry the overflow forward as record batches so it is
                # merged with whatever arrives next.
                for batch in table.slice(batch_size).to_batches():
                    accumulated.append(batch)
                    accumulated_rows += batch.num_rows

            yield _to_local_pandas(table.slice(0, batch_size))

    # Yield any remaining rows as a final, possibly smaller, batch.
    if accumulated_rows > 0:
        yield _to_local_pandas(pa.Table.from_batches(accumulated))
# ======================== I/O ========================
def write_parquet(
    self,
    path: str,
    *,
    mode: str = "overwrite",
    # Parquet format options
    compression: str = "SNAPPY",
    block_size: int = 128 * 1024 * 1024,
    page_size: int = 1024 * 1024,
    dictionary_page_size: int = 1024 * 1024,
    enable_dictionary: bool = True,
    max_padding_size: int = 8 * 1024 * 1024,
    validation: bool = False,
    writer_version: str = "v1",
    utc_timezone: bool = False,
    timestamp_time_unit: str = "micros",
    write_int64_timestamp: bool = False,
    # FileSystem sink rolling policy options
    rolling_policy_file_size: str = "128mb",
    rolling_policy_rollover_interval: str = "30min",
    rolling_policy_inactivity_interval: str = "30min",
    rolling_policy_check_interval: str = "1min",
    # FileSystem sink partition commit options
    partition_commit_trigger: str = "process-time",
    partition_commit_delay: str = "0s",
    partition_commit_watermark_time_zone: str = "UTC",
    partition_commit_policy_kind: Optional[str] = None,
    partition_commit_policy_class: Optional[str] = None,
    partition_commit_policy_class_parameters: Optional[str] = None,
    partition_commit_success_file_name: str = "_SUCCESS",
    # FileSystem sink partition time extractor options
    partition_time_extractor_kind: str = "default",
    partition_time_extractor_class: Optional[str] = None,
    partition_time_extractor_timestamp_formatter: Optional[str] = None,
    partition_time_extractor_timestamp_pattern: Optional[str] = None,
    # FileSystem sink other options
    shuffle_by_partition_enable: bool = False,
    auto_compaction: bool = False,
    compaction_file_size: Optional[str] = None,
    compaction_parallelism: Optional[int] = None,
    sink_parallelism: Optional[int] = None,
    partition_default_name: str = "__DEFAULT_PARTITION__",
) -> None:
    """
    Write the DataFrame as Parquet file(s) under ``path``.

    Built on Flink's FileSystem connector with the Parquet format. The
    DataFrame's resolved schema is used as the sink schema. When the
    ``execution.target`` configuration is ``local`` or ``minicluster``,
    the call blocks until the insert job finishes.

    Args:
        path: Output location. Local paths and any Flink-supported file
            system (HDFS, OSS, S3, etc.) are accepted.
        mode: [Batch] ``'overwrite'`` (default) replaces existing data,
            ``'append'`` adds to it. In streaming mode this is always
            forced to ``'append'`` because streaming does not support
            overwrite.
        compression: [Batch/Streaming] Parquet compression codec. One of
            'SNAPPY' (default), 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD',
            'LZ4_RAW', 'UNCOMPRESSED'.
        block_size: [Batch/Streaming] Row group size in bytes. Default is
            134217728 (128MB).
        page_size: [Batch/Streaming] Data page size in bytes. Default is
            1048576 (1MB).
        dictionary_page_size: [Batch/Streaming] Dictionary page size in
            bytes. Default is 1048576 (1MB).
        enable_dictionary: [Batch/Streaming] Whether dictionary encoding
            is enabled. Default is True.
        max_padding_size: [Batch/Streaming] Maximum padding in bytes used
            for row group alignment. Default is 8388608 (8MB).
        validation: [Batch/Streaming] Whether schema validation is
            performed on write. Default is False.
        writer_version: [Batch/Streaming] Parquet writer version, 'v1'
            (default) or 'v2'.
        utc_timezone: [Batch/Streaming] Use the UTC timezone instead of
            the local timezone when converting between epoch time and
            LocalDateTime. Hive 0.x/1.x/2.x use the local timezone, Hive
            3.x uses UTC. Default is False.
        timestamp_time_unit: [Batch/Streaming] Time unit for parquet
            int64/LogicalTypes timestamps: 'nanos', 'micros' (default) or
            'millis'.
        write_int64_timestamp: [Batch/Streaming] Write timestamps as
            int64/LogicalTypes instead of int96/OriginalTypes. Such
            timestamps are time zone agnostic (never converted to another
            time zone). Default is False.
        rolling_policy_file_size: [Streaming] Maximum part file size
            before rolling (e.g., '128mb'). Default is 128MB.
        rolling_policy_rollover_interval: [Streaming] Maximum time a part
            file may stay open before rolling (long by default to avoid
            many small files). How often this is checked is controlled by
            ``rolling_policy_check_interval``. Default is 30 minutes.
        rolling_policy_inactivity_interval: [Streaming] Maximum time a
            part file may stay inactive before rolling (long by default to
            avoid many small files). How often this is checked is
            controlled by ``rolling_policy_check_interval``. Default is 30
            minutes.
        rolling_policy_check_interval: [Streaming] Interval at which the
            time-based rolling policies are evaluated. Default is 1 minute.
        partition_commit_trigger: [Streaming] Partition commit trigger:
            'process-time' (default) or 'partition-time'.
        partition_commit_delay: [Streaming] A partition is not committed
            before this delay has elapsed. Use '1d' for day partitions and
            '1h' for hour partitions. Default is 0.
        partition_commit_watermark_time_zone: [Streaming] Time zone used
            to parse the long watermark value into a TIMESTAMP, which is
            then compared with the partition time to decide whether the
            partition should be committed. The default 'UTC' means the
            watermark is defined on a TIMESTAMP column or not defined. If
            the watermark is defined on a TIMESTAMP_LTZ column, use the
            user-configured time zone. Accepts a full name such as
            'America/Los_Angeles' or a custom id such as 'GMT-08:00'.
        partition_commit_policy_kind: [Streaming] How downstream
            applications are notified that a partition has finished
            writing and is ready to be read. 'metastore' adds the
            partition to the metastore (only hive tables support this;
            the file system manages partitions through the directory
            structure). 'success-file' adds a '_success' file to the
            directory. Both may be combined: 'metastore,success-file'.
            'custom' uses a policy class to create the commit policy.
        partition_commit_policy_class: [Streaming] Class implementing the
            PartitionCommitPolicy interface. Only used with the 'custom'
            commit policy.
        partition_commit_policy_class_parameters: [Streaming] Constructor
            parameters for the custom commit policy, separated by
            semicolons, such as 'param1;param2'.
        partition_commit_success_file_name: [Streaming] File name used by
            the success-file commit policy. Default is '_SUCCESS'.
        partition_time_extractor_kind: [Streaming] Extractor that derives
            time from partition values: 'default' or a custom extractor
            class. With 'default' a timestamp pattern can be configured.
            Default is 'default'.
        partition_time_extractor_class: [Streaming] Class implementing
            the PartitionTimeExtractor interface. Only used when
            ``partition_time_extractor_kind`` is not 'default'.
        partition_time_extractor_timestamp_formatter: [Streaming]
            Formatter that parses the timestamp from a string; used
            together with ``partition_time_extractor_timestamp_pattern``.
            Supports multiple partition fields like
            '$year-$month-$day $hour:00:00'. Compatible with Java's
            DateTimeFormatter.
        partition_time_extractor_timestamp_pattern: [Streaming] Pattern
            for building a timestamp from partitions when
            ``partition_time_extractor_kind`` is 'default'. E.g., '$dt'
            for a single partition field, or
            '$year-$month-$day $hour:00:00' for multiple fields.
        shuffle_by_partition_enable: [Batch/Streaming] Shuffle data by
            dynamic partition fields in the sink phase. This can greatly
            reduce the number of files but may lead to data skew. Default
            is False.
        auto_compaction: [Streaming] Enable automatic compaction in the
            streaming sink. Data is first written to temporary files;
            after a checkpoint completes, the temporary files it produced
            are compacted. Temporary files are invisible before
            compaction. Default is False.
        compaction_file_size: [Streaming] Compaction target file size.
            Defaults to the rolling file size.
        compaction_parallelism: [Batch] Parallelism of the compaction
            operator in batch mode. By default the planner uses the sink
            parallelism.
        sink_parallelism: [Batch/Streaming] Custom parallelism for the
            sink.
        partition_default_name: [Batch/Streaming] Partition name used
            when the dynamic partition column value is null or an empty
            string. Default is '__DEFAULT_PARTITION__'.

    Raises:
        ValueError: If ``mode`` is neither 'overwrite' nor 'append'.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
        >>> df.write_parquet("/path/to/output")
        >>>
        >>> # Append with GZIP compression
        >>> df.write_parquet("/path/to/output", mode="append",
        ...                  compression="GZIP")
        >>>
        >>> # Custom rolling policy and block size
        >>> df.write_parquet("/path/to/output",
        ...                  block_size=256 * 1024 * 1024,
        ...                  rolling_policy_file_size="256mb")
    """
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.table import TableDescriptor, FormatDescriptor

    if mode not in ("overwrite", "append"):
        raise ValueError(
            f"Invalid mode '{mode}'. Must be 'overwrite' or 'append'."
        )

    def _flag(enabled):
        # Booleans are passed to the connector as lower-case strings.
        return str(enabled).lower()

    def _text_or_none(number):
        return None if number is None else str(number)

    t_env = self._table._t_env
    add_built_in_connector(t_env, "filesystem")
    add_built_in_format(t_env, "parquet")

    # Streaming mode does not support overwrite, so it always appends.
    configured_mode = t_env.get_config().get(
        "execution.runtime-mode", "STREAMING"
    )
    is_streaming = configured_mode.upper() == "STREAMING"
    overwrite = mode == "overwrite" and not is_streaming

    # Parquet format descriptor.
    parquet_settings = (
        ("compression", compression),
        ("block.size", str(block_size)),
        ("page.size", str(page_size)),
        ("dictionary.page.size", str(dictionary_page_size)),
        ("writer.max-padding", str(max_padding_size)),
        ("timestamp.time.unit", timestamp_time_unit),
        ("enable.dictionary", _flag(enable_dictionary)),
        ("validation", _flag(validation)),
        ("writer.version", writer_version),
        ("utc-timezone", _flag(utc_timezone)),
        ("write.int64.timestamp", _flag(write_int64_timestamp)),
    )
    parquet_format = FormatDescriptor.for_format("parquet")
    for option_name, option_value in parquet_settings:
        parquet_format = parquet_format.option(option_name, option_value)

    # Sink schema mirrors the DataFrame's existing schema.
    sink_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema()
    )

    descriptor = (
        TableDescriptor.for_connector("filesystem")
        .option("path", path)
        .format(parquet_format.build())
        .schema(sink_schema.build())
    )

    # Entries whose value is None are left out of the descriptor.
    sink_settings = (
        ("sink.rolling-policy.file-size", rolling_policy_file_size),
        ("sink.rolling-policy.rollover-interval",
         rolling_policy_rollover_interval),
        ("sink.rolling-policy.inactivity-interval",
         rolling_policy_inactivity_interval),
        ("sink.rolling-policy.check-interval",
         rolling_policy_check_interval),
        ("sink.partition-commit.trigger", partition_commit_trigger),
        ("sink.partition-commit.delay", partition_commit_delay),
        ("sink.partition-commit.watermark-time-zone",
         partition_commit_watermark_time_zone),
        ("sink.partition-commit.policy.kind",
         partition_commit_policy_kind),
        ("sink.partition-commit.policy.class",
         partition_commit_policy_class),
        ("sink.partition-commit.policy.class.parameters",
         partition_commit_policy_class_parameters),
        ("sink.partition-commit.success-file.name",
         partition_commit_success_file_name),
        ("partition.time-extractor.kind", partition_time_extractor_kind),
        ("partition.time-extractor.class", partition_time_extractor_class),
        ("partition.time-extractor.timestamp-formatter",
         partition_time_extractor_timestamp_formatter),
        ("partition.time-extractor.timestamp-pattern",
         partition_time_extractor_timestamp_pattern),
        ("partition.default-name", partition_default_name),
        ("compaction.file-size", compaction_file_size),
        ("sink.shuffle-by-partition.enable",
         _flag(shuffle_by_partition_enable)),
        ("auto-compaction", _flag(auto_compaction)),
        ("compaction.parallelism", _text_or_none(compaction_parallelism)),
        ("sink.parallelism", _text_or_none(sink_parallelism)),
    )
    for option_name, option_value in sink_settings:
        if option_value is None:
            continue
        descriptor = descriptor.option(option_name, option_value)

    result = self._table.execute_insert(
        descriptor.build(), overwrite=overwrite
    )

    # Auto-detect local execution and wait for the job to finish.
    if t_env.get_config().get("execution.target", None) in (
        "local",
        "minicluster",
    ):
        result.wait()
def write_kafka(
    self,
    bootstrap_servers: str,
    *,
    properties: Optional[Dict[str, str]] = None,
    # General options
    format: str = "json",
    format_options: Optional[Dict[str, str]] = None,
    key_format: Optional[str] = None,
    key_format_options: Optional[Dict[str, str]] = None,
    key_fields: Optional[List[str]] = None,
    key_fields_prefix: Optional[str] = None,
    value_format: Optional[str] = None,
    value_format_options: Optional[Dict[str, str]] = None,
    value_fields_include: Literal["ALL", "EXCEPT_KEY"] = "ALL",
    # Sink-specific options
    topic: str,
    partitioner: str = "default",
    delivery_guarantee: Literal["none", "at-least-once", "exactly-once"] = "at-least-once",
    transactional_id_prefix: Optional[str] = None,
    parallelism: Optional[int] = None,
) -> None:
    """
    Write the DataFrame to a Kafka topic.

    Uses Flink's Kafka SQL Connector under the hood. The DataFrame rows
    are serialized using the specified format and written to the given
    Kafka topic. When the ``execution.target`` configuration is ``local``
    or ``minicluster``, the call blocks until the insert job finishes.

    Args:
        bootstrap_servers: Comma-separated list of Kafka broker addresses
            (e.g., ``"localhost:9092"`` or
            ``"broker1:9092,broker2:9092"``).
        properties: Extra Kafka producer properties. Each key is passed
            to Kafka directly (as ``properties.<key>``), so it should be a
            valid Kafka producer config key.
            Example: ``{"batch.size": "16384"}``
        format: Format for serializing message values. Supported options:
            ``"csv"``, ``"json"``, ``"avro"``, ``"debezium-json"``,
            ``"canal-json"``, ``"maxwell-json"``, ``"avro-confluent"``,
            ``"raw"``. Not sent to the connector when ``value_format`` is
            given.
        format_options: Additional options for ``format``, as a dict.
            Keys are format-specific options without the format prefix
            (e.g., ``{"timestamp-format.standard": "ISO-8601"}`` for JSON
            format). Not sent when ``value_format`` is given.
        key_format: Format for serializing message keys (e.g.,
            ``"json"``, ``"csv"``, ``"avro"``). Requires ``key_fields``.
        key_format_options: Additional options for the key format, as a
            dict. Same structure as ``format_options``.
        key_fields: List of column names that form the message key.
            Required when ``key_format`` is specified.
        key_fields_prefix: Prefix for key field column names to avoid
            name clashes with value fields. Requires
            ``value_fields_include="EXCEPT_KEY"``.
        value_format: The format used to serialize the value part of
            Kafka messages. Only one of ``format`` or ``value_format`` is
            required. If both are configured, ``value_format`` takes
            precedence and only ``value.format`` is sent to the connector.
        value_format_options: Extra options for ``value_format``.
        value_fields_include: Whether key fields are included in value
            fields during value serialization.

            - ``"ALL"`` (default): all columns are treated as value
              fields.
            - ``"EXCEPT_KEY"``: fields in ``key_fields`` are excluded
              from value fields.
        topic: Target Kafka topic name to write to.
        partitioner: How to distribute records to Kafka partitions.
            One of:

            - ``"default"``: Use Kafka's default partitioner (sticky
              round-robin for null keys, murmur2 hash for keys).
            - ``"fixed"``: Each Flink partition maps to at most one Kafka
              partition.
            - ``"round-robin"``: Records are distributed round-robin
              across partitions (only for records without keys).
            - A fully qualified class name of a custom
              ``FlinkKafkaPartitioner`` subclass.
        delivery_guarantee: Delivery semantic for the Kafka sink. One of:

            - ``"at-least-once"`` (default): Messages are never lost but
              may be duplicated on failure.
            - ``"exactly-once"``: Kafka transactions ensure no
              duplicates. Requires ``transactional_id_prefix`` to be set
              and Flink checkpointing to be enabled.
            - ``"none"``: No guarantees — messages may be lost or
              duplicated.
        transactional_id_prefix: Prefix for Kafka transaction IDs.
            Required when ``delivery_guarantee="exactly-once"`` and
            rejected otherwise. Should be stable across restarts.
            Different applications must use different prefixes.
        parallelism: Custom parallelism for the Kafka sink operator. By
            default, inherits parallelism from the upstream operator.

    Raises:
        ValueError: If ``key_format`` is set without ``key_fields``; if
            ``key_fields_prefix`` is set while ``value_fields_include``
            is not ``"EXCEPT_KEY"``; or if ``transactional_id_prefix``
            and ``delivery_guarantee`` are inconsistent.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> df = pf.from_records(
        ...     [(1, "alice"), (2, "bob")],
        ...     schema=["user_id", "name"],
        ... )
        >>>
        >>> # Simple write with JSON format
        >>> df.write_kafka("localhost:9092", topic="user_events")
        >>>
        >>> # Write with Avro format
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     format="avro",
        ... )
        >>>
        >>> # Write with key fields
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     key_format="json",
        ...     key_fields=["user_id"],
        ... )
        >>>
        >>> # Write with exactly-once delivery guarantee
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     delivery_guarantee="exactly-once",
        ...     transactional_id_prefix="my-app-sink",
        ... )
        >>>
        >>> # Write with fixed partitioning and parallelism
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     partitioner="fixed",
        ...     parallelism=4,
        ... )
        >>>
        >>> # Write with JSON format options
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="user_events",
        ...     format="json",
        ...     format_options={
        ...         "timestamp-format.standard": "ISO-8601",
        ...     },
        ... )
        >>>
        >>> # Write with Kafka security (SASL/SSL)
        >>> df.write_kafka(
        ...     "localhost:9092",
        ...     topic="secure_events",
        ...     properties={
        ...         "security.protocol": "SASL_SSL",
        ...         "sasl.mechanism": "PLAIN",
        ...         "sasl.jaas.config": (
        ...             "org.apache.kafka.common.security.plain"
        ...             ".PlainLoginModule required"
        ...             ' username="user" password="pass";'
        ...         ),
        ...     },
        ... )
    """
    from pyflink.common.config_options import ConfigOptions
    from pyflink.dataframe.util.artifacts import (
        add_built_in_connector,
        add_built_in_format,
    )
    from pyflink.dataframe.io import _to_str
    from pyflink.table import TableDescriptor, FormatDescriptor

    t_env = self._table._t_env
    add_built_in_connector(t_env, "kafka")
    add_built_in_format(t_env, format)
    add_built_in_format(t_env, key_format)
    add_built_in_format(t_env, value_format)

    # Validate key/value option dependencies
    if key_format is not None and key_fields is None:
        raise ValueError(
            "'key_fields' must be specified when 'key_format' is defined."
        )
    if (
        key_fields_prefix is not None
        and value_fields_include != "EXCEPT_KEY"
    ):
        raise ValueError(
            "'value_fields_include' must be 'EXCEPT_KEY' when "
            "'key_fields_prefix' is set."
        )
    if delivery_guarantee == "exactly-once" and \
            transactional_id_prefix is None:
        raise ValueError(
            "transactional_id_prefix must be specified when "
            "delivery_guarantee is 'exactly-once'."
        )
    if delivery_guarantee != "exactly-once" and \
            transactional_id_prefix is not None:
        raise ValueError(
            "transactional_id_prefix must not be specified unless "
            "delivery_guarantee is 'exactly-once'."
        )

    def _format_descriptor(format_name, options):
        # Build a FormatDescriptor, stringifying every option value.
        descriptor_builder = FormatDescriptor.for_format(format_name)
        for option_key, option_value in (options or {}).items():
            descriptor_builder = descriptor_builder.option(
                option_key, _to_str(option_value)
            )
        return descriptor_builder.build()

    def _format_option(option_name):
        return ConfigOptions.key(option_name) \
            .string_type().no_default_value()

    # Build schema from the DataFrame's existing schema
    resolved_schema = self._table.get_resolved_schema()
    schema_builder = _build_physical_schema_from_resolved_schema(
        resolved_schema
    )

    # Build connector descriptor
    builder = TableDescriptor.for_connector("kafka") \
        .option("properties.bootstrap.servers", bootstrap_servers) \
        .option("topic", topic) \
        .schema(schema_builder.build())

    # Value serialization. 'format' and 'value.format' are alternatives
    # on the connector side, so emit exactly one of them: 'value_format'
    # wins when supplied, as the argument documentation promises.
    if value_format is not None:
        builder = builder.format(
            _format_descriptor(value_format, value_format_options),
            format_option=_format_option("value.format"),
        )
    else:
        builder = builder.format(
            _format_descriptor(format, format_options)
        )

    # Additional Kafka properties
    if properties:
        for key, value in properties.items():
            builder = builder.option(
                f"properties.{key}", _to_str(value)
            )

    # Key format
    if key_format is not None:
        builder = builder.format(
            _format_descriptor(key_format, key_format_options),
            format_option=_format_option("key.format"),
        )
    if key_fields is not None:
        builder = builder.option(
            "key.fields", ";".join(key_fields)
        )
    if key_fields_prefix is not None:
        builder = builder.option(
            "key.fields-prefix", key_fields_prefix
        )

    if value_fields_include is not None:
        builder = builder.option(
            "value.fields-include", value_fields_include
        )

    # Sink-specific options
    sink_options = {
        "sink.delivery-guarantee": delivery_guarantee,
        "sink.partitioner": partitioner,
    }
    if transactional_id_prefix is not None:
        sink_options["sink.transactional-id-prefix"] = \
            transactional_id_prefix
    if parallelism is not None:
        sink_options["sink.parallelism"] = str(parallelism)
    for key, value in sink_options.items():
        if value is not None:
            builder = builder.option(key, _to_str(value))

    result = self._table.execute_insert(builder.build())

    # Auto-detect local execution and wait for job to finish
    target = t_env.get_config().get("execution.target", None)
    if target in ("local", "minicluster"):
        result.wait()
def write_sls(
    self,
    endpoint: str,
    *,
    project: str,
    logstore: str,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
    topic_field: Optional[str] = None,
    time_field: Optional[str] = None,
    source_field: Optional[str] = None,
    partition_field: Optional[str] = None,
    buckets: int = 64,
    flush_interval_ms: int = 2000,
    write_null_properties: bool = True,
    extra_options: Optional[Dict[str, str]] = None,
) -> None:
    """Write the DataFrame to an SLS (Aliyun Log Service) logstore.

    Wraps the Ververica SLS sink connector. The DataFrame's resolved
    schema is passed to the connector automatically. When the
    ``execution.target`` configuration is ``local`` or ``minicluster``,
    the call blocks until the insert job finishes.

    Args:
        endpoint: SLS private-network service endpoint address.
        project: Name of the SLS project.
        logstore: SLS LogStore (or MetricStore) name.
        access_id: Alibaba Cloud account AccessKey ID. If both
            ``access_id`` and ``access_key`` are ``None`` (the default),
            the SLS connector falls back to the VVP STS token.
        access_key: Alibaba Cloud account AccessKey Secret. See
            ``access_id`` for the STS fallback behaviour.
        topic_field: Name of a column whose value overrides the SLS
            ``__topic__`` attribute for each record.
        time_field: Name of an INT column whose value overrides the SLS
            ``__timestamp__`` attribute. If unset, the current write time
            is used.
        source_field: Name of a column whose value overrides the SLS
            ``__source__`` attribute.
        partition_field: Name of a column to hash for shard routing so
            records with identical hash land on the same shard.
        buckets: Hash-bucket count when ``partition_field`` is set. Must
            be a power of 2 in [1, 256] and should be at least the shard
            count of the logstore.
        flush_interval_ms: Time interval (milliseconds) that triggers
            writes to SLS.
        write_null_properties: When ``True`` (the default), null field
            values are written as empty strings. When ``False``, null
            fields are omitted entirely.
        extra_options: Additional connector options forwarded to the
            underlying SLS connector. Intended for options not exposed as
            named parameters of ``write_sls``, e.g., ``IOThreadNum``,
            ``baseRetryBackOffTimeMs``, ``maxRetryBackOffTimeMs``. If a
            key matches an option generated by a named parameter of
            ``write_sls``, the value in ``extra_options`` takes
            precedence. ``"connector"`` is reserved and must not be
            supplied.

    Raises:
        ValueError: If ``buckets`` is not a power of 2 in [1, 256], or
            ``extra_options`` contains a ``"connector"`` key.

    Examples:
        Minimal sink (uses default partitioning and write_null behavior)::

            import pyflink.dataframe as pf

            pf.from_records(
                [("INFO", "started"), ("WARN", "slow")],
                schema=["level", "message"],
            ).write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET
            )

        Partitioned writes (route by ``user_id`` into 128 buckets)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                partition_field="user_id",
                buckets=128
            )

        Field overrides (use columns as SLS metadata attributes)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                topic_field="category",
                source_field="host",
                time_field="event_time_seconds"
            )

        Escape hatch via ``extra_options`` (tunes a non-named option and
        overrides ``buckets``)::

            df.write_sls(
                "cn-hangzhou-intranet.log.aliyuncs.com",
                project="my-project",
                logstore="my-logstore",
                access_id=ALIYUN_AK_ID,
                access_key=ALIYUN_AK_SECRET,
                partition_field="user_id",
                extra_options={
                    "IOThreadNum": "16",
                    "buckets": "256",  # overrides default 64
                }
            )
    """
    from pyflink.dataframe.io import (
        _add_connector_options, _add_extra_options
    )
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    _validate_sls_buckets(buckets)

    # Options that are always sent to the connector.
    options = [
        ("endpoint", endpoint),
        ("project", project),
        ("logstore", logstore),
        ("buckets", str(buckets)),
        ("flushIntervalMs", str(flush_interval_ms)),
        ("writeNullProperties", str(write_null_properties)),
    ]

    # Options that are sent only when the caller supplied a value.
    optional_pairs = (
        ("accessId", access_id),
        ("accessKey", access_key),
        ("topicField", topic_field),
        ("timeField", time_field),
        ("sourceField", source_field),
        ("partitionField", partition_field),
    )
    options.extend(
        (option_name, option_value)
        for option_name, option_value in optional_pairs
        if option_value is not None
    )

    t_env = self._table._t_env
    add_built_in_connector(t_env, "sls")

    sink_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema()
    )
    descriptor = TableDescriptor.for_connector("sls").schema(
        sink_schema.build()
    )
    descriptor = _add_connector_options(descriptor, options)
    descriptor = _add_extra_options(descriptor, extra_options)

    result = self._table.execute_insert(descriptor.build())

    # Wait for the job to finish when running locally.
    if t_env.get_config().get("execution.target", None) in (
        "local",
        "minicluster",
    ):
        result.wait()
def write_hologres(
    self,
    endpoint: str,
    *,
    db_name: str,
    table_name: str,
    username: str,
    password: str,
    primary_key: Optional[Union[str, List[str]]] = None,
    connection_pool_size: int = 5,
    connection_pool_name: str = "default",
    connection_fixed: Optional[bool] = None,
    connection_max_idle_ms: int = 60000,
    connection_ssl_mode: Literal[
        "disable", "require", "verify-ca", "verify-full"
    ] = "disable",
    connection_ssl_root_cert_location: Optional[str] = None,
    retry_count: int = 10,
    retry_sleep_step_ms: int = 5000,
    meta_cache_ttl_ms: int = 600000,
    serverless_computing: bool = False,
    write_mode: Literal[
        "AUTO",
        "INSERT",
        "COPY_STREAM",
        "COPY_BULK_LOAD",
        "COPY_BULK_LOAD_ON_CONFLICT",
    ] = "AUTO",
    on_conflict_action: Literal[
        "INSERT_OR_IGNORE", "INSERT_OR_UPDATE", "INSERT_OR_REPLACE"
    ] = "INSERT_OR_UPDATE",
    create_missing_partition: bool = True,
    delete_strategy: Literal[
        "IGNORE_DELETE",
        "NON_PK_FIELD_TO_NULL",
        "DELETE_ROW_ON_PK",
        "CHANGELOG_STANDARD",
    ] = "CHANGELOG_STANDARD",
    ignore_null_when_update: bool = False,
    ignore_null_when_update_by_expr: bool = False,
    default_for_not_null_column: bool = True,
    remove_u0000_in_text: bool = True,
    partial_insert: bool = False,
    deduplication: bool = True,
    aggressive_flush: bool = False,
    check_and_put_column: Optional[str] = None,
    check_and_put_operator: str = "GREATER",
    check_and_put_null_as: Optional[str] = None,
    insert_batch_size: int = 512,
    insert_batch_byte_size: int = 2 * 1024 * 1024,
    insert_flush_interval_ms: int = 10000,
    copy_format: Literal["binary", "text", "binaryrow"] = "binary",
    insert_conflict_update_set: Optional[str] = None,
    insert_conflict_where: Optional[str] = None,
    parallelism: Optional[int] = None,
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """Sink this DataFrame into a Hologres table.

    The write goes through the Ververica Hologres sink connector. The
    sink schema is taken from the DataFrame's resolved schema, so no
    column list has to be given; pass ``primary_key`` when the sink
    definition should carry a primary key constraint.

    Args:
        endpoint: Endpoint of the Hologres service (on VVR, use the VPC
            endpoint).
        db_name: Name of the Hologres database. A compute-group suffix
            may be appended (e.g. ``"my_db/my_group"``).
        table_name: Name of the destination table. Tables outside the
            public schema are addressed as ``"schema.tableName"``.
        username: Hologres account user name (or AccessKey ID).
        password: Hologres account password (or AccessKey Secret).
        primary_key: One column name, or a list of column names, that
            form the primary key. Needed for update/upsert semantics and
            expected to mirror the primary key defined on the Hologres
            table.
        connection_pool_size: Size of the sink's JDBC connection pool.
        connection_pool_name: Name of the connection pool. Sinks inside
            one Flink TaskManager that use the same name share their
            connections.
        connection_fixed: ``True`` switches on the Hologres lightweight
            (fixed) connection mode. With ``None`` (the default) the
            option is not sent and the connector chooses a value from
            the Hologres engine version.
        connection_max_idle_ms: How long (milliseconds) a pooled JDBC
            connection may sit idle before it is released.
        connection_ssl_mode: SSL transport mode: ``"disable"``,
            ``"require"``, ``"verify-ca"``, or ``"verify-full"``.
        connection_ssl_root_cert_location: Location of the CA
            certificate file (below ``/flink/usrlib/<name>`` on VVP).
        retry_count: How many times a failed connection is retried.
        retry_sleep_step_ms: Step (milliseconds) of the linear back-off
            between retries.
        meta_cache_ttl_ms: Time-to-live (milliseconds) of the locally
            cached Hologres ``TableSchema``.
        serverless_computing: ``True`` sends bulk imports through
            Hologres serverless compute.
        write_mode: Protocol used for writing. With ``"AUTO"`` (the
            default) the Hologres connector decides from the Flink
            runtime mode and the Hologres engine version: a ``COPY_*``
            path for batch jobs against Hologres 3.1+, ``"INSERT"``
            otherwise. ``"INSERT"`` forces JDBC INSERTs and
            ``"COPY_STREAM"`` uses streaming COPY. Both
            ``COPY_BULK_LOAD*`` modes do bulk loads (with
            ``COPY_BULK_LOAD`` a PK conflict throws).
        on_conflict_action: What to do on a primary key conflict.
            ``"INSERT_OR_UPDATE"`` (the default) updates the conflicting
            row, ``"INSERT_OR_IGNORE"`` skips it, and
            ``"INSERT_OR_REPLACE"`` deletes and re-inserts it.
        create_missing_partition: With ``True`` (the default) the sink
            creates a missing partition child table on the first write
            to it. With ``False`` partitions have to be created out of
            band and a write to a missing partition fails. Only valid in
            ``INSERT`` mode.
        delete_strategy: Treatment of retraction (-D, -U) records.
            ``"CHANGELOG_STANDARD"`` (the default) follows Flink's
            regular change-log semantics; the remaining values target
            partial-update scenarios. ``"NON_PK_FIELD_TO_NULL"`` and
            ``"DELETE_ROW_ON_PK"`` need ``primary_key`` to be set.
        ignore_null_when_update: ``True`` skips null values while
            updating (only effective in INSERT mode).
        ignore_null_when_update_by_expr: The same behaviour for
            expression-based updates (needs Hologres 4.0+).
        default_for_not_null_column: With ``True`` (the default) a
            default is filled in when null is written to a NOT NULL
            column.
        remove_u0000_in_text: With ``True`` (the default) the illegal
            ``\\u0000`` character is removed from string columns.
        partial_insert: ``True`` writes only the columns declared in the
            Flink schema.
        deduplication: With ``True`` (the default) rows are
            de-duplicated by primary key within each batch.
        aggressive_flush: ``True`` flushes batches that are not yet full
            whenever the sink is idle.
        check_and_put_column: Column that drives conditional updates; a
            row is updated only if
            ``new.<column> <operator> old.<column>`` holds. Unsupported
            with every ``COPY_*`` ``write_mode``. ``"AUTO"`` is allowed
            because it usually resolves to ``"INSERT"``; note, however,
            that a batch job on Hologres 3.1+ may resolve ``"AUTO"`` to
            ``"COPY_BULK_LOAD*"``, and the Hologres connector then
            rejects the combination at submit time. Pass
            ``write_mode="INSERT"`` explicitly when check-and-put is
            needed in a batch job.
        check_and_put_operator: Comparison operator of the conditional
            update. Defaults to ``"GREATER"``.
        check_and_put_null_as: Substitute for NULL in that comparison.
        insert_batch_size: Upper bound on rows in one INSERT batch.
        insert_batch_byte_size: Upper bound on bytes in one INSERT batch
            (default: 2 MB).
        insert_flush_interval_ms: Longest wait (milliseconds) before an
            INSERT batch is flushed.
        copy_format: Wire format of the COPY modes. ``"binary"`` (the
            default) is the sole value allowed together with
            ``write_mode='AUTO'`` or ``write_mode='INSERT'`` (AUTO may
            become INSERT at runtime). The ``COPY_*`` modes also take
            ``"text"`` (usually paired with ``COPY_BULK_LOAD*``) and
            ``"binaryrow"`` (a ``COPY_STREAM`` optimization, Hologres
            4.1+); depending on the engine version the Hologres
            connector may add further restrictions.
        insert_conflict_update_set: SET clause run on a PK conflict (a
            Hologres expression such as ``"col=excluded.col + 1"``).
            Conflicts with the ``check_and_put_*`` options.
        insert_conflict_where: WHERE clause that narrows which
            conflicting rows get updated. Conflicts with the
            ``check_and_put_*`` options.
        parallelism: Explicit parallelism of the Hologres sink operator.
            With ``None`` (the default) the option is not sent and Flink
            inherits the parallelism of the upstream operator.
        extra_options: Further connector options handed to the
            underlying Hologres connector, meant for options that
            ``write_hologres`` has no named parameter for, e.g.
            ``connection.direct.enabled`` or
            ``connection.max-alive-ms``. Where a key coincides with an
            option produced by a named parameter of ``write_hologres``,
            the ``extra_options`` value wins. ``"connector"`` is
            reserved and must not be passed.

    Raises:
        ValueError: On an invalid enum value; on a non-binary
            ``copy_format`` with ``write_mode='AUTO'`` or
            ``write_mode='INSERT'``; on ``connection_ssl_mode``
            ``"verify-ca"`` / ``"verify-full"`` without
            ``connection_ssl_root_cert_location``; when the
            ``check_and_put_*`` options meet an incompatible
            ``on_conflict_action`` or ``insert_conflict_where``; when
            ``check_and_put_column`` meets an explicit ``COPY_*``
            ``write_mode``; on ``delete_strategy``
            ``"NON_PK_FIELD_TO_NULL"`` / ``"DELETE_ROW_ON_PK"`` without
            ``primary_key``; or when ``extra_options`` has a
            ``"connector"`` key.

    Examples:
        Streaming upsert sink (default AUTO write_mode — the connector
        picks INSERT for streaming jobs and COPY paths for
        batch/Holo 3.1+)::

            import pyflink.dataframe as pf

            events = pf.read_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                schema={
                    "event_id": pf.DataType.bigint(),
                    "user": pf.DataType.string(),
                },
                primary_key="event_id",
            )
            events.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events_processed",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="event_id",
            )

        High-throughput bulk load (COPY_BULK_LOAD on append-only data)::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.daily_summary",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                write_mode="COPY_BULK_LOAD",
                copy_format="text",
                insert_batch_size=4096,
                insert_batch_byte_size=8 * 1024 * 1024,
            )

        Conditional update with check-and-put (only overwrite when the
        new ``version`` is greater)::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.profiles",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="user_id",
                on_conflict_action="INSERT_OR_UPDATE",
                check_and_put_column="version",
                check_and_put_operator="GREATER",
                check_and_put_null_as="0",
            )

        Partial update (write only listed columns), tuned batch size::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.user_state",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                primary_key="user_id",
                partial_insert=True,
                insert_batch_size=1024,
                insert_flush_interval_ms=2000,
                ignore_null_when_update=True,
            )

        Forward-compat: tune a non-named option (and override a named
        one) via ``extra_options``::

            df.write_hologres(
                "hgprecn-cn-xxx-cn-hangzhou-internal.hologres.aliyuncs.com:80",
                db_name="my_db",
                table_name="public.events",
                username=ALIYUN_AK_ID,
                password=ALIYUN_AK_SECRET,
                extra_options={
                    "hologres.server.version": "3.1",
                    "sink.insert.batch-size": "2048",  # overrides default
                },
            )
    """
    # Cross-parameter checks run before anything else so that a bad call
    # is rejected before the table environment is touched.
    _validate_write_hologres_options(
        write_mode=write_mode,
        on_conflict_action=on_conflict_action,
        delete_strategy=delete_strategy,
        copy_format=copy_format,
        connection_ssl_mode=connection_ssl_mode,
        connection_ssl_root_cert_location=connection_ssl_root_cert_location,
        check_and_put_column=check_and_put_column,
        insert_conflict_where=insert_conflict_where,
        primary_key=primary_key,
    )

    from pyflink.dataframe.io import _add_connector_options, _add_extra_options
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    # Options that are always sent: connection-level settings first ...
    connection_settings = [
        ("endpoint", endpoint),
        ("dbname", db_name),
        ("tablename", table_name),
        ("username", username),
        ("password", password),
        ("connection.pool.size", str(connection_pool_size)),
        ("connection.pool.name", connection_pool_name),
        ("connection.max-idle-ms", str(connection_max_idle_ms)),
        ("connection.ssl.mode", connection_ssl_mode),
        ("retry-count", str(retry_count)),
        ("retry-sleep-step-ms", str(retry_sleep_step_ms)),
        ("meta-cache-ttl-ms", str(meta_cache_ttl_ms)),
        ("serverless-computing.enabled", str(serverless_computing)),
    ]
    # ... followed by the sink behaviour settings.
    sink_settings = [
        ("sink.write-mode", write_mode),
        ("sink.on-conflict-action", on_conflict_action),
        ("sink.create-missing-partition", str(create_missing_partition)),
        ("sink.delete-strategy", delete_strategy),
        ("sink.ignore-null-when-update.enabled", str(ignore_null_when_update)),
        (
            "sink.ignore-null-when-update-by-expr.enabled",
            str(ignore_null_when_update_by_expr),
        ),
        (
            "sink.default-for-not-null-column.enabled",
            str(default_for_not_null_column),
        ),
        ("sink.remove-u0000-in-text.enabled", str(remove_u0000_in_text)),
        ("sink.partial-insert.enabled", str(partial_insert)),
        ("sink.deduplication.enabled", str(deduplication)),
        ("sink.aggressive-flush.enabled", str(aggressive_flush)),
        ("sink.insert.check-and-put.operator", check_and_put_operator),
        ("sink.insert.batch-size", str(insert_batch_size)),
        ("sink.insert.batch-byte-size", str(insert_batch_byte_size)),
        ("sink.insert.flush-interval-ms", str(insert_flush_interval_ms)),
        ("sink.copy.format", copy_format),
    ]
    options = connection_settings + sink_settings

    # Options sent only when the caller supplied a value. The third field
    # says whether the value is stringified (bool/int parameters) or
    # forwarded exactly as given.
    conditional = (
        ("connection.fixed.enabled", connection_fixed, True),
        (
            "connection.ssl.root-cert.location",
            connection_ssl_root_cert_location,
            False,
        ),
        ("sink.insert.check-and-put.column", check_and_put_column, False),
        ("sink.insert.check-and-put.null-as", check_and_put_null_as, False),
        ("sink.insert.conflict-update-set", insert_conflict_update_set, False),
        ("sink.insert.conflict-where", insert_conflict_where, False),
        ("sink.parallelism", parallelism, True),
    )
    for option_key, supplied, stringify in conditional:
        if supplied is None:
            continue
        options.append((option_key, str(supplied) if stringify else supplied))

    table_env = self._table._t_env
    add_built_in_connector(table_env, "hologres")

    # Sink schema: the DataFrame's resolved schema plus the optional
    # primary key.
    sink_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema(),
        primary_key=primary_key,
    )

    descriptor = TableDescriptor.for_connector("hologres")
    descriptor = descriptor.schema(sink_schema.build())
    descriptor = _add_connector_options(descriptor, options)
    # Applied after the named options, per the extra_options contract.
    descriptor = _add_extra_options(descriptor, extra_options)

    job = self._table.execute_insert(descriptor.build())

    # For the in-process targets, block until the insert job is done.
    execution_target = table_env.get_config().get("execution.target", None)
    if execution_target in ("local", "minicluster"):
        job.wait()
def write_paimon(
    self,
    path: Optional[str] = None,
    *,
    primary_key: Optional[Union[str, List[str]]] = None,
    auto_create: bool = False,
    file_format: Literal["orc", "parquet", "avro", "lance"] = "parquet",
    bucket: int = 1,
    bucket_key: Optional[Union[str, List[str]]] = None,
    changelog_producer: Literal[
        "none", "input", "full-compaction", "lookup"
    ] = "none",
    full_compaction_delta_commits: Optional[int] = None,
    lookup_cache_max_memory_size: str = "256 MB",
    merge_engine: Literal[
        "deduplicate", "partial-update", "aggregation"
    ] = "deduplicate",
    partial_update_ignore_delete: bool = False,
    ignore_delete: bool = False,
    partition_default_name: str = "__DEFAULT_PARTITION__",
    partition_expiration_check_interval: str = "1h",
    partition_expiration_time: Optional[str] = None,
    partition_timestamp_formatter: Optional[str] = None,
    partition_timestamp_pattern: Optional[str] = None,
    snapshot_num_retained_max: int = 2147483647,
    snapshot_num_retained_min: int = 10,
    snapshot_time_retained: str = "1h",
    write_mode: Literal["change-log", "append-only"] = "change-log",
    parallelism: Optional[int] = None,
    clustering_by_columns: Optional[Union[str, List[str]]] = None,
    delete_strategy: Literal[
        "NONE",
        "IGNORE_DELETE",
        "NON_PK_FIELD_TO_NULL",
        "DELETE_ROW_ON_PK",
        "CHANGELOG_STANDARD",
    ] = "NONE",
    extra_options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Write the DataFrame to a Paimon table.

    Uses the Paimon SQL connector under the hood. The sink schema is built
    from the DataFrame's physical columns and the connector options are
    exposed with Pythonic snake_case names.

    All arguments are validated and converted into connector options
    before the Paimon connector is registered on the table environment,
    so a rejected call leaves the environment untouched.

    Args:
        path: File-system path of the Paimon table.
        primary_key: Optional primary key column name or list of column
            names. Set this when the sink table should include a primary
            key constraint.
        auto_create: Whether to create the underlying storage when
            writing the table. Default is False.
        file_format: Format of Paimon data files. One of ``"orc"``,
            ``"parquet"``, ``"avro"``, or ``"lance"``. Default is
            ``"parquet"``.
        bucket: Bucket number for the file store.
        bucket_key: Column name or list of column names defining the
            Paimon distribution policy. Data is assigned to each bucket
            according to the hash value of these fields. When unset,
            Paimon uses the primary key, or the full row if no primary
            key exists.
        changelog_producer: Whether and how to double-write to changelog
            files. Changelog files keep data-change details and can be
            read directly during stream reads. ``"none"`` produces no
            changelog file, ``"input"`` writes input changes when
            flushing the memory table, ``"full-compaction"`` generates
            changelog files during full compaction, and ``"lookup"``
            produces changelog by lookup before committing snapshots.
        full_compaction_delta_commits: Maximum number of committed
            snapshots between two full compactions.
        lookup_cache_max_memory_size: Memory size used by lookup cache
            and the lookup changelog producer cache. Default is
            ``"256 MB"``.
        merge_engine: Merge engine for tables with primary keys.
            ``"deduplicate"`` de-duplicates and keeps the last row,
            ``"partial-update"`` updates non-null fields, and
            ``"aggregation"`` aggregates rows with the same primary key.
        partial_update_ignore_delete: Whether to ignore delete records in
            partial-update mode.
        ignore_delete: Whether to ignore delete records.
        partition_default_name: Default partition name used when a
            dynamic partition column value is null or an empty string.
        partition_expiration_check_interval: Interval for checking
            expired partitions.
        partition_expiration_time: Partition retention time. When unset,
            partitions never expire.
        partition_timestamp_formatter: Formatter for converting partition
            time strings to timestamps.
        partition_timestamp_pattern: Pattern for extracting a time string
            from partition values.
        snapshot_num_retained_max: Maximum number of completed snapshots
            to retain.
        snapshot_num_retained_min: Minimum number of completed snapshots
            to retain.
        snapshot_time_retained: Maximum time to retain completed
            snapshots.
        write_mode: Paimon table write mode. ``"append-only"`` accepts
            only append-only inserts and does not perform deduplication
            or primary key constraints. ``"change-log"`` accepts insert,
            delete, and update operations.
        parallelism: Custom sink parallelism. If unset, the planner
            derives statement parallelism while considering global
            configuration.
        clustering_by_columns: Column name or list of column names used
            for clustering append-only table writes in batch jobs.
        delete_strategy: Validation strategy for sink retraction
            messages. One of ``"NONE"``, ``"IGNORE_DELETE"``,
            ``"NON_PK_FIELD_TO_NULL"``, ``"DELETE_ROW_ON_PK"``, or
            ``"CHANGELOG_STANDARD"``.
        extra_options: Additional connector options forwarded through to
            the underlying Paimon connector. This is for options not
            exposed as named parameters of ``write_paimon``. If a key
            matches an option generated by a named parameter of
            ``write_paimon``, the value in ``extra_options`` takes
            precedence. ``"connector"`` is reserved and must not be
            supplied.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> events = pf.from_records(
        ...     [("2026-05-27", 1, "login"), ("2026-05-27", 2, "pay")],
        ...     schema=["dt", "user_id", "payload"],
        ... )
        >>>
        >>> # Append-only table with explicit bucket distribution
        >>> events.write_paimon(
        ...     "oss://bucket/warehouse/db.db/events",
        ...     auto_create=True,
        ...     write_mode="append-only",
        ...     file_format="orc",
        ...     bucket=16,
        ...     bucket_key=["dt", "user_id"],
        ...     parallelism=4,
        ... )
        >>>
        >>> # Batch append-only write with clustering
        >>> events.write_paimon(
        ...     "oss://bucket/warehouse/db.db/events_clustered",
        ...     auto_create=True,
        ...     write_mode="append-only",
        ...     bucket_key=["dt", "user_id"],
        ...     clustering_by_columns=["dt", "user_id"],
        ... )
        >>>
        >>> order_schema = {
        ...     "order_id": pf.DataType.int64(),
        ...     "amount": pf.DataType.float64(),
        ...     "status": pf.DataType.string(),
        ...     "event_ts": pf.DataType.timestamp(3),
        ... }
        >>> orders = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     schema=order_schema,
        ...     primary_key="order_id",
        ... )
        >>>
        >>> # Change-log sink with lookup changelog production
        >>> orders.write_paimon(
        ...     "oss://bucket/warehouse/db.db/orders",
        ...     primary_key="order_id",
        ...     auto_create=True,
        ...     bucket_key=["order_id"],
        ...     changelog_producer="lookup",
        ...     merge_engine="deduplicate",
        ...     delete_strategy="CHANGELOG_STANDARD",
        ...     snapshot_num_retained_min=5,
        ...     snapshot_time_retained="12h",
        ... )
        >>>
        >>> # Partial-update sink that ignores delete records
        >>> orders.write_paimon(
        ...     "oss://bucket/warehouse/db.db/orders_partial",
        ...     primary_key="order_id",
        ...     auto_create=True,
        ...     merge_engine="partial-update",
        ...     ignore_delete=True,
        ...     lookup_cache_max_memory_size="1 GB",
        ... )
        >>>
        >>> metrics_schema = {
        ...     "product_id": pf.DataType.int64(),
        ...     "price": pf.DataType.float64(),
        ...     "sales": pf.DataType.int64(),
        ... }
        >>> metrics = pf.read_paimon(
        ...     "oss://bucket/warehouse/db.db/product_metrics",
        ...     schema=metrics_schema,
        ...     primary_key="product_id",
        ... )
        >>>
        >>> # Aggregation merge engine with original Paimon option keys
        >>> metrics.write_paimon(
        ...     "oss://bucket/warehouse/db.db/product_metrics_agg",
        ...     primary_key="product_id",
        ...     auto_create=True,
        ...     merge_engine="aggregation",
        ...     extra_options={
        ...         "fields.price.aggregate-function": "max",
        ...         "fields.sales.aggregate-function": "sum",
        ...     },
        ... )
    """
    from pyflink.dataframe.io import (
        _PAIMON_DELETE_STRATEGIES,
        _add_connector_options,
        _add_extra_options,
        _format_paimon_column_list,
        _paimon_common_options,
        _validate_allowed_value,
        _validate_paimon_common_options,
    )
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    # Validate before any side effect: the connector is only registered
    # on the table environment once the arguments have been accepted
    # (same ordering as ``write_hologres``).
    _validate_paimon_common_options(
        file_format=file_format,
        changelog_producer=changelog_producer,
        merge_engine=merge_engine,
        write_mode=write_mode,
    )
    _validate_allowed_value(
        delete_strategy,
        "delete_strategy",
        _PAIMON_DELETE_STRATEGIES,
    )

    # Option assembly only looks at the arguments, so it also runs ahead
    # of the environment mutation; the formatting helpers get the chance
    # to reject malformed input first.
    paimon_options = _paimon_common_options(
        path=path,
        auto_create=auto_create,
        file_format=file_format,
        bucket=bucket,
        bucket_key=bucket_key,
        changelog_producer=changelog_producer,
        full_compaction_delta_commits=full_compaction_delta_commits,
        lookup_cache_max_memory_size=lookup_cache_max_memory_size,
        merge_engine=merge_engine,
        partial_update_ignore_delete=partial_update_ignore_delete,
        ignore_delete=ignore_delete,
        partition_default_name=partition_default_name,
        partition_expiration_check_interval=partition_expiration_check_interval,
        partition_expiration_time=partition_expiration_time,
        partition_timestamp_formatter=partition_timestamp_formatter,
        partition_timestamp_pattern=partition_timestamp_pattern,
        snapshot_num_retained_max=snapshot_num_retained_max,
        snapshot_num_retained_min=snapshot_num_retained_min,
        snapshot_time_retained=snapshot_time_retained,
        write_mode=write_mode,
    )
    # Sink-only options. Values may be None here (e.g. ``parallelism``).
    # NOTE(review): presumably ``_add_connector_options`` skips None
    # entries -- confirm in pyflink.dataframe.io.
    paimon_options.extend([
        ("sink.parallelism", parallelism),
        (
            "sink.clustering.by-columns",
            _format_paimon_column_list(
                clustering_by_columns, "clustering_by_columns"
            ),
        ),
        ("sink.delete-strategy", delete_strategy),
    ])

    t_env = self._table._t_env
    add_built_in_connector(t_env, "paimon")

    # Sink schema: the DataFrame's resolved schema plus the optional
    # primary key.
    resolved_schema = self._table.get_resolved_schema()
    schema_builder = _build_physical_schema_from_resolved_schema(
        resolved_schema,
        primary_key=primary_key,
    )

    builder = TableDescriptor.for_connector("paimon").schema(
        schema_builder.build()
    )
    builder = _add_connector_options(builder, paimon_options)
    # Applied after the named options, per the extra_options contract.
    builder = _add_extra_options(builder, extra_options)

    result = self._table.execute_insert(builder.build())

    # For the in-process targets, block until the insert job is done.
    target = t_env.get_config().get("execution.target", None)
    if target in ("local", "minicluster"):
        result.wait()
def write_odps(
    self,
    endpoint: str,
    *,
    tunnel_endpoint: Optional[str] = None,
    project: str,
    schema_name: Optional[str] = None,
    table_name: str,
    access_id: str,
    access_key: str,
    partition: Optional[
        Union[str, Tuple[str, Any], List[Tuple[str, Any]], List[str]]
    ] = None,
    compress_algorithm: Literal["RAW", "ZLIB", "SNAPPY"] = "SNAPPY",
    quota_name: Optional[str] = None,
    # Sink options
    use_stream_tunnel: bool = False,
    flush_interval_ms: int = 30000,
    batch_size: int = 64 * 1024 * 1024,
    num_flush_threads: int = 1,
    slot_num: int = 0,
    dynamic_partition_limit: int = 100,
    retry_times: int = 3,
    sleep_millis: int = 1000,
    enable_upsert: bool = False,
    upsert_async_commit: bool = False,
    upsert_commit_timeout_ms: int = 120000,
    operation: Literal["insert", "upsert"] = "insert",
    parallelism: Optional[int] = None,
    file_cached_enable: bool = False,
    file_cached_writer_num: int = 16,
    bucket_check_interval: int = 60000,
    file_cached_rolling_max_size: str = "16mb",
    file_cached_memory: str = "64mb",
    file_cached_memory_segment_size: str = "128kb",
    file_cached_flush_always: bool = True,
    file_cached_write_max_retries: int = 3,
    upsert_writer_max_retries: int = 3,
    upsert_writer_buffer_size: str = "64mb",
    upsert_writer_bucket_buffer_size: str = "1mb",
    upsert_write_bucket_num: Optional[int] = None,
    upsert_write_slot_num: int = 1,
    upsert_commit_max_retries: int = 3,
    upsert_commit_thread_num: int = 16,
    upsert_commit_timeout: int = 600,
    upsert_flush_concurrent: int = 2,
    insert_commit_thread_num: int = 16,
    insert_arrow_writer_enable: bool = False,
    insert_arrow_writer_batch_size: int = 512,
    insert_arrow_writer_flush_interval: int = 100000,
    insert_writer_buffer_size: str = "64mb",
    upsert_partial_column_enable: bool = False,
) -> None:
    """
    Write the DataFrame to a MaxCompute (ODPS) table.

    Uses the ODPS SQL connector under the hood. The DataFrame schema is
    derived from the current DataFrame, and connector options are exposed
    with Pythonic snake_case names in the same order as the ODPS
    documentation.

    All arguments are validated and ``partition`` is formatted before the
    ODPS connector is registered on the table environment, so a rejected
    call leaves the environment untouched.

    Args:
        endpoint: MaxCompute endpoint.
        tunnel_endpoint: MaxCompute Tunnel endpoint. If unset, MaxCompute
            allocates tunnel connections through Server Load Balancer
            (SLB).
        project: MaxCompute project name.
        schema_name: Required only when the MaxCompute schema feature is
            enabled. Set this to the table's schema name.
        table_name: MaxCompute table name.
        access_id: AccessKey ID used to access MaxCompute.
        access_key: AccessKey secret used to access MaxCompute.
        partition: Partition name in the MaxCompute table. Examples:

            - To write to fixed partition column ``dt`` with value
              ``20220901``, pass ``("dt", "20220901")``.
            - For a three-level partition table ``dt``/``hh``/``mm``, to
              write to ``dt=20220901,hh=08,mm=10``, pass
              ``[("dt", "20220901"), ("hh", "08"), ("mm", "10")]``.
            - To write dynamic partitions from a single partition column
              ``partition_name``, pass ``"partition_name"``.
            - For multi-level dynamic partition columns
              ``dt``/``hh``/``mm``, pass ``["dt", "hh", "mm"]``.
        compress_algorithm: Compression algorithm for MaxCompute Tunnel.
            Valid values are ``"RAW"`` (no compression), ``"ZLIB"``, and
            ``"SNAPPY"``. ``"RAW"`` cannot be used when
            ``enable_upsert`` is true.
        quota_name: Quota name for exclusive MaxCompute Tunnel resource
            groups. If specified, remove ``tunnel_endpoint``; otherwise
            the tunnel specified by ``tunnel_endpoint`` takes precedence.
        use_stream_tunnel: Use MaxCompute Streaming Tunnel instead of
            Batch Tunnel. ``True`` selects Streaming Tunnel and ``False``
            selects Batch Tunnel.
        flush_interval_ms: Flush interval for the tunnel writer buffer,
            in milliseconds. For Streaming Tunnel, flushed data is
            immediately available. For Batch Tunnel, data becomes
            available only after checkpointing; set to ``0`` to disable
            scheduled flushing. Triggered when either
            ``flush_interval_ms`` or ``batch_size`` is reached.
        batch_size: Buffer size in bytes. Data is flushed when the buffer
            reaches this size. Triggered when either ``batch_size`` or
            ``flush_interval_ms`` is reached.
        num_flush_threads: Number of threads used to flush the tunnel
            writer buffer. Values greater than 1 allow concurrent
            flushing across partitions.
        slot_num: Number of Tunnel slots for receiving data from Flink.
        dynamic_partition_limit: Maximum number of dynamic partitions
            written between two checkpoints. Writing to many partitions
            increases load on MaxCompute and slows checkpointing.
            Increase this only when your workload requires it.
        retry_times: Maximum retries for MaxCompute server requests,
            including session creation, submission, and flush failures.
        sleep_millis: Retry interval in milliseconds.
        enable_upsert: Use MaxCompute Upsert Tunnel. ``True`` processes
            INSERT, UPDATE_AFTER, and DELETE records. ``False`` uses the
            tunnel specified by ``use_stream_tunnel``. If session commits
            in upsert mode encounter errors or long-running faults, set
            sink operator parallelism to 10 or fewer.
        upsert_async_commit: Use asynchronous mode when committing upsert
            sessions. Async mode reduces commit time, but committed data
            is not immediately queryable.
        upsert_commit_timeout_ms: Timeout for upsert session commits, in
            milliseconds.
        operation: Write mode for a Delta table. ``"insert"`` is append
            mode; ``"upsert"`` is update mode and requires
            ``upsert_write_bucket_num``.
        parallelism: Write parallelism for a Delta table. Defaults to
            upstream parallelism. ``write.bucket.num`` must be an
            integral multiple of this value for optimal write performance
            and memory efficiency.
        file_cached_enable: Enable file cache mode when writing to
            dynamic partitions of a Delta table. This reduces small files
            written to the server but increases write latency. Enable
            when the sink has high parallelism.
        file_cached_writer_num: Concurrent upload threads per task in
            file cache mode. Avoid setting this too high because writing
            to many partitions simultaneously can cause OOM errors.
            Effective only when ``file_cached_enable`` is true.
        bucket_check_interval: File size check interval in file cache
            mode, in milliseconds. Effective only when
            ``file_cached_enable`` is true.
        file_cached_rolling_max_size: Maximum size of a single cached
            file. When exceeded, data is uploaded to the server.
            Effective only when ``file_cached_enable`` is true.
        file_cached_memory: Maximum off-heap memory for file writes in
            file cache mode. Effective only when ``file_cached_enable``
            is true.
        file_cached_memory_segment_size: Buffer segment size for file
            writes in file cache mode. Effective only when
            ``file_cached_enable`` is true.
        file_cached_flush_always: Whether to use the cache when writing
            files in file cache mode. Effective only when
            ``file_cached_enable`` is true.
        file_cached_write_max_retries: Retry count for data uploads in
            file cache mode. Effective only when ``file_cached_enable``
            is true.
        upsert_writer_max_retries: Maximum retries for writing to a
            bucket in an Upsert Writer session.
        upsert_writer_buffer_size: Total buffer size across all buckets
            in an Upsert Writer session. Data is flushed when the total
            reaches this threshold. Increase for better write efficiency;
            decrease if writing to many partitions causes OOM errors.
        upsert_writer_bucket_buffer_size: Per-bucket buffer size in an
            Upsert Writer session. Decrease if Flink server memory is
            insufficient.
        upsert_write_bucket_num: Number of buckets for the target Delta
            table. Must match ``write.bucket.num`` configured on the
            Delta table.
        upsert_write_slot_num: Tunnel slots per upsert session.
        upsert_commit_max_retries: Maximum retries for upsert session
            commits.
        upsert_commit_thread_num: Parallelism for upsert session commits.
            Avoid large values because excessive concurrent commits
            increase resource consumption and can cause performance
            issues.
        upsert_commit_timeout: Upsert session commit timeout in seconds.
        upsert_flush_concurrent: Maximum concurrent bucket flushes per
            partition. Each bucket flush occupies a Tunnel slot.
        insert_commit_thread_num: Parallelism for insert session commits.
        insert_arrow_writer_enable: Use the Arrow format for inserts.
        insert_arrow_writer_batch_size: Maximum rows per Arrow-format
            batch.
        insert_arrow_writer_flush_interval: Writer flush interval in
            milliseconds.
        insert_writer_buffer_size: Cache size for the buffered writer.
        upsert_partial_column_enable: Update only specified columns
            (partial column update). Applies only to Delta table sinks.
            When true, if a record with the same primary key exists,
            specified non-null fields are overwritten. If no matching
            record exists, a new record is inserted with new values for
            specified columns and null for all unspecified columns.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records([(1, "a"), (2, "b")],
        ...                      schema=["id", "name"])
        >>> df.write_odps(
        ...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
        ...     project="my_project",
        ...     table_name="events",
        ...     access_id="${secret_values.ak_id}",
        ...     access_key="${secret_values.ak_secret}",
        ...     partition=("ds", "20260428"),
        ... )
    """
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.dataframe.io import (
        _ODPS_COMPRESS_ALGORITHMS,
        _ODPS_SINK_OPERATIONS,
        _add_connector_options,
        _format_odps_partition,
        _validate_allowed_value,
        _validate_odps_common_options,
    )
    from pyflink.table import TableDescriptor

    # Validate before any side effect: the connector is only registered
    # on the table environment once the arguments have been accepted
    # (same ordering as ``write_hologres``).
    _validate_allowed_value(
        compress_algorithm,
        "compress_algorithm",
        _ODPS_COMPRESS_ALGORITHMS,
    )
    _validate_allowed_value(
        operation,
        "operation",
        _ODPS_SINK_OPERATIONS,
    )
    _validate_odps_common_options(
        endpoint=endpoint,
        project=project,
        table_name=table_name,
        access_id=access_id,
        access_key=access_key,
        tunnel_endpoint=tunnel_endpoint,
        quota_name=quota_name,
    )
    # Kept under its own name so the ``partition`` argument is not
    # rebound to a value of a different shape.
    partition_spec = _format_odps_partition(
        partition, allow_dynamic=True
    )

    # Values are passed unconverted and may be None (unset optionals).
    # NOTE(review): presumably ``_add_connector_options`` skips None
    # entries and stringifies the rest -- confirm in pyflink.dataframe.io.
    options = [
        ("endpoint", endpoint),
        ("tunnelEndpoint", tunnel_endpoint),
        ("project", project),
        ("schemaName", schema_name),
        ("tableName", table_name),
        ("accessId", access_id),
        ("accessKey", access_key),
        ("partition", partition_spec),
        ("compressAlgorithm", compress_algorithm),
        ("quotaName", quota_name),
        ("useStreamTunnel", use_stream_tunnel),
        ("flushIntervalMs", flush_interval_ms),
        ("batchSize", batch_size),
        ("numFlushThreads", num_flush_threads),
        ("slotNum", slot_num),
        ("dynamicPartitionLimit", dynamic_partition_limit),
        ("retryTimes", retry_times),
        ("sleepMillis", sleep_millis),
        ("enableUpsert", enable_upsert),
        ("upsertAsyncCommit", upsert_async_commit),
        ("upsertCommitTimeoutMs", upsert_commit_timeout_ms),
        ("sink.operation", operation),
        ("sink.parallelism", parallelism),
        ("sink.file-cached.enable", file_cached_enable),
        ("sink.file-cached.writer.num", file_cached_writer_num),
        ("sink.bucket.check-interval", bucket_check_interval),
        ("sink.file-cached.rolling.max-size", file_cached_rolling_max_size),
        ("sink.file-cached.memory", file_cached_memory),
        ("sink.file-cached.memory.segment-size",
         file_cached_memory_segment_size),
        ("sink.file-cached.flush.always", file_cached_flush_always),
        ("sink.file-cached.write.max-retries",
         file_cached_write_max_retries),
        ("upsert.writer.max-retries", upsert_writer_max_retries),
        ("upsert.writer.buffer-size", upsert_writer_buffer_size),
        ("upsert.writer.bucket.buffer-size",
         upsert_writer_bucket_buffer_size),
        ("upsert.write.bucket.num", upsert_write_bucket_num),
        ("upsert.write.slot-num", upsert_write_slot_num),
        ("upsert.commit.max-retries", upsert_commit_max_retries),
        ("upsert.commit.thread-num", upsert_commit_thread_num),
        ("upsert.commit.timeout", upsert_commit_timeout),
        ("upsert.flush.concurrent", upsert_flush_concurrent),
        ("insert.commit.thread-num", insert_commit_thread_num),
        ("insert.arrow-writer.enable", insert_arrow_writer_enable),
        ("insert.arrow-writer.batch-size", insert_arrow_writer_batch_size),
        ("insert.arrow-writer.flush-interval",
         insert_arrow_writer_flush_interval),
        ("insert.writer.buffer-size", insert_writer_buffer_size),
        ("upsert.partial-column.enable", upsert_partial_column_enable),
    ]

    t_env = self._table._t_env
    add_built_in_connector(t_env, "odps")

    # Sink schema is taken from the DataFrame's resolved schema; no
    # primary key is passed for this sink.
    resolved_schema = self._table.get_resolved_schema()
    schema_builder = _build_physical_schema_from_resolved_schema(
        resolved_schema
    )

    builder = TableDescriptor.for_connector("odps").schema(
        schema_builder.build()
    )
    builder = _add_connector_options(builder, options)

    result = self._table.execute_insert(builder.build())

    # For the in-process targets, block until the insert job is done.
    target = t_env.get_config().get("execution.target", None)
    if target in ("local", "minicluster"):
        result.wait()
def write_custom(
    self,
    connector: str,
    *,
    primary_key: Optional[Union[str, List[str]]] = None,
    options: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Write the DataFrame through an arbitrary SQL connector.

    Use this entrypoint for connectors that have no dedicated ``write_*``
    helper (e.g. ``write_parquet``, ``write_kafka``, ``write_odps``). It is
    meant for SQL connectors discoverable through Flink's standard factory
    mechanism.

    Only the DataFrame's physical columns are forwarded to the sink. Primary
    key constraints are not inherited from upstream tables; pass
    ``primary_key`` when the sink schema should carry one.

    When the ``execution.target`` configuration is ``"local"`` or
    ``"minicluster"``, the call waits for the insert job to finish.

    Args:
        connector: Factory identifier of the connector, i.e. the value used
            in SQL DDL ``WITH ('connector' = '...')``.
        primary_key: Optional primary key column name or list of column
            names to declare in the sink table schema.
        options: Optional dict of connector and table options. Each key is
            forwarded under the same name it would have in ``WITH (...)``
            in SQL DDL, and each value is converted with ``str``.
            ``"connector"`` is reserved and must be given through the
            ``connector`` argument.

    Raises:
        ValueError: If ``connector`` is not a non-empty string.

    Example::

        >>> import pyflink.dataframe as pf
        >>>
        >>> df = pf.from_records([(1, "a")], schema=["id", "name"])
        >>>
        >>> # Write through a custom connector identified by ``"my-custom"``.
        >>> df.write_custom(
        ...     "my-custom",
        ...     options={
        ...         "host": "example.com",
        ...         "port": "9000",
        ...     },
        ... )
        >>>
        >>> # Use a format and additional sink options
        >>> df.write_custom(
        ...     "my-custom",
        ...     primary_key="id",
        ...     options={
        ...         "endpoint": "example.com:9000",
        ...         "format": "json",
        ...         "json.map-null-key.mode": "FAIL",
        ...         "sink.parallelism": "4",
        ...     },
        ... )
    """
    from pyflink.dataframe.io import _collect_custom_options
    from pyflink.dataframe.util.artifacts import add_built_in_connector
    from pyflink.table import TableDescriptor

    if not (isinstance(connector, str) and connector):
        raise ValueError(
            "'connector' must be a non-empty string identifying the connector "
            "(the same value used as 'connector' in SQL DDL)."
        )

    env = self._table._t_env

    # Built-in connectors that have no dedicated write_* helper get their
    # jar loaded here. The helper is expected to ignore unknown or
    # user-defined connector names.
    add_built_in_connector(env, connector)

    extra_options = _collect_custom_options(options)

    descriptor = TableDescriptor.for_connector(connector)
    sink_schema = _build_physical_schema_from_resolved_schema(
        self._table.get_resolved_schema(),
        primary_key=primary_key,
    ).build()
    descriptor = descriptor.schema(sink_schema)

    for option_name, option_value in extra_options.items():
        descriptor = descriptor.option(option_name, str(option_value))

    job = self._table.execute_insert(descriptor.build())

    # Local / minicluster runs block until the job is done.
    exec_target = env.get_config().get("execution.target", None)
    if exec_target == "local" or exec_target == "minicluster":
        job.wait()
# ======================== Explain ========================
def explain(
    self,
    *,
    show_estimated_cost: bool = False,
    show_physical_execution_plan: bool = False,
) -> None:
    """
    Print the AST and the execution plan of this DataFrame.

    Args:
        show_estimated_cost: When True, also request the optimizer's
            estimated cost (row count, cpu, io, etc.) for each physical
            node. Defaults to False.
        show_physical_execution_plan: When True, also request the physical
            execution plan in JSON format. Defaults to False.

    Example::

        >>> import pyflink.dataframe as pf
        >>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
        >>> df.explain()
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])

        >>> df.explain(show_estimated_cost=True)
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(...): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, ...}
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])

        >>> df.explain(show_physical_execution_plan=True)
        == Abstract Syntax Tree ==
        LogicalTableScan(table=[[*anonymous_python-input-format$1*]])
        <BLANKLINE>
        == Optimized Physical Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Optimized Execution Plan ==
        TableSourceScan(table=[[*anonymous_python-input-format$1*]], fields=[id, name])
        <BLANKLINE>
        == Physical Execution Plan ==
        {
          "nodes" : [ {
            "id" : 9,
            "type" : "Source: *anonymous_filesystem$1*[9]",
            "pact" : "Data Source",
            "contents" : "[9]:TableSourceScan(table=[[*anonymous_filesystem$1*, filter=[IS...",
            "parallelism" : 24
          } ]
        }
    """
    from pyflink.table import ExplainDetail

    # Pair each keyword flag with the detail it switches on; the order here
    # is the order in which the details are handed to Table.explain().
    flag_to_detail = (
        (show_estimated_cost, ExplainDetail.ESTIMATED_COST),
        (show_physical_execution_plan, ExplainDetail.JSON_EXECUTION_PLAN),
    )
    requested = [detail for enabled, detail in flag_to_detail if enabled]

    print(self._table.explain(*requested))
# ======================== Properties ========================

@property
def schema(self) -> "TableSchema":
    """
    Schema of the DataFrame.

    Returns:
        The TableSchema reported by the underlying table.
    """
    table_schema = self._table.get_schema()
    return table_schema


@property
def columns(self) -> List[str]:
    """
    Names of the DataFrame's columns.

    Returns:
        A list of column names, as reported by the underlying table's schema.
    """
    table_schema = self._table.get_schema()
    return table_schema.get_field_names()


# ======================== Null/NaN Handling ========================

def _get_float_columns(self) -> List[str]:
    """Collect the names of columns typed as FloatType or DoubleType."""
    table_schema = self._table.get_schema()
    float_names = []
    for field_name in table_schema.get_field_names():
        field_type = table_schema.get_field_data_type(field_name)
        if isinstance(field_type, (FloatType, DoubleType)):
            float_names.append(field_name)
    return float_names
def drop_null(self, subset: Optional[List[str]] = None) -> "DataFrame":
    """
    Remove every row that holds a null in one of the checked columns.

    NaN values are left alone; use :meth:`drop_nan` to remove those.

    Args:
        subset: Names of the columns to inspect. When None, every column
            of the DataFrame is inspected.

    Returns:
        A new DataFrame without the rows that contain nulls. If there is
        nothing to check (no columns, or an empty ``subset``), this
        DataFrame itself is returned.

    Raises:
        ValueError: If a name in ``subset`` is not a column of the schema.

    Example::

        >>> df.drop_null()              # drop if any column is null
        >>> df.drop_null(subset=["a"])  # drop if column "a" is null
    """
    available = self._table.get_schema().get_field_names()
    targets = available if subset is None else subset
    if not targets:
        return self

    known = set(available)
    for name in targets:
        if name in known:
            continue
        raise ValueError(f"Column '{name}' not found in schema")

    not_null_checks = [col(name).is_not_null for name in targets]
    # and_() is only used when there is more than one check to combine.
    if len(not_null_checks) == 1:
        keep_row = not_null_checks[0]
    else:
        keep_row = and_(*not_null_checks)

    return DataFrame(self._table.filter(keep_row))
def drop_nan(self, subset: Optional[List[str]] = None) -> "DataFrame":
    """
    Drop rows containing NaN values in float columns.

    Does NOT drop rows with null — use :meth:`drop_null` for that.
    Non-float columns in ``subset`` are silently ignored since NaN
    only applies to float/double types.

    Args:
        subset: Column names to check. If None, checks all float columns.

    Returns:
        A new DataFrame with rows containing NaN removed. If no float
        column is selected, this DataFrame itself is returned.

    Raises:
        ValueError: If a name in ``subset`` is not a column of the schema.

    Example::

        >>> df.drop_nan()              # drop if any float column is NaN
        >>> df.drop_nan(subset=["a"])  # drop if float column "a" is NaN
    """
    # Keep the list returned by the helper for iteration so the generated
    # predicate has the same shape on every run; a set's iteration order
    # changes with string-hash randomization. The set is membership-only.
    float_columns = self._get_float_columns()
    float_lookup = set(float_columns)

    if subset is None:
        columns = float_columns
    else:
        schema_columns = set(self._table.get_schema().get_field_names())
        for c in subset:
            if c not in schema_columns:
                raise ValueError(f"Column '{c}' not found in schema")
        columns = [c for c in subset if c in float_lookup]

    if not columns:
        return self

    # Use isnan() function (VVR built-in) to detect NaN.
    # isnan(NULL) returns NULL, and ~NULL (isNotTrue) returns true,
    # so null rows are correctly preserved.
    conditions = []
    for c in columns:
        # A backtick inside a backtick-quoted SQL identifier must be
        # doubled, otherwise the identifier ends early and the SQL breaks.
        quoted = c.replace("`", "``")
        conditions.append(~call_sql(f"isnan(`{quoted}`)"))

    predicate = conditions[0] if len(conditions) == 1 else and_(*conditions)
    return DataFrame(self._table.filter(predicate))
def fill_null(self, value: Any, subset: Optional[List[str]] = None) -> "DataFrame":
    """
    Substitute a given value for nulls in the selected columns.

    NaN values are left alone; use :meth:`fill_nan` to replace those.

    Args:
        value: Replacement used wherever a null is found.
        subset: Names of the columns to fill. When None, every column of
            the DataFrame is filled.

    Returns:
        A new DataFrame in which the nulls were replaced. If there is
        nothing to fill (no columns, or an empty ``subset``), this
        DataFrame itself is returned.

    Raises:
        ValueError: If a name in ``subset`` is not a column of the schema.

    Example::

        >>> df.fill_null(0)                # fill all columns
        >>> df.fill_null(0, subset=["a"])  # fill only column "a"
    """
    available = self._table.get_schema().get_field_names()
    targets = available if subset is None else subset
    if not targets:
        return self

    known = set(available)
    missing = [name for name in targets if name not in known]
    if missing:
        # Report the first offending name, in the order it was requested.
        raise ValueError(f"Column '{missing[0]}' not found in schema")

    filled = []
    for name in targets:
        # Keep the column under its own name so it replaces the original.
        filled.append(col(name).if_null(lit(value)).alias(name))

    return DataFrame(self._table.add_or_replace_columns(*filled))
def fill_nan(self, value: Any, subset: Optional[List[str]] = None) -> "DataFrame":
    """
    Replace NaN values with a given value in float columns.

    Does NOT replace null — use :meth:`fill_null` for that.
    Non-float columns in ``subset`` are silently ignored since NaN
    only applies to float/double types.

    Args:
        value: The value to replace NaN with.
        subset: Column names to fill. If None, fills all float columns.

    Returns:
        A new DataFrame with NaN values replaced. If no float column is
        selected, this DataFrame itself is returned.

    Raises:
        ValueError: If a name in ``subset`` is not a column of the schema.

    Example::

        >>> df.fill_nan(0.0)                # fill all float columns
        >>> df.fill_nan(0.0, subset=["a"])  # fill only float column "a"
    """
    # Iterate the list returned by the helper (stable order); the set is
    # used for membership tests only.
    float_columns = self._get_float_columns()
    float_lookup = set(float_columns)

    if subset is None:
        columns = float_columns
    else:
        schema_columns = set(self._table.get_schema().get_field_names())
        for c in subset:
            if c not in schema_columns:
                raise ValueError(f"Column '{c}' not found in schema")
        columns = [c for c in subset if c in float_lookup]

    if not columns:
        return self

    # Use isnan() function (VVR built-in) to detect NaN.
    # isnan(NULL) returns NULL (falsy), so then() returns col(c)
    # (the NULL value) — nulls are preserved.
    replacement_exprs = []
    for c in columns:
        # A backtick inside a backtick-quoted SQL identifier must be
        # doubled, otherwise the identifier ends early and the SQL breaks.
        quoted = c.replace("`", "``")
        replacement_exprs.append(
            call_sql(f"isnan(`{quoted}`)").then(lit(value), col(c)).alias(c)
        )

    return DataFrame(self._table.add_or_replace_columns(*replacement_exprs))
# ======================== Special Methods ========================

def __getitem__(self, key) -> Union["DataFrame", Expression]:
    """
    Subscript access on the DataFrame.

    - ``df["a"]``: an Expression referencing column "a"
    - ``df[["a", "b"]]``: a DataFrame with the listed columns selected
    - ``df[boolean_expression]``: a DataFrame filtered by the expression

    Raises:
        TypeError: If ``key`` is not a str, list, tuple or Expression.
    """
    if isinstance(key, str):
        return col(key)
    if isinstance(key, (list, tuple)):
        return self.select(*key)
    if isinstance(key, Expression):
        return self.filter(key)
    raise TypeError(f"Unsupported key type: {type(key)}")


def __repr__(self) -> str:
    """Short description listing the column names."""
    return "DataFrame(columns={})".format(self.columns)


class GroupedDataFrame:
    """
    A DataFrame paired with its grouping keys, ready for aggregation.
    """

    def __init__(self, df: DataFrame, group_keys: List[Expression]):
        # Source DataFrame and the expressions it is grouped by.
        self._df = df
        self._group_keys = group_keys

    def agg(self, *aggs: Expression) -> DataFrame:
        """
        Evaluate aggregation expressions over each group.

        Args:
            *aggs: Expressions to select from the grouped table.

        Returns:
            A new DataFrame holding the aggregation results.

        Example::

            >>> import pyflink.dataframe as pf
            >>> df = pf.from_records(
            ...     [("A", 1), ("A", 2), ("B", 3)],
            ...     schema=["category", "value"],
            ... )
            >>> # Group by and aggregate
            >>> result = df.group_by("category").agg(
            ...     pf.col("category"),
            ...     pf.col("value").sum.alias("total")
            ... )
        """
        source_table = self._df.to_table()
        grouped = source_table.group_by(*self._group_keys)
        aggregated = grouped.select(*aggs)
        return DataFrame(aggregated)
def _build_physical_schema_from_resolved_schema(
    resolved_schema,
    primary_key: Optional[Union[str, List[str]]] = None,
):
    """
    Create a ``Schema`` builder holding the physical columns of a schema.

    Non-physical columns are left out. Columns named by ``primary_key`` get
    their data type switched to NOT NULL, and the primary key is declared
    on the builder when at least one key column results.

    Args:
        resolved_schema: Schema object providing ``get_columns()``.
        primary_key: Optional column name or list of column names;
            normalized by ``_normalize_primary_key`` against the physical
            column names.

    Returns:
        The populated schema builder (not yet built).
    """
    from pyflink.table import Schema
    from pyflink.dataframe.io import _normalize_primary_key

    physical = [
        candidate
        for candidate in resolved_schema.get_columns()
        if candidate.is_physical()
    ]
    names = [candidate.get_name() for candidate in physical]

    key_columns = _normalize_primary_key(primary_key, names)
    key_lookup = set(key_columns)

    builder = Schema.new_builder()
    for name, candidate in zip(names, physical):
        column_type = candidate.get_data_type()
        if name in key_lookup:
            column_type = column_type.not_null()
        builder.column(name, column_type)

    if key_columns:
        builder.primary_key(*key_columns)
    return builder


def _validate_sls_buckets(value: int) -> None:
    """Raise ValueError unless ``value`` is a power of 2 in [1, 256]."""
    # bool is a subclass of int, so it has to be excluded explicitly.
    is_plain_int = isinstance(value, int) and not isinstance(value, bool)
    # A power of two has exactly one bit set, hence value & (value - 1) == 0.
    if not (is_plain_int and 1 <= value <= 256 and value & (value - 1) == 0):
        raise ValueError(f"buckets must be a power of 2 in [1, 256], got: {value}")


def _validate_write_hologres_options(
    *,
    write_mode,
    on_conflict_action,
    delete_strategy,
    copy_format,
    connection_ssl_mode,
    connection_ssl_root_cert_location,
    check_and_put_column,
    insert_conflict_where,
    primary_key,
) -> None:
    """
    Check the Hologres sink options individually and against each other.

    Each enumerated option is first checked by ``_validate_allowed_value``
    and the SSL mode / root certificate pair by
    ``_validate_hologres_ssl_pair``; the cross-option rules below are
    applied afterwards, in order.

    Raises:
        ValueError: If a cross-option rule checked here is violated.
            NOTE(review): the delegated validators presumably raise as
            well — confirm their exception type in ``pyflink.dataframe.io``.
    """
    from pyflink.dataframe.io import (
        _HOLOGRES_COPY_FORMATS,
        _HOLOGRES_DELETE_STRATEGIES,
        _HOLOGRES_ON_CONFLICT_ACTIONS,
        _HOLOGRES_SSL_MODES,
        _HOLOGRES_WRITE_MODES,
        _validate_allowed_value,
        _validate_hologres_ssl_pair,
    )

    _validate_allowed_value(write_mode, "write_mode", _HOLOGRES_WRITE_MODES)
    _validate_allowed_value(
        on_conflict_action, "on_conflict_action", _HOLOGRES_ON_CONFLICT_ACTIONS
    )
    _validate_allowed_value(
        delete_strategy, "delete_strategy", _HOLOGRES_DELETE_STRATEGIES
    )
    _validate_allowed_value(copy_format, "copy_format", _HOLOGRES_COPY_FORMATS)
    _validate_allowed_value(
        connection_ssl_mode, "connection_ssl_mode", _HOLOGRES_SSL_MODES
    )
    _validate_hologres_ssl_pair(
        connection_ssl_mode, connection_ssl_root_cert_location
    )

    insert_like_modes = ("AUTO", "INSERT")
    if copy_format != "binary" and write_mode in insert_like_modes:
        raise ValueError(
            f"copy_format={copy_format!r} requires write_mode to be one of "
            "{'COPY_STREAM', 'COPY_BULK_LOAD', 'COPY_BULK_LOAD_ON_CONFLICT'}; "
            "with write_mode='AUTO' or 'INSERT', copy_format must be 'binary' "
            "(AUTO may resolve to INSERT at runtime)."
        )

    if check_and_put_column is not None:
        upsert_actions = ("INSERT_OR_UPDATE", "INSERT_OR_REPLACE")
        if on_conflict_action not in upsert_actions:
            raise ValueError(
                "check_and_put_* options require on_conflict_action to be "
                "'INSERT_OR_UPDATE' or 'INSERT_OR_REPLACE'."
            )

        if insert_conflict_where is not None:
            raise ValueError(
                "insert_conflict_where conflicts with check_and_put_* options; "
                "configure only one of them."
            )

        # HologresDynamicTableSink#verifySinkOptions rejects check-and-put
        # when the resolved write mode is any COPY_*. Reject the explicit
        # forms here; AUTO is allowed because it usually resolves to INSERT
        # (it may resolve to COPY_BULK_LOAD* under batch + Hologres 3.1+ —
        # see the write_hologres docstring for that caveat).
        copy_modes = ("COPY_STREAM", "COPY_BULK_LOAD", "COPY_BULK_LOAD_ON_CONFLICT")
        if write_mode in copy_modes:
            raise ValueError(
                "check_and_put_* options are not supported with "
                f"write_mode={write_mode!r}; use 'AUTO' or 'INSERT'."
            )

    # DeleteStrategy.requiresPrimaryKey is true for NON_PK_FIELD_TO_NULL
    # and DELETE_ROW_ON_PK; HologresDynamicTableSink#verifyDeletionStrategy
    # then refuses to start the job without a Flink primary key.
    pk_requiring_strategies = ("NON_PK_FIELD_TO_NULL", "DELETE_ROW_ON_PK")
    if not primary_key and delete_strategy in pk_requiring_strategies:
        raise ValueError(
            f"delete_strategy={delete_strategy!r} requires primary_key "
            "to be set."
        )