PyFlink 1.20+vvr.11.7.dev0 documentation

pyflink.dataframe.dataframe.DataFrame.write_odps

DataFrame.write_odps(
    endpoint: str,
    *,
    tunnel_endpoint: str | None = None,
    project: str,
    schema_name: str | None = None,
    table_name: str,
    access_id: str,
    access_key: str,
    partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | List[str] | None = None,
    compress_algorithm: Literal['RAW', 'ZLIB', 'SNAPPY'] = 'SNAPPY',
    quota_name: str | None = None,
    use_stream_tunnel: bool = False,
    flush_interval_ms: int = 30000,
    batch_size: int = 67108864,
    num_flush_threads: int = 1,
    slot_num: int = 0,
    dynamic_partition_limit: int = 100,
    retry_times: int = 3,
    sleep_millis: int = 1000,
    enable_upsert: bool = False,
    upsert_async_commit: bool = False,
    upsert_commit_timeout_ms: int = 120000,
    operation: Literal['insert', 'upsert'] = 'insert',
    parallelism: int | None = None,
    file_cached_enable: bool = False,
    file_cached_writer_num: int = 16,
    bucket_check_interval: int = 60000,
    file_cached_rolling_max_size: str = '16mb',
    file_cached_memory: str = '64mb',
    file_cached_memory_segment_size: str = '128kb',
    file_cached_flush_always: bool = True,
    file_cached_write_max_retries: int = 3,
    upsert_writer_max_retries: int = 3,
    upsert_writer_buffer_size: str = '64mb',
    upsert_writer_bucket_buffer_size: str = '1mb',
    upsert_write_bucket_num: int | None = None,
    upsert_write_slot_num: int = 1,
    upsert_commit_max_retries: int = 3,
    upsert_commit_thread_num: int = 16,
    upsert_commit_timeout: int = 600,
    upsert_flush_concurrent: int = 2,
    insert_commit_thread_num: int = 16,
    insert_arrow_writer_enable: bool = False,
    insert_arrow_writer_batch_size: int = 512,
    insert_arrow_writer_flush_interval: int = 100000,
    insert_writer_buffer_size: str = '64mb',
    upsert_partial_column_enable: bool = False,
) → None

Write the DataFrame to a MaxCompute (ODPS) table.

Uses the ODPS SQL connector under the hood. The DataFrame schema is derived from the current DataFrame, and connector options are exposed with Pythonic snake_case names in the same order as the ODPS documentation.

Parameters:
  • endpoint – MaxCompute endpoint.

  • tunnel_endpoint – MaxCompute Tunnel endpoint. If unset, MaxCompute allocates tunnel connections through Server Load Balancer (SLB).

  • project – MaxCompute project name.

  • schema_name – Required only when the MaxCompute schema feature is enabled. Set this to the table’s schema name.

  • table_name – MaxCompute table name.

  • access_id – AccessKey ID used to access MaxCompute.

  • access_key – AccessKey secret used to access MaxCompute.

  • partition –

    Partition name in the MaxCompute table.

    Examples:

    • To write to fixed partition column dt with value 20220901, pass ("dt", "20220901").

    • For a three-level partition table dt/hh/mm, to write to dt=20220901,hh=08,mm=10, pass [("dt", "20220901"), ("hh", "08"), ("mm", "10")].

    • To write dynamic partitions from a single partition column partition_name, pass "partition_name".

    • For multi-level dynamic partition columns dt/hh/mm, pass ["dt", "hh", "mm"].

  • compress_algorithm – Compression algorithm for MaxCompute Tunnel. Valid values are "RAW" (no compression), "ZLIB", and "SNAPPY". "RAW" cannot be used when enable_upsert is true.

  • quota_name – Quota name for exclusive MaxCompute Tunnel resource groups. If you set quota_name, leave tunnel_endpoint unset; otherwise the tunnel specified by tunnel_endpoint takes precedence.

  • use_stream_tunnel – Use MaxCompute Streaming Tunnel instead of Batch Tunnel. True selects Streaming Tunnel and False selects Batch Tunnel.

  • flush_interval_ms – Flush interval for the tunnel writer buffer, in milliseconds. A flush is triggered when either flush_interval_ms elapses or the buffer reaches batch_size. For Streaming Tunnel, flushed data is immediately available. For Batch Tunnel, data becomes available only after checkpointing; set this to 0 to disable scheduled flushing.

  • batch_size – Buffer size in bytes. The buffer is flushed when it reaches this size or when flush_interval_ms elapses, whichever comes first.

  • num_flush_threads – Number of threads used to flush the tunnel writer buffer. Values greater than 1 allow concurrent flushing across partitions.

  • slot_num – Number of Tunnel slots for receiving data from Flink.

  • dynamic_partition_limit – Maximum number of dynamic partitions written between two checkpoints. Writing to many partitions increases load on MaxCompute and slows checkpointing. Increase this only when your workload requires it.

  • retry_times – Maximum retries for MaxCompute server requests, including session creation, submission, and flush failures.

  • sleep_millis – Retry interval in milliseconds.

  • enable_upsert – Use MaxCompute Upsert Tunnel. True processes INSERT, UPDATE_AFTER, and DELETE records. False uses the tunnel specified by use_stream_tunnel. If session commits in upsert mode encounter errors or long-running faults, set sink operator parallelism to 10 or fewer.

  • upsert_async_commit – Use asynchronous mode when committing upsert sessions. Async mode reduces commit time, but committed data is not immediately queryable.

  • upsert_commit_timeout_ms – Timeout for upsert session commits, in milliseconds.

  • operation – Write mode for a Delta table. "insert" is append mode; "upsert" is update mode and requires upsert_write_bucket_num.

  • parallelism – Write parallelism for a Delta table. Defaults to the upstream parallelism. For optimal write performance and memory efficiency, the table's write.bucket.num must be an integer multiple of this value.

  • file_cached_enable – Enable file cache mode when writing to dynamic partitions of a Delta table. This reduces small files written to the server but increases write latency. Enable when the sink has high parallelism.

  • file_cached_writer_num – Concurrent upload threads per task in file cache mode. Avoid setting this too high because writing to many partitions simultaneously can cause OOM errors. Effective only when file_cached_enable is true.

  • bucket_check_interval – File size check interval in file cache mode, in milliseconds. Effective only when file_cached_enable is true.

  • file_cached_rolling_max_size – Maximum size of a single cached file. When exceeded, data is uploaded to the server. Effective only when file_cached_enable is true.

  • file_cached_memory – Maximum off-heap memory for file writes in file cache mode. Effective only when file_cached_enable is true.

  • file_cached_memory_segment_size – Buffer segment size for file writes in file cache mode. Effective only when file_cached_enable is true.

  • file_cached_flush_always – Whether to use the cache when writing files in file cache mode. Effective only when file_cached_enable is true.

  • file_cached_write_max_retries – Retry count for data uploads in file cache mode. Effective only when file_cached_enable is true.

  • upsert_writer_max_retries – Maximum retries for writing to a bucket in an Upsert Writer session.

  • upsert_writer_buffer_size – Total buffer size across all buckets in an Upsert Writer session. Data is flushed when the total reaches this threshold. Increase for better write efficiency; decrease if writing to many partitions causes OOM errors.

  • upsert_writer_bucket_buffer_size – Per-bucket buffer size in an Upsert Writer session. Decrease if Flink server memory is insufficient.

  • upsert_write_bucket_num – Number of buckets for the target Delta table. Must match write.bucket.num configured on the Delta table.

  • upsert_write_slot_num – Tunnel slots per upsert session.

  • upsert_commit_max_retries – Maximum retries for upsert session commits.

  • upsert_commit_thread_num – Parallelism for upsert session commits. Avoid large values because excessive concurrent commits increase resource consumption and can cause performance issues.

  • upsert_commit_timeout – Upsert session commit timeout in seconds.

  • upsert_flush_concurrent – Maximum concurrent bucket flushes per partition. Each bucket flush occupies a Tunnel slot.

  • insert_commit_thread_num – Parallelism for insert session commits.

  • insert_arrow_writer_enable – Use the Arrow format for inserts.

  • insert_arrow_writer_batch_size – Maximum rows per Arrow-format batch.

  • insert_arrow_writer_flush_interval – Writer flush interval in milliseconds.

  • insert_writer_buffer_size – Cache size for the buffered writer.

  • upsert_partial_column_enable – Update only specified columns (partial column update). Applies only to Delta table sinks. When true, if a record with the same primary key exists, specified non-null fields are overwritten. If no matching record exists, a new record is inserted with new values for specified columns and null for all unspecified columns.
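
Several of the parameters above constrain one another: "RAW" compression is not allowed with enable_upsert, operation="upsert" requires upsert_write_bucket_num, tunnel_endpoint takes precedence over quota_name, and the bucket count should be an integer multiple of parallelism. The following is a minimal sketch of a pre-flight check that restates those rules. The helper name check_write_odps_options is hypothetical and is not part of PyFlink; it assumes that upsert_write_bucket_num mirrors the table's write.bucket.num, as the parameter description states.

```python
from typing import Any, List, Optional


def check_write_odps_options(
    *,
    compress_algorithm: str = "SNAPPY",
    enable_upsert: bool = False,
    operation: str = "insert",
    upsert_write_bucket_num: Optional[int] = None,
    parallelism: Optional[int] = None,
    quota_name: Optional[str] = None,
    tunnel_endpoint: Optional[str] = None,
    **_other: Any,  # remaining write_odps keywords are accepted and ignored
) -> List[str]:
    """Return a list of problems found in a set of write_odps keywords.

    Hypothetical helper: it only restates rules from the parameter list
    and does not contact MaxCompute or call PyFlink.
    """
    problems: List[str] = []

    # "RAW" cannot be used when enable_upsert is true.
    if enable_upsert and compress_algorithm == "RAW":
        problems.append('compress_algorithm="RAW" cannot be used with enable_upsert=True')

    # Update mode on a Delta table needs the bucket count.
    if operation == "upsert" and upsert_write_bucket_num is None:
        problems.append('operation="upsert" requires upsert_write_bucket_num')

    # tunnel_endpoint wins over quota_name when both are given.
    if quota_name is not None and tunnel_endpoint is not None:
        problems.append("tunnel_endpoint takes precedence over quota_name; leave tunnel_endpoint unset")

    # The bucket count should be an integer multiple of the write parallelism.
    if (
        upsert_write_bucket_num is not None
        and parallelism
        and upsert_write_bucket_num % parallelism != 0
    ):
        problems.append("upsert_write_bucket_num should be an integer multiple of parallelism")

    return problems
```

A caller could build the keyword dictionary once, run it through the check, and pass the same dictionary to write_odps with `**options` only if the returned list is empty.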

Example:

>>> import pyflink.dataframe as pf
>>> df = pf.from_records([(1, "a"), (2, "b")],
...                      schema=["id", "name"])
>>> df.write_odps(
...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
...     project="my_project",
...     table_name="events",
...     access_id="${secret_values.ak_id}",
...     access_key="${secret_values.ak_secret}",
...     partition="ds=20260428",
... )
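
As a further illustration, the sketch below upserts into a Delta table using multi-level dynamic partitions. It uses only keyword parameters from the signature above, but the concrete values are assumptions: the table name events_delta, the environment variable names ODPS_ACCESS_ID and ODPS_ACCESS_KEY, the bucket count of 16, and a DataFrame that contains dt and hh columns. Treat it as a starting point rather than a verified configuration.

```python
import os
from typing import Any


def write_events_upsert(df: Any) -> None:
    """Upsert `df` into a Delta table, partitioned dynamically by dt/hh.

    `df` is expected to be a PyFlink DataFrame exposing write_odps.
    """
    df.write_odps(
        "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
        project="my_project",
        table_name="events_delta",                 # hypothetical Delta table
        access_id=os.environ["ODPS_ACCESS_ID"],    # assumed variable name
        access_key=os.environ["ODPS_ACCESS_KEY"],  # assumed variable name
        partition=["dt", "hh"],                    # multi-level dynamic partition columns
        operation="upsert",                        # update mode for a Delta table
        upsert_write_bucket_num=16,                # must match the table's write.bucket.num
        parallelism=4,                             # 16 is an integer multiple of 4
    )
```

Reading the AccessKey pair from the environment keeps secrets out of the source file; the `${secret_values...}` placeholders in the example above serve the same purpose where that substitution is available.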
