Skip to main content
Ctrl+K
PyFlink 1.20+vvr.11.7.dev0 documentation - Home PyFlink 1.20+vvr.11.7.dev0 documentation - Home
  • API Reference
  • Examples
  • API Reference
  • Examples

Section Navigation

  • PyFlink Table
  • PyFlink DataStream
  • PyFlink DataFrame
    • DataFrame
    • DataFrame Creation
    • Input/Output
    • SQL
    • DataType
    • User Defined Functions
    • Configuration
    • GPU Support
    • AI / LLM
  • PyFlink Common
  • API Reference
  • PyFlink DataFrame
  • Input/Output
  • pyflink.dataframe.io.read_paimon

pyflink.dataframe.io.read_paimon#

read_paimon(path: str | None = None, *, schema: Dict[str, DataType] | None = None, primary_key: str | List[str] | None = None, columns: List[str] | None = None, auto_create: bool = False, file_format: Literal['orc', 'parquet', 'avro', 'lance'] = 'parquet', bucket: int = 1, bucket_key: str | List[str] | None = None, changelog_producer: Literal['none', 'input', 'full-compaction', 'lookup'] = 'none', full_compaction_delta_commits: int | None = None, lookup_cache_max_memory_size: str = '256 MB', merge_engine: Literal['deduplicate', 'partial-update', 'aggregation'] = 'deduplicate', partial_update_ignore_delete: bool = False, ignore_delete: bool = False, partition_default_name: str = '__DEFAULT_PARTITION__', partition_expiration_check_interval: str = '1h', partition_expiration_time: str | None = None, partition_timestamp_formatter: str | None = None, partition_timestamp_pattern: str | None = None, bounded_watermark: int | None = None, mode: Literal['default', 'latest-full', 'latest', 'compacted-full', 'from-timestamp', 'from-snapshot'] = 'default', snapshot_id: int | None = None, timestamp_millis: int | None = None, snapshot_num_retained_max: int = 2147483647, snapshot_num_retained_min: int = 10, snapshot_time_retained: str = '1h', infer_parallelism: bool = True, parallelism: int | None = None, extra_options: Dict[str, Any] | None = None) → DataFrame[source]#

Read a Paimon table into a DataFrame.

Uses the Paimon SQL connector under the hood. The function creates a temporary connector descriptor, so schema is required even when the Paimon files already exist at path.

Parameters:
  • path – File-system path of the Paimon table.

  • schema – Dict of {column_name: DataType} specifying the table schema. This parameter is required.

  • primary_key – Optional primary key column name or list of column names.

  • columns – Optional list of column names to read. If None, all columns from the schema are read.

  • auto_create – Whether to create Paimon table files while constructing the temporary table if the target path has no existing Paimon table. Default is False.

  • file_format – Format of Paimon data files. One of "orc", "parquet", "avro", or "lance". Default is "parquet".

  • bucket – Bucket number for the file store.

  • bucket_key – Column name or list of column names defining the Paimon distribution policy. Data is assigned to each bucket according to the hash value of these fields. When unset, Paimon uses the primary key, or the full row if no primary key exists.

  • changelog_producer – Whether and how to double-write to changelog files. Changelog files keep data-change details and can be read directly during stream reads. "none" produces no changelog file, "input" writes input changes when flushing the memory table, "full-compaction" generates changelog files during full compaction, and "lookup" produces changelog by lookup before committing snapshots.

  • full_compaction_delta_commits – Maximum number of committed snapshots between two full compactions.

  • lookup_cache_max_memory_size – Memory size used by lookup cache and the lookup changelog producer cache. Default is "256 MB".

  • merge_engine – Merge engine for tables with primary keys. "deduplicate" de-duplicates and keeps the last row, "partial-update" updates non-null fields, and "aggregation" aggregates rows with the same primary key.

  • partial_update_ignore_delete – Whether to ignore delete records in partial-update mode.

  • ignore_delete – Whether to ignore delete records.

  • partition_default_name – Default partition name used when a dynamic partition column value is null or an empty string.

  • partition_expiration_check_interval – Interval for checking expired partitions.

  • partition_expiration_time – Partition retention time. When unset, partitions never expire.

  • partition_timestamp_formatter – Formatter for converting partition time strings to timestamps.

  • partition_timestamp_pattern – Pattern for extracting a time string from partition values.

  • bounded_watermark – Stop producing source records when source watermark exceeds this value.

  • mode – Source scanning behavior. "default" lets Paimon infer the actual startup mode: timestamp_millis selects "from-timestamp", snapshot_id selects "from-snapshot", and otherwise it behaves as "latest-full". "latest-full" produces the latest snapshot first and then reads changes in streaming mode. "latest" reads latest changes without an initial snapshot in streaming mode. "compacted-full" starts from the latest compacted snapshot. "from-timestamp" and "from-snapshot" start from the configured timestamp or snapshot. The deprecated "full" mode is intentionally not accepted.

  • snapshot_id – Optional snapshot id used by mode="from-snapshot".

  • timestamp_millis – Optional timestamp in milliseconds used by mode="from-timestamp".

  • snapshot_num_retained_max – Maximum number of completed snapshots to retain.

  • snapshot_num_retained_min – Minimum number of completed snapshots to retain.

  • snapshot_time_retained – Maximum time to retain completed snapshots.

  • infer_parallelism – Whether to infer scan source parallelism from the bucket count.

  • parallelism – Custom scan source parallelism. If unset, the planner derives statement parallelism while considering global configuration.

  • extra_options – Additional connector options forwarded through to the underlying Paimon connector. This is for options not exposed as named parameters of read_paimon. If a key matches an option generated by a named parameter of read_paimon, the value in extra_options takes precedence. "connector" is reserved and must not be supplied.

Returns:

A new DataFrame representing the Paimon source.

Example:

>>> import pyflink.dataframe as pf
>>>
>>> order_schema = {
...     "order_id": pf.DataType.int64(),
...     "user_id": pf.DataType.int64(),
...     "status": pf.DataType.string(),
...     "event_ts": pf.DataType.timestamp(3),
... }
>>>
>>> # Read the latest snapshot with a primary key and projection
>>> orders = pf.read_paimon(
...     "oss://bucket/warehouse/db.db/orders",
...     schema=order_schema,
...     primary_key="order_id",
...     columns=["order_id", "status", "event_ts"],
... )
>>>
>>> # Stream changes from a specific snapshot
>>> changes = pf.read_paimon(
...     "oss://bucket/warehouse/db.db/orders",
...     schema=order_schema,
...     primary_key="order_id",
...     changelog_producer="lookup",
...     mode="from-snapshot",
...     snapshot_id=42,
... )
>>>
>>> # Stream from a timestamp in milliseconds with explicit parallelism
>>> changes = pf.read_paimon(
...     "oss://bucket/warehouse/db.db/orders",
...     schema=order_schema,
...     primary_key="order_id",
...     mode="from-timestamp",
...     timestamp_millis=1710000000000,
...     bounded_watermark=1710003600000,
...     infer_parallelism=False,
...     parallelism=4,
... )
>>>
>>> # Read a table with bucket distribution
>>> events = pf.read_paimon(
...     "oss://bucket/warehouse/db.db/events",
...     schema={
...         "dt": pf.DataType.string(),
...         "user_id": pf.DataType.int64(),
...         "payload": pf.DataType.string(),
...     },
...     bucket=16,
...     bucket_key=["dt", "user_id"],
...     extra_options={
...         "snapshot.time-retained": "12h",
...     },
... )

previous

pyflink.dataframe.io.read_odps

next

pyflink.dataframe.io.read_sls

On this page
  • read_paimon()

This Page

  • Show Source

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.