pyflink.dataframe.io.read_odps
- read_odps(endpoint: str, *, schema: Dict[str, DataType] | None = None, columns: List[str] | None = None, tunnel_endpoint: str | None = None, project: str, schema_name: str | None = None, table_name: str, access_id: str, access_key: str, partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | None = None, compress_algorithm: Literal['RAW', 'ZLIB', 'SNAPPY'] = 'SNAPPY', quota_name: str | None = None, max_partition_count: int = 100, use_arrow: bool = False, split_size: str = '256mb', compress_codec: Literal['', 'ZSTD', 'LZ4_FRAME'] = '', dynamic_load_balance: bool = False, start_partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | None = None, subscribe_interval_in_sec: int = 30, modified_table_operation: Literal['NONE', 'SKIP'] = 'NONE', cache: Literal['ALL'] = 'ALL', cache_size: int = 100000, cache_ttl_ms: int = 9223372036854775807, cache_reload_time_blacklist: str | None = None, max_load_retries: int = 10) → DataFrame
Read a MaxCompute (ODPS) table into a DataFrame.
This function uses the ODPS SQL connector under the hood. Connector options are exposed as Pythonic snake_case parameters, listed in the same order as in the ODPS connector documentation.
- Parameters:
endpoint – MaxCompute endpoint.
schema – Dict of {column_name: DataType} specifying the DataFrame schema. This parameter is required, even though its signature default is None.
columns – Optional list of column names to read. If None, all columns from schema are read.
tunnel_endpoint – MaxCompute Tunnel endpoint. If unset, MaxCompute allocates tunnel connections through Server Load Balancer (SLB).
project – MaxCompute project name.
schema_name – Required only when the MaxCompute schema feature is enabled. Set this to the table’s schema name.
table_name – MaxCompute table name.
access_id – AccessKey ID used to access MaxCompute.
access_key – AccessKey secret used to access MaxCompute.
partition –
Partition name in the MaxCompute table. Not required for non-partitioned tables or incremental sources.
Examples:
- To read partition column dt with value 20220901, pass ("dt", "20220901").
- To read source partitions whose dt values start with 202209, pass ("dt", "202209*").
- To read all source partitions for column dt, pass ("dt", "*").
- For a three-level partition table dt/hh/mm, to read dt=20220901,hh=08 and any mm value, pass [("dt", "20220901"), ("hh", "08")] or [("dt", "20220901"), ("hh", "08"), ("mm", "*")].
compress_algorithm – Compression algorithm for MaxCompute Tunnel. Valid values are "RAW" (no compression), "ZLIB", and "SNAPPY". In test scenarios, "SNAPPY" improved throughput by approximately 50% compared to "ZLIB".
quota_name – Quota name for exclusive MaxCompute Tunnel resource groups. If you set this, leave tunnel_endpoint unset; otherwise, the tunnel specified by tunnel_endpoint takes precedence.
max_partition_count – Maximum number of partitions to read. Reading from too many partitions can overload MaxCompute and slow job startup, so increase this only when your workload requires it.
use_arrow – Whether to read data in the Arrow format, which uses the MaxCompute Storage API. Batch deployments only.
split_size – Amount of data pulled per split when using the Arrow format. Batch deployments only.
compress_codec – Compression codec used when reading in the Arrow format. Valid values are "" (no compression), "ZSTD", and "LZ4_FRAME". Specifying a codec improves throughput compared to no compression. Batch deployments only.
dynamic_load_balance – Whether to enable dynamic shard allocation, which can improve processing performance and reduce overall read time. Because different operators may then read different amounts of data, this can cause data skew. Batch deployments only.
start_partition – Start partition for incremental reads. Setting this enables incremental source mode, and partition is then ignored. For multi-level partitioned tables, specify partition column values level by level, from the top-level column down. For example, for a three-level partitioned table with partition key columns dt, hh, and mm, to start from dt=20220901, hh=08, and mm=10, set this parameter to "dt=20220901,hh=08,mm=10" or [("dt", "20220901"), ("hh", "08"), ("mm", "10")]. For a single partition key, you can also pass a tuple such as ("dt", "20220901").
subscribe_interval_in_sec – Polling interval, in seconds, at which the incremental source checks MaxCompute for new partitions.
modified_table_operation – Action to take when a partition is modified while it is being read. With "NONE", you must update start_partition to skip the unavailable partition and restart the deployment without state; with "SKIP", the unavailable partition is skipped automatically when the deployment resumes. In either mode, data already read from the modified partition is retained and unread data is discarded.
cache – Cache policy. Must be set to "ALL": all dimension table data is loaded into the cache before the deployment runs, lookups search only the cache, and the cache is reloaded after entries expire.
cache_size – Maximum number of rows to cache. Large caches consume significant JVM heap memory and slow both startup and cache refresh, so increase this only when your workload requires it.
cache_ttl_ms – Cache time-to-live (TTL), in milliseconds, after which cached entries expire and the cache is reloaded.
cache_reload_time_blacklist – Time periods during which the cache is not refreshed. Set this to cover peak traffic periods so that cache refreshes do not destabilize the deployment.
max_load_retries – Maximum retries for the initial cache load on deployment startup. If retries are exhausted, the deployment fails.
- Returns:
A new DataFrame representing the ODPS source.
Example:
>>> import pyflink.dataframe as pf
>>> df = pf.read_odps(
...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
...     schema={
...         "id": pf.DataType.int64(),
...         "payload": pf.DataType.string(),
...     },
...     project="my_project",
...     table_name="events",
...     access_id="${secret_values.ak_id}",
...     access_key="${secret_values.ak_secret}",
...     partition="ds=20260428",
... )
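The partition examples in the parameter list can be illustrated with a small, self-contained sketch that models the documented matching rules on the client side. It rests on two assumptions not stated on this page: "*" behaves like a shell-style wildcard (approximated here with Python's fnmatch module), and partition levels left out of a filter match any value. The helpers parse_spec and matches_partition are hypothetical; they are not part of pyflink and are not how the connector implements filtering.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Tuple, Union

# Hypothetical alias for the tuple forms accepted by the ``partition`` parameter.
PartitionFilter = Union[Tuple[str, str], List[Tuple[str, str]]]


def parse_spec(spec: str) -> Dict[str, str]:
    """Parse a spec such as 'dt=20220901,hh=08,mm=10' into a column -> value dict."""
    return dict(part.split("=", 1) for part in spec.split(","))


def matches_partition(spec: str, partition_filter: PartitionFilter) -> bool:
    """Illustrative check of whether one partition satisfies a filter.

    Assumptions: '*' is a shell-style wildcard, and any partition level
    omitted from the filter matches every value.
    """
    # A single ("dt", "...") tuple is treated as a one-element list of pairs.
    pairs = [partition_filter] if isinstance(partition_filter, tuple) else partition_filter
    values = parse_spec(spec)
    return all(
        column in values and fnmatchcase(values[column], pattern)
        for column, pattern in pairs
    )


# Hypothetical partitions of a three-level dt/hh/mm table.
partitions = [
    "dt=20220901,hh=08,mm=00",
    "dt=20220901,hh=08,mm=30",
    "dt=20220901,hh=09,mm=00",
    "dt=20221001,hh=08,mm=00",
]

for flt in [("dt", "20220901"), ("dt", "202209*"), ("dt", "*"),
            [("dt", "20220901"), ("hh", "08")]]:
    print(flt, [p for p in partitions if matches_partition(p, flt)])
```

Under these assumptions, [("dt", "20220901"), ("hh", "08")] and the same list with ("mm", "*") appended select the same partitions, which mirrors the equivalence stated for three-level tables.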
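For batch reads with use_arrow=True, split_size sets how much data each split pulls. As a rough capacity-planning aid, the sketch below estimates how many splits a table of a given size might produce. Both of its assumptions are not documented on this page: size suffixes are treated as binary units (so "256mb" is 256 * 1024 * 1024 bytes), and the split count is approximated as table size divided by split_size, rounded up. The connector's real split planning may differ, and parse_size and estimate_splits are hypothetical helpers.

```python
import math
import re

# Assumed binary units for size strings such as '256mb'.
_UNITS = {"b": 1, "kb": 1024, "mb": 1024**2, "gb": 1024**3}


def parse_size(text: str) -> int:
    """Convert a size string like '256mb' to bytes (assumes binary units)."""
    match = re.fullmatch(r"\s*(\d+)\s*([kmg]?b)\s*", text.lower())
    if match is None:
        raise ValueError(f"unrecognized size: {text!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def estimate_splits(table_bytes: int, split_size: str = "256mb") -> int:
    """Rough split count: table size over split size, rounded up, at least 1."""
    return max(1, math.ceil(table_bytes / parse_size(split_size)))


print(estimate_splits(10 * 1024**3))  # → 40 (a 10 GiB table at 256 MiB per split)
```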
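The start_partition description treats "dt=20220901,hh=08,mm=10" and [("dt", "20220901"), ("hh", "08"), ("mm", "10")] as equivalent. The sketch below normalizes any accepted form to the string form, for example for logging which partition an incremental read starts from. It assumes the forms correspond by joining column=value pairs with commas in the order given; to_partition_string is a hypothetical helper, not part of pyflink.

```python
from typing import Any, List, Tuple, Union

# Hypothetical alias for the forms accepted by ``partition`` and ``start_partition``.
PartitionSpec = Union[str, Tuple[str, Any], List[Tuple[str, Any]]]


def to_partition_string(spec: PartitionSpec) -> str:
    """Normalize a partition spec to the 'col=value,col=value' string form.

    Assumes pairs are already listed from the top-level partition column down,
    which is the order the start_partition description asks for.
    """
    if isinstance(spec, str):
        return spec  # already in string form
    # A single (column, value) tuple is treated as a one-level spec.
    pairs = [spec] if isinstance(spec, tuple) else spec
    return ",".join(f"{column}={value}" for column, value in pairs)


print(to_partition_string([("dt", "20220901"), ("hh", "08"), ("mm", "10")]))
# → dt=20220901,hh=08,mm=10
print(to_partition_string(("dt", "20220901")))
# → dt=20220901
```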
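The default cache_ttl_ms, 9223372036854775807, equals 2^63 - 1, the largest signed 64-bit integer (Java's Long.MAX_VALUE). The quick calculation below converts it to years, which suggests that with the default, cached rows in practice never expire by TTL; set a finite value if the dimension table changes. The 10-minute TTL shown is only an illustrative choice.

```python
DEFAULT_CACHE_TTL_MS = 9223372036854775807  # default from the read_odps signature

# The default equals the maximum signed 64-bit integer (Java Long.MAX_VALUE).
assert DEFAULT_CACHE_TTL_MS == 2**63 - 1

# Milliseconds in an average Julian year.
MS_PER_YEAR = 1000 * 60 * 60 * 24 * 365.25
default_ttl_years = DEFAULT_CACHE_TTL_MS / MS_PER_YEAR
print(f"default TTL is about {default_ttl_years:.3g} years")

# Illustrative finite TTL: expire cached rows after 10 minutes.
ten_minutes_ms = 10 * 60 * 1000
print(ten_minutes_ms)  # → 600000
```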
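Several parameter descriptions state rules that involve more than one option. The sketch below gathers a few of them into a pre-flight check you could run on your own arguments before calling read_odps: the valid values of compress_algorithm and compress_codec, the rule that compress_codec applies only to Arrow reads, the quota_name and tunnel_endpoint interaction, and the fact that partition is ignored once start_partition is set. check_read_odps_options is a hypothetical helper, not part of pyflink, and it covers only the rules listed here.

```python
from typing import Any, List, Optional

# Valid values taken from the read_odps signature.
VALID_COMPRESS_ALGORITHMS = ("RAW", "ZLIB", "SNAPPY")
VALID_COMPRESS_CODECS = ("", "ZSTD", "LZ4_FRAME")


def check_read_odps_options(
    *,
    tunnel_endpoint: Optional[str] = None,
    quota_name: Optional[str] = None,
    partition: Any = None,
    start_partition: Any = None,
    compress_algorithm: str = "SNAPPY",
    compress_codec: str = "",
    use_arrow: bool = False,
) -> List[str]:
    """Return human-readable problems; an empty list means no listed rule was violated."""
    problems: List[str] = []
    if compress_algorithm not in VALID_COMPRESS_ALGORITHMS:
        problems.append(f"compress_algorithm must be one of {VALID_COMPRESS_ALGORITHMS}")
    if compress_codec not in VALID_COMPRESS_CODECS:
        problems.append(f"compress_codec must be one of {VALID_COMPRESS_CODECS}")
    if compress_codec and not use_arrow:
        problems.append("compress_codec only applies when use_arrow=True")
    if quota_name and tunnel_endpoint:
        # Per the docs, tunnel_endpoint wins if both are set.
        problems.append("tunnel_endpoint takes precedence over quota_name; leave it unset")
    if start_partition is not None and partition is not None:
        problems.append("partition is ignored because start_partition is set")
    return problems


# Hypothetical values for illustration only.
print(check_read_odps_options(quota_name="my_quota", tunnel_endpoint="http://dt.example.com"))
```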