PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.io.read_odps

read_odps(endpoint: str, *, schema: Dict[str, DataType] | None = None, columns: List[str] | None = None, tunnel_endpoint: str | None = None, project: str, schema_name: str | None = None, table_name: str, access_id: str, access_key: str, partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | None = None, compress_algorithm: Literal['RAW', 'ZLIB', 'SNAPPY'] = 'SNAPPY', quota_name: str | None = None, max_partition_count: int = 100, use_arrow: bool = False, split_size: str = '256mb', compress_codec: Literal['', 'ZSTD', 'LZ4_FRAME'] = '', dynamic_load_balance: bool = False, start_partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | None = None, subscribe_interval_in_sec: int = 30, modified_table_operation: Literal['NONE', 'SKIP'] = 'NONE', cache: Literal['ALL'] = 'ALL', cache_size: int = 100000, cache_ttl_ms: int = 9223372036854775807, cache_reload_time_blacklist: str | None = None, max_load_retries: int = 10) → DataFrame

Read a MaxCompute (ODPS) table into a DataFrame.

Uses the ODPS SQL connector under the hood. The connector options are exposed with Pythonic snake_case names while preserving the order of the ODPS documentation.

Parameters:
  • endpoint – MaxCompute endpoint.

  • schema – Dict of {column_name: DataType} specifying the DataFrame schema. This parameter is required, even though the signature gives it a default of None.

  • columns – Optional list of column names to read. If None, all columns from schema are read.

  • tunnel_endpoint – MaxCompute Tunnel endpoint. If unset, MaxCompute allocates tunnel connections through Server Load Balancer (SLB).

  • project – MaxCompute project name.

  • schema_name – Required only when the MaxCompute schema feature is enabled. Set this to the table’s schema name.

  • table_name – MaxCompute table name.

  • access_id – AccessKey ID used to access MaxCompute.

  • access_key – AccessKey secret used to access MaxCompute.

  • partition – Partition name in the MaxCompute table. Not required for non-partitioned tables or incremental sources.

    Examples:

    • To read partition column dt with value 20220901, pass ("dt", "20220901").

    • To read source partitions whose dt values start with 202209, pass ("dt", "202209*").

    • To read all source partitions for column dt, pass ("dt", "*").

    • For a three-level partitioned table dt/hh/mm, to read dt=20220901,hh=08 and any mm value, pass [("dt", "20220901"), ("hh", "08")] or [("dt", "20220901"), ("hh", "08"), ("mm", "*")].

  • compress_algorithm – Compression algorithm for MaxCompute Tunnel. Valid values are "RAW" (no compression), "ZLIB", and "SNAPPY". "SNAPPY" improves throughput by approximately 50% compared to "ZLIB" in test scenarios.

  • quota_name – Quota name for exclusive MaxCompute Tunnel resource groups. If specified, remove tunnel_endpoint; otherwise the tunnel specified by tunnel_endpoint takes precedence.

  • max_partition_count – Maximum number of partitions to read from. Reading from too many partitions can overload MaxCompute and slow job startup. Increase this only when your workload requires it.

  • use_arrow – Read data using the Arrow format, which calls the MaxCompute storage API. Batch deployments only.

  • split_size – Amount of data pulled per split when using the Arrow format. Batch deployments only.

  • compress_codec – Compression algorithm when reading with the Arrow format. Valid values are "" (none), "ZSTD", and "LZ4_FRAME". Specifying a codec improves throughput over no compression. Batch deployments only.

  • dynamic_load_balance – Enable dynamic shard allocation to improve processing performance and reduce overall read time. This may cause data skew because different operators read inconsistent amounts of data. Batch deployments only.

  • start_partition – Start partition for incremental reads. Setting this enables incremental source mode. When specified, partition is ignored. For multi-level partitioned tables, configure partition column values in descending order by level. For example, for a three-level partitioned table with partition key columns dt, hh, and mm, to start from dt=20220901, hh=08, and mm=10, set this parameter to "dt=20220901,hh=08,mm=10" or [("dt", "20220901"), ("hh", "08"), ("mm", "10")]. For a single partition key, you can also use ("dt", "20220901").

  • subscribe_interval_in_sec – Interval, in seconds, at which MaxCompute is polled for the partition list during incremental reads.

  • modified_table_operation – Action when a partition is modified during reading. "NONE" requires updating start_partition to skip the unavailable partition and restarting without state; "SKIP" automatically skips the unavailable partition when resuming. In either mode, data already read from the modified partition is retained and unread data is discarded.

  • cache – Cache policy. Must be set to "ALL". All dimension table data is loaded into cache before the deployment runs, lookups search the cache only, and the cache reloads after entries expire.

  • cache_size – Maximum rows to cache. Large caches consume significant JVM heap memory and slow startup and cache refresh. Increase this only when your workload requires it.

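  • cache_ttl_ms – Cache timeout in milliseconds. The default, 9223372036854775807, is the largest signed 64-bit integer, so cached entries effectively never expire unless you set a smaller value.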

  • cache_reload_time_blacklist – Time periods during which the cache is not refreshed. Use this during peak traffic periods to prevent deployment instability from cache refreshes.

  • max_load_retries – Maximum retries for the initial cache load on deployment startup. If retries are exhausted, the deployment fails.
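
The value forms described for partition and start_partition can be written out as plain Python values. The following is a minimal sketch, not part of the PyFlink API: to_partition_string is a hypothetical helper introduced here only to show how a list of (column, value) tuples corresponds to the "dt=20220901,hh=08,mm=10" string form mentioned for start_partition.

```python
from typing import Any, List, Tuple, Union

# Type alias mirroring the annotation of `partition` / `start_partition`.
PartitionSpec = Union[str, Tuple[str, Any], List[Tuple[str, Any]]]

# Forms taken from the parameter descriptions above.
single_partition = ("dt", "20220901")             # one exact partition
prefix_match = ("dt", "202209*")                  # dt values starting with 202209
all_partitions = ("dt", "*")                      # every dt partition
multi_level = [("dt", "20220901"), ("hh", "08")]  # any mm value under dt/hh


def to_partition_string(spec: PartitionSpec) -> str:
    """Hypothetical helper: render a spec as 'col=value,col=value'."""
    if isinstance(spec, str):
        return spec  # already in string form
    if isinstance(spec, tuple):
        spec = [spec]  # a single (column, value) pair
    return ",".join(f"{column}={value}" for column, value in spec)


# Levels are listed from the outermost partition column to the innermost.
start_partition = to_partition_string(
    [("dt", "20220901"), ("hh", "08"), ("mm", "10")]
)
print(start_partition)  # dt=20220901,hh=08,mm=10
```

Either form can be passed to read_odps directly; the helper only makes the equivalence between the two notations explicit.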

Returns:

A new DataFrame representing the ODPS source.
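
Some of the parameters above interact: tunnel_endpoint takes precedence over quota_name, partition is ignored once start_partition is set, and compress_codec applies only when reading with the Arrow format. A pre-flight check along these lines may help catch conflicting settings before a job is submitted. This is a sketch under those stated rules only; check_odps_options is a hypothetical function, not something PyFlink provides, and read_odps itself may or may not perform similar validation.

```python
from typing import Any, Dict, List


def check_odps_options(options: Dict[str, Any]) -> List[str]:
    """Hypothetical pre-flight check; returns a list of warning messages."""
    warnings: List[str] = []

    # quota_name only takes effect when tunnel_endpoint is not set.
    if options.get("quota_name") and options.get("tunnel_endpoint"):
        warnings.append(
            "tunnel_endpoint takes precedence; remove it to use quota_name"
        )

    # start_partition switches to incremental mode and overrides partition.
    if (
        options.get("start_partition") is not None
        and options.get("partition") is not None
    ):
        warnings.append("partition is ignored when start_partition is set")

    # compress_codec is described for Arrow-format reads only.
    if options.get("compress_codec") and not options.get("use_arrow"):
        warnings.append("compress_codec applies only when use_arrow=True")

    return warnings


for message in check_odps_options(
    {"quota_name": "my_quota", "tunnel_endpoint": "http://example.invalid"}
):
    print(message)
```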

Example:

>>> import pyflink.dataframe as pf
>>> df = pf.read_odps(
...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
...     schema={
...         "id": pf.DataType.int64(),
...         "payload": pf.DataType.string(),
...     },
...     project="my_project",
...     table_name="events",
...     access_id="${secret_values.ak_id}",
...     access_key="${secret_values.ak_secret}",
...     partition="ds=20260428",
... )
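
An incremental read differs from the example above mainly in that start_partition replaces partition. The sketch below collects the keyword arguments for such a call without importing PyFlink, so it can be inspected on its own. incremental_read_kwargs is a hypothetical helper, and the project, table, and 60-second polling interval are illustrative values rather than recommendations.

```python
from typing import Any, Dict


def incremental_read_kwargs(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: keyword arguments for an incremental read_odps call."""
    return {
        "schema": schema,
        "project": "my_project",
        "table_name": "events",
        "access_id": "${secret_values.ak_id}",
        "access_key": "${secret_values.ak_secret}",
        # Setting start_partition enables incremental source mode;
        # `partition` is intentionally omitted because it would be ignored.
        "start_partition": [("dt", "20220901"), ("hh", "08"), ("mm", "10")],
        # Illustrative override of the default 30-second polling interval.
        "subscribe_interval_in_sec": 60,
        # Skip a partition that was modified while it was being read.
        "modified_table_operation": "SKIP",
    }


# Usage, assuming pyflink.dataframe is available in the runtime:
#   import pyflink.dataframe as pf
#   df = pf.read_odps(
#       "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
#       **incremental_read_kwargs(
#           {"id": pf.DataType.int64(), "payload": pf.DataType.string()}
#       ),
#   )
```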

Created using Sphinx 7.4.7.

Built with the PyData Sphinx Theme 0.16.1.