pyflink.dataframe.dataframe.DataFrame.write_paimon#
- DataFrame.write_paimon(path: str | None = None, *, primary_key: str | List[str] | None = None, auto_create: bool = False, file_format: Literal['orc', 'parquet', 'avro', 'lance'] = 'parquet', bucket: int = 1, bucket_key: str | List[str] | None = None, changelog_producer: Literal['none', 'input', 'full-compaction', 'lookup'] = 'none', full_compaction_delta_commits: int | None = None, lookup_cache_max_memory_size: str = '256 MB', merge_engine: Literal['deduplicate', 'partial-update', 'aggregation'] = 'deduplicate', partial_update_ignore_delete: bool = False, ignore_delete: bool = False, partition_default_name: str = '__DEFAULT_PARTITION__', partition_expiration_check_interval: str = '1h', partition_expiration_time: str | None = None, partition_timestamp_formatter: str | None = None, partition_timestamp_pattern: str | None = None, snapshot_num_retained_max: int = 2147483647, snapshot_num_retained_min: int = 10, snapshot_time_retained: str = '1h', write_mode: Literal['change-log', 'append-only'] = 'change-log', parallelism: int | None = None, clustering_by_columns: str | List[str] | None = None, delete_strategy: Literal['NONE', 'IGNORE_DELETE', 'NON_PK_FIELD_TO_NULL', 'DELETE_ROW_ON_PK', 'CHANGELOG_STANDARD'] = 'NONE', extra_options: Dict[str, Any] | None = None) None[source]#
Write the DataFrame to a Paimon table.
Uses the Paimon SQL connector under the hood. The sink schema is built from the DataFrame’s physical columns and the connector options are exposed with Pythonic snake_case names.
- Parameters:
path – File-system path of the Paimon table.
primary_key – Optional primary key column name or list of column names. Set this when the sink table should include a primary key constraint.
auto_create – Whether to create the underlying storage when writing the table. Default is False.
file_format – Format of Paimon data files. One of
"orc","parquet","avro", or"lance". Default is"parquet".bucket – Bucket number for the file store.
bucket_key – Column name or list of column names defining the Paimon distribution policy. Data is assigned to each bucket according to the hash value of these fields. When unset, Paimon uses the primary key, or the full row if no primary key exists.
changelog_producer – Whether and how to double-write to changelog files. Changelog files keep data-change details and can be read directly during stream reads.
"none"produces no changelog file,"input"writes input changes when flushing the memory table,"full-compaction"generates changelog files during full compaction, and"lookup"produces changelog by lookup before committing snapshots.full_compaction_delta_commits – Maximum number of committed snapshots between two full compactions.
lookup_cache_max_memory_size – Memory size used by lookup cache and the lookup changelog producer cache. Default is
"256 MB".merge_engine – Merge engine for tables with primary keys.
"deduplicate"de-duplicates and keeps the last row,"partial-update"updates non-null fields, and"aggregation"aggregates rows with the same primary key.partial_update_ignore_delete – Whether to ignore delete records in partial-update mode.
ignore_delete – Whether to ignore delete records.
partition_default_name – Default partition name used when a dynamic partition column value is null or an empty string.
partition_expiration_check_interval – Interval for checking expired partitions.
partition_expiration_time – Partition retention time. When unset, partitions never expire.
partition_timestamp_formatter – Formatter for converting partition time strings to timestamps.
partition_timestamp_pattern – Pattern for extracting a time string from partition values.
snapshot_num_retained_max – Maximum number of completed snapshots to retain.
snapshot_num_retained_min – Minimum number of completed snapshots to retain.
snapshot_time_retained – Maximum time to retain completed snapshots.
write_mode – Paimon table write mode.
"append-only"accepts only append-only inserts and does not perform deduplication or primary key constraints."change-log"accepts insert, delete, and update operations.parallelism – Custom sink parallelism. If unset, the planner derives statement parallelism while considering global configuration.
clustering_by_columns – Column name or list of column names used for clustering append-only table writes in batch jobs.
delete_strategy – Validation strategy for sink retraction messages. One of
"NONE","IGNORE_DELETE","NON_PK_FIELD_TO_NULL","DELETE_ROW_ON_PK", or"CHANGELOG_STANDARD".extra_options – Additional connector options forwarded through to the underlying Paimon connector. This is for options not exposed as named parameters of
write_paimon. If a key matches an option generated by a named parameter of write_paimon, the value inextra_optionstakes precedence."connector"is reserved and must not be supplied.
Example:
>>> import pyflink.dataframe as pf >>> >>> events = pf.from_records( ... [("2026-05-27", 1, "login"), ("2026-05-27", 2, "pay")], ... schema=["dt", "user_id", "payload"], ... ) >>> >>> # Append-only table with explicit bucket distribution >>> events.write_paimon( ... "oss://bucket/warehouse/db.db/events", ... auto_create=True, ... write_mode="append-only", ... file_format="orc", ... bucket=16, ... bucket_key=["dt", "user_id"], ... parallelism=4, ... ) >>> >>> # Batch append-only write with clustering >>> events.write_paimon( ... "oss://bucket/warehouse/db.db/events_clustered", ... auto_create=True, ... write_mode="append-only", ... bucket_key=["dt", "user_id"], ... clustering_by_columns=["dt", "user_id"], ... ) >>> >>> order_schema = { ... "order_id": pf.DataType.int64(), ... "amount": pf.DataType.float64(), ... "status": pf.DataType.string(), ... "event_ts": pf.DataType.timestamp(3), ... } >>> orders = pf.read_paimon( ... "oss://bucket/warehouse/db.db/orders", ... schema=order_schema, ... primary_key="order_id", ... ) >>> >>> # Change-log sink with lookup changelog production >>> orders.write_paimon( ... "oss://bucket/warehouse/db.db/orders", ... primary_key="order_id", ... auto_create=True, ... bucket_key=["order_id"], ... changelog_producer="lookup", ... merge_engine="deduplicate", ... delete_strategy="CHANGELOG_STANDARD", ... snapshot_num_retained_min=5, ... snapshot_time_retained="12h", ... ) >>> >>> # Partial-update sink that ignores delete records >>> orders.write_paimon( ... "oss://bucket/warehouse/db.db/orders_partial", ... primary_key="order_id", ... auto_create=True, ... merge_engine="partial-update", ... ignore_delete=True, ... lookup_cache_max_memory_size="1 GB", ... ) >>> >>> metrics_schema = { ... "product_id": pf.DataType.int64(), ... "price": pf.DataType.float64(), ... "sales": pf.DataType.int64(), ... } >>> metrics = pf.read_paimon( ... "oss://bucket/warehouse/db.db/product_metrics", ... schema=metrics_schema, ... primary_key="product_id", ... ) >>> >>> # Aggregation merge engine with original Paimon option keys >>> metrics.write_paimon( ... "oss://bucket/warehouse/db.db/product_metrics_agg", ... primary_key="product_id", ... auto_create=True, ... merge_engine="aggregation", ... extra_options={ ... "fields.price.aggregate-function": "max", ... "fields.sales.aggregate-function": "sum", ... }, ... )