pyflink.dataframe.dataframe.DataFrame.write_parquet
- DataFrame.write_parquet(path: str, *, mode: str = 'overwrite', compression: str = 'SNAPPY', block_size: int = 134217728, page_size: int = 1048576, dictionary_page_size: int = 1048576, enable_dictionary: bool = True, max_padding_size: int = 8388608, validation: bool = False, writer_version: str = 'v1', utc_timezone: bool = False, timestamp_time_unit: str = 'micros', write_int64_timestamp: bool = False, rolling_policy_file_size: str = '128mb', rolling_policy_rollover_interval: str = '30min', rolling_policy_inactivity_interval: str = '30min', rolling_policy_check_interval: str = '1min', partition_commit_trigger: str = 'process-time', partition_commit_delay: str = '0s', partition_commit_watermark_time_zone: str = 'UTC', partition_commit_policy_kind: str | None = None, partition_commit_policy_class: str | None = None, partition_commit_policy_class_parameters: str | None = None, partition_commit_success_file_name: str = '_SUCCESS', partition_time_extractor_kind: str = 'default', partition_time_extractor_class: str | None = None, partition_time_extractor_timestamp_formatter: str | None = None, partition_time_extractor_timestamp_pattern: str | None = None, shuffle_by_partition_enable: bool = False, auto_compaction: bool = False, compaction_file_size: str | None = None, compaction_parallelism: int | None = None, sink_parallelism: int | None = None, partition_default_name: str = '__DEFAULT_PARTITION__') -> None
Write the DataFrame to Parquet file(s) at the given path.
Uses Flink’s FileSystem Connector with Parquet Format under the hood.
- Parameters:
path – Output path for the Parquet file(s). Supports local paths and any Flink-supported file system (HDFS, OSS, S3, etc.).
mode – [Batch] Write mode. ‘overwrite’ to overwrite existing data (default), ‘append’ to append to existing data. In streaming mode, this is always forced to ‘append’ since streaming does not support overwrite.
compression – [Batch/Streaming] Parquet compression codec. One of ‘SNAPPY’ (default), ‘GZIP’, ‘LZO’, ‘BROTLI’, ‘LZ4’, ‘ZSTD’, ‘LZ4_RAW’, ‘UNCOMPRESSED’.
block_size – [Batch/Streaming] Row group size in bytes. Default is 134217728 (128MB).
page_size – [Batch/Streaming] Data page size in bytes. Default is 1048576 (1MB).
dictionary_page_size – [Batch/Streaming] Dictionary page size in bytes. Default is 1048576 (1MB).
enable_dictionary – [Batch/Streaming] Whether to enable dictionary encoding. Default is True.
max_padding_size – [Batch/Streaming] The maximum padding size in bytes for row group alignment. Default is 8388608 (8MB).
validation – [Batch/Streaming] Whether to enable schema validation on write. Default is False.
writer_version – [Batch/Streaming] Parquet writer version. One of ‘v1’ (default), ‘v2’.
utc_timezone – [Batch/Streaming] Whether to use the UTC time zone (True) or the local time zone (False) when converting between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use the local time zone, whereas Hive 3.x uses UTC. Default is False.
timestamp_time_unit – [Batch/Streaming] Store parquet int64/LogicalTypes timestamps in this time unit. One of ‘nanos’, ‘micros’ (default), ‘millis’.
write_int64_timestamp – [Batch/Streaming] Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone). Default is False.
rolling_policy_file_size – [Streaming] The maximum part file size before rolling (e.g., ‘128mb’). Default is 128MB.
rolling_policy_rollover_interval – [Streaming] The maximum time duration a part file can stay open before rolling (by default long enough to avoid too many small files). The frequency at which this is checked is controlled by rolling_policy_check_interval. Default is 30 minutes.
rolling_policy_inactivity_interval – [Streaming] The maximum time duration a part file can stay inactive before rolling (by default long enough to avoid too many small files). The frequency at which this is checked is controlled by rolling_policy_check_interval. Default is 30 minutes.
rolling_policy_check_interval – [Streaming] The interval at which the time-based rolling policies are checked. This controls how often a part file is tested against rolling_policy_rollover_interval and rolling_policy_inactivity_interval to decide whether it should roll. Default is 1 minute.
partition_commit_trigger – [Streaming] Trigger type for partition commit. One of ‘process-time’ (default), ‘partition-time’.
partition_commit_delay – [Streaming] A partition is not committed until this delay has elapsed. Use ‘1d’ for daily partitions and ‘1h’ for hourly partitions. Default is ‘0s’ (no delay).
partition_commit_watermark_time_zone – [Streaming] The time zone used to convert the long watermark value to a TIMESTAMP value. The converted watermark timestamp is compared with the partition time to decide whether the partition should be committed. The default, ‘UTC’, applies when the watermark is defined on a TIMESTAMP column or is not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the watermark time zone is the user-configured time zone. The value is either a full name such as ‘America/Los_Angeles’ or a custom time zone ID such as ‘GMT-08:00’.
partition_commit_policy_kind – [Streaming] The policy used to commit a partition, that is, to notify downstream applications that the partition has finished writing and is ready to be read. ‘metastore’: add the partition to the metastore; only Hive tables support this policy, since the file system manages partitions through its directory structure. ‘success-file’: add a success file to the partition directory (see partition_commit_success_file_name). ‘custom’: use a policy class to create the commit policy (see partition_commit_policy_class). Several policies can be configured at the same time, e.g. ‘metastore,success-file’. Default is None.
partition_commit_policy_class – [Streaming] The partition commit policy class, which must implement the PartitionCommitPolicy interface. Only used with the ‘custom’ commit policy.
partition_commit_policy_class_parameters – [Streaming] The parameters passed to the constructor of the custom commit policy, with multiple parameters separated by semicolons, such as ‘param1;param2’.
partition_commit_success_file_name – [Streaming] The file name for success-file partition commit policy. Default is ‘_SUCCESS’.
partition_time_extractor_kind – [Streaming] Time extractor to extract time from partition values. This can either be ‘default’ or a custom extractor class. For ‘default’, you can configure a timestamp pattern. Default is ‘default’.
partition_time_extractor_class – [Streaming] The extractor class, which must implement the PartitionTimeExtractor interface. Only used when partition_time_extractor_kind is not ‘default’.
partition_time_extractor_timestamp_formatter – [Streaming] The formatter used to convert the partition timestamp string into a timestamp. Used together with partition_time_extractor_timestamp_pattern, which may combine multiple partition fields, e.g. ‘$year-$month-$day $hour:00:00’. Compatible with Java’s DateTimeFormatter.
partition_time_extractor_timestamp_pattern – [Streaming] Pattern to get a timestamp from partitions when partition_time_extractor_kind is ‘default’. E.g., ‘$dt’ for a single partition field, or ‘$year-$month-$day $hour:00:00’ for multiple fields.
shuffle_by_partition_enable – [Batch/Streaming] Whether to shuffle data by the dynamic partition fields in the sink phase. This can greatly reduce the number of files written by the filesystem sink, but may lead to data skew. Default is False.
auto_compaction – [Streaming] Whether to enable automatic compaction in streaming sink. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction. Default is False.
compaction_file_size – [Streaming] The compaction target file size. Defaults to the rolling file size.
compaction_parallelism – [Batch] Custom parallelism for the compaction operator in batch mode. By default, the planner will use the parallelism of the sink.
sink_parallelism – [Batch/Streaming] Custom parallelism for the sink.
partition_default_name – [Batch/Streaming] The default partition name in case the dynamic partition column value is null/empty string. Default is ‘__DEFAULT_PARTITION__’.
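The partition-time options above interact: the timestamp pattern is filled in from the partition values, the result is parsed into a timestamp, and the partition is committed once the watermark passes that time plus partition_commit_delay. The sketch below mimics only the pattern-resolution step in plain Python. The helper `partition_time`, the partition columns `dt` and `hour`, and the Python format string `"%Y-%m-%d %H:%M:%S"` are assumptions made for illustration; they are not part of the pyflink API, and how the DataFrame is partitioned is not covered in this section.

```python
from datetime import datetime
from string import Template


def partition_time(pattern: str, partition_values: dict) -> datetime:
    """Illustrative only: substitute partition values into a '$field' pattern
    and parse the result as 'YYYY-MM-DD HH:MM:SS' (an assumed format)."""
    resolved = Template(pattern).substitute(partition_values)
    return datetime.strptime(resolved, "%Y-%m-%d %H:%M:%S")


# Hypothetical hourly partition: dt=2024-01-15/hour=08
hourly = partition_time("$dt $hour:00:00", {"dt": "2024-01-15", "hour": "08"})

# Hypothetical multi-field partition: year=2024/month=01/day=15/hour=08
multi = partition_time(
    "$year-$month-$day $hour:00:00",
    {"year": "2024", "month": "01", "day": "15", "hour": "08"},
)

# Keyword arguments for a streaming write with hourly partitions, using only
# parameter names documented above.
streaming_options = {
    "partition_commit_trigger": "partition-time",
    "partition_commit_delay": "1h",
    "partition_commit_policy_kind": "success-file",
    "partition_time_extractor_timestamp_pattern": "$dt $hour:00:00",
}

# With a streaming DataFrame `df` partitioned by dt and hour (not shown here):
# df.write_parquet("/path/to/output", **streaming_options)
```

With these settings, the hourly delay of ‘1h’ matches the guidance given for partition_commit_delay, and the success file written on commit takes its name from partition_commit_success_file_name.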
- Example::
>>> import pyflink.dataframe as pf
>>>
>>> df = pf.from_records([(1, "a"), (2, "b")], schema=["id", "name"])
>>> df.write_parquet("/path/to/output")
>>>
>>> # Append with GZIP compression
>>> df.write_parquet("/path/to/output", mode="append",
...                  compression="GZIP")
>>>
>>> # Custom rolling policy and block size
>>> df.write_parquet("/path/to/output",
...                  block_size=256 * 1024 * 1024,
...                  rolling_policy_file_size="256mb")
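The byte-valued parameters (block_size, page_size, dictionary_page_size, max_padding_size) take plain integers, so it can help to derive them from a readable unit. The following is a minimal sketch that reproduces the documented defaults; the helper `mib` and the dictionaries `defaults` and `tuned` are illustrative names, not part of the library.

```python
MIB = 1024 * 1024  # bytes in one mebibyte


def mib(n: int) -> int:
    """Convert a size in MiB to bytes (illustrative helper)."""
    return n * MIB


# The documented default values, expressed in MiB.
defaults = {
    "block_size": mib(128),          # row group size
    "page_size": mib(1),             # data page size
    "dictionary_page_size": mib(1),  # dictionary page size
    "max_padding_size": mib(8),      # row group alignment padding
}

# A hypothetical tuning that doubles the row group size and leaves the
# other values at their defaults.
tuned = {**defaults, "block_size": mib(256)}

# With a DataFrame `df` as in the example above:
# df.write_parquet("/path/to/output", **tuned)
```

Passing the sizes as a dictionary keeps the Parquet tuning in one place; note that the rolling-policy and compaction sizes are strings such as ‘128mb’ rather than integers.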