pyflink.dataframe.io.read_paimon#
- read_paimon(path: str | None = None, *, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, auto_create: bool = False, file_format: Literal['orc', 'parquet', 'avro', 'lance'] = 'parquet', bucket: int = 1, bucket_key: str | List[str] | None = None, changelog_producer: Literal['none', 'input', 'full-compaction', 'lookup'] = 'none', full_compaction_delta_commits: int | None = None, lookup_cache_max_memory_size: str = '256 MB', merge_engine: Literal['deduplicate', 'partial-update', 'aggregation'] = 'deduplicate', partial_update_ignore_delete: bool = False, ignore_delete: bool = False, partition_default_name: str = '__DEFAULT_PARTITION__', partition_expiration_check_interval: str = '1h', partition_expiration_time: str | None = None, partition_timestamp_formatter: str | None = None, partition_timestamp_pattern: str | None = None, bounded_watermark: int | None = None, mode: Literal['default', 'latest-full', 'latest', 'compacted-full', 'from-timestamp', 'from-snapshot'] = 'default', snapshot_id: int | None = None, timestamp_millis: int | None = None, snapshot_num_retained_max: int = 2147483647, snapshot_num_retained_min: int = 10, snapshot_time_retained: str = '1h', infer_parallelism: bool = True, parallelism: int | None = None, extra_options: Dict[str, Any] | None = None) DataFrame[source]#
Read a Paimon table into a DataFrame.
Uses the Paimon SQL connector under the hood. The function creates a temporary connector descriptor, so
schemais required even when the Paimon files already exist atpath.- Parameters:
path – File-system path of the Paimon table.
schema – Dict of
{column_name: DataType}specifying the table schema. This parameter is required.primary_key – Optional primary key column name or list of column names.
columns – Optional list of column names to read. If None, all columns from the schema are read.
auto_create – Whether to create Paimon table files while constructing the temporary table if the target path has no existing Paimon table. Default is False.
file_format – Format of Paimon data files. One of
"orc","parquet","avro", or"lance". Default is"parquet".bucket – Bucket number for the file store.
bucket_key – Column name or list of column names defining the Paimon distribution policy. Data is assigned to each bucket according to the hash value of these fields. When unset, Paimon uses the primary key, or the full row if no primary key exists.
changelog_producer – Whether and how to double-write to changelog files. Changelog files keep data-change details and can be read directly during stream reads.
"none"produces no changelog file,"input"writes input changes when flushing the memory table,"full-compaction"generates changelog files during full compaction, and"lookup"produces changelog by lookup before committing snapshots.full_compaction_delta_commits – Maximum number of committed snapshots between two full compactions.
lookup_cache_max_memory_size – Memory size used by lookup cache and the lookup changelog producer cache. Default is
"256 MB".merge_engine – Merge engine for tables with primary keys.
"deduplicate"de-duplicates and keeps the last row,"partial-update"updates non-null fields, and"aggregation"aggregates rows with the same primary key.partial_update_ignore_delete – Whether to ignore delete records in partial-update mode.
ignore_delete – Whether to ignore delete records.
partition_default_name – Default partition name used when a dynamic partition column value is null or an empty string.
partition_expiration_check_interval – Interval for checking expired partitions.
partition_expiration_time – Partition retention time. When unset, partitions never expire.
partition_timestamp_formatter – Formatter for converting partition time strings to timestamps.
partition_timestamp_pattern – Pattern for extracting a time string from partition values.
bounded_watermark – Stop producing source records when source watermark exceeds this value.
mode – Source scanning behavior.
"default"lets Paimon infer the actual startup mode:timestamp_millisselects"from-timestamp",snapshot_idselects"from-snapshot", and otherwise it behaves as"latest-full"."latest-full"produces the latest snapshot first and then reads changes in streaming mode."latest"reads latest changes without an initial snapshot in streaming mode."compacted-full"starts from the latest compacted snapshot."from-timestamp"and"from-snapshot"start from the configured timestamp or snapshot. The deprecated"full"mode is intentionally not accepted.snapshot_id – Optional snapshot id used by
mode="from-snapshot".timestamp_millis – Optional timestamp in milliseconds used by
mode="from-timestamp".snapshot_num_retained_max – Maximum number of completed snapshots to retain.
snapshot_num_retained_min – Minimum number of completed snapshots to retain.
snapshot_time_retained – Maximum time to retain completed snapshots.
infer_parallelism – Whether to infer scan source parallelism from the bucket count.
parallelism – Custom scan source parallelism. If unset, the planner derives statement parallelism while considering global configuration.
extra_options – Additional connector options forwarded through to the underlying Paimon connector. This is for options not exposed as named parameters of
read_paimon. If a key matches an option generated by a named parameter of read_paimon, the value inextra_optionstakes precedence."connector"is reserved and must not be supplied.
- Returns:
A new DataFrame representing the Paimon source.
Example:
>>> import pyflink.dataframe as pf >>> >>> order_schema = { ... "order_id": pf.DataType.int64(), ... "user_id": pf.DataType.int64(), ... "status": pf.DataType.string(), ... "event_ts": pf.DataType.timestamp(3), ... } >>> >>> # Read the latest snapshot with a primary key and projection >>> orders = pf.read_paimon( ... "oss://bucket/warehouse/db.db/orders", ... schema=order_schema, ... primary_key="order_id", ... columns=["order_id", "status", "event_ts"], ... ) >>> >>> # Stream changes from a specific snapshot >>> changes = pf.read_paimon( ... "oss://bucket/warehouse/db.db/orders", ... schema=order_schema, ... primary_key="order_id", ... changelog_producer="lookup", ... mode="from-snapshot", ... snapshot_id=42, ... ) >>> >>> # Stream from a timestamp in milliseconds with explicit parallelism >>> changes = pf.read_paimon( ... "oss://bucket/warehouse/db.db/orders", ... schema=order_schema, ... primary_key="order_id", ... mode="from-timestamp", ... timestamp_millis=1710000000000, ... bounded_watermark=1710003600000, ... infer_parallelism=False, ... parallelism=4, ... ) >>> >>> # Read a table with bucket distribution >>> events = pf.read_paimon( ... "oss://bucket/warehouse/db.db/events", ... schema={ ... "dt": pf.DataType.string(), ... "user_id": pf.DataType.int64(), ... "payload": pf.DataType.string(), ... }, ... bucket=16, ... bucket_key=["dt", "user_id"], ... extra_options={ ... "snapshot.time-retained": "12h", ... }, ... )