pyflink.dataframe.dataframe.DataFrame.write_odps
- DataFrame.write_odps(endpoint: str, *, tunnel_endpoint: str | None = None, project: str, schema_name: str | None = None, table_name: str, access_id: str, access_key: str, partition: str | Tuple[str, Any] | List[Tuple[str, Any]] | List[str] | None = None, compress_algorithm: Literal['RAW', 'ZLIB', 'SNAPPY'] = 'SNAPPY', quota_name: str | None = None, use_stream_tunnel: bool = False, flush_interval_ms: int = 30000, batch_size: int = 67108864, num_flush_threads: int = 1, slot_num: int = 0, dynamic_partition_limit: int = 100, retry_times: int = 3, sleep_millis: int = 1000, enable_upsert: bool = False, upsert_async_commit: bool = False, upsert_commit_timeout_ms: int = 120000, operation: Literal['insert', 'upsert'] = 'insert', parallelism: int | None = None, file_cached_enable: bool = False, file_cached_writer_num: int = 16, bucket_check_interval: int = 60000, file_cached_rolling_max_size: str = '16mb', file_cached_memory: str = '64mb', file_cached_memory_segment_size: str = '128kb', file_cached_flush_always: bool = True, file_cached_write_max_retries: int = 3, upsert_writer_max_retries: int = 3, upsert_writer_buffer_size: str = '64mb', upsert_writer_bucket_buffer_size: str = '1mb', upsert_write_bucket_num: int | None = None, upsert_write_slot_num: int = 1, upsert_commit_max_retries: int = 3, upsert_commit_thread_num: int = 16, upsert_commit_timeout: int = 600, upsert_flush_concurrent: int = 2, insert_commit_thread_num: int = 16, insert_arrow_writer_enable: bool = False, insert_arrow_writer_batch_size: int = 512, insert_arrow_writer_flush_interval: int = 100000, insert_writer_buffer_size: str = '64mb', upsert_partial_column_enable: bool = False) -> None
Write the DataFrame to a MaxCompute (ODPS) table.
Uses the ODPS SQL connector under the hood. The sink schema is derived from the current DataFrame, and connector options are exposed with Pythonic snake_case names in the same order as the ODPS documentation.
- Parameters:
endpoint – MaxCompute endpoint.
tunnel_endpoint – MaxCompute Tunnel endpoint. If unset, MaxCompute allocates tunnel connections through Server Load Balancer (SLB).
project – MaxCompute project name.
schema_name – Required only when the MaxCompute schema feature is enabled. Set this to the table’s schema name.
table_name – MaxCompute table name.
access_id – AccessKey ID used to access MaxCompute.
access_key – AccessKey secret used to access MaxCompute.
partition –
Partition name in the MaxCompute table.
Examples:
- To write to fixed partition column dt with value 20220901, pass ("dt", "20220901").
- For a three-level partition table dt/hh/mm, to write to dt=20220901,hh=08,mm=10, pass [("dt", "20220901"), ("hh", "08"), ("mm", "10")].
- To write dynamic partitions from a single partition column partition_name, pass "partition_name".
- For multi-level dynamic partition columns dt/hh/mm, pass ["dt", "hh", "mm"].
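The four accepted shapes of partition can be told apart by type alone. The sketch below is a minimal illustration of that classification, assuming a hypothetical helper named describe_partition that is not part of pyflink. How the library itself interprets the argument is not shown in this reference. A plain string is treated as a dynamic partition column name, and "name=value" strings are not parsed.

```python
from typing import Any, Dict, List, Tuple, Union

# Mirrors the documented type of the `partition` argument.
PartitionArg = Union[str, Tuple[str, Any], List[Tuple[str, Any]], List[str], None]


def describe_partition(partition: PartitionArg) -> Dict[str, Any]:
    """Hypothetical helper (not a pyflink API): classify a `partition` value.

    Returns a dict with the fixed partition values ("static") and the
    dynamic partition column names ("dynamic").
    """
    if partition is None:
        return {"static": {}, "dynamic": []}
    if isinstance(partition, str):
        # A single column name: dynamic partitioning on that column.
        return {"static": {}, "dynamic": [partition]}
    if isinstance(partition, tuple):
        # One (column, value) pair: a single fixed partition.
        name, value = partition
        return {"static": {name: value}, "dynamic": []}
    if isinstance(partition, list) and partition:
        if all(isinstance(p, str) for p in partition):
            # Several column names: multi-level dynamic partitioning.
            return {"static": {}, "dynamic": list(partition)}
        if all(isinstance(p, tuple) and len(p) == 2 for p in partition):
            # Several (column, value) pairs: a multi-level fixed partition.
            return {"static": dict(partition), "dynamic": []}
    raise TypeError(f"unsupported partition argument: {partition!r}")


if __name__ == "__main__":
    print(describe_partition(("dt", "20220901")))
    print(describe_partition(["dt", "hh", "mm"]))
```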
compress_algorithm – Compression algorithm for MaxCompute Tunnel. Valid values are "RAW" (no compression), "ZLIB", and "SNAPPY". "RAW" cannot be used when enable_upsert is true.
quota_name – Quota name for exclusive MaxCompute Tunnel resource groups. If specified, remove tunnel_endpoint; otherwise the tunnel specified by tunnel_endpoint takes precedence.
use_stream_tunnel – Use MaxCompute Streaming Tunnel instead of Batch Tunnel. True selects Streaming Tunnel and False selects Batch Tunnel.
flush_interval_ms – Flush interval for the tunnel writer buffer, in milliseconds. For Streaming Tunnel, flushed data is immediately available. For Batch Tunnel, data becomes available only after checkpointing; set to 0 to disable scheduled flushing. A flush is triggered when either flush_interval_ms or batch_size is reached.
batch_size – Buffer size in bytes. A flush is triggered when either batch_size or flush_interval_ms is reached.
num_flush_threads – Number of threads used to flush the tunnel writer buffer. Values greater than 1 allow concurrent flushing across partitions.
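The either-or flush rule shared by flush_interval_ms and batch_size can be sketched as a small predicate. This is an illustration of the documented behavior only, assuming a hypothetical function named should_flush; it is not the connector's implementation, and it treats flush_interval_ms=0 as "no scheduled flushing".

```python
def should_flush(
    buffered_bytes: int,
    ms_since_last_flush: int,
    *,
    batch_size: int = 67108864,      # documented default: 64 MiB
    flush_interval_ms: int = 30000,  # documented default: 30 s
) -> bool:
    """Hypothetical predicate (not a pyflink API) for the documented rule:
    flush when the buffer is full OR the interval has elapsed."""
    if buffered_bytes >= batch_size:
        return True
    # An interval of 0 disables the time-based trigger in this sketch.
    if flush_interval_ms > 0 and ms_since_last_flush >= flush_interval_ms:
        return True
    return False


if __name__ == "__main__":
    # Small buffer, but the interval has elapsed.
    print(should_flush(1024, 45000))
    # Interval disabled: only the size threshold can trigger a flush.
    print(should_flush(1024, 45000, flush_interval_ms=0))
```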
slot_num – Number of Tunnel slots for receiving data from Flink.
dynamic_partition_limit – Maximum number of dynamic partitions written between two checkpoints. Writing to many partitions increases load on MaxCompute and slows checkpointing. Increase this only when your workload requires it.
retry_times – Maximum retries for MaxCompute server requests, including session creation, submission, and flush failures.
sleep_millis – Retry interval in milliseconds.
enable_upsert – Use MaxCompute Upsert Tunnel. True processes INSERT, UPDATE_AFTER, and DELETE records. False uses the tunnel specified by use_stream_tunnel. If session commits in upsert mode encounter errors or long-running faults, set sink operator parallelism to 10 or fewer.
upsert_async_commit – Use asynchronous mode when committing upsert sessions. Async mode reduces commit time, but committed data is not immediately queryable.
upsert_commit_timeout_ms – Timeout for upsert session commits, in milliseconds.
operation – Write mode for a Delta table. "insert" is append mode; "upsert" is update mode and requires upsert_write_bucket_num.
parallelism – Write parallelism for a Delta table. Defaults to upstream parallelism. write.bucket.num must be an integral multiple of this value for optimal write performance and memory efficiency.
file_cached_enable – Enable file cache mode when writing to dynamic partitions of a Delta table. This reduces small files written to the server but increases write latency. Enable when the sink has high parallelism.
file_cached_writer_num – Concurrent upload threads per task in file cache mode. Avoid setting this too high because writing to many partitions simultaneously can cause OOM errors. Effective only when file_cached_enable is true.
bucket_check_interval – File size check interval in file cache mode, in milliseconds. Effective only when file_cached_enable is true.
file_cached_rolling_max_size – Maximum size of a single cached file. When exceeded, data is uploaded to the server. Effective only when file_cached_enable is true.
file_cached_memory – Maximum off-heap memory for file writes in file cache mode. Effective only when file_cached_enable is true.
file_cached_memory_segment_size – Buffer segment size for file writes in file cache mode. Effective only when file_cached_enable is true.
file_cached_flush_always – Whether to use the cache when writing files in file cache mode. Effective only when file_cached_enable is true.
file_cached_write_max_retries – Retry count for data uploads in file cache mode. Effective only when file_cached_enable is true.
upsert_writer_max_retries – Maximum retries for writing to a bucket in an Upsert Writer session.
upsert_writer_buffer_size – Total buffer size across all buckets in an Upsert Writer session. Data is flushed when the total reaches this threshold. Increase for better write efficiency; decrease if writing to many partitions causes OOM errors.
upsert_writer_bucket_buffer_size – Per-bucket buffer size in an Upsert Writer session. Decrease if Flink server memory is insufficient.
upsert_write_bucket_num – Number of buckets for the target Delta table. Must match write.bucket.num configured on the Delta table.
upsert_write_slot_num – Tunnel slots per upsert session.
upsert_commit_max_retries – Maximum retries for upsert session commits.
upsert_commit_thread_num – Parallelism for upsert session commits. Avoid large values because excessive concurrent commits increase resource consumption and can cause performance issues.
upsert_commit_timeout – Upsert session commit timeout in seconds.
upsert_flush_concurrent – Maximum concurrent bucket flushes per partition. Each bucket flush occupies a Tunnel slot.
insert_commit_thread_num – Parallelism for insert session commits.
insert_arrow_writer_enable – Use the Arrow format for inserts.
insert_arrow_writer_batch_size – Maximum rows per Arrow-format batch.
insert_arrow_writer_flush_interval – Writer flush interval in milliseconds.
insert_writer_buffer_size – Cache size for the buffered writer.
upsert_partial_column_enable – Update only specified columns (partial column update). Applies only to Delta table sinks. When true, if a record with the same primary key exists, specified non-null fields are overwritten. If no matching record exists, a new record is inserted with new values for specified columns and null for all unspecified columns.
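The partial column update semantics can be illustrated with an in-memory model. The sketch below is one reading of the description above, assuming a hypothetical function named apply_partial_upsert and a dict keyed by primary key standing in for the Delta table. It is not the connector's implementation.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[Any]]


def apply_partial_upsert(
    table: Dict[Any, Row],
    all_columns: List[str],
    key_column: str,
    record: Row,
) -> None:
    """Hypothetical model (not a pyflink API) of a partial column update.

    `table` maps primary-key values to full rows; `record` carries only
    the columns being written.
    """
    key = record[key_column]
    existing = table.get(key)
    if existing is None:
        # No matching key: insert, filling unspecified columns with None.
        table[key] = {col: record.get(col) for col in all_columns}
        return
    # Matching key: overwrite only the specified, non-null fields.
    for col, value in record.items():
        if value is not None:
            existing[col] = value


if __name__ == "__main__":
    columns = ["id", "name", "city"]
    table = {1: {"id": 1, "name": "a", "city": "x"}}
    apply_partial_upsert(table, columns, "id", {"id": 1, "name": "b"})
    apply_partial_upsert(table, columns, "id", {"id": 2, "city": "y"})
    print(table)
```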
Example:
>>> import pyflink.dataframe as pf
>>> df = pf.from_records([(1, "a"), (2, "b")],
...                      schema=["id", "name"])
>>> df.write_odps(
...     "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
...     project="my_project",
...     table_name="events",
...     access_id="${secret_values.ak_id}",
...     access_key="${secret_values.ak_secret}",
...     partition="ds=20260428",
... )
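Several parameters constrain one another, so it can help to check a set of options before calling write_odps. The sketch below collects the cross-parameter rules stated in the parameter descriptions. The function check_odps_options is hypothetical and not part of pyflink, and whether the library performs similar validation itself is not documented here.

```python
from typing import List, Optional


def check_odps_options(
    *,
    compress_algorithm: str = "SNAPPY",
    enable_upsert: bool = False,
    operation: str = "insert",
    upsert_write_bucket_num: Optional[int] = None,
    parallelism: Optional[int] = None,
    quota_name: Optional[str] = None,
    tunnel_endpoint: Optional[str] = None,
) -> List[str]:
    """Hypothetical pre-flight check (not a pyflink API).

    Returns a list of human-readable problems; an empty list means none
    of the documented cross-parameter rules is violated.
    """
    problems: List[str] = []
    if compress_algorithm not in ("RAW", "ZLIB", "SNAPPY"):
        problems.append("compress_algorithm must be 'RAW', 'ZLIB', or 'SNAPPY'")
    if enable_upsert and compress_algorithm == "RAW":
        problems.append("'RAW' cannot be used when enable_upsert is true")
    if operation not in ("insert", "upsert"):
        problems.append("operation must be 'insert' or 'upsert'")
    if operation == "upsert" and upsert_write_bucket_num is None:
        problems.append("operation='upsert' requires upsert_write_bucket_num")
    if parallelism and upsert_write_bucket_num:
        # Documented as a performance recommendation, not a hard error.
        if upsert_write_bucket_num % parallelism != 0:
            problems.append("bucket number should be a multiple of parallelism")
    if quota_name and tunnel_endpoint:
        problems.append("remove tunnel_endpoint when quota_name is set")
    return problems


if __name__ == "__main__":
    for problem in check_odps_options(
        compress_algorithm="RAW", enable_upsert=True, operation="upsert"
    ):
        print(problem)
```

Calling it with the defaults returns an empty list. The call in the `__main__` block reports two problems: the "RAW" and enable_upsert conflict, and the missing upsert_write_bucket_num.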